Geant4 10.7.0
Toolkit for the simulation of the passage of particles through matter
Loading...
Searching...
No Matches
ThreadPool.hh
Go to the documentation of this file.
1//
2// MIT License
3// Copyright (c) 2020 Jonathan R. Madsen
4// Permission is hereby granted, free of charge, to any person obtaining a copy
5// of this software and associated documentation files (the "Software"), to deal
6// in the Software without restriction, including without limitation the rights
7// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8// copies of the Software, and to permit persons to whom the Software is
9// furnished to do so, subject to the following conditions:
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED
12// "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
13// LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
14// PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
15// HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
16// ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
17// WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
18//
19// ---------------------------------------------------------------
20// Tasking class header file
21//
22// Class Description:
23//
24// This file creates a class for an efficient thread-pool that
25// accepts work in the form of tasks.
26//
27// ---------------------------------------------------------------
28// Author: Jonathan Madsen (Feb 13th 2018)
29// ---------------------------------------------------------------
30
31#pragma once
32
33#include "PTL/AutoLock.hh"
34#include "PTL/ThreadData.hh"
35#include "PTL/Threading.hh"
36#include "PTL/VTask.hh"
37#include "PTL/VTaskGroup.hh"
38#include "PTL/VUserTaskQueue.hh"
39
40#ifdef PTL_USE_TBB
41# include <tbb/global_control.h>
42# include <tbb/tbb.h>
43#endif
44
45// C
46#include <cstdint>
47#include <cstdlib>
48#include <cstring>
49// C++
50#include <atomic>
51#include <deque>
52#include <iostream>
53#include <map>
54#include <memory>
55#include <queue>
56#include <set>
57#include <stack>
58#include <unordered_map>
59#include <vector>
60
61namespace PTL
62{
64{
65public:
66 template <typename KeyT, typename MappedT, typename HashT = KeyT>
67 using uomap = std::unordered_map<KeyT, MappedT, std::hash<HashT>>;
68
69 // pod-types
70 using size_type = size_t;
71 using task_count_type = std::shared_ptr<std::atomic_uintmax_t>;
72 using atomic_int_type = std::shared_ptr<std::atomic_uintmax_t>;
73 using pool_state_type = std::shared_ptr<std::atomic_short>;
74 using atomic_bool_type = std::shared_ptr<std::atomic_bool>;
75 // objects
77 using lock_t = std::shared_ptr<Mutex>;
78 using condition_t = std::shared_ptr<Condition>;
81 // containers
82 typedef std::deque<ThreadId> thread_list_t;
83 typedef std::vector<bool> bool_list_t;
84 typedef std::map<ThreadId, uintmax_t> thread_id_map_t;
85 typedef std::map<uintmax_t, ThreadId> thread_index_map_t;
86 using thread_vec_t = std::vector<Thread>;
87 // functions
88 typedef std::function<void()> initialize_func_t;
89 typedef std::function<intmax_t(intmax_t)> affinity_func_t;
90
91public:
92 // Constructor and Destructors
94 const size_type& pool_size, VUserTaskQueue* task_queue = nullptr,
95 bool _use_affinity = GetEnv<bool>("PTL_CPU_AFFINITY", false),
96 const affinity_func_t& = [](intmax_t) {
97 static std::atomic<intmax_t> assigned;
98 intmax_t _assign = assigned++;
99 return _assign % Thread::hardware_concurrency();
100 });
101 // Virtual destructors are required by abstract classes
102 // so add it by default, just in case
103 virtual ~ThreadPool();
104 ThreadPool(const ThreadPool&) = delete;
105 ThreadPool(ThreadPool&&) = default;
106 ThreadPool& operator=(const ThreadPool&) = delete;
108
109public:
110 // Public functions
111 size_type initialize_threadpool(size_type); // start the threads
112 size_type destroy_threadpool(); // destroy the threads
114
115 template <typename FuncT>
116 void execute_on_all_threads(FuncT&& _func);
117
118public:
119 // Public functions related to TBB
120 static bool using_tbb();
121 // enable using TBB if available
122 static void set_use_tbb(bool val);
123
124public:
125 // add tasks for threads to process
126 size_type add_task(task_pointer task, int bin = -1);
127 // size_type add_thread_task(ThreadId id, task_pointer&& task);
128 // add a generic container with iterator
129 template <typename ListT>
130 size_type add_tasks(ListT&);
131
133 Thread* get_thread(std::thread::id id) const;
134
135 task_queue_t* get_queue() const { return m_task_queue; }
136
137 // only relevant when compiled with PTL_USE_TBB
139
140 void set_initialization(initialize_func_t f) { m_init_func = f; }
142 {
143 auto f = []() {};
144 m_init_func = f;
145 }
146
147public:
148 // get the pool state
149 const pool_state_type& state() const { return m_pool_state; }
150 // see how many main task threads there are
151 size_type size() const { return m_pool_size; }
152 // set the thread pool size
153 void resize(size_type _n);
154 // affinity assigns threads to cores, assignment at constructor
155 bool using_affinity() const { return m_use_affinity; }
156 bool is_alive() { return m_alive_flag->load(); }
157 void notify();
158 void notify_all();
159 void notify(size_type);
160 bool is_initialized() const;
162 {
163 return (m_thread_awake) ? m_thread_awake->load() : 0;
164 }
165
166 void set_affinity(affinity_func_t f) { m_affinity_func = f; }
167 void set_affinity(intmax_t i, Thread&);
168
169 void set_verbose(int n) { m_verbose = n; }
170 int get_verbose() const { return m_verbose; }
171 bool is_master() const { return ThisThread::get_id() == m_master_tid; }
172
173public:
174 // read FORCE_NUM_THREADS environment variable
175 static const thread_id_map_t& get_thread_ids();
176 static uintmax_t get_this_thread_id();
177
178protected:
179 void execute_thread(VUserTaskQueue*); // function thread sits in
180 int insert(const task_pointer&, int = -1);
182
183protected:
184 // called in THREAD INIT
185 static void start_thread(ThreadPool*, intmax_t = -1);
186
188 {
189 if(m_thread_active)
190 ++(*m_thread_active);
191 }
192
194 {
195 if(m_thread_active)
196 --(*m_thread_active);
197 }
198
199private:
200 // Private variables
201 // random
202 bool m_use_affinity;
203 bool m_tbb_tp;
204 int m_verbose = 0;
205 size_type m_pool_size = 0;
206 ThreadId m_master_tid;
207 atomic_bool_type m_alive_flag = std::make_shared<std::atomic_bool>(false);
208 pool_state_type m_pool_state = std::make_shared<std::atomic_short>(0);
209 atomic_int_type m_thread_awake = std::make_shared<std::atomic_uintmax_t>();
210 atomic_int_type m_thread_active = std::make_shared<std::atomic_uintmax_t>();
211
212 // locks
213 lock_t m_task_lock = std::make_shared<Mutex>();
214 // conditions
215 condition_t m_task_cond = std::make_shared<Condition>();
216
217 // containers
218 bool_list_t m_is_joined; // join list
219 bool_list_t m_is_stopped; // lets thread know to stop
220 thread_list_t m_main_threads; // storage for active threads
221 thread_list_t m_stop_threads; // storage for stopped threads
222 thread_vec_t m_threads;
223
224 // task queue
225 task_queue_t* m_task_queue;
226 tbb_task_group_t* m_tbb_task_group;
227
228 // functions
229 initialize_func_t m_init_func;
230 affinity_func_t m_affinity_func;
231
232private:
233 // Private static variables
234 PTL_DLL static thread_id_map_t f_thread_ids;
235 PTL_DLL static bool f_use_tbb;
236};
237
238//--------------------------------------------------------------------------------------//
239inline void
241{
242 // wake up one thread that is waiting for a task to be available
243 if(m_thread_awake && m_thread_awake->load() < m_pool_size)
244 {
245 AutoLock l(*m_task_lock);
246 m_task_cond->notify_one();
247 }
248}
249//--------------------------------------------------------------------------------------//
250inline void
252{
253 // wake all threads
254 AutoLock l(*m_task_lock);
255 m_task_cond->notify_all();
256}
257//--------------------------------------------------------------------------------------//
258inline void
260{
261 if(ntasks == 0)
262 return;
263
264 // wake up as many threads that tasks just added
265 if(m_thread_awake && m_thread_awake->load() < m_pool_size)
266 {
267 AutoLock l(*m_task_lock);
268 if(ntasks < this->size())
269 {
270 for(size_type i = 0; i < ntasks; ++i)
271 m_task_cond->notify_one();
272 }
273 else
274 m_task_cond->notify_all();
275 }
276}
277//--------------------------------------------------------------------------------------//
278// local function for getting the tbb task scheduler
281{
282 static thread_local tbb_global_control_t* _instance = nullptr;
283 return _instance;
284}
285//--------------------------------------------------------------------------------------//
286inline void
288{
289 if(_n == m_pool_size)
290 return;
292 m_task_queue->resize(static_cast<intmax_t>(_n));
293}
294//--------------------------------------------------------------------------------------//
295inline int
297{
298 auto _func = [=]() {
299 (*task)();
300 if(!task->group())
301 delete task;
302 };
303
304 if(m_tbb_tp && m_tbb_task_group)
305 {
306 m_tbb_task_group->run(_func);
307 }
308 else
309 {
310 _func();
311 }
312 // return the number of tasks added to task-list
313 return 0;
314}
315//--------------------------------------------------------------------------------------//
316inline int
317ThreadPool::insert(const task_pointer& task, int bin)
318{
319 static thread_local ThreadData* _data = ThreadData::GetInstance();
320
321 // pass the task to the queue
322 auto ibin = m_task_queue->InsertTask(task, _data, bin);
323 notify();
324 return ibin;
325}
326//--------------------------------------------------------------------------------------//
329{
330 // if not native (i.e. TBB) then return
331 if(!task->is_native_task())
332 return 0;
333
334 // if we haven't built thread-pool, just execute
335 if(!m_alive_flag->load())
336 return static_cast<size_type>(run_on_this(task));
337
338 return static_cast<size_type>(insert(task, bin));
339}
340//--------------------------------------------------------------------------------------//
341template <typename ListT>
344{
345 if(!m_alive_flag) // if we haven't built thread-pool, just execute
346 {
347 for(auto& itr : c)
348 run(itr);
349 c.clear();
350 return 0;
351 }
352
353 // TODO: put a limit on how many tasks can be added at most
354 auto c_size = c.size();
355 for(auto& itr : c)
356 {
357 if(!itr->is_native_task())
358 --c_size;
359 else
360 {
361 //++(m_task_queue);
362 m_task_queue->InsertTask(itr);
363 }
364 }
365 c.clear();
366
367 // notify sleeping threads
368 notify(c_size);
369
370 return c_size;
371}
372//--------------------------------------------------------------------------------------//
373template <typename FuncT>
374inline void
376{
377 if(m_tbb_tp && m_tbb_task_group)
378 {
379#if defined(PTL_USE_TBB)
380 // TBB lazily activates threads to process tasks and the master thread
381 // participates in processing the tasks so getting a specific
382 // function to execute only on the worker threads requires some trickery
383 //
384 auto master_tid = ThisThread::get_id();
385 std::set<std::thread::id> _first;
386 Mutex _mutex;
387 // init function which executes function and returns 1 only once
388 auto _init = [&]() {
389 static thread_local int _once = 0;
390 _mutex.lock();
391 if(_first.find(std::this_thread::get_id()) == _first.end())
392 {
393 // we need to reset this thread-local static for multiple invocations
394 // of the same template instantiation
395 _once = 0;
396 _first.insert(std::this_thread::get_id());
397 }
398 _mutex.unlock();
399 if(_once++ == 0)
400 {
401 _func();
402 return 1;
403 }
404 return 0;
405 };
406 // consumes approximately N milliseconds of cpu time
407 auto _consume = [](long n) {
408 using stl_mutex_t = std::mutex;
409 using unique_lock_t = std::unique_lock<stl_mutex_t>;
410 // a mutex held by one lock
411 stl_mutex_t mutex;
412 // acquire lock
413 unique_lock_t hold_lk(mutex);
414 // associate but defer
415 unique_lock_t try_lk(mutex, std::defer_lock);
416 // get current time
417 auto now = std::chrono::steady_clock::now();
418 // try until time point
419 while(std::chrono::steady_clock::now() < (now + std::chrono::milliseconds(n)))
420 try_lk.try_lock();
421 };
422 // this will collect the number of threads which have
423 // executed the _init function above
424 std::atomic<size_t> _total_init{ 0 };
425 // this is the task passed to the task-group
426 auto _init_task = [&]() {
427 int _ret = 0;
428 // don't let the master thread execute the function
429 if(ThisThread::get_id() != master_tid)
430 {
431 // execute the function
432 _ret = _init();
433 // add the result
434 _total_init += _ret;
435 }
436 // if the function did not return anything, put it to sleep
437 // so TBB will wake other threads to execute the remaining tasks
438 if(_ret == 0)
439 _consume(100);
440 };
441
442 // TBB won't oversubscribe so we need to limit by ncores - 1
443 size_t nitr = 0;
444 size_t _maxp = tbb_global_control()->active_value(
446 size_t _sz = size();
447 size_t _ncore = Threading::GetNumberOfCores() - 1;
448 size_t _num = std::min(_maxp, std::min(_sz, _ncore));
449 auto _fname = __FUNCTION__;
450 auto _write_info = [&]() {
451 std::cerr << "[" << _fname << "]> Total initalized: " << _total_init
452 << ", expected: " << _num << ", max-parallel: " << _maxp
453 << ", size: " << _sz << ", ncore: " << _ncore << std::endl;
454 };
455 while(_total_init < _num)
456 {
457 auto _n = _num;
458 while(--_n > 0)
459 m_tbb_task_group->run(_init_task);
460 m_tbb_task_group->wait();
461 // don't loop infinitely but use a strict condition
462 if(nitr++ > 2 * (_num + 1) && (_total_init - 1) == _num)
463 {
464 _write_info();
465 break;
466 }
467 // at this point we need to exit
468 if(nitr > 4 * (_ncore + 1))
469 {
470 _write_info();
471 break;
472 }
473 }
474 if(get_verbose() > 3)
475 _write_info();
476#endif
477 }
478 else if(get_queue())
479 {
480 get_queue()->ExecuteOnAllThreads(this, std::forward<FuncT>(_func));
481 }
482}
483//======================================================================================//
484
485} // namespace PTL
#define PTL_DLL
Definition: Types.hh:52
static ThreadData *& GetInstance()
Definition: ThreadData.cc:35
void set_verbose(int n)
Definition: ThreadPool.hh:169
std::shared_ptr< Condition > condition_t
Definition: ThreadPool.hh:78
size_type add_task(task_pointer task, int bin=-1)
Definition: ThreadPool.hh:328
std::shared_ptr< std::atomic_uintmax_t > task_count_type
Definition: ThreadPool.hh:71
std::map< uintmax_t, ThreadId > thread_index_map_t
Definition: ThreadPool.hh:85
ThreadPool(ThreadPool &&)=default
static uintmax_t get_this_thread_id()
Definition: ThreadPool.cc:120
void record_entry()
Definition: ThreadPool.hh:187
const pool_state_type & state() const
Definition: ThreadPool.hh:149
ThreadPool(const ThreadPool &)=delete
static void start_thread(ThreadPool *, intmax_t=-1)
Definition: ThreadPool.cc:72
static bool using_tbb()
Definition: ThreadPool.cc:92
std::vector< Thread > thread_vec_t
Definition: ThreadPool.hh:86
std::function< intmax_t(intmax_t)> affinity_func_t
Definition: ThreadPool.hh:89
size_type add_tasks(ListT &)
Definition: ThreadPool.hh:343
size_t size_type
Definition: ThreadPool.hh:70
void resize(size_type _n)
Definition: ThreadPool.hh:287
static void set_use_tbb(bool val)
Definition: ThreadPool.cc:100
virtual ~ThreadPool()
Definition: ThreadPool.cc:172
std::shared_ptr< Mutex > lock_t
Definition: ThreadPool.hh:77
void record_exit()
Definition: ThreadPool.hh:193
int run_on_this(task_pointer)
Definition: ThreadPool.hh:296
void set_affinity(affinity_func_t f)
Definition: ThreadPool.hh:166
size_type stop_thread()
Definition: ThreadPool.cc:479
ThreadPool & operator=(const ThreadPool &)=delete
std::shared_ptr< std::atomic_uintmax_t > atomic_int_type
Definition: ThreadPool.hh:72
int get_active_threads_count() const
Definition: ThreadPool.hh:161
Thread * get_thread(std::thread::id id) const
int insert(const task_pointer &, int=-1)
Definition: ThreadPool.hh:317
std::shared_ptr< std::atomic_short > pool_state_type
Definition: ThreadPool.hh:73
static tbb_global_control_t *& tbb_global_control()
Definition: ThreadPool.hh:280
bool is_master() const
Definition: ThreadPool.hh:171
ThreadPool & operator=(ThreadPool &&)=default
task_queue_t * get_queue() const
Definition: ThreadPool.hh:135
std::function< void()> initialize_func_t
Definition: ThreadPool.hh:88
Thread * get_thread(size_type _n) const
std::shared_ptr< std::atomic_bool > atomic_bool_type
Definition: ThreadPool.hh:74
int get_verbose() const
Definition: ThreadPool.hh:170
bool using_affinity() const
Definition: ThreadPool.hh:155
void execute_on_all_threads(FuncT &&_func)
Definition: ThreadPool.hh:375
static const thread_id_map_t & get_thread_ids()
Definition: ThreadPool.cc:112
std::map< ThreadId, uintmax_t > thread_id_map_t
Definition: ThreadPool.hh:84
std::deque< ThreadId > thread_list_t
Definition: ThreadPool.hh:82
task_type * task_pointer
Definition: ThreadPool.hh:79
size_type size() const
Definition: ThreadPool.hh:151
std::vector< bool > bool_list_t
Definition: ThreadPool.hh:83
size_type destroy_threadpool()
Definition: ThreadPool.cc:366
size_type initialize_threadpool(size_type)
Definition: ThreadPool.cc:223
void execute_thread(VUserTaskQueue *)
Definition: ThreadPool.cc:520
bool is_initialized() const
Definition: ThreadPool.cc:193
std::unordered_map< KeyT, MappedT, std::hash< HashT > > uomap
Definition: ThreadPool.hh:67
void set_initialization(initialize_func_t f)
Definition: ThreadPool.hh:140
void reset_initialization()
Definition: ThreadPool.hh:141
VUserTaskQueue task_queue_t
Definition: ThreadPool.hh:80
VTask is the abstract class stored in thread_pool.
Definition: VTask.hh:55
virtual bool is_native_task() const
Definition: VTask.cc:95
VTaskGroup * group() const
Definition: VTask.hh:80
virtual intmax_t InsertTask(task_pointer, ThreadData *=nullptr, intmax_t subq=-1)=0
virtual void ExecuteOnAllThreads(ThreadPool *tp, function_type f)=0
virtual void resize(intmax_t)=0
static size_t active_value(parameter param)
void run(FuncT f)
Definition: ThreadData.hh:59
unsigned GetNumberOfCores()
Definition: Threading.cc:64
Definition: AutoLock.hh:254
std::mutex Mutex
Definition: Threading.hh:76
std::thread Thread
Definition: Threading.hh:157
Thread::id ThreadId
Definition: Threading.hh:201