Geant4 10.7.0
Toolkit for the simulation of the passage of particles through matter
Loading...
Searching...
No Matches
PTL::ThreadPool Class Reference

#include <ThreadPool.hh>

Public Types

template<typename KeyT , typename MappedT , typename HashT = KeyT>
using uomap = std::unordered_map< KeyT, MappedT, std::hash< HashT > >
 
using size_type = size_t
 
using task_count_type = std::shared_ptr< std::atomic_uintmax_t >
 
using atomic_int_type = std::shared_ptr< std::atomic_uintmax_t >
 
using pool_state_type = std::shared_ptr< std::atomic_short >
 
using atomic_bool_type = std::shared_ptr< std::atomic_bool >
 
using task_type = VTask
 
using lock_t = std::shared_ptr< Mutex >
 
using condition_t = std::shared_ptr< Condition >
 
using task_pointer = task_type *
 
using task_queue_t = VUserTaskQueue
 
typedef std::deque< ThreadIdthread_list_t
 
typedef std::vector< bool > bool_list_t
 
typedef std::map< ThreadId, uintmax_t > thread_id_map_t
 
typedef std::map< uintmax_t, ThreadIdthread_index_map_t
 
using thread_vec_t = std::vector< Thread >
 
typedef std::function< void()> initialize_func_t
 
typedef std::function< intmax_t(intmax_t)> affinity_func_t
 

Public Member Functions

 ThreadPool (const size_type &pool_size, VUserTaskQueue *task_queue=nullptr, bool _use_affinity=GetEnv< bool >("PTL_CPU_AFFINITY", false), const affinity_func_t &=[](intmax_t) { static std::atomic< intmax_t > assigned;intmax_t _assign=assigned++;return _assign % Thread::hardware_concurrency();})
 
virtual ~ThreadPool ()
 
 ThreadPool (const ThreadPool &)=delete
 
 ThreadPool (ThreadPool &&)=default
 
ThreadPooloperator= (const ThreadPool &)=delete
 
ThreadPooloperator= (ThreadPool &&)=default
 
size_type initialize_threadpool (size_type)
 
size_type destroy_threadpool ()
 
size_type stop_thread ()
 
template<typename FuncT >
void execute_on_all_threads (FuncT &&_func)
 
size_type add_task (task_pointer task, int bin=-1)
 
template<typename ListT >
size_type add_tasks (ListT &)
 
Threadget_thread (size_type _n) const
 
Threadget_thread (std::thread::id id) const
 
task_queue_tget_queue () const
 
void set_initialization (initialize_func_t f)
 
void reset_initialization ()
 
const pool_state_typestate () const
 
size_type size () const
 
void resize (size_type _n)
 
bool using_affinity () const
 
bool is_alive ()
 
void notify ()
 
void notify_all ()
 
void notify (size_type)
 
bool is_initialized () const
 
int get_active_threads_count () const
 
void set_affinity (affinity_func_t f)
 
void set_affinity (intmax_t i, Thread &)
 
void set_verbose (int n)
 
int get_verbose () const
 
bool is_master () const
 

Static Public Member Functions

static bool using_tbb ()
 
static void set_use_tbb (bool val)
 
static tbb_global_control_t *& tbb_global_control ()
 
static const thread_id_map_tget_thread_ids ()
 
static uintmax_t get_this_thread_id ()
 

Protected Member Functions

void execute_thread (VUserTaskQueue *)
 
int insert (const task_pointer &, int=-1)
 
int run_on_this (task_pointer)
 
void record_entry ()
 
void record_exit ()
 

Static Protected Member Functions

static void start_thread (ThreadPool *, intmax_t=-1)
 

Detailed Description

Definition at line 63 of file ThreadPool.hh.

Member Typedef Documentation

◆ affinity_func_t

typedef std::function<intmax_t(intmax_t)> PTL::ThreadPool::affinity_func_t

Definition at line 89 of file ThreadPool.hh.

◆ atomic_bool_type

using PTL::ThreadPool::atomic_bool_type = std::shared_ptr<std::atomic_bool>

Definition at line 74 of file ThreadPool.hh.

◆ atomic_int_type

using PTL::ThreadPool::atomic_int_type = std::shared_ptr<std::atomic_uintmax_t>

Definition at line 72 of file ThreadPool.hh.

◆ bool_list_t

typedef std::vector<bool> PTL::ThreadPool::bool_list_t

Definition at line 83 of file ThreadPool.hh.

◆ condition_t

using PTL::ThreadPool::condition_t = std::shared_ptr<Condition>

Definition at line 78 of file ThreadPool.hh.

◆ initialize_func_t

typedef std::function<void()> PTL::ThreadPool::initialize_func_t

Definition at line 88 of file ThreadPool.hh.

◆ lock_t

using PTL::ThreadPool::lock_t = std::shared_ptr<Mutex>

Definition at line 77 of file ThreadPool.hh.

◆ pool_state_type

using PTL::ThreadPool::pool_state_type = std::shared_ptr<std::atomic_short>

Definition at line 73 of file ThreadPool.hh.

◆ size_type

Definition at line 70 of file ThreadPool.hh.

◆ task_count_type

using PTL::ThreadPool::task_count_type = std::shared_ptr<std::atomic_uintmax_t>

