CGEM BOSS 6.6.5.f
BESIII Offline Software System
Loading...
Searching...
No Matches
DistBossServer.cxx
Go to the documentation of this file.
1#include "DistBossServer/DistBossServer.h"
2#include "DistBossServer/WriterRpc.h"
3#include "DistBossServer/ReaderRpc.h"
4#include "RootFile/RootFileWriter.h"
5#include "RawFile/RawFileReader.h"
6#include "IRawFile/RawFileExceptions.h"
7#include <sstream>
8#include <iostream>
9#include <unistd.h>
10
11const std::string DistBossServer::m_serverName = DistBossServer::autoServerName();
12const std::string DistBossServer::m_svc[7] = {
13 m_serverName + "/RawEvtSvc",
14 m_serverName + "/SimSeedSvc",
15 m_serverName + "/RootEvtSvc",
16 m_serverName + "/WriteDst",
17 m_serverName + "/WriteRec",
18 m_serverName + "/WriteTuple",
19 m_serverName + "/WriteHist"
20};
21
22
23std::string DistBossServer::autoServerName()
24{
25 std::stringstream spid;
26 spid << getpid();
27
28 char hname[64];
29 gethostname(hname, 64);
30
31 std::string name("/DistBoss/P");
32 name += spid.str() + "@" + hname;
33
34 return name;
35}
36
37DistBossServer::DistBossServer(const std::string& jobOpt)
38 : m_optMgr(jobOpt)
39{
40 if ( m_optMgr.evtMax() == 0 ) {
41 std::cout << "[DistBossServer] EvtMax = 0, exit now!" << std::endl;
42 exit(0);
43 }
44
45 std::cout << "[DistBossServer] Starting server: " << m_serverName << std::endl;
46
47 switch ( m_optMgr.inputType() ) {
48 case ( 0 ) :
49 m_readerRpc = new ReaderRpc<RawFileReader>(m_svc[0], m_optMgr.inputFiles(), m_optMgr.evtMax());
50 break;
51 case ( 1 ) :
52 std::cout << "[DistBossServer] Simulation is not supported by DistBoss yet!" << std::endl;
53 exit(1);
54 case ( 2 ) :
55 //m_readerRpc = new ReaderRpc<RootFileReader>(m_svc[2], m_optMgr.inputFiles(), m_optMgr.evtMax());
56 //break;
57 std::cout << "[DistBossServer] Root input files are not supported by DistBoss yet!" << std::endl;
58 exit(1);
59 default :
60 assert( false ); //we shouldn't come here
61 }
62
63 const std::vector<int>& outputs = m_optMgr.outputTypes();
64 for ( std::vector<int>::const_iterator it = outputs.begin(); it != outputs.end(); ++it ) {
65 switch ( *it ) {
66 case ( 3 ) :
67 m_writerRpcs.push_back(new WriterRpc<RootFileWriter>(m_svc[3], m_optMgr.dstFile()));
68 break;
69 case ( 4 ) :
70 m_writerRpcs.push_back(new WriterRpc<RootFileWriter>(m_svc[4], m_optMgr.recFile()));
71 break;
72 case ( 5 ) :
73 //std::cout << "[DistBossServer] Ntuple outputs is not supported by DistBoss yet!" << std::endl;
74 break;
75 case ( 6 ) :
76 //std::cout << "[DistBossServer] Hist outputs is not supported by DistBoss yet!" << std::endl;
77 break;
78 default :
79 assert(false); //we shouldn't come here
80 }
81 }
82
83 m_optMgr.clientOptsTemplate( m_serverName );
84
85 m_exitHandler = new ServerExitHandler();
86 m_errorHandler = new ServerErrorHandler();
87}
88
90{
91 delete m_readerRpc;
92
93 for ( unsigned int i = 0; i < m_writerRpcs.size(); ++i ) {
94 delete m_writerRpcs[i];
95 }
96
97 std::cout << "[DistBossServer] Server stopped." << std::endl;
98
99 delete m_exitHandler;
100}
101
103{
104 DimServer::addExitHandler( m_exitHandler );
105 DimServer::addErrorHandler( m_errorHandler );
106
107 DimServer::start(m_serverName.c_str());
108
109 // we'd better check the return value of wait_to_end() here
110 int nClients = m_readerRpc->wait_to_end();
111
112 for ( unsigned int i = 0; i < m_writerRpcs.size(); ++i ) {
113 nClients = m_writerRpcs[i]->wait_to_end();
114 }
115
116 return nClients;
117}
DistBossServer(const std::string &jobOpt)
void clientOptsTemplate(const std::string &svrName)
virtual int wait_to_end()=0