38 #ifndef TRANSFER_MPIWINCACHE_H
39 #define TRANSFER_MPIWINCACHE_H
49 #include "utils/logger.h"
149 unsigned int cacheSize,
151 unsigned long blockCount,
152 unsigned long blockSize,
170 blockCount, mpiComm, numaComm);
193 if (MPI_Win_create(cache,
244 unsigned long dictOffset,
unsigned long offset)
251 #ifndef THREADSAFE_MPI
257 #endif // THREADSAFE_MPI
259 int mpiResult; NDBG_UNUSED(mpiResult);
263 mpiResult = MPI_Win_lock(MPI_LOCK_EXCLUSIVE, dictRank, 0,
m_dictWin);
264 assert(mpiResult == MPI_SUCCESS);
266 if (dictRank !=
mpiComm().rank()) {
267 mpiResult = MPI_Get(dictEntry,
275 assert(mpiResult == MPI_SUCCESS);
283 mpiResult = MPI_Win_unlock(dictRank,
m_dictWin);
284 assert(mpiResult == MPI_SUCCESS);
286 #ifndef THREADSAFE_MPI
288 #endif // THREADSAFE_MPI
292 #ifndef THREADSAFE_MPI
294 #endif // THREADSAFE_MPI
296 mpiResult = MPI_Win_lock(MPI_LOCK_EXCLUSIVE, dictRank, 0,
m_dictWin);
297 assert(mpiResult == MPI_SUCCESS);
299 if (dictRank !=
mpiComm().rank()) {
300 mpiResult = MPI_Get(dictEntry,
308 assert(mpiResult == MPI_SUCCESS);
310 mpiResult = MPI_Win_flush_local(dictRank,
m_dictWin);
311 assert(mpiResult == MPI_SUCCESS);
318 if (dictRank ==
mpiComm().rank())
325 mpiResult = MPI_Put(dictEntry,
333 assert(mpiResult == MPI_SUCCESS);
336 mpiResult = MPI_Win_unlock(dictRank,
m_dictWin);
337 assert(mpiResult == MPI_SUCCESS);
339 #ifndef THREADSAFE_MPI
341 #endif // THREADSAFE_MPI
356 bool transfer(
long entry,
unsigned long blockId,
unsigned char *cache,
bool &retry)
368 if (rank ==
mpiComm().rank()) {
374 int mpiResult; NDBG_UNUSED(mpiResult);
376 #ifndef THREADSAFE_MPI
381 #endif // THREADSAFE_MPI
384 mpiResult = MPI_Win_lock(MPI_LOCK_SHARED, rank,
386 assert(mpiResult == MPI_SUCCESS);
389 mpiResult = MPI_Get(cache,
397 assert(mpiResult == MPI_SUCCESS);
401 assert(mpiResult == MPI_SUCCESS);
416 unsigned long dictOffset,
unsigned long offset)
418 #ifndef THREADSAFE_MPI
424 #endif // THREADSAFE_MPI
426 int mpiResult; NDBG_UNUSED(mpiResult);
428 mpiResult = MPI_Win_lock(MPI_LOCK_EXCLUSIVE, dictRank, 0,
m_dictWin);
429 assert(mpiResult == MPI_SUCCESS);
434 mpiResult = MPI_Win_unlock(dictRank,
m_dictWin);
435 assert(mpiResult == MPI_SUCCESS);
451 unsigned long dictOffset,
unsigned long offset)
454 endTransfer(blockId, dictRank, dictOffset, offset);
465 void deleteBlock(
long blockId,
int dictRank,
unsigned long dictOffset,
466 unsigned long offset)
476 #ifndef THREADSAFE_MPI
482 #endif // THREADSAFE_MPI
484 int mpiResult; NDBG_UNUSED(mpiResult);
488 mpiResult = MPI_Win_lock(MPI_LOCK_EXCLUSIVE, dictRank, 0,
m_dictWin);
489 assert(mpiResult == MPI_SUCCESS);
491 if (dictRank !=
mpiComm().rank()) {
492 mpiResult = MPI_Get(dictEntry,
500 assert(mpiResult == MPI_SUCCESS);
508 mpiResult = MPI_Win_flush(dictRank,
m_dictWin);
509 assert(mpiResult == MPI_SUCCESS);
512 mpiResult = MPI_Win_unlock(dictRank,
m_dictWin);
513 assert(mpiResult == MPI_SUCCESS);
515 #ifndef THREADSAFE_MPI
517 #endif // THREADSAFE_MPI
521 #ifndef THREADSAFE_MPI
523 #endif // THREADSAFE_MPI
525 mpiResult = MPI_Win_lock(MPI_LOCK_EXCLUSIVE, dictRank, 0,
m_dictWin);
526 assert(mpiResult == MPI_SUCCESS);
528 if (dictRank !=
mpiComm().rank()) {
529 mpiResult = MPI_Get(dictEntry,
537 assert(mpiResult == MPI_SUCCESS);
539 mpiResult = MPI_Win_flush_local(dictRank,
m_dictWin);
540 assert(mpiResult == MPI_SUCCESS);
545 if (dictRank ==
mpiComm().rank())
552 mpiResult = MPI_Put(dictEntry,
560 assert(mpiResult == MPI_SUCCESS);
566 mpiResult = MPI_Win_unlock(dictRank,
m_dictWin);
567 assert(mpiResult == MPI_SUCCESS);
569 #ifndef THREADSAFE_MPI
571 #endif // THREADSAFE_MPI
575 #endif // ASAGI_NOMPI
579 #endif // TRANSFER_MPIWINCACHE_H
threads::Mutex * m_dictWinMutex
void deleteBlockInfo(long *dictEntry, long entry)
asagi::Grid::Error broadcast(T &value, unsigned int rootDomain=0)
unsigned int totalDictEntrySize() const
long startTransfer(unsigned long blockId, int dictRank, unsigned long dictOffset, unsigned long offset)
unsigned int m_numaDomainId
asagi::Grid::Error init(unsigned int cacheSize, unsigned long blockCount, const mpi::MPIComm &mpiComm, numa::NumaComm &numaComm)
void deleteBlock(long blockId, int dictRank, unsigned long dictOffset, unsigned long offset)
static const unsigned int MAX_DICT_SIZE
long fetchAndUpdateBlockInfo(long *dictEntry, unsigned long newEntry)
void release(int rank, unsigned long offset)
Release the lock for a block.
unsigned int dictEntrySize() const
void endTransfer(unsigned long blockId, int dictRank, unsigned long dictOffset, unsigned long offset)
void addBlock(unsigned long blockId, int dictRank, unsigned long dictOffset, unsigned long offset)
Include file for C and C++ API.
unsigned int domainId() const
long acquire(int rank, unsigned long offset)
Acquire a lock.
void init(MPIComm &comm, MPI_Win window, const numa::NumaComm &numa)
unsigned long rankCacheSize() const
unsigned int totalDomains() const
virtual MPI_Datatype getMPIType() const =0
bool transfer(long entry, unsigned long blockId, unsigned char *cache, bool &retry)
void initMutexMem(long &lock)
unsigned long m_blockSize
threads::Mutex * m_cacheWinMutex
virtual unsigned int size() const =0
const mpi::MPIComm & mpiComm() const
const long * dictionary() const
asagi::Grid::Error init(unsigned char *cache, unsigned int cacheSize, const cache::CacheManager &cacheManager, unsigned long blockCount, unsigned long blockSize, const types::Type &type, mpi::MPIComm &mpiComm, numa::NumaComm &numaComm)