49, m_is_clone((parent) != nullptr)
50, m_thread_bin((parent) ? (
ThreadPool::get_this_thread_id() % (nworkers + 1)) : 0)
51, m_insert_bin((parent) ? (
ThreadPool::get_this_thread_id() % (nworkers + 1)) : 0)
52, m_hold((parent) ? parent->m_hold : new
std::atomic_bool(false))
53, m_ntasks((parent) ? parent->m_ntasks : new
std::atomic_uintmax_t(0))
54, m_mutex((parent) ? parent->m_mutex : new Mutex{})
60 for(intmax_t i = 0; i < nworkers + 1; ++i)
71 for(
auto& itr : *m_subqueues)
90 throw std::runtime_error(
"nullptr to mutex");
104 delete m_subqueues->back();
105 m_subqueues->pop_back();
124 static thread_local intmax_t tl_bin =
134 return (++m_insert_bin % (
m_workers + 1));
147 auto get_task = [&]() {
151 _task = task_subq->
PopTask(
true);
158 return (_task !=
nullptr);
163 while(!task_subq->
empty())
182 intmax_t n = (subq < 0) ? tbin : subq;
186 if(m_hold->load(std::memory_order_relaxed))
193 auto get_task = [&](intmax_t _n) {
200 _task = task_subq->
PopTask(n == tbin);
207 return (_task !=
nullptr);
215 for(intmax_t i = 0; i < nitr; ++i, ++n)
237 bool spin = m_hold->load(std::memory_order_relaxed);
253 auto insert_task = [&](intmax_t _n) {
264 task_subq->
PushTask(std::move(task));
280 while(!insert_task(n))
301 using thread_execute_map_t = std::map<int64_t, bool>;
309 task_group_type tg{ [](
int& ref,
int i) {
return (ref += i); }, tp };
313 while(tp->get_active_threads_count() > 0)
314 ThisThread::sleep_for(std::chrono::milliseconds(10));
316 thread_execute_map_t thread_execute_map{};
317 std::vector<std::shared_ptr<VTask>> _tasks{};
327 auto thread_specific_func = [&]() {
347 int nexecuted = tg.join();
350 std::stringstream msg;
351 msg <<
"Failure executing routine on all threads! Only " << nexecuted
352 <<
" threads executed function out of " <<
m_workers <<
" workers";
353 std::cerr << msg.str() << std::endl;
365 using thread_execute_map_t = std::map<int64_t, bool>;
367 task_group_type tg{ [](
int& ref,
int i) {
return (ref += i); }, tp };
371 while(tp->get_active_threads_count() > 0)
372 ThisThread::sleep_for(std::chrono::milliseconds(10));
380 thread_execute_map_t thread_execute_map{};
385 auto thread_specific_func = [&]() {
391 if(!_executed && tid_set.count(ThisThread::get_id()) > 0)
401 if(tid_set.count(ThisThread::get_id()) > 0)
413 decltype(tid_set.size()) nexecuted = tg.join();
414 if(nexecuted != tid_set.size())
416 std::stringstream msg;
417 msg <<
"Failure executing routine on specific threads! Only " << nexecuted
418 <<
" threads executed function out of " << tid_set.size() <<
" workers";
419 std::cerr << msg.str() << std::endl;
427UserTaskQueue::AcquireHold()
430 while(!(_hold = m_hold->load(std::memory_order_relaxed)))
432 m_hold->compare_exchange_strong(_hold,
true, std::memory_order_release,
433 std::memory_order_relaxed);
440UserTaskQueue::ReleaseHold()
443 while((_hold = m_hold->load(std::memory_order_relaxed)))
445 m_hold->compare_exchange_strong(_hold,
false, std::memory_order_release,
446 std::memory_order_relaxed);
void PushTask(task_pointer &&) PTL_NO_SANITIZE_THREAD
task_pointer PopTask(bool front=true) PTL_NO_SANITIZE_THREAD
static ThreadData *& GetInstance()
static uintmax_t get_this_thread_id()
intmax_t InsertTask(task_pointer &&, ThreadData *=nullptr, intmax_t subq=-1) override PTL_NO_SANITIZE_THREAD
intmax_t GetInsertBin() const
UserTaskQueue(intmax_t nworkers=-1, UserTaskQueue *=nullptr)
void ExecuteOnAllThreads(ThreadPool *tp, function_type f) override
void resize(intmax_t) override
VUserTaskQueue * clone() override
task_pointer GetTask(intmax_t subq=-1, intmax_t nitr=-1) override
void ExecuteOnSpecificThreads(ThreadIdSet tid_set, ThreadPool *tp, function_type f) override
std::vector< TaskSubQueue * > TaskSubQueueContainer
intmax_t GetThreadBin() const override
bool true_empty() const override
std::shared_ptr< VTask > task_pointer
task_pointer GetThreadBinTask()
~UserTaskQueue() override
std::set< ThreadId > ThreadIdSet
std::function< void()> function_type
VUserTaskQueue(intmax_t nworkers=-1)
Backports of C++ language features for use with C++11 compilers.
TemplateAutoLock< Mutex > AutoLock