Geant4 11.3.0
Toolkit for the simulation of the passage of particles through matter
Loading...
Searching...
No Matches
PTL::ThreadPool Class Reference

#include <ThreadPool.hh>

Classes

struct  Config
 

Public Types

template<typename KeyT, typename MappedT, typename HashT = KeyT>
using uomap = std::unordered_map<KeyT, MappedT, std::hash<HashT>>
 
using size_type = size_t
 
using task_count_type = std::shared_ptr<std::atomic_uintmax_t>
 
using atomic_int_type = std::shared_ptr<std::atomic_uintmax_t>
 
using pool_state_type = std::shared_ptr<std::atomic_short>
 
using atomic_bool_type = std::shared_ptr<std::atomic_bool>
 
using task_type = VTask
 
using lock_t = std::shared_ptr<Mutex>
 
using condition_t = std::shared_ptr<Condition>
 
using task_pointer = std::shared_ptr<task_type>
 
using task_queue_t = VUserTaskQueue
 
using thread_list_t = std::deque<ThreadId>
 
using bool_list_t = std::vector<bool>
 
using thread_id_map_t = std::map<ThreadId, uintmax_t>
 
using thread_index_map_t = std::map<uintmax_t, ThreadId>
 
using thread_vec_t = std::vector<Thread>
 
using thread_data_t = std::vector<std::shared_ptr<ThreadData>>
 
using initialize_func_t = std::function<void()>
 
using finalize_func_t = std::function<void()>
 
using affinity_func_t = std::function<intmax_t(intmax_t)>
 

Public Member Functions

 ThreadPool (const Config &)
 
 ~ThreadPool ()
 
 ThreadPool (const ThreadPool &)=delete
 
 ThreadPool (ThreadPool &&)=default
 
ThreadPool & operator= (const ThreadPool &)=delete
 
ThreadPool & operator= (ThreadPool &&)=default
 
size_type initialize_threadpool (size_type)
 
size_type destroy_threadpool ()
 
size_type stop_thread ()
 
template<typename FuncT>
void execute_on_all_threads (FuncT &&_func)
 
template<typename FuncT>
void execute_on_specific_threads (const std::set< std::thread::id > &_tid, FuncT &&_func)
 
task_queue_t * get_queue () const
 
task_queue_t *& get_valid_queue (task_queue_t *&) const
 
bool is_tbb_threadpool () const
 
size_type add_task (task_pointer &&task, int bin=-1)
 
template<typename ListT>
size_type add_tasks (ListT &)
 
Thread * get_thread (size_type _n) const
 
Thread * get_thread (std::thread::id id) const
 
void set_initialization (initialize_func_t f)
 
void set_finalization (finalize_func_t f)
 
void reset_initialization ()
 
void reset_finalization ()
 
const pool_state_type & state () const
 
size_type size () const
 
void resize (size_type _n)
 
bool using_affinity () const
 
bool is_alive ()
 
void notify ()
 
void notify_all ()
 
void notify (size_type)
 
bool is_initialized () const
 
int get_active_threads_count () const
 
void set_affinity (affinity_func_t f)
 
void set_affinity (intmax_t i, Thread &) const
 
void set_priority (int _prio, Thread &) const
 
void set_verbose (int n)
 
int get_verbose () const
 
bool is_main () const
 
tbb_task_arena_t * get_task_arena ()
 

Static Public Member Functions

static affinity_func_t & affinity_functor ()
 
static initialize_func_t & initialization_functor ()
 
static finalize_func_t & finalization_functor ()
 
static void set_default_size (size_type _v)
 set the default pool size
 
static size_type get_default_size ()
 get the default pool size
 
static tbb_global_control_t *& tbb_global_control ()
 
static const thread_id_map_t & get_thread_ids ()
 
static uintmax_t get_thread_id (ThreadId)
 
static uintmax_t get_this_thread_id ()
 
static uintmax_t add_thread_id (ThreadId=ThisThread::get_id())
 

Detailed Description

Definition at line 87 of file ThreadPool.hh.

Member Typedef Documentation

◆ affinity_func_t

using PTL::ThreadPool::affinity_func_t = std::function<intmax_t(intmax_t)>

Definition at line 115 of file ThreadPool.hh.

◆ atomic_bool_type

using PTL::ThreadPool::atomic_bool_type = std::shared_ptr<std::atomic_bool>

Definition at line 98 of file ThreadPool.hh.

◆ atomic_int_type

using PTL::ThreadPool::atomic_int_type = std::shared_ptr<std::atomic_uintmax_t>

Definition at line 96 of file ThreadPool.hh.

◆ bool_list_t

using PTL::ThreadPool::bool_list_t = std::vector<bool>

Definition at line 107 of file ThreadPool.hh.

◆ condition_t

using PTL::ThreadPool::condition_t = std::shared_ptr<Condition>

Definition at line 102 of file ThreadPool.hh.

◆ finalize_func_t

using PTL::ThreadPool::finalize_func_t = std::function<void()>

Definition at line 114 of file ThreadPool.hh.

◆ initialize_func_t

using PTL::ThreadPool::initialize_func_t = std::function<void()>

Definition at line 113 of file ThreadPool.hh.

◆ lock_t

using PTL::ThreadPool::lock_t = std::shared_ptr<Mutex>

Definition at line 101 of file ThreadPool.hh.

◆ pool_state_type

using PTL::ThreadPool::pool_state_type = std::shared_ptr<std::atomic_short>

Definition at line 97 of file ThreadPool.hh.

◆ size_type

using PTL::ThreadPool::size_type = size_t

Definition at line 94 of file ThreadPool.hh.

