39, m_is_clone((parent) ? true : false)
40, m_thread_bin((parent) ? (
ThreadPool::get_this_thread_id() % (nworkers + 1)) : 0)
41, m_insert_bin((parent) ? (
ThreadPool::get_this_thread_id() % (nworkers + 1)) : 0)
42, m_hold((parent) ? parent->m_hold : new std::atomic_bool(false))
43, m_ntasks((parent) ? parent->m_ntasks : new std::atomic_uintmax_t(0))
49 for(intmax_t i = 0; i < nworkers + 1; ++i)
50 m_subqueues->push_back(
new TaskSubQueue(m_ntasks));
54 if(GetEnv<int>(
"PTL_VERBOSE", 0) > 3)
59 << __FUNCTION__ <<
":" << __LINE__ <<
"] "
60 <<
"this = " <<
this <<
", "
61 <<
"clone = " << std::boolalpha << m_is_clone <<
", "
62 <<
"thread = " << m_thread_bin <<
", "
63 <<
"insert = " << m_insert_bin <<
", "
64 <<
"hold = " << m_hold->load() <<
" @ " << m_hold <<
", "
65 <<
"tasks = " << m_ntasks->load() <<
" @ " << m_ntasks <<
", "
66 <<
"subqueue = " << m_subqueues <<
", "
69 std::cout << ss.str() << std::endl;
80 for(
auto& itr : *m_subqueues)
82 assert(itr->size() == 0);
102 m_subqueues->push_back(
new TaskSubQueue(m_ntasks));
110 delete m_subqueues->back();
111 m_subqueues->pop_back();
130 static thread_local intmax_t tl_bin =
140 return (++m_insert_bin % (
m_workers + 1));
149 TaskSubQueue* task_subq = (*m_subqueues)[tbin % (
m_workers + 1)];
153 auto get_task = [&]() {
154 if(task_subq->AcquireClaim())
157 _task = task_subq->PopTask(
true);
159 task_subq->ReleaseClaim();
164 return (_task !=
nullptr);
169 while(!task_subq->empty())
188 intmax_t n = (subq < 0) ? tbin : subq;
192 if(m_hold->load(std::memory_order_relaxed))
199 auto get_task = [&](intmax_t _n) {
200 TaskSubQueue* task_subq = (*m_subqueues)[_n % (
m_workers + 1)];
203 if(!task_subq->empty() && task_subq->AcquireClaim())
206 _task = task_subq->PopTask(n == tbin);
208 task_subq->ReleaseClaim();
213 return (_task !=
nullptr);
221 for(intmax_t i = 0; i < nitr; ++i, ++n)
243 bool spin = m_hold->load(std::memory_order_relaxed);
259 auto insert_task = [&](intmax_t _n) {
260 TaskSubQueue* task_subq = (*m_subqueues)[_n];
267 if(task_subq->AcquireClaim())
270 task_subq->PushTask(task);
272 task_subq->ReleaseClaim();
286 while(!insert_task(n))
308 typedef std::map<int64_t, bool> thread_execute_map_t;
316 auto join_func = [=](
int& ref,
int i) {
320 task_group_type* tg =
new task_group_type(join_func, tp);
324 while(tp->get_active_threads_count() > 0)
325 ThisThread::sleep_for(std::chrono::milliseconds(10));
327 thread_execute_map_t* thread_execute_map =
new thread_execute_map_t();
336 auto thread_specific_func = [&]() {
339 bool& _executed = (*thread_execute_map)[
GetThreadBin()];
351 auto _task = tg->wrap(thread_specific_func);
359 int nexecuted = tg->join();
362 std::stringstream msg;
363 msg <<
"Failure executing routine on all threads! Only " << nexecuted
364 <<
" threads executed function out of " <<
m_workers;
365 std::cerr << msg.str() << std::endl;
369 delete thread_execute_map;
380 typedef std::map<int64_t, bool> thread_execute_map_t;
382 auto join_func = [=](
int& ref,
int i) {
386 task_group_type* tg =
new task_group_type(join_func, tp);
390 while(tp->get_active_threads_count() > 0)
391 ThisThread::sleep_for(std::chrono::milliseconds(10));
399 thread_execute_map_t* thread_execute_map =
new thread_execute_map_t();
404 auto thread_specific_func = [=]() {
407 bool& _executed = (*thread_execute_map)[
GetThreadBin()];
409 if(!_executed && tid_set.count(ThisThread::get_id()) > 0)
419 if(tid_set.count(ThisThread::get_id()) > 0)
428 auto _task = tg->wrap(thread_specific_func);
432 int nexecuted = tg->join();
435 std::stringstream msg;
436 msg <<
"Failure executing routine on all threads! Only " << nexecuted
437 <<
" threads executed function out of " << tid_set.size();
438 std::cerr << msg.str() << std::endl;
442 delete thread_execute_map;
449UserTaskQueue::AcquireHold()
452 while(!(_hold = m_hold->load(std::memory_order_relaxed)))
454 m_hold->compare_exchange_strong(_hold,
true, std::memory_order_release,
455 std::memory_order_relaxed);
462UserTaskQueue::ReleaseHold()
465 while((_hold = m_hold->load(std::memory_order_relaxed)))
467 m_hold->compare_exchange_strong(_hold,
false, std::memory_order_release,
468 std::memory_order_relaxed);
static ThreadData *& GetInstance()
static uintmax_t get_this_thread_id()
virtual VUserTaskQueue * clone() override
std::vector< TaskSubQueue * > TaskSubQueueContainer
virtual void resize(intmax_t) override
virtual task_pointer GetTask(intmax_t subq=-1, intmax_t nitr=-1) override
virtual intmax_t GetThreadBin() const override
virtual intmax_t InsertTask(task_pointer, ThreadData *=nullptr, intmax_t subq=-1) override
virtual void ExecuteOnSpecificThreads(ThreadIdSet tid_set, ThreadPool *tp, function_type f) override
intmax_t GetInsertBin() const
UserTaskQueue(intmax_t nworkers=-1, UserTaskQueue *=nullptr)
virtual void ExecuteOnAllThreads(ThreadPool *tp, function_type f) override
bool true_empty() const override
virtual ~UserTaskQueue() override
task_pointer GetThreadBinTask()
size_type true_size() const override
VTask is the abstract class stored in thread_pool.
std::set< ThreadId > ThreadIdSet
std::function< void()> function_type
RecursiveMutex & TypeRecursiveMutex(const unsigned int &_n=0)