Geant4 10.7.0
Toolkit for the simulation of the passage of particles through matter
Loading...
Searching...
No Matches
ThreadPool.cc
Go to the documentation of this file.
1//
2// MIT License
3// Copyright (c) 2020 Jonathan R. Madsen
4// Permission is hereby granted, free of charge, to any person obtaining a copy
5// of this software and associated documentation files (the "Software"), to deal
6// in the Software without restriction, including without limitation the rights
7// to use, copy, modify, merge, publish, distribute, sublicense, and
8// copies of the Software, and to permit persons to whom the Software is
9// furnished to do so, subject to the following conditions:
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED
12// "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
13// LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
14// PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
15// HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
16// ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
17// WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
18//
19// ---------------------------------------------------------------
20// Tasking class implementation
21//
22// Class Description:
23//
24// This file creates a class for an efficient thread-pool that
25// accepts work in the form of tasks.
26//
27// ---------------------------------------------------------------
28// Author: Jonathan Madsen (Feb 13th 2018)
29// ---------------------------------------------------------------
30
31#include "PTL/ThreadPool.hh"
32#include "PTL/Globals.hh"
33#include "PTL/ThreadData.hh"
34#include "PTL/UserTaskQueue.hh"
35#include "PTL/VUserTaskQueue.hh"
36
37#include <cstdlib>
38
39using namespace PTL;
40
41//======================================================================================//
42
43inline intmax_t
45{
46 return static_cast<intmax_t>(Thread::hardware_concurrency());
47}
48
49//======================================================================================//
50
51ThreadPool::thread_id_map_t ThreadPool::f_thread_ids;
52
53//======================================================================================//
54
55namespace
56{
// Accessor for the calling thread's ThreadData pointer.
// NOTE(review): this listing drops original line 57 (the return type) and line 60
// (the body). Uses in this file (`thread_data() = ...`,
// `thread_data()->current_queue`) show it returns an assignable reference to a
// ThreadData pointer. The body presumably forwards to ThreadData::GetInstance(),
// declared as `static ThreadData*& GetInstance()`. Confirm against the real
// source before relying on this.
58thread_data()
59{
61}
62} // namespace
63
64//======================================================================================//
65
66bool ThreadPool::f_use_tbb = false;
67
68//======================================================================================//
69// static thread entry point: registers the calling thread's index, then runs
70// the pool's execute_thread() loop
71void
73{
74 {
75 AutoLock lock(TypeMutex<ThreadPool>(), std::defer_lock);
76 if(!lock.owns_lock())
77 lock.lock();
78 if(_idx < 0)
79 _idx = f_thread_ids.size();
80 f_thread_ids[std::this_thread::get_id()] = _idx;
81 }
82 static thread_local std::unique_ptr<ThreadData> _unique_data(new ThreadData(tp));
83 thread_data() = _unique_data.get();
84 tp->record_entry();
85 tp->execute_thread(thread_data()->current_queue);
86 tp->record_exit();
87}
88
89//======================================================================================//
90// static member function that reports whether the TBB backend is enabled
91bool
93{
94 return f_use_tbb;
95}
96
97//======================================================================================//
98// static member function that enables/disables the TBB backend (no effect unless built with PTL_USE_TBB)
99void
101{
102#if defined(PTL_USE_TBB)
103 f_use_tbb = enable;
104#else
105 ConsumeParameters<bool>(enable);
106#endif
107}
108
109//======================================================================================//
110
113{
114 return f_thread_ids;
115}
116
117//======================================================================================//
118
119uintmax_t
121{
122 auto _tid = ThisThread::get_id();
123 {
124 AutoLock lock(TypeMutex<ThreadPool>(), std::defer_lock);
125 if(!lock.owns_lock())
126 lock.lock();
127 if(f_thread_ids.find(_tid) == f_thread_ids.end())
128 {
129 auto _idx = f_thread_ids.size();
130 f_thread_ids[_tid] = _idx;
131 }
132 }
133 return f_thread_ids[_tid];
134}
135
136//======================================================================================//
137
138ThreadPool::ThreadPool(const size_type& pool_size, VUserTaskQueue* task_queue,
139 bool _use_affinity, const affinity_func_t& _affinity_func)
140: m_use_affinity(_use_affinity)
141, m_tbb_tp(false)
142, m_verbose(0)
143, m_pool_size(0)
144, m_master_tid(ThisThread::get_id())
145, m_alive_flag(std::make_shared<std::atomic_bool>(false))
146, m_pool_state(std::make_shared<std::atomic_short>(thread_pool::state::NONINIT))
147, m_thread_awake(std::make_shared<std::atomic_uintmax_t>(0))
148, m_task_lock(std::make_shared<Mutex>())
149, m_task_cond(std::make_shared<Condition>())
150, m_task_queue(task_queue)
151, m_tbb_task_group(nullptr)
152, m_init_func([]() { return; })
153, m_affinity_func(_affinity_func)
154{
155 m_verbose = GetEnv<int>("PTL_VERBOSE", m_verbose);
156
157 auto master_id = get_this_thread_id();
158 if(master_id != 0 && m_verbose > 1)
159 std::cerr << "ThreadPool created on non-master slave" << std::endl;
160
161 thread_data() = new ThreadData(this);
162
163 // initialize after get_this_thread_id so master is zero
164 this->initialize_threadpool(pool_size);
165
166 if(!m_task_queue)
167 m_task_queue = new UserTaskQueue(m_pool_size);
168}
169
170//======================================================================================//
171
173{
174 if(m_alive_flag->load())
175 {
176 std::cerr << "Warning! ThreadPool was not properly destroyed! Call "
177 "destroy_threadpool() before deleting the ThreadPool object to "
178 "eliminate this message."
179 << std::endl;
180 m_pool_state->store(thread_pool::state::STOPPED);
181 m_task_lock->lock();
182 CONDITIONBROADCAST(m_task_cond.get());
183 m_task_lock->unlock();
184 for(auto& itr : m_threads)
185 itr.join();
186 m_threads.clear();
187 }
188}
189
190//======================================================================================//
191
192bool
194{
195 return !(m_pool_state->load() == thread_pool::state::NONINIT);
196}
197
198//======================================================================================//
199
200void
201ThreadPool::set_affinity(intmax_t i, Thread& _thread)
202{
203 try
204 {
205 NativeThread native_thread = _thread.native_handle();
206 intmax_t _pin = m_affinity_func(i);
207 if(m_verbose > 0)
208 {
209 std::cout << "Setting pin affinity for thread " << _thread.get_id() << " to "
210 << _pin << std::endl;
211 }
212 Threading::SetPinAffinity(_pin, native_thread);
213 } catch(std::runtime_error& e)
214 {
215 std::cout << "Error setting pin affinity" << std::endl;
216 std::cerr << e.what() << std::endl; // issue assigning affinity
217 }
218}
219
220//======================================================================================//
221
// ThreadPool::size_type ThreadPool::initialize_threadpool(size_type proposed_size)
// NOTE(review): this listing drops the signature lines (original lines 222-223;
// declared as `size_type initialize_threadpool(size_type)`). It also drops
// original line 253, the remaining constructor arguments of tbb_global_control_t.
//
// Brings the pool to `proposed_size` workers:
// - returns 0 immediately when proposed_size < 1;
// - on the TBB path (PTL_USE_TBB and f_use_tbb) sets up the global control and
//   task group, then returns m_pool_size;
// - if already STARTED with more or equally many workers, shrinks via
//   stop_thread() or does nothing, then returns m_pool_size;
// - otherwise starts the missing native threads and returns m_main_threads.size().
// Failures to create a worker (std::runtime_error / std::bad_alloc) are printed
// and skipped, so fewer threads than requested may exist afterwards.
// Throws std::runtime_error if m_is_joined and m_main_threads end up with
// different sizes. Creates a UserTaskQueue if none was supplied.
224{
225 //--------------------------------------------------------------------//
226 // return before initializing
227 if(proposed_size < 1)
228 return 0;
229
230 //--------------------------------------------------------------------//
231 // store that has been started
232 if(!m_alive_flag->load())
233 m_pool_state->store(thread_pool::state::STARTED);
234
235 //--------------------------------------------------------------------//
236 // handle tbb task scheduler
237#ifdef PTL_USE_TBB
238 if(f_use_tbb)
239 {
240 m_tbb_tp = true;
241 m_pool_size = proposed_size;
242 tbb_global_control_t*& _global_control = tbb_global_control();
// NOTE(review): m_pool_size was just set to proposed_size above, so this
// comparison is always false. An existing global control of a different size is
// therefore never replaced. The check presumably belongs before the
// assignment; confirm the intended behaviour.
243 // delete if wrong size
244 if(m_pool_size != proposed_size)
245 {
246 delete _global_control;
247 _global_control = nullptr;
248 }
249
250 if(!_global_control)
251 {
252 _global_control = new tbb_global_control_t(
// NOTE(review): original line 253 (the constructor arguments and closing `);`)
// is missing from this listing.
254 if(m_verbose > 0)
255 {
256 std::cout << "ThreadPool [TBB] initialized with " << m_pool_size
257 << " threads." << std::endl;
258 }
259 }
260
261 // create task group (used for async)
262 if(!m_tbb_task_group)
263 m_tbb_task_group = new tbb_task_group_t();
264 return m_pool_size;
265 }
266#endif
267
268 m_alive_flag->store(true);
269
270 //--------------------------------------------------------------------//
271 // if started, stop some thread if smaller or return if equal
272 if(m_pool_state->load() == thread_pool::state::STARTED)
273 {
274 if(m_pool_size > proposed_size)
275 {
// NOTE(review): stop_thread() only shrinks the pool once a worker has pushed
// its id onto m_stop_threads. execute_thread() does that only while the pool
// state is PARTIAL, but here the state is STARTED, so this loop may spin.
// Verify how PARTIAL gets set before shrinking.
276 while(stop_thread() > proposed_size)
277 ;
278 if(m_verbose > 0)
279 {
280 std::cout << "ThreadPool initialized with " << m_pool_size << " threads."
281 << std::endl;
282 }
283 if(!m_task_queue)
284 m_task_queue = new UserTaskQueue(m_pool_size);
285 return m_pool_size;
286 }
287 else if(m_pool_size == proposed_size) // NOLINT
288 {
289 if(m_verbose > 0)
290 {
291 std::cout << "ThreadPool initialized with " << m_pool_size << " threads."
292 << std::endl;
293 }
294 if(!m_task_queue)
295 m_task_queue = new UserTaskQueue(m_pool_size);
296 return m_pool_size;
297 }
298 }
299
300 //--------------------------------------------------------------------//
301 // reserve enough space to prevent realloc later
302 {
303 AutoLock _task_lock(*m_task_lock);
304 m_is_joined.reserve(proposed_size);
305 }
306
// Worker indices passed to start_thread() are offset from the calling thread's
// index (this_tid + i + 1).
307 auto this_tid = get_this_thread_id();
308 for(size_type i = m_pool_size; i < proposed_size; ++i)
309 {
310 // add the threads
// NOTE(review): once `thr` has started, any exception thrown later in this try
// block unwinds through a still-joinable std::thread, which calls std::terminate
// before the catch handlers run. The handlers below therefore only really cover
// failures of the Thread constructor itself. A failure between the two
// push_back calls would also leave m_is_joined and m_main_threads out of sync.
311 try
312 {
313 Thread thr(ThreadPool::start_thread, this, this_tid + i + 1);
314 // only reaches here if successful creation of thread
315 ++m_pool_size;
316 // store thread
317 m_main_threads.push_back(thr.get_id());
318 // list of joined thread booleans
319 m_is_joined.push_back(false);
320 // set the affinity
321 if(m_use_affinity)
322 set_affinity(i, thr);
323 // store
324 m_threads.emplace_back(std::move(thr));
325 } catch(std::runtime_error& e)
326 {
327 std::cerr << e.what() << std::endl; // issue creating thread
328 continue;
329 } catch(std::bad_alloc& e)
330 {
331 std::cerr << e.what() << std::endl;
332 continue;
333 }
334 }
335 //------------------------------------------------------------------------//
336
337 AutoLock _task_lock(*m_task_lock);
338
339 // thread pool size doesn't match with join vector
340 // this will screw up joining later
341 if(m_is_joined.size() != m_main_threads.size())
342 {
343 std::stringstream ss;
344 ss << "ThreadPool::initialize_threadpool - boolean is_joined vector "
345 << "is a different size than threads vector: " << m_is_joined.size() << " vs. "
346 << m_main_threads.size() << " (tid: " << std::this_thread::get_id() << ")";
347
348 throw std::runtime_error(ss.str());
349 }
350
351 if(m_verbose > 0)
352 {
353 std::cout << "ThreadPool initialized with " << m_pool_size << " threads."
354 << std::endl;
355 }
356
357 if(!m_task_queue)
358 m_task_queue = new UserTaskQueue(m_main_threads.size());
359
360 return m_main_threads.size();
361}
362
363//======================================================================================//
364
367{
368 // Note: this is not for synchronization, its for thread communication!
369 // destroy_threadpool() will only be called from the main thread, yet
370 // the modified m_pool_state may not show up to other threads until its
371 // modified in a lock!
372 //------------------------------------------------------------------------//
373 m_pool_state->store(thread_pool::state::STOPPED);
374
375 //--------------------------------------------------------------------//
376 // handle tbb task scheduler
377#ifdef PTL_USE_TBB
378 if(m_tbb_task_group)
379 {
380 m_tbb_task_group->wait();
381 delete m_tbb_task_group;
382 m_tbb_task_group = nullptr;
383 }
384 if(m_tbb_tp && tbb_global_control())
385 {
386 tbb_global_control_t*& _global_control = tbb_global_control();
387 delete _global_control;
388 _global_control = nullptr;
389 m_tbb_tp = false;
390 std::cout << "ThreadPool [TBB] destroyed" << std::endl;
391 }
392#endif
393
394 if(!m_alive_flag->load())
395 return 0;
396
397 //------------------------------------------------------------------------//
398 // notify all threads we are shutting down
399 m_task_lock->lock();
400 CONDITIONBROADCAST(m_task_cond.get());
401 m_task_lock->unlock();
402 //------------------------------------------------------------------------//
403
404 if(m_is_joined.size() != m_main_threads.size())
405 {
406 std::stringstream ss;
407 ss << " ThreadPool::destroy_thread_pool - boolean is_joined vector "
408 << "is a different size than threads vector: " << m_is_joined.size() << " vs. "
409 << m_main_threads.size() << " (tid: " << std::this_thread::get_id() << ")";
410
411 throw std::runtime_error(ss.str());
412 }
413
414 for(size_type i = 0; i < m_is_joined.size(); i++)
415 {
416 //--------------------------------------------------------------------//
417 //
418 if(i < m_threads.size())
419 m_threads.at(i).join();
420
421 //--------------------------------------------------------------------//
422 // if its joined already, nothing else needs to be done
423 if(m_is_joined.at(i))
424 continue;
425
426 //--------------------------------------------------------------------//
427 // join
428 if(std::this_thread::get_id() == m_main_threads[i])
429 continue;
430
431 //--------------------------------------------------------------------//
432 // thread id and index
433 auto _tid = m_main_threads[i];
434
435 //--------------------------------------------------------------------//
436 // erase thread from thread ID list
437 if(f_thread_ids.find(_tid) != f_thread_ids.end())
438 f_thread_ids.erase(f_thread_ids.find(_tid));
439
440 //--------------------------------------------------------------------//
441 // it's joined
442 m_is_joined.at(i) = true;
443
444 //--------------------------------------------------------------------//
445 // try waking up a bunch of threads that are still waiting
446 CONDITIONBROADCAST(m_task_cond.get());
447 //--------------------------------------------------------------------//
448 }
449
450 m_threads.clear();
451 m_main_threads.clear();
452 m_is_joined.clear();
453 m_alive_flag->store(false);
454
455 auto start = std::chrono::steady_clock::now();
456 auto elapsed = std::chrono::duration<double>{};
457 // wait maximum of 30 seconds for threads to exit
458 while(m_thread_active->load() > 0 && elapsed.count() < 30)
459 {
460 std::this_thread::sleep_for(std::chrono::milliseconds(50));
461 elapsed = std::chrono::steady_clock::now() - start;
462 }
463
464 auto _active = m_thread_active->load();
465
466 if(_active == 0)
467 std::cout << "ThreadPool destroyed" << std::endl;
468 else
469 std::cout << "ThreadPool destroyed but " << _active
470 << " threads might still be active (and cause a termination error)"
471 << std::endl;
472
473 return 0;
474}
475
476//======================================================================================//
477
480{
481 if(!m_alive_flag->load() || m_pool_size == 0)
482 return 0;
483
484 //------------------------------------------------------------------------//
485 // notify all threads we are shutting down
486 m_task_lock->lock();
487 m_is_stopped.push_back(true);
488 CONDITIONNOTIFY(m_task_cond.get());
489 m_task_lock->unlock();
490 //------------------------------------------------------------------------//
491
492 // lock up the task queue
493 AutoLock _task_lock(*m_task_lock);
494
495 while(!m_stop_threads.empty())
496 {
497 auto tid = m_stop_threads.front();
498 // remove from stopped
499 m_stop_threads.pop_front();
500 // remove from main
501 for(auto itr = m_main_threads.begin(); itr != m_main_threads.end(); ++itr)
502 {
503 if(*itr == tid)
504 {
505 m_main_threads.erase(itr);
506 break;
507 }
508 }
509 // remove from join list
510 m_is_joined.pop_back();
511 }
512
513 m_pool_size = m_main_threads.size();
514 return m_main_threads.size();
515}
516
517//======================================================================================//
518
// void ThreadPool::execute_thread(VUserTaskQueue* _task_queue)
// NOTE(review): this listing drops the signature line (original line 520). The
// declaration is `void execute_thread(VUserTaskQueue *)`; the body names the
// parameter `_task_queue`.
//
// Worker loop run by each pool thread (called from start_thread()). It:
// 1. runs m_init_func;
// 2. busy-polls ThreadData::update() for up to 60 s until a task queue is
//    available, throwing std::runtime_error otherwise;
// 3. executes one task as a warm-up;
// 4. loops forever: sleeps on m_task_cond while the queue is empty and drains
//    the queue while it is not.
// It returns when the pool state becomes STOPPED, or when this thread claims a
// PARTIAL stop request from m_is_stopped.
// m_thread_awake counts awake workers: incremented on entry, decremented before
// sleeping, and re-incremented (capped at m_pool_size) after waking.
519void
521{
522 // how long the thread waits on condition variable
523 // static int wait_time = GetEnv<int>("PTL_POOL_WAIT_TIME", 5);
524
525 ++(*m_thread_awake);
526
527 // initialization function
528 m_init_func();
529
530 ThreadId tid = ThisThread::get_id();
531 ThreadData* data = thread_data();
532 // auto thread_bin = _task_queue->GetThreadBin();
533 // auto workers = _task_queue->workers();
534
535 auto start = std::chrono::steady_clock::now();
536 auto elapsed = std::chrono::duration<double>{};
537 // check for updates for 60 seconds max
// (busy-wait: there is no sleep between polls)
538 while(!_task_queue && elapsed.count() < 60)
539 {
540 elapsed = std::chrono::steady_clock::now() - start;
541 data->update();
542 _task_queue = data->current_queue;
543 }
544
545 if(!_task_queue)
546 {
547 --(*m_thread_awake);
// NOTE(review): start_thread() does not catch this, so an exception thrown here
// escapes the thread entry point, which calls std::terminate.
548 throw std::runtime_error("No task queue was found after 60 seconds!");
549 }
550
551 assert(data->current_queue != nullptr);
552 assert(_task_queue == data->current_queue);
553
554 // essentially a dummy run
555 if(_task_queue)
556 {
557 data->within_task = true;
558 auto _task = _task_queue->GetTask();
559 if(_task)
560 {
561 (*_task)();
// tasks that do not belong to a group are owned and deleted here
562 if(!_task->group())
563 delete _task;
564 }
565 data->within_task = false;
566 }
567
568 // threads stay in this loop forever until thread-pool destroyed
569 while(true)
570 {
// Initialized once per thread: this thread_local static keeps a shared_ptr to
// the task mutex of the pool this thread first ran for.
571 static thread_local auto p_task_lock = m_task_lock;
572
573 //--------------------------------------------------------------------//
574 // Try to pick a task
575 AutoLock _task_lock(*p_task_lock, std::defer_lock);
576 //--------------------------------------------------------------------//
577
// Returns true (with the task lock released) when this thread should leave:
// either the whole pool is STOPPED, or the state is PARTIAL and a pending stop
// request in m_is_stopped was claimed (this thread's id is then pushed onto
// m_stop_threads for stop_thread() to reap).
578 auto leave_pool = [&]() {
579 auto _state = [&]() { return static_cast<int>(m_pool_state->load()); };
580 auto _pool_state = _state();
581 if(_pool_state > 0)
582 {
583 // stop whole pool
584 if(_pool_state == thread_pool::state::STOPPED)
585 {
586 if(_task_lock.owns_lock())
587 _task_lock.unlock();
588 return true;
589 }
590 // single thread stoppage
591 else if(_pool_state == thread_pool::state::PARTIAL) // NOLINT
592 {
593 if(!_task_lock.owns_lock())
594 _task_lock.lock();
595 if(!m_is_stopped.empty() && m_is_stopped.back())
596 {
597 m_stop_threads.push_back(tid);
598 m_is_stopped.pop_back();
599 if(_task_lock.owns_lock())
600 _task_lock.unlock();
601 // exit entire function
602 return true;
603 }
604 if(_task_lock.owns_lock())
605 _task_lock.unlock();
606 }
607 }
608 return false;
609 };
610
611 // We need to put condition.wait() in a loop for two reasons:
612 // 1. There can be spurious wake-ups (due to signal/EINTR)
613 // 2. When mutex is released for waiting, another thread can be woken up
614 // from a signal/broadcast and that thread can mess up the condition.
615 // So when the current thread wakes up the condition may no longer be
616 // actually true!
617 while(_task_queue->empty())
618 {
619 auto _state = [&]() { return static_cast<int>(m_pool_state->load()); };
620 auto _size = [&]() { return _task_queue->true_size(); };
621 auto _empty = [&]() { return _task_queue->empty(); };
// NOTE(review): `_state() > 0` wakes the thread for any positive pool state.
// This presumably means the stop states (STOPPED/PARTIAL) are positive and
// STARTED is not; confirm against thread_pool::state, otherwise workers would
// never sleep.
622 auto _wake = [&]() { return (!_empty() || _size() > 0 || _state() > 0); };
623
// NOTE(review): leaving here does not decrement m_thread_awake.
624 if(leave_pool())
625 return;
626
627 if(_task_queue->true_size() == 0)
628 {
629 if(m_thread_awake && m_thread_awake->load() > 0)
630 --(*m_thread_awake);
631
632 // lock before sleeping on condition
633 if(!_task_lock.owns_lock())
634 _task_lock.lock();
635
636 // Wait until there is a task in the queue
637 // Unlocks mutex while waiting, then locks it back when signaled
638 // use lambda to control waking
639 m_task_cond->wait(_task_lock, _wake);
640
// Returns while _task_lock still owns the mutex; presumably the AutoLock
// destructor releases it on scope exit.
641 if(_state() == thread_pool::state::STOPPED)
642 return;
643
644 // unlock if owned
645 if(_task_lock.owns_lock())
646 _task_lock.unlock();
647
648 // notify that is awake
649 if(m_thread_awake && m_thread_awake->load() < m_pool_size)
650 ++(*m_thread_awake);
651 }
652 else
653 break;
654 }
655
656 // release the lock
657 if(_task_lock.owns_lock())
658 _task_lock.unlock();
659
660 //----------------------------------------------------------------//
661
662 // leave pool if conditions dictate it
663 if(leave_pool())
664 return;
665
666 // activate guard against recursive deadlock
667 data->within_task = true;
668 //----------------------------------------------------------------//
669
670 // execute the task(s)
// NOTE(review): if a task throws, it is not deleted, within_task stays true,
// and the exception escapes the thread entry point (std::terminate).
671 while(!_task_queue->empty())
672 {
673 auto _task = _task_queue->GetTask();
674 if(_task)
675 {
676 (*_task)();
677 if(!_task->group())
678 delete _task;
679 }
680 }
681 //----------------------------------------------------------------//
682
683 // disable guard against recursive deadlock
684 data->within_task = false;
685 //----------------------------------------------------------------//
686 }
687}
688
689//======================================================================================//
intmax_t ncores()
Definition: ThreadPool.cc:44
#define CONDITIONNOTIFY(cond)
Definition: Threading.hh:194
#define CONDITIONBROADCAST(cond)
Definition: Threading.hh:195
static ThreadData *& GetInstance()
Definition: ThreadData.cc:35
VUserTaskQueue * current_queue
Definition: ThreadData.hh:115
static uintmax_t get_this_thread_id()
Definition: ThreadPool.cc:120
static void start_thread(ThreadPool *, intmax_t=-1)
Definition: ThreadPool.cc:72
static bool using_tbb()
Definition: ThreadPool.cc:92
std::function< intmax_t(intmax_t)> affinity_func_t
Definition: ThreadPool.hh:89
size_t size_type
Definition: ThreadPool.hh:70
static void set_use_tbb(bool val)
Definition: ThreadPool.cc:100
virtual ~ThreadPool()
Definition: ThreadPool.cc:172
void set_affinity(affinity_func_t f)
Definition: ThreadPool.hh:166
size_type stop_thread()
Definition: ThreadPool.cc:479
static tbb_global_control_t *& tbb_global_control()
Definition: ThreadPool.hh:280
static const thread_id_map_t & get_thread_ids()
Definition: ThreadPool.cc:112
std::map< ThreadId, uintmax_t > thread_id_map_t
Definition: ThreadPool.hh:84
size_type destroy_threadpool()
Definition: ThreadPool.cc:366
size_type initialize_threadpool(size_type)
Definition: ThreadPool.cc:223
void execute_thread(VUserTaskQueue *)
Definition: ThreadPool.cc:520
bool is_initialized() const
Definition: ThreadPool.cc:193
ThreadPool(const size_type &pool_size, VUserTaskQueue *task_queue=nullptr, bool _use_affinity=GetEnv< bool >("PTL_CPU_AFFINITY", false), const affinity_func_t &=[](intmax_t) { static std::atomic< intmax_t > assigned;intmax_t _assign=assigned++;return _assign % Thread::hardware_concurrency();})
Definition: ThreadPool.cc:138
virtual task_pointer GetTask(intmax_t subq=-1, intmax_t nitr=-1)=0
virtual size_type true_size() const
virtual bool empty() const =0
bool SetPinAffinity(int idx, NativeThread &at)
Definition: Threading.cc:95
Definition: AutoLock.hh:254
std::thread::native_handle_type NativeThread
Definition: Threading.hh:158
std::condition_variable Condition
Definition: Threading.hh:189
std::mutex Mutex
Definition: Threading.hh:76
tbb::task_group tbb_task_group_t
Definition: ThreadData.hh:88
tbb::global_control tbb_global_control_t
Definition: ThreadData.hh:87
std::thread Thread
Definition: Threading.hh:157
Thread::id ThreadId
Definition: Threading.hh:201