Geant4 11.3.0
Toolkit for the simulation of the passage of particles through matter
Loading...
Searching...
No Matches
PTL::UserTaskQueue Class Reference

#include <UserTaskQueue.hh>

+ Inheritance diagram for PTL::UserTaskQueue:

Public Types

using task_pointer = std::shared_ptr<VTask>
 
using TaskSubQueueContainer = std::vector<TaskSubQueue*>
 
using random_engine_t = std::default_random_engine
 
using int_dist_t = std::uniform_int_distribution<int>
 
- Public Types inherited from PTL::VUserTaskQueue
using task_pointer = std::shared_ptr<VTask>
 
using AtomicInt = std::atomic<intmax_t>
 
using size_type = uintmax_t
 
using function_type = std::function<void()>
 
using ThreadIdSet = std::set<ThreadId>
 

Public Member Functions

 UserTaskQueue (intmax_t nworkers=-1, UserTaskQueue *=nullptr)
 
 ~UserTaskQueue () override
 
task_pointer GetTask (intmax_t subq=-1, intmax_t nitr=-1) override
 
intmax_t InsertTask (task_pointer &&, ThreadData *=nullptr, intmax_t subq=-1) override PTL_NO_SANITIZE_THREAD
 
task_pointer GetThreadBinTask ()
 
void Wait () override
 
void resize (intmax_t) override
 
bool empty () const override
 
size_type size () const override
 
size_type bin_size (size_type bin) const override
 
bool bin_empty (size_type bin) const override
 
bool true_empty () const override
 
size_type true_size () const override
 
void ExecuteOnAllThreads (ThreadPool *tp, function_type f) override
 
void ExecuteOnSpecificThreads (ThreadIdSet tid_set, ThreadPool *tp, function_type f) override
 
VUserTaskQueue * clone () override
 
intmax_t GetThreadBin () const override
 
- Public Member Functions inherited from PTL::VUserTaskQueue
 VUserTaskQueue (intmax_t nworkers=-1)
 
virtual ~VUserTaskQueue ()=default
 
intmax_t workers () const
 

Protected Member Functions

intmax_t GetInsertBin () const
 

Additional Inherited Members

- Protected Attributes inherited from PTL::VUserTaskQueue
intmax_t m_workers = 0
 

Detailed Description

Definition at line 45 of file UserTaskQueue.hh.

Member Typedef Documentation

◆ int_dist_t

using PTL::UserTaskQueue::int_dist_t = std::uniform_int_distribution<int>

Definition at line 51 of file UserTaskQueue.hh.

◆ random_engine_t

using PTL::UserTaskQueue::random_engine_t = std::default_random_engine

Definition at line 50 of file UserTaskQueue.hh.

◆ task_pointer

using PTL::UserTaskQueue::task_pointer = std::shared_ptr<VTask>

Definition at line 48 of file UserTaskQueue.hh.

◆ TaskSubQueueContainer

using PTL::UserTaskQueue::TaskSubQueueContainer = std::vector<TaskSubQueue*>

Definition at line 49 of file UserTaskQueue.hh.

Constructor & Destructor Documentation

◆ UserTaskQueue()

PTL::UserTaskQueue::UserTaskQueue ( intmax_t nworkers = -1,
UserTaskQueue * parent = nullptr )

Definition at line 47 of file UserTaskQueue.cc.

48: VUserTaskQueue(nworkers)
49, m_is_clone((parent) != nullptr)
50, m_thread_bin((parent) ? (ThreadPool::get_this_thread_id() % (nworkers + 1)) : 0)
51, m_insert_bin((parent) ? (ThreadPool::get_this_thread_id() % (nworkers + 1)) : 0)
52, m_hold((parent) ? parent->m_hold : new std::atomic_bool(false))
53, m_ntasks((parent) ? parent->m_ntasks : new std::atomic_uintmax_t(0))
54, m_mutex((parent) ? parent->m_mutex : new Mutex{})
55, m_subqueues((parent) ? parent->m_subqueues : new TaskSubQueueContainer())
56{
57 // create nthreads + 1 subqueues so there is always a subqueue available
58 if(!parent)
59 {
60 for(intmax_t i = 0; i < nworkers + 1; ++i)
61 m_subqueues->emplace_back(new TaskSubQueue(m_ntasks));
62 }
63}
static uintmax_t get_this_thread_id()
std::vector< TaskSubQueue * > TaskSubQueueContainer
VUserTaskQueue(intmax_t nworkers=-1)

Referenced by clone(), and UserTaskQueue().

◆ ~UserTaskQueue()

PTL::UserTaskQueue::~UserTaskQueue ( )
override

Definition at line 67 of file UserTaskQueue.cc.

68{
69 if(!m_is_clone)
70 {
71 for(auto& itr : *m_subqueues)
72 {
73 assert(itr->empty());
74 delete itr;
75 }
76 m_subqueues->clear();
77 delete m_hold;
78 delete m_ntasks;
79 delete m_mutex;
80 delete m_subqueues;
81 }
82}

Member Function Documentation

◆ bin_empty()

bool PTL::UserTaskQueue::bin_empty ( size_type bin) const
inline override virtual

Implements PTL::VUserTaskQueue.

Definition at line 138 of file UserTaskQueue.hh.

139{
140 return (*m_subqueues)[bin]->empty();
141}

◆ bin_size()

UserTaskQueue::size_type PTL::UserTaskQueue::bin_size ( size_type bin) const
inline override virtual

Implements PTL::VUserTaskQueue.

Definition at line 130 of file UserTaskQueue.hh.

131{
132 return (*m_subqueues)[bin]->size();
133}

◆ clone()

VUserTaskQueue * PTL::UserTaskQueue::clone ( )
override virtual

Implements PTL::VUserTaskQueue.

Definition at line 114 of file UserTaskQueue.cc.

115{
116 return new UserTaskQueue(workers(), this);
117}
UserTaskQueue(intmax_t nworkers=-1, UserTaskQueue *=nullptr)
intmax_t workers() const

◆ empty()

bool PTL::UserTaskQueue::empty ( ) const
inline override virtual

Implements PTL::VUserTaskQueue.

Definition at line 114 of file UserTaskQueue.hh.

115{
116 return (m_ntasks->load(std::memory_order_relaxed) == 0);
117}

◆ ExecuteOnAllThreads()

void PTL::UserTaskQueue::ExecuteOnAllThreads ( ThreadPool * tp,
function_type f )
overridevirtual

Implements PTL::VUserTaskQueue.

Definition at line 298 of file UserTaskQueue.cc.

299{
300 using task_group_type = TaskGroup<int, int>;
301 using thread_execute_map_t = std::map<int64_t, bool>;
302
303 if(!tp->is_alive())
304 {
305 func();
306 return;
307 }
308
309 task_group_type tg{ [](int& ref, int i) { return (ref += i); }, tp };
310
311 // wait for all threads to finish any work
312 // NOTE: will cause deadlock if called from a task
313 while(tp->get_active_threads_count() > 0)
314 ThisThread::sleep_for(std::chrono::milliseconds(10));
315
316 thread_execute_map_t thread_execute_map{};
317 std::vector<std::shared_ptr<VTask>> _tasks{};
318 _tasks.reserve(m_workers + 1);
319
320 AcquireHold();
321 for(int i = 0; i < (m_workers + 1); ++i)
322 {
323 if(i == GetThreadBin())
324 continue;
325
326 //--------------------------------------------------------------------//
327 auto thread_specific_func = [&]() {
328 ScopeDestructor _dtor = tg.get_scope_destructor();
329 static Mutex _mtx;
330 _mtx.lock();
331 bool& _executed = thread_execute_map[GetThreadBin()];
332 _mtx.unlock();
333 if(!_executed)
334 {
335 func();
336 _executed = true;
337 return 1;
338 }
339 return 0;
340 };
341 //--------------------------------------------------------------------//
342
343 InsertTask(tg.wrap(thread_specific_func), ThreadData::GetInstance(), i);
344 }
345
346 tp->notify_all();
347 int nexecuted = tg.join();
348 if(nexecuted != m_workers)
349 {
350 std::stringstream msg;
351 msg << "Failure executing routine on all threads! Only " << nexecuted
352 << " threads executed function out of " << m_workers << " workers";
353 std::cerr << msg.str() << std::endl;
354 }
355 ReleaseHold();
356}
static ThreadData *& GetInstance()
Definition ThreadData.cc:31
intmax_t InsertTask(task_pointer &&, ThreadData *=nullptr, intmax_t subq=-1) override PTL_NO_SANITIZE_THREAD
intmax_t GetThreadBin() const override

◆ ExecuteOnSpecificThreads()

void PTL::UserTaskQueue::ExecuteOnSpecificThreads ( ThreadIdSet tid_set,
ThreadPool * tp,
function_type f )
override virtual

Implements PTL::VUserTaskQueue.

Definition at line 361 of file UserTaskQueue.cc.

363{
364 using task_group_type = TaskGroup<int, int>;
365 using thread_execute_map_t = std::map<int64_t, bool>;
366
367 task_group_type tg{ [](int& ref, int i) { return (ref += i); }, tp };
368
369 // wait for all threads to finish any work
370 // NOTE: will cause deadlock if called from a task
371 while(tp->get_active_threads_count() > 0)
372 ThisThread::sleep_for(std::chrono::milliseconds(10));
373
374 if(!tp->is_alive())
375 {
376 func();
377 return;
378 }
379
380 thread_execute_map_t thread_execute_map{};
381
382 //========================================================================//
383 // wrap the function so that it will only be executed if the thread
384 // has an ID in the set
385 auto thread_specific_func = [&]() {
386 ScopeDestructor _dtor = tg.get_scope_destructor();
387 static Mutex _mtx;
388 _mtx.lock();
389 bool& _executed = thread_execute_map[GetThreadBin()];
390 _mtx.unlock();
391 if(!_executed && tid_set.count(ThisThread::get_id()) > 0)
392 {
393 func();
394 _executed = true;
395 return 1;
396 }
397 return 0;
398 };
399 //========================================================================//
400
401 if(tid_set.count(ThisThread::get_id()) > 0)
402 func();
403
404 AcquireHold();
405 for(int i = 0; i < (m_workers + 1); ++i)
406 {
407 if(i == GetThreadBin())
408 continue;
409
410 InsertTask(tg.wrap(thread_specific_func), ThreadData::GetInstance(), i);
411 }
412 tp->notify_all();
413 decltype(tid_set.size()) nexecuted = tg.join();
414 if(nexecuted != tid_set.size())
415 {
416 std::stringstream msg;
417 msg << "Failure executing routine on specific threads! Only " << nexecuted
418 << " threads executed function out of " << tid_set.size() << " workers";
419 std::cerr << msg.str() << std::endl;
420 }
421 ReleaseHold();
422}

◆ GetInsertBin()

intmax_t PTL::UserTaskQueue::GetInsertBin ( ) const
protected

Definition at line 132 of file UserTaskQueue.cc.

133{
134 return (++m_insert_bin % (m_workers + 1));
135}

Referenced by InsertTask().

◆ GetTask()

UserTaskQueue::task_pointer PTL::UserTaskQueue::GetTask ( intmax_t subq = -1,
intmax_t nitr = -1 )
override virtual

Implements PTL::VUserTaskQueue.

Definition at line 174 of file UserTaskQueue.cc.

175{
176 // exit if empty
177 if(this->true_empty())
178 return nullptr;
179
180 // ensure the thread has a bin assignment
181 intmax_t tbin = GetThreadBin();
182 intmax_t n = (subq < 0) ? tbin : subq;
183 if(nitr < 1)
184 nitr = (m_workers + 1); // * m_ntasks->load(std::memory_order_relaxed);
185
186 if(m_hold->load(std::memory_order_relaxed))
187 {
188 return GetThreadBinTask();
189 }
190
191 task_pointer _task = nullptr;
192 //------------------------------------------------------------------------//
193 auto get_task = [&](intmax_t _n) {
194 TaskSubQueue* task_subq = (*m_subqueues)[_n % (m_workers + 1)];
195 // try to acquire a claim for the bin
196 // if acquired, no other threads will access bin until claim is released
197 if(!task_subq->empty() && task_subq->AcquireClaim())
198 {
199 // pop task out of bin
200 _task = task_subq->PopTask(n == tbin);
201 // release the claim on the bin
202 task_subq->ReleaseClaim();
203 }
204 if(_task)
205 --(*m_ntasks);
206 // return success if valid pointer
207 return (_task != nullptr);
208 };
209 //------------------------------------------------------------------------//
210
211 // there are num_workers+1 bins so there is always a bin that is open
212 // execute num_workers+2 iterations so the thread checks its bin twice
213 // while(!empty())
214 {
215 for(intmax_t i = 0; i < nitr; ++i, ++n)
216 {
217 if(get_task(n % (m_workers + 1)))
218 return _task;
219 }
220 }
221
222 // only reached if looped over all bins (and looked in own bin twice)
223 // and found no work so return an empty task and the thread will be put to
224 // sleep if there is still no work by the time it reaches its
225 // condition variable
226 return _task;
227}
bool true_empty() const override
std::shared_ptr< VTask > task_pointer
task_pointer GetThreadBinTask()

◆ GetThreadBin()

intmax_t PTL::UserTaskQueue::GetThreadBin ( ) const
override virtual

Implements PTL::VUserTaskQueue.

Definition at line 121 of file UserTaskQueue.cc.

122{
123 // get a thread id number
124 static thread_local intmax_t tl_bin =
125 (m_thread_bin + ThreadPool::get_this_thread_id()) % (m_workers + 1);
126 return tl_bin;
127}

Referenced by ExecuteOnAllThreads(), ExecuteOnSpecificThreads(), GetTask(), GetThreadBinTask(), and InsertTask().

◆ GetThreadBinTask()

UserTaskQueue::task_pointer PTL::UserTaskQueue::GetThreadBinTask ( )

Definition at line 140 of file UserTaskQueue.cc.

141{
142 intmax_t tbin = GetThreadBin();
143 TaskSubQueue* task_subq = (*m_subqueues)[tbin % (m_workers + 1)];
144 task_pointer _task = nullptr;
145
146 //------------------------------------------------------------------------//
147 auto get_task = [&]() {
148 if(task_subq->AcquireClaim())
149 {
150 // run task
151 _task = task_subq->PopTask(true);
152 // release the claim on the bin
153 task_subq->ReleaseClaim();
154 }
155 if(_task)
156 --(*m_ntasks);
157 // return success if valid pointer
158 return (_task != nullptr);
159 };
160 //------------------------------------------------------------------------//
161
162 // while not empty
163 while(!task_subq->empty())
164 {
165 if(get_task())
166 break;
167 }
168 return _task;
169}

Referenced by GetTask().

◆ InsertTask()

intmax_t PTL::UserTaskQueue::InsertTask ( task_pointer && task,
ThreadData * data = nullptr,
intmax_t subq = -1 )
override virtual

Implements PTL::VUserTaskQueue.

Definition at line 232 of file UserTaskQueue.cc.

233{
234 // increment number of tasks
235 ++(*m_ntasks);
236
237 bool spin = m_hold->load(std::memory_order_relaxed);
238 intmax_t tbin = GetThreadBin();
239
240 if(data && data->within_task)
241 {
242 subq = tbin;
243 // spin = true;
244 }
245
246 // subq is -1 unless specified so unless specified
247 // GetInsertBin() call increments a counter and returns
248 // counter % (num_workers + 1) so that tasks are distributed evenly
249 // among the bins
250 intmax_t n = (subq < 0) ? GetInsertBin() : subq;
251
252 //------------------------------------------------------------------------//
253 auto insert_task = [&](intmax_t _n) {
254 TaskSubQueue* task_subq = (*m_subqueues)[_n];
255 // TaskSubQueue* next_subq = (*m_subqueues)[(_n + 1) % (m_workers + 1)];
256 // if not threads bin and size difference, insert into smaller
257 // if(n != tbin && next_subq->size() < task_subq->size())
258 // task_subq = next_subq;
259 // try to acquire a claim for the bin
260 // if acquired, no other threads will access bin until claim is released
261 if(task_subq->AcquireClaim())
262 {
263 // push the task into the bin
264 task_subq->PushTask(std::move(task));
265 // release the claim on the bin
266 task_subq->ReleaseClaim();
267 // return success
268 return true;
269 }
270 return false;
271 };
272 //------------------------------------------------------------------------//
273
274 // if not in "hold/spin mode", where thread only inserts tasks into
275 // specified bin, then move onto next bin
276 //
277 if(spin)
278 {
279 n = n % (m_workers + 1);
280 while(!insert_task(n))
281 ;
282 return n;
283 }
284
285 // there are num_workers+1 bins so there is always a bin that is open
286 // execute num_workers+2 iterations so the thread checks its bin twice
287 while(true)
288 {
289 auto _n = (n++) % (m_workers + 1);
290 if(insert_task(_n))
291 return _n;
292 }
293}
intmax_t GetInsertBin() const

Referenced by ExecuteOnAllThreads(), and ExecuteOnSpecificThreads().

◆ resize()

void PTL::UserTaskQueue::resize ( intmax_t n)
override virtual

Implements PTL::VUserTaskQueue.

Definition at line 87 of file UserTaskQueue.cc.

88{
89 if(!m_mutex)
90 throw std::runtime_error("nullptr to mutex");
91 AutoLock lk(m_mutex);
92 if(m_workers < n)
93 {
94 while(m_workers < n)
95 {
96 m_subqueues->emplace_back(new TaskSubQueue(m_ntasks));
97 ++m_workers;
98 }
99 }
100 else if(m_workers > n)
101 {
102 while(m_workers > n)
103 {
104 delete m_subqueues->back();
105 m_subqueues->pop_back();
106 --m_workers;
107 }
108 }
109}
TemplateAutoLock< Mutex > AutoLock
Definition AutoLock.hh:479

◆ size()

UserTaskQueue::size_type PTL::UserTaskQueue::size ( ) const
inline override virtual

Implements PTL::VUserTaskQueue.

Definition at line 122 of file UserTaskQueue.hh.

123{
124 return m_ntasks->load(std::memory_order_relaxed);
125}

◆ true_empty()

bool PTL::UserTaskQueue::true_empty ( ) const
inline override virtual

Reimplemented from PTL::VUserTaskQueue.

Definition at line 146 of file UserTaskQueue.hh.

147{
148 for(const auto& itr : *m_subqueues)
149 if(!itr->empty())
150 return false;
151 return true;
152}

Referenced by GetTask().

◆ true_size()

UserTaskQueue::size_type PTL::UserTaskQueue::true_size ( ) const
inline override virtual

Reimplemented from PTL::VUserTaskQueue.

Definition at line 157 of file UserTaskQueue.hh.

158{
159 size_type _n = 0;
160 for(const auto& itr : *m_subqueues)
161 _n += itr->size();
162 return _n;
163}

◆ Wait()

void PTL::UserTaskQueue::Wait ( )
inline override virtual

Implements PTL::VUserTaskQueue.

Definition at line 71 of file UserTaskQueue.hh.

71{}

Referenced by Wait().


The documentation for this class was generated from the following files: