ROOTANA
Loading...
Searching...
No Matches
manalyzer.cxx
Go to the documentation of this file.
1//
2// MIDAS analyzer
3//
4// K.Olchanski
5//
6
7#undef NDEBUG // this program requires working assert()
8
9#include <stdio.h>
10#include <unistd.h> // usleep()
11#include <assert.h>
12#include <sys/stat.h> // struct stat_buffer;
13
14#include "manalyzer.h"
15#include "midasio.h"
16
17//////////////////////////////////////////////////////////
18
19static bool gTrace = false;
20
21//////////////////////////////////////////////////////////
22//
23// Methods of TARunInfo
24//
25//////////////////////////////////////////////////////////
26
27TARunInfo::TARunInfo(int runno, const char* filename, const std::vector<std::string>& args)
28{
29 if (gTrace)
30 printf("TARunInfo::ctor!\n");
31 fRunNo = runno;
32 if (filename)
33 fFileName = filename;
34 fOdb = NULL;
35#ifdef HAVE_ROOT
36 fRoot = new TARootHelper(this);
37#else
38 fRoot = NULL;
39#endif
40 fMtInfo = NULL;
41 fArgs = args;
42}
43
45{
46 if (gTrace)
47 printf("TARunInfo::dtor!\n");
48 fRunNo = 0;
49 fFileName = "(deleted)";
50 if (fOdb) {
51 delete fOdb;
52 fOdb = NULL;
53 }
54#ifdef HAVE_ROOT
55 if (fRoot) {
56 delete fRoot;
57 fRoot = NULL;
58 }
59#endif
60 int count = 0;
61 while (1) {
62 TAFlowEvent* flow = ReadFlowQueue();
63 if (!flow)
64 break;
65 delete flow;
66 count++;
67 }
68 if (gTrace) {
69 printf("TARunInfo::dtor: deleted %d queued flow events!\n", count);
70 }
71
72 if (fMtInfo) {
73 delete fMtInfo;
74 fMtInfo = NULL;
75 }
76}
77
78//////////////////////////////////////////////////////////
79//
80// Methods of TAFlowEvent
81//
82//////////////////////////////////////////////////////////
83
85{
86 if (gTrace)
87 printf("TAFlowEvent::ctor: chain %p\n", flow);
88 fNext = flow;
89}
90
92{
93 if (gTrace)
94 printf("TAFlowEvent::dtor: this %p, next %p\n", this, fNext);
95 if (fNext)
96 delete fNext;
97 fNext = NULL;
98}
99
100//////////////////////////////////////////////////////////
101//
102// Methods of TARunObject
103//
104//////////////////////////////////////////////////////////
105
107{
108 if (gTrace)
109 printf("TARunObject::ctor, run %d\n", runinfo->fRunNo);
110}
111
113{
114 if (gTrace)
115 printf("TARunObject::BeginRun, run %d\n", runinfo->fRunNo);
116}
117
119{
120 if (gTrace)
121 printf("TARunObject::EndRun, run %d\n", runinfo->fRunNo);
122}
123
125{
126 if (gTrace)
127 printf("TARunObject::NextSubrun, run %d\n", runinfo->fRunNo);
128}
129
131{
132 if (gTrace)
133 printf("TARunObject::PauseRun, run %d\n", runinfo->fRunNo);
134}
135
137{
138 if (gTrace)
139 printf("TARunObject::ResumeRun, run %d\n", runinfo->fRunNo);
140}
141
143{
144 if (gTrace)
145 printf("TARunObject::PreEndRun, run %d\n", runinfo->fRunNo);
146}
147
149{
150 if (gTrace)
151 printf("TARunObject::Analyze!\n");
152
153 // This default analyze function does no work, instruct the Profiler to not time / log this
154 *flags|=TAFlag_SKIP_PROFILE;
155
156 return flow;
157}
158
160{
161 if (gTrace)
162 printf("TARunObject::Analyze!\n");
163
164 // This default analyze function does no work, instruct the Profiler to not time / log this
165 *flags|=TAFlag_SKIP_PROFILE;
166
167 return flow;
168}
169
171{
172 if (gTrace)
173 printf("TARunObject::AnalyzeSpecialEvent!\n");
174}
175
176//////////////////////////////////////////////////////////
177//
178// Methods of TAFactory
179//
180//////////////////////////////////////////////////////////
181
183{
184 if (gTrace)
185 printf("TAFactory::Usage!\n");
186}
187
188void TAFactory::Init(const std::vector<std::string> &args)
189{
190 if (gTrace)
191 printf("TAFactory::Init!\n");
192}
193
195{
196 if (gTrace)
197 printf("TAFactory::Finish!\n");
198}
199
200#ifdef HAVE_ROOT
201
202//////////////////////////////////////////////////////////
203//
204// Methods of TARootHelper
205//
206//////////////////////////////////////////////////////////
207
208std::string TARootHelper::fOutputDirectory = "root_output_files";
209std::string TARootHelper::fOutputFileName = "";
210TApplication* TARootHelper::fgApp = NULL;
212THttpServer* TARootHelper::fgHttpServer = NULL;
213
215{
216 if (gTrace)
217 printf("TARootHelper::ctor!\n");
218
219 std::string filename = fOutputFileName;
220
221 if (filename.empty()) {
222 char xfilename[256];
223 sprintf(xfilename, "output%05d.root", runinfo->fRunNo);
224
225 if (fOutputDirectory.empty()) {
226 filename = xfilename;
227 } else {
228 filename = fOutputDirectory + "/" + xfilename;
229
230 struct stat buffer;
231 int status = stat(fOutputDirectory.c_str(), &buffer);
232
233 if (status < 0 && errno == ENOENT) {
234 fprintf(stdout, "TARootHelper::ctor: creating output directory \"%s\"\n", fOutputDirectory.c_str());
235 status = mkdir(fOutputDirectory.c_str(), 0777);
236 if (status == -1) {
237 fprintf(stderr, "TARootHelper::ctor: Error: cannot output directory \"%s\", errno %d (%s)\n", fOutputDirectory.c_str(), errno, strerror(errno));
238 }
239 }
240 }
241 }
242
243 fOutputFile = new TFile(filename.c_str(), "RECREATE");
244
245 if (!fOutputFile->IsOpen()) {
246 fprintf(stderr, "TARootHelper::ctor: Error: cannot open output ROOT file \"%s\"\n", filename.c_str());
247 fOutputFile = new TFile("/dev/null", "UPDATE");
248 assert(fOutputFile);
249 assert(fOutputFile->IsOpen());
250 }
251
252 if (fOutputFile != NULL) {
253 fOutputFile->cd();
254 }
255}
256
258{
259 if (gTrace)
260 printf("TARootHelper::dtor!\n");
261
262 if (fOutputFile != NULL) {
263 fOutputFile->Write();
264 fOutputFile->Close();
265 fOutputFile = NULL;
266 }
267
268 if (fgDir)
269 fgDir->cd();
270}
271
272#endif
273
274#include <stdio.h>
275#include <stdlib.h>
276#include <string.h>
277#include <sys/time.h>
278#include <assert.h>
279#include <signal.h>
280
281#include "manalyzer.h"
282#include "midasio.h"
283
284#ifdef HAVE_THTTP_SERVER
285#include "THttpServer.h"
286#endif
287
288#ifdef HAVE_ROOT
289#include <TSystem.h>
290#endif
291
292//////////////////////////////////////////////////////////
293//
294// Methods and Defaults of TAMultithreadHelper
295//
296//////////////////////////////////////////////////////////
297
299static int gDefaultMultithreadWaitEmpty = 100; // microseconds
300static int gDefaultMultithreadWaitFull = 100; // microseconds
301
303{
304 fprintf(stderr, "Waiting for all queues to empty out!\n");
305
306 int count_all_empty = 0;
307
308 for (int t=0; ; ) {
309 int count_not_empty = 0;
310 size_t count_events = 0;
311
312 for (unsigned i=0; i<mt->fMtThreads.size(); i++) {
313 std::lock_guard<std::mutex> lock(mt->fMtFlowQueueMutex[i]);
314 if (!mt->fMtThreadIsRunning[i]) // skip threads that have shutdown
315 continue;
316 if (!mt->fMtFlowQueue[i].empty()) {
317 count_not_empty++;
318 count_events += mt->fMtFlowQueue[i].size();
319 break;
320 }
321 if (mt->fMtThreadIsBusy[i]) {
322 count_not_empty++;
323 count_events += 1; // module is analyzing 1 event
324 break;
325 }
326 // implicit unlock
327 }
328
329 if (count_not_empty == 0) {
330 count_all_empty++;
331 }
332
333 if (count_all_empty > 1) {
334 // must loop over "all empty" at least twice! K.O.
335 break;
336 }
337
338 if (t > 10) {
339 fprintf(stderr, "Timeout waiting for all queues to empty out, %d queues still have %d flow events!\n", count_not_empty, (int)count_events);
340 //break;
341 }
342
343 ::sleep(1);
344 t++;
345 }
346}
347
349{
350 fprintf(stderr, "Waiting for all threads to shut down!\n");
351
352 mt->fMtShutdownRequested = true;
353
354 for (int t=0; ; ) {
355 int count_running = 0;
356
357 for (unsigned i=0; i<mt->fMtThreads.size(); i++) {
358 if (mt->fMtThreadIsRunning[i])
359 count_running++;
360 }
361
362 if (count_running == 0) {
363 break;
364 }
365
366 if (t > 10) {
367 fprintf(stderr, "Timeout waiting for all threads to shut down, %d still running!\n", count_running);
368 break;
369 }
370
371 ::sleep(1);
372 t++;
373 }
374
375 fprintf(stderr, "Joining all threads!\n");
376 for (unsigned i=0; i<mt->fMtThreads.size(); i++) {
377 if (!mt->fMtThreadIsRunning[i] ) {
378 // pointer is null if thread is already shutdown
379 if (mt->fMtThreads[i]) {
380 mt->fMtThreads[i]->join();
381 delete mt->fMtThreads[i];
382 mt->fMtThreads[i] = NULL;
383 }
384 } else {
385 fprintf(stderr, "Thread %d failed to shut down!\n", i);
386 }
387 }
388}
389
390
392 fMtFlowQueueMutex(nModules),
393 fMtFlowQueue(nModules),
394 fMtFlagQueue(nModules),
395 fMtThreads(nModules,NULL),
396 fMtThreadIsRunning(nModules),
397 fMtThreadIsBusy(nModules),
398 fMtShutdownRequested(false),
399 fMtQuitRequested(false)
400 // ctor
401{
402 for (auto &b: fMtThreadIsRunning) { b = false; }
403 for (auto &b: fMtThreadIsBusy) { b = false; }
404 // default max queue size
406 // queue settings
409}
410
412{
413 size_t nmodules = fMtFlowQueueMutex.size();
414
415 // just for kicks, check that all queues have correct size
416 assert(nmodules == fMtFlowQueue.size());
417 assert(nmodules == fMtFlagQueue.size());
418 assert(nmodules == fMtFlowQueueMutex.size());
419
420 // should not come to the destructor while threads are still running
422
423 int count = 0;
424 for (size_t i=0; i<nmodules; i++) {
425 // check that the thread is stopped
426 //assert(!fMtThread[i].joinable());
427 // empty the thread queue
428 std::lock_guard<std::mutex> lock(fMtFlowQueueMutex[i]);
429 while (!fMtFlowQueue[i].empty()) {
430 TAFlowEvent* flow = fMtFlowQueue[i].front();
431 TAFlags* flag = fMtFlagQueue[i].front();
432 fMtFlowQueue[i].pop_front();
433 fMtFlagQueue[i].pop_front();
434 delete flow;
435 delete flag;
436 count++;
437 }
438 // implicit unlock of mutex
439 }
440 if (gTrace) {
441 printf("TAMultithreadInfo::dtor: deleted %d queued flow events!\n", count);
442 }
443}
444
447std::mutex TAMultithreadHelper::gfLock; //Lock for modules to execute code that is not thread safe (many root fitting libraries)
448
449static void MtQueueFlowEvent(TAMultithreadHelper* mt, int i, TAFlags* flag, TAFlowEvent* flow, bool wait)
450{
451 assert(mt);
452
453 if (flag == NULL) {
454 flag = new TAFlags;
455 *flag = 0;
456 }
457
458 //PrintQueueLength();
459
460 while (1) {
461 {
462 //Lock and queue events
463 std::lock_guard<std::mutex> lock(mt->fMtFlowQueueMutex[i]);
464
465 if ((((int)mt->fMtFlowQueue[i].size()) < mt->fMtQueueDepth) || mt->fMtShutdownRequested || !wait) {
466 mt->fMtFlowQueue[i].push_back(flow);
467 mt->fMtFlagQueue[i].push_back(flag);
468 return;
469 }
470 // Unlock when we go out of scope
471 }
472
473 usleep(mt->fMtQueueFullUSleepTime);
474 }
475}
476
477#if 0
478//Function to print the length of the flow queue when in multithread mode
479//Maybe make root update a graphical window?
480static void PrintMtQueueLength(TAMultithreadHelper* mt)
481{
482 printf("Multithread queue lengths:\n");
483 for (unsigned i=0; i<mt->fMtFlowQueue.size(); i++) {
484 printf("%d:\t%zu\n",i,mt->fMtFlowQueue[i].size());
485 }
486}
487#endif
488
489//////////////////////////////////////////////////////////
490//
491// Methods of TARegister
492//
493//////////////////////////////////////////////////////////
494
495std::vector<TAFactory*> *gModules = NULL;
496
498{
499 if (!gModules)
500 gModules = new std::vector<TAFactory*>;
501 gModules->push_back(m);
502}
503
504#if 0
505static double GetTimeSec()
506{
507 struct timeval tv;
508 gettimeofday(&tv,NULL);
509 return tv.tv_sec + 0.000001*tv.tv_usec;
510}
511#endif
512
513//////////////////////////////////////////////////////////
514//
515// Profiler class
516//
517//////////////////////////////////////////////////////////
518
519TAUserProfilerFlow::TAUserProfilerFlow(TAFlowEvent* flow, const char* name, const TAClock& start) : TAFlowEvent(flow), fModuleName(name)
520{
521 fStart = start;
522 fStop = TAClockNow();
523}
524
528
530{
531 TAClockDuration elapsed_seconds = fStop - fStart;
532 return elapsed_seconds.count();
533}
534
535#ifdef HAVE_ROOT
536#include "TH1D.h"
537#endif
538#include <map>
539
541{
542private:
543 std::string fBinaryName;
544 std::string fBinaryPath;
545 clock_t fStartCPU;
546 std::chrono::system_clock::time_point fStartUser;
549
550 //Track Queue lengths when multithreading
551#ifdef HAVE_ROOT
552 int fNQueues=0;
553 std::vector<TH1D*> fAnalysisQueue;
554 std::atomic<int> fQueueIntervalCounter;
555#endif
556
557 // Track Analyse TMEvent time per module (main thread)
558#ifdef HAVE_ROOT
559 std::vector<TH1D*> fAnalyzeEventTimeHistograms;
560#endif
561 std::vector<std::string> fModuleNames;
562 std::vector<double> fAnalyzeEventMean;
563 std::vector<double> fAnalyzeEventRMS;
564 std::vector<int> fAnalyzeEventEntries;
565
566 std::vector<double> fAnalyzeEventTimeMax;
567 std::vector<double> fAnalyzeEventTimeTotal;
568
569 //Track Analyse flow event time per module (can be multiple threads)
570#ifdef HAVE_ROOT
572#endif
573 std::vector<double> fAnalyzeFlowEventMean;
574 std::vector<double> fAnalyzeFlowEventRMS;
575 std::vector<int> fAnalyzeFlowEventEntries;
576
577 std::vector<double> fAnalyzeFlowEventTimeMax;
578 std::vector<double> fAnalyzeFlowEventTimeTotal;
579#ifdef HAVE_ROOT
580 //Track user profiling
581 std::map<unsigned int,int> fUserMap;
582 std::vector<TH1D*> fUserHistograms;
583 std::vector<double> fTotalUserTime;
584 std::vector<double> fMaxUserTime;
585
586 // Number of events between samples
587 const int fQueueInterval = 100;
588#endif
589
590public:
591 Profiler( const int queue_interval_check );
592 ~Profiler();
593 void Begin(TARunInfo* runinfo,const std::vector<TARunObject*> fRunRun );
594 // Function for profiling the 'main' thread (that unpacks TMEvents)
595 void LogAnalyzeEvent(TAFlags* flag, TAFlowEvent* flow, const int i, const TAClock& start);
596 // Function for profiling module threads
597 void LogAnalyzeFlowEvent(TAFlags* flag, TAFlowEvent* flow, const int i, const TAClock& start);
598 // Extra function for users custom profiling windows
599 void LogUserWindows(TAFlags* flag, TAFlowEvent* flow);
600 void AddModuleMap( const char* UserProfileName, unsigned long hash);
601 void LogMTQueueLength(TARunInfo* runinfo);
602 void End(TARunInfo* runinfo);
603 void Print() const;
604};
605
606Profiler::Profiler(const int queue_interval_check)
607#ifdef HAVE_ROOT
608 :
609 fQueueIntervalCounter(0),
610 fQueueInterval(queue_interval_check)
611#endif
612{
613 if (gTrace)
614 printf("Profiler::ctor\n");
615 fMIDASStartTime = 0;
616 fMIDASStopTime = 0;
617 fStartCPU = clock();
618 fStartUser = std::chrono::system_clock::now();
619}
620
622{
623 if (gTrace)
624 printf("Profiler::dtor\n");
625
626#ifdef HAVE_ROOT
628 delete h;
630 for (TH1D* h: fAnalyzeEventTimeHistograms)
631 delete h;
633 for( TH1D* h: fAnalysisQueue)
634 delete h;
635 fAnalysisQueue.clear();
636#endif
637}
638
639void Profiler::Begin(TARunInfo* runinfo, const std::vector<TARunObject*> runrun)
640{
641 if (gTrace)
642 printf("Profiler::begin\n");
643
644 runinfo->fOdb->RU32("Runinfo/Start time binary",(uint32_t*) &fMIDASStartTime);
645
646#ifdef HAVE_ROOT
647 // Put Profiler histograms in their own folders in the output root file
648 if (runinfo->fRoot->fOutputFile) {
649 runinfo->fRoot->fOutputFile->cd(); // select correct ROOT directory
650 gDirectory->mkdir("ProfilerReport")->cd();
651 runinfo->fRoot->fOutputFile->cd("ProfilerReport");
652 gDirectory->mkdir("AnalyzeFlowEventTime");
653 gDirectory->mkdir("AnalyzeFlowTime");
654 gDirectory->mkdir("MTQueueLength");
655 }
656
657 // Setup module processing time histograms
658 // Number of bins
659 Int_t Nbins=1000;
660 // Array of bin edges
661 Double_t bins[Nbins+1];
662 // Processing time range (seconds)
663 Double_t TimeRange = 10;
664 // Set uneven binning to better sample fast modules with accuracy
665 // without having a large number of bins
666 for (int i=0; i<Nbins+1; i++) {
667 bins[i] = TimeRange*pow(1.1,i)/pow(1.1,Nbins);
668 }
669
670 if (runinfo->fMtInfo)
671 fNQueues = runrun.size();
672#endif
673
674 // Per module metric setup
675 for (size_t i = 0; i < runrun.size(); i++) {
676
677 if (runrun[i]->fModuleName.empty())
678 fModuleNames.push_back("Unnamed Module " + std::to_string(i));
679 else
680 fModuleNames.push_back(runrun[i]->fModuleName);
681
682 // Metrics for the AnalyzeEvent function (main thread)
683 fAnalyzeEventMean.push_back(0);
684 fAnalyzeEventRMS.push_back(0);
685 fAnalyzeEventEntries.push_back(0);
686 fAnalyzeEventTimeMax.push_back(0);
687 fAnalyzeEventTimeTotal.push_back(0);
688
689 // Metric for the AnalyzeFlowEvent function (side threads)
690 fAnalyzeFlowEventMean.push_back(0);
691 fAnalyzeFlowEventRMS.push_back(0);
692 fAnalyzeFlowEventEntries.push_back(0);
693 fAnalyzeFlowEventTimeMax.push_back(0);
694 fAnalyzeFlowEventTimeTotal.push_back(0);
695
696#ifdef HAVE_ROOT
697 if (runinfo->fRoot->fOutputFile) {
698 runinfo->fRoot->fOutputFile->cd("ProfilerReport/AnalyzeFlowTime");
699 TH1D* Histo = new TH1D( TString(std::to_string(i) + "_" + fModuleNames.at(i)),
700 TString(fModuleNames.at(i) + " Event Proccessing Time; s"),
701 Nbins, bins);
702 fAnalyzeFlowEventTimeHistograms.push_back(Histo);
703
704 runinfo->fRoot->fOutputFile->cd("ProfilerReport/AnalyzeFlowEventTime");
705 TH1D* AnalyzeEventHisto = new TH1D(TString(std::to_string(i) + "_" + fModuleNames.at(i) + "_TMEvent"),
706 TString(fModuleNames.at(i) + " Flow Proccessing Time; s"),
707 Nbins, bins);
708 fAnalyzeEventTimeHistograms.push_back(AnalyzeEventHisto);
709 } else {
710 fAnalyzeFlowEventTimeHistograms.push_back(NULL);
711 fAnalyzeEventTimeHistograms.push_back(NULL);
712 }
713
714 // Periodically (once every fQueueInterval events) record the
715 // flow queue size if running in multithreaded mode
716 if (runinfo->fMtInfo) {
717 if (runinfo->fRoot->fOutputFile) {
718 runinfo->fRoot->fOutputFile->cd("ProfilerReport/MTQueueLength");
719 TH1D* QueueHisto = new TH1D(TString(std::to_string(i) + "_" + fModuleNames.at(i) + "_Queue"),
720 TString(fModuleNames.at(i) + " Multithread Queue Length; Queue Depth"),
721 runinfo->fMtInfo->fMtQueueDepth*1.2,
722 0,
723 runinfo->fMtInfo->fMtQueueDepth*1.2);
724 fAnalysisQueue.push_back(QueueHisto);
725 } else {
726 fAnalysisQueue.push_back(NULL);
727 }
728 }
729#endif
730 }
731}
732
733void Profiler::LogAnalyzeFlowEvent(TAFlags* flag, TAFlowEvent* flow, const int i, const TAClock& start)
734{
735 if (gTrace)
736 printf("Profiler::log\n");
737
738 TAClock stop = TAClockNow();
739 if ((*flag) & TAFlag_SKIP_PROFILE) {
740 //Unset bit
741 *flag -= TAFlag_SKIP_PROFILE;
742 return;
743 }
744
745 std::chrono::duration<double> elapsed_seconds = stop - start;
746 double dt = elapsed_seconds.count();
748 if (dt > fAnalyzeFlowEventTimeMax[i])
750
751 fAnalyzeFlowEventMean[i] +=dt;
752 fAnalyzeFlowEventRMS[i] +=dt*dt;
754
755#ifdef HAVE_ROOT
758#endif
759}
760
761void Profiler::LogAnalyzeEvent(TAFlags* flag, TAFlowEvent* flow, int i, const TAClock& start)
762{
763 if (gTrace)
764 printf("Profiler::log_AnalyzeEvent_time\n");
765
766 TAClock stop = TAClockNow();
767 if ((*flag) & TAFlag_SKIP_PROFILE) {
768 //Unset bit
769 *flag -= TAFlag_SKIP_PROFILE;
770 return;
771 }
772
773 std::chrono::duration<double> elapsed_seconds = stop - start;
774 double dt = elapsed_seconds.count();
775 fAnalyzeEventTimeTotal[i] += dt;
776 if (dt > fAnalyzeEventTimeMax[i])
777 fAnalyzeEventTimeMax[i] = dt;
778
779 fAnalyzeEventMean[i] +=dt;
780 fAnalyzeEventRMS[i] +=dt*dt;
782
783#ifdef HAVE_ROOT
785 fAnalyzeEventTimeHistograms[i]->Fill(dt);
786#endif
787
788}
789
791{
792 if (gTrace)
793 printf("Profiler::log_mt_queue_length\n");
794
795#ifdef HAVE_ROOT
797 if (runinfo->fMtInfo && (fQueueIntervalCounter % fQueueInterval ==0 )) {
798 for (int i=0; i<fNQueues; i++) {
799 int j=0;
800 { //Lock guard
801 std::lock_guard<std::mutex> lock(runinfo->fMtInfo->fMtFlowQueueMutex[i]);
802 j=runinfo->fMtInfo->fMtFlowQueue[i].size();
803 }
804 fAnalysisQueue.at(i)->Fill(j);
805 }
806 }
807#endif
808}
809
810
812{
813 if (gTrace)
814 printf("Profiler::LogUserWindows\n");
815
816#ifdef HAVE_ROOT
817 //Clocks unfold backwards...
818 std::vector<TAFlowEvent*> flowArray;
819 int FlowEvents=0;
820 TAFlowEvent* f = flow;
821 while (f) {
822 flowArray.push_back(f);
823 f=f->fNext;
824 FlowEvents++;
825 }
826 for (int ii=FlowEvents-1; ii>=0; ii--) {
827 f=flowArray[ii];
828 TAUserProfilerFlow* timer=dynamic_cast<TAUserProfilerFlow*>(f);
829 if (timer) {
830 const char* name = timer->fModuleName.c_str();
831 unsigned int hash = std::hash<std::string>{}(timer->fModuleName);
832 if (!fUserMap.count(hash))
833 AddModuleMap(name,hash);
834 double dt=999.;
835 dt=timer->GetTimer();
836 int i = fUserMap[hash];
837 fTotalUserTime[i] += dt;
838 if (dt > fMaxUserTime[i])
839 fMaxUserTime.at(i) = dt;
840 fUserHistograms.at(i)->Fill(dt);
841 }
842 }
843#else
844 fprintf(stderr, "manalyzer must be built with ROOT for using the user profiling tools\n");
845#endif
846}
847
848void Profiler::AddModuleMap( const char* UserProfileName, unsigned long hash)
849{
850 if (gTrace)
851 printf("Profiler::AddModuleMap\n");
852
853 std::lock_guard<std::mutex> lock(TAMultithreadHelper::gfLock);
854
855#ifdef HAVE_ROOT
856 gDirectory->cd("/ProfilerReport");
857 fUserMap[hash] = fUserHistograms.size();
858 Int_t Nbins = 100;
859 Double_t bins[Nbins+1];
860 Double_t TimeRange = 10; //seconds
861 for (int i=0; i<Nbins+1; i++) {
862 bins[i] = TimeRange*pow(1.1,i)/pow(1.1,Nbins);
863 }
864 TH1D* Histo = new TH1D(UserProfileName,UserProfileName,Nbins,bins);
865 fUserHistograms.push_back(Histo);
866 fTotalUserTime.push_back(0.);
867 fMaxUserTime.push_back(0.);
868#endif
869 return;
870}
871
873{
874 if (gTrace)
875 printf("Profiler::End\n");
876
877 for (size_t i=0; i<fAnalyzeEventMean.size(); i++) {
880 ( fAnalyzeEventMean.at(i) / fAnalyzeEventEntries.at(i) ) *
881 ( fAnalyzeEventMean.at(i) / fAnalyzeEventEntries.at(i) );
882 }
883 for (size_t i=0; i<fAnalyzeFlowEventMean.size(); i++) {
888 }
889#ifdef HAVE_ROOT
890
891 if (runinfo->fRoot->fOutputFile) {
892 runinfo->fRoot->fOutputFile->cd("ProfilerReport/AnalyzeFlowTime");
893 for (TH1D* h: fAnalyzeFlowEventTimeHistograms) {
894 h->Write();
895 }
896 runinfo->fRoot->fOutputFile->cd("ProfilerReport/AnalyzeFlowEventTime");
897 for (TH1D* h: fAnalyzeEventTimeHistograms) {
898 h->Write();
899 }
900 if (runinfo->fMtInfo) {
901 runinfo->fRoot->fOutputFile->cd("ProfilerReport/MTQueueLength");
902 for (TH1D* h: fAnalysisQueue) {
903 h->Write();
904 }
905 }
906 }
907#endif
908}
909
910void Profiler::Print() const
911{
912#ifdef HAVE_ROOT
913 if (fAnalyzeFlowEventTimeHistograms.size()>0) {
914#else
915 if (fAnalyzeEventEntries.size()>0) {
916#endif
917 double AllAnalyzeFlowEventTime=0;
918 for (auto& n : fAnalyzeFlowEventTimeTotal)
919 AllAnalyzeFlowEventTime += n;
920 double AllAnalyzeEventTime=0;
921 for (auto& n : fAnalyzeEventTimeTotal)
922 AllAnalyzeEventTime += n;
923 //double max_AnalyzeEvent_time=*std::max_element(TotalAnalyzeEventTime.begin(),TotalAnalyzeEventTime.end());
924 printf("Module average processing time\n");
925 printf(" \t\t\t\tAnalyzeEvent (one thread) \tAnalyzeFlowEvent (multithreadable)\n");
926 printf("Module\t\t\t\tEntries\tMean(ms)RMS(ms)\tMax(ms)\tSum(s)\tEntries\tMean(ms)RMS(ms)\tMax(ms)\tSum(s)\n");
927 printf("----------------------------------------------------------------------------------------------------------------\n");
928 for (size_t i=0; i<fModuleNames.size(); i++) {
929 printf("%-25s", fModuleNames.at(i).c_str());
930 if (fAnalyzeEventEntries.at(i))
931 printf("\t%d\t%.1f\t%.1f\t%.1f\t%.3f",
933 fAnalyzeEventMean.at(i)*1000.,
934 fAnalyzeEventRMS.at(i)*1000.,
935 fAnalyzeEventTimeMax.at(i)*1000., //ms
936 fAnalyzeEventTimeTotal.at(i)); //s
937 else
938 printf("\t-\t-\t-\t-\t-");
939
940 if (fAnalyzeFlowEventEntries.at(i))
941 printf("\t%d\t%.1f\t%.1f\t%.1f\t%.3f",
943 fAnalyzeFlowEventMean.at(i)*1000.,
944 fAnalyzeFlowEventRMS.at(i)*1000.,
945 fAnalyzeFlowEventTimeMax.at(i)*1000., //ms
947 else
948 printf("\t-\t-\t-\t-\t-");
949 printf("\n");
950 }
951 printf("----------------------------------------------------------------------------------------------------------------\n");
952 printf(" Analyse TMEvent total time %f\n",AllAnalyzeEventTime);
953 printf(" Analyse FlowEvent total time %f\n",AllAnalyzeFlowEventTime);
954#ifdef HAVE_ROOT
955 if (fUserHistograms.size()) {
956 printf("Custom profiling windows\tEntries\tMean(ms)RMS(ms)\tMax(ms)\tSum(s)\n");
957 printf("----------------------------------------------------------------------\n");
958 for (size_t i=0; i<fUserHistograms.size(); i++) {
959 printf("%-25s\t%d\t%.1f\t%.1f\t%.1f\t%.3f\t\n",fUserHistograms.at(i)->GetTitle(),
960 (int)fUserHistograms.at(i)->GetEntries(),
961 fUserHistograms.at(i)->GetMean()*1000., //ms
962 fUserHistograms.at(i)->GetRMS()*1000., //ms
963 fMaxUserTime.at(i)*1000., //ms
964 fTotalUserTime.at(i)); //s
965 }
966 printf("----------------------------------------------------------------------\n");
967 } else {
968 printf("----------------------------------------------------------------------------------------------------------------\n");
969 }
970#else
971 printf("To use custom profile windows, please build rootana with root\n");
972#endif
973 }
974
975 //CPU and Wall clock time:
976 double cputime = (double)(clock() - fStartCPU)/CLOCKS_PER_SEC;
977 std::chrono::duration<double> usertime = std::chrono::system_clock::now() - fStartUser;
978 printf("%s\tCPU time: %.2fs\tUser time: %.2fs\tAverage CPU Usage: ~%.1f%%\n",
979 getenv("_"),
980 cputime,
981 usertime.count(),
982 100.*cputime/usertime.count());
983}
984
985//////////////////////////////////////////////////////////
986//
987// RunHandler class
988//
989//////////////////////////////////////////////////////////
990
991class Profiler;
992
994{
995public:
996 TARunInfo* fRunInfo = NULL;
997 std::vector<TARunObject*> fRunRun;
998 std::vector<std::string> fArgs;
999 bool fMultithreadEnabled = false;
1000 Profiler* fProfiler = NULL;
1001 bool fProfilerEnabled = false;
1003
1004 RunHandler(const std::vector<std::string>& args, bool multithread, bool profile, int queue_interval_check) // ctor
1005 {
1006 fRunInfo = NULL;
1007 fArgs = args;
1008 fMultithreadEnabled = multithread;
1009 fProfiler = NULL;
1010 fProfilerEnabled = profile;
1011 fProfilerIntervalCheck = queue_interval_check;
1012 }
1013
1014 ~RunHandler() // dtor
1015 {
1016 if (fRunInfo) {
1017 delete fRunInfo;
1018 fRunInfo = NULL;
1019 }
1020 if (fProfiler) {
1021 delete fProfiler;
1022 fProfiler = NULL;
1023 }
1024 }
1025
1027 {
1028 //bool data_processing=true;
1029 int nModules=(*gModules).size();
1030
1031 TAMultithreadHelper* mt = fRunInfo->fMtInfo;
1032
1033 assert(nModules == (int)mt->fMtFlowQueueMutex.size());
1034 assert(nModules == (int)mt->fMtFlowQueue.size());
1035 assert(nModules == (int)mt->fMtFlagQueue.size());
1036
1037 { //Lock scope
1038 std::lock_guard<std::mutex> lock(mt->fMtFlowQueueMutex[i]);
1039 mt->fMtThreadIsRunning[i] = true;
1040 }
1041
1042 while (!mt->fMtShutdownRequested) {
1043 TAFlowEvent* flow = NULL;
1044 TAFlags* flag = NULL;
1045
1046 { //Lock scope
1047 std::lock_guard<std::mutex> lock(mt->fMtFlowQueueMutex[i]);
1048 if (!mt->fMtFlowQueue[i].empty()) {
1049 flow=mt->fMtFlowQueue[i].front();
1050 flag=mt->fMtFlagQueue[i].front();
1051 mt->fMtFlowQueue[i].pop_front();
1052 mt->fMtFlagQueue[i].pop_front();
1053 }
1054
1055 if (flow == NULL)
1056 mt->fMtThreadIsBusy[i] = false; // we will sleep
1057 else
1058 mt->fMtThreadIsBusy[i] = true; // we will analyze an event
1059
1060 // implicit unlock of mutex
1061 }
1062
1063 if (flow == NULL) { // wait until queue not empty
1064 usleep(mt->fMtQueueEmptyUSleepTime);
1065 continue;
1066 }
1067
1068 TAClock start_time = TAClockNow();
1069
1070 flow = fRunRun[i]->AnalyzeFlowEvent(fRunInfo, flag, flow);
1071
1072 if (fProfiler)
1073 fProfiler->LogAnalyzeFlowEvent(flag, flow, i, start_time);
1074
1075 if ((*flag) & TAFlag_QUIT) { // shut down the analyzer
1076 delete flow;
1077 delete flag;
1078 flow = NULL;
1079 flag = NULL;
1080 mt->fMtQuitRequested = true;
1081 mt->fMtShutdownRequested = true;
1082 break; // stop the thread
1083 }
1084
1085 if ((*flag) & TAFlag_SKIP) { // stop processing this event
1086 delete flow;
1087 delete flag;
1088 flow = NULL;
1089 flag = NULL;
1090 continue;
1091 }
1092
1093 if (i==nModules-1) //If I am the last module... free memory, else queue up for next module to process
1094 {
1095 if (fProfiler) {
1096 fProfiler->LogUserWindows(flag, flow);
1097 fProfiler->LogMTQueueLength(fRunInfo);
1098 }
1099 delete flow;
1100 delete flag;
1101 flow = NULL;
1102 flag = NULL;
1103 }
1104 else
1105 {
1106 MtQueueFlowEvent(mt, i+1, flag, flow, true);
1107 flow = NULL;
1108 flag = NULL;
1109 }
1110 }
1111
1112 { //Lock scope
1113 std::lock_guard<std::mutex> lock(mt->fMtFlowQueueMutex[i]);
1114 mt->fMtThreadIsRunning[i] = false;
1115 mt->fMtThreadIsBusy[i] = false;
1116 }
1117 }
1118
1119 void CreateRun(int run_number, const char* file_name)
1120 {
1121 assert(fRunInfo == NULL);
1122 assert(fRunRun.size() == 0);
1123
1124 fRunInfo = new TARunInfo(run_number, file_name, fArgs);
1125
1126 if (fProfilerEnabled)
1127 fProfiler = new Profiler( fProfilerIntervalCheck );
1128
1129 int nModules = (*gModules).size();
1130
1131 for (int i=0; i<nModules; i++)
1132 fRunRun.push_back((*gModules)[i]->NewRunObject(fRunInfo));
1133
1134 if (fMultithreadEnabled) {
1135 TAMultithreadHelper* mt = new TAMultithreadHelper(nModules);
1136 fRunInfo->fMtInfo = mt;
1137 for (int i=0; i<nModules; i++) {
1138 printf("Create fMtFlowQueue thread %d\n",i);
1139 mt->fMtThreads[i]=new std::thread(&RunHandler::PerModuleThread,this,i);
1140 }
1141 }
1142 }
1143
1145 {
1146 assert(fRunInfo != NULL);
1147 assert(fRunInfo->fOdb != NULL);
1148 if (fProfiler)
1149 fProfiler->Begin(fRunInfo, fRunRun);
1150 for (unsigned i=0; i<fRunRun.size(); i++)
1151 fRunRun[i]->BeginRun(fRunInfo);
1152
1153 }
1154
1155 void EndRun(TAFlags* flags)
1156 {
1157 assert(fRunInfo);
1158
1159 // make sure the shutdown sequence matches the description in the README file!
1160
1161 // zeroth. Flush events queued for analysis before calling PreEndRun (insure
1162 // deterministic behaviour thats the same as in single threaded mode)
1163 if (fRunInfo->fMtInfo) {
1164 WaitForAllQueuesEmpty(fRunInfo->fMtInfo);
1165 }
1166 // first, call PreEndRun() to tell analysis modules that there will be no more
1167 // MIDAS events, no more calls to Analyze(). PreEndRun() may generate more
1168 // flow events, they to into the flow queue or into the multithread queue
1169
1170 for (unsigned i=0; i<fRunRun.size(); i++)
1171 fRunRun[i]->PreEndRun(fRunInfo);
1172
1173 // if in single threaded mode, analyze all queued flow events - call AnalyzeFlowEvent()
1174 // this can generate additional flow events that will be queued in the queue.
1175
1176 AnalyzeFlowQueue(flags);
1177
1178 // if in multi threaded mode, allow all the queues to drain naturally
1179 // and shutdown the threads
1180
1181 if (fRunInfo->fMtInfo) {
1182 WaitForAllQueuesEmpty(fRunInfo->fMtInfo);
1184 if (fRunInfo->fMtInfo->fMtQuitRequested) {
1185 (*flags) |= TAFlag_QUIT;
1186 }
1187 }
1188
1189 // all data analysis is complete
1190
1191 for (unsigned i=0; i<fRunRun.size(); i++)
1192 fRunRun[i]->EndRun(fRunInfo);
1193
1194 if (fProfiler) {
1195 fProfiler->End(fRunInfo);
1196 fProfiler->Print();
1197 }
1198 }
1199
1201 {
1202 assert(fRunInfo);
1203
1204 for (unsigned i=0; i<fRunRun.size(); i++)
1205 fRunRun[i]->NextSubrun(fRunInfo);
1206 }
1207
1209 {
1210 assert(fRunInfo);
1211
1212 for (unsigned i=0; i<fRunRun.size(); i++) {
1213 delete fRunRun[i];
1214 fRunRun[i] = NULL;
1215 }
1216
1217 fRunRun.clear();
1218 assert(fRunRun.size() == 0);
1219
1220 if (fProfiler) {
1221 delete fProfiler;
1222 fProfiler = NULL;
1223 }
1224
1225 delete fRunInfo;
1226 fRunInfo = NULL;
1227 }
1228
1230 {
1231 for (unsigned i=0; i<fRunRun.size(); i++)
1232 fRunRun[i]->AnalyzeSpecialEvent(fRunInfo, event);
1233 }
1234
1236 {
1237 for (unsigned i=0; i<fRunRun.size(); i++) {
1238 TAClock start_time = TAClockNow();
1239 flow = fRunRun[i]->AnalyzeFlowEvent(fRunInfo, flags, flow);
1240 if (fProfiler)
1241 fProfiler->LogAnalyzeFlowEvent(flags, flow, i, start_time);
1242 if (!flow)
1243 break;
1244 if ((*flags) & TAFlag_SKIP)
1245 break;
1246 if ((*flags) & TAFlag_QUIT)
1247 break;
1248 }
1249 return flow;
1250 }
1251
1252 void AnalyzeFlowQueue(TAFlags* ana_flags)
1253 {
1254 while (1) {
1255 if (fRunInfo->fMtInfo)
1256 if (fRunInfo->fMtInfo->fMtQuitRequested) {
1257 *ana_flags |= TAFlag_QUIT;
1258 break;
1259 }
1260
1261 TAFlowEvent* flow = fRunInfo->ReadFlowQueue();
1262 if (!flow)
1263 break;
1264
1265 int flags = 0;
1266 flow = AnalyzeFlowEvent(&flags, flow);
1267 if (flow)
1268 delete flow;
1269 if (flags & TAFlag_QUIT) {
1270 *ana_flags |= TAFlag_QUIT;
1271 break;
1272 }
1273 }
1274 }
1275
1276 void AnalyzeEvent(TMEvent* event, TAFlags* flags, TMWriterInterface *writer)
1277 {
1278 assert(fRunInfo != NULL);
1279 assert(fRunInfo->fOdb != NULL);
1280 assert(event != NULL);
1281 assert(flags != NULL);
1282
1283 if (fRunInfo->fMtInfo)
1284 if (fRunInfo->fMtInfo->fMtQuitRequested) {
1285 *flags |= TAFlag_QUIT;
1286 return;
1287 }
1288
1289 TAFlowEvent* flow = NULL;
1290
1291 for (unsigned i=0; i<fRunRun.size(); i++) {
1292 TAClock start_time = TAClockNow();
1293 flow = fRunRun[i]->Analyze(fRunInfo, event, flags, flow);
1294 if (fProfiler)
1295 fProfiler->LogAnalyzeEvent(flags, flow, i, start_time);
1296 if (*flags & TAFlag_SKIP)
1297 break;
1298 if (*flags & TAFlag_QUIT)
1299 break;
1300 }
1301
1302 if (flow) {
1303 if ((*flags & TAFlag_SKIP)||(*flags & TAFlag_QUIT)) {
1304 // skip further processing of this event
1305 } else {
1306 if (fRunInfo->fMtInfo) {
1307 MtQueueFlowEvent(fRunInfo->fMtInfo, 0, NULL, flow, true);
1308 flow = NULL; // ownership passed to the multithread event queue
1309 } else {
1310 flow = AnalyzeFlowEvent(flags, flow);
1311 }
1312 }
1313 }
1314
1315 if (fProfiler && !fRunInfo->fMtInfo) {
1316 fProfiler->LogUserWindows(flags, flow);
1317 }
1318
1319 if (*flags & TAFlag_WRITE)
1320 if (writer)
1321 TMWriteEvent(writer, event);
1322
1323 if (flow)
1324 delete flow;
1325
1326 if (*flags & TAFlag_QUIT)
1327 return;
1328
1329 if (fRunInfo->fMtInfo)
1330 if (fRunInfo->fMtInfo->fMtQuitRequested) {
1331 *flags |= TAFlag_QUIT;
1332 return;
1333 }
1334
1335 AnalyzeFlowQueue(flags);
1336 }
1337};
1338
1340{
1341 if (fFlowQueue.empty())
1342 return NULL;
1343
1344 TAFlowEvent* flow = fFlowQueue.front();
1345 fFlowQueue.pop_front();
1346 return flow;
1347}
1348
1350{
1351 if (fMtInfo) {
1352 // call MtQueueFlowEvent with wait=false to avoid deadlock
1353 MtQueueFlowEvent(fMtInfo, 0, NULL, flow, false);
1354 } else {
1355 fFlowQueue.push_back(flow);
1356 }
1357}
1358
1359#ifdef HAVE_MIDAS
1360
1361#ifdef HAVE_TMFE
1362
1363#ifdef HAVE_ROOT
1364#include "TSystem.h"
1365#endif
1366
1367#include "tmfe.h"
1368
1369static bool gRunStartRequested = false;
1370static bool gRunStopRequested = false;
1371
1372class RpcHandler: public TMFeRpcHandlerInterface
1373{
1374 TMFeResult HandleBeginRun(int run_number)
1375 {
1376 printf("RpcHandler::HandleBeginRun(%d)\n", run_number);
1377 gRunStartRequested = true;
1378 gRunStopRequested = false;
1379 return TMFeOk();
1380 }
1381
1382 TMFeResult HandleEndRun(int run_number)
1383 {
1384 printf("RpcHandler::HandleEndRun(%d)\n", run_number);
1385 gRunStartRequested = false;
1386 gRunStopRequested = true;
1387 return TMFeOk();
1388 }
1389
1390 TMFeResult HandleStartAbortRun(int run_number)
1391 {
1392 printf("RpcHandler::HandleStartAbortRun(%d)\n", run_number);
1393 // run did not really start, pretend it started and immediately ended
1394 gRunStartRequested = false;
1395 gRunStopRequested = true;
1396 return TMFeOk();
1397 }
1398
1399 TMFeResult HandleRpc(const char* cmd, const char* args, std::string& result)
1400 {
1401 return TMFeOk();
1402 }
1403};
1404
1405TMFeResult ReceiveEvent(TMEventBuffer* b, TMEvent *e, int timeout_msec = 0)
1406{
1407 assert(b != NULL);
1408 assert(e != NULL);
1409
1410 e->Reset();
1411
1412 TMFeResult r = b->ReceiveEvent(&e->data, timeout_msec);
1413
1414 if (r.error_flag)
1415 return r;
1416
1417 if (e->data.size() == 0)
1418 return TMFeOk();
1419
1420 e->ParseEvent();
1421
1422 assert(e->data.size() == e->event_header_size + e->data_size);
1423
1424 return TMFeOk();
1425}
1426
1427static int ProcessMidasOnlineTmfe(const std::vector<std::string>& args, const char* progname, const char* hostname, const char* exptname, const char* bufname, int event_id, int trigger_mask, const char* sampling_type_string, int num_analyze, TMWriterInterface* writer, bool multithread, bool profiler, int queue_interval_check)
1428{
1429 TMFE *mfe = TMFE::Instance();
1430
1431 TMFeResult r = mfe->Connect(progname, hostname, exptname);
1432
1433 if (r.error_flag) {
1434 fprintf(stderr, "Cannot connect to MIDAS: %s\n", r.error_message.c_str());
1435 return -1;
1436 }
1437
1438 //MVOdb* odb = mfe->fOdbRoot;
1439
1440 TMEventBuffer *b = new TMEventBuffer(mfe);
1441
1442 /* open event buffer */
1443
1444 r = b->OpenBuffer(bufname);
1445
1446 if (r.error_flag) {
1447 fprintf(stderr, "Cannot open event buffer \"%s\": %s\n", bufname, r.error_message.c_str());
1448 return -1;
1449 }
1450
1451 /* request read cache */
1452 size_t cache_size = 100000;
1453 if(!strcmp(sampling_type_string,"GET_RECENT"))
1454 cache_size=0;
1455 r = b->SetCacheSize(cache_size, 0);
1456
1457 if (r.error_flag) {
1458 fprintf(stderr, "Cannot set cache size on event buffer \"%s\": %s\n", bufname, r.error_message.c_str());
1459 return -1;
1460 }
1461
1462 /* request events */
1463
1464 r = b->AddRequest(event_id, trigger_mask, sampling_type_string);
1465
1466 if (r.error_flag) {
1467 fprintf(stderr, "Cannot add event request on event buffer \"%s\": %s\n", bufname, r.error_message.c_str());
1468 return -1;
1469 }
1470
1471 RpcHandler* h = new RpcHandler();
1472
1473 mfe->AddRpcHandler(h);
1474
1475 mfe->DeregisterTransitionPause();
1476 mfe->DeregisterTransitionResume();
1477 mfe->SetTransitionSequenceStart(300);
1478 mfe->SetTransitionSequenceStop(700);
1479
1480 mfe->StartRpcThread();
1481
1482 /* reqister event requests */
1483
1484 RunHandler rh(args, multithread, profiler, queue_interval_check);
1485
1486 for (unsigned i=0; i<(*gModules).size(); i++)
1487 (*gModules)[i]->Init(args);
1488
1489 if (mfe->fStateRunning) {
1490 rh.CreateRun(mfe->fRunNumber, NULL);
1491 rh.fRunInfo->fOdb = mfe->fOdbRoot;
1492 rh.BeginRun();
1493 }
1494
1495 TMEvent e;
1496
1497 while (!mfe->fShutdownRequested) {
1498 bool do_sleep = true;
1499
1500 if (gRunStartRequested) {
1501 gRunStartRequested = false;
1502
1503 if (rh.fRunInfo) {
1504 TAFlags flags = 0;
1505 rh.EndRun(&flags);
1506 rh.fRunInfo->fOdb = NULL;
1507 rh.DeleteRun();
1508 }
1509
1510 rh.CreateRun(mfe->fRunNumber, NULL);
1511 rh.fRunInfo->fOdb = mfe->fOdbRoot;
1512 rh.BeginRun();
1513
1514 continue;
1515 }
1516
1517 if (gRunStopRequested) {
1518 gRunStopRequested = false;
1519
1520 if (rh.fRunInfo) {
1521 TAFlags flags = 0;
1522 rh.EndRun(&flags);
1523 rh.fRunInfo->fOdb = NULL;
1524 rh.DeleteRun();
1525 }
1526
1527 continue;
1528 }
1529
1530 //r = buf.ReceiveEvent(&e, BM_NO_WAIT);
1531 //r = buf.ReceiveEvent(&e, BM_WAIT);
1532 //r = buf.ReceiveEvent(&e, 8000);
1533 //r = buf.ReceiveEvent(&e, 5000);
1534 r = ReceiveEvent(b, &e, 100);
1535
1536 if (r.error_flag) {
1537 fprintf(stderr, "Cannot read event on event buffer \"%s\": %s\n", bufname, r.error_message.c_str());
1538 break;
1539 }
1540
1541 //e.PrintHeader();
1542 //::sleep(1);
1543
1544 if ((e.data_size > 0) && (rh.fRunInfo != NULL)) {
1545
1546 //e.PrintHeader();
1547 //e.PrintBanks(2);
1548
1549 TAFlags flags = 0;
1550
1551 rh.AnalyzeEvent(&e, &flags, writer);
1552
1553 if (flags & TAFlag_QUIT) {
1554 mfe->fShutdownRequested = true;
1555 }
1556
1557 if (num_analyze > 0) {
1558 num_analyze--;
1559 if (num_analyze == 0) {
1560 mfe->fShutdownRequested = true;
1561 }
1562 }
1563
1564 do_sleep = false;
1565 }
1566
1567#ifdef HAVE_THTTP_SERVER
1569 int nreq = TARootHelper::fgHttpServer->ProcessRequests();
1570 if (nreq > 0) {
1571 do_sleep = false;
1572 //printf("ProcessRequests() returned %d\n", nreq);
1573 }
1574 }
1575#endif
1576#ifdef HAVE_ROOT
1577 if (TARootHelper::fgApp) {
1578 gSystem->DispatchOneEvent(kTRUE);
1579 }
1580#endif
1581
1582 //printf("do_sleep %d\n", do_sleep);
1583
1584 if (do_sleep) {
1585 mfe->Yield(0.010);
1586 } else {
1587 mfe->Yield(0);
1588 }
1589 }
1590
1591 if (rh.fRunInfo) {
1592 TAFlags flags = 0;
1593 rh.EndRun(&flags);
1594 rh.fRunInfo->fOdb = NULL;
1595 rh.DeleteRun();
1596 }
1597
1598 for (unsigned i=0; i<(*gModules).size(); i++)
1599 (*gModules)[i]->Finish();
1600
1601 /* close event buffer */
1602 r = b->CloseBuffer();
1603
1604 delete b;
1605 b = NULL;
1606
1607 if (r.error_flag) {
1608 fprintf(stderr,"Cannot close event buffer \"%s\": %s\n", bufname, r.error_message.c_str());
1609 //return -1;
1610 }
1611
1612 /* disconnect from experiment */
1613 mfe->Disconnect();
1614
1615 return 0;
1616}
1617
1618#else
1619
1620#include "TMidasOnline.h"
1621
1622#ifdef HAVE_ROOT
1623#include "TSystem.h"
1624#endif
1625
1627{
1628public:
1630 int fNumAnalyze = 0;
1631 TMWriterInterface* fWriter = NULL;
1632 bool fQuit = false;
1633 MVOdb* fOdb = NULL;
1634
1635 OnlineHandler(int num_analyze, TMWriterInterface* writer, MVOdb* odb, const std::vector<std::string>& args, bool multithread, bool profiler, int queue_interval_check) // ctor
1636 : fRun(args, multithread, profiler, queue_interval_check)
1637 {
1638 fNumAnalyze = num_analyze;
1639 fWriter = writer;
1640 fOdb = odb;
1641 }
1642
1644 {
1645 fWriter = NULL;
1646 fOdb = NULL;
1647 }
1648
1649 void StartRun(int run_number)
1650 {
1651 fRun.CreateRun(run_number, NULL);
1652 fRun.fRunInfo->fOdb = fOdb;
1653 fRun.BeginRun();
1654 }
1655
1656 void Transition(int transition, int run_number, int transition_time)
1657 {
1658 //printf("OnlineHandler::Transtion: transition %d, run %d, time %d\n", transition, run_number, transition_time);
1659
1660 if (transition == TR_START) {
1661 if (fRun.fRunInfo) {
1662 TAFlags flags = 0;
1663 fRun.EndRun(&flags);
1664 if (flags & TAFlag_QUIT)
1665 fQuit = true;
1666 fRun.fRunInfo->fOdb = NULL;
1667 fRun.DeleteRun();
1668 }
1669 assert(fRun.fRunInfo == NULL);
1670
1671 StartRun(run_number);
1672 printf("Begin run: %d\n", run_number);
1673 } else if (transition == TR_STOP) {
1674 TAFlags flags = 0;
1675 fRun.EndRun(&flags);
1676 if (flags & TAFlag_QUIT)
1677 fQuit = true;
1678 fRun.fRunInfo->fOdb = NULL;
1679 fRun.DeleteRun();
1680 printf("End of run %d\n", run_number);
1681 }
1682 }
1683
1684 void Event(const void* data, int data_size)
1685 {
1686 //printf("OnlineHandler::Event: ptr %p, size %d\n", data, data_size);
1687
1688 if (!fRun.fRunInfo) {
1689 StartRun(0); // start fake run for events outside of a run
1690 }
1691
1692 TMEvent* event = new TMEvent(data, data_size);
1693
1694 TAFlags flags = 0;
1695
1696 fRun.AnalyzeEvent(event, &flags, fWriter);
1697
1698 if (flags & TAFlag_QUIT)
1699 fQuit = true;
1700
1701 if (fNumAnalyze > 0) {
1702 fNumAnalyze--;
1703 if (fNumAnalyze == 0)
1704 fQuit = true;
1705 }
1706
1707 if (event) {
1708 delete event;
1709 event = NULL;
1710 }
1711 }
1712};
1713
1714static int ProcessMidasOnlineOld(const std::vector<std::string>& args, const char* hostname, const char* exptname, int num_analyze, TMWriterInterface* writer, bool multithread, bool profiler, int queue_interval_check)
1715{
1717
1718 int err = midas->connect(hostname, exptname, "rootana");
1719 if (err != 0) {
1720 fprintf(stderr,"Cannot connect to MIDAS, error %d\n", err);
1721 return -1;
1722 }
1723
1724 MVOdb* odb = MakeMidasOdb(midas->fDB);
1725
1726 OnlineHandler* h = new OnlineHandler(num_analyze, writer, odb, args, multithread, profiler, queue_interval_check);
1727
1728 midas->RegisterHandler(h);
1729 midas->registerTransitions();
1730
1731 /* reqister event requests */
1732
1733 midas->eventRequest("SYSTEM",-1,-1,(1<<1));
1734
1735 int run_number = 0; // midas->odbReadInt("/runinfo/Run number");
1736 int run_state = 0; // midas->odbReadInt("/runinfo/State");
1737
1738 odb->RI("runinfo/run number", &run_number);
1739 odb->RI("runinfo/state", &run_state);
1740
1741 for (unsigned i=0; i<(*gModules).size(); i++)
1742 (*gModules)[i]->Init(args);
1743
1744 if ((run_state == STATE_RUNNING)||(run_state == STATE_PAUSED)) {
1745 h->StartRun(run_number);
1746 }
1747
1748 while (!h->fQuit) {
1749#ifdef HAVE_THTTP_SERVER
1751 TARootHelper::fgHttpServer->ProcessRequests();
1752 }
1753#endif
1754#ifdef HAVE_ROOT
1755 if (TARootHelper::fgApp) {
1756 gSystem->DispatchOneEvent(kTRUE);
1757 }
1758#endif
1759 if (!TMidasOnline::instance()->poll(10))
1760 break;
1761 }
1762
1763 if (h->fRun.fRunInfo) {
1764 TAFlags flags = 0;
1765 h->fRun.EndRun(&flags);
1766 h->fRun.fRunInfo->fOdb = NULL;
1767 h->fRun.DeleteRun();
1768 }
1769
1770 for (unsigned i=0; i<(*gModules).size(); i++)
1771 (*gModules)[i]->Finish();
1772
1773 delete h; h = NULL;
1774 delete odb; odb = NULL;
1775
1776 /* disconnect from experiment */
1777 midas->disconnect();
1778
1779 return 0;
1780}
1781
1782#endif
1783#endif
1784
1785std::vector<std::string> TARunInfo::fgFileList;
1787
1788static int ProcessMidasFiles(const std::vector<std::string>& files, const std::vector<std::string>& args, int num_skip, int num_analyze, TMWriterInterface* writer, bool multithread, bool profiler, int queue_interval_check)
1789{
1790 int number_of_missing_files = 0;
1791
1792 TARunInfo::fgFileList.clear();
1793
1794 for (unsigned i=0; i<files.size(); i++)
1795 TARunInfo::fgFileList.push_back(files[i]);
1796
1797 for (unsigned i=0; i<(*gModules).size(); i++)
1798 (*gModules)[i]->Init(args);
1799
1800 RunHandler run(args, multithread, profiler, queue_interval_check);
1801
1802 bool done = false;
1803
1808
1809 TMReaderInterface *reader = TMNewReader(filename.c_str());
1810
1811 if (reader->fError) {
1812 printf("Could not open \"%s\", error: %s\n", filename.c_str(), reader->fErrorString.c_str());
1813 delete reader;
1814 number_of_missing_files++;
1815 continue;
1816 }
1817
1818 while (1) {
1819 TMEvent* event = TMReadEvent(reader);
1820
1821 if (!event) // EOF
1822 break;
1823
1824 if (event->error) {
1825 delete event;
1826 break;
1827 }
1828
1829 if (event->event_id == 0x8000) // begin of run event
1830 {
1831 int runno = event->serial_number;
1832
1833 if (run.fRunInfo) {
1834 if (run.fRunInfo->fRunNo == runno) {
1835 // next subrun file, nothing to do
1836 run.fRunInfo->fFileName = filename;
1837 run.NextSubrun();
1838 } else {
1839 // file with a different run number
1840 TAFlags flags = 0;
1841 run.EndRun(&flags);
1842 if (flags & TAFlag_QUIT) {
1843 done = true;
1844 }
1845 run.DeleteRun();
1846 }
1847 }
1848
1849 if (!run.fRunInfo) {
1850 run.CreateRun(runno, filename.c_str());
1851 run.fRunInfo->fOdb = MakeFileDumpOdb(event->GetEventData(), event->data_size);
1852 run.BeginRun();
1853 }
1854
1855 assert(run.fRunInfo);
1856
1857 run.AnalyzeSpecialEvent(event);
1858
1859 if (writer)
1860 TMWriteEvent(writer, event);
1861 }
1862 else if (event->event_id == 0x8001) // end of run event
1863 {
1864 //int runno = event->serial_number;
1865 run.AnalyzeSpecialEvent(event);
1866 if (writer)
1867 TMWriteEvent(writer, event);
1868
1869 if (run.fRunInfo->fOdb) {
1870 delete run.fRunInfo->fOdb;
1871 run.fRunInfo->fOdb = NULL;
1872 }
1873
1874 run.fRunInfo->fOdb = MakeFileDumpOdb(event->GetEventData(), event->data_size);
1875 }
1876 else if (event->event_id == 0x8002) // message event
1877 {
1878 run.AnalyzeSpecialEvent(event);
1879 if (writer)
1880 TMWriteEvent(writer, event);
1881 }
1882 else
1883 {
1884 if (!run.fRunInfo) {
1885 // create a fake begin of run
1886 run.CreateRun(0, filename.c_str());
1887 run.fRunInfo->fOdb = MakeNullOdb();
1888 run.BeginRun();
1889 }
1890
1891 if (num_skip > 0) {
1892 num_skip--;
1893 } else {
1894 TAFlags flags = 0;
1895
1896 run.AnalyzeEvent(event, &flags, writer);
1897
1898 if (flags & TAFlag_QUIT)
1899 done = true;
1900
1901 if (num_analyze > 0) {
1902 num_analyze--;
1903 if (num_analyze == 0)
1904 done = true;
1905 }
1906 }
1907 }
1908
1909 delete event;
1910
1911 if (done)
1912 break;
1913
1914#ifdef HAVE_ROOT
1915 if (TARootHelper::fgApp) {
1916 gSystem->DispatchOneEvent(kTRUE);
1917 }
1918#endif
1919 }
1920
1921 reader->Close();
1922 delete reader;
1923
1924 if (done)
1925 break;
1926 }
1927
1928#ifdef HAVE_THTTP_SERVER
1929 if (0 && TARootHelper::fgHttpServer) {
1930 while (1) {
1931 gSystem->DispatchOneEvent(kTRUE);
1932 //sleep(1);
1933 }
1934 }
1935#endif
1936
1937 if (run.fRunInfo) {
1938 TAFlags flags = 0;
1939 run.EndRun(&flags);
1940 if (flags & TAFlag_QUIT)
1941 done = true;
1942 run.DeleteRun();
1943 }
1944
1945 for (unsigned i=0; i<(*gModules).size(); i++)
1946 (*gModules)[i]->Finish();
1947 if (number_of_missing_files)
1948 {
1949 printf("%d midas files were not openable\n",number_of_missing_files);
1950 return number_of_missing_files;
1951 }
1952 return 0;
1953}
1954
1955static int ProcessDemoMode(const std::vector<std::string>& args, int num_skip, int num_analyze, TMWriterInterface* writer, bool multithread, bool profiler, int queue_interval_check)
1956{
1957 for (unsigned i=0; i<(*gModules).size(); i++)
1958 (*gModules)[i]->Init(args);
1959
1960 RunHandler run(args, multithread, profiler, queue_interval_check);
1961
1962 bool done = false;
1963
1964 int runno = 1;
1965
1966 for (unsigned i=0; true; i++) {
1967 char s[256];
1968 snprintf(s, sizeof(s), "%03d", i);
1969 std::string filename = std::string("demo_subrun_") + s;
1970
1971 if (!run.fRunInfo) {
1972 run.CreateRun(runno, filename.c_str());
1973 run.fRunInfo->fOdb = MakeNullOdb();
1974 run.BeginRun();
1975 }
1976
1977 // we do not generate a fake begin of run event...
1978 //run.AnalyzeSpecialEvent(event);
1979
1980 // only switch subruns after the first subrun file
1981 if (i>0) {
1982 run.fRunInfo->fFileName = filename;
1983 run.NextSubrun();
1984 }
1985
1986 TMEvent* event = new TMEvent();
1987
1988 for (unsigned j=0; j<100; j++) {
1989 event->Init(0x0001, 0xFFFF, j+1, 0, 0);
1990 uint32_t test_data[] = { 0x11112222, 0x33334444, 0x55556666, 0x77778888 };
1991 event->AddBank("TEST", TID_DWORD, (const char*)test_data, sizeof(test_data));
1992
1993 if (num_skip > 0) {
1994 num_skip--;
1995 } else {
1996 TAFlags flags = 0;
1997
1998 run.AnalyzeEvent(event, &flags, writer);
1999
2000 if (flags & TAFlag_QUIT)
2001 done = true;
2002
2003 if (num_analyze > 0) {
2004 num_analyze--;
2005 if (num_analyze == 0)
2006 done = true;
2007 }
2008 }
2009
2010 if (done)
2011 break;
2012
2013#ifdef HAVE_ROOT
2014 if (TARootHelper::fgApp) {
2015 gSystem->DispatchOneEvent(kTRUE);
2016 }
2017#endif
2018 }
2019
2020 delete event;
2021
2022 // we do not generate a fake end of run event...
2023 //run.AnalyzeSpecialEvent(event);
2024
2025 if (done)
2026 break;
2027 }
2028
2029 if (run.fRunInfo) {
2030 TAFlags flags = 0;
2031 run.EndRun(&flags);
2032 run.DeleteRun();
2033 }
2034
2035 for (unsigned i=0; i<(*gModules).size(); i++)
2036 (*gModules)[i]->Finish();
2037
2038 return 0;
2039}
2040
2041static bool gEnableShowMem = false;
2042
2043#if 0
2044static int ShowMem(const char* label)
2045{
2046 if (!gEnableShowMem)
2047 return 0;
2048
2049 FILE* fp = fopen("/proc/self/statm","r");
2050 if (!fp)
2051 return 0;
2052
2053 int mem = 0;
2054 fscanf(fp,"%d",&mem);
2055 fclose(fp);
2056
2057 if (label)
2058 printf("memory at %s is %d\n", label, mem);
2059
2060 return mem;
2061}
2062#endif
2063
2065{
2066public:
2068 : TARunObject(runinfo)
2069 {
2070 if (gTrace)
2071 printf("EventDumpModule::ctor, run %d\n", runinfo->fRunNo);
2072 }
2073
2075 {
2076 if (gTrace)
2077 printf("EventDumpModule::dtor!\n");
2078 }
2079
2080 void BeginRun(TARunInfo* runinfo)
2081 {
2082 printf("EventDumpModule::BeginRun, run %d\n", runinfo->fRunNo);
2083 }
2084
2085 void EndRun(TARunInfo* runinfo)
2086 {
2087 printf("EventDumpModule::EndRun, run %d\n", runinfo->fRunNo);
2088 }
2089
2090 void NextSubrun(TARunInfo* runinfo)
2091 {
2092 printf("EventDumpModule::NextSubrun, run %d, file %s\n", runinfo->fRunNo, runinfo->fFileName.c_str());
2093 }
2094
2095 void PauseRun(TARunInfo* runinfo)
2096 {
2097 printf("EventDumpModule::PauseRun, run %d\n", runinfo->fRunNo);
2098 }
2099
2100 void ResumeRun(TARunInfo* runinfo)
2101 {
2102 printf("EventDumpModule::ResumeRun, run %d\n", runinfo->fRunNo);
2103 }
2104
2105 TAFlowEvent* Analyze(TARunInfo* runinfo, TMEvent* event, TAFlags* flags, TAFlowEvent* flow)
2106 {
2107 printf("EventDumpModule::Analyze, run %d, ", runinfo->fRunNo);
2108 event->FindAllBanks();
2109 std::string h = event->HeaderToString();
2110 std::string b = event->BankListToString();
2111 printf("%s: %s\n", h.c_str(), b.c_str());
2112 return flow;
2113 }
2114
2116 {
2117 printf("EventDumpModule::AnalyzeSpecialEvent, run %d, event serno %d, id 0x%04x, data size %d\n", runinfo->fRunNo, event->serial_number, (int)event->event_id, event->data_size);
2118 }
2119};
2120
2122{
2123public:
2124
2125 void Init(const std::vector<std::string> &args)
2126 {
2127 if (gTrace)
2128 printf("EventDumpModuleFactory::Init!\n");
2129 }
2130
2131 void Finish()
2132 {
2133 if (gTrace)
2134 printf("EventDumpModuleFactory::Finish!\n");
2135 }
2136
2138 {
2139 if (gTrace)
2140 printf("EventDumpModuleFactory::NewRunObject, run %d, file %s\n", runinfo->fRunNo, runinfo->fFileName.c_str());
2141 return new EventDumpModule(runinfo);
2142 }
2143};
2144
2145#ifdef HAVE_ROOT
2146#include <TGMenu.h>
2147#include <TGButton.h>
2148#include <TBrowser.h>
2149
2150#define CTRL_QUIT 1
2151#define CTRL_NEXT 2
2152#define CTRL_CONTINUE 3
2153#define CTRL_PAUSE 4
2154#define CTRL_NEXT_FLOW 5
2155
2156#define CTRL_TBROWSER 11
2157
2159{
2160public:
2162
2163 ValueHolder() // ctor
2164 {
2165 fValue = 0;
2166 }
2167};
2168
2170{
2171public:
2174
2175 TextButton(TGWindow*p, const char* text, ValueHolder* holder, int value) // ctor
2176 : TGTextButton(p, text)
2177 {
2178 fHolder = holder;
2179 fValue = value;
2180 }
2181
2182#if 0
2183 void Pressed()
2184 {
2185 printf("Pressed!\n");
2186 }
2187
2188 void Released()
2189 {
2190 printf("Released!\n");
2191 }
2192#endif
2193
2194 void Clicked()
2195 {
2196 //printf("Clicked button %s, value %d!\n", GetString().Data(), fValue);
2197 if (fHolder)
2198 fHolder->fValue = fValue;
2199 //gSystem->ExitLoop();
2200 }
2201};
2202
2203class MainWindow: public TGMainFrame
2204{
2205public:
2206 TGPopupMenu* fMenu;
2207 TGMenuBar* fMenuBar;
2208 TGLayoutHints* fMenuBarItemLayout;
2209
2210 TGCompositeFrame* fButtonsFrame;
2211
2213
2218
2220
2221public:
2222 MainWindow(const TGWindow*w, int s1, int s2, ValueHolder* holder) // ctor
2223 : TGMainFrame(w, s1, s2)
2224 {
2225 if (gTrace)
2226 printf("MainWindow::ctor!\n");
2227
2228 fHolder = holder;
2229 //SetCleanup(kDeepCleanup);
2230
2231 SetWindowName("ROOT Analyzer Control");
2232
2233 // layout the gui
2234 fMenu = new TGPopupMenu(gClient->GetRoot());
2235 fMenu->AddEntry("New TBrowser", CTRL_TBROWSER);
2236 fMenu->AddEntry("-", 0);
2237 fMenu->AddEntry("Next", CTRL_NEXT);
2238 fMenu->AddEntry("NextFlow", CTRL_NEXT_FLOW);
2239 fMenu->AddEntry("Continue", CTRL_CONTINUE);
2240 fMenu->AddEntry("Pause", CTRL_PAUSE);
2241 fMenu->AddEntry("-", 0);
2242 fMenu->AddEntry("Quit", CTRL_QUIT);
2243
2244 fMenuBarItemLayout = new TGLayoutHints(kLHintsTop|kLHintsLeft, 0, 4, 0, 0);
2245
2246 fMenu->Associate(this);
2247
2248 fMenuBar = new TGMenuBar(this, 1, 1, kRaisedFrame);
2249 fMenuBar->AddPopup("&Rootana", fMenu, fMenuBarItemLayout);
2250 fMenuBar->Layout();
2251
2252 AddFrame(fMenuBar, new TGLayoutHints(kLHintsTop|kLHintsLeft|kLHintsExpandX));
2253
2254 fButtonsFrame = new TGVerticalFrame(this);
2255
2256 fNextButton = new TextButton(fButtonsFrame, "Next", holder, CTRL_NEXT);
2257 fNextFlowButton = new TextButton(fButtonsFrame, "Next Flow Event", holder, CTRL_NEXT_FLOW);
2258
2259 fButtonsFrame->AddFrame(fNextButton, new TGLayoutHints(kLHintsExpandX, 1, 1, 1, 1));
2260 fButtonsFrame->AddFrame(fNextFlowButton, new TGLayoutHints(kLHintsExpandX, 1, 1, 1, 1));
2261
2262 TGHorizontalFrame *hframe = new TGHorizontalFrame(fButtonsFrame);
2263
2264 fContinueButton = new TextButton(hframe, " Continue ", holder, CTRL_CONTINUE);
2265 fPauseButton = new TextButton(hframe, " Pause ", holder, CTRL_PAUSE);
2266
2267 hframe->AddFrame(fContinueButton, new TGLayoutHints(kLHintsExpandX, 1, 1, 1, 1));
2268 hframe->AddFrame(fPauseButton, new TGLayoutHints(kLHintsExpandX, 1, 1, 1, 1));
2269
2270 fButtonsFrame->AddFrame(hframe, new TGLayoutHints(kLHintsExpandX));
2271
2272 fQuitButton = new TextButton(fButtonsFrame, "Quit ", holder, CTRL_QUIT);
2273 fButtonsFrame->AddFrame(fQuitButton, new TGLayoutHints(kLHintsExpandX, 1, 1, 1, 1));
2274
2275 AddFrame(fButtonsFrame, new TGLayoutHints(kLHintsExpandX));
2276
2277 MapSubwindows();
2278 Layout();
2279 Resize(GetDefaultSize());
2280 MapWindow();
2281 }
2282
2283 ~MainWindow() // dtor // Closing the control window closes the whole program
2284 {
2285 if (gTrace)
2286 printf("MainWindow::dtor!\n");
2287
2288 delete fMenu;
2289 delete fMenuBar;
2290 delete fMenuBarItemLayout;
2291 }
2292
2294 {
2295 if (gTrace)
2296 printf("MainWindow::CloseWindow()\n");
2297
2298 if (fHolder)
2299 fHolder->fValue = CTRL_QUIT;
2300 //gSystem->ExitLoop();
2301 }
2302
2303 Bool_t ProcessMessage(Long_t msg, Long_t parm1, Long_t parm2)
2304 {
2305 //printf("GUI Message %d %d %d\n",(int)msg,(int)parm1,(int)parm2);
2306 switch (GET_MSG(msg))
2307 {
2308 default:
2309 break;
2310 case kC_COMMAND:
2311 switch (GET_SUBMSG(msg))
2312 {
2313 default:
2314 break;
2315 case kCM_MENU:
2316 //printf("parm1 %d\n", (int)parm1);
2317 switch (parm1)
2318 {
2319 case CTRL_TBROWSER:
2320 new TBrowser();
2321 break;
2322 default:
2323 //printf("Control %d!\n", (int)parm1);
2324 if (fHolder)
2325 fHolder->fValue = parm1;
2326 //gSystem->ExitLoop();
2327 break;
2328 }
2329 break;
2330 }
2331 break;
2332 }
2333
2334 return kTRUE;
2335 }
2336};
2337#endif
2338
2340{
2341public:
2345#ifdef HAVE_ROOT
2348#endif
2349
2351 : TARunObject(runinfo)
2352 {
2353 if (gTrace)
2354 printf("InteractiveModule::ctor, run %d\n", runinfo->fRunNo);
2355 fContinue = false;
2356 fNextFlow = false;
2357 fSkip = 0;
2358#ifdef HAVE_ROOT
2359 if (!fgHolder)
2360 fgHolder = new ValueHolder;
2361 if (!fgCtrlWindow && runinfo->fRoot->fgApp) {
2362 fgCtrlWindow = new MainWindow(gClient->GetRoot(), 200, 300, fgHolder);
2363 }
2364#endif
2365 }
2366
2368 {
2369 if (gTrace)
2370 printf("InteractiveModule::dtor!\n");
2371 }
2372
2373 void BeginRun(TARunInfo* runinfo)
2374 {
2375 printf("InteractiveModule::BeginRun, run %d\n", runinfo->fRunNo);
2376 }
2377
2378 void EndRun(TARunInfo* runinfo)
2379 {
2380 printf("InteractiveModule::EndRun, run %d\n", runinfo->fRunNo);
2381
2382#ifdef HAVE_ROOT
2383 if (fgCtrlWindow && runinfo->fRoot->fgApp) {
2384 fgCtrlWindow->fNextButton->SetEnabled(false);
2385 fgCtrlWindow->fNextFlowButton->SetEnabled(false);
2386 fgCtrlWindow->fContinueButton->SetEnabled(false);
2387 fgCtrlWindow->fPauseButton->SetEnabled(false);
2388 while (1) {
2389#ifdef HAVE_THTTP_SERVER
2391 TARootHelper::fgHttpServer->ProcessRequests();
2392 }
2393#endif
2394#ifdef HAVE_ROOT
2395 if (TARootHelper::fgApp) {
2396 gSystem->DispatchOneEvent(kTRUE);
2397 }
2398#endif
2399#ifdef HAVE_MIDAS
2400#ifdef HAVE_TMFE
2401 TMFE* mfe = TMFE::Instance();
2402 mfe->Yield(0.010);
2403 if (mfe->fShutdownRequested) {
2404 return;
2405 }
2406#else
2407 if (!TMidasOnline::instance()->sleep(10)) {
2408 // FIXME: indicate that we should exit the analyzer
2409 return;
2410 }
2411#endif
2412#else
2413 gSystem->Sleep(10);
2414#endif
2415
2416 int ctrl = fgHolder->fValue;
2417 fgHolder->fValue = 0;
2418
2419 switch (ctrl) {
2420 case CTRL_QUIT:
2421 return;
2422 case CTRL_NEXT:
2423 return;
2424 case CTRL_CONTINUE:
2425 return;
2426 }
2427 }
2428 }
2429#endif
2430 }
2431
2432 void PauseRun(TARunInfo* runinfo)
2433 {
2434 printf("InteractiveModule::PauseRun, run %d\n", runinfo->fRunNo);
2435 }
2436
2437 void ResumeRun(TARunInfo* runinfo)
2438 {
2439 printf("InteractiveModule::ResumeRun, run %d\n", runinfo->fRunNo);
2440 }
2441
2442 void InteractiveLoop(TARunInfo* runinfo, TAFlags* flags)
2443 {
2444#ifdef HAVE_ROOT
2445 if (fgCtrlWindow && runinfo->fRoot->fgApp) {
2446 while (1) {
2447#ifdef HAVE_THTTP_SERVER
2449 TARootHelper::fgHttpServer->ProcessRequests();
2450 }
2451#endif
2452#ifdef HAVE_ROOT
2453 if (TARootHelper::fgApp) {
2454 gSystem->DispatchOneEvent(kTRUE);
2455 }
2456#endif
2457#ifdef HAVE_MIDAS
2458#ifdef HAVE_TMFE
2459 TMFE* mfe = TMFE::Instance();
2460 mfe->Yield(0.010);
2461 if (mfe->fShutdownRequested) {
2462 *flags |= TAFlag_QUIT;
2463 return;
2464 }
2465#else
2466 if (!TMidasOnline::instance()->sleep(10)) {
2467 *flags |= TAFlag_QUIT;
2468 return;
2469 }
2470#endif
2471#else
2472 gSystem->Sleep(10);
2473#endif
2474
2475 int ctrl = fgHolder->fValue;
2476 fgHolder->fValue = 0;
2477
2478 switch (ctrl) {
2479 case CTRL_QUIT:
2480 *flags |= TAFlag_QUIT;
2481 return;
2482 case CTRL_NEXT:
2483 return;
2484 case CTRL_NEXT_FLOW:
2485 fNextFlow = true;
2486 return;
2487 case CTRL_CONTINUE:
2488 fContinue = true;
2489 return;
2490 }
2491 }
2492 }
2493#endif
2494
2495 while (1) {
2496 char str[256];
2497 fprintf(stdout, "manalyzer> "); fflush(stdout);
2498 char* s = fgets(str, sizeof(str)-1, stdin);
2499
2500 if (s == NULL) {
2501 // EOF
2502 *flags |= TAFlag_QUIT;
2503 return;
2504 }
2505
2506 printf("command [%s]\n", str);
2507
2508 if (str[0] == 'h') { // "help"
2509 printf("Interactive manalyzer commands:\n");
2510 printf(" q - quit\n");
2511 printf(" h - help\n");
2512 printf(" c - continue until next TAFlag_DISPLAY event\n");
2513 printf(" n - next event\n");
2514 printf(" aNNN - analyze N events, i.e. \"a10\"\n");
2515 } else if (str[0] == 'q') { // "quit"
2516 *flags |= TAFlag_QUIT;
2517 return;
2518 } else if (str[0] == 'n') { // "next"
2519 return;
2520 } else if (str[0] == 'c') { // "continue"
2521 fContinue = true;
2522 return;
2523 } else if (str[0] == 'a') { // "analyze" N events
2524 int num = atoi(str+1);
2525 printf("Analyzing %d events\n", num);
2526 if (num > 0) {
2527 fSkip = num-1;
2528 }
2529 return;
2530 }
2531 }
2532 }
2533
2534 TAFlowEvent* Analyze(TARunInfo* runinfo, TMEvent* event, TAFlags* flags, TAFlowEvent* flow)
2535 {
2536 printf("InteractiveModule::Analyze, run %d, %s\n", runinfo->fRunNo, event->HeaderToString().c_str());
2537
2538#ifdef HAVE_ROOT
2539 if (fgHolder->fValue == CTRL_QUIT) {
2540 *flags |= TAFlag_QUIT;
2541 return flow;
2542 } else if (fgHolder->fValue == CTRL_PAUSE) {
2543 fContinue = false;
2544 }
2545#endif
2546
2547 if ((fContinue||fNextFlow) && !(*flags & TAFlag_DISPLAY)) {
2548 return flow;
2549 } else {
2550 fContinue = false;
2551 }
2552
2553 if (fSkip > 0) {
2554 fSkip--;
2555 return flow;
2556 }
2557
2558 InteractiveLoop(runinfo, flags);
2559
2560 return flow;
2561 }
2562
2564 {
2565 printf("InteractiveModule::AnalyzeFlowEvent, run %d\n", runinfo->fRunNo);
2566
2567#ifdef HAVE_ROOT
2568 if (fgHolder->fValue == CTRL_QUIT) {
2569 *flags |= TAFlag_QUIT;
2570 return flow;
2571 } else if (fgHolder->fValue == CTRL_PAUSE) {
2572 fContinue = false;
2573 }
2574#endif
2575
2576 if ((!fNextFlow) && !(*flags & TAFlag_DISPLAY)) {
2577 return flow;
2578 }
2579
2580 fNextFlow = false;
2581
2582 InteractiveLoop(runinfo, flags);
2583
2584 return flow;
2585 }
2586
2588 {
2589 if (gTrace)
2590 printf("InteractiveModule::AnalyzeSpecialEvent, run %d, event serno %d, id 0x%04x, data size %d\n", runinfo->fRunNo, event->serial_number, (int)event->event_id, event->data_size);
2591 }
2592};
2593
2594#ifdef HAVE_ROOT
2597#endif
2598
2600{
2601public:
2602
2603 void Init(const std::vector<std::string> &args)
2604 {
2605 if (gTrace)
2606 printf("InteractiveModuleFactory::Init!\n");
2607 }
2608
2609 void Finish()
2610 {
2611 if (gTrace)
2612 printf("InteractiveModuleFactory::Finish!\n");
2613 }
2614
2616 {
2617 if (gTrace)
2618 printf("InteractiveModuleFactory::NewRunObject, run %d, file %s\n", runinfo->fRunNo, runinfo->fFileName.c_str());
2619 return new InteractiveModule(runinfo);
2620 }
2621};
2622
2623//////////////////////////////////////////////////////////
2624//
2625// main program
2626//
2627//////////////////////////////////////////////////////////
2628
2629static void help()
2630{
2631 printf("\nUsage: ./manalyzer.exe [-h] [-R8081] [-oOutputfile.mid] [file1 file2 ...] [-- arguments passed to modules ...]\n");
2632 printf("\n");
2633 printf("-h: print this help message\n");
2634 printf("--demo: activate the demo mode, online connection or input file not needed, midas events are generated internally, add -e0 or -eNNN to set number of demo events \n");
2635 printf("\n");
2636 printf("-Hhostname: connect to MIDAS experiment on given host\n");
2637 printf("-Eexptname: connect to this MIDAS experiment\n");
2638 printf("--midas-progname SSS -- set analyzer's MIDAS program name, default is \"ana\"\n");
2639 printf("--midas-hostname HOSTNAME[:PORT] -- connect to MIDAS mserver on given host and port\n");
2640 printf("--midas-exptname EXPTNAME -- connect to given experiment\n");
2641 printf("--midas-buffer BUFZZZ -- connect to given MIDAS event buffer\n");
2642 printf("--midas-sampling SSS -- sample events from MIDAS event buffer: GET_ALL=get every event (will block the event writers, GET_NONBLOCKING=get as many as we can process, GET_RECENT=get recent events, see bm_receive_event(). Default is GET_NONBLOCKING\n");
2643 printf("--midas-event-id III -- receive only events with matching event ID\n");
2644 printf("--midas-trigger-mask 0xMASK -- receive only events with matching trigger mask\n");
2645 printf("\n");
2646 printf("-oOutputfile.mid: write selected events into this file\n");
2647 printf("-Rnnnn: Start the ROOT THttpServer HTTP server on specified tcp port, use -R8081, access by firefox http://localhost:8081\n");
2648 printf("-eNNN: Number of events to analyze, 0=unlimited\n");
2649 printf("-sNNN: Number of events to skip before starting analysis\n");
2650 printf("\n");
2651 printf("--dump: activate the event dump module\n");
2652 printf("\n");
2653 printf("-t: Enable tracing of constructors, destructors and function calls\n");
2654 printf("-m: Enable memory leak debugging\n");
2655 printf("-g: Enable graphics display when processing data files\n");
2656 printf("-i: Enable intractive mode\n");
2657 printf("\n");
2658 printf("--mt: Enable multithreaded mode. Extra multithread config settings:\n");
2659 printf("--mtqlNNN: Module thread queue length (buffer). Default: %d\n", gDefaultMultithreadQueueLength);
2660 printf("--mtseNNN: Module thread sleep time with empty queue (usec). Default: %d\n", gDefaultMultithreadWaitEmpty);
2661 printf("--mtsfNNN: Module thread sleep time when next queue is full (usec). Default: %d\n", gDefaultMultithreadWaitFull);
2662 printf("\n");
2663 printf("--no-profiler: Turn off manalyzer module profiler\n");
2664 printf("--pqiNNN: Profile multithread queue lengths every NNN events \n");
2665#ifdef HAVE_ROOT
2666 printf("\n");
2667 printf("-Doutputdirectory: Specify output root file directory\n");
2668 printf("-Ooutputfile.root: Specify output root file filename\n");
2669#endif
2670 printf("\n");
2671 printf("--: All following arguments are passed to the analyzer modules Init() method\n");
2672 printf("\n");
2673 printf("Analyzer modules usage:\n");
2674 if (gModules)
2675 for (unsigned i=0; i<(*gModules).size(); i++)
2676 (*gModules)[i]->Usage();
2677 printf("\n");
2678 printf("Example1: analyze online data: ./manalyzer.exe -R9091\n");
2679 printf("Example2: analyze existing data: ./manalyzer.exe /data/alpha/current/run00500.mid\n");
2680 exit(1);
2681}
2682
2683// duplicate c++20 std::string s.starts_with()
2684
2685static bool starts_with(const std::string& s, const char* prefix)
2686{
2687 return (s.substr(0, strlen(prefix)) == prefix);
2688}
2689
2690// Main function call
2691
2692int manalyzer_main(int argc, char* argv[])
2693{
2694 setbuf(stdout, NULL);
2695 setbuf(stderr, NULL);
2696
2697 signal(SIGILL, SIG_DFL);
2698 signal(SIGBUS, SIG_DFL);
2699 signal(SIGSEGV, SIG_DFL);
2700 signal(SIGPIPE, SIG_DFL);
2701
2702 std::vector<std::string> args;
2703 for (int i=0; i<argc; i++) {
2704 if (strcmp(argv[i],"-h")==0)
2705 help(); // does not return
2706 args.push_back(argv[i]);
2707 }
2708
2709 int httpPort = 0;
2710 int num_skip = 0;
2711 int num_analyze = 0;
2712
2713 TMWriterInterface *writer = NULL;
2714
2715 bool event_dump = false;
2716 bool demo_mode = false;
2717#ifdef HAVE_ROOT
2718 bool root_graphics = false;
2719#endif
2720 bool interactive = false;
2721
2722 bool multithread = false;
2723
2724 bool performance_profiler = true;
2725 int snap_shot_queue_length = 100;
2726
2727 std::vector<std::string> files;
2728 std::vector<std::string> modargs;
2729
2730#ifdef HAVE_MIDAS
2731 std::string midas_hostname = "";
2732 std::string midas_exptname = "";
2733 std::string midas_progname = "ana";
2734 std::string midas_buffer = "SYSTEM";
2735 //std::string midas_sampling = "GET_ALL";
2736 std::string midas_sampling = "GET_NONBLOCKING";
2737 int midas_event_id = -1;
2738 int midas_trigger_mask = -1;
2739#endif
2740
2741 for (unsigned int i=1; i<args.size(); i++) { // loop over the commandline options
2742 std::string arg = args[i];
2743 //printf("argv[%d] is %s\n",i,arg);
2744
2745 if (arg == "--") {
2746 for (unsigned j=i+1; j<args.size(); j++)
2747 modargs.push_back(args[j]);
2748 break;
2749 } else if (arg == "--dump") {
2750 event_dump = true;
2751 } else if (arg == "--demo") {
2752 demo_mode = true;
2753 num_analyze = 100;
2754#ifdef HAVE_ROOT
2755 } else if (arg == "-g") {
2756 root_graphics = true;
2757#endif
2758 } else if (arg == "-i") {
2759 interactive = true;
2760 } else if (arg == "-t") {
2761 gTrace = true;
2764 } else if (starts_with(arg, "-o")) {
2765 writer = TMNewWriter(arg.c_str()+2);
2766 } else if (starts_with(arg, "-s")) {
2767 num_skip = atoi(arg.c_str()+2);
2768 } else if (starts_with(arg, "-e")) {
2769 num_analyze = atoi(arg.c_str()+2);
2770 } else if (starts_with(arg, "-m")) { // Enable memory debugging
2771 gEnableShowMem = true;
2772 } else if (starts_with(arg, "-R")) { // Set the ROOT THttpServer HTTP port
2773 httpPort = atoi(arg.c_str()+2);
2774#ifdef HAVE_MIDAS
2775 } else if (starts_with(arg, "-H")) {
2776 midas_hostname = arg.c_str()+2;
2777 } else if (starts_with(arg, "-E")) {
2778 midas_exptname = arg.c_str()+2;
2779 } else if (arg == "--midas-progname") {
2780 midas_progname = args[i+1]; i++;
2781 } else if (arg == "--midas-hostname") {
2782 midas_hostname = args[i+1]; i++;
2783 } else if (arg == "--midas-exptname") {
2784 midas_exptname = args[i+1]; i++;
2785 } else if (arg == "--midas-buffer") {
2786 midas_buffer = args[i+1]; i++;
2787 } else if (arg == "--midas-sampling") {
2788 midas_sampling = args[i+1]; i++;
2789 } else if (arg == "--midas-event-id") {
2790 midas_event_id = atoi(args[i+1].c_str()); i++;
2791 } else if (arg == "--midas-trigger-mask") {
2792 midas_trigger_mask = strtoul(args[i+1].c_str(), NULL, 0); i++;
2793#endif
2794 } else if (starts_with(arg, "--mtql")) {
2795 gDefaultMultithreadQueueLength = atoi(arg.c_str()+6);
2796 } else if (starts_with(arg, "--mtse")) {
2797 gDefaultMultithreadWaitEmpty = atoi(arg.c_str()+6);
2798 } else if (starts_with(arg, "--mtsf")) {
2799 gDefaultMultithreadWaitFull = atoi(arg.c_str()+6);
2800 } else if (arg == "--mt") {
2801 multithread=true;
2802 } else if (arg == "--no-profiler") {
2803 performance_profiler = 0;
2804 } else if (starts_with(arg, "--pqi")) {
2805 snap_shot_queue_length = atoi(arg.c_str()+5);
2806#ifdef HAVE_ROOT
2807 } else if (starts_with(arg, "-O")) {
2808 TARootHelper::fOutputFileName = arg.c_str()+2;
2809 } else if (starts_with(arg, "-D")) {
2810 TARootHelper::fOutputDirectory = arg.c_str()+2;
2811#endif
2812 } else if (arg == "-h") {
2813 help(); // does not return
2814 } else if (arg[0] == '-') {
2815 help(); // does not return
2816 } else {
2817 files.push_back(args[i]);
2818 }
2819 }
2820
2821 if (!gModules)
2822 gModules = new std::vector<TAFactory*>;
2823
2824 if ((*gModules).size() == 0)
2825 event_dump = true;
2826
2827 if (event_dump)
2828 (*gModules).push_back(new EventDumpModuleFactory);
2829
2830 if (interactive)
2831 (*gModules).push_back(new InteractiveModuleFactory);
2832
2833 printf("Registered modules: %d\n", (int)(*gModules).size());
2834
2835#ifdef HAVE_ROOT
2836 if (root_graphics) {
2837 TARootHelper::fgApp = new TApplication("manalyzer", NULL, NULL, 0, 0);
2838 }
2839
2840 TARootHelper::fgDir = new TDirectory("manalyzer", "location of histograms");
2842#endif
2843
2844 if (httpPort) {
2845#ifdef HAVE_THTTP_SERVER
2846 char str[256];
2847 sprintf(str, "http:127.0.0.1:%d?cors", httpPort);
2848 THttpServer *s = new THttpServer(str);
2849 //s->SetTimer(100, kFALSE);
2851#else
2852 fprintf(stderr,"ERROR: No support for the THttpServer!\n");
2853#endif
2854 }
2855
2856 for (unsigned i=0; i<files.size(); i++) {
2857 printf("file[%d]: %s\n", i, files[i].c_str());
2858 }
2859 int exit_state = 0;
2860 if (demo_mode) {
2861 exit_state = ProcessDemoMode(modargs, num_skip, num_analyze, writer, multithread, performance_profiler, snap_shot_queue_length);
2862 } else if (files.size() > 0) {
2863 exit_state = ProcessMidasFiles(files, modargs, num_skip, num_analyze, writer, multithread, performance_profiler, snap_shot_queue_length);
2864 } else {
2865#ifdef HAVE_MIDAS
2866#ifdef HAVE_TMFE
2867 exit_state = ProcessMidasOnlineTmfe(modargs, midas_progname.c_str(), midas_hostname.c_str(), midas_exptname.c_str(), midas_buffer.c_str(), midas_event_id, midas_trigger_mask, midas_sampling.c_str(), num_analyze, writer, multithread, performance_profiler, snap_shot_queue_length);
2868#else
2869 exit_state = ProcessMidasOnlineOld(modargs, midas_hostname.c_str(), midas_exptname.c_str(), num_analyze, writer, multithread, performance_profiler, snap_shot_queue_length);
2870#endif
2871#endif
2872 }
2873
2874 if (writer) {
2875 writer->Close();
2876 delete writer;
2877 writer = NULL;
2878 }
2879
2880 return exit_state;
2881}
2882
2883/* emacs
2884 * Local Variables:
2885 * tab-width: 8
2886 * c-basic-offset: 3
2887 * indent-tabs-mode: nil
2888 * End:
2889 */
R__EXTERN TDirectory * gDirectory
double GetTimeSec()
TARunObject * NewRunObject(TARunInfo *runinfo)
void Init(const std::vector< std::string > &args)
void PauseRun(TARunInfo *runinfo)
EventDumpModule(TARunInfo *runinfo)
TAFlowEvent * Analyze(TARunInfo *runinfo, TMEvent *event, TAFlags *flags, TAFlowEvent *flow)
void EndRun(TARunInfo *runinfo)
void NextSubrun(TARunInfo *runinfo)
void AnalyzeSpecialEvent(TARunInfo *runinfo, TMEvent *event)
void BeginRun(TARunInfo *runinfo)
void ResumeRun(TARunInfo *runinfo)
void Init(const std::vector< std::string > &args)
TARunObject * NewRunObject(TARunInfo *runinfo)
TAFlowEvent * AnalyzeFlowEvent(TARunInfo *runinfo, TAFlags *flags, TAFlowEvent *flow)
void PauseRun(TARunInfo *runinfo)
void EndRun(TARunInfo *runinfo)
static MainWindow * fgCtrlWindow
void ResumeRun(TARunInfo *runinfo)
TAFlowEvent * Analyze(TARunInfo *runinfo, TMEvent *event, TAFlags *flags, TAFlowEvent *flow)
InteractiveModule(TARunInfo *runinfo)
static ValueHolder * fgHolder
void InteractiveLoop(TARunInfo *runinfo, TAFlags *flags)
void AnalyzeSpecialEvent(TARunInfo *runinfo, TMEvent *event)
void BeginRun(TARunInfo *runinfo)
Definition mvodb.h:21
virtual void RU32(const char *varname, uint32_t *value, bool create=false, MVOdbError *error=NULL)=0
virtual void RI(const char *varname, int *value, bool create=false, MVOdbError *error=NULL)=0
void CloseWindow()
TGLayoutHints * fMenuBarItemLayout
TextButton * fNextFlowButton
MainWindow(const TGWindow *w, int s1, int s2, ValueHolder *holder)
TGCompositeFrame * fButtonsFrame
TGPopupMenu * fMenu
TextButton * fPauseButton
ValueHolder * fHolder
TextButton * fNextButton
TGMenuBar * fMenuBar
TextButton * fContinueButton
Bool_t ProcessMessage(Long_t msg, Long_t parm1, Long_t parm2)
TextButton * fQuitButton
void Event(const void *data, int data_size)
void Transition(int transition, int run_number, int transition_time)
RunHandler fRun
OnlineHandler(int num_analyze, TMWriterInterface *writer, MVOdb *odb, const std::vector< std::string > &args, bool multithread, bool profiler, int queue_interval_check)
void StartRun(int run_number)
std::vector< double > fAnalyzeFlowEventRMS
const int fQueueInterval
void LogAnalyzeEvent(TAFlags *flag, TAFlowEvent *flow, const int i, const TAClock &start)
void LogUserWindows(TAFlags *flag, TAFlowEvent *flow)
std::vector< double > fAnalyzeFlowEventTimeTotal
uint32_t fMIDASStopTime
std::vector< double > fTotalUserTime
std::vector< int > fAnalyzeEventEntries
std::vector< TH1D * > fUserHistograms
std::vector< double > fAnalyzeFlowEventMean
uint32_t fMIDASStartTime
std::atomic< int > fQueueIntervalCounter
std::string fBinaryPath
std::vector< double > fAnalyzeEventRMS
void Print() const
std::vector< double > fAnalyzeEventTimeMax
clock_t fStartCPU
std::string fBinaryName
void LogAnalyzeFlowEvent(TAFlags *flag, TAFlowEvent *flow, const int i, const TAClock &start)
std::vector< std::string > fModuleNames
void End(TARunInfo *runinfo)
std::vector< TH1D * > fAnalyzeFlowEventTimeHistograms
std::chrono::system_clock::time_point fStartUser
void Begin(TARunInfo *runinfo, const std::vector< TARunObject * > fRunRun)
std::vector< double > fAnalyzeEventTimeTotal
void AddModuleMap(const char *UserProfileName, unsigned long hash)
std::vector< double > fAnalyzeFlowEventTimeMax
void LogMTQueueLength(TARunInfo *runinfo)
std::map< unsigned int, int > fUserMap
std::vector< double > fAnalyzeEventMean
std::vector< double > fMaxUserTime
std::vector< TH1D * > fAnalyzeEventTimeHistograms
Profiler(const int queue_interval_check)
std::vector< int > fAnalyzeFlowEventEntries
std::vector< TH1D * > fAnalysisQueue
TARunInfo * fRunInfo
std::vector< TARunObject * > fRunRun
std::vector< std::string > fArgs
void NextSubrun()
void AnalyzeEvent(TMEvent *event, TAFlags *flags, TMWriterInterface *writer)
void CreateRun(int run_number, const char *file_name)
void DeleteRun()
void AnalyzeSpecialEvent(TMEvent *event)
void EndRun(TAFlags *flags)
TAFlowEvent * AnalyzeFlowEvent(TAFlags *flags, TAFlowEvent *flow)
int fProfilerIntervalCheck
RunHandler(const std::vector< std::string > &args, bool multithread, bool profile, int queue_interval_check)
void PerModuleThread(int i)
void AnalyzeFlowQueue(TAFlags *ana_flags)
void BeginRun()
virtual void Finish()
virtual void Init(const std::vector< std::string > &args)
virtual void Usage()
virtual ~TAFlowEvent()
Definition manalyzer.cxx:91
TAFlowEvent * fNext
Definition manalyzer.h:51
std::vector< TAFlagsQueue > fMtFlagQueue
Definition manalyzer.h:176
std::vector< std::mutex > fMtFlowQueueMutex
Definition manalyzer.h:174
std::atomic< bool > fMtShutdownRequested
Definition manalyzer.h:182
std::vector< std::thread * > fMtThreads
Definition manalyzer.h:177
std::vector< std::atomic< bool > > fMtThreadIsBusy
Definition manalyzer.h:179
TAMultithreadHelper(int nModules)
static int gfMtMaxBacklog
Definition manalyzer.h:192
static bool gfMultithread
Definition manalyzer.h:191
std::vector< TAFlowEventQueue > fMtFlowQueue
Definition manalyzer.h:175
static std::mutex gfLock
Definition manalyzer.h:193
std::vector< std::atomic< bool > > fMtThreadIsRunning
Definition manalyzer.h:178
std::atomic< bool > fMtQuitRequested
Definition manalyzer.h:183
TARegister(TAFactory *m)
static std::string fOutputDirectory
Definition manalyzer.h:152
static TApplication * fgApp
Definition manalyzer.h:156
static std::string fOutputFileName
Definition manalyzer.h:153
static THttpServer * fgHttpServer
Definition manalyzer.h:157
TFile * fOutputFile
Definition manalyzer.h:154
static TDirectory * fgDir
Definition manalyzer.h:155
void AddToFlowQueue(TAFlowEvent *)
TAFlowEvent * ReadFlowQueue()
std::string fFileName
Definition manalyzer.h:25
std::vector< std::string > fArgs
Definition manalyzer.h:29
MVOdb * fOdb
Definition manalyzer.h:26
int fRunNo
Definition manalyzer.h:24
static std::vector< std::string > fgFileList
Definition manalyzer.h:30
TARootHelper * fRoot
Definition manalyzer.h:27
TAMultithreadHelper * fMtInfo
Definition manalyzer.h:28
static int fgCurrentFileIndex
Definition manalyzer.h:31
virtual void EndRun(TARunInfo *runinfo)
virtual void ResumeRun(TARunInfo *runinfo)
virtual void AnalyzeSpecialEvent(TARunInfo *runinfo, TMEvent *event)
virtual void NextSubrun(TARunInfo *runinfo)
virtual TAFlowEvent * Analyze(TARunInfo *runinfo, TMEvent *event, TAFlags *flags, TAFlowEvent *flow)
virtual void PauseRun(TARunInfo *runinfo)
virtual void BeginRun(TARunInfo *runinfo)
virtual TAFlowEvent * AnalyzeFlowEvent(TARunInfo *runinfo, TAFlags *flags, TAFlowEvent *flow)
virtual void PreEndRun(TARunInfo *runinfo)
TAUserProfilerFlow(TAFlowEvent *flow, const char *name, const TAClock &start)
const std::string fModuleName
Definition manalyzer.h:220
double GetTimer() const
virtual TDirectory * mkdir(const char *name, const char *title="")
virtual Bool_t cd(const char *path=0)
std::string HeaderToString() const
print the MIDAS event header
Definition midasio.cxx:656
void Reset()
reset everything
Definition midasio.cxx:688
void ParseEvent()
parse event data
Definition midasio.cxx:708
std::vector< char > data
MIDAS event bytes.
Definition midasio.h:67
size_t event_header_size
size of MIDAS event header
Definition midasio.h:63
uint32_t serial_number
MIDAS event serial number.
Definition midasio.h:59
uint32_t data_size
MIDAS event data size.
Definition midasio.h:61
uint16_t event_id
MIDAS event ID.
Definition midasio.h:57
MIDAS online connection, including access to online ODB.
virtual int Close()=0
std::string fErrorString
Definition midasio.h:116
static bool fgTrace
Definition midasio.h:117
virtual int Close()=0
static bool fgTrace
Definition midasio.h:127
void RegisterHandler(TMHandlerInterface *h)
void registerTransitions()
Ask MIDAS to tell us about run transitions.
int disconnect()
Disconnect from MIDAS.
static TMidasOnline * instance()
int connect(const char *hostname, const char *exptname, const char *progname)
Connect to MIDAS experiment.
HNDLE fDB
ODB handle.
int eventRequest(const char *bufferName, int eventId, int triggerMask, int samplingType, bool poll=false)
Request data for delivery via callback (setEventHandler) or by polling (via receiveEvent)
TextButton(TGWindow *p, const char *text, ValueHolder *holder, int value)
ValueHolder * fHolder
void Clicked()
std::chrono::high_resolution_clock::time_point TAClock
Definition manalyzer.h:207
int TAFlags
Definition manalyzer.h:72
#define TAFlag_DISPLAY
Definition manalyzer.h:78
#define TAFlag_SKIP
Definition manalyzer.h:75
#define TAFlag_WRITE
Definition manalyzer.h:77
TAClock TAClockNow()
Definition manalyzer.h:209
#define TAFlag_QUIT
Definition manalyzer.h:76
#define TAFlag_SKIP_PROFILE
Definition manalyzer.h:79
void TMWriteEvent(TMWriterInterface *writer, const TMEvent *event)
Definition midasio.cxx:651
TMWriterInterface * TMNewWriter(const char *destination)
Definition midasio.cxx:543
TMEvent * TMReadEvent(TMReaderInterface *reader)
Definition midasio.cxx:585
TMReaderInterface * TMNewReader(const char *source)
Definition midasio.cxx:447
#define TID_DWORD
Definition midasio.h:22
MVOdb * MakeMidasOdb(int hDB, MVOdbError *error=NULL)
Definition midasodb.cxx:924
MVOdb * MakeFileDumpOdb(const char *buf, int bufsize, MVOdbError *error=NULL)
Access ODB from a midas file dump. FOrmat could be .xml, .json or .odb.
Definition mvodb.cxx:91
MVOdb * MakeNullOdb()
Definition nullodb.cxx:129
std::chrono::high_resolution_clock::time_point TAClock
Definition manalyzer.h:207
int TAFlags
Definition manalyzer.h:72
std::chrono::duration< double > TAClockDuration
Definition manalyzer.h:208
TAClock TAClockNow()
Definition manalyzer.h:209
#define TAFlag_SKIP_PROFILE
Definition manalyzer.h:79
#define CTRL_NEXT_FLOW
static int gDefaultMultithreadWaitFull
static int ProcessDemoMode(const std::vector< std::string > &args, int num_skip, int num_analyze, TMWriterInterface *writer, bool multithread, bool profiler, int queue_interval_check)
static void MtQueueFlowEvent(TAMultithreadHelper *mt, int i, TAFlags *flag, TAFlowEvent *flow, bool wait)
std::vector< TAFactory * > * gModules
static bool gTrace
Definition manalyzer.cxx:19
#define CTRL_NEXT
int manalyzer_main(int argc, char *argv[])
static int gDefaultMultithreadWaitEmpty
#define CTRL_TBROWSER
#define CTRL_CONTINUE
#define CTRL_QUIT
static bool starts_with(const std::string &s, const char *prefix)
static int ProcessMidasOnlineOld(const std::vector< std::string > &args, const char *hostname, const char *exptname, int num_analyze, TMWriterInterface *writer, bool multithread, bool profiler, int queue_interval_check)
static bool gEnableShowMem
#define CTRL_PAUSE
static void WaitForAllQueuesEmpty(TAMultithreadHelper *mt)
static int ProcessMidasFiles(const std::vector< std::string > &files, const std::vector< std::string > &args, int num_skip, int num_analyze, TMWriterInterface *writer, bool multithread, bool profiler, int queue_interval_check)
static void WaitForAllThreadsShutdown(TAMultithreadHelper *mt)
static void help()
static int gDefaultMultithreadQueueLength
int ShowMem(const char *label)