Definition at line 71 of file ThreadPool.hh.

◆ task_pointer

Definition at line 79 of file ThreadPool.hh.

◆ task_queue_t

Definition at line 80 of file ThreadPool.hh.

◆ task_type

Definition at line 76 of file ThreadPool.hh.

◆ thread_id_map_t

typedef std::map<ThreadId, uintmax_t> PTL::ThreadPool::thread_id_map_t

Definition at line 84 of file ThreadPool.hh.

◆ thread_index_map_t

typedef std::map<uintmax_t, ThreadId> PTL::ThreadPool::thread_index_map_t

Definition at line 85 of file ThreadPool.hh.

◆ thread_list_t

Definition at line 82 of file ThreadPool.hh.

◆ thread_vec_t

using PTL::ThreadPool::thread_vec_t = std::vector<Thread>

Definition at line 86 of file ThreadPool.hh.

◆ uomap

template<typename KeyT , typename MappedT , typename HashT = KeyT>
using PTL::ThreadPool::uomap = std::unordered_map<KeyT, MappedT, std::hash<HashT> >

Definition at line 67 of file ThreadPool.hh.

Constructor & Destructor Documentation

◆ ThreadPool() [1/3]

ThreadPool::ThreadPool ( const size_type pool_size,
VUserTaskQueue task_queue = nullptr,
bool  _use_affinity = GetEnv<bool>("PTL_CPU_AFFINITY", false),
const affinity_func_t _affinity_func = [](intmax_t) { static std::atomic<intmax_t> assigned; intmax_t _assign = assigned++; return _assign % Thread::hardware_concurrency(); } 
)

Definition at line 138 of file ThreadPool.cc.

140: m_use_affinity(_use_affinity)
141, m_tbb_tp(false)
142, m_verbose(0)
143, m_pool_size(0)
144, m_master_tid(ThisThread::get_id())
145, m_alive_flag(std::make_shared<std::atomic_bool>(false))
146, m_pool_state(std::make_shared<std::atomic_short>(thread_pool::state::NONINIT))
147, m_thread_awake(std::make_shared<std::atomic_uintmax_t>(0))
148, m_task_lock(std::make_shared<Mutex>())
149, m_task_cond(std::make_shared<Condition>())
150, m_task_queue(task_queue)
151, m_tbb_task_group(nullptr)
152, m_init_func([]() { return; })
153, m_affinity_func(_affinity_func)
154{
155 m_verbose = GetEnv<int>("PTL_VERBOSE", m_verbose);
156
157 auto master_id = get_this_thread_id();
158 if(master_id != 0 && m_verbose > 1)
159 std::cerr << "ThreadPool created on non-master slave" << std::endl;
160
161 thread_data() = new ThreadData(this);
162
163 // initialize after get_this_thread_id so master is zero
164 this->initialize_threadpool(pool_size);
165
166 if(!m_task_queue)
167 m_task_queue = new UserTaskQueue(m_pool_size);
168}
static uintmax_t get_this_thread_id()
Definition: ThreadPool.cc:120
size_type initialize_threadpool(size_type)
Definition: ThreadPool.cc:223

◆ ~ThreadPool()

ThreadPool::~ThreadPool ( )
virtual

Definition at line 172 of file ThreadPool.cc.

173{
174 if(m_alive_flag->load())
175 {
176 std::cerr << "Warning! ThreadPool was not properly destroyed! Call "
177 "destroy_threadpool() before deleting the ThreadPool object to "
178 "eliminate this message."
179 << std::endl;
180 m_pool_state->store(thread_pool::state::STOPPED);
181 m_task_lock->lock();
182 CONDITIONBROADCAST(m_task_cond.get());
183 m_task_lock->unlock();
184 for(auto& itr : m_threads)
185 itr.join();
186 m_threads.clear();
187 }
188}
#define CONDITIONBROADCAST(cond)
Definition: Threading.hh:195

◆ ThreadPool() [2/3]

PTL::ThreadPool::ThreadPool ( const ThreadPool )
delete

◆ ThreadPool() [3/3]

PTL::ThreadPool::ThreadPool ( ThreadPool &&  )
default

Member Function Documentation

◆ add_task()

ThreadPool::size_type PTL::ThreadPool::add_task ( task_pointer  task,
int  bin = -1 
)
inline

Definition at line 328 of file ThreadPool.hh.

329{
330 // if not native (i.e. TBB) then return
331 if(!task->is_native_task())
332 return 0;
333
334 // if we haven't built thread-pool, just execute
335 if(!m_alive_flag->load())
336 return static_cast<size_type>(run_on_this(task));
337
338 return static_cast<size_type>(insert(task, bin));
339}
size_t size_type
Definition: ThreadPool.hh:70
int run_on_this(task_pointer)
Definition: ThreadPool.hh:296
int insert(const task_pointer &, int=-1)
Definition: ThreadPool.hh:317

Referenced by PTL::TaskManager::async(), PTL::TaskGroup< Tp, Arg >::exec(), PTL::TaskManager::exec(), and PTL::TaskGroup< Tp, Arg >::run().

◆ add_tasks()

template<typename ListT >
ThreadPool::size_type PTL::ThreadPool::add_tasks ( ListT &  c)
inline

Definition at line 343 of file ThreadPool.hh.

344{
345 if(!m_alive_flag) // if we haven't built thread-pool, just execute
346 {
347 for(auto& itr : c)
348 run(itr);
349 c.clear();
350 return 0;
351 }
352
353 // TODO: put a limit on how many tasks can be added at most
354 auto c_size = c.size();
355 for(auto& itr : c)
356 {
357 if(!itr->is_native_task())
358 --c_size;
359 else
360 {
361 //++(m_task_queue);
362 m_task_queue->InsertTask(itr);
363 }
364 }
365 c.clear();
366
367 // notify sleeping threads
368 notify(c_size);
369
370 return c_size;
371}
virtual intmax_t InsertTask(task_pointer, ThreadData *=nullptr, intmax_t subq=-1)=0

◆ destroy_threadpool()

ThreadPool::size_type ThreadPool::destroy_threadpool ( )

Definition at line 366 of file ThreadPool.cc.

367{
368 // Note: this is not for synchronization, its for thread communication!
369 // destroy_threadpool() will only be called from the main thread, yet
370 // the modified m_pool_state may not show up to other threads until its
371 // modified in a lock!
372 //------------------------------------------------------------------------//
373 m_pool_state->store(thread_pool::state::STOPPED);
374
375 //--------------------------------------------------------------------//
376 // handle tbb task scheduler
377#ifdef PTL_USE_TBB
378 if(m_tbb_task_group)
379 {
380 m_tbb_task_group->wait();
381 delete m_tbb_task_group;
382 m_tbb_task_group = nullptr;
383 }
384 if(m_tbb_tp && tbb_global_control())
385 {
386 tbb_global_control_t*& _global_control = tbb_global_control();
387 delete _global_control;
388 _global_control = nullptr;
389 m_tbb_tp = false;
390 std::cout << "ThreadPool [TBB] destroyed" << std::endl;
391 }
392#endif
393
394 if(!m_alive_flag->load())
395 return 0;
396
397 //------------------------------------------------------------------------//
398 // notify all threads we are shutting down
399 m_task_lock->lock();
400 CONDITIONBROADCAST(m_task_cond.get());
401 m_task_lock->unlock();
402 //------------------------------------------------------------------------//
403
404 if(m_is_joined.size() != m_main_threads.size())
405 {
406 std::stringstream ss;
407 ss << " ThreadPool::destroy_thread_pool - boolean is_joined vector "
408 << "is a different size than threads vector: " << m_is_joined.size() << " vs. "
409 << m_main_threads.size() << " (tid: " << std::this_thread::get_id() << ")";
410
411 throw std::runtime_error(ss.str());
412 }
413
414 for(size_type i = 0; i < m_is_joined.size(); i++)
415 {
416 //--------------------------------------------------------------------//
417 //
418 if(i < m_threads.size())
419 m_threads.at(i).join();
420
421 //--------------------------------------------------------------------//
422 // if its joined already, nothing else needs to be done
423 if(m_is_joined.at(i))
424 continue;
425
426 //--------------------------------------------------------------------//
427 // join
428 if(std::this_thread::get_id() == m_main_threads[i])
429 continue;
430
431 //--------------------------------------------------------------------//
432 // thread id and index
433 auto _tid = m_main_threads[i];
434
435 //--------------------------------------------------------------------//
436 // erase thread from thread ID list
437 if(f_thread_ids.find(_tid) != f_thread_ids.end())
438 f_thread_ids.erase(f_thread_ids.find(_tid));
439
440 //--------------------------------------------------------------------//
441 // it's joined
442 m_is_joined.at(i) = true;
443
444 //--------------------------------------------------------------------//
445 // try waking up a bunch of threads that are still waiting
446 CONDITIONBROADCAST(m_task_cond.get());
447 //--------------------------------------------------------------------//
448 }
449
450 m_threads.clear();
451 m_main_threads.clear();
452 m_is_joined.clear();
453 m_alive_flag->store(false);
454
455 auto start = std::chrono::steady_clock::now();
456 auto elapsed = std::chrono::duration<double>{};
457 // wait maximum of 30 seconds for threads to exit
458 while(m_thread_active->load() > 0 && elapsed.count() < 30)
459 {
460 std::this_thread::sleep_for(std::chrono::milliseconds(50));
461 elapsed = std::chrono::steady_clock::now() - start;
462 }
463
464 auto _active = m_thread_active->load();
465
466 if(_active == 0)
467 std::cout << "ThreadPool destroyed" << std::endl;
468 else
469 std::cout << "ThreadPool destroyed but " << _active
470 << " threads might still be active (and cause a termination error)"
471 << std::endl;
472
473 return 0;
474}
static tbb_global_control_t *& tbb_global_control()
Definition: ThreadPool.hh:280

Referenced by PTL::TaskManager::finalize(), PTL::TaskRunManager::Terminate(), and G4TaskRunManager::~G4TaskRunManager().

◆ execute_on_all_threads()

template<typename FuncT >
void PTL::ThreadPool::execute_on_all_threads ( FuncT &&  _func)
inline

Definition at line 375 of file ThreadPool.hh.

376{
377 if(m_tbb_tp && m_tbb_task_group)
378 {
379#if defined(PTL_USE_TBB)
380 // TBB lazily activates threads to process tasks and the master thread
381 // participates in processing the tasks so getting a specific
382 // function to execute only on the worker threads requires some trickery
383 //
384 auto master_tid = ThisThread::get_id();
385 std::set<std::thread::id> _first;
386 Mutex _mutex;
387 // init function which executes function and returns 1 only once
388 auto _init = [&]() {
389 static thread_local int _once = 0;
390 _mutex.lock();
391 if(_first.find(std::this_thread::get_id()) == _first.end())
392 {
393 // we need to reset this thread-local static for multiple invocations
394 // of the same template instantiation
395 _once = 0;
396 _first.insert(std::this_thread::get_id());
397 }
398 _mutex.unlock();
399 if(_once++ == 0)
400 {
401 _func();
402 return 1;
403 }
404 return 0;
405 };
406 // consumes approximately N milliseconds of cpu time
407 auto _consume = [](long n) {
408 using stl_mutex_t = std::mutex;
409 using unique_lock_t = std::unique_lock<stl_mutex_t>;
410 // a mutex held by one lock
411 stl_mutex_t mutex;
412 // acquire lock
413 unique_lock_t hold_lk(mutex);
414 // associate but defer
415 unique_lock_t try_lk(mutex, std::defer_lock);
416 // get current time
417 auto now = std::chrono::steady_clock::now();
418 // try until time point
419 while(std::chrono::steady_clock::now() < (now + std::chrono::milliseconds(n)))
420 try_lk.try_lock();
421 };
422 // this will collect the number of threads which have
423 // executed the _init function above
424 std::atomic<size_t> _total_init{ 0 };
425 // this is the task passed to the task-group
426 auto _init_task = [&]() {
427 int _ret = 0;
428 // don't let the master thread execute the function
429 if(ThisThread::get_id() != master_tid)
430 {
431 // execute the function
432 _ret = _init();
433 // add the result
434 _total_init += _ret;
435 }
436 // if the function did not return anything, put it to sleep
437 // so TBB will wake other threads to execute the remaining tasks
438 if(_ret == 0)
439 _consume(100);
440 };
441
442 // TBB won't oversubscribe so we need to limit by ncores - 1
443 size_t nitr = 0;
444 size_t _maxp = tbb_global_control()->active_value(
446 size_t _sz = size();
447 size_t _ncore = Threading::GetNumberOfCores() - 1;
448 size_t _num = std::min(_maxp, std::min(_sz, _ncore));
449 auto _fname = __FUNCTION__;
450 auto _write_info = [&]() {
451 std::cerr << "[" << _fname << "]> Total initalized: " << _total_init
452 << ", expected: " << _num << ", max-parallel: " << _maxp
453 << ", size: " << _sz << ", ncore: " << _ncore << std::endl;
454 };
455 while(_total_init < _num)
456 {
457 auto _n = _num;
458 while(--_n > 0)
459 m_tbb_task_group->run(_init_task);
460 m_tbb_task_group->wait();
461 // don't loop infinitely but use a strict condition
462 if(nitr++ > 2 * (_num + 1) && (_total_init - 1) == _num)
463 {
464 _write_info();
465 break;
466 }
467 // at this point we need to exit
468 if(nitr > 4 * (_ncore + 1))
469 {
470 _write_info();
471 break;
472 }
473 }
474 if(get_verbose() > 3)
475 _write_info();
476#endif
477 }
478 else if(get_queue())
479 {
480 get_queue()->ExecuteOnAllThreads(this, std::forward<FuncT>(_func));
481 }
482}
task_queue_t * get_queue() const
Definition: ThreadPool.hh:135
int get_verbose() const
Definition: ThreadPool.hh:170
size_type size() const
Definition: ThreadPool.hh:151
virtual void ExecuteOnAllThreads(ThreadPool *tp, function_type f)=0
static size_t active_value(parameter param)
void run(FuncT f)
Definition: ThreadData.hh:59
unsigned GetNumberOfCores()
Definition: Threading.cc:64
std::mutex Mutex
Definition: Threading.hh:76

Referenced by G4TaskRunManager::CreateAndStartWorkers(), G4TaskRunManager::RequestWorkersProcessCommandsStack(), G4TaskRunManager::TerminateWorkers(), and G4TaskRunManager::WaitForEndEventLoopWorkers().

◆ execute_thread()

void ThreadPool::execute_thread ( VUserTaskQueue _task_queue)
protected

Definition at line 520 of file ThreadPool.cc.

521{
522 // how long the thread waits on condition variable
523 // static int wait_time = GetEnv<int>("PTL_POOL_WAIT_TIME", 5);
524
525 ++(*m_thread_awake);
526
527 // initialization function
528 m_init_func();
529
530 ThreadId tid = ThisThread::get_id();
531 ThreadData* data = thread_data();
532 // auto thread_bin = _task_queue->GetThreadBin();
533 // auto workers = _task_queue->workers();
534
535 auto start = std::chrono::steady_clock::now();
536 auto elapsed = std::chrono::duration<double>{};
537 // check for updates for 60 seconds max
538 while(!_task_queue && elapsed.count() < 60)
539 {
540 elapsed = std::chrono::steady_clock::now() - start;
541 data->update();
542 _task_queue = data->current_queue;
543 }
544
545 if(!_task_queue)
546 {
547 --(*m_thread_awake);
548 throw std::runtime_error("No task queue was found after 60 seconds!");
549 }
550
551 assert(data->current_queue != nullptr);
552 assert(_task_queue == data->current_queue);
553
554 // essentially a dummy run
555 if(_task_queue)
556 {
557 data->within_task = true;
558 auto _task = _task_queue->GetTask();
559 if(_task)
560 {
561 (*_task)();
562 if(!_task->group())
563 delete _task;
564 }
565 data->within_task = false;
566 }
567
568 // threads stay in this loop forever until thread-pool destroyed
569 while(true)
570 {
571 static thread_local auto p_task_lock = m_task_lock;
572
573 //--------------------------------------------------------------------//
574 // Try to pick a task
575 AutoLock _task_lock(*p_task_lock, std::defer_lock);
576 //--------------------------------------------------------------------//
577
578 auto leave_pool = [&]() {
579 auto _state = [&]() { return static_cast<int>(m_pool_state->load()); };
580 auto _pool_state = _state();
581 if(_pool_state > 0)
582 {
583 // stop whole pool
584 if(_pool_state == thread_pool::state::STOPPED)
585 {
586 if(_task_lock.owns_lock())
587 _task_lock.unlock();
588 return true;
589 }
590 // single thread stoppage
591 else if(_pool_state == thread_pool::state::PARTIAL) // NOLINT
592 {
593 if(!_task_lock.owns_lock())
594 _task_lock.lock();
595 if(!m_is_stopped.empty() && m_is_stopped.back())
596 {
597 m_stop_threads.push_back(tid);
598 m_is_stopped.pop_back();
599 if(_task_lock.owns_lock())
600 _task_lock.unlock();
601 // exit entire function
602 return true;
603 }
604 if(_task_lock.owns_lock())
605 _task_lock.unlock();
606 }
607 }
608 return false;
609 };
610
611 // We need to put condition.wait() in a loop for two reasons:
612 // 1. There can be spurious wake-ups (due to signal/ENITR)
613 // 2. When mutex is released for waiting, another thread can be woken up
614 // from a signal/broadcast and that thread can mess up the condition.
615 // So when the current thread wakes up the condition may no longer be
616 // actually true!
617 while(_task_queue->empty())
618 {
619 auto _state = [&]() { return static_cast<int>(m_pool_state->load()); };
620 auto _size = [&]() { return _task_queue->true_size(); };
621 auto _empty = [&]() { return _task_queue->empty(); };
622 auto _wake = [&]() { return (!_empty() || _size() > 0 || _state() > 0); };
623
624 if(leave_pool())
625 return;
626
627 if(_task_queue->true_size() == 0)
628 {
629 if(m_thread_awake && m_thread_awake->load() > 0)
630 --(*m_thread_awake);
631
632 // lock before sleeping on condition
633 if(!_task_lock.owns_lock())
634 _task_lock.lock();
635
636 // Wait until there is a task in the queue
637 // Unlocks mutex while waiting, then locks it back when signaled
638 // use lambda to control waking
639 m_task_cond->wait(_task_lock, _wake);
640
641 if(_state() == thread_pool::state::STOPPED)
642 return;
643
644 // unlock if owned
645 if(_task_lock.owns_lock())
646 _task_lock.unlock();
647
648 // notify that is awake
649 if(m_thread_awake && m_thread_awake->load() < m_pool_size)
650 ++(*m_thread_awake);
651 }
652 else
653 break;
654 }
655
656 // release the lock
657 if(_task_lock.owns_lock())
658 _task_lock.unlock();
659
660 //----------------------------------------------------------------//
661
662 // leave pool if conditions dictate it
663 if(leave_pool())
664 return;
665
666 // activate guard against recursive deadlock
667 data->within_task = true;
668 //----------------------------------------------------------------//
669
670 // execute the task(s)
671 while(!_task_queue->empty())
672 {
673 auto _task = _task_queue->GetTask();
674 if(_task)
675 {
676 (*_task)();
677 if(!_task->group())
678 delete _task;
679 }
680 }
681 //----------------------------------------------------------------//
682
683 // disable guard against recursive deadlock
684 data->within_task = false;
685 //----------------------------------------------------------------//
686 }
687}
VUserTaskQueue * current_queue
Definition: ThreadData.hh:115
virtual task_pointer GetTask(intmax_t subq=-1, intmax_t nitr=-1)=0
virtual size_type true_size() const
virtual bool empty() const =0
Thread::id ThreadId
Definition: Threading.hh:201

◆ get_active_threads_count()

int PTL::ThreadPool::get_active_threads_count ( ) const
inline

Definition at line 161 of file ThreadPool.hh.

162 {
163 return (m_thread_awake) ? m_thread_awake->load() : 0;
164 }

◆ get_queue()

task_queue_t * PTL::ThreadPool::get_queue ( ) const
inline

Definition at line 135 of file ThreadPool.hh.

135{ return m_task_queue; }

Referenced by execute_on_all_threads(), PTL::ThreadData::update(), and PTL::VTaskGroup::wait().

◆ get_this_thread_id()

uintmax_t ThreadPool::get_this_thread_id ( )
static

Definition at line 120 of file ThreadPool.cc.

121{
122 auto _tid = ThisThread::get_id();
123 {
124 AutoLock lock(TypeMutex<ThreadPool>(), std::defer_lock);
125 if(!lock.owns_lock())
126 lock.lock();
127 if(f_thread_ids.find(_tid) == f_thread_ids.end())
128 {
129 auto _idx = f_thread_ids.size();
130 f_thread_ids[_tid] = _idx;
131 }
132 }
133 return f_thread_ids[_tid];
134}

Referenced by PTL::UserTaskQueue::GetThreadBin(), initialize_threadpool(), G4TaskRunManagerKernel::InitializeWorker(), PTL::VTask::operator--(), and PTL::UserTaskQueue::UserTaskQueue().

◆ get_thread() [1/2]

Thread * PTL::ThreadPool::get_thread ( size_type  _n) const

◆ get_thread() [2/2]

Thread * PTL::ThreadPool::get_thread ( std::thread::id  id) const

◆ get_thread_ids()

const ThreadPool::thread_id_map_t & ThreadPool::get_thread_ids ( )
static

Definition at line 112 of file ThreadPool.cc.

113{
114 return f_thread_ids;
115}

◆ get_verbose()

int PTL::ThreadPool::get_verbose ( ) const
inline

Definition at line 170 of file ThreadPool.hh.

170{ return m_verbose; }

Referenced by execute_on_all_threads().

◆ initialize_threadpool()

ThreadPool::size_type ThreadPool::initialize_threadpool ( size_type  proposed_size)

Definition at line 223 of file ThreadPool.cc.

224{
225 //--------------------------------------------------------------------//
226 // return before initializing
227 if(proposed_size < 1)
228 return 0;
229
230 //--------------------------------------------------------------------//
231 // store that has been started
232 if(!m_alive_flag->load())
233 m_pool_state->store(thread_pool::state::STARTED);
234
235 //--------------------------------------------------------------------//
236 // handle tbb task scheduler
237#ifdef PTL_USE_TBB
238 if(f_use_tbb)
239 {
240 m_tbb_tp = true;
241 m_pool_size = proposed_size;
242 tbb_global_control_t*& _global_control = tbb_global_control();
243 // delete if wrong size
244 if(m_pool_size != proposed_size)
245 {
246 delete _global_control;
247 _global_control = nullptr;
248 }
249
250 if(!_global_control)
251 {
252 _global_control = new tbb_global_control_t(
254 if(m_verbose > 0)
255 {
256 std::cout << "ThreadPool [TBB] initialized with " << m_pool_size
257 << " threads." << std::endl;
258 }
259 }
260
261 // create task group (used for async)
262 if(!m_tbb_task_group)
263 m_tbb_task_group = new tbb_task_group_t();
264 return m_pool_size;
265 }
266#endif
267
268 m_alive_flag->store(true);
269
270 //--------------------------------------------------------------------//
271 // if started, stop some thread if smaller or return if equal
272 if(m_pool_state->load() == thread_pool::state::STARTED)
273 {
274 if(m_pool_size > proposed_size)
275 {
276 while(stop_thread() > proposed_size)
277 ;
278 if(m_verbose > 0)
279 {
280 std::cout << "ThreadPool initialized with " << m_pool_size << " threads."
281 << std::endl;
282 }
283 if(!m_task_queue)
284 m_task_queue = new UserTaskQueue(m_pool_size);
285 return m_pool_size;
286 }
287 else if(m_pool_size == proposed_size) // NOLINT
288 {
289 if(m_verbose > 0)
290 {
291 std::cout << "ThreadPool initialized with " << m_pool_size << " threads."
292 << std::endl;
293 }
294 if(!m_task_queue)
295 m_task_queue = new UserTaskQueue(m_pool_size);
296 return m_pool_size;
297 }
298 }
299
300 //--------------------------------------------------------------------//
301 // reserve enough space to prevent realloc later
302 {
303 AutoLock _task_lock(*m_task_lock);
304 m_is_joined.reserve(proposed_size);
305 }
306
307 auto this_tid = get_this_thread_id();
308 for(size_type i = m_pool_size; i < proposed_size; ++i)
309 {
310 // add the threads
311 try
312 {
313 Thread thr(ThreadPool::start_thread, this, this_tid + i + 1);
314 // only reaches here if successful creation of thread
315 ++m_pool_size;
316 // store thread
317 m_main_threads.push_back(thr.get_id());
318 // list of joined thread booleans
319 m_is_joined.push_back(false);
320 // set the affinity
321 if(m_use_affinity)
322 set_affinity(i, thr);
323 // store
324 m_threads.emplace_back(std::move(thr));
325 } catch(std::runtime_error& e)
326 {
327 std::cerr << e.what() << std::endl; // issue creating thread
328 continue;
329 } catch(std::bad_alloc& e)
330 {
331 std::cerr << e.what() << std::endl;
332 continue;
333 }
334 }
335 //------------------------------------------------------------------------//
336
337 AutoLock _task_lock(*m_task_lock);
338
339 // thread pool size doesn't match with join vector
340 // this will screw up joining later
341 if(m_is_joined.size() != m_main_threads.size())
342 {
343 std::stringstream ss;
344 ss << "ThreadPool::initialize_threadpool - boolean is_joined vector "
345 << "is a different size than threads vector: " << m_is_joined.size() << " vs. "
346 << m_main_threads.size() << " (tid: " << std::this_thread::get_id() << ")";
347
348 throw std::runtime_error(ss.str());
349 }
350
351 if(m_verbose > 0)
352 {
353 std::cout << "ThreadPool initialized with " << m_pool_size << " threads."
354 << std::endl;
355 }
356
357 if(!m_task_queue)
358 m_task_queue = new UserTaskQueue(m_main_threads.size());
359
360 return m_main_threads.size();
361}
static void start_thread(ThreadPool *, intmax_t=-1)
Definition: ThreadPool.cc:72
void set_affinity(affinity_func_t f)
Definition: ThreadPool.hh:166
size_type stop_thread()
Definition: ThreadPool.cc:479
tbb::task_group tbb_task_group_t
Definition: ThreadData.hh:88
tbb::global_control tbb_global_control_t
Definition: ThreadData.hh:87
std::thread Thread
Definition: Threading.hh:157

Referenced by resize().

◆ insert()

int PTL::ThreadPool::insert ( const task_pointer task,
int  bin = -1 
)
inlineprotected

Definition at line 317 of file ThreadPool.hh.

318{
319 static thread_local ThreadData* _data = ThreadData::GetInstance();
320
321 // pass the task to the queue
322 auto ibin = m_task_queue->InsertTask(task, _data, bin);
323 notify();
324 return ibin;
325}
static ThreadData *& GetInstance()
Definition: ThreadData.cc:35

Referenced by add_task().

◆ is_alive()

bool PTL::ThreadPool::is_alive ( )
inline

Definition at line 156 of file ThreadPool.hh.

156{ return m_alive_flag->load(); }

Referenced by PTL::VTaskGroup::wait().

◆ is_initialized()

bool ThreadPool::is_initialized ( ) const

Definition at line 193 of file ThreadPool.cc.

194{
195 return !(m_pool_state->load() == thread_pool::state::NONINIT);
196}

◆ is_master()

bool PTL::ThreadPool::is_master ( ) const
inline

Definition at line 171 of file ThreadPool.hh.

171{ return ThisThread::get_id() == m_master_tid; }

◆ notify() [1/2]

void PTL::ThreadPool::notify ( )
inline

Definition at line 240 of file ThreadPool.hh.

241{
242 // wake up one thread that is waiting for a task to be available
243 if(m_thread_awake && m_thread_awake->load() < m_pool_size)
244 {
245 AutoLock l(*m_task_lock);
246 m_task_cond->notify_one();
247 }
248}
TemplateAutoLock< Mutex > AutoLock
Definition: AutoLock.hh:483

Referenced by add_tasks(), and insert().

◆ notify() [2/2]

void PTL::ThreadPool::notify ( size_type  ntasks)
inline

Definition at line 259 of file ThreadPool.hh.

260{
261 if(ntasks == 0)
262 return;
263
264 // wake up as many threads that tasks just added
265 if(m_thread_awake && m_thread_awake->load() < m_pool_size)
266 {
267 AutoLock l(*m_task_lock);
268 if(ntasks < this->size())
269 {
270 for(size_type i = 0; i < ntasks; ++i)
271 m_task_cond->notify_one();
272 }
273 else
274 m_task_cond->notify_all();
275 }
276}

◆ notify_all()

void PTL::ThreadPool::notify_all ( )
inline

Definition at line 251 of file ThreadPool.hh.

252{
253 // wake all threads
254 AutoLock l(*m_task_lock);
255 m_task_cond->notify_all();
256}

◆ operator=() [1/2]

ThreadPool & PTL::ThreadPool::operator= ( const ThreadPool )
delete

◆ operator=() [2/2]

ThreadPool & PTL::ThreadPool::operator= ( ThreadPool &&  )
default

◆ record_entry()

void PTL::ThreadPool::record_entry ( )
inlineprotected

Definition at line 187 of file ThreadPool.hh.

188 {
189 if(m_thread_active)
190 ++(*m_thread_active);
191 }

◆ record_exit()

void PTL::ThreadPool::record_exit ( )
inlineprotected

Definition at line 193 of file ThreadPool.hh.

194 {
195 if(m_thread_active)
196 --(*m_thread_active);
197 }

◆ reset_initialization()

void PTL::ThreadPool::reset_initialization ( )
inline

Definition at line 141 of file ThreadPool.hh.

142 {
143 auto f = []() {};
144 m_init_func = f;
145 }

◆ resize()

void PTL::ThreadPool::resize ( size_type  _n)
inline

Definition at line 287 of file ThreadPool.hh.

288{
289 if(_n == m_pool_size)
290 return;
292 m_task_queue->resize(static_cast<intmax_t>(_n));
293}
virtual void resize(intmax_t)=0

Referenced by PTL::TaskRunManager::Initialize(), and G4TaskRunManager::SetNumberOfThreads().

◆ run_on_this()

int PTL::ThreadPool::run_on_this ( task_pointer  task)
inlineprotected

Definition at line 296 of file ThreadPool.hh.

297{
298 auto _func = [=]() {
299 (*task)();
300 if(!task->group())
301 delete task;
302 };
303
304 if(m_tbb_tp && m_tbb_task_group)
305 {
306 m_tbb_task_group->run(_func);
307 }
308 else
309 {
310 _func();
311 }
312 // return the number of tasks added to task-list
313 return 0;
314}

Referenced by add_task().

◆ set_affinity() [1/2]

void PTL::ThreadPool::set_affinity ( affinity_func_t  f)
inline

Definition at line 166 of file ThreadPool.hh.

166{ m_affinity_func = f; }

Referenced by initialize_threadpool().

◆ set_affinity() [2/2]

void ThreadPool::set_affinity ( intmax_t  i,
Thread _thread 
)

Definition at line 201 of file ThreadPool.cc.

202{
203 try
204 {
205 NativeThread native_thread = _thread.native_handle();
206 intmax_t _pin = m_affinity_func(i);
207 if(m_verbose > 0)
208 {
209 std::cout << "Setting pin affinity for thread " << _thread.get_id() << " to "
210 << _pin << std::endl;
211 }
212 Threading::SetPinAffinity(_pin, native_thread);
213 } catch(std::runtime_error& e)
214 {
215 std::cout << "Error setting pin affinity" << std::endl;
216 std::cerr << e.what() << std::endl; // issue assigning affinity
217 }
218}
bool SetPinAffinity(int idx, NativeThread &at)
Definition: Threading.cc:95
std::thread::native_handle_type NativeThread
Definition: Threading.hh:158

◆ set_initialization()

void PTL::ThreadPool::set_initialization ( initialize_func_t  f)
inline

Definition at line 140 of file ThreadPool.hh.

140{ m_init_func = f; }

◆ set_use_tbb()

void ThreadPool::set_use_tbb ( bool  val)
static

Definition at line 100 of file ThreadPool.cc.

101{
102#if defined(PTL_USE_TBB)
103 f_use_tbb = enable;
104#else
105 ConsumeParameters<bool>(enable);
106#endif
107}

Referenced by G4TaskRunManager::G4TaskRunManager(), and PTL::TaskRunManager::TaskRunManager().

◆ set_verbose()

void PTL::ThreadPool::set_verbose ( int  n)
inline

Definition at line 169 of file ThreadPool.hh.

169{ m_verbose = n; }

◆ size()

◆ start_thread()

void ThreadPool::start_thread ( ThreadPool tp,
intmax_t  _idx = -1 
)
staticprotected

Definition at line 72 of file ThreadPool.cc.

73{
74 {
75 AutoLock lock(TypeMutex<ThreadPool>(), std::defer_lock);
76 if(!lock.owns_lock())
77 lock.lock();
78 if(_idx < 0)
79 _idx = f_thread_ids.size();
80 f_thread_ids[std::this_thread::get_id()] = _idx;
81 }
82 static thread_local std::unique_ptr<ThreadData> _unique_data(new ThreadData(tp));
83 thread_data() = _unique_data.get();
84 tp->record_entry();
85 tp->execute_thread(thread_data()->current_queue);
86 tp->record_exit();
87}

Referenced by initialize_threadpool().

◆ state()

const pool_state_type & PTL::ThreadPool::state ( ) const
inline

Definition at line 149 of file ThreadPool.hh.

149{ return m_pool_state; }

Referenced by PTL::VTaskGroup::wait().

◆ stop_thread()

ThreadPool::size_type ThreadPool::stop_thread ( )

Definition at line 479 of file ThreadPool.cc.

480{
481 if(!m_alive_flag->load() || m_pool_size == 0)
482 return 0;
483
484 //------------------------------------------------------------------------//
485 // notify all threads we are shutting down
486 m_task_lock->lock();
487 m_is_stopped.push_back(true);
488 CONDITIONNOTIFY(m_task_cond.get());
489 m_task_lock->unlock();
490 //------------------------------------------------------------------------//
491
492 // lock up the task queue
493 AutoLock _task_lock(*m_task_lock);
494
495 while(!m_stop_threads.empty())
496 {
497 auto tid = m_stop_threads.front();
498 // remove from stopped
499 m_stop_threads.pop_front();
500 // remove from main
501 for(auto itr = m_main_threads.begin(); itr != m_main_threads.end(); ++itr)
502 {
503 if(*itr == tid)
504 {
505 m_main_threads.erase(itr);
506 break;
507 }
508 }
509 // remove from join list
510 m_is_joined.pop_back();
511 }
512
513 m_pool_size = m_main_threads.size();
514 return m_main_threads.size();
515}
#define CONDITIONNOTIFY(cond)
Definition: Threading.hh:194

Referenced by initialize_threadpool().

◆ tbb_global_control()

tbb_global_control_t *& PTL::ThreadPool::tbb_global_control ( )
inlinestatic

Definition at line 280 of file ThreadPool.hh.

281{
282 static thread_local tbb_global_control_t* _instance = nullptr;
283 return _instance;
284}

Referenced by destroy_threadpool(), execute_on_all_threads(), and initialize_threadpool().

◆ using_affinity()

bool PTL::ThreadPool::using_affinity ( ) const
inline

Definition at line 155 of file ThreadPool.hh.

155{ return m_use_affinity; }

◆ using_tbb()

bool ThreadPool::using_tbb ( )
static

Definition at line 92 of file ThreadPool.cc.

93{
94 return f_use_tbb;
95}

Referenced by PTL::TaskRunManager::Initialize(), and G4TaskRunManager::InitializeThreadPool().


The documentation for this class was generated from the following files: