46 return static_cast<intmax_t
>(Thread::hardware_concurrency());
66bool ThreadPool::f_use_tbb =
false;
75 AutoLock lock(TypeMutex<ThreadPool>(), std::defer_lock);
79 _idx = f_thread_ids.size();
80 f_thread_ids[std::this_thread::get_id()] = _idx;
82 static thread_local std::unique_ptr<ThreadData> _unique_data(
new ThreadData(tp));
83 thread_data() = _unique_data.get();
85 tp->execute_thread(thread_data()->current_queue);
102#if defined(PTL_USE_TBB)
105 ConsumeParameters<bool>(enable);
122 auto _tid = ThisThread::get_id();
124 AutoLock lock(TypeMutex<ThreadPool>(), std::defer_lock);
125 if(!lock.owns_lock())
127 if(f_thread_ids.find(_tid) == f_thread_ids.end())
129 auto _idx = f_thread_ids.size();
130 f_thread_ids[_tid] = _idx;
133 return f_thread_ids[_tid];
140: m_use_affinity(_use_affinity)
144, m_master_tid(ThisThread::get_id())
145, m_alive_flag(std::make_shared<std::atomic_bool>(false))
146, m_pool_state(std::make_shared<std::atomic_short>(thread_pool::state::NONINIT))
147, m_thread_awake(std::make_shared<std::atomic_uintmax_t>(0))
148, m_task_lock(std::make_shared<
Mutex>())
149, m_task_cond(std::make_shared<
Condition>())
150, m_task_queue(task_queue)
151, m_tbb_task_group(nullptr)
152, m_init_func([]() {
return; })
153, m_affinity_func(_affinity_func)
155 m_verbose = GetEnv<int>(
"PTL_VERBOSE", m_verbose);
158 if(master_id != 0 && m_verbose > 1)
159 std::cerr <<
"ThreadPool created on non-master slave" << std::endl;
174 if(m_alive_flag->load())
176 std::cerr <<
"Warning! ThreadPool was not properly destroyed! Call "
177 "destroy_threadpool() before deleting the ThreadPool object to "
178 "eliminate this message."
180 m_pool_state->store(thread_pool::state::STOPPED);
183 m_task_lock->unlock();
184 for(
auto& itr : m_threads)
195 return !(m_pool_state->load() == thread_pool::state::NONINIT);
206 intmax_t _pin = m_affinity_func(i);
209 std::cout <<
"Setting pin affinity for thread " << _thread.get_id() <<
" to "
210 << _pin << std::endl;
213 }
catch(std::runtime_error& e)
215 std::cout <<
"Error setting pin affinity" << std::endl;
216 std::cerr << e.what() << std::endl;
227 if(proposed_size < 1)
232 if(!m_alive_flag->load())
233 m_pool_state->store(thread_pool::state::STARTED);
241 m_pool_size = proposed_size;
244 if(m_pool_size != proposed_size)
246 delete _global_control;
247 _global_control =
nullptr;
256 std::cout <<
"ThreadPool [TBB] initialized with " << m_pool_size
257 <<
" threads." << std::endl;
262 if(!m_tbb_task_group)
268 m_alive_flag->store(
true);
272 if(m_pool_state->load() == thread_pool::state::STARTED)
274 if(m_pool_size > proposed_size)
280 std::cout <<
"ThreadPool initialized with " << m_pool_size <<
" threads."
287 else if(m_pool_size == proposed_size)
291 std::cout <<
"ThreadPool initialized with " << m_pool_size <<
" threads."
304 m_is_joined.reserve(proposed_size);
308 for(
size_type i = m_pool_size; i < proposed_size; ++i)
317 m_main_threads.push_back(thr.get_id());
319 m_is_joined.push_back(
false);
324 m_threads.emplace_back(std::move(thr));
325 }
catch(std::runtime_error& e)
327 std::cerr << e.what() << std::endl;
329 }
catch(std::bad_alloc& e)
331 std::cerr << e.what() << std::endl;
341 if(m_is_joined.size() != m_main_threads.size())
343 std::stringstream ss;
344 ss <<
"ThreadPool::initialize_threadpool - boolean is_joined vector "
345 <<
"is a different size than threads vector: " << m_is_joined.size() <<
" vs. "
346 << m_main_threads.size() <<
" (tid: " << std::this_thread::get_id() <<
")";
348 throw std::runtime_error(ss.str());
353 std::cout <<
"ThreadPool initialized with " << m_pool_size <<
" threads."
360 return m_main_threads.size();
373 m_pool_state->store(thread_pool::state::STOPPED);
380 m_tbb_task_group->
wait();
381 delete m_tbb_task_group;
382 m_tbb_task_group =
nullptr;
387 delete _global_control;
388 _global_control =
nullptr;
390 std::cout <<
"ThreadPool [TBB] destroyed" << std::endl;
394 if(!m_alive_flag->load())
401 m_task_lock->unlock();
404 if(m_is_joined.size() != m_main_threads.size())
406 std::stringstream ss;
407 ss <<
" ThreadPool::destroy_thread_pool - boolean is_joined vector "
408 <<
"is a different size than threads vector: " << m_is_joined.size() <<
" vs. "
409 << m_main_threads.size() <<
" (tid: " << std::this_thread::get_id() <<
")";
411 throw std::runtime_error(ss.str());
414 for(
size_type i = 0; i < m_is_joined.size(); i++)
418 if(i < m_threads.size())
419 m_threads.at(i).join();
423 if(m_is_joined.at(i))
428 if(std::this_thread::get_id() == m_main_threads[i])
433 auto _tid = m_main_threads[i];
437 if(f_thread_ids.find(_tid) != f_thread_ids.end())
438 f_thread_ids.erase(f_thread_ids.find(_tid));
442 m_is_joined.at(i) =
true;
451 m_main_threads.clear();
453 m_alive_flag->store(
false);
455 auto start = std::chrono::steady_clock::now();
456 auto elapsed = std::chrono::duration<double>{};
458 while(m_thread_active->load() > 0 && elapsed.count() < 30)
460 std::this_thread::sleep_for(std::chrono::milliseconds(50));
461 elapsed = std::chrono::steady_clock::now() - start;
464 auto _active = m_thread_active->load();
467 std::cout <<
"ThreadPool destroyed" << std::endl;
469 std::cout <<
"ThreadPool destroyed but " << _active
470 <<
" threads might still be active (and cause a termination error)"
481 if(!m_alive_flag->load() || m_pool_size == 0)
487 m_is_stopped.push_back(
true);
489 m_task_lock->unlock();
495 while(!m_stop_threads.empty())
497 auto tid = m_stop_threads.front();
499 m_stop_threads.pop_front();
501 for(
auto itr = m_main_threads.begin(); itr != m_main_threads.end(); ++itr)
505 m_main_threads.erase(itr);
510 m_is_joined.pop_back();
513 m_pool_size = m_main_threads.size();
514 return m_main_threads.size();
530 ThreadId tid = ThisThread::get_id();
535 auto start = std::chrono::steady_clock::now();
536 auto elapsed = std::chrono::duration<double>{};
538 while(!_task_queue && elapsed.count() < 60)
540 elapsed = std::chrono::steady_clock::now() - start;
548 throw std::runtime_error(
"No task queue was found after 60 seconds!");
558 auto _task = _task_queue->
GetTask();
571 static thread_local auto p_task_lock = m_task_lock;
575 AutoLock _task_lock(*p_task_lock, std::defer_lock);
578 auto leave_pool = [&]() {
579 auto _state = [&]() {
return static_cast<int>(m_pool_state->load()); };
580 auto _pool_state = _state();
584 if(_pool_state == thread_pool::state::STOPPED)
586 if(_task_lock.owns_lock())
591 else if(_pool_state == thread_pool::state::PARTIAL)
593 if(!_task_lock.owns_lock())
595 if(!m_is_stopped.empty() && m_is_stopped.back())
597 m_stop_threads.push_back(tid);
598 m_is_stopped.pop_back();
599 if(_task_lock.owns_lock())
604 if(_task_lock.owns_lock())
617 while(_task_queue->
empty())
619 auto _state = [&]() {
return static_cast<int>(m_pool_state->load()); };
620 auto _size = [&]() {
return _task_queue->
true_size(); };
621 auto _empty = [&]() {
return _task_queue->
empty(); };
622 auto _wake = [&]() {
return (!_empty() || _size() > 0 || _state() > 0); };
629 if(m_thread_awake && m_thread_awake->load() > 0)
633 if(!_task_lock.owns_lock())
639 m_task_cond->wait(_task_lock, _wake);
641 if(_state() == thread_pool::state::STOPPED)
645 if(_task_lock.owns_lock())
649 if(m_thread_awake && m_thread_awake->load() < m_pool_size)
657 if(_task_lock.owns_lock())
671 while(!_task_queue->
empty())
673 auto _task = _task_queue->
GetTask();
#define CONDITIONNOTIFY(cond)
#define CONDITIONBROADCAST(cond)
static ThreadData *& GetInstance()
VUserTaskQueue * current_queue
static uintmax_t get_this_thread_id()
static void start_thread(ThreadPool *, intmax_t=-1)
std::function< intmax_t(intmax_t)> affinity_func_t
static void set_use_tbb(bool val)
void set_affinity(affinity_func_t f)
static tbb_global_control_t *& tbb_global_control()
static const thread_id_map_t & get_thread_ids()
std::map< ThreadId, uintmax_t > thread_id_map_t
size_type destroy_threadpool()
size_type initialize_threadpool(size_type)
void execute_thread(VUserTaskQueue *)
bool is_initialized() const
ThreadPool(const size_type &pool_size, VUserTaskQueue *task_queue=nullptr, bool _use_affinity=GetEnv< bool >("PTL_CPU_AFFINITY", false), const affinity_func_t &=[](intmax_t) { static std::atomic< intmax_t > assigned;intmax_t _assign=assigned++;return _assign % Thread::hardware_concurrency();})
virtual task_pointer GetTask(intmax_t subq=-1, intmax_t nitr=-1)=0
virtual size_type true_size() const
virtual bool empty() const =0
@ max_allowed_parallelism
bool SetPinAffinity(int idx, NativeThread &at)
std::thread::native_handle_type NativeThread
std::condition_variable Condition
tbb::task_group tbb_task_group_t
tbb::global_control tbb_global_control_t