BOSS 7.1.1
BESIII Offline Software System
Loading...
Searching...
No Matches
ReaderRpc.cc
Go to the documentation of this file.
1#ifndef DISTBOSS_READER_RPC_CC
2#define DISTBOSS_READER_RPC_CC
3
6#include <iostream>
7
8template<class Reader>
9ReaderRpc<Reader>::ReaderRpc(const std::string& svrName, const std::vector<std::string>& fnames, int evtMax)
10 : DimRpc(svrName.c_str(), "I", "I"),
11 m_evtDone(0),
12 m_evtMax(evtMax),
13 m_stopFlag(0)
14{
15 sem_init(&m_sem, 0, 0);
16 m_freader = new PthrReaderBufPool<Reader>( fnames );
17
18 m_svrName = svrName.substr( svrName.find_last_of('/')+1 );
19}
20
21template<class Reader>
23{
24 sem_destroy(&m_sem);
25
26 delete m_freader;
27
28 for (std::map<int, AutoEnlargeBuffer*>::iterator it = m_evtBak.begin(); it != m_evtBak.end(); ++it) {
29 delete it->second;
30 }
31
32 std::cout << "[" << m_svrName << "] Terminated. Total events processed: " << m_evtDone << std::endl;
33}
34
35template<class Reader>
37{
38 sem_wait(&m_sem);
39
40 int nClients = dis_get_n_clients(itsIdOut);
41
42 int i = 0;
43 while ( nClients > 0 ) {
44 // wait for all the clients dis-connecting to this server
45 // and force the server to stop when the total waiting time
46 // is greater than 5s(5*1s)
47 if ( ++i > 5 ) break;
48 sleep(1);
49 nClients = dis_get_n_clients(itsIdOut);
50 }
51
52 return nClients;
53}
54
// Handles one client request: reads the requesting client's id and the
// integer request code, then dispatches on that code.
//
// NOTE(review): this text comes from a numbered source listing with gaps.
// Listing line 56 (presumably this function's signature), lines 64, 101, 111
// and 120 (presumably the `case` labels of the four branches below) and
// line 75 are not present here -- restore them from the real source file
// before building.
55template<class Reader>
57{
58 int clientId = DimServer::getClientId();
59 m_inCode = getInt();
60
61 switch (m_inCode) {
62
63 //--------------------------------------------------------
// Branch 1 (label missing): serve the next event to the client.
65 //--------------------------------------------------------
66 {
67 try {
// Once the stop flag is set no further events are read; an exception is
// raised instead so the client is answered from the catch block below.
// NOTE(review): this only reaches that catch block if RawExMessage derives
// from RawFileException -- confirm.
68 if ( m_stopFlag != 0 ) {
69 throw RawExMessage(("["+ m_svrName+"] Server stopflag is set, now waiting clients to exit!").c_str());
70 }
71 m_pEvt = m_freader->nextEvent();
72 }
73 catch ( RawFileException& e ) {
// Every failure bumps the stop flag; the exception is printed only while
// the incremented flag is still below 3.
74 if ( ++m_stopFlag < 3 ) e.print();
// NOTE(review): listing line 75 is missing; presumably it assigns m_outCode
// before it is sent back here -- confirm.
76 setData( (int&)m_outCode );
// Post the semaphore only when the flag has just become 1.
77 if ( m_stopFlag == 1 ) sem_post(&m_sem);
78 return;
79 }
80
// Reply with the event. The byte count is m_pEvt[1]*4 -- presumably word 1
// holds the event length in 32-bit words; confirm against the reader.
81 setData((void*)m_pEvt, (m_pEvt[1]*4) );
82
// Keep a per-client copy of the event just sent (used by the branch that
// re-sends from m_evtBak); the buffer is created on the client's first event.
83 std::map<int, AutoEnlargeBuffer*>::iterator it = m_evtBak.find( clientId );
84 if ( it == m_evtBak.end() ) {
85 m_evtBak[clientId] = new AutoEnlargeBuffer();
86 }
87 m_evtBak[clientId]->copy((void*)m_pEvt, (m_pEvt[1]*4) );
88
89 ++m_evtDone;
90
// A negative m_evtMax disables the limit. On reaching it, set the stop flag
// and post the semaphore.
91 if ( m_evtMax >= 0 && m_evtDone >= m_evtMax ) {
92 ++m_stopFlag;
93 std::cout << "[" << m_svrName << "] We have reach the EvtMax!" << std::endl;
94 sem_post(&m_sem);
95 }
96 }
97
98 break;
99
100 //--------------------------------------------------------
// Branch 2 (label missing): send this client's backed-up event again.
102 //--------------------------------------------------------
103 {
104 //std::cout << "[" << m_svrName << "] RESEND event to: " << DimServer::getClientName() << std::endl;
// NOTE(review): operator[] inserts a null pointer when this client has no
// backup entry yet, which would then be dereferenced here -- consider
// checking with find() first.
105 m_pEvt = (uint32_t*)m_evtBak[clientId]->data();
106 setData((void*)m_pEvt, (m_pEvt[1]*4) );
107 }
108 break;
109
110 //--------------------------------------------------------
// Branch 3 (label missing): reply with the reader's current file name.
112 //--------------------------------------------------------
113 {
114 std::string fname = m_freader->currentFile();
// length()+1 so the terminating NUL is sent as well.
// NOTE(review): fname is a local; this relies on setData copying the bytes
// before the block ends -- confirm.
115 setData( (void*)fname.c_str(), (fname.length()+1) );
116 }
117 break;
118
119 //--------------------------------------------------------
// Branch 4 (label missing): log that the client sent an error code.
121 //--------------------------------------------------------
122 std::cout << "[" << m_svrName << "] Error code from client["
123 << clientId << "]: "
124 << DimServer::getClientName()
125 << std::endl;
126 break;
127
128 //--------------------------------------------------------
129 default :
// Any other code: log it in hex together with the client's id and name.
130 //--------------------------------------------------------
131 std::cout << "[" << m_svrName << "] Unknown code(0x"
132 << std::hex << m_inCode << std::dec
133 << ") from client[" << clientId << "]: "
134 << DimServer::getClientName()
135 << std::endl;
136 }
137}
138
139#endif
TTree * data
virtual void print() const
virtual ~ReaderRpc()
Definition ReaderRpc.cc:22
int wait_to_end()
Definition ReaderRpc.cc:36