LLVM 22.0.0git
ThreadPool.h
Go to the documentation of this file.
1//===-- llvm/Support/ThreadPool.h - A ThreadPool implementation -*- C++ -*-===//
2//
3// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
4// See https://llvm.org/LICENSE.txt for license information.
5// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
6//
7//===----------------------------------------------------------------------===//
8//
9// This file defines a crude C++11 based thread pool.
10//
11//===----------------------------------------------------------------------===//
12
13#ifndef LLVM_SUPPORT_THREADPOOL_H
14#define LLVM_SUPPORT_THREADPOOL_H
15
16#include "llvm/ADT/DenseMap.h"
17#include "llvm/Config/llvm-config.h"
21#include "llvm/Support/thread.h"
22
23#include <future>
24
25#include <condition_variable>
26#include <deque>
27#include <functional>
28#include <memory>
29#include <mutex>
30#include <utility>
31
32namespace llvm {
33
34class ThreadPoolTaskGroup;
35
36/// This defines the abstract base interface for a ThreadPool allowing
37/// asynchronous parallel execution on a defined number of threads.
38///
39/// It is possible to reuse one thread pool for different groups of tasks
40/// by grouping tasks using ThreadPoolTaskGroup. All tasks are processed using
41/// the same queue, but it is possible to wait only for a specific group of
42/// tasks to finish.
43///
44/// It is also possible for worker threads to submit new tasks and wait for
45/// them. Note that this may result in a deadlock in cases such as when a task
46/// (directly or indirectly) tries to wait for its own completion, or when all
47/// available threads are used up by tasks waiting for a task that has no thread
48/// left to run on (this includes waiting on the returned future). It should be
49/// generally safe to wait() for a group as long as groups do not form a cycle.
51 /// The actual method to enqueue a task to be defined by the concrete
52 /// implementation.
53 virtual void asyncEnqueue(std::function<void()> Task,
54 ThreadPoolTaskGroup *Group) = 0;
55
56public:
57 /// Destroying the pool will drain the pending tasks and wait. The current
58 /// thread may participate in the execution of the pending tasks.
60
61 /// Blocking wait for all the threads to complete and the queue to be empty.
62 /// It is an error to try to add new tasks while blocking on this call.
63 /// Calling wait() from a task would deadlock waiting for itself.
64 virtual void wait() = 0;
65
66 /// Blocking wait for only all the threads in the given group to complete.
67 /// It is possible to wait even inside a task, but waiting (directly or
68 /// indirectly) on itself will deadlock. If called from a task running on a
69 /// worker thread, the call may process pending tasks while waiting in order
70 /// not to waste the thread.
71 virtual void wait(ThreadPoolTaskGroup &Group) = 0;
72
73 /// Returns the maximum number of worker this pool can eventually grow to.
74 virtual unsigned getMaxConcurrency() const = 0;
75
76 /// Asynchronous submission of a task to the pool. The returned future can be
77 /// used to wait for the task to finish and is *non-blocking* on destruction.
78 template <typename Function, typename... Args>
79 auto async(Function &&F, Args &&...ArgList) {
80 auto Task =
81 std::bind(std::forward<Function>(F), std::forward<Args>(ArgList)...);
82 return async(std::move(Task));
83 }
84
85 /// Overload, task will be in the given task group.
86 template <typename Function, typename... Args>
87 auto async(ThreadPoolTaskGroup &Group, Function &&F, Args &&...ArgList) {
88 auto Task =
89 std::bind(std::forward<Function>(F), std::forward<Args>(ArgList)...);
90 return async(Group, std::move(Task));
91 }
92
93 /// Asynchronous submission of a task to the pool. The returned future can be
94 /// used to wait for the task to finish and is *non-blocking* on destruction.
95 template <typename Func>
96 auto async(Func &&F) -> std::shared_future<decltype(F())> {
97 return asyncImpl(std::function<decltype(F())()>(std::forward<Func>(F)),
98 nullptr);
99 }
100
101 template <typename Func>
102 auto async(ThreadPoolTaskGroup &Group, Func &&F)
103 -> std::shared_future<decltype(F())> {
104 return asyncImpl(std::function<decltype(F())()>(std::forward<Func>(F)),
105 &Group);
106 }
107
108private:
109 /// Asynchronous submission of a task to the pool. The returned future can be
110 /// used to wait for the task to finish and is *non-blocking* on destruction.
111 template <typename ResTy>
112 std::shared_future<ResTy> asyncImpl(std::function<ResTy()> Task,
113 ThreadPoolTaskGroup *Group) {
114 auto Future = std::async(std::launch::deferred, std::move(Task)).share();
115 asyncEnqueue([Future]() { Future.wait(); }, Group);
116 return Future;
117 }
118};
119
120#if LLVM_ENABLE_THREADS
121/// A ThreadPool implementation using std::threads.
122///
123/// The pool keeps a vector of threads alive, waiting on a condition variable
124/// for some work to become available.
125class LLVM_ABI StdThreadPool : public ThreadPoolInterface {
126public:
127 /// Construct a pool using the hardware strategy \p S for mapping hardware
128 /// execution resources (threads, cores, CPUs)
129 /// Defaults to using the maximum execution resources in the system, but
130 /// accounting for the affinity mask.
131 StdThreadPool(ThreadPoolStrategy S = hardware_concurrency());
132
133 /// Blocking destructor: the pool will wait for all the threads to complete.
134 ~StdThreadPool() override;
135
136 /// Blocking wait for all the threads to complete and the queue to be empty.
137 /// It is an error to try to add new tasks while blocking on this call.
138 /// Calling wait() from a task would deadlock waiting for itself.
139 void wait() override;
140
141 /// Blocking wait for only all the threads in the given group to complete.
142 /// It is possible to wait even inside a task, but waiting (directly or
143 /// indirectly) on itself will deadlock. If called from a task running on a
144 /// worker thread, the call may process pending tasks while waiting in order
145 /// not to waste the thread.
146 void wait(ThreadPoolTaskGroup &Group) override;
147
148 /// Returns the maximum number of worker threads in the pool, not the current
149 /// number of threads!
150 unsigned getMaxConcurrency() const override { return MaxThreadCount; }
151
152 /// Returns true if the current thread is a worker thread of this thread pool.
153 bool isWorkerThread() const;
154
155private:
156 /// Returns true if all tasks in the given group have finished (nullptr means
157 /// all tasks regardless of their group). QueueLock must be locked.
158 bool workCompletedUnlocked(ThreadPoolTaskGroup *Group) const;
159
160 /// Asynchronous submission of a task to the pool. The returned future can be
161 /// used to wait for the task to finish and is *non-blocking* on destruction.
162 void asyncEnqueue(std::function<void()> Task,
163 ThreadPoolTaskGroup *Group) override {
164 int requestedThreads;
165 {
166 // Lock the queue and push the new task
167 std::unique_lock<std::mutex> LockGuard(QueueLock);
168
169 // Don't allow enqueueing after disabling the pool
170 assert(EnableFlag && "Queuing a thread during ThreadPool destruction");
171 Tasks.emplace_back(std::make_pair(std::move(Task), Group));
172 requestedThreads = ActiveThreads + Tasks.size();
173 }
174 QueueCondition.notify_one();
175 grow(requestedThreads);
176 }
177
178 /// Grow to ensure that we have at least `requested` Threads, but do not go
179 /// over MaxThreadCount.
180 void grow(int requested);
181
182 void processTasks(ThreadPoolTaskGroup *WaitingForGroup);
183
184 /// Threads in flight
185 std::vector<llvm::thread> Threads;
186 /// Lock protecting access to the Threads vector.
187 mutable llvm::sys::RWMutex ThreadsLock;
188
189 /// Tasks waiting for execution in the pool.
190 std::deque<std::pair<std::function<void()>, ThreadPoolTaskGroup *>> Tasks;
191
192 /// Locking and signaling for accessing the Tasks queue.
193 std::mutex QueueLock;
194 std::condition_variable QueueCondition;
195
196 /// Signaling for job completion (all tasks or all tasks in a group).
197 std::condition_variable CompletionCondition;
198
199 /// Keep track of the number of thread actually busy
200 unsigned ActiveThreads = 0;
201 /// Number of threads active for tasks in the given group (only non-zero).
202 DenseMap<ThreadPoolTaskGroup *, unsigned> ActiveGroups;
203
204 /// Signal for the destruction of the pool, asking thread to exit.
205 bool EnableFlag = true;
206
207 const ThreadPoolStrategy Strategy;
208
209 /// Maximum number of threads to potentially grow this pool to.
210 const unsigned MaxThreadCount;
211};
212#endif // LLVM_ENABLE_THREADS
213
214/// A non-threaded implementation.
216public:
217 /// Construct a non-threaded pool, ignoring using the hardware strategy.
219
220 /// Blocking destructor: the pool will first execute the pending tasks.
221 ~SingleThreadExecutor() override;
222
223 /// Blocking wait for all the tasks to execute first
224 void wait() override;
225
226 /// Blocking wait for only all the tasks in the given group to complete.
227 void wait(ThreadPoolTaskGroup &Group) override;
228
229 /// Returns always 1: there is no concurrency.
230 unsigned getMaxConcurrency() const override { return 1; }
231
232 /// Returns true if the current thread is a worker thread of this thread pool.
233 bool isWorkerThread() const;
234
235private:
236 /// Asynchronous submission of a task to the pool. The returned future can be
237 /// used to wait for the task to finish and is *non-blocking* on destruction.
238 void asyncEnqueue(std::function<void()> Task,
239 ThreadPoolTaskGroup *Group) override {
240 Tasks.emplace_back(std::make_pair(std::move(Task), Group));
241 }
242
243 /// Tasks waiting for execution in the pool.
244 std::deque<std::pair<std::function<void()>, ThreadPoolTaskGroup *>> Tasks;
245};
246
247#if LLVM_ENABLE_THREADS
248using DefaultThreadPool = StdThreadPool;
249#else
251#endif
252
253/// A group of tasks to be run on a thread pool. Thread pool tasks in different
254/// groups can run on the same threadpool but can be waited for separately.
255/// It is even possible for tasks of one group to submit and wait for tasks
256/// of another group, as long as this does not form a loop.
258public:
259 /// The ThreadPool argument is the thread pool to forward calls to.
261
262 /// Blocking destructor: will wait for all the tasks in the group to complete
263 /// by calling ThreadPool::wait().
265
266 /// Calls ThreadPool::async() for this group.
267 template <typename Function, typename... Args>
268 inline auto async(Function &&F, Args &&...ArgList) {
269 return Pool.async(*this, std::forward<Function>(F),
270 std::forward<Args>(ArgList)...);
271 }
272
273 /// Calls ThreadPool::wait() for this group.
274 void wait() { Pool.wait(*this); }
275
276private:
278};
279
280} // namespace llvm
281
282#endif // LLVM_SUPPORT_THREADPOOL_H
assert(UImm &&(UImm !=~static_cast< T >(0)) &&"Invalid immediate!")
#define LLVM_ABI
Definition: Compiler.h:213
Looks at all the uses of the given value Returns the Liveness deduced from the uses of this value Adds all uses that cause the result to be MaybeLive to MaybeLiveRetUses If the result is MaybeLiveUses might be modified but its content should be ignored(since it might not be complete). DeadArgumentEliminationPass
This file defines the DenseMap class.
#define F(x, y, z)
Definition: MD5.cpp:55
A non-threaded implementation.
Definition: ThreadPool.h:215
unsigned getMaxConcurrency() const override
Returns always 1: there is no concurrency.
Definition: ThreadPool.h:230
This defines the abstract base interface for a ThreadPool allowing asynchronous parallel execution on...
Definition: ThreadPool.h:50
auto async(ThreadPoolTaskGroup &Group, Function &&F, Args &&...ArgList)
Overload, task will be in the given task group.
Definition: ThreadPool.h:87
virtual void wait()=0
Blocking wait for all the threads to complete and the queue to be empty.
auto async(ThreadPoolTaskGroup &Group, Func &&F) -> std::shared_future< decltype(F())>
Definition: ThreadPool.h:102
virtual unsigned getMaxConcurrency() const =0
Returns the maximum number of worker this pool can eventually grow to.
auto async(Func &&F) -> std::shared_future< decltype(F())>
Asynchronous submission of a task to the pool.
Definition: ThreadPool.h:96
virtual ~ThreadPoolInterface()
Destroying the pool will drain the pending tasks and wait.
auto async(Function &&F, Args &&...ArgList)
Asynchronous submission of a task to the pool.
Definition: ThreadPool.h:79
virtual void wait(ThreadPoolTaskGroup &Group)=0
Blocking wait for only all the threads in the given group to complete.
This tells how a thread pool will be used.
Definition: Threading.h:115
A group of tasks to be run on a thread pool.
Definition: ThreadPool.h:257
auto async(Function &&F, Args &&...ArgList)
Calls ThreadPool::async() for this group.
Definition: ThreadPool.h:268
void wait()
Calls ThreadPool::wait() for this group.
Definition: ThreadPool.h:274
~ThreadPoolTaskGroup()
Blocking destructor: will wait for all the tasks in the group to complete by calling ThreadPool::wait...
Definition: ThreadPool.h:264
ThreadPoolTaskGroup(ThreadPoolInterface &Pool)
The ThreadPool argument is the thread pool to forward calls to.
Definition: ThreadPool.h:260
This is an optimization pass for GlobalISel generic memory operations.
Definition: AddressRanges.h:18
ThreadPoolStrategy hardware_concurrency(unsigned ThreadCount=0)
Returns a default thread strategy where all available hardware resources are to be used,...
Definition: Threading.h:185
SingleThreadExecutor DefaultThreadPool
Definition: ThreadPool.h:250