29 printf(
"TARunInfo::ctor!\n");
46 printf(
"TARunInfo::dtor!\n");
68 printf(
"TARunInfo::dtor: deleted %d queued flow events!\n", count);
86 printf(
"TAFlowEvent::ctor: chain %p\n", flow);
93 printf(
"TAFlowEvent::dtor: this %p, next %p\n",
this,
fNext);
108 printf(
"TARunObject::ctor, run %d\n", runinfo->
fRunNo);
114 printf(
"TARunObject::BeginRun, run %d\n", runinfo->
fRunNo);
120 printf(
"TARunObject::EndRun, run %d\n", runinfo->
fRunNo);
126 printf(
"TARunObject::NextSubrun, run %d\n", runinfo->
fRunNo);
132 printf(
"TARunObject::PauseRun, run %d\n", runinfo->
fRunNo);
138 printf(
"TARunObject::ResumeRun, run %d\n", runinfo->
fRunNo);
144 printf(
"TARunObject::PreEndRun, run %d\n", runinfo->
fRunNo);
150 printf(
"TARunObject::Analyze!\n");
157 printf(
"TARunObject::Analyze!\n");
164 printf(
"TARunObject::AnalyzeSpecialEvent!\n");
176 printf(
"TAFactory::Usage!\n");
182 printf(
"TAFactory::Init!\n");
188 printf(
"TAFactory::Finish!\n");
208 printf(
"TARootHelper::ctor!\n");
212 if (filename.empty()) {
214 sprintf(xfilename,
"output%05d.root", runinfo->
fRunNo);
218 fOutputFile =
new TFile(filename.c_str(),
"RECREATE");
221 fprintf(stderr,
"TARootHelper::ctor: Error: cannot open output ROOT file \"%s\"\n", filename.c_str());
235 printf(
"TARootHelper::dtor!\n");
252 #include <sys/time.h>
259 #ifdef HAVE_THTTP_SERVER
260 #include "THttpServer.h"
279 fprintf(stderr,
"Waiting for all queues to empty out!\n");
281 int count_all_empty = 0;
284 int count_not_empty = 0;
285 size_t count_events = 0;
287 for (
unsigned i=0; i<mt->
fMtThreads.size(); i++) {
304 if (count_not_empty == 0) {
308 if (count_all_empty > 1) {
314 fprintf(stderr,
"Timeout waiting for all queues to empty out, %d queues still have %d flow events!\n", count_not_empty, (
int)count_events);
325 fprintf(stderr,
"Waiting for all threads to shut down!\n");
330 int count_running = 0;
332 for (
unsigned i=0; i<mt->
fMtThreads.size(); i++) {
337 if (count_running == 0) {
342 fprintf(stderr,
"Timeout waiting for all threads to shut down, %d still running!\n", count_running);
350 fprintf(stderr,
"Joining all threads!\n");
351 for (
unsigned i=0; i<mt->
fMtThreads.size(); i++) {
357 fprintf(stderr,
"Thread %d failed to shut down!\n", i);
384 for (
size_t i=0; i<nmodules; i++) {
401 printf(
"TAMultithreadInfo::dtor: deleted %d queued flow events!\n", count);
442 printf(
"Multithread queue lengths:\n");
460 gModules =
new std::vector<TAFactory*>;
468 gettimeofday(&tv,NULL);
469 return tv.tv_sec + 0.000001*tv.tv_usec;
492 return elapsed_seconds.count();
551 Profiler(
const bool time_modules,
const int queue_interval_check);
553 void Begin(
TARunInfo* runinfo,
const std::vector<TARunObject*> fRunRun );
560 void AddModuleMap(
const char* UserProfileName,
unsigned long hash);
567 fTimeModules(time_modules), fQueueInterval(queue_interval_check)
570 printf(
"Profiler::ctor\n");
574 fStartUser = std::chrono::system_clock::now();
580 printf(
"Profiler::dtor\n");
586 printf(
"Profiler::begin\n");
605 Double_t bins[Nbins+1];
607 Double_t TimeRange = 10;
610 for (
int i=0; i<Nbins+1; i++) {
611 bins[i] = TimeRange*pow(1.1,i)/pow(1.1,Nbins);
619 for (
size_t i = 0; i < runrun.size(); i++) {
621 if (runrun[i]->fModuleName.empty())
644 TString(
fModuleNames.at(i) +
" Event Proccessing Time; s"),
650 TString(
fModuleNames.at(i) +
" Flow Proccessing Time; s"),
664 TString(
fModuleNames.at(i) +
" Multithread Queue Length; Queue Depth"),
683 printf(
"Profiler::log\n");
692 std::chrono::duration<double> elapsed_seconds = stop - start;
693 double dt = elapsed_seconds.count();
714 printf(
"Profiler::log_AnalyzeEvent_time\n");
723 std::chrono::duration<double> elapsed_seconds = stop - start;
724 double dt = elapsed_seconds.count();
743 printf(
"Profiler::log_mt_queue_length\n");
767 printf(
"Profiler::LogUserWindows\n");
771 std::vector<TAFlowEvent*> flowArray;
775 flowArray.push_back(f);
779 for (
int ii=FlowEvents-1; ii>=0; ii--) {
784 unsigned int hash = std::hash<std::string>{}(timer->
fModuleName);
797 fprintf(stderr,
"manalyzer must be built with ROOT for using the user profiling tools\n");
804 printf(
"Profiler::AddModuleMap\n");
812 Double_t bins[Nbins+1];
813 Double_t TimeRange = 10;
814 for (
int i=0; i<Nbins+1; i++) {
815 bins[i] = TimeRange*pow(1.1,i)/pow(1.1,Nbins);
817 TH1D* Histo =
new TH1D(UserProfileName,UserProfileName,Nbins,bins);
831 printf(
"Profiler::End\n");
856 double AllAnalyzeFlowEventTime=0;
858 AllAnalyzeFlowEventTime += n;
859 double AllAnalyzeEventTime=0;
861 AllAnalyzeEventTime += n;
863 printf(
"Module average processing time\n");
864 printf(
" \t\t\t\tAnalyzeEvent (one thread) \tAnalyzeFlowEvent (multithreadable)\n");
865 printf(
"Module\t\t\t\tEntries\tMean(ms)RMS(ms)\tMax(ms)\tSum(s)\tEntries\tMean(ms)RMS(ms)\tMax(ms)\tSum(s)\n");
866 printf(
"----------------------------------------------------------------------------------------------------------------\n");
870 printf(
"\t%d\t%.1f\t%.1f\t%.1f\t%.3f",
877 printf(
"\t-\t-\t-\t-\t-");
880 printf(
"\t%d\t%.1f\t%.1f\t%.1f\t%.3f",
887 printf(
"\t-\t-\t-\t-\t-");
890 printf(
"----------------------------------------------------------------------------------------------------------------\n");
891 printf(
" Analyse TMEvent total time %f\n",AllAnalyzeEventTime);
892 printf(
" Analyse FlowEvent total time %f\n",AllAnalyzeFlowEventTime);
895 printf(
"Custom profiling windows\tEntries\tMean(ms)RMS(ms)\tMax(ms)\tSum(s)\n");
896 printf(
"----------------------------------------------------------------------\n");
898 printf(
"%-25s\t%d\t%.1f\t%.1f\t%.1f\t%.3f\t\n",
fUserHistograms.at(i)->GetTitle(),
905 printf(
"----------------------------------------------------------------------\n");
907 printf(
"----------------------------------------------------------------------------------------------------------------\n");
910 printf(
"To use custom profile windows, please build rootana with root\n");
915 double cputime = (double)(clock() -
fStartCPU)/CLOCKS_PER_SEC;
916 std::chrono::duration<double> usertime = std::chrono::system_clock::now() -
fStartUser;
917 printf(
"%s\tCPU time: %.2fs\tUser time: %.2fs\tAverage CPU Usage: ~%.1f%%\n",
921 100.*cputime/usertime.count());
938 bool fMultithreadMode =
false;
941 RunHandler(
const std::vector<std::string>& args,
bool multithread,
bool profile,
int queue_interval_check)
945 fMultithreadMode = multithread;
947 fProfiler =
new Profiler(profile, queue_interval_check);
965 int nModules=(*gModules).size();
1006 flow = fRunRun[i]->AnalyzeFlowEvent(fRunInfo, flag, flow);
1057 assert(fRunInfo == NULL);
1058 assert(fRunRun.size() == 0);
1060 fRunInfo =
new TARunInfo(run_number, file_name, fArgs);
1062 int nModules = (*gModules).size();
1064 for (
int i=0; i<nModules; i++)
1065 fRunRun.push_back((*
gModules)[i]->NewRunObject(fRunInfo));
1067 if (fMultithreadMode) {
1072 std::vector<std::mutex> mutsize(nModules);
1077 for (
int i=0; i<nModules; i++) {
1078 printf(
"Create fMtFlowQueue thread %d\n",i);
1088 assert(fRunInfo != NULL);
1089 assert(fRunInfo->
fOdb != NULL);
1090 for (
unsigned i=0; i<fRunRun.size(); i++)
1091 fRunRun[i]->BeginRun(fRunInfo);
1093 fProfiler->
Begin(fRunInfo, fRunRun);
1106 for (
unsigned i=0; i<fRunRun.size(); i++)
1107 fRunRun[i]->PreEndRun(fRunInfo);
1112 AnalyzeFlowQueue(flags);
1127 for (
unsigned i=0; i<fRunRun.size(); i++)
1128 fRunRun[i]->EndRun(fRunInfo);
1140 for (
unsigned i=0; i<fRunRun.size(); i++)
1141 fRunRun[i]->NextSubrun(fRunInfo);
1148 for (
unsigned i=0; i<fRunRun.size(); i++) {
1154 assert(fRunRun.size() == 0);
1162 for (
unsigned i=0; i<fRunRun.size(); i++)
1163 fRunRun[i]->AnalyzeSpecialEvent(fRunInfo, event);
1168 for (
unsigned i=0; i<fRunRun.size(); i++) {
1170 flow = fRunRun[i]->AnalyzeFlowEvent(fRunInfo, flags, flow);
1197 flow = AnalyzeFlowEvent(&flags, flow);
1209 assert(fRunInfo != NULL);
1210 assert(fRunInfo->
fOdb != NULL);
1211 assert(event != NULL);
1212 assert(flags != NULL);
1222 for (
unsigned i=0; i<fRunRun.size(); i++) {
1224 flow = fRunRun[i]->Analyze(fRunInfo, event, flags, flow);
1241 flow = AnalyzeFlowEvent(flags, flow);
1269 AnalyzeFlowQueue(flags);
1275 if (fFlowQueue.empty())
1279 fFlowQueue.pop_front();
1289 fFlowQueue.push_back(flow);
1298 #include "TSystem.h"
1303 static bool gRunStartRequested =
false;
1304 static bool gRunStopRequested =
false;
1306 class RpcHandler:
public TMFeRpcHandlerInterface
1308 TMFeResult HandleBeginRun(
int run_number)
1310 printf(
"RpcHandler::HandleBeginRun(%d)\n", run_number);
1311 gRunStartRequested =
true;
1312 gRunStopRequested =
false;
1316 TMFeResult HandleEndRun(
int run_number)
1318 printf(
"RpcHandler::HandleEndRun(%d)\n", run_number);
1319 gRunStartRequested =
false;
1320 gRunStopRequested =
true;
1324 TMFeResult HandleStartAbort(
int run_number)
1326 printf(
"RpcHandler::HandleStartAbort(%d)\n", run_number);
1328 gRunStartRequested =
false;
1329 gRunStopRequested =
true;
1333 TMFeResult HandleRpc(
const char* cmd,
const char* args, std::string& result)
1339 TMFeResult ReceiveEvent(TMEventBuffer* b,
TMEvent *e,
int timeout_msec = 0)
1346 TMFeResult r = b->ReceiveEvent(&e->
data, timeout_msec);
1351 if (e->
data.size() == 0)
1361 static int ProcessMidasOnlineTmfe(
const std::vector<std::string>& args,
const char* progname,
const char* hostname,
const char* exptname,
const char* bufname,
int event_id,
int trigger_mask,
const char* sampling_type_string,
int num_analyze,
TMWriterInterface* writer,
bool multithread,
bool profiler,
int queue_interval_check)
1363 TMFE *mfe = TMFE::Instance();
1365 TMFeResult r = mfe->Connect(progname, hostname, exptname);
1368 fprintf(stderr,
"Cannot connect to MIDAS: %s\n", r.error_message.c_str());
1374 TMEventBuffer *b =
new TMEventBuffer(mfe);
1378 r = b->OpenBuffer(bufname);
1381 fprintf(stderr,
"Cannot open event buffer \"%s\": %s\n", bufname, r.error_message.c_str());
1387 r = b->SetCacheSize(100000, 0);
1390 fprintf(stderr,
"Cannot set cache size on event buffer \"%s\": %s\n", bufname, r.error_message.c_str());
1396 r = b->AddRequest(event_id, trigger_mask, sampling_type_string);
1399 fprintf(stderr,
"Cannot add event request on event buffer \"%s\": %s\n", bufname, r.error_message.c_str());
1403 RpcHandler* h =
new RpcHandler();
1405 mfe->AddRpcHandler(h);
1407 mfe->DeregisterTransitionPause();
1408 mfe->DeregisterTransitionResume();
1409 mfe->RegisterTransitionStartAbort();
1410 mfe->SetTransitionSequenceStart(300);
1411 mfe->SetTransitionSequenceStop(700);
1413 mfe->StartRpcThread();
1417 RunHandler rh(args, multithread, profiler, queue_interval_check);
1419 for (
unsigned i=0; i<(*gModules).size(); i++)
1422 if (mfe->fStateRunning) {
1423 rh.CreateRun(mfe->fRunNumber, NULL);
1424 rh.fRunInfo->fOdb = mfe->fOdbRoot;
1430 while (!mfe->fShutdownRequested) {
1431 bool do_sleep =
true;
1433 if (gRunStartRequested) {
1434 gRunStartRequested =
false;
1439 rh.fRunInfo->fOdb = NULL;
1443 rh.CreateRun(mfe->fRunNumber, NULL);
1444 rh.fRunInfo->fOdb = mfe->fOdbRoot;
1450 if (gRunStopRequested) {
1451 gRunStopRequested =
false;
1456 rh.fRunInfo->fOdb = NULL;
1467 r = ReceiveEvent(b, &e, 100);
1470 fprintf(stderr,
"Cannot read event on event buffer \"%s\": %s\n", bufname, r.error_message.c_str());
1477 if ((e.
data_size > 0) && (rh.fRunInfo != NULL)) {
1484 rh.AnalyzeEvent(&e, &flags, writer);
1487 mfe->fShutdownRequested =
true;
1490 if (num_analyze > 0) {
1492 if (num_analyze == 0) {
1493 mfe->fShutdownRequested =
true;
1500 #ifdef HAVE_THTTP_SERVER
1511 gSystem->DispatchOneEvent(kTRUE);
1527 rh.fRunInfo->fOdb = NULL;
1531 for (
unsigned i=0; i<(*gModules).size(); i++)
1535 r = b->CloseBuffer();
1541 fprintf(stderr,
"Cannot close event buffer \"%s\": %s\n", bufname, r.error_message.c_str());
1556 #include "TSystem.h"
1563 int fNumAnalyze = 0;
1569 : fRun(args, multithread, profiler, queue_interval_check)
1571 fNumAnalyze = num_analyze;
1589 void Transition(
int transition,
int run_number,
int transition_time)
1593 if (transition == TR_START) {
1604 StartRun(run_number);
1605 printf(
"Begin run: %d\n", run_number);
1606 }
else if (transition == TR_STOP) {
1613 printf(
"End of run %d\n", run_number);
1617 void Event(
const void* data,
int data_size)
1634 if (fNumAnalyze > 0) {
1636 if (fNumAnalyze == 0)
1647 static int ProcessMidasOnlineOld(
const std::vector<std::string>& args,
const char* hostname,
const char* exptname,
int num_analyze,
TMWriterInterface* writer,
bool multithread,
bool profiler,
int queue_interval_check)
1651 int err = midas->
connect(hostname, exptname,
"rootana");
1653 fprintf(stderr,
"Cannot connect to MIDAS, error %d\n", err);
1671 odb->
RI(
"runinfo/run number", &run_number);
1672 odb->
RI(
"runinfo/state", &run_state);
1674 for (
unsigned i=0; i<(*gModules).size(); i++)
1677 if ((run_state == STATE_RUNNING)||(run_state == STATE_PAUSED)) {
1682 #ifdef HAVE_THTTP_SERVER
1689 gSystem->DispatchOneEvent(kTRUE);
1703 for (
unsigned i=0; i<(*gModules).size(); i++)
1707 delete odb; odb = NULL;
1721 static int ProcessMidasFiles(
const std::vector<std::string>& files,
const std::vector<std::string>& args,
int num_skip,
int num_analyze,
TMWriterInterface* writer,
bool multithread,
bool profiler,
int queue_interval_check)
1725 for (
unsigned i=0; i<files.size(); i++)
1728 for (
unsigned i=0; i<(*gModules).size(); i++)
1731 RunHandler run(args, multithread, profiler, queue_interval_check);
1743 printf(
"Could not open \"%s\", error: %s\n", filename.c_str(), reader->
fErrorString.c_str());
1759 if (event->event_id == 0x8000)
1761 int runno =
event->serial_number;
1792 else if (event->event_id == 0x8001)
1806 else if (event->event_id == 0x8002)
1831 if (num_analyze > 0) {
1833 if (num_analyze == 0)
1846 gSystem->DispatchOneEvent(kTRUE);
1858 #ifdef HAVE_THTTP_SERVER
1861 gSystem->DispatchOneEvent(kTRUE);
1875 for (
unsigned i=0; i<(*gModules).size(); i++)
1881 static int ProcessDemoMode(
const std::vector<std::string>& args,
int num_skip,
int num_analyze,
TMWriterInterface* writer,
bool multithread,
bool profiler,
int queue_interval_check)
1883 for (
unsigned i=0; i<(*gModules).size(); i++)
1886 RunHandler run(args, multithread, profiler, queue_interval_check);
1892 for (
unsigned i=0;
true; i++) {
1894 sprintf(s,
"%03d", i);
1895 std::string filename = std::string(
"demo_subrun_") + s;
1912 for (
unsigned j=0; j<100; j++) {
1925 if (num_analyze > 0) {
1927 if (num_analyze == 0)
1939 gSystem->DispatchOneEvent(kTRUE);
1957 for (
unsigned i=0; i<(*gModules).size(); i++)
1966 static int ShowMem(
const char* label)
1971 FILE* fp = fopen(
"/proc/self/statm",
"r");
1976 fscanf(fp,
"%d",&mem);
1980 printf(
"memory at %s is %d\n", label, mem);
1993 printf(
"EventDumpModule::ctor, run %d\n", runinfo->
fRunNo);
1999 printf(
"EventDumpModule::dtor!\n");
2004 printf(
"EventDumpModule::BeginRun, run %d\n", runinfo->
fRunNo);
2009 printf(
"EventDumpModule::EndRun, run %d\n", runinfo->
fRunNo);
2014 printf(
"EventDumpModule::NextSubrun, run %d, file %s\n", runinfo->
fRunNo, runinfo->
fFileName.c_str());
2019 printf(
"EventDumpModule::PauseRun, run %d\n", runinfo->
fRunNo);
2024 printf(
"EventDumpModule::ResumeRun, run %d\n", runinfo->
fRunNo);
2029 printf(
"EventDumpModule::Analyze, run %d, ", runinfo->
fRunNo);
2030 event->FindAllBanks();
2031 std::string h =
event->HeaderToString();
2032 std::string b =
event->BankListToString();
2033 printf(
"%s: %s\n", h.c_str(), b.c_str());
2039 printf(
"EventDumpModule::AnalyzeSpecialEvent, run %d, event serno %d, id 0x%04x, data size %d\n", runinfo->
fRunNo, event->
serial_number, (
int)event->
event_id, event->
data_size);
2047 void Init(
const std::vector<std::string> &args)
2050 printf(
"EventDumpModuleFactory::Init!\n");
2056 printf(
"EventDumpModuleFactory::Finish!\n");
2062 printf(
"EventDumpModuleFactory::NewRunObject, run %d, file %s\n", runinfo->
fRunNo, runinfo->
fFileName.c_str());
2069 #include <TGButton.h>
2070 #include <TBrowser.h>
2074 #define CTRL_CONTINUE 3
2075 #define CTRL_PAUSE 4
2076 #define CTRL_NEXT_FLOW 5
2078 #define CTRL_TBROWSER 11
2107 printf(
"Pressed!\n");
2112 printf(
"Released!\n");
2120 fHolder->
fValue = fValue;
2148 printf(
"MainWindow::ctor!\n");
2153 SetWindowName(
"ROOT Analyzer Control");
2156 fMenu =
new TGPopupMenu(gClient->GetRoot());
2158 fMenu->AddEntry(
"-", 0);
2163 fMenu->AddEntry(
"-", 0);
2166 fMenuBarItemLayout =
new TGLayoutHints(kLHintsTop|kLHintsLeft, 0, 4, 0, 0);
2168 fMenu->Associate(
this);
2170 fMenuBar =
new TGMenuBar(
this, 1, 1, kRaisedFrame);
2171 fMenuBar->AddPopup(
"&Rootana", fMenu, fMenuBarItemLayout);
2174 AddFrame(fMenuBar,
new TGLayoutHints(kLHintsTop|kLHintsLeft|kLHintsExpandX));
2176 fButtonsFrame =
new TGVerticalFrame(
this);
2181 fButtonsFrame->AddFrame(fNextButton,
new TGLayoutHints(kLHintsExpandX, 1, 1, 1, 1));
2182 fButtonsFrame->AddFrame(fNextFlowButton,
new TGLayoutHints(kLHintsExpandX, 1, 1, 1, 1));
2184 TGHorizontalFrame *hframe =
new TGHorizontalFrame(fButtonsFrame);
2189 hframe->AddFrame(fContinueButton,
new TGLayoutHints(kLHintsExpandX, 1, 1, 1, 1));
2190 hframe->AddFrame(fPauseButton,
new TGLayoutHints(kLHintsExpandX, 1, 1, 1, 1));
2192 fButtonsFrame->AddFrame(hframe,
new TGLayoutHints(kLHintsExpandX));
2195 fButtonsFrame->AddFrame(fQuitButton,
new TGLayoutHints(kLHintsExpandX, 1, 1, 1, 1));
2197 AddFrame(fButtonsFrame,
new TGLayoutHints(kLHintsExpandX));
2201 Resize(GetDefaultSize());
2208 printf(
"MainWindow::dtor!\n");
2212 delete fMenuBarItemLayout;
2218 printf(
"MainWindow::CloseWindow()\n");
2228 switch (GET_MSG(msg))
2233 switch (GET_SUBMSG(msg))
2276 printf(
"InteractiveModule::ctor, run %d\n", runinfo->
fRunNo);
2283 if (!fgCtrlWindow && runinfo->
fRoot->
fgApp) {
2284 fgCtrlWindow =
new MainWindow(gClient->GetRoot(), 200, 300, fgHolder);
2292 printf(
"InteractiveModule::dtor!\n");
2297 printf(
"InteractiveModule::BeginRun, run %d\n", runinfo->
fRunNo);
2302 printf(
"InteractiveModule::EndRun, run %d\n", runinfo->
fRunNo);
2311 #ifdef HAVE_THTTP_SERVER
2318 gSystem->DispatchOneEvent(kTRUE);
2323 TMFE* mfe = TMFE::Instance();
2325 if (mfe->fShutdownRequested) {
2338 int ctrl = fgHolder->
fValue;
2356 printf(
"InteractiveModule::PauseRun, run %d\n", runinfo->
fRunNo);
2361 printf(
"InteractiveModule::ResumeRun, run %d\n", runinfo->
fRunNo);
2369 #ifdef HAVE_THTTP_SERVER
2376 gSystem->DispatchOneEvent(kTRUE);
2381 TMFE* mfe = TMFE::Instance();
2383 if (mfe->fShutdownRequested) {
2397 int ctrl = fgHolder->
fValue;
2419 fprintf(stdout,
"manalyzer> "); fflush(stdout);
2420 char* s = fgets(str,
sizeof(str)-1, stdin);
2428 printf(
"command [%s]\n", str);
2430 if (str[0] ==
'h') {
2431 printf(
"Interactive manalyzer commands:\n");
2432 printf(
" q - quit\n");
2433 printf(
" h - help\n");
2434 printf(
" c - continue until next TAFlag_DISPLAY event\n");
2435 printf(
" n - next event\n");
2436 printf(
" aNNN - analyze N events, i.e. \"a10\"\n");
2437 }
else if (str[0] ==
'q') {
2440 }
else if (str[0] ==
'n') {
2442 }
else if (str[0] ==
'c') {
2445 }
else if (str[0] ==
'a') {
2446 int num = atoi(str+1);
2447 printf(
"Analyzing %d events\n", num);
2458 printf(
"InteractiveModule::Analyze, run %d, %s\n", runinfo->
fRunNo, event->
HeaderToString().c_str());
2480 InteractiveLoop(runinfo, flags);
2487 printf(
"InteractiveModule::AnalyzeFlowEvent, run %d\n", runinfo->
fRunNo);
2504 InteractiveLoop(runinfo, flags);
2512 printf(
"InteractiveModule::AnalyzeSpecialEvent, run %d, event serno %d, id 0x%04x, data size %d\n", runinfo->
fRunNo, event->
serial_number, (
int)event->
event_id, event->
data_size);
2525 void Init(
const std::vector<std::string> &args)
2528 printf(
"InteractiveModuleFactory::Init!\n");
2534 printf(
"InteractiveModuleFactory::Finish!\n");
2540 printf(
"InteractiveModuleFactory::NewRunObject, run %d, file %s\n", runinfo->
fRunNo, runinfo->
fFileName.c_str());
2553 printf(
"\nUsage: ./manalyzer.exe [-h] [-R8081] [-oOutputfile.mid] [file1 file2 ...] [-- arguments passed to modules ...]\n");
2555 printf(
"-h: print this help message\n");
2556 printf(
"--demo: activate the demo mode, online connection or input file not needed, midas events are generated internally, add -e0 or -eNNN to set number of demo events \n");
2558 printf(
"-Hhostname: connect to MIDAS experiment on given host\n");
2559 printf(
"-Eexptname: connect to this MIDAS experiment\n");
2560 printf(
"--midas-progname SSS -- set analyzer's MIDAS program name, default is \"ana\"\n");
2561 printf(
"--midas-hostname HOSTNAME[:PORT] -- connect to MIDAS mserver on given host and port\n");
2562 printf(
"--midas-exptname EXPTNAME -- connect to given experiment\n");
2563 printf(
"--midas-buffer BUFZZZ -- connect to given MIDAS event buffer\n");
2564 printf(
"--midas-sampling SSS -- sample events from MIDAS event buffer: GET_ALL=get every event (will block the event writers, GET_NONBLOCKING=get as many as we can process, GET_RECENT=get recent events, see bm_receive_event(). Default is GET_NONBLOCKING\n");
2565 printf(
"--midas-event-id III -- receive only events with matching event ID\n");
2566 printf(
"--midas-trigger-mask 0xMASK -- receive only events with matching trigger mask\n");
2568 printf(
"-oOutputfile.mid: write selected events into this file\n");
2569 printf(
"-Rnnnn: Start the ROOT THttpServer HTTP server on specified tcp port, use -R8081, access by firefox http://localhost:8081\n");
2570 printf(
"-eNNN: Number of events to analyze, 0=unlimited\n");
2571 printf(
"-sNNN: Number of events to skip before starting analysis\n");
2573 printf(
"--dump: activate the event dump module\n");
2575 printf(
"-t: Enable tracing of constructors, destructors and function calls\n");
2576 printf(
"-m: Enable memory leak debugging\n");
2577 printf(
"-g: Enable graphics display when processing data files\n");
2578 printf(
"-i: Enable intractive mode\n");
2580 printf(
"--mt: Enable multithreaded mode. Extra multithread config settings:\n");
2585 printf(
"--no-profiler: Turn off manalyzer module profiler\n");
2586 printf(
"--pqiNNN: Profile multithread queue lengths every NNN events \n");
2589 printf(
"-Doutputdirectory: Specify output root file directory\n");
2590 printf(
"-Ooutputfile.root: Specify output root file filename\n");
2593 printf(
"--: All following arguments are passed to the analyzer modules Init() method\n");
2595 printf(
"Analyzer modules usage:\n");
2597 for (
unsigned i=0; i<(*gModules).size(); i++)
2600 printf(
"Example1: analyze online data: ./manalyzer.exe -R9091\n");
2601 printf(
"Example2: analyze existing data: ./manalyzer.exe /data/alpha/current/run00500.mid\n");
2609 return (s.substr(0, strlen(prefix)) == prefix);
2616 setbuf(stdout, NULL);
2617 setbuf(stderr, NULL);
2619 signal(SIGILL, SIG_DFL);
2620 signal(SIGBUS, SIG_DFL);
2621 signal(SIGSEGV, SIG_DFL);
2622 signal(SIGPIPE, SIG_DFL);
2624 std::vector<std::string> args;
2625 for (
int i=0; i<argc; i++) {
2626 if (strcmp(argv[i],
"-h")==0)
2628 args.push_back(argv[i]);
2633 int num_analyze = 0;
2637 bool event_dump =
false;
2638 bool demo_mode =
false;
2640 bool root_graphics =
false;
2642 bool interactive =
false;
2644 bool multithread =
false;
2646 bool performance_profiler =
true;
2647 int snap_shot_queue_length = 100;
2649 std::vector<std::string> files;
2650 std::vector<std::string> modargs;
2653 std::string midas_hostname =
"";
2654 std::string midas_exptname =
"";
2655 std::string midas_progname =
"ana";
2656 std::string midas_buffer =
"SYSTEM";
2658 std::string midas_sampling =
"GET_NONBLOCKING";
2659 int midas_event_id = -1;
2660 int midas_trigger_mask = -1;
2663 for (
unsigned int i=1; i<args.size(); i++) {
2664 std::string arg = args[i];
2668 for (
unsigned j=i+1; j<args.size(); j++)
2669 modargs.push_back(args[j]);
2671 }
else if (arg ==
"--dump") {
2673 }
else if (arg ==
"--demo") {
2677 }
else if (arg ==
"-g") {
2678 root_graphics =
true;
2680 }
else if (arg ==
"-i") {
2682 }
else if (arg ==
"-t") {
2689 num_skip = atoi(arg.c_str()+2);
2691 num_analyze = atoi(arg.c_str()+2);
2695 httpPort = atoi(arg.c_str()+2);
2698 midas_hostname = arg.c_str()+2;
2700 midas_exptname = arg.c_str()+2;
2701 }
else if (arg ==
"--midas-progname") {
2702 midas_progname = args[i+1]; i++;
2703 }
else if (arg ==
"--midas-hostname") {
2704 midas_hostname = args[i+1]; i++;
2705 }
else if (arg ==
"--midas-exptname") {
2706 midas_exptname = args[i+1]; i++;
2707 }
else if (arg ==
"--midas-buffer") {
2708 midas_buffer = args[i+1]; i++;
2709 }
else if (arg ==
"--midas-sampling") {
2710 midas_sampling = args[i+1]; i++;
2711 }
else if (arg ==
"--midas-event-id") {
2712 midas_event_id = atoi(args[i+1].c_str()); i++;
2713 }
else if (arg ==
"--midas-trigger-mask") {
2714 midas_trigger_mask = strtoul(args[i+1].c_str(), NULL, 0); i++;
2722 }
else if (arg ==
"--mt") {
2724 }
else if (arg ==
"--no-profiler") {
2725 performance_profiler = 0;
2727 snap_shot_queue_length = atoi(arg.c_str()+5);
2734 }
else if (arg ==
"-h") {
2736 }
else if (arg[0] ==
'-') {
2739 files.push_back(args[i]);
2744 gModules =
new std::vector<TAFactory*>;
2746 if ((*gModules).size() == 0)
2755 printf(
"Registered modules: %d\n", (
int)(*gModules).size());
2758 if (root_graphics) {
2767 #ifdef HAVE_THTTP_SERVER
2769 sprintf(str,
"http:127.0.0.1:%d?cors", httpPort);
2770 THttpServer *s =
new THttpServer(str);
2774 fprintf(stderr,
"ERROR: No support for the THttpServer!\n");
2778 for (
unsigned i=0; i<files.size(); i++) {
2779 printf(
"file[%d]: %s\n", i, files[i].c_str());
2783 ProcessDemoMode(modargs, num_skip, num_analyze, writer, multithread, performance_profiler, snap_shot_queue_length);
2784 }
else if (files.size() > 0) {
2785 ProcessMidasFiles(files, modargs, num_skip, num_analyze, writer, multithread, performance_profiler, snap_shot_queue_length);
2789 ProcessMidasOnlineTmfe(modargs, midas_progname.c_str(), midas_hostname.c_str(), midas_exptname.c_str(), midas_buffer.c_str(), midas_event_id, midas_trigger_mask, midas_sampling.c_str(), num_analyze, writer, multithread, performance_profiler, snap_shot_queue_length);
2791 ProcessMidasOnlineOld(modargs, midas_hostname.c_str(), midas_exptname.c_str(), num_analyze, writer, multithread, performance_profiler, snap_shot_queue_length);
R__EXTERN TDirectory * gDirectory
TARunObject * NewRunObject(TARunInfo *runinfo)
void Init(const std::vector< std::string > &args)
void PauseRun(TARunInfo *runinfo)
EventDumpModule(TARunInfo *runinfo)
void EndRun(TARunInfo *runinfo)
void NextSubrun(TARunInfo *runinfo)
void AnalyzeSpecialEvent(TARunInfo *runinfo, TMEvent *event)
void BeginRun(TARunInfo *runinfo)
void ResumeRun(TARunInfo *runinfo)
TAFlowEvent * Analyze(TARunInfo *runinfo, TMEvent *event, TAFlags *flags, TAFlowEvent *flow)
TARunObject * NewRunObject(TARunInfo *runinfo)
void Init(const std::vector< std::string > &args)
TAFlowEvent * Analyze(TARunInfo *runinfo, TMEvent *event, TAFlags *flags, TAFlowEvent *flow)
void PauseRun(TARunInfo *runinfo)
TAFlowEvent * AnalyzeFlowEvent(TARunInfo *runinfo, TAFlags *flags, TAFlowEvent *flow)
void EndRun(TARunInfo *runinfo)
static MainWindow * fgCtrlWindow
void ResumeRun(TARunInfo *runinfo)
InteractiveModule(TARunInfo *runinfo)
static ValueHolder * fgHolder
void InteractiveLoop(TARunInfo *runinfo, TAFlags *flags)
void AnalyzeSpecialEvent(TARunInfo *runinfo, TMEvent *event)
void BeginRun(TARunInfo *runinfo)
virtual void RU32(const char *varname, uint32_t *value, bool create=false, MVOdbError *error=NULL)=0
virtual void RI(const char *varname, int *value, bool create=false, MVOdbError *error=NULL)=0
TGLayoutHints * fMenuBarItemLayout
TextButton * fNextFlowButton
MainWindow(const TGWindow *w, int s1, int s2, ValueHolder *holder)
TGCompositeFrame * fButtonsFrame
TextButton * fPauseButton
TextButton * fContinueButton
Bool_t ProcessMessage(Long_t msg, Long_t parm1, Long_t parm2)
void Event(const void *data, int data_size)
void Transition(int transition, int run_number, int transition_time)
OnlineHandler(int num_analyze, TMWriterInterface *writer, MVOdb *odb, const std::vector< std::string > &args, bool multithread, bool profiler, int queue_interval_check)
void StartRun(int run_number)
std::vector< double > fAnalyzeFlowEventRMS
void LogAnalyzeEvent(TAFlags *flag, TAFlowEvent *flow, const int i, const TAClock &start)
void LogUserWindows(TAFlags *flag, TAFlowEvent *flow)
std::vector< double > fAnalyzeFlowEventTimeTotal
std::vector< double > fTotalUserTime
std::vector< int > fAnalyzeEventEntries
std::vector< TH1D * > fUserHistograms
std::vector< double > fAnalyzeFlowEventMean
int fQueueIntervalCounter
std::vector< double > fAnalyzeEventRMS
std::vector< double > fAnalyzeEventTimeMax
void LogAnalyzeFlowEvent(TAFlags *flag, TAFlowEvent *flow, const int i, const TAClock &start)
std::vector< std::string > fModuleNames
std::vector< TH1D * > fAnalyzeFlowEventTimeHistograms
std::chrono::system_clock::time_point fStartUser
void Begin(TARunInfo *runinfo, const std::vector< TARunObject * > fRunRun)
std::vector< double > fAnalyzeEventTimeTotal
void AddModuleMap(const char *UserProfileName, unsigned long hash)
std::vector< double > fAnalyzeFlowEventTimeMax
void LogMTQueueLength(TARunInfo *runinfo)
std::map< unsigned int, int > fUserMap
std::vector< double > fAnalyzeEventMean
Profiler(const bool time_modules, const int queue_interval_check)
std::vector< double > fMaxUserTime
std::vector< TH1D * > fAnalyzeEventTimeHistograms
std::vector< int > fAnalyzeFlowEventEntries
std::vector< TH1D * > fAnalysisQueue
std::vector< TARunObject * > fRunRun
std::vector< std::string > fArgs
void AnalyzeEvent(TMEvent *event, TAFlags *flags, TMWriterInterface *writer)
void CreateRun(int run_number, const char *file_name)
void AnalyzeSpecialEvent(TMEvent *event)
void EndRun(TAFlags *flags)
TAFlowEvent * AnalyzeFlowEvent(TAFlags *flags, TAFlowEvent *flow)
RunHandler(const std::vector< std::string > &args, bool multithread, bool profile, int queue_interval_check)
void PerModuleThread(int i)
void AnalyzeFlowQueue(TAFlags *ana_flags)
virtual void Init(const std::vector< std::string > &args)
int fMtQueueFullUSleepTime
std::vector< TAFlagsQueue > fMtFlagQueue
std::vector< std::mutex > fMtFlowQueueMutex
std::vector< bool > fMtThreadIsBusy
int fMtQueueEmptyUSleepTime
std::vector< std::thread * > fMtThreads
static int gfMtMaxBacklog
static bool gfMultithread
std::vector< TAFlowEventQueue > fMtFlowQueue
std::vector< bool > fMtThreadIsRunning
bool fMtShutdownRequested
static std::string fOutputDirectory
static TApplication * fgApp
static std::string fOutputFileName
static THttpServer * fgHttpServer
static TDirectory * fgDir
void AddToFlowQueue(TAFlowEvent *)
TAFlowEvent * ReadFlowQueue()
std::vector< std::string > fArgs
static std::vector< std::string > fgFileList
TAMultithreadHelper * fMtInfo
static int fgCurrentFileIndex
virtual void EndRun(TARunInfo *runinfo)
virtual void ResumeRun(TARunInfo *runinfo)
virtual void AnalyzeSpecialEvent(TARunInfo *runinfo, TMEvent *event)
virtual void NextSubrun(TARunInfo *runinfo)
virtual TAFlowEvent * Analyze(TARunInfo *runinfo, TMEvent *event, TAFlags *flags, TAFlowEvent *flow)
virtual void PauseRun(TARunInfo *runinfo)
virtual void BeginRun(TARunInfo *runinfo)
virtual TAFlowEvent * AnalyzeFlowEvent(TARunInfo *runinfo, TAFlags *flags, TAFlowEvent *flow)
virtual void PreEndRun(TARunInfo *runinfo)
TAUserProfilerFlow(TAFlowEvent *flow, const char *name, const TAClock &start)
const std::string fModuleName
virtual TDirectory * mkdir(const char *name, const char *title="")
virtual Bool_t cd(const char *path=0)
std::string HeaderToString() const
print the MIDAS event header
void Reset()
reset everything
void ParseEvent()
parse event data
std::vector< char > data
MIDAS event bytes.
size_t event_header_size
size of MIDAS event header
uint32_t serial_number
MIDAS event serial number.
uint32_t data_size
MIDAS event data size.
uint16_t event_id
MIDAS event ID.
MIDAS online connection, including access to online ODB.
void RegisterHandler(TMHandlerInterface *h)
void registerTransitions()
Ask MIDAS to tell us about run transitions.
int disconnect()
Disconnect from MIDAS.
static TMidasOnline * instance()
int connect(const char *hostname, const char *exptname, const char *progname)
Connect to MIDAS experiment.
int eventRequest(const char *bufferName, int eventId, int triggerMask, int samplingType, bool poll=false)
Request data for delivery via callback (setEventHandler) or by polling (via receiveEvent)
TextButton(TGWindow *p, const char *text, ValueHolder *holder, int value)
std::chrono::high_resolution_clock::time_point TAClock
std::chrono::duration< double > TAClockDuration
#define TAFlag_SKIP_PROFILE
void TMWriteEvent(TMWriterInterface *writer, const TMEvent *event)
TMReaderInterface * TMNewReader(const char *source)
TMWriterInterface * TMNewWriter(const char *destination)
TMEvent * TMReadEvent(TMReaderInterface *reader)
MVOdb * MakeMidasOdb(int hDB, MVOdbError *error=NULL)
MVOdb * MakeFileDumpOdb(const char *buf, int bufsize, MVOdbError *error=NULL)
Access ODB from a midas file dump. FOrmat could be .xml, .json or .odb.
static int gDefaultMultithreadWaitFull
static int ProcessDemoMode(const std::vector< std::string > &args, int num_skip, int num_analyze, TMWriterInterface *writer, bool multithread, bool profiler, int queue_interval_check)
static void MtQueueFlowEvent(TAMultithreadHelper *mt, int i, TAFlags *flag, TAFlowEvent *flow, bool wait)
std::vector< TAFactory * > * gModules
int manalyzer_main(int argc, char *argv[])
static int gDefaultMultithreadWaitEmpty
static bool starts_with(const std::string &s, const char *prefix)
static int ProcessMidasOnlineOld(const std::vector< std::string > &args, const char *hostname, const char *exptname, int num_analyze, TMWriterInterface *writer, bool multithread, bool profiler, int queue_interval_check)
static bool gEnableShowMem
static void WaitForAllQueuesEmpty(TAMultithreadHelper *mt)
static int ProcessMidasFiles(const std::vector< std::string > &files, const std::vector< std::string > &args, int num_skip, int num_analyze, TMWriterInterface *writer, bool multithread, bool profiler, int queue_interval_check)
static void WaitForAllThreadsShutdown(TAMultithreadHelper *mt)
static int gDefaultMultithreadQueueLength
static std::string to_string(int v)
int ShowMem(const char *label)