BOSS 6.6.4.p01
BESIII Offline Software System
Loading...
Searching...
No Matches
PthrReaderBufPool.cc
Go to the documentation of this file.
1/**********************************************************
2 +-> 1 2 3 4 5 6 ... PoolSize
3 | ahead current next |
4 |_________________________________________________|
5
6 0, a looped buffer pool
7 1, semIn is initialized to (PoolSize-1), which protects the
8 current event buffer from being overwritten by thread_filling
9 2, the current event buffer cannot be updated until we
10 move to the next event (by calling nextEvent()).
11 3, the ahead event buffer becomes ready for update once we
12 are at the beginning of the current event.
13**********************************************************/
14
15#include <iostream>
16
17
// Constructor: sets up the looped buffer pool and starts the filling thread.
// NOTE(review): the signature line is missing from this listing; the body
// uses a parameter named `arg`, which is forwarded unchanged to Reader's
// constructor.
//
// - m_inLoop / m_outLoop start at -1 (nothing filled, nothing handed out yet).
// - Terminates the whole process with exit(1) when PoolSize < 2.
// - The return values of sem_init / pthread_mutex_init / pthread_create are
//   not checked here.
18template <typename Reader, int PoolSize>
20 : m_inLoop(-1),
21 m_outLoop(-1),
22 m_RawFileException(0)
23{
24 if ( PoolSize < 2 ) {
25 std::cout << "[PthrReaderBufPool] The PoolSize of buffer must > 1" << std::endl;
26 exit(1);
27 }
28
29 // use(PoolSize-1) to protect the current event buffer
30 sem_init(&m_semIn, 0, (PoolSize-1));
// no event is available to the consumer until the filling thread posts one
31 sem_init(&m_semOut, 0, 0);
32 pthread_mutex_init(&m_lock, NULL);
33
// m_semSyn starts at 0: thread_filling waits on it once before its loop,
// and nextEvent() posts it while m_outLoop is still negative
34 sem_init(&m_semSyn, 0, 0);
35
36 for ( int i = 0; i < PoolSize; ++i ) {
37 // buffer in the loop for each event
// presumably 128*1024 is the initial capacity in bytes - TODO confirm
// against AutoEnlargeBuffer
38 m_buf[i] = new AutoEnlargeBuffer(128*1024);
39 }
40
41 m_reader = new Reader(arg);
42
43 // create an individual thread to fill to buffer
44 pthread_create(&m_tid, NULL, thread_filling, (void*)this);
45}
46
// Destructor: frees the PoolSize event buffers, the stored exception copy
// (if any), the m_semIn / m_semOut semaphores and the Reader instance.
//
// NOTE(review): this block neither joins/stops the thread identified by
// m_tid nor destroys m_semSyn or m_lock, although all three are created in
// the constructor. If the filling thread can still be running when this
// executes, it would touch freed buffers - confirm the intended shutdown
// order with the callers.
47template <typename Reader, int PoolSize>
49{
50 for ( int i = 0; i < PoolSize; ++i ) {
51 delete m_buf[i];
52 }
53
// deleting a null pointer is a no-op, so this is safe when no exception
// was ever recorded
54 delete m_RawFileException;
55
56 sem_destroy(&m_semIn);
57 sem_destroy(&m_semOut);
58
59 delete m_reader;
60}
61
// nextEvent(): advance to the next buffered event and return a pointer to
// its data (the signature line is missing from this listing).
//
// Blocks on m_semOut until the filling thread has posted either a filled
// buffer or its final "stopped" post. Once the reader has failed and every
// filled buffer has already been handed out, rethrows a copy of the stored
// RawExMessage / ReachEndOfFileList instead of returning an event.
62template <typename Reader, int PoolSize>
64{
// while no event has been handed out yet, release thread_filling, which
// waits on m_semSyn once before entering its loop
65 if ( m_outLoop < 0 ) sem_post(&m_semSyn);
66
67 // the ahead event buffer is ready for update now
68 sem_post(&m_semIn);
69 // waiting for a ready to use event
70 sem_wait(&m_semOut);
71
// take a snapshot of the producer index under the lock that guards its
// increment in thread_filling
72 pthread_mutex_lock(&m_lock);
73 int inLoop = m_inLoop;
74 pthread_mutex_unlock(&m_lock);
75
// only report the stored exception once the index last handed out has
// caught up with the index last filled, i.e. no unread event remains
76 if ( m_RawFileException != 0 && m_outLoop >= inLoop ) {
// rethrow a copy selected by the dynamic type of the stored exception
77 if ( dynamic_cast<RawExMessage*>(m_RawFileException) ) {
78 throw RawExMessage(*(RawExMessage*)m_RawFileException);
79 }
80 if ( dynamic_cast<ReachEndOfFileList*>(m_RawFileException) ) {
81 throw ReachEndOfFileList(*(ReachEndOfFileList*)m_RawFileException);
82 }
83 }
84
85 ++m_outLoop;
86
// slot of the event that has just become the current one
87 const uint32_t* pevt = (uint32_t*)m_buf[ m_outLoop%PoolSize ]->data();
88 return pevt;
89}
90
// currentEvent(): return the data pointer of the buffer slot selected by
// m_outLoop, without advancing (the signature line is missing from this
// listing).
//
// m_outLoop is initialised to -1 in the constructor, so before the first
// successful nextEvent() the index expression below is negative; hence the
// precondition stated in the comment inside the body.
91template <typename Reader, int PoolSize>
93{
94 //can't be called before any nextEvent() call
95 const uint32_t* pevt = (uint32_t*)m_buf[ m_outLoop%PoolSize ]->data();
96 return pevt;
97}
98
// currentFile(): forwards to the wrapped Reader and returns whatever
// m_reader->currentFile() returns (the signature line is missing from this
// listing).
// NOTE(review): m_reader is also used by thread_filling; no lock is taken
// here - confirm that Reader::currentFile() is safe to call concurrently.
99template <typename Reader, int PoolSize>
101{
102 return m_reader->currentFile();
103}
104
// Forwards to the wrapped Reader and returns whatever m_reader->stat()
// returns. The signature line (name and return type) is missing from this
// listing.
// NOTE(review): no lock is taken around the call - confirm that
// Reader::stat() may be called while thread_filling is reading.
105template <typename Reader, int PoolSize>
107{
108 return m_reader->stat();
109}
110
// thread_filling: body of the thread started by the constructor; it reads
// events from the Reader and copies them into the looped buffer pool.
// NOTE(review): the signature line and the line that defines `pthis` are
// missing from this listing; presumably `pthis` is the pool object passed
// as the thread argument - confirm against the original file.
//
// Protocol, as implemented below:
//  - wait once on m_semSyn before doing any work;
//  - per event: wait on m_semIn, read one event, advance m_inLoop under
//    m_lock, copy the event into slot (m_inLoop % PoolSize), post m_semOut;
//  - on RawExMessage / ReachEndOfFileList: store a heap copy in
//    m_RawFileException, leave the loop, post m_semOut one last time and
//    end the thread;
//  - on any other exception: print a message and exit(1) the whole process.
111template <typename Reader, int PoolSize>
113{
115
116 uint32_t index;
117 const uint32_t* pevt;
118
// do not start reading until nextEvent() has posted m_semSyn
119 sem_wait(&(pthis->m_semSyn));
120
121 while ( true ) {
122 // waiting for a buffer that ready to update
123 sem_wait(&(pthis->m_semIn));
124
125 try {
126 pevt = pthis->m_reader->nextEvent();
127 }
128 catch (RawExMessage& e) {
// keep a copy for nextEvent() of this class to rethrow later
129 pthis->m_RawFileException = new RawExMessage(e);
130 break;
131 }
132 catch (ReachEndOfFileList& e) {
133 pthis->m_RawFileException = new ReachEndOfFileList(e);
134 break;
135 }
136 catch ( ... ) {
137 std::cout << "[PthrReaderBufPool] Catch unexpected exception !" << std::endl;
138 exit(1);
139 }
140
// m_inLoop is read by the consumer under the same lock
141 pthread_mutex_lock(&(pthis->m_lock));
142 index = (++(pthis->m_inLoop)) % PoolSize;
143 pthread_mutex_unlock(&(pthis->m_lock));
144
// presumably pevt[1] holds the event length in 32-bit words - TODO confirm
// against the raw event format
145 pthis->m_buf[index]->copy( (void*)pevt, pevt[1]*sizeof(uint32_t) );
146
147 // post an event that ready to use
148 sem_post(&(pthis->m_semOut));
149 }
150
// extra post with no new event: it lets a consumer blocked on m_semOut wake
// up and find the stored exception
151 sem_post(&(pthis->m_semOut));
152
153 pthread_exit(NULL);
154}
TTree * data
double arg(const EvtComplex &c)
Definition: EvtComplex.hh:227
void copy(void *src, int size)
std::string currentFile()
virtual ~PthrReaderBufPool()
const uint32_t * nextEvent()
const uint32_t * currentEvent() const