Geant4 10.7.0
Toolkit for the simulation of the passage of particles through matter
Loading...
Searching...
No Matches
UserTaskQueue.cc
Go to the documentation of this file.
1//
2// MIT License
3// Copyright (c) 2020 Jonathan R. Madsen
4// Permission is hereby granted, free of charge, to any person obtaining a copy
5// of this software and associated documentation files (the "Software"), to deal
6// in the Software without restriction, including without limitation the rights
7// to use, copy, modify, merge, publish, distribute, sublicense, and
8// copies of the Software, and to permit persons to whom the Software is
9// furnished to do so, subject to the following conditions:
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED
12// "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
13// LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
14// PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
15// HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
16// ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
17// WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
18//
19// ---------------------------------------------------------------
20// Tasking class implementation
21// Class Description:
22// ---------------------------------------------------------------
23// Author: Jonathan Madsen
24// ---------------------------------------------------------------
25
26#include "PTL/UserTaskQueue.hh"
27#include "PTL/Task.hh"
28#include "PTL/TaskGroup.hh"
29#include "PTL/ThreadPool.hh"
30
31#include <cassert>
32
33using namespace PTL;
34
35//======================================================================================//
36
// Constructor.
// With a non-null `parent` the new object is a clone:
// - m_is_clone is true;
// - it aliases the parent's m_hold, m_ntasks and m_subqueues pointers;
// - its thread and insert bins both start at the calling thread's pool id
//   modulo (nworkers + 1).
// Without a parent it allocates that state itself (hold = false, ntasks = 0)
// and creates nworkers + 1 subqueues.
// NOTE(review): the declaration gives nworkers a default of -1. With that
// value `nworkers + 1` is 0, so the clone path would compute `% 0` and the
// owner path would create no subqueues. Confirm callers always pass a
// non-negative worker count here.
38: VUserTaskQueue(nworkers)
39, m_is_clone((parent) ? true : false)
40, m_thread_bin((parent) ? (ThreadPool::get_this_thread_id() % (nworkers + 1)) : 0)
41, m_insert_bin((parent) ? (ThreadPool::get_this_thread_id() % (nworkers + 1)) : 0)
42, m_hold((parent) ? parent->m_hold : new std::atomic_bool(false))
43, m_ntasks((parent) ? parent->m_ntasks : new std::atomic_uintmax_t(0))
44, m_subqueues((parent) ? parent->m_subqueues : new TaskSubQueueContainer())
45{
46 // create nthreads + 1 subqueues so there is always a subqueue available
    // (only the owner does this; a clone reuses the parent's container)
47 if(!parent)
48 {
49 for(intmax_t i = 0; i < nworkers + 1; ++i)
50 m_subqueues->push_back(new TaskSubQueue(m_ntasks));
51 }
52
    // Debug trace: compiled only with DEBUG, printed when PTL_VERBOSE > 3.
53#if defined(DEBUG)
54 if(GetEnv<int>("PTL_VERBOSE", 0) > 3)
55 {
56 RecursiveAutoLock l(TypeRecursiveMutex<decltype(std::cout)>());
57 std::stringstream ss;
58 ss << ThreadPool::get_this_thread_id() << "> " << ThisThread::get_id() << " ["
59 << __FUNCTION__ << ":" << __LINE__ << "] "
60 << "this = " << this << ", "
61 << "clone = " << std::boolalpha << m_is_clone << ", "
62 << "thread = " << m_thread_bin << ", "
63 << "insert = " << m_insert_bin << ", "
64 << "hold = " << m_hold->load() << " @ " << m_hold << ", "
65 << "tasks = " << m_ntasks->load() << " @ " << m_ntasks << ", "
66 << "subqueue = " << m_subqueues << ", "
67 << "size = " << true_size() << ", "
68 << "empty = " << true_empty();
69 std::cout << ss.str() << std::endl;
70 }
71#endif
72}
73
74//======================================================================================//
75
// Destructor.
// Only the owning (non-clone) queue releases the shared state:
// - it deletes every subqueue, asserting each is empty first;
// - then it deletes the hold flag, the task counter and the container.
// Clones alias those pointers and free nothing.
// NOTE(review): clones hold raw copies of the owner's pointers, so a clone
// must not be used after its owner has been destroyed.
77{
78 if(!m_is_clone)
79 {
80 for(auto& itr : *m_subqueues)
81 {
82 assert(itr->size() == 0);
83 delete itr;
84 }
85 m_subqueues->clear();
86 delete m_hold;
87 delete m_ntasks;
88 delete m_subqueues;
89 }
90}
91
92//======================================================================================//
93
// resize: grow or shrink the subqueue container, one entry per worker, until
// m_workers equals the requested count `n`. m_mutex is held throughout.
// NOTE(review): the shrink path deletes trailing subqueues without checking
// that they are empty and without decrementing *m_ntasks. Any tasks still
// queued there appear to be discarded; confirm against TaskSubQueue.
// NOTE(review): m_subqueues can be shared with clones. GetTask/InsertTask
// index it without taking this lock, presumably using each clone's own
// m_workers, so shrinking while clones are active looks unsafe. Verify.
94void
96{
97 AutoLock l(m_mutex);
98 if(m_workers < n)
99 {
100 while(m_workers < n)
101 {
102 m_subqueues->push_back(new TaskSubQueue(m_ntasks));
103 ++m_workers;
104 }
105 }
106 else if(m_workers > n)
107 {
108 while(m_workers > n)
109 {
110 delete m_subqueues->back();
111 m_subqueues->pop_back();
112 --m_workers;
113 }
114 }
115}
116
117//======================================================================================//
118
// Return a new queue that shares this queue's hold flag, task counter and
// subqueue container; the constructor aliases them when given a parent.
// The result is a raw `new` allocation, presumably owned by the caller.
121{
122 return new UserTaskQueue(workers(), this);
123}
124//======================================================================================//
125
// Return the calling thread's home bin.
// NOTE(review): tl_bin is a function-local `static thread_local`, so it is
// evaluated once per thread, on the first call, and then reused for every
// UserTaskQueue instance. Later changes to m_thread_bin or m_workers (e.g. via
// resize()) are not reflected. Callers in this file reduce the result modulo
// (m_workers + 1) before indexing m_subqueues.
126intmax_t
128{
129 // get a thread id number
130 static thread_local intmax_t tl_bin =
131 (m_thread_bin + ThreadPool::get_this_thread_id()) % (m_workers + 1);
132 return tl_bin;
133}
134
135//======================================================================================//
136
// Round-robin insertion bin: pre-increment m_insert_bin and wrap it onto the
// m_workers + 1 subqueues.
// NOTE(review): this const member modifies m_insert_bin, so the member is
// presumably `mutable`. No lock is taken here, so if it is a plain integer,
// concurrent inserters race on it. Confirm its declared type.
137intmax_t
139{
140 return (++m_insert_bin % (m_workers + 1));
141}
142
143//======================================================================================//
144
// Take a task from the calling thread's own bin only; GetTask uses this while
// the hold flag is set.
// - Retries the claim, with no back-off, for as long as the bin reports
//   non-empty.
// - A successful pop decrements the shared task counter.
// - Returns nullptr if the bin is, or becomes, empty.
147{
148 intmax_t tbin = GetThreadBin();
149 TaskSubQueue* task_subq = (*m_subqueues)[tbin % (m_workers + 1)];
150 task_pointer _task = nullptr;
151
152 //------------------------------------------------------------------------//
153 auto get_task = [&]() {
154 if(task_subq->AcquireClaim())
155 {
156 // pop a task out of the bin (it is returned to the caller, not run here)
157 _task = task_subq->PopTask(true);
158 // release the claim on the bin
159 task_subq->ReleaseClaim();
160 }
161 if(_task)
162 --(*m_ntasks);
163 // return success if valid pointer
164 return (_task != nullptr);
165 };
166 //------------------------------------------------------------------------//
167
168 // while not empty
169 while(!task_subq->empty())
170 {
171 if(get_task())
172 break;
173 }
174 return _task;
175}
176
177//======================================================================================//
178
// Fetch one task for the calling thread, or nullptr if none was found.
// - Returns nullptr immediately when true_empty() reports no tasks.
// - While the hold flag is set, only the thread's own bin is searched
//   (GetThreadBinTask).
// - Otherwise probes `nitr` bins round-robin, starting at `subq`, or at the
//   thread's own bin when subq < 0. nitr < 1 means m_workers + 1 probes.
// A successful pop decrements the shared task counter.
180UserTaskQueue::GetTask(intmax_t subq, intmax_t nitr)
181{
182 // exit if empty
183 if(this->true_empty())
184 return nullptr;
185
186 // ensure the thread has a bin assignment
187 intmax_t tbin = GetThreadBin();
188 intmax_t n = (subq < 0) ? tbin : subq;
189 if(nitr < 1)
190 nitr = (m_workers + 1); // * m_ntasks->load(std::memory_order_relaxed);
191
192 if(m_hold->load(std::memory_order_relaxed))
193 {
194 return GetThreadBinTask();
195 }
196
197 task_pointer _task = nullptr;
198 //------------------------------------------------------------------------//
199 auto get_task = [&](intmax_t _n) {
200 TaskSubQueue* task_subq = (*m_subqueues)[_n % (m_workers + 1)];
201 // try to acquire a claim for the bin
202 // if acquired, no other threads will access bin until claim is released
203 if(!task_subq->empty() && task_subq->AcquireClaim())
204 {
205 // pop task out of bin
    // NOTE(review): this compares the outer loop counter `n`, which is never
    // reduced modulo (m_workers + 1), rather than the lambda's `_n`. A probe of
    // the thread's own bin reached after wrapping (or via an out-of-range
    // subq) therefore passes `false`. If "own bin" is the intent, `_n == tbin`
    // may be what was meant.
206 _task = task_subq->PopTask(n == tbin);
207 // release the claim on the bin
208 task_subq->ReleaseClaim();
209 }
210 if(_task)
211 --(*m_ntasks);
212 // return success if valid pointer
213 return (_task != nullptr);
214 };
215 //------------------------------------------------------------------------//
216
217 // there are num_workers+1 bins so there is always a bin that is open
218 // by default (nitr == num_workers+1) each bin is probed once, starting at n
219 // while(!empty())
220 {
221 for(intmax_t i = 0; i < nitr; ++i, ++n)
222 {
223 if(get_task(n % (m_workers + 1)))
224 return _task;
225 }
226 }
227
228 // only reached if looped over all probed bins (each bin once by default)
229 // and found no work so return an empty task and the thread will be put to
230 // sleep if there is still no work by the time it reaches its
231 // condition variable
232 return _task;
233}
234
235//======================================================================================//
236
// Insert `task` into one of the m_workers + 1 subqueues and return the index
// of the bin it was pushed into. The shared task counter is incremented first.
// Bin choice:
// - the caller's own bin when `data` says we are already inside a task;
// - otherwise `subq` if it is non-negative;
// - otherwise the next round-robin bin from GetInsertBin().
// While the hold flag is set, the insert spins on that single bin until its
// claim succeeds. Otherwise it moves on to the next bin after each failed
// claim.
237intmax_t
239{
240 // count the new task in the shared pending-task counter before placing it
241 ++(*m_ntasks);
242
243 bool spin = m_hold->load(std::memory_order_relaxed);
244 intmax_t tbin = GetThreadBin();
245
246 if(data && data->within_task)
247 {
248 subq = tbin;
249 // spin = true;
250 }
251
252 // subq is -1 unless specified so unless specified
253 // GetInsertBin() call increments a counter and returns
254 // counter % (num_workers + 1) so that tasks are distributed evenly
255 // among the bins
256 intmax_t n = (subq < 0) ? GetInsertBin() : subq;
257
258 //------------------------------------------------------------------------//
    // NOTE: indexes without a modulo; both call sites below pass an index
    // already reduced modulo (m_workers + 1).
259 auto insert_task = [&](intmax_t _n) {
260 TaskSubQueue* task_subq = (*m_subqueues)[_n];
261 // TaskSubQueue* next_subq = (*m_subqueues)[(_n + 1) % (m_workers + 1)];
262 // if not threads bin and size difference, insert into smaller
263 // if(n != tbin && next_subq->size() < task_subq->size())
264 // task_subq = next_subq;
265 // try to acquire a claim for the bin
266 // if acquired, no other threads will access bin until claim is released
267 if(task_subq->AcquireClaim())
268 {
269 // push the task into the bin
270 task_subq->PushTask(task);
271 // release the claim on the bin
272 task_subq->ReleaseClaim();
273 // return success
274 return true;
275 }
276 return false;
277 };
278 //------------------------------------------------------------------------//
279
280 // if not in "hold/spin mode", where thread only inserts tasks into
281 // specified bin, then move onto next bin
282 //
283 if(spin)
284 {
285 n = n % (m_workers + 1);
286 while(!insert_task(n))
287 ;
288 return n;
289 }
290
291 // there are num_workers+1 bins so there is always a bin that is open
292 // keep cycling through the bins until one claim succeeds (no retry cap)
293 while(true)
294 {
295 auto _n = (n++) % (m_workers + 1);
296 if(insert_task(_n))
297 return _n;
298 }
    // NOTE(review): unreachable; the loop above exits only via return.
299 return GetThreadBin();
300}
301
302//======================================================================================//
303
// Try to run `func` once per thread bin other than the calling thread's.
// If the pool is not alive, `func` simply runs on the calling thread.
// Otherwise it:
// 1. waits, polling every 10 ms, until no pool thread is active;
// 2. sets the hold flag;
// 3. wraps a "run once for my bin" task for every other bin;
// 4. wakes the pool and joins the task group;
// 5. reports on stderr if the execution count differs from m_workers;
// 6. clears the hold flag.
// NOTE(review): `tg` is allocated with `new` and never deleted, so each call
// leaks one TaskGroup. `thread_execute_map` is also heap-allocated although it
// only needs function lifetime. Both could be stack objects or
// std::unique_ptr.
304void
306{
307 typedef TaskGroup<int, int> task_group_type;
308 typedef std::map<int64_t, bool> thread_execute_map_t;
309
310 if(!tp->is_alive())
311 {
312 func();
313 return;
314 }
315
316 auto join_func = [=](int& ref, int i) {
317 ref += i;
318 return ref;
319 };
320 task_group_type* tg = new task_group_type(join_func, tp);
321
322 // wait for all threads to finish any work
323 // NOTE: will cause deadlock if called from a task
324 while(tp->get_active_threads_count() > 0)
325 ThisThread::sleep_for(std::chrono::milliseconds(10));
326
327 thread_execute_map_t* thread_execute_map = new thread_execute_map_t();
328
329 AcquireHold();
330 for(int i = 0; i < (m_workers + 1); ++i)
331 {
332 if(i == GetThreadBin())
333 continue;
334
335 //--------------------------------------------------------------------//
    // Returns 1 the first time it runs for a given bin, 0 afterwards.
    // NOTE(review): `_executed` is read and written after _mtx is released.
    // That is only safe if no two threads ever share a GetThreadBin() value.
336 auto thread_specific_func = [&]() {
337 static Mutex _mtx;
338 _mtx.lock();
339 bool& _executed = (*thread_execute_map)[GetThreadBin()];
340 _mtx.unlock();
341 if(!_executed)
342 {
343 func();
344 _executed = true;
345 return 1;
346 }
347 return 0;
348 };
349 //--------------------------------------------------------------------//
350
351 auto _task = tg->wrap(thread_specific_func);
352 //++(*m_ntasks);
353 // TaskSubQueue* task_subq = (*m_subqueues)[i];
354 // task_subq->PushTask(_task);
    // NOTE(review): source line 355 is missing from this listing. It is
    // presumably the statement that submits `_task` to bin `i`.
356 }
357
358 tp->notify_all();
359 int nexecuted = tg->join();
360 if(nexecuted != m_workers)
361 {
362 std::stringstream msg;
363 msg << "Failure executing routine on all threads! Only " << nexecuted
364 << " threads executed function out of " << m_workers;
365 std::cerr << msg.str() << std::endl;
366 // Exception("UserTaskQueue::ExecuteOnAllThreads", "TaskQueue0000",
367 // JustWarning, msg);
368 }
369 delete thread_execute_map;
370 ReleaseHold();
371}
372
373//======================================================================================//
374
// Run `func` on the threads whose ids are in `tid_set`.
// - The calling thread runs it directly if its own id is in the set.
// - While the hold flag is set, a "run once per bin, only if my id is in
//   tid_set" task is wrapped for every other bin.
// - The pool is then woken and the task group joined.
// NOTE(review): `tg` is created, and the 10 ms active-thread wait runs, before
// the is_alive() check. The early-return path therefore leaks `tg`, and the
// normal path never deletes it either.
// NOTE(review): when the pool is not alive, `func` runs on the caller even if
// the caller's id is not in tid_set.
// NOTE(review): nexecuted is compared with m_workers, but only threads in
// tid_set return 1, and the caller's own run is not counted. The warning thus
// fires whenever tid_set does not cover every worker, while the message
// prints tid_set.size() as the expected count.
375void
377 function_type func)
378{
379 typedef TaskGroup<int, int> task_group_type;
380 typedef std::map<int64_t, bool> thread_execute_map_t;
381
382 auto join_func = [=](int& ref, int i) {
383 ref += i;
384 return ref;
385 };
386 task_group_type* tg = new task_group_type(join_func, tp);
387
388 // wait for all threads to finish any work
389 // NOTE: will cause deadlock if called from a task
390 while(tp->get_active_threads_count() > 0)
391 ThisThread::sleep_for(std::chrono::milliseconds(10));
392
393 if(!tp->is_alive())
394 {
395 func();
396 return;
397 }
398
399 thread_execute_map_t* thread_execute_map = new thread_execute_map_t();
400
401 //========================================================================//
402 // wrap the function so that it will only be executed if the thread
403 // has an ID in the set
404 auto thread_specific_func = [=]() {
405 static Mutex _mtx;
406 _mtx.lock();
407 bool& _executed = (*thread_execute_map)[GetThreadBin()];
408 _mtx.unlock();
409 if(!_executed && tid_set.count(ThisThread::get_id()) > 0)
410 {
411 func();
412 _executed = true;
413 return 1;
414 }
415 return 0;
416 };
417 //========================================================================//
418
419 if(tid_set.count(ThisThread::get_id()) > 0)
420 func();
421
422 AcquireHold();
423 for(int i = 0; i < (m_workers + 1); ++i)
424 {
425 if(i == GetThreadBin())
426 continue;
427
428 auto _task = tg->wrap(thread_specific_func);
    // NOTE(review): source line 429 is missing from this listing. It is
    // presumably the statement that submits `_task` to bin `i`.
430 }
431 tp->notify_all();
432 int nexecuted = tg->join();
433 if(nexecuted != m_workers)
434 {
435 std::stringstream msg;
436 msg << "Failure executing routine on all threads! Only " << nexecuted
437 << " threads executed function out of " << tid_set.size();
438 std::cerr << msg.str() << std::endl;
439 // Exception("UserTaskQueue::ExecuteOnSpecificThreads", "TaskQueue0001",
440 // JustWarning, msg);
441 }
442 delete thread_execute_map;
443 ReleaseHold();
444}
445
446//======================================================================================//
447
448void
449UserTaskQueue::AcquireHold()
450{
451 bool _hold;
452 while(!(_hold = m_hold->load(std::memory_order_relaxed)))
453 {
454 m_hold->compare_exchange_strong(_hold, true, std::memory_order_release,
455 std::memory_order_relaxed);
456 }
457}
458
459//======================================================================================//
460
461void
462UserTaskQueue::ReleaseHold()
463{
464 bool _hold;
465 while((_hold = m_hold->load(std::memory_order_relaxed)))
466 {
467 m_hold->compare_exchange_strong(_hold, false, std::memory_order_release,
468 std::memory_order_relaxed);
469 }
470}
471
472//======================================================================================//
static ThreadData *& GetInstance()
Definition: ThreadData.cc:35
static uintmax_t get_this_thread_id()
Definition: ThreadPool.cc:120
virtual VUserTaskQueue * clone() override
std::vector< TaskSubQueue * > TaskSubQueueContainer
virtual void resize(intmax_t) override
virtual task_pointer GetTask(intmax_t subq=-1, intmax_t nitr=-1) override
virtual intmax_t GetThreadBin() const override
virtual intmax_t InsertTask(task_pointer, ThreadData *=nullptr, intmax_t subq=-1) override
virtual void ExecuteOnSpecificThreads(ThreadIdSet tid_set, ThreadPool *tp, function_type f) override
intmax_t GetInsertBin() const
UserTaskQueue(intmax_t nworkers=-1, UserTaskQueue *=nullptr)
virtual void ExecuteOnAllThreads(ThreadPool *tp, function_type f) override
bool true_empty() const override
virtual ~UserTaskQueue() override
task_pointer GetThreadBinTask()
size_type true_size() const override
VTask is the abstract class stored in thread_pool.
Definition: VTask.hh:55
intmax_t workers() const
std::set< ThreadId > ThreadIdSet
std::function< void()> function_type
Definition: AutoLock.hh:254
std::mutex Mutex
Definition: Threading.hh:76
RecursiveMutex & TypeRecursiveMutex(const unsigned int &_n=0)
Definition: Threading.hh:140