Geant4 11.2.2
Toolkit for the simulation of the passage of particles through matter
Loading...
Searching...
No Matches
TaskGroup.hh
Go to the documentation of this file.
1//
2// MIT License
3// Copyright (c) 2020 Jonathan R. Madsen
4// Permission is hereby granted, free of charge, to any person obtaining a copy
5// of this software and associated documentation files (the "Software"), to deal
6// in the Software without restriction, including without limitation the rights
7// to use, copy, modify, merge, publish, distribute, sublicense, and
8// copies of the Software, and to permit persons to whom the Software is
9// furnished to do so, subject to the following conditions:
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED
12// "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
13// LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
14// PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
15// HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
16// ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
17// WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
18//
19//
20// ---------------------------------------------------------------
21// Tasking class header file
22//
23// Class Description:
24//
25// This file creates a class for handling a group of tasks that
26// can be independently joined
27//
28// ---------------------------------------------------------------
29// Author: Jonathan Madsen (Feb 13th 2018)
30// ---------------------------------------------------------------
31
32#pragma once
33
34#include "PTL/AutoLock.hh"
35#ifndef G4GMAKE
36#include "PTL/Config.hh"
37#endif
38#include "PTL/Globals.hh"
39#include "PTL/JoinFunction.hh"
40#include "PTL/Task.hh"
41#include "PTL/ThreadData.hh"
42#include "PTL/ThreadPool.hh"
43#include "PTL/Threading.hh"
44#include "PTL/Utility.hh"
45#include "PTL/VTask.hh"
46#include "PTL/VUserTaskQueue.hh"
47
48#include <atomic>
49#include <chrono>
50#include <cstdint>
51#include <cstdio>
52#include <functional>
53#include <future>
54#include <iostream>
55#include <memory>
56#include <mutex>
57#include <stdexcept>
58#include <thread>
59#include <type_traits>
60#include <utility>
61#include <vector>
62
63#if defined(PTL_USE_TBB)
64# include <tbb/task_group.h>
65#endif
66
67namespace PTL
68{
// Forward declarations of helpers used by TaskGroup. The declarator names are
// not present in this listing; the cross-reference list at the end of the page
// names task_group_counter(), get_default_threadpool() and get_task_depth(),
// all defined in TaskGroup.cc.
69namespace internal
70{
71std::atomic_uintmax_t&
73
76
77intmax_t
79} // namespace internal
80
// TaskGroup: a collection of tasks submitted to a ThreadPool that can be
// waited on and joined as a unit.
//
// Template parameters:
//   Tp       - exposed as result_type; join(Up accum) accumulates into a value
//              of this type when it is non-void.
//   Arg      - per-task value type; its decayed form (ArgTp) parameterises the
//              TaskFuture, std::promise and std::future aliases below.
//   MaxDepth - when > 0, exec() runs the callable inline (local_exec) instead
//              of queueing it once the calling thread's task_depth exceeds
//              this value.
//
// NOTE(review): several declarations (the class-head, some aliases, the
// constructor declarators and most data members) are absent from this listing;
// their names appear only in the cross-reference list at the end of the page.
81template <typename Tp, typename Arg = Tp, intmax_t MaxDepth = 0>
83{
84public:
85 //------------------------------------------------------------------------//
86 template <typename Up>
87 using container_type = std::vector<Up>;
88
89 using tid_type = std::thread::id;
90 using size_type = uintmax_t;
91 using lock_t = Mutex;
92 using atomic_int = std::atomic_intmax_t;
93 using atomic_uint = std::atomic_uintmax_t;
96 using result_type = Tp;
97 using task_pointer = std::shared_ptr<TaskFuture<ArgTp>>;
100 using promise_type = std::promise<ArgTp>;
101 using future_type = std::future<ArgTp>;
102 using packaged_task_type = std::packaged_task<ArgTp()>;
105 using iterator = typename future_list_t::iterator;
106 using reverse_iterator = typename future_list_t::reverse_iterator;
107 using const_iterator = typename future_list_t::const_iterator;
108 using const_reverse_iterator = typename future_list_t::const_reverse_iterator;
109 //------------------------------------------------------------------------//
110 template <typename... Args>
112 //------------------------------------------------------------------------//
113
114public:
115 // Constructor
116 template <typename Func>
118
 // Overload enabled only when Up (defaulting to Tp) is void.
119 template <typename Up = Tp>
121 enable_if_t<std::is_void<Up>::value, int> = 0);
122
123 // Destructor
124 ~TaskGroup();
125
126 // delete copy-construct
127 TaskGroup(const this_type&) = delete;
128 // define move-construct
 // NOTE(review): lock_t is Mutex, which the cross-reference list shows as
 // std::mutex; if m_task_lock is held by value the defaulted move operations
 // are presumably implicitly deleted -- confirm before relying on moves.
129 // NOLINTNEXTLINE(performance-noexcept-move-constructor)
130 TaskGroup(this_type&& rhs) = default;
131 // delete copy-assign
132 TaskGroup& operator=(const this_type& rhs) = delete;
133 // define move-assign
134 // NOLINTNEXTLINE(performance-noexcept-move-constructor)
135 TaskGroup& operator=(this_type&& rhs) = default;
136
137public:
 // Registers a task with the group: increments the task counter, stores the
 // pointer in m_task_list and returns it (see the out-of-class definition).
138 template <typename Up>
139 std::shared_ptr<Up> operator+=(std::shared_ptr<Up>&& _task);
140
141 // wait to finish
142 void wait();
143
144 // increment / decrement the task counter (prefix and postfix forms)
145 intmax_t operator++() { return ++(m_tot_task_count); }
146 intmax_t operator++(int) { return (m_tot_task_count)++; }
147 intmax_t operator--() { return --(m_tot_task_count); }
148 intmax_t operator--(int) { return (m_tot_task_count)--; }
149
150 // size
 // Returns the current value of the task counter, not the number of stored
 // futures.
151 intmax_t size() const { return m_tot_task_count.load(); }
152
153 // get the locks/conditions
156
157 // identifier
158 uintmax_t id() const { return m_id; }
159
160 // thread pool
161 void set_pool(ThreadPool* tp) { m_pool = tp; }
162 ThreadPool*& pool() { return m_pool; }
163 ThreadPool* pool() const { return m_pool; }
164
 // "Native" means no TBB task group has been allocated for this group.
165 bool is_native_task_group() const { return (m_tbb_task_group) == nullptr; }
 // True when called from the thread whose id was recorded in m_main_tid.
166 bool is_main() const { return this_tid() == m_main_tid; }
167
168 // check if any tasks are still pending
 // NOTE(review): returns the same value as size() but is not const-qualified.
169 intmax_t pending() { return m_tot_task_count.load(); }
170
171 static void set_verbose(int level) { f_verbose = level; }
172
174
 // Both lock m_task_lock and then signal m_task_cond (one / all waiters).
175 void notify();
176 void notify_all();
177
 // Reserves capacity for _n entries in both the task list and future list.
178 void reserve(size_t _n)
179 {
180 m_task_list.reserve(_n);
181 m_future_list.reserve(_n);
182 }
183
184public:
 // Builds a task_type<Args...> from func/args and registers it via
 // operator+=, returning the shared pointer to the new task.
185 template <typename Func, typename... Args>
186 std::shared_ptr<task_type<Args...>> wrap(Func func, Args... args)
187 {
188 return operator+=(std::make_shared<task_type<Args...>>(
189 is_native_task_group(), m_depth, std::move(func), std::move(args)...));
190 }
191
 // exec() overload selected when Up (defaulting to Tp) is void.
192 template <typename Func, typename... Args, typename Up = Tp>
193 enable_if_t<std::is_void<Up>::value, void> exec(Func func, Args... args);
194
 // exec() overload selected when Up (defaulting to Tp) is non-void.
195 template <typename Func, typename... Args, typename Up = Tp>
196 enable_if_t<!std::is_void<Up>::value, void> exec(Func func, Args... args);
197
 // Convenience alias for exec().
198 template <typename Func, typename... Args>
199 void run(Func func, Args... args)
200 {
201 exec(std::move(func), std::move(args)...);
202 }
203
204protected:
 // Run the callable immediately on the calling thread and record a future
 // for it in m_future_list (void / non-void variants).
205 template <typename Up, typename Func, typename... Args>
206 enable_if_t<std::is_void<Up>::value, void> local_exec(Func func, Args... args);
207
208 template <typename Up, typename Func, typename... Args>
209 enable_if_t<!std::is_void<Up>::value, void> local_exec(Func func, Args... args);
210
211 // shorter typedefs
216
217public:
218 //------------------------------------------------------------------------//
219 // Get tasks with non-void return types
220 //
222 const future_list_t& get_tasks() const { return m_future_list; }
223
224 //------------------------------------------------------------------------//
225 // iterate over tasks with return type
226 //
 // All iterators below range over m_future_list only, not m_task_list.
227 itr_t begin() { return m_future_list.begin(); }
228 itr_t end() { return m_future_list.end(); }
229 citr_t begin() const { return m_future_list.begin(); }
230 citr_t end() const { return m_future_list.end(); }
231 citr_t cbegin() const { return m_future_list.begin(); }
232 citr_t cend() const { return m_future_list.end(); }
233 ritr_t rbegin() { return m_future_list.rbegin(); }
234 ritr_t rend() { return m_future_list.rend(); }
235 critr_t rbegin() const { return m_future_list.rbegin(); }
236 critr_t rend() const { return m_future_list.rend(); }
237
238 //------------------------------------------------------------------------//
239 // wait to finish
 // Non-void Tp: waits, folds every result into accum and returns it.
240 template <typename Up = Tp, enable_if_t<!std::is_void<Up>::value, int> = 0>
241 inline Up join(Up accum = {});
242 //------------------------------------------------------------------------//
243 // wait to finish
 // Tp and Arg both void: waits, drains the futures and calls m_join() once.
244 template <typename Up = Tp, typename Rp = Arg,
245 enable_if_t<std::is_void<Up>::value && std::is_void<Rp>::value, int> = 0>
246 inline void join();
247 //------------------------------------------------------------------------//
248 // wait to finish
 // Tp void, Arg non-void: waits and passes each result to m_join.
249 template <typename Up = Tp, typename Rp = Arg,
250 enable_if_t<std::is_void<Up>::value && !std::is_void<Rp>::value, int> = 0>
251 inline void join();
252 //------------------------------------------------------------------------//
253 // clear the task result history
254 void clear();
255
256protected:
257 //------------------------------------------------------------------------//
258 // get the thread id
259 static tid_type this_tid() { return std::this_thread::get_id(); }
260
261 //------------------------------------------------------------------------//
262 // get the task count
264 const atomic_int& task_count() const { return m_tot_task_count; }
265
266protected:
267 static int f_verbose;
268 // Protected member variables
271 tid_type m_main_tid = std::this_thread::get_id();
280
281private:
 // Resolves m_pool and allocates the TBB task group when the pool is a TBB
 // pool; called from both constructors.
282 void internal_update();
283};
284
285} // namespace PTL
286namespace PTL
287{
// TaskGroup constructor, join-functor overload. Per the cross-reference list
// the dropped declarator is
//   TaskGroup(Func&& _join, ThreadPool* _tp = internal::get_default_threadpool()).
// Forwards _join into m_join, stores _tp as the pool, then lets
// internal_update() resolve a default pool and set up TBB state. That call
// throws std::runtime_error when no thread pool is available.
288template <typename Tp, typename Arg, intmax_t MaxDepth>
289template <typename Func>
291: m_join{ std::forward<Func>(_join) }
292, m_pool{ _tp }
293{
294 internal_update();
295}
296
// TaskGroup constructor overload that is only enabled when Up is void (the
// unnamed enable_if_t<..., int> parameter is the SFINAE switch). No join
// functor is supplied, so m_join is initialised with a no-op lambda.
// internal_update() throws std::runtime_error when no thread pool is available.
297template <typename Tp, typename Arg, intmax_t MaxDepth>
298template <typename Up>
300 enable_if_t<std::is_void<Up>::value, int>)
301: m_join{ []() {} }
302, m_pool{ _tp }
303{
304 internal_update();
305}
306
307// Destructor
// Order of operations: (1) briefly acquire and release m_task_lock,
// (2) if a TBB task group exists, wait on it from inside the pool's task
// arena, (3) delete the TBB task group, (4) clear the stored tasks/futures.
308template <typename Tp, typename Arg, intmax_t MaxDepth>
310{
311 {
312 // task will decrement counter and then acquire the lock to notify
313 // condition variable so acquiring lock here will prevent the
314 // task group from being destroyed before this is completed
 // The lock is constructed deferred and then locked explicitly; it is
 // released again at the end of this inner scope.
315 AutoLock _lk{ m_task_lock, std::defer_lock };
316 if(!_lk.owns_lock())
317 _lk.lock();
318 }
319
320 if(m_tbb_task_group)
321 {
 // NOTE(review): m_pool is dereferenced without a null check here.
322 auto* _arena = m_pool->get_task_arena();
323 _arena->execute([this]() { this->m_tbb_task_group->wait(); });
324 }
 // delete on a null pointer is a no-op, so no guard is needed.
325 delete m_tbb_task_group;
326 this->clear();
327}
328
// operator+=(std::shared_ptr<Up>&& _task): registers a task with the group.
// Increments the task counter, stores a copy of the pointer in m_task_list
// and returns the pointer to the caller.
329template <typename Tp, typename Arg, intmax_t MaxDepth>
330template <typename Up>
331std::shared_ptr<Up>
333{
334 // thread-safe increment of tasks in task group
335 operator++();
336 // copy the shared pointer to abstract instance
 // _task is a named rvalue reference, i.e. an lvalue here, so push_back
 // copies it and the group keeps its own reference.
337 m_task_list.push_back(_task);
338 // return the derived instance
339 return std::move(_task);
340}
341
// wait(): block the caller until this group's task counter drops to zero or
// the thread pool's state becomes STOPPED.
//
// Early returns (nothing is waited on, apart from the TBB wait registered in
// _dtor):
//   - ThreadData::GetInstance() is null;
//   - no pool was set and internal::get_default_threadpool() returns null;
//   - the group uses TBB and the caller is not main, or the pool has fewer
//     than two threads.
//
// NOTE(review): std::stringstream is used near the end of this function but
// <sstream> is not in this header's visible include list -- presumably it
// arrives transitively; confirm.
342template <typename Tp, typename Arg, intmax_t MaxDepth>
343void
345{
 // ScopeDestructor is assumed to invoke the lambda when _dtor leaves scope,
 // so on every exit path a TBB task group (if any) is waited on.
346 auto _dtor = ScopeDestructor{ [&]() {
347 if(m_tbb_task_group)
348 {
349 auto* _arena = m_pool->get_task_arena();
350 _arena->execute([this]() { this->m_tbb_task_group->wait(); });
351 }
352 } };
353
354 ThreadData* data = ThreadData::GetInstance();
355 if(!data)
356 return;
357
358 // if no pool was initially present at creation
359 if(!m_pool)
360 {
361 // check for master MT run-manager
362 m_pool = internal::get_default_threadpool();
363
364 // if no thread pool created
365 if(!m_pool)
366 {
367 if(f_verbose > 0)
368 {
369 fprintf(stderr, "%s @ %i :: Warning! nullptr to thread-pool (%p)\n",
370 __FUNCTION__, __LINE__, static_cast<void*>(m_pool));
371 std::cerr << __FUNCTION__ << "@" << __LINE__ << " :: Warning! "
372 << "nullptr to thread pool!" << std::endl;
373 }
374 return;
375 }
376 }
377
 // m_pool is non-null at this point, so tpool is always m_pool and the
 // fallbacks to data->thread_pool / data->current_queue are not taken.
378 ThreadPool* tpool = (m_pool) ? m_pool : data->thread_pool;
379 VUserTaskQueue* taskq = (tpool) ? tpool->get_queue() : data->current_queue;
380
381 bool _is_main = data->is_main;
382 bool _within_task = data->within_task;
383
 // True while the pool's state is anything other than STOPPED.
384 auto is_active_state = [&]() {
385 return (tpool->state()->load(std::memory_order_relaxed) !=
386 thread_pool::state::STOPPED);
387 };
388
 // Lets a thread that is itself inside a task help drain the queue instead
 // of idling: it spins until pending() reaches zero, running any task it
 // can obtain from its own bin.
389 auto execute_this_threads_tasks = [&]() {
390 if(!taskq)
391 return;
392
393 // only want to process if within a task
394 if((!_is_main || tpool->size() < 2) && _within_task)
395 {
396 int bin = static_cast<int>(taskq->GetThreadBin());
397 // const auto nitr = (tpool) ? tpool->size() :
398 // Thread::hardware_concurrency();
399 while(this->pending() > 0)
400 {
401 if(!taskq->empty())
402 {
403 auto _task = taskq->GetTask(bin);
404 if(_task)
405 (*_task)();
406 }
407 }
408 }
409 };
410
411 // checks for validity
412 if(!is_native_task_group())
413 {
414 // for external threads
415 if(!_is_main || tpool->size() < 2)
416 return;
417 }
 // The remaining checks only print diagnostics; they do not return.
418 else if(f_verbose > 0)
419 {
420 if(!tpool || !taskq)
421 {
422 // something is wrong, didn't create thread-pool?
423 fprintf(stderr,
424 "%s @ %i :: Warning! nullptr to thread data (%p) or task-queue "
425 "(%p)\n",
426 __FUNCTION__, __LINE__, static_cast<void*>(tpool),
427 static_cast<void*>(taskq));
428 }
429 // return if thread pool isn't built
430 else if(is_native_task_group() && !tpool->is_alive())
431 {
432 fprintf(stderr, "%s @ %i :: Warning! thread-pool is not alive!\n",
433 __FUNCTION__, __LINE__);
434 }
435 else if(!is_active_state())
436 {
437 fprintf(stderr, "%s @ %i :: Warning! thread-pool is not active!\n",
438 __FUNCTION__, __LINE__);
439 }
440 }
441
 // Threshold used below: at or above it the caller blocks indefinitely on
 // the condition variable, below it the wait is bounded to 100 microseconds.
442 intmax_t wake_size = 2;
443 AutoLock _lock(m_task_lock, std::defer_lock);
444
445 while(is_active_state())
446 {
447 execute_this_threads_tasks();
448
449 // while loop protects against spurious wake-ups
450 while(_is_main && pending() > 0 && is_active_state())
451 {
452 // auto _wake = [&]() { return (wake_size > pending() ||
453 // !is_active_state());
454 // };
455
456 // lock before sleeping on condition
457 if(!_lock.owns_lock())
458 _lock.lock();
459
460 // Wait until signaled that a task has been completed
461 // Unlock mutex while wait, then lock it back when signaled
462 // when true, this wakes the thread
463 if(pending() >= wake_size)
464 {
465 m_task_cond.wait(_lock);
466 }
467 else
468 {
469 m_task_cond.wait_for(_lock, std::chrono::microseconds(100));
470 }
471 // unlock
472 if(_lock.owns_lock())
473 _lock.unlock();
474 }
475
476 // if pending is not greater than zero, we are joined
477 if(pending() <= 0)
478 break;
479 }
480
481 if(_lock.owns_lock())
482 _lock.unlock();
483
 // Reached with tasks outstanding only when the loop above stopped because
 // the pool is no longer active: warn, then retry by calling wait() again.
 // NOTE(review): the retry is recursive.
484 intmax_t ntask = this->task_count().load();
485 if(ntask > 0)
486 {
487 std::stringstream ss;
488 ss << "\nWarning! Join operation issue! " << ntask << " tasks still "
489 << "are running!" << std::endl;
490 std::cerr << ss.str();
491 this->wait();
492 }
493}
494
// get_scope_destructor(): returns a ScopeDestructor whose callback decrements
// this group's task counter and, once the count drops below 1, takes the task
// lock and wakes all waiters on the task condition variable.
// The callback captures references to this group's counter, lock and
// condition variable, so it must not run after the group is destroyed.
495template <typename Tp, typename Arg, intmax_t MaxDepth>
498{
499 auto& _counter = m_tot_task_count;
500 auto& _task_cond = task_cond();
501 auto& _task_lock = task_lock();
502 return ScopeDestructor{ [&_task_cond, &_task_lock, &_counter]() {
503 auto _count = --(_counter);
504 if(_count < 1)
505 {
506 AutoLock _lk{ _task_lock };
507 _task_cond.notify_all();
508 }
509 } };
510}
511
// notify(): wake one thread waiting on the task condition variable. The task
// lock is held while signalling.
512template <typename Tp, typename Arg, intmax_t MaxDepth>
513void
515{
516 AutoLock _lk{ m_task_lock };
517 m_task_cond.notify_one();
518}
519
// notify_all(): wake every thread waiting on the task condition variable. The
// task lock is held while signalling.
520template <typename Tp, typename Arg, intmax_t MaxDepth>
521void
523{
524 AutoLock _lk{ m_task_lock };
525 m_task_cond.notify_all();
526}
527
// exec(Func func, Args... args), overload for void Tp.
// Either runs the callable inline or submits it as a task:
//   - inline (local_exec) when MaxDepth > 0, no TBB task group exists and the
//     calling thread's task_depth already exceeds MaxDepth;
//   - otherwise the callable is wrapped in a task that tracks task_depth,
//     decrements the group counter when done and notifies waiters once the
//     counter drops below 1.
528template <typename Tp, typename Arg, intmax_t MaxDepth>
529template <typename Func, typename... Args, typename Up>
532{
533 if(MaxDepth > 0 && !m_tbb_task_group && ThreadData::GetInstance() &&
534 ThreadData::GetInstance()->task_depth > MaxDepth)
535 {
536 local_exec<Tp>(std::move(func), std::move(args)...);
537 }
538 else
539 {
 // The task lambda captures these by reference, so the group has to stay
 // alive until every submitted task has finished.
540 auto& _counter = m_tot_task_count;
541 auto& _task_cond = task_cond();
542 auto& _task_lock = task_lock();
543 auto _task = wrap([&_task_cond, &_task_lock, &_counter, func, args...]() {
544 auto* _tdata = ThreadData::GetInstance();
545 if(_tdata)
546 ++(_tdata->task_depth);
547 func(args...);
548 auto _count = --(_counter);
549 if(_tdata)
550 --(_tdata->task_depth);
551 if(_count < 1)
552 {
553 AutoLock _lk{ _task_lock };
554 _task_cond.notify_all();
555 }
556 });
557
558 if(m_tbb_task_group)
559 {
560 auto* _arena = m_pool->get_task_arena();
561 auto* _tbb_task_group = m_tbb_task_group;
 // Raw pointer only: wrap() registered the task through operator+=,
 // which stored a shared_ptr copy in m_task_list.
562 auto* _ptask = _task.get();
563 _arena->execute([_tbb_task_group, _ptask]() {
564 _tbb_task_group->run([_ptask]() { (*_ptask)(); });
565 });
566 }
567 else
568 {
569 m_pool->add_task(std::move(_task));
570 }
571 }
572}
// exec(Func func, Args... args), overload for non-void Tp.
// Same dispatch as the void overload (inline via local_exec when the depth
// limit is exceeded, otherwise submitted to the TBB task group or the pool),
// except that the wrapped task returns the callable's result.
573template <typename Tp, typename Arg, intmax_t MaxDepth>
574template <typename Func, typename... Args, typename Up>
575enable_if_t<!std::is_void<Up>::value, void>
577{
578 if(MaxDepth > 0 && !m_tbb_task_group && ThreadData::GetInstance() &&
579 ThreadData::GetInstance()->task_depth > MaxDepth)
580 {
581 local_exec<Tp>(std::move(func), std::move(args)...);
582 }
583 else
584 {
 // The task lambda captures these by reference, so the group has to stay
 // alive until every submitted task has finished.
585 auto& _counter = m_tot_task_count;
586 auto& _task_cond = task_cond();
587 auto& _task_lock = task_lock();
588 auto _task = wrap([&_task_cond, &_task_lock, &_counter, func, args...]() {
589 auto* _tdata = ThreadData::GetInstance();
590 if(_tdata)
591 ++(_tdata->task_depth);
 // The result is obtained before the counter is decremented and waiters
 // are notified.
592 auto&& _ret = func(args...);
593 auto _count = --(_counter);
594 if(_tdata)
595 --(_tdata->task_depth);
596 if(_count < 1)
597 {
598 AutoLock _lk{ _task_lock };
599 _task_cond.notify_all();
600 }
601 return std::forward<decltype(_ret)>(_ret);
602 });
603
604 if(m_tbb_task_group)
605 {
606 auto* _arena = m_pool->get_task_arena();
607 auto* _tbb_task_group = m_tbb_task_group;
 // Raw pointer only: wrap() registered the task through operator+=,
 // which stored a shared_ptr copy in m_task_list.
608 auto* _ptask = _task.get();
609 _arena->execute([_tbb_task_group, _ptask]() {
610 _tbb_task_group->run([_ptask]() { (*_ptask)(); });
611 });
612 }
613 else
614 {
615 m_pool->add_task(std::move(_task));
616 }
617 }
618}
619
// local_exec(Func func, Args... args), void variant: run the callable
// immediately on the calling thread instead of queueing it.
// A future for the call is appended to m_future_list before the callable runs
// and is made ready afterwards. The group's task counter is not touched.
// NOTE(review): if func throws, task_depth is not decremented and the promise
// is destroyed without a value.
620template <typename Tp, typename Arg, intmax_t MaxDepth>
621template <typename Up, typename Func, typename... Args>
622enable_if_t<std::is_void<Up>::value, void>
624{
625 auto* _tdata = ThreadData::GetInstance();
626 if(_tdata)
627 ++(_tdata->task_depth);
628 promise_type _p{};
629 m_future_list.emplace_back(_p.get_future());
630 func(args...);
631 _p.set_value();
632 if(_tdata)
633 --(_tdata->task_depth);
634}
635
// local_exec(Func func, Args... args), non-void variant: run the callable
// immediately on the calling thread and store its result in a future appended
// to m_future_list. The group's task counter is not touched.
// NOTE(review): if func throws, task_depth is not decremented and the promise
// is destroyed without a value.
636template <typename Tp, typename Arg, intmax_t MaxDepth>
637template <typename Up, typename Func, typename... Args>
640{
641 auto* _tdata = ThreadData::GetInstance();
642 if(_tdata)
643 ++(_tdata->task_depth);
644 promise_type _p{};
645 m_future_list.emplace_back(_p.get_future());
646 _p.set_value(func(args...));
647 if(_tdata)
648 --(_tdata->task_depth);
649}
650
// join(Up accum), enabled for non-void Tp.
// Waits for the group, then folds every result into accum by calling
// m_join(accum, result): first the queued tasks in m_task_list, then the
// inline-executed entries in m_future_list. Clears both lists and returns the
// accumulated value.
651template <typename Tp, typename Arg, intmax_t MaxDepth>
652template <typename Up, enable_if_t<!std::is_void<Up>::value, int>>
653inline Up
655{
656 this->wait();
657 for(auto& itr : m_task_list)
658 {
659 using RetT = decay_t<decltype(itr->get())>;
660 accum = std::move(m_join(std::ref(accum), std::forward<RetT>(itr->get())));
661 }
662 for(auto& itr : m_future_list)
663 {
664 using RetT = decay_t<decltype(itr.get())>;
665 accum = std::move(m_join(std::ref(accum), std::forward<RetT>(itr.get())));
666 }
667 this->clear();
668 return accum;
669}
670
// join(), enabled when both Tp and Arg are void.
// Waits for the group, calls get() on every stored task and future (nothing
// is returned from them), invokes m_join() once, then clears both lists.
671template <typename Tp, typename Arg, intmax_t MaxDepth>
672template <typename Up, typename Rp,
673 enable_if_t<std::is_void<Up>::value && std::is_void<Rp>::value, int>>
674inline void
676{
677 this->wait();
678 for(auto& itr : m_task_list)
679 itr->get();
680 for(auto& itr : m_future_list)
681 itr.get();
682 m_join();
683 this->clear();
684}
685
// join(), enabled when Tp is void but Arg is not.
// Waits for the group, then passes each result to m_join one at a time (tasks
// first, then inline-executed futures); nothing is accumulated or returned.
// Clears both lists afterwards.
686template <typename Tp, typename Arg, intmax_t MaxDepth>
687template <typename Up, typename Rp,
688 enable_if_t<std::is_void<Up>::value && !std::is_void<Rp>::value, int>>
689inline void
691{
692 this->wait();
693 for(auto& itr : m_task_list)
694 {
695 using RetT = decay_t<decltype(itr->get())>;
696 m_join(std::forward<RetT>(itr->get()));
697 }
698 for(auto& itr : m_future_list)
699 {
700 using RetT = decay_t<decltype(itr.get())>;
701 m_join(std::forward<RetT>(itr.get()));
702 }
703 this->clear();
704}
705
// clear(): drop all stored futures and task pointers. The task counter is
// left unchanged.
706template <typename Tp, typename Arg, intmax_t MaxDepth>
707void
709{
710 m_future_list.clear();
711 m_task_list.clear();
712}
713
// internal_update(): constructor helper that finalises pool-dependent state.
// Falls back to internal::get_default_threadpool() when no pool was supplied,
// throws std::runtime_error if there is still no pool, and allocates a TBB
// task group when the pool reports itself as a TBB thread pool.
714template <typename Tp, typename Arg, intmax_t MaxDepth>
715void
717{
718 if(!m_pool)
719 m_pool = internal::get_default_threadpool();
720
721 if(!m_pool)
722 {
723 std::stringstream ss{};
724 ss << "[TaskGroup]> " << __FUNCTION__ << "@" << __LINE__
725 << " :: nullptr to thread pool";
726 throw std::runtime_error(ss.str());
727 }
728
729 if(m_pool->is_tbb_threadpool())
730 {
 // Owned raw pointer; released with delete in the destructor.
731 m_tbb_task_group = new tbb_task_group_t{};
732 }
733}
734
// Definition of the static verbosity level, one per TaskGroup instantiation.
// Initialised via GetEnv<int>("PTL_VERBOSE", 0) -- presumably the value of the
// PTL_VERBOSE environment variable with 0 as the fallback; confirm against
// GetEnv. Can be overridden at runtime with set_verbose().
735template <typename Tp, typename Arg, intmax_t MaxDepth>
736int TaskGroup<Tp, Arg, MaxDepth>::f_verbose = GetEnv<int>("PTL_VERBOSE", 0);
737
738} // namespace PTL
ritr_t rend()
Definition TaskGroup.hh:234
intmax_t operator++()
Definition TaskGroup.hh:145
TaskGroup & operator=(this_type &&rhs)=default
std::atomic_intmax_t atomic_int
Definition TaskGroup.hh:92
container_type< future_type > future_list_t
Definition TaskGroup.hh:103
typename future_list_t::reverse_iterator reverse_iterator
Definition TaskGroup.hh:106
ThreadPool * pool() const
Definition TaskGroup.hh:163
std::promise< ArgTp > promise_type
Definition TaskGroup.hh:100
intmax_t size() const
Definition TaskGroup.hh:151
future_list_t m_future_list
Definition TaskGroup.hh:279
tbb_task_group_t * m_tbb_task_group
Definition TaskGroup.hh:277
std::atomic_uintmax_t atomic_uint
Definition TaskGroup.hh:93
std::vector< Up > container_type
Definition TaskGroup.hh:87
decay_t< Arg > ArgTp
Definition TaskGroup.hh:95
ritr_t rbegin()
Definition TaskGroup.hh:233
TaskGroup(const this_type &)=delete
iterator itr_t
Definition TaskGroup.hh:212
ThreadPool * m_pool
Definition TaskGroup.hh:276
const atomic_int & task_count() const
Definition TaskGroup.hh:264
typename JoinFunction< Tp, Arg >::Type join_type
Definition TaskGroup.hh:104
citr_t end() const
Definition TaskGroup.hh:230
citr_t cbegin() const
Definition TaskGroup.hh:231
intmax_t operator++(int)
Definition TaskGroup.hh:146
const future_list_t & get_tasks() const
Definition TaskGroup.hh:222
bool is_native_task_group() const
Definition TaskGroup.hh:165
citr_t cend() const
Definition TaskGroup.hh:232
typename future_list_t::const_iterator const_iterator
Definition TaskGroup.hh:107
intmax_t operator--()
Definition TaskGroup.hh:147
std::packaged_task< ArgTp()> packaged_task_type
Definition TaskGroup.hh:102
critr_t rend() const
Definition TaskGroup.hh:236
static void set_verbose(int level)
Definition TaskGroup.hh:171
enable_if_t< std::is_void< Up >::value, void > local_exec(Func func, Args... args)
Definition TaskGroup.hh:623
std::thread::id tid_type
Definition TaskGroup.hh:89
join_type m_join
Definition TaskGroup.hh:275
task_list_t m_task_list
Definition TaskGroup.hh:278
std::shared_ptr< TaskFuture< ArgTp > > task_pointer
Definition TaskGroup.hh:97
TaskGroup & operator=(const this_type &rhs)=delete
void notify_all()
Definition TaskGroup.hh:522
Condition condition_t
Definition TaskGroup.hh:94
static tid_type this_tid()
Definition TaskGroup.hh:259
atomic_int m_tot_task_count
Definition TaskGroup.hh:272
atomic_int & task_count()
Definition TaskGroup.hh:263
const_reverse_iterator critr_t
Definition TaskGroup.hh:215
const_iterator citr_t
Definition TaskGroup.hh:213
ThreadPool *& pool()
Definition TaskGroup.hh:162
static int f_verbose
Definition TaskGroup.hh:267
condition_t & task_cond()
Definition TaskGroup.hh:155
intmax_t operator--(int)
Definition TaskGroup.hh:148
tid_type m_main_tid
Definition TaskGroup.hh:271
reverse_iterator ritr_t
Definition TaskGroup.hh:214
lock_t m_task_lock
Definition TaskGroup.hh:273
std::shared_ptr< task_type< Args... > > wrap(Func func, Args... args)
Definition TaskGroup.hh:186
void run(Func func, Args... args)
Definition TaskGroup.hh:199
future_list_t & get_tasks()
Definition TaskGroup.hh:221
bool is_main() const
Definition TaskGroup.hh:166
uintmax_t size_type
Definition TaskGroup.hh:90
typename future_list_t::iterator iterator
Definition TaskGroup.hh:105
TaskGroup(Func &&_join, ThreadPool *_tp=internal::get_default_threadpool())
Definition TaskGroup.hh:290
lock_t & task_lock()
Definition TaskGroup.hh:154
enable_if_t< std::is_void< Up >::value, void > exec(Func func, Args... args)
Definition TaskGroup.hh:531
uintmax_t id() const
Definition TaskGroup.hh:158
citr_t begin() const
Definition TaskGroup.hh:229
condition_t m_task_cond
Definition TaskGroup.hh:274
intmax_t m_depth
Definition TaskGroup.hh:270
uintmax_t m_id
Definition TaskGroup.hh:269
typename future_list_t::const_reverse_iterator const_reverse_iterator
Definition TaskGroup.hh:108
void set_pool(ThreadPool *tp)
Definition TaskGroup.hh:161
std::future< ArgTp > future_type
Definition TaskGroup.hh:101
void reserve(size_t _n)
Definition TaskGroup.hh:178
critr_t rbegin() const
Definition TaskGroup.hh:235
TaskGroup(this_type &&rhs)=default
container_type< task_pointer > task_list_t
Definition TaskGroup.hh:98
intmax_t pending()
Definition TaskGroup.hh:169
ScopeDestructor get_scope_destructor()
Definition TaskGroup.hh:497
std::shared_ptr< Up > operator+=(std::shared_ptr< Up > &&_task)
Definition TaskGroup.hh:332
The task class is supplied to thread_pool.
Definition Task.hh:131
VUserTaskQueue * current_queue
ThreadPool * thread_pool
const pool_state_type & state() const
task_queue_t * get_queue() const
size_type size() const
virtual task_pointer GetTask(intmax_t subq=-1, intmax_t nitr=-1)=0
virtual bool empty() const =0
virtual intmax_t GetThreadBin() const =0
intmax_t get_task_depth()
Definition TaskGroup.cc:64
ThreadPool * get_default_threadpool()
Definition TaskGroup.cc:51
std::atomic_uintmax_t & task_group_counter()
Definition TaskGroup.cc:44
std::mutex Mutex
Definition Threading.hh:57
typename std::enable_if< B, T >::type enable_if_t
Definition Globals.hh:57
std::condition_variable Condition
Definition Threading.hh:43
tbb::task_group tbb_task_group_t
typename std::decay< T >::type decay_t
Definition Globals.hh:54
std::function< JoinT(JoinT &, JoinArg &&)> Type