BOSS 7.0.1
BESIII Offline Software System
Loading...
Searching...
No Matches
DistBoss/DistBossUtil/DistBossUtil-00-00-04/DistBossUtil/template/PthrReaderBufPool.cc
Go to the documentation of this file.
1/**********************************************************
2 +-> 1 2 3 4 5 6 ... PoolSize
3 | ahead current next |
4 |_________________________________________________|
5
6 0, a looped buffer pool
7 1, semIn is initialized to (PoolSize-1) so that the
8 current event buffer cannot be overwritten by thread_filling
9 2, the current event buffer cannot be updated until we
10 move to the next event (by calling nextEvent()).
11 3, the ahead event buffer is ready to be updated once we
12 are at the beginning of the current event.
13**********************************************************/
14
15#include <iostream>
16
17
// Constructor: sets up the looped buffer pool, the reader and the
// background filling thread.
// NOTE(review): listing line 19 (the signature that declares `arg`) is not
// present in this listing, so the parameter type cannot be documented here.
18template <typename Reader, int PoolSize>
// Both loop counters start at -1 (nothing filled, nothing handed out) and
// no reader exception is stored yet.
20 : m_inLoop(-1),
21 m_outLoop(-1),
22 m_RawFileException(0)
23{
// A pool of fewer than two buffers is rejected: the process prints a
// message and terminates with exit status 1.
24 if ( PoolSize < 2 ) {
25 std::cout << "[PthrReaderBufPool] The PoolSize of buffer must > 1" << std::endl;
26 exit(1);
27 }
28
29 // use(PoolSize-1) to protect the current event buffer
// m_semIn counts buffers the filling thread may write; m_semOut counts
// filled buffers that can be handed out (initially 0).
// NOTE(review): the return values of sem_init/pthread_mutex_init are not checked.
30 sem_init(&m_semIn, 0, (PoolSize-1));
31 sem_init(&m_semOut, 0, 0);
32 pthread_mutex_init(&m_lock, NULL);
33
// m_semSyn starts at 0; thread_filling waits on it before entering its loop.
34 sem_init(&m_semSyn, 0, 0);
35
36 for ( int i = 0; i < PoolSize; ++i ) {
37 // buffer in the loop for each event
// presumably 128*1024 is the initial capacity in bytes - confirm against
// AutoEnlargeBuffer.
38 m_buf[i] = new AutoEnlargeBuffer(128*1024);
39 }
40
// The Reader instance is owned by this object and constructed from `arg`.
41 m_reader = new Reader(arg);
42
43 // create an individual thread to fill to buffer
// The thread receives `this` as its argument.
// NOTE(review): the result of pthread_create is not checked.
44 pthread_create(&m_tid, NULL, thread_filling, (void*)this);
45}
46
// Presumably the destructor (listing line 48 with the signature is not
// present in this listing): releases the pool buffers, any stored reader
// exception, two of the semaphores and the reader.
// NOTE(review): m_semSyn and m_lock are initialized in the constructor but
// are not destroyed here, and the thread m_tid is neither joined nor
// cancelled before the buffers it writes to are deleted - confirm whether
// this is intentional.
47template <typename Reader, int PoolSize>
49{
50 for ( int i = 0; i < PoolSize; ++i ) {
51 delete m_buf[i];
52 }
53
// Null when no exception was stored; deleting a null pointer is a no-op.
54 delete m_RawFileException;
55
56 sem_destroy(&m_semIn);
57 sem_destroy(&m_semOut);
58
59 delete m_reader;
60}
61
// Presumably nextEvent() (listing line 63 with the signature is not present
// in this listing): advances to the next buffered event and returns a
// pointer to its data as const uint32_t*.
// Throws a copy of the stored RawExMessage or ReachEndOfFileList once the
// filling thread has stored one and every filled event has been handed out.
62template <typename Reader, int PoolSize>
64{
// m_outLoop is still negative only before the first event has been handed
// out; in that case release the filling thread, which waits on m_semSyn.
65 if ( m_outLoop < 0 ) sem_post(&m_semSyn);
66
67 // the ahead event buffer is ready for update now
68 sem_post(&m_semIn);
69 // waiting for a ready to use event
70 sem_wait(&m_semOut);
71
// Take a snapshot of the fill counter under the lock that the filling
// thread holds while incrementing it.
72 pthread_mutex_lock(&m_lock);
73 int inLoop = m_inLoop;
74 pthread_mutex_unlock(&m_lock);
75
// Rethrow only when nothing filled is left unread (m_outLoop has caught up
// with inLoop). The stored object is copied, so it stays owned by this pool.
// NOTE(review): after the exception has been thrown once, a further call
// would block in sem_wait above, because the filling thread has exited and
// posts m_semOut only once more after its loop - confirm callers stop here.
76 if ( m_RawFileException != 0 && m_outLoop >= inLoop ) {
77 if ( dynamic_cast<RawExMessage*>(m_RawFileException) ) {
78 throw RawExMessage(*(RawExMessage*)m_RawFileException);
79 }
80 if ( dynamic_cast<ReachEndOfFileList*>(m_RawFileException) ) {
81 throw ReachEndOfFileList(*(ReachEndOfFileList*)m_RawFileException);
82 }
83 }
84
85 ++m_outLoop;
86
// The pool is used as a ring: the slot is the counter modulo PoolSize.
87 const uint32_t* pevt = (uint32_t*)m_buf[ m_outLoop%PoolSize ]->data();
88 return pevt;
89}
90
// Presumably currentEvent() (listing line 92 with the signature is not
// present in this listing): returns the data pointer of the buffer selected
// by m_outLoop without advancing any counter or touching the semaphores.
91template <typename Reader, int PoolSize>
93{
94 //can't be called before any nextEvent() call
// m_outLoop is initialized to -1 in the constructor, so before the first
// advance the index below would be -1 % PoolSize, i.e. -1 (out of range).
95 const uint32_t* pevt = (uint32_t*)m_buf[ m_outLoop%PoolSize ]->data();
96 return pevt;
97}
98
// Forwards to m_reader->runNo() and returns its result unchanged.
// (Listing line 100 with the signature and return type is not present in
// this listing.)
// NOTE(review): m_reader is also used by the filling thread and no lock is
// taken here; the value presumably reflects the reader's read-ahead position
// rather than the event last handed out - confirm.
99template <typename Reader, int PoolSize>
101{
102 return m_reader->runNo();
103}
104
// Forwards to m_reader->currentFile() and returns its result unchanged.
// (Listing line 106 with the signature and return type is not present in
// this listing.)
// NOTE(review): no lock is taken although the filling thread also uses
// m_reader; presumably this reports the file being read ahead - confirm.
105template <typename Reader, int PoolSize>
107{
108 return m_reader->currentFile();
109}
110
// Forwards to m_reader->stat() and returns its result unchanged.
// (Listing line 112 with the signature and return type is not present in
// this listing.)
// NOTE(review): no lock is taken although the filling thread also uses
// m_reader - confirm that Reader::stat() is safe to call concurrently.
111template <typename Reader, int PoolSize>
113{
114 return m_reader->stat();
115}
116
// Presumably thread_filling, the start routine passed to pthread_create in
// the constructor: reads events from the reader and copies them into the
// looped buffer pool until the reader throws.
// NOTE(review): listing lines 118 (signature) and 120 (presumably the
// declaration of `pthis` from the thread argument) are not present in this
// listing.
117template <typename Reader, int PoolSize>
119{
121
122 uint32_t index;
123 const uint32_t* pevt;
124
// Do not start reading until m_semSyn has been posted.
125 sem_wait(&(pthis->m_semSyn));
126
127 while ( true ) {
128 // waiting for a buffer that ready to update
129 sem_wait(&(pthis->m_semIn));
130
131 try {
132 pevt = pthis->m_reader->nextEvent();
133 }
// The two expected reader exceptions are copied to the heap and stored in
// m_RawFileException, then the loop ends.
134 catch (RawExMessage& e) {
135 pthis->m_RawFileException = new RawExMessage(e);
136 break;
137 }
138 catch (ReachEndOfFileList& e) {
139 pthis->m_RawFileException = new ReachEndOfFileList(e);
140 break;
141 }
// Any other exception terminates the whole process with exit status 1.
142 catch ( ... ) {
143 std::cout << "[PthrReaderBufPool] Catch unexpected exception !" << std::endl;
144 exit(1);
145 }
146
// Advance the fill counter under the lock and derive the ring slot from it.
147 pthread_mutex_lock(&(pthis->m_lock));
148 index = (++(pthis->m_inLoop)) % PoolSize;
149 pthread_mutex_unlock(&(pthis->m_lock));
150
// The copy length is pevt[1] 32-bit words; presumably the second word of an
// event holds its total size in words - confirm against the raw event format.
151 pthis->m_buf[index]->copy( (void*)pevt, pevt[1]*sizeof(uint32_t) );
152
153 // post an event that ready to use
154 sem_post(&(pthis->m_semOut));
155 }
156
// One extra post after the loop, so that a wait on m_semOut can return even
// though no further event was filled and the stored exception can be seen.
157 sem_post(&(pthis->m_semOut));
158
159 pthread_exit(NULL);
160}
TTree * data
double arg(const EvtComplex &c)
Definition: EvtComplex.hh:227