◆ task_count_type

using PTL::ThreadPool::task_count_type = std::shared_ptr<std::atomic_uintmax_t>

Definition at line 95 of file ThreadPool.hh.

◆ task_pointer

using PTL::ThreadPool::task_pointer = std::shared_ptr<task_type>

Definition at line 103 of file ThreadPool.hh.

◆ task_queue_t

using PTL::ThreadPool::task_queue_t = VUserTaskQueue

Definition at line 104 of file ThreadPool.hh.

◆ task_type

using PTL::ThreadPool::task_type = VTask

Definition at line 100 of file ThreadPool.hh.

◆ thread_data_t

using PTL::ThreadPool::thread_data_t = std::vector<std::shared_ptr<ThreadData>>

Definition at line 111 of file ThreadPool.hh.

◆ thread_id_map_t

using PTL::ThreadPool::thread_id_map_t = std::map<ThreadId, uintmax_t>

Definition at line 108 of file ThreadPool.hh.

◆ thread_index_map_t

using PTL::ThreadPool::thread_index_map_t = std::map<uintmax_t, ThreadId>

Definition at line 109 of file ThreadPool.hh.

◆ thread_list_t

using PTL::ThreadPool::thread_list_t = std::deque<ThreadId>

Definition at line 106 of file ThreadPool.hh.

◆ thread_vec_t

using PTL::ThreadPool::thread_vec_t = std::vector<Thread>

Definition at line 110 of file ThreadPool.hh.

◆ uomap

template<typename KeyT, typename MappedT, typename HashT = KeyT>
using PTL::ThreadPool::uomap = std::unordered_map<KeyT, MappedT, std::hash<HashT>>

Definition at line 91 of file ThreadPool.hh.

Constructor & Destructor Documentation

◆ ThreadPool() [1/3]

PTL::ThreadPool::ThreadPool ( const Config & _cfg)
explicit

Definition at line 172 of file ThreadPool.cc.

173: m_use_affinity{ _cfg.use_affinity }
174, m_tbb_tp{ _cfg.use_tbb }
175, m_verbose{ _cfg.verbose }
176, m_priority{ _cfg.priority }
177, m_pool_state{ std::make_shared<std::atomic_short>(thread_pool::state::NONINIT) }
178, m_task_queue{ _cfg.task_queue }
179, m_init_func{ _cfg.initializer }
180, m_fini_func{ _cfg.finalizer }
181, m_affinity_func{ _cfg.set_affinity }
182{
183 auto master_id = get_this_thread_id();
184 if(master_id != 0 && m_verbose > 1)
185 {
186 AutoLock lock(TypeMutex<decltype(std::cerr)>());
187 std::cerr << "[PTL::ThreadPool] ThreadPool created on worker thread" << std::endl;
188 }
189
190 thread_data() = new ThreadData(this);
191
192 // initialize after get_this_thread_id so master is zero
193 if(_cfg.init)
194 this->initialize_threadpool(_cfg.pool_size);
195}
size_type initialize_threadpool(size_type)
static uintmax_t get_this_thread_id()
TemplateAutoLock< Mutex > AutoLock
Definition AutoLock.hh:479

Referenced by operator=(), operator=(), ThreadPool(), and ThreadPool().

◆ ~ThreadPool()

PTL::ThreadPool::~ThreadPool ( )

Definition at line 199 of file ThreadPool.cc.

200{
201 if(m_alive_flag->load())
202 {
203 std::cerr << "Warning! ThreadPool was not properly destroyed! Call "
204 "destroy_threadpool() before deleting the ThreadPool object to "
205 "eliminate this message."
206 << std::endl;
207 m_pool_state->store(thread_pool::state::STOPPED);
208 m_task_lock->lock();
209 m_task_cond->notify_all();
210 m_task_lock->unlock();
211 for(auto& itr : m_threads)
212 itr.join();
213 m_threads.clear();
214 }
215
216 // delete owned resources
217 if(m_delete_task_queue)
218 delete m_task_queue;
219
220 delete m_tbb_task_arena;
221 delete m_tbb_task_group;
222}

◆ ThreadPool() [2/3]

PTL::ThreadPool::ThreadPool ( const ThreadPool & )
delete

◆ ThreadPool() [3/3]

PTL::ThreadPool::ThreadPool ( ThreadPool && )
default

Member Function Documentation

◆ add_task()

ThreadPool::size_type PTL::ThreadPool::add_task ( task_pointer && task,
int bin = -1 )
inline

Definition at line 411 of file ThreadPool.hh.

412{
413 // if not native (i.e. TBB) or we haven't built thread-pool, just execute
414 if(m_tbb_tp || !task->is_native_task() || !m_alive_flag->load())
415 return static_cast<size_type>(run_on_this(std::move(task)));
416
417 return static_cast<size_type>(insert(std::move(task), bin));
418}

◆ add_tasks()

template<typename ListT>
ThreadPool::size_type PTL::ThreadPool::add_tasks ( ListT & c)
inline

Definition at line 422 of file ThreadPool.hh.

423{
424 if(!m_alive_flag) // if we haven't built thread-pool, just execute
425 {
426 for(auto& itr : c)
427 run(itr);
428 c.clear();
429 return 0;
430 }
431
432 // TODO: put a limit on how many tasks can be added at most
433 auto c_size = c.size();
434 for(auto& itr : c)
435 {
436 if(!itr->is_native_task())
437 --c_size;
438 else
439 {
440 //++(m_task_queue);
441 get_valid_queue(m_task_queue)->InsertTask(itr);
442 }
443 }
444 c.clear();
445
446 // notify sleeping threads
447 notify(c_size);
448
449 return c_size;
450}
task_queue_t *& get_valid_queue(task_queue_t *&) const
virtual intmax_t InsertTask(task_pointer &&, ThreadData *=nullptr, intmax_t subq=-1) PTL_NO_SANITIZE_THREAD=0

◆ add_thread_id()

uintmax_t PTL::ThreadPool::add_thread_id ( ThreadId _tid = ThisThread::get_id())
static

Definition at line 156 of file ThreadPool.cc.

157{
158 AutoLock lock(TypeMutex<ThreadPool>(), std::defer_lock);
159 if(!lock.owns_lock())
160 lock.lock();
161 if(f_thread_ids().find(_tid) == f_thread_ids().end())
162 {
163 auto _idx = f_thread_ids().size();
164 f_thread_ids()[_tid] = _idx;
165 SetThreadId((int)_idx);
166 }
167 return f_thread_ids().at(_tid);
168}
void SetThreadId(int aNewValue)
Definition Threading.cc:119

Referenced by execute_on_all_threads(), and execute_on_specific_threads().

◆ affinity_functor()

static affinity_func_t & PTL::ThreadPool::affinity_functor ( )
inline static

Definition at line 117 of file ThreadPool.hh.

118 {
119 static affinity_func_t _v = [](intmax_t) {
120 static std::atomic<intmax_t> assigned;
121 intmax_t _assign = assigned++;
122 return _assign % Thread::hardware_concurrency();
123 };
124 return _v;
125 }
std::function< intmax_t(intmax_t)> affinity_func_t

◆ destroy_threadpool()

ThreadPool::size_type PTL::ThreadPool::destroy_threadpool ( )

Definition at line 468 of file ThreadPool.cc.

469{
470 // Note: this is not for synchronization, its for thread communication!
471 // destroy_threadpool() will only be called from the main thread, yet
472 // the modified m_pool_state may not show up to other threads until its
473 // modified in a lock!
474 //------------------------------------------------------------------------//
475 m_pool_state->store(thread_pool::state::STOPPED);
476
477 //--------------------------------------------------------------------//
478 // handle tbb task scheduler
479#if defined(PTL_USE_TBB)
480 if(m_tbb_task_group)
481 {
482 execute_on_all_threads([this]() { m_fini_func(); });
483 auto _func = [&]() { m_tbb_task_group->wait(); };
484 if(m_tbb_task_arena)
485 m_tbb_task_arena->execute(_func);
486 else
487 _func();
488 delete m_tbb_task_group;
489 m_tbb_task_group = nullptr;
490 }
491 if(m_tbb_task_arena)
492 {
493 delete m_tbb_task_arena;
494 m_tbb_task_arena = nullptr;
495 }
496 if(m_tbb_tp && tbb_global_control())
497 {
498 tbb_global_control_t*& _global_control = tbb_global_control();
499 delete _global_control;
500 _global_control = nullptr;
501 m_tbb_tp = false;
502 AutoLock lock(TypeMutex<decltype(std::cerr)>());
503 if(m_verbose > 0)
504 {
505 std::cerr << "[PTL::ThreadPool] ThreadPool [TBB] destroyed" << std::endl;
506 }
507 }
508#endif
509
510 if(!m_alive_flag->load())
511 return 0;
512
513 //------------------------------------------------------------------------//
514 // notify all threads we are shutting down
515 m_task_lock->lock();
516 m_task_cond->notify_all();
517 m_task_lock->unlock();
518 //------------------------------------------------------------------------//
519
520 if(m_is_joined.size() != m_main_threads.size())
521 {
522 std::stringstream ss;
523 ss << " ThreadPool::destroy_thread_pool - boolean is_joined vector "
524 << "is a different size than threads vector: " << m_is_joined.size() << " vs. "
525 << m_main_threads.size() << " (tid: " << std::this_thread::get_id() << ")";
526
527 throw std::runtime_error(ss.str());
528 }
529
530 for(size_type i = 0; i < m_is_joined.size(); i++)
531 {
532 //--------------------------------------------------------------------//
533 //
534 if(i < m_threads.size())
535 m_threads.at(i).join();
536
537 //--------------------------------------------------------------------//
538 // if its joined already, nothing else needs to be done
539 if(m_is_joined.at(i))
540 continue;
541
542 //--------------------------------------------------------------------//
543 // join
544 if(std::this_thread::get_id() == m_main_threads[i])
545 continue;
546
547 //--------------------------------------------------------------------//
548 // thread id and index
549 auto _tid = m_main_threads[i];
550
551 //--------------------------------------------------------------------//
552 // erase thread from thread ID list
553 if(f_thread_ids().find(_tid) != f_thread_ids().end())
554 f_thread_ids().erase(f_thread_ids().find(_tid));
555
556 //--------------------------------------------------------------------//
557 // it's joined
558 m_is_joined.at(i) = true;
559 }
560
561 m_thread_data.clear();
562 m_threads.clear();
563 m_main_threads.clear();
564 m_is_joined.clear();
565
566 m_alive_flag->store(false);
567
568 auto start = std::chrono::steady_clock::now();
569 auto elapsed = std::chrono::duration<double>{};
570 // wait maximum of 30 seconds for threads to exit
571 while(m_thread_active->load() > 0 && elapsed.count() < 30)
572 {
573 std::this_thread::sleep_for(std::chrono::milliseconds(50));
574 elapsed = std::chrono::steady_clock::now() - start;
575 }
576
577 auto _active = m_thread_active->load();
578
579 if(get_verbose() > 0)
580 {
581 if(_active == 0)
582 {
583 AutoLock lock(TypeMutex<decltype(std::cerr)>());
584 std::cerr << "[PTL::ThreadPool] ThreadPool destroyed" << std::endl;
585 }
586 else
587 {
588 AutoLock lock(TypeMutex<decltype(std::cerr)>());
589 std::cerr << "[PTL::ThreadPool] ThreadPool destroyed but " << _active
590 << " threads might still be active (and cause a termination error)"
591 << std::endl;
592 }
593 }
594
595 if(m_delete_task_queue)
596 {
597 delete m_task_queue;
598 m_task_queue = nullptr;
599 }
600
601 return 0;
602}
static tbb_global_control_t *& tbb_global_control()
int get_verbose() const
void execute_on_all_threads(FuncT &&_func)
tbb::global_control tbb_global_control_t

◆ execute_on_all_threads()

template<typename FuncT>
void PTL::ThreadPool::execute_on_all_threads ( FuncT && _func)
inline

Definition at line 454 of file ThreadPool.hh.

455{
456 if(m_tbb_tp && m_tbb_task_group)
457 {
458#if defined(PTL_USE_TBB)
459 // TBB lazily activates threads to process tasks and the main thread
460 // participates in processing the tasks so getting a specific
461 // function to execute only on the worker threads requires some trickery
462 //
463 std::set<std::thread::id> _first{};
464 Mutex _mutex{};
465 // init function which executes function and returns 1 only once
466 auto _init = [&]() {
467 int _once = 0;
468 _mutex.lock();
469 if(_first.find(std::this_thread::get_id()) == _first.end())
470 {
471 // we need to reset this thread-local static for multiple invocations
472 // of the same template instantiation
473 _once = 1;
474 _first.insert(std::this_thread::get_id());
475 }
476 _mutex.unlock();
477 if(_once != 0)
478 {
479 _func();
480 return 1;
481 }
482 return 0;
483 };
484 // this will collect the number of threads which have
485 // executed the _init function above
486 std::atomic<size_t> _total_init{ 0 };
487 // max parallelism by TBB
488 size_t _maxp = tbb_global_control()->active_value(
489 tbb::global_control::max_allowed_parallelism);
490 // create a task arean
491 auto* _arena = get_task_arena();
492 // size of the thread-pool
493 size_t _sz = size();
494 // number of cores
495 size_t _ncore = GetNumberOfCores();
496 // maximum depth for recursion
497 size_t _dmax = std::max<size_t>(_ncore, 8);
498 // how many threads we need to initialize
499 size_t _num = std::min(_maxp, std::min(_sz, _ncore));
500 // this is the task passed to the task-group
501 std::function<void()> _init_task;
502 _init_task = [&]() {
503 add_thread_id();
504 static thread_local size_type _depth = 0;
505 int _ret = 0;
506 // don't let the main thread execute the function
507 if(!is_main())
508 {
509 // execute the function
510 _ret = _init();
511 // add the result
512 _total_init += _ret;
513 }
514 // if the function did not return anything, recursively execute
515 // two more tasks
516 ++_depth;
517 if(_ret == 0 && _depth < _dmax && _total_init.load() < _num)
518 {
519 tbb::task_group tg{};
520 tg.run([&]() { _init_task(); });
521 tg.run([&]() { _init_task(); });
522 ThisThread::sleep_for(std::chrono::milliseconds{ 1 });
523 tg.wait();
524 }
525 --_depth;
526 };
527
528 // TBB won't oversubscribe so we need to limit by ncores - 1
529 size_t nitr = 0;
530 auto _fname = __FUNCTION__;
531 auto _write_info = [&]() {
532 std::cout << "[" << _fname << "]> Total initialized: " << _total_init
533 << ", expected: " << _num << ", max-parallel: " << _maxp
534 << ", size: " << _sz << ", ncore: " << _ncore << std::endl;
535 };
536 while(_total_init < _num)
537 {
538 auto _n = 2 * _num;
539 while(--_n > 0)
540 {
541 _arena->execute(
542 [&]() { m_tbb_task_group->run([&]() { _init_task(); }); });
543 }
544 _arena->execute([&]() { m_tbb_task_group->wait(); });
545 // don't loop infinitely but use a strict condition
546 if(nitr++ > 2 * (_num + 1) && (_total_init - 1) == _num)
547 {
548 _write_info();
549 break;
550 }
551 // at this point we need to exit
552 if(nitr > 4 * (_ncore + 1))
553 {
554 _write_info();
555 break;
556 }
557 }
558 if(get_verbose() > 3)
559 _write_info();
560#endif
561 }
562 else if(get_queue())
563 {
564 get_queue()->ExecuteOnAllThreads(this, std::forward<FuncT>(_func));
565 }
566}
bool is_main() const
static uintmax_t add_thread_id(ThreadId=ThisThread::get_id())
task_queue_t * get_queue() const
tbb_task_arena_t * get_task_arena()
size_type size() const
virtual void ExecuteOnAllThreads(ThreadPool *tp, function_type f)=0
static size_t active_value(parameter param)
unsigned GetNumberOfCores()
Definition Threading.cc:67

Referenced by destroy_threadpool(), and initialize_threadpool().

◆ execute_on_specific_threads()

template<typename FuncT>
void PTL::ThreadPool::execute_on_specific_threads ( const std::set< std::thread::id > & _tid,
FuncT && _func )
inline

Definition at line 572 of file ThreadPool.hh.

574{
575 if(m_tbb_tp && m_tbb_task_group)
576 {
577#if defined(PTL_USE_TBB)
578 // TBB lazily activates threads to process tasks and the main thread
579 // participates in processing the tasks so getting a specific
580 // function to execute only on the worker threads requires some trickery
581 //
582 std::set<std::thread::id> _first{};
583 Mutex _mutex{};
584 // init function which executes function and returns 1 only once
585 auto _exec = [&]() {
586 int _once = 0;
587 _mutex.lock();
588 if(_first.find(std::this_thread::get_id()) == _first.end())
589 {
590 // we need to reset this thread-local static for multiple invocations
591 // of the same template instantiation
592 _once = 1;
593 _first.insert(std::this_thread::get_id());
594 }
595 _mutex.unlock();
596 if(_once != 0)
597 {
598 _func();
599 return 1;
600 }
601 return 0;
602 };
603 // this will collect the number of threads which have
604 // executed the _exec function above
605 std::atomic<size_t> _total_exec{ 0 };
606 // number of cores
607 size_t _ncore = GetNumberOfCores();
608 // maximum depth for recursion
609 size_t _dmax = std::max<size_t>(_ncore, 8);
610 // how many threads we need to initialize
611 size_t _num = _tids.size();
612 // create a task arena
613 auto* _arena = get_task_arena();
614 // this is the task passed to the task-group
615 std::function<void()> _exec_task;
616 _exec_task = [&]() {
617 add_thread_id();
618 static thread_local size_type _depth = 0;
619 int _ret = 0;
620 auto _this_tid = std::this_thread::get_id();
621 // don't let the main thread execute the function
622 if(_tids.count(_this_tid) > 0)
623 {
624 // execute the function
625 _ret = _exec();
626 // add the result
627 _total_exec += _ret;
628 }
629 // if the function did not return anything, recursively execute
630 // two more tasks
631 ++_depth;
632 if(_ret == 0 && _depth < _dmax && _total_exec.load() < _num)
633 {
634 tbb::task_group tg{};
635 tg.run([&]() { _exec_task(); });
636 tg.run([&]() { _exec_task(); });
637 ThisThread::sleep_for(std::chrono::milliseconds{ 1 });
638 tg.wait();
639 }
640 --_depth;
641 };
642
643 // TBB won't oversubscribe so we need to limit by ncores - 1
644 size_t nitr = 0;
645 auto _fname = __FUNCTION__;
646 auto _write_info = [&]() {
647 std::cout << "[" << _fname << "]> Total executed: " << _total_exec
648 << ", expected: " << _num << ", size: " << size() << std::endl;
649 };
650 while(_total_exec < _num)
651 {
652 auto _n = 2 * _num;
653 while(--_n > 0)
654 {
655 _arena->execute(
656 [&]() { m_tbb_task_group->run([&]() { _exec_task(); }); });
657 }
658 _arena->execute([&]() { m_tbb_task_group->wait(); });
659 // don't loop infinitely but use a strict condition
660 if(nitr++ > 2 * (_num + 1) && (_total_exec - 1) == _num)
661 {
662 _write_info();
663 break;
664 }
665 // at this point we need to exit
666 if(nitr > 8 * (_num + 1))
667 {
668 _write_info();
669 break;
670 }
671 }
672 if(get_verbose() > 3)
673 _write_info();
674#endif
675 }
676 else if(get_queue())
677 {
678 get_queue()->ExecuteOnSpecificThreads(_tids, this, std::forward<FuncT>(_func));
679 }
680}
virtual void ExecuteOnSpecificThreads(ThreadIdSet tid_set, ThreadPool *tp, function_type f)=0

◆ finalization_functor()

static finalize_func_t & PTL::ThreadPool::finalization_functor ( )
inline static

Definition at line 133 of file ThreadPool.hh.

134 {
135 static finalize_func_t _v = []() {};
136 return _v;
137 }
std::function< void()> finalize_func_t

◆ get_active_threads_count()

int PTL::ThreadPool::get_active_threads_count ( ) const
inline

Definition at line 227 of file ThreadPool.hh.

227{ return (int)m_thread_awake->load(); }

◆ get_default_size()

static size_type PTL::ThreadPool::get_default_size ( )
inline static

get the default pool size

Definition at line 185 of file ThreadPool.hh.

185{ return f_default_pool_size(); }

◆ get_queue()

task_queue_t * PTL::ThreadPool::get_queue ( ) const
inline

Definition at line 175 of file ThreadPool.hh.

175{ return m_task_queue; }

Referenced by execute_on_all_threads(), execute_on_specific_threads(), and PTL::TaskGroup< Tp, Arg, MaxDepth >::wait().

◆ get_task_arena()

tbb_task_arena_t * PTL::ThreadPool::get_task_arena ( )
inline

Definition at line 353 of file ThreadPool.hh.

354{
355#if defined(PTL_USE_TBB)
356 // create a task arena
357 if(!m_tbb_task_arena)
358 {
359 auto _sz = (tbb_global_control())
360 ? tbb_global_control()->active_value(
361 tbb::global_control::max_allowed_parallelism)
362 : size();
363 m_tbb_task_arena = new tbb_task_arena_t(::tbb::task_arena::attach{});
364 m_tbb_task_arena->initialize(_sz, 1);
365 }
366#else
367 if(!m_tbb_task_arena)
368 m_tbb_task_arena = new tbb_task_arena_t{};
369#endif
370 return m_tbb_task_arena;
371}
void initialize(int max_concurrency=automatic, unsigned reserved_for_masters=1)
tbb::task_arena tbb_task_arena_t

Referenced by execute_on_all_threads(), and execute_on_specific_threads().

◆ get_this_thread_id()

uintmax_t PTL::ThreadPool::get_this_thread_id ( )
static

Definition at line 148 of file ThreadPool.cc.

149{
150 return get_thread_id(ThisThread::get_id());
151}
static uintmax_t get_thread_id(ThreadId)

Referenced by PTL::UserTaskQueue::GetThreadBin(), initialize_threadpool(), G4TaskRunManagerKernel::InitializeWorker(), and ThreadPool().

◆ get_thread() [1/2]

Thread * PTL::ThreadPool::get_thread ( size_type _n) const

◆ get_thread() [2/2]

Thread * PTL::ThreadPool::get_thread ( std::thread::id id) const

◆ get_thread_id()

uintmax_t PTL::ThreadPool::get_thread_id ( ThreadId _tid)
static

Definition at line 124 of file ThreadPool.cc.

125{
126 uintmax_t _idx = 0;
127 {
128 AutoLock lock(TypeMutex<ThreadPool>(), std::defer_lock);
129 if(!lock.owns_lock())
130 lock.lock();
131 auto itr = f_thread_ids().find(_tid);
132 if(itr == f_thread_ids().end())
133 {
134 _idx = f_thread_ids().size();
135 f_thread_ids()[_tid] = _idx;
136 }
137 else
138 {
139 _idx = itr->second;
140 }
141 }
142 return _idx;
143}

Referenced by get_this_thread_id(), set_affinity(), and set_priority().

◆ get_thread_ids()

const ThreadPool::thread_id_map_t & PTL::ThreadPool::get_thread_ids ( )
static

Definition at line 116 of file ThreadPool.cc.

117{
118 return f_thread_ids();
119}

◆ get_valid_queue()

ThreadPool::task_queue_t *& PTL::ThreadPool::get_valid_queue ( task_queue_t *& _queue) const

Definition at line 655 of file ThreadPool.cc.

656{
657 if(!_queue)
658 _queue = new UserTaskQueue{ static_cast<intmax_t>(m_pool_size) };
659 return _queue;
660}

Referenced by add_tasks().

◆ get_verbose()

int PTL::ThreadPool::get_verbose ( ) const
inline

Definition at line 234 of file ThreadPool.hh.

234{ return m_verbose; }

Referenced by destroy_threadpool(), execute_on_all_threads(), and execute_on_specific_threads().

◆ initialization_functor()

static initialize_func_t & PTL::ThreadPool::initialization_functor ( )
inline static

Definition at line 127 of file ThreadPool.hh.

128 {
129 static initialize_func_t _v = []() {};
130 return _v;
131 }
std::function< void()> initialize_func_t

◆ initialize_threadpool()

ThreadPool::size_type PTL::ThreadPool::initialize_threadpool ( size_type proposed_size)

Definition at line 298 of file ThreadPool.cc.

299{
300 //--------------------------------------------------------------------//
301 // return before initializing
302 if(proposed_size < 1)
303 return 0;
304
305 //--------------------------------------------------------------------//
306 // store that has been started
307 if(!m_alive_flag->load())
308 m_pool_state->store(thread_pool::state::STARTED);
309
310#if defined(PTL_USE_TBB)
311 //--------------------------------------------------------------------//
312 // handle tbb task scheduler
313 if(m_tbb_tp)
314 {
315 m_tbb_tp = true;
316 m_pool_size = proposed_size;
317 tbb_global_control_t*& _global_control = tbb_global_control();
318 // delete if wrong size
319 if(m_pool_size != proposed_size)
320 {
321 delete _global_control;
322 _global_control = nullptr;
323 }
324
325 if(!_global_control)
326 {
327 _global_control = new tbb_global_control_t(
328 tbb::global_control::max_allowed_parallelism, proposed_size + 1);
329 if(m_verbose > 0)
330 {
331 AutoLock lock(TypeMutex<decltype(std::cerr)>());
332 std::cerr << "[PTL::ThreadPool] ThreadPool [TBB] initialized with "
333 << m_pool_size << " threads." << std::endl;
334 }
335 }
336
337 // create task group (used for async)
338 if(!m_tbb_task_group)
339 {
340 m_tbb_task_group = new tbb_task_group_t{};
341 execute_on_all_threads([this]() { m_init_func(); });
342 }
343
344 return m_pool_size;
345 }
346#endif
347
348 m_alive_flag->store(true);
349
350 //--------------------------------------------------------------------//
351 // if started, stop some thread if smaller or return if equal
352 if(m_pool_state->load() == thread_pool::state::STARTED)
353 {
354 if(m_pool_size > proposed_size)
355 {
356 while(stop_thread() > proposed_size)
357 ;
358 if(m_verbose > 0)
359 {
360 AutoLock lock(TypeMutex<decltype(std::cerr)>());
361 std::cerr << "[PTL::ThreadPool] ThreadPool initialized with "
362 << m_pool_size << " threads." << std::endl;
363 }
364 if(!m_task_queue)
365 {
366 m_delete_task_queue = true;
367 m_task_queue = new UserTaskQueue(m_pool_size);
368 }
369 else
370 {
371 m_task_queue->resize(m_pool_size);
372 }
373 return m_pool_size;
374 }
375 else if(m_pool_size == proposed_size) // NOLINT
376 {
377 if(m_verbose > 0)
378 {
379 AutoLock lock(TypeMutex<decltype(std::cerr)>());
380 std::cerr << "ThreadPool initialized with " << m_pool_size << " threads."
381 << std::endl;
382 }
383 if(!m_task_queue)
384 {
385 m_delete_task_queue = true;
386 m_task_queue = new UserTaskQueue(m_pool_size);
387 }
388 return m_pool_size;
389 }
390 }
391
392 //--------------------------------------------------------------------//
393 // reserve enough space to prevent realloc later
394 {
395 AutoLock _task_lock(*m_task_lock);
396 m_is_joined.reserve(proposed_size);
397 }
398
399 if(!m_task_queue)
400 {
401 m_delete_task_queue = true;
402 m_task_queue = new UserTaskQueue(proposed_size);
403 }
404
405 auto this_tid = get_this_thread_id();
406 for(size_type i = m_pool_size; i < proposed_size; ++i)
407 {
408 // add the threads
409 try
410 {
411 // create thread
412 Thread thr{ ThreadPool::start_thread, this, &m_thread_data,
413 this_tid + i + 1 };
414 // only reaches here if successful creation of thread
415 ++m_pool_size;
416 // store thread
417 m_main_threads.push_back(thr.get_id());
418 // list of joined thread booleans
419 m_is_joined.push_back(false);
420 // set the affinity
421 if(m_use_affinity)
422 set_affinity(i, thr);
423 set_priority(m_priority, thr);
424 // store
425 m_threads.emplace_back(std::move(thr));
426 } catch(std::runtime_error& e)
427 {
428 AutoLock lock(TypeMutex<decltype(std::cerr)>());
429 std::cerr << "[PTL::ThreadPool] " << e.what()
430 << std::endl; // issue creating thread
431 continue;
432 } catch(std::bad_alloc& e)
433 {
434 AutoLock lock(TypeMutex<decltype(std::cerr)>());
435 std::cerr << "[PTL::ThreadPool] " << e.what() << std::endl;
436 continue;
437 }
438 }
439 //------------------------------------------------------------------------//
440
441 AutoLock _task_lock(*m_task_lock);
442
443 // thread pool size doesn't match with join vector
444 // this will screw up joining later
445 if(m_is_joined.size() != m_main_threads.size())
446 {
447 std::stringstream ss;
448 ss << "ThreadPool::initialize_threadpool - boolean is_joined vector "
449 << "is a different size than threads vector: " << m_is_joined.size() << " vs. "
450 << m_main_threads.size() << " (tid: " << std::this_thread::get_id() << ")";
451
452 throw std::runtime_error(ss.str());
453 }
454
455 if(m_verbose > 0)
456 {
457 AutoLock lock(TypeMutex<decltype(std::cerr)>());
458 std::cerr << "[PTL::ThreadPool] ThreadPool initialized with " << m_pool_size
459 << " threads." << std::endl;
460 }
461
462 return m_main_threads.size();
463}
size_type stop_thread()
void set_affinity(affinity_func_t f)
void set_priority(int _prio, Thread &) const
tbb::task_group tbb_task_group_t

Referenced by resize(), and ThreadPool().

◆ is_alive()

bool PTL::ThreadPool::is_alive ( )
inline

Definition at line 222 of file ThreadPool.hh.

222{ return m_alive_flag->load(); }

Referenced by PTL::TaskGroup< Tp, Arg, MaxDepth >::wait().

◆ is_initialized()

bool PTL::ThreadPool::is_initialized ( ) const

Definition at line 227 of file ThreadPool.cc.

228{
229 return !(m_pool_state->load() == thread_pool::state::NONINIT);
230}

◆ is_main()

bool PTL::ThreadPool::is_main ( ) const
inline

Definition at line 235 of file ThreadPool.hh.

235{ return ThisThread::get_id() == m_main_tid; }

Referenced by execute_on_all_threads().

◆ is_tbb_threadpool()

bool PTL::ThreadPool::is_tbb_threadpool ( ) const
inline

Definition at line 178 of file ThreadPool.hh.

178{ return m_tbb_tp; }

◆ notify() [1/2]

void PTL::ThreadPool::notify ( )
inline

Definition at line 303 of file ThreadPool.hh.

304{
305 // wake up one thread that is waiting for a task to be available
306 if(m_thread_awake->load() < m_pool_size)
307 {
308 AutoLock l(*m_task_lock);
309 m_task_cond->notify_one();
310 }
311}

Referenced by add_tasks().

◆ notify() [2/2]

void PTL::ThreadPool::notify ( size_type ntasks)
inline

Definition at line 322 of file ThreadPool.hh.

323{
324 if(ntasks == 0)
325 return;
326
327 // wake up as many threads as there are tasks just added
328 if(m_thread_awake->load() < m_pool_size)
329 {
330 AutoLock l(*m_task_lock);
331 if(ntasks < this->size())
332 {
333 for(size_type i = 0; i < ntasks; ++i)
334 m_task_cond->notify_one();
335 }
336 else
337 {
338 m_task_cond->notify_all();
339 }
340 }
341}

◆ notify_all()

void PTL::ThreadPool::notify_all ( )
inline

Definition at line 314 of file ThreadPool.hh.

315{
316 // wake all threads
317 AutoLock l(*m_task_lock);
318 m_task_cond->notify_all();
319}

◆ operator=() [1/2]

ThreadPool & PTL::ThreadPool::operator= ( const ThreadPool & )
delete

◆ operator=() [2/2]

ThreadPool & PTL::ThreadPool::operator= ( ThreadPool && )
default

◆ reset_finalization()

void PTL::ThreadPool::reset_finalization ( )
inline

Definition at line 208 of file ThreadPool.hh.

209 {
210 m_fini_func = []() {};
211 }

◆ reset_initialization()

void PTL::ThreadPool::reset_initialization ( )
inline

Definition at line 204 of file ThreadPool.hh.

205 {
206 m_init_func = []() {};
207 }

◆ resize()

void PTL::ThreadPool::resize ( size_type _n)
inline

Definition at line 374 of file ThreadPool.hh.

