Geant4 11.3.0
Toolkit for the simulation of the passage of particles through matter
Loading...
Searching...
No Matches
ThreadPool.cc
Go to the documentation of this file.
1//
2// MIT License
3// Copyright (c) 2020 Jonathan R. Madsen
4// Permission is hereby granted, free of charge, to any person obtaining a copy
5// of this software and associated documentation files (the "Software"), to deal
6// in the Software without restriction, including without limitation the rights
7// to use, copy, modify, merge, publish, distribute, sublicense, and
8// copies of the Software, and to permit persons to whom the Software is
9// furnished to do so, subject to the following conditions:
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED
12// "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
13// LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
14// PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
15// HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
16// ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
17// WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
18//
19// ---------------------------------------------------------------
20// Tasking class implementation
21//
22// Class Description:
23//
24// This file creates a class for an efficient thread-pool that
25// accepts work in the form of tasks.
26//
27// ---------------------------------------------------------------
28// Author: Jonathan Madsen (Feb 13th 2018)
29// ---------------------------------------------------------------
30
31#include "PTL/ThreadPool.hh"
32#include "PTL/GetEnv.hh"
34#include "PTL/ThreadData.hh"
35#include "PTL/Threading.hh"
36#include "PTL/UserTaskQueue.hh"
37#include "PTL/VUserTaskQueue.hh"
38
39#include <cassert>
40#include <mutex>
41#include <new>
42#include <stdexcept>
43#include <thread>
44
45//======================================================================================//
46
47namespace
48{
50thread_data()
51{
53}
54} // namespace
55
56namespace PTL
57{
58//======================================================================================//
59
61ThreadPool::f_thread_ids()
62{
63 static auto _v = thread_id_map_t{};
64 return _v;
65}
66
67//======================================================================================//
68
70ThreadPool::f_default_pool_size()
71{
72 static size_type _v =
73 GetEnv<size_type>("PTL_NUM_THREADS", Thread::hardware_concurrency());
74 return _v;
75}
76
77//======================================================================================//
78// static member function that calls the member function we want the thread to
79// run
80void
81ThreadPool::start_thread(ThreadPool* tp, thread_data_t* _data, intmax_t _idx)
82{
83 if(tp->get_verbose() > 0)
84 {
85 AutoLock lock(TypeMutex<decltype(std::cerr)>());
86 std::cerr << "[PTL::ThreadPool] Starting thread " << _idx << "..." << std::endl;
87 }
88
89 auto _thr_data = std::make_shared<ThreadData>(tp);
90 {
91 AutoLock lock(TypeMutex<ThreadPool>(), std::defer_lock);
92 if(!lock.owns_lock())
93 lock.lock();
94 if(_idx < 0)
95 _idx = f_thread_ids().size();
96 f_thread_ids()[std::this_thread::get_id()] = _idx;
97 SetThreadId((int)_idx);
98 _data->emplace_back(_thr_data);
99 }
100 thread_data() = _thr_data.get();
101 tp->record_entry();
102 tp->execute_thread(thread_data()->current_queue);
103 tp->record_exit();
104
105 if(tp->get_verbose() > 0)
106 {
107 AutoLock lock(TypeMutex<decltype(std::cerr)>());
108 std::cerr << "[PTL::ThreadPool] Thread " << _idx << " terminating..."
109 << std::endl;
110 }
111}
112
113//======================================================================================//
114
117{
118 return f_thread_ids();
119}
120
121//======================================================================================//
122
123uintmax_t
125{
126 uintmax_t _idx = 0;
127 {
128 AutoLock lock(TypeMutex<ThreadPool>(), std::defer_lock);
129 if(!lock.owns_lock())
130 lock.lock();
131 auto itr = f_thread_ids().find(_tid);
132 if(itr == f_thread_ids().end())
133 {
134 _idx = f_thread_ids().size();
135 f_thread_ids()[_tid] = _idx;
136 }
137 else
138 {
139 _idx = itr->second;
140 }
141 }
142 return _idx;
143}
144
145//======================================================================================//
146
147uintmax_t
149{
150 return get_thread_id(ThisThread::get_id());
151}
152
153//======================================================================================//
154
155uintmax_t
157{
158 AutoLock lock(TypeMutex<ThreadPool>(), std::defer_lock);
159 if(!lock.owns_lock())
160 lock.lock();
161 if(f_thread_ids().find(_tid) == f_thread_ids().end())
162 {
163 auto _idx = f_thread_ids().size();
164 f_thread_ids()[_tid] = _idx;
165 SetThreadId((int)_idx);
166 }
167 return f_thread_ids().at(_tid);
168}
169
170//======================================================================================//
171
173: m_use_affinity{ _cfg.use_affinity }
174, m_tbb_tp{ _cfg.use_tbb }
175, m_verbose{ _cfg.verbose }
176, m_priority{ _cfg.priority }
177, m_pool_state{ std::make_shared<std::atomic_short>(thread_pool::state::NONINIT) }
178, m_task_queue{ _cfg.task_queue }
179, m_init_func{ _cfg.initializer }
180, m_fini_func{ _cfg.finalizer }
181, m_affinity_func{ _cfg.set_affinity }
182{
183 auto master_id = get_this_thread_id();
184 if(master_id != 0 && m_verbose > 1)
185 {
186 AutoLock lock(TypeMutex<decltype(std::cerr)>());
187 std::cerr << "[PTL::ThreadPool] ThreadPool created on worker thread" << std::endl;
188 }
189
190 thread_data() = new ThreadData(this);
191
192 // initialize after get_this_thread_id so master is zero
193 if(_cfg.init)
195}
196
197//======================================================================================//
198
200{
201 if(m_alive_flag->load())
202 {
203 std::cerr << "Warning! ThreadPool was not properly destroyed! Call "
204 "destroy_threadpool() before deleting the ThreadPool object to "
205 "eliminate this message."
206 << std::endl;
207 m_pool_state->store(thread_pool::state::STOPPED);
208 m_task_lock->lock();
209 m_task_cond->notify_all();
210 m_task_lock->unlock();
211 for(auto& itr : m_threads)
212 itr.join();
213 m_threads.clear();
214 }
215
216 // delete owned resources
217 if(m_delete_task_queue)
218 delete m_task_queue;
219
220 delete m_tbb_task_arena;
221 delete m_tbb_task_group;
222}
223
224//======================================================================================//
225
226bool
228{
229 return !(m_pool_state->load() == thread_pool::state::NONINIT);
230}
231
232//======================================================================================//
233
234void
235ThreadPool::record_entry()
236{
237 ++(*m_thread_active);
238}
239
240//======================================================================================//
241
242void
243ThreadPool::record_exit()
244{
245 --(*m_thread_active);
246}
247
248//======================================================================================//
249
250void
251ThreadPool::set_affinity(intmax_t i, Thread& _thread) const
252{
253 try
254 {
255 NativeThread native_thread = _thread.native_handle();
256 intmax_t _pin = m_affinity_func(i);
257 if(m_verbose > 0)
258 {
259 AutoLock lock(TypeMutex<decltype(std::cerr)>());
260 std::cerr << "[PTL::ThreadPool] Setting pin affinity for thread "
261 << get_thread_id(_thread.get_id()) << " to " << _pin << std::endl;
262 }
263 SetPinAffinity((int)_pin, native_thread);
264 } catch(std::runtime_error& e)
265 {
266 std::cerr << "[PTL::ThreadPool] Error setting pin affinity: " << e.what()
267 << std::endl;
268 }
269}
270
271//======================================================================================//
272
273void
274ThreadPool::set_priority(int _prio, Thread& _thread) const
275{
276 try
277 {
278 NativeThread native_thread = _thread.native_handle();
279 if(m_verbose > 0)
280 {
281 AutoLock lock(TypeMutex<decltype(std::cerr)>());
282 std::cerr << "[PTL::ThreadPool] Setting thread "
283 << get_thread_id(_thread.get_id()) << " priority to " << _prio
284 << std::endl;
285 }
286 SetThreadPriority(_prio, native_thread);
287 } catch(std::runtime_error& e)
288 {
289 AutoLock lock(TypeMutex<decltype(std::cerr)>());
290 std::cerr << "[PTL::ThreadPool] Error setting thread priority: " << e.what()
291 << std::endl;
292 }
293}
294
295//======================================================================================//
296
299{
300 //--------------------------------------------------------------------//
301 // return before initializing
302 if(proposed_size < 1)
303 return 0;
304
305 //--------------------------------------------------------------------//
306 // store that has been started
307 if(!m_alive_flag->load())
308 m_pool_state->store(thread_pool::state::STARTED);
309
310#if defined(PTL_USE_TBB)
311 //--------------------------------------------------------------------//
312 // handle tbb task scheduler
313 if(m_tbb_tp)
314 {
315 m_tbb_tp = true;
316 m_pool_size = proposed_size;
317 tbb_global_control_t*& _global_control = tbb_global_control();
318 // delete if wrong size
319 if(m_pool_size != proposed_size)
320 {
321 delete _global_control;
322 _global_control = nullptr;
323 }
324
325 if(!_global_control)
326 {
327 _global_control = new tbb_global_control_t(
329 if(m_verbose > 0)
330 {
331 AutoLock lock(TypeMutex<decltype(std::cerr)>());
332 std::cerr << "[PTL::ThreadPool] ThreadPool [TBB] initialized with "
333 << m_pool_size << " threads." << std::endl;
334 }
335 }
336
337 // create task group (used for async)
338 if(!m_tbb_task_group)
339 {
340 m_tbb_task_group = new tbb_task_group_t{};
341 execute_on_all_threads([this]() { m_init_func(); });
342 }
343
344 return m_pool_size;
345 }
346#endif
347
348 m_alive_flag->store(true);
349
350 //--------------------------------------------------------------------//
351 // if started, stop some thread if smaller or return if equal
352 if(m_pool_state->load() == thread_pool::state::STARTED)
353 {
354 if(m_pool_size > proposed_size)
355 {
356 while(stop_thread() > proposed_size)
357 ;
358 if(m_verbose > 0)
359 {
360 AutoLock lock(TypeMutex<decltype(std::cerr)>());
361 std::cerr << "[PTL::ThreadPool] ThreadPool initialized with "
362 << m_pool_size << " threads." << std::endl;
363 }
364 if(!m_task_queue)
365 {
366 m_delete_task_queue = true;
367 m_task_queue = new UserTaskQueue(m_pool_size);
368 }
369 else
370 {
371 m_task_queue->resize(m_pool_size);
372 }
373 return m_pool_size;
374 }
375 else if(m_pool_size == proposed_size) // NOLINT
376 {
377 if(m_verbose > 0)
378 {
379 AutoLock lock(TypeMutex<decltype(std::cerr)>());
380 std::cerr << "ThreadPool initialized with " << m_pool_size << " threads."
381 << std::endl;
382 }
383 if(!m_task_queue)
384 {
385 m_delete_task_queue = true;
386 m_task_queue = new UserTaskQueue(m_pool_size);
387 }
388 return m_pool_size;
389 }
390 }
391
392 //--------------------------------------------------------------------//
393 // reserve enough space to prevent realloc later
394 {
395 AutoLock _task_lock(*m_task_lock);
396 m_is_joined.reserve(proposed_size);
397 }
398
399 if(!m_task_queue)
400 {
401 m_delete_task_queue = true;
402 m_task_queue = new UserTaskQueue(proposed_size);
403 }
404
405 auto this_tid = get_this_thread_id();
406 for(size_type i = m_pool_size; i < proposed_size; ++i)
407 {
408 // add the threads
409 try
410 {
411 // create thread
412 Thread thr{ ThreadPool::start_thread, this, &m_thread_data,
413 this_tid + i + 1 };
414 // only reaches here if successful creation of thread
415 ++m_pool_size;
416 // store thread
417 m_main_threads.push_back(thr.get_id());
418 // list of joined thread booleans
419 m_is_joined.push_back(false);
420 // set the affinity
421 if(m_use_affinity)
422 set_affinity(i, thr);
423 set_priority(m_priority, thr);
424 // store
425 m_threads.emplace_back(std::move(thr));
426 } catch(std::runtime_error& e)
427 {
428 AutoLock lock(TypeMutex<decltype(std::cerr)>());
429 std::cerr << "[PTL::ThreadPool] " << e.what()
430 << std::endl; // issue creating thread
431 continue;
432 } catch(std::bad_alloc& e)
433 {
434 AutoLock lock(TypeMutex<decltype(std::cerr)>());
435 std::cerr << "[PTL::ThreadPool] " << e.what() << std::endl;
436 continue;
437 }
438 }
439 //------------------------------------------------------------------------//
440
441 AutoLock _task_lock(*m_task_lock);
442
443 // thread pool size doesn't match with join vector
444 // this will screw up joining later
445 if(m_is_joined.size() != m_main_threads.size())
446 {
447 std::stringstream ss;
448 ss << "ThreadPool::initialize_threadpool - boolean is_joined vector "
449 << "is a different size than threads vector: " << m_is_joined.size() << " vs. "
450 << m_main_threads.size() << " (tid: " << std::this_thread::get_id() << ")";
451
452 throw std::runtime_error(ss.str());
453 }
454
455 if(m_verbose > 0)
456 {
457 AutoLock lock(TypeMutex<decltype(std::cerr)>());
458 std::cerr << "[PTL::ThreadPool] ThreadPool initialized with " << m_pool_size
459 << " threads." << std::endl;
460 }
461
462 return m_main_threads.size();
463}
464
465//======================================================================================//
466
469{
470 // Note: this is not for synchronization, its for thread communication!
471 // destroy_threadpool() will only be called from the main thread, yet
472 // the modified m_pool_state may not show up to other threads until its
473 // modified in a lock!
474 //------------------------------------------------------------------------//
475 m_pool_state->store(thread_pool::state::STOPPED);
476
477 //--------------------------------------------------------------------//
478 // handle tbb task scheduler
479#if defined(PTL_USE_TBB)
480 if(m_tbb_task_group)
481 {
482 execute_on_all_threads([this]() { m_fini_func(); });
483 auto _func = [&]() { m_tbb_task_group->wait(); };
484 if(m_tbb_task_arena)
485 m_tbb_task_arena->execute(_func);
486 else
487 _func();
488 delete m_tbb_task_group;
489 m_tbb_task_group = nullptr;
490 }
491 if(m_tbb_task_arena)
492 {
493 delete m_tbb_task_arena;
494 m_tbb_task_arena = nullptr;
495 }
496 if(m_tbb_tp && tbb_global_control())
497 {
498 tbb_global_control_t*& _global_control = tbb_global_control();
499 delete _global_control;
500 _global_control = nullptr;
501 m_tbb_tp = false;
502 AutoLock lock(TypeMutex<decltype(std::cerr)>());
503 if(m_verbose > 0)
504 {
505 std::cerr << "[PTL::ThreadPool] ThreadPool [TBB] destroyed" << std::endl;
506 }
507 }
508#endif
509
510 if(!m_alive_flag->load())
511 return 0;
512
513 //------------------------------------------------------------------------//
514 // notify all threads we are shutting down
515 m_task_lock->lock();
516 m_task_cond->notify_all();
517 m_task_lock->unlock();
518 //------------------------------------------------------------------------//
519
520 if(m_is_joined.size() != m_main_threads.size())
521 {
522 std::stringstream ss;
523 ss << " ThreadPool::destroy_thread_pool - boolean is_joined vector "
524 << "is a different size than threads vector: " << m_is_joined.size() << " vs. "
525 << m_main_threads.size() << " (tid: " << std::this_thread::get_id() << ")";
526
527 throw std::runtime_error(ss.str());
528 }
529
530 for(size_type i = 0; i < m_is_joined.size(); i++)
531 {
532 //--------------------------------------------------------------------//
533 //
534 if(i < m_threads.size())
535 m_threads.at(i).join();
536
537 //--------------------------------------------------------------------//
538 // if its joined already, nothing else needs to be done
539 if(m_is_joined.at(i))
540 continue;
541
542 //--------------------------------------------------------------------//
543 // join
544 if(std::this_thread::get_id() == m_main_threads[i])
545 continue;
546
547 //--------------------------------------------------------------------//
548 // thread id and index
549 auto _tid = m_main_threads[i];
550
551 //--------------------------------------------------------------------//
552 // erase thread from thread ID list
553 if(f_thread_ids().find(_tid) != f_thread_ids().end())
554 f_thread_ids().erase(f_thread_ids().find(_tid));
555
556 //--------------------------------------------------------------------//
557 // it's joined
558 m_is_joined.at(i) = true;
559 }
560
561 m_thread_data.clear();
562 m_threads.clear();
563 m_main_threads.clear();
564 m_is_joined.clear();
565
566 m_alive_flag->store(false);
567
568 auto start = std::chrono::steady_clock::now();
569 auto elapsed = std::chrono::duration<double>{};
570 // wait maximum of 30 seconds for threads to exit
571 while(m_thread_active->load() > 0 && elapsed.count() < 30)
572 {
573 std::this_thread::sleep_for(std::chrono::milliseconds(50));
574 elapsed = std::chrono::steady_clock::now() - start;
575 }
576
577 auto _active = m_thread_active->load();
578
579 if(get_verbose() > 0)
580 {
581 if(_active == 0)
582 {
583 AutoLock lock(TypeMutex<decltype(std::cerr)>());
584 std::cerr << "[PTL::ThreadPool] ThreadPool destroyed" << std::endl;
585 }
586 else
587 {
588 AutoLock lock(TypeMutex<decltype(std::cerr)>());
589 std::cerr << "[PTL::ThreadPool] ThreadPool destroyed but " << _active
590 << " threads might still be active (and cause a termination error)"
591 << std::endl;
592 }
593 }
594
595 if(m_delete_task_queue)
596 {
597 delete m_task_queue;
598 m_task_queue = nullptr;
599 }
600
601 return 0;
602}
603
604//======================================================================================//
605
608{
609 if(!m_alive_flag->load() || m_pool_size == 0)
610 return 0;
611
612 m_pool_state->store(thread_pool::state::PARTIAL);
613
614 //------------------------------------------------------------------------//
615 // notify all threads we are shutting down
616 m_task_lock->lock();
617 m_is_stopped.push_back(true);
618 m_task_cond->notify_one();
619 m_task_lock->unlock();
620 //------------------------------------------------------------------------//
621
622 while(!m_is_stopped.empty() && m_stop_threads.empty())
623 ;
624
625 // lock up the task queue
626 AutoLock _task_lock(*m_task_lock);
627
628 while(!m_stop_threads.empty())
629 {
630 auto tid = m_stop_threads.front();
631 // remove from stopped
632 m_stop_threads.pop_front();
633 // remove from main
634 for(auto itr = m_main_threads.begin(); itr != m_main_threads.end(); ++itr)
635 {
636 if(*itr == tid)
637 {
638 m_main_threads.erase(itr);
639 break;
640 }
641 }
642 // remove from join list
643 m_is_joined.pop_back();
644 }
645
646 m_pool_state->store(thread_pool::state::STARTED);
647
648 m_pool_size = m_main_threads.size();
649 return m_main_threads.size();
650}
651
652//======================================================================================//
653
656{
657 if(!_queue)
658 _queue = new UserTaskQueue{ static_cast<intmax_t>(m_pool_size) };
659 return _queue;
660}
661//======================================================================================//
662
663// Temporary workaround for shared_ptr constructor GPFLT on Intel Macs and Clang 15
664#if defined (__APPLE__) && defined(__amd64) && defined(__clang__)
665[[clang::optnone]]
666#endif
667void
668ThreadPool::execute_thread(VUserTaskQueue* _task_queue)
669{
670 ++(*m_thread_awake);
671
672 // initialization function
673 m_init_func();
674 // finalization function (executed when scope is destroyed)
675 ScopeDestructor _fini{ [this]() { m_fini_func(); } };
676
677 ThreadId tid = ThisThread::get_id();
678 ThreadData* data = thread_data();
679 // auto thread_bin = _task_queue->GetThreadBin();
680 // auto workers = _task_queue->workers();
681
682 auto start = std::chrono::steady_clock::now();
683 auto elapsed = std::chrono::duration<double>{};
684 // check for updates for 60 seconds max
685 while(!_task_queue && elapsed.count() < 60)
686 {
687 elapsed = std::chrono::steady_clock::now() - start;
688 data->update();
689 _task_queue = data->current_queue;
690 }
691
692 if(!_task_queue)
693 {
694 --(*m_thread_awake);
695 throw std::runtime_error("No task queue was found after 60 seconds!");
696 }
697
698 assert(data->current_queue != nullptr);
699 assert(_task_queue == data->current_queue);
700
701 // essentially a dummy run
702 if(_task_queue)
703 {
704 data->within_task = true;
705 auto _task = _task_queue->GetTask();
706 if(_task)
707 {
708 (*_task)();
709 }
710 data->within_task = false;
711 }
712
713 // threads stay in this loop forever until thread-pool destroyed
714 while(true)
715 {
716 static thread_local auto p_task_lock = m_task_lock;
717
718 //--------------------------------------------------------------------//
719 // Try to pick a task
720 AutoLock _task_lock(*p_task_lock, std::defer_lock);
721 //--------------------------------------------------------------------//
722
723 auto leave_pool = [&]() {
724 auto _state = [&]() { return static_cast<int>(m_pool_state->load()); };
725 auto _pool_state = _state();
726 if(_pool_state > 0)
727 {
728 // stop whole pool
729 if(_pool_state == thread_pool::state::STOPPED)
730 {
731 if(_task_lock.owns_lock())
732 _task_lock.unlock();
733 return true;
734 }
735 // single thread stoppage
736 else if(_pool_state == thread_pool::state::PARTIAL) // NOLINT
737 {
738 if(!_task_lock.owns_lock())
739 _task_lock.lock();
740 if(!m_is_stopped.empty() && m_is_stopped.back())
741 {
742 m_stop_threads.push_back(tid);
743 m_is_stopped.pop_back();
744 if(_task_lock.owns_lock())
745 _task_lock.unlock();
746 // exit entire function
747 return true;
748 }
749 if(_task_lock.owns_lock())
750 _task_lock.unlock();
751 }
752 }
753 return false;
754 };
755
756 // We need to put condition.wait() in a loop for two reasons:
757 // 1. There can be spurious wake-ups (due to signal/ENITR)
758 // 2. When mutex is released for waiting, another thread can be woken up
759 // from a signal/broadcast and that thread can mess up the condition.
760 // So when the current thread wakes up the condition may no longer be
761 // actually true!
762 while(_task_queue->empty())
763 {
764 auto _state = [&]() { return static_cast<int>(m_pool_state->load()); };
765 auto _size = [&]() { return _task_queue->true_size(); };
766 auto _empty = [&]() { return _task_queue->empty(); };
767 auto _wake = [&]() { return (!_empty() || _size() > 0 || _state() > 0); };
768
769 if(leave_pool())
770 return;
771
772 if(_task_queue->true_size() == 0)
773 {
774 if(m_thread_awake->load() > 0)
775 --(*m_thread_awake);
776
777 // lock before sleeping on condition
778 if(!_task_lock.owns_lock())
779 _task_lock.lock();
780
781 // Wait until there is a task in the queue
782 // Unlocks mutex while waiting, then locks it back when signaled
783 // use lambda to control waking
784 m_task_cond->wait(_task_lock, _wake);
785
786 if(_state() == thread_pool::state::STOPPED)
787 return;
788
789 // unlock if owned
790 if(_task_lock.owns_lock())
791 _task_lock.unlock();
792
793 // notify that is awake
794 if(m_thread_awake->load() < m_pool_size)
795 ++(*m_thread_awake);
796 }
797 else
798 break;
799 }
800
801 // release the lock
802 if(_task_lock.owns_lock())
803 _task_lock.unlock();
804
805 //----------------------------------------------------------------//
806
807 // leave pool if conditions dictate it
808 if(leave_pool())
809 return;
810
811 // activate guard against recursive deadlock
812 data->within_task = true;
813 //----------------------------------------------------------------//
814
815 // execute the task(s)
816 while(!_task_queue->empty())
817 {
818 auto _task = _task_queue->GetTask();
819 if(_task)
820 {
821 (*_task)();
822 }
823 }
824 //----------------------------------------------------------------//
825
826 // disable guard against recursive deadlock
827 data->within_task = false;
828 //----------------------------------------------------------------//
829 }
830}
831
832//======================================================================================//
833
834} // namespace PTL
static ThreadData *& GetInstance()
Definition ThreadData.cc:31
const pool_state_type & state() const
std::map< ThreadId, uintmax_t > thread_id_map_t
size_type stop_thread()
void set_affinity(affinity_func_t f)
static uintmax_t add_thread_id(ThreadId=ThisThread::get_id())
ThreadPool(const Config &)
static tbb_global_control_t *& tbb_global_control()
size_type initialize_threadpool(size_type)
static uintmax_t get_this_thread_id()
int get_verbose() const
task_queue_t *& get_valid_queue(task_queue_t *&) const
void execute_on_all_threads(FuncT &&_func)
size_type destroy_threadpool()
bool is_initialized() const
void set_priority(int _prio, Thread &) const
static const thread_id_map_t & get_thread_ids()
VUserTaskQueue task_queue_t
static uintmax_t get_thread_id(ThreadId)
virtual task_pointer GetTask(intmax_t subq=-1, intmax_t nitr=-1)=0
virtual size_type true_size() const
virtual bool empty() const =0
Backports of C++ language features for use with C++11 compilers.
Definition AutoLock.hh:255
bool SetThreadPriority(int _v)
Definition Threading.cc:150
TemplateAutoLock< Mutex > AutoLock
Definition AutoLock.hh:479
tbb::global_control tbb_global_control_t
bool SetPinAffinity(int idx)
Definition Threading.cc:133
void SetThreadId(int aNewValue)
Definition Threading.cc:119
tbb::task_group tbb_task_group_t
Tp GetEnv(const std::string &env_id, Tp _default=Tp())
Definition GetEnv.hh:47