Geant4 11.3.0
Toolkit for the simulation of the passage of particles through matter
UserTaskQueue.cc
//
// MIT License
// Copyright (c) 2020 Jonathan R. Madsen
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED
// "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
// LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
// PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
// HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
// ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
// WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
//
// ---------------------------------------------------------------
// Tasking class implementation
// Class Description:
// ---------------------------------------------------------------
// Author: Jonathan Madsen
// ---------------------------------------------------------------

#include "PTL/UserTaskQueue.hh"

#include "PTL/AutoLock.hh"
#include "PTL/TaskGroup.hh"
#include "PTL/ThreadData.hh"
#include "PTL/ThreadPool.hh"

#include <cassert>
#include <chrono>
#include <functional>
#include <iostream>
#include <map>
#include <sstream>  // std::stringstream is used for the error messages below
#include <stdexcept>
#include <thread>
#include <utility>

namespace PTL
{
//======================================================================================//

UserTaskQueue::UserTaskQueue(intmax_t nworkers, UserTaskQueue* parent)
: VUserTaskQueue(nworkers)
, m_is_clone((parent) != nullptr)
, m_thread_bin((parent) ? (ThreadPool::get_this_thread_id() % (nworkers + 1)) : 0)
, m_insert_bin((parent) ? (ThreadPool::get_this_thread_id() % (nworkers + 1)) : 0)
, m_hold((parent) ? parent->m_hold : new std::atomic_bool(false))
, m_ntasks((parent) ? parent->m_ntasks : new std::atomic_uintmax_t(0))
, m_mutex((parent) ? parent->m_mutex : new Mutex{})
, m_subqueues((parent) ? parent->m_subqueues : new TaskSubQueueContainer())
{
    // create nworkers + 1 subqueues so there is always a subqueue available
    if(!parent)
    {
        for(intmax_t i = 0; i < nworkers + 1; ++i)
            m_subqueues->emplace_back(new TaskSubQueue(m_ntasks));
    }
}

//======================================================================================//

UserTaskQueue::~UserTaskQueue()
{
    if(!m_is_clone)
    {
        for(auto& itr : *m_subqueues)
        {
            assert(itr->empty());
            delete itr;
        }
        m_subqueues->clear();
        delete m_hold;
        delete m_ntasks;
        delete m_mutex;
        delete m_subqueues;
    }
}

//======================================================================================//

void
UserTaskQueue::resize(intmax_t n)
{
    if(!m_mutex)
        throw std::runtime_error("nullptr to mutex");
    AutoLock lk(m_mutex);
    if(m_workers < n)
    {
        while(m_workers < n)
        {
            m_subqueues->emplace_back(new TaskSubQueue(m_ntasks));
            ++m_workers;
        }
    }
    else if(m_workers > n)
    {
        while(m_workers > n)
        {
            delete m_subqueues->back();
            m_subqueues->pop_back();
            --m_workers;
        }
    }
}

//======================================================================================//

VUserTaskQueue*
UserTaskQueue::clone()
{
    return new UserTaskQueue(workers(), this);
}
//======================================================================================//

intmax_t
UserTaskQueue::GetThreadBin() const
{
    // get a thread id number
    static thread_local intmax_t tl_bin =
        (m_thread_bin + ThreadPool::get_this_thread_id()) % (m_workers + 1);
    return tl_bin;
}

//======================================================================================//

intmax_t
UserTaskQueue::GetInsertBin() const
{
    return (++m_insert_bin % (m_workers + 1));
}

//======================================================================================//

UserTaskQueue::task_pointer
UserTaskQueue::GetThreadBinTask()
{
    intmax_t tbin = GetThreadBin();
    TaskSubQueue* task_subq = (*m_subqueues)[tbin % (m_workers + 1)];
    task_pointer _task = nullptr;

    //------------------------------------------------------------------------//
    auto get_task = [&]() {
        if(task_subq->AcquireClaim())
        {
            // pop a task from this thread's bin
            _task = task_subq->PopTask(true);
            // release the claim on the bin
            task_subq->ReleaseClaim();
        }
        if(_task)
            --(*m_ntasks);
        // return success if valid pointer
        return (_task != nullptr);
    };
    //------------------------------------------------------------------------//

    // while not empty
    while(!task_subq->empty())
    {
        if(get_task())
            break;
    }
    return _task;
}

//======================================================================================//

UserTaskQueue::task_pointer
UserTaskQueue::GetTask(intmax_t subq, intmax_t nitr)
{
    // exit if empty
    if(this->true_empty())
        return nullptr;

    // ensure the thread has a bin assignment
    intmax_t tbin = GetThreadBin();
    intmax_t n = (subq < 0) ? tbin : subq;
    if(nitr < 1)
        nitr = (m_workers + 1);  // * m_ntasks->load(std::memory_order_relaxed);

    if(m_hold->load(std::memory_order_relaxed))
    {
        return GetThreadBinTask();
    }

    task_pointer _task = nullptr;
    //------------------------------------------------------------------------//
    auto get_task = [&](intmax_t _n) {
        TaskSubQueue* task_subq = (*m_subqueues)[_n % (m_workers + 1)];
        // try to acquire a claim for the bin
        // if acquired, no other threads will access bin until claim is released
        if(!task_subq->empty() && task_subq->AcquireClaim())
        {
            // pop task out of bin
            _task = task_subq->PopTask(n == tbin);
            // release the claim on the bin
            task_subq->ReleaseClaim();
        }
        if(_task)
            --(*m_ntasks);
        // return success if valid pointer
        return (_task != nullptr);
    };
    //------------------------------------------------------------------------//

    // there are num_workers+1 bins so there is always a bin that is open
    // execute num_workers+2 iterations so the thread checks its bin twice
    // while(!empty())
    {
        for(intmax_t i = 0; i < nitr; ++i, ++n)
        {
            if(get_task(n % (m_workers + 1)))
                return _task;
        }
    }

    // only reached after looping over all bins (checking its own bin twice) and
    // finding no work; return an empty task and the thread will be put to sleep
    // if there is still no work by the time it reaches its condition variable
    return _task;
}

//======================================================================================//

intmax_t
UserTaskQueue::InsertTask(task_pointer&& task, ThreadData* data, intmax_t subq)
{
    // increment number of tasks
    ++(*m_ntasks);

    bool spin = m_hold->load(std::memory_order_relaxed);
    intmax_t tbin = GetThreadBin();

    if(data && data->within_task)
    {
        subq = tbin;
        // spin = true;
    }

    // subq is -1 unless specified; when unspecified, the GetInsertBin() call
    // increments a counter and returns counter % (num_workers + 1) so that
    // tasks are distributed evenly among the bins
    intmax_t n = (subq < 0) ? GetInsertBin() : subq;

    //------------------------------------------------------------------------//
    auto insert_task = [&](intmax_t _n) {
        TaskSubQueue* task_subq = (*m_subqueues)[_n];
        // TaskSubQueue* next_subq = (*m_subqueues)[(_n + 1) % (m_workers + 1)];
        // if not the thread's bin and there is a size difference, insert into smaller
        // if(n != tbin && next_subq->size() < task_subq->size())
        //     task_subq = next_subq;
        // try to acquire a claim for the bin
        // if acquired, no other threads will access bin until claim is released
        if(task_subq->AcquireClaim())
        {
            // push the task into the bin
            task_subq->PushTask(std::move(task));
            // release the claim on the bin
            task_subq->ReleaseClaim();
            // return success
            return true;
        }
        return false;
    };
    //------------------------------------------------------------------------//

    // in "hold/spin mode" the thread only inserts tasks into the specified bin;
    // otherwise it moves on to the next bin whenever a claim cannot be acquired
    //
    if(spin)
    {
        n = n % (m_workers + 1);
        while(!insert_task(n))
            ;
        return n;
    }

    // there are num_workers+1 bins so there is always a bin that is open
    // execute num_workers+2 iterations so the thread checks its bin twice
    while(true)
    {
        auto _n = (n++) % (m_workers + 1);
        if(insert_task(_n))
            return _n;
    }
}

//======================================================================================//

void
UserTaskQueue::ExecuteOnAllThreads(ThreadPool* tp, function_type func)
{
    using task_group_type = TaskGroup<int, int>;
    using thread_execute_map_t = std::map<int64_t, bool>;

    if(!tp->is_alive())
    {
        func();
        return;
    }

    task_group_type tg{ [](int& ref, int i) { return (ref += i); }, tp };

    // wait for all threads to finish any work
    // NOTE: will cause deadlock if called from a task
    while(tp->get_active_threads_count() > 0)
        ThisThread::sleep_for(std::chrono::milliseconds(10));

    thread_execute_map_t thread_execute_map{};
    std::vector<std::shared_ptr<VTask>> _tasks{};
    _tasks.reserve(m_workers + 1);

    AcquireHold();
    for(int i = 0; i < (m_workers + 1); ++i)
    {
        if(i == GetThreadBin())
            continue;

        //--------------------------------------------------------------------//
        auto thread_specific_func = [&]() {
            ScopeDestructor _dtor = tg.get_scope_destructor();
            static Mutex _mtx;
            _mtx.lock();
            bool& _executed = thread_execute_map[GetThreadBin()];
            _mtx.unlock();
            if(!_executed)
            {
                func();
                _executed = true;
                return 1;
            }
            return 0;
        };
        //--------------------------------------------------------------------//

        InsertTask(tg.wrap(thread_specific_func), ThreadData::GetInstance(), i);
    }

    tp->notify_all();
    int nexecuted = tg.join();
    if(nexecuted != m_workers)
    {
        std::stringstream msg;
        msg << "Failure executing routine on all threads! Only " << nexecuted
            << " threads executed function out of " << m_workers << " workers";
        std::cerr << msg.str() << std::endl;
    }
    ReleaseHold();
}

//======================================================================================//

void
UserTaskQueue::ExecuteOnSpecificThreads(ThreadIdSet tid_set, ThreadPool* tp,
                                        function_type func)
{
    using task_group_type = TaskGroup<int, int>;
    using thread_execute_map_t = std::map<int64_t, bool>;

    task_group_type tg{ [](int& ref, int i) { return (ref += i); }, tp };

    // wait for all threads to finish any work
    // NOTE: will cause deadlock if called from a task
    while(tp->get_active_threads_count() > 0)
        ThisThread::sleep_for(std::chrono::milliseconds(10));

    if(!tp->is_alive())
    {
        func();
        return;
    }

    thread_execute_map_t thread_execute_map{};

    //========================================================================//
    // wrap the function so that it will only be executed if the thread
    // has an ID in the set
    auto thread_specific_func = [&]() {
        ScopeDestructor _dtor = tg.get_scope_destructor();
        static Mutex _mtx;
        _mtx.lock();
        bool& _executed = thread_execute_map[GetThreadBin()];
        _mtx.unlock();
        if(!_executed && tid_set.count(ThisThread::get_id()) > 0)
        {
            func();
            _executed = true;
            return 1;
        }
        return 0;
    };
    //========================================================================//

    if(tid_set.count(ThisThread::get_id()) > 0)
        func();

    AcquireHold();
    for(int i = 0; i < (m_workers + 1); ++i)
    {
        if(i == GetThreadBin())
            continue;

        InsertTask(tg.wrap(thread_specific_func), ThreadData::GetInstance(), i);
    }
    tp->notify_all();
    decltype(tid_set.size()) nexecuted = tg.join();
    if(nexecuted != tid_set.size())
    {
        std::stringstream msg;
        msg << "Failure executing routine on specific threads! Only " << nexecuted
            << " threads executed function out of " << tid_set.size() << " workers";
        std::cerr << msg.str() << std::endl;
    }
    ReleaseHold();
}

//======================================================================================//

void
UserTaskQueue::AcquireHold()
{
    bool _hold;
    while(!(_hold = m_hold->load(std::memory_order_relaxed)))
    {
        m_hold->compare_exchange_strong(_hold, true, std::memory_order_release,
                                        std::memory_order_relaxed);
    }
}

//======================================================================================//

void
UserTaskQueue::ReleaseHold()
{
    bool _hold;
    while((_hold = m_hold->load(std::memory_order_relaxed)))
    {
        m_hold->compare_exchange_strong(_hold, false, std::memory_order_release,
                                        std::memory_order_relaxed);
    }
}

//======================================================================================//

} // namespace PTL
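The bin layout above relies on the idea stated in the comments: with `m_workers + 1` sub-queues, a thread that cannot claim one bin simply advances to the next, so inserts and lookups never block on a shared lock. The sketch below is a minimal stand-alone analogue of that claim-and-advance pattern under the same assumptions; the `MiniBin`/`MiniQueue` names and the code are illustrative only and are not PTL's `TaskSubQueue`/`UserTaskQueue` implementation.

#include <atomic>
#include <cstddef>
#include <deque>
#include <functional>
#include <utility>
#include <vector>

// Hypothetical stand-in for a task sub-queue: a deque guarded by an atomic
// "claim" flag instead of a mutex (compare AcquireClaim/ReleaseClaim above).
struct MiniBin
{
    std::atomic<bool>                 claim{ false };
    std::deque<std::function<void()>> tasks{};

    bool try_claim()
    {
        bool expected = false;
        return claim.compare_exchange_strong(expected, true, std::memory_order_acquire);
    }
    void release() { claim.store(false, std::memory_order_release); }
};

// Hypothetical stand-in for the queue-of-bins layout used by UserTaskQueue.
class MiniQueue
{
public:
    explicit MiniQueue(std::size_t nworkers)
    : m_bins(nworkers + 1)  // one more bin than workers, mirroring the layout above
    {}

    // round-robin insert: move to the next bin whenever a claim is contended
    void push(std::function<void()> f)
    {
        std::size_t n = m_insert.fetch_add(1, std::memory_order_relaxed);
        while(true)
        {
            MiniBin& bin = m_bins[n % m_bins.size()];
            if(bin.try_claim())
            {
                bin.tasks.emplace_back(std::move(f));
                bin.release();
                return;
            }
            ++n;  // bin was busy: try the next one
        }
    }

    // scan every bin once (and the starting bin twice) looking for work
    bool pop(std::size_t start, std::function<void()>& out)
    {
        for(std::size_t i = 0; i <= m_bins.size(); ++i)
        {
            MiniBin& bin = m_bins[(start + i) % m_bins.size()];
            if(bin.try_claim())
            {
                bool found = !bin.tasks.empty();
                if(found)
                {
                    out = std::move(bin.tasks.front());
                    bin.tasks.pop_front();
                }
                bin.release();
                if(found)
                    return true;
            }
        }
        return false;  // no work anywhere; the caller may sleep on a condition variable
    }

private:
    std::atomic<std::size_t> m_insert{ 0 };
    std::vector<MiniBin>     m_bins;
};

Each claim is held only for the duration of a single push or pop, which is why a contended bin can simply be skipped rather than waited on.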
Member declarations referenced in this file (from PTL/UserTaskQueue.hh and PTL/VUserTaskQueue.hh):

    UserTaskQueue(intmax_t nworkers = -1, UserTaskQueue* = nullptr)
    ~UserTaskQueue() override
    void resize(intmax_t) override
    VUserTaskQueue* clone() override
    intmax_t GetThreadBin() const override
    intmax_t GetInsertBin() const
    task_pointer GetThreadBinTask()
    task_pointer GetTask(intmax_t subq = -1, intmax_t nitr = -1) override
    intmax_t InsertTask(task_pointer&&, ThreadData* = nullptr, intmax_t subq = -1) override
    void ExecuteOnAllThreads(ThreadPool* tp, function_type f) override
    void ExecuteOnSpecificThreads(ThreadIdSet tid_set, ThreadPool* tp, function_type f) override
    bool true_empty() const override
    intmax_t workers() const
    VUserTaskQueue(intmax_t nworkers = -1)

    using task_pointer          = std::shared_ptr<VTask>
    using function_type         = std::function<void()>
    using ThreadIdSet           = std::set<ThreadId>
    using TaskSubQueueContainer = std::vector<TaskSubQueue*>

Related declarations: TaskSubQueue::PushTask(task_pointer&&), TaskSubQueue::PopTask(bool front = true), TaskSubQueue::empty() const, ThreadPool::get_this_thread_id(), ThreadData::GetInstance(), AutoLock = TemplateAutoLock<Mutex>.
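For reference, AcquireHold() and ReleaseHold() in the listing are the same load-then-compare-exchange loop with opposite target values. A generic form of that toggle is sketched below; `set_flag` is an illustrative name, not a PTL function.

#include <atomic>

// Flip an atomic flag to `desired` and do not return until the new value is
// actually observed; mirrors the structure of AcquireHold/ReleaseHold.
inline void set_flag(std::atomic<bool>& flag, bool desired)
{
    bool observed;
    while((observed = flag.load(std::memory_order_relaxed)) != desired)
    {
        // on failure the CAS reloads `observed`; once `desired` is stored,
        // the next load sees it and the loop exits
        flag.compare_exchange_strong(observed, desired, std::memory_order_release,
                                     std::memory_order_relaxed);
    }
}

While the hold flag is set, GetTask() returns work only from the calling thread's own bin and InsertTask() spins on the requested bin, which is what allows ExecuteOnAllThreads() and ExecuteOnSpecificThreads() to pin exactly one wrapped task to every worker.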