61ThreadPool::f_thread_ids()
70ThreadPool::f_default_pool_size()
81ThreadPool::start_thread(
ThreadPool* tp, thread_data_t* _data, intmax_t _idx)
83 if(
tp->get_verbose() > 0)
85 AutoLock lock(TypeMutex<
decltype(std::cerr)>());
86 std::cerr <<
"[PTL::ThreadPool] Starting thread " << _idx <<
"..." << std::endl;
89 auto _thr_data = std::make_shared<ThreadData>(tp);
91 AutoLock lock(TypeMutex<ThreadPool>(), std::defer_lock);
95 _idx = f_thread_ids().size();
96 f_thread_ids()[std::this_thread::get_id()] = _idx;
98 _data->emplace_back(_thr_data);
100 thread_data() = _thr_data.get();
102 tp->execute_thread(thread_data()->current_queue);
105 if(
tp->get_verbose() > 0)
107 AutoLock lock(TypeMutex<
decltype(std::cerr)>());
108 std::cerr <<
"[PTL::ThreadPool] Thread " << _idx <<
" terminating..."
118 return f_thread_ids();
128 AutoLock lock(TypeMutex<ThreadPool>(), std::defer_lock);
129 if(!lock.owns_lock())
131 auto itr = f_thread_ids().find(_tid);
132 if(itr == f_thread_ids().end())
134 _idx = f_thread_ids().size();
135 f_thread_ids()[_tid] = _idx;
158 AutoLock lock(TypeMutex<ThreadPool>(), std::defer_lock);
159 if(!lock.owns_lock())
161 if(f_thread_ids().find(_tid) == f_thread_ids().end())
163 auto _idx = f_thread_ids().size();
164 f_thread_ids()[_tid] = _idx;
167 return f_thread_ids().at(_tid);
173: m_use_affinity{ _cfg.use_affinity }
174, m_tbb_tp{ _cfg.use_tbb }
175, m_verbose{ _cfg.verbose }
176, m_priority{ _cfg.priority }
178, m_task_queue{ _cfg.task_queue }
179, m_init_func{ _cfg.initializer }
180, m_fini_func{ _cfg.finalizer }
184 if(master_id != 0 && m_verbose > 1)
186 AutoLock lock(TypeMutex<
decltype(std::cerr)>());
187 std::cerr <<
"[PTL::ThreadPool] ThreadPool created on worker thread" << std::endl;
201 if(m_alive_flag->load())
203 std::cerr <<
"Warning! ThreadPool was not properly destroyed! Call "
204 "destroy_threadpool() before deleting the ThreadPool object to "
205 "eliminate this message."
207 m_pool_state->store(thread_pool::state::STOPPED);
209 m_task_cond->notify_all();
210 m_task_lock->unlock();
211 for(
auto& itr : m_threads)
217 if(m_delete_task_queue)
220 delete m_tbb_task_arena;
221 delete m_tbb_task_group;
229 return !(m_pool_state->load() == thread_pool::state::NONINIT);
235ThreadPool::record_entry()
237 ++(*m_thread_active);
243ThreadPool::record_exit()
245 --(*m_thread_active);
255 NativeThread native_thread = _thread.native_handle();
256 intmax_t _pin = m_affinity_func(i);
259 AutoLock lock(TypeMutex<
decltype(std::cerr)>());
260 std::cerr <<
"[PTL::ThreadPool] Setting pin affinity for thread "
261 <<
get_thread_id(_thread.get_id()) <<
" to " << _pin << std::endl;
264 }
catch(std::runtime_error& e)
266 std::cerr <<
"[PTL::ThreadPool] Error setting pin affinity: " << e.what()
278 NativeThread native_thread = _thread.native_handle();
281 AutoLock lock(TypeMutex<
decltype(std::cerr)>());
282 std::cerr <<
"[PTL::ThreadPool] Setting thread "
283 <<
get_thread_id(_thread.get_id()) <<
" priority to " << _prio
287 }
catch(std::runtime_error& e)
289 AutoLock lock(TypeMutex<
decltype(std::cerr)>());
290 std::cerr <<
"[PTL::ThreadPool] Error setting thread priority: " << e.what()
302 if(proposed_size < 1)
307 if(!m_alive_flag->load())
308 m_pool_state->store(thread_pool::state::STARTED);
310#if defined(PTL_USE_TBB)
316 m_pool_size = proposed_size;
319 if(m_pool_size != proposed_size)
321 delete _global_control;
322 _global_control =
nullptr;
331 AutoLock lock(TypeMutex<
decltype(std::cerr)>());
332 std::cerr <<
"[PTL::ThreadPool] ThreadPool [TBB] initialized with "
333 << m_pool_size <<
" threads." << std::endl;
338 if(!m_tbb_task_group)
348 m_alive_flag->store(
true);
352 if(m_pool_state->load() == thread_pool::state::STARTED)
354 if(m_pool_size > proposed_size)
360 AutoLock lock(TypeMutex<
decltype(std::cerr)>());
361 std::cerr <<
"[PTL::ThreadPool] ThreadPool initialized with "
362 << m_pool_size <<
" threads." << std::endl;
366 m_delete_task_queue =
true;
371 m_task_queue->resize(m_pool_size);
375 else if(m_pool_size == proposed_size)
379 AutoLock lock(TypeMutex<
decltype(std::cerr)>());
380 std::cerr <<
"ThreadPool initialized with " << m_pool_size <<
" threads."
385 m_delete_task_queue =
true;
396 m_is_joined.reserve(proposed_size);
401 m_delete_task_queue =
true;
406 for(
size_type i = m_pool_size; i < proposed_size; ++i)
412 Thread thr{ ThreadPool::start_thread,
this, &m_thread_data,
417 m_main_threads.push_back(thr.get_id());
419 m_is_joined.push_back(
false);
425 m_threads.emplace_back(std::move(thr));
426 }
catch(std::runtime_error& e)
428 AutoLock lock(TypeMutex<
decltype(std::cerr)>());
429 std::cerr <<
"[PTL::ThreadPool] " << e.what()
432 }
catch(std::bad_alloc& e)
434 AutoLock lock(TypeMutex<
decltype(std::cerr)>());
435 std::cerr <<
"[PTL::ThreadPool] " << e.what() << std::endl;
445 if(m_is_joined.size() != m_main_threads.size())
447 std::stringstream ss;
448 ss <<
"ThreadPool::initialize_threadpool - boolean is_joined vector "
449 <<
"is a different size than threads vector: " << m_is_joined.size() <<
" vs. "
450 << m_main_threads.size() <<
" (tid: " << std::this_thread::get_id() <<
")";
452 throw std::runtime_error(ss.str());
457 AutoLock lock(TypeMutex<
decltype(std::cerr)>());
458 std::cerr <<
"[PTL::ThreadPool] ThreadPool initialized with " << m_pool_size
459 <<
" threads." << std::endl;
462 return m_main_threads.size();
475 m_pool_state->store(thread_pool::state::STOPPED);
479#if defined(PTL_USE_TBB)
483 auto _func = [&]() { m_tbb_task_group->wait(); };
485 m_tbb_task_arena->execute(_func);
488 delete m_tbb_task_group;
489 m_tbb_task_group =
nullptr;
493 delete m_tbb_task_arena;
494 m_tbb_task_arena =
nullptr;
499 delete _global_control;
500 _global_control =
nullptr;
502 AutoLock lock(TypeMutex<
decltype(std::cerr)>());
505 std::cerr <<
"[PTL::ThreadPool] ThreadPool [TBB] destroyed" << std::endl;
510 if(!m_alive_flag->load())
516 m_task_cond->notify_all();
517 m_task_lock->unlock();
520 if(m_is_joined.size() != m_main_threads.size())
522 std::stringstream ss;
523 ss <<
" ThreadPool::destroy_thread_pool - boolean is_joined vector "
524 <<
"is a different size than threads vector: " << m_is_joined.size() <<
" vs. "
525 << m_main_threads.size() <<
" (tid: " << std::this_thread::get_id() <<
")";
527 throw std::runtime_error(ss.str());
530 for(
size_type i = 0; i < m_is_joined.size(); i++)
534 if(i < m_threads.size())
535 m_threads.at(i).join();
539 if(m_is_joined.at(i))
544 if(std::this_thread::get_id() == m_main_threads[i])
549 auto _tid = m_main_threads[i];
553 if(f_thread_ids().find(_tid) != f_thread_ids().end())
554 f_thread_ids().erase(f_thread_ids().find(_tid));
558 m_is_joined.at(i) =
true;
561 m_thread_data.clear();
563 m_main_threads.clear();
566 m_alive_flag->store(
false);
568 auto start = std::chrono::steady_clock::now();
569 auto elapsed = std::chrono::duration<double>{};
571 while(m_thread_active->load() > 0 && elapsed.count() < 30)
573 std::this_thread::sleep_for(std::chrono::milliseconds(50));
574 elapsed = std::chrono::steady_clock::now() - start;
577 auto _active = m_thread_active->load();
583 AutoLock lock(TypeMutex<
decltype(std::cerr)>());
584 std::cerr <<
"[PTL::ThreadPool] ThreadPool destroyed" << std::endl;
588 AutoLock lock(TypeMutex<
decltype(std::cerr)>());
589 std::cerr <<
"[PTL::ThreadPool] ThreadPool destroyed but " << _active
590 <<
" threads might still be active (and cause a termination error)"
595 if(m_delete_task_queue)
598 m_task_queue =
nullptr;
609 if(!m_alive_flag->load() || m_pool_size == 0)
612 m_pool_state->store(thread_pool::state::PARTIAL);
617 m_is_stopped.push_back(
true);
618 m_task_cond->notify_one();
619 m_task_lock->unlock();
622 while(!m_is_stopped.empty() && m_stop_threads.empty())
628 while(!m_stop_threads.empty())
630 auto tid = m_stop_threads.front();
632 m_stop_threads.pop_front();
634 for(
auto itr = m_main_threads.begin(); itr != m_main_threads.end(); ++itr)
638 m_main_threads.erase(itr);
643 m_is_joined.pop_back();
646 m_pool_state->store(thread_pool::state::STARTED);
648 m_pool_size = m_main_threads.size();
649 return m_main_threads.size();
658 _queue =
new UserTaskQueue{
static_cast<intmax_t
>(m_pool_size) };
664#if defined (__APPLE__) && defined(__amd64) && defined(__clang__)
677 ThreadId tid = ThisThread::get_id();
678 ThreadData* data = thread_data();
682 auto start = std::chrono::steady_clock::now();
683 auto elapsed = std::chrono::duration<double>{};
685 while(!_task_queue && elapsed.count() < 60)
687 elapsed = std::chrono::steady_clock::now() - start;
689 _task_queue = data->current_queue;
695 throw std::runtime_error(
"No task queue was found after 60 seconds!");
698 assert(data->current_queue !=
nullptr);
699 assert(_task_queue == data->current_queue);
704 data->within_task =
true;
705 auto _task = _task_queue->
GetTask();
710 data->within_task =
false;
716 static thread_local auto p_task_lock = m_task_lock;
720 AutoLock _task_lock(*p_task_lock, std::defer_lock);
723 auto leave_pool = [&]() {
724 auto _state = [&]() {
return static_cast<int>(m_pool_state->load()); };
725 auto _pool_state = _state();
729 if(_pool_state == thread_pool::state::STOPPED)
731 if(_task_lock.owns_lock())
736 else if(_pool_state == thread_pool::state::PARTIAL)
738 if(!_task_lock.owns_lock())
740 if(!m_is_stopped.empty() && m_is_stopped.back())
742 m_stop_threads.push_back(tid);
743 m_is_stopped.pop_back();
744 if(_task_lock.owns_lock())
749 if(_task_lock.owns_lock())
762 while(_task_queue->
empty())
764 auto _state = [&]() {
return static_cast<int>(m_pool_state->load()); };
765 auto _size = [&]() {
return _task_queue->
true_size(); };
766 auto _empty = [&]() {
return _task_queue->
empty(); };
767 auto _wake = [&]() {
return (!_empty() || _size() > 0 || _state() > 0); };
774 if(m_thread_awake->load() > 0)
778 if(!_task_lock.owns_lock())
784 m_task_cond->wait(_task_lock, _wake);
786 if(_state() == thread_pool::state::STOPPED)
790 if(_task_lock.owns_lock())
794 if(m_thread_awake->load() < m_pool_size)
802 if(_task_lock.owns_lock())
812 data->within_task =
true;
816 while(!_task_queue->
empty())
818 auto _task = _task_queue->
GetTask();
827 data->within_task =
false;
static ThreadData *& GetInstance()
const pool_state_type & state() const
std::map< ThreadId, uintmax_t > thread_id_map_t
void set_affinity(affinity_func_t f)
static uintmax_t add_thread_id(ThreadId=ThisThread::get_id())
ThreadPool(const Config &)
static tbb_global_control_t *& tbb_global_control()
size_type initialize_threadpool(size_type)
static uintmax_t get_this_thread_id()
task_queue_t *& get_valid_queue(task_queue_t *&) const
void execute_on_all_threads(FuncT &&_func)
size_type destroy_threadpool()
bool is_initialized() const
void set_priority(int _prio, Thread &) const
static const thread_id_map_t & get_thread_ids()
VUserTaskQueue task_queue_t
static uintmax_t get_thread_id(ThreadId)
virtual task_pointer GetTask(intmax_t subq=-1, intmax_t nitr=-1)=0
virtual size_type true_size() const
virtual bool empty() const =0
@ max_allowed_parallelism
Backports of C++ language features for use with C++11 compilers.
bool SetThreadPriority(int _v)
TemplateAutoLock< Mutex > AutoLock
tbb::global_control tbb_global_control_t
bool SetPinAffinity(int idx)
void SetThreadId(int aNewValue)
tbb::task_group tbb_task_group_t
Tp GetEnv(const std::string &env_id, Tp _default=Tp())