375{
376 initialize_threadpool(_n);
377 if(m_task_queue)
378 m_task_queue->resize(static_cast<intmax_t>(_n));
379}

Referenced by G4TaskRunManager::SetNumberOfThreads().

◆ set_affinity() [1/2]

void PTL::ThreadPool::set_affinity ( affinity_func_t f)
inline

Definition at line 229 of file ThreadPool.hh.

229{ m_affinity_func = std::move(f); }

Referenced by initialize_threadpool() and ThreadPool().

◆ set_affinity() [2/2]

void PTL::ThreadPool::set_affinity ( intmax_t i,
Thread & _thread ) const

Definition at line 251 of file ThreadPool.cc.

252{
253 try
254 {
255 NativeThread native_thread = _thread.native_handle();
256 intmax_t _pin = m_affinity_func(i);
257 if(m_verbose > 0)
258 {
259 AutoLock lock(TypeMutex<decltype(std::cerr)>());
260 std::cerr << "[PTL::ThreadPool] Setting pin affinity for thread "
261 << get_thread_id(_thread.get_id()) << " to " << _pin << std::endl;
262 }
263 SetPinAffinity((int)_pin, native_thread);
264 } catch(std::runtime_error& e)
265 {
266 std::cerr << "[PTL::ThreadPool] Error setting pin affinity: " << e.what()
267 << std::endl;
268 }
269}
bool SetPinAffinity(int idx)
Definition Threading.cc:133

◆ set_default_size()

static void PTL::ThreadPool::set_default_size ( size_type _v)
inlinestatic

set the default pool size

Definition at line 182 of file ThreadPool.hh.

182{ f_default_pool_size() = _v; }

◆ set_finalization()

void PTL::ThreadPool::set_finalization ( finalize_func_t f)
inline

Definition at line 202 of file ThreadPool.hh.

202{ m_fini_func = std::move(f); }

◆ set_initialization()

void PTL::ThreadPool::set_initialization ( initialize_func_t f)
inline

Definition at line 201 of file ThreadPool.hh.

201{ m_init_func = std::move(f); }

◆ set_priority()

void PTL::ThreadPool::set_priority ( int _prio,
Thread & _thread ) const

Definition at line 274 of file ThreadPool.cc.

275{
276 try
277 {
278 NativeThread native_thread = _thread.native_handle();
279 if(m_verbose > 0)
280 {
281 AutoLock lock(TypeMutex<decltype(std::cerr)>());
282 std::cerr << "[PTL::ThreadPool] Setting thread "
283 << get_thread_id(_thread.get_id()) << " priority to " << _prio
284 << std::endl;
285 }
286 SetThreadPriority(_prio, native_thread);
287 } catch(std::runtime_error& e)
288 {
289 AutoLock lock(TypeMutex<decltype(std::cerr)>());
290 std::cerr << "[PTL::ThreadPool] Error setting thread priority: " << e.what()
291 << std::endl;
292 }
293}
bool SetThreadPriority(int _v)
Definition Threading.cc:150

Referenced by initialize_threadpool().

◆ set_verbose()

void PTL::ThreadPool::set_verbose ( int n)
inline

Definition at line 233 of file ThreadPool.hh.

233{ m_verbose = n; }

◆ size()

size_type PTL::ThreadPool::size ( ) const
inline

◆ state()

const pool_state_type & PTL::ThreadPool::state ( ) const
inline

Definition at line 215 of file ThreadPool.hh.

215{ return m_pool_state; }

Referenced by ThreadPool() and PTL::TaskGroup< Tp, Arg, MaxDepth >::wait().

◆ stop_thread()

ThreadPool::size_type PTL::ThreadPool::stop_thread ( )

Definition at line 607 of file ThreadPool.cc.

608{
609 if(!m_alive_flag->load() || m_pool_size == 0)
610 return 0;
611
612 m_pool_state->store(thread_pool::state::PARTIAL);
613
614 //------------------------------------------------------------------------//
615 // notify all threads we are shutting down
616 m_task_lock->lock();
617 m_is_stopped.push_back(true);
618 m_task_cond->notify_one();
619 m_task_lock->unlock();
620 //------------------------------------------------------------------------//
621
622 while(!m_is_stopped.empty() && m_stop_threads.empty())
623 ;
624
625 // lock up the task queue
626 AutoLock _task_lock(*m_task_lock);
627
628 while(!m_stop_threads.empty())
629 {
630 auto tid = m_stop_threads.front();
631 // remove from stopped
632 m_stop_threads.pop_front();
633 // remove from main
634 for(auto itr = m_main_threads.begin(); itr != m_main_threads.end(); ++itr)
635 {
636 if(*itr == tid)
637 {
638 m_main_threads.erase(itr);
639 break;
640 }
641 }
642 // remove from join list
643 m_is_joined.pop_back();
644 }
645
646 m_pool_state->store(thread_pool::state::STARTED);
647
648 m_pool_size = m_main_threads.size();
649 return m_main_threads.size();
650}

Referenced by initialize_threadpool().

◆ tbb_global_control()

tbb_global_control_t *& PTL::ThreadPool::tbb_global_control ( )
inlinestatic

Definition at line 345 of file ThreadPool.hh.

346{
347 static thread_local tbb_global_control_t* _instance = nullptr;
348 return _instance;
349}

Referenced by destroy_threadpool(), execute_on_all_threads(), get_task_arena(), and initialize_threadpool().

◆ using_affinity()

bool PTL::ThreadPool::using_affinity ( ) const
inline

Definition at line 221 of file ThreadPool.hh.

221{ return m_use_affinity; }

The documentation for this class was generated from the following files: