38 #ifndef MPI_COMMTHREAD_H
39 #define MPI_COMMTHREAD_H
44 #include <unordered_map>
48 #include "utils/logger.h"
133 std::lock_guard<threads::Mutex> lock(
m_lock);
136 int initialized =
m_comm != MPI_COMM_NULL;
137 if (MPI_Allreduce(MPI_IN_PLACE, &initialized, 1, MPI_INT, MPI_LOR, comm)
145 if (MPI_Comm_dup(comm, &
m_comm) != MPI_SUCCESS)
158 int numCPUs = std::thread::hardware_concurrency();
159 if (schedCPU < numCPUs && -schedCPU <= numCPUs) {
165 CPU_SET(schedCPU, &cpuset);
167 CPU_SET(numCPUs - schedCPU, &cpuset);
169 pthread_setaffinity_np(
m_thread,
sizeof(cpu_set_t), &cpuset);
171 logInfo(
m_rank) <<
"ASAGI: Invalid CPU id" << schedCPU
172 <<
"The communication thread will not be pined.";
183 std::lock_guard<threads::Mutex> lock(
m_lock);
192 MPI_Test(&request, &done, MPI_STATUS_IGNORE);
200 logWarning() <<
"ASAGI: Not all receivers are removed correctly.";
221 std::lock_guard<threads::Mutex> lock(
m_lock);
223 if (
m_comm == MPI_COMM_NULL)
227 if (MPI_Allreduce(&
m_nextTag, &tag, 1, MPI_INT, MPI_MAX, comm) != MPI_SUCCESS)
234 detail.
recv = &receiver;
235 MPI_Comm_group(comm, &detail.
group);
264 std::unordered_map<int, ReceiverDetail>::const_iterator it =
m_receiver.find(tag);
266 logWarning() <<
"ASAGI: Sending message from unregistered tag" << tag;
269 MPI_Group_translate_ranks(it->second.group, 1, &recv,
m_group, &recvRank);
288 logError() <<
"Rank is not allowed to send exit command";
295 logError() <<
"Rank is not allowed to send unregister command";
302 std::unordered_map<int, ReceiverDetail>::const_iterator it =
commThread.
m_receiver.find(status.MPI_TAG);
304 logWarning() <<
"ASAGI: Received unregistered message with tag" << status.MPI_TAG;
307 MPI_Group_translate_ranks(
commThread.
m_group, 1, &status.MPI_SOURCE, it->second.group, &senderRank);
308 it->second.recv->recv(senderRank, data);
330 #endif // MPI_COMMTHREAD_H
const MPI_Datatype MPI_MESSAGE
static CommThread commThread
static void * commThreadHandler(void *p)
std::unordered_map< int, ReceiverDetail > m_receiver
asagi::Grid::Error init(int schedCPU, MPI_Comm comm=MPI_COMM_WORLD)
void unregisterReceiver(int tag)
asagi::Grid::Error registerReceiver(MPI_Comm comm, Receiver &receiver, int &tag)
virtual void recv(int sender, message_t data)=0
Include file for C and C++ API.
void send(int tag, int recv, message_t data)
static const int UNREG_TAG
static const int EXIT_TAG