ThreadPool.h (LLVM 22.0.0git)
//===-- llvm/Support/ThreadPool.h - A ThreadPool implementation -*- C++ -*-===//
//
// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
// See https://llvm.org/LICENSE.txt for license information.
// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
//
//===----------------------------------------------------------------------===//
//
// This file defines a crude C++11 based thread pool.
//
//===----------------------------------------------------------------------===//

#ifndef LLVM_SUPPORT_THREADPOOL_H
#define LLVM_SUPPORT_THREADPOOL_H

#include "llvm/ADT/DenseMap.h"
#include "llvm/Config/llvm-config.h"
#include "llvm/Support/Compiler.h"
#include "llvm/Support/RWMutex.h"
#include "llvm/Support/Threading.h"
#include "llvm/Support/thread.h"

#include <future>

#include <condition_variable>
#include <deque>
#include <functional>
#include <memory>
#include <mutex>
#include <utility>

namespace llvm {

class ThreadPoolTaskGroup;

/// This defines the abstract base interface for a ThreadPool allowing
/// asynchronous parallel execution on a defined number of threads.
///
/// It is possible to reuse one thread pool for different groups of tasks
/// by grouping tasks using ThreadPoolTaskGroup. All tasks are processed using
/// the same queue, but it is possible to wait only for a specific group of
/// tasks to finish.
///
/// It is also possible for worker threads to submit new tasks and wait for
/// them. Note that this may result in a deadlock in cases such as when a task
/// (directly or indirectly) tries to wait for its own completion, or when all
/// available threads are used up by tasks waiting for a task that has no thread
/// left to run on (this includes waiting on the returned future). It should be
/// generally safe to wait() for a group as long as groups do not form a cycle.
class LLVM_ABI ThreadPoolInterface {
  /// The actual method to enqueue a task, to be defined by the concrete
  /// implementation.
  virtual void asyncEnqueue(std::function<void()> Task,
                            ThreadPoolTaskGroup *Group) = 0;

public:
  /// Destroying the pool will drain the pending tasks and wait. The current
  /// thread may participate in the execution of the pending tasks.
  virtual ~ThreadPoolInterface();

  /// Blocking wait for all the threads to complete and the queue to be empty.
  /// It is an error to try to add new tasks while blocking on this call.
  /// Calling wait() from a task would deadlock waiting for itself.
  virtual void wait() = 0;

  /// Blocking wait for only all the threads in the given group to complete.
  /// It is possible to wait even inside a task, but waiting (directly or
  /// indirectly) on itself will deadlock. If called from a task running on a
  /// worker thread, the call may process pending tasks while waiting in order
  /// not to waste the thread.
  virtual void wait(ThreadPoolTaskGroup &Group) = 0;

  /// Returns the maximum number of workers this pool can eventually grow to.
  virtual unsigned getMaxConcurrency() const = 0;

  /// Asynchronous submission of a task to the pool. The returned future can be
  /// used to wait for the task to finish and is *non-blocking* on destruction.
  template <typename Function, typename... Args>
  auto async(Function &&F, Args &&...ArgList) {
    auto Task =
        std::bind(std::forward<Function>(F), std::forward<Args>(ArgList)...);
    return async(std::move(Task));
  }

  /// Overload, task will be in the given task group.
  template <typename Function, typename... Args>
  auto async(ThreadPoolTaskGroup &Group, Function &&F, Args &&...ArgList) {
    auto Task =
        std::bind(std::forward<Function>(F), std::forward<Args>(ArgList)...);
    return async(Group, std::move(Task));
  }

  /// Asynchronous submission of a task to the pool. The returned future can be
  /// used to wait for the task to finish and is *non-blocking* on destruction.
  template <typename Func>
  auto async(Func &&F) -> std::shared_future<decltype(F())> {
    return asyncImpl(std::function<decltype(F())()>(std::forward<Func>(F)),
                     nullptr);
  }

  template <typename Func>
  auto async(ThreadPoolTaskGroup &Group, Func &&F)
      -> std::shared_future<decltype(F())> {
    return asyncImpl(std::function<decltype(F())()>(std::forward<Func>(F)),
                     &Group);
  }

private:
  /// Asynchronous submission of a task to the pool. The returned future can be
  /// used to wait for the task to finish and is *non-blocking* on destruction.
  template <typename ResTy>
  std::shared_future<ResTy> asyncImpl(std::function<ResTy()> Task,
                                      ThreadPoolTaskGroup *Group) {
    auto Future = std::async(std::launch::deferred, std::move(Task)).share();
    asyncEnqueue([Future]() { Future.wait(); }, Group);
    return Future;
  }
};
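
// --- Illustrative sketch (editor's addition, not part of the original header).
// A minimal example of the async()/wait() interface documented above; the
// function name `exampleAsyncAndWait` is hypothetical, and any concrete pool
// defined below can be passed in.
inline void exampleAsyncAndWait(ThreadPoolInterface &Pool) {
  // async() returns a std::shared_future that is non-blocking on destruction,
  // so results can be collected selectively or ignored entirely.
  std::shared_future<int> Answer = Pool.async([] { return 6 * 7; });
  Pool.async([] { /* fire-and-forget work */ });
  int Value = Answer.get(); // waits only for this one task
  (void)Value;
  Pool.wait(); // drains the whole queue; do not call this from inside a task
}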

#if LLVM_ENABLE_THREADS
/// A ThreadPool implementation using std::threads.
///
/// The pool keeps a vector of threads alive, waiting on a condition variable
/// for some work to become available.
class LLVM_ABI StdThreadPool : public ThreadPoolInterface {
public:
  /// Construct a pool using the hardware strategy \p S for mapping hardware
  /// execution resources (threads, cores, CPUs).
  /// Defaults to using the maximum execution resources in the system, but
  /// accounting for the affinity mask.
  StdThreadPool(ThreadPoolStrategy S = hardware_concurrency());

  /// Blocking destructor: the pool will wait for all the threads to complete.
  ~StdThreadPool() override;

  /// Blocking wait for all the threads to complete and the queue to be empty.
  /// It is an error to try to add new tasks while blocking on this call.
  /// Calling wait() from a task would deadlock waiting for itself.
  void wait() override;

  /// Blocking wait for only all the threads in the given group to complete.
  /// It is possible to wait even inside a task, but waiting (directly or
  /// indirectly) on itself will deadlock. If called from a task running on a
  /// worker thread, the call may process pending tasks while waiting in order
  /// not to waste the thread.
  void wait(ThreadPoolTaskGroup &Group) override;

  /// Returns the maximum number of worker threads in the pool, not the current
  /// number of threads!
  unsigned getMaxConcurrency() const override { return MaxThreadCount; }

  /// Returns true if the current thread is a worker thread of this thread pool.
  bool isWorkerThread() const;

private:
  /// Returns true if all tasks in the given group have finished (nullptr means
  /// all tasks regardless of their group). QueueLock must be locked.
  bool workCompletedUnlocked(ThreadPoolTaskGroup *Group) const;

  /// Asynchronous submission of a task to the pool. The returned future can be
  /// used to wait for the task to finish and is *non-blocking* on destruction.
  void asyncEnqueue(std::function<void()> Task,
                    ThreadPoolTaskGroup *Group) override {
    int requestedThreads;
    {
      // Lock the queue and push the new task
      std::unique_lock<std::mutex> LockGuard(QueueLock);

      // Don't allow enqueueing after disabling the pool
      assert(EnableFlag && "Queuing a thread during ThreadPool destruction");
      Tasks.emplace_back(std::make_pair(std::move(Task), Group));
      requestedThreads = ActiveThreads + Tasks.size();
    }
    QueueCondition.notify_one();
    grow(requestedThreads);
  }

  /// Grow to ensure that we have at least `requested` Threads, but do not go
  /// over MaxThreadCount.
  void grow(int requested);

  /// Process tasks from the queue; when \p WaitingForGroup is non-null, only
  /// until that group has no pending tasks left.
  void processTasks(ThreadPoolTaskGroup *WaitingForGroup);

  /// Threads in flight
  std::vector<llvm::thread> Threads;
  /// Lock protecting access to the Threads vector.
  mutable llvm::sys::RWMutex ThreadsLock;

  /// Tasks waiting for execution in the pool.
  std::deque<std::pair<std::function<void()>, ThreadPoolTaskGroup *>> Tasks;

  /// Locking and signaling for accessing the Tasks queue.
  std::mutex QueueLock;
  std::condition_variable QueueCondition;

  /// Signaling for job completion (all tasks or all tasks in a group).
  std::condition_variable CompletionCondition;

  /// Keep track of the number of threads actually busy.
  unsigned ActiveThreads = 0;
  /// Number of threads active for tasks in the given group (only groups with a
  /// non-zero count are present).
  DenseMap<ThreadPoolTaskGroup *, unsigned> ActiveGroups;

  /// Signal for the destruction of the pool, asking threads to exit.
  bool EnableFlag = true;

  const ThreadPoolStrategy Strategy;

  /// Maximum number of threads to potentially grow this pool to.
  const unsigned MaxThreadCount;
};
#endif // LLVM_ENABLE_THREADS

/// A non-threaded implementation.
class LLVM_ABI SingleThreadExecutor : public ThreadPoolInterface {
public:
  /// Construct a non-threaded pool, ignoring the hardware strategy.
  SingleThreadExecutor(ThreadPoolStrategy ignored = {});

  /// Blocking destructor: the pool will first execute the pending tasks.
  ~SingleThreadExecutor() override;

  /// Blocking wait for all the tasks to execute first.
  void wait() override;

  /// Blocking wait for only all the tasks in the given group to complete.
  void wait(ThreadPoolTaskGroup &Group) override;

  /// Always returns 1: there is no concurrency.
  unsigned getMaxConcurrency() const override { return 1; }

  /// Returns true if the current thread is a worker thread of this thread pool.
  bool isWorkerThread() const;

private:
  /// Asynchronous submission of a task to the pool. The returned future can be
  /// used to wait for the task to finish and is *non-blocking* on destruction.
  void asyncEnqueue(std::function<void()> Task,
                    ThreadPoolTaskGroup *Group) override {
    Tasks.emplace_back(std::make_pair(std::move(Task), Group));
  }

  /// Tasks waiting for execution in the pool.
  std::deque<std::pair<std::function<void()>, ThreadPoolTaskGroup *>> Tasks;
};

#if LLVM_ENABLE_THREADS
using DefaultThreadPool = StdThreadPool;
#else
using DefaultThreadPool = SingleThreadExecutor;
#endif
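
// --- Illustrative sketch (editor's addition, not part of the original header).
// Constructing the default pool with an explicit strategy; the function name
// `exampleDefaultPool` is hypothetical. hardware_concurrency() is declared in
// llvm/Support/Threading.h, which is included above.
inline void exampleDefaultPool() {
  // With LLVM_ENABLE_THREADS this is a StdThreadPool with at most 4 workers;
  // otherwise the strategy is ignored and tasks run when waited on.
  DefaultThreadPool Pool(hardware_concurrency(4));
  Pool.async([] { /* some independent work */ });
  Pool.wait();
  // The destructor would also drain the pending tasks before returning.
}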

/// A group of tasks to be run on a thread pool. Thread pool tasks in different
/// groups can run on the same thread pool but can be waited for separately.
/// It is even possible for tasks of one group to submit and wait for tasks
/// of another group, as long as this does not form a loop.
class ThreadPoolTaskGroup {
public:
  /// The ThreadPool argument is the thread pool to forward calls to.
  ThreadPoolTaskGroup(ThreadPoolInterface &Pool) : Pool(Pool) {}

  /// Blocking destructor: will wait for all the tasks in the group to complete
  /// by calling ThreadPool::wait().
  ~ThreadPoolTaskGroup() { wait(); }

  /// Calls ThreadPool::async() for this group.
  template <typename Function, typename... Args>
  inline auto async(Function &&F, Args &&...ArgList) {
    return Pool.async(*this, std::forward<Function>(F),
                      std::forward<Args>(ArgList)...);
  }

  /// Calls ThreadPool::wait() for this group.
  void wait() { Pool.wait(*this); }

private:
  ThreadPoolInterface &Pool;
};
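
// --- Illustrative sketch (editor's addition, not part of the original header).
// Grouping tasks so they can be waited on independently of the rest of the
// queue; the function name `exampleTaskGroup` is hypothetical.
inline void exampleTaskGroup(ThreadPoolInterface &Pool) {
  ThreadPoolTaskGroup Group(Pool);
  for (int I = 0; I < 8; ++I)
    Group.async([I] { /* work on chunk I */ (void)I; });
  // Waits only for the eight tasks above; unrelated tasks queued on Pool may
  // still be pending. The destructor would call wait() as well.
  Group.wait();
}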

} // namespace llvm

#endif // LLVM_SUPPORT_THREADPOOL_H