38 #ifndef TRANSFER_MPITHREADCACHE_H
39 #define TRANSFER_MPITHREADCACHE_H
50 #include "utils/logger.h"
95 void recv(
int sender,
unsigned long dictOffset)
97 int mpiResult; NDBG_UNUSED(mpiResult);
102 mpiResult = MPI_Isend(&entry, 1, MPI_LONG, sender,
105 assert(mpiResult == MPI_SUCCESS);
109 mpiResult = MPI_Test(&request, &done, MPI_STATUS_IGNORE);
110 assert(mpiResult == MPI_SUCCESS);
137 void recv(
int sender,
unsigned long blockId)
139 int mpiResult; NDBG_UNUSED(mpiResult);
141 unsigned long cacheOffset;
142 const unsigned char* data;
157 mpiResult = MPI_Isend(const_cast<unsigned char*>(data),
161 assert(mpiResult == MPI_SUCCESS);
165 mpiResult = MPI_Test(&request, &done, MPI_STATUS_IGNORE);
166 assert(mpiResult == MPI_SUCCESS);
169 cacheManager->
unlock(cacheOffset);
174 mpiResult = MPI_Isend(&ack, 1, MPI_CHAR, sender,
177 assert(mpiResult == MPI_SUCCESS);
181 mpiResult = MPI_Test(&request, &done, MPI_STATUS_IGNORE);
182 assert(mpiResult == MPI_SUCCESS);
211 void recv(
int sender,
unsigned long data)
244 void recv(
int sender,
unsigned long data)
353 unsigned int cacheSize,
355 unsigned long blockCount,
356 unsigned long blockSize,
367 blockCount, mpiComm, numaComm);
459 unsigned long dictOffset,
unsigned long offset)
461 if (dictRank ==
mpiComm().rank())
465 int mpiResult; NDBG_UNUSED(mpiResult);
467 std::lock_guard<threads::Mutex> lock(*
m_infoMutex);
472 mpiResult = MPI_Recv(&entry, 1, MPI_LONG, dictRank,
ENTRY_TAG,
473 mpiComm().comm(), MPI_STATUS_IGNORE);
474 assert(mpiResult == MPI_SUCCESS);
489 bool transfer(
long entry,
unsigned long blockId,
unsigned char *cache,
bool &retry)
505 int mpiResult; NDBG_UNUSED(mpiResult);
511 assert(mpiResult == MPI_SUCCESS);
517 assert(mpiResult == MPI_SUCCESS);
524 mpiComm().comm(), MPI_STATUS_IGNORE);
525 assert(mpiResult == MPI_SUCCESS);
534 mpiComm().comm(), MPI_STATUS_IGNORE);
535 assert(mpiResult == MPI_SUCCESS);
550 unsigned long dictOffset,
unsigned long offset)
552 addBlock(blockId, dictRank, dictOffset, offset);
566 unsigned long dictOffset,
unsigned long offset)
568 if (dictRank ==
mpiComm().rank())
569 addBlock(dictOffset, dictRank, offset);
587 void deleteBlock(
long blockId,
int dictRank,
unsigned long dictOffset,
588 unsigned long offset)
594 if (dictRank ==
mpiComm().rank())
615 std::lock_guard<threads::Mutex> lock(
m_mutexes[dictOffset]);
627 void addBlock(
unsigned long dictOffset,
int rank,
unsigned long offset)
629 std::lock_guard<threads::Mutex> lock(
m_mutexes[dictOffset]);
641 void deleteBlock(
unsigned long dictOffset,
int rank,
unsigned long offset)
643 std::lock_guard<threads::Mutex> lock(
m_mutexes[dictOffset]);
659 #endif // ASAGI_NOMPI
663 #endif // TRANSFER_MPITHREADCACHE_H
void init(MPIThreadCache &parent)
unsigned long m_blockSize
static CommThread commThread
threads::Mutex * m_mutexes
void deleteBlockInfo(long *dictEntry, long entry)
asagi::Grid::Error broadcast(T &value, unsigned int rootDomain=0)
void recv(int sender, unsigned long dictOffset)
threads::Mutex * m_blockMutex
void recv(int sender, unsigned long data)
void unregisterReceiver(int tag)
static const int TRANSFER_FAIL_TAG
cache::CacheManager ** m_cacheManager
void init(MPIThreadCache &parent)
asagi::Grid::Error init(unsigned int cacheSize, unsigned long blockCount, const mpi::MPIComm &mpiComm, numa::NumaComm &numaComm)
MPIThreadCache * m_parent
long startTransfer(unsigned long blockId, int dictRank, unsigned long dictOffset, unsigned long offset)
BlockInfoResponder m_blockInfoResponder
asagi::Grid::Error registerReceiver(MPI_Comm comm, Receiver &receiver, int &tag)
void addBlock(unsigned long blockId, int dictRank, unsigned long dictOffset, unsigned long offset)
MPIThreadCache * m_parent
MPIThreadCache * m_parent
void recv(int sender, unsigned long data)
void init(MPIThreadCache &parent)
void init(MPIThreadCache &parent)
long fetchBlockInfo(const long *dictEntry)
bool tryGet(unsigned long blockId, unsigned long &cacheOffset, const unsigned char *&data)
Include file for C and C++ API.
threads::Mutex * m_infoMutex
void recv(int sender, unsigned long blockId)
void send(int tag, int recv, message_t data)
unsigned int domainId() const
MPIThreadCache * m_parent
void deleteBlock(long blockId, int dictRank, unsigned long dictOffset, unsigned long offset)
static const int ENTRY_TAG
unsigned long rankCacheSize() const
unsigned int totalDomains() const
virtual MPI_Datatype getMPIType() const =0
int reserveTags(unsigned int num)
void endTransfer(unsigned long blockId, int dictRank, unsigned long dictOffset, unsigned long offset)
BlockTransferer m_blockTransferer
static const int TRANSFER_TAG
void updateBlockInfo(long *dictEntry, unsigned long newEntry)
bool transfer(long entry, unsigned long blockId, unsigned char *cache, bool &retry)
asagi::Grid::Error init(unsigned char *cache, unsigned int cacheSize, cache::CacheManager &cacheManager, unsigned long blockCount, unsigned long blockSize, const types::Type &type, mpi::MPIComm &mpiComm, numa::NumaComm &numaComm)
void addBlock(unsigned long dictOffset, int rank, unsigned long offset)
long fetchBlockInfo(unsigned long dictOffset)
const mpi::MPIComm & mpiComm() const
unsigned int m_numaDomainSize
unsigned int m_numaDomainId
void unlock(unsigned long cacheOffset)
const long * dictionary() const
void deleteBlock(unsigned long dictOffset, int rank, unsigned long offset)