36#include "PTL/Config.hh"
63#if defined(PTL_USE_TBB)
64# include <tbb/task_group.h>
81template <
typename Tp,
typename Arg = Tp,
intmax_t MaxDepth = 0>
86 template <
typename Up>
110 template <
typename... Args>
116 template <
typename Func>
119 template <
typename Up = Tp>
138 template <
typename Up>
139 std::shared_ptr<Up>
operator+=(std::shared_ptr<Up>&& _task);
185 template <
typename Func,
typename... Args>
192 template <
typename Func,
typename... Args,
typename Up = Tp>
195 template <
typename Func,
typename... Args,
typename Up = Tp>
198 template <
typename Func,
typename... Args>
199 void run(Func func, Args... args)
201 exec(std::move(func), std::move(args)...);
205 template <
typename Up,
typename Func,
typename... Args>
208 template <
typename Up,
typename Func,
typename... Args>
240 template <typename Up = Tp, enable_if_t<!std::is_void<Up>::value,
int> = 0>
241 inline Up
join(Up accum = {});
244 template <
typename Up = Tp,
typename Rp = Arg,
245 enable_if_t<std::is_void<Up>::value && std::is_void<Rp>::value,
int> = 0>
249 template <
typename Up = Tp,
typename Rp = Arg,
250 enable_if_t<std::is_void<Up>::value && !std::is_void<Rp>::value,
int> = 0>
282 void internal_update();
288template <
typename Tp,
typename Arg,
intmax_t MaxDepth>
289template <
typename Func>
291: m_join{
std::forward<Func>(_join) }
297template <
typename Tp,
typename Arg,
intmax_t MaxDepth>
298template <
typename Up>
308template <
typename Tp,
typename Arg,
intmax_t MaxDepth>
315 AutoLock _lk{ m_task_lock, std::defer_lock };
322 auto* _arena = m_pool->get_task_arena();
323 _arena->execute([
this]() { this->m_tbb_task_group->wait(); });
325 delete m_tbb_task_group;
329template <
typename Tp,
typename Arg,
intmax_t MaxDepth>
330template <
typename Up>
337 m_task_list.push_back(_task);
339 return std::move(_task);
342template <
typename Tp,
typename Arg,
intmax_t MaxDepth>
349 auto* _arena = m_pool->get_task_arena();
350 _arena->execute([
this]() { this->m_tbb_task_group->wait(); });
362 m_pool = internal::get_default_threadpool();
369 fprintf(stderr,
"%s @ %i :: Warning! nullptr to thread-pool (%p)\n",
370 __FUNCTION__, __LINE__,
static_cast<void*
>(m_pool));
371 std::cerr << __FUNCTION__ <<
"@" << __LINE__ <<
" :: Warning! "
372 <<
"nullptr to thread pool!" << std::endl;
384 auto is_active_state = [&]() {
385 return (tpool->
state()->load(std::memory_order_relaxed) !=
386 thread_pool::state::STOPPED);
389 auto execute_this_threads_tasks = [&]() {
394 if((!_is_main || tpool->
size() < 2) && _within_task)
399 while(this->pending() > 0)
403 auto _task = taskq->
GetTask(bin);
412 if(!is_native_task_group())
415 if(!_is_main || tpool->
size() < 2)
418 else if(f_verbose > 0)
424 "%s @ %i :: Warning! nullptr to thread data (%p) or task-queue "
426 __FUNCTION__, __LINE__,
static_cast<void*
>(tpool),
427 static_cast<void*
>(taskq));
430 else if(is_native_task_group() && !tpool->
is_alive())
432 fprintf(stderr,
"%s @ %i :: Warning! thread-pool is not alive!\n",
433 __FUNCTION__, __LINE__);
435 else if(!is_active_state())
437 fprintf(stderr,
"%s @ %i :: Warning! thread-pool is not active!\n",
438 __FUNCTION__, __LINE__);
442 intmax_t wake_size = 2;
443 AutoLock _lock(m_task_lock, std::defer_lock);
445 while(is_active_state())
447 execute_this_threads_tasks();
450 while(_is_main && pending() > 0 && is_active_state())
457 if(!_lock.owns_lock())
463 if(pending() >= wake_size)
465 m_task_cond.wait(_lock);
469 m_task_cond.wait_for(_lock, std::chrono::microseconds(100));
472 if(_lock.owns_lock())
481 if(_lock.owns_lock())
484 intmax_t ntask = this->task_count().load();
487 std::stringstream ss;
488 ss <<
"\nWarning! Join operation issue! " << ntask <<
" tasks still "
489 <<
"are running!" << std::endl;
490 std::cerr << ss.str();
495template <
typename Tp,
typename Arg,
intmax_t MaxDepth>
499 auto& _counter = m_tot_task_count;
500 auto& _task_cond = task_cond();
501 auto& _task_lock = task_lock();
503 auto _count = --(_counter);
507 _task_cond.notify_all();
512template <
typename Tp,
typename Arg,
intmax_t MaxDepth>
517 m_task_cond.notify_one();
520template <
typename Tp,
typename Arg,
intmax_t MaxDepth>
525 m_task_cond.notify_all();
528template <
typename Tp,
typename Arg,
intmax_t MaxDepth>
529template <
typename Func,
typename... Args,
typename Up>
533 if(MaxDepth > 0 && !m_tbb_task_group && ThreadData::GetInstance() &&
534 ThreadData::GetInstance()->task_depth > MaxDepth)
536 local_exec<Tp>(std::move(func), std::move(args)...);
540 auto& _counter = m_tot_task_count;
541 auto& _task_cond = task_cond();
542 auto& _task_lock = task_lock();
543 auto _task = wrap([&_task_cond, &_task_lock, &_counter, func, args...]() {
544 auto* _tdata = ThreadData::GetInstance();
546 ++(_tdata->task_depth);
548 auto _count = --(_counter);
550 --(_tdata->task_depth);
553 AutoLock _lk{ _task_lock };
554 _task_cond.notify_all();
560 auto* _arena = m_pool->get_task_arena();
561 auto* _tbb_task_group = m_tbb_task_group;
562 auto* _ptask = _task.get();
563 _arena->execute([_tbb_task_group, _ptask]() {
564 _tbb_task_group->run([_ptask]() { (*_ptask)(); });
569 m_pool->add_task(std::move(_task));
573template <
typename Tp,
typename Arg,
intmax_t MaxDepth>
574template <
typename Func,
typename... Args,
typename Up>
575enable_if_t<!std::is_void<Up>::value,
void>
578 if(MaxDepth > 0 && !m_tbb_task_group && ThreadData::GetInstance() &&
579 ThreadData::GetInstance()->task_depth > MaxDepth)
581 local_exec<Tp>(std::move(func), std::move(args)...);
585 auto& _counter = m_tot_task_count;
586 auto& _task_cond = task_cond();
587 auto& _task_lock = task_lock();
588 auto _task = wrap([&_task_cond, &_task_lock, &_counter, func, args...]() {
589 auto* _tdata = ThreadData::GetInstance();
591 ++(_tdata->task_depth);
592 auto&& _ret = func(args...);
593 auto _count = --(_counter);
595 --(_tdata->task_depth);
598 AutoLock _lk{ _task_lock };
599 _task_cond.notify_all();
601 return std::forward<
decltype(_ret)>(_ret);
606 auto* _arena = m_pool->get_task_arena();
607 auto* _tbb_task_group = m_tbb_task_group;
608 auto* _ptask = _task.get();
609 _arena->execute([_tbb_task_group, _ptask]() {
610 _tbb_task_group->run([_ptask]() { (*_ptask)(); });
615 m_pool->add_task(std::move(_task));
620template <
typename Tp,
typename Arg,
intmax_t MaxDepth>
621template <
typename Up,
typename Func,
typename... Args>
622enable_if_t<std::is_void<Up>::value,
void>
625 auto* _tdata = ThreadData::GetInstance();
627 ++(_tdata->task_depth);
629 m_future_list.emplace_back(_p.get_future());
633 --(_tdata->task_depth);
636template <
typename Tp,
typename Arg,
intmax_t MaxDepth>
637template <
typename Up,
typename Func,
typename... Args>
641 auto* _tdata = ThreadData::GetInstance();
643 ++(_tdata->task_depth);
645 m_future_list.emplace_back(_p.get_future());
646 _p.set_value(func(args...));
648 --(_tdata->task_depth);
651template <
typename Tp,
typename Arg,
intmax_t MaxDepth>
652template <typename Up, enable_if_t<!std::is_void<Up>::value,
int>>
657 for(
auto& itr : m_task_list)
659 using RetT =
decay_t<
decltype(itr->get())>;
660 accum = std::move(m_join(std::ref(accum), std::forward<RetT>(itr->get())));
662 for(
auto& itr : m_future_list)
664 using RetT =
decay_t<
decltype(itr.get())>;
665 accum = std::move(m_join(std::ref(accum), std::forward<RetT>(itr.get())));
671template <
typename Tp,
typename Arg,
intmax_t MaxDepth>
672template <
typename Up,
typename Rp,
678 for(
auto& itr : m_task_list)
680 for(
auto& itr : m_future_list)
686template <
typename Tp,
typename Arg,
intmax_t MaxDepth>
687template <
typename Up,
typename Rp,
693 for(
auto& itr : m_task_list)
695 using RetT =
decay_t<
decltype(itr->get())>;
696 m_join(std::forward<RetT>(itr->get()));
698 for(
auto& itr : m_future_list)
700 using RetT = decay_t<
decltype(itr.get())>;
701 m_join(std::forward<RetT>(itr.get()));
706template <
typename Tp,
typename Arg,
intmax_t MaxDepth>
710 m_future_list.
clear();
714template <
typename Tp,
typename Arg,
intmax_t MaxDepth>
719 m_pool = internal::get_default_threadpool();
723 std::stringstream ss{};
724 ss <<
"[TaskGroup]> " << __FUNCTION__ <<
"@" << __LINE__
725 <<
" :: nullptr to thread pool";
726 throw std::runtime_error(ss.str());
729 if(m_pool->is_tbb_threadpool())
735template <
typename Tp,
typename Arg,
intmax_t MaxDepth>
736int TaskGroup<Tp, Arg, MaxDepth>::f_verbose = GetEnv<int>(
"PTL_VERBOSE", 0);
TaskGroup & operator=(this_type &&rhs)=default
std::atomic_intmax_t atomic_int
container_type< future_type > future_list_t
typename future_list_t::reverse_iterator reverse_iterator
ThreadPool * pool() const
std::promise< ArgTp > promise_type
future_list_t m_future_list
tbb_task_group_t * m_tbb_task_group
std::atomic_uintmax_t atomic_uint
std::vector< Up > container_type
TaskGroup(const this_type &)=delete
const atomic_int & task_count() const
typename JoinFunction< Tp, Arg >::Type join_type
const future_list_t & get_tasks() const
bool is_native_task_group() const
typename future_list_t::const_iterator const_iterator
std::packaged_task< ArgTp()> packaged_task_type
static void set_verbose(int level)
enable_if_t< std::is_void< Up >::value, void > local_exec(Func func, Args... args)
std::shared_ptr< TaskFuture< ArgTp > > task_pointer
TaskGroup & operator=(const this_type &rhs)=delete
static tid_type this_tid()
atomic_int m_tot_task_count
atomic_int & task_count()
const_reverse_iterator critr_t
condition_t & task_cond()
std::shared_ptr< task_type< Args... > > wrap(Func func, Args... args)
void run(Func func, Args... args)
future_list_t & get_tasks()
typename future_list_t::iterator iterator
TaskGroup(Func &&_join, ThreadPool *_tp=internal::get_default_threadpool())
enable_if_t< std::is_void< Up >::value, void > exec(Func func, Args... args)
typename future_list_t::const_reverse_iterator const_reverse_iterator
void set_pool(ThreadPool *tp)
std::future< ArgTp > future_type
TaskGroup(this_type &&rhs)=default
container_type< task_pointer > task_list_t
ScopeDestructor get_scope_destructor()
std::shared_ptr< Up > operator+=(std::shared_ptr< Up > &&_task)
The task class is supplied to thread_pool.
VUserTaskQueue * current_queue
const pool_state_type & state() const
task_queue_t * get_queue() const
virtual task_pointer GetTask(intmax_t subq=-1, intmax_t nitr=-1)=0
virtual bool empty() const =0
virtual intmax_t GetThreadBin() const =0
intmax_t get_task_depth()
ThreadPool * get_default_threadpool()
std::atomic_uintmax_t & task_group_counter()
typename std::enable_if< B, T >::type enable_if_t
std::condition_variable Condition
tbb::task_group tbb_task_group_t
typename std::decay< T >::type decay_t
std::function< JoinT(JoinT &, JoinArg &&)> Type