diff --git a/Readme.md b/Readme.md
index bc058dbc..2b7061c9 100644
--- a/Readme.md
+++ b/Readme.md
@@ -129,6 +129,30 @@
 more compact than Nalimov tablebases, while still storing all information
 needed for optimal play and in addition being able to take into account
 the 50-move rule.
+## Stockfish on distributed memory systems
+
+The cluster branch allows running Stockfish on a cluster of servers (nodes)
+that are connected with a high-speed and low-latency network, using the Message
+Passing Interface (MPI). In this case, one MPI process should be run per node,
+and UCI options can be used to set the number of threads and the hash size per node as usual.
+Typically, the engine will be invoked as
+```
+mpirun -np N /path/to/stockfish
+```
+where ```N``` stands for the number of MPI processes used. To build the cluster
+branch, it is sufficient to specify ```COMPILER=mpicxx``` on the make command line,
+and do a clean build:
+```
+make -j ARCH=x86-64-modern clean build COMPILER=mpicxx
+```
+If the name of the compiler wrapper (typically mpicxx, but sometimes e.g. CC) does
+not match ```mpi```, an edit to the Makefile is required. Make sure that the MPI
+installation is configured to support ```MPI_THREAD_MULTIPLE```; this might require
+adding system-specific compiler options to the Makefile. Stockfish employs
+non-blocking (asynchronous) communication and benefits from an MPI
+implementation that supports this efficiently. Some MPI implementations might benefit
+from leaving one core/thread free for these asynchronous communications, and might require
+setting additional environment variables. Refer to your MPI documentation for more info.
 
 ## Compiling Stockfish yourself from the sources
 
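Whether a given MPI installation actually provides ```MPI_THREAD_MULTIPLE``` can be checked
independently of the engine. The following minimal sketch (not taken from the branch; the
output wording is illustrative) can be compiled with the same mpicxx wrapper and launched
with mpirun:
```
#include <mpi.h>
#include <cstdio>

int main(int argc, char* argv[]) {

  // Request full multi-threaded MPI support, as several engine threads
  // may issue MPI calls concurrently.
  int provided;
  MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &provided);

  int rank;
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);

  // The MPI thread-support levels are ordered, so >= is a valid test.
  if (rank == 0)
      std::printf("MPI_THREAD_MULTIPLE %s\n",
                  provided >= MPI_THREAD_MULTIPLE ? "available" : "NOT available");

  MPI_Finalize();
  return 0;
}
```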
diff --git a/src/cluster.cpp b/src/cluster.cpp
index 3e431661..efa1ab57 100644
--- a/src/cluster.cpp
+++ b/src/cluster.cpp
@@ -2,7 +2,7 @@
   Stockfish, a UCI chess playing engine derived from Glaurung 2.1
   Copyright (C) 2004-2008 Tord Romstad (Glaurung author)
   Copyright (C) 2008-2015 Marco Costalba, Joona Kiiski, Tord Romstad
-  Copyright (C) 2015-2018 Marco Costalba, Joona Kiiski, Gary Linscott, Tord Romstad
+  Copyright (C) 2015-2019 Marco Costalba, Joona Kiiski, Gary Linscott, Tord Romstad
 
   Stockfish is free software: you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
@@ -32,37 +32,46 @@
 #include "cluster.h"
 #include "thread.h"
 #include "tt.h"
+#include "timeman.h"
 
 namespace Cluster {
 
+// The rank of this process and the total number of ranks in the communicator
 static int world_rank = MPI_PROC_NULL;
 static int world_size = 0;
 
+// Ranks exchange basic info via signals, using a dedicated communicator
+static MPI_Comm signalsComm = MPI_COMM_NULL;
 static MPI_Request reqSignals = MPI_REQUEST_NULL;
 static uint64_t signalsCallCounter = 0;
 
-enum Signals : int { SIG_NODES = 0, SIG_STOP = 1, SIG_TB = 2, SIG_NB = 3};
+// The signals are the number of nodes searched, the stop flag, tablebase hits, and transposition table saves
+enum Signals : int { SIG_NODES = 0, SIG_STOP = 1, SIG_TB = 2, SIG_TTS = 3, SIG_NB = 4};
 static uint64_t signalsSend[SIG_NB] = {};
 static uint64_t signalsRecv[SIG_NB] = {};
-
 static uint64_t nodesSearchedOthers = 0;
 static uint64_t tbHitsOthers = 0;
+static uint64_t TTsavesOthers = 0;
 static uint64_t stopSignalsPosted = 0;
 
+// The UCI threads of the ranks exchange input using a dedicated communicator
 static MPI_Comm InputComm = MPI_COMM_NULL;
-static MPI_Comm TTComm = MPI_COMM_NULL;
+
+// Exchanging the bestMove requires a dedicated communicator and the MoveInfo data type
 static MPI_Comm MoveComm = MPI_COMM_NULL;
-static MPI_Comm signalsComm = MPI_COMM_NULL;
-
-static std::vector<KeyedTTEntry> TTRecvBuff;
-static MPI_Request reqGather = MPI_REQUEST_NULL;
-static uint64_t gathersPosted = 0;
-
-static std::atomic<uint64_t> TTCacheCounter = {};
-
 static MPI_Datatype MIDatatype = MPI_DATATYPE_NULL;
 
+// TT entries are communicated with a dedicated communicator.
+// The receive buffer is used to gather information from all ranks.
+// The TTCacheCounter tracks the number of local elements that are ready to be sent.
+static MPI_Comm TTComm = MPI_COMM_NULL;
+static MPI_Request reqGather = MPI_REQUEST_NULL;
+static uint64_t gathersPosted = 0;
+static std::vector<KeyedTTEntry> TTRecvBuff;
+static std::atomic<uint64_t> TTCacheCounter = {};
+
+/// Initialize MPI and the associated data types. Note that the MPI library must be configured
+/// to support MPI_THREAD_MULTIPLE, since multiple threads access MPI simultaneously.
 void init() {
 
   int thread_support;
@@ -91,10 +100,9 @@ void init() {
   MPI_Comm_dup(MPI_COMM_WORLD, &signalsComm);
 }
 
+/// Finalize MPI and free the associated data types.
 void finalize() {
 
-
-  // free data tyes and communicators
   MPI_Type_free(&MIDatatype);
 
   MPI_Comm_free(&InputComm);
@@ -105,16 +113,19 @@ void finalize() {
   MPI_Finalize();
 }
 
+/// Return the total number of ranks
 int size() {
 
   return world_size;
 }
 
+/// Return the rank (index) of this process
 int rank() {
 
   return world_rank;
 }
 
+/// The receive buffer depends on the number of MPI ranks and threads; resize as needed
 void ttRecvBuff_resize(size_t nThreads) {
 
   TTRecvBuff.resize(TTCacheSize * world_size * nThreads);
@@ -122,7 +133,10 @@ void ttRecvBuff_resize(size_t nThreads) {
 
 }
 
-
+/// As input is only received by the root (rank 0) of the cluster, this input must be relayed
+/// to the UCI threads of all ranks, in order to set up the position, etc. We do this with a
+/// dedicated getline implementation, where the root broadcasts the received information to
+/// all other ranks.
 bool getline(std::istream& input, std::string& str) {
 
   int size;
@@ -136,7 +150,8 @@ bool getline(std::istream& input, std::string& str) {
       size = vec.size();
   }
 
-  // Some MPI implementations use busy-wait polling, while we need yielding
+  // Some MPI implementations use busy-wait polling, while we need yielding, as otherwise
+  // the UCI thread on the non-root ranks would be consuming resources.
   static MPI_Request reqInput = MPI_REQUEST_NULL;
   MPI_Ibcast(&size, 1, MPI_INT, 0, InputComm, &reqInput);
   if (is_root())
@@ -154,6 +169,7 @@ bool getline(std::istream& input, std::string& str) {
       }
   }
 
+  // Broadcast the received string
   if (!is_root())
       vec.resize(size);
   MPI_Bcast(vec.data(), size, MPI_CHAR, 0, InputComm);
@@ -164,6 +180,7 @@ bool getline(std::istream& input, std::string& str) {
   return state;
 }
 
+/// Sending part of the signal communication loop
 void signals_send() {
 
   signalsSend[SIG_NODES] = Threads.nodes_searched();
@@ -174,23 +191,33 @@ void signals_send() {
   ++signalsCallCounter;
 }
 
+/// Processing part of the signal communication loop.
+/// For some counters (e.g. nodes) we only keep their sum over the other ranks,
+/// so that local counters can be added at any time for more fine-grained progress
+/// reporting, which is useful to indicate progress during early iterations, and to have
+/// node counts that exactly match the non-MPI code in the single-rank case.
+/// This call also propagates the stop signal between ranks.
 void signals_process() {
 
   nodesSearchedOthers = signalsRecv[SIG_NODES] - signalsSend[SIG_NODES];
   tbHitsOthers = signalsRecv[SIG_TB] - signalsSend[SIG_TB];
+  TTsavesOthers = signalsRecv[SIG_TTS] - signalsSend[SIG_TTS];
   stopSignalsPosted = signalsRecv[SIG_STOP];
   if (signalsRecv[SIG_STOP] > 0)
       Threads.stop = true;
 }
 
+/// During search, most message passing is asynchronous, but at the end of
+/// the search it makes sense to bring all ranks to a common, finalized state.
 void signals_sync() {
 
   while(stopSignalsPosted < uint64_t(size()))
       signals_poll();
 
-  // finalize outstanding messages of the signal loops. We might have issued one call less than needed on some ranks.
+  // Finalize outstanding messages of the signal loops.
+  // We might have issued one call less than needed on some ranks.
   uint64_t globalCounter;
-  MPI_Allreduce(&signalsCallCounter, &globalCounter, 1, MPI_UINT64_T, MPI_MAX, MoveComm); // MoveComm needed
+  MPI_Allreduce(&signalsCallCounter, &globalCounter, 1, MPI_UINT64_T, MPI_MAX, MoveComm);
  if (signalsCallCounter < globalCounter)
  {
      MPI_Wait(&reqSignals, MPI_STATUS_IGNORE);
@@ -201,7 +228,7 @@ void signals_sync() {
 
   signals_process();
 
-  // finalize outstanding messages in the gather loop
+  // Finalize outstanding messages in the gather loop
   MPI_Allreduce(&gathersPosted, &globalCounter, 1, MPI_UINT64_T, MPI_MAX, MoveComm);
   if (gathersPosted < globalCounter)
   {
@@ -217,16 +244,19 @@ void signals_sync() {
 
 }
 
+/// Initialize the signal counters to zero.
 void signals_init() {
 
-  stopSignalsPosted = tbHitsOthers = nodesSearchedOthers = 0;
+  stopSignalsPosted = tbHitsOthers = TTsavesOthers = nodesSearchedOthers = 0;
 
   signalsSend[SIG_NODES] = signalsRecv[SIG_NODES] = 0;
   signalsSend[SIG_TB] = signalsRecv[SIG_TB] = 0;
+  signalsSend[SIG_TTS] = signalsRecv[SIG_TTS] = 0;
   signalsSend[SIG_STOP] = signalsRecv[SIG_STOP] = 0;
 }
 
+/// Poll the signal loop, and start the next round as needed.
 void signals_poll() {
 
   int flag;
@@ -238,14 +268,39 @@ void signals_poll() {
   }
 }
 
+/// Provide basic info on the cluster performance: in particular, the number of signals sent,
+/// signals per second (sps), the number of gathers, the number of positions gathered (per node
+/// and per second, gpps), and the number of TT saves and TT saves per second (TTSavesps).
+/// If gpps is approximately equal to TTSavesps, the gather loop has enough bandwidth.
+void cluster_info(Depth depth) {
+
+  TimePoint elapsed = Time.elapsed() + 1;
+  uint64_t TTSaves = TT_saves();
+
+  sync_cout << "info depth " << depth / ONE_PLY << " cluster "
+            << " signals " << signalsCallCounter << " sps " << signalsCallCounter * 1000 / elapsed
+            << " gathers " << gathersPosted << " gpps " << TTRecvBuff.size() * gathersPosted * 1000 / elapsed
+            << " TTSaves " << TTSaves << " TTSavesps " << TTSaves * 1000 / elapsed
+            << sync_endl;
+}
+
+/// When a TT entry is saved, additional steps are taken if the entry is of sufficient depth.
+/// If sufficient entries have been collected, a communication is initiated.
+/// If a communication has been completed, the received results are saved to the TT.
 void save(Thread* thread, TTEntry* tte,
           Key k, Value v, bool PvHit, Bound b, Depth d, Move m, Value ev) {
 
+  // Standard save to the TT
   tte->save(k, v, PvHit, b, d, m, ev);
 
+  // If the entry is of sufficient depth to be worth communicating, take action.
   if (d > 3 * ONE_PLY)
   {
-      // Try to add to thread's send buffer
+      // Count the TT saves for informational purposes: this number should be relatively
+      // similar to the number of entries we can send/recv.
+      thread->TTsaves.fetch_add(1, std::memory_order_relaxed);
+
+      // Add to the thread's send buffer; the locking here avoids races when the
+      // master thread prepares the send buffer.
      {
          std::lock_guard lk(thread->ttCache.mutex);
          thread->ttCache.buffer.replace(KeyedTTEntry(k,*tte));
@@ -254,7 +309,8 @@ void save(Thread* thread, TTEntry* tte,
 
      size_t recvBuffPerRankSize = Threads.size() * TTCacheSize;
 
-     // Communicate on main search thread
+     // Communicate on the main search thread, as soon as the threads combined have
+     // collected sufficient data to fill the send buffers.
      if (thread == Threads.main() && TTCacheCounter > size() * recvBuffPerRankSize)
      {
          // Test communication status
@@ -267,7 +323,7 @@ void save(Thread* thread, TTEntry* tte,
              // Save all received entries to TT, and store our TTCaches, ready for the next round of communication
              for (size_t irank = 0; irank < size_t(size()) ; ++irank)
              {
-                 if (irank == size_t(rank()))
+                 if (irank == size_t(rank())) // This is our part: fill our section of the buffer for sending
                  {
                      // Copy from the thread caches to the right spot in the buffer
                      size_t i = irank * recvBuffPerRankSize;
@@ -284,7 +340,7 @@ void save(Thread* thread, TTEntry* tte,
 
                      TTCacheCounter = 0;
                  }
-                 else
+                 else // Process data received from the corresponding rank
                      for (size_t i = irank * recvBuffPerRankSize; i < (irank + 1) * recvBuffPerRankSize; ++i)
                      {
                          auto&& e = TTRecvBuff[i];
@@ -302,7 +358,7 @@ void save(Thread* thread, TTEntry* tte,
                            TTComm, &reqGather);
              ++gathersPosted;
 
-             // Force check of time on the next occasion.
+             // Force a check of time on the next occasion; the above actions might have taken some time.
              static_cast<MainThread*>(thread)->callsCnt = 0;
          }
 
@@ -310,8 +366,9 @@ void save(Thread* thread, TTEntry* tte,
      }
 }
 
-
-// TODO update to the scheme in master.. can this use aggregation of votes?
+/// Pick the bestMove across ranks, and send the associated info and PV to the root of the cluster.
+/// Note that this bestMove and PV must be output by the root, to guarantee proper ordering of the output.
+/// TODO update to the scheme in master.. can this use aggregation of votes?
 void pick_moves(MoveInfo& mi, std::string& PVLine) {
 
   MoveInfo* pMoveInfo = NULL;
@@ -369,16 +426,25 @@ void pick_moves(MoveInfo& mi, std::string& PVLine) {
 
 }
 
+/// Return nodes searched (lazily updated cluster-wide in the signal loop)
 uint64_t nodes_searched() {
 
   return nodesSearchedOthers + Threads.nodes_searched();
 }
 
+/// Return tablebase hits (lazily updated cluster-wide in the signal loop)
 uint64_t tb_hits() {
 
   return tbHitsOthers + Threads.tb_hits();
 }
 
+/// Return the number of saves to the TT buffers (lazily updated cluster-wide in the signal loop)
+uint64_t TT_saves() {
+
+  return TTsavesOthers + Threads.TT_saves();
+}
+
+
 }
 
 #else
@@ -398,6 +464,11 @@ uint64_t tb_hits() {
   return Threads.tb_hits();
 }
 
+uint64_t TT_saves() {
+
+  return Threads.TT_saves();
+}
+
 }
 
 #endif // USE_MPI
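The TT exchange in save() above relies on a non-blocking allgather that is posted from the
main search thread and polled with MPI_Test. The following self-contained sketch (not taken
from the engine; the per-rank entry count and the uint64_t payload are placeholders for the
real TT cache entries) illustrates this communication pattern:
```
#include <mpi.h>
#include <cstdint>
#include <vector>

int main(int argc, char* argv[]) {

  int provided;
  MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &provided);

  int size, rank;
  MPI_Comm_size(MPI_COMM_WORLD, &size);
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);

  // One slot per rank in a shared receive buffer; each rank fills its own slot
  // in place before the gather (placeholder payload: the rank number).
  constexpr int perRank = 16;
  std::vector<uint64_t> recvBuff(size_t(perRank) * size);
  for (int i = 0; i < perRank; ++i)
      recvBuff[size_t(rank) * perRank + i] = uint64_t(rank);

  // Post the non-blocking allgather; the caller could keep working meanwhile.
  MPI_Request reqGather = MPI_REQUEST_NULL;
  MPI_Iallgather(MPI_IN_PLACE, perRank, MPI_UINT64_T,
                 recvBuff.data(), perRank, MPI_UINT64_T,
                 MPI_COMM_WORLD, &reqGather);

  // Poll for completion instead of blocking, mirroring the test-then-restart
  // loop driven from the main search thread.
  int flag = 0;
  while (!flag)
      MPI_Test(&reqGather, &flag, MPI_STATUS_IGNORE);

  MPI_Finalize();
  return 0;
}
```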
diff --git a/src/cluster.h b/src/cluster.h
index 1cd5c66a..38ad1253 100644
--- a/src/cluster.h
+++ b/src/cluster.h
@@ -2,7 +2,7 @@
   Stockfish, a UCI chess playing engine derived from Glaurung 2.1
   Copyright (C) 2004-2008 Tord Romstad (Glaurung author)
   Copyright (C) 2008-2015 Marco Costalba, Joona Kiiski, Tord Romstad
-  Copyright (C) 2015-2017 Marco Costalba, Joona Kiiski, Gary Linscott, Tord Romstad
+  Copyright (C) 2015-2019 Marco Costalba, Joona Kiiski, Gary Linscott, Tord Romstad
 
   Stockfish is free software: you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
@@ -30,8 +30,19 @@
 
 class Thread;
 
+/// The Cluster namespace contains the functionality required to run on distributed
+/// memory architectures using MPI as the message passing interface. On a high level,
+/// a 'lazy SMP'-like scheme is implemented, where TT saves of sufficient depth are
+/// collected on each rank and distributed to, and used by, all other ranks,
+/// which search essentially independently. The root (MPI rank 0) of the cluster
+/// is responsible for all I/O and time management, communicating this info to
+/// the other ranks as needed. UCI options such as Threads and Hash specify these
+/// quantities per MPI rank. It is recommended to have one rank (MPI process) per node.
+/// For the non-MPI case, wrappers that will be compiler-optimized away are provided.
+
 namespace Cluster {
 
+/// Basic info needed to find the cluster-wide bestMove
 struct MoveInfo {
   int move;
   int ponder;
@@ -41,9 +52,12 @@ struct MoveInfo {
 };
 
 #ifdef USE_MPI
-using KeyedTTEntry = std::pair<Key, TTEntry>;
+// Store the TTEntry together with its full key, so it can be saved on the receiving side
+using KeyedTTEntry = std::pair<Key, TTEntry>;
 constexpr std::size_t TTCacheSize = 16;
+
+// Threads locally cache their high-depth TT entries until a batch can be sent via MPI
 template <std::size_t N> class TTCache : public std::array<KeyedTTEntry, N> {
 
   struct Compare {
@@ -54,6 +68,8 @@ template <std::size_t N> class TTCache : public std::array<KeyedTTEntry, N> {
   Compare compare;
 
 public:
+
+  // Keep a heap of entries, replacing low-depth entries with high-depth ones
   bool replace(const KeyedTTEntry& value) {
 
       if (compare(value, this->front()))
@@ -78,6 +94,8 @@ void pick_moves(MoveInfo& mi, std::string& PVLine);
 void ttRecvBuff_resize(size_t nThreads);
 uint64_t nodes_searched();
 uint64_t tb_hits();
+uint64_t TT_saves();
+void cluster_info(Depth depth);
 void signals_init();
 void signals_poll();
 void signals_sync();
@@ -95,6 +113,8 @@ inline void pick_moves(MoveInfo&, std::string&) { }
 inline void ttRecvBuff_resize(size_t) { }
 uint64_t nodes_searched();
 uint64_t tb_hits();
+uint64_t TT_saves();
+inline void cluster_info(Depth) { }
 inline void signals_init() { }
 inline void signals_poll() { }
 inline void signals_sync() { }
diff --git a/src/search.cpp b/src/search.cpp
index d81d81d1..98ce30a7 100644
--- a/src/search.cpp
+++ b/src/search.cpp
@@ -466,7 +466,10 @@ void Thread::search() {
               && multiPV == 1
               && (bestValue <= alpha || bestValue >= beta)
               && Time.elapsed() > 3000)
+          {
               sync_cout << UCI::pv(rootPos, rootDepth, alpha, beta) << sync_endl;
+              Cluster::cluster_info(rootDepth);
+          }
 
           // In case of failing low/high increase aspiration window and
           // re-search, otherwise exit the loop.
@@ -501,7 +504,10 @@ void Thread::search() {
           if (    Cluster::is_root()
               && mainThread
               && (Threads.stop || pvIdx + 1 == multiPV || Time.elapsed() > 3000))
+          {
               sync_cout << UCI::pv(rootPos, rootDepth, alpha, beta) << sync_endl;
+              Cluster::cluster_info(rootDepth);
+          }
       }
 
       if (!Threads.stop)
diff --git a/src/thread.cpp b/src/thread.cpp
index 932376a3..29267963 100644
--- a/src/thread.cpp
+++ b/src/thread.cpp
@@ -195,7 +195,7 @@ void ThreadPool::start_thinking(Position& pos, StateListPtr& states,
 
   for (Thread* th : *this)
   {
-      th->nodes = th->tbHits = th->nmpMinPly = 0;
+      th->nodes = th->tbHits = th->TTsaves = th->nmpMinPly = 0;
       th->rootDepth = th->completedDepth = DEPTH_ZERO;
       th->rootMoves = rootMoves;
       th->rootPos.set(pos.fen(), pos.is_chess960(), &setupStates->back(), th);
diff --git a/src/thread.h b/src/thread.h
index f7e88f05..6b078603 100644
--- a/src/thread.h
+++ b/src/thread.h
@@ -64,7 +64,7 @@ public:
   size_t pvIdx, pvLast;
   int selDepth, nmpMinPly;
   Color nmpColor;
-  std::atomic<uint64_t> nodes, tbHits;
+  std::atomic<uint64_t> nodes, tbHits, TTsaves;
 
   Position rootPos;
   Search::RootMoves rootMoves;
@@ -112,6 +112,7 @@ struct ThreadPool : public std::vector<Thread*> {
   MainThread* main() const { return static_cast<MainThread*>(front()); }
   uint64_t nodes_searched() const { return accumulate(&Thread::nodes); }
   uint64_t tb_hits() const { return accumulate(&Thread::tbHits); }
+  uint64_t TT_saves() const { return accumulate(&Thread::TTsaves); }
 
   std::atomic_bool stop, ponder, stopOnPonderhit;
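The TTCache class above keeps, per thread, the deepest entries seen since the last exchange by
maintaining a min-heap and replacing its weakest element. The sketch below (not taken from the
engine; plain ints stand in for depth-keyed TT entries and all names are illustrative) shows the
same bounded-heap technique in isolation:
```
#include <algorithm>
#include <array>
#include <cstdio>
#include <functional>

// Keep the N largest values seen so far, analogous to keeping the N deepest TT entries.
template <std::size_t N>
class BestN : public std::array<int, N> {

  // With std::greater, the std::*_heap algorithms build a min-heap:
  // front() is always the weakest element currently kept.
  std::greater<int> compare;

public:
  BestN() { this->fill(0); }  // an array of equal values is already a valid heap

  // Accept the value only if it beats the weakest kept element.
  bool replace(int value) {

      if (compare(value, this->front()))  // value > front()?
      {
          std::pop_heap(this->begin(), this->end(), compare);
          this->back() = value;
          std::push_heap(this->begin(), this->end(), compare);
          return true;
      }
      return false;
  }
};

int main() {

  BestN<4> best;
  for (int depth : {3, 7, 1, 9, 4, 8, 2})
      best.replace(depth);

  for (int depth : best)
      std::printf("%d ", depth);  // the four largest inputs, in heap order
  std::printf("\n");
  return 0;
}
```
This replace-the-weakest scheme is what lets a fixed-size per-thread cache retain only its most
valuable recent saves between gathers, without any sorting or dynamic allocation.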