ROOTANA
manalyzer.cxx
Go to the documentation of this file.
1 //
2 // MIDAS analyzer
3 //
4 // K.Olchanski
5 //
6 
7 #undef NDEBUG // this program requires assert()
8 
9 #include <stdio.h>
10 #include <unistd.h> // usleep()
11 #include <assert.h>
12 
13 #include "manalyzer.h"
14 #include "midasio.h"
15 
16 //////////////////////////////////////////////////////////
17 
18 static bool gTrace = false;
19 
20 //////////////////////////////////////////////////////////
21 //
22 // Methods of TARunInfo
23 //
24 //////////////////////////////////////////////////////////
25 
26 TARunInfo::TARunInfo(int runno, const char* filename, const std::vector<std::string>& args)
27 {
28  if (gTrace)
29  printf("TARunInfo::ctor!\n");
30  fRunNo = runno;
31  if (filename)
32  fFileName = filename;
33  fOdb = NULL;
34 #ifdef HAVE_ROOT
35  fRoot = new TARootHelper(this);
36 #else
37  fRoot = NULL;
38 #endif
39  fMtInfo = NULL;
40  fArgs = args;
41 }
42 
44 {
45  if (gTrace)
46  printf("TARunInfo::dtor!\n");
47  fRunNo = 0;
48  fFileName = "(deleted)";
49  if (fOdb) {
50  delete fOdb;
51  fOdb = NULL;
52  }
53 #ifdef HAVE_ROOT
54  if (fRoot) {
55  delete fRoot;
56  fRoot = NULL;
57  }
58 #endif
59  int count = 0;
60  while (1) {
61  TAFlowEvent* flow = ReadFlowQueue();
62  if (!flow)
63  break;
64  delete flow;
65  count++;
66  }
67  if (gTrace) {
68  printf("TARunInfo::dtor: deleted %d queued flow events!\n", count);
69  }
70 
71  if (fMtInfo) {
72  delete fMtInfo;
73  fMtInfo = NULL;
74  }
75 }
76 
77 //////////////////////////////////////////////////////////
78 //
79 // Methods of TAFlowEvent
80 //
81 //////////////////////////////////////////////////////////
82 
84 {
85  if (gTrace)
86  printf("TAFlowEvent::ctor: chain %p\n", flow);
87  fNext = flow;
88 }
89 
91 {
92  if (gTrace)
93  printf("TAFlowEvent::dtor: this %p, next %p\n", this, fNext);
94  if (fNext)
95  delete fNext;
96  fNext = NULL;
97 }
98 
99 //////////////////////////////////////////////////////////
100 //
101 // Methods of TARunObject
102 //
103 //////////////////////////////////////////////////////////
104 
106 {
107  if (gTrace)
108  printf("TARunObject::ctor, run %d\n", runinfo->fRunNo);
109 }
110 
112 {
113  if (gTrace)
114  printf("TARunObject::BeginRun, run %d\n", runinfo->fRunNo);
115 }
116 
118 {
119  if (gTrace)
120  printf("TARunObject::EndRun, run %d\n", runinfo->fRunNo);
121 }
122 
124 {
125  if (gTrace)
126  printf("TARunObject::NextSubrun, run %d\n", runinfo->fRunNo);
127 }
128 
130 {
131  if (gTrace)
132  printf("TARunObject::PauseRun, run %d\n", runinfo->fRunNo);
133 }
134 
136 {
137  if (gTrace)
138  printf("TARunObject::ResumeRun, run %d\n", runinfo->fRunNo);
139 }
140 
142 {
143  if (gTrace)
144  printf("TARunObject::PreEndRun, run %d\n", runinfo->fRunNo);
145 }
146 
148 {
149  if (gTrace)
150  printf("TARunObject::Analyze!\n");
151  return flow;
152 }
153 
155 {
156  if (gTrace)
157  printf("TARunObject::Analyze!\n");
158  return flow;
159 }
160 
162 {
163  if (gTrace)
164  printf("TARunObject::AnalyzeSpecialEvent!\n");
165 }
166 
167 //////////////////////////////////////////////////////////
168 //
169 // Methods of TAFactory
170 //
171 //////////////////////////////////////////////////////////
172 
174 {
175  if (gTrace)
176  printf("TAFactory::Usage!\n");
177 }
178 
179 void TAFactory::Init(const std::vector<std::string> &args)
180 {
181  if (gTrace)
182  printf("TAFactory::Init!\n");
183 }
184 
186 {
187  if (gTrace)
188  printf("TAFactory::Finish!\n");
189 }
190 
191 #ifdef HAVE_ROOT
192 
193 //////////////////////////////////////////////////////////
194 //
195 // Methods of TARootHelper
196 //
197 //////////////////////////////////////////////////////////
198 
199 std::string TARootHelper::fOutputDirectory = "root_output_files";
200 std::string TARootHelper::fOutputFileName = "";
201 TApplication* TARootHelper::fgApp = NULL;
203 THttpServer* TARootHelper::fgHttpServer = NULL;
204 
205 TARootHelper::TARootHelper(const TARunInfo* runinfo) // ctor
206 {
207  if (gTrace)
208  printf("TARootHelper::ctor!\n");
209 
210  std::string filename = fOutputFileName;
211 
212  if (filename.empty()) {
213  char xfilename[256];
214  sprintf(xfilename, "output%05d.root", runinfo->fRunNo);
215  filename = fOutputDirectory + "/" + xfilename;
216  }
217 
218  fOutputFile = new TFile(filename.c_str(), "RECREATE");
219 
220  if (!fOutputFile->IsOpen()) {
221  fprintf(stderr, "TARootHelper::ctor: Error: cannot open output ROOT file \"%s\"\n", filename.c_str());
222  fOutputFile = new TFile("/dev/null", "UPDATE");
223  assert(fOutputFile);
224  assert(fOutputFile->IsOpen());
225  }
226 
227  if (fOutputFile != NULL) {
228  fOutputFile->cd();
229  }
230 }
231 
233 {
234  if (gTrace)
235  printf("TARootHelper::dtor!\n");
236 
237  if (fOutputFile != NULL) {
238  fOutputFile->Write();
239  fOutputFile->Close();
240  fOutputFile = NULL;
241  }
242 
243  if (fgDir)
244  fgDir->cd();
245 }
246 
247 #endif
248 
249 #include <stdio.h>
250 #include <stdlib.h>
251 #include <string.h>
252 #include <sys/time.h>
253 #include <assert.h>
254 #include <signal.h>
255 
256 #include "manalyzer.h"
257 #include "midasio.h"
258 
259 #ifdef HAVE_THTTP_SERVER
260 #include "THttpServer.h"
261 #endif
262 
263 #ifdef HAVE_ROOT
264 #include <TSystem.h>
265 #endif
266 
267 //////////////////////////////////////////////////////////
268 //
269 // Methods and Defaults of TAMultithreadHelper
270 //
271 //////////////////////////////////////////////////////////
272 
274 static int gDefaultMultithreadWaitEmpty = 100; // microseconds
275 static int gDefaultMultithreadWaitFull = 100; // microseconds
276 
278 {
279  fprintf(stderr, "Waiting for all queues to empty out!\n");
280 
281  int count_all_empty = 0;
282 
283  for (int t=0; ; ) {
284  int count_not_empty = 0;
285  size_t count_events = 0;
286 
287  for (unsigned i=0; i<mt->fMtThreads.size(); i++) {
288  std::lock_guard<std::mutex> lock(mt->fMtFlowQueueMutex[i]);
289  if (!mt->fMtThreadIsRunning[i]) // skip threads that have shutdown
290  continue;
291  if (!mt->fMtFlowQueue[i].empty()) {
292  count_not_empty++;
293  count_events += mt->fMtFlowQueue[i].size();
294  break;
295  }
296  if (mt->fMtThreadIsBusy[i]) {
297  count_not_empty++;
298  count_events += 1; // module is analyzing 1 event
299  break;
300  }
301  // implicit unlock
302  }
303 
304  if (count_not_empty == 0) {
305  count_all_empty++;
306  }
307 
308  if (count_all_empty > 1) {
309  // must loop over "all empty" at least twice! K.O.
310  break;
311  }
312 
313  if (t > 10) {
314  fprintf(stderr, "Timeout waiting for all queues to empty out, %d queues still have %d flow events!\n", count_not_empty, (int)count_events);
315  //break;
316  }
317 
318  ::sleep(1);
319  t++;
320  }
321 }
322 
324 {
325  fprintf(stderr, "Waiting for all threads to shut down!\n");
326 
327  mt->fMtShutdownRequested = true;
328 
329  for (int t=0; ; ) {
330  int count_running = 0;
331 
332  for (unsigned i=0; i<mt->fMtThreads.size(); i++) {
333  if (mt->fMtThreadIsRunning[i])
334  count_running++;
335  }
336 
337  if (count_running == 0) {
338  break;
339  }
340 
341  if (t > 10) {
342  fprintf(stderr, "Timeout waiting for all threads to shut down, %d still running!\n", count_running);
343  break;
344  }
345 
346  ::sleep(1);
347  t++;
348  }
349 
350  fprintf(stderr, "Joining all threads!\n");
351  for (unsigned i=0; i<mt->fMtThreads.size(); i++) {
352  if (!mt->fMtThreadIsRunning[i]) {
353  mt->fMtThreads[i]->join();
354  delete mt->fMtThreads[i];
355  mt->fMtThreads[i] = NULL;
356  } else {
357  fprintf(stderr, "Thread %d failed to shut down!\n", i);
358  }
359  }
360 }
361 
363 {
364  // default max queue size
366  // queue settings
369 }
370 
372 {
373  size_t nmodules = fMtFlowQueueMutex.size();
374 
375  // just for kicks, check that all queues have correct size
376  assert(nmodules == fMtFlowQueue.size());
377  assert(nmodules == fMtFlagQueue.size());
378  assert(nmodules == fMtFlowQueueMutex.size());
379 
380  // should not come to the destructor while threads are still running
382 
383  int count = 0;
384  for (size_t i=0; i<nmodules; i++) {
385  // check that the thread is stopped
386  //assert(!fMtThread[i].joinable());
387  // empty the thread queue
388  std::lock_guard<std::mutex> lock(fMtFlowQueueMutex[i]);
389  while (!fMtFlowQueue[i].empty()) {
390  TAFlowEvent* flow = fMtFlowQueue[i].front();
391  TAFlags* flag = fMtFlagQueue[i].front();
392  fMtFlowQueue[i].pop_front();
393  fMtFlagQueue[i].pop_front();
394  delete flow;
395  delete flag;
396  count++;
397  }
398  // implicit unlock of mutex
399  }
400  if (gTrace) {
401  printf("TAMultithreadInfo::dtor: deleted %d queued flow events!\n", count);
402  }
403 }
404 
407 std::mutex TAMultithreadHelper::gfLock; //Lock for modules to execute code that is not thread safe (many root fitting libraries)
408 
409 static void MtQueueFlowEvent(TAMultithreadHelper* mt, int i, TAFlags* flag, TAFlowEvent* flow, bool wait)
410 {
411  assert(mt);
412 
413  if (flag == NULL) {
414  flag = new TAFlags;
415  *flag = 0;
416  }
417 
418  //PrintQueueLength();
419 
420  while (1) {
421  {
422  //Lock and queue events
423  std::lock_guard<std::mutex> lock(mt->fMtFlowQueueMutex[i]);
424 
425  if ((((int)mt->fMtFlowQueue[i].size()) < mt->fMtQueueDepth) || mt->fMtShutdownRequested || !wait) {
426  mt->fMtFlowQueue[i].push_back(flow);
427  mt->fMtFlagQueue[i].push_back(flag);
428  return;
429  }
430  // Unlock when we go out of scope
431  }
432 
433  usleep(mt->fMtQueueFullUSleepTime);
434  }
435 }
436 
437 #if 0
438 //Function to print the length of the flow queue when in multithread mode
439 //Maybe make root update a graphical window?
440 static void PrintMtQueueLength(TAMultithreadHelper* mt)
441 {
442  printf("Multithread queue lengths:\n");
443  for (unsigned i=0; i<mt->fMtFlowQueue.size(); i++) {
444  printf("%d:\t%zu\n",i,mt->fMtFlowQueue[i].size());
445  }
446 }
447 #endif
448 
449 //////////////////////////////////////////////////////////
450 //
451 // Methods of TARegister
452 //
453 //////////////////////////////////////////////////////////
454 
455 std::vector<TAFactory*> *gModules = NULL;
456 
458 {
459  if (!gModules)
460  gModules = new std::vector<TAFactory*>;
461  gModules->push_back(m);
462 }
463 
464 #if 0
465 static double GetTimeSec()
466 {
467  struct timeval tv;
468  gettimeofday(&tv,NULL);
469  return tv.tv_sec + 0.000001*tv.tv_usec;
470 }
471 #endif
472 
473 //////////////////////////////////////////////////////////
474 //
475 // Profiler class
476 //
477 //////////////////////////////////////////////////////////
478 
479 TAUserProfilerFlow::TAUserProfilerFlow(TAFlowEvent* flow, const char* name, const TAClock& start) : TAFlowEvent(flow), fModuleName(name)
480 {
481  fStart = start;
482  fStop = TAClockNow();
483 }
484 
486 {
487 }
488 
490 {
491  TAClockDuration elapsed_seconds = fStop - fStart;
492  return elapsed_seconds.count();
493 }
494 
495 #ifdef HAVE_ROOT
496 #include "TH1D.h"
497 #endif
498 #include <map>
499 
500 class Profiler
501 {
502 private:
503  std::string fBinaryName;
504  std::string fBinaryPath;
505  clock_t fStartCPU;
506  std::chrono::system_clock::time_point fStartUser;
507  uint32_t fMIDASStartTime;
508  uint32_t fMIDASStopTime;
509 
510  //Track Queue lengths when multithreading
511 #ifdef HAVE_ROOT
512  int fNQueues=0;
513  std::vector<TH1D*> fAnalysisQueue;
515 #endif
516 
517  // Track Analyse TMEvent time per module (main thread)
518 #ifdef HAVE_ROOT
519  std::vector<TH1D*> fAnalyzeEventTimeHistograms;
520 #endif
521  std::vector<std::string> fModuleNames;
522  std::vector<double> fAnalyzeEventMean;
523  std::vector<double> fAnalyzeEventRMS;
524  std::vector<int> fAnalyzeEventEntries;
525 
526  std::vector<double> fAnalyzeEventTimeMax;
527  std::vector<double> fAnalyzeEventTimeTotal;
528 
529  //Track Analyse flow event time per module (can be multiple threads)
530 #ifdef HAVE_ROOT
532 #endif
533  std::vector<double> fAnalyzeFlowEventMean;
534  std::vector<double> fAnalyzeFlowEventRMS;
535  std::vector<int> fAnalyzeFlowEventEntries;
536 
537  std::vector<double> fAnalyzeFlowEventTimeMax;
538  std::vector<double> fAnalyzeFlowEventTimeTotal;
539 #ifdef HAVE_ROOT
540  //Track user profiling
541  std::map<unsigned int,int> fUserMap;
542  std::vector<TH1D*> fUserHistograms;
543  std::vector<double> fTotalUserTime;
544  std::vector<double> fMaxUserTime;
545 #endif
546 
547  const bool fTimeModules;
548  // Number of events between samples
549  const int fQueueInterval = 100;
550 public:
551  Profiler(const bool time_modules, const int queue_interval_check);
552  ~Profiler();
553  void Begin(TARunInfo* runinfo,const std::vector<TARunObject*> fRunRun );
554  // Function for profiling the 'main' thread (that unpacks TMEvents)
555  void LogAnalyzeEvent(TAFlags* flag, TAFlowEvent* flow, const int i, const TAClock& start);
556  // Function for profiling module threads
557  void LogAnalyzeFlowEvent(TAFlags* flag, TAFlowEvent* flow, const int i, const TAClock& start);
558  // Extra function for users custom profiling windows
559  void LogUserWindows(TAFlags* flag, TAFlowEvent* flow);
560  void AddModuleMap( const char* UserProfileName, unsigned long hash);
561  void LogMTQueueLength(TARunInfo* runinfo);
562  void End();
563  void Print() const;
564 };
565 
566 Profiler::Profiler(const bool time_modules, const int queue_interval_check):
567  fTimeModules(time_modules), fQueueInterval(queue_interval_check)
568 {
569  if (gTrace)
570  printf("Profiler::ctor\n");
571  fMIDASStartTime = 0;
572  fMIDASStopTime = 0;
573  fStartCPU = clock();
574  fStartUser = std::chrono::system_clock::now();
575 }
576 
578 {
579  if (gTrace)
580  printf("Profiler::dtor\n");
581 }
582 
583 void Profiler::Begin(TARunInfo* runinfo, const std::vector<TARunObject*> runrun)
584 {
585  if (gTrace)
586  printf("Profiler::begin\n");
587 
588  runinfo->fOdb->RU32("Runinfo/Start time binary",(uint32_t*) &fMIDASStartTime);
589 
590 #ifdef HAVE_ROOT
591  // Put Profiler histograms in their own folders in the output root file
592  if (runinfo->fRoot->fOutputFile) {
593  runinfo->fRoot->fOutputFile->cd(); // select correct ROOT directory
594  gDirectory->mkdir("ProfilerReport")->cd();
595  runinfo->fRoot->fOutputFile->cd("ProfilerReport");
596  gDirectory->mkdir("AnalyzeFlowEventTime");
597  gDirectory->mkdir("AnalyzeFlowTime");
598  gDirectory->mkdir("MTQueueLength");
599  }
600 
601  // Setup module processing time histograms
602  // Number of bins
603  Int_t Nbins=1000;
604  // Array of bin edges
605  Double_t bins[Nbins+1];
606  // Processing time range (seconds)
607  Double_t TimeRange = 10;
608  // Set uneven binning to better sample fast modules with accuracy
609  // without having a large number of bins
610  for (int i=0; i<Nbins+1; i++) {
611  bins[i] = TimeRange*pow(1.1,i)/pow(1.1,Nbins);
612  }
613 
614  if (runinfo->fMtInfo)
615  fNQueues = runrun.size();
616 #endif
617 
618  // Per module metric setup
619  for (size_t i = 0; i < runrun.size(); i++) {
620 
621  if (runrun[i]->fModuleName.empty())
622  fModuleNames.push_back("Unnamed Module " + std::to_string(i));
623  else
624  fModuleNames.push_back(runrun[i]->fModuleName);
625 
626  // Metrics for the AnalyzeEvent function (main thread)
627  fAnalyzeEventMean.push_back(0);
628  fAnalyzeEventRMS.push_back(0);
629  fAnalyzeEventEntries.push_back(0);
630  fAnalyzeEventTimeMax.push_back(0);
631  fAnalyzeEventTimeTotal.push_back(0);
632 
633  // Metric for the AnalyzeFlowEvent function (side threads)
634  fAnalyzeFlowEventMean.push_back(0);
635  fAnalyzeFlowEventRMS.push_back(0);
636  fAnalyzeFlowEventEntries.push_back(0);
637  fAnalyzeFlowEventTimeMax.push_back(0);
638  fAnalyzeFlowEventTimeTotal.push_back(0);
639 
640 #ifdef HAVE_ROOT
641  if (runinfo->fRoot->fOutputFile) {
642  runinfo->fRoot->fOutputFile->cd("ProfilerReport/AnalyzeFlowTime");
643  TH1D* Histo = new TH1D( TString(std::to_string(i) + "_" + fModuleNames.at(i)),
644  TString(fModuleNames.at(i) + " Event Proccessing Time; s"),
645  Nbins, bins);
646  fAnalyzeFlowEventTimeHistograms.push_back(Histo);
647 
648  runinfo->fRoot->fOutputFile->cd("ProfilerReport/AnalyzeFlowEventTime");
649  TH1D* AnalyzeEventHisto = new TH1D(TString(std::to_string(i) + "_" + fModuleNames.at(i) + "_TMEvent"),
650  TString(fModuleNames.at(i) + " Flow Proccessing Time; s"),
651  Nbins, bins);
652  fAnalyzeEventTimeHistograms.push_back(AnalyzeEventHisto);
653  } else {
654  fAnalyzeFlowEventTimeHistograms.push_back(NULL);
655  fAnalyzeEventTimeHistograms.push_back(NULL);
656  }
657 
658  // Periodically (once every fQueueInterval events) record the
659  // flow queue size if running in multithreaded mode
660  if (runinfo->fMtInfo) {
661  if (runinfo->fRoot->fOutputFile) {
662  runinfo->fRoot->fOutputFile->cd("ProfilerReport/MTQueueLength");
663  TH1D* QueueHisto = new TH1D(TString(std::to_string(i) + "_" + fModuleNames.at(i) + "_Queue"),
664  TString(fModuleNames.at(i) + " Multithread Queue Length; Queue Depth"),
665  runinfo->fMtInfo->fMtQueueDepth*1.2,
666  0,
667  runinfo->fMtInfo->fMtQueueDepth*1.2);
668  fAnalysisQueue.push_back(QueueHisto);
669  } else {
670  fAnalysisQueue.push_back(NULL);
671  }
672  }
673 #endif
674  }
675 }
676 
677 void Profiler::LogAnalyzeFlowEvent(TAFlags* flag, TAFlowEvent* flow, const int i, const TAClock& start)
678 {
679  if (!fTimeModules)
680  return;
681 
682  if (gTrace)
683  printf("Profiler::log\n");
684 
685  TAClock stop = TAClockNow();
686  if ((*flag) & TAFlag_SKIP_PROFILE) {
687  //Unset bit
688  *flag -= TAFlag_SKIP_PROFILE;
689  return;
690  }
691 
692  std::chrono::duration<double> elapsed_seconds = stop - start;
693  double dt = elapsed_seconds.count();
695  if (dt > fAnalyzeFlowEventTimeMax[i])
696  fAnalyzeFlowEventTimeMax[i] = dt;
697 
698  fAnalyzeFlowEventMean[i] +=dt;
699  fAnalyzeFlowEventRMS[i] +=dt*dt;
701 
702 #ifdef HAVE_ROOT
704  fAnalyzeFlowEventTimeHistograms[i]->Fill(dt);
705 #endif
706 }
707 
708 void Profiler::LogAnalyzeEvent(TAFlags* flag, TAFlowEvent* flow, int i, const TAClock& start)
709 {
710  if (!fTimeModules)
711  return;
712 
713  if (gTrace)
714  printf("Profiler::log_AnalyzeEvent_time\n");
715 
716  TAClock stop = TAClockNow();
717  if ((*flag) & TAFlag_SKIP_PROFILE) {
718  //Unset bit
719  *flag -= TAFlag_SKIP_PROFILE;
720  return;
721  }
722 
723  std::chrono::duration<double> elapsed_seconds = stop - start;
724  double dt = elapsed_seconds.count();
725  fAnalyzeEventTimeTotal[i] += dt;
726  if (dt > fAnalyzeEventTimeMax[i])
727  fAnalyzeEventTimeMax[i] = dt;
728 
729  fAnalyzeEventMean[i] +=dt;
730  fAnalyzeEventRMS[i] +=dt*dt;
732 
733 #ifdef HAVE_ROOT
735  fAnalyzeEventTimeHistograms[i]->Fill(dt);
736 #endif
737 
738 }
739 
741 {
742  if (gTrace)
743  printf("Profiler::log_mt_queue_length\n");
744 
745 #ifdef HAVE_ROOT
747  if (runinfo->fMtInfo && (fQueueIntervalCounter % fQueueInterval ==0 )) {
748  for (int i=0; i<fNQueues; i++) {
749  int j=0;
750  { //Lock guard
751  std::lock_guard<std::mutex> lock(runinfo->fMtInfo->fMtFlowQueueMutex[i]);
752  j=runinfo->fMtInfo->fMtFlowQueue[i].size();
753  }
754  fAnalysisQueue.at(i)->Fill(j);
755  }
756  }
757 #endif
758 }
759 
760 
762 {
763  if (!fTimeModules)
764  return;
765 
766  if (gTrace)
767  printf("Profiler::LogUserWindows\n");
768 
769 #ifdef HAVE_ROOT
770  //Clocks unfold backwards...
771  std::vector<TAFlowEvent*> flowArray;
772  int FlowEvents=0;
773  TAFlowEvent* f = flow;
774  while (f) {
775  flowArray.push_back(f);
776  f=f->fNext;
777  FlowEvents++;
778  }
779  for (int ii=FlowEvents-1; ii>=0; ii--) {
780  f=flowArray[ii];
781  TAUserProfilerFlow* timer=dynamic_cast<TAUserProfilerFlow*>(f);
782  if (timer) {
783  const char* name = timer->fModuleName.c_str();
784  unsigned int hash = std::hash<std::string>{}(timer->fModuleName);
785  if (!fUserMap.count(hash))
786  AddModuleMap(name,hash);
787  double dt=999.;
788  dt=timer->GetTimer();
789  int i = fUserMap[hash];
790  fTotalUserTime[i] += dt;
791  if (dt > fMaxUserTime[i])
792  fMaxUserTime.at(i) = dt;
793  fUserHistograms.at(i)->Fill(dt);
794  }
795  }
796 #else
797  fprintf(stderr, "manalyzer must be built with ROOT for using the user profiling tools\n");
798 #endif
799 }
800 
801 void Profiler::AddModuleMap( const char* UserProfileName, unsigned long hash)
802 {
803  if (gTrace)
804  printf("Profiler::AddModuleMap\n");
805 
806  std::lock_guard<std::mutex> lock(TAMultithreadHelper::gfLock);
807 
808 #ifdef HAVE_ROOT
809  gDirectory->cd("/ProfilerReport");
810  fUserMap[hash] = fUserHistograms.size();
811  Int_t Nbins = 100;
812  Double_t bins[Nbins+1];
813  Double_t TimeRange = 10; //seconds
814  for (int i=0; i<Nbins+1; i++) {
815  bins[i] = TimeRange*pow(1.1,i)/pow(1.1,Nbins);
816  }
817  TH1D* Histo = new TH1D(UserProfileName,UserProfileName,Nbins,bins);
818  fUserHistograms.push_back(Histo);
819  fTotalUserTime.push_back(0.);
820  fMaxUserTime.push_back(0.);
821 #endif
822  return;
823 }
824 
826 {
827  if (!fTimeModules)
828  return;
829 
830  if (gTrace)
831  printf("Profiler::End\n");
832 
833  for (size_t i=0; i<fAnalyzeEventMean.size(); i++) {
836  ( fAnalyzeEventMean.at(i) / fAnalyzeEventEntries.at(i) ) *
837  ( fAnalyzeEventMean.at(i) / fAnalyzeEventEntries.at(i) );
838  }
839  for (size_t i=0; i<fAnalyzeFlowEventMean.size(); i++) {
844  }
845 }
846 
847 void Profiler::Print() const
848 {
849  if (!fTimeModules)
850  return;
851 #ifdef HAVE_ROOT
852  if (fAnalyzeFlowEventTimeHistograms.size()>0) {
853 #else
854  if (fAnalyzeEventEntries.size()>0) {
855 #endif
856  double AllAnalyzeFlowEventTime=0;
857  for (auto& n : fAnalyzeFlowEventTimeTotal)
858  AllAnalyzeFlowEventTime += n;
859  double AllAnalyzeEventTime=0;
860  for (auto& n : fAnalyzeEventTimeTotal)
861  AllAnalyzeEventTime += n;
862  //double max_AnalyzeEvent_time=*std::max_element(TotalAnalyzeEventTime.begin(),TotalAnalyzeEventTime.end());
863  printf("Module average processing time\n");
864  printf(" \t\t\t\tAnalyzeEvent (one thread) \tAnalyzeFlowEvent (multithreadable)\n");
865  printf("Module\t\t\t\tEntries\tMean(ms)RMS(ms)\tMax(ms)\tSum(s)\tEntries\tMean(ms)RMS(ms)\tMax(ms)\tSum(s)\n");
866  printf("----------------------------------------------------------------------------------------------------------------\n");
867  for (size_t i=0; i<fModuleNames.size(); i++) {
868  printf("%-25s", fModuleNames.at(i).c_str());
869  if (fAnalyzeEventEntries.at(i))
870  printf("\t%d\t%.1f\t%.1f\t%.1f\t%.3f",
871  fAnalyzeEventEntries.at(i),
872  fAnalyzeEventMean.at(i)*1000.,
873  fAnalyzeEventRMS.at(i)*1000.,
874  fAnalyzeEventTimeMax.at(i)*1000., //ms
875  fAnalyzeEventTimeTotal.at(i)); //s
876  else
877  printf("\t-\t-\t-\t-\t-");
878 
879  if (fAnalyzeFlowEventEntries.at(i))
880  printf("\t%d\t%.1f\t%.1f\t%.1f\t%.3f",
882  fAnalyzeFlowEventMean.at(i)*1000.,
883  fAnalyzeFlowEventRMS.at(i)*1000.,
884  fAnalyzeFlowEventTimeMax.at(i)*1000., //ms
885  fAnalyzeFlowEventTimeTotal.at(i)); //s
886  else
887  printf("\t-\t-\t-\t-\t-");
888  printf("\n");
889  }
890  printf("----------------------------------------------------------------------------------------------------------------\n");
891  printf(" Analyse TMEvent total time %f\n",AllAnalyzeEventTime);
892  printf(" Analyse FlowEvent total time %f\n",AllAnalyzeFlowEventTime);
893 #ifdef HAVE_ROOT
894  if (fUserHistograms.size()) {
895  printf("Custom profiling windows\tEntries\tMean(ms)RMS(ms)\tMax(ms)\tSum(s)\n");
896  printf("----------------------------------------------------------------------\n");
897  for (size_t i=0; i<fUserHistograms.size(); i++) {
898  printf("%-25s\t%d\t%.1f\t%.1f\t%.1f\t%.3f\t\n",fUserHistograms.at(i)->GetTitle(),
899  (int)fUserHistograms.at(i)->GetEntries(),
900  fUserHistograms.at(i)->GetMean()*1000., //ms
901  fUserHistograms.at(i)->GetRMS()*1000., //ms
902  fMaxUserTime.at(i)*1000., //ms
903  fTotalUserTime.at(i)); //s
904  }
905  printf("----------------------------------------------------------------------\n");
906  } else {
907  printf("----------------------------------------------------------------------------------------------------------------\n");
908  }
909 #else
910  printf("To use custom profile windows, please build rootana with root\n");
911 #endif
912  }
913 
914  //CPU and Wall clock time:
915  double cputime = (double)(clock() - fStartCPU)/CLOCKS_PER_SEC;
916  std::chrono::duration<double> usertime = std::chrono::system_clock::now() - fStartUser;
917  printf("%s\tCPU time: %.2fs\tUser time: %.2fs\tAverage CPU Usage: ~%.1f%%\n",
918  getenv("_"),
919  cputime,
920  usertime.count(),
921  100.*cputime/usertime.count());
922 }
923 
924 //////////////////////////////////////////////////////////
925 //
926 // RunHandler class
927 //
928 //////////////////////////////////////////////////////////
929 
930 class Profiler;
931 
933 {
934 public:
935  TARunInfo* fRunInfo = NULL;
936  std::vector<TARunObject*> fRunRun;
937  std::vector<std::string> fArgs;
938  bool fMultithreadMode = false;
939  Profiler* fProfiler = NULL;
940 
941  RunHandler(const std::vector<std::string>& args, bool multithread, bool profile, int queue_interval_check) // ctor
942  {
943  fRunInfo = NULL;
944  fArgs = args;
945  fMultithreadMode = multithread;
946  if (profile)
947  fProfiler = new Profiler(profile, queue_interval_check);
948  }
949 
950  ~RunHandler() // dtor
951  {
952  if (fRunInfo) {
953  delete fRunInfo;
954  fRunInfo = NULL;
955  }
956  if (fProfiler) {
957  delete fProfiler;
958  fProfiler = NULL;
959  }
960  }
961 
962  void PerModuleThread(int i)
963  {
964  //bool data_processing=true;
965  int nModules=(*gModules).size();
966 
967  TAMultithreadHelper* mt = fRunInfo->fMtInfo;
968 
969  assert(nModules == (int)mt->fMtFlowQueueMutex.size());
970  assert(nModules == (int)mt->fMtFlowQueue.size());
971  assert(nModules == (int)mt->fMtFlagQueue.size());
972 
973  { //Lock scope
974  std::lock_guard<std::mutex> lock(mt->fMtFlowQueueMutex[i]);
975  mt->fMtThreadIsRunning[i] = true;
976  }
977 
978  while (!mt->fMtShutdownRequested) {
979  TAFlowEvent* flow = NULL;
980  TAFlags* flag = NULL;
981 
982  { //Lock scope
983  std::lock_guard<std::mutex> lock(mt->fMtFlowQueueMutex[i]);
984  if (!mt->fMtFlowQueue[i].empty()) {
985  flow=mt->fMtFlowQueue[i].front();
986  flag=mt->fMtFlagQueue[i].front();
987  mt->fMtFlowQueue[i].pop_front();
988  mt->fMtFlagQueue[i].pop_front();
989  }
990 
991  if (flow == NULL)
992  mt->fMtThreadIsBusy[i] = false; // we will sleep
993  else
994  mt->fMtThreadIsBusy[i] = true; // we will analyze an event
995 
996  // implicit unlock of mutex
997  }
998 
999  if (flow == NULL) { // wait until queue not empty
1000  usleep(mt->fMtQueueEmptyUSleepTime);
1001  continue;
1002  }
1003 
1004  TAClock start_time = TAClockNow();
1005 
1006  flow = fRunRun[i]->AnalyzeFlowEvent(fRunInfo, flag, flow);
1007 
1008  if (fProfiler)
1009  fProfiler->LogAnalyzeFlowEvent(flag, flow, i, start_time);
1010 
1011  if ((*flag) & TAFlag_QUIT) { // shut down the analyzer
1012  delete flow;
1013  delete flag;
1014  flow = NULL;
1015  flag = NULL;
1016  mt->fMtQuitRequested = true;
1017  mt->fMtShutdownRequested = true;
1018  break; // stop the thread
1019  }
1020 
1021  if ((*flag) & TAFlag_SKIP) { // stop processing this event
1022  delete flow;
1023  delete flag;
1024  flow = NULL;
1025  flag = NULL;
1026  continue;
1027  }
1028 
1029  if (i==nModules-1) //If I am the last module... free memory, else queue up for next module to process
1030  {
1031  if (fProfiler) {
1032  fProfiler->LogUserWindows(flag, flow);
1033  fProfiler->LogMTQueueLength(fRunInfo);
1034  }
1035  delete flow;
1036  delete flag;
1037  flow = NULL;
1038  flag = NULL;
1039  }
1040  else
1041  {
1042  MtQueueFlowEvent(mt, i+1, flag, flow, true);
1043  flow = NULL;
1044  flag = NULL;
1045  }
1046  }
1047 
1048  { //Lock scope
1049  std::lock_guard<std::mutex> lock(mt->fMtFlowQueueMutex[i]);
1050  mt->fMtThreadIsRunning[i] = false;
1051  mt->fMtThreadIsBusy[i] = false;
1052  }
1053  }
1054 
1055  void CreateRun(int run_number, const char* file_name)
1056  {
1057  assert(fRunInfo == NULL);
1058  assert(fRunRun.size() == 0);
1059 
1060  fRunInfo = new TARunInfo(run_number, file_name, fArgs);
1061 
1062  int nModules = (*gModules).size();
1063 
1064  for (int i=0; i<nModules; i++)
1065  fRunRun.push_back((*gModules)[i]->NewRunObject(fRunInfo));
1066 
1067  if (fMultithreadMode) {
1069  fRunInfo->fMtInfo = mt;
1070  mt->fMtFlowQueue.resize(nModules);
1071  mt->fMtFlagQueue.resize(nModules);
1072  std::vector<std::mutex> mutsize(nModules);
1073  mt->fMtFlowQueueMutex.swap(mutsize);
1074  mt->fMtThreads.resize(nModules);
1075  mt->fMtThreadIsRunning.resize(nModules);
1076  mt->fMtThreadIsBusy.resize(nModules);
1077  for (int i=0; i<nModules; i++) {
1078  printf("Create fMtFlowQueue thread %d\n",i);
1079  mt->fMtThreadIsRunning[i] = false;
1080  mt->fMtThreadIsBusy[i] = false;
1081  mt->fMtThreads[i]=new std::thread(&RunHandler::PerModuleThread,this,i);
1082  }
1083  }
1084  }
1085 
1086  void BeginRun()
1087  {
1088  assert(fRunInfo != NULL);
1089  assert(fRunInfo->fOdb != NULL);
1090  for (unsigned i=0; i<fRunRun.size(); i++)
1091  fRunRun[i]->BeginRun(fRunInfo);
1092  if (fProfiler)
1093  fProfiler->Begin(fRunInfo, fRunRun);
1094  }
1095 
1096  void EndRun(TAFlags* flags)
1097  {
1098  assert(fRunInfo);
1099 
1100  // make sure the shutdown sequence matches the description in the README file!
1101 
1102  // first, call PreEndRun() to tell analysis modules that there will be no more
1103  // MIDAS events, no more calls to Analyze(). PreEndRun() may generate more
1104  // flow events, they to into the flow queue or into the multithread queue
1105 
1106  for (unsigned i=0; i<fRunRun.size(); i++)
1107  fRunRun[i]->PreEndRun(fRunInfo);
1108 
1109  // if in single threaded mode, analyze all queued flow events - call AnalyzeFlowEvent()
1110  // this can generate additional flow events that will be queued in the queue.
1111 
1112  AnalyzeFlowQueue(flags);
1113 
1114  // if in multi threaded mode, allow all the queues to drain naturally
1115  // and shutdown the threads
1116 
1117  if (fRunInfo->fMtInfo) {
1118  WaitForAllQueuesEmpty(fRunInfo->fMtInfo);
1120  if (fRunInfo->fMtInfo->fMtQuitRequested) {
1121  (*flags) |= TAFlag_QUIT;
1122  }
1123  }
1124 
1125  // all data analysis is complete
1126 
1127  for (unsigned i=0; i<fRunRun.size(); i++)
1128  fRunRun[i]->EndRun(fRunInfo);
1129 
1130  if (fProfiler) {
1131  fProfiler->End();
1132  fProfiler->Print();
1133  }
1134  }
1135 
1136  void NextSubrun()
1137  {
1138  assert(fRunInfo);
1139 
1140  for (unsigned i=0; i<fRunRun.size(); i++)
1141  fRunRun[i]->NextSubrun(fRunInfo);
1142  }
1143 
1144  void DeleteRun()
1145  {
1146  assert(fRunInfo);
1147 
1148  for (unsigned i=0; i<fRunRun.size(); i++) {
1149  delete fRunRun[i];
1150  fRunRun[i] = NULL;
1151  }
1152 
1153  fRunRun.clear();
1154  assert(fRunRun.size() == 0);
1155 
1156  delete fRunInfo;
1157  fRunInfo = NULL;
1158  }
1159 
1161  {
1162  for (unsigned i=0; i<fRunRun.size(); i++)
1163  fRunRun[i]->AnalyzeSpecialEvent(fRunInfo, event);
1164  }
1165 
1167  {
1168  for (unsigned i=0; i<fRunRun.size(); i++) {
1169  TAClock start_time = TAClockNow();
1170  flow = fRunRun[i]->AnalyzeFlowEvent(fRunInfo, flags, flow);
1171  if (fProfiler)
1172  fProfiler->LogAnalyzeFlowEvent(flags, flow, i, start_time);
1173  if (!flow)
1174  break;
1175  if ((*flags) & TAFlag_SKIP)
1176  break;
1177  if ((*flags) & TAFlag_QUIT)
1178  break;
1179  }
1180  return flow;
1181  }
1182 
1183  void AnalyzeFlowQueue(TAFlags* ana_flags)
1184  {
1185  while (1) {
1186  if (fRunInfo->fMtInfo)
1187  if (fRunInfo->fMtInfo->fMtQuitRequested) {
1188  *ana_flags |= TAFlag_QUIT;
1189  break;
1190  }
1191 
1192  TAFlowEvent* flow = fRunInfo->ReadFlowQueue();
1193  if (!flow)
1194  break;
1195 
1196  int flags = 0;
1197  flow = AnalyzeFlowEvent(&flags, flow);
1198  if (flow)
1199  delete flow;
1200  if (flags & TAFlag_QUIT) {
1201  *ana_flags |= TAFlag_QUIT;
1202  break;
1203  }
1204  }
1205  }
1206 
1207  void AnalyzeEvent(TMEvent* event, TAFlags* flags, TMWriterInterface *writer)
1208  {
1209  assert(fRunInfo != NULL);
1210  assert(fRunInfo->fOdb != NULL);
1211  assert(event != NULL);
1212  assert(flags != NULL);
1213 
1214  if (fRunInfo->fMtInfo)
1215  if (fRunInfo->fMtInfo->fMtQuitRequested) {
1216  *flags |= TAFlag_QUIT;
1217  return;
1218  }
1219 
1220  TAFlowEvent* flow = NULL;
1221 
1222  for (unsigned i=0; i<fRunRun.size(); i++) {
1223  TAClock start_time = TAClockNow();
1224  flow = fRunRun[i]->Analyze(fRunInfo, event, flags, flow);
1225  if (fProfiler)
1226  fProfiler->LogAnalyzeEvent(flags, flow, i, start_time);
1227  if (*flags & TAFlag_SKIP)
1228  break;
1229  if (*flags & TAFlag_QUIT)
1230  break;
1231  }
1232 
1233  if (flow) {
1234  if ((*flags & TAFlag_SKIP)||(*flags & TAFlag_QUIT)) {
1235  // skip further processing of this event
1236  } else {
1237  if (fRunInfo->fMtInfo) {
1238  MtQueueFlowEvent(fRunInfo->fMtInfo, 0, NULL, flow, true);
1239  flow = NULL; // ownership passed to the multithread event queue
1240  } else {
1241  flow = AnalyzeFlowEvent(flags, flow);
1242  }
1243  }
1244  }
1245 
1246  if (fProfiler) {
1247  fProfiler->LogUserWindows(flags, flow);
1248  if (fRunInfo->fMtInfo) {
1249  fProfiler->LogMTQueueLength(fRunInfo);
1250  }
1251  }
1252 
1253  if (*flags & TAFlag_WRITE)
1254  if (writer)
1255  TMWriteEvent(writer, event);
1256 
1257  if (flow)
1258  delete flow;
1259 
1260  if (*flags & TAFlag_QUIT)
1261  return;
1262 
1263  if (fRunInfo->fMtInfo)
1264  if (fRunInfo->fMtInfo->fMtQuitRequested) {
1265  *flags |= TAFlag_QUIT;
1266  return;
1267  }
1268 
1269  AnalyzeFlowQueue(flags);
1270  }
1271 };
1272 
1274 {
1275  if (fFlowQueue.empty())
1276  return NULL;
1277 
1278  TAFlowEvent* flow = fFlowQueue.front();
1279  fFlowQueue.pop_front();
1280  return flow;
1281 }
1282 
1284 {
1285  if (fMtInfo) {
1286  // call MtQueueFlowEvent with wait=false to avoid deadlock
1287  MtQueueFlowEvent(fMtInfo, 0, NULL, flow, false);
1288  } else {
1289  fFlowQueue.push_back(flow);
1290  }
1291 }
1292 
1293 #ifdef HAVE_MIDAS
1294 
1295 #ifdef HAVE_TMFE
1296 
1297 #ifdef HAVE_ROOT
1298 #include "TSystem.h"
1299 #endif
1300 
1301 #include "tmfe.h"
1302 
1303 static bool gRunStartRequested = false;
1304 static bool gRunStopRequested = false;
1305 
1306 class RpcHandler: public TMFeRpcHandlerInterface
1307 {
1308  TMFeResult HandleBeginRun(int run_number)
1309  {
1310  printf("RpcHandler::HandleBeginRun(%d)\n", run_number);
1311  gRunStartRequested = true;
1312  gRunStopRequested = false;
1313  return TMFeOk();
1314  }
1315 
1316  TMFeResult HandleEndRun(int run_number)
1317  {
1318  printf("RpcHandler::HandleEndRun(%d)\n", run_number);
1319  gRunStartRequested = false;
1320  gRunStopRequested = true;
1321  return TMFeOk();
1322  }
1323 
1324  TMFeResult HandleStartAbort(int run_number)
1325  {
1326  printf("RpcHandler::HandleStartAbort(%d)\n", run_number);
1327  // run did not really start, pretend it started and immediately ended
1328  gRunStartRequested = false;
1329  gRunStopRequested = true;
1330  return TMFeOk();
1331  }
1332 
1333  TMFeResult HandleRpc(const char* cmd, const char* args, std::string& result)
1334  {
1335  return TMFeOk();
1336  }
1337 };
1338 
1339 TMFeResult ReceiveEvent(TMEventBuffer* b, TMEvent *e, int timeout_msec = 0)
1340 {
1341  assert(b != NULL);
1342  assert(e != NULL);
1343 
1344  e->Reset();
1345 
1346  TMFeResult r = b->ReceiveEvent(&e->data, timeout_msec);
1347 
1348  if (r.error_flag)
1349  return r;
1350 
1351  if (e->data.size() == 0)
1352  return TMFeOk();
1353 
1354  e->ParseEvent();
1355 
1356  assert(e->data.size() == e->event_header_size + e->data_size);
1357 
1358  return TMFeOk();
1359 }
1360 
1361 static int ProcessMidasOnlineTmfe(const std::vector<std::string>& args, const char* progname, const char* hostname, const char* exptname, const char* bufname, int event_id, int trigger_mask, const char* sampling_type_string, int num_analyze, TMWriterInterface* writer, bool multithread, bool profiler, int queue_interval_check)
1362 {
1363  TMFE *mfe = TMFE::Instance();
1364 
1365  TMFeResult r = mfe->Connect(progname, hostname, exptname);
1366 
1367  if (r.error_flag) {
1368  fprintf(stderr, "Cannot connect to MIDAS: %s\n", r.error_message.c_str());
1369  return -1;
1370  }
1371 
1372  //MVOdb* odb = mfe->fOdbRoot;
1373 
1374  TMEventBuffer *b = new TMEventBuffer(mfe);
1375 
1376  /* open event buffer */
1377 
1378  r = b->OpenBuffer(bufname);
1379 
1380  if (r.error_flag) {
1381  fprintf(stderr, "Cannot open event buffer \"%s\": %s\n", bufname, r.error_message.c_str());
1382  return -1;
1383  }
1384 
1385  /* request read cache */
1386 
1387  r = b->SetCacheSize(100000, 0);
1388 
1389  if (r.error_flag) {
1390  fprintf(stderr, "Cannot set cache size on event buffer \"%s\": %s\n", bufname, r.error_message.c_str());
1391  return -1;
1392  }
1393 
1394  /* request events */
1395 
1396  r = b->AddRequest(event_id, trigger_mask, sampling_type_string);
1397 
1398  if (r.error_flag) {
1399  fprintf(stderr, "Cannot add event request on event buffer \"%s\": %s\n", bufname, r.error_message.c_str());
1400  return -1;
1401  }
1402 
1403  RpcHandler* h = new RpcHandler();
1404 
1405  mfe->AddRpcHandler(h);
1406 
1407  mfe->DeregisterTransitionPause();
1408  mfe->DeregisterTransitionResume();
1409  mfe->RegisterTransitionStartAbort();
1410  mfe->SetTransitionSequenceStart(300);
1411  mfe->SetTransitionSequenceStop(700);
1412 
1413  mfe->StartRpcThread();
1414 
1415  /* reqister event requests */
1416 
1417  RunHandler rh(args, multithread, profiler, queue_interval_check);
1418 
1419  for (unsigned i=0; i<(*gModules).size(); i++)
1420  (*gModules)[i]->Init(args);
1421 
1422  if (mfe->fStateRunning) {
1423  rh.CreateRun(mfe->fRunNumber, NULL);
1424  rh.fRunInfo->fOdb = mfe->fOdbRoot;
1425  rh.BeginRun();
1426  }
1427 
1428  TMEvent e;
1429 
1430  while (!mfe->fShutdownRequested) {
1431  bool do_sleep = true;
1432 
1433  if (gRunStartRequested) {
1434  gRunStartRequested = false;
1435 
1436  if (rh.fRunInfo) {
1437  TAFlags flags = 0;
1438  rh.EndRun(&flags);
1439  rh.fRunInfo->fOdb = NULL;
1440  rh.DeleteRun();
1441  }
1442 
1443  rh.CreateRun(mfe->fRunNumber, NULL);
1444  rh.fRunInfo->fOdb = mfe->fOdbRoot;
1445  rh.BeginRun();
1446 
1447  continue;
1448  }
1449 
1450  if (gRunStopRequested) {
1451  gRunStopRequested = false;
1452 
1453  if (rh.fRunInfo) {
1454  TAFlags flags = 0;
1455  rh.EndRun(&flags);
1456  rh.fRunInfo->fOdb = NULL;
1457  rh.DeleteRun();
1458  }
1459 
1460  continue;
1461  }
1462 
1463  //r = buf.ReceiveEvent(&e, BM_NO_WAIT);
1464  //r = buf.ReceiveEvent(&e, BM_WAIT);
1465  //r = buf.ReceiveEvent(&e, 8000);
1466  //r = buf.ReceiveEvent(&e, 5000);
1467  r = ReceiveEvent(b, &e, 100);
1468 
1469  if (r.error_flag) {
1470  fprintf(stderr, "Cannot read event on event buffer \"%s\": %s\n", bufname, r.error_message.c_str());
1471  break;
1472  }
1473 
1474  //e.PrintHeader();
1475  //::sleep(1);
1476 
1477  if ((e.data_size > 0) && (rh.fRunInfo != NULL)) {
1478 
1479  //e.PrintHeader();
1480  //e.PrintBanks(2);
1481 
1482  TAFlags flags = 0;
1483 
1484  rh.AnalyzeEvent(&e, &flags, writer);
1485 
1486  if (flags & TAFlag_QUIT) {
1487  mfe->fShutdownRequested = true;
1488  }
1489 
1490  if (num_analyze > 0) {
1491  num_analyze--;
1492  if (num_analyze == 0) {
1493  mfe->fShutdownRequested = true;
1494  }
1495  }
1496 
1497  do_sleep = false;
1498  }
1499 
1500 #ifdef HAVE_THTTP_SERVER
1502  int nreq = TARootHelper::fgHttpServer->ProcessRequests();
1503  if (nreq > 0) {
1504  do_sleep = false;
1505  //printf("ProcessRequests() returned %d\n", nreq);
1506  }
1507  }
1508 #endif
1509 #ifdef HAVE_ROOT
1510  if (TARootHelper::fgApp) {
1511  gSystem->DispatchOneEvent(kTRUE);
1512  }
1513 #endif
1514 
1515  //printf("do_sleep %d\n", do_sleep);
1516 
1517  if (do_sleep) {
1518  mfe->Yield(0.010);
1519  } else {
1520  mfe->Yield(0);
1521  }
1522  }
1523 
1524  if (rh.fRunInfo) {
1525  TAFlags flags = 0;
1526  rh.EndRun(&flags);
1527  rh.fRunInfo->fOdb = NULL;
1528  rh.DeleteRun();
1529  }
1530 
1531  for (unsigned i=0; i<(*gModules).size(); i++)
1532  (*gModules)[i]->Finish();
1533 
1534  /* close event buffer */
1535  r = b->CloseBuffer();
1536 
1537  delete b;
1538  b = NULL;
1539 
1540  if (r.error_flag) {
1541  fprintf(stderr,"Cannot close event buffer \"%s\": %s\n", bufname, r.error_message.c_str());
1542  //return -1;
1543  }
1544 
1545  /* disconnect from experiment */
1546  mfe->Disconnect();
1547 
1548  return 0;
1549 }
1550 
1551 #else
1552 
1553 #include "TMidasOnline.h"
1554 
1555 #ifdef HAVE_ROOT
1556 #include "TSystem.h"
1557 #endif
1558 
1560 {
1561 public:
1563  int fNumAnalyze = 0;
1564  TMWriterInterface* fWriter = NULL;
1565  bool fQuit = false;
1566  MVOdb* fOdb = NULL;
1567 
1568  OnlineHandler(int num_analyze, TMWriterInterface* writer, MVOdb* odb, const std::vector<std::string>& args, bool multithread, bool profiler, int queue_interval_check) // ctor
1569  : fRun(args, multithread, profiler, queue_interval_check)
1570  {
1571  fNumAnalyze = num_analyze;
1572  fWriter = writer;
1573  fOdb = odb;
1574  }
1575 
1576  ~OnlineHandler() // dtor
1577  {
1578  fWriter = NULL;
1579  fOdb = NULL;
1580  }
1581 
1582  void StartRun(int run_number)
1583  {
1584  fRun.CreateRun(run_number, NULL);
1585  fRun.fRunInfo->fOdb = fOdb;
1586  fRun.BeginRun();
1587  }
1588 
1589  void Transition(int transition, int run_number, int transition_time)
1590  {
1591  //printf("OnlineHandler::Transtion: transition %d, run %d, time %d\n", transition, run_number, transition_time);
1592 
1593  if (transition == TR_START) {
1594  if (fRun.fRunInfo) {
1595  TAFlags flags = 0;
1596  fRun.EndRun(&flags);
1597  if (flags & TAFlag_QUIT)
1598  fQuit = true;
1599  fRun.fRunInfo->fOdb = NULL;
1600  fRun.DeleteRun();
1601  }
1602  assert(fRun.fRunInfo == NULL);
1603 
1604  StartRun(run_number);
1605  printf("Begin run: %d\n", run_number);
1606  } else if (transition == TR_STOP) {
1607  TAFlags flags = 0;
1608  fRun.EndRun(&flags);
1609  if (flags & TAFlag_QUIT)
1610  fQuit = true;
1611  fRun.fRunInfo->fOdb = NULL;
1612  fRun.DeleteRun();
1613  printf("End of run %d\n", run_number);
1614  }
1615  }
1616 
1617  void Event(const void* data, int data_size)
1618  {
1619  //printf("OnlineHandler::Event: ptr %p, size %d\n", data, data_size);
1620 
1621  if (!fRun.fRunInfo) {
1622  StartRun(0); // start fake run for events outside of a run
1623  }
1624 
1625  TMEvent* event = new TMEvent(data, data_size);
1626 
1627  TAFlags flags = 0;
1628 
1629  fRun.AnalyzeEvent(event, &flags, fWriter);
1630 
1631  if (flags & TAFlag_QUIT)
1632  fQuit = true;
1633 
1634  if (fNumAnalyze > 0) {
1635  fNumAnalyze--;
1636  if (fNumAnalyze == 0)
1637  fQuit = true;
1638  }
1639 
1640  if (event) {
1641  delete event;
1642  event = NULL;
1643  }
1644  }
1645 };
1646 
1647 static int ProcessMidasOnlineOld(const std::vector<std::string>& args, const char* hostname, const char* exptname, int num_analyze, TMWriterInterface* writer, bool multithread, bool profiler, int queue_interval_check)
1648 {
1650 
1651  int err = midas->connect(hostname, exptname, "rootana");
1652  if (err != 0) {
1653  fprintf(stderr,"Cannot connect to MIDAS, error %d\n", err);
1654  return -1;
1655  }
1656 
1657  MVOdb* odb = MakeMidasOdb(midas->fDB);
1658 
1659  OnlineHandler* h = new OnlineHandler(num_analyze, writer, odb, args, multithread, profiler, queue_interval_check);
1660 
1661  midas->RegisterHandler(h);
1662  midas->registerTransitions();
1663 
1664  /* reqister event requests */
1665 
1666  midas->eventRequest("SYSTEM",-1,-1,(1<<1));
1667 
1668  int run_number = 0; // midas->odbReadInt("/runinfo/Run number");
1669  int run_state = 0; // midas->odbReadInt("/runinfo/State");
1670 
1671  odb->RI("runinfo/run number", &run_number);
1672  odb->RI("runinfo/state", &run_state);
1673 
1674  for (unsigned i=0; i<(*gModules).size(); i++)
1675  (*gModules)[i]->Init(args);
1676 
1677  if ((run_state == STATE_RUNNING)||(run_state == STATE_PAUSED)) {
1678  h->StartRun(run_number);
1679  }
1680 
1681  while (!h->fQuit) {
1682 #ifdef HAVE_THTTP_SERVER
1684  TARootHelper::fgHttpServer->ProcessRequests();
1685  }
1686 #endif
1687 #ifdef HAVE_ROOT
1688  if (TARootHelper::fgApp) {
1689  gSystem->DispatchOneEvent(kTRUE);
1690  }
1691 #endif
1692  if (!TMidasOnline::instance()->poll(10))
1693  break;
1694  }
1695 
1696  if (h->fRun.fRunInfo) {
1697  TAFlags flags = 0;
1698  h->fRun.EndRun(&flags);
1699  h->fRun.fRunInfo->fOdb = NULL;
1700  h->fRun.DeleteRun();
1701  }
1702 
1703  for (unsigned i=0; i<(*gModules).size(); i++)
1704  (*gModules)[i]->Finish();
1705 
1706  delete h; h = NULL;
1707  delete odb; odb = NULL;
1708 
1709  /* disconnect from experiment */
1710  midas->disconnect();
1711 
1712  return 0;
1713 }
1714 
1715 #endif
1716 #endif
1717 
1718 std::vector<std::string> TARunInfo::fgFileList;
1720 
1721 static int ProcessMidasFiles(const std::vector<std::string>& files, const std::vector<std::string>& args, int num_skip, int num_analyze, TMWriterInterface* writer, bool multithread, bool profiler, int queue_interval_check)
1722 {
1723  TARunInfo::fgFileList.clear();
1724 
1725  for (unsigned i=0; i<files.size(); i++)
1726  TARunInfo::fgFileList.push_back(files[i]);
1727 
1728  for (unsigned i=0; i<(*gModules).size(); i++)
1729  (*gModules)[i]->Init(args);
1730 
1731  RunHandler run(args, multithread, profiler, queue_interval_check);
1732 
1733  bool done = false;
1734 
1738  std::string filename = TARunInfo::fgFileList[TARunInfo::fgCurrentFileIndex];
1739 
1740  TMReaderInterface *reader = TMNewReader(filename.c_str());
1741 
1742  if (reader->fError) {
1743  printf("Could not open \"%s\", error: %s\n", filename.c_str(), reader->fErrorString.c_str());
1744  delete reader;
1745  continue;
1746  }
1747 
1748  while (1) {
1749  TMEvent* event = TMReadEvent(reader);
1750 
1751  if (!event) // EOF
1752  break;
1753 
1754  if (event->error) {
1755  delete event;
1756  break;
1757  }
1758 
1759  if (event->event_id == 0x8000) // begin of run event
1760  {
1761  int runno = event->serial_number;
1762 
1763  if (run.fRunInfo) {
1764  if (run.fRunInfo->fRunNo == runno) {
1765  // next subrun file, nothing to do
1766  run.fRunInfo->fFileName = filename;
1767  run.NextSubrun();
1768  } else {
1769  // file with a different run number
1770  TAFlags flags = 0;
1771  run.EndRun(&flags);
1772  if (flags & TAFlag_QUIT) {
1773  done = true;
1774  }
1775  run.DeleteRun();
1776  }
1777  }
1778 
1779  if (!run.fRunInfo) {
1780  run.CreateRun(runno, filename.c_str());
1781  run.fRunInfo->fOdb = MakeFileDumpOdb(event->GetEventData(), event->data_size);
1782  run.BeginRun();
1783  }
1784 
1785  assert(run.fRunInfo);
1786 
1787  run.AnalyzeSpecialEvent(event);
1788 
1789  if (writer)
1790  TMWriteEvent(writer, event);
1791  }
1792  else if (event->event_id == 0x8001) // end of run event
1793  {
1794  //int runno = event->serial_number;
1795  run.AnalyzeSpecialEvent(event);
1796  if (writer)
1797  TMWriteEvent(writer, event);
1798 
1799  if (run.fRunInfo->fOdb) {
1800  delete run.fRunInfo->fOdb;
1801  run.fRunInfo->fOdb = NULL;
1802  }
1803 
1804  run.fRunInfo->fOdb = MakeFileDumpOdb(event->GetEventData(), event->data_size);
1805  }
1806  else if (event->event_id == 0x8002) // message event
1807  {
1808  run.AnalyzeSpecialEvent(event);
1809  if (writer)
1810  TMWriteEvent(writer, event);
1811  }
1812  else
1813  {
1814  if (!run.fRunInfo) {
1815  // create a fake begin of run
1816  run.CreateRun(0, filename.c_str());
1817  run.fRunInfo->fOdb = MakeNullOdb();
1818  run.BeginRun();
1819  }
1820 
1821  if (num_skip > 0) {
1822  num_skip--;
1823  } else {
1824  TAFlags flags = 0;
1825 
1826  run.AnalyzeEvent(event, &flags, writer);
1827 
1828  if (flags & TAFlag_QUIT)
1829  done = true;
1830 
1831  if (num_analyze > 0) {
1832  num_analyze--;
1833  if (num_analyze == 0)
1834  done = true;
1835  }
1836  }
1837  }
1838 
1839  delete event;
1840 
1841  if (done)
1842  break;
1843 
1844 #ifdef HAVE_ROOT
1845  if (TARootHelper::fgApp) {
1846  gSystem->DispatchOneEvent(kTRUE);
1847  }
1848 #endif
1849  }
1850 
1851  reader->Close();
1852  delete reader;
1853 
1854  if (done)
1855  break;
1856  }
1857 
1858 #ifdef HAVE_THTTP_SERVER
1859  if (0 && TARootHelper::fgHttpServer) {
1860  while (1) {
1861  gSystem->DispatchOneEvent(kTRUE);
1862  //sleep(1);
1863  }
1864  }
1865 #endif
1866 
1867  if (run.fRunInfo) {
1868  TAFlags flags = 0;
1869  run.EndRun(&flags);
1870  if (flags & TAFlag_QUIT)
1871  done = true;
1872  run.DeleteRun();
1873  }
1874 
1875  for (unsigned i=0; i<(*gModules).size(); i++)
1876  (*gModules)[i]->Finish();
1877 
1878  return 0;
1879 }
1880 
1881 static int ProcessDemoMode(const std::vector<std::string>& args, int num_skip, int num_analyze, TMWriterInterface* writer, bool multithread, bool profiler, int queue_interval_check)
1882 {
1883  for (unsigned i=0; i<(*gModules).size(); i++)
1884  (*gModules)[i]->Init(args);
1885 
1886  RunHandler run(args, multithread, profiler, queue_interval_check);
1887 
1888  bool done = false;
1889 
1890  int runno = 1;
1891 
1892  for (unsigned i=0; true; i++) {
1893  char s[256];
1894  sprintf(s, "%03d", i);
1895  std::string filename = std::string("demo_subrun_") + s;
1896 
1897  if (!run.fRunInfo) {
1898  run.CreateRun(runno, filename.c_str());
1899  run.fRunInfo->fOdb = MakeNullOdb();
1900  run.BeginRun();
1901  }
1902 
1903  // we do not generate a fake begin of run event...
1904  //run.AnalyzeSpecialEvent(event);
1905 
1906  // only switch subruns after the first subrun file
1907  if (i>0) {
1908  run.fRunInfo->fFileName = filename;
1909  run.NextSubrun();
1910  }
1911 
1912  for (unsigned j=0; j<100; j++) {
1913  TMEvent* event = new TMEvent();
1914 
1915  if (num_skip > 0) {
1916  num_skip--;
1917  } else {
1918  TAFlags flags = 0;
1919 
1920  run.AnalyzeEvent(event, &flags, writer);
1921 
1922  if (flags & TAFlag_QUIT)
1923  done = true;
1924 
1925  if (num_analyze > 0) {
1926  num_analyze--;
1927  if (num_analyze == 0)
1928  done = true;
1929  }
1930  }
1931 
1932  delete event;
1933 
1934  if (done)
1935  break;
1936 
1937 #ifdef HAVE_ROOT
1938  if (TARootHelper::fgApp) {
1939  gSystem->DispatchOneEvent(kTRUE);
1940  }
1941 #endif
1942  }
1943 
1944  // we do not generate a fake end of run event...
1945  //run.AnalyzeSpecialEvent(event);
1946 
1947  if (done)
1948  break;
1949  }
1950 
1951  if (run.fRunInfo) {
1952  TAFlags flags = 0;
1953  run.EndRun(&flags);
1954  run.DeleteRun();
1955  }
1956 
1957  for (unsigned i=0; i<(*gModules).size(); i++)
1958  (*gModules)[i]->Finish();
1959 
1960  return 0;
1961 }
1962 
1963 static bool gEnableShowMem = false;
1964 
1965 #if 0
1966 static int ShowMem(const char* label)
1967 {
1968  if (!gEnableShowMem)
1969  return 0;
1970 
1971  FILE* fp = fopen("/proc/self/statm","r");
1972  if (!fp)
1973  return 0;
1974 
1975  int mem = 0;
1976  fscanf(fp,"%d",&mem);
1977  fclose(fp);
1978 
1979  if (label)
1980  printf("memory at %s is %d\n", label, mem);
1981 
1982  return mem;
1983 }
1984 #endif
1985 
1987 {
1988 public:
1990  : TARunObject(runinfo)
1991  {
1992  if (gTrace)
1993  printf("EventDumpModule::ctor, run %d\n", runinfo->fRunNo);
1994  }
1995 
1997  {
1998  if (gTrace)
1999  printf("EventDumpModule::dtor!\n");
2000  }
2001 
2002  void BeginRun(TARunInfo* runinfo)
2003  {
2004  printf("EventDumpModule::BeginRun, run %d\n", runinfo->fRunNo);
2005  }
2006 
2007  void EndRun(TARunInfo* runinfo)
2008  {
2009  printf("EventDumpModule::EndRun, run %d\n", runinfo->fRunNo);
2010  }
2011 
2012  void NextSubrun(TARunInfo* runinfo)
2013  {
2014  printf("EventDumpModule::NextSubrun, run %d, file %s\n", runinfo->fRunNo, runinfo->fFileName.c_str());
2015  }
2016 
2017  void PauseRun(TARunInfo* runinfo)
2018  {
2019  printf("EventDumpModule::PauseRun, run %d\n", runinfo->fRunNo);
2020  }
2021 
2022  void ResumeRun(TARunInfo* runinfo)
2023  {
2024  printf("EventDumpModule::ResumeRun, run %d\n", runinfo->fRunNo);
2025  }
2026 
2027  TAFlowEvent* Analyze(TARunInfo* runinfo, TMEvent* event, TAFlags* flags, TAFlowEvent* flow)
2028  {
2029  printf("EventDumpModule::Analyze, run %d, ", runinfo->fRunNo);
2030  event->FindAllBanks();
2031  std::string h = event->HeaderToString();
2032  std::string b = event->BankListToString();
2033  printf("%s: %s\n", h.c_str(), b.c_str());
2034  return flow;
2035  }
2036 
2037  void AnalyzeSpecialEvent(TARunInfo* runinfo, TMEvent* event)
2038  {
2039  printf("EventDumpModule::AnalyzeSpecialEvent, run %d, event serno %d, id 0x%04x, data size %d\n", runinfo->fRunNo, event->serial_number, (int)event->event_id, event->data_size);
2040  }
2041 };
2042 
2044 {
2045 public:
2046 
2047  void Init(const std::vector<std::string> &args)
2048  {
2049  if (gTrace)
2050  printf("EventDumpModuleFactory::Init!\n");
2051  }
2052 
2053  void Finish()
2054  {
2055  if (gTrace)
2056  printf("EventDumpModuleFactory::Finish!\n");
2057  }
2058 
2060  {
2061  if (gTrace)
2062  printf("EventDumpModuleFactory::NewRunObject, run %d, file %s\n", runinfo->fRunNo, runinfo->fFileName.c_str());
2063  return new EventDumpModule(runinfo);
2064  }
2065 };
2066 
2067 #ifdef HAVE_ROOT
2068 #include <TGMenu.h>
2069 #include <TGButton.h>
2070 #include <TBrowser.h>
2071 
2072 #define CTRL_QUIT 1
2073 #define CTRL_NEXT 2
2074 #define CTRL_CONTINUE 3
2075 #define CTRL_PAUSE 4
2076 #define CTRL_NEXT_FLOW 5
2077 
2078 #define CTRL_TBROWSER 11
2079 
2081 {
2082 public:
2083  int fValue;
2084 
2085  ValueHolder() // ctor
2086  {
2087  fValue = 0;
2088  }
2089 };
2090 
2092 {
2093 public:
2095  int fValue;
2096 
2097  TextButton(TGWindow*p, const char* text, ValueHolder* holder, int value) // ctor
2098  : TGTextButton(p, text)
2099  {
2100  fHolder = holder;
2101  fValue = value;
2102  }
2103 
2104 #if 0
2105  void Pressed()
2106  {
2107  printf("Pressed!\n");
2108  }
2109 
2110  void Released()
2111  {
2112  printf("Released!\n");
2113  }
2114 #endif
2115 
2116  void Clicked()
2117  {
2118  //printf("Clicked button %s, value %d!\n", GetString().Data(), fValue);
2119  if (fHolder)
2120  fHolder->fValue = fValue;
2121  //gSystem->ExitLoop();
2122  }
2123 };
2124 
2125 class MainWindow: public TGMainFrame
2126 {
2127 public:
2128  TGPopupMenu* fMenu;
2129  TGMenuBar* fMenuBar;
2130  TGLayoutHints* fMenuBarItemLayout;
2131 
2132  TGCompositeFrame* fButtonsFrame;
2133 
2135 
2140 
2142 
2143 public:
2144  MainWindow(const TGWindow*w, int s1, int s2, ValueHolder* holder) // ctor
2145  : TGMainFrame(w, s1, s2)
2146  {
2147  if (gTrace)
2148  printf("MainWindow::ctor!\n");
2149 
2150  fHolder = holder;
2151  //SetCleanup(kDeepCleanup);
2152 
2153  SetWindowName("ROOT Analyzer Control");
2154 
2155  // layout the gui
2156  fMenu = new TGPopupMenu(gClient->GetRoot());
2157  fMenu->AddEntry("New TBrowser", CTRL_TBROWSER);
2158  fMenu->AddEntry("-", 0);
2159  fMenu->AddEntry("Next", CTRL_NEXT);
2160  fMenu->AddEntry("NextFlow", CTRL_NEXT_FLOW);
2161  fMenu->AddEntry("Continue", CTRL_CONTINUE);
2162  fMenu->AddEntry("Pause", CTRL_PAUSE);
2163  fMenu->AddEntry("-", 0);
2164  fMenu->AddEntry("Quit", CTRL_QUIT);
2165 
2166  fMenuBarItemLayout = new TGLayoutHints(kLHintsTop|kLHintsLeft, 0, 4, 0, 0);
2167 
2168  fMenu->Associate(this);
2169 
2170  fMenuBar = new TGMenuBar(this, 1, 1, kRaisedFrame);
2171  fMenuBar->AddPopup("&Rootana", fMenu, fMenuBarItemLayout);
2172  fMenuBar->Layout();
2173 
2174  AddFrame(fMenuBar, new TGLayoutHints(kLHintsTop|kLHintsLeft|kLHintsExpandX));
2175 
2176  fButtonsFrame = new TGVerticalFrame(this);
2177 
2178  fNextButton = new TextButton(fButtonsFrame, "Next", holder, CTRL_NEXT);
2179  fNextFlowButton = new TextButton(fButtonsFrame, "Next Flow Event", holder, CTRL_NEXT_FLOW);
2180 
2181  fButtonsFrame->AddFrame(fNextButton, new TGLayoutHints(kLHintsExpandX, 1, 1, 1, 1));
2182  fButtonsFrame->AddFrame(fNextFlowButton, new TGLayoutHints(kLHintsExpandX, 1, 1, 1, 1));
2183 
2184  TGHorizontalFrame *hframe = new TGHorizontalFrame(fButtonsFrame);
2185 
2186  fContinueButton = new TextButton(hframe, " Continue ", holder, CTRL_CONTINUE);
2187  fPauseButton = new TextButton(hframe, " Pause ", holder, CTRL_PAUSE);
2188 
2189  hframe->AddFrame(fContinueButton, new TGLayoutHints(kLHintsExpandX, 1, 1, 1, 1));
2190  hframe->AddFrame(fPauseButton, new TGLayoutHints(kLHintsExpandX, 1, 1, 1, 1));
2191 
2192  fButtonsFrame->AddFrame(hframe, new TGLayoutHints(kLHintsExpandX));
2193 
2194  fQuitButton = new TextButton(fButtonsFrame, "Quit ", holder, CTRL_QUIT);
2195  fButtonsFrame->AddFrame(fQuitButton, new TGLayoutHints(kLHintsExpandX, 1, 1, 1, 1));
2196 
2197  AddFrame(fButtonsFrame, new TGLayoutHints(kLHintsExpandX));
2198 
2199  MapSubwindows();
2200  Layout();
2201  Resize(GetDefaultSize());
2202  MapWindow();
2203  }
2204 
2205  ~MainWindow() // dtor // Closing the control window closes the whole program
2206  {
2207  if (gTrace)
2208  printf("MainWindow::dtor!\n");
2209 
2210  delete fMenu;
2211  delete fMenuBar;
2212  delete fMenuBarItemLayout;
2213  }
2214 
2216  {
2217  if (gTrace)
2218  printf("MainWindow::CloseWindow()\n");
2219 
2220  if (fHolder)
2221  fHolder->fValue = CTRL_QUIT;
2222  //gSystem->ExitLoop();
2223  }
2224 
2225  Bool_t ProcessMessage(Long_t msg, Long_t parm1, Long_t parm2)
2226  {
2227  //printf("GUI Message %d %d %d\n",(int)msg,(int)parm1,(int)parm2);
2228  switch (GET_MSG(msg))
2229  {
2230  default:
2231  break;
2232  case kC_COMMAND:
2233  switch (GET_SUBMSG(msg))
2234  {
2235  default:
2236  break;
2237  case kCM_MENU:
2238  //printf("parm1 %d\n", (int)parm1);
2239  switch (parm1)
2240  {
2241  case CTRL_TBROWSER:
2242  new TBrowser();
2243  break;
2244  default:
2245  //printf("Control %d!\n", (int)parm1);
2246  if (fHolder)
2247  fHolder->fValue = parm1;
2248  //gSystem->ExitLoop();
2249  break;
2250  }
2251  break;
2252  }
2253  break;
2254  }
2255 
2256  return kTRUE;
2257  }
2258 };
2259 #endif
2260 
2262 {
2263 public:
2266  int fSkip;
2267 #ifdef HAVE_ROOT
2270 #endif
2271 
2273  : TARunObject(runinfo)
2274  {
2275  if (gTrace)
2276  printf("InteractiveModule::ctor, run %d\n", runinfo->fRunNo);
2277  fContinue = false;
2278  fNextFlow = false;
2279  fSkip = 0;
2280 #ifdef HAVE_ROOT
2281  if (!fgHolder)
2282  fgHolder = new ValueHolder;
2283  if (!fgCtrlWindow && runinfo->fRoot->fgApp) {
2284  fgCtrlWindow = new MainWindow(gClient->GetRoot(), 200, 300, fgHolder);
2285  }
2286 #endif
2287  }
2288 
2290  {
2291  if (gTrace)
2292  printf("InteractiveModule::dtor!\n");
2293  }
2294 
2295  void BeginRun(TARunInfo* runinfo)
2296  {
2297  printf("InteractiveModule::BeginRun, run %d\n", runinfo->fRunNo);
2298  }
2299 
2300  void EndRun(TARunInfo* runinfo)
2301  {
2302  printf("InteractiveModule::EndRun, run %d\n", runinfo->fRunNo);
2303 
2304 #ifdef HAVE_ROOT
2305  if (fgCtrlWindow && runinfo->fRoot->fgApp) {
2306  fgCtrlWindow->fNextButton->SetEnabled(false);
2307  fgCtrlWindow->fNextFlowButton->SetEnabled(false);
2308  fgCtrlWindow->fContinueButton->SetEnabled(false);
2309  fgCtrlWindow->fPauseButton->SetEnabled(false);
2310  while (1) {
2311 #ifdef HAVE_THTTP_SERVER
2313  TARootHelper::fgHttpServer->ProcessRequests();
2314  }
2315 #endif
2316 #ifdef HAVE_ROOT
2317  if (TARootHelper::fgApp) {
2318  gSystem->DispatchOneEvent(kTRUE);
2319  }
2320 #endif
2321 #ifdef HAVE_MIDAS
2322 #ifdef HAVE_TMFE
2323  TMFE* mfe = TMFE::Instance();
2324  mfe->Yield(0.010);
2325  if (mfe->fShutdownRequested) {
2326  return;
2327  }
2328 #else
2329  if (!TMidasOnline::instance()->sleep(10)) {
2330  // FIXME: indicate that we should exit the analyzer
2331  return;
2332  }
2333 #endif
2334 #else
2335  gSystem->Sleep(10);
2336 #endif
2337 
2338  int ctrl = fgHolder->fValue;
2339  fgHolder->fValue = 0;
2340 
2341  switch (ctrl) {
2342  case CTRL_QUIT:
2343  return;
2344  case CTRL_NEXT:
2345  return;
2346  case CTRL_CONTINUE:
2347  return;
2348  }
2349  }
2350  }
2351 #endif
2352  }
2353 
2354  void PauseRun(TARunInfo* runinfo)
2355  {
2356  printf("InteractiveModule::PauseRun, run %d\n", runinfo->fRunNo);
2357  }
2358 
2359  void ResumeRun(TARunInfo* runinfo)
2360  {
2361  printf("InteractiveModule::ResumeRun, run %d\n", runinfo->fRunNo);
2362  }
2363 
2364  void InteractiveLoop(TARunInfo* runinfo, TAFlags* flags)
2365  {
2366 #ifdef HAVE_ROOT
2367  if (fgCtrlWindow && runinfo->fRoot->fgApp) {
2368  while (1) {
2369 #ifdef HAVE_THTTP_SERVER
2371  TARootHelper::fgHttpServer->ProcessRequests();
2372  }
2373 #endif
2374 #ifdef HAVE_ROOT
2375  if (TARootHelper::fgApp) {
2376  gSystem->DispatchOneEvent(kTRUE);
2377  }
2378 #endif
2379 #ifdef HAVE_MIDAS
2380 #ifdef HAVE_TMFE
2381  TMFE* mfe = TMFE::Instance();
2382  mfe->Yield(0.010);
2383  if (mfe->fShutdownRequested) {
2384  *flags |= TAFlag_QUIT;
2385  return;
2386  }
2387 #else
2388  if (!TMidasOnline::instance()->sleep(10)) {
2389  *flags |= TAFlag_QUIT;
2390  return;
2391  }
2392 #endif
2393 #else
2394  gSystem->Sleep(10);
2395 #endif
2396 
2397  int ctrl = fgHolder->fValue;
2398  fgHolder->fValue = 0;
2399 
2400  switch (ctrl) {
2401  case CTRL_QUIT:
2402  *flags |= TAFlag_QUIT;
2403  return;
2404  case CTRL_NEXT:
2405  return;
2406  case CTRL_NEXT_FLOW:
2407  fNextFlow = true;
2408  return;
2409  case CTRL_CONTINUE:
2410  fContinue = true;
2411  return;
2412  }
2413  }
2414  }
2415 #endif
2416 
2417  while (1) {
2418  char str[256];
2419  fprintf(stdout, "manalyzer> "); fflush(stdout);
2420  char* s = fgets(str, sizeof(str)-1, stdin);
2421 
2422  if (s == NULL) {
2423  // EOF
2424  *flags |= TAFlag_QUIT;
2425  return;
2426  }
2427 
2428  printf("command [%s]\n", str);
2429 
2430  if (str[0] == 'h') { // "help"
2431  printf("Interactive manalyzer commands:\n");
2432  printf(" q - quit\n");
2433  printf(" h - help\n");
2434  printf(" c - continue until next TAFlag_DISPLAY event\n");
2435  printf(" n - next event\n");
2436  printf(" aNNN - analyze N events, i.e. \"a10\"\n");
2437  } else if (str[0] == 'q') { // "quit"
2438  *flags |= TAFlag_QUIT;
2439  return;
2440  } else if (str[0] == 'n') { // "next"
2441  return;
2442  } else if (str[0] == 'c') { // "continue"
2443  fContinue = true;
2444  return;
2445  } else if (str[0] == 'a') { // "analyze" N events
2446  int num = atoi(str+1);
2447  printf("Analyzing %d events\n", num);
2448  if (num > 0) {
2449  fSkip = num-1;
2450  }
2451  return;
2452  }
2453  }
2454  }
2455 
2456  TAFlowEvent* Analyze(TARunInfo* runinfo, TMEvent* event, TAFlags* flags, TAFlowEvent* flow)
2457  {
2458  printf("InteractiveModule::Analyze, run %d, %s\n", runinfo->fRunNo, event->HeaderToString().c_str());
2459 
2460 #ifdef HAVE_ROOT
2461  if (fgHolder->fValue == CTRL_QUIT) {
2462  *flags |= TAFlag_QUIT;
2463  return flow;
2464  } else if (fgHolder->fValue == CTRL_PAUSE) {
2465  fContinue = false;
2466  }
2467 #endif
2468 
2469  if ((fContinue||fNextFlow) && !(*flags & TAFlag_DISPLAY)) {
2470  return flow;
2471  } else {
2472  fContinue = false;
2473  }
2474 
2475  if (fSkip > 0) {
2476  fSkip--;
2477  return flow;
2478  }
2479 
2480  InteractiveLoop(runinfo, flags);
2481 
2482  return flow;
2483  }
2484 
2486  {
2487  printf("InteractiveModule::AnalyzeFlowEvent, run %d\n", runinfo->fRunNo);
2488 
2489 #ifdef HAVE_ROOT
2490  if (fgHolder->fValue == CTRL_QUIT) {
2491  *flags |= TAFlag_QUIT;
2492  return flow;
2493  } else if (fgHolder->fValue == CTRL_PAUSE) {
2494  fContinue = false;
2495  }
2496 #endif
2497 
2498  if ((!fNextFlow) && !(*flags & TAFlag_DISPLAY)) {
2499  return flow;
2500  }
2501 
2502  fNextFlow = false;
2503 
2504  InteractiveLoop(runinfo, flags);
2505 
2506  return flow;
2507  }
2508 
2509  void AnalyzeSpecialEvent(TARunInfo* runinfo, TMEvent* event)
2510  {
2511  if (gTrace)
2512  printf("InteractiveModule::AnalyzeSpecialEvent, run %d, event serno %d, id 0x%04x, data size %d\n", runinfo->fRunNo, event->serial_number, (int)event->event_id, event->data_size);
2513  }
2514 };
2515 
2516 #ifdef HAVE_ROOT
2519 #endif
2520 
2522 {
2523 public:
2524 
2525  void Init(const std::vector<std::string> &args)
2526  {
2527  if (gTrace)
2528  printf("InteractiveModuleFactory::Init!\n");
2529  }
2530 
2531  void Finish()
2532  {
2533  if (gTrace)
2534  printf("InteractiveModuleFactory::Finish!\n");
2535  }
2536 
2538  {
2539  if (gTrace)
2540  printf("InteractiveModuleFactory::NewRunObject, run %d, file %s\n", runinfo->fRunNo, runinfo->fFileName.c_str());
2541  return new InteractiveModule(runinfo);
2542  }
2543 };
2544 
2545 //////////////////////////////////////////////////////////
2546 //
2547 // main program
2548 //
2549 //////////////////////////////////////////////////////////
2550 
2551 static void help()
2552 {
2553  printf("\nUsage: ./manalyzer.exe [-h] [-R8081] [-oOutputfile.mid] [file1 file2 ...] [-- arguments passed to modules ...]\n");
2554  printf("\n");
2555  printf("-h: print this help message\n");
2556  printf("--demo: activate the demo mode, online connection or input file not needed, midas events are generated internally, add -e0 or -eNNN to set number of demo events \n");
2557  printf("\n");
2558  printf("-Hhostname: connect to MIDAS experiment on given host\n");
2559  printf("-Eexptname: connect to this MIDAS experiment\n");
2560  printf("--midas-progname SSS -- set analyzer's MIDAS program name, default is \"ana\"\n");
2561  printf("--midas-hostname HOSTNAME[:PORT] -- connect to MIDAS mserver on given host and port\n");
2562  printf("--midas-exptname EXPTNAME -- connect to given experiment\n");
2563  printf("--midas-buffer BUFZZZ -- connect to given MIDAS event buffer\n");
2564  printf("--midas-sampling SSS -- sample events from MIDAS event buffer: GET_ALL=get every event (will block the event writers, GET_NONBLOCKING=get as many as we can process, GET_RECENT=get recent events, see bm_receive_event(). Default is GET_NONBLOCKING\n");
2565  printf("--midas-event-id III -- receive only events with matching event ID\n");
2566  printf("--midas-trigger-mask 0xMASK -- receive only events with matching trigger mask\n");
2567  printf("\n");
2568  printf("-oOutputfile.mid: write selected events into this file\n");
2569  printf("-Rnnnn: Start the ROOT THttpServer HTTP server on specified tcp port, use -R8081, access by firefox http://localhost:8081\n");
2570  printf("-eNNN: Number of events to analyze, 0=unlimited\n");
2571  printf("-sNNN: Number of events to skip before starting analysis\n");
2572  printf("\n");
2573  printf("--dump: activate the event dump module\n");
2574  printf("\n");
2575  printf("-t: Enable tracing of constructors, destructors and function calls\n");
2576  printf("-m: Enable memory leak debugging\n");
2577  printf("-g: Enable graphics display when processing data files\n");
2578  printf("-i: Enable intractive mode\n");
2579  printf("\n");
2580  printf("--mt: Enable multithreaded mode. Extra multithread config settings:\n");
2581  printf("--mtqlNNN: Module thread queue length (buffer). Default: %d\n", gDefaultMultithreadQueueLength);
2582  printf("--mtseNNN: Module thread sleep time with empty queue (usec). Default: %d\n", gDefaultMultithreadWaitEmpty);
2583  printf("--mtsfNNN: Module thread sleep time when next queue is full (usec). Default: %d\n", gDefaultMultithreadWaitFull);
2584  printf("\n");
2585  printf("--no-profiler: Turn off manalyzer module profiler\n");
2586  printf("--pqiNNN: Profile multithread queue lengths every NNN events \n");
2587 #ifdef HAVE_ROOT
2588  printf("\n");
2589  printf("-Doutputdirectory: Specify output root file directory\n");
2590  printf("-Ooutputfile.root: Specify output root file filename\n");
2591 #endif
2592  printf("\n");
2593  printf("--: All following arguments are passed to the analyzer modules Init() method\n");
2594  printf("\n");
2595  printf("Analyzer modules usage:\n");
2596  if (gModules)
2597  for (unsigned i=0; i<(*gModules).size(); i++)
2598  (*gModules)[i]->Usage();
2599  printf("\n");
2600  printf("Example1: analyze online data: ./manalyzer.exe -R9091\n");
2601  printf("Example2: analyze existing data: ./manalyzer.exe /data/alpha/current/run00500.mid\n");
2602  exit(1);
2603 }
2604 
2605 // duplicate c++20 std::string s.starts_with()
2606 
2607 static bool starts_with(const std::string& s, const char* prefix)
2608 {
2609  return (s.substr(0, strlen(prefix)) == prefix);
2610 }
2611 
2612 // Main function call
2613 
2614 int manalyzer_main(int argc, char* argv[])
2615 {
2616  setbuf(stdout, NULL);
2617  setbuf(stderr, NULL);
2618 
2619  signal(SIGILL, SIG_DFL);
2620  signal(SIGBUS, SIG_DFL);
2621  signal(SIGSEGV, SIG_DFL);
2622  signal(SIGPIPE, SIG_DFL);
2623 
2624  std::vector<std::string> args;
2625  for (int i=0; i<argc; i++) {
2626  if (strcmp(argv[i],"-h")==0)
2627  help(); // does not return
2628  args.push_back(argv[i]);
2629  }
2630 
2631  int httpPort = 0;
2632  int num_skip = 0;
2633  int num_analyze = 0;
2634 
2635  TMWriterInterface *writer = NULL;
2636 
2637  bool event_dump = false;
2638  bool demo_mode = false;
2639 #ifdef HAVE_ROOT
2640  bool root_graphics = false;
2641 #endif
2642  bool interactive = false;
2643 
2644  bool multithread = false;
2645 
2646  bool performance_profiler = true;
2647  int snap_shot_queue_length = 100;
2648 
2649  std::vector<std::string> files;
2650  std::vector<std::string> modargs;
2651 
2652 #ifdef HAVE_MIDAS
2653  std::string midas_hostname = "";
2654  std::string midas_exptname = "";
2655  std::string midas_progname = "ana";
2656  std::string midas_buffer = "SYSTEM";
2657  //std::string midas_sampling = "GET_ALL";
2658  std::string midas_sampling = "GET_NONBLOCKING";
2659  int midas_event_id = -1;
2660  int midas_trigger_mask = -1;
2661 #endif
2662 
2663  for (unsigned int i=1; i<args.size(); i++) { // loop over the commandline options
2664  std::string arg = args[i];
2665  //printf("argv[%d] is %s\n",i,arg);
2666 
2667  if (arg == "--") {
2668  for (unsigned j=i+1; j<args.size(); j++)
2669  modargs.push_back(args[j]);
2670  break;
2671  } else if (arg == "--dump") {
2672  event_dump = true;
2673  } else if (arg == "--demo") {
2674  demo_mode = true;
2675  num_analyze = 100;
2676 #ifdef HAVE_ROOT
2677  } else if (arg == "-g") {
2678  root_graphics = true;
2679 #endif
2680  } else if (arg == "-i") {
2681  interactive = true;
2682  } else if (arg == "-t") {
2683  gTrace = true;
2686  } else if (starts_with(arg, "-o")) {
2687  writer = TMNewWriter(arg.c_str()+2);
2688  } else if (starts_with(arg, "-s")) {
2689  num_skip = atoi(arg.c_str()+2);
2690  } else if (starts_with(arg, "-e")) {
2691  num_analyze = atoi(arg.c_str()+2);
2692  } else if (starts_with(arg, "-m")) { // Enable memory debugging
2693  gEnableShowMem = true;
2694  } else if (starts_with(arg, "-R")) { // Set the ROOT THttpServer HTTP port
2695  httpPort = atoi(arg.c_str()+2);
2696 #ifdef HAVE_MIDAS
2697  } else if (starts_with(arg, "-H")) {
2698  midas_hostname = arg.c_str()+2;
2699  } else if (starts_with(arg, "-E")) {
2700  midas_exptname = arg.c_str()+2;
2701  } else if (arg == "--midas-progname") {
2702  midas_progname = args[i+1]; i++;
2703  } else if (arg == "--midas-hostname") {
2704  midas_hostname = args[i+1]; i++;
2705  } else if (arg == "--midas-exptname") {
2706  midas_exptname = args[i+1]; i++;
2707  } else if (arg == "--midas-buffer") {
2708  midas_buffer = args[i+1]; i++;
2709  } else if (arg == "--midas-sampling") {
2710  midas_sampling = args[i+1]; i++;
2711  } else if (arg == "--midas-event-id") {
2712  midas_event_id = atoi(args[i+1].c_str()); i++;
2713  } else if (arg == "--midas-trigger-mask") {
2714  midas_trigger_mask = strtoul(args[i+1].c_str(), NULL, 0); i++;
2715 #endif
2716  } else if (starts_with(arg, "--mtql")) {
2717  gDefaultMultithreadQueueLength = atoi(arg.c_str()+6);
2718  } else if (starts_with(arg, "--mtse")) {
2719  gDefaultMultithreadWaitEmpty = atoi(arg.c_str()+6);
2720  } else if (starts_with(arg, "--mtsf")) {
2721  gDefaultMultithreadWaitFull = atoi(arg.c_str()+6);
2722  } else if (arg == "--mt") {
2723  multithread=true;
2724  } else if (arg == "--no-profiler") {
2725  performance_profiler = 0;
2726  } else if (starts_with(arg, "--pqi")) {
2727  snap_shot_queue_length = atoi(arg.c_str()+5);
2728 #ifdef HAVE_ROOT
2729  } else if (starts_with(arg, "-O")) {
2730  TARootHelper::fOutputFileName = arg.c_str()+2;
2731  } else if (starts_with(arg, "-D")) {
2732  TARootHelper::fOutputDirectory = arg.c_str()+2;
2733 #endif
2734  } else if (arg == "-h") {
2735  help(); // does not return
2736  } else if (arg[0] == '-') {
2737  help(); // does not return
2738  } else {
2739  files.push_back(args[i]);
2740  }
2741  }
2742 
2743  if (!gModules)
2744  gModules = new std::vector<TAFactory*>;
2745 
2746  if ((*gModules).size() == 0)
2747  event_dump = true;
2748 
2749  if (event_dump)
2750  (*gModules).push_back(new EventDumpModuleFactory);
2751 
2752  if (interactive)
2753  (*gModules).push_back(new InteractiveModuleFactory);
2754 
2755  printf("Registered modules: %d\n", (int)(*gModules).size());
2756 
2757 #ifdef HAVE_ROOT
2758  if (root_graphics) {
2759  TARootHelper::fgApp = new TApplication("manalyzer", NULL, NULL, 0, 0);
2760  }
2761 
2762  TARootHelper::fgDir = new TDirectory("manalyzer", "location of histograms");
2764 #endif
2765 
2766  if (httpPort) {
2767 #ifdef HAVE_THTTP_SERVER
2768  char str[256];
2769  sprintf(str, "http:127.0.0.1:%d?cors", httpPort);
2770  THttpServer *s = new THttpServer(str);
2771  //s->SetTimer(100, kFALSE);
2773 #else
2774  fprintf(stderr,"ERROR: No support for the THttpServer!\n");
2775 #endif
2776  }
2777 
2778  for (unsigned i=0; i<files.size(); i++) {
2779  printf("file[%d]: %s\n", i, files[i].c_str());
2780  }
2781 
2782  if (demo_mode) {
2783  ProcessDemoMode(modargs, num_skip, num_analyze, writer, multithread, performance_profiler, snap_shot_queue_length);
2784  } else if (files.size() > 0) {
2785  ProcessMidasFiles(files, modargs, num_skip, num_analyze, writer, multithread, performance_profiler, snap_shot_queue_length);
2786  } else {
2787 #ifdef HAVE_MIDAS
2788 #ifdef HAVE_TMFE
2789  ProcessMidasOnlineTmfe(modargs, midas_progname.c_str(), midas_hostname.c_str(), midas_exptname.c_str(), midas_buffer.c_str(), midas_event_id, midas_trigger_mask, midas_sampling.c_str(), num_analyze, writer, multithread, performance_profiler, snap_shot_queue_length);
2790 #else
2791  ProcessMidasOnlineOld(modargs, midas_hostname.c_str(), midas_exptname.c_str(), num_analyze, writer, multithread, performance_profiler, snap_shot_queue_length);
2792 #endif
2793 #endif
2794  }
2795 
2796  if (writer) {
2797  writer->Close();
2798  delete writer;
2799  writer = NULL;
2800  }
2801 
2802  return 0;
2803 }
2804 
2805 /* emacs
2806  * Local Variables:
2807  * tab-width: 8
2808  * c-basic-offset: 3
2809  * indent-tabs-mode: nil
2810  * End:
2811  */
R__EXTERN TDirectory * gDirectory
double GetTimeSec()
TARunObject * NewRunObject(TARunInfo *runinfo)
Definition: manalyzer.cxx:2059
void Init(const std::vector< std::string > &args)
Definition: manalyzer.cxx:2047
void PauseRun(TARunInfo *runinfo)
Definition: manalyzer.cxx:2017
EventDumpModule(TARunInfo *runinfo)
Definition: manalyzer.cxx:1989
void EndRun(TARunInfo *runinfo)
Definition: manalyzer.cxx:2007
void NextSubrun(TARunInfo *runinfo)
Definition: manalyzer.cxx:2012
void AnalyzeSpecialEvent(TARunInfo *runinfo, TMEvent *event)
Definition: manalyzer.cxx:2037
void BeginRun(TARunInfo *runinfo)
Definition: manalyzer.cxx:2002
void ResumeRun(TARunInfo *runinfo)
Definition: manalyzer.cxx:2022
TAFlowEvent * Analyze(TARunInfo *runinfo, TMEvent *event, TAFlags *flags, TAFlowEvent *flow)
Definition: manalyzer.cxx:2027
TARunObject * NewRunObject(TARunInfo *runinfo)
Definition: manalyzer.cxx:2537
void Init(const std::vector< std::string > &args)
Definition: manalyzer.cxx:2525
TAFlowEvent * Analyze(TARunInfo *runinfo, TMEvent *event, TAFlags *flags, TAFlowEvent *flow)
Definition: manalyzer.cxx:2456
void PauseRun(TARunInfo *runinfo)
Definition: manalyzer.cxx:2354
TAFlowEvent * AnalyzeFlowEvent(TARunInfo *runinfo, TAFlags *flags, TAFlowEvent *flow)
Definition: manalyzer.cxx:2485
void EndRun(TARunInfo *runinfo)
Definition: manalyzer.cxx:2300
static MainWindow * fgCtrlWindow
Definition: manalyzer.cxx:2269
void ResumeRun(TARunInfo *runinfo)
Definition: manalyzer.cxx:2359
InteractiveModule(TARunInfo *runinfo)
Definition: manalyzer.cxx:2272
static ValueHolder * fgHolder
Definition: manalyzer.cxx:2268
void InteractiveLoop(TARunInfo *runinfo, TAFlags *flags)
Definition: manalyzer.cxx:2364
void AnalyzeSpecialEvent(TARunInfo *runinfo, TMEvent *event)
Definition: manalyzer.cxx:2509
void BeginRun(TARunInfo *runinfo)
Definition: manalyzer.cxx:2295
Definition: mvodb.h:21
virtual void RU32(const char *varname, uint32_t *value, bool create=false, MVOdbError *error=NULL)=0
virtual void RI(const char *varname, int *value, bool create=false, MVOdbError *error=NULL)=0
void CloseWindow()
Definition: manalyzer.cxx:2215
TGLayoutHints * fMenuBarItemLayout
Definition: manalyzer.cxx:2130
TextButton * fNextFlowButton
Definition: manalyzer.cxx:2137
MainWindow(const TGWindow *w, int s1, int s2, ValueHolder *holder)
Definition: manalyzer.cxx:2144
TGCompositeFrame * fButtonsFrame
Definition: manalyzer.cxx:2132
TGPopupMenu * fMenu
Definition: manalyzer.cxx:2128
TextButton * fPauseButton
Definition: manalyzer.cxx:2139
ValueHolder * fHolder
Definition: manalyzer.cxx:2134
TextButton * fNextButton
Definition: manalyzer.cxx:2136
TGMenuBar * fMenuBar
Definition: manalyzer.cxx:2129
TextButton * fContinueButton
Definition: manalyzer.cxx:2138
Bool_t ProcessMessage(Long_t msg, Long_t parm1, Long_t parm2)
Definition: manalyzer.cxx:2225
TextButton * fQuitButton
Definition: manalyzer.cxx:2141
void Event(const void *data, int data_size)
Definition: manalyzer.cxx:1617
void Transition(int transition, int run_number, int transition_time)
Definition: manalyzer.cxx:1589
RunHandler fRun
Definition: manalyzer.cxx:1562
OnlineHandler(int num_analyze, TMWriterInterface *writer, MVOdb *odb, const std::vector< std::string > &args, bool multithread, bool profiler, int queue_interval_check)
Definition: manalyzer.cxx:1568
void StartRun(int run_number)
Definition: manalyzer.cxx:1582
std::vector< double > fAnalyzeFlowEventRMS
Definition: manalyzer.cxx:534
const int fQueueInterval
Definition: manalyzer.cxx:549
void LogAnalyzeEvent(TAFlags *flag, TAFlowEvent *flow, const int i, const TAClock &start)
Definition: manalyzer.cxx:708
void LogUserWindows(TAFlags *flag, TAFlowEvent *flow)
Definition: manalyzer.cxx:761
std::vector< double > fAnalyzeFlowEventTimeTotal
Definition: manalyzer.cxx:538
uint32_t fMIDASStopTime
Definition: manalyzer.cxx:508
std::vector< double > fTotalUserTime
Definition: manalyzer.cxx:543
std::vector< int > fAnalyzeEventEntries
Definition: manalyzer.cxx:524
std::vector< TH1D * > fUserHistograms
Definition: manalyzer.cxx:542
std::vector< double > fAnalyzeFlowEventMean
Definition: manalyzer.cxx:533
uint32_t fMIDASStartTime
Definition: manalyzer.cxx:507
std::string fBinaryPath
Definition: manalyzer.cxx:504
int fNQueues
Definition: manalyzer.cxx:512
int fQueueIntervalCounter
Definition: manalyzer.cxx:514
std::vector< double > fAnalyzeEventRMS
Definition: manalyzer.cxx:523
void Print() const
Definition: manalyzer.cxx:847
std::vector< double > fAnalyzeEventTimeMax
Definition: manalyzer.cxx:526
clock_t fStartCPU
Definition: manalyzer.cxx:505
std::string fBinaryName
Definition: manalyzer.cxx:503
void LogAnalyzeFlowEvent(TAFlags *flag, TAFlowEvent *flow, const int i, const TAClock &start)
Definition: manalyzer.cxx:677
std::vector< std::string > fModuleNames
Definition: manalyzer.cxx:521
std::vector< TH1D * > fAnalyzeFlowEventTimeHistograms
Definition: manalyzer.cxx:531
std::chrono::system_clock::time_point fStartUser
Definition: manalyzer.cxx:506
const bool fTimeModules
Definition: manalyzer.cxx:547
void Begin(TARunInfo *runinfo, const std::vector< TARunObject * > fRunRun)
Definition: manalyzer.cxx:583
std::vector< double > fAnalyzeEventTimeTotal
Definition: manalyzer.cxx:527
void AddModuleMap(const char *UserProfileName, unsigned long hash)
Definition: manalyzer.cxx:801
std::vector< double > fAnalyzeFlowEventTimeMax
Definition: manalyzer.cxx:537
void LogMTQueueLength(TARunInfo *runinfo)
Definition: manalyzer.cxx:740
std::map< unsigned int, int > fUserMap
Definition: manalyzer.cxx:541
std::vector< double > fAnalyzeEventMean
Definition: manalyzer.cxx:522
Profiler(const bool time_modules, const int queue_interval_check)
Definition: manalyzer.cxx:566
std::vector< double > fMaxUserTime
Definition: manalyzer.cxx:544
std::vector< TH1D * > fAnalyzeEventTimeHistograms
Definition: manalyzer.cxx:519
std::vector< int > fAnalyzeFlowEventEntries
Definition: manalyzer.cxx:535
std::vector< TH1D * > fAnalysisQueue
Definition: manalyzer.cxx:513
void End()
Definition: manalyzer.cxx:825
TARunInfo * fRunInfo
Definition: manalyzer.cxx:935
std::vector< TARunObject * > fRunRun
Definition: manalyzer.cxx:936
std::vector< std::string > fArgs
Definition: manalyzer.cxx:937
void NextSubrun()
Definition: manalyzer.cxx:1136
void AnalyzeEvent(TMEvent *event, TAFlags *flags, TMWriterInterface *writer)
Definition: manalyzer.cxx:1207
void CreateRun(int run_number, const char *file_name)
Definition: manalyzer.cxx:1055
void DeleteRun()
Definition: manalyzer.cxx:1144
void AnalyzeSpecialEvent(TMEvent *event)
Definition: manalyzer.cxx:1160
void EndRun(TAFlags *flags)
Definition: manalyzer.cxx:1096
TAFlowEvent * AnalyzeFlowEvent(TAFlags *flags, TAFlowEvent *flow)
Definition: manalyzer.cxx:1166
RunHandler(const std::vector< std::string > &args, bool multithread, bool profile, int queue_interval_check)
Definition: manalyzer.cxx:941
void PerModuleThread(int i)
Definition: manalyzer.cxx:962
void AnalyzeFlowQueue(TAFlags *ana_flags)
Definition: manalyzer.cxx:1183
void BeginRun()
Definition: manalyzer.cxx:1086
virtual void Finish()
Definition: manalyzer.cxx:185
virtual void Init(const std::vector< std::string > &args)
Definition: manalyzer.cxx:179
virtual void Usage()
Definition: manalyzer.cxx:173
virtual ~TAFlowEvent()
Definition: manalyzer.cxx:90
TAFlowEvent * fNext
Definition: manalyzer.h:50
std::vector< TAFlagsQueue > fMtFlagQueue
Definition: manalyzer.h:175
std::vector< std::mutex > fMtFlowQueueMutex
Definition: manalyzer.h:173
std::vector< bool > fMtThreadIsBusy
Definition: manalyzer.h:178
std::vector< std::thread * > fMtThreads
Definition: manalyzer.h:176
static int gfMtMaxBacklog
Definition: manalyzer.h:191
static bool gfMultithread
Definition: manalyzer.h:190
std::vector< TAFlowEventQueue > fMtFlowQueue
Definition: manalyzer.h:174
std::vector< bool > fMtThreadIsRunning
Definition: manalyzer.h:177
static std::mutex gfLock
Definition: manalyzer.h:192
TARegister(TAFactory *m)
Definition: manalyzer.cxx:457
static std::string fOutputDirectory
Definition: manalyzer.h:151
static TApplication * fgApp
Definition: manalyzer.h:155
static std::string fOutputFileName
Definition: manalyzer.h:152
static THttpServer * fgHttpServer
Definition: manalyzer.h:156
TFile * fOutputFile
Definition: manalyzer.h:153
static TDirectory * fgDir
Definition: manalyzer.h:154
void AddToFlowQueue(TAFlowEvent *)
Definition: manalyzer.cxx:1283
TAFlowEvent * ReadFlowQueue()
Definition: manalyzer.cxx:1273
std::string fFileName
Definition: manalyzer.h:24
std::vector< std::string > fArgs
Definition: manalyzer.h:28
MVOdb * fOdb
Definition: manalyzer.h:25
int fRunNo
Definition: manalyzer.h:23
static std::vector< std::string > fgFileList
Definition: manalyzer.h:29
TARunInfo()
Definition: manalyzer.h:41
TARootHelper * fRoot
Definition: manalyzer.h:26
TAMultithreadHelper * fMtInfo
Definition: manalyzer.h:27
static int fgCurrentFileIndex
Definition: manalyzer.h:30
virtual void EndRun(TARunInfo *runinfo)
Definition: manalyzer.cxx:117
virtual void ResumeRun(TARunInfo *runinfo)
Definition: manalyzer.cxx:135
virtual void AnalyzeSpecialEvent(TARunInfo *runinfo, TMEvent *event)
Definition: manalyzer.cxx:161
virtual void NextSubrun(TARunInfo *runinfo)
Definition: manalyzer.cxx:123
virtual TAFlowEvent * Analyze(TARunInfo *runinfo, TMEvent *event, TAFlags *flags, TAFlowEvent *flow)
Definition: manalyzer.cxx:147
virtual void PauseRun(TARunInfo *runinfo)
Definition: manalyzer.cxx:129
virtual void BeginRun(TARunInfo *runinfo)
Definition: manalyzer.cxx:111
virtual TAFlowEvent * AnalyzeFlowEvent(TARunInfo *runinfo, TAFlags *flags, TAFlowEvent *flow)
Definition: manalyzer.cxx:154
virtual void PreEndRun(TARunInfo *runinfo)
Definition: manalyzer.cxx:141
TAUserProfilerFlow(TAFlowEvent *flow, const char *name, const TAClock &start)
Definition: manalyzer.cxx:479
const std::string fModuleName
Definition: manalyzer.h:219
double GetTimer() const
Definition: manalyzer.cxx:489
virtual TDirectory * mkdir(const char *name, const char *title="")
virtual Bool_t cd(const char *path=0)
std::string HeaderToString() const
print the MIDAS event header
Definition: midasio.cxx:655
void Reset()
reset everything
Definition: midasio.cxx:685
void ParseEvent()
parse event data
Definition: midasio.cxx:705
std::vector< char > data
MIDAS event bytes.
Definition: midasio.h:67
size_t event_header_size
size of MIDAS event header
Definition: midasio.h:63
uint32_t serial_number
MIDAS event serial number.
Definition: midasio.h:59
uint32_t data_size
MIDAS event data size.
Definition: midasio.h:61
uint16_t event_id
MIDAS event ID.
Definition: midasio.h:57
MIDAS online connection, including access to online ODB.
Definition: TMidasOnline.h:36
virtual int Close()=0
std::string fErrorString
Definition: midasio.h:116
static bool fgTrace
Definition: midasio.h:117
virtual int Close()=0
static bool fgTrace
Definition: midasio.h:125
void RegisterHandler(TMHandlerInterface *h)
void registerTransitions()
Ask MIDAS to tell us about run transitions.
int disconnect()
Disconnect from MIDAS.
static TMidasOnline * instance()
int connect(const char *hostname, const char *exptname, const char *progname)
Connect to MIDAS experiment.
HNDLE fDB
ODB handle.
Definition: TMidasOnline.h:58
int eventRequest(const char *bufferName, int eventId, int triggerMask, int samplingType, bool poll=false)
Request data for delivery via callback (setEventHandler) or by polling (via receiveEvent)
TextButton(TGWindow *p, const char *text, ValueHolder *holder, int value)
Definition: manalyzer.cxx:2097
ValueHolder * fHolder
Definition: manalyzer.cxx:2094
void Clicked()
Definition: manalyzer.cxx:2116
std::chrono::high_resolution_clock::time_point TAClock
Definition: manalyzer.h:206
int TAFlags
Definition: manalyzer.h:71
#define TAFlag_DISPLAY
Definition: manalyzer.h:77
std::chrono::duration< double > TAClockDuration
Definition: manalyzer.h:207
#define TAFlag_SKIP
Definition: manalyzer.h:74
#define TAFlag_WRITE
Definition: manalyzer.h:76
TAClock TAClockNow()
Definition: manalyzer.h:208
#define TAFlag_QUIT
Definition: manalyzer.h:75
#define TAFlag_SKIP_PROFILE
Definition: manalyzer.h:78
void TMWriteEvent(TMWriterInterface *writer, const TMEvent *event)
Definition: midasio.cxx:650
TMReaderInterface * TMNewReader(const char *source)
Definition: midasio.cxx:442
TMWriterInterface * TMNewWriter(const char *destination)
Definition: midasio.cxx:540
TMEvent * TMReadEvent(TMReaderInterface *reader)
Definition: midasio.cxx:584
MVOdb * MakeMidasOdb(int hDB, MVOdbError *error=NULL)
Definition: midasodb.cxx:836
MVOdb * MakeNullOdb()
Definition: nullodb.cxx:123
MVOdb * MakeFileDumpOdb(const char *buf, int bufsize, MVOdbError *error=NULL)
Access ODB from a midas file dump. FOrmat could be .xml, .json or .odb.
Definition: mvodb.cxx:91
#define CTRL_NEXT_FLOW
Definition: manalyzer.cxx:2076
static int gDefaultMultithreadWaitFull
Definition: manalyzer.cxx:275
static int ProcessDemoMode(const std::vector< std::string > &args, int num_skip, int num_analyze, TMWriterInterface *writer, bool multithread, bool profiler, int queue_interval_check)
Definition: manalyzer.cxx:1881
static void MtQueueFlowEvent(TAMultithreadHelper *mt, int i, TAFlags *flag, TAFlowEvent *flow, bool wait)
Definition: manalyzer.cxx:409
std::vector< TAFactory * > * gModules
Definition: manalyzer.cxx:455
static bool gTrace
Definition: manalyzer.cxx:18
#define CTRL_NEXT
Definition: manalyzer.cxx:2073
int manalyzer_main(int argc, char *argv[])
Definition: manalyzer.cxx:2614
static int gDefaultMultithreadWaitEmpty
Definition: manalyzer.cxx:274
#define CTRL_TBROWSER
Definition: manalyzer.cxx:2078
#define CTRL_CONTINUE
Definition: manalyzer.cxx:2074
#define CTRL_QUIT
Definition: manalyzer.cxx:2072
static bool starts_with(const std::string &s, const char *prefix)
Definition: manalyzer.cxx:2607
static int ProcessMidasOnlineOld(const std::vector< std::string > &args, const char *hostname, const char *exptname, int num_analyze, TMWriterInterface *writer, bool multithread, bool profiler, int queue_interval_check)
Definition: manalyzer.cxx:1647
static bool gEnableShowMem
Definition: manalyzer.cxx:1963
#define CTRL_PAUSE
Definition: manalyzer.cxx:2075
static void WaitForAllQueuesEmpty(TAMultithreadHelper *mt)
Definition: manalyzer.cxx:277
static int ProcessMidasFiles(const std::vector< std::string > &files, const std::vector< std::string > &args, int num_skip, int num_analyze, TMWriterInterface *writer, bool multithread, bool profiler, int queue_interval_check)
Definition: manalyzer.cxx:1721
static void WaitForAllThreadsShutdown(TAMultithreadHelper *mt)
Definition: manalyzer.cxx:323
static void help()
Definition: manalyzer.cxx:2551
static int gDefaultMultithreadQueueLength
Definition: manalyzer.cxx:273
static std::string to_string(int v)
Definition: midasio.cxx:27
int ShowMem(const char *label)