diff --git a/src/engine.cpp b/src/engine.cpp index 3fc27223..6980dd83 100644 --- a/src/engine.cpp +++ b/src/engine.cpp @@ -133,6 +133,11 @@ void Engine::set_numa_config_from_option(const std::string& o) { { numaContext.set_numa_config(NumaConfig::from_system()); } + else if (o == "hardware") + { + // Don't respect affinity set in the system. + numaContext.set_numa_config(NumaConfig::from_system(false)); + } else if (o == "none") { numaContext.set_numa_config(NumaConfig{}); diff --git a/src/numa.h b/src/numa.h index a56d7142..c170c178 100644 --- a/src/numa.h +++ b/src/numa.h @@ -19,6 +19,7 @@ #ifndef NUMA_H_INCLUDED #define NUMA_H_INCLUDED +#include #include #include #include @@ -63,21 +64,9 @@ static constexpr size_t WIN_PROCESSOR_GROUP_SIZE = 64; // https://learn.microsoft.com/en-us/windows/win32/api/processthreadsapi/nf-processthreadsapi-setthreadselectedcpusetmasks using SetThreadSelectedCpuSetMasks_t = BOOL (*)(HANDLE, PGROUP_AFFINITY, USHORT); -// https://learn.microsoft.com/en-us/windows/win32/api/processtopologyapi/nf-processtopologyapi-setthreadgroupaffinity -using SetThreadGroupAffinity_t = BOOL (*)(HANDLE, const GROUP_AFFINITY*, PGROUP_AFFINITY); - // https://learn.microsoft.com/en-us/windows/win32/api/processthreadsapi/nf-processthreadsapi-getthreadselectedcpusetmasks using GetThreadSelectedCpuSetMasks_t = BOOL (*)(HANDLE, PGROUP_AFFINITY, USHORT, PUSHORT); -// https://learn.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-getprocessaffinitymask -using GetProcessAffinityMask_t = BOOL (*)(HANDLE, PDWORD_PTR, PDWORD_PTR); - -// https://learn.microsoft.com/en-us/windows/win32/api/processtopologyapi/nf-processtopologyapi-getprocessgroupaffinity -using GetProcessGroupAffinity_t = BOOL (*)(HANDLE, PUSHORT, PUSHORT); - -// https://learn.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-getactiveprocessorcount -using GetActiveProcessorCount_t = DWORD (*)(WORD); - #endif #include "misc.h" @@ -94,14 +83,7 @@ inline CpuIndex 
get_hardware_concurrency() { // only returns the number of processors in the first group, because only these // are available to std::thread. #ifdef _WIN64 - HMODULE k32 = GetModuleHandle(TEXT("Kernel32.dll")); - auto GetActiveProcessorCount_f = - GetActiveProcessorCount_t((void (*)()) GetProcAddress(k32, "GetActiveProcessorCount")); - - if (GetActiveProcessorCount_f != nullptr) - { - concurrency = GetActiveProcessorCount_f(ALL_PROCESSOR_GROUPS); - } + concurrency = std::max(concurrency, GetActiveProcessorCount(ALL_PROCESSOR_GROUPS)); #endif return concurrency; @@ -109,6 +91,214 @@ inline CpuIndex get_hardware_concurrency() { inline const CpuIndex SYSTEM_THREADS_NB = std::max(1, get_hardware_concurrency()); +#if defined(_WIN64) + +struct WindowsAffinity { + std::optional> oldApi; + std::optional> newApi; + bool isDeterminate = true; + + std::optional> get_combined() const { + // When the affinity is not determinate we treat it as no affinity, + // because otherwise we would have to set affinity to fewer + // processors than we currently have affinity to. + if (!isDeterminate) + return std::nullopt; + + if (!oldApi.has_value()) + return newApi; + if (!newApi.has_value()) + return oldApi; + + std::set intersect; + std::set_intersection(oldApi->begin(), oldApi->end(), newApi->begin(), newApi->end(), + std::inserter(intersect, intersect.begin())); + return intersect; + } +}; + +inline std::pair> get_process_group_affinity() { + WORD numProcGroups = GetActiveProcessorGroupCount(); + + // GetProcessGroupAffinity requires the GroupArray argument to be + // aligned to 4 bytes instead of just 2. 
+ static constexpr size_t GroupArrayMinimumAlignment = 4; + static_assert(GroupArrayMinimumAlignment >= alignof(USHORT)); + + auto GroupArray = std::make_unique( + numProcGroups + (GroupArrayMinimumAlignment / alignof(USHORT) - 1)); + + USHORT GroupCount = static_cast(numProcGroups); + const BOOL status = GetProcessGroupAffinity(GetCurrentProcess(), &GroupCount, GroupArray.get()); + + return std::make_pair(status, std::vector(GroupArray.get(), GroupArray.get() + GroupCount)); +} + +// Since Windows 11 and Windows Server 2022 thread affinities can span +// processor groups and can be set as such by a new WinAPI function. +// However, we may need to force using the old API if we detect +// that the process has affinity set by the old API already and we want to override that. +inline bool use_old_affinity_api() { + HMODULE k32 = GetModuleHandle(TEXT("Kernel32.dll")); + auto SetThreadSelectedCpuSetMasks_f = SetThreadSelectedCpuSetMasks_t( + (void (*)()) GetProcAddress(k32, "SetThreadSelectedCpuSetMasks")); + + if (SetThreadSelectedCpuSetMasks_f == nullptr) + return true; + + auto [status, groupAffinity] = get_process_group_affinity(); + + // If GroupCount > 1 then we know old API was never used and we can stick + // to the new API safely. + if (status != 0 && groupAffinity.size() > 1) + return false; + + return true; +}; + +// On Windows there are two ways to set affinity, and therefore 2 ways to get it. +// These are not consistent, so we have to check both. +// In some cases it is actually not possible to determine affinity. +// For example when two different threads have affinity on different processor groups, +// set using SetThreadAffinityMask, we can't retrieve the actual affinities. +// From documentation on GetProcessAffinityMask: +// > If the calling process contains threads in multiple groups, +// > the function returns zero for both affinity masks. +// In such cases we just give up and assume we have affinity for all processors. 
+// nullopt means no affinity is set, that is, all processors are allowed +inline WindowsAffinity get_process_affinity() { + HMODULE k32 = GetModuleHandle(TEXT("Kernel32.dll")); + auto GetThreadSelectedCpuSetMasks_f = GetThreadSelectedCpuSetMasks_t( + (void (*)()) GetProcAddress(k32, "GetThreadSelectedCpuSetMasks")); + + WindowsAffinity affinity; + + if (GetThreadSelectedCpuSetMasks_f != nullptr) + { + USHORT RequiredMaskCount; + BOOL status = + GetThreadSelectedCpuSetMasks_f(GetCurrentThread(), nullptr, 0, &RequiredMaskCount); + + // If RequiredMaskCount then these affinities were never set, but it's not consistent + // so GetProcessAffinityMask may still return some affinity. + if (status == 0) + { + affinity.isDeterminate = false; + return affinity; + } + + if (RequiredMaskCount > 0) + { + std::set cpus; + + auto groupAffinities = std::make_unique(RequiredMaskCount); + + GetThreadSelectedCpuSetMasks_f(GetCurrentThread(), groupAffinities.get(), + RequiredMaskCount, &RequiredMaskCount); + + for (USHORT i = 0; i < RequiredMaskCount; ++i) + { + const size_t procGroupIndex = groupAffinities[i].Group; + + for (size_t j = 0; j < WIN_PROCESSOR_GROUP_SIZE; ++j) + { + if (groupAffinities[i].Mask & (KAFFINITY(1) << j)) + cpus.insert(procGroupIndex * WIN_PROCESSOR_GROUP_SIZE + j); + } + } + + affinity.newApi = std::move(cpus); + } + } + + DWORD_PTR proc, sys; + BOOL status = GetProcessAffinityMask(GetCurrentProcess(), &proc, &sys); + + // If proc == 0 then we can't determine affinity because it spans processor groups. + if (status == 0 || proc == 0) + { + affinity.isDeterminate = false; + return affinity; + } + + // If SetProcessAffinityMask was never called the affinity + // must span all processor groups, but if it was called it must only span one. 
+ auto [status2, groupAffinity] = get_process_group_affinity(); + if (status2 == 0) + { + affinity.isDeterminate = false; + return affinity; + } + + // If we have affinity for more than 1 group then at this point we + // can assume SetProcessAffinityMask has never been called and therefore + // according to the old API we do not have any affinity set. + // Otherwise we have to assume we have affinity set and gather the processor IDs. + if (groupAffinity.size() == 1) + { + std::set cpus; + + const size_t procGroupIndex = groupAffinity[0]; + + uint64_t mask = static_cast(proc); + for (size_t j = 0; j < WIN_PROCESSOR_GROUP_SIZE; ++j) + { + if (mask & (KAFFINITY(1) << j)) + cpus.insert(procGroupIndex * WIN_PROCESSOR_GROUP_SIZE + j); + } + + affinity.oldApi = std::move(cpus); + } + + return affinity; +} + +#endif + +#if defined(__linux__) && !defined(__ANDROID__) + +inline std::set get_process_affinity() { + + std::set cpus; + + // For unsupported systems, or in case of a soft error, we may assume all processors + // are available for use. + [[maybe_unused]] auto set_to_all_cpus = [&]() { + for (CpuIndex c = 0; c < SYSTEM_THREADS_NB; ++c) + cpus.insert(c); + }; + + // cpu_set_t by default holds 1024 entries. This may not be enough soon, + // but there is no easy way to determine how many processors there actually are. + // In this case we just choose a reasonable upper bound. + static constexpr CpuIndex MaxNumCpus = 1024 * 64; + + cpu_set_t* mask = CPU_ALLOC(MaxNumCpus); + if (mask == nullptr) + std::exit(EXIT_FAILURE); + + const size_t masksize = CPU_ALLOC_SIZE(MaxNumCpus); + + CPU_ZERO_S(masksize, mask); + + const int status = sched_getaffinity(0, masksize, mask); + + if (status != 0) + { + CPU_FREE(mask); + std::exit(EXIT_FAILURE); + } + + for (CpuIndex c = 0; c < MaxNumCpus; ++c) + if (CPU_ISSET_S(c, masksize, mask)) + cpus.insert(c); + + CPU_FREE(mask); + + return cpus; +} + +#endif // We want to abstract the purpose of storing the numa node index somewhat.
// Whoever is using this does not need to know the specifics of the replication @@ -224,7 +414,7 @@ class NumaConfig { std::optional> allowedCpus; if (respectProcessAffinity) - allowedCpus = get_process_affinity(); + allowedCpus = get_process_affinity().get_combined(); // The affinity can't be determined in all cases on Windows, but we at least guarantee // that the number of allowed processors is >= number of processors in the affinity mask. @@ -233,15 +423,6 @@ class NumaConfig { return !allowedCpus.has_value() || allowedCpus->count(c) == 1; }; - // Since Windows 11 and Windows Server 2022 thread affinities can span - // processor groups and can be set as such by a new WinAPI function. - static const bool CanAffinitySpanProcessorGroups = []() { - HMODULE k32 = GetModuleHandle(TEXT("Kernel32.dll")); - auto SetThreadSelectedCpuSetMasks_f = SetThreadSelectedCpuSetMasks_t( - (void (*)()) GetProcAddress(k32, "SetThreadSelectedCpuSetMasks")); - return SetThreadSelectedCpuSetMasks_f != nullptr; - }(); - WORD numProcGroups = GetActiveProcessorGroupCount(); for (WORD procGroup = 0; procGroup < numProcGroups; ++procGroup) { @@ -269,7 +450,8 @@ class NumaConfig { // the new NUMA allocation behaviour was introduced while there was // still no way to set thread affinity spanning multiple processor groups. // See https://learn.microsoft.com/en-us/windows/win32/procthread/numa-support - if (!CanAffinitySpanProcessorGroups) + // We also do this if we need to force the old API for some reason. + if (use_old_affinity_api()) { NumaConfig splitCfg = empty(); @@ -307,6 +489,12 @@ class NumaConfig { // We have to ensure no empty NUMA nodes persist. cfg.remove_empty_numa_nodes(); + // If the user explicitly opts out from respecting the current process affinity + // then it may be inconsistent with the current affinity (obviously), so we + // consider it custom.
+ if (!respectProcessAffinity) + cfg.customAffinity = true; + return cfg; } @@ -510,9 +698,11 @@ class NumaConfig { HMODULE k32 = GetModuleHandle(TEXT("Kernel32.dll")); auto SetThreadSelectedCpuSetMasks_f = SetThreadSelectedCpuSetMasks_t( (void (*)()) GetProcAddress(k32, "SetThreadSelectedCpuSetMasks")); - auto SetThreadGroupAffinity_f = - SetThreadGroupAffinity_t((void (*)()) GetProcAddress(k32, "SetThreadGroupAffinity")); + // We ALWAYS set affinity with the new API if available, + // because there's no downsides, and we forcibly keep it consistent + // with the old API should we need to use it. I.e. we always keep this as a superset + // of what we set with SetThreadGroupAffinity. if (SetThreadSelectedCpuSetMasks_f != nullptr) { // Only available on Windows 11 and Windows Server 2022 onwards. @@ -541,7 +731,9 @@ class NumaConfig { // This is defensive, allowed because this code is not performance critical. SwitchToThread(); } - else if (SetThreadGroupAffinity_f != nullptr) + + // Sometimes we need to force the old API, but do not use it unless necessary. + if (SetThreadSelectedCpuSetMasks_f == nullptr || use_old_affinity_api()) { // On earlier windows version (since windows 7) we can't run a single thread // on multiple processor groups, so we need to restrict the group. @@ -576,7 +768,7 @@ class NumaConfig { HANDLE hThread = GetCurrentThread(); - const BOOL status = SetThreadGroupAffinity_f(hThread, &affinity, nullptr); + const BOOL status = SetThreadGroupAffinity(hThread, &affinity, nullptr); if (status == 0) std::exit(EXIT_FAILURE); @@ -665,138 +857,6 @@ class NumaConfig { return true; } -#if defined(__linux__) && !defined(__ANDROID__) - - static std::set get_process_affinity() { - - std::set cpus; - - // For unsupported systems, or in case of a soft error, we may assume all processors - // are available for use. 
- [[maybe_unused]] auto set_to_all_cpus = [&]() { - for (CpuIndex c = 0; c < SYSTEM_THREADS_NB; ++c) - cpus.insert(c); - }; - - // cpu_set_t by default holds 1024 entries. This may not be enough soon, - // but there is no easy way to determine how many threads there actually is. - // In this case we just choose a reasonable upper bound. - static constexpr CpuIndex MaxNumCpus = 1024 * 64; - - cpu_set_t* mask = CPU_ALLOC(MaxNumCpus); - if (mask == nullptr) - std::exit(EXIT_FAILURE); - - const size_t masksize = CPU_ALLOC_SIZE(MaxNumCpus); - - CPU_ZERO_S(masksize, mask); - - const int status = sched_getaffinity(0, masksize, mask); - - if (status != 0) - { - CPU_FREE(mask); - std::exit(EXIT_FAILURE); - } - - for (CpuIndex c = 0; c < MaxNumCpus; ++c) - if (CPU_ISSET_S(c, masksize, mask)) - cpus.insert(c); - - CPU_FREE(mask); - - return cpus; - } - -#elif defined(_WIN64) - - // On Windows there are two ways to set affinity, and therefore 2 ways to get it. - // These are not consistent, so we have to check both. - // In some cases it is actually not possible to determine affinity. - // For example when two different threads have affinity on different processor groups, - // set using SetThreadAffinityMask, we can't retrieve the actual affinities. - // From documentation on GetProcessAffinityMask: - // > If the calling process contains threads in multiple groups, - // > the function returns zero for both affinity masks. - // In such cases we just give up and assume we have affinity for all processors. 
- // nullopt means no affinity is set, that is, all processors are allowed - static std::optional> get_process_affinity() { - HMODULE k32 = GetModuleHandle(TEXT("Kernel32.dll")); - auto GetThreadSelectedCpuSetMasks_f = GetThreadSelectedCpuSetMasks_t( - (void (*)()) GetProcAddress(k32, "GetThreadSelectedCpuSetMasks")); - auto GetProcessAffinityMask_f = - GetProcessAffinityMask_t((void (*)()) GetProcAddress(k32, "GetProcessAffinityMask")); - auto GetProcessGroupAffinity_f = - GetProcessGroupAffinity_t((void (*)()) GetProcAddress(k32, "GetProcessGroupAffinity")); - - if (GetThreadSelectedCpuSetMasks_f != nullptr) - { - std::set cpus; - - USHORT RequiredMaskCount; - GetThreadSelectedCpuSetMasks_f(GetCurrentThread(), nullptr, 0, &RequiredMaskCount); - - // If RequiredMaskCount then these affinities were never set, but it's not consistent - // so GetProcessAffinityMask may still return some affinity. - if (RequiredMaskCount > 0) - { - auto groupAffinities = std::make_unique(RequiredMaskCount); - - GetThreadSelectedCpuSetMasks_f(GetCurrentThread(), groupAffinities.get(), - RequiredMaskCount, &RequiredMaskCount); - - for (USHORT i = 0; i < RequiredMaskCount; ++i) - { - const size_t procGroupIndex = groupAffinities[i].Group; - - for (size_t j = 0; j < WIN_PROCESSOR_GROUP_SIZE; ++j) - { - if (groupAffinities[i].Mask & (KAFFINITY(1) << j)) - cpus.insert(procGroupIndex * WIN_PROCESSOR_GROUP_SIZE + j); - } - } - - return cpus; - } - } - - if (GetProcessAffinityMask_f != nullptr && GetProcessGroupAffinity_f != nullptr) - { - std::set cpus; - - DWORD_PTR proc, sys; - BOOL status = GetProcessAffinityMask_f(GetCurrentProcess(), &proc, &sys); - if (status == 0) - return std::nullopt; - - // We can't determine affinity because it spans processor groups. - if (proc == 0) - return std::nullopt; - - // We are expecting a single group. 
- USHORT GroupCount = 1; - alignas(4) USHORT GroupArray[1]; - status = GetProcessGroupAffinity_f(GetCurrentProcess(), &GroupCount, GroupArray); - if (status == 0 || GroupCount != 1) - return std::nullopt; - - const size_t procGroupIndex = GroupArray[0]; - - uint64_t mask = static_cast(proc); - for (size_t j = 0; j < WIN_PROCESSOR_GROUP_SIZE; ++j) - { - if (mask & (KAFFINITY(1) << j)) - cpus.insert(procGroupIndex * WIN_PROCESSOR_GROUP_SIZE + j); - } - - return cpus; - } - - return std::nullopt; - } - -#endif - static std::vector indices_from_shortened_string(const std::string& s) { std::vector indices;