41# include <tbb/global_control.h>
58#include <unordered_map>
66 template <
typename KeyT,
typename MappedT,
typename HashT = KeyT>
67 using uomap = std::unordered_map<KeyT, MappedT, std::hash<HashT>>;
77 using lock_t = std::shared_ptr<Mutex>;
95 bool _use_affinity = GetEnv<bool>(
"PTL_CPU_AFFINITY",
false),
97 static std::atomic<intmax_t> assigned;
98 intmax_t _assign = assigned++;
99 return _assign % Thread::hardware_concurrency();
115 template <
typename FuncT>
129 template <
typename ListT>
163 return (m_thread_awake) ? m_thread_awake->load() : 0;
171 bool is_master()
const {
return ThisThread::get_id() == m_master_tid; }
190 ++(*m_thread_active);
196 --(*m_thread_active);
209 atomic_int_type m_thread_awake = std::make_shared<std::atomic_uintmax_t>();
210 atomic_int_type m_thread_active = std::make_shared<std::atomic_uintmax_t>();
213 lock_t m_task_lock = std::make_shared<Mutex>();
215 condition_t m_task_cond = std::make_shared<Condition>();
243 if(m_thread_awake && m_thread_awake->load() < m_pool_size)
246 m_task_cond->notify_one();
255 m_task_cond->notify_all();
265 if(m_thread_awake && m_thread_awake->load() < m_pool_size)
268 if(ntasks < this->
size())
271 m_task_cond->notify_one();
274 m_task_cond->notify_all();
289 if(_n == m_pool_size)
292 m_task_queue->
resize(
static_cast<intmax_t
>(_n));
304 if(m_tbb_tp && m_tbb_task_group)
306 m_tbb_task_group->
run(_func);
322 auto ibin = m_task_queue->
InsertTask(task, _data, bin);
335 if(!m_alive_flag->load())
341template <
typename ListT>
354 auto c_size = c.size();
357 if(!itr->is_native_task())
373template <
typename FuncT>
377 if(m_tbb_tp && m_tbb_task_group)
379#if defined(PTL_USE_TBB)
384 auto master_tid = ThisThread::get_id();
385 std::set<std::thread::id> _first;
389 static thread_local int _once = 0;
391 if(_first.find(std::this_thread::get_id()) == _first.end())
396 _first.insert(std::this_thread::get_id());
407 auto _consume = [](
long n) {
408 using stl_mutex_t = std::mutex;
409 using unique_lock_t = std::unique_lock<stl_mutex_t>;
413 unique_lock_t hold_lk(
mutex);
415 unique_lock_t try_lk(
mutex, std::defer_lock);
417 auto now = std::chrono::steady_clock::now();
419 while(std::chrono::steady_clock::now() < (now + std::chrono::milliseconds(n)))
424 std::atomic<size_t> _total_init{ 0 };
426 auto _init_task = [&]() {
429 if(ThisThread::get_id() != master_tid)
448 size_t _num = std::min(_maxp, std::min(_sz, _ncore));
449 auto _fname = __FUNCTION__;
450 auto _write_info = [&]() {
451 std::cerr <<
"[" << _fname <<
"]> Total initalized: " << _total_init
452 <<
", expected: " << _num <<
", max-parallel: " << _maxp
453 <<
", size: " << _sz <<
", ncore: " << _ncore << std::endl;
455 while(_total_init < _num)
459 m_tbb_task_group->
run(_init_task);
460 m_tbb_task_group->
wait();
462 if(nitr++ > 2 * (_num + 1) && (_total_init - 1) == _num)
468 if(nitr > 4 * (_ncore + 1))
static ThreadData *& GetInstance()
std::shared_ptr< Condition > condition_t
size_type add_task(task_pointer task, int bin=-1)
std::shared_ptr< std::atomic_uintmax_t > task_count_type
std::map< uintmax_t, ThreadId > thread_index_map_t
ThreadPool(ThreadPool &&)=default
static uintmax_t get_this_thread_id()
const pool_state_type & state() const
ThreadPool(const ThreadPool &)=delete
static void start_thread(ThreadPool *, intmax_t=-1)
std::vector< Thread > thread_vec_t
std::function< intmax_t(intmax_t)> affinity_func_t
size_type add_tasks(ListT &)
void resize(size_type _n)
static void set_use_tbb(bool val)
std::shared_ptr< Mutex > lock_t
int run_on_this(task_pointer)
void set_affinity(affinity_func_t f)
ThreadPool & operator=(const ThreadPool &)=delete
std::shared_ptr< std::atomic_uintmax_t > atomic_int_type
int get_active_threads_count() const
Thread * get_thread(std::thread::id id) const
int insert(const task_pointer &, int=-1)
std::shared_ptr< std::atomic_short > pool_state_type
static tbb_global_control_t *& tbb_global_control()
ThreadPool & operator=(ThreadPool &&)=default
task_queue_t * get_queue() const
std::function< void()> initialize_func_t
Thread * get_thread(size_type _n) const
std::shared_ptr< std::atomic_bool > atomic_bool_type
bool using_affinity() const
void execute_on_all_threads(FuncT &&_func)
static const thread_id_map_t & get_thread_ids()
std::map< ThreadId, uintmax_t > thread_id_map_t
std::deque< ThreadId > thread_list_t
std::vector< bool > bool_list_t
size_type destroy_threadpool()
size_type initialize_threadpool(size_type)
void execute_thread(VUserTaskQueue *)
bool is_initialized() const
std::unordered_map< KeyT, MappedT, std::hash< HashT > > uomap
void set_initialization(initialize_func_t f)
void reset_initialization()
VUserTaskQueue task_queue_t
VTask is the abstract class stored in thread_pool.
virtual bool is_native_task() const
VTaskGroup * group() const
virtual intmax_t InsertTask(task_pointer, ThreadData *=nullptr, intmax_t subq=-1)=0
virtual void ExecuteOnAllThreads(ThreadPool *tp, function_type f)=0
virtual void resize(intmax_t)=0
@ max_allowed_parallelism
static size_t active_value(parameter param)
unsigned GetNumberOfCores()