Geant4 10.7.0
Toolkit for the simulation of the passage of particles through matter
Loading...
Searching...
No Matches
PTL::UserTaskQueue Class Reference

#include <UserTaskQueue.hh>

+ Inheritance diagram for PTL::UserTaskQueue:

Public Types

typedef VTask * task_pointer
 
typedef std::vector< TaskSubQueue * > TaskSubQueueContainer
 
typedef std::default_random_engine random_engine_t
 
typedef std::uniform_int_distribution< int > int_dist_t
 
- Public Types inherited from PTL::VUserTaskQueue
typedef VTask * task_pointer
 
typedef std::atomic< intmax_t > AtomicInt
 
typedef uintmax_t size_type
 
typedef std::function< void()> function_type
 
typedef std::set< ThreadId > ThreadIdSet
 

Public Member Functions

 UserTaskQueue (intmax_t nworkers=-1, UserTaskQueue *=nullptr)
 
virtual ~UserTaskQueue () override
 
virtual task_pointer GetTask (intmax_t subq=-1, intmax_t nitr=-1) override
 
virtual intmax_t InsertTask (task_pointer, ThreadData *=nullptr, intmax_t subq=-1) override
 
task_pointer GetThreadBinTask ()
 
virtual void Wait () override
 
virtual void resize (intmax_t) override
 
virtual bool empty () const override
 
virtual size_type size () const override
 
virtual size_type bin_size (size_type bin) const override
 
virtual bool bin_empty (size_type bin) const override
 
bool true_empty () const override
 
size_type true_size () const override
 
virtual void ExecuteOnAllThreads (ThreadPool *tp, function_type f) override
 
virtual void ExecuteOnSpecificThreads (ThreadIdSet tid_set, ThreadPool *tp, function_type f) override
 
virtual VUserTaskQueue * clone () override
 
virtual intmax_t GetThreadBin () const override
 
- Public Member Functions inherited from PTL::VUserTaskQueue
 VUserTaskQueue (intmax_t nworkers=-1)
 
virtual ~VUserTaskQueue ()
 
virtual task_pointer GetTask (intmax_t subq=-1, intmax_t nitr=-1)=0
 
virtual intmax_t InsertTask (task_pointer, ThreadData *=nullptr, intmax_t subq=-1)=0
 
virtual void Wait ()=0
 
virtual intmax_t GetThreadBin () const =0
 
virtual void resize (intmax_t)=0
 
virtual size_type size () const =0
 
virtual bool empty () const =0
 
virtual size_type bin_size (size_type bin) const =0
 
virtual bool bin_empty (size_type bin) const =0
 
virtual size_type true_size () const
 
virtual bool true_empty () const
 
virtual void ExecuteOnAllThreads (ThreadPool *tp, function_type f)=0
 
virtual void ExecuteOnSpecificThreads (ThreadIdSet tid_set, ThreadPool *tp, function_type f)=0
 
intmax_t workers () const
 
virtual VUserTaskQueue * clone ()=0
 

Protected Member Functions

intmax_t GetInsertBin () const
 

Additional Inherited Members

- Static Public Member Functions inherited from PTL::VUserTaskQueue
template<typename ContainerT , size_t... Idx>
static auto ContainerToTupleImpl (ContainerT &&container, mpl::index_sequence< Idx... >) -> decltype(std::make_tuple(std::forward< ContainerT >(container)[Idx]...))
 
template<std::size_t N, typename ContainerT >
static auto ContainerToTuple (ContainerT &&container) -> decltype(ContainerToTupleImpl(std::forward< ContainerT >(container), mpl::make_index_sequence< N >{}))
 
template<std::size_t N, std::size_t Nt, typename TupleT , enable_if_t<(N==Nt), int > = 0>
static void TExecutor (TupleT &&_t)
 
template<std::size_t N, std::size_t Nt, typename TupleT , enable_if_t<(N< Nt), int > = 0>
static void TExecutor (TupleT &&_t)
 
template<typename TupleT , std::size_t N = std::tuple_size<decay_t<TupleT>>::value>
static void Executor (TupleT &&__t)
 
template<typename Container , typename std::enable_if< std::is_same< Container, task_pointer >::value, int >::type = 0>
static void Execute (Container &obj)
 
template<typename Container , typename std::enable_if<!std::is_same< Container, task_pointer >::value, int >::type = 0>
static void Execute (Container &tasks)
 
- Protected Attributes inherited from PTL::VUserTaskQueue
intmax_t m_workers = 0
 

Detailed Description

Definition at line 47 of file UserTaskQueue.hh.

Member Typedef Documentation

◆ int_dist_t

typedef std::uniform_int_distribution<int> PTL::UserTaskQueue::int_dist_t

Definition at line 53 of file UserTaskQueue.hh.

◆ random_engine_t

typedef std::default_random_engine PTL::UserTaskQueue::random_engine_t

Definition at line 52 of file UserTaskQueue.hh.

◆ task_pointer

typedef VTask* PTL::UserTaskQueue::task_pointer

Definition at line 50 of file UserTaskQueue.hh.

◆ TaskSubQueueContainer

typedef std::vector<TaskSubQueue*> PTL::UserTaskQueue::TaskSubQueueContainer

Definition at line 51 of file UserTaskQueue.hh.

Constructor & Destructor Documentation

◆ UserTaskQueue()

UserTaskQueue::UserTaskQueue ( intmax_t  nworkers = -1,
UserTaskQueue * parent = nullptr
)

Definition at line 37 of file UserTaskQueue.cc.

38: VUserTaskQueue(nworkers)
39, m_is_clone((parent) ? true : false)
40, m_thread_bin((parent) ? (ThreadPool::get_this_thread_id() % (nworkers + 1)) : 0)
41, m_insert_bin((parent) ? (ThreadPool::get_this_thread_id() % (nworkers + 1)) : 0)
42, m_hold((parent) ? parent->m_hold : new std::atomic_bool(false))
43, m_ntasks((parent) ? parent->m_ntasks : new std::atomic_uintmax_t(0))
44, m_subqueues((parent) ? parent->m_subqueues : new TaskSubQueueContainer())
45{
46 // create nthreads + 1 subqueues so there is always a subqueue available
47 if(!parent)
48 {
49 for(intmax_t i = 0; i < nworkers + 1; ++i)
50 m_subqueues->push_back(new TaskSubQueue(m_ntasks));
51 }
52
53#if defined(DEBUG)
54 if(GetEnv<int>("PTL_VERBOSE", 0) > 3)
55 {
56 RecursiveAutoLock l(TypeRecursiveMutex<decltype(std::cout)>());
57 std::stringstream ss;
58 ss << ThreadPool::get_this_thread_id() << "> " << ThisThread::get_id() << " ["
59 << __FUNCTION__ << ":" << __LINE__ << "] "
60 << "this = " << this << ", "
61 << "clone = " << std::boolalpha << m_is_clone << ", "
62 << "thread = " << m_thread_bin << ", "
63 << "insert = " << m_insert_bin << ", "
64 << "hold = " << m_hold->load() << " @ " << m_hold << ", "
65 << "tasks = " << m_ntasks->load() << " @ " << m_ntasks << ", "
66 << "subqueue = " << m_subqueues << ", "
67 << "size = " << true_size() << ", "
68 << "empty = " << true_empty();
69 std::cout << ss.str() << std::endl;
70 }
71#endif
72}
static uintmax_t get_this_thread_id()
Definition: ThreadPool.cc:120
std::vector< TaskSubQueue * > TaskSubQueueContainer
bool true_empty() const override
size_type true_size() const override
RecursiveMutex & TypeRecursiveMutex(const unsigned int &_n=0)
Definition: Threading.hh:140

◆ ~UserTaskQueue()

UserTaskQueue::~UserTaskQueue ( )
override virtual

Definition at line 76 of file UserTaskQueue.cc.

77{
78 if(!m_is_clone)
79 {
80 for(auto& itr : *m_subqueues)
81 {
82 assert(itr->size() == 0);
83 delete itr;
84 }
85 m_subqueues->clear();
86 delete m_hold;
87 delete m_ntasks;
88 delete m_subqueues;
89 }
90}

Member Function Documentation

◆ bin_empty()

bool PTL::UserTaskQueue::bin_empty ( size_type  bin) const
inline override virtual

Implements PTL::VUserTaskQueue.

Definition at line 146 of file UserTaskQueue.hh.

147{
148 return (*m_subqueues)[bin]->empty();
149}

◆ bin_size()

PTL::UserTaskQueue::size_type PTL::UserTaskQueue::bin_size ( size_type  bin) const
inline override virtual

Implements PTL::VUserTaskQueue.

Definition at line 138 of file UserTaskQueue.hh.

139{
140 return (*m_subqueues)[bin]->size();
141}

◆ clone()

VUserTaskQueue * UserTaskQueue::clone ( )
override virtual

Implements PTL::VUserTaskQueue.

Definition at line 120 of file UserTaskQueue.cc.

121{
122 return new UserTaskQueue(workers(), this);
123}
intmax_t workers() const

◆ empty()

bool PTL::UserTaskQueue::empty ( ) const
inline override virtual

Implements PTL::VUserTaskQueue.

Definition at line 122 of file UserTaskQueue.hh.

123{
124 return (m_ntasks->load(std::memory_order_relaxed) == 0);
125}

◆ ExecuteOnAllThreads()

void UserTaskQueue::ExecuteOnAllThreads ( ThreadPool * tp,
function_type  f 
)
override virtual

Implements PTL::VUserTaskQueue.

Definition at line 305 of file UserTaskQueue.cc.

306{
307 typedef TaskGroup<int, int> task_group_type;
308 typedef std::map<int64_t, bool> thread_execute_map_t;
309
310 if(!tp->is_alive())
311 {
312 func();
313 return;
314 }
315
316 auto join_func = [=](int& ref, int i) {
317 ref += i;
318 return ref;
319 };
320 task_group_type* tg = new task_group_type(join_func, tp);
321
322 // wait for all threads to finish any work
323 // NOTE: will cause deadlock if called from a task
324 while(tp->get_active_threads_count() > 0)
325 ThisThread::sleep_for(std::chrono::milliseconds(10));
326
327 thread_execute_map_t* thread_execute_map = new thread_execute_map_t();
328
329 AcquireHold();
330 for(int i = 0; i < (m_workers + 1); ++i)
331 {
332 if(i == GetThreadBin())
333 continue;
334
335 //--------------------------------------------------------------------//
336 auto thread_specific_func = [&]() {
337 static Mutex _mtx;
338 _mtx.lock();
339 bool& _executed = (*thread_execute_map)[GetThreadBin()];
340 _mtx.unlock();
341 if(!_executed)
342 {
343 func();
344 _executed = true;
345 return 1;
346 }
347 return 0;
348 };
349 //--------------------------------------------------------------------//
350
351 auto _task = tg->wrap(thread_specific_func);
352 //++(*m_ntasks);
353 // TaskSubQueue* task_subq = (*m_subqueues)[i];
354 // task_subq->PushTask(_task);
355 InsertTask(_task, ThreadData::GetInstance(), i);
356 }
357
358 tp->notify_all();
359 int nexecuted = tg->join();
360 if(nexecuted != m_workers)
361 {
362 std::stringstream msg;
363 msg << "Failure executing routine on all threads! Only " << nexecuted
364 << " threads executed function out of " << m_workers;
365 std::cerr << msg.str() << std::endl;
366 // Exception("UserTaskQueue::ExecuteOnAllThreads", "TaskQueue0000",
367 // JustWarning, msg);
368 }
369 delete thread_execute_map;
370 ReleaseHold();
371}
static ThreadData *& GetInstance()
Definition: ThreadData.cc:35
virtual intmax_t GetThreadBin() const override
virtual intmax_t InsertTask(task_pointer, ThreadData *=nullptr, intmax_t subq=-1) override
std::mutex Mutex
Definition: Threading.hh:76

◆ ExecuteOnSpecificThreads()

void UserTaskQueue::ExecuteOnSpecificThreads ( ThreadIdSet  tid_set,
ThreadPool * tp,
function_type  f 
)
override virtual

Implements PTL::VUserTaskQueue.

Definition at line 376 of file UserTaskQueue.cc.

378{
379 typedef TaskGroup<int, int> task_group_type;
380 typedef std::map<int64_t, bool> thread_execute_map_t;
381
382 auto join_func = [=](int& ref, int i) {
383 ref += i;
384 return ref;
385 };
386 task_group_type* tg = new task_group_type(join_func, tp);
387
388 // wait for all threads to finish any work
389 // NOTE: will cause deadlock if called from a task
390 while(tp->get_active_threads_count() > 0)
391 ThisThread::sleep_for(std::chrono::milliseconds(10));
392
393 if(!tp->is_alive())
394 {
395 func();
396 return;
397 }
398
399 thread_execute_map_t* thread_execute_map = new thread_execute_map_t();
400
401 //========================================================================//
402 // wrap the function so that it will only be executed if the thread
403 // has an ID in the set
404 auto thread_specific_func = [=]() {
405 static Mutex _mtx;
406 _mtx.lock();
407 bool& _executed = (*thread_execute_map)[GetThreadBin()];
408 _mtx.unlock();
409 if(!_executed && tid_set.count(ThisThread::get_id()) > 0)
410 {
411 func();
412 _executed = true;
413 return 1;
414 }
415 return 0;
416 };
417 //========================================================================//
418
419 if(tid_set.count(ThisThread::get_id()) > 0)
420 func();
421
422 AcquireHold();
423 for(int i = 0; i < (m_workers + 1); ++i)
424 {
425 if(i == GetThreadBin())
426 continue;
427
428 auto _task = tg->wrap(thread_specific_func);
429 InsertTask(_task, ThreadData::GetInstance(), i);
430 }
431 tp->notify_all();
432 int nexecuted = tg->join();
433 if(nexecuted != m_workers)
434 {
435 std::stringstream msg;
436 msg << "Failure executing routine on all threads! Only " << nexecuted
437 << " threads executed function out of " << tid_set.size();
438 std::cerr << msg.str() << std::endl;
439 // Exception("UserTaskQueue::ExecuteOnSpecificThreads", "TaskQueue0001",
440 // JustWarning, msg);
441 }
442 delete thread_execute_map;
443 ReleaseHold();
444}

◆ GetInsertBin()

intmax_t UserTaskQueue::GetInsertBin ( ) const
protected

Definition at line 138 of file UserTaskQueue.cc.

139{
140 return (++m_insert_bin % (m_workers + 1));
141}

Referenced by InsertTask().

◆ GetTask()

UserTaskQueue::task_pointer UserTaskQueue::GetTask ( intmax_t  subq = -1,
intmax_t  nitr = -1 
)
override virtual

Implements PTL::VUserTaskQueue.

Definition at line 180 of file UserTaskQueue.cc.

181{
182 // exit if empty
183 if(this->true_empty())
184 return nullptr;
185
186 // ensure the thread has a bin assignment
187 intmax_t tbin = GetThreadBin();
188 intmax_t n = (subq < 0) ? tbin : subq;
189 if(nitr < 1)
190 nitr = (m_workers + 1); // * m_ntasks->load(std::memory_order_relaxed);
191
192 if(m_hold->load(std::memory_order_relaxed))
193 {
194 return GetThreadBinTask();
195 }
196
197 task_pointer _task = nullptr;
198 //------------------------------------------------------------------------//
199 auto get_task = [&](intmax_t _n) {
200 TaskSubQueue* task_subq = (*m_subqueues)[_n % (m_workers + 1)];
201 // try to acquire a claim for the bin
202 // if acquired, no other threads will access bin until claim is released
203 if(!task_subq->empty() && task_subq->AcquireClaim())
204 {
205 // pop task out of bin
206 _task = task_subq->PopTask(n == tbin);
207 // release the claim on the bin
208 task_subq->ReleaseClaim();
209 }
210 if(_task)
211 --(*m_ntasks);
212 // return success if valid pointer
213 return (_task != nullptr);
214 };
215 //------------------------------------------------------------------------//
216
217 // there are num_workers+1 bins so there is always a bin that is open
218 // execute num_workers+2 iterations so the thread checks its bin twice
219 // while(!empty())
220 {
221 for(intmax_t i = 0; i < nitr; ++i, ++n)
222 {
223 if(get_task(n % (m_workers + 1)))
224 return _task;
225 }
226 }
227
228 // only reached if looped over all bins (and looked in own bin twice)
229 // and found no work so return an empty task and the thread will be put to
230 // sleep if there is still no work by the time it reaches its
231 // condition variable
232 return _task;
233}
task_pointer GetThreadBinTask()

◆ GetThreadBin()

intmax_t UserTaskQueue::GetThreadBin ( ) const
overridevirtual

Implements PTL::VUserTaskQueue.

Definition at line 127 of file UserTaskQueue.cc.

128{
129 // get a thread id number
130 static thread_local intmax_t tl_bin =
131 (m_thread_bin + ThreadPool::get_this_thread_id()) % (m_workers + 1);
132 return tl_bin;
133}

Referenced by ExecuteOnAllThreads(), ExecuteOnSpecificThreads(), GetTask(), GetThreadBinTask(), and InsertTask().

◆ GetThreadBinTask()

UserTaskQueue::task_pointer UserTaskQueue::GetThreadBinTask ( )

Definition at line 146 of file UserTaskQueue.cc.

147{
148 intmax_t tbin = GetThreadBin();
149 TaskSubQueue* task_subq = (*m_subqueues)[tbin % (m_workers + 1)];
150 task_pointer _task = nullptr;
151
152 //------------------------------------------------------------------------//
153 auto get_task = [&]() {
154 if(task_subq->AcquireClaim())
155 {
156 // run task
157 _task = task_subq->PopTask(true);
158 // release the claim on the bin
159 task_subq->ReleaseClaim();
160 }
161 if(_task)
162 --(*m_ntasks);
163 // return success if valid pointer
164 return (_task != nullptr);
165 };
166 //------------------------------------------------------------------------//
167
168 // while not empty
169 while(!task_subq->empty())
170 {
171 if(get_task())
172 break;
173 }
174 return _task;
175}

Referenced by GetTask().

◆ InsertTask()

intmax_t UserTaskQueue::InsertTask ( task_pointer  task,
ThreadData * data = nullptr,
intmax_t  subq = -1 
)
override virtual

Implements PTL::VUserTaskQueue.

Definition at line 238 of file UserTaskQueue.cc.

239{
240 // skip increment here (handled externally)
241 ++(*m_ntasks);
242
243 bool spin = m_hold->load(std::memory_order_relaxed);
244 intmax_t tbin = GetThreadBin();
245
246 if(data && data->within_task)
247 {
248 subq = tbin;
249 // spin = true;
250 }
251
252 // subq is -1 unless specified so unless specified
253 // GetInsertBin() call increments a counter and returns
254 // counter % (num_workers + 1) so that tasks are distributed evenly
255 // among the bins
256 intmax_t n = (subq < 0) ? GetInsertBin() : subq;
257
258 //------------------------------------------------------------------------//
259 auto insert_task = [&](intmax_t _n) {
260 TaskSubQueue* task_subq = (*m_subqueues)[_n];
261 // TaskSubQueue* next_subq = (*m_subqueues)[(_n + 1) % (m_workers + 1)];
262 // if not threads bin and size difference, insert into smaller
263 // if(n != tbin && next_subq->size() < task_subq->size())
264 // task_subq = next_subq;
265 // try to acquire a claim for the bin
266 // if acquired, no other threads will access bin until claim is released
267 if(task_subq->AcquireClaim())
268 {
269 // push the task into the bin
270 task_subq->PushTask(task);
271 // release the claim on the bin
272 task_subq->ReleaseClaim();
273 // return success
274 return true;
275 }
276 return false;
277 };
278 //------------------------------------------------------------------------//
279
280 // if not in "hold/spin mode", where thread only inserts tasks into
281 // specified bin, then move onto next bin
282 //
283 if(spin)
284 {
285 n = n % (m_workers + 1);
286 while(!insert_task(n))
287 ;
288 return n;
289 }
290
291 // there are num_workers+1 bins so there is always a bin that is open
292 // execute num_workers+2 iterations so the thread checks its bin twice
293 while(true)
294 {
295 auto _n = (n++) % (m_workers + 1);
296 if(insert_task(_n))
297 return _n;
298 }
299 return GetThreadBin();
300}
intmax_t GetInsertBin() const

Referenced by ExecuteOnAllThreads(), and ExecuteOnSpecificThreads().

◆ resize()

void UserTaskQueue::resize ( intmax_t  n)
override virtual

Implements PTL::VUserTaskQueue.

Definition at line 95 of file UserTaskQueue.cc.

96{
97 AutoLock l(m_mutex);
98 if(m_workers < n)
99 {
100 while(m_workers < n)
101 {
102 m_subqueues->push_back(new TaskSubQueue(m_ntasks));
103 ++m_workers;
104 }
105 }
106 else if(m_workers > n)
107 {
108 while(m_workers > n)
109 {
110 delete m_subqueues->back();
111 m_subqueues->pop_back();
112 --m_workers;
113 }
114 }
115}

◆ size()

PTL::UserTaskQueue::size_type PTL::UserTaskQueue::size ( ) const
inline override virtual

Implements PTL::VUserTaskQueue.

Definition at line 130 of file UserTaskQueue.hh.

131{
132 return m_ntasks->load(std::memory_order_relaxed);
133}

◆ true_empty()

bool PTL::UserTaskQueue::true_empty ( ) const
inline override virtual

Reimplemented from PTL::VUserTaskQueue.

Definition at line 154 of file UserTaskQueue.hh.

155{
156 for(const auto& itr : *m_subqueues)
157 if(!itr->empty())
158 return false;
159 return true;
160}

Referenced by GetTask(), and UserTaskQueue().

◆ true_size()

PTL::UserTaskQueue::size_type PTL::UserTaskQueue::true_size ( ) const
inline override virtual

Reimplemented from PTL::VUserTaskQueue.

Definition at line 165 of file UserTaskQueue.hh.

166{
167 size_type _n = 0;
168 for(const auto& itr : *m_subqueues)
169 _n += itr->size();
170 return _n;
171}

Referenced by UserTaskQueue().

◆ Wait()

virtual void PTL::UserTaskQueue::Wait ( )
inline override virtual

Implements PTL::VUserTaskQueue.

Definition at line 73 of file UserTaskQueue.hh.

73{}

The documentation for this class was generated from the following files: