From ac43bef5c5808313c2201d6f01dc96926fc8e2cd Mon Sep 17 00:00:00 2001
From: Joost VandeVondele
Date: Thu, 27 Dec 2018 15:42:53 +0100
Subject: [PATCH] [Cluster] Improve message passing part.

This rewrites part of the message passing code, using an in-place gather and
collecting, rather than merging, the data of all threads.

Neutral with a single thread per rank:

Score of new-2mpi-1t vs old-2mpi-1t: 789 - 787 - 2615 [0.500] 4191
Elo difference: 0.17 +/- 6.44

Likely progress with multiple threads per rank:

Score of new-2mpi-36t vs old-2mpi-36t: 76 - 53 - 471 [0.519] 600
Elo difference: 13.32 +/- 12.85
---
 src/cluster.cpp | 107 ++++++++++++++++++++++++++++++------------------
 src/cluster.h   |   6 ++-
 src/thread.cpp  |   3 ++
 src/thread.h    |   5 +--
 4 files changed, 76 insertions(+), 45 deletions(-)

diff --git a/src/cluster.cpp b/src/cluster.cpp
index 293a5c96..40f5aae4 100644
--- a/src/cluster.cpp
+++ b/src/cluster.cpp
@@ -54,10 +54,15 @@ static MPI_Comm TTComm = MPI_COMM_NULL;
 static MPI_Comm MoveComm = MPI_COMM_NULL;
 static MPI_Comm signalsComm = MPI_COMM_NULL;
 
-static std::vector<KeyedTTEntry> TTBuff;
+static std::vector<KeyedTTEntry> TTRecvBuff;
+static MPI_Request reqGather = MPI_REQUEST_NULL;
+static uint64_t gathersPosted = 0;
+
+static std::atomic<uint64_t> TTCacheCounter = {};
 
 static MPI_Datatype MIDatatype = MPI_DATATYPE_NULL;
 
+
 void init() {
 
   int thread_support;
@@ -72,8 +77,6 @@
   MPI_Comm_rank(MPI_COMM_WORLD, &world_rank);
   MPI_Comm_size(MPI_COMM_WORLD, &world_size);
 
-  TTBuff.resize(TTSendBufferSize * world_size);
-
   const std::array<MPI_Aint, 4> MIdisps = {offsetof(MoveInfo, move),
                                            offsetof(MoveInfo, depth),
                                            offsetof(MoveInfo, score),
@@ -111,6 +114,13 @@ int rank() {
   return world_rank;
 }
 
+void ttRecvBuff_resize(size_t nThreads) {
+
+  TTRecvBuff.resize(TTCacheSize * world_size * nThreads);
+  std::fill(TTRecvBuff.begin(), TTRecvBuff.end(), KeyedTTEntry());
+
+}
+
 bool getline(std::istream& input, std::string& str) {
 
@@ -189,6 +199,18 @@ void signals_sync() {
 
   signals_process();
 
+  // finalize outstanding messages in the gather loop
+  MPI_Allreduce(&gathersPosted, &globalCounter, 1, MPI_UINT64_T, MPI_MAX, MoveComm);
+  if (gathersPosted < globalCounter)
+  {
+      size_t recvBuffPerRankSize = Threads.size() * TTCacheSize;
+      MPI_Iallgather(MPI_IN_PLACE, 0, MPI_DATATYPE_NULL,
+                     TTRecvBuff.data(), recvBuffPerRankSize * sizeof(KeyedTTEntry), MPI_BYTE,
+                     TTComm, &reqGather);
+      ++gathersPosted;
+  }
+  assert(gathersPosted == globalCounter);
+
 }
 
 void signals_init() {
@@ -221,59 +243,64 @@ void save(Thread* thread, TTEntry* tte,
   {
      // Try to add to thread's send buffer
      {
-         std::lock_guard<Mutex> lk(thread->ttBuffer.mutex);
-         thread->ttBuffer.buffer.replace(KeyedTTEntry(k,*tte));
-         ++thread->ttBuffer.counter;
+         std::lock_guard<Mutex> lk(thread->ttCache.mutex);
+         thread->ttCache.buffer.replace(KeyedTTEntry(k,*tte));
+         ++TTCacheCounter;
      }
 
-     // Communicate on main search thread
-     if (thread == Threads.main() && thread->ttBuffer.counter * Threads.size() > TTSendBufferSize)
-     {
-         static MPI_Request req = MPI_REQUEST_NULL;
-         static TTSendBuffer<TTSendBufferSize> send_buff = {};
-         int flag;
+     size_t recvBuffPerRankSize = Threads.size() * TTCacheSize;
+
+     // Communicate on main search thread
+     if (thread == Threads.main() && TTCacheCounter > size() * recvBuffPerRankSize)
+     {
         // Test communication status
-         MPI_Test(&req, &flag, MPI_STATUS_IGNORE);
+         int flag;
+         MPI_Test(&reqGather, &flag, MPI_STATUS_IGNORE);
 
         // Current communication is complete
         if (flag)
         {
-            // Save all received entries (except ours)
+            // Save all received entries to TT, and store our TTCaches, ready for the next round of communication
             for (size_t irank = 0; irank < size_t(size()) ; ++irank)
             {
                 if (irank == size_t(rank()))
-                    continue;
-
-                for (size_t i = irank * TTSendBufferSize ; i < (irank + 1) * TTSendBufferSize; ++i)
                 {
-                    auto&& e = TTBuff[i];
-                    bool found;
-                    TTEntry* replace_tte;
-                    replace_tte = TT.probe(e.first, found);
-                    replace_tte->save(e.first, e.second.value(), e.second.bound(), e.second.depth(),
-                                      e.second.move(), e.second.eval());
+                    // Copy from the thread caches to the right spot in the buffer
+                    size_t i = irank * recvBuffPerRankSize;
+                    for (auto&& th : Threads)
+                    {
+                        std::lock_guard<Mutex> lk(th->ttCache.mutex);
+
+                        for (auto&& e : th->ttCache.buffer)
+                            TTRecvBuff[i++] = e;
+
+                        // Reset thread's send buffer
+                        th->ttCache.buffer = {};
+                    }
+
+                    TTCacheCounter = 0;
                 }
-            }
-
-            // Reset send buffer
-            send_buff = {};
-
-            // Build up new send buffer: best 16 found across all threads
-            for (auto&& th : Threads)
-            {
-                std::lock_guard<Mutex> lk(th->ttBuffer.mutex);
-                for (auto&& e : th->ttBuffer.buffer)
-                    send_buff.replace(e);
-
-                // Reset thread's send buffer
-                th->ttBuffer.buffer = {};
-                th->ttBuffer.counter = 0;
+                else
+                    for (size_t i = irank * recvBuffPerRankSize; i < (irank + 1) * recvBuffPerRankSize; ++i)
+                    {
+                        auto&& e = TTRecvBuff[i];
+                        bool found;
+                        TTEntry* replace_tte;
+                        replace_tte = TT.probe(e.first, found);
+                        replace_tte->save(e.first, e.second.value(), e.second.bound(), e.second.depth(),
+                                          e.second.move(), e.second.eval());
+                    }
             }
 
             // Start next communication
-            MPI_Iallgather(send_buff.data(), send_buff.size() * sizeof(KeyedTTEntry), MPI_BYTE,
-                           TTBuff.data(), TTSendBufferSize * sizeof(KeyedTTEntry), MPI_BYTE,
-                           TTComm, &req);
+            MPI_Iallgather(MPI_IN_PLACE, 0, MPI_DATATYPE_NULL,
+                           TTRecvBuff.data(), recvBuffPerRankSize * sizeof(KeyedTTEntry), MPI_BYTE,
+                           TTComm, &reqGather);
+            ++gathersPosted;
+
+            // Force check of time on the next occasion.
+            static_cast<MainThread*>(thread)->callsCnt = 0;
+
         }
     }
 }
diff --git a/src/cluster.h b/src/cluster.h
index 4b80107d..b4bc7649 100644
--- a/src/cluster.h
+++ b/src/cluster.h
@@ -42,8 +42,8 @@ struct MoveInfo {
 #ifdef USE_MPI
 using KeyedTTEntry = std::pair<Key, TTEntry>;
 
-constexpr std::size_t TTSendBufferSize = 32;
-template <std::size_t N> class TTSendBuffer : public std::array<KeyedTTEntry, N> {
+constexpr std::size_t TTCacheSize = 32;
+template <std::size_t N> class TTCache : public std::array<KeyedTTEntry, N> {
 
   struct Compare {
       inline bool operator()(const KeyedTTEntry& lhs, const KeyedTTEntry& rhs) {
@@ -74,6 +74,7 @@ int rank();
 inline bool is_root() { return rank() == 0; }
 void save(Thread* thread, TTEntry* tte, Key k, Value v, Bound b, Depth d, Move m, Value ev);
 void pick_moves(MoveInfo& mi);
+void ttRecvBuff_resize(size_t nThreads);
 uint64_t nodes_searched();
 uint64_t tb_hits();
 void signals_init();
@@ -90,6 +91,7 @@ constexpr int rank() { return 0; }
 constexpr bool is_root() { return true; }
 inline void save(Thread*, TTEntry* tte, Key k, Value v, Bound b, Depth d, Move m, Value ev) { tte->save(k, v, b, d, m, ev); }
 inline void pick_moves(MoveInfo&) { }
+inline void ttRecvBuff_resize(size_t) { }
 uint64_t nodes_searched();
 uint64_t tb_hits();
 inline void signals_init() { }
diff --git a/src/thread.cpp b/src/thread.cpp
index 18d7692b..932376a3 100644
--- a/src/thread.cpp
+++ b/src/thread.cpp
@@ -139,6 +139,9 @@ void ThreadPool::set(size_t requested) {
 
       // Reallocate the hash with the new threadpool size
       TT.resize(Options["Hash"]);
+
+      // Adjust cluster buffers
+      Cluster::ttRecvBuff_resize(requested);
   }
 }
diff --git a/src/thread.h b/src/thread.h
index 4f34de51..f7e88f05 100644
--- a/src/thread.h
+++ b/src/thread.h
@@ -78,9 +78,8 @@ public:
 #ifdef USE_MPI
   struct {
     Mutex mutex;
-    Cluster::TTSendBuffer<Cluster::TTSendBufferSize> buffer = {};
-    size_t counter = 0;
-  } ttBuffer;
+    Cluster::TTCache<Cluster::TTCacheSize> buffer = {};
+  } ttCache;
 #endif
 };
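
A note on the in-place gather that replaces the old send_buff: when MPI_IN_PLACE
is passed as the send buffer of an (i)allgather, each rank's contribution is
taken from its own slice of the receive buffer, so no separate send buffer or
merge step is needed. A minimal standalone sketch of the pattern (buffer
contents and sizes here are illustrative, not taken from the patch):

// inplace_allgather.cpp -- illustrates MPI_IN_PLACE with MPI_Iallgather,
// mirroring the pattern in Cluster::save(). Build with mpicxx, run with mpirun.
#include <mpi.h>
#include <cstdio>
#include <vector>

int main(int argc, char* argv[]) {
    MPI_Init(&argc, &argv);

    int rank, size;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);

    const int perRank = 4;  // stands in for recvBuffPerRankSize
    std::vector<int> recvBuff(perRank * size, 0);

    // Each rank writes its contribution directly into its own slice.
    for (int i = 0; i < perRank; ++i)
        recvBuff[rank * perRank + i] = rank * 100 + i;

    // With MPI_IN_PLACE the send arguments are ignored; the data is taken
    // from this rank's slice of the receive buffer, as in the patch.
    MPI_Request req;
    MPI_Iallgather(MPI_IN_PLACE, 0, MPI_DATATYPE_NULL,
                   recvBuff.data(), perRank * int(sizeof(int)), MPI_BYTE,
                   MPI_COMM_WORLD, &req);

    // The search would keep running here; we just wait for completion.
    MPI_Wait(&req, MPI_STATUS_IGNORE);

    if (rank == 0)
        for (int r = 0; r < size; ++r)
            std::printf("slice %d: first entry %d\n", r, recvBuff[r * perRank]);

    MPI_Finalize();
}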
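
The gathersPosted/globalCounter logic in signals_sync() exists because
nonblocking collectives must be posted in matching order and number on every
rank of the communicator. When the search stops, some ranks may have posted one
more gather than others; since a new gather is only started after MPI_Test
reports the previous one complete, the counts can differ by at most one, so a
single catch-up call per lagging rank restores the match. The same pattern in
isolation (hypothetical helper, not part of the patch):

// Sketch of the finalization pattern from signals_sync(): bring this rank's
// count of posted gathers up to the global maximum so all ranks have issued
// matching collectives, then drain the last request.
#include <mpi.h>
#include <cassert>
#include <cstdint>

void finish_outstanding_gathers(void* recvBuff, int blockBytes, MPI_Comm comm,
                                MPI_Request& req, uint64_t& posted) {
    uint64_t globalMax;
    MPI_Allreduce(&posted, &globalMax, 1, MPI_UINT64_T, MPI_MAX, comm);

    if (posted < globalMax)  // counts differ by at most one
    {
        MPI_Iallgather(MPI_IN_PLACE, 0, MPI_DATATYPE_NULL,
                       recvBuff, blockBytes, MPI_BYTE, comm, &req);
        ++posted;
    }
    assert(posted == globalMax);

    MPI_Wait(&req, MPI_STATUS_IGNORE);  // a no-op on MPI_REQUEST_NULL
}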
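
On the renamed TTCache: it is a fixed-size per-thread store whose replace()
keeps the most valuable entries seen since the last exchange, with the Compare
functor (visible in the cluster.h hunk) ordering entries by depth. One
plausible way to implement such a keep-the-best-N buffer is a binary heap over
the underlying std::array; the sketch below is illustrative only, since the
branch's actual replace() lies outside this hunk:

// Sketch of a bounded "keep the N best entries" buffer, as TTCache is used:
// a min-heap keyed on depth lets the weakest resident entry be evicted in
// O(log N) when a deeper candidate arrives.
#include <algorithm>
#include <array>
#include <cstddef>
#include <utility>

using Key = unsigned long long;
struct Entry { int depth; };                  // stand-in for TTEntry
using KeyedEntry = std::pair<Key, Entry>;

template <std::size_t N>
class BestN : public std::array<KeyedEntry, N> {

    struct Compare {  // "greater on depth" => std::*_heap maintain a min-heap
        bool operator()(const KeyedEntry& lhs, const KeyedEntry& rhs) const {
            return lhs.second.depth > rhs.second.depth;
        }
    };

public:
    bool replace(const KeyedEntry& value) {
        // front() is the shallowest resident entry; evict it if beaten.
        if (Compare()(value, this->front()))
        {
            std::pop_heap(this->begin(), this->end(), Compare());
            this->back() = value;
            std::push_heap(this->begin(), this->end(), Compare());
            return true;
        }
        return false;
    }
};

A zero-initialized instance (BestN<32> cache = {};) is already a valid heap,
since all entries compare equal, which matches how the patch resets the
per-thread buffers with "th->ttCache.buffer = {};".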