30 printf(
"TARunInfo::ctor!\n");
47 printf(
"TARunInfo::dtor!\n");
69 printf(
"TARunInfo::dtor: deleted %d queued flow events!\n", count);
87 printf(
"TAFlowEvent::ctor: chain %p\n", flow);
94 printf(
"TAFlowEvent::dtor: this %p, next %p\n",
this,
fNext);
109 printf(
"TARunObject::ctor, run %d\n", runinfo->
fRunNo);
115 printf(
"TARunObject::BeginRun, run %d\n", runinfo->
fRunNo);
121 printf(
"TARunObject::EndRun, run %d\n", runinfo->
fRunNo);
127 printf(
"TARunObject::NextSubrun, run %d\n", runinfo->
fRunNo);
133 printf(
"TARunObject::PauseRun, run %d\n", runinfo->
fRunNo);
139 printf(
"TARunObject::ResumeRun, run %d\n", runinfo->
fRunNo);
145 printf(
"TARunObject::PreEndRun, run %d\n", runinfo->
fRunNo);
151 printf(
"TARunObject::Analyze!\n");
162 printf(
"TARunObject::Analyze!\n");
173 printf(
"TARunObject::AnalyzeSpecialEvent!\n");
185 printf(
"TAFactory::Usage!\n");
191 printf(
"TAFactory::Init!\n");
197 printf(
"TAFactory::Finish!\n");
217 printf(
"TARootHelper::ctor!\n");
221 if (filename.empty()) {
223 sprintf(xfilename,
"output%05d.root", runinfo->
fRunNo);
226 filename = xfilename;
233 if (status < 0 && errno == ENOENT) {
234 fprintf(stdout,
"TARootHelper::ctor: creating output directory \"%s\"\n",
fOutputDirectory.c_str());
237 fprintf(stderr,
"TARootHelper::ctor: Error: cannot output directory \"%s\", errno %d (%s)\n",
fOutputDirectory.c_str(), errno, strerror(errno));
243 fOutputFile =
new TFile(filename.c_str(),
"RECREATE");
246 fprintf(stderr,
"TARootHelper::ctor: Error: cannot open output ROOT file \"%s\"\n", filename.c_str());
260 printf(
"TARootHelper::dtor!\n");
284#ifdef HAVE_THTTP_SERVER
285#include "THttpServer.h"
304 fprintf(stderr,
"Waiting for all queues to empty out!\n");
306 int count_all_empty = 0;
309 int count_not_empty = 0;
310 size_t count_events = 0;
312 for (
unsigned i=0; i<mt->
fMtThreads.size(); i++) {
329 if (count_not_empty == 0) {
333 if (count_all_empty > 1) {
339 fprintf(stderr,
"Timeout waiting for all queues to empty out, %d queues still have %d flow events!\n", count_not_empty, (
int)count_events);
350 fprintf(stderr,
"Waiting for all threads to shut down!\n");
355 int count_running = 0;
357 for (
unsigned i=0; i<mt->
fMtThreads.size(); i++) {
362 if (count_running == 0) {
367 fprintf(stderr,
"Timeout waiting for all threads to shut down, %d still running!\n", count_running);
375 fprintf(stderr,
"Joining all threads!\n");
376 for (
unsigned i=0; i<mt->
fMtThreads.size(); i++) {
385 fprintf(stderr,
"Thread %d failed to shut down!\n", i);
392 fMtFlowQueueMutex(nModules),
393 fMtFlowQueue(nModules),
394 fMtFlagQueue(nModules),
395 fMtThreads(nModules,NULL),
396 fMtThreadIsRunning(nModules),
397 fMtThreadIsBusy(nModules),
398 fMtShutdownRequested(false),
399 fMtQuitRequested(false)
424 for (
size_t i=0; i<nmodules; i++) {
441 printf(
"TAMultithreadInfo::dtor: deleted %d queued flow events!\n", count);
482 printf(
"Multithread queue lengths:\n");
500 gModules =
new std::vector<TAFactory*>;
508 gettimeofday(&tv,NULL);
509 return tv.tv_sec + 0.000001*tv.tv_usec;
532 return elapsed_seconds.count();
591 Profiler(
const int queue_interval_check );
593 void Begin(
TARunInfo* runinfo,
const std::vector<TARunObject*> fRunRun );
600 void AddModuleMap(
const char* UserProfileName,
unsigned long hash);
609 fQueueIntervalCounter(0),
610 fQueueInterval(queue_interval_check)
614 printf(
"Profiler::ctor\n");
618 fStartUser = std::chrono::system_clock::now();
624 printf(
"Profiler::dtor\n");
642 printf(
"Profiler::begin\n");
661 Double_t bins[Nbins+1];
663 Double_t TimeRange = 10;
666 for (
int i=0; i<Nbins+1; i++) {
667 bins[i] = TimeRange*pow(1.1,i)/pow(1.1,Nbins);
675 for (
size_t i = 0; i < runrun.size(); i++) {
677 if (runrun[i]->fModuleName.empty())
678 fModuleNames.push_back(
"Unnamed Module " + std::to_string(i));
699 TH1D* Histo =
new TH1D( TString(std::to_string(i) +
"_" +
fModuleNames.at(i)),
700 TString(
fModuleNames.at(i) +
" Event Proccessing Time; s"),
705 TH1D* AnalyzeEventHisto =
new TH1D(TString(std::to_string(i) +
"_" +
fModuleNames.at(i) +
"_TMEvent"),
706 TString(
fModuleNames.at(i) +
" Flow Proccessing Time; s"),
719 TH1D* QueueHisto =
new TH1D(TString(std::to_string(i) +
"_" +
fModuleNames.at(i) +
"_Queue"),
720 TString(
fModuleNames.at(i) +
" Multithread Queue Length; Queue Depth"),
736 printf(
"Profiler::log\n");
745 std::chrono::duration<double> elapsed_seconds = stop - start;
746 double dt = elapsed_seconds.count();
764 printf(
"Profiler::log_AnalyzeEvent_time\n");
773 std::chrono::duration<double> elapsed_seconds = stop - start;
774 double dt = elapsed_seconds.count();
793 printf(
"Profiler::log_mt_queue_length\n");
814 printf(
"Profiler::LogUserWindows\n");
818 std::vector<TAFlowEvent*> flowArray;
822 flowArray.push_back(f);
826 for (
int ii=FlowEvents-1; ii>=0; ii--) {
831 unsigned int hash = std::hash<std::string>{}(timer->
fModuleName);
844 fprintf(stderr,
"manalyzer must be built with ROOT for using the user profiling tools\n");
851 printf(
"Profiler::AddModuleMap\n");
859 Double_t bins[Nbins+1];
860 Double_t TimeRange = 10;
861 for (
int i=0; i<Nbins+1; i++) {
862 bins[i] = TimeRange*pow(1.1,i)/pow(1.1,Nbins);
864 TH1D* Histo =
new TH1D(UserProfileName,UserProfileName,Nbins,bins);
875 printf(
"Profiler::End\n");
917 double AllAnalyzeFlowEventTime=0;
919 AllAnalyzeFlowEventTime += n;
920 double AllAnalyzeEventTime=0;
922 AllAnalyzeEventTime += n;
924 printf(
"Module average processing time\n");
925 printf(
" \t\t\t\tAnalyzeEvent (one thread) \tAnalyzeFlowEvent (multithreadable)\n");
926 printf(
"Module\t\t\t\tEntries\tMean(ms)RMS(ms)\tMax(ms)\tSum(s)\tEntries\tMean(ms)RMS(ms)\tMax(ms)\tSum(s)\n");
927 printf(
"----------------------------------------------------------------------------------------------------------------\n");
931 printf(
"\t%d\t%.1f\t%.1f\t%.1f\t%.3f",
938 printf(
"\t-\t-\t-\t-\t-");
941 printf(
"\t%d\t%.1f\t%.1f\t%.1f\t%.3f",
948 printf(
"\t-\t-\t-\t-\t-");
951 printf(
"----------------------------------------------------------------------------------------------------------------\n");
952 printf(
" Analyse TMEvent total time %f\n",AllAnalyzeEventTime);
953 printf(
" Analyse FlowEvent total time %f\n",AllAnalyzeFlowEventTime);
956 printf(
"Custom profiling windows\tEntries\tMean(ms)RMS(ms)\tMax(ms)\tSum(s)\n");
957 printf(
"----------------------------------------------------------------------\n");
959 printf(
"%-25s\t%d\t%.1f\t%.1f\t%.1f\t%.3f\t\n",
fUserHistograms.at(i)->GetTitle(),
966 printf(
"----------------------------------------------------------------------\n");
968 printf(
"----------------------------------------------------------------------------------------------------------------\n");
971 printf(
"To use custom profile windows, please build rootana with root\n");
976 double cputime = (double)(clock() -
fStartCPU)/CLOCKS_PER_SEC;
977 std::chrono::duration<double> usertime = std::chrono::system_clock::now() -
fStartUser;
978 printf(
"%s\tCPU time: %.2fs\tUser time: %.2fs\tAverage CPU Usage: ~%.1f%%\n",
982 100.*cputime/usertime.count());
999 bool fMultithreadEnabled =
false;
1001 bool fProfilerEnabled =
false;
1004 RunHandler(
const std::vector<std::string>& args,
bool multithread,
bool profile,
int queue_interval_check)
1008 fMultithreadEnabled = multithread;
1010 fProfilerEnabled = profile;
1011 fProfilerIntervalCheck = queue_interval_check;
1029 int nModules=(*gModules).size();
1070 flow = fRunRun[i]->AnalyzeFlowEvent(fRunInfo, flag, flow);
1121 assert(fRunInfo == NULL);
1122 assert(fRunRun.size() == 0);
1124 fRunInfo =
new TARunInfo(run_number, file_name, fArgs);
1126 if (fProfilerEnabled)
1127 fProfiler =
new Profiler( fProfilerIntervalCheck );
1129 int nModules = (*gModules).size();
1131 for (
int i=0; i<nModules; i++)
1132 fRunRun.push_back((*
gModules)[i]->NewRunObject(fRunInfo));
1134 if (fMultithreadEnabled) {
1137 for (
int i=0; i<nModules; i++) {
1138 printf(
"Create fMtFlowQueue thread %d\n",i);
1146 assert(fRunInfo != NULL);
1147 assert(fRunInfo->
fOdb != NULL);
1149 fProfiler->
Begin(fRunInfo, fRunRun);
1150 for (
unsigned i=0; i<fRunRun.size(); i++)
1151 fRunRun[i]->BeginRun(fRunInfo);
1170 for (
unsigned i=0; i<fRunRun.size(); i++)
1171 fRunRun[i]->PreEndRun(fRunInfo);
1176 AnalyzeFlowQueue(flags);
1191 for (
unsigned i=0; i<fRunRun.size(); i++)
1192 fRunRun[i]->EndRun(fRunInfo);
1195 fProfiler->
End(fRunInfo);
1204 for (
unsigned i=0; i<fRunRun.size(); i++)
1205 fRunRun[i]->NextSubrun(fRunInfo);
1212 for (
unsigned i=0; i<fRunRun.size(); i++) {
1218 assert(fRunRun.size() == 0);
1231 for (
unsigned i=0; i<fRunRun.size(); i++)
1232 fRunRun[i]->AnalyzeSpecialEvent(fRunInfo, event);
1237 for (
unsigned i=0; i<fRunRun.size(); i++) {
1239 flow = fRunRun[i]->AnalyzeFlowEvent(fRunInfo, flags, flow);
1266 flow = AnalyzeFlowEvent(&flags, flow);
1278 assert(fRunInfo != NULL);
1279 assert(fRunInfo->
fOdb != NULL);
1280 assert(event != NULL);
1281 assert(flags != NULL);
1291 for (
unsigned i=0; i<fRunRun.size(); i++) {
1293 flow = fRunRun[i]->Analyze(fRunInfo, event, flags, flow);
1310 flow = AnalyzeFlowEvent(flags, flow);
1315 if (fProfiler && !fRunInfo->
fMtInfo) {
1335 AnalyzeFlowQueue(flags);
1341 if (fFlowQueue.empty())
1345 fFlowQueue.pop_front();
1355 fFlowQueue.push_back(flow);
1369static bool gRunStartRequested =
false;
1370static bool gRunStopRequested =
false;
1372class RpcHandler:
public TMFeRpcHandlerInterface
1374 TMFeResult HandleBeginRun(
int run_number)
1376 printf(
"RpcHandler::HandleBeginRun(%d)\n", run_number);
1377 gRunStartRequested =
true;
1378 gRunStopRequested =
false;
1382 TMFeResult HandleEndRun(
int run_number)
1384 printf(
"RpcHandler::HandleEndRun(%d)\n", run_number);
1385 gRunStartRequested =
false;
1386 gRunStopRequested =
true;
1390 TMFeResult HandleStartAbortRun(
int run_number)
1392 printf(
"RpcHandler::HandleStartAbortRun(%d)\n", run_number);
1394 gRunStartRequested =
false;
1395 gRunStopRequested =
true;
1399 TMFeResult HandleRpc(
const char* cmd,
const char* args, std::string& result)
1405TMFeResult ReceiveEvent(TMEventBuffer* b,
TMEvent *e,
int timeout_msec = 0)
1412 TMFeResult r = b->ReceiveEvent(&e->
data, timeout_msec);
1417 if (e->
data.size() == 0)
1427static int ProcessMidasOnlineTmfe(
const std::vector<std::string>& args,
const char* progname,
const char* hostname,
const char* exptname,
const char* bufname,
int event_id,
int trigger_mask,
const char* sampling_type_string,
int num_analyze,
TMWriterInterface* writer,
bool multithread,
bool profiler,
int queue_interval_check)
1429 TMFE *mfe = TMFE::Instance();
1431 TMFeResult r = mfe->Connect(progname, hostname, exptname);
1434 fprintf(stderr,
"Cannot connect to MIDAS: %s\n", r.error_message.c_str());
1440 TMEventBuffer *b =
new TMEventBuffer(mfe);
1444 r = b->OpenBuffer(bufname);
1447 fprintf(stderr,
"Cannot open event buffer \"%s\": %s\n", bufname, r.error_message.c_str());
1452 size_t cache_size = 100000;
1453 if(!strcmp(sampling_type_string,
"GET_RECENT"))
1455 r = b->SetCacheSize(cache_size, 0);
1458 fprintf(stderr,
"Cannot set cache size on event buffer \"%s\": %s\n", bufname, r.error_message.c_str());
1464 r = b->AddRequest(event_id, trigger_mask, sampling_type_string);
1467 fprintf(stderr,
"Cannot add event request on event buffer \"%s\": %s\n", bufname, r.error_message.c_str());
1471 RpcHandler* h =
new RpcHandler();
1473 mfe->AddRpcHandler(h);
1475 mfe->DeregisterTransitionPause();
1476 mfe->DeregisterTransitionResume();
1477 mfe->SetTransitionSequenceStart(300);
1478 mfe->SetTransitionSequenceStop(700);
1480 mfe->StartRpcThread();
1484 RunHandler rh(args, multithread, profiler, queue_interval_check);
1486 for (
unsigned i=0; i<(*gModules).size(); i++)
1489 if (mfe->fStateRunning) {
1490 rh.CreateRun(mfe->fRunNumber, NULL);
1491 rh.fRunInfo->fOdb = mfe->fOdbRoot;
1497 while (!mfe->fShutdownRequested) {
1498 bool do_sleep =
true;
1500 if (gRunStartRequested) {
1501 gRunStartRequested =
false;
1506 rh.fRunInfo->fOdb = NULL;
1510 rh.CreateRun(mfe->fRunNumber, NULL);
1511 rh.fRunInfo->fOdb = mfe->fOdbRoot;
1517 if (gRunStopRequested) {
1518 gRunStopRequested =
false;
1523 rh.fRunInfo->fOdb = NULL;
1534 r = ReceiveEvent(b, &e, 100);
1537 fprintf(stderr,
"Cannot read event on event buffer \"%s\": %s\n", bufname, r.error_message.c_str());
1544 if ((e.
data_size > 0) && (rh.fRunInfo != NULL)) {
1551 rh.AnalyzeEvent(&e, &flags, writer);
1554 mfe->fShutdownRequested =
true;
1557 if (num_analyze > 0) {
1559 if (num_analyze == 0) {
1560 mfe->fShutdownRequested =
true;
1567#ifdef HAVE_THTTP_SERVER
1578 gSystem->DispatchOneEvent(kTRUE);
1594 rh.fRunInfo->fOdb = NULL;
1598 for (
unsigned i=0; i<(*gModules).size(); i++)
1602 r = b->CloseBuffer();
1608 fprintf(stderr,
"Cannot close event buffer \"%s\": %s\n", bufname, r.error_message.c_str());
1630 int fNumAnalyze = 0;
1636 : fRun(args, multithread, profiler, queue_interval_check)
1638 fNumAnalyze = num_analyze;
1656 void Transition(
int transition,
int run_number,
int transition_time)
1660 if (transition == TR_START) {
1671 StartRun(run_number);
1672 printf(
"Begin run: %d\n", run_number);
1673 }
else if (transition == TR_STOP) {
1680 printf(
"End of run %d\n", run_number);
1684 void Event(
const void* data,
int data_size)
1701 if (fNumAnalyze > 0) {
1703 if (fNumAnalyze == 0)
1714static int ProcessMidasOnlineOld(
const std::vector<std::string>& args,
const char* hostname,
const char* exptname,
int num_analyze,
TMWriterInterface* writer,
bool multithread,
bool profiler,
int queue_interval_check)
1718 int err = midas->
connect(hostname, exptname,
"rootana");
1720 fprintf(stderr,
"Cannot connect to MIDAS, error %d\n", err);
1738 odb->
RI(
"runinfo/run number", &run_number);
1739 odb->
RI(
"runinfo/state", &run_state);
1741 for (
unsigned i=0; i<(*gModules).size(); i++)
1744 if ((run_state == STATE_RUNNING)||(run_state == STATE_PAUSED)) {
1749#ifdef HAVE_THTTP_SERVER
1756 gSystem->DispatchOneEvent(kTRUE);
1770 for (
unsigned i=0; i<(*gModules).size(); i++)
1774 delete odb; odb = NULL;
1788static int ProcessMidasFiles(
const std::vector<std::string>& files,
const std::vector<std::string>& args,
int num_skip,
int num_analyze,
TMWriterInterface* writer,
bool multithread,
bool profiler,
int queue_interval_check)
1790 int number_of_missing_files = 0;
1794 for (
unsigned i=0; i<files.size(); i++)
1797 for (
unsigned i=0; i<(*gModules).size(); i++)
1800 RunHandler run(args, multithread, profiler, queue_interval_check);
1812 printf(
"Could not open \"%s\", error: %s\n", filename.c_str(), reader->
fErrorString.c_str());
1814 number_of_missing_files++;
1829 if (event->event_id == 0x8000)
1831 int runno =
event->serial_number;
1862 else if (event->event_id == 0x8001)
1876 else if (event->event_id == 0x8002)
1901 if (num_analyze > 0) {
1903 if (num_analyze == 0)
1916 gSystem->DispatchOneEvent(kTRUE);
1928#ifdef HAVE_THTTP_SERVER
1931 gSystem->DispatchOneEvent(kTRUE);
1945 for (
unsigned i=0; i<(*gModules).size(); i++)
1947 if (number_of_missing_files)
1949 printf(
"%d midas files were not openable\n",number_of_missing_files);
1950 return number_of_missing_files;
1955static int ProcessDemoMode(
const std::vector<std::string>& args,
int num_skip,
int num_analyze,
TMWriterInterface* writer,
bool multithread,
bool profiler,
int queue_interval_check)
1957 for (
unsigned i=0; i<(*gModules).size(); i++)
1960 RunHandler run(args, multithread, profiler, queue_interval_check);
1966 for (
unsigned i=0;
true; i++) {
1968 snprintf(s,
sizeof(s),
"%03d", i);
1969 std::string filename = std::string(
"demo_subrun_") + s;
1988 for (
unsigned j=0; j<100; j++) {
1989 event->Init(0x0001, 0xFFFF, j+1, 0, 0);
1990 uint32_t test_data[] = { 0x11112222, 0x33334444, 0x55556666, 0x77778888 };
1991 event->AddBank(
"TEST",
TID_DWORD, (
const char*)test_data,
sizeof(test_data));
2003 if (num_analyze > 0) {
2005 if (num_analyze == 0)
2015 gSystem->DispatchOneEvent(kTRUE);
2035 for (
unsigned i=0; i<(*gModules).size(); i++)
2044static int ShowMem(
const char* label)
2049 FILE* fp = fopen(
"/proc/self/statm",
"r");
2054 fscanf(fp,
"%d",&mem);
2058 printf(
"memory at %s is %d\n", label, mem);
2071 printf(
"EventDumpModule::ctor, run %d\n", runinfo->
fRunNo);
2077 printf(
"EventDumpModule::dtor!\n");
2082 printf(
"EventDumpModule::BeginRun, run %d\n", runinfo->
fRunNo);
2087 printf(
"EventDumpModule::EndRun, run %d\n", runinfo->
fRunNo);
2092 printf(
"EventDumpModule::NextSubrun, run %d, file %s\n", runinfo->
fRunNo, runinfo->
fFileName.c_str());
2097 printf(
"EventDumpModule::PauseRun, run %d\n", runinfo->
fRunNo);
2102 printf(
"EventDumpModule::ResumeRun, run %d\n", runinfo->
fRunNo);
2107 printf(
"EventDumpModule::Analyze, run %d, ", runinfo->
fRunNo);
2108 event->FindAllBanks();
2109 std::string h =
event->HeaderToString();
2110 std::string b =
event->BankListToString();
2111 printf(
"%s: %s\n", h.c_str(), b.c_str());
2117 printf(
"EventDumpModule::AnalyzeSpecialEvent, run %d, event serno %d, id 0x%04x, data size %d\n", runinfo->
fRunNo, event->
serial_number, (
int)event->
event_id, event->
data_size);
2125 void Init(
const std::vector<std::string> &args)
2128 printf(
"EventDumpModuleFactory::Init!\n");
2134 printf(
"EventDumpModuleFactory::Finish!\n");
2140 printf(
"EventDumpModuleFactory::NewRunObject, run %d, file %s\n", runinfo->
fRunNo, runinfo->
fFileName.c_str());
2147#include <TGButton.h>
2148#include <TBrowser.h>
2152#define CTRL_CONTINUE 3
2154#define CTRL_NEXT_FLOW 5
2156#define CTRL_TBROWSER 11
2185 printf(
"Pressed!\n");
2190 printf(
"Released!\n");
2198 fHolder->
fValue = fValue;
2226 printf(
"MainWindow::ctor!\n");
2231 SetWindowName(
"ROOT Analyzer Control");
2234 fMenu =
new TGPopupMenu(gClient->GetRoot());
2236 fMenu->AddEntry(
"-", 0);
2241 fMenu->AddEntry(
"-", 0);
2244 fMenuBarItemLayout =
new TGLayoutHints(kLHintsTop|kLHintsLeft, 0, 4, 0, 0);
2246 fMenu->Associate(
this);
2248 fMenuBar =
new TGMenuBar(
this, 1, 1, kRaisedFrame);
2249 fMenuBar->AddPopup(
"&Rootana", fMenu, fMenuBarItemLayout);
2252 AddFrame(fMenuBar,
new TGLayoutHints(kLHintsTop|kLHintsLeft|kLHintsExpandX));
2254 fButtonsFrame =
new TGVerticalFrame(
this);
2259 fButtonsFrame->AddFrame(fNextButton,
new TGLayoutHints(kLHintsExpandX, 1, 1, 1, 1));
2260 fButtonsFrame->AddFrame(fNextFlowButton,
new TGLayoutHints(kLHintsExpandX, 1, 1, 1, 1));
2262 TGHorizontalFrame *hframe =
new TGHorizontalFrame(fButtonsFrame);
2267 hframe->AddFrame(fContinueButton,
new TGLayoutHints(kLHintsExpandX, 1, 1, 1, 1));
2268 hframe->AddFrame(fPauseButton,
new TGLayoutHints(kLHintsExpandX, 1, 1, 1, 1));
2270 fButtonsFrame->AddFrame(hframe,
new TGLayoutHints(kLHintsExpandX));
2273 fButtonsFrame->AddFrame(fQuitButton,
new TGLayoutHints(kLHintsExpandX, 1, 1, 1, 1));
2275 AddFrame(fButtonsFrame,
new TGLayoutHints(kLHintsExpandX));
2279 Resize(GetDefaultSize());
2286 printf(
"MainWindow::dtor!\n");
2290 delete fMenuBarItemLayout;
2296 printf(
"MainWindow::CloseWindow()\n");
2306 switch (GET_MSG(msg))
2311 switch (GET_SUBMSG(msg))
2354 printf(
"InteractiveModule::ctor, run %d\n", runinfo->
fRunNo);
2361 if (!fgCtrlWindow && runinfo->
fRoot->
fgApp) {
2362 fgCtrlWindow =
new MainWindow(gClient->GetRoot(), 200, 300, fgHolder);
2370 printf(
"InteractiveModule::dtor!\n");
2375 printf(
"InteractiveModule::BeginRun, run %d\n", runinfo->
fRunNo);
2380 printf(
"InteractiveModule::EndRun, run %d\n", runinfo->
fRunNo);
2389#ifdef HAVE_THTTP_SERVER
2396 gSystem->DispatchOneEvent(kTRUE);
2401 TMFE* mfe = TMFE::Instance();
2403 if (mfe->fShutdownRequested) {
2416 int ctrl = fgHolder->
fValue;
2434 printf(
"InteractiveModule::PauseRun, run %d\n", runinfo->
fRunNo);
2439 printf(
"InteractiveModule::ResumeRun, run %d\n", runinfo->
fRunNo);
2447#ifdef HAVE_THTTP_SERVER
2454 gSystem->DispatchOneEvent(kTRUE);
2459 TMFE* mfe = TMFE::Instance();
2461 if (mfe->fShutdownRequested) {
2475 int ctrl = fgHolder->
fValue;
2497 fprintf(stdout,
"manalyzer> "); fflush(stdout);
2498 char* s = fgets(str,
sizeof(str)-1, stdin);
2506 printf(
"command [%s]\n", str);
2508 if (str[0] ==
'h') {
2509 printf(
"Interactive manalyzer commands:\n");
2510 printf(
" q - quit\n");
2511 printf(
" h - help\n");
2512 printf(
" c - continue until next TAFlag_DISPLAY event\n");
2513 printf(
" n - next event\n");
2514 printf(
" aNNN - analyze N events, i.e. \"a10\"\n");
2515 }
else if (str[0] ==
'q') {
2518 }
else if (str[0] ==
'n') {
2520 }
else if (str[0] ==
'c') {
2523 }
else if (str[0] ==
'a') {
2524 int num = atoi(str+1);
2525 printf(
"Analyzing %d events\n", num);
2536 printf(
"InteractiveModule::Analyze, run %d, %s\n", runinfo->
fRunNo, event->
HeaderToString().c_str());
2558 InteractiveLoop(runinfo, flags);
2565 printf(
"InteractiveModule::AnalyzeFlowEvent, run %d\n", runinfo->
fRunNo);
2582 InteractiveLoop(runinfo, flags);
2590 printf(
"InteractiveModule::AnalyzeSpecialEvent, run %d, event serno %d, id 0x%04x, data size %d\n", runinfo->
fRunNo, event->
serial_number, (
int)event->
event_id, event->
data_size);
2603 void Init(
const std::vector<std::string> &args)
2606 printf(
"InteractiveModuleFactory::Init!\n");
2612 printf(
"InteractiveModuleFactory::Finish!\n");
2618 printf(
"InteractiveModuleFactory::NewRunObject, run %d, file %s\n", runinfo->
fRunNo, runinfo->
fFileName.c_str());
2631 printf(
"\nUsage: ./manalyzer.exe [-h] [-R8081] [-oOutputfile.mid] [file1 file2 ...] [-- arguments passed to modules ...]\n");
2633 printf(
"-h: print this help message\n");
2634 printf(
"--demo: activate the demo mode, online connection or input file not needed, midas events are generated internally, add -e0 or -eNNN to set number of demo events \n");
2636 printf(
"-Hhostname: connect to MIDAS experiment on given host\n");
2637 printf(
"-Eexptname: connect to this MIDAS experiment\n");
2638 printf(
"--midas-progname SSS -- set analyzer's MIDAS program name, default is \"ana\"\n");
2639 printf(
"--midas-hostname HOSTNAME[:PORT] -- connect to MIDAS mserver on given host and port\n");
2640 printf(
"--midas-exptname EXPTNAME -- connect to given experiment\n");
2641 printf(
"--midas-buffer BUFZZZ -- connect to given MIDAS event buffer\n");
2642 printf(
"--midas-sampling SSS -- sample events from MIDAS event buffer: GET_ALL=get every event (will block the event writers, GET_NONBLOCKING=get as many as we can process, GET_RECENT=get recent events, see bm_receive_event(). Default is GET_NONBLOCKING\n");
2643 printf(
"--midas-event-id III -- receive only events with matching event ID\n");
2644 printf(
"--midas-trigger-mask 0xMASK -- receive only events with matching trigger mask\n");
2646 printf(
"-oOutputfile.mid: write selected events into this file\n");
2647 printf(
"-Rnnnn: Start the ROOT THttpServer HTTP server on specified tcp port, use -R8081, access by firefox http://localhost:8081\n");
2648 printf(
"-eNNN: Number of events to analyze, 0=unlimited\n");
2649 printf(
"-sNNN: Number of events to skip before starting analysis\n");
2651 printf(
"--dump: activate the event dump module\n");
2653 printf(
"-t: Enable tracing of constructors, destructors and function calls\n");
2654 printf(
"-m: Enable memory leak debugging\n");
2655 printf(
"-g: Enable graphics display when processing data files\n");
2656 printf(
"-i: Enable intractive mode\n");
2658 printf(
"--mt: Enable multithreaded mode. Extra multithread config settings:\n");
2663 printf(
"--no-profiler: Turn off manalyzer module profiler\n");
2664 printf(
"--pqiNNN: Profile multithread queue lengths every NNN events \n");
2667 printf(
"-Doutputdirectory: Specify output root file directory\n");
2668 printf(
"-Ooutputfile.root: Specify output root file filename\n");
2671 printf(
"--: All following arguments are passed to the analyzer modules Init() method\n");
2673 printf(
"Analyzer modules usage:\n");
2675 for (
unsigned i=0; i<(*gModules).size(); i++)
2678 printf(
"Example1: analyze online data: ./manalyzer.exe -R9091\n");
2679 printf(
"Example2: analyze existing data: ./manalyzer.exe /data/alpha/current/run00500.mid\n");
2687 return (s.substr(0, strlen(prefix)) == prefix);
2694 setbuf(stdout, NULL);
2695 setbuf(stderr, NULL);
2697 signal(SIGILL, SIG_DFL);
2698 signal(SIGBUS, SIG_DFL);
2699 signal(SIGSEGV, SIG_DFL);
2700 signal(SIGPIPE, SIG_DFL);
2702 std::vector<std::string> args;
2703 for (
int i=0; i<argc; i++) {
2704 if (strcmp(argv[i],
"-h")==0)
2706 args.push_back(argv[i]);
2711 int num_analyze = 0;
2715 bool event_dump =
false;
2716 bool demo_mode =
false;
2718 bool root_graphics =
false;
2720 bool interactive =
false;
2722 bool multithread =
false;
2724 bool performance_profiler =
true;
2725 int snap_shot_queue_length = 100;
2727 std::vector<std::string> files;
2728 std::vector<std::string> modargs;
2731 std::string midas_hostname =
"";
2732 std::string midas_exptname =
"";
2733 std::string midas_progname =
"ana";
2734 std::string midas_buffer =
"SYSTEM";
2736 std::string midas_sampling =
"GET_NONBLOCKING";
2737 int midas_event_id = -1;
2738 int midas_trigger_mask = -1;
2741 for (
unsigned int i=1; i<args.size(); i++) {
2742 std::string arg = args[i];
2746 for (
unsigned j=i+1; j<args.size(); j++)
2747 modargs.push_back(args[j]);
2749 }
else if (arg ==
"--dump") {
2751 }
else if (arg ==
"--demo") {
2755 }
else if (arg ==
"-g") {
2756 root_graphics =
true;
2758 }
else if (arg ==
"-i") {
2760 }
else if (arg ==
"-t") {
2767 num_skip = atoi(arg.c_str()+2);
2769 num_analyze = atoi(arg.c_str()+2);
2773 httpPort = atoi(arg.c_str()+2);
2776 midas_hostname = arg.c_str()+2;
2778 midas_exptname = arg.c_str()+2;
2779 }
else if (arg ==
"--midas-progname") {
2780 midas_progname = args[i+1]; i++;
2781 }
else if (arg ==
"--midas-hostname") {
2782 midas_hostname = args[i+1]; i++;
2783 }
else if (arg ==
"--midas-exptname") {
2784 midas_exptname = args[i+1]; i++;
2785 }
else if (arg ==
"--midas-buffer") {
2786 midas_buffer = args[i+1]; i++;
2787 }
else if (arg ==
"--midas-sampling") {
2788 midas_sampling = args[i+1]; i++;
2789 }
else if (arg ==
"--midas-event-id") {
2790 midas_event_id = atoi(args[i+1].c_str()); i++;
2791 }
else if (arg ==
"--midas-trigger-mask") {
2792 midas_trigger_mask = strtoul(args[i+1].c_str(), NULL, 0); i++;
2800 }
else if (arg ==
"--mt") {
2802 }
else if (arg ==
"--no-profiler") {
2803 performance_profiler = 0;
2805 snap_shot_queue_length = atoi(arg.c_str()+5);
2812 }
else if (arg ==
"-h") {
2814 }
else if (arg[0] ==
'-') {
2817 files.push_back(args[i]);
2822 gModules =
new std::vector<TAFactory*>;
2824 if ((*gModules).size() == 0)
2833 printf(
"Registered modules: %d\n", (
int)(*gModules).size());
2836 if (root_graphics) {
2845#ifdef HAVE_THTTP_SERVER
2847 sprintf(str,
"http:127.0.0.1:%d?cors", httpPort);
2848 THttpServer *s =
new THttpServer(str);
2852 fprintf(stderr,
"ERROR: No support for the THttpServer!\n");
2856 for (
unsigned i=0; i<files.size(); i++) {
2857 printf(
"file[%d]: %s\n", i, files[i].c_str());
2861 exit_state =
ProcessDemoMode(modargs, num_skip, num_analyze, writer, multithread, performance_profiler, snap_shot_queue_length);
2862 }
else if (files.size() > 0) {
2863 exit_state =
ProcessMidasFiles(files, modargs, num_skip, num_analyze, writer, multithread, performance_profiler, snap_shot_queue_length);
2867 exit_state = ProcessMidasOnlineTmfe(modargs, midas_progname.c_str(), midas_hostname.c_str(), midas_exptname.c_str(), midas_buffer.c_str(), midas_event_id, midas_trigger_mask, midas_sampling.c_str(), num_analyze, writer, multithread, performance_profiler, snap_shot_queue_length);
2869 exit_state =
ProcessMidasOnlineOld(modargs, midas_hostname.c_str(), midas_exptname.c_str(), num_analyze, writer, multithread, performance_profiler, snap_shot_queue_length);
R__EXTERN TDirectory * gDirectory
TARunObject * NewRunObject(TARunInfo *runinfo)
void Init(const std::vector< std::string > &args)
void PauseRun(TARunInfo *runinfo)
EventDumpModule(TARunInfo *runinfo)
TAFlowEvent * Analyze(TARunInfo *runinfo, TMEvent *event, TAFlags *flags, TAFlowEvent *flow)
void EndRun(TARunInfo *runinfo)
void NextSubrun(TARunInfo *runinfo)
void AnalyzeSpecialEvent(TARunInfo *runinfo, TMEvent *event)
void BeginRun(TARunInfo *runinfo)
void ResumeRun(TARunInfo *runinfo)
void Init(const std::vector< std::string > &args)
TARunObject * NewRunObject(TARunInfo *runinfo)
TAFlowEvent * AnalyzeFlowEvent(TARunInfo *runinfo, TAFlags *flags, TAFlowEvent *flow)
void PauseRun(TARunInfo *runinfo)
void EndRun(TARunInfo *runinfo)
static MainWindow * fgCtrlWindow
void ResumeRun(TARunInfo *runinfo)
TAFlowEvent * Analyze(TARunInfo *runinfo, TMEvent *event, TAFlags *flags, TAFlowEvent *flow)
InteractiveModule(TARunInfo *runinfo)
static ValueHolder * fgHolder
void InteractiveLoop(TARunInfo *runinfo, TAFlags *flags)
void AnalyzeSpecialEvent(TARunInfo *runinfo, TMEvent *event)
void BeginRun(TARunInfo *runinfo)
virtual void RU32(const char *varname, uint32_t *value, bool create=false, MVOdbError *error=NULL)=0
virtual void RI(const char *varname, int *value, bool create=false, MVOdbError *error=NULL)=0
TGLayoutHints * fMenuBarItemLayout
TextButton * fNextFlowButton
MainWindow(const TGWindow *w, int s1, int s2, ValueHolder *holder)
TGCompositeFrame * fButtonsFrame
TextButton * fPauseButton
TextButton * fContinueButton
Bool_t ProcessMessage(Long_t msg, Long_t parm1, Long_t parm2)
void Event(const void *data, int data_size)
void Transition(int transition, int run_number, int transition_time)
OnlineHandler(int num_analyze, TMWriterInterface *writer, MVOdb *odb, const std::vector< std::string > &args, bool multithread, bool profiler, int queue_interval_check)
void StartRun(int run_number)
std::vector< double > fAnalyzeFlowEventRMS
void LogAnalyzeEvent(TAFlags *flag, TAFlowEvent *flow, const int i, const TAClock &start)
void LogUserWindows(TAFlags *flag, TAFlowEvent *flow)
std::vector< double > fAnalyzeFlowEventTimeTotal
std::vector< double > fTotalUserTime
std::vector< int > fAnalyzeEventEntries
std::vector< TH1D * > fUserHistograms
std::vector< double > fAnalyzeFlowEventMean
std::atomic< int > fQueueIntervalCounter
std::vector< double > fAnalyzeEventRMS
std::vector< double > fAnalyzeEventTimeMax
void LogAnalyzeFlowEvent(TAFlags *flag, TAFlowEvent *flow, const int i, const TAClock &start)
std::vector< std::string > fModuleNames
void End(TARunInfo *runinfo)
std::vector< TH1D * > fAnalyzeFlowEventTimeHistograms
std::chrono::system_clock::time_point fStartUser
void Begin(TARunInfo *runinfo, const std::vector< TARunObject * > fRunRun)
std::vector< double > fAnalyzeEventTimeTotal
void AddModuleMap(const char *UserProfileName, unsigned long hash)
std::vector< double > fAnalyzeFlowEventTimeMax
void LogMTQueueLength(TARunInfo *runinfo)
std::map< unsigned int, int > fUserMap
std::vector< double > fAnalyzeEventMean
std::vector< double > fMaxUserTime
std::vector< TH1D * > fAnalyzeEventTimeHistograms
Profiler(const int queue_interval_check)
std::vector< int > fAnalyzeFlowEventEntries
std::vector< TH1D * > fAnalysisQueue
std::vector< TARunObject * > fRunRun
std::vector< std::string > fArgs
void AnalyzeEvent(TMEvent *event, TAFlags *flags, TMWriterInterface *writer)
void CreateRun(int run_number, const char *file_name)
void AnalyzeSpecialEvent(TMEvent *event)
void EndRun(TAFlags *flags)
TAFlowEvent * AnalyzeFlowEvent(TAFlags *flags, TAFlowEvent *flow)
int fProfilerIntervalCheck
RunHandler(const std::vector< std::string > &args, bool multithread, bool profile, int queue_interval_check)
void PerModuleThread(int i)
void AnalyzeFlowQueue(TAFlags *ana_flags)
virtual void Init(const std::vector< std::string > &args)
int fMtQueueFullUSleepTime
std::vector< TAFlagsQueue > fMtFlagQueue
std::vector< std::mutex > fMtFlowQueueMutex
int fMtQueueEmptyUSleepTime
std::atomic< bool > fMtShutdownRequested
std::vector< std::thread * > fMtThreads
std::vector< std::atomic< bool > > fMtThreadIsBusy
TAMultithreadHelper(int nModules)
static int gfMtMaxBacklog
static bool gfMultithread
std::vector< TAFlowEventQueue > fMtFlowQueue
std::vector< std::atomic< bool > > fMtThreadIsRunning
std::atomic< bool > fMtQuitRequested
static std::string fOutputDirectory
static TApplication * fgApp
static std::string fOutputFileName
static THttpServer * fgHttpServer
static TDirectory * fgDir
void AddToFlowQueue(TAFlowEvent *)
TAFlowEvent * ReadFlowQueue()
std::vector< std::string > fArgs
static std::vector< std::string > fgFileList
TAMultithreadHelper * fMtInfo
static int fgCurrentFileIndex
virtual void EndRun(TARunInfo *runinfo)
virtual void ResumeRun(TARunInfo *runinfo)
virtual void AnalyzeSpecialEvent(TARunInfo *runinfo, TMEvent *event)
virtual void NextSubrun(TARunInfo *runinfo)
virtual TAFlowEvent * Analyze(TARunInfo *runinfo, TMEvent *event, TAFlags *flags, TAFlowEvent *flow)
virtual void PauseRun(TARunInfo *runinfo)
virtual void BeginRun(TARunInfo *runinfo)
virtual TAFlowEvent * AnalyzeFlowEvent(TARunInfo *runinfo, TAFlags *flags, TAFlowEvent *flow)
virtual void PreEndRun(TARunInfo *runinfo)
TAUserProfilerFlow(TAFlowEvent *flow, const char *name, const TAClock &start)
const std::string fModuleName
virtual TDirectory * mkdir(const char *name, const char *title="")
virtual Bool_t cd(const char *path=0)
std::string HeaderToString() const
print the MIDAS event header
void Reset()
reset everything
void ParseEvent()
parse event data
std::vector< char > data
MIDAS event bytes.
size_t event_header_size
size of MIDAS event header
uint32_t serial_number
MIDAS event serial number.
uint32_t data_size
MIDAS event data size.
uint16_t event_id
MIDAS event ID.
MIDAS online connection, including access to online ODB.
void RegisterHandler(TMHandlerInterface *h)
void registerTransitions()
Ask MIDAS to tell us about run transitions.
int disconnect()
Disconnect from MIDAS.
static TMidasOnline * instance()
int connect(const char *hostname, const char *exptname, const char *progname)
Connect to MIDAS experiment.
int eventRequest(const char *bufferName, int eventId, int triggerMask, int samplingType, bool poll=false)
Request data for delivery via callback (setEventHandler) or by polling (via receiveEvent)
TextButton(TGWindow *p, const char *text, ValueHolder *holder, int value)
std::chrono::high_resolution_clock::time_point TAClock
#define TAFlag_SKIP_PROFILE
void TMWriteEvent(TMWriterInterface *writer, const TMEvent *event)
TMWriterInterface * TMNewWriter(const char *destination)
TMEvent * TMReadEvent(TMReaderInterface *reader)
TMReaderInterface * TMNewReader(const char *source)
MVOdb * MakeMidasOdb(int hDB, MVOdbError *error=NULL)
MVOdb * MakeFileDumpOdb(const char *buf, int bufsize, MVOdbError *error=NULL)
Access ODB from a midas file dump. FOrmat could be .xml, .json or .odb.
std::chrono::high_resolution_clock::time_point TAClock
std::chrono::duration< double > TAClockDuration
#define TAFlag_SKIP_PROFILE
static int gDefaultMultithreadWaitFull
static int ProcessDemoMode(const std::vector< std::string > &args, int num_skip, int num_analyze, TMWriterInterface *writer, bool multithread, bool profiler, int queue_interval_check)
static void MtQueueFlowEvent(TAMultithreadHelper *mt, int i, TAFlags *flag, TAFlowEvent *flow, bool wait)
std::vector< TAFactory * > * gModules
int manalyzer_main(int argc, char *argv[])
static int gDefaultMultithreadWaitEmpty
static bool starts_with(const std::string &s, const char *prefix)
static int ProcessMidasOnlineOld(const std::vector< std::string > &args, const char *hostname, const char *exptname, int num_analyze, TMWriterInterface *writer, bool multithread, bool profiler, int queue_interval_check)
static bool gEnableShowMem
static void WaitForAllQueuesEmpty(TAMultithreadHelper *mt)
static int ProcessMidasFiles(const std::vector< std::string > &files, const std::vector< std::string > &args, int num_skip, int num_analyze, TMWriterInterface *writer, bool multithread, bool profiler, int queue_interval_check)
static void WaitForAllThreadsShutdown(TAMultithreadHelper *mt)
static int gDefaultMultithreadQueueLength
int ShowMem(const char *label)