Geant4 11.1.1
Toolkit for the simulation of the passage of particles through matter
Loading...
Searching...
No Matches
PTL::ThreadPool Class Reference

#include <ThreadPool.hh>

Classes

struct  Config
 

Public Types

template<typename KeyT , typename MappedT , typename HashT = KeyT>
using uomap = std::unordered_map< KeyT, MappedT, std::hash< HashT > >
 
using size_type = size_t
 
using task_count_type = std::shared_ptr< std::atomic_uintmax_t >
 
using atomic_int_type = std::shared_ptr< std::atomic_uintmax_t >
 
using pool_state_type = std::shared_ptr< std::atomic_short >
 
using atomic_bool_type = std::shared_ptr< std::atomic_bool >
 
using task_type = VTask
 
using lock_t = std::shared_ptr< Mutex >
 
using condition_t = std::shared_ptr< Condition >
 
using task_pointer = std::shared_ptr< task_type >
 
using task_queue_t = VUserTaskQueue
 
using thread_list_t = std::deque< ThreadId >
 
using bool_list_t = std::vector< bool >
 
using thread_id_map_t = std::map< ThreadId, uintmax_t >
 
using thread_index_map_t = std::map< uintmax_t, ThreadId >
 
using thread_vec_t = std::vector< Thread >
 
using thread_data_t = std::vector< std::shared_ptr< ThreadData > >
 
using initialize_func_t = std::function< void()>
 
using finalize_func_t = std::function< void()>
 
using affinity_func_t = std::function< intmax_t(intmax_t)>
 

Public Member Functions

 ThreadPool (const Config &)
 
 ThreadPool (const size_type &pool_size, VUserTaskQueue *task_queue=nullptr, bool _use_affinity=f_use_cpu_affinity(), affinity_func_t=affinity_functor(), initialize_func_t=initialization_functor(), finalize_func_t=finalization_functor())
 
 ThreadPool (const size_type &pool_size, initialize_func_t, finalize_func_t, bool _use_affinity=f_use_cpu_affinity(), affinity_func_t=affinity_functor(), VUserTaskQueue *task_queue=nullptr)
 
virtual ~ThreadPool ()
 
 ThreadPool (const ThreadPool &)=delete
 
 ThreadPool (ThreadPool &&)=default
 
ThreadPooloperator= (const ThreadPool &)=delete
 
ThreadPooloperator= (ThreadPool &&)=default
 
size_type initialize_threadpool (size_type)
 
size_type destroy_threadpool ()
 
size_type stop_thread ()
 
template<typename FuncT >
void execute_on_all_threads (FuncT &&_func)
 
template<typename FuncT >
void execute_on_specific_threads (const std::set< std::thread::id > &_tid, FuncT &&_func)
 
task_queue_tget_queue () const
 
task_queue_t *& get_valid_queue (task_queue_t *&) const
 
bool is_tbb_threadpool () const
 
size_type add_task (task_pointer &&task, int bin=-1)
 
template<typename ListT >
size_type add_tasks (ListT &)
 
Threadget_thread (size_type _n) const
 
Threadget_thread (std::thread::id id) const
 
void set_initialization (initialize_func_t f)
 
void set_finalization (finalize_func_t f)
 
void reset_initialization ()
 
void reset_finalization ()
 
const pool_state_typestate () const
 
size_type size () const
 
void resize (size_type _n)
 
bool using_affinity () const
 
bool is_alive ()
 
void notify ()
 
void notify_all ()
 
void notify (size_type)
 
bool is_initialized () const
 
int get_active_threads_count () const
 
void set_affinity (affinity_func_t f)
 
void set_affinity (intmax_t i, Thread &) const
 
void set_priority (int _prio, Thread &) const
 
void set_verbose (int n)
 
int get_verbose () const
 
bool is_main () const
 
tbb_task_arena_tget_task_arena ()
 

Static Public Member Functions

static affinity_func_taffinity_functor ()
 
static initialize_func_tinitialization_functor ()
 
static finalize_func_tfinalization_functor ()
 
static bool using_tbb ()
 
static void set_use_tbb (bool _v)
 
static void set_default_use_tbb (bool _v)
 set the default use of tbb
 
static void set_default_use_cpu_affinity (bool _v)
 set the default use of cpu affinity
 
static void set_default_scheduling_priority (int _v)
 set the default scheduling priority of threads in thread-pool
 
static void set_default_verbose (int _v)
 set the default verbosity
 
static void set_default_size (size_type _v)
 set the default pool size
 
static bool get_default_use_tbb ()
 get the default use of tbb
 
static bool get_default_use_cpu_affinity ()
 get the default use of cpu affinity
 
static int get_default_scheduling_priority ()
 get the default scheduling priority of threads in thread-pool
 
static int get_default_verbose ()
 get the default verbosity
 
static size_type get_default_size ()
 get the default pool size
 
static tbb_global_control_t *& tbb_global_control ()
 
static const thread_id_map_tget_thread_ids ()
 
static uintmax_t get_thread_id (ThreadId)
 
static uintmax_t get_this_thread_id ()
 
static uintmax_t add_thread_id (ThreadId=ThisThread::get_id())
 

Protected Member Functions

void execute_thread (VUserTaskQueue *)
 
int insert (task_pointer &&, int=-1)
 
int run_on_this (task_pointer &&)
 
void record_entry ()
 
void record_exit ()
 

Static Protected Member Functions

static void start_thread (ThreadPool *, thread_data_t *, intmax_t=-1)
 

Detailed Description

Definition at line 87 of file ThreadPool.hh.

Member Typedef Documentation

◆ affinity_func_t

using PTL::ThreadPool::affinity_func_t = std::function<intmax_t(intmax_t)>

Definition at line 115 of file ThreadPool.hh.

◆ atomic_bool_type

using PTL::ThreadPool::atomic_bool_type = std::shared_ptr<std::atomic_bool>

Definition at line 98 of file ThreadPool.hh.

◆ atomic_int_type

using PTL::ThreadPool::atomic_int_type = std::shared_ptr<std::atomic_uintmax_t>

Definition at line 96 of file ThreadPool.hh.

◆ bool_list_t

using PTL::ThreadPool::bool_list_t = std::vector<bool>

Definition at line 107 of file ThreadPool.hh.

◆ condition_t

using PTL::ThreadPool::condition_t = std::shared_ptr<Condition>

Definition at line 102 of file ThreadPool.hh.

◆ finalize_func_t

using PTL::ThreadPool::finalize_func_t = std::function<void()>

Definition at line 114 of file ThreadPool.hh.

◆ initialize_func_t

using PTL::ThreadPool::initialize_func_t = std::function<void()>

Definition at line 113 of file ThreadPool.hh.

◆ lock_t

using PTL::ThreadPool::lock_t = std::shared_ptr<Mutex>

Definition at line 101 of file ThreadPool.hh.

◆ pool_state_type

using PTL::ThreadPool::pool_state_type = std::shared_ptr<std::atomic_short>

Definition at line 97 of file ThreadPool.hh.

◆ size_type

Definition at line 94 of file ThreadPool.hh.

◆ task_count_type

using PTL::ThreadPool::task_count_type = std::shared_ptr<std::atomic_uintmax_t>

Definition at line 95 of file ThreadPool.hh.

◆ task_pointer

using PTL::ThreadPool::task_pointer = std::shared_ptr<task_type>

Definition at line 103 of file ThreadPool.hh.

◆ task_queue_t

Definition at line 104 of file ThreadPool.hh.

◆ task_type

Definition at line 100 of file ThreadPool.hh.

◆ thread_data_t

using PTL::ThreadPool::thread_data_t = std::vector<std::shared_ptr<ThreadData> >

Definition at line 111 of file ThreadPool.hh.

◆ thread_id_map_t

using PTL::ThreadPool::thread_id_map_t = std::map<ThreadId, uintmax_t>

Definition at line 108 of file ThreadPool.hh.

◆ thread_index_map_t

using PTL::ThreadPool::thread_index_map_t = std::map<uintmax_t, ThreadId>

Definition at line 109 of file ThreadPool.hh.

◆ thread_list_t

Definition at line 106 of file ThreadPool.hh.

◆ thread_vec_t

using PTL::ThreadPool::thread_vec_t = std::vector<Thread>

Definition at line 110 of file ThreadPool.hh.

◆ uomap

template<typename KeyT , typename MappedT , typename HashT = KeyT>
using PTL::ThreadPool::uomap = std::unordered_map<KeyT, MappedT, std::hash<HashT> >

Definition at line 91 of file ThreadPool.hh.

Constructor & Destructor Documentation

◆ ThreadPool() [1/5]

ThreadPool::ThreadPool ( const Config _cfg)
explicit

Definition at line 257 of file ThreadPool.cc.

258: m_use_affinity{ _cfg.use_affinity }
259, m_tbb_tp{ _cfg.use_tbb }
260, m_verbose{ _cfg.verbose }
261, m_priority{ _cfg.priority }
262, m_pool_state{ std::make_shared<std::atomic_short>(thread_pool::state::NONINIT) }
263, m_task_queue{ _cfg.task_queue }
264, m_init_func{ _cfg.initializer }
265, m_fini_func{ _cfg.finalizer }
266, m_affinity_func{ _cfg.set_affinity }
267{
268 auto master_id = get_this_thread_id();
269 if(master_id != 0 && m_verbose > 1)
270 {
271 AutoLock lock(TypeMutex<decltype(std::cerr)>());
272 std::cerr << "[PTL::ThreadPool] ThreadPool created on worker thread" << std::endl;
273 }
274
275 thread_data() = new ThreadData(this);
276
277 // initialize after get_this_thread_id so master is zero
278 if(_cfg.init)
279 this->initialize_threadpool(_cfg.pool_size);
280}
static uintmax_t get_this_thread_id()
Definition: ThreadPool.cc:215
size_type initialize_threadpool(size_type)
Definition: ThreadPool.cc:402
MutexTp & TypeMutex(const unsigned int &_n=0)
Definition: Threading.hh:74

◆ ThreadPool() [2/5]

ThreadPool::ThreadPool ( const size_type pool_size,
VUserTaskQueue task_queue = nullptr,
bool  _use_affinity = f_use_cpu_affinity(),
affinity_func_t  _affinity_func = affinity_functor(),
initialize_func_t  _init_func = initialization_functor(),
finalize_func_t  _fini_func = finalization_functor() 
)

Definition at line 282 of file ThreadPool.cc.

285: ThreadPool{ Config{ true, f_use_tbb(), _use_affinity, f_verbose(), f_thread_priority(),
286 _pool_size, _task_queue, std::move(_affinity_func),
287 std::move(_init_func), std::move(_fini_func) } }
288{}

◆ ThreadPool() [3/5]

ThreadPool::ThreadPool ( const size_type pool_size,
initialize_func_t  _init_func,
finalize_func_t  _fini_func,
bool  _use_affinity = f_use_cpu_affinity(),
affinity_func_t  _affinity_func = affinity_functor(),
VUserTaskQueue task_queue = nullptr 
)

Definition at line 290 of file ThreadPool.cc.

293: ThreadPool{ pool_size,
294 task_queue,
295 _use_affinity,
296 std::move(_affinity_func),
297 std::move(_init_func),
298 std::move(_fini_func) }
299{}

◆ ~ThreadPool()

ThreadPool::~ThreadPool ( )
virtual

Definition at line 303 of file ThreadPool.cc.

304{
305 if(m_alive_flag->load())
306 {
307 std::cerr << "Warning! ThreadPool was not properly destroyed! Call "
308 "destroy_threadpool() before deleting the ThreadPool object to "
309 "eliminate this message."
310 << std::endl;
311 m_pool_state->store(thread_pool::state::STOPPED);
312 m_task_lock->lock();
313 m_task_cond->notify_all();
314 m_task_lock->unlock();
315 for(auto& itr : m_threads)
316 itr.join();
317 m_threads.clear();
318 }
319
320 // delete owned resources
321 if(m_delete_task_queue)
322 delete m_task_queue;
323
324 delete m_tbb_task_arena;
325 delete m_tbb_task_group;
326}

◆ ThreadPool() [4/5]

PTL::ThreadPool::ThreadPool ( const ThreadPool )
delete

◆ ThreadPool() [5/5]

PTL::ThreadPool::ThreadPool ( ThreadPool &&  )
default

Member Function Documentation

◆ add_task()

ThreadPool::size_type PTL::ThreadPool::add_task ( task_pointer &&  task,
int  bin = -1 
)
inline

Definition at line 450 of file ThreadPool.hh.

451{
452 // if not native (i.e. TBB) or we haven't built thread-pool, just execute
453 if(m_tbb_tp || !task->is_native_task() || !m_alive_flag->load())
454 return static_cast<size_type>(run_on_this(std::move(task)));
455
456 return static_cast<size_type>(insert(std::move(task), bin));
457}
int insert(task_pointer &&, int=-1)
Definition: ThreadPool.hh:439
int run_on_this(task_pointer &&)
Definition: ThreadPool.hh:421
size_t size_type
Definition: ThreadPool.hh:94

Referenced by PTL::TaskManager::async(), and PTL::TaskManager::exec().

◆ add_tasks()

template<typename ListT >
ThreadPool::size_type PTL::ThreadPool::add_tasks ( ListT &  c)
inline

Definition at line 461 of file ThreadPool.hh.

462{
463 if(!m_alive_flag) // if we haven't built thread-pool, just execute
464 {
465 for(auto& itr : c)
466 run(itr);
467 c.clear();
468 return 0;
469 }
470
471 // TODO: put a limit on how many tasks can be added at most
472 auto c_size = c.size();
473 for(auto& itr : c)
474 {
475 if(!itr->is_native_task())
476 --c_size;
477 else
478 {
479 //++(m_task_queue);
480 get_valid_queue(m_task_queue)->InsertTask(itr);
481 }
482 }
483 c.clear();
484
485 // notify sleeping threads
486 notify(c_size);
487
488 return c_size;
489}
task_queue_t *& get_valid_queue(task_queue_t *&) const
Definition: ThreadPool.cc:759
virtual intmax_t InsertTask(task_pointer &&, ThreadData *=nullptr, intmax_t subq=-1) PTL_NO_SANITIZE_THREAD=0

◆ add_thread_id()

uintmax_t ThreadPool::add_thread_id ( ThreadId  _tid = ThisThread::get_id())
static

Definition at line 223 of file ThreadPool.cc.

224{
225 AutoLock lock(TypeMutex<ThreadPool>(), std::defer_lock);
226 if(!lock.owns_lock())
227 lock.lock();
228 if(f_thread_ids().find(_tid) == f_thread_ids().end())
229 {
230 auto _idx = f_thread_ids().size();
231 f_thread_ids()[_tid] = _idx;
232 Threading::SetThreadId((int)_idx);
233 }
234 return f_thread_ids().at(_tid);
235}
void SetThreadId(int aNewValue)
Definition: Threading.cc:115

Referenced by execute_on_all_threads(), and execute_on_specific_threads().

◆ affinity_functor()

static affinity_func_t & PTL::ThreadPool::affinity_functor ( )
inlinestatic

Definition at line 117 of file ThreadPool.hh.

118 {
119 static affinity_func_t _v = [](intmax_t) {
120 static std::atomic<intmax_t> assigned;
121 intmax_t _assign = assigned++;
122 return _assign % Thread::hardware_concurrency();
123 };
124 return _v;
125 }
std::function< intmax_t(intmax_t)> affinity_func_t
Definition: ThreadPool.hh:115

◆ destroy_threadpool()

ThreadPool::size_type ThreadPool::destroy_threadpool ( )

Definition at line 572 of file ThreadPool.cc.

573{
574 // Note: this is not for synchronization, its for thread communication!
575 // destroy_threadpool() will only be called from the main thread, yet
576 // the modified m_pool_state may not show up to other threads until its
577 // modified in a lock!
578 //------------------------------------------------------------------------//
579 m_pool_state->store(thread_pool::state::STOPPED);
580
581 //--------------------------------------------------------------------//
582 // handle tbb task scheduler
583#if defined(PTL_USE_TBB)
584 if(m_tbb_task_group)
585 {
586 execute_on_all_threads([this]() { m_fini_func(); });
587 auto _func = [&]() { m_tbb_task_group->wait(); };
588 if(m_tbb_task_arena)
589 m_tbb_task_arena->execute(_func);
590 else
591 _func();
592 delete m_tbb_task_group;
593 m_tbb_task_group = nullptr;
594 }
595 if(m_tbb_task_arena)
596 {
597 delete m_tbb_task_arena;
598 m_tbb_task_arena = nullptr;
599 }
600 if(m_tbb_tp && tbb_global_control())
601 {
602 tbb_global_control_t*& _global_control = tbb_global_control();
603 delete _global_control;
604 _global_control = nullptr;
605 m_tbb_tp = false;
606 AutoLock lock(TypeMutex<decltype(std::cerr)>());
607 if(m_verbose > 0)
608 {
609 std::cerr << "[PTL::ThreadPool] ThreadPool [TBB] destroyed" << std::endl;
610 }
611 }
612#endif
613
614 if(!m_alive_flag->load())
615 return 0;
616
617 //------------------------------------------------------------------------//
618 // notify all threads we are shutting down
619 m_task_lock->lock();
620 m_task_cond->notify_all();
621 m_task_lock->unlock();
622 //------------------------------------------------------------------------//
623
624 if(m_is_joined.size() != m_main_threads.size())
625 {
626 std::stringstream ss;
627 ss << " ThreadPool::destroy_thread_pool - boolean is_joined vector "
628 << "is a different size than threads vector: " << m_is_joined.size() << " vs. "
629 << m_main_threads.size() << " (tid: " << std::this_thread::get_id() << ")";
630
631 throw std::runtime_error(ss.str());
632 }
633
634 for(size_type i = 0; i < m_is_joined.size(); i++)
635 {
636 //--------------------------------------------------------------------//
637 //
638 if(i < m_threads.size())
639 m_threads.at(i).join();
640
641 //--------------------------------------------------------------------//
642 // if its joined already, nothing else needs to be done
643 if(m_is_joined.at(i))
644 continue;
645
646 //--------------------------------------------------------------------//
647 // join
648 if(std::this_thread::get_id() == m_main_threads[i])
649 continue;
650
651 //--------------------------------------------------------------------//
652 // thread id and index
653 auto _tid = m_main_threads[i];
654
655 //--------------------------------------------------------------------//
656 // erase thread from thread ID list
657 if(f_thread_ids().find(_tid) != f_thread_ids().end())
658 f_thread_ids().erase(f_thread_ids().find(_tid));
659
660 //--------------------------------------------------------------------//
661 // it's joined
662 m_is_joined.at(i) = true;
663 }
664
665 m_thread_data.clear();
666 m_threads.clear();
667 m_main_threads.clear();
668 m_is_joined.clear();
669
670 m_alive_flag->store(false);
671
672 auto start = std::chrono::steady_clock::now();
673 auto elapsed = std::chrono::duration<double>{};
674 // wait maximum of 30 seconds for threads to exit
675 while(m_thread_active->load() > 0 && elapsed.count() < 30)
676 {
677 std::this_thread::sleep_for(std::chrono::milliseconds(50));
678 elapsed = std::chrono::steady_clock::now() - start;
679 }
680
681 auto _active = m_thread_active->load();
682
683 if(get_verbose() > 0)
684 {
685 if(_active == 0)
686 {
687 AutoLock lock(TypeMutex<decltype(std::cerr)>());
688 std::cerr << "[PTL::ThreadPool] ThreadPool destroyed" << std::endl;
689 }
690 else
691 {
692 AutoLock lock(TypeMutex<decltype(std::cerr)>());
693 std::cerr << "[PTL::ThreadPool] ThreadPool destroyed but " << _active
694 << " threads might still be active (and cause a termination error)"
695 << std::endl;
696 }
697 }
698
699 if(m_delete_task_queue)
700 {
701 delete m_task_queue;
702 m_task_queue = nullptr;
703 }
704
705 return 0;
706}
static tbb_global_control_t *& tbb_global_control()
Definition: ThreadPool.hh:384
int get_verbose() const
Definition: ThreadPool.hh:269
void execute_on_all_threads(FuncT &&_func)
Definition: ThreadPool.hh:493
auto execute(FuncT &&_func) -> decltype(_func())
Definition: ThreadData.hh:115

Referenced by PTL::TaskManager::finalize(), PTL::TaskRunManager::Terminate(), and G4TaskRunManager::~G4TaskRunManager().

◆ execute_on_all_threads()

template<typename FuncT >
void PTL::ThreadPool::execute_on_all_threads ( FuncT &&  _func)
inline

Definition at line 493 of file ThreadPool.hh.

494{
495 if(m_tbb_tp && m_tbb_task_group)
496 {
497#if defined(PTL_USE_TBB)
498 // TBB lazily activates threads to process tasks and the main thread
499 // participates in processing the tasks so getting a specific
500 // function to execute only on the worker threads requires some trickery
501 //
502 std::set<std::thread::id> _first{};
503 Mutex _mutex{};
504 // init function which executes function and returns 1 only once
505 auto _init = [&]() {
506 int _once = 0;
507 _mutex.lock();
508 if(_first.find(std::this_thread::get_id()) == _first.end())
509 {
510 // we need to reset this thread-local static for multiple invocations
511 // of the same template instantiation
512 _once = 1;
513 _first.insert(std::this_thread::get_id());
514 }
515 _mutex.unlock();
516 if(_once != 0)
517 {
518 _func();
519 return 1;
520 }
521 return 0;
522 };
523 // this will collect the number of threads which have
524 // executed the _init function above
525 std::atomic<size_t> _total_init{ 0 };
526 // max parallelism by TBB
527 size_t _maxp = tbb_global_control()->active_value(
529 // create a task arean
530 auto* _arena = get_task_arena();
531 // size of the thread-pool
532 size_t _sz = size();
533 // number of cores
534 size_t _ncore = Threading::GetNumberOfCores();
535 // maximum depth for recursion
536 size_t _dmax = std::max<size_t>(_ncore, 8);
537 // how many threads we need to initialize
538 size_t _num = std::min(_maxp, std::min(_sz, _ncore));
539 // this is the task passed to the task-group
540 std::function<void()> _init_task;
541 _init_task = [&]() {
543 static thread_local size_type _depth = 0;
544 int _ret = 0;
545 // don't let the main thread execute the function
546 if(!is_main())
547 {
548 // execute the function
549 _ret = _init();
550 // add the result
551 _total_init += _ret;
552 }
553 // if the function did not return anything, recursively execute
554 // two more tasks
555 ++_depth;
556 if(_ret == 0 && _depth < _dmax && _total_init.load() < _num)
557 {
558 tbb::task_group tg{};
559 tg.run([&]() { _init_task(); });
560 tg.run([&]() { _init_task(); });
561 ThisThread::sleep_for(std::chrono::milliseconds{ 1 });
562 tg.wait();
563 }
564 --_depth;
565 };
566
567 // TBB won't oversubscribe so we need to limit by ncores - 1
568 size_t nitr = 0;
569 auto _fname = __FUNCTION__;
570 auto _write_info = [&]() {
571 std::cout << "[" << _fname << "]> Total initialized: " << _total_init
572 << ", expected: " << _num << ", max-parallel: " << _maxp
573 << ", size: " << _sz << ", ncore: " << _ncore << std::endl;
574 };
575 while(_total_init < _num)
576 {
577 auto _n = 2 * _num;
578 while(--_n > 0)
579 {
580 _arena->execute(
581 [&]() { m_tbb_task_group->run([&]() { _init_task(); }); });
582 }
583 _arena->execute([&]() { m_tbb_task_group->wait(); });
584 // don't loop infinitely but use a strict condition
585 if(nitr++ > 2 * (_num + 1) && (_total_init - 1) == _num)
586 {
587 _write_info();
588 break;
589 }
590 // at this point we need to exit
591 if(nitr > 4 * (_ncore + 1))
592 {
593 _write_info();
594 break;
595 }
596 }
597 if(get_verbose() > 3)
598 _write_info();
599#endif
600 }
601 else if(get_queue())
602 {
603 get_queue()->ExecuteOnAllThreads(this, std::forward<FuncT>(_func));
604 }
605}
bool is_main() const
Definition: ThreadPool.hh:270
static uintmax_t add_thread_id(ThreadId=ThisThread::get_id())
Definition: ThreadPool.cc:223
task_queue_t * get_queue() const
Definition: ThreadPool.hh:189
tbb_task_arena_t * get_task_arena()
Definition: ThreadPool.hh:392
size_type size() const
Definition: ThreadPool.hh:252
virtual void ExecuteOnAllThreads(ThreadPool *tp, function_type f)=0
static size_t active_value(parameter param)
void run(FuncT f)
Definition: ThreadData.hh:69
unsigned GetNumberOfCores()
Definition: Threading.cc:63
std::mutex Mutex
Definition: Threading.hh:57

Referenced by G4TaskRunManager::CreateAndStartWorkers(), destroy_threadpool(), initialize_threadpool(), G4TaskRunManager::RequestWorkersProcessCommandsStack(), G4TaskRunManager::TerminateWorkers(), and G4TaskRunManager::WaitForEndEventLoopWorkers().

◆ execute_on_specific_threads()

template<typename FuncT >
void PTL::ThreadPool::execute_on_specific_threads ( const std::set< std::thread::id > &  _tid,
FuncT &&  _func 
)
inline

Definition at line 611 of file ThreadPool.hh.

613{
614 if(m_tbb_tp && m_tbb_task_group)
615 {
616#if defined(PTL_USE_TBB)
617 // TBB lazily activates threads to process tasks and the main thread
618 // participates in processing the tasks so getting a specific
619 // function to execute only on the worker threads requires some trickery
620 //
621 std::set<std::thread::id> _first{};
622 Mutex _mutex{};
623 // init function which executes function and returns 1 only once
624 auto _exec = [&]() {
625 int _once = 0;
626 _mutex.lock();
627 if(_first.find(std::this_thread::get_id()) == _first.end())
628 {
629 // we need to reset this thread-local static for multiple invocations
630 // of the same template instantiation
631 _once = 1;
632 _first.insert(std::this_thread::get_id());
633 }
634 _mutex.unlock();
635 if(_once != 0)
636 {
637 _func();
638 return 1;
639 }
640 return 0;
641 };
642 // this will collect the number of threads which have
643 // executed the _exec function above
644 std::atomic<size_t> _total_exec{ 0 };
645 // number of cores
646 size_t _ncore = Threading::GetNumberOfCores();
647 // maximum depth for recursion
648 size_t _dmax = std::max<size_t>(_ncore, 8);
649 // how many threads we need to initialize
650 size_t _num = _tids.size();
651 // create a task arena
652 auto* _arena = get_task_arena();
653 // this is the task passed to the task-group
654 std::function<void()> _exec_task;
655 _exec_task = [&]() {
657 static thread_local size_type _depth = 0;
658 int _ret = 0;
659 auto _this_tid = std::this_thread::get_id();
660 // don't let the main thread execute the function
661 if(_tids.count(_this_tid) > 0)
662 {
663 // execute the function
664 _ret = _exec();
665 // add the result
666 _total_exec += _ret;
667 }
668 // if the function did not return anything, recursively execute
669 // two more tasks
670 ++_depth;
671 if(_ret == 0 && _depth < _dmax && _total_exec.load() < _num)
672 {
673 tbb::task_group tg{};
674 tg.run([&]() { _exec_task(); });
675 tg.run([&]() { _exec_task(); });
676 ThisThread::sleep_for(std::chrono::milliseconds{ 1 });
677 tg.wait();
678 }
679 --_depth;
680 };
681
682 // TBB won't oversubscribe so we need to limit by ncores - 1
683 size_t nitr = 0;
684 auto _fname = __FUNCTION__;
685 auto _write_info = [&]() {
686 std::cout << "[" << _fname << "]> Total executed: " << _total_exec
687 << ", expected: " << _num << ", size: " << size() << std::endl;
688 };
689 while(_total_exec < _num)
690 {
691 auto _n = 2 * _num;
692 while(--_n > 0)
693 {
694 _arena->execute(
695 [&]() { m_tbb_task_group->run([&]() { _exec_task(); }); });
696 }
697 _arena->execute([&]() { m_tbb_task_group->wait(); });
698 // don't loop infinitely but use a strict condition
699 if(nitr++ > 2 * (_num + 1) && (_total_exec - 1) == _num)
700 {
701 _write_info();
702 break;
703 }
704 // at this point we need to exit
705 if(nitr > 8 * (_num + 1))
706 {
707 _write_info();
708 break;
709 }
710 }
711 if(get_verbose() > 3)
712 _write_info();
713#endif
714 }
715 else if(get_queue())
716 {
717 get_queue()->ExecuteOnSpecificThreads(_tids, this, std::forward<FuncT>(_func));
718 }
719}
virtual void ExecuteOnSpecificThreads(ThreadIdSet tid_set, ThreadPool *tp, function_type f)=0

◆ execute_thread()

void ThreadPool::execute_thread ( VUserTaskQueue _task_queue)
protected

Definition at line 768 of file ThreadPool.cc.

769{
770 // how long the thread waits on condition variable
771 // static int wait_time = GetEnv<int>("PTL_POOL_WAIT_TIME", 5);
772
773 ++(*m_thread_awake);
774
775 // initialization function
776 m_init_func();
777 // finalization function (executed when scope is destroyed)
778 ScopeDestructor _fini{ [this]() { m_fini_func(); } };
779
780 ThreadId tid = ThisThread::get_id();
781 ThreadData* data = thread_data();
782 // auto thread_bin = _task_queue->GetThreadBin();
783 // auto workers = _task_queue->workers();
784
785 auto start = std::chrono::steady_clock::now();
786 auto elapsed = std::chrono::duration<double>{};
787 // check for updates for 60 seconds max
788 while(!_task_queue && elapsed.count() < 60)
789 {
790 elapsed = std::chrono::steady_clock::now() - start;
791 data->update();
792 _task_queue = data->current_queue;
793 }
794
795 if(!_task_queue)
796 {
797 --(*m_thread_awake);
798 throw std::runtime_error("No task queue was found after 60 seconds!");
799 }
800
801 assert(data->current_queue != nullptr);
802 assert(_task_queue == data->current_queue);
803
804 // essentially a dummy run
805 if(_task_queue)
806 {
807 data->within_task = true;
808 auto _task = _task_queue->GetTask();
809 if(_task)
810 {
811 (*_task)();
812 }
813 data->within_task = false;
814 }
815
816 // threads stay in this loop forever until thread-pool destroyed
817 while(true)
818 {
819 static thread_local auto p_task_lock = m_task_lock;
820
821 //--------------------------------------------------------------------//
822 // Try to pick a task
823 AutoLock _task_lock(*p_task_lock, std::defer_lock);
824 //--------------------------------------------------------------------//
825
826 auto leave_pool = [&]() {
827 auto _state = [&]() { return static_cast<int>(m_pool_state->load()); };
828 auto _pool_state = _state();
829 if(_pool_state > 0)
830 {
831 // stop whole pool
832 if(_pool_state == thread_pool::state::STOPPED)
833 {
834 if(_task_lock.owns_lock())
835 _task_lock.unlock();
836 return true;
837 }
838 // single thread stoppage
839 else if(_pool_state == thread_pool::state::PARTIAL) // NOLINT
840 {
841 if(!_task_lock.owns_lock())
842 _task_lock.lock();
843 if(!m_is_stopped.empty() && m_is_stopped.back())
844 {
845 m_stop_threads.push_back(tid);
846 m_is_stopped.pop_back();
847 if(_task_lock.owns_lock())
848 _task_lock.unlock();
849 // exit entire function
850 return true;
851 }
852 if(_task_lock.owns_lock())
853 _task_lock.unlock();
854 }
855 }
856 return false;
857 };
858
859 // We need to put condition.wait() in a loop for two reasons:
860 // 1. There can be spurious wake-ups (due to signal/ENITR)
861 // 2. When mutex is released for waiting, another thread can be woken up
862 // from a signal/broadcast and that thread can mess up the condition.
863 // So when the current thread wakes up the condition may no longer be
864 // actually true!
865 while(_task_queue->empty())
866 {
867 auto _state = [&]() { return static_cast<int>(m_pool_state->load()); };
868 auto _size = [&]() { return _task_queue->true_size(); };
869 auto _empty = [&]() { return _task_queue->empty(); };
870 auto _wake = [&]() { return (!_empty() || _size() > 0 || _state() > 0); };
871
872 if(leave_pool())
873 return;
874
875 if(_task_queue->true_size() == 0)
876 {
877 if(m_thread_awake->load() > 0)
878 --(*m_thread_awake);
879
880 // lock before sleeping on condition
881 if(!_task_lock.owns_lock())
882 _task_lock.lock();
883
884 // Wait until there is a task in the queue
885 // Unlocks mutex while waiting, then locks it back when signaled
886 // use lambda to control waking
887 m_task_cond->wait(_task_lock, _wake);
888
889 if(_state() == thread_pool::state::STOPPED)
890 return;
891
892 // unlock if owned
893 if(_task_lock.owns_lock())
894 _task_lock.unlock();
895
896 // notify that is awake
897 if(m_thread_awake->load() < m_pool_size)
898 ++(*m_thread_awake);
899 }
900 else
901 break;
902 }
903
904 // release the lock
905 if(_task_lock.owns_lock())
906 _task_lock.unlock();
907
908 //----------------------------------------------------------------//
909
910 // leave pool if conditions dictate it
911 if(leave_pool())
912 return;
913
914 // activate guard against recursive deadlock
915 data->within_task = true;
916 //----------------------------------------------------------------//
917
918 // execute the task(s)
919 while(!_task_queue->empty())
920 {
921 auto _task = _task_queue->GetTask();
922 if(_task)
923 {
924 (*_task)();
925 }
926 }
927 //----------------------------------------------------------------//
928
929 // disable guard against recursive deadlock
930 data->within_task = false;
931 //----------------------------------------------------------------//
932 }
933}
VUserTaskQueue * current_queue
Definition: ThreadData.hh:152
virtual task_pointer GetTask(intmax_t subq=-1, intmax_t nitr=-1)=0
virtual size_type true_size() const
virtual bool empty() const =0
Thread::id ThreadId
Definition: Threading.hh:46

◆ finalization_functor()

static finalize_func_t & PTL::ThreadPool::finalization_functor ( )
inlinestatic

Definition at line 133 of file ThreadPool.hh.

134 {
135 static finalize_func_t _v = []() {};
136 return _v;
137 }
std::function< void()> finalize_func_t
Definition: ThreadPool.hh:114

◆ get_active_threads_count()

int PTL::ThreadPool::get_active_threads_count ( ) const
inline

Definition at line 262 of file ThreadPool.hh.

262{ return (int)m_thread_awake->load(); }

◆ get_default_scheduling_priority()

static int PTL::ThreadPool::get_default_scheduling_priority ( )
inlinestatic

get the default scheduling priority of threads in thread-pool

Definition at line 216 of file ThreadPool.hh.

216{ return f_thread_priority(); }

◆ get_default_size()

static size_type PTL::ThreadPool::get_default_size ( )
inlinestatic

get the default pool size

Definition at line 220 of file ThreadPool.hh.

220{ return f_default_pool_size(); }

◆ get_default_use_cpu_affinity()

static bool PTL::ThreadPool::get_default_use_cpu_affinity ( )
inlinestatic

get the default use of cpu affinity

Definition at line 214 of file ThreadPool.hh.

214{ return f_use_cpu_affinity(); }

◆ get_default_use_tbb()

static bool PTL::ThreadPool::get_default_use_tbb ( )
inlinestatic

get the default use of tbb

Definition at line 212 of file ThreadPool.hh.

212{ return f_use_tbb(); }

◆ get_default_verbose()

static int PTL::ThreadPool::get_default_verbose ( )
inlinestatic

get the default verbosity

Definition at line 218 of file ThreadPool.hh.

218{ return f_verbose(); }

◆ get_queue()

task_queue_t * PTL::ThreadPool::get_queue ( ) const
inline

◆ get_task_arena()

tbb_task_arena_t * PTL::ThreadPool::get_task_arena ( )
inline

Definition at line 392 of file ThreadPool.hh.

393{
394#if defined(PTL_USE_TBB)
395 // create a task arena
396 if(!m_tbb_task_arena)
397 {
398 auto _sz = (tbb_global_control())
401 : size();
402 m_tbb_task_arena = new tbb_task_arena_t(::tbb::task_arena::attach{});
403 m_tbb_task_arena->initialize(_sz, 1);
404 }
405#else
406 if(!m_tbb_task_arena)
407 m_tbb_task_arena = new tbb_task_arena_t{};
408#endif
409 return m_tbb_task_arena;
410}
void initialize(int max_concurrency=automatic, unsigned reserved_for_masters=1)
tbb::task_arena tbb_task_arena_t
Definition: ThreadData.hh:125

Referenced by execute_on_all_threads(), execute_on_specific_threads(), and run_on_this().

◆ get_this_thread_id()

uintmax_t ThreadPool::get_this_thread_id ( )
static

Definition at line 215 of file ThreadPool.cc.

216{
217 return get_thread_id(ThisThread::get_id());
218}
static uintmax_t get_thread_id(ThreadId)
Definition: ThreadPool.cc:191

Referenced by PTL::UserTaskQueue::GetThreadBin(), initialize_threadpool(), G4TaskRunManagerKernel::InitializeWorker(), ThreadPool(), and PTL::UserTaskQueue::UserTaskQueue().

◆ get_thread() [1/2]

Thread * PTL::ThreadPool::get_thread ( size_type  _n) const

◆ get_thread() [2/2]

Thread * PTL::ThreadPool::get_thread ( std::thread::id  id) const

◆ get_thread_id()

uintmax_t ThreadPool::get_thread_id ( ThreadId  _tid)
static

Definition at line 191 of file ThreadPool.cc.

192{
193 uintmax_t _idx = 0;
194 {
195 AutoLock lock(TypeMutex<ThreadPool>(), std::defer_lock);
196 if(!lock.owns_lock())
197 lock.lock();
198 auto itr = f_thread_ids().find(_tid);
199 if(itr == f_thread_ids().end())
200 {
201 _idx = f_thread_ids().size();
202 f_thread_ids()[_tid] = _idx;
203 }
204 else
205 {
206 _idx = itr->second;
207 }
208 }
209 return _idx;
210}

Referenced by get_this_thread_id(), set_affinity(), and set_priority().

◆ get_thread_ids()

const ThreadPool::thread_id_map_t & ThreadPool::get_thread_ids ( )
static

Definition at line 183 of file ThreadPool.cc.

184{
185 return f_thread_ids();
186}

◆ get_valid_queue()

ThreadPool::task_queue_t *& ThreadPool::get_valid_queue ( task_queue_t *&  _queue) const

Definition at line 759 of file ThreadPool.cc.

760{
761 if(!_queue)
762 _queue = new UserTaskQueue{ static_cast<intmax_t>(m_pool_size) };
763 return _queue;
764}

Referenced by add_tasks(), and insert().

◆ get_verbose()

int PTL::ThreadPool::get_verbose ( ) const
inline

Definition at line 269 of file ThreadPool.hh.

269{ return m_verbose; }

Referenced by destroy_threadpool(), execute_on_all_threads(), and execute_on_specific_threads().

◆ initialization_functor()

static initialize_func_t & PTL::ThreadPool::initialization_functor ( )
inlinestatic

Definition at line 127 of file ThreadPool.hh.

128 {
129 static initialize_func_t _v = []() {};
130 return _v;
131 }
std::function< void()> initialize_func_t
Definition: ThreadPool.hh:113

◆ initialize_threadpool()

ThreadPool::size_type ThreadPool::initialize_threadpool ( size_type  proposed_size)

Definition at line 402 of file ThreadPool.cc.

403{
404 //--------------------------------------------------------------------//
405 // return before initializing
406 if(proposed_size < 1)
407 return 0;
408
409 //--------------------------------------------------------------------//
410 // store that has been started
411 if(!m_alive_flag->load())
412 m_pool_state->store(thread_pool::state::STARTED);
413
414#if defined(PTL_USE_TBB)
415 //--------------------------------------------------------------------//
416 // handle tbb task scheduler
417 if(m_tbb_tp)
418 {
419 m_tbb_tp = true;
420 m_pool_size = proposed_size;
421 tbb_global_control_t*& _global_control = tbb_global_control();
422 // delete if wrong size
423 if(m_pool_size != proposed_size)
424 {
425 delete _global_control;
426 _global_control = nullptr;
427 }
428
429 if(!_global_control)
430 {
431 _global_control = new tbb_global_control_t(
433 if(m_verbose > 0)
434 {
435 AutoLock lock(TypeMutex<decltype(std::cerr)>());
436 std::cerr << "[PTL::ThreadPool] ThreadPool [TBB] initialized with "
437 << m_pool_size << " threads." << std::endl;
438 }
439 }
440
441 // create task group (used for async)
442 if(!m_tbb_task_group)
443 {
444 m_tbb_task_group = new tbb_task_group_t{};
445 execute_on_all_threads([this]() { m_init_func(); });
446 }
447
448 return m_pool_size;
449 }
450#endif
451
452 m_alive_flag->store(true);
453
454 //--------------------------------------------------------------------//
455 // if started, stop some thread if smaller or return if equal
456 if(m_pool_state->load() == thread_pool::state::STARTED)
457 {
458 if(m_pool_size > proposed_size)
459 {
460 while(stop_thread() > proposed_size)
461 ;
462 if(m_verbose > 0)
463 {
464 AutoLock lock(TypeMutex<decltype(std::cerr)>());
465 std::cerr << "[PTL::ThreadPool] ThreadPool initialized with "
466 << m_pool_size << " threads." << std::endl;
467 }
468 if(!m_task_queue)
469 {
470 m_delete_task_queue = true;
471 m_task_queue = new UserTaskQueue(m_pool_size);
472 }
473 else
474 {
475 m_task_queue->resize(m_pool_size);
476 }
477 return m_pool_size;
478 }
479 else if(m_pool_size == proposed_size) // NOLINT
480 {
481 if(m_verbose > 0)
482 {
483 AutoLock lock(TypeMutex<decltype(std::cerr)>());
484 std::cerr << "ThreadPool initialized with " << m_pool_size << " threads."
485 << std::endl;
486 }
487 if(!m_task_queue)
488 {
489 m_delete_task_queue = true;
490 m_task_queue = new UserTaskQueue(m_pool_size);
491 }
492 return m_pool_size;
493 }
494 }
495
496 //--------------------------------------------------------------------//
497 // reserve enough space to prevent realloc later
498 {
499 AutoLock _task_lock(*m_task_lock);
500 m_is_joined.reserve(proposed_size);
501 }
502
503 if(!m_task_queue)
504 {
505 m_delete_task_queue = true;
506 m_task_queue = new UserTaskQueue(proposed_size);
507 }
508
509 auto this_tid = get_this_thread_id();
510 for(size_type i = m_pool_size; i < proposed_size; ++i)
511 {
512 // add the threads
513 try
514 {
515 // create thread
516 Thread thr{ ThreadPool::start_thread, this, &m_thread_data,
517 this_tid + i + 1 };
518 // only reaches here if successful creation of thread
519 ++m_pool_size;
520 // store thread
521 m_main_threads.push_back(thr.get_id());
522 // list of joined thread booleans
523 m_is_joined.push_back(false);
524 // set the affinity
525 if(m_use_affinity)
526 set_affinity(i, thr);
527 set_priority(m_priority, thr);
528 // store
529 m_threads.emplace_back(std::move(thr));
530 } catch(std::runtime_error& e)
531 {
532 AutoLock lock(TypeMutex<decltype(std::cerr)>());
533 std::cerr << "[PTL::ThreadPool] " << e.what()
534 << std::endl; // issue creating thread
535 continue;
536 } catch(std::bad_alloc& e)
537 {
538 AutoLock lock(TypeMutex<decltype(std::cerr)>());
539 std::cerr << "[PTL::ThreadPool] " << e.what() << std::endl;
540 continue;
541 }
542 }
543 //------------------------------------------------------------------------//
544
545 AutoLock _task_lock(*m_task_lock);
546
547 // thread pool size doesn't match with join vector
548 // this will screw up joining later
549 if(m_is_joined.size() != m_main_threads.size())
550 {
551 std::stringstream ss;
552 ss << "ThreadPool::initialize_threadpool - boolean is_joined vector "
553 << "is a different size than threads vector: " << m_is_joined.size() << " vs. "
554 << m_main_threads.size() << " (tid: " << std::this_thread::get_id() << ")";
555
556 throw std::runtime_error(ss.str());
557 }
558
559 if(m_verbose > 0)
560 {
561 AutoLock lock(TypeMutex<decltype(std::cerr)>());
562 std::cerr << "[PTL::ThreadPool] ThreadPool initialized with " << m_pool_size
563 << " threads." << std::endl;
564 }
565
566 return m_main_threads.size();
567}
static void start_thread(ThreadPool *, thread_data_t *, intmax_t=-1)
Definition: ThreadPool.cc:116
void set_priority(int _prio, Thread &) const
Definition: ThreadPool.cc:378
void set_affinity(affinity_func_t f)
Definition: ThreadPool.hh:264
size_type stop_thread()
Definition: ThreadPool.cc:711
virtual void resize(intmax_t)=0
std::thread Thread
Definition: Threading.hh:37
tbb::global_control tbb_global_control_t
Definition: ThreadData.hh:123

Referenced by resize(), and ThreadPool().

◆ insert()

int PTL::ThreadPool::insert ( task_pointer &&  task,
int  bin = -1 
)
inlineprotected

Definition at line 439 of file ThreadPool.hh.

440{
441 static thread_local ThreadData* _data = ThreadData::GetInstance();
442
443 // pass the task to the queue
444 auto ibin = get_valid_queue(m_task_queue)->InsertTask(std::move(task), _data, bin);
445 notify();
446 return (int)ibin;
447}
static ThreadData *& GetInstance()
Definition: ThreadData.cc:32

Referenced by add_task().

◆ is_alive()

bool PTL::ThreadPool::is_alive ( )
inline

Definition at line 257 of file ThreadPool.hh.

257{ return m_alive_flag->load(); }

Referenced by PTL::TaskGroup< Tp, Arg, MaxDepth >::wait().

◆ is_initialized()

bool ThreadPool::is_initialized ( ) const

Definition at line 331 of file ThreadPool.cc.

332{
333 return !(m_pool_state->load() == thread_pool::state::NONINIT);
334}

◆ is_main()

bool PTL::ThreadPool::is_main ( ) const
inline

Definition at line 270 of file ThreadPool.hh.

270{ return ThisThread::get_id() == m_main_tid; }

Referenced by execute_on_all_threads().

◆ is_tbb_threadpool()

bool PTL::ThreadPool::is_tbb_threadpool ( ) const
inline

Definition at line 192 of file ThreadPool.hh.

192{ return m_tbb_tp; }

Referenced by G4TaskRunManager::InitializeThreadPool().

◆ notify() [1/2]

void PTL::ThreadPool::notify ( )
inline

Definition at line 342 of file ThreadPool.hh.

343{
344 // wake up one thread that is waiting for a task to be available
345 if(m_thread_awake->load() < m_pool_size)
346 {
347 AutoLock l(*m_task_lock);
348 m_task_cond->notify_one();
349 }
350}
TemplateAutoLock< Mutex > AutoLock
Definition: AutoLock.hh:479

Referenced by add_tasks(), and insert().

◆ notify() [2/2]

void PTL::ThreadPool::notify ( size_type  ntasks)
inline

Definition at line 361 of file ThreadPool.hh.

362{
363 if(ntasks == 0)
364 return;
365
366 // wake up as many threads that tasks just added
367 if(m_thread_awake->load() < m_pool_size)
368 {
369 AutoLock l(*m_task_lock);
370 if(ntasks < this->size())
371 {
372 for(size_type i = 0; i < ntasks; ++i)
373 m_task_cond->notify_one();
374 }
375 else
376 {
377 m_task_cond->notify_all();
378 }
379 }
380}

◆ notify_all()

void PTL::ThreadPool::notify_all ( )
inline

Definition at line 353 of file ThreadPool.hh.

354{
355 // wake all threads
356 AutoLock l(*m_task_lock);
357 m_task_cond->notify_all();
358}

◆ operator=() [1/2]

ThreadPool & PTL::ThreadPool::operator= ( const ThreadPool )
delete

◆ operator=() [2/2]

ThreadPool & PTL::ThreadPool::operator= ( ThreadPool &&  )
default

◆ record_entry()

void ThreadPool::record_entry ( )
protected

Definition at line 339 of file ThreadPool.cc.

340{
341 ++(*m_thread_active);
342}

◆ record_exit()

void ThreadPool::record_exit ( )
protected

Definition at line 347 of file ThreadPool.cc.

348{
349 --(*m_thread_active);
350}

◆ reset_finalization()

void PTL::ThreadPool::reset_finalization ( )
inline

Definition at line 243 of file ThreadPool.hh.

244 {
245 m_fini_func = []() {};
246 }

◆ reset_initialization()

void PTL::ThreadPool::reset_initialization ( )
inline

Definition at line 239 of file ThreadPool.hh.

240 {
241 m_init_func = []() {};
242 }

◆ resize()

void PTL::ThreadPool::resize ( size_type  _n)
inline

Definition at line 413 of file ThreadPool.hh.

414{
416 if(m_task_queue)
417 m_task_queue->resize(static_cast<intmax_t>(_n));
418}

Referenced by PTL::TaskRunManager::Initialize(), and G4TaskRunManager::SetNumberOfThreads().

◆ run_on_this()

int PTL::ThreadPool::run_on_this ( task_pointer &&  _task)
inlineprotected

Definition at line 421 of file ThreadPool.hh.

422{
423 auto&& _func = [_task]() { (*_task)(); };
424
425 if(m_tbb_tp && m_tbb_task_group)
426 {
427 auto* _arena = get_task_arena();
428 _arena->execute([this, _func]() { this->m_tbb_task_group->run(_func); });
429 }
430 else
431 {
432 _func();
433 }
434 // return the number of tasks added to task-list
435 return 0;
436}

Referenced by add_task().

◆ set_affinity() [1/2]

void PTL::ThreadPool::set_affinity ( affinity_func_t  f)
inline

Definition at line 264 of file ThreadPool.hh.

264{ m_affinity_func = std::move(f); }

Referenced by initialize_threadpool().

◆ set_affinity() [2/2]

void ThreadPool::set_affinity ( intmax_t  i,
Thread _thread 
) const

Definition at line 355 of file ThreadPool.cc.

356{
357 try
358 {
359 NativeThread native_thread = _thread.native_handle();
360 intmax_t _pin = m_affinity_func(i);
361 if(m_verbose > 0)
362 {
363 AutoLock lock(TypeMutex<decltype(std::cerr)>());
364 std::cerr << "[PTL::ThreadPool] Setting pin affinity for thread "
365 << get_thread_id(_thread.get_id()) << " to " << _pin << std::endl;
366 }
367 Threading::SetPinAffinity((int)_pin, native_thread);
368 } catch(std::runtime_error& e)
369 {
370 std::cerr << "[PTL::ThreadPool] Error setting pin affinity: " << e.what()
371 << std::endl;
372 }
373}
bool SetPinAffinity(int idx)
Definition: Threading.cc:129
std::thread::native_handle_type NativeThread
Definition: Threading.hh:38

◆ set_default_scheduling_priority()

static void PTL::ThreadPool::set_default_scheduling_priority ( int  _v)
inlinestatic

set the default scheduling priority of threads in thread-pool

Definition at line 205 of file ThreadPool.hh.

205{ f_thread_priority() = _v; }

◆ set_default_size()

static void PTL::ThreadPool::set_default_size ( size_type  _v)
inlinestatic

set the default pool size

Definition at line 209 of file ThreadPool.hh.

209{ f_default_pool_size() = _v; }

◆ set_default_use_cpu_affinity()

void ThreadPool::set_default_use_cpu_affinity ( bool  _v)
static

set the default use of cpu affinity

Definition at line 171 of file ThreadPool.cc.

172{
173#if defined(PTL_USE_TBB)
174 f_use_cpu_affinity() = enable;
175#else
176 ConsumeParameters(enable);
177#endif
178}
void ConsumeParameters(Args &&...)
Definition: Utility.hh:44

◆ set_default_use_tbb()

static void PTL::ThreadPool::set_default_use_tbb ( bool  _v)
inlinestatic

set the default use of tbb

Definition at line 201 of file ThreadPool.hh.

201{ set_use_tbb(_v); }
static void set_use_tbb(bool _v)
Definition: ThreadPool.cc:159

◆ set_default_verbose()

static void PTL::ThreadPool::set_default_verbose ( int  _v)
inlinestatic

set the default verbosity

Definition at line 207 of file ThreadPool.hh.

207{ f_verbose() = _v; }

◆ set_finalization()

void PTL::ThreadPool::set_finalization ( finalize_func_t  f)
inline

Definition at line 237 of file ThreadPool.hh.

237{ m_fini_func = std::move(f); }

◆ set_initialization()

void PTL::ThreadPool::set_initialization ( initialize_func_t  f)
inline

Definition at line 236 of file ThreadPool.hh.

236{ m_init_func = std::move(f); }

◆ set_priority()

void ThreadPool::set_priority ( int  _prio,
Thread _thread 
) const

Definition at line 378 of file ThreadPool.cc.

379{
380 try
381 {
382 NativeThread native_thread = _thread.native_handle();
383 if(m_verbose > 0)
384 {
385 AutoLock lock(TypeMutex<decltype(std::cerr)>());
386 std::cerr << "[PTL::ThreadPool] Setting thread "
387 << get_thread_id(_thread.get_id()) << " priority to " << _prio
388 << std::endl;
389 }
390 Threading::SetThreadPriority(_prio, native_thread);
391 } catch(std::runtime_error& e)
392 {
393 AutoLock lock(TypeMutex<decltype(std::cerr)>());
394 std::cerr << "[PTL::ThreadPool] Error setting thread priority: " << e.what()
395 << std::endl;
396 }
397}
bool SetThreadPriority(int _v)
Definition: Threading.cc:146

Referenced by initialize_threadpool().

◆ set_use_tbb()

void ThreadPool::set_use_tbb ( bool  _v)
static

Definition at line 159 of file ThreadPool.cc.

160{
161#if defined(PTL_USE_TBB)
162 f_use_tbb() = enable;
163#else
164 ConsumeParameters(enable);
165#endif
166}

Referenced by G4TaskRunManager::G4TaskRunManager(), set_default_use_tbb(), and PTL::TaskRunManager::TaskRunManager().

◆ set_verbose()

void PTL::ThreadPool::set_verbose ( int  n)
inline

Definition at line 268 of file ThreadPool.hh.

268{ m_verbose = n; }

◆ size()

◆ start_thread()

void ThreadPool::start_thread ( ThreadPool tp,
thread_data_t _data,
intmax_t  _idx = -1 
)
staticprotected

Definition at line 116 of file ThreadPool.cc.

117{
118 if(tp->get_verbose() > 0)
119 {
120 AutoLock lock(TypeMutex<decltype(std::cerr)>());
121 std::cerr << "[PTL::ThreadPool] Starting thread " << _idx << "..." << std::endl;
122 }
123
124 auto _thr_data = std::make_shared<ThreadData>(tp);
125 {
126 AutoLock lock(TypeMutex<ThreadPool>(), std::defer_lock);
127 if(!lock.owns_lock())
128 lock.lock();
129 if(_idx < 0)
130 _idx = f_thread_ids().size();
131 f_thread_ids()[std::this_thread::get_id()] = _idx;
132 Threading::SetThreadId((int)_idx);
133 _data->emplace_back(_thr_data);
134 }
135 thread_data() = _thr_data.get();
136 tp->record_entry();
137 tp->execute_thread(thread_data()->current_queue);
138 tp->record_exit();
139
140 if(tp->get_verbose() > 0)
141 {
142 AutoLock lock(TypeMutex<decltype(std::cerr)>());
143 std::cerr << "[PTL::ThreadPool] Thread " << _idx << " terminating..."
144 << std::endl;
145 }
146}

Referenced by initialize_threadpool().

◆ state()

const pool_state_type & PTL::ThreadPool::state ( ) const
inline

Definition at line 250 of file ThreadPool.hh.

250{ return m_pool_state; }

Referenced by PTL::TaskGroup< Tp, Arg, MaxDepth >::wait().

◆ stop_thread()

ThreadPool::size_type ThreadPool::stop_thread ( )

Definition at line 711 of file ThreadPool.cc.

712{
713 if(!m_alive_flag->load() || m_pool_size == 0)
714 return 0;
715
716 m_pool_state->store(thread_pool::state::PARTIAL);
717
718 //------------------------------------------------------------------------//
719 // notify all threads we are shutting down
720 m_task_lock->lock();
721 m_is_stopped.push_back(true);
722 m_task_cond->notify_one();
723 m_task_lock->unlock();
724 //------------------------------------------------------------------------//
725
726 while(!m_is_stopped.empty() && m_stop_threads.empty())
727 ;
728
729 // lock up the task queue
730 AutoLock _task_lock(*m_task_lock);
731
732 while(!m_stop_threads.empty())
733 {
734 auto tid = m_stop_threads.front();
735 // remove from stopped
736 m_stop_threads.pop_front();
737 // remove from main
738 for(auto itr = m_main_threads.begin(); itr != m_main_threads.end(); ++itr)
739 {
740 if(*itr == tid)
741 {
742 m_main_threads.erase(itr);
743 break;
744 }
745 }
746 // remove from join list
747 m_is_joined.pop_back();
748 }
749
750 m_pool_state->store(thread_pool::state::STARTED);
751
752 m_pool_size = m_main_threads.size();
753 return m_main_threads.size();
754}

Referenced by initialize_threadpool().

◆ tbb_global_control()

tbb_global_control_t *& PTL::ThreadPool::tbb_global_control ( )
inlinestatic

Definition at line 384 of file ThreadPool.hh.

385{
386 static thread_local tbb_global_control_t* _instance = nullptr;
387 return _instance;
388}

Referenced by destroy_threadpool(), execute_on_all_threads(), get_task_arena(), and initialize_threadpool().

◆ using_affinity()

bool PTL::ThreadPool::using_affinity ( ) const
inline

Definition at line 256 of file ThreadPool.hh.

256{ return m_use_affinity; }

◆ using_tbb()

bool ThreadPool::using_tbb ( )
static

Definition at line 151 of file ThreadPool.cc.

152{
153 return f_use_tbb();
154}

Referenced by PTL::TaskRunManager::Initialize().


The documentation for this class was generated from the following files: