BOSS 7.0.7
BESIII Offline Software System
Loading...
Searching...
No Matches
PthrWriterBufPool.cc
Go to the documentation of this file.
1/**********************************************************
2 +-> 1 2 3 4 5 6 ... PoolSize
3 | ahead current next |
4 |_________________________________________________|
5
6 0, a looped (circular) buffer pool
7 1, semIn is initialized to (PoolSize-1) in order to protect the
8 current event buffer from being changed by writeEvent
9 2, the current event buffer cannot be updated until we
10 move on to the next event (in thread_writing()).
11 3, the ahead event buffer is ready to be updated when we
12 are at the beginning of the current event.
13**********************************************************/
14
15#include <iostream>
16
17
// Constructor: sets both loop counters to -1, initializes the three
// semaphores, allocates PoolSize event buffers, constructs the Writer from
// `arg`, and finally starts the writing thread.
// NOTE(review): the signature line is absent from this listing, so the type
// of `arg` is not visible here; confirm against the class declaration.
18template <typename Writer, int PoolSize>
20 : m_inLoop(-1),
21 m_outLoop(-1)
22{
// a pool smaller than 2 is rejected: print a message and terminate the process
23 if ( PoolSize < 2 ) {
24 std::cout << "[PthrWriterBufPool] The PoolSize of buffer must > 1" << std::endl;
25 exit(1);
26 }
27
28 // m_semIn starts at (PoolSize-1) to protect the current event buffer
29 sem_init(&m_semIn, 0, (PoolSize-1));
// m_semOut and m_semFinalize both start at 0
30 sem_init(&m_semOut, 0, 0);
31 sem_init(&m_semFinalize, 0, 0);
32
33 for ( int i = 0; i < PoolSize; ++i ) {
34 // one buffer per slot of the loop, one slot per event
// presumably 128*1024 is the initial capacity in bytes; confirm in AutoEnlargeBuffer
35 m_buf[i] = new AutoEnlargeBuffer(128*1024);
36 }
37
38 m_writer = new Writer(arg);
39
40 // create a dedicated thread for writing; `this` is handed over as the thread argument
41 pthread_create(&m_tid, NULL, thread_writing, (void*)this);
42}
43
// Destructor: blocks until thread_writing() has posted m_semFinalize, then
// deletes the PoolSize buffers, destroys the three semaphores and deletes
// the writer. (The signature line is absent from this listing.)
// NOTE(review): no pthread_join/pthread_detach on m_tid is visible in these
// lines; confirm the thread is reclaimed elsewhere.
44template <typename Writer, int PoolSize>
46{
47 // wait for the end of thread_writing
48 sem_wait(&m_semFinalize);
49
50 for ( int i = 0; i < PoolSize; ++i ) {
51 delete m_buf[i];
52 }
53
54 sem_destroy(&m_semIn);
55 sem_destroy(&m_semOut);
56 sem_destroy(&m_semFinalize);
57
58 delete m_writer;
59}
60
// writeEvent: hands one event (`pevt`, `size`) over to the writing thread.
// Returns m_writer->stat() right away when that status is non-zero;
// otherwise stores the event in the next pool slot and returns 0.
// (The signature line is absent from this listing.)
61template <typename Writer, int PoolSize>
63{
// do not queue anything once the writer reports a non-zero status
64 if ( m_writer->stat() != 0 ) return m_writer->stat();
65
// wait until a slot may be overwritten
66 sem_wait(&m_semIn);
67
// advance the input counter and wrap it onto the pool
68 int index = (++m_inLoop) % PoolSize;
69
70 m_buf[index]->copy( pevt, size );
71
// signal thread_writing() that one more event is ready
72 sem_post(&m_semOut);
73
74 return 0;
75}
76
// Returns the status reported by the underlying writer (m_writer->stat()).
// (The signature line is absent from this listing.)
77template <typename Writer, int PoolSize>
79{
80 return m_writer->stat();
81}
82
// thread_writing: entry point handed to pthread_create() by the constructor.
// Loops forever: releases one input slot, waits for a filled buffer, and
// passes that buffer's data()/size() to m_writer->writeEvent(). The loop ends
// when writeEvent throws ReachEndOfFileList or RawExMessage; any other
// exception terminates the process. After the loop the writer is closed and
// m_semFinalize is posted for the destructor.
// NOTE(review): the signature line and the declarations of `pthis` and `pbuf`
// are absent from this listing; confirm them against the original file.
83template <typename Writer, int PoolSize>
85{
87
88 uint32_t index;
90
91
92 while ( true ) {
93 // the ahead event buffer is ready for update now
94 sem_post(&(pthis->m_semIn));
95 // wait for an event that is ready to use
96 sem_wait(&(pthis->m_semOut));
97
// advance the output counter and wrap it onto the pool
98 index = (++(pthis->m_outLoop)) % PoolSize;
99
100 pbuf = pthis->m_buf[index];
101
102 try {
103 pthis->m_writer->writeEvent( pbuf->data(), pbuf->size() );
104 }
// ReachEndOfFileList leaves the loop without printing anything
105 catch (ReachEndOfFileList& e) {
106 //std::cout << "[PthrWriterBufPool] Finalized Successfully!" << std::endl;
107 break;
108 }
// RawExMessage is printed, then the loop is left
109 catch (RawExMessage& e) {
110 e.print();
111 break;
112 }
// anything else: report it and terminate the whole process
113 catch ( ... ) {
114 std::cout << "[PthrWriterBufPool] Catch unexpected exception !" << std::endl;
115 exit(1);
116 }
117 }
118
119
120 pthis->m_writer->close();
121
// let the destructor's sem_wait(&m_semFinalize) proceed
122 sem_post(&(pthis->m_semFinalize));
123
124 pthread_exit(NULL);
125}
double arg(const EvtComplex &c)
Definition: EvtComplex.hh:227
int writeEvent(void *pevt, int size)
virtual ~PthrWriterBufPool()
virtual void print() const