35#include "PTL/Config.hh"
43#if defined(PTL_USE_TBB)
44# if !defined(TBB_SUPPRESS_DEPRECATED_MESSAGES)
45# define TBB_SUPPRESS_DEPRECATED_MESSAGES 1
47# if !defined(TBB_PREVIEW_GLOBAL_CONTROL)
48# define TBB_PREVIEW_GLOBAL_CONTROL 1
50# include <tbb/global_control.h>
51# include <tbb/task_arena.h>
52# include <tbb/task_group.h>
69#include <unordered_map>
// Integer state codes, each a distinct `short` value in the range 0-3.
// NOTE(review): presumably these are the values held by the pool's
// atomic_short state (see pool_state_type / state()); the names suggest
// started / partially started / stopped / not yet initialized, but none of
// their uses are visible in this listing -- confirm before relying on it.
79static const short STARTED = 0;
80static const short PARTIAL = 1;
81static const short STOPPED = 2;
82static const short NONINIT = 3;
// Shorthand for a std::unordered_map from KeyT to MappedT whose hasher is
// std::hash<HashT>. HashT defaults to KeyT, which gives the standard hasher
// for the key type; supplying a different HashT hashes keys through
// std::hash of that other type instead.
90 template <
typename KeyT,
typename MappedT,
typename HashT = KeyT>
91 using uomap = std::unordered_map<KeyT, MappedT, std::hash<HashT>>;
120 static std::atomic<intmax_t> assigned;
121 intmax_t _assign = assigned++;
122 return _assign % Thread::hardware_concurrency();
168 template <
typename FuncT>
171 template <
typename FuncT>
192 template <
typename ListT>
206 m_init_func = []() {};
210 m_fini_func = []() {};
// Returns true when the calling thread's id equals m_main_tid, false
// otherwise. m_main_tid has a default member initializer of
// ThisThread::get_id(), i.e. the id of the thread that constructs the
// object, unless a constructor (not visible here) assigns something else.
235 bool is_main()
const {
return ThisThread::get_id() == m_main_tid; }
// Declaration only; the definition is not part of this listing.
// When called without an argument, the ThreadId parameter defaults to the
// id of the calling thread (ThisThread::get_id()).
// NOTE(review): presumably registers the id and returns the index assigned
// to it -- confirm against the definition.
244 static uintmax_t
add_thread_id(ThreadId = ThisThread::get_id());
// Affinity flag, off by default.
// NOTE(review): presumably the value reported by using_affinity() -- confirm.
261 bool m_use_affinity =
false;
// TBB-mode flag, off by default. Elsewhere in this file it is tested together
// with m_tbb_task_group before work is handed to the TBB task group.
262 bool m_tbb_tp =
false;
// Off by default.
// NOTE(review): the name suggests the pool owns (and must delete) its task
// queue when this is set; no use is visible in this listing -- confirm.
263 bool m_delete_task_queue =
false;
// Thread id captured when this member is initialized; is_main() compares the
// calling thread's id against it.
267 ThreadId m_main_tid = ThisThread::get_id();
// Shared atomic counter starting at 0. Elsewhere in this file its value is
// compared against m_pool_size before the task condition is notified.
270 atomic_int_type m_thread_awake = std::make_shared<std::atomic_uintmax_t>(0);
// Shared atomic counter starting at 0.
// NOTE(review): presumably the number of threads currently running a task
// (cf. get_active_threads_count()) -- confirm.
271 atomic_int_type m_thread_active = std::make_shared<std::atomic_uintmax_t>(0);
// Shared Mutex instance.
// NOTE(review): presumably the lock paired with m_task_cond -- confirm.
274 lock_t m_task_lock = std::make_shared<Mutex>();
// Shared Condition instance; elsewhere in this file notify_one() and
// notify_all() are called on it.
276 condition_t m_task_cond = std::make_shared<Condition>();
306 if(m_thread_awake->load() < m_pool_size)
309 m_task_cond->notify_one();
318 m_task_cond->notify_all();
328 if(m_thread_awake->load() < m_pool_size)
331 if(ntasks < this->
size())
334 m_task_cond->notify_one();
338 m_task_cond->notify_all();
355#if defined(PTL_USE_TBB)
357 if(!m_tbb_task_arena)
367 if(!m_tbb_task_arena)
370 return m_tbb_task_arena;
378 m_task_queue->resize(
static_cast<intmax_t
>(_n));
382ThreadPool::run_on_this(task_pointer&& _task)
384 auto&& _func = [_task]() { (*_task)(); };
386 if(m_tbb_tp && m_tbb_task_group)
389 _arena->execute([
this, _func]() { this->m_tbb_task_group->
run(_func); });
400ThreadPool::insert(task_pointer&& task,
int bin)
414 if(m_tbb_tp || !task->is_native_task() || !m_alive_flag->load())
415 return static_cast<size_type>(run_on_this(std::move(task)));
417 return static_cast<size_type>(insert(std::move(task), bin));
420template <
typename ListT>
433 auto c_size = c.size();
436 if(!itr->is_native_task())
452template <
typename FuncT>
456 if(m_tbb_tp && m_tbb_task_group)
458#if defined(PTL_USE_TBB)
463 std::set<std::thread::id> _first{};
469 if(_first.find(std::this_thread::get_id()) == _first.end())
474 _first.insert(std::this_thread::get_id());
486 std::atomic<size_t> _total_init{ 0 };
497 size_t _dmax = std::max<size_t>(_ncore, 8);
499 size_t _num = std::min(_maxp, std::min(_sz, _ncore));
501 std::function<void()> _init_task;
504 static thread_local size_type _depth = 0;
517 if(_ret == 0 && _depth < _dmax && _total_init.load() < _num)
520 tg.
run([&]() { _init_task(); });
521 tg.run([&]() { _init_task(); });
522 ThisThread::sleep_for(std::chrono::milliseconds{ 1 });
530 auto _fname = __FUNCTION__;
531 auto _write_info = [&]() {
532 std::cout <<
"[" << _fname <<
"]> Total initialized: " << _total_init
533 <<
", expected: " << _num <<
", max-parallel: " << _maxp
534 <<
", size: " << _sz <<
", ncore: " << _ncore << std::endl;
536 while(_total_init < _num)
542 [&]() { m_tbb_task_group->run([&]() { _init_task(); }); });
544 _arena->execute([&]() { m_tbb_task_group->wait(); });
546 if(nitr++ > 2 * (_num + 1) && (_total_init - 1) == _num)
552 if(nitr > 4 * (_ncore + 1))
570template <
typename FuncT>
575 if(m_tbb_tp && m_tbb_task_group)
577#if defined(PTL_USE_TBB)
582 std::set<std::thread::id> _first{};
588 if(_first.find(std::this_thread::get_id()) == _first.end())
593 _first.insert(std::this_thread::get_id());
605 std::atomic<size_t> _total_exec{ 0 };
609 size_t _dmax = std::max<size_t>(_ncore, 8);
611 size_t _num = _tids.size();
615 std::function<void()> _exec_task;
618 static thread_local size_type _depth = 0;
620 auto _this_tid = std::this_thread::get_id();
622 if(_tids.count(_this_tid) > 0)
632 if(_ret == 0 && _depth < _dmax && _total_exec.load() < _num)
635 tg.
run([&]() { _exec_task(); });
636 tg.run([&]() { _exec_task(); });
637 ThisThread::sleep_for(std::chrono::milliseconds{ 1 });
645 auto _fname = __FUNCTION__;
646 auto _write_info = [&]() {
647 std::cout <<
"[" << _fname <<
"]> Total executed: " << _total_exec
648 <<
", expected: " << _num <<
", size: " <<
size() << std::endl;
650 while(_total_exec < _num)
656 [&]() { m_tbb_task_group->run([&]() { _exec_task(); }); });
658 _arena->execute([&]() { m_tbb_task_group->wait(); });
660 if(nitr++ > 2 * (_num + 1) && (_total_exec - 1) == _num)
666 if(nitr > 8 * (_num + 1))
static ThreadData *& GetInstance()
std::function< intmax_t(intmax_t)> affinity_func_t
std::shared_ptr< Condition > condition_t
std::shared_ptr< std::atomic_uintmax_t > task_count_type
ThreadPool(ThreadPool &&)=default
std::map< uintmax_t, ThreadId > thread_index_map_t
std::unordered_map< KeyT, MappedT, std::hash< HashT > > uomap
const pool_state_type & state() const
size_type add_task(task_pointer &&task, int bin=-1)
ThreadPool(const ThreadPool &)=delete
std::map< ThreadId, uintmax_t > thread_id_map_t
void set_finalization(finalize_func_t f)
static initialize_func_t & initialization_functor()
std::vector< Thread > thread_vec_t
size_type add_tasks(ListT &)
static size_type get_default_size()
get the default pool size
void execute_on_specific_threads(const std::set< std::thread::id > &_tid, FuncT &&_func)
void resize(size_type _n)
std::shared_ptr< Mutex > lock_t
bool is_tbb_threadpool() const
std::shared_ptr< task_type > task_pointer
void set_affinity(affinity_func_t f)
static uintmax_t add_thread_id(ThreadId=ThisThread::get_id())
ThreadPool & operator=(const ThreadPool &)=delete
std::shared_ptr< std::atomic_uintmax_t > atomic_int_type
static affinity_func_t & affinity_functor()
int get_active_threads_count() const
Thread * get_thread(std::thread::id id) const
std::shared_ptr< std::atomic_short > pool_state_type
ThreadPool(const Config &)
static tbb_global_control_t *& tbb_global_control()
ThreadPool & operator=(ThreadPool &&)=default
size_type initialize_threadpool(size_type)
static void set_default_size(size_type _v)
set the default pool size
task_queue_t * get_queue() const
static uintmax_t get_this_thread_id()
Thread * get_thread(size_type _n) const
static finalize_func_t & finalization_functor()
std::shared_ptr< std::atomic_bool > atomic_bool_type
tbb_task_arena_t * get_task_arena()
task_queue_t *& get_valid_queue(task_queue_t *&) const
bool using_affinity() const
void execute_on_all_threads(FuncT &&_func)
std::function< void()> finalize_func_t
size_type destroy_threadpool()
bool is_initialized() const
std::vector< std::shared_ptr< ThreadData > > thread_data_t
std::deque< ThreadId > thread_list_t
std::function< void()> initialize_func_t
void reset_finalization()
void set_priority(int _prio, Thread &) const
std::vector< bool > bool_list_t
void set_initialization(initialize_func_t f)
void reset_initialization()
static const thread_id_map_t & get_thread_ids()
VUserTaskQueue task_queue_t
static uintmax_t get_thread_id(ThreadId)
VTask is the abstract class stored in thread_pool.
virtual void ExecuteOnSpecificThreads(ThreadIdSet tid_set, ThreadPool *tp, function_type f)=0
virtual intmax_t InsertTask(task_pointer &&, ThreadData *=nullptr, intmax_t subq=-1) PTL_NO_SANITIZE_THREAD=0
virtual void ExecuteOnAllThreads(ThreadPool *tp, function_type f)=0
@ max_allowed_parallelism
static size_t active_value(parameter param)
void initialize(int max_concurrency=automatic, unsigned reserved_for_masters=1)
Backports of C++ language features for use with C++11 compilers.
TemplateAutoLock< Mutex > AutoLock
tbb::task_arena tbb_task_arena_t
tbb::global_control tbb_global_control_t
tbb::task_group tbb_task_group_t
unsigned GetNumberOfCores()
VUserTaskQueue * task_queue
affinity_func_t set_affinity
initialize_func_t initializer
finalize_func_t finalizer