Geant4 11.2.2
Toolkit for the simulation of the passage of particles through matter
Loading...
Searching...
No Matches
PTL::UserTaskQueue Class Reference

#include <UserTaskQueue.hh>

+ Inheritance diagram for PTL::UserTaskQueue:

Public Types

using task_pointer = std::shared_ptr<VTask>
 
using TaskSubQueueContainer = std::vector<TaskSubQueue*>
 
using random_engine_t = std::default_random_engine
 
using int_dist_t = std::uniform_int_distribution<int>
 
- Public Types inherited from PTL::VUserTaskQueue
using task_pointer = std::shared_ptr<VTask>
 
using AtomicInt = std::atomic<intmax_t>
 
using size_type = uintmax_t
 
using function_type = std::function<void()>
 
using ThreadIdSet = std::set<ThreadId>
 

Public Member Functions

 UserTaskQueue (intmax_t nworkers=-1, UserTaskQueue *=nullptr)
 
 ~UserTaskQueue () override
 
task_pointer GetTask (intmax_t subq=-1, intmax_t nitr=-1) override
 
intmax_t InsertTask (task_pointer &&, ThreadData *=nullptr, intmax_t subq=-1) override PTL_NO_SANITIZE_THREAD
 
task_pointer GetThreadBinTask ()
 
void Wait () override
 
void resize (intmax_t) override
 
bool empty () const override
 
size_type size () const override
 
size_type bin_size (size_type bin) const override
 
bool bin_empty (size_type bin) const override
 
bool true_empty () const override
 
size_type true_size () const override
 
void ExecuteOnAllThreads (ThreadPool *tp, function_type f) override
 
void ExecuteOnSpecificThreads (ThreadIdSet tid_set, ThreadPool *tp, function_type f) override
 
VUserTaskQueue * clone () override
 
intmax_t GetThreadBin () const override
 
- Public Member Functions inherited from PTL::VUserTaskQueue
 VUserTaskQueue (intmax_t nworkers=-1)
 
virtual ~VUserTaskQueue ()=default
 
intmax_t workers () const
 

Protected Member Functions

intmax_t GetInsertBin () const
 

Additional Inherited Members

- Protected Attributes inherited from PTL::VUserTaskQueue
intmax_t m_workers = 0
 

Detailed Description

Definition at line 42 of file UserTaskQueue.hh.

Member Typedef Documentation

◆ int_dist_t

using PTL::UserTaskQueue::int_dist_t = std::uniform_int_distribution<int>

Definition at line 48 of file UserTaskQueue.hh.

◆ random_engine_t

using PTL::UserTaskQueue::random_engine_t = std::default_random_engine

Definition at line 47 of file UserTaskQueue.hh.

◆ task_pointer

using PTL::UserTaskQueue::task_pointer = std::shared_ptr<VTask>

Definition at line 45 of file UserTaskQueue.hh.

◆ TaskSubQueueContainer

using PTL::UserTaskQueue::TaskSubQueueContainer = std::vector<TaskSubQueue*>

Definition at line 46 of file UserTaskQueue.hh.

Constructor & Destructor Documentation

◆ UserTaskQueue()

UserTaskQueue::UserTaskQueue ( intmax_t nworkers = -1,
UserTaskQueue * parent = nullptr )

Definition at line 48 of file UserTaskQueue.cc.

49: VUserTaskQueue(nworkers)
50, m_is_clone((parent) != nullptr)
51, m_thread_bin((parent) ? (ThreadPool::get_this_thread_id() % (nworkers + 1)) : 0)
52, m_insert_bin((parent) ? (ThreadPool::get_this_thread_id() % (nworkers + 1)) : 0)
53, m_hold((parent) ? parent->m_hold : new std::atomic_bool(false))
54, m_ntasks((parent) ? parent->m_ntasks : new std::atomic_uintmax_t(0))
55, m_mutex((parent) ? parent->m_mutex : new Mutex{})
56, m_subqueues((parent) ? parent->m_subqueues : new TaskSubQueueContainer())
57{
58 // create nthreads + 1 subqueues so there is always a subqueue available
59 if(!parent)
60 {
61 for(intmax_t i = 0; i < nworkers + 1; ++i)
62 m_subqueues->emplace_back(new TaskSubQueue(m_ntasks));
63 }
64
65#if defined(DEBUG)
66 if(GetEnv<int>("PTL_VERBOSE", 0) > 3)
67 {
68 RecursiveAutoLock l(TypeMutex<decltype(std::cout), RecursiveMutex>());
69 std::stringstream ss;
70 ss << ThreadPool::get_this_thread_id() << "> " << ThisThread::get_id() << " ["
71 << __FUNCTION__ << ":" << __LINE__ << "] "
72 << "this = " << this << ", "
73 << "clone = " << std::boolalpha << m_is_clone << ", "
74 << "thread = " << m_thread_bin << ", "
75 << "insert = " << m_insert_bin << ", "
76 << "hold = " << m_hold->load() << " @ " << m_hold << ", "
77 << "tasks = " << m_ntasks->load() << " @ " << m_ntasks << ", "
78 << "subqueue = " << m_subqueues << ", "
79 << "size = " << true_size() << ", "
80 << "empty = " << true_empty();
81 std::cout << ss.str() << std::endl;
82 }
83#endif
84}
static uintmax_t get_this_thread_id()
size_type true_size() const override
std::vector< TaskSubQueue * > TaskSubQueueContainer
bool true_empty() const override
VUserTaskQueue(intmax_t nworkers=-1)
std::mutex Mutex
Definition Threading.hh:57
std::recursive_mutex RecursiveMutex
Definition Threading.hh:58
MutexTp & TypeMutex(const unsigned int &_n=0)
Definition Threading.hh:74
Tp GetEnv(const std::string &env_id, Tp _default=Tp())
Definition Utility.hh:155

Referenced by clone().

◆ ~UserTaskQueue()

UserTaskQueue::~UserTaskQueue ( )
override

Definition at line 88 of file UserTaskQueue.cc.

89{
90 if(!m_is_clone)
91 {
92 for(auto& itr : *m_subqueues)
93 {
94 assert(itr->empty());
95 delete itr;
96 }
97 m_subqueues->clear();
98 delete m_hold;
99 delete m_ntasks;
100 delete m_mutex;
101 delete m_subqueues;
102 }
103}

Member Function Documentation

◆ bin_empty()

bool PTL::UserTaskQueue::bin_empty ( size_type bin) const
inline override virtual

Implements PTL::VUserTaskQueue.

Definition at line 135 of file UserTaskQueue.hh.

136{
137 return (*m_subqueues)[bin]->empty();
138}

◆ bin_size()

UserTaskQueue::size_type PTL::UserTaskQueue::bin_size ( size_type bin) const
inline override virtual

Implements PTL::VUserTaskQueue.

Definition at line 127 of file UserTaskQueue.hh.

128{
129 return (*m_subqueues)[bin]->size();
130}

◆ clone()

VUserTaskQueue * UserTaskQueue::clone ( )
override virtual

Implements PTL::VUserTaskQueue.

Definition at line 135 of file UserTaskQueue.cc.

136{
137 return new UserTaskQueue(workers(), this);
138}
UserTaskQueue(intmax_t nworkers=-1, UserTaskQueue *=nullptr)
intmax_t workers() const

◆ empty()

bool PTL::UserTaskQueue::empty ( ) const
inline override virtual

Implements PTL::VUserTaskQueue.

Definition at line 111 of file UserTaskQueue.hh.

112{
113 return (m_ntasks->load(std::memory_order_relaxed) == 0);
114}

◆ ExecuteOnAllThreads()

void UserTaskQueue::ExecuteOnAllThreads ( ThreadPool * tp,
function_type f )
override virtual

Implements PTL::VUserTaskQueue.

Definition at line 320 of file UserTaskQueue.cc.

321{
322 using task_group_type = TaskGroup<int, int>;
323 using thread_execute_map_t = std::map<int64_t, bool>;
324
325 if(!tp->is_alive())
326 {
327 func();
328 return;
329 }
330
331 task_group_type tg{ [](int& ref, int i) { return (ref += i); }, tp };
332
333 // wait for all threads to finish any work
334 // NOTE: will cause deadlock if called from a task
335 while(tp->get_active_threads_count() > 0)
336 ThisThread::sleep_for(std::chrono::milliseconds(10));
337
338 thread_execute_map_t thread_execute_map{};
339 std::vector<std::shared_ptr<VTask>> _tasks{};
340 _tasks.reserve(m_workers + 1);
341
342 AcquireHold();
343 for(int i = 0; i < (m_workers + 1); ++i)
344 {
345 if(i == GetThreadBin())
346 continue;
347
348 //--------------------------------------------------------------------//
349 auto thread_specific_func = [&]() {
350 ScopeDestructor _dtor = tg.get_scope_destructor();
351 static Mutex _mtx;
352 _mtx.lock();
353 bool& _executed = thread_execute_map[GetThreadBin()];
354 _mtx.unlock();
355 if(!_executed)
356 {
357 func();
358 _executed = true;
359 return 1;
360 }
361 return 0;
362 };
363 //--------------------------------------------------------------------//
364
365 InsertTask(tg.wrap(thread_specific_func), ThreadData::GetInstance(), i);
366 }
367
368 tp->notify_all();
369 int nexecuted = tg.join();
370 if(nexecuted != m_workers)
371 {
372 std::stringstream msg;
373 msg << "Failure executing routine on all threads! Only " << nexecuted
374 << " threads executed function out of " << m_workers << " workers";
375 std::cerr << msg.str() << std::endl;
376 }
377 ReleaseHold();
378}
static ThreadData *& GetInstance()
Definition ThreadData.cc:32
intmax_t GetThreadBin() const override
intmax_t InsertTask(task_pointer &&, ThreadData *=nullptr, intmax_t subq=-1) override PTL_NO_SANITIZE_THREAD

◆ ExecuteOnSpecificThreads()

void UserTaskQueue::ExecuteOnSpecificThreads ( ThreadIdSet tid_set,
ThreadPool * tp,
function_type f )
override virtual

Implements PTL::VUserTaskQueue.

Definition at line 383 of file UserTaskQueue.cc.

385{
386 using task_group_type = TaskGroup<int, int>;
387 using thread_execute_map_t = std::map<int64_t, bool>;
388
389 task_group_type tg{ [](int& ref, int i) { return (ref += i); }, tp };
390
391 // wait for all threads to finish any work
392 // NOTE: will cause deadlock if called from a task
393 while(tp->get_active_threads_count() > 0)
394 ThisThread::sleep_for(std::chrono::milliseconds(10));
395
396 if(!tp->is_alive())
397 {
398 func();
399 return;
400 }
401
402 thread_execute_map_t thread_execute_map{};
403
404 //========================================================================//
405 // wrap the function so that it will only be executed if the thread
406 // has an ID in the set
407 auto thread_specific_func = [&]() {
408 ScopeDestructor _dtor = tg.get_scope_destructor();
409 static Mutex _mtx;
410 _mtx.lock();
411 bool& _executed = thread_execute_map[GetThreadBin()];
412 _mtx.unlock();
413 if(!_executed && tid_set.count(ThisThread::get_id()) > 0)
414 {
415 func();
416 _executed = true;
417 return 1;
418 }
419 return 0;
420 };
421 //========================================================================//
422
423 if(tid_set.count(ThisThread::get_id()) > 0)
424 func();
425
426 AcquireHold();
427 for(int i = 0; i < (m_workers + 1); ++i)
428 {
429 if(i == GetThreadBin())
430 continue;
431
432 InsertTask(tg.wrap(thread_specific_func), ThreadData::GetInstance(), i);
433 }
434 tp->notify_all();
435 decltype(tid_set.size()) nexecuted = tg.join();
436 if(nexecuted != tid_set.size())
437 {
438 std::stringstream msg;
439 msg << "Failure executing routine on specific threads! Only " << nexecuted
440 << " threads executed function out of " << tid_set.size() << " workers";
441 std::cerr << msg.str() << std::endl;
442 }
443 ReleaseHold();
444}

◆ GetInsertBin()

intmax_t UserTaskQueue::GetInsertBin ( ) const
protected

Definition at line 153 of file UserTaskQueue.cc.

154{
155 return (++m_insert_bin % (m_workers + 1));
156}

Referenced by InsertTask().

◆ GetTask()

UserTaskQueue::task_pointer UserTaskQueue::GetTask ( intmax_t subq = -1,
intmax_t nitr = -1 )
override virtual

Implements PTL::VUserTaskQueue.

Definition at line 195 of file UserTaskQueue.cc.

196{
197 // exit if empty
198 if(this->true_empty())
199 return nullptr;
200
201 // ensure the thread has a bin assignment
202 intmax_t tbin = GetThreadBin();
203 intmax_t n = (subq < 0) ? tbin : subq;
204 if(nitr < 1)
205 nitr = (m_workers + 1); // * m_ntasks->load(std::memory_order_relaxed);
206
207 if(m_hold->load(std::memory_order_relaxed))
208 {
209 return GetThreadBinTask();
210 }
211
212 task_pointer _task = nullptr;
213 //------------------------------------------------------------------------//
214 auto get_task = [&](intmax_t _n) {
215 TaskSubQueue* task_subq = (*m_subqueues)[_n % (m_workers + 1)];
216 // try to acquire a claim for the bin
217 // if acquired, no other threads will access bin until claim is released
218 if(!task_subq->empty() && task_subq->AcquireClaim())
219 {
220 // pop task out of bin
221 _task = task_subq->PopTask(n == tbin);
222 // release the claim on the bin
223 task_subq->ReleaseClaim();
224 }
225 if(_task)
226 --(*m_ntasks);
227 // return success if valid pointer
228 return (_task != nullptr);
229 };
230 //------------------------------------------------------------------------//
231
232 // there are num_workers+1 bins so there is always a bin that is open
233 // execute num_workers+2 iterations so the thread checks its bin twice
234 // while(!empty())
235 {
236 for(intmax_t i = 0; i < nitr; ++i, ++n)
237 {
238 if(get_task(n % (m_workers + 1)))
239 return _task;
240 }
241 }
242
243 // only reached if looped over all bins (and looked in own bin twice)
244 // and found no work so return an empty task and the thread will be put to
245 // sleep if there is still no work by the time it reaches its
246 // condition variable
247 return _task;
248}
task_pointer PopTask(bool front=true) PTL_NO_SANITIZE_THREAD
bool empty() const
std::shared_ptr< VTask > task_pointer
task_pointer GetThreadBinTask()

◆ GetThreadBin()

intmax_t UserTaskQueue::GetThreadBin ( ) const
override virtual

Implements PTL::VUserTaskQueue.

Definition at line 142 of file UserTaskQueue.cc.

143{
144 // get a thread id number
145 static thread_local intmax_t tl_bin =
146 (m_thread_bin + ThreadPool::get_this_thread_id()) % (m_workers + 1);
147 return tl_bin;
148}

Referenced by ExecuteOnAllThreads(), ExecuteOnSpecificThreads(), GetTask(), GetThreadBinTask(), and InsertTask().

◆ GetThreadBinTask()

UserTaskQueue::task_pointer UserTaskQueue::GetThreadBinTask ( )

Definition at line 161 of file UserTaskQueue.cc.

162{
163 intmax_t tbin = GetThreadBin();
164 TaskSubQueue* task_subq = (*m_subqueues)[tbin % (m_workers + 1)];
165 task_pointer _task = nullptr;
166
167 //------------------------------------------------------------------------//
168 auto get_task = [&]() {
169 if(task_subq->AcquireClaim())
170 {
171 // run task
172 _task = task_subq->PopTask(true);
173 // release the claim on the bin
174 task_subq->ReleaseClaim();
175 }
176 if(_task)
177 --(*m_ntasks);
178 // return success if valid pointer
179 return (_task != nullptr);
180 };
181 //------------------------------------------------------------------------//
182
183 // while not empty
184 while(!task_subq->empty())
185 {
186 if(get_task())
187 break;
188 }
189 return _task;
190}

Referenced by GetTask().

◆ InsertTask()

intmax_t UserTaskQueue::InsertTask ( task_pointer && task,
ThreadData * data = nullptr,
intmax_t subq = -1 )
override virtual

Implements PTL::VUserTaskQueue.

Definition at line 253 of file UserTaskQueue.cc.

254{
255 // increment number of tasks
256 ++(*m_ntasks);
257
258 bool spin = m_hold->load(std::memory_order_relaxed);
259 intmax_t tbin = GetThreadBin();
260
261 if(data && data->within_task)
262 {
263 subq = tbin;
264 // spin = true;
265 }
266
267 // subq is -1 unless specified so unless specified
268 // GetInsertBin() call increments a counter and returns
269 // counter % (num_workers + 1) so that tasks are distributed evenly
270 // among the bins
271 intmax_t n = (subq < 0) ? GetInsertBin() : subq;
272
273 //------------------------------------------------------------------------//
274 auto insert_task = [&](intmax_t _n) {
275 TaskSubQueue* task_subq = (*m_subqueues)[_n];
276 // TaskSubQueue* next_subq = (*m_subqueues)[(_n + 1) % (m_workers + 1)];
277 // if not threads bin and size difference, insert into smaller
278 // if(n != tbin && next_subq->size() < task_subq->size())
279 // task_subq = next_subq;
280 // try to acquire a claim for the bin
281 // if acquired, no other threads will access bin until claim is released
282 if(task_subq->AcquireClaim())
283 {
284 // push the task into the bin
285 task_subq->PushTask(std::move(task));
286 // release the claim on the bin
287 task_subq->ReleaseClaim();
288 // return success
289 return true;
290 }
291 return false;
292 };
293 //------------------------------------------------------------------------//
294
295 // if not in "hold/spin mode", where thread only inserts tasks into
296 // specified bin, then move onto next bin
297 //
298 if(spin)
299 {
300 n = n % (m_workers + 1);
301 while(!insert_task(n))
302 ;
303 return n;
304 }
305
306 // there are num_workers+1 bins so there is always a bin that is open
307 // execute num_workers+2 iterations so the thread checks its bin twice
308 while(true)
309 {
310 auto _n = (n++) % (m_workers + 1);
311 if(insert_task(_n))
312 return _n;
313 }
314 return GetThreadBin();
315}
void PushTask(task_pointer &&) PTL_NO_SANITIZE_THREAD
intmax_t GetInsertBin() const

Referenced by ExecuteOnAllThreads(), and ExecuteOnSpecificThreads().

◆ resize()

void UserTaskQueue::resize ( intmax_t n)
override virtual

Implements PTL::VUserTaskQueue.

Definition at line 108 of file UserTaskQueue.cc.

109{
110 if(!m_mutex)
111 throw std::runtime_error("nullptr to mutex");
112 AutoLock lk(m_mutex);
113 if(m_workers < n)
114 {
115 while(m_workers < n)
116 {
117 m_subqueues->emplace_back(new TaskSubQueue(m_ntasks));
118 ++m_workers;
119 }
120 }
121 else if(m_workers > n)
122 {
123 while(m_workers > n)
124 {
125 delete m_subqueues->back();
126 m_subqueues->pop_back();
127 --m_workers;
128 }
129 }
130}

◆ size()

UserTaskQueue::size_type PTL::UserTaskQueue::size ( ) const
inline override virtual

Implements PTL::VUserTaskQueue.

Definition at line 119 of file UserTaskQueue.hh.

120{
121 return m_ntasks->load(std::memory_order_relaxed);
122}

◆ true_empty()

bool PTL::UserTaskQueue::true_empty ( ) const
inline override virtual

Reimplemented from PTL::VUserTaskQueue.

Definition at line 143 of file UserTaskQueue.hh.

144{
145 for(const auto& itr : *m_subqueues)
146 if(!itr->empty())
147 return false;
148 return true;
149}

Referenced by GetTask(), and UserTaskQueue().

◆ true_size()

UserTaskQueue::size_type PTL::UserTaskQueue::true_size ( ) const
inline override virtual

Reimplemented from PTL::VUserTaskQueue.

Definition at line 154 of file UserTaskQueue.hh.

155{
156 size_type _n = 0;
157 for(const auto& itr : *m_subqueues)
158 _n += itr->size();
159 return _n;
160}

Referenced by UserTaskQueue().

◆ Wait()

void PTL::UserTaskQueue::Wait ( )
inline override virtual

Implements PTL::VUserTaskQueue.

Definition at line 68 of file UserTaskQueue.hh.

68{}

The documentation for this class was generated from the following files: