ROOTANA
manalyzer.cxx
Go to the documentation of this file.
1 //
2 // MIDAS analyzer
3 //
4 // K.Olchanski
5 //
6 
7 #undef NDEBUG // this program requires working assert()
8 
9 #include <stdio.h>
10 #include <unistd.h> // usleep()
11 #include <assert.h>
12 #include <sys/stat.h> // struct stat_buffer;
13 
14 #include "manalyzer.h"
15 #include "midasio.h"
16 
17 //////////////////////////////////////////////////////////
18 
19 static bool gTrace = false;
20 
21 //////////////////////////////////////////////////////////
22 //
23 // Methods of TARunInfo
24 //
25 //////////////////////////////////////////////////////////
26 
27 TARunInfo::TARunInfo(int runno, const char* filename, const std::vector<std::string>& args)
28 {
29  if (gTrace)
30  printf("TARunInfo::ctor!\n");
31  fRunNo = runno;
32  if (filename)
33  fFileName = filename;
34  fOdb = NULL;
35 #ifdef HAVE_ROOT
36  fRoot = new TARootHelper(this);
37 #else
38  fRoot = NULL;
39 #endif
40  fMtInfo = NULL;
41  fArgs = args;
42 }
43 
45 {
46  if (gTrace)
47  printf("TARunInfo::dtor!\n");
48  fRunNo = 0;
49  fFileName = "(deleted)";
50  if (fOdb) {
51  delete fOdb;
52  fOdb = NULL;
53  }
54 #ifdef HAVE_ROOT
55  if (fRoot) {
56  delete fRoot;
57  fRoot = NULL;
58  }
59 #endif
60  int count = 0;
61  while (1) {
62  TAFlowEvent* flow = ReadFlowQueue();
63  if (!flow)
64  break;
65  delete flow;
66  count++;
67  }
68  if (gTrace) {
69  printf("TARunInfo::dtor: deleted %d queued flow events!\n", count);
70  }
71 
72  if (fMtInfo) {
73  delete fMtInfo;
74  fMtInfo = NULL;
75  }
76 }
77 
78 //////////////////////////////////////////////////////////
79 //
80 // Methods of TAFlowEvent
81 //
82 //////////////////////////////////////////////////////////
83 
85 {
86  if (gTrace)
87  printf("TAFlowEvent::ctor: chain %p\n", flow);
88  fNext = flow;
89 }
90 
92 {
93  if (gTrace)
94  printf("TAFlowEvent::dtor: this %p, next %p\n", this, fNext);
95  if (fNext)
96  delete fNext;
97  fNext = NULL;
98 }
99 
100 //////////////////////////////////////////////////////////
101 //
102 // Methods of TARunObject
103 //
104 //////////////////////////////////////////////////////////
105 
107 {
108  if (gTrace)
109  printf("TARunObject::ctor, run %d\n", runinfo->fRunNo);
110 }
111 
113 {
114  if (gTrace)
115  printf("TARunObject::BeginRun, run %d\n", runinfo->fRunNo);
116 }
117 
119 {
120  if (gTrace)
121  printf("TARunObject::EndRun, run %d\n", runinfo->fRunNo);
122 }
123 
125 {
126  if (gTrace)
127  printf("TARunObject::NextSubrun, run %d\n", runinfo->fRunNo);
128 }
129 
131 {
132  if (gTrace)
133  printf("TARunObject::PauseRun, run %d\n", runinfo->fRunNo);
134 }
135 
137 {
138  if (gTrace)
139  printf("TARunObject::ResumeRun, run %d\n", runinfo->fRunNo);
140 }
141 
143 {
144  if (gTrace)
145  printf("TARunObject::PreEndRun, run %d\n", runinfo->fRunNo);
146 }
147 
149 {
150  if (gTrace)
151  printf("TARunObject::Analyze!\n");
152 
153  // This default analyze function does no work, instruct the Profiler to not time / log this
154  *flags|=TAFlag_SKIP_PROFILE;
155 
156  return flow;
157 }
158 
160 {
161  if (gTrace)
162  printf("TARunObject::Analyze!\n");
163 
164  // This default analyze function does no work, instruct the Profiler to not time / log this
165  *flags|=TAFlag_SKIP_PROFILE;
166 
167  return flow;
168 }
169 
171 {
172  if (gTrace)
173  printf("TARunObject::AnalyzeSpecialEvent!\n");
174 }
175 
176 //////////////////////////////////////////////////////////
177 //
178 // Methods of TAFactory
179 //
180 //////////////////////////////////////////////////////////
181 
183 {
184  if (gTrace)
185  printf("TAFactory::Usage!\n");
186 }
187 
188 void TAFactory::Init(const std::vector<std::string> &args)
189 {
190  if (gTrace)
191  printf("TAFactory::Init!\n");
192 }
193 
195 {
196  if (gTrace)
197  printf("TAFactory::Finish!\n");
198 }
199 
200 #ifdef HAVE_ROOT
201 
202 //////////////////////////////////////////////////////////
203 //
204 // Methods of TARootHelper
205 //
206 //////////////////////////////////////////////////////////
207 
208 std::string TARootHelper::fOutputDirectory = "root_output_files";
209 std::string TARootHelper::fOutputFileName = "";
210 TApplication* TARootHelper::fgApp = NULL;
212 THttpServer* TARootHelper::fgHttpServer = NULL;
213 
214 TARootHelper::TARootHelper(const TARunInfo* runinfo) // ctor
215 {
216  if (gTrace)
217  printf("TARootHelper::ctor!\n");
218 
219  std::string filename = fOutputFileName;
220 
221  if (filename.empty()) {
222  char xfilename[256];
223  sprintf(xfilename, "output%05d.root", runinfo->fRunNo);
224 
225  if (fOutputDirectory.empty()) {
226  filename = xfilename;
227  } else {
228  filename = fOutputDirectory + "/" + xfilename;
229 
230  struct stat buffer;
231  int status = stat(fOutputDirectory.c_str(), &buffer);
232 
233  if (status < 0 && errno == ENOENT) {
234  fprintf(stdout, "TARootHelper::ctor: creating output directory \"%s\"\n", fOutputDirectory.c_str());
235  status = mkdir(fOutputDirectory.c_str(), 0777);
236  if (status == -1) {
237  fprintf(stderr, "TARootHelper::ctor: Error: cannot output directory \"%s\", errno %d (%s)\n", fOutputDirectory.c_str(), errno, strerror(errno));
238  }
239  }
240  }
241  }
242 
243  fOutputFile = new TFile(filename.c_str(), "RECREATE");
244 
245  if (!fOutputFile->IsOpen()) {
246  fprintf(stderr, "TARootHelper::ctor: Error: cannot open output ROOT file \"%s\"\n", filename.c_str());
247  fOutputFile = new TFile("/dev/null", "UPDATE");
248  assert(fOutputFile);
249  assert(fOutputFile->IsOpen());
250  }
251 
252  if (fOutputFile != NULL) {
253  fOutputFile->cd();
254  }
255 }
256 
258 {
259  if (gTrace)
260  printf("TARootHelper::dtor!\n");
261 
262  if (fOutputFile != NULL) {
263  fOutputFile->Write();
264  fOutputFile->Close();
265  fOutputFile = NULL;
266  }
267 
268  if (fgDir)
269  fgDir->cd();
270 }
271 
272 #endif
273 
274 #include <stdio.h>
275 #include <stdlib.h>
276 #include <string.h>
277 #include <sys/time.h>
278 #include <assert.h>
279 #include <signal.h>
280 
281 #include "manalyzer.h"
282 #include "midasio.h"
283 
284 #ifdef HAVE_THTTP_SERVER
285 #include "THttpServer.h"
286 #endif
287 
288 #ifdef HAVE_ROOT
289 #include <TSystem.h>
290 #endif
291 
292 //////////////////////////////////////////////////////////
293 //
294 // Methods and Defaults of TAMultithreadHelper
295 //
296 //////////////////////////////////////////////////////////
297 
299 static int gDefaultMultithreadWaitEmpty = 100; // microseconds
300 static int gDefaultMultithreadWaitFull = 100; // microseconds
301 
303 {
304  fprintf(stderr, "Waiting for all queues to empty out!\n");
305 
306  int count_all_empty = 0;
307 
308  for (int t=0; ; ) {
309  int count_not_empty = 0;
310  size_t count_events = 0;
311 
312  for (unsigned i=0; i<mt->fMtThreads.size(); i++) {
313  std::lock_guard<std::mutex> lock(mt->fMtFlowQueueMutex[i]);
314  if (!mt->fMtThreadIsRunning[i]) // skip threads that have shutdown
315  continue;
316  if (!mt->fMtFlowQueue[i].empty()) {
317  count_not_empty++;
318  count_events += mt->fMtFlowQueue[i].size();
319  break;
320  }
321  if (mt->fMtThreadIsBusy[i]) {
322  count_not_empty++;
323  count_events += 1; // module is analyzing 1 event
324  break;
325  }
326  // implicit unlock
327  }
328 
329  if (count_not_empty == 0) {
330  count_all_empty++;
331  }
332 
333  if (count_all_empty > 1) {
334  // must loop over "all empty" at least twice! K.O.
335  break;
336  }
337 
338  if (t > 10) {
339  fprintf(stderr, "Timeout waiting for all queues to empty out, %d queues still have %d flow events!\n", count_not_empty, (int)count_events);
340  //break;
341  }
342 
343  ::sleep(1);
344  t++;
345  }
346 }
347 
349 {
350  fprintf(stderr, "Waiting for all threads to shut down!\n");
351 
352  mt->fMtShutdownRequested = true;
353 
354  for (int t=0; ; ) {
355  int count_running = 0;
356 
357  for (unsigned i=0; i<mt->fMtThreads.size(); i++) {
358  if (mt->fMtThreadIsRunning[i])
359  count_running++;
360  }
361 
362  if (count_running == 0) {
363  break;
364  }
365 
366  if (t > 10) {
367  fprintf(stderr, "Timeout waiting for all threads to shut down, %d still running!\n", count_running);
368  break;
369  }
370 
371  ::sleep(1);
372  t++;
373  }
374 
375  fprintf(stderr, "Joining all threads!\n");
376  for (unsigned i=0; i<mt->fMtThreads.size(); i++) {
377  if (!mt->fMtThreadIsRunning[i] ) {
378  // pointer is null if thread is already shutdown
379  if (mt->fMtThreads[i]) {
380  mt->fMtThreads[i]->join();
381  delete mt->fMtThreads[i];
382  mt->fMtThreads[i] = NULL;
383  }
384  } else {
385  fprintf(stderr, "Thread %d failed to shut down!\n", i);
386  }
387  }
388 }
389 
390 
392  fMtFlowQueueMutex(nModules),
393  fMtFlowQueue(nModules),
394  fMtFlagQueue(nModules),
395  fMtThreads(nModules,NULL),
396  fMtThreadIsRunning(nModules),
397  fMtThreadIsBusy(nModules),
398  fMtShutdownRequested(false),
399  fMtQuitRequested(false)
400  // ctor
401 {
402  for (auto &b: fMtThreadIsRunning) { b = false; }
403  for (auto &b: fMtThreadIsBusy) { b = false; }
404  // default max queue size
406  // queue settings
409 }
410 
412 {
413  size_t nmodules = fMtFlowQueueMutex.size();
414 
415  // just for kicks, check that all queues have correct size
416  assert(nmodules == fMtFlowQueue.size());
417  assert(nmodules == fMtFlagQueue.size());
418  assert(nmodules == fMtFlowQueueMutex.size());
419 
420  // should not come to the destructor while threads are still running
422 
423  int count = 0;
424  for (size_t i=0; i<nmodules; i++) {
425  // check that the thread is stopped
426  //assert(!fMtThread[i].joinable());
427  // empty the thread queue
428  std::lock_guard<std::mutex> lock(fMtFlowQueueMutex[i]);
429  while (!fMtFlowQueue[i].empty()) {
430  TAFlowEvent* flow = fMtFlowQueue[i].front();
431  TAFlags* flag = fMtFlagQueue[i].front();
432  fMtFlowQueue[i].pop_front();
433  fMtFlagQueue[i].pop_front();
434  delete flow;
435  delete flag;
436  count++;
437  }
438  // implicit unlock of mutex
439  }
440  if (gTrace) {
441  printf("TAMultithreadInfo::dtor: deleted %d queued flow events!\n", count);
442  }
443 }
444 
447 std::mutex TAMultithreadHelper::gfLock; //Lock for modules to execute code that is not thread safe (many root fitting libraries)
448 
449 static void MtQueueFlowEvent(TAMultithreadHelper* mt, int i, TAFlags* flag, TAFlowEvent* flow, bool wait)
450 {
451  assert(mt);
452 
453  if (flag == NULL) {
454  flag = new TAFlags;
455  *flag = 0;
456  }
457 
458  //PrintQueueLength();
459 
460  while (1) {
461  {
462  //Lock and queue events
463  std::lock_guard<std::mutex> lock(mt->fMtFlowQueueMutex[i]);
464 
465  if ((((int)mt->fMtFlowQueue[i].size()) < mt->fMtQueueDepth) || mt->fMtShutdownRequested || !wait) {
466  mt->fMtFlowQueue[i].push_back(flow);
467  mt->fMtFlagQueue[i].push_back(flag);
468  return;
469  }
470  // Unlock when we go out of scope
471  }
472 
473  usleep(mt->fMtQueueFullUSleepTime);
474  }
475 }
476 
477 #if 0
478 //Function to print the length of the flow queue when in multithread mode
479 //Maybe make root update a graphical window?
480 static void PrintMtQueueLength(TAMultithreadHelper* mt)
481 {
482  printf("Multithread queue lengths:\n");
483  for (unsigned i=0; i<mt->fMtFlowQueue.size(); i++) {
484  printf("%d:\t%zu\n",i,mt->fMtFlowQueue[i].size());
485  }
486 }
487 #endif
488 
489 //////////////////////////////////////////////////////////
490 //
491 // Methods of TARegister
492 //
493 //////////////////////////////////////////////////////////
494 
495 std::vector<TAFactory*> *gModules = NULL;
496 
498 {
499  if (!gModules)
500  gModules = new std::vector<TAFactory*>;
501  gModules->push_back(m);
502 }
503 
504 #if 0
// Return the current time reported by gettimeofday() as seconds,
// with the microsecond part added as a fraction.
static double GetTimeSec()
{
   struct timeval now;
   gettimeofday(&now, NULL);

   const double whole_seconds = now.tv_sec;
   const double fraction = 0.000001*now.tv_usec;

   return whole_seconds + fraction;
}
511 #endif
512 
513 //////////////////////////////////////////////////////////
514 //
515 // Profiler class
516 //
517 //////////////////////////////////////////////////////////
518 
519 TAUserProfilerFlow::TAUserProfilerFlow(TAFlowEvent* flow, const char* name, const TAClock& start) : TAFlowEvent(flow), fModuleName(name)
520 {
521  fStart = start;
522  fStop = TAClockNow();
523 }
524 
526 {
527 }
528 
530 {
531  TAClockDuration elapsed_seconds = fStop - fStart;
532  return elapsed_seconds.count();
533 }
534 
535 #ifdef HAVE_ROOT
536 #include "TH1D.h"
537 #endif
538 #include <map>
539 
540 class Profiler
541 {
542 private:
543  std::string fBinaryName;
544  std::string fBinaryPath;
545  clock_t fStartCPU;
546  std::chrono::system_clock::time_point fStartUser;
547  uint32_t fMIDASStartTime;
548  uint32_t fMIDASStopTime;
549 
550  //Track Queue lengths when multithreading
551 #ifdef HAVE_ROOT
552  int fNQueues=0;
553  std::vector<TH1D*> fAnalysisQueue;
554  std::atomic<int> fQueueIntervalCounter;
555 #endif
556 
557  // Track Analyse TMEvent time per module (main thread)
558 #ifdef HAVE_ROOT
559  std::vector<TH1D*> fAnalyzeEventTimeHistograms;
560 #endif
561  std::vector<std::string> fModuleNames;
562  std::vector<double> fAnalyzeEventMean;
563  std::vector<double> fAnalyzeEventRMS;
564  std::vector<int> fAnalyzeEventEntries;
565 
566  std::vector<double> fAnalyzeEventTimeMax;
567  std::vector<double> fAnalyzeEventTimeTotal;
568 
569  //Track Analyse flow event time per module (can be multiple threads)
570 #ifdef HAVE_ROOT
572 #endif
573  std::vector<double> fAnalyzeFlowEventMean;
574  std::vector<double> fAnalyzeFlowEventRMS;
575  std::vector<int> fAnalyzeFlowEventEntries;
576 
577  std::vector<double> fAnalyzeFlowEventTimeMax;
578  std::vector<double> fAnalyzeFlowEventTimeTotal;
579 #ifdef HAVE_ROOT
580  //Track user profiling
581  std::map<unsigned int,int> fUserMap;
582  std::vector<TH1D*> fUserHistograms;
583  std::vector<double> fTotalUserTime;
584  std::vector<double> fMaxUserTime;
585 
586  // Number of events between samples
587  const int fQueueInterval = 100;
588 #endif
589 
590 public:
591  Profiler( const int queue_interval_check );
592  ~Profiler();
593  void Begin(TARunInfo* runinfo,const std::vector<TARunObject*> fRunRun );
594  // Function for profiling the 'main' thread (that unpacks TMEvents)
595  void LogAnalyzeEvent(TAFlags* flag, TAFlowEvent* flow, const int i, const TAClock& start);
596  // Function for profiling module threads
597  void LogAnalyzeFlowEvent(TAFlags* flag, TAFlowEvent* flow, const int i, const TAClock& start);
598  // Extra function for users custom profiling windows
599  void LogUserWindows(TAFlags* flag, TAFlowEvent* flow);
600  void AddModuleMap( const char* UserProfileName, unsigned long hash);
601  void LogMTQueueLength(TARunInfo* runinfo);
602  void End(TARunInfo* runinfo);
603  void Print() const;
604 };
605 
606 Profiler::Profiler(const int queue_interval_check)
607 #ifdef HAVE_ROOT
608  :
609  fQueueIntervalCounter(0),
610  fQueueInterval(queue_interval_check)
611 #endif
612 {
613  if (gTrace)
614  printf("Profiler::ctor\n");
615  fMIDASStartTime = 0;
616  fMIDASStopTime = 0;
617  fStartCPU = clock();
618  fStartUser = std::chrono::system_clock::now();
619 }
620 
622 {
623  if (gTrace)
624  printf("Profiler::dtor\n");
625 
626 #ifdef HAVE_ROOT
627  for (TH1D* h: fAnalyzeFlowEventTimeHistograms)
628  delete h;
630  for (TH1D* h: fAnalyzeEventTimeHistograms)
631  delete h;
633  for( TH1D* h: fAnalysisQueue)
634  delete h;
635  fAnalysisQueue.clear();
636 #endif
637 }
638 
639 void Profiler::Begin(TARunInfo* runinfo, const std::vector<TARunObject*> runrun)
640 {
641  if (gTrace)
642  printf("Profiler::begin\n");
643 
644  runinfo->fOdb->RU32("Runinfo/Start time binary",(uint32_t*) &fMIDASStartTime);
645 
646 #ifdef HAVE_ROOT
647  // Put Profiler histograms in their own folders in the output root file
648  if (runinfo->fRoot->fOutputFile) {
649  runinfo->fRoot->fOutputFile->cd(); // select correct ROOT directory
650  gDirectory->mkdir("ProfilerReport")->cd();
651  runinfo->fRoot->fOutputFile->cd("ProfilerReport");
652  gDirectory->mkdir("AnalyzeFlowEventTime");
653  gDirectory->mkdir("AnalyzeFlowTime");
654  gDirectory->mkdir("MTQueueLength");
655  }
656 
657  // Setup module processing time histograms
658  // Number of bins
659  Int_t Nbins=1000;
660  // Array of bin edges
661  Double_t bins[Nbins+1];
662  // Processing time range (seconds)
663  Double_t TimeRange = 10;
664  // Set uneven binning to better sample fast modules with accuracy
665  // without having a large number of bins
666  for (int i=0; i<Nbins+1; i++) {
667  bins[i] = TimeRange*pow(1.1,i)/pow(1.1,Nbins);
668  }
669 
670  if (runinfo->fMtInfo)
671  fNQueues = runrun.size();
672 #endif
673 
674  // Per module metric setup
675  for (size_t i = 0; i < runrun.size(); i++) {
676 
677  if (runrun[i]->fModuleName.empty())
678  fModuleNames.push_back("Unnamed Module " + std::to_string(i));
679  else
680  fModuleNames.push_back(runrun[i]->fModuleName);
681 
682  // Metrics for the AnalyzeEvent function (main thread)
683  fAnalyzeEventMean.push_back(0);
684  fAnalyzeEventRMS.push_back(0);
685  fAnalyzeEventEntries.push_back(0);
686  fAnalyzeEventTimeMax.push_back(0);
687  fAnalyzeEventTimeTotal.push_back(0);
688 
689  // Metric for the AnalyzeFlowEvent function (side threads)
690  fAnalyzeFlowEventMean.push_back(0);
691  fAnalyzeFlowEventRMS.push_back(0);
692  fAnalyzeFlowEventEntries.push_back(0);
693  fAnalyzeFlowEventTimeMax.push_back(0);
694  fAnalyzeFlowEventTimeTotal.push_back(0);
695 
696 #ifdef HAVE_ROOT
697  if (runinfo->fRoot->fOutputFile) {
698  runinfo->fRoot->fOutputFile->cd("ProfilerReport/AnalyzeFlowTime");
699  TH1D* Histo = new TH1D( TString(std::to_string(i) + "_" + fModuleNames.at(i)),
700  TString(fModuleNames.at(i) + " Event Proccessing Time; s"),
701  Nbins, bins);
702  fAnalyzeFlowEventTimeHistograms.push_back(Histo);
703 
704  runinfo->fRoot->fOutputFile->cd("ProfilerReport/AnalyzeFlowEventTime");
705  TH1D* AnalyzeEventHisto = new TH1D(TString(std::to_string(i) + "_" + fModuleNames.at(i) + "_TMEvent"),
706  TString(fModuleNames.at(i) + " Flow Proccessing Time; s"),
707  Nbins, bins);
708  fAnalyzeEventTimeHistograms.push_back(AnalyzeEventHisto);
709  } else {
710  fAnalyzeFlowEventTimeHistograms.push_back(NULL);
711  fAnalyzeEventTimeHistograms.push_back(NULL);
712  }
713 
714  // Periodically (once every fQueueInterval events) record the
715  // flow queue size if running in multithreaded mode
716  if (runinfo->fMtInfo) {
717  if (runinfo->fRoot->fOutputFile) {
718  runinfo->fRoot->fOutputFile->cd("ProfilerReport/MTQueueLength");
719  TH1D* QueueHisto = new TH1D(TString(std::to_string(i) + "_" + fModuleNames.at(i) + "_Queue"),
720  TString(fModuleNames.at(i) + " Multithread Queue Length; Queue Depth"),
721  runinfo->fMtInfo->fMtQueueDepth*1.2,
722  0,
723  runinfo->fMtInfo->fMtQueueDepth*1.2);
724  fAnalysisQueue.push_back(QueueHisto);
725  } else {
726  fAnalysisQueue.push_back(NULL);
727  }
728  }
729 #endif
730  }
731 }
732 
733 void Profiler::LogAnalyzeFlowEvent(TAFlags* flag, TAFlowEvent* flow, const int i, const TAClock& start)
734 {
735  if (gTrace)
736  printf("Profiler::log\n");
737 
738  TAClock stop = TAClockNow();
739  if ((*flag) & TAFlag_SKIP_PROFILE) {
740  //Unset bit
741  *flag -= TAFlag_SKIP_PROFILE;
742  return;
743  }
744 
745  std::chrono::duration<double> elapsed_seconds = stop - start;
746  double dt = elapsed_seconds.count();
748  if (dt > fAnalyzeFlowEventTimeMax[i])
749  fAnalyzeFlowEventTimeMax[i] = dt;
750 
751  fAnalyzeFlowEventMean[i] +=dt;
752  fAnalyzeFlowEventRMS[i] +=dt*dt;
754 
755 #ifdef HAVE_ROOT
757  fAnalyzeFlowEventTimeHistograms[i]->Fill(dt);
758 #endif
759 }
760 
761 void Profiler::LogAnalyzeEvent(TAFlags* flag, TAFlowEvent* flow, int i, const TAClock& start)
762 {
763  if (gTrace)
764  printf("Profiler::log_AnalyzeEvent_time\n");
765 
766  TAClock stop = TAClockNow();
767  if ((*flag) & TAFlag_SKIP_PROFILE) {
768  //Unset bit
769  *flag -= TAFlag_SKIP_PROFILE;
770  return;
771  }
772 
773  std::chrono::duration<double> elapsed_seconds = stop - start;
774  double dt = elapsed_seconds.count();
775  fAnalyzeEventTimeTotal[i] += dt;
776  if (dt > fAnalyzeEventTimeMax[i])
777  fAnalyzeEventTimeMax[i] = dt;
778 
779  fAnalyzeEventMean[i] +=dt;
780  fAnalyzeEventRMS[i] +=dt*dt;
782 
783 #ifdef HAVE_ROOT
785  fAnalyzeEventTimeHistograms[i]->Fill(dt);
786 #endif
787 
788 }
789 
791 {
792  if (gTrace)
793  printf("Profiler::log_mt_queue_length\n");
794 
795 #ifdef HAVE_ROOT
797  if (runinfo->fMtInfo && (fQueueIntervalCounter % fQueueInterval ==0 )) {
798  for (int i=0; i<fNQueues; i++) {
799  int j=0;
800  { //Lock guard
801  std::lock_guard<std::mutex> lock(runinfo->fMtInfo->fMtFlowQueueMutex[i]);
802  j=runinfo->fMtInfo->fMtFlowQueue[i].size();
803  }
804  fAnalysisQueue.at(i)->Fill(j);
805  }
806  }
807 #endif
808 }
809 
810 
812 {
813  if (gTrace)
814  printf("Profiler::LogUserWindows\n");
815 
816 #ifdef HAVE_ROOT
817  //Clocks unfold backwards...
818  std::vector<TAFlowEvent*> flowArray;
819  int FlowEvents=0;
820  TAFlowEvent* f = flow;
821  while (f) {
822  flowArray.push_back(f);
823  f=f->fNext;
824  FlowEvents++;
825  }
826  for (int ii=FlowEvents-1; ii>=0; ii--) {
827  f=flowArray[ii];
828  TAUserProfilerFlow* timer=dynamic_cast<TAUserProfilerFlow*>(f);
829  if (timer) {
830  const char* name = timer->fModuleName.c_str();
831  unsigned int hash = std::hash<std::string>{}(timer->fModuleName);
832  if (!fUserMap.count(hash))
833  AddModuleMap(name,hash);
834  double dt=999.;
835  dt=timer->GetTimer();
836  int i = fUserMap[hash];
837  fTotalUserTime[i] += dt;
838  if (dt > fMaxUserTime[i])
839  fMaxUserTime.at(i) = dt;
840  fUserHistograms.at(i)->Fill(dt);
841  }
842  }
843 #else
844  fprintf(stderr, "manalyzer must be built with ROOT for using the user profiling tools\n");
845 #endif
846 }
847 
848 void Profiler::AddModuleMap( const char* UserProfileName, unsigned long hash)
849 {
850  if (gTrace)
851  printf("Profiler::AddModuleMap\n");
852 
853  std::lock_guard<std::mutex> lock(TAMultithreadHelper::gfLock);
854 
855 #ifdef HAVE_ROOT
856  gDirectory->cd("/ProfilerReport");
857  fUserMap[hash] = fUserHistograms.size();
858  Int_t Nbins = 100;
859  Double_t bins[Nbins+1];
860  Double_t TimeRange = 10; //seconds
861  for (int i=0; i<Nbins+1; i++) {
862  bins[i] = TimeRange*pow(1.1,i)/pow(1.1,Nbins);
863  }
864  TH1D* Histo = new TH1D(UserProfileName,UserProfileName,Nbins,bins);
865  fUserHistograms.push_back(Histo);
866  fTotalUserTime.push_back(0.);
867  fMaxUserTime.push_back(0.);
868 #endif
869  return;
870 }
871 
872 void Profiler::End(TARunInfo* runinfo)
873 {
874  if (gTrace)
875  printf("Profiler::End\n");
876 
877  for (size_t i=0; i<fAnalyzeEventMean.size(); i++) {
880  ( fAnalyzeEventMean.at(i) / fAnalyzeEventEntries.at(i) ) *
881  ( fAnalyzeEventMean.at(i) / fAnalyzeEventEntries.at(i) );
882  }
883  for (size_t i=0; i<fAnalyzeFlowEventMean.size(); i++) {
888  }
889 #ifdef HAVE_ROOT
890 
891  if (runinfo->fRoot->fOutputFile) {
892  runinfo->fRoot->fOutputFile->cd("ProfilerReport/AnalyzeFlowTime");
893  for (TH1D* h: fAnalyzeFlowEventTimeHistograms) {
894  h->Write();
895  }
896  runinfo->fRoot->fOutputFile->cd("ProfilerReport/AnalyzeFlowEventTime");
897  for (TH1D* h: fAnalyzeEventTimeHistograms) {
898  h->Write();
899  }
900  if (runinfo->fMtInfo) {
901  runinfo->fRoot->fOutputFile->cd("ProfilerReport/MTQueueLength");
902  for (TH1D* h: fAnalysisQueue) {
903  h->Write();
904  }
905  }
906  }
907 #endif
908 }
909 
910 void Profiler::Print() const
911 {
912 #ifdef HAVE_ROOT
913  if (fAnalyzeFlowEventTimeHistograms.size()>0) {
914 #else
915  if (fAnalyzeEventEntries.size()>0) {
916 #endif
917  double AllAnalyzeFlowEventTime=0;
918  for (auto& n : fAnalyzeFlowEventTimeTotal)
919  AllAnalyzeFlowEventTime += n;
920  double AllAnalyzeEventTime=0;
921  for (auto& n : fAnalyzeEventTimeTotal)
922  AllAnalyzeEventTime += n;
923  //double max_AnalyzeEvent_time=*std::max_element(TotalAnalyzeEventTime.begin(),TotalAnalyzeEventTime.end());
924  printf("Module average processing time\n");
925  printf(" \t\t\t\tAnalyzeEvent (one thread) \tAnalyzeFlowEvent (multithreadable)\n");
926  printf("Module\t\t\t\tEntries\tMean(ms)RMS(ms)\tMax(ms)\tSum(s)\tEntries\tMean(ms)RMS(ms)\tMax(ms)\tSum(s)\n");
927  printf("----------------------------------------------------------------------------------------------------------------\n");
928  for (size_t i=0; i<fModuleNames.size(); i++) {
929  printf("%-25s", fModuleNames.at(i).c_str());
930  if (fAnalyzeEventEntries.at(i))
931  printf("\t%d\t%.1f\t%.1f\t%.1f\t%.3f",
932  fAnalyzeEventEntries.at(i),
933  fAnalyzeEventMean.at(i)*1000.,
934  fAnalyzeEventRMS.at(i)*1000.,
935  fAnalyzeEventTimeMax.at(i)*1000., //ms
936  fAnalyzeEventTimeTotal.at(i)); //s
937  else
938  printf("\t-\t-\t-\t-\t-");
939 
940  if (fAnalyzeFlowEventEntries.at(i))
941  printf("\t%d\t%.1f\t%.1f\t%.1f\t%.3f",
943  fAnalyzeFlowEventMean.at(i)*1000.,
944  fAnalyzeFlowEventRMS.at(i)*1000.,
945  fAnalyzeFlowEventTimeMax.at(i)*1000., //ms
946  fAnalyzeFlowEventTimeTotal.at(i)); //s
947  else
948  printf("\t-\t-\t-\t-\t-");
949  printf("\n");
950  }
951  printf("----------------------------------------------------------------------------------------------------------------\n");
952  printf(" Analyse TMEvent total time %f\n",AllAnalyzeEventTime);
953  printf(" Analyse FlowEvent total time %f\n",AllAnalyzeFlowEventTime);
954 #ifdef HAVE_ROOT
955  if (fUserHistograms.size()) {
956  printf("Custom profiling windows\tEntries\tMean(ms)RMS(ms)\tMax(ms)\tSum(s)\n");
957  printf("----------------------------------------------------------------------\n");
958  for (size_t i=0; i<fUserHistograms.size(); i++) {
959  printf("%-25s\t%d\t%.1f\t%.1f\t%.1f\t%.3f\t\n",fUserHistograms.at(i)->GetTitle(),
960  (int)fUserHistograms.at(i)->GetEntries(),
961  fUserHistograms.at(i)->GetMean()*1000., //ms
962  fUserHistograms.at(i)->GetRMS()*1000., //ms
963  fMaxUserTime.at(i)*1000., //ms
964  fTotalUserTime.at(i)); //s
965  }
966  printf("----------------------------------------------------------------------\n");
967  } else {
968  printf("----------------------------------------------------------------------------------------------------------------\n");
969  }
970 #else
971  printf("To use custom profile windows, please build rootana with root\n");
972 #endif
973  }
974 
975  //CPU and Wall clock time:
976  double cputime = (double)(clock() - fStartCPU)/CLOCKS_PER_SEC;
977  std::chrono::duration<double> usertime = std::chrono::system_clock::now() - fStartUser;
978  printf("%s\tCPU time: %.2fs\tUser time: %.2fs\tAverage CPU Usage: ~%.1f%%\n",
979  getenv("_"),
980  cputime,
981  usertime.count(),
982  100.*cputime/usertime.count());
983 }
984 
985 //////////////////////////////////////////////////////////
986 //
987 // RunHandler class
988 //
989 //////////////////////////////////////////////////////////
990 
991 class Profiler;
992 
994 {
995 public:
996  TARunInfo* fRunInfo = NULL;
997  std::vector<TARunObject*> fRunRun;
998  std::vector<std::string> fArgs;
999  bool fMultithreadEnabled = false;
1000  Profiler* fProfiler = NULL;
1001  bool fProfilerEnabled = false;
1003 
1004  RunHandler(const std::vector<std::string>& args, bool multithread, bool profile, int queue_interval_check) // ctor
1005  {
1006  fRunInfo = NULL;
1007  fArgs = args;
1008  fMultithreadEnabled = multithread;
1009  fProfiler = NULL;
1010  fProfilerEnabled = profile;
1011  fProfilerIntervalCheck = queue_interval_check;
1012  }
1013 
1014  ~RunHandler() // dtor
1015  {
1016  if (fRunInfo) {
1017  delete fRunInfo;
1018  fRunInfo = NULL;
1019  }
1020  if (fProfiler) {
1021  delete fProfiler;
1022  fProfiler = NULL;
1023  }
1024  }
1025 
1026  void PerModuleThread(int i)
1027  {
1028  //bool data_processing=true;
1029  int nModules=(*gModules).size();
1030 
1031  TAMultithreadHelper* mt = fRunInfo->fMtInfo;
1032 
1033  assert(nModules == (int)mt->fMtFlowQueueMutex.size());
1034  assert(nModules == (int)mt->fMtFlowQueue.size());
1035  assert(nModules == (int)mt->fMtFlagQueue.size());
1036 
1037  { //Lock scope
1038  std::lock_guard<std::mutex> lock(mt->fMtFlowQueueMutex[i]);
1039  mt->fMtThreadIsRunning[i] = true;
1040  }
1041 
1042  while (!mt->fMtShutdownRequested) {
1043  TAFlowEvent* flow = NULL;
1044  TAFlags* flag = NULL;
1045 
1046  { //Lock scope
1047  std::lock_guard<std::mutex> lock(mt->fMtFlowQueueMutex[i]);
1048  if (!mt->fMtFlowQueue[i].empty()) {
1049  flow=mt->fMtFlowQueue[i].front();
1050  flag=mt->fMtFlagQueue[i].front();
1051  mt->fMtFlowQueue[i].pop_front();
1052  mt->fMtFlagQueue[i].pop_front();
1053  }
1054 
1055  if (flow == NULL)
1056  mt->fMtThreadIsBusy[i] = false; // we will sleep
1057  else
1058  mt->fMtThreadIsBusy[i] = true; // we will analyze an event
1059 
1060  // implicit unlock of mutex
1061  }
1062 
1063  if (flow == NULL) { // wait until queue not empty
1064  usleep(mt->fMtQueueEmptyUSleepTime);
1065  continue;
1066  }
1067 
1068  TAClock start_time = TAClockNow();
1069 
1070  flow = fRunRun[i]->AnalyzeFlowEvent(fRunInfo, flag, flow);
1071 
1072  if (fProfiler)
1073  fProfiler->LogAnalyzeFlowEvent(flag, flow, i, start_time);
1074 
1075  if ((*flag) & TAFlag_QUIT) { // shut down the analyzer
1076  delete flow;
1077  delete flag;
1078  flow = NULL;
1079  flag = NULL;
1080  mt->fMtQuitRequested = true;
1081  mt->fMtShutdownRequested = true;
1082  break; // stop the thread
1083  }
1084 
1085  if ((*flag) & TAFlag_SKIP) { // stop processing this event
1086  delete flow;
1087  delete flag;
1088  flow = NULL;
1089  flag = NULL;
1090  continue;
1091  }
1092 
1093  if (i==nModules-1) //If I am the last module... free memory, else queue up for next module to process
1094  {
1095  if (fProfiler) {
1096  fProfiler->LogUserWindows(flag, flow);
1097  fProfiler->LogMTQueueLength(fRunInfo);
1098  }
1099  delete flow;
1100  delete flag;
1101  flow = NULL;
1102  flag = NULL;
1103  }
1104  else
1105  {
1106  MtQueueFlowEvent(mt, i+1, flag, flow, true);
1107  flow = NULL;
1108  flag = NULL;
1109  }
1110  }
1111 
1112  { //Lock scope
1113  std::lock_guard<std::mutex> lock(mt->fMtFlowQueueMutex[i]);
1114  mt->fMtThreadIsRunning[i] = false;
1115  mt->fMtThreadIsBusy[i] = false;
1116  }
1117  }
1118 
1119  void CreateRun(int run_number, const char* file_name)
1120  {
1121  assert(fRunInfo == NULL);
1122  assert(fRunRun.size() == 0);
1123 
1124  fRunInfo = new TARunInfo(run_number, file_name, fArgs);
1125 
1126  if (fProfilerEnabled)
1127  fProfiler = new Profiler( fProfilerIntervalCheck );
1128 
1129  int nModules = (*gModules).size();
1130 
1131  for (int i=0; i<nModules; i++)
1132  fRunRun.push_back((*gModules)[i]->NewRunObject(fRunInfo));
1133 
1134  if (fMultithreadEnabled) {
1135  TAMultithreadHelper* mt = new TAMultithreadHelper(nModules);
1136  fRunInfo->fMtInfo = mt;
1137  for (int i=0; i<nModules; i++) {
1138  printf("Create fMtFlowQueue thread %d\n",i);
1139  mt->fMtThreads[i]=new std::thread(&RunHandler::PerModuleThread,this,i);
1140  }
1141  }
1142  }
1143 
1144  void BeginRun()
1145  {
1146  assert(fRunInfo != NULL);
1147  assert(fRunInfo->fOdb != NULL);
1148  if (fProfiler)
1149  fProfiler->Begin(fRunInfo, fRunRun);
1150  for (unsigned i=0; i<fRunRun.size(); i++)
1151  fRunRun[i]->BeginRun(fRunInfo);
1152 
1153  }
1154 
1155  void EndRun(TAFlags* flags)
1156  {
1157  assert(fRunInfo);
1158 
1159  // make sure the shutdown sequence matches the description in the README file!
1160 
1161  // zeroth. Flush events queued for analysis before calling PreEndRun (ensure
1162  // deterministic behaviour that's the same as in single threaded mode)
1163  if (fRunInfo->fMtInfo) {
1164  WaitForAllQueuesEmpty(fRunInfo->fMtInfo);
1165  }
1166  // first, call PreEndRun() to tell analysis modules that there will be no more
1167  // MIDAS events, no more calls to Analyze(). PreEndRun() may generate more
1168  // flow events, they go into the flow queue or into the multithread queue
1169 
1170  for (unsigned i=0; i<fRunRun.size(); i++)
1171  fRunRun[i]->PreEndRun(fRunInfo);
1172 
1173  // if in single threaded mode, analyze all queued flow events - call AnalyzeFlowEvent()
1174  // this can generate additional flow events that will be queued in the queue.
1175 
1176  AnalyzeFlowQueue(flags);
1177 
1178  // if in multi threaded mode, allow all the queues to drain naturally
1179  // and shutdown the threads
1180 
1181  if (fRunInfo->fMtInfo) {
1182  WaitForAllQueuesEmpty(fRunInfo->fMtInfo);
1184  if (fRunInfo->fMtInfo->fMtQuitRequested) {
1185  (*flags) |= TAFlag_QUIT;
1186  }
1187  }
1188 
1189  // all data analysis is complete
1190 
1191  for (unsigned i=0; i<fRunRun.size(); i++)
1192  fRunRun[i]->EndRun(fRunInfo);
1193 
1194  if (fProfiler) {
1195  fProfiler->End(fRunInfo);
1196  fProfiler->Print();
1197  }
1198  }
1199 
1200  void NextSubrun()
1201  {
1202  assert(fRunInfo);
1203 
1204  for (unsigned i=0; i<fRunRun.size(); i++)
1205  fRunRun[i]->NextSubrun(fRunInfo);
1206  }
1207 
1208  void DeleteRun()
1209  {
1210  assert(fRunInfo);
1211 
1212  for (unsigned i=0; i<fRunRun.size(); i++) {
1213  delete fRunRun[i];
1214  fRunRun[i] = NULL;
1215  }
1216 
1217  fRunRun.clear();
1218  assert(fRunRun.size() == 0);
1219 
1220  if (fProfiler) {
1221  delete fProfiler;
1222  fProfiler = NULL;
1223  }
1224 
1225  delete fRunInfo;
1226  fRunInfo = NULL;
1227  }
1228 
1230  {
1231  for (unsigned i=0; i<fRunRun.size(); i++)
1232  fRunRun[i]->AnalyzeSpecialEvent(fRunInfo, event);
1233  }
1234 
1236  {
1237  for (unsigned i=0; i<fRunRun.size(); i++) {
1238  TAClock start_time = TAClockNow();
1239  flow = fRunRun[i]->AnalyzeFlowEvent(fRunInfo, flags, flow);
1240  if (fProfiler)
1241  fProfiler->LogAnalyzeFlowEvent(flags, flow, i, start_time);
1242  if (!flow)
1243  break;
1244  if ((*flags) & TAFlag_SKIP)
1245  break;
1246  if ((*flags) & TAFlag_QUIT)
1247  break;
1248  }
1249  return flow;
1250  }
1251 
1252  void AnalyzeFlowQueue(TAFlags* ana_flags)
1253  {
1254  while (1) {
1255  if (fRunInfo->fMtInfo)
1256  if (fRunInfo->fMtInfo->fMtQuitRequested) {
1257  *ana_flags |= TAFlag_QUIT;
1258  break;
1259  }
1260 
1261  TAFlowEvent* flow = fRunInfo->ReadFlowQueue();
1262  if (!flow)
1263  break;
1264 
1265  int flags = 0;
1266  flow = AnalyzeFlowEvent(&flags, flow);
1267  if (flow)
1268  delete flow;
1269  if (flags & TAFlag_QUIT) {
1270  *ana_flags |= TAFlag_QUIT;
1271  break;
1272  }
1273  }
1274  }
1275 
1276  void AnalyzeEvent(TMEvent* event, TAFlags* flags, TMWriterInterface *writer)
1277  {
1278  assert(fRunInfo != NULL);
1279  assert(fRunInfo->fOdb != NULL);
1280  assert(event != NULL);
1281  assert(flags != NULL);
1282 
1283  if (fRunInfo->fMtInfo)
1284  if (fRunInfo->fMtInfo->fMtQuitRequested) {
1285  *flags |= TAFlag_QUIT;
1286  return;
1287  }
1288 
1289  TAFlowEvent* flow = NULL;
1290 
1291  for (unsigned i=0; i<fRunRun.size(); i++) {
1292  TAClock start_time = TAClockNow();
1293  flow = fRunRun[i]->Analyze(fRunInfo, event, flags, flow);
1294  if (fProfiler)
1295  fProfiler->LogAnalyzeEvent(flags, flow, i, start_time);
1296  if (*flags & TAFlag_SKIP)
1297  break;
1298  if (*flags & TAFlag_QUIT)
1299  break;
1300  }
1301 
1302  if (flow) {
1303  if ((*flags & TAFlag_SKIP)||(*flags & TAFlag_QUIT)) {
1304  // skip further processing of this event
1305  } else {
1306  if (fRunInfo->fMtInfo) {
1307  MtQueueFlowEvent(fRunInfo->fMtInfo, 0, NULL, flow, true);
1308  flow = NULL; // ownership passed to the multithread event queue
1309  } else {
1310  flow = AnalyzeFlowEvent(flags, flow);
1311  }
1312  }
1313  }
1314 
1315  if (fProfiler && !fRunInfo->fMtInfo) {
1316  fProfiler->LogUserWindows(flags, flow);
1317  }
1318 
1319  if (*flags & TAFlag_WRITE)
1320  if (writer)
1321  TMWriteEvent(writer, event);
1322 
1323  if (flow)
1324  delete flow;
1325 
1326  if (*flags & TAFlag_QUIT)
1327  return;
1328 
1329  if (fRunInfo->fMtInfo)
1330  if (fRunInfo->fMtInfo->fMtQuitRequested) {
1331  *flags |= TAFlag_QUIT;
1332  return;
1333  }
1334 
1335  AnalyzeFlowQueue(flags);
1336  }
1337 };
1338 
1340 {
1341  if (fFlowQueue.empty())
1342  return NULL;
1343 
1344  TAFlowEvent* flow = fFlowQueue.front();
1345  fFlowQueue.pop_front();
1346  return flow;
1347 }
1348 
1350 {
1351  if (fMtInfo) {
1352  // call MtQueueFlowEvent with wait=false to avoid deadlock
1353  MtQueueFlowEvent(fMtInfo, 0, NULL, flow, false);
1354  } else {
1355  fFlowQueue.push_back(flow);
1356  }
1357 }
1358 
1359 #ifdef HAVE_MIDAS
1360 
1361 #ifdef HAVE_TMFE
1362 
1363 #ifdef HAVE_ROOT
1364 #include "TSystem.h"
1365 #endif
1366 
1367 #include "tmfe.h"
1368 
1369 static bool gRunStartRequested = false;
1370 static bool gRunStopRequested = false;
1371 
1372 class RpcHandler: public TMFeRpcHandlerInterface
1373 {
1374  TMFeResult HandleBeginRun(int run_number)
1375  {
1376  printf("RpcHandler::HandleBeginRun(%d)\n", run_number);
1377  gRunStartRequested = true;
1378  gRunStopRequested = false;
1379  return TMFeOk();
1380  }
1381 
1382  TMFeResult HandleEndRun(int run_number)
1383  {
1384  printf("RpcHandler::HandleEndRun(%d)\n", run_number);
1385  gRunStartRequested = false;
1386  gRunStopRequested = true;
1387  return TMFeOk();
1388  }
1389 
1390  TMFeResult HandleStartAbortRun(int run_number)
1391  {
1392  printf("RpcHandler::HandleStartAbortRun(%d)\n", run_number);
1393  // run did not really start, pretend it started and immediately ended
1394  gRunStartRequested = false;
1395  gRunStopRequested = true;
1396  return TMFeOk();
1397  }
1398 
1399  TMFeResult HandleRpc(const char* cmd, const char* args, std::string& result)
1400  {
1401  return TMFeOk();
1402  }
1403 };
1404 
1405 TMFeResult ReceiveEvent(TMEventBuffer* b, TMEvent *e, int timeout_msec = 0)
1406 {
1407  assert(b != NULL);
1408  assert(e != NULL);
1409 
1410  e->Reset();
1411 
1412  TMFeResult r = b->ReceiveEvent(&e->data, timeout_msec);
1413 
1414  if (r.error_flag)
1415  return r;
1416 
1417  if (e->data.size() == 0)
1418  return TMFeOk();
1419 
1420  e->ParseEvent();
1421 
1422  assert(e->data.size() == e->event_header_size + e->data_size);
1423 
1424  return TMFeOk();
1425 }
1426 
// Online analysis through TMFE: connect to MIDAS, open event buffer `bufname`,
// request events matching event_id / trigger_mask / sampling_type_string, then
// loop until a shutdown is requested, handling run transitions flagged by
// RpcHandler and analyzing every received event.
// A positive num_analyze stops the loop after that many analyzed events.
// Returns -1 when connecting or setting up the buffer fails, 0 otherwise.
// NOTE(review): the error returns after `new TMEventBuffer` neither delete `b`
// nor disconnect from MIDAS, and `h` is never deleted in this function -
// confirm whether that is intended.
1427 static int ProcessMidasOnlineTmfe(const std::vector<std::string>& args, const char* progname, const char* hostname, const char* exptname, const char* bufname, int event_id, int trigger_mask, const char* sampling_type_string, int num_analyze, TMWriterInterface* writer, bool multithread, bool profiler, int queue_interval_check)
1428 {
1429  TMFE *mfe = TMFE::Instance();
1430 
1431  TMFeResult r = mfe->Connect(progname, hostname, exptname);
1432 
1433  if (r.error_flag) {
1434  fprintf(stderr, "Cannot connect to MIDAS: %s\n", r.error_message.c_str());
1435  return -1;
1436  }
1437 
1438  //MVOdb* odb = mfe->fOdbRoot;
1439 
1440  TMEventBuffer *b = new TMEventBuffer(mfe);
1441 
1442  /* open event buffer */
1443 
1444  r = b->OpenBuffer(bufname);
1445 
1446  if (r.error_flag) {
1447  fprintf(stderr, "Cannot open event buffer \"%s\": %s\n", bufname, r.error_message.c_str());
1448  return -1;
1449  }
1450 
1451  /* request read cache */
 // the read cache is switched off (size 0) for GET_RECENT sampling
1452  size_t cache_size = 100000;
1453  if(!strcmp(sampling_type_string,"GET_RECENT"))
1454  cache_size=0;
1455  r = b->SetCacheSize(cache_size, 0);
1456 
1457  if (r.error_flag) {
1458  fprintf(stderr, "Cannot set cache size on event buffer \"%s\": %s\n", bufname, r.error_message.c_str());
1459  return -1;
1460  }
1461 
1462  /* request events */
1463 
1464  r = b->AddRequest(event_id, trigger_mask, sampling_type_string);
1465 
1466  if (r.error_flag) {
1467  fprintf(stderr, "Cannot add event request on event buffer \"%s\": %s\n", bufname, r.error_message.c_str());
1468  return -1;
1469  }
1470 
1471  RpcHandler* h = new RpcHandler();
1472 
1473  mfe->AddRpcHandler(h);
1474 
1475  mfe->DeregisterTransitionPause();
1476  mfe->DeregisterTransitionResume();
1477  mfe->SetTransitionSequenceStart(300);
1478  mfe->SetTransitionSequenceStop(700);
1479 
1480  mfe->StartRpcThread();
1481 
1482  /* register event requests */
1483 
1484  RunHandler rh(args, multithread, profiler, queue_interval_check);
1485 
1486  for (unsigned i=0; i<(*gModules).size(); i++)
1487  (*gModules)[i]->Init(args);
1488 
 // a run is already in progress: begin analyzing it right away
1489  if (mfe->fStateRunning) {
1490  rh.CreateRun(mfe->fRunNumber, NULL);
1491  rh.fRunInfo->fOdb = mfe->fOdbRoot;
1492  rh.BeginRun();
1493  }
1494 
1495  TMEvent e;
1496 
1497  while (!mfe->fShutdownRequested) {
1498  bool do_sleep = true;
1499 
 // run start requested: close the previous run, if any, and open a new one
1500  if (gRunStartRequested) {
1501  gRunStartRequested = false;
1502 
1503  if (rh.fRunInfo) {
1504  TAFlags flags = 0;
1505  rh.EndRun(&flags);
 // the ODB is mfe->fOdbRoot; detach it so that ~TARunInfo() does not delete it
1506  rh.fRunInfo->fOdb = NULL;
1507  rh.DeleteRun();
1508  }
1509 
1510  rh.CreateRun(mfe->fRunNumber, NULL);
1511  rh.fRunInfo->fOdb = mfe->fOdbRoot;
1512  rh.BeginRun();
1513 
1514  continue;
1515  }
1516 
 // run stop requested: close the current run, if any
1517  if (gRunStopRequested) {
1518  gRunStopRequested = false;
1519 
1520  if (rh.fRunInfo) {
1521  TAFlags flags = 0;
1522  rh.EndRun(&flags);
1523  rh.fRunInfo->fOdb = NULL;
1524  rh.DeleteRun();
1525  }
1526 
1527  continue;
1528  }
1529 
1530  //r = buf.ReceiveEvent(&e, BM_NO_WAIT);
1531  //r = buf.ReceiveEvent(&e, BM_WAIT);
1532  //r = buf.ReceiveEvent(&e, 8000);
1533  //r = buf.ReceiveEvent(&e, 5000);
1534  r = ReceiveEvent(b, &e, 100);
1535 
1536  if (r.error_flag) {
1537  fprintf(stderr, "Cannot read event on event buffer \"%s\": %s\n", bufname, r.error_message.c_str());
1538  break;
1539  }
1540 
1541  //e.PrintHeader();
1542  //::sleep(1);
1543 
 // events are analyzed only while a run exists
1544  if ((e.data_size > 0) && (rh.fRunInfo != NULL)) {
1545 
1546  //e.PrintHeader();
1547  //e.PrintBanks(2);
1548 
1549  TAFlags flags = 0;
1550 
1551  rh.AnalyzeEvent(&e, &flags, writer);
1552 
1553  if (flags & TAFlag_QUIT) {
1554  mfe->fShutdownRequested = true;
1555  }
1556 
1557  if (num_analyze > 0) {
1558  num_analyze--;
1559  if (num_analyze == 0) {
1560  mfe->fShutdownRequested = true;
1561  }
1562  }
1563 
1564  do_sleep = false;
1565  }
1566 
1567 #ifdef HAVE_THTTP_SERVER
1569  int nreq = TARootHelper::fgHttpServer->ProcessRequests();
1570  if (nreq > 0) {
1571  do_sleep = false;
1572  //printf("ProcessRequests() returned %d\n", nreq);
1573  }
1574  }
1575 #endif
1576 #ifdef HAVE_ROOT
1577  if (TARootHelper::fgApp) {
1578  gSystem->DispatchOneEvent(kTRUE);
1579  }
1580 #endif
1581 
1582  //printf("do_sleep %d\n", do_sleep);
1583 
 // yield with a 0.010 timeout only when this iteration did no work
1584  if (do_sleep) {
1585  mfe->Yield(0.010);
1586  } else {
1587  mfe->Yield(0);
1588  }
1589  }
1590 
 // shutdown: close the run that is still open, if any
1591  if (rh.fRunInfo) {
1592  TAFlags flags = 0;
1593  rh.EndRun(&flags);
1594  rh.fRunInfo->fOdb = NULL;
1595  rh.DeleteRun();
1596  }
1597 
1598  for (unsigned i=0; i<(*gModules).size(); i++)
1599  (*gModules)[i]->Finish();
1600 
1601  /* close event buffer */
1602  r = b->CloseBuffer();
1603 
1604  delete b;
1605  b = NULL;
1606 
 // a failure to close the buffer is reported but does not change the return value
1607  if (r.error_flag) {
1608  fprintf(stderr,"Cannot close event buffer \"%s\": %s\n", bufname, r.error_message.c_str());
1609  //return -1;
1610  }
1611 
1612  /* disconnect from experiment */
1613  mfe->Disconnect();
1614 
1615  return 0;
1616 }
1617 
1618 #else
1619 
1620 #include "TMidasOnline.h"
1621 
1622 #ifdef HAVE_ROOT
1623 #include "TSystem.h"
1624 #endif
1625 
1627 {
1628 public:
1630  int fNumAnalyze = 0;
1631  TMWriterInterface* fWriter = NULL;
1632  bool fQuit = false;
1633  MVOdb* fOdb = NULL;
1634 
1635  OnlineHandler(int num_analyze, TMWriterInterface* writer, MVOdb* odb, const std::vector<std::string>& args, bool multithread, bool profiler, int queue_interval_check) // ctor
1636  : fRun(args, multithread, profiler, queue_interval_check)
1637  {
1638  fNumAnalyze = num_analyze;
1639  fWriter = writer;
1640  fOdb = odb;
1641  }
1642 
1643  ~OnlineHandler() // dtor
1644  {
1645  fWriter = NULL;
1646  fOdb = NULL;
1647  }
1648 
1649  void StartRun(int run_number)
1650  {
1651  fRun.CreateRun(run_number, NULL);
1652  fRun.fRunInfo->fOdb = fOdb;
1653  fRun.BeginRun();
1654  }
1655 
1656  void Transition(int transition, int run_number, int transition_time)
1657  {
1658  //printf("OnlineHandler::Transtion: transition %d, run %d, time %d\n", transition, run_number, transition_time);
1659 
1660  if (transition == TR_START) {
1661  if (fRun.fRunInfo) {
1662  TAFlags flags = 0;
1663  fRun.EndRun(&flags);
1664  if (flags & TAFlag_QUIT)
1665  fQuit = true;
1666  fRun.fRunInfo->fOdb = NULL;
1667  fRun.DeleteRun();
1668  }
1669  assert(fRun.fRunInfo == NULL);
1670 
1671  StartRun(run_number);
1672  printf("Begin run: %d\n", run_number);
1673  } else if (transition == TR_STOP) {
1674  TAFlags flags = 0;
1675  fRun.EndRun(&flags);
1676  if (flags & TAFlag_QUIT)
1677  fQuit = true;
1678  fRun.fRunInfo->fOdb = NULL;
1679  fRun.DeleteRun();
1680  printf("End of run %d\n", run_number);
1681  }
1682  }
1683 
1684  void Event(const void* data, int data_size)
1685  {
1686  //printf("OnlineHandler::Event: ptr %p, size %d\n", data, data_size);
1687 
1688  if (!fRun.fRunInfo) {
1689  StartRun(0); // start fake run for events outside of a run
1690  }
1691 
1692  TMEvent* event = new TMEvent(data, data_size);
1693 
1694  TAFlags flags = 0;
1695 
1696  fRun.AnalyzeEvent(event, &flags, fWriter);
1697 
1698  if (flags & TAFlag_QUIT)
1699  fQuit = true;
1700 
1701  if (fNumAnalyze > 0) {
1702  fNumAnalyze--;
1703  if (fNumAnalyze == 0)
1704  fQuit = true;
1705  }
1706 
1707  if (event) {
1708  delete event;
1709  event = NULL;
1710  }
1711  }
1712 };
1713 
// Online analysis through the TMidasOnline interface: connect to MIDAS as
// "rootana", register an OnlineHandler for transitions and events, request
// events from the "SYSTEM" buffer and poll until the handler asks to quit or
// poll() returns false.
// Returns -1 when the connection fails, 0 otherwise.
1714 static int ProcessMidasOnlineOld(const std::vector<std::string>& args, const char* hostname, const char* exptname, int num_analyze, TMWriterInterface* writer, bool multithread, bool profiler, int queue_interval_check)
1715 {
1717 
1718  int err = midas->connect(hostname, exptname, "rootana");
1719  if (err != 0) {
1720  fprintf(stderr,"Cannot connect to MIDAS, error %d\n", err);
1721  return -1;
1722  }
1723 
1724  MVOdb* odb = MakeMidasOdb(midas->fDB);
1725 
1726  OnlineHandler* h = new OnlineHandler(num_analyze, writer, odb, args, multithread, profiler, queue_interval_check);
1727 
1728  midas->RegisterHandler(h);
1729  midas->registerTransitions();
1730 
1731  /* register event requests */
1732 
1733  midas->eventRequest("SYSTEM",-1,-1,(1<<1));
1734 
1735  int run_number = 0; // midas->odbReadInt("/runinfo/Run number");
1736  int run_state = 0; // midas->odbReadInt("/runinfo/State");
1737 
1738  odb->RI("runinfo/run number", &run_number);
1739  odb->RI("runinfo/state", &run_state);
1740 
1741  for (unsigned i=0; i<(*gModules).size(); i++)
1742  (*gModules)[i]->Init(args);
1743 
 // a run is already in progress (running or paused): join it right away
1744  if ((run_state == STATE_RUNNING)||(run_state == STATE_PAUSED)) {
1745  h->StartRun(run_number);
1746  }
1747 
1748  while (!h->fQuit) {
1749 #ifdef HAVE_THTTP_SERVER
1751  TARootHelper::fgHttpServer->ProcessRequests();
1752  }
1753 #endif
1754 #ifdef HAVE_ROOT
1755  if (TARootHelper::fgApp) {
1756  gSystem->DispatchOneEvent(kTRUE);
1757  }
1758 #endif
1759  if (!TMidasOnline::instance()->poll(10))
1760  break;
1761  }
1762 
 // close the run that is still open; the quit flag from EndRun() is not
 // needed any more at this point
1763  if (h->fRun.fRunInfo) {
1764  TAFlags flags = 0;
1765  h->fRun.EndRun(&flags);
 // `odb` is deleted below; detach it so that ~TARunInfo() does not delete it too
1766  h->fRun.fRunInfo->fOdb = NULL;
1767  h->fRun.DeleteRun();
1768  }
1769 
1770  for (unsigned i=0; i<(*gModules).size(); i++)
1771  (*gModules)[i]->Finish();
1772 
1773  delete h; h = NULL;
1774  delete odb; odb = NULL;
1775 
1776  /* disconnect from experiment */
1777  midas->disconnect();
1778 
1779  return 0;
1780 }
1781 
1782 #endif
1783 #endif
1784 
1785 std::vector<std::string> TARunInfo::fgFileList;
1787 
// Offline analysis of a list of MIDAS files.
// The file list is copied into TARunInfo::fgFileList, every module is
// initialized, and each file is read event by event:
//  - 0x8000 (begin of run): continue the current run as a new subrun when the
//    run number matches, otherwise end it and create a new run whose ODB is
//    built from the event data;
//  - 0x8001 (end of run): replace the run's ODB by the one in the event;
//  - 0x8002 (message): passed to the modules as a special event;
//  - anything else: skipped while num_skip > 0, analyzed otherwise; a positive
//    num_analyze stops the processing after that many analyzed events.
// Returns the number of files that could not be opened (0 when all opened).
1788 static int ProcessMidasFiles(const std::vector<std::string>& files, const std::vector<std::string>& args, int num_skip, int num_analyze, TMWriterInterface* writer, bool multithread, bool profiler, int queue_interval_check)
1789 {
1790  int number_of_missing_files = 0;
1791 
1792  TARunInfo::fgFileList.clear();
1793 
1794  for (unsigned i=0; i<files.size(); i++)
1795  TARunInfo::fgFileList.push_back(files[i]);
1796 
1797  for (unsigned i=0; i<(*gModules).size(); i++)
1798  (*gModules)[i]->Init(args);
1799 
1800  RunHandler run(args, multithread, profiler, queue_interval_check);
1801 
1802  bool done = false;
1803 
1807  std::string filename = TARunInfo::fgFileList[TARunInfo::fgCurrentFileIndex];
1808 
1809  TMReaderInterface *reader = TMNewReader(filename.c_str());
1810 
 // a file that cannot be opened is counted and skipped
1811  if (reader->fError) {
1812  printf("Could not open \"%s\", error: %s\n", filename.c_str(), reader->fErrorString.c_str());
1813  delete reader;
1814  number_of_missing_files++;
1815  continue;
1816  }
1817 
1818  while (1) {
1819  TMEvent* event = TMReadEvent(reader);
1820 
1821  if (!event) // EOF
1822  break;
1823 
 // a broken event ends the reading of this file
1824  if (event->error) {
1825  delete event;
1826  break;
1827  }
1828 
1829  if (event->event_id == 0x8000) // begin of run event
1830  {
1831  int runno = event->serial_number;
1832 
1833  if (run.fRunInfo) {
1834  if (run.fRunInfo->fRunNo == runno) {
1835  // next subrun file, nothing to do
1836  run.fRunInfo->fFileName = filename;
1837  run.NextSubrun();
1838  } else {
1839  // file with a different run number
1840  TAFlags flags = 0;
1841  run.EndRun(&flags);
1842  if (flags & TAFlag_QUIT) {
1843  done = true;
1844  }
1845  run.DeleteRun();
1846  }
1847  }
1848 
 // NOTE(review): a new run is created here even when `done` was just set
 // by the EndRun() above - confirm this is intended.
1849  if (!run.fRunInfo) {
1850  run.CreateRun(runno, filename.c_str());
1851  run.fRunInfo->fOdb = MakeFileDumpOdb(event->GetEventData(), event->data_size);
1852  run.BeginRun();
1853  }
1854 
1855  assert(run.fRunInfo);
1856 
1857  run.AnalyzeSpecialEvent(event);
1858 
1859  if (writer)
1860  TMWriteEvent(writer, event);
1861  }
1862  else if (event->event_id == 0x8001) // end of run event
1863  {
1864  //int runno = event->serial_number;
1865  run.AnalyzeSpecialEvent(event);
1866  if (writer)
1867  TMWriteEvent(writer, event);
1868 
 // NOTE(review): run.fRunInfo is dereferenced without a check; an end of run
 // event that arrives before any run was created would crash - confirm.
1869  if (run.fRunInfo->fOdb) {
1870  delete run.fRunInfo->fOdb;
1871  run.fRunInfo->fOdb = NULL;
1872  }
1873 
1874  run.fRunInfo->fOdb = MakeFileDumpOdb(event->GetEventData(), event->data_size);
1875  }
1876  else if (event->event_id == 0x8002) // message event
1877  {
1878  run.AnalyzeSpecialEvent(event);
1879  if (writer)
1880  TMWriteEvent(writer, event);
1881  }
1882  else
1883  {
1884  if (!run.fRunInfo) {
1885  // create a fake begin of run
1886  run.CreateRun(0, filename.c_str());
1887  run.fRunInfo->fOdb = MakeNullOdb();
1888  run.BeginRun();
1889  }
1890 
1891  if (num_skip > 0) {
1892  num_skip--;
1893  } else {
1894  TAFlags flags = 0;
1895 
1896  run.AnalyzeEvent(event, &flags, writer);
1897 
1898  if (flags & TAFlag_QUIT)
1899  done = true;
1900 
1901  if (num_analyze > 0) {
1902  num_analyze--;
1903  if (num_analyze == 0)
1904  done = true;
1905  }
1906  }
1907  }
1908 
1909  delete event;
1910 
1911  if (done)
1912  break;
1913 
1914 #ifdef HAVE_ROOT
1915  if (TARootHelper::fgApp) {
1916  gSystem->DispatchOneEvent(kTRUE);
1917  }
1918 #endif
1919  }
1920 
1921  reader->Close();
1922  delete reader;
1923 
1924  if (done)
1925  break;
1926  }
1927 
 // disabled (condition starts with 0): would serve GUI events forever
1928 #ifdef HAVE_THTTP_SERVER
1929  if (0 && TARootHelper::fgHttpServer) {
1930  while (1) {
1931  gSystem->DispatchOneEvent(kTRUE);
1932  //sleep(1);
1933  }
1934  }
1935 #endif
1936 
 // close the run that is still open; the run's ODB stays attached and is
 // deleted together with the run
1937  if (run.fRunInfo) {
1938  TAFlags flags = 0;
1939  run.EndRun(&flags);
1940  if (flags & TAFlag_QUIT)
1941  done = true;
1942  run.DeleteRun();
1943  }
1944 
1945  for (unsigned i=0; i<(*gModules).size(); i++)
1946  (*gModules)[i]->Finish();
1947  if (number_of_missing_files)
1948  {
1949  printf("%d midas files were not openable\n",number_of_missing_files);
1950  return number_of_missing_files;
1951  }
1952  return 0;
1953 }
1954 
1955 static int ProcessDemoMode(const std::vector<std::string>& args, int num_skip, int num_analyze, TMWriterInterface* writer, bool multithread, bool profiler, int queue_interval_check)
1956 {
1957  for (unsigned i=0; i<(*gModules).size(); i++)
1958  (*gModules)[i]->Init(args);
1959 
1960  RunHandler run(args, multithread, profiler, queue_interval_check);
1961 
1962  bool done = false;
1963 
1964  int runno = 1;
1965 
1966  for (unsigned i=0; true; i++) {
1967  char s[256];
1968  snprintf(s, sizeof(s), "%03d", i);
1969  std::string filename = std::string("demo_subrun_") + s;
1970 
1971  if (!run.fRunInfo) {
1972  run.CreateRun(runno, filename.c_str());
1973  run.fRunInfo->fOdb = MakeNullOdb();
1974  run.BeginRun();
1975  }
1976 
1977  // we do not generate a fake begin of run event...
1978  //run.AnalyzeSpecialEvent(event);
1979 
1980  // only switch subruns after the first subrun file
1981  if (i>0) {
1982  run.fRunInfo->fFileName = filename;
1983  run.NextSubrun();
1984  }
1985 
1986  TMEvent* event = new TMEvent();
1987 
1988  for (unsigned j=0; j<100; j++) {
1989  event->Init(0x0001, 0xFFFF, j+1, 0, 0);
1990  uint32_t test_data[] = { 0x11112222, 0x33334444, 0x55556666, 0x77778888 };
1991  event->AddBank("TEST", TID_DWORD, (const char*)test_data, sizeof(test_data));
1992 
1993  if (num_skip > 0) {
1994  num_skip--;
1995  } else {
1996  TAFlags flags = 0;
1997 
1998  run.AnalyzeEvent(event, &flags, writer);
1999 
2000  if (flags & TAFlag_QUIT)
2001  done = true;
2002 
2003  if (num_analyze > 0) {
2004  num_analyze--;
2005  if (num_analyze == 0)
2006  done = true;
2007  }
2008  }
2009 
2010  if (done)
2011  break;
2012 
2013 #ifdef HAVE_ROOT
2014  if (TARootHelper::fgApp) {
2015  gSystem->DispatchOneEvent(kTRUE);
2016  }
2017 #endif
2018  }
2019 
2020  delete event;
2021 
2022  // we do not generate a fake end of run event...
2023  //run.AnalyzeSpecialEvent(event);
2024 
2025  if (done)
2026  break;
2027  }
2028 
2029  if (run.fRunInfo) {
2030  TAFlags flags = 0;
2031  run.EndRun(&flags);
2032  run.DeleteRun();
2033  }
2034 
2035  for (unsigned i=0; i<(*gModules).size(); i++)
2036  (*gModules)[i]->Finish();
2037 
2038  return 0;
2039 }
2040 
2041 static bool gEnableShowMem = false;
2042 
2043 #if 0
2044 static int ShowMem(const char* label)
2045 {
2046  if (!gEnableShowMem)
2047  return 0;
2048 
2049  FILE* fp = fopen("/proc/self/statm","r");
2050  if (!fp)
2051  return 0;
2052 
2053  int mem = 0;
2054  fscanf(fp,"%d",&mem);
2055  fclose(fp);
2056 
2057  if (label)
2058  printf("memory at %s is %d\n", label, mem);
2059 
2060  return mem;
2061 }
2062 #endif
2063 
2065 {
2066 public:
2068  : TARunObject(runinfo)
2069  {
2070  if (gTrace)
2071  printf("EventDumpModule::ctor, run %d\n", runinfo->fRunNo);
2072  }
2073 
2075  {
2076  if (gTrace)
2077  printf("EventDumpModule::dtor!\n");
2078  }
2079 
2080  void BeginRun(TARunInfo* runinfo)
2081  {
2082  printf("EventDumpModule::BeginRun, run %d\n", runinfo->fRunNo);
2083  }
2084 
2085  void EndRun(TARunInfo* runinfo)
2086  {
2087  printf("EventDumpModule::EndRun, run %d\n", runinfo->fRunNo);
2088  }
2089 
2090  void NextSubrun(TARunInfo* runinfo)
2091  {
2092  printf("EventDumpModule::NextSubrun, run %d, file %s\n", runinfo->fRunNo, runinfo->fFileName.c_str());
2093  }
2094 
2095  void PauseRun(TARunInfo* runinfo)
2096  {
2097  printf("EventDumpModule::PauseRun, run %d\n", runinfo->fRunNo);
2098  }
2099 
2100  void ResumeRun(TARunInfo* runinfo)
2101  {
2102  printf("EventDumpModule::ResumeRun, run %d\n", runinfo->fRunNo);
2103  }
2104 
2105  TAFlowEvent* Analyze(TARunInfo* runinfo, TMEvent* event, TAFlags* flags, TAFlowEvent* flow)
2106  {
2107  printf("EventDumpModule::Analyze, run %d, ", runinfo->fRunNo);
2108  event->FindAllBanks();
2109  std::string h = event->HeaderToString();
2110  std::string b = event->BankListToString();
2111  printf("%s: %s\n", h.c_str(), b.c_str());
2112  return flow;
2113  }
2114 
2115  void AnalyzeSpecialEvent(TARunInfo* runinfo, TMEvent* event)
2116  {
2117  printf("EventDumpModule::AnalyzeSpecialEvent, run %d, event serno %d, id 0x%04x, data size %d\n", runinfo->fRunNo, event->serial_number, (int)event->event_id, event->data_size);
2118  }
2119 };
2120 
2122 {
2123 public:
2124 
2125  void Init(const std::vector<std::string> &args)
2126  {
2127  if (gTrace)
2128  printf("EventDumpModuleFactory::Init!\n");
2129  }
2130 
2131  void Finish()
2132  {
2133  if (gTrace)
2134  printf("EventDumpModuleFactory::Finish!\n");
2135  }
2136 
2138  {
2139  if (gTrace)
2140  printf("EventDumpModuleFactory::NewRunObject, run %d, file %s\n", runinfo->fRunNo, runinfo->fFileName.c_str());
2141  return new EventDumpModule(runinfo);
2142  }
2143 };
2144 
2145 #ifdef HAVE_ROOT
2146 #include <TGMenu.h>
2147 #include <TGButton.h>
2148 #include <TBrowser.h>
2149 
2150 #define CTRL_QUIT 1
2151 #define CTRL_NEXT 2
2152 #define CTRL_CONTINUE 3
2153 #define CTRL_PAUSE 4
2154 #define CTRL_NEXT_FLOW 5
2155 
2156 #define CTRL_TBROWSER 11
2157 
2159 {
2160 public:
2161  int fValue;
2162 
2163  ValueHolder() // ctor
2164  {
2165  fValue = 0;
2166  }
2167 };
2168 
2170 {
2171 public:
2173  int fValue;
2174 
2175  TextButton(TGWindow*p, const char* text, ValueHolder* holder, int value) // ctor
2176  : TGTextButton(p, text)
2177  {
2178  fHolder = holder;
2179  fValue = value;
2180  }
2181 
2182 #if 0
2183  void Pressed()
2184  {
2185  printf("Pressed!\n");
2186  }
2187 
2188  void Released()
2189  {
2190  printf("Released!\n");
2191  }
2192 #endif
2193 
2194  void Clicked()
2195  {
2196  //printf("Clicked button %s, value %d!\n", GetString().Data(), fValue);
2197  if (fHolder)
2198  fHolder->fValue = fValue;
2199  //gSystem->ExitLoop();
2200  }
2201 };
2202 
2203 class MainWindow: public TGMainFrame
2204 {
2205 public:
2206  TGPopupMenu* fMenu;
2207  TGMenuBar* fMenuBar;
2208  TGLayoutHints* fMenuBarItemLayout;
2209 
2210  TGCompositeFrame* fButtonsFrame;
2211 
2213 
2218 
2220 
2221 public:
2222  MainWindow(const TGWindow*w, int s1, int s2, ValueHolder* holder) // ctor
2223  : TGMainFrame(w, s1, s2)
2224  {
2225  if (gTrace)
2226  printf("MainWindow::ctor!\n");
2227 
2228  fHolder = holder;
2229  //SetCleanup(kDeepCleanup);
2230 
2231  SetWindowName("ROOT Analyzer Control");
2232 
2233  // layout the gui
2234  fMenu = new TGPopupMenu(gClient->GetRoot());
2235  fMenu->AddEntry("New TBrowser", CTRL_TBROWSER);
2236  fMenu->AddEntry("-", 0);
2237  fMenu->AddEntry("Next", CTRL_NEXT);
2238  fMenu->AddEntry("NextFlow", CTRL_NEXT_FLOW);
2239  fMenu->AddEntry("Continue", CTRL_CONTINUE);
2240  fMenu->AddEntry("Pause", CTRL_PAUSE);
2241  fMenu->AddEntry("-", 0);
2242  fMenu->AddEntry("Quit", CTRL_QUIT);
2243 
2244  fMenuBarItemLayout = new TGLayoutHints(kLHintsTop|kLHintsLeft, 0, 4, 0, 0);
2245 
2246  fMenu->Associate(this);
2247 
2248  fMenuBar = new TGMenuBar(this, 1, 1, kRaisedFrame);
2249  fMenuBar->AddPopup("&Rootana", fMenu, fMenuBarItemLayout);
2250  fMenuBar->Layout();
2251 
2252  AddFrame(fMenuBar, new TGLayoutHints(kLHintsTop|kLHintsLeft|kLHintsExpandX));
2253 
2254  fButtonsFrame = new TGVerticalFrame(this);
2255 
2256  fNextButton = new TextButton(fButtonsFrame, "Next", holder, CTRL_NEXT);
2257  fNextFlowButton = new TextButton(fButtonsFrame, "Next Flow Event", holder, CTRL_NEXT_FLOW);
2258 
2259  fButtonsFrame->AddFrame(fNextButton, new TGLayoutHints(kLHintsExpandX, 1, 1, 1, 1));
2260  fButtonsFrame->AddFrame(fNextFlowButton, new TGLayoutHints(kLHintsExpandX, 1, 1, 1, 1));
2261 
2262  TGHorizontalFrame *hframe = new TGHorizontalFrame(fButtonsFrame);
2263 
2264  fContinueButton = new TextButton(hframe, " Continue ", holder, CTRL_CONTINUE);
2265  fPauseButton = new TextButton(hframe, " Pause ", holder, CTRL_PAUSE);
2266 
2267  hframe->AddFrame(fContinueButton, new TGLayoutHints(kLHintsExpandX, 1, 1, 1, 1));
2268  hframe->AddFrame(fPauseButton, new TGLayoutHints(kLHintsExpandX, 1, 1, 1, 1));
2269 
2270  fButtonsFrame->AddFrame(hframe, new TGLayoutHints(kLHintsExpandX));
2271 
2272  fQuitButton = new TextButton(fButtonsFrame, "Quit ", holder, CTRL_QUIT);
2273  fButtonsFrame->AddFrame(fQuitButton, new TGLayoutHints(kLHintsExpandX, 1, 1, 1, 1));
2274 
2275  AddFrame(fButtonsFrame, new TGLayoutHints(kLHintsExpandX));
2276 
2277  MapSubwindows();
2278  Layout();
2279  Resize(GetDefaultSize());
2280  MapWindow();
2281  }
2282 
2283  ~MainWindow() // dtor // Closing the control window closes the whole program
2284  {
2285  if (gTrace)
2286  printf("MainWindow::dtor!\n");
2287 
2288  delete fMenu;
2289  delete fMenuBar;
2290  delete fMenuBarItemLayout;
2291  }
2292 
2294  {
2295  if (gTrace)
2296  printf("MainWindow::CloseWindow()\n");
2297 
2298  if (fHolder)
2299  fHolder->fValue = CTRL_QUIT;
2300  //gSystem->ExitLoop();
2301  }
2302 
2303  Bool_t ProcessMessage(Long_t msg, Long_t parm1, Long_t parm2)
2304  {
2305  //printf("GUI Message %d %d %d\n",(int)msg,(int)parm1,(int)parm2);
2306  switch (GET_MSG(msg))
2307  {
2308  default:
2309  break;
2310  case kC_COMMAND:
2311  switch (GET_SUBMSG(msg))
2312  {
2313  default:
2314  break;
2315  case kCM_MENU:
2316  //printf("parm1 %d\n", (int)parm1);
2317  switch (parm1)
2318  {
2319  case CTRL_TBROWSER:
2320  new TBrowser();
2321  break;
2322  default:
2323  //printf("Control %d!\n", (int)parm1);
2324  if (fHolder)
2325  fHolder->fValue = parm1;
2326  //gSystem->ExitLoop();
2327  break;
2328  }
2329  break;
2330  }
2331  break;
2332  }
2333 
2334  return kTRUE;
2335  }
2336 };
2337 #endif
2338 
2340 {
2341 public:
2344  int fSkip;
2345 #ifdef HAVE_ROOT
2348 #endif
2349 
2351  : TARunObject(runinfo)
2352  {
2353  if (gTrace)
2354  printf("InteractiveModule::ctor, run %d\n", runinfo->fRunNo);
2355  fContinue = false;
2356  fNextFlow = false;
2357  fSkip = 0;
2358 #ifdef HAVE_ROOT
2359  if (!fgHolder)
2360  fgHolder = new ValueHolder;
2361  if (!fgCtrlWindow && runinfo->fRoot->fgApp) {
2362  fgCtrlWindow = new MainWindow(gClient->GetRoot(), 200, 300, fgHolder);
2363  }
2364 #endif
2365  }
2366 
2368  {
2369  if (gTrace)
2370  printf("InteractiveModule::dtor!\n");
2371  }
2372 
2373  void BeginRun(TARunInfo* runinfo)
2374  {
2375  printf("InteractiveModule::BeginRun, run %d\n", runinfo->fRunNo);
2376  }
2377 
// End of run. Without ROOT, or without a control window and a ROOT
// application, this only prints the run number. Otherwise the Next,
// Next Flow, Continue and Pause buttons are disabled and the function keeps
// serving GUI events until the user selects Quit, Next or Continue, or until
// MIDAS requests a shutdown.
2378  void EndRun(TARunInfo* runinfo)
2379  {
2380  printf("InteractiveModule::EndRun, run %d\n", runinfo->fRunNo);
2381 
2382 #ifdef HAVE_ROOT
2383  if (fgCtrlWindow && runinfo->fRoot->fgApp) {
2384  fgCtrlWindow->fNextButton->SetEnabled(false);
2385  fgCtrlWindow->fNextFlowButton->SetEnabled(false);
2386  fgCtrlWindow->fContinueButton->SetEnabled(false);
2387  fgCtrlWindow->fPauseButton->SetEnabled(false);
2388  while (1) {
2389 #ifdef HAVE_THTTP_SERVER
2391  TARootHelper::fgHttpServer->ProcessRequests();
2392  }
2393 #endif
2394 #ifdef HAVE_ROOT
2395  if (TARootHelper::fgApp) {
2396  gSystem->DispatchOneEvent(kTRUE);
2397  }
2398 #endif
 // pacing: yield to / sleep in MIDAS when available, plain sleep otherwise
2399 #ifdef HAVE_MIDAS
2400 #ifdef HAVE_TMFE
2401  TMFE* mfe = TMFE::Instance();
2402  mfe->Yield(0.010);
2403  if (mfe->fShutdownRequested) {
2404  return;
2405  }
2406 #else
2407  if (!TMidasOnline::instance()->sleep(10)) {
2408  // FIXME: indicate that we should exit the analyzer
2409  return;
2410  }
2411 #endif
2412 #else
2413  gSystem->Sleep(10);
2414 #endif
2415 
 // consume the pending GUI command, if any
2416  int ctrl = fgHolder->fValue;
2417  fgHolder->fValue = 0;
2418 
 // all three commands simply end the wait; any other value keeps waiting
2419  switch (ctrl) {
2420  case CTRL_QUIT:
2421  return;
2422  case CTRL_NEXT:
2423  return;
2424  case CTRL_CONTINUE:
2425  return;
2426  }
2427  }
2428  }
2429 #endif
2430  }
2431 
2432  void PauseRun(TARunInfo* runinfo)
2433  {
2434  printf("InteractiveModule::PauseRun, run %d\n", runinfo->fRunNo);
2435  }
2436 
2437  void ResumeRun(TARunInfo* runinfo)
2438  {
2439  printf("InteractiveModule::ResumeRun, run %d\n", runinfo->fRunNo);
2440  }
2441 
// Wait for the user to decide how the analysis continues.
// With ROOT, a control window and a ROOT application, GUI events are served
// until a command arrives through fgHolder; that loop is only left by
// returning. Otherwise commands are read from stdin at a "manalyzer> " prompt.
// Effects: quit (or EOF / MIDAS shutdown) sets TAFlag_QUIT in *flags,
// continue sets fContinue, next-flow sets fNextFlow, "aNNN" sets fSkip to
// NNN-1, next just returns.
2442  void InteractiveLoop(TARunInfo* runinfo, TAFlags* flags)
2443  {
2444 #ifdef HAVE_ROOT
2445  if (fgCtrlWindow && runinfo->fRoot->fgApp) {
2446  while (1) {
2447 #ifdef HAVE_THTTP_SERVER
2449  TARootHelper::fgHttpServer->ProcessRequests();
2450  }
2451 #endif
2452 #ifdef HAVE_ROOT
2453  if (TARootHelper::fgApp) {
2454  gSystem->DispatchOneEvent(kTRUE);
2455  }
2456 #endif
2457 #ifdef HAVE_MIDAS
2458 #ifdef HAVE_TMFE
2459  TMFE* mfe = TMFE::Instance();
2460  mfe->Yield(0.010);
2461  if (mfe->fShutdownRequested) {
2462  *flags |= TAFlag_QUIT;
2463  return;
2464  }
2465 #else
2466  if (!TMidasOnline::instance()->sleep(10)) {
2467  *flags |= TAFlag_QUIT;
2468  return;
2469  }
2470 #endif
2471 #else
2472  gSystem->Sleep(10);
2473 #endif
2474 
 // consume the pending GUI command, if any
2475  int ctrl = fgHolder->fValue;
2476  fgHolder->fValue = 0;
2477 
 // any value not listed here keeps the loop waiting
2478  switch (ctrl) {
2479  case CTRL_QUIT:
2480  *flags |= TAFlag_QUIT;
2481  return;
2482  case CTRL_NEXT:
2483  return;
2484  case CTRL_NEXT_FLOW:
2485  fNextFlow = true;
2486  return;
2487  case CTRL_CONTINUE:
2488  fContinue = true;
2489  return;
2490  }
2491  }
2492  }
2493 #endif
2494 
 // no GUI: read one-letter commands from stdin; only the first character of
 // a line is examined, unknown commands prompt again
2495  while (1) {
2496  char str[256];
2497  fprintf(stdout, "manalyzer> "); fflush(stdout);
2498  char* s = fgets(str, sizeof(str)-1, stdin);
2499 
2500  if (s == NULL) {
2501  // EOF
2502  *flags |= TAFlag_QUIT;
2503  return;
2504  }
2505 
2506  printf("command [%s]\n", str);
2507 
2508  if (str[0] == 'h') { // "help"
2509  printf("Interactive manalyzer commands:\n");
2510  printf(" q - quit\n");
2511  printf(" h - help\n");
2512  printf(" c - continue until next TAFlag_DISPLAY event\n");
2513  printf(" n - next event\n");
2514  printf(" aNNN - analyze N events, i.e. \"a10\"\n");
2515  } else if (str[0] == 'q') { // "quit"
2516  *flags |= TAFlag_QUIT;
2517  return;
2518  } else if (str[0] == 'n') { // "next"
2519  return;
2520  } else if (str[0] == 'c') { // "continue"
2521  fContinue = true;
2522  return;
2523  } else if (str[0] == 'a') { // "analyze" N events
2524  int num = atoi(str+1);
2525  printf("Analyzing %d events\n", num);
 // the current event counts as the first of the N events
2526  if (num > 0) {
2527  fSkip = num-1;
2528  }
2529  return;
2530  }
2531  }
2532  }
2533 
2534  TAFlowEvent* Analyze(TARunInfo* runinfo, TMEvent* event, TAFlags* flags, TAFlowEvent* flow)
2535  {
2536  printf("InteractiveModule::Analyze, run %d, %s\n", runinfo->fRunNo, event->HeaderToString().c_str());
2537 
2538 #ifdef HAVE_ROOT
2539  if (fgHolder->fValue == CTRL_QUIT) {
2540  *flags |= TAFlag_QUIT;
2541  return flow;
2542  } else if (fgHolder->fValue == CTRL_PAUSE) {
2543  fContinue = false;
2544  }
2545 #endif
2546 
2547  if ((fContinue||fNextFlow) && !(*flags & TAFlag_DISPLAY)) {
2548  return flow;
2549  } else {
2550  fContinue = false;
2551  }
2552 
2553  if (fSkip > 0) {
2554  fSkip--;
2555  return flow;
2556  }
2557 
2558  InteractiveLoop(runinfo, flags);
2559 
2560  return flow;
2561  }
2562 
2564  {
2565  printf("InteractiveModule::AnalyzeFlowEvent, run %d\n", runinfo->fRunNo);
2566 
2567 #ifdef HAVE_ROOT
2568  if (fgHolder->fValue == CTRL_QUIT) {
2569  *flags |= TAFlag_QUIT;
2570  return flow;
2571  } else if (fgHolder->fValue == CTRL_PAUSE) {
2572  fContinue = false;
2573  }
2574 #endif
2575 
2576  if ((!fNextFlow) && !(*flags & TAFlag_DISPLAY)) {
2577  return flow;
2578  }
2579 
2580  fNextFlow = false;
2581 
2582  InteractiveLoop(runinfo, flags);
2583 
2584  return flow;
2585  }
2586 
2587  void AnalyzeSpecialEvent(TARunInfo* runinfo, TMEvent* event)
2588  {
2589  if (gTrace)
2590  printf("InteractiveModule::AnalyzeSpecialEvent, run %d, event serno %d, id 0x%04x, data size %d\n", runinfo->fRunNo, event->serial_number, (int)event->event_id, event->data_size);
2591  }
2592 };
2593 
2594 #ifdef HAVE_ROOT
2597 #endif
2598 
2600 {
2601 public:
2602 
2603  void Init(const std::vector<std::string> &args)
2604  {
2605  if (gTrace)
2606  printf("InteractiveModuleFactory::Init!\n");
2607  }
2608 
2609  void Finish()
2610  {
2611  if (gTrace)
2612  printf("InteractiveModuleFactory::Finish!\n");
2613  }
2614 
2616  {
2617  if (gTrace)
2618  printf("InteractiveModuleFactory::NewRunObject, run %d, file %s\n", runinfo->fRunNo, runinfo->fFileName.c_str());
2619  return new InteractiveModule(runinfo);
2620  }
2621 };
2622 
2623 //////////////////////////////////////////////////////////
2624 //
2625 // main program
2626 //
2627 //////////////////////////////////////////////////////////
2628 
2629 static void help()
2630 {
2631  printf("\nUsage: ./manalyzer.exe [-h] [-R8081] [-oOutputfile.mid] [file1 file2 ...] [-- arguments passed to modules ...]\n");
2632  printf("\n");
2633  printf("-h: print this help message\n");
2634  printf("--demo: activate the demo mode, online connection or input file not needed, midas events are generated internally, add -e0 or -eNNN to set number of demo events \n");
2635  printf("\n");
2636  printf("-Hhostname: connect to MIDAS experiment on given host\n");
2637  printf("-Eexptname: connect to this MIDAS experiment\n");
2638  printf("--midas-progname SSS -- set analyzer's MIDAS program name, default is \"ana\"\n");
2639  printf("--midas-hostname HOSTNAME[:PORT] -- connect to MIDAS mserver on given host and port\n");
2640  printf("--midas-exptname EXPTNAME -- connect to given experiment\n");
2641  printf("--midas-buffer BUFZZZ -- connect to given MIDAS event buffer\n");
2642  printf("--midas-sampling SSS -- sample events from MIDAS event buffer: GET_ALL=get every event (will block the event writers, GET_NONBLOCKING=get as many as we can process, GET_RECENT=get recent events, see bm_receive_event(). Default is GET_NONBLOCKING\n");
2643  printf("--midas-event-id III -- receive only events with matching event ID\n");
2644  printf("--midas-trigger-mask 0xMASK -- receive only events with matching trigger mask\n");
2645  printf("\n");
2646  printf("-oOutputfile.mid: write selected events into this file\n");
2647  printf("-Rnnnn: Start the ROOT THttpServer HTTP server on specified tcp port, use -R8081, access by firefox http://localhost:8081\n");
2648  printf("-eNNN: Number of events to analyze, 0=unlimited\n");
2649  printf("-sNNN: Number of events to skip before starting analysis\n");
2650  printf("\n");
2651  printf("--dump: activate the event dump module\n");
2652  printf("\n");
2653  printf("-t: Enable tracing of constructors, destructors and function calls\n");
2654  printf("-m: Enable memory leak debugging\n");
2655  printf("-g: Enable graphics display when processing data files\n");
2656  printf("-i: Enable intractive mode\n");
2657  printf("\n");
2658  printf("--mt: Enable multithreaded mode. Extra multithread config settings:\n");
2659  printf("--mtqlNNN: Module thread queue length (buffer). Default: %d\n", gDefaultMultithreadQueueLength);
2660  printf("--mtseNNN: Module thread sleep time with empty queue (usec). Default: %d\n", gDefaultMultithreadWaitEmpty);
2661  printf("--mtsfNNN: Module thread sleep time when next queue is full (usec). Default: %d\n", gDefaultMultithreadWaitFull);
2662  printf("\n");
2663  printf("--no-profiler: Turn off manalyzer module profiler\n");
2664  printf("--pqiNNN: Profile multithread queue lengths every NNN events \n");
2665 #ifdef HAVE_ROOT
2666  printf("\n");
2667  printf("-Doutputdirectory: Specify output root file directory\n");
2668  printf("-Ooutputfile.root: Specify output root file filename\n");
2669 #endif
2670  printf("\n");
2671  printf("--: All following arguments are passed to the analyzer modules Init() method\n");
2672  printf("\n");
2673  printf("Analyzer modules usage:\n");
2674  if (gModules)
2675  for (unsigned i=0; i<(*gModules).size(); i++)
2676  (*gModules)[i]->Usage();
2677  printf("\n");
2678  printf("Example1: analyze online data: ./manalyzer.exe -R9091\n");
2679  printf("Example2: analyze existing data: ./manalyzer.exe /data/alpha/current/run00500.mid\n");
2680  exit(1);
2681 }
2682 
2683 // duplicate c++20 std::string s.starts_with()
2684 
// Return true when the string s begins with the NUL-terminated prefix.
// An empty prefix matches every string. prefix must not be NULL.
//
// The comparison is done in place; no temporary substring is created.
static bool starts_with(const std::string& s, const char* prefix)
{
   // compare() clamps the length to s.size(), so a prefix longer than s
   // compares unequal instead of reading past the end
   return (s.compare(0, strlen(prefix), prefix) == 0);
}
2689 
2690 // Main function call
2691 
2692 int manalyzer_main(int argc, char* argv[])
2693 {
2694  setbuf(stdout, NULL);
2695  setbuf(stderr, NULL);
2696 
2697  signal(SIGILL, SIG_DFL);
2698  signal(SIGBUS, SIG_DFL);
2699  signal(SIGSEGV, SIG_DFL);
2700  signal(SIGPIPE, SIG_DFL);
2701 
2702  std::vector<std::string> args;
2703  for (int i=0; i<argc; i++) {
2704  if (strcmp(argv[i],"-h")==0)
2705  help(); // does not return
2706  args.push_back(argv[i]);
2707  }
2708 
2709  int httpPort = 0;
2710  int num_skip = 0;
2711  int num_analyze = 0;
2712 
2713  TMWriterInterface *writer = NULL;
2714 
2715  bool event_dump = false;
2716  bool demo_mode = false;
2717 #ifdef HAVE_ROOT
2718  bool root_graphics = false;
2719 #endif
2720  bool interactive = false;
2721 
2722  bool multithread = false;
2723 
2724  bool performance_profiler = true;
2725  int snap_shot_queue_length = 100;
2726 
2727  std::vector<std::string> files;
2728  std::vector<std::string> modargs;
2729 
2730 #ifdef HAVE_MIDAS
2731  std::string midas_hostname = "";
2732  std::string midas_exptname = "";
2733  std::string midas_progname = "ana";
2734  std::string midas_buffer = "SYSTEM";
2735  //std::string midas_sampling = "GET_ALL";
2736  std::string midas_sampling = "GET_NONBLOCKING";
2737  int midas_event_id = -1;
2738  int midas_trigger_mask = -1;
2739 #endif
2740 
2741  for (unsigned int i=1; i<args.size(); i++) { // loop over the commandline options
2742  std::string arg = args[i];
2743  //printf("argv[%d] is %s\n",i,arg);
2744 
2745  if (arg == "--") {
2746  for (unsigned j=i+1; j<args.size(); j++)
2747  modargs.push_back(args[j]);
2748  break;
2749  } else if (arg == "--dump") {
2750  event_dump = true;
2751  } else if (arg == "--demo") {
2752  demo_mode = true;
2753  num_analyze = 100;
2754 #ifdef HAVE_ROOT
2755  } else if (arg == "-g") {
2756  root_graphics = true;
2757 #endif
2758  } else if (arg == "-i") {
2759  interactive = true;
2760  } else if (arg == "-t") {
2761  gTrace = true;
2764  } else if (starts_with(arg, "-o")) {
2765  writer = TMNewWriter(arg.c_str()+2);
2766  } else if (starts_with(arg, "-s")) {
2767  num_skip = atoi(arg.c_str()+2);
2768  } else if (starts_with(arg, "-e")) {
2769  num_analyze = atoi(arg.c_str()+2);
2770  } else if (starts_with(arg, "-m")) { // Enable memory debugging
2771  gEnableShowMem = true;
2772  } else if (starts_with(arg, "-R")) { // Set the ROOT THttpServer HTTP port
2773  httpPort = atoi(arg.c_str()+2);
2774 #ifdef HAVE_MIDAS
2775  } else if (starts_with(arg, "-H")) {
2776  midas_hostname = arg.c_str()+2;
2777  } else if (starts_with(arg, "-E")) {
2778  midas_exptname = arg.c_str()+2;
2779  } else if (arg == "--midas-progname") {
2780  midas_progname = args[i+1]; i++;
2781  } else if (arg == "--midas-hostname") {
2782  midas_hostname = args[i+1]; i++;
2783  } else if (arg == "--midas-exptname") {
2784  midas_exptname = args[i+1]; i++;
2785  } else if (arg == "--midas-buffer") {
2786  midas_buffer = args[i+1]; i++;
2787  } else if (arg == "--midas-sampling") {
2788  midas_sampling = args[i+1]; i++;
2789  } else if (arg == "--midas-event-id") {
2790  midas_event_id = atoi(args[i+1].c_str()); i++;
2791  } else if (arg == "--midas-trigger-mask") {
2792  midas_trigger_mask = strtoul(args[i+1].c_str(), NULL, 0); i++;
2793 #endif
2794  } else if (starts_with(arg, "--mtql")) {
2795  gDefaultMultithreadQueueLength = atoi(arg.c_str()+6);
2796  } else if (starts_with(arg, "--mtse")) {
2797  gDefaultMultithreadWaitEmpty = atoi(arg.c_str()+6);
2798  } else if (starts_with(arg, "--mtsf")) {
2799  gDefaultMultithreadWaitFull = atoi(arg.c_str()+6);
2800  } else if (arg == "--mt") {
2801  multithread=true;
2802  } else if (arg == "--no-profiler") {
2803  performance_profiler = 0;
2804  } else if (starts_with(arg, "--pqi")) {
2805  snap_shot_queue_length = atoi(arg.c_str()+5);
2806 #ifdef HAVE_ROOT
2807  } else if (starts_with(arg, "-O")) {
2808  TARootHelper::fOutputFileName = arg.c_str()+2;
2809  } else if (starts_with(arg, "-D")) {
2810  TARootHelper::fOutputDirectory = arg.c_str()+2;
2811 #endif
2812  } else if (arg == "-h") {
2813  help(); // does not return
2814  } else if (arg[0] == '-') {
2815  help(); // does not return
2816  } else {
2817  files.push_back(args[i]);
2818  }
2819  }
2820 
2821  if (!gModules)
2822  gModules = new std::vector<TAFactory*>;
2823 
2824  if ((*gModules).size() == 0)
2825  event_dump = true;
2826 
2827  if (event_dump)
2828  (*gModules).push_back(new EventDumpModuleFactory);
2829 
2830  if (interactive)
2831  (*gModules).push_back(new InteractiveModuleFactory);
2832 
2833  printf("Registered modules: %d\n", (int)(*gModules).size());
2834 
2835 #ifdef HAVE_ROOT
2836  if (root_graphics) {
2837  TARootHelper::fgApp = new TApplication("manalyzer", NULL, NULL, 0, 0);
2838  }
2839 
2840  TARootHelper::fgDir = new TDirectory("manalyzer", "location of histograms");
2842 #endif
2843 
2844  if (httpPort) {
2845 #ifdef HAVE_THTTP_SERVER
2846  char str[256];
2847  sprintf(str, "http:127.0.0.1:%d?cors", httpPort);
2848  THttpServer *s = new THttpServer(str);
2849  //s->SetTimer(100, kFALSE);
2851 #else
2852  fprintf(stderr,"ERROR: No support for the THttpServer!\n");
2853 #endif
2854  }
2855 
2856  for (unsigned i=0; i<files.size(); i++) {
2857  printf("file[%d]: %s\n", i, files[i].c_str());
2858  }
2859  int exit_state = 0;
2860  if (demo_mode) {
2861  exit_state = ProcessDemoMode(modargs, num_skip, num_analyze, writer, multithread, performance_profiler, snap_shot_queue_length);
2862  } else if (files.size() > 0) {
2863  exit_state = ProcessMidasFiles(files, modargs, num_skip, num_analyze, writer, multithread, performance_profiler, snap_shot_queue_length);
2864  } else {
2865 #ifdef HAVE_MIDAS
2866 #ifdef HAVE_TMFE
2867  exit_state = ProcessMidasOnlineTmfe(modargs, midas_progname.c_str(), midas_hostname.c_str(), midas_exptname.c_str(), midas_buffer.c_str(), midas_event_id, midas_trigger_mask, midas_sampling.c_str(), num_analyze, writer, multithread, performance_profiler, snap_shot_queue_length);
2868 #else
2869  exit_state = ProcessMidasOnlineOld(modargs, midas_hostname.c_str(), midas_exptname.c_str(), num_analyze, writer, multithread, performance_profiler, snap_shot_queue_length);
2870 #endif
2871 #endif
2872  }
2873 
2874  if (writer) {
2875  writer->Close();
2876  delete writer;
2877  writer = NULL;
2878  }
2879 
2880  return exit_state;
2881 }
2882 
2883 /* emacs
2884  * Local Variables:
2885  * tab-width: 8
2886  * c-basic-offset: 3
2887  * indent-tabs-mode: nil
2888  * End:
2889  */
R__EXTERN TDirectory * gDirectory
double GetTimeSec()
TARunObject * NewRunObject(TARunInfo *runinfo)
Definition: manalyzer.cxx:2137
void Init(const std::vector< std::string > &args)
Definition: manalyzer.cxx:2125
void PauseRun(TARunInfo *runinfo)
Definition: manalyzer.cxx:2095
EventDumpModule(TARunInfo *runinfo)
Definition: manalyzer.cxx:2067
void EndRun(TARunInfo *runinfo)
Definition: manalyzer.cxx:2085
void NextSubrun(TARunInfo *runinfo)
Definition: manalyzer.cxx:2090
void AnalyzeSpecialEvent(TARunInfo *runinfo, TMEvent *event)
Definition: manalyzer.cxx:2115
void BeginRun(TARunInfo *runinfo)
Definition: manalyzer.cxx:2080
void ResumeRun(TARunInfo *runinfo)
Definition: manalyzer.cxx:2100
TAFlowEvent * Analyze(TARunInfo *runinfo, TMEvent *event, TAFlags *flags, TAFlowEvent *flow)
Definition: manalyzer.cxx:2105
TARunObject * NewRunObject(TARunInfo *runinfo)
Definition: manalyzer.cxx:2615
void Init(const std::vector< std::string > &args)
Definition: manalyzer.cxx:2603
TAFlowEvent * Analyze(TARunInfo *runinfo, TMEvent *event, TAFlags *flags, TAFlowEvent *flow)
Definition: manalyzer.cxx:2534
void PauseRun(TARunInfo *runinfo)
Definition: manalyzer.cxx:2432
TAFlowEvent * AnalyzeFlowEvent(TARunInfo *runinfo, TAFlags *flags, TAFlowEvent *flow)
Definition: manalyzer.cxx:2563
void EndRun(TARunInfo *runinfo)
Definition: manalyzer.cxx:2378
static MainWindow * fgCtrlWindow
Definition: manalyzer.cxx:2347
void ResumeRun(TARunInfo *runinfo)
Definition: manalyzer.cxx:2437
InteractiveModule(TARunInfo *runinfo)
Definition: manalyzer.cxx:2350
static ValueHolder * fgHolder
Definition: manalyzer.cxx:2346
void InteractiveLoop(TARunInfo *runinfo, TAFlags *flags)
Definition: manalyzer.cxx:2442
void AnalyzeSpecialEvent(TARunInfo *runinfo, TMEvent *event)
Definition: manalyzer.cxx:2587
void BeginRun(TARunInfo *runinfo)
Definition: manalyzer.cxx:2373
Definition: mvodb.h:21
virtual void RU32(const char *varname, uint32_t *value, bool create=false, MVOdbError *error=NULL)=0
virtual void RI(const char *varname, int *value, bool create=false, MVOdbError *error=NULL)=0
void CloseWindow()
Definition: manalyzer.cxx:2293
TGLayoutHints * fMenuBarItemLayout
Definition: manalyzer.cxx:2208
TextButton * fNextFlowButton
Definition: manalyzer.cxx:2215
MainWindow(const TGWindow *w, int s1, int s2, ValueHolder *holder)
Definition: manalyzer.cxx:2222
TGCompositeFrame * fButtonsFrame
Definition: manalyzer.cxx:2210
TGPopupMenu * fMenu
Definition: manalyzer.cxx:2206
TextButton * fPauseButton
Definition: manalyzer.cxx:2217
ValueHolder * fHolder
Definition: manalyzer.cxx:2212
TextButton * fNextButton
Definition: manalyzer.cxx:2214
TGMenuBar * fMenuBar
Definition: manalyzer.cxx:2207
TextButton * fContinueButton
Definition: manalyzer.cxx:2216
Bool_t ProcessMessage(Long_t msg, Long_t parm1, Long_t parm2)
Definition: manalyzer.cxx:2303
TextButton * fQuitButton
Definition: manalyzer.cxx:2219
void Event(const void *data, int data_size)
Definition: manalyzer.cxx:1684
void Transition(int transition, int run_number, int transition_time)
Definition: manalyzer.cxx:1656
RunHandler fRun
Definition: manalyzer.cxx:1629
OnlineHandler(int num_analyze, TMWriterInterface *writer, MVOdb *odb, const std::vector< std::string > &args, bool multithread, bool profiler, int queue_interval_check)
Definition: manalyzer.cxx:1635
void StartRun(int run_number)
Definition: manalyzer.cxx:1649
std::vector< double > fAnalyzeFlowEventRMS
Definition: manalyzer.cxx:574
const int fQueueInterval
Definition: manalyzer.cxx:587
void LogAnalyzeEvent(TAFlags *flag, TAFlowEvent *flow, const int i, const TAClock &start)
Definition: manalyzer.cxx:761
void LogUserWindows(TAFlags *flag, TAFlowEvent *flow)
Definition: manalyzer.cxx:811
std::vector< double > fAnalyzeFlowEventTimeTotal
Definition: manalyzer.cxx:578
uint32_t fMIDASStopTime
Definition: manalyzer.cxx:548
std::vector< double > fTotalUserTime
Definition: manalyzer.cxx:583
std::vector< int > fAnalyzeEventEntries
Definition: manalyzer.cxx:564
std::vector< TH1D * > fUserHistograms
Definition: manalyzer.cxx:582
std::vector< double > fAnalyzeFlowEventMean
Definition: manalyzer.cxx:573
uint32_t fMIDASStartTime
Definition: manalyzer.cxx:547
std::atomic< int > fQueueIntervalCounter
Definition: manalyzer.cxx:554
std::string fBinaryPath
Definition: manalyzer.cxx:544
int fNQueues
Definition: manalyzer.cxx:552
std::vector< double > fAnalyzeEventRMS
Definition: manalyzer.cxx:563
void Print() const
Definition: manalyzer.cxx:910
std::vector< double > fAnalyzeEventTimeMax
Definition: manalyzer.cxx:566
clock_t fStartCPU
Definition: manalyzer.cxx:545
std::string fBinaryName
Definition: manalyzer.cxx:543
void LogAnalyzeFlowEvent(TAFlags *flag, TAFlowEvent *flow, const int i, const TAClock &start)
Definition: manalyzer.cxx:733
std::vector< std::string > fModuleNames
Definition: manalyzer.cxx:561
void End(TARunInfo *runinfo)
Definition: manalyzer.cxx:872
std::vector< TH1D * > fAnalyzeFlowEventTimeHistograms
Definition: manalyzer.cxx:571
std::chrono::system_clock::time_point fStartUser
Definition: manalyzer.cxx:546
void Begin(TARunInfo *runinfo, const std::vector< TARunObject * > fRunRun)
Definition: manalyzer.cxx:639
std::vector< double > fAnalyzeEventTimeTotal
Definition: manalyzer.cxx:567
void AddModuleMap(const char *UserProfileName, unsigned long hash)
Definition: manalyzer.cxx:848
std::vector< double > fAnalyzeFlowEventTimeMax
Definition: manalyzer.cxx:577
void LogMTQueueLength(TARunInfo *runinfo)
Definition: manalyzer.cxx:790
std::map< unsigned int, int > fUserMap
Definition: manalyzer.cxx:581
std::vector< double > fAnalyzeEventMean
Definition: manalyzer.cxx:562
std::vector< double > fMaxUserTime
Definition: manalyzer.cxx:584
std::vector< TH1D * > fAnalyzeEventTimeHistograms
Definition: manalyzer.cxx:559
Profiler(const int queue_interval_check)
Definition: manalyzer.cxx:606
std::vector< int > fAnalyzeFlowEventEntries
Definition: manalyzer.cxx:575
std::vector< TH1D * > fAnalysisQueue
Definition: manalyzer.cxx:553
TARunInfo * fRunInfo
Definition: manalyzer.cxx:996
std::vector< TARunObject * > fRunRun
Definition: manalyzer.cxx:997
std::vector< std::string > fArgs
Definition: manalyzer.cxx:998
void NextSubrun()
Definition: manalyzer.cxx:1200
void AnalyzeEvent(TMEvent *event, TAFlags *flags, TMWriterInterface *writer)
Definition: manalyzer.cxx:1276
void CreateRun(int run_number, const char *file_name)
Definition: manalyzer.cxx:1119
void DeleteRun()
Definition: manalyzer.cxx:1208
void AnalyzeSpecialEvent(TMEvent *event)
Definition: manalyzer.cxx:1229
void EndRun(TAFlags *flags)
Definition: manalyzer.cxx:1155
TAFlowEvent * AnalyzeFlowEvent(TAFlags *flags, TAFlowEvent *flow)
Definition: manalyzer.cxx:1235
int fProfilerIntervalCheck
Definition: manalyzer.cxx:1002
RunHandler(const std::vector< std::string > &args, bool multithread, bool profile, int queue_interval_check)
Definition: manalyzer.cxx:1004
void PerModuleThread(int i)
Definition: manalyzer.cxx:1026
void AnalyzeFlowQueue(TAFlags *ana_flags)
Definition: manalyzer.cxx:1252
void BeginRun()
Definition: manalyzer.cxx:1144
virtual void Finish()
Definition: manalyzer.cxx:194
virtual void Init(const std::vector< std::string > &args)
Definition: manalyzer.cxx:188
virtual void Usage()
Definition: manalyzer.cxx:182
virtual ~TAFlowEvent()
Definition: manalyzer.cxx:91
TAFlowEvent * fNext
Definition: manalyzer.h:51
std::vector< TAFlagsQueue > fMtFlagQueue
Definition: manalyzer.h:176
std::vector< std::mutex > fMtFlowQueueMutex
Definition: manalyzer.h:174
std::atomic< bool > fMtShutdownRequested
Definition: manalyzer.h:182
std::vector< std::thread * > fMtThreads
Definition: manalyzer.h:177
std::vector< std::atomic< bool > > fMtThreadIsBusy
Definition: manalyzer.h:179
TAMultithreadHelper(int nModules)
Definition: manalyzer.cxx:391
static int gfMtMaxBacklog
Definition: manalyzer.h:192
static bool gfMultithread
Definition: manalyzer.h:191
std::vector< TAFlowEventQueue > fMtFlowQueue
Definition: manalyzer.h:175
static std::mutex gfLock
Definition: manalyzer.h:193
std::vector< std::atomic< bool > > fMtThreadIsRunning
Definition: manalyzer.h:178
std::atomic< bool > fMtQuitRequested
Definition: manalyzer.h:183
TARegister(TAFactory *m)
Definition: manalyzer.cxx:497
static std::string fOutputDirectory
Definition: manalyzer.h:152
static TApplication * fgApp
Definition: manalyzer.h:156
static std::string fOutputFileName
Definition: manalyzer.h:153
static THttpServer * fgHttpServer
Definition: manalyzer.h:157
TFile * fOutputFile
Definition: manalyzer.h:154
static TDirectory * fgDir
Definition: manalyzer.h:155
void AddToFlowQueue(TAFlowEvent *)
Definition: manalyzer.cxx:1349
TAFlowEvent * ReadFlowQueue()
Definition: manalyzer.cxx:1339
std::string fFileName
Definition: manalyzer.h:25
std::vector< std::string > fArgs
Definition: manalyzer.h:29
MVOdb * fOdb
Definition: manalyzer.h:26
int fRunNo
Definition: manalyzer.h:24
static std::vector< std::string > fgFileList
Definition: manalyzer.h:30
TARunInfo()
Definition: manalyzer.h:42
TARootHelper * fRoot
Definition: manalyzer.h:27
TAMultithreadHelper * fMtInfo
Definition: manalyzer.h:28
static int fgCurrentFileIndex
Definition: manalyzer.h:31
virtual void EndRun(TARunInfo *runinfo)
Definition: manalyzer.cxx:118
virtual void ResumeRun(TARunInfo *runinfo)
Definition: manalyzer.cxx:136
virtual void AnalyzeSpecialEvent(TARunInfo *runinfo, TMEvent *event)
Definition: manalyzer.cxx:170
virtual void NextSubrun(TARunInfo *runinfo)
Definition: manalyzer.cxx:124
virtual TAFlowEvent * Analyze(TARunInfo *runinfo, TMEvent *event, TAFlags *flags, TAFlowEvent *flow)
Definition: manalyzer.cxx:148
virtual void PauseRun(TARunInfo *runinfo)
Definition: manalyzer.cxx:130
virtual void BeginRun(TARunInfo *runinfo)
Definition: manalyzer.cxx:112
virtual TAFlowEvent * AnalyzeFlowEvent(TARunInfo *runinfo, TAFlags *flags, TAFlowEvent *flow)
Definition: manalyzer.cxx:159
virtual void PreEndRun(TARunInfo *runinfo)
Definition: manalyzer.cxx:142
TAUserProfilerFlow(TAFlowEvent *flow, const char *name, const TAClock &start)
Definition: manalyzer.cxx:519
const std::string fModuleName
Definition: manalyzer.h:220
double GetTimer() const
Definition: manalyzer.cxx:529
virtual TDirectory * mkdir(const char *name, const char *title="")
virtual Bool_t cd(const char *path=0)
std::string HeaderToString() const
print the MIDAS event header
Definition: midasio.cxx:656
void Reset()
reset everything
Definition: midasio.cxx:688
void ParseEvent()
parse event data
Definition: midasio.cxx:708
std::vector< char > data
MIDAS event bytes.
Definition: midasio.h:67
size_t event_header_size
size of MIDAS event header
Definition: midasio.h:63
uint32_t serial_number
MIDAS event serial number.
Definition: midasio.h:59
uint32_t data_size
MIDAS event data size.
Definition: midasio.h:61
uint16_t event_id
MIDAS event ID.
Definition: midasio.h:57
MIDAS online connection, including access to online ODB.
Definition: TMidasOnline.h:36
virtual int Close()=0
std::string fErrorString
Definition: midasio.h:116
static bool fgTrace
Definition: midasio.h:117
virtual int Close()=0
static bool fgTrace
Definition: midasio.h:125
void RegisterHandler(TMHandlerInterface *h)
void registerTransitions()
Ask MIDAS to tell us about run transitions.
int disconnect()
Disconnect from MIDAS.
static TMidasOnline * instance()
int connect(const char *hostname, const char *exptname, const char *progname)
Connect to MIDAS experiment.
HNDLE fDB
ODB handle.
Definition: TMidasOnline.h:58
int eventRequest(const char *bufferName, int eventId, int triggerMask, int samplingType, bool poll=false)
Request data for delivery via callback (setEventHandler) or by polling (via receiveEvent)
TextButton(TGWindow *p, const char *text, ValueHolder *holder, int value)
Definition: manalyzer.cxx:2175
ValueHolder * fHolder
Definition: manalyzer.cxx:2172
void Clicked()
Definition: manalyzer.cxx:2194
std::chrono::high_resolution_clock::time_point TAClock
Definition: manalyzer.h:207
int TAFlags
Definition: manalyzer.h:72
#define TAFlag_DISPLAY
Definition: manalyzer.h:78
std::chrono::duration< double > TAClockDuration
Definition: manalyzer.h:208
#define TAFlag_SKIP
Definition: manalyzer.h:75
#define TAFlag_WRITE
Definition: manalyzer.h:77
TAClock TAClockNow()
Definition: manalyzer.h:209
#define TAFlag_QUIT
Definition: manalyzer.h:76
#define TAFlag_SKIP_PROFILE
Definition: manalyzer.h:79
void TMWriteEvent(TMWriterInterface *writer, const TMEvent *event)
Definition: midasio.cxx:651
TMReaderInterface * TMNewReader(const char *source)
Definition: midasio.cxx:447
TMWriterInterface * TMNewWriter(const char *destination)
Definition: midasio.cxx:543
TMEvent * TMReadEvent(TMReaderInterface *reader)
Definition: midasio.cxx:585
#define TID_DWORD
Definition: midasio.h:22
MVOdb * MakeMidasOdb(int hDB, MVOdbError *error=NULL)
Definition: midasodb.cxx:924
MVOdb * MakeNullOdb()
Definition: nullodb.cxx:129
MVOdb * MakeFileDumpOdb(const char *buf, int bufsize, MVOdbError *error=NULL)
Access ODB from a midas file dump. FOrmat could be .xml, .json or .odb.
Definition: mvodb.cxx:91
#define CTRL_NEXT_FLOW
Definition: manalyzer.cxx:2154
static int gDefaultMultithreadWaitFull
Definition: manalyzer.cxx:300
static int ProcessDemoMode(const std::vector< std::string > &args, int num_skip, int num_analyze, TMWriterInterface *writer, bool multithread, bool profiler, int queue_interval_check)
Definition: manalyzer.cxx:1955
static void MtQueueFlowEvent(TAMultithreadHelper *mt, int i, TAFlags *flag, TAFlowEvent *flow, bool wait)
Definition: manalyzer.cxx:449
std::vector< TAFactory * > * gModules
Definition: manalyzer.cxx:495
static bool gTrace
Definition: manalyzer.cxx:19
#define CTRL_NEXT
Definition: manalyzer.cxx:2151
int manalyzer_main(int argc, char *argv[])
Definition: manalyzer.cxx:2692
static int gDefaultMultithreadWaitEmpty
Definition: manalyzer.cxx:299
#define CTRL_TBROWSER
Definition: manalyzer.cxx:2156
#define CTRL_CONTINUE
Definition: manalyzer.cxx:2152
#define CTRL_QUIT
Definition: manalyzer.cxx:2150
static bool starts_with(const std::string &s, const char *prefix)
Definition: manalyzer.cxx:2685
static int ProcessMidasOnlineOld(const std::vector< std::string > &args, const char *hostname, const char *exptname, int num_analyze, TMWriterInterface *writer, bool multithread, bool profiler, int queue_interval_check)
Definition: manalyzer.cxx:1714
static bool gEnableShowMem
Definition: manalyzer.cxx:2041
#define CTRL_PAUSE
Definition: manalyzer.cxx:2153
static void WaitForAllQueuesEmpty(TAMultithreadHelper *mt)
Definition: manalyzer.cxx:302
static int ProcessMidasFiles(const std::vector< std::string > &files, const std::vector< std::string > &args, int num_skip, int num_analyze, TMWriterInterface *writer, bool multithread, bool profiler, int queue_interval_check)
Definition: manalyzer.cxx:1788
static void WaitForAllThreadsShutdown(TAMultithreadHelper *mt)
Definition: manalyzer.cxx:348
static void help()
Definition: manalyzer.cxx:2629
static int gDefaultMultithreadQueueLength
Definition: manalyzer.cxx:298
static std::string to_string(int v)
Definition: midasio.cxx:28
int ShowMem(const char *label)