Geant4 11.1.1
Toolkit for the simulation of the passage of particles through matter
Loading...
Searching...
No Matches
ThreadPool.cc
Go to the documentation of this file.
1//
2// MIT License
3// Copyright (c) 2020 Jonathan R. Madsen
4// Permission is hereby granted, free of charge, to any person obtaining a copy
5// of this software and associated documentation files (the "Software"), to deal
6// in the Software without restriction, including without limitation the rights
7// to use, copy, modify, merge, publish, distribute, sublicense, and
8// copies of the Software, and to permit persons to whom the Software is
9// furnished to do so, subject to the following conditions:
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED
12// "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
13// LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
14// PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
15// HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
16// ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
17// WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
18//
19// ---------------------------------------------------------------
20// Tasking class implementation
21//
22// Class Description:
23//
24// This file creates a class for an efficient thread-pool that
25// accepts work in the form of tasks.
26//
27// ---------------------------------------------------------------
28// Author: Jonathan Madsen (Feb 13th 2018)
29// ---------------------------------------------------------------
30
31#include "PTL/ThreadPool.hh"
32#include "PTL/ThreadData.hh"
33#include "PTL/Threading.hh"
34#include "PTL/UserTaskQueue.hh"
35#include "PTL/Utility.hh"
36#include "PTL/VUserTaskQueue.hh"
37
38#include <cassert>
39#include <mutex>
40#include <new>
41#include <stdexcept>
42#include <thread>
43
44using namespace PTL;
45
46//======================================================================================//
47
// File-local helper(s), not visible outside this translation unit.
48namespace
49{
// Accessor for the calling thread's ThreadData slot. Elsewhere in this file
// its result is assigned to (`thread_data() = ...`) and read back as a
// `ThreadData*`.
// NOTE(review): listing lines 50 (return type) and 53 (function body) are
// absent from this extract; presumably the body returns
// ThreadData::GetInstance() -- confirm against the original file.
51thread_data()
52{
54}
55} // namespace
56
57//======================================================================================//
58
// Returns the single, function-local static thread_id_map_t that backs every
// thread-id lookup in this file. This accessor itself takes no lock.
// NOTE(review): the return-type line (listing line 59) is missing from this
// extract; callers mutate the result, so it is presumably a reference.
60ThreadPool::f_thread_ids()
61{
62 static auto _v = thread_id_map_t{};
63 return _v;
64}
65
66//======================================================================================//
67
68bool&
69ThreadPool::f_use_tbb()
70{
71 static bool _v = GetEnv<bool>("PTL_USE_TBB", false);
72 return _v;
73}
74
75//======================================================================================//
76
77bool&
78ThreadPool::f_use_cpu_affinity()
79{
80 static bool _v = GetEnv<bool>("PTL_CPU_AFFINITY", false);
81 return _v;
82}
83
84//======================================================================================//
85
86int&
87ThreadPool::f_thread_priority()
88{
89 static int _v = GetEnv<int>("PTL_THREAD_PRIORITY", 0);
90 return _v;
91}
92
93//======================================================================================//
94
95int&
96ThreadPool::f_verbose()
97{
98 static int _v = GetEnv<int>("PTL_VERBOSE", 0);
99 return _v;
100}
101
102//======================================================================================//
103
// Accessor for the default pool size: a function-local static seeded on
// first call by GetEnv<size_type>("PTL_NUM_THREADS", ...), with
// Thread::hardware_concurrency() passed as the fallback argument.
// NOTE(review): the return-type line (listing line 104) is missing from this
// extract; the sibling f_* accessors return references -- confirm.
105ThreadPool::f_default_pool_size()
106{
107 static size_type _v =
108 GetEnv<size_type>("PTL_NUM_THREADS", Thread::hardware_concurrency());
109 return _v;
110}
111
112//======================================================================================//
113// static member function that calls the member function we want the thread to
114// run
// It registers the new thread, runs tp->execute_thread(), and brackets that
// call with record_entry()/record_exit().
// NOTE(review): the signature line (listing line 116) is missing from this
// extract. The body uses `tp`, `_data` and `_idx`; the cross-reference index
// declares `static void start_thread(ThreadPool*, thread_data_t*, intmax_t = -1)`.
115void
117{
118 if(tp->get_verbose() > 0)
119 {
120 AutoLock lock(TypeMutex<decltype(std::cerr)>());
121 std::cerr << "[PTL::ThreadPool] Starting thread " << _idx << "..." << std::endl;
122 }
123
// per-thread ThreadData, shared with *_data below so it outlives this frame
124 auto _thr_data = std::make_shared<ThreadData>(tp);
125 {
// registration happens under the ThreadPool type mutex
// (the lock is constructed deferred, then locked explicitly)
126 AutoLock lock(TypeMutex<ThreadPool>(), std::defer_lock);
127 if(!lock.owns_lock())
128 lock.lock();
// a negative index means "pick one": use the current size of the id map
129 if(_idx < 0)
130 _idx = f_thread_ids().size();
131 f_thread_ids()[std::this_thread::get_id()] = _idx;
132 Threading::SetThreadId((int)_idx);
133 _data->emplace_back(_thr_data);
134 }
// publish this thread's ThreadData through the file-local accessor
135 thread_data() = _thr_data.get();
136 tp->record_entry();
137 tp->execute_thread(thread_data()->current_queue);
138 tp->record_exit();
139
140 if(tp->get_verbose() > 0)
141 {
142 AutoLock lock(TypeMutex<decltype(std::cerr)>());
143 std::cerr << "[PTL::ThreadPool] Thread " << _idx << " terminating..."
144 << std::endl;
145 }
146}
147
148//======================================================================================//
149// static member function that checks enabling of tbb library
// Returns the current value of the process-wide flag held by f_use_tbb().
// NOTE(review): the signature line (listing line 151) is missing from this
// extract; the cross-reference index declares `static bool using_tbb()`.
150bool
152{
153 return f_use_tbb();
154}
155
156//======================================================================================//
157// static member function that sets the process-wide "use TBB" flag
// When PTL_USE_TBB is defined, `enable` is stored into f_use_tbb(); otherwise
// the argument is only passed to ConsumeParameters() and the flag is untouched.
// NOTE(review): the signature line (listing line 159) is missing from this
// extract; the body names the parameter `enable`, and the cross-reference
// index declares `static void set_use_tbb(bool _v)`.
158void
160{
161#if defined(PTL_USE_TBB)
162 f_use_tbb() = enable;
163#else
164 ConsumeParameters(enable);
165#endif
166}
167
168//======================================================================================//
169// static member function that sets the default use of cpu affinity
// When PTL_USE_TBB is defined, `enable` is stored into f_use_cpu_affinity();
// otherwise the argument is only passed to ConsumeParameters().
// NOTE(review): the CPU-affinity default is therefore only settable in TBB
// builds -- this looks copied from set_use_tbb; confirm the guard is intended.
// NOTE(review): the signature line (listing line 171) is missing from this
// extract; the cross-reference index declares
// `static void set_default_use_cpu_affinity(bool _v)`.
170void
172{
173#if defined(PTL_USE_TBB)
174 f_use_cpu_affinity() = enable;
175#else
176 ConsumeParameters(enable);
177#endif
178}
179
180//======================================================================================//
181
// Exposes the thread-id map held by f_thread_ids(). No lock is taken here.
// NOTE(review): the signature lines (listing lines 182-183) are missing from
// this extract; the cross-reference index declares
// `static const thread_id_map_t& get_thread_ids()`.
184{
185 return f_thread_ids();
186}
187
188//======================================================================================//
189
// Returns the index registered for `_tid`. If the id is not yet in the map it
// is registered with the next free index (the map's current size) and that
// index is returned. The map is only touched under the ThreadPool type mutex.
// NOTE(review): the signature line (listing line 191) is missing from this
// extract; the cross-reference index declares
// `static uintmax_t get_thread_id(ThreadId)`.
190uintmax_t
192{
193 uintmax_t _idx = 0;
194 {
// constructed deferred, then locked explicitly
195 AutoLock lock(TypeMutex<ThreadPool>(), std::defer_lock);
196 if(!lock.owns_lock())
197 lock.lock();
198 auto itr = f_thread_ids().find(_tid);
199 if(itr == f_thread_ids().end())
200 {
// unknown thread: assign the next index and remember it
201 _idx = f_thread_ids().size();
202 f_thread_ids()[_tid] = _idx;
203 }
204 else
205 {
206 _idx = itr->second;
207 }
208 }
209 return _idx;
210}
211
212//======================================================================================//
213
// Convenience wrapper: get_thread_id() for the calling thread.
// NOTE(review): the signature line (listing line 215) is missing from this
// extract; the cross-reference index declares
// `static uintmax_t get_this_thread_id()`.
214uintmax_t
216{
217 return get_thread_id(ThisThread::get_id());
218}
219
220//======================================================================================//
221
// Ensures `_tid` is present in the thread-id map and returns its index.
// Unlike get_thread_id(), a newly added id is also passed to
// Threading::SetThreadId(). The whole function runs under the ThreadPool
// type mutex.
// NOTE(review): the signature line (listing line 223) is missing from this
// extract; the cross-reference index declares
// `static uintmax_t add_thread_id(ThreadId = ThisThread::get_id())`.
222uintmax_t
224{
225 AutoLock lock(TypeMutex<ThreadPool>(), std::defer_lock);
226 if(!lock.owns_lock())
227 lock.lock();
228 if(f_thread_ids().find(_tid) == f_thread_ids().end())
229 {
// next free index is the map's current size
230 auto _idx = f_thread_ids().size();
231 f_thread_ids()[_tid] = _idx;
232 Threading::SetThreadId((int)_idx);
233 }
234 return f_thread_ids().at(_tid);
235}
236
237//======================================================================================//
238
239ThreadPool::Config::Config(bool _init, bool _use_tbb, bool _use_affinity, int _verbose,
240 int _prio, size_type _size, VUserTaskQueue* _task_queue,
241 affinity_func_t _affinity_func, initialize_func_t _init_func,
242 finalize_func_t _fini_func)
243: init{ _init }
244, use_tbb{ _use_tbb }
245, use_affinity{ _use_affinity }
246, verbose{ _verbose }
247, priority{ _prio }
248, pool_size{ _size }
249, task_queue{ _task_queue }
250, set_affinity{ std::move(_affinity_func) }
251, initializer{ std::move(_init_func) }
252, finalizer{ std::move(_fini_func) }
253{}
254
255//======================================================================================//
256
// Primary constructor: copies the settings out of `_cfg`, creates the pool
// state in NONINIT, registers the constructing thread in the thread-id map
// and gives it a ThreadData.
// NOTE(review): the signature line (listing line 257) is missing from this
// extract; the cross-reference index declares `ThreadPool(const Config&)`.
258: m_use_affinity{ _cfg.use_affinity }
259, m_tbb_tp{ _cfg.use_tbb }
260, m_verbose{ _cfg.verbose }
261, m_priority{ _cfg.priority }
262, m_pool_state{ std::make_shared<std::atomic_short>(thread_pool::state::NONINIT) }
263, m_task_queue{ _cfg.task_queue }
264, m_init_func{ _cfg.initializer }
265, m_fini_func{ _cfg.finalizer }
266, m_affinity_func{ _cfg.set_affinity }
267{
// get_this_thread_id() registers the calling thread if it is not known yet;
// a non-zero result means some other thread was registered before this one
268 auto master_id = get_this_thread_id();
269 if(master_id != 0 && m_verbose > 1)
270 {
271 AutoLock lock(TypeMutex<decltype(std::cerr)>());
272 std::cerr << "[PTL::ThreadPool] ThreadPool created on worker thread" << std::endl;
273 }
274
// NOTE(review): raw `new`; no matching delete is visible in this file --
// confirm who owns this ThreadData
275 thread_data() = new ThreadData(this);
276
277 // initialize after get_this_thread_id so master is zero
// NOTE(review): listing line 279 (the statement controlled by this `if`) is
// missing from this extract; presumably a call to
// initialize_threadpool(_cfg.pool_size) -- confirm against the original file.
278 if(_cfg.init)
280}
281
282ThreadPool::ThreadPool(const size_type& _pool_size, VUserTaskQueue* _task_queue,
283 bool _use_affinity, affinity_func_t _affinity_func,
284 initialize_func_t _init_func, finalize_func_t _fini_func)
285: ThreadPool{ Config{ true, f_use_tbb(), _use_affinity, f_verbose(), f_thread_priority(),
286 _pool_size, _task_queue, std::move(_affinity_func),
287 std::move(_init_func), std::move(_fini_func) } }
288{}
289
// Convenience constructor taking the init/fini callables first. It forwards
// to the (pool size, task queue, use affinity, affinity func, init func,
// fini func) constructor with the arguments reordered.
// NOTE(review): the first signature line (listing line 290) is missing from
// this extract; it must declare `pool_size` and `_init_func`, both of which
// are used in the delegation below.
291 finalize_func_t _fini_func, bool _use_affinity,
292 affinity_func_t _affinity_func, VUserTaskQueue* task_queue)
293: ThreadPool{ pool_size,
294 task_queue,
295 _use_affinity,
296 std::move(_affinity_func),
297 std::move(_init_func),
298 std::move(_fini_func) }
299{}
300
301//======================================================================================//
302
// Destructor. If the pool is still flagged alive it warns, marks the pool
// STOPPED, wakes every waiter and joins all threads in m_threads. It then
// releases the resources this object owns.
// NOTE(review): the signature line (listing line 303) is missing from this
// extract; the cross-reference index declares `virtual ~ThreadPool()`.
304{
305 if(m_alive_flag->load())
306 {
// NOTE(review): written without the TypeMutex<decltype(std::cerr)> lock that
// other messages in this file take
307 std::cerr << "Warning! ThreadPool was not properly destroyed! Call "
308 "destroy_threadpool() before deleting the ThreadPool object to "
309 "eliminate this message."
310 << std::endl;
311 m_pool_state->store(thread_pool::state::STOPPED);
// notify while holding the task lock so waiters observe the new state
312 m_task_lock->lock();
313 m_task_cond->notify_all();
314 m_task_lock->unlock();
315 for(auto& itr : m_threads)
316 itr.join();
317 m_threads.clear();
318 }
319
320 // delete owned resources
// the queue is deleted only when this class allocated it itself
// (m_delete_task_queue is set where a UserTaskQueue is created)
321 if(m_delete_task_queue)
322 delete m_task_queue;
323
324 delete m_tbb_task_arena;
325 delete m_tbb_task_group;
326}
327
328//======================================================================================//
329
// True once the pool state is anything other than NONINIT.
// NOTE(review): the signature line (listing line 331) is missing from this
// extract; the cross-reference index declares `bool is_initialized() const`.
330bool
332{
333 return !(m_pool_state->load() == thread_pool::state::NONINIT);
334}
335
336//======================================================================================//
337
// Increments the m_thread_active counter; start_thread() calls this just
// before execute_thread().
// NOTE(review): the signature line (listing line 339) is missing from this
// extract; the cross-reference index declares `void record_entry()`.
338void
340{
341 ++(*m_thread_active);
342}
343
344//======================================================================================//
345
// Decrements the m_thread_active counter; start_thread() calls this right
// after execute_thread() returns.
// NOTE(review): the signature line (listing line 347) is missing from this
// extract; the cross-reference index declares `void record_exit()`.
346void
348{
349 --(*m_thread_active);
350}
351
352//======================================================================================//
353
354void
355ThreadPool::set_affinity(intmax_t i, Thread& _thread) const
356{
357 try
358 {
359 NativeThread native_thread = _thread.native_handle();
360 intmax_t _pin = m_affinity_func(i);
361 if(m_verbose > 0)
362 {
363 AutoLock lock(TypeMutex<decltype(std::cerr)>());
364 std::cerr << "[PTL::ThreadPool] Setting pin affinity for thread "
365 << get_thread_id(_thread.get_id()) << " to " << _pin << std::endl;
366 }
367 Threading::SetPinAffinity((int)_pin, native_thread);
368 } catch(std::runtime_error& e)
369 {
370 std::cerr << "[PTL::ThreadPool] Error setting pin affinity: " << e.what()
371 << std::endl;
372 }
373}
374
375//======================================================================================//
376
377void
378ThreadPool::set_priority(int _prio, Thread& _thread) const
379{
380 try
381 {
382 NativeThread native_thread = _thread.native_handle();
383 if(m_verbose > 0)
384 {
385 AutoLock lock(TypeMutex<decltype(std::cerr)>());
386 std::cerr << "[PTL::ThreadPool] Setting thread "
387 << get_thread_id(_thread.get_id()) << " priority to " << _prio
388 << std::endl;
389 }
390 Threading::SetThreadPriority(_prio, native_thread);
391 } catch(std::runtime_error& e)
392 {
393 AutoLock lock(TypeMutex<decltype(std::cerr)>());
394 std::cerr << "[PTL::ThreadPool] Error setting thread priority: " << e.what()
395 << std::endl;
396 }
397}
398
399//======================================================================================//
400
// Brings the pool to `proposed_size` worker threads and returns the
// resulting size.
//  - proposed_size < 1: returns 0 without touching anything.
//  - TBB mode (only when PTL_USE_TBB is defined and m_tbb_tp is set): sets up
//    the TBB global control and task group, then returns m_pool_size.
//  - otherwise: stops surplus threads, returns early if the size already
//    matches, or spawns the missing threads with ThreadPool::start_thread.
// Throws std::runtime_error if m_is_joined and m_main_threads end up with
// different sizes.
// NOTE(review): the signature lines (listing lines 401-402) are missing from
// this extract; the cross-reference index declares
// `size_type initialize_threadpool(size_type)`, and the body names the
// parameter `proposed_size`.
403{
404 //--------------------------------------------------------------------//
405 // return before initializing
406 if(proposed_size < 1)
407 return 0;
408
409 //--------------------------------------------------------------------//
410 // store that has been started
411 if(!m_alive_flag->load())
412 m_pool_state->store(thread_pool::state::STARTED);
413
414#if defined(PTL_USE_TBB)
415 //--------------------------------------------------------------------//
416 // handle tbb task scheduler
417 if(m_tbb_tp)
418 {
419 m_tbb_tp = true;
420 m_pool_size = proposed_size;
421 tbb_global_control_t*& _global_control = tbb_global_control();
422 // delete if wrong size
// NOTE(review): m_pool_size was assigned proposed_size two statements above,
// so this comparison can never be true as written -- confirm intent
423 if(m_pool_size != proposed_size)
424 {
425 delete _global_control;
426 _global_control = nullptr;
427 }
428
429 if(!_global_control)
430 {
// NOTE(review): listing line 432 (the constructor arguments and the closing
// of this `new` expression) is missing from this extract
431 _global_control = new tbb_global_control_t(
433 if(m_verbose > 0)
434 {
435 AutoLock lock(TypeMutex<decltype(std::cerr)>());
436 std::cerr << "[PTL::ThreadPool] ThreadPool [TBB] initialized with "
437 << m_pool_size << " threads." << std::endl;
438 }
439 }
440
441 // create task group (used for async)
442 if(!m_tbb_task_group)
443 {
444 m_tbb_task_group = new tbb_task_group_t{};
445 execute_on_all_threads([this]() { m_init_func(); });
446 }
447
448 return m_pool_size;
449 }
450#endif
451
452 m_alive_flag->store(true);
453
454 //--------------------------------------------------------------------//
455 // if started, stop some thread if smaller or return if equal
456 if(m_pool_state->load() == thread_pool::state::STARTED)
457 {
458 if(m_pool_size > proposed_size)
459 {
// stop_thread() returns the remaining thread count, so this loops until the
// pool has shrunk to proposed_size
460 while(stop_thread() > proposed_size)
461 ;
462 if(m_verbose > 0)
463 {
464 AutoLock lock(TypeMutex<decltype(std::cerr)>());
465 std::cerr << "[PTL::ThreadPool] ThreadPool initialized with "
466 << m_pool_size << " threads." << std::endl;
467 }
468 if(!m_task_queue)
469 {
// no queue supplied: create one and remember that this object owns it
470 m_delete_task_queue = true;
471 m_task_queue = new UserTaskQueue(m_pool_size);
472 }
473 else
474 {
475 m_task_queue->resize(m_pool_size);
476 }
477 return m_pool_size;
478 }
479 else if(m_pool_size == proposed_size) // NOLINT
480 {
481 if(m_verbose > 0)
482 {
483 AutoLock lock(TypeMutex<decltype(std::cerr)>());
484 std::cerr << "ThreadPool initialized with " << m_pool_size << " threads."
485 << std::endl;
486 }
487 if(!m_task_queue)
488 {
489 m_delete_task_queue = true;
490 m_task_queue = new UserTaskQueue(m_pool_size);
491 }
492 return m_pool_size;
493 }
494 }
495
496 //--------------------------------------------------------------------//
497 // reserve enough space to prevent realloc later
498 {
499 AutoLock _task_lock(*m_task_lock);
500 m_is_joined.reserve(proposed_size);
501 }
502
503 if(!m_task_queue)
504 {
505 m_delete_task_queue = true;
506 m_task_queue = new UserTaskQueue(proposed_size);
507 }
508
// growth path: spawn one thread per missing slot
509 auto this_tid = get_this_thread_id();
510 for(size_type i = m_pool_size; i < proposed_size; ++i)
511 {
512 // add the threads
513 try
514 {
515 // create thread
// the third argument becomes start_thread's index parameter
516 Thread thr{ ThreadPool::start_thread, this, &m_thread_data,
517 this_tid + i + 1 };
518 // only reaches here if successful creation of thread
// NOTE(review): the loop bound compares `i` against proposed_size while
// m_pool_size grows here as well -- confirm the interaction is intended
519 ++m_pool_size;
520 // store thread
521 m_main_threads.push_back(thr.get_id());
522 // list of joined thread booleans
523 m_is_joined.push_back(false);
524 // set the affinity
525 if(m_use_affinity)
526 set_affinity(i, thr);
527 set_priority(m_priority, thr);
528 // store
529 m_threads.emplace_back(std::move(thr));
530 } catch(std::runtime_error& e)
531 {
532 AutoLock lock(TypeMutex<decltype(std::cerr)>());
533 std::cerr << "[PTL::ThreadPool] " << e.what()
534 << std::endl; // issue creating thread
535 continue;
536 } catch(std::bad_alloc& e)
537 {
538 AutoLock lock(TypeMutex<decltype(std::cerr)>());
539 std::cerr << "[PTL::ThreadPool] " << e.what() << std::endl;
540 continue;
541 }
542 }
543 //------------------------------------------------------------------------//
544
545 AutoLock _task_lock(*m_task_lock);
546
547 // thread pool size doesn't match with join vector
548 // this will screw up joining later
549 if(m_is_joined.size() != m_main_threads.size())
550 {
551 std::stringstream ss;
552 ss << "ThreadPool::initialize_threadpool - boolean is_joined vector "
553 << "is a different size than threads vector: " << m_is_joined.size() << " vs. "
554 << m_main_threads.size() << " (tid: " << std::this_thread::get_id() << ")";
555
556 throw std::runtime_error(ss.str());
557 }
558
559 if(m_verbose > 0)
560 {
561 AutoLock lock(TypeMutex<decltype(std::cerr)>());
562 std::cerr << "[PTL::ThreadPool] ThreadPool initialized with " << m_pool_size
563 << " threads." << std::endl;
564 }
565
566 return m_main_threads.size();
567}
568
569//======================================================================================//
570
// Shuts the pool down: marks it STOPPED, tears down the TBB objects (TBB
// builds only), wakes and joins the worker threads, forgets their ids, waits
// up to 30 seconds for m_thread_active to reach zero, and deletes the task
// queue if this object owns it. Every return path visible here returns 0.
// Throws std::runtime_error if m_is_joined and m_main_threads differ in size.
// NOTE(review): the signature lines (listing lines 571-572) are missing from
// this extract; the cross-reference index declares
// `size_type destroy_threadpool()`.
573{
574 // Note: this is not for synchronization, its for thread communication!
575 // destroy_threadpool() will only be called from the main thread, yet
576 // the modified m_pool_state may not show up to other threads until its
577 // modified in a lock!
578 //------------------------------------------------------------------------//
579 m_pool_state->store(thread_pool::state::STOPPED);
580
581 //--------------------------------------------------------------------//
582 // handle tbb task scheduler
583#if defined(PTL_USE_TBB)
584 if(m_tbb_task_group)
585 {
// run the finalizer on every thread, then wait for outstanding work
// (inside the arena when one exists)
586 execute_on_all_threads([this]() { m_fini_func(); });
587 auto _func = [&]() { m_tbb_task_group->wait(); };
588 if(m_tbb_task_arena)
589 m_tbb_task_arena->execute(_func);
590 else
591 _func();
592 delete m_tbb_task_group;
593 m_tbb_task_group = nullptr;
594 }
595 if(m_tbb_task_arena)
596 {
597 delete m_tbb_task_arena;
598 m_tbb_task_arena = nullptr;
599 }
600 if(m_tbb_tp && tbb_global_control())
601 {
602 tbb_global_control_t*& _global_control = tbb_global_control();
603 delete _global_control;
604 _global_control = nullptr;
605 m_tbb_tp = false;
606 AutoLock lock(TypeMutex<decltype(std::cerr)>());
607 if(m_verbose > 0)
608 {
609 std::cerr << "[PTL::ThreadPool] ThreadPool [TBB] destroyed" << std::endl;
610 }
611 }
612#endif
613
// nothing further to do if no native threads were ever started
614 if(!m_alive_flag->load())
615 return 0;
616
617 //------------------------------------------------------------------------//
618 // notify all threads we are shutting down
619 m_task_lock->lock();
620 m_task_cond->notify_all();
621 m_task_lock->unlock();
622 //------------------------------------------------------------------------//
623
624 if(m_is_joined.size() != m_main_threads.size())
625 {
626 std::stringstream ss;
627 ss << " ThreadPool::destroy_thread_pool - boolean is_joined vector "
628 << "is a different size than threads vector: " << m_is_joined.size() << " vs. "
629 << m_main_threads.size() << " (tid: " << std::this_thread::get_id() << ")";
630
631 throw std::runtime_error(ss.str());
632 }
633
634 for(size_type i = 0; i < m_is_joined.size(); i++)
635 {
636 //--------------------------------------------------------------------//
637 //
// NOTE(review): the join happens before the "already joined" and "is this
// the calling thread" checks below -- confirm that ordering is intended
638 if(i < m_threads.size())
639 m_threads.at(i).join();
640
641 //--------------------------------------------------------------------//
642 // if its joined already, nothing else needs to be done
643 if(m_is_joined.at(i))
644 continue;
645
646 //--------------------------------------------------------------------//
647 // join
648 if(std::this_thread::get_id() == m_main_threads[i])
649 continue;
650
651 //--------------------------------------------------------------------//
652 // thread id and index
653 auto _tid = m_main_threads[i];
654
655 //--------------------------------------------------------------------//
656 // erase thread from thread ID list
// NOTE(review): f_thread_ids() is modified here without the
// TypeMutex<ThreadPool>() lock that get_thread_id()/add_thread_id() take
657 if(f_thread_ids().find(_tid) != f_thread_ids().end())
658 f_thread_ids().erase(f_thread_ids().find(_tid));
659
660 //--------------------------------------------------------------------//
661 // it's joined
662 m_is_joined.at(i) = true;
663 }
664
665 m_thread_data.clear();
666 m_threads.clear();
667 m_main_threads.clear();
668 m_is_joined.clear();
669
670 m_alive_flag->store(false);
671
// poll the active-thread counter (incremented/decremented by
// record_entry()/record_exit()) every 50 ms
672 auto start = std::chrono::steady_clock::now();
673 auto elapsed = std::chrono::duration<double>{};
674 // wait maximum of 30 seconds for threads to exit
675 while(m_thread_active->load() > 0 && elapsed.count() < 30)
676 {
677 std::this_thread::sleep_for(std::chrono::milliseconds(50));
678 elapsed = std::chrono::steady_clock::now() - start;
679 }
680
681 auto _active = m_thread_active->load();
682
683 if(get_verbose() > 0)
684 {
685 if(_active == 0)
686 {
687 AutoLock lock(TypeMutex<decltype(std::cerr)>());
688 std::cerr << "[PTL::ThreadPool] ThreadPool destroyed" << std::endl;
689 }
690 else
691 {
692 AutoLock lock(TypeMutex<decltype(std::cerr)>());
693 std::cerr << "[PTL::ThreadPool] ThreadPool destroyed but " << _active
694 << " threads might still be active (and cause a termination error)"
695 << std::endl;
696 }
697 }
698
// only delete the queue when this object allocated it
699 if(m_delete_task_queue)
700 {
701 delete m_task_queue;
702 m_task_queue = nullptr;
703 }
704
705 return 0;
706}
707
708//======================================================================================//
709
// Asks one worker thread to leave the pool and returns the number of threads
// that remain. Returns 0 immediately if the pool is not alive or is empty.
// It works by switching the pool state to PARTIAL and pushing a request onto
// m_is_stopped. A worker in execute_thread() that sees the request records
// its id in m_stop_threads and exits.
// NOTE(review): the signature lines (listing lines 710-711) are missing from
// this extract; the cross-reference index declares `size_type stop_thread()`.
712{
713 if(!m_alive_flag->load() || m_pool_size == 0)
714 return 0;
715
716 m_pool_state->store(thread_pool::state::PARTIAL);
717
718 //------------------------------------------------------------------------//
719 // notify all threads we are shutting down
// (only one waiter is woken: notify_one)
720 m_task_lock->lock();
721 m_is_stopped.push_back(true);
722 m_task_cond->notify_one();
723 m_task_lock->unlock();
724 //------------------------------------------------------------------------//
725
// spin until a worker has picked up the request
// NOTE(review): m_is_stopped and m_stop_threads are read here without holding
// m_task_lock while workers modify them under that lock -- confirm
726 while(!m_is_stopped.empty() && m_stop_threads.empty())
727 ;
728
729 // lock up the task queue
730 AutoLock _task_lock(*m_task_lock);
731
732 while(!m_stop_threads.empty())
733 {
734 auto tid = m_stop_threads.front();
735 // remove from stopped
736 m_stop_threads.pop_front();
737 // remove from main
738 for(auto itr = m_main_threads.begin(); itr != m_main_threads.end(); ++itr)
739 {
740 if(*itr == tid)
741 {
742 m_main_threads.erase(itr);
743 break;
744 }
745 }
746 // remove from join list
// NOTE(review): m_threads is not touched here, so the exited thread's
// std::thread object stays in that vector -- confirm it is joined elsewhere
747 m_is_joined.pop_back();
748 }
749
750 m_pool_state->store(thread_pool::state::STARTED);
751
752 m_pool_size = m_main_threads.size();
753 return m_main_threads.size();
754}
755
756//======================================================================================//
757
// Guarantees `_queue` is non-null: when it is null, a UserTaskQueue sized to
// m_pool_size is allocated and stored through the reference. Returns `_queue`.
// NOTE(review): m_delete_task_queue is not set here, so ownership of the
// newly allocated queue rests with the caller -- confirm.
// NOTE(review): the signature lines (listing lines 758-759) are missing from
// this extract; the cross-reference index declares
// `task_queue_t*& get_valid_queue(task_queue_t*&) const`.
760{
761 if(!_queue)
762 _queue = new UserTaskQueue{ static_cast<intmax_t>(m_pool_size) };
763 return _queue;
764}
765//======================================================================================//
766
// Worker loop run by every pool thread (called from start_thread()).
// It runs m_init_func, resolves the task queue, executes one initial task if
// available, then alternates between sleeping on m_task_cond and draining the
// queue until the pool state tells this thread to leave. m_fini_func runs
// when the function's scope is exited.
// Throws std::runtime_error if no task queue becomes available within 60
// seconds.
// NOTE(review): the signature line (listing line 768) is missing from this
// extract; the cross-reference index declares
// `void execute_thread(VUserTaskQueue*)`, and the body names the parameter
// `_task_queue`.
767void
769{
770 // how long the thread waits on condition variable
771 // static int wait_time = GetEnv<int>("PTL_POOL_WAIT_TIME", 5);
772
773 ++(*m_thread_awake);
774
775 // initialization function
776 m_init_func();
777 // finalization function (executed when scope is destroyed)
778 ScopeDestructor _fini{ [this]() { m_fini_func(); } };
779
780 ThreadId tid = ThisThread::get_id();
781 ThreadData* data = thread_data();
782 // auto thread_bin = _task_queue->GetThreadBin();
783 // auto workers = _task_queue->workers();
784
785 auto start = std::chrono::steady_clock::now();
786 auto elapsed = std::chrono::duration<double>{};
787 // check for updates for 60 seconds max
// (busy loop without sleeping: each pass refreshes `data` and re-reads its
// current_queue)
788 while(!_task_queue && elapsed.count() < 60)
789 {
790 elapsed = std::chrono::steady_clock::now() - start;
791 data->update();
792 _task_queue = data->current_queue;
793 }
794
795 if(!_task_queue)
796 {
// undo the increment made on entry before bailing out
797 --(*m_thread_awake);
798 throw std::runtime_error("No task queue was found after 60 seconds!");
799 }
800
801 assert(data->current_queue != nullptr);
802 assert(_task_queue == data->current_queue);
803
804 // essentially a dummy run
805 if(_task_queue)
806 {
807 data->within_task = true;
808 auto _task = _task_queue->GetTask();
809 if(_task)
810 {
811 (*_task)();
812 }
813 data->within_task = false;
814 }
815
816 // threads stay in this loop forever until thread-pool destroyed
817 while(true)
818 {
// NOTE(review): `static thread_local` means this is initialised once per
// thread, from the m_task_lock of whichever pool first runs on that thread
819 static thread_local auto p_task_lock = m_task_lock;
820
821 //--------------------------------------------------------------------//
822 // Try to pick a task
823 AutoLock _task_lock(*p_task_lock, std::defer_lock);
824 //--------------------------------------------------------------------//
825
// leave_pool(): returns true when this thread must exit, i.e. the pool is
// STOPPED, or it is PARTIAL and this thread claims a pending stop request.
// _task_lock is released on every return path.
826 auto leave_pool = [&]() {
827 auto _state = [&]() { return static_cast<int>(m_pool_state->load()); };
828 auto _pool_state = _state();
829 if(_pool_state > 0)
830 {
831 // stop whole pool
832 if(_pool_state == thread_pool::state::STOPPED)
833 {
834 if(_task_lock.owns_lock())
835 _task_lock.unlock();
836 return true;
837 }
838 // single thread stoppage
839 else if(_pool_state == thread_pool::state::PARTIAL) // NOLINT
840 {
841 if(!_task_lock.owns_lock())
842 _task_lock.lock();
843 if(!m_is_stopped.empty() && m_is_stopped.back())
844 {
// claim the request: record this thread's id for stop_thread() and
// consume the request flag
845 m_stop_threads.push_back(tid);
846 m_is_stopped.pop_back();
847 if(_task_lock.owns_lock())
848 _task_lock.unlock();
849 // exit entire function
850 return true;
851 }
852 if(_task_lock.owns_lock())
853 _task_lock.unlock();
854 }
855 }
856 return false;
857 };
858
859 // We need to put condition.wait() in a loop for two reasons:
860 // 1. There can be spurious wake-ups (due to signal/EINTR)
861 // 2. When mutex is released for waiting, another thread can be woken up
862 // from a signal/broadcast and that thread can mess up the condition.
863 // So when the current thread wakes up the condition may no longer be
864 // actually true!
865 while(_task_queue->empty())
866 {
867 auto _state = [&]() { return static_cast<int>(m_pool_state->load()); };
868 auto _size = [&]() { return _task_queue->true_size(); };
869 auto _empty = [&]() { return _task_queue->empty(); };
// wake predicate: there is work, or the pool state is greater than zero
870 auto _wake = [&]() { return (!_empty() || _size() > 0 || _state() > 0); };
871
872 if(leave_pool())
873 return;
874
875 if(_task_queue->true_size() == 0)
876 {
// about to sleep: this thread no longer counts as awake
877 if(m_thread_awake->load() > 0)
878 --(*m_thread_awake);
879
880 // lock before sleeping on condition
881 if(!_task_lock.owns_lock())
882 _task_lock.lock();
883
884 // Wait until there is a task in the queue
885 // Unlocks mutex while waiting, then locks it back when signaled
886 // use lambda to control waking
887 m_task_cond->wait(_task_lock, _wake);
888
// the lock still held here is released by _task_lock's destructor
889 if(_state() == thread_pool::state::STOPPED)
890 return;
891
892 // unlock if owned
893 if(_task_lock.owns_lock())
894 _task_lock.unlock();
895
896 // notify that is awake
897 if(m_thread_awake->load() < m_pool_size)
898 ++(*m_thread_awake);
899 }
900 else
901 break;
902 }
903
904 // release the lock
905 if(_task_lock.owns_lock())
906 _task_lock.unlock();
907
908 //----------------------------------------------------------------//
909
910 // leave pool if conditions dictate it
911 if(leave_pool())
912 return;
913
914 // activate guard against recursive deadlock
915 data->within_task = true;
916 //----------------------------------------------------------------//
917
918 // execute the task(s)
919 while(!_task_queue->empty())
920 {
921 auto _task = _task_queue->GetTask();
922 if(_task)
923 {
924 (*_task)();
925 }
926 }
927 //----------------------------------------------------------------//
928
929 // disable guard against recursive deadlock
930 data->within_task = false;
931 //----------------------------------------------------------------//
932 }
933}
934
935//======================================================================================//
static ThreadData *& GetInstance()
Definition: ThreadData.cc:32
VUserTaskQueue * current_queue
Definition: ThreadData.hh:152
std::vector< std::shared_ptr< ThreadData > > thread_data_t
Definition: ThreadPool.hh:111
std::function< intmax_t(intmax_t)> affinity_func_t
Definition: ThreadPool.hh:115
ThreadPool(const Config &)
Definition: ThreadPool.cc:257
static void start_thread(ThreadPool *, thread_data_t *, intmax_t=-1)
Definition: ThreadPool.cc:116
void record_entry()
Definition: ThreadPool.cc:339
static uintmax_t get_this_thread_id()
Definition: ThreadPool.cc:215
const pool_state_type & state() const
Definition: ThreadPool.hh:250
void set_priority(int _prio, Thread &) const
Definition: ThreadPool.cc:378
std::map< ThreadId, uintmax_t > thread_id_map_t
Definition: ThreadPool.hh:108
static bool using_tbb()
Definition: ThreadPool.cc:151
task_queue_t *& get_valid_queue(task_queue_t *&) const
Definition: ThreadPool.cc:759
size_t size_type
Definition: ThreadPool.hh:94
virtual ~ThreadPool()
Definition: ThreadPool.cc:303
static uintmax_t get_thread_id(ThreadId)
Definition: ThreadPool.cc:191
void set_affinity(affinity_func_t f)
Definition: ThreadPool.hh:264
size_type stop_thread()
Definition: ThreadPool.cc:711
static uintmax_t add_thread_id(ThreadId=ThisThread::get_id())
Definition: ThreadPool.cc:223
static tbb_global_control_t *& tbb_global_control()
Definition: ThreadPool.hh:384
int get_verbose() const
Definition: ThreadPool.hh:269
void execute_on_all_threads(FuncT &&_func)
Definition: ThreadPool.hh:493
std::function< void()> finalize_func_t
Definition: ThreadPool.hh:114
static const thread_id_map_t & get_thread_ids()
Definition: ThreadPool.cc:183
static void set_default_use_cpu_affinity(bool _v)
set the default use of cpu affinity
Definition: ThreadPool.cc:171
size_type destroy_threadpool()
Definition: ThreadPool.cc:572
size_type initialize_threadpool(size_type)
Definition: ThreadPool.cc:402
void execute_thread(VUserTaskQueue *)
Definition: ThreadPool.cc:768
std::function< void()> initialize_func_t
Definition: ThreadPool.hh:113
bool is_initialized() const
Definition: ThreadPool.cc:331
static void set_use_tbb(bool _v)
Definition: ThreadPool.cc:159
void record_exit()
Definition: ThreadPool.cc:347
virtual task_pointer GetTask(intmax_t subq=-1, intmax_t nitr=-1)=0
virtual size_type true_size() const
virtual bool empty() const =0
virtual void resize(intmax_t)=0
auto execute(FuncT &&_func) -> decltype(_func())
Definition: ThreadData.hh:115
bool SetPinAffinity(int idx)
Definition: Threading.cc:129
bool SetThreadPriority(int _v)
Definition: Threading.cc:146
void SetThreadId(int aNewValue)
Definition: Threading.cc:115
Definition: AutoLock.hh:255
std::thread::native_handle_type NativeThread
Definition: Threading.hh:38
void ConsumeParameters(Args &&...)
Definition: Utility.hh:44
Thread::id ThreadId
Definition: Threading.hh:46
std::thread Thread
Definition: Threading.hh:37
MutexTp & TypeMutex(const unsigned int &_n=0)
Definition: Threading.hh:74
tbb::global_control tbb_global_control_t
Definition: ThreadData.hh:123
Config(bool, bool, bool, int, int, size_type, VUserTaskQueue *, affinity_func_t, initialize_func_t, finalize_func_t)
Definition: ThreadPool.cc:239