36#include "PTL/Config.hh"
64#if defined(PTL_USE_TBB)
65# include <tbb/task_group.h>
82template <
typename Tp,
typename Arg = Tp,
intmax_t MaxDepth = 0>
87 template <
typename Up>
111 template <
typename... Args>
117 template <
typename Func>
120 template <
typename Up = Tp>
139 template <
typename Up>
186 template <
typename Func,
typename... Args>
193 template <
typename Func,
typename... Args,
typename Up = Tp>
196 template <
typename Func,
typename... Args,
typename Up = Tp>
199 template <
typename Func,
typename... Args>
200 void run(Func func, Args... args)
202 exec(std::move(func), std::move(args)...);
206 template <
typename Up,
typename Func,
typename... Args>
209 template <
typename Up,
typename Func,
typename... Args>
241 template <typename Up = Tp, enable_if_t<!std::is_void<Up>::value,
int> = 0>
245 template <
typename Up = Tp,
typename Rp = Arg,
250 template <
typename Up = Tp,
typename Rp = Arg,
283 void internal_update();
289template <
typename Tp,
typename Arg,
intmax_t MaxDepth>
290template <
typename Func>
298template <
typename Tp,
typename Arg,
intmax_t MaxDepth>
299template <
typename Up>
309template <
typename Tp,
typename Arg,
intmax_t MaxDepth>
323 auto* _arena =
m_pool->get_task_arena();
330template <
typename Tp,
typename Arg,
intmax_t MaxDepth>
331template <
typename Up>
340 return std::move(_task);
343template <
typename Tp,
typename Arg,
intmax_t MaxDepth>
350 auto* _arena =
m_pool->get_task_arena();
370 fprintf(stderr,
"%s @ %i :: Warning! nullptr to thread-pool (%p)\n",
371 __FUNCTION__, __LINE__,
static_cast<void*
>(
m_pool));
372 std::cerr << __FUNCTION__ <<
"@" << __LINE__ <<
" :: Warning! "
373 <<
"nullptr to thread pool!" << std::endl;
385 auto is_active_state = [&]() {
386 return (tpool->
state()->load(std::memory_order_relaxed) !=
387 thread_pool::state::STOPPED);
390 auto execute_this_threads_tasks = [&]() {
395 if((!_is_main || tpool->
size() < 2) && _within_task)
404 auto _task = taskq->
GetTask(bin);
416 if(!_is_main || tpool->
size() < 2)
425 "%s @ %i :: Warning! nullptr to thread data (%p) or task-queue "
427 __FUNCTION__, __LINE__,
static_cast<void*
>(tpool),
428 static_cast<void*
>(taskq));
433 fprintf(stderr,
"%s @ %i :: Warning! thread-pool is not alive!\n",
434 __FUNCTION__, __LINE__);
436 else if(!is_active_state())
438 fprintf(stderr,
"%s @ %i :: Warning! thread-pool is not active!\n",
439 __FUNCTION__, __LINE__);
443 intmax_t wake_size = 2;
446 while(is_active_state())
448 execute_this_threads_tasks();
451 while(_is_main &&
pending() > 0 && is_active_state())
458 if(!_lock.owns_lock())
470 m_task_cond.wait_for(_lock, std::chrono::microseconds(100));
473 if(_lock.owns_lock())
482 if(_lock.owns_lock())
488 std::stringstream ss;
489 ss <<
"\nWarning! Join operation issue! " << ntask <<
" tasks still "
490 <<
"are running!" << std::endl;
491 std::cerr << ss.str();
496template <
typename Tp,
typename Arg,
intmax_t MaxDepth>
504 auto _count = --(_counter);
508 _task_cond.notify_all();
513template <
typename Tp,
typename Arg,
intmax_t MaxDepth>
521template <
typename Tp,
typename Arg,
intmax_t MaxDepth>
529template <
typename Tp,
typename Arg,
intmax_t MaxDepth>
530template <
typename Func,
typename... Args,
typename Up>
544 auto _task =
wrap([&_task_cond, &_task_lock, &_counter, func, args...]() {
545 auto* _tdata = ThreadData::GetInstance();
547 ++(_tdata->task_depth);
549 auto _count = --(_counter);
551 --(_tdata->task_depth);
554 AutoLock _lk{ _task_lock };
555 _task_cond.notify_all();
561 auto* _arena =
m_pool->get_task_arena();
563 auto* _ptask = _task.get();
564 _arena->execute([_tbb_task_group, _ptask]() {
565 _tbb_task_group->run([_ptask]() { (*_ptask)(); });
570 m_pool->add_task(std::move(_task));
574template <
typename Tp,
typename Arg,
intmax_t MaxDepth>
575template <
typename Func,
typename... Args,
typename Up>
576enable_if_t<!std::is_void<Up>::value,
void>
589 auto _task =
wrap([&_task_cond, &_task_lock, &_counter, func, args...]() {
590 auto* _tdata = ThreadData::GetInstance();
592 ++(_tdata->task_depth);
593 auto&& _ret = func(args...);
594 auto _count = --(_counter);
596 --(_tdata->task_depth);
599 AutoLock _lk{ _task_lock };
600 _task_cond.notify_all();
602 return std::forward<
decltype(_ret)>(_ret);
607 auto* _arena =
m_pool->get_task_arena();
609 auto* _ptask = _task.get();
610 _arena->execute([_tbb_task_group, _ptask]() {
611 _tbb_task_group->run([_ptask]() { (*_ptask)(); });
616 m_pool->add_task(std::move(_task));
621template <
typename Tp,
typename Arg,
intmax_t MaxDepth>
622template <
typename Up,
typename Func,
typename... Args>
623enable_if_t<std::is_void<Up>::value,
void>
628 ++(_tdata->task_depth);
634 --(_tdata->task_depth);
637template <
typename Tp,
typename Arg,
intmax_t MaxDepth>
638template <
typename Up,
typename Func,
typename... Args>
644 ++(_tdata->task_depth);
647 _p.set_value(func(args...));
649 --(_tdata->task_depth);
652template <
typename Tp,
typename Arg,
intmax_t MaxDepth>
653template <typename Up, enable_if_t<!std::is_void<Up>::value,
int>>
660 using RetT =
decay_t<
decltype(itr->get())>;
661 accum = std::move(
m_join(std::ref(accum), std::forward<RetT>(itr->get())));
665 using RetT =
decay_t<
decltype(itr.get())>;
666 accum = std::move(
m_join(std::ref(accum), std::forward<RetT>(itr.get())));
672template <
typename Tp,
typename Arg,
intmax_t MaxDepth>
673template <
typename Up,
typename Rp,
687template <
typename Tp,
typename Arg,
intmax_t MaxDepth>
688template <
typename Up,
typename Rp,
694 for(
auto& itr : m_task_list)
696 using RetT =
decay_t<
decltype(itr->get())>;
697 m_join(std::forward<RetT>(itr->get()));
699 for(
auto& itr : m_future_list)
701 using RetT = decay_t<
decltype(itr.get())>;
702 m_join(std::forward<RetT>(itr.get()));
707template <
typename Tp,
typename Arg,
intmax_t MaxDepth>
715template <
typename Tp,
typename Arg,
intmax_t MaxDepth>
717TaskGroup<Tp, Arg, MaxDepth>::internal_update()
724 std::stringstream ss{};
725 ss <<
"[TaskGroup]> " << __FUNCTION__ <<
"@" << __LINE__
726 <<
" :: nullptr to thread pool";
727 throw std::runtime_error(ss.str());
730 if(m_pool->is_tbb_threadpool())
736template <
typename Tp,
typename Arg,
intmax_t MaxDepth>
TaskGroup & operator=(this_type &&rhs)=default
std::atomic_intmax_t atomic_int
container_type< future_type > future_list_t
typename future_list_t::reverse_iterator reverse_iterator
ThreadPool * pool() const
std::promise< ArgTp > promise_type
future_list_t m_future_list
tbb_task_group_t * m_tbb_task_group
std::atomic_uintmax_t atomic_uint
std::vector< Up > container_type
TaskGroup(const this_type &)=delete
const atomic_int & task_count() const
typename JoinFunction< Tp, Arg >::Type join_type
TaskGroup(ThreadPool *_tp=internal::get_default_threadpool(), enable_if_t< std::is_void< Up >::value, int >=0)
const future_list_t & get_tasks() const
bool is_native_task_group() const
typename future_list_t::const_iterator const_iterator
std::packaged_task< ArgTp()> packaged_task_type
static void set_verbose(int level)
enable_if_t< std::is_void< Up >::value, void > local_exec(Func func, Args... args)
std::shared_ptr< TaskFuture< ArgTp > > task_pointer
TaskGroup & operator=(const this_type &rhs)=delete
static tid_type this_tid()
atomic_int m_tot_task_count
atomic_int & task_count()
const_reverse_iterator critr_t
condition_t & task_cond()
std::shared_ptr< task_type< Args... > > wrap(Func func, Args... args)
void run(Func func, Args... args)
Task< ArgTp, decay_t< Args >... > task_type
future_list_t & get_tasks()
enable_if_t<!std::is_void< Up >::value, void > local_exec(Func func, Args... args)
typename future_list_t::iterator iterator
TaskGroup(Func &&_join, ThreadPool *_tp=internal::get_default_threadpool())
enable_if_t< std::is_void< Up >::value, void > exec(Func func, Args... args)
typename future_list_t::const_reverse_iterator const_reverse_iterator
void set_pool(ThreadPool *tp)
std::future< ArgTp > future_type
TaskGroup(this_type &&rhs)=default
TaskGroup< Tp, Arg, MaxDepth > this_type
container_type< task_pointer > task_list_t
enable_if_t<!std::is_void< Up >::value, void > exec(Func func, Args... args)
ScopeDestructor get_scope_destructor()
std::shared_ptr< Up > operator+=(std::shared_ptr< Up > &&_task)
The task class is supplied to thread_pool.
static ThreadData *& GetInstance()
VUserTaskQueue * current_queue
const pool_state_type & state() const
task_queue_t * get_queue() const
virtual task_pointer GetTask(intmax_t subq=-1, intmax_t nitr=-1)=0
virtual bool empty() const =0
virtual intmax_t GetThreadBin() const =0
intmax_t get_task_depth()
ThreadPool * get_default_threadpool()
std::atomic_uintmax_t & task_group_counter()
Backports of C++ language features for use with C++11 compilers.
TemplateAutoLock< Mutex > AutoLock
typename std::enable_if< B, T >::type enable_if_t
tbb::task_group tbb_task_group_t
typename std::decay< T >::type decay_t
std::function< JoinT(JoinT &, JoinArg &&)> Type