LLVM 22.0.0git
Parallel.cpp
Go to the documentation of this file.
1//===- llvm/Support/Parallel.cpp - Parallel algorithms --------------------===//
2//
3// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
4// See https://llvm.org/LICENSE.txt for license information.
5// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
6//
7//===----------------------------------------------------------------------===//
8
10#include "llvm/ADT/ScopeExit.h"
11#include "llvm/Config/llvm-config.h"
16
17#include <atomic>
18#include <future>
19#include <memory>
20#include <mutex>
21#include <thread>
22#include <vector>
23
25
26namespace llvm {
27namespace parallel {
28#if LLVM_ENABLE_THREADS
29
30#ifdef _WIN32
31static thread_local unsigned threadIndex = UINT_MAX;
32
33unsigned getThreadIndex() { GET_THREAD_INDEX_IMPL; }
34#else
35thread_local unsigned threadIndex = UINT_MAX;
36#endif
37
38namespace detail {
39
40namespace {
41
42/// An abstract class that takes closures and runs them asynchronously.
43class Executor {
44public:
45 virtual ~Executor() = default;
46 virtual void add(std::function<void()> func) = 0;
47 virtual size_t getThreadCount() const = 0;
48
49 static Executor *getDefaultExecutor();
50};
51
52/// An implementation of an Executor that runs closures on a thread pool
53/// in filo order.
54class ThreadPoolExecutor : public Executor {
55public:
56 explicit ThreadPoolExecutor(ThreadPoolStrategy S) {
57 if (S.UseJobserver)
58 TheJobserver = JobserverClient::getInstance();
59
61 // Spawn all but one of the threads in another thread as spawning threads
62 // can take a while.
63 Threads.reserve(ThreadCount);
64 Threads.resize(1);
65 std::lock_guard<std::mutex> Lock(Mutex);
66 // Use operator[] before creating the thread to avoid data race in .size()
67 // in 'safe libc++' mode.
68 auto &Thread0 = Threads[0];
69 Thread0 = std::thread([this, S] {
70 for (unsigned I = 1; I < ThreadCount; ++I) {
71 Threads.emplace_back([this, S, I] { work(S, I); });
72 if (Stop)
73 break;
74 }
75 ThreadsCreated.set_value();
76 work(S, 0);
77 });
78 }
79
80 // To make sure the thread pool executor can only be created with a parallel
81 // strategy.
82 ThreadPoolExecutor() = delete;
83
84 void stop() {
85 {
86 std::lock_guard<std::mutex> Lock(Mutex);
87 if (Stop)
88 return;
89 Stop = true;
90 }
91 Cond.notify_all();
92 ThreadsCreated.get_future().wait();
93 }
94
95 ~ThreadPoolExecutor() override {
96 stop();
97 std::thread::id CurrentThreadId = std::this_thread::get_id();
98 for (std::thread &T : Threads)
99 if (T.get_id() == CurrentThreadId)
100 T.detach();
101 else
102 T.join();
103 }
104
105 struct Creator {
106 static void *call() { return new ThreadPoolExecutor(strategy); }
107 };
108 struct Deleter {
109 static void call(void *Ptr) { ((ThreadPoolExecutor *)Ptr)->stop(); }
110 };
111
112 void add(std::function<void()> F) override {
113 {
114 std::lock_guard<std::mutex> Lock(Mutex);
115 WorkStack.push_back(std::move(F));
116 }
117 Cond.notify_one();
118 }
119
120 size_t getThreadCount() const override { return ThreadCount; }
121
122private:
123 void work(ThreadPoolStrategy S, unsigned ThreadID) {
124 threadIndex = ThreadID;
125 S.apply_thread_strategy(ThreadID);
126 // Note on jobserver deadlock avoidance:
127 // GNU Make grants each invoked process one implicit job slot. Our
128 // JobserverClient models this by returning an implicit JobSlot on the
129 // first successful tryAcquire() in a process. This guarantees forward
130 // progress without requiring a dedicated "always-on" thread here.
131
132 static thread_local std::unique_ptr<ExponentialBackoff> Backoff;
133
134 while (true) {
135 if (TheJobserver) {
136 // Jobserver-mode scheduling:
137 // - Acquire one job slot (with exponential backoff to avoid busy-wait).
138 // - While holding the slot, drain and run tasks from the local queue.
139 // - Release the slot when the queue is empty or when shutting down.
140 // Rationale: Holding a slot amortizes acquire/release overhead over
141 // multiple tasks and avoids requeue/yield churn, while still enforcing
142 // the jobserver’s global concurrency limit. With K available slots,
143 // up to K workers run tasks in parallel; within each worker tasks run
144 // sequentially until the local queue is empty.
145 ExponentialBackoff Backoff(std::chrono::hours(24));
146 JobSlot Slot;
147 do {
148 if (Stop)
149 return;
150 Slot = TheJobserver->tryAcquire();
151 if (Slot.isValid())
152 break;
153 } while (Backoff.waitForNextAttempt());
154
155 auto SlotReleaser = llvm::make_scope_exit(
156 [&] { TheJobserver->release(std::move(Slot)); });
157
158 while (true) {
159 std::function<void()> Task;
160 {
161 std::unique_lock<std::mutex> Lock(Mutex);
162 Cond.wait(Lock, [&] { return Stop || !WorkStack.empty(); });
163 if (Stop && WorkStack.empty())
164 return;
165 if (WorkStack.empty())
166 break;
167 Task = std::move(WorkStack.back());
168 WorkStack.pop_back();
169 }
170 Task();
171 }
172 } else {
173 std::unique_lock<std::mutex> Lock(Mutex);
174 Cond.wait(Lock, [&] { return Stop || !WorkStack.empty(); });
175 if (Stop)
176 break;
177 auto Task = std::move(WorkStack.back());
178 WorkStack.pop_back();
179 Lock.unlock();
180 Task();
181 }
182 }
183 }
184
185 std::atomic<bool> Stop{false};
186 std::vector<std::function<void()>> WorkStack;
187 std::mutex Mutex;
188 std::condition_variable Cond;
189 std::promise<void> ThreadsCreated;
190 std::vector<std::thread> Threads;
191 unsigned ThreadCount;
192
193 JobserverClient *TheJobserver = nullptr;
194};
195
196// A global raw pointer to the executor. Lifetime is managed by the
197// objects created within createExecutor().
198static Executor *TheExec = nullptr;
199static std::once_flag Flag;
200
201// This function will be called exactly once to create the executor.
202// It contains the necessary platform-specific logic. Since functions
203// called by std::call_once cannot return value, we have to set the
204// executor as a global variable.
205void createExecutor() {
206#ifdef _WIN32
207 // The ManagedStatic enables the ThreadPoolExecutor to be stopped via
208 // llvm_shutdown() which allows a "clean" fast exit, e.g. via _exit(). This
209 // stops the thread pool and waits for any worker thread creation to complete
210 // but does not wait for the threads to finish. The wait for worker thread
211 // creation to complete is important as it prevents intermittent crashes on
212 // Windows due to a race condition between thread creation and process exit.
213 //
214 // The ThreadPoolExecutor will only be destroyed when the static unique_ptr to
215 // it is destroyed, i.e. in a normal full exit. The ThreadPoolExecutor
216 // destructor ensures it has been stopped and waits for worker threads to
217 // finish. The wait is important as it prevents intermittent crashes on
218 // Windows when the process is doing a full exit.
219 //
220 // The Windows crashes appear to only occur with the MSVC static runtimes and
221 // are more frequent with the debug static runtime.
222 //
223 // This also prevents intermittent deadlocks on exit with the MinGW runtime.
224
225 static ManagedStatic<ThreadPoolExecutor, ThreadPoolExecutor::Creator,
226 ThreadPoolExecutor::Deleter>
227 ManagedExec;
228 static std::unique_ptr<ThreadPoolExecutor> Exec(&(*ManagedExec));
229 TheExec = Exec.get();
230#else
231 // ManagedStatic is not desired on other platforms. When `Exec` is destroyed
232 // by llvm_shutdown(), worker threads will clean up and invoke TLS
233 // destructors. This can lead to race conditions if other threads attempt to
234 // access TLS objects that have already been destroyed.
235 static ThreadPoolExecutor Exec(strategy);
236 TheExec = &Exec;
237#endif
238}
239
240Executor *Executor::getDefaultExecutor() {
241 // Use std::call_once to lazily and safely initialize the executor.
242 std::call_once(Flag, createExecutor);
243 return TheExec;
244}
245} // namespace
246} // namespace detail
247
248size_t getThreadCount() {
249 return detail::Executor::getDefaultExecutor()->getThreadCount();
250}
251#endif
252
253// Latch::sync() called by the dtor may cause one thread to block. If is a dead
254// lock if all threads in the default executor are blocked. To prevent the dead
255// lock, only allow the root TaskGroup to run tasks parallelly. In the scenario
256// of nested parallel_for_each(), only the outermost one runs parallelly.
258#if LLVM_ENABLE_THREADS
259 : Parallel((parallel::strategy.ThreadsRequested != 1) &&
260 (threadIndex == UINT_MAX)) {}
261#else
262 : Parallel(false) {}
263#endif
265 // We must ensure that all the workloads have finished before decrementing the
266 // instances count.
267 L.sync();
268}
269
270void TaskGroup::spawn(std::function<void()> F) {
271#if LLVM_ENABLE_THREADS
272 if (Parallel) {
273 L.inc();
274 detail::Executor::getDefaultExecutor()->add([&, F = std::move(F)] {
275 F();
276 L.dec();
277 });
278 return;
279 }
280#endif
281 F();
282}
283
284} // namespace parallel
285} // namespace llvm
286
287void llvm::parallelFor(size_t Begin, size_t End,
288 llvm::function_ref<void(size_t)> Fn) {
289#if LLVM_ENABLE_THREADS
290 if (parallel::strategy.ThreadsRequested != 1) {
291 auto NumItems = End - Begin;
292 // Limit the number of tasks to MaxTasksPerGroup to limit job scheduling
293 // overhead on large inputs.
294 auto TaskSize = NumItems / parallel::detail::MaxTasksPerGroup;
295 if (TaskSize == 0)
296 TaskSize = 1;
297
299 for (; Begin + TaskSize < End; Begin += TaskSize) {
300 TG.spawn([=, &Fn] {
301 for (size_t I = Begin, E = Begin + TaskSize; I != E; ++I)
302 Fn(I);
303 });
304 }
305 if (Begin != End) {
306 TG.spawn([=, &Fn] {
307 for (size_t I = Begin; I != End; ++I)
308 Fn(I);
309 });
310 }
311 return;
312 }
313#endif
314
315 for (; Begin != End; ++Begin)
316 Fn(Begin);
317}
static GCRegistry::Add< CoreCLRGC > E("coreclr", "CoreCLR-compatible GC")
global merge func
#define F(x, y, z)
Definition MD5.cpp:55
#define I(x, y, z)
Definition MD5.cpp:58
#define T
const SmallVectorImpl< MachineOperand > & Cond
This file defines the make_scope_exit function, which executes user-defined cleanup logic at scope ex...
static cl::opt< int > ThreadCount("threads", cl::init(0))
A class to help implement exponential backoff.
LLVM_ABI bool waitForNextAttempt()
Blocks while waiting for the next attempt.
A JobSlot represents a single job slot that can be acquired from or released to a jobserver pool.
Definition Jobserver.h:77
The public interface for a jobserver client.
Definition Jobserver.h:135
static JobserverClient * getInstance()
Returns the singleton instance of the JobserverClient.
ManagedStatic - This transparently changes the behavior of global statics to be lazily constructed on...
This tells how a thread pool will be used.
Definition Threading.h:115
LLVM_ABI void apply_thread_strategy(unsigned ThreadPoolNum) const
Assign the current thread to an ideal hardware CPU or NUMA node.
LLVM_ABI unsigned compute_thread_count() const
Retrieves the max available threads for the current strategy.
Definition Threading.cpp:42
bool UseJobserver
If true, the thread pool will attempt to coordinate with a GNU Make jobserver, acquiring a job slot b...
Definition Threading.h:149
An efficient, type-erasing, non-owning reference to a callable.
LLVM_ABI void spawn(std::function< void()> f)
Definition Parallel.cpp:270
LLVM_ABI ThreadPoolStrategy strategy
Definition Parallel.cpp:24
unsigned getThreadIndex()
Definition Parallel.h:55
size_t getThreadCount()
Definition Parallel.h:56
SmartMutex< false > Mutex
Mutex - A standard, always enforced mutex.
Definition Mutex.h:66
This is an optimization pass for GlobalISel generic memory operations.
detail::scope_exit< std::decay_t< Callable > > make_scope_exit(Callable &&F)
Definition ScopeExit.h:59
LLVM_ABI void parallelFor(size_t Begin, size_t End, function_ref< void(size_t)> Fn)
Definition Parallel.cpp:287