// // MIDAS analyzer // // K.Olchanski // #undef NDEBUG // this program requires working assert() #include #include // usleep() #include #include // struct stat_buffer; #include "manalyzer.h" #include "midasio.h" ////////////////////////////////////////////////////////// static bool gTrace = false; ////////////////////////////////////////////////////////// // // Methods of TARunInfo // ////////////////////////////////////////////////////////// TARunInfo::TARunInfo(int runno, const char* filename, const std::vector& args) { if (gTrace) printf("TARunInfo::ctor!\n"); fRunNo = runno; if (filename) fFileName = filename; fOdb = NULL; #ifdef HAVE_ROOT fRoot = new TARootHelper(this); #else fRoot = NULL; #endif fMtInfo = NULL; fArgs = args; } TARunInfo::~TARunInfo() { if (gTrace) printf("TARunInfo::dtor!\n"); fRunNo = 0; fFileName = "(deleted)"; if (fOdb) { delete fOdb; fOdb = NULL; } #ifdef HAVE_ROOT if (fRoot) { delete fRoot; fRoot = NULL; } #endif int count = 0; while (1) { TAFlowEvent* flow = ReadFlowQueue(); if (!flow) break; delete flow; count++; } if (gTrace) { printf("TARunInfo::dtor: deleted %d queued flow events!\n", count); } if (fMtInfo) { delete fMtInfo; fMtInfo = NULL; } } ////////////////////////////////////////////////////////// // // Methods of TAFlowEvent // ////////////////////////////////////////////////////////// TAFlowEvent::TAFlowEvent(TAFlowEvent* flow) // ctor { if (gTrace) printf("TAFlowEvent::ctor: chain %p\n", flow); fNext = flow; } TAFlowEvent::~TAFlowEvent() // dtor { if (gTrace) printf("TAFlowEvent::dtor: this %p, next %p\n", this, fNext); if (fNext) delete fNext; fNext = NULL; } ////////////////////////////////////////////////////////// // // Methods of TARunObject // ////////////////////////////////////////////////////////// TARunObject::TARunObject(TARunInfo* runinfo) { if (gTrace) printf("TARunObject::ctor, run %d\n", runinfo->fRunNo); } void TARunObject::BeginRun(TARunInfo* runinfo) { if (gTrace) printf("TARunObject::BeginRun, run %d\n", 
runinfo->fRunNo); } void TARunObject::EndRun(TARunInfo* runinfo) { if (gTrace) printf("TARunObject::EndRun, run %d\n", runinfo->fRunNo); } void TARunObject::NextSubrun(TARunInfo* runinfo) { if (gTrace) printf("TARunObject::NextSubrun, run %d\n", runinfo->fRunNo); } void TARunObject::PauseRun(TARunInfo* runinfo) { if (gTrace) printf("TARunObject::PauseRun, run %d\n", runinfo->fRunNo); } void TARunObject::ResumeRun(TARunInfo* runinfo) { if (gTrace) printf("TARunObject::ResumeRun, run %d\n", runinfo->fRunNo); } void TARunObject::PreEndRun(TARunInfo* runinfo) { if (gTrace) printf("TARunObject::PreEndRun, run %d\n", runinfo->fRunNo); } TAFlowEvent* TARunObject::Analyze(TARunInfo* runinfo, TMEvent* event, TAFlags* flags, TAFlowEvent* flow) { if (gTrace) printf("TARunObject::Analyze!\n"); // This default analyze function does no work, instruct the Profiler to not time / log this *flags|=TAFlag_SKIP_PROFILE; return flow; } TAFlowEvent* TARunObject::AnalyzeFlowEvent(TARunInfo* runinfo, TAFlags* flags, TAFlowEvent* flow) { if (gTrace) printf("TARunObject::Analyze!\n"); // This default analyze function does no work, instruct the Profiler to not time / log this *flags|=TAFlag_SKIP_PROFILE; return flow; } void TARunObject::AnalyzeSpecialEvent(TARunInfo* runinfo, TMEvent* event) { if (gTrace) printf("TARunObject::AnalyzeSpecialEvent!\n"); } ////////////////////////////////////////////////////////// // // Methods of TAFactory // ////////////////////////////////////////////////////////// void TAFactory::Usage() { if (gTrace) printf("TAFactory::Usage!\n"); } void TAFactory::Init(const std::vector &args) { if (gTrace) printf("TAFactory::Init!\n"); } void TAFactory::Finish() { if (gTrace) printf("TAFactory::Finish!\n"); } #ifdef HAVE_ROOT ////////////////////////////////////////////////////////// // // Methods of TARootHelper // ////////////////////////////////////////////////////////// std::string TARootHelper::fOutputDirectory = "root_output_files"; std::string 
TARootHelper::fOutputFileName = ""; TApplication* TARootHelper::fgApp = NULL; TDirectory* TARootHelper::fgDir = NULL; THttpServer* TARootHelper::fgHttpServer = NULL; TARootHelper::TARootHelper(const TARunInfo* runinfo) // ctor { if (gTrace) printf("TARootHelper::ctor!\n"); std::string filename = fOutputFileName; if (filename.empty()) { char xfilename[256]; snprintf(xfilename, sizeof(xfilename), "output%05d.root", runinfo->fRunNo); if (fOutputDirectory.empty()) { filename = xfilename; } else { filename = fOutputDirectory + "/" + xfilename; struct stat buffer; int status = stat(fOutputDirectory.c_str(), &buffer); if (status < 0 && errno == ENOENT) { fprintf(stdout, "TARootHelper::ctor: creating output directory \"%s\"\n", fOutputDirectory.c_str()); status = mkdir(fOutputDirectory.c_str(), 0777); if (status == -1) { fprintf(stderr, "TARootHelper::ctor: Error: cannot output directory \"%s\", errno %d (%s)\n", fOutputDirectory.c_str(), errno, strerror(errno)); } } } } fOutputFile = new TFile(filename.c_str(), "RECREATE"); if (!fOutputFile->IsOpen()) { fprintf(stderr, "TARootHelper::ctor: Error: cannot open output ROOT file \"%s\"\n", filename.c_str()); fOutputFile = new TFile("/dev/null", "UPDATE"); assert(fOutputFile); assert(fOutputFile->IsOpen()); } if (fOutputFile != NULL) { fOutputFile->cd(); } } TARootHelper::~TARootHelper() // dtor { if (gTrace) printf("TARootHelper::dtor!\n"); if (fOutputFile != NULL) { fOutputFile->Write(); fOutputFile->Close(); fOutputFile = NULL; } if (fgDir) fgDir->cd(); } #endif #include #include #include #include #include #include #include "manalyzer.h" #include "midasio.h" #ifdef HAVE_THTTP_SERVER #include "THttpServer.h" #endif #ifdef HAVE_ROOT #include #include #endif ////////////////////////////////////////////////////////// // // Methods and Defaults of TAMultithreadHelper // ////////////////////////////////////////////////////////// static int gDefaultMultithreadQueueLength = 100; static int gDefaultMultithreadWaitEmpty = 100; // 
// microseconds (unit of the value defined at the end of the previous line)
static int gDefaultMultithreadWaitFull = 100; // microseconds

// Polls once per second until, on two passes, no running thread has a
// non-empty queue or is busy analyzing an event. After about 10 seconds a
// timeout message is printed on every pass, but the wait continues (the
// "break" is commented out).
static void WaitForAllQueuesEmpty(TAMultithreadHelper* mt)
{
   fprintf(stderr, "Waiting for all queues to empty out!\n");
   int count_all_empty = 0;
   for (int t=0; ; ) {
      int count_not_empty = 0;
      size_t count_events = 0;
      // NOTE(review): the loop condition looks truncated in this copy
      // (presumably "i<mt->fMtThreads.size()") - confirm against upstream.
      for (unsigned i=0; ifMtThreads.size(); i++) {
         std::lock_guard lock(mt->fMtFlowQueueMutex[i]);
         if (!mt->fMtThreadIsRunning[i]) // skip threads that have shutdown
            continue;
         // the scan stops at the first queue found not empty or busy
         if (!mt->fMtFlowQueue[i].empty()) {
            count_not_empty++;
            count_events += mt->fMtFlowQueue[i].size();
            break;
         }
         if (mt->fMtThreadIsBusy[i]) {
            count_not_empty++;
            count_events += 1; // module is analyzing 1 event
            break;
         }
         // implicit unlock
      }
      if (count_not_empty == 0) {
         count_all_empty++;
      }
      if (count_all_empty > 1) { // must loop over "all empty" at least twice! K.O.
         break;
      }
      if (t > 10) {
         fprintf(stderr, "Timeout waiting for all queues to empty out, %d queues still have %d flow events!\n", count_not_empty, (int)count_events);
         //break;
      }
      ::sleep(1);
      t++;
   }
}

// Sets fMtShutdownRequested, waits (polling once per second, giving up after
// about 10 seconds) until no thread reports itself as running, then joins
// and deletes every thread that has stopped. Threads still marked as running
// are reported and left alone.
static void WaitForAllThreadsShutdown(TAMultithreadHelper* mt)
{
   fprintf(stderr, "Waiting for all threads to shut down!\n");

   mt->fMtShutdownRequested = true;

   for (int t=0; ; ) {
      int count_running = 0;
      // NOTE(review): loop condition looks truncated in this copy, see above.
      for (unsigned i=0; ifMtThreads.size(); i++) {
         if (mt->fMtThreadIsRunning[i])
            count_running++;
      }
      if (count_running == 0) {
         break;
      }
      if (t > 10) {
         fprintf(stderr, "Timeout waiting for all threads to shut down, %d still running!\n", count_running);
         break;
      }
      ::sleep(1);
      t++;
   }

   fprintf(stderr, "Joining all threads!\n");
   // NOTE(review): loop condition looks truncated in this copy, see above.
   for (unsigned i=0; ifMtThreads.size(); i++) {
      if (!mt->fMtThreadIsRunning[i] ) {
         // pointer is null if thread is already shutdown
         if (mt->fMtThreads[i]) {
            mt->fMtThreads[i]->join();
            delete mt->fMtThreads[i];
            mt->fMtThreads[i] = NULL;
         }
      } else {
         fprintf(stderr, "Thread %d failed to shut down!\n", i);
      }
   }
}

// Constructor: sizes all per-module containers to nModules; the body
// continues below.
TAMultithreadHelper::TAMultithreadHelper(int nModules):
   fMtFlowQueueMutex(nModules),
   fMtFlowQueue(nModules),
   fMtFlagQueue(nModules),
   fMtThreads(nModules,NULL),
   fMtThreadIsRunning(nModules),
   fMtThreadIsBusy(nModules),
   fMtShutdownRequested(false),
   fMtQuitRequested(false) // ctor
{
   // mark every thread as not running and not busy
   for (auto &b: fMtThreadIsRunning) {
      b = false;
   }
   for (auto &b: fMtThreadIsBusy) {
      b = false;
   }
   // default max queue size
   fMtQueueDepth = gDefaultMultithreadQueueLength;
   // queue settings
   fMtQueueFullUSleepTime = gDefaultMultithreadWaitFull; //u seconds
   fMtQueueEmptyUSleepTime = gDefaultMultithreadWaitEmpty; //u seconds
}

// Destructor: shuts down and joins the threads, then deletes every flow
// event and flag object still sitting in the per-module queues.
TAMultithreadHelper::~TAMultithreadHelper() // dtor
{
   size_t nmodules = fMtFlowQueueMutex.size();

   // just for kicks, check that all queues have correct size
   assert(nmodules == fMtFlowQueue.size());
   assert(nmodules == fMtFlagQueue.size());
   assert(nmodules == fMtFlowQueueMutex.size());

   // should not come to the destructor while threads are still running
   WaitForAllThreadsShutdown(this);

   int count = 0;
   // NOTE(review): part of the loop header and the start of its body are
   // missing in this copy (text lost between "i" and "lock(") - restore
   // from the upstream source.
   for (size_t i=0; i lock(fMtFlowQueueMutex[i]);
      while (!fMtFlowQueue[i].empty()) {
         TAFlowEvent* flow = fMtFlowQueue[i].front();
         TAFlags* flag = fMtFlagQueue[i].front();
         fMtFlowQueue[i].pop_front();
         fMtFlagQueue[i].pop_front();
         delete flow;
         delete flag;
         count++;
      }
      // implicit unlock of mutex
   }
   if (gTrace) {
      printf("TAMultithreadInfo::dtor: deleted %d queued flow events!\n", count);
   }
}

bool TAMultithreadHelper::gfMultithread = false;
int TAMultithreadHelper::gfMtMaxBacklog = 100;
std::mutex TAMultithreadHelper::gfLock; //Lock for modules to execute code that is not thread safe (many root fitting libraries)

// Appends a flow event and its flag object to the queue of module i.
//
// mt   - multithread helper, must not be NULL
// i    - index of the destination queue
// flag - flag object queued with the event; if NULL, a new TAFlags set to 0
//        is allocated
// flow - flow event to queue
// wait - if true and the queue already holds fMtQueueDepth entries, sleep
//        and retry until there is room or shutdown is requested; if false,
//        always queue immediately
static void MtQueueFlowEvent(TAMultithreadHelper* mt, int i, TAFlags* flag, TAFlowEvent* flow, bool wait)
{
   assert(mt);

   if (flag == NULL) {
      flag = new TAFlags;
      *flag = 0;
   }

   //PrintQueueLength();

   while (1) {
      {
         //Lock and queue events
         std::lock_guard lock(mt->fMtFlowQueueMutex[i]);

         if ((((int)mt->fMtFlowQueue[i].size()) < mt->fMtQueueDepth) || mt->fMtShutdownRequested || !wait) {
            mt->fMtFlowQueue[i].push_back(flow);
            mt->fMtFlagQueue[i].push_back(flag);
            return;
         }
         // Unlock when we go out of scope
      }
      // queue is full: sleep outside of the lock, then retry
      usleep(mt->fMtQueueFullUSleepTime);
   }
}

#if 0
//Function to print the length of the flow queue when in multithread mode
//Maybe make root update a graphical window?
static void PrintMtQueueLength(TAMultithreadHelper* mt)
{
   printf("Multithread queue lengths:\n");
   // NOTE(review): loop condition looks truncated in this copy
   // (presumably "i<mt->fMtFlowQueue.size()") - confirm against upstream.
   for (unsigned i=0; ifMtFlowQueue.size(); i++) {
      printf("%d:\t%zu\n",i,mt->fMtFlowQueue[i].size());
   }
}
#endif

//////////////////////////////////////////////////////////
//
// Methods of TARegister
//
//////////////////////////////////////////////////////////

// List of registered module factories; created on first registration.
// NOTE(review): the element type of std::vector is missing in this copy.
std::vector *gModules = NULL;

// Constructor: appends the factory to gModules, creating the list if needed.
TARegister::TARegister(TAFactory* m)
{
   if (!gModules)
      gModules = new std::vector;
   gModules->push_back(m);
}

#if 0
static double GetTimeSec()
{
   struct timeval tv;
   gettimeofday(&tv,NULL);
   return tv.tv_sec + 0.000001*tv.tv_usec;
}
#endif

//////////////////////////////////////////////////////////
//
// Profiler class
//
//////////////////////////////////////////////////////////

// Constructor: chains onto the given flow, stores the window name and the
// start time, and takes the stop time at construction.
TAUserProfilerFlow::TAUserProfilerFlow(TAFlowEvent* flow, const char* name, const TAClock& start) : TAFlowEvent(flow), fModuleName(name)
{
   fStart = start;
   fStop = TAClockNow();
}

TAUserProfilerFlow::~TAUserProfilerFlow() // dtor
{
}

// Returns the count() of the duration fStop - fStart.
double TAUserProfilerFlow::GetTimer() const
{
   TAClockDuration elapsed_seconds = fStop - fStart;
   return elapsed_seconds.count();
}

#ifdef HAVE_ROOT
#include "TH1D.h"
#endif

// NOTE(review): header name missing in this copy.
#include

// Collects per-module timing statistics (and, with ROOT, histograms) for
// Analyze() and AnalyzeFlowEvent() calls and prints a summary.
// NOTE(review): the template arguments of the std::vector, std::map and
// std::atomic members are missing in this copy of the file.
class Profiler
{
private:
   std::string fBinaryName;
   std::string fBinaryPath;
   clock_t fStartCPU;
   std::chrono::system_clock::time_point fStartUser;
   uint32_t fMIDASStartTime;
   uint32_t fMIDASStopTime;

   //Track Queue lengths when multithreading
#ifdef HAVE_ROOT
   int fNQueues=0;
   std::vector fAnalysisQueue;
   std::atomic fQueueIntervalCounter;
#endif

   // Track Analyse TMEvent time per module (main thread)
#ifdef HAVE_ROOT
   std::vector fAnalyzeEventTimeHistograms;
#endif
   std::vector fModuleNames;
   std::vector fAnalyzeEventMean;
   std::vector fAnalyzeEventRMS;
   std::vector fAnalyzeEventEntries;
   std::vector fAnalyzeEventTimeMax;
   std::vector fAnalyzeEventTimeTotal;
//Track Analyse flow event time per module (can be multiple threads) #ifdef HAVE_ROOT std::vector fAnalyzeFlowEventTimeHistograms; #endif std::vector fAnalyzeFlowEventMean; std::vector fAnalyzeFlowEventRMS; std::vector fAnalyzeFlowEventEntries; std::vector fAnalyzeFlowEventTimeMax; std::vector fAnalyzeFlowEventTimeTotal; #ifdef HAVE_ROOT //Track user profiling std::map fUserMap; std::vector fUserHistograms; std::vector fTotalUserTime; std::vector fMaxUserTime; // Number of events between samples const int fQueueInterval = 100; #endif public: Profiler( const int queue_interval_check ); ~Profiler(); void Begin(TARunInfo* runinfo,const std::vector fRunRun ); // Function for profiling the 'main' thread (that unpacks TMEvents) void LogAnalyzeEvent(TAFlags* flag, TAFlowEvent* flow, const int i, const TAClock& start); // Function for profiling module threads void LogAnalyzeFlowEvent(TAFlags* flag, TAFlowEvent* flow, const int i, const TAClock& start); // Extra function for users custom profiling windows void LogUserWindows(TAFlags* flag, TAFlowEvent* flow); void AddModuleMap( const char* UserProfileName, unsigned long hash); void LogMTQueueLength(TARunInfo* runinfo); void End(TARunInfo* runinfo); void Print() const; }; Profiler::Profiler(const int queue_interval_check) #ifdef HAVE_ROOT : fQueueIntervalCounter(0), fQueueInterval(queue_interval_check) #endif { if (gTrace) printf("Profiler::ctor\n"); fMIDASStartTime = 0; fMIDASStopTime = 0; fStartCPU = clock(); fStartUser = std::chrono::system_clock::now(); } Profiler::~Profiler() { if (gTrace) printf("Profiler::dtor\n"); #ifdef HAVE_ROOT for (TH1D* h: fAnalyzeFlowEventTimeHistograms) delete h; fAnalyzeFlowEventTimeHistograms.clear(); for (TH1D* h: fAnalyzeEventTimeHistograms) delete h; fAnalyzeEventTimeHistograms.clear(); for( TH1D* h: fAnalysisQueue) delete h; fAnalysisQueue.clear(); #endif } void Profiler::Begin(TARunInfo* runinfo, const std::vector runrun) { if (gTrace) printf("Profiler::begin\n"); 
   runinfo->fOdb->RU32("Runinfo/Start time binary",(uint32_t*) &fMIDASStartTime);

#ifdef HAVE_ROOT
   // Put Profiler histograms in their own folders in the output root file
   if (runinfo->fRoot->fOutputFile) {
      runinfo->fRoot->fOutputFile->cd(); // select correct ROOT directory
      gDirectory->mkdir("ProfilerReport")->cd();
      runinfo->fRoot->fOutputFile->cd("ProfilerReport");
      gDirectory->mkdir("AnalyzeFlowEventTime");
      gDirectory->mkdir("AnalyzeFlowTime");
      gDirectory->mkdir("MTQueueLength");
   }

   // Setup module processing time histograms

   // Number of bins
   Int_t Nbins=1000;
   // Array of bin edges
   Double_t bins[Nbins+1];
   // Processing time range (seconds)
   Double_t TimeRange = 10;
   // Set uneven binning to better sample fast modules with accuracy
   // without having a large number of bins
   // NOTE(review): the loop that fills bins[] and the start of the following
   // "if" statement are missing in this copy (text lost between "i" and
   // "fMtInfo") - restore from the upstream source.
   for (int i=0; ifMtInfo)
      fNQueues = runrun.size();
#endif

   // Per module metric setup
   for (size_t i = 0; i < runrun.size(); i++) {
      // modules without a name get a generated one
      if (runrun[i]->fModuleName.empty())
         fModuleNames.push_back("Unnamed Module " + std::to_string(i));
      else
         fModuleNames.push_back(runrun[i]->fModuleName);

      // Metrics for the AnalyzeEvent function (main thread)
      fAnalyzeEventMean.push_back(0);
      fAnalyzeEventRMS.push_back(0);
      fAnalyzeEventEntries.push_back(0);
      fAnalyzeEventTimeMax.push_back(0);
      fAnalyzeEventTimeTotal.push_back(0);

      // Metric for the AnalyzeFlowEvent function (side threads)
      fAnalyzeFlowEventMean.push_back(0);
      fAnalyzeFlowEventRMS.push_back(0);
      fAnalyzeFlowEventEntries.push_back(0);
      fAnalyzeFlowEventTimeMax.push_back(0);
      fAnalyzeFlowEventTimeTotal.push_back(0);

#ifdef HAVE_ROOT
      // one histogram per module in each report directory, or NULL
      // placeholders when there is no output file
      if (runinfo->fRoot->fOutputFile) {
         runinfo->fRoot->fOutputFile->cd("ProfilerReport/AnalyzeFlowTime");
         TH1D* Histo = new TH1D( TString(std::to_string(i) + "_" + fModuleNames.at(i)),
                                 TString(fModuleNames.at(i) + " Event Proccessing Time; s"),
                                 Nbins, bins);
         fAnalyzeFlowEventTimeHistograms.push_back(Histo);

         runinfo->fRoot->fOutputFile->cd("ProfilerReport/AnalyzeFlowEventTime");
         TH1D* AnalyzeEventHisto = new TH1D(TString(std::to_string(i) + "_" +
                                            fModuleNames.at(i) + "_TMEvent"),
                                            // NOTE(review): this title says "Flow" while the histogram is
                                            // filled by LogAnalyzeEvent(); the titles of the two timing
                                            // histograms look swapped - confirm.
                                            TString(fModuleNames.at(i) + " Flow Proccessing Time; s"),
                                            Nbins, bins);
         fAnalyzeEventTimeHistograms.push_back(AnalyzeEventHisto);
      } else {
         fAnalyzeFlowEventTimeHistograms.push_back(NULL);
         fAnalyzeEventTimeHistograms.push_back(NULL);
      }
      // Periodically (once every fQueueInterval events) record the
      // flow queue size if running in multithreaded mode
      if (runinfo->fMtInfo) {
         if (runinfo->fRoot->fOutputFile) {
            runinfo->fRoot->fOutputFile->cd("ProfilerReport/MTQueueLength");
            TH1D* QueueHisto = new TH1D(TString(std::to_string(i) + "_" + fModuleNames.at(i) + "_Queue"),
                                        TString(fModuleNames.at(i) + " Multithread Queue Length; Queue Depth"),
                                        runinfo->fMtInfo->fMtQueueDepth*1.2,
                                        0,
                                        runinfo->fMtInfo->fMtQueueDepth*1.2);
            fAnalysisQueue.push_back(QueueHisto);
         } else {
            fAnalysisQueue.push_back(NULL);
         }
      }
#endif
   }
}

// Records the time spent in one AnalyzeFlowEvent() call of module i.
//
// flag  - flags of the event; if TAFlag_SKIP_PROFILE is set, it is cleared
//         and nothing is recorded
// flow  - not used by this function
// i     - module index into the per-module counters
// start - time taken by the caller before the call being measured
void Profiler::LogAnalyzeFlowEvent(TAFlags* flag, TAFlowEvent* flow, const int i, const TAClock& start)
{
   if (gTrace)
      printf("Profiler::log\n");

   TAClock stop = TAClockNow();
   if ((*flag) & TAFlag_SKIP_PROFILE) {
      //Unset bit
      // subtracting clears the bit because it is known to be set here
      *flag -= TAFlag_SKIP_PROFILE;
      return;
   }

   // NOTE(review): the template arguments of std::chrono::duration are
   // missing in this copy.
   std::chrono::duration elapsed_seconds = stop - start;
   double dt = elapsed_seconds.count();
   fAnalyzeFlowEventTimeTotal[i] += dt;
   if (dt > fAnalyzeFlowEventTimeMax[i])
      fAnalyzeFlowEventTimeMax[i] = dt;

   // the "Mean" and "RMS" members accumulate the sum and the sum of squares
   fAnalyzeFlowEventMean[i] +=dt;
   fAnalyzeFlowEventRMS[i] +=dt*dt;
   fAnalyzeFlowEventEntries[i]++;

#ifdef HAVE_ROOT
   if (fAnalyzeFlowEventTimeHistograms[i])
      fAnalyzeFlowEventTimeHistograms[i]->Fill(dt);
#endif
}

// Records the time spent in one Analyze() call of module i; same logic as
// LogAnalyzeFlowEvent(), using the AnalyzeEvent counters.
void Profiler::LogAnalyzeEvent(TAFlags* flag, TAFlowEvent* flow, int i, const TAClock& start)
{
   if (gTrace)
      printf("Profiler::log_AnalyzeEvent_time\n");

   TAClock stop = TAClockNow();
   if ((*flag) & TAFlag_SKIP_PROFILE) {
      //Unset bit
      *flag -= TAFlag_SKIP_PROFILE;
      return;
   }
   std::chrono::duration elapsed_seconds = stop - start;
   double dt = elapsed_seconds.count();
   fAnalyzeEventTimeTotal[i] += dt;
   if (dt > fAnalyzeEventTimeMax[i])
      fAnalyzeEventTimeMax[i] = dt;
   fAnalyzeEventMean[i]
      +=dt;
   fAnalyzeEventRMS[i] +=dt*dt;
   fAnalyzeEventEntries[i]++;

#ifdef HAVE_ROOT
   if (fAnalyzeEventTimeHistograms[i])
      fAnalyzeEventTimeHistograms[i]->Fill(dt);
#endif
}

// With ROOT: counts the call and, in multithreaded mode, fills the
// queue-length histograms once every fQueueInterval calls.
void Profiler::LogMTQueueLength(TARunInfo* runinfo)
{
   if (gTrace)
      printf("Profiler::log_mt_queue_length\n");

#ifdef HAVE_ROOT
   fQueueIntervalCounter++;
   if (runinfo->fMtInfo && (fQueueIntervalCounter % fQueueInterval ==0 )) {
      // NOTE(review): part of the loop header and the start of its body are
      // missing in this copy (text lost between "i" and "lock(") - restore
      // from the upstream source.
      for (int i=0; i lock(runinfo->fMtInfo->fMtFlowQueueMutex[i]);
            j=runinfo->fMtInfo->fMtFlowQueue[i].size();
         }
         fAnalysisQueue.at(i)->Fill(j);
      }
   }
#endif
}

// With ROOT: walks the flow chain from its last element to its first and,
// for every element that is a TAUserProfilerFlow, adds its time to the
// per-name totals, maximum and histogram. New names are registered through
// AddModuleMap(). Without ROOT only an error message is printed.
void Profiler::LogUserWindows(TAFlags* flag, TAFlowEvent* flow)
{
   if (gTrace)
      printf("Profiler::LogUserWindows\n");

#ifdef HAVE_ROOT
   //Clocks unfold backwards...
   // NOTE(review): the template arguments of std::vector, dynamic_cast and
   // std::hash below are missing in this copy.
   std::vector flowArray;
   int FlowEvents=0;
   TAFlowEvent* f = flow;
   while (f) {
      flowArray.push_back(f);
      f=f->fNext;
      FlowEvents++;
   }
   for (int ii=FlowEvents-1; ii>=0; ii--) {
      f=flowArray[ii];
      TAUserProfilerFlow* timer=dynamic_cast(f);
      if (timer) {
         const char* name = timer->fModuleName.c_str();
         // the histogram slot is looked up by a hash of the window name
         unsigned int hash = std::hash{}(timer->fModuleName);
         if (!fUserMap.count(hash))
            AddModuleMap(name,hash);
         double dt=999.;
         dt=timer->GetTimer();
         int i = fUserMap[hash];
         fTotalUserTime[i] += dt;
         if (dt > fMaxUserTime[i])
            fMaxUserTime.at(i) = dt;
         fUserHistograms.at(i)->Fill(dt);
      }
   }
#else
   fprintf(stderr, "manalyzer must be built with ROOT for using the user profiling tools\n");
#endif
}

// Registers a new user profiling window under the given hash; runs with
// TAMultithreadHelper::gfLock held.
void Profiler::AddModuleMap( const char* UserProfileName, unsigned long hash)
{
   if (gTrace)
      printf("Profiler::AddModuleMap\n");

   std::lock_guard lock(TAMultithreadHelper::gfLock);

#ifdef HAVE_ROOT
   gDirectory->cd("/ProfilerReport");
   fUserMap[hash] = fUserHistograms.size();
   Int_t Nbins = 100;
   Double_t bins[Nbins+1];
   Double_t TimeRange = 10; //seconds
   // NOTE(review): a large span is missing in this copy between "i" and
   // "fRoot": the rest of AddModuleMap() and the beginning of the function
   // that writes the histograms (presumably Profiler::End()) - restore from
   // the upstream source.
   for (int i=0; ifRoot->fOutputFile) {
      runinfo->fRoot->fOutputFile->cd("ProfilerReport/AnalyzeFlowTime");
      for (TH1D* h: fAnalyzeFlowEventTimeHistograms) {
         h->Write();
      }
      runinfo->fRoot->fOutputFile->cd("ProfilerReport/AnalyzeFlowEventTime");
      for (TH1D* h:
fAnalyzeEventTimeHistograms) { h->Write(); } if (runinfo->fMtInfo) { runinfo->fRoot->fOutputFile->cd("ProfilerReport/MTQueueLength"); for (TH1D* h: fAnalysisQueue) { h->Write(); } } } #endif } void Profiler::Print() const { #ifdef HAVE_ROOT if (fAnalyzeFlowEventTimeHistograms.size()>0) { #else if (fAnalyzeEventEntries.size()>0) { #endif double AllAnalyzeFlowEventTime=0; for (auto& n : fAnalyzeFlowEventTimeTotal) AllAnalyzeFlowEventTime += n; double AllAnalyzeEventTime=0; for (auto& n : fAnalyzeEventTimeTotal) AllAnalyzeEventTime += n; //double max_AnalyzeEvent_time=*std::max_element(TotalAnalyzeEventTime.begin(),TotalAnalyzeEventTime.end()); printf("Module average processing time\n"); printf(" \t\t\t\tAnalyzeEvent (one thread) \tAnalyzeFlowEvent (multithreadable)\n"); printf("Module\t\t\t\tEntries\tMean(ms)RMS(ms)\tMax(ms)\tSum(s)\tEntries\tMean(ms)RMS(ms)\tMax(ms)\tSum(s)\n"); printf("----------------------------------------------------------------------------------------------------------------\n"); for (size_t i=0; iGetTitle(), (int)fUserHistograms.at(i)->GetEntries(), fUserHistograms.at(i)->GetMean()*1000., //ms fUserHistograms.at(i)->GetRMS()*1000., //ms fMaxUserTime.at(i)*1000., //ms fTotalUserTime.at(i)); //s } printf("----------------------------------------------------------------------\n"); } else { printf("----------------------------------------------------------------------------------------------------------------\n"); } #else printf("To use custom profile windows, please build rootana with root\n"); #endif } //CPU and Wall clock time: double cputime = (double)(clock() - fStartCPU)/CLOCKS_PER_SEC; std::chrono::duration usertime = std::chrono::system_clock::now() - fStartUser; printf("%s\tCPU time: %.2fs\tUser time: %.2fs\tAverage CPU Usage: ~%.1f%%\n", getenv("_"), cputime, usertime.count(), 100.*cputime/usertime.count()); } ////////////////////////////////////////////////////////// // // RunHandler class // 
////////////////////////////////////////////////////////// class Profiler; class RunHandler { public: TARunInfo* fRunInfo = NULL; std::vector fRunRun; std::vector fArgs; bool fMultithreadEnabled = false; Profiler* fProfiler = NULL; bool fProfilerEnabled = false; int fProfilerIntervalCheck; RunHandler(const std::vector& args, bool multithread, bool profile, int queue_interval_check) // ctor { fRunInfo = NULL; fArgs = args; fMultithreadEnabled = multithread; fProfiler = NULL; fProfilerEnabled = profile; fProfilerIntervalCheck = queue_interval_check; } ~RunHandler() // dtor { if (fRunInfo) { delete fRunInfo; fRunInfo = NULL; } if (fProfiler) { delete fProfiler; fProfiler = NULL; } } void PerModuleThread(int i) { //bool data_processing=true; int nModules=(*gModules).size(); TAMultithreadHelper* mt = fRunInfo->fMtInfo; assert(nModules == (int)mt->fMtFlowQueueMutex.size()); assert(nModules == (int)mt->fMtFlowQueue.size()); assert(nModules == (int)mt->fMtFlagQueue.size()); { //Lock scope std::lock_guard lock(mt->fMtFlowQueueMutex[i]); mt->fMtThreadIsRunning[i] = true; } while (!mt->fMtShutdownRequested) { TAFlowEvent* flow = NULL; TAFlags* flag = NULL; { //Lock scope std::lock_guard lock(mt->fMtFlowQueueMutex[i]); if (!mt->fMtFlowQueue[i].empty()) { flow=mt->fMtFlowQueue[i].front(); flag=mt->fMtFlagQueue[i].front(); mt->fMtFlowQueue[i].pop_front(); mt->fMtFlagQueue[i].pop_front(); } if (flow == NULL) mt->fMtThreadIsBusy[i] = false; // we will sleep else mt->fMtThreadIsBusy[i] = true; // we will analyze an event // implicit unlock of mutex } if (flow == NULL) { // wait until queue not empty usleep(mt->fMtQueueEmptyUSleepTime); continue; } TAClock start_time = TAClockNow(); flow = fRunRun[i]->AnalyzeFlowEvent(fRunInfo, flag, flow); if (fProfiler) fProfiler->LogAnalyzeFlowEvent(flag, flow, i, start_time); if ((*flag) & TAFlag_QUIT) { // shut down the analyzer delete flow; delete flag; flow = NULL; flag = NULL; mt->fMtQuitRequested = true; mt->fMtShutdownRequested = true; 
            break; // stop the thread
         }

         if ((*flag) & TAFlag_SKIP) { // stop processing this event
            delete flow;
            delete flag;
            flow = NULL;
            flag = NULL;
            continue;
         }

         if (i==nModules-1) //If I am the last module... free memory, else queue up for next module to process
         {
            if (fProfiler) {
               fProfiler->LogUserWindows(flag, flow);
               fProfiler->LogMTQueueLength(fRunInfo);
            }
            delete flow;
            delete flag;
            flow = NULL;
            flag = NULL;
         } else {
            // ownership of flow and flag passes to the queue of the next module
            MtQueueFlowEvent(mt, i+1, flag, flow, true);
            flow = NULL;
            flag = NULL;
         }
      }

      { //Lock scope
         std::lock_guard lock(mt->fMtFlowQueueMutex[i]);
         mt->fMtThreadIsRunning[i] = false;
         mt->fMtThreadIsBusy[i] = false;
      }
   }

   // Creates the TARunInfo, the optional Profiler and the per-module run
   // objects for a new run; in multithreaded mode also creates the
   // TAMultithreadHelper and the per-module threads. Must not be called
   // while a run exists.
   void CreateRun(int run_number, const char* file_name)
   {
      assert(fRunInfo == NULL);
      assert(fRunRun.size() == 0);

      fRunInfo = new TARunInfo(run_number, file_name, fArgs);

      if (fProfilerEnabled)
         fProfiler = new Profiler( fProfilerIntervalCheck );

      int nModules = (*gModules).size();

      // NOTE(review): the loop header is truncated in this copy (text lost
      // between "i" and "NewRunObject") - restore from the upstream source.
      for (int i=0; iNewRunObject(fRunInfo));

      if (fMultithreadEnabled) {
         TAMultithreadHelper* mt = new TAMultithreadHelper(nModules);
         fRunInfo->fMtInfo = mt;
         // NOTE(review): the loop header is truncated in this copy (text
         // lost between "i" and "fMtThreads") - restore from upstream.
         for (int i=0; ifMtThreads[i]=new std::thread(&RunHandler::PerModuleThread,this,i);
         }
      }
   }

   // Starts the profiler (if any) and the modules; requires a run with an
   // ODB pointer set.
   void BeginRun()
   {
      assert(fRunInfo != NULL);
      assert(fRunInfo->fOdb != NULL);
      if (fProfiler)
         fProfiler->Begin(fRunInfo, fRunRun);
      // NOTE(review): the loop header is truncated in this copy (text lost
      // between "i" and "BeginRun") - restore from the upstream source.
      for (unsigned i=0; iBeginRun(fRunInfo);
   }

   // Runs the end-of-run sequence; sets TAFlag_QUIT in *flags if a module
   // thread requested to quit.
   void EndRun(TAFlags* flags)
   {
      assert(fRunInfo);

      // make sure the shutdown sequence matches the description in the README file!

      // zeroth. Flush events queued for analysis before calling PreEndRun (insure
      // deterministic behaviour thats the same as in single threaded mode)
      if (fRunInfo->fMtInfo) {
         WaitForAllQueuesEmpty(fRunInfo->fMtInfo);
      }

      // first, call PreEndRun() to tell analysis modules that there will be no more
      // MIDAS events, no more calls to Analyze().
      // PreEndRun() may generate more
      // flow events, they to into the flow queue or into the multithread queue

      // NOTE(review): the loop header is truncated in this copy (text lost
      // between "i" and "PreEndRun") - restore from the upstream source.
      for (unsigned i=0; iPreEndRun(fRunInfo);

      // if in single threaded mode, analyze all queued flow events - call AnalyzeFlowEvent()
      // this can generate additional flow events that will be queued in the queue.

      AnalyzeFlowQueue(flags);

      // if in multi threaded mode, allow all the queues to drain naturally
      // and shutdown the threads

      if (fRunInfo->fMtInfo) {
         WaitForAllQueuesEmpty(fRunInfo->fMtInfo);
         WaitForAllThreadsShutdown(fRunInfo->fMtInfo);
         if (fRunInfo->fMtInfo->fMtQuitRequested) {
            (*flags) |= TAFlag_QUIT;
         }
      }

      // all data analysis is complete

      // NOTE(review): the loop header is truncated in this copy (text lost
      // between "i" and "EndRun") - restore from the upstream source.
      for (unsigned i=0; iEndRun(fRunInfo);

      if (fProfiler) {
         fProfiler->End(fRunInfo);
         fProfiler->Print();
      }
   }

   // Forwards NextSubrun() to the modules.
   void NextSubrun()
   {
      assert(fRunInfo);
      // NOTE(review): the loop header is truncated in this copy.
      for (unsigned i=0; iNextSubrun(fRunInfo);
   }

   void DeleteRun()
   {
      assert(fRunInfo);

      // NOTE(review): a large span is missing in this copy between "i" and
      // "AnalyzeSpecialEvent": the rest of DeleteRun() and the methods that
      // followed it, up to the loop of a method that forwards
      // AnalyzeSpecialEvent() - restore from the upstream source.
      for (unsigned i=0; iAnalyzeSpecialEvent(fRunInfo, event);
   }

   // Passes the flow chain through the modules in order, stopping early when
   // a module returns no flow or sets TAFlag_SKIP or TAFlag_QUIT. Returns the
   // resulting flow chain.
   TAFlowEvent* AnalyzeFlowEvent(TAFlags* flags, TAFlowEvent* flow)
   {
      // NOTE(review): the loop header and the start of its body are
      // truncated in this copy (text lost between "i" and
      // "AnalyzeFlowEvent") - restore from the upstream source.
      for (unsigned i=0; iAnalyzeFlowEvent(fRunInfo, flags, flow);
         if (fProfiler)
            fProfiler->LogAnalyzeFlowEvent(flags, flow, i, start_time);
         if (!flow)
            break;
         if ((*flags) & TAFlag_SKIP)
            break;
         if ((*flags) & TAFlag_QUIT)
            break;
      }
      return flow;
   }

   // Analyzes every flow event returned by fRunInfo->ReadFlowQueue() until
   // the queue is empty or a quit is requested; a quit is reported by
   // setting TAFlag_QUIT in *ana_flags.
   void AnalyzeFlowQueue(TAFlags* ana_flags)
   {
      while (1) {
         if (fRunInfo->fMtInfo)
            if (fRunInfo->fMtInfo->fMtQuitRequested) {
               *ana_flags |= TAFlag_QUIT;
               break;
            }

         TAFlowEvent* flow = fRunInfo->ReadFlowQueue();
         if (!flow)
            break;

         // each queued flow event is analyzed with its own, fresh flags
         int flags = 0;
         flow = AnalyzeFlowEvent(&flags, flow);
         if (flow)
            delete flow;
         if (flags & TAFlag_QUIT) {
            *ana_flags |= TAFlag_QUIT;
            break;
         }
      }
   }

   // Analyzes one MIDAS event: calls Analyze() of the modules, then either
   // queues the resulting flow for the module threads or runs
   // AnalyzeFlowEvent() directly, writes the event if TAFlag_WRITE is set,
   // and finally processes the flow queue.
   void AnalyzeEvent(TMEvent* event, TAFlags* flags, TMWriterInterface *writer)
   {
      assert(fRunInfo != NULL);
      assert(fRunInfo->fOdb != NULL);
      assert(event != NULL);
      assert(flags != NULL);

      if (fRunInfo->fMtInfo)
         if (fRunInfo->fMtInfo->fMtQuitRequested) {
            *flags |= TAFlag_QUIT;
            return;
         }

      TAFlowEvent* flow = NULL;

      // NOTE(review): the loop header and the start of its body are
      // truncated in this copy (text lost between "i" and "Analyze") -
      // restore from the upstream source.
      for (unsigned i=0; iAnalyze(fRunInfo, event, flags, flow);
         if (fProfiler)
            fProfiler->LogAnalyzeEvent(flags, flow, i, start_time);
         if (*flags & TAFlag_SKIP)
            break;
         if (*flags & TAFlag_QUIT)
            break;
      }

      if (flow) {
         if ((*flags & TAFlag_SKIP)||(*flags & TAFlag_QUIT)) {
            // skip further processing of this event
         } else {
            if (fRunInfo->fMtInfo) {
               MtQueueFlowEvent(fRunInfo->fMtInfo, 0, NULL, flow, true);
               flow = NULL; // ownership passed to the multithread event queue
            } else {
               flow = AnalyzeFlowEvent(flags, flow);
            }
         }
      }

      // user windows are logged here only in single threaded mode
      if (fProfiler && !fRunInfo->fMtInfo) {
         fProfiler->LogUserWindows(flags, flow);
      }

      if (*flags & TAFlag_WRITE)
         if (writer)
            TMWriteEvent(writer, event);

      if (flow)
         delete flow;

      if (*flags & TAFlag_QUIT)
         return;

      if (fRunInfo->fMtInfo)
         if (fRunInfo->fMtInfo->fMtQuitRequested) {
            *flags |= TAFlag_QUIT;
            return;
         }

      AnalyzeFlowQueue(flags);
   }
};

// Removes and returns the first flow event of the flow queue, or NULL if
// the queue is empty.
TAFlowEvent* TARunInfo::ReadFlowQueue()
{
   if (fFlowQueue.empty())
      return NULL;

   TAFlowEvent* flow = fFlowQueue.front();
   fFlowQueue.pop_front();
   return flow;
}

// Queues a flow event: in multithreaded mode it goes to the queue of the
// first module, otherwise to the flow queue of this run.
void TARunInfo::AddToFlowQueue(TAFlowEvent* flow)
{
   if (fMtInfo) {
      // call MtQueueFlowEvent with wait=false to avoid deadlock
      MtQueueFlowEvent(fMtInfo, 0, NULL, flow, false);
   } else {
      fFlowQueue.push_back(flow);
   }
}

#ifdef HAVE_MIDAS

#ifdef HAVE_TMFE

#ifdef HAVE_ROOT
#include "TSystem.h"
#endif

#include "tmfe.h"

// Run transition requests, set by RpcHandler and consumed by the main loop
// of ProcessMidasOnlineTmfe().
// NOTE(review): these are plain bools; if the handlers run on the thread
// started by StartRpcThread(), access is unsynchronized - confirm.
static bool gRunStartRequested = false;
static bool gRunStopRequested = false;

// RPC handler that turns run transitions into the two request flags above.
class RpcHandler: public TMFeRpcHandlerInterface
{
   TMFeResult HandleBeginRun(int run_number)
   {
      printf("RpcHandler::HandleBeginRun(%d)\n", run_number);
      gRunStartRequested = true;
      gRunStopRequested = false;
      return TMFeOk();
   }

   TMFeResult HandleEndRun(int run_number)
   {
      printf("RpcHandler::HandleEndRun(%d)\n", run_number);
      gRunStartRequested = false;
      gRunStopRequested = true;
      return TMFeOk();
   }

   TMFeResult HandleStartAbortRun(int run_number)
   {
      printf("RpcHandler::HandleStartAbortRun(%d)\n", run_number);
      // run did not really start, pretend it started and immediately ended
      gRunStartRequested = false;
      gRunStopRequested = true;
      return TMFeOk();
   }

   TMFeResult HandleRpc(const
                        char* cmd, const char* args, std::string& result)
   {
      // no RPC commands are implemented
      return TMFeOk();
   }
};

// Receives one event from the buffer into *e and parses it.
//
// Returns the error from b->ReceiveEvent() if it failed, otherwise
// TMFeOk(). When no data was received, e is left reset and unparsed.
TMFeResult ReceiveEvent(TMEventBuffer* b, TMEvent *e, int timeout_msec = 0)
{
   assert(b != NULL);
   assert(e != NULL);

   e->Reset();

   TMFeResult r = b->ReceiveEvent(&e->data, timeout_msec);

   if (r.error_flag)
      return r;

   if (e->data.size() == 0)
      return TMFeOk();

   e->ParseEvent();

   assert(e->data.size() == e->event_header_size + e->data_size);

   return TMFeOk();
}

// Online analysis through the TMFE interface: connects to MIDAS, opens the
// event buffer, requests events, then analyzes them until shutdown is
// requested. Returns 0 on normal completion and -1 if the connection or the
// buffer setup fails.
// NOTE(review): the element type of std::vector is missing in this copy.
static int ProcessMidasOnlineTmfe(const std::vector& args, const char* progname, const char* hostname, const char* exptname, const char* bufname, int event_id, int trigger_mask, const char* sampling_type_string, int num_analyze, TMWriterInterface* writer, bool multithread, bool profiler, int queue_interval_check)
{
   TMFE *mfe = TMFE::Instance();

   TMFeResult r = mfe->Connect(progname, hostname, exptname);

   if (r.error_flag) {
      fprintf(stderr, "Cannot connect to MIDAS: %s\n", r.error_message.c_str());
      return -1;
   }

   //MVOdb* odb = mfe->fOdbRoot;

   // NOTE(review): b is not deleted on the early "return -1" paths below.
   TMEventBuffer *b = new TMEventBuffer(mfe);

   /* open event buffer */

   r = b->OpenBuffer(bufname);

   if (r.error_flag) {
      fprintf(stderr, "Cannot open event buffer \"%s\": %s\n", bufname, r.error_message.c_str());
      return -1;
   }

   /* request read cache */

   // the read cache is turned off for the "GET_RECENT" sampling type
   size_t cache_size = 100000;
   if(!strcmp(sampling_type_string,"GET_RECENT"))
      cache_size=0;

   r = b->SetCacheSize(cache_size, 0);

   if (r.error_flag) {
      fprintf(stderr, "Cannot set cache size on event buffer \"%s\": %s\n", bufname, r.error_message.c_str());
      return -1;
   }

   /* request events */

   r = b->AddRequest(event_id, trigger_mask, sampling_type_string);

   if (r.error_flag) {
      fprintf(stderr, "Cannot add event request on event buffer \"%s\": %s\n", bufname, r.error_message.c_str());
      return -1;
   }

   RpcHandler* h = new RpcHandler();

   mfe->AddRpcHandler(h);

   mfe->DeregisterTransitionPause();
   mfe->DeregisterTransitionResume();
   mfe->SetTransitionSequenceStart(300);
   mfe->SetTransitionSequenceStop(700);

   mfe->StartRpcThread();

   /* reqister event requests */

   RunHandler rh(args, multithread,
                 profiler, queue_interval_check);

   for (unsigned i=0; i<(*gModules).size(); i++)
      (*gModules)[i]->Init(args);

   // if a run is already in progress, start analyzing it right away
   if (mfe->fStateRunning) {
      rh.CreateRun(mfe->fRunNumber, NULL);
      rh.fRunInfo->fOdb = mfe->fOdbRoot;
      rh.BeginRun();
   }

   TMEvent e;

   while (!mfe->fShutdownRequested) {
      bool do_sleep = true;

      // a requested run start ends the current run (if any) and creates a new one
      if (gRunStartRequested) {
         gRunStartRequested = false;

         if (rh.fRunInfo) {
            TAFlags flags = 0;
            rh.EndRun(&flags);
            // the ODB object is not owned by the run: detach it before DeleteRun()
            rh.fRunInfo->fOdb = NULL;
            rh.DeleteRun();
         }

         rh.CreateRun(mfe->fRunNumber, NULL);
         rh.fRunInfo->fOdb = mfe->fOdbRoot;
         rh.BeginRun();

         continue;
      }

      if (gRunStopRequested) {
         gRunStopRequested = false;

         if (rh.fRunInfo) {
            TAFlags flags = 0;
            rh.EndRun(&flags);
            rh.fRunInfo->fOdb = NULL;
            rh.DeleteRun();
         }

         continue;
      }

      //r = buf.ReceiveEvent(&e, BM_NO_WAIT);
      //r = buf.ReceiveEvent(&e, BM_WAIT);
      //r = buf.ReceiveEvent(&e, 8000);
      //r = buf.ReceiveEvent(&e, 5000);
      r = ReceiveEvent(b, &e, 100);

      if (r.error_flag) {
         fprintf(stderr, "Cannot read event on event buffer \"%s\": %s\n", bufname, r.error_message.c_str());
         break;
      }

      //e.PrintHeader();
      //::sleep(1);

      // events are analyzed only while a run exists
      if ((e.data_size > 0) && (rh.fRunInfo != NULL)) {

         //e.PrintHeader();
         //e.PrintBanks(2);

         TAFlags flags = 0;

         rh.AnalyzeEvent(&e, &flags, writer);

         if (flags & TAFlag_QUIT) {
            mfe->fShutdownRequested = true;
         }

         // num_analyze > 0 limits the number of analyzed events
         if (num_analyze > 0) {
            num_analyze--;
            if (num_analyze == 0) {
               mfe->fShutdownRequested = true;
            }
         }

         do_sleep = false;
      }

#ifdef HAVE_THTTP_SERVER
      if (TARootHelper::fgHttpServer) {
         int nreq = TARootHelper::fgHttpServer->ProcessRequests();
         if (nreq > 0) {
            do_sleep = false;
            //printf("ProcessRequests() returned %d\n", nreq);
         }
      }
#endif
#ifdef HAVE_ROOT
      if (TARootHelper::fgApp) {
         gSystem->DispatchOneEvent(kTRUE);
      }
#endif

      //printf("do_sleep %d\n", do_sleep);

      // do_sleep is true only when this pass had neither an event nor an
      // http request to process
      if (do_sleep) {
         mfe->Yield(0.010);
      } else {
         mfe->Yield(0);
      }
   }

   // end the run that is still active at shutdown
   if (rh.fRunInfo) {
      TAFlags flags = 0;
      rh.EndRun(&flags);
      rh.fRunInfo->fOdb = NULL;
      rh.DeleteRun();
   }

   for (unsigned i=0; i<(*gModules).size(); i++)
      (*gModules)[i]->Finish();

   /* close event buffer */
   r = b->CloseBuffer();

   delete b;
b = NULL; if (r.error_flag) { fprintf(stderr,"Cannot close event buffer \"%s\": %s\n", bufname, r.error_message.c_str()); //return -1; } /* disconnect from experiment */ mfe->Disconnect(); return 0; } #else #include "TMidasOnline.h" #ifdef HAVE_ROOT #include "TSystem.h" #endif class OnlineHandler: public TMHandlerInterface { public: RunHandler fRun; int fNumAnalyze = 0; TMWriterInterface* fWriter = NULL; bool fQuit = false; MVOdb* fOdb = NULL; OnlineHandler(int num_analyze, TMWriterInterface* writer, MVOdb* odb, const std::vector& args, bool multithread, bool profiler, int queue_interval_check) // ctor : fRun(args, multithread, profiler, queue_interval_check) { fNumAnalyze = num_analyze; fWriter = writer; fOdb = odb; } ~OnlineHandler() // dtor { fWriter = NULL; fOdb = NULL; } void StartRun(int run_number) { fRun.CreateRun(run_number, NULL); fRun.fRunInfo->fOdb = fOdb; fRun.BeginRun(); } void Transition(int transition, int run_number, int transition_time) { //printf("OnlineHandler::Transtion: transition %d, run %d, time %d\n", transition, run_number, transition_time); if (transition == TR_START) { if (fRun.fRunInfo) { TAFlags flags = 0; fRun.EndRun(&flags); if (flags & TAFlag_QUIT) fQuit = true; fRun.fRunInfo->fOdb = NULL; fRun.DeleteRun(); } assert(fRun.fRunInfo == NULL); StartRun(run_number); printf("Begin run: %d\n", run_number); } else if (transition == TR_STOP) { TAFlags flags = 0; fRun.EndRun(&flags); if (flags & TAFlag_QUIT) fQuit = true; fRun.fRunInfo->fOdb = NULL; fRun.DeleteRun(); printf("End of run %d\n", run_number); } } void Event(const void* data, int data_size) { //printf("OnlineHandler::Event: ptr %p, size %d\n", data, data_size); if (!fRun.fRunInfo) { StartRun(0); // start fake run for events outside of a run } TMEvent* event = new TMEvent(data, data_size); TAFlags flags = 0; fRun.AnalyzeEvent(event, &flags, fWriter); if (flags & TAFlag_QUIT) fQuit = true; if (fNumAnalyze > 0) { fNumAnalyze--; if (fNumAnalyze == 0) fQuit = true; } if (event) { 
delete event; event = NULL; } } }; static int ProcessMidasOnlineOld(const std::vector& args, const char* hostname, const char* exptname, int num_analyze, TMWriterInterface* writer, bool multithread, bool profiler, int queue_interval_check) { TMidasOnline *midas = TMidasOnline::instance(); int err = midas->connect(hostname, exptname, "rootana"); if (err != 0) { fprintf(stderr,"Cannot connect to MIDAS, error %d\n", err); return -1; } MVOdb* odb = MakeMidasOdb(midas->fDB); OnlineHandler* h = new OnlineHandler(num_analyze, writer, odb, args, multithread, profiler, queue_interval_check); midas->RegisterHandler(h); midas->registerTransitions(); /* reqister event requests */ midas->eventRequest("SYSTEM",-1,-1,(1<<1)); int run_number = 0; // midas->odbReadInt("/runinfo/Run number"); int run_state = 0; // midas->odbReadInt("/runinfo/State"); odb->RI("runinfo/run number", &run_number); odb->RI("runinfo/state", &run_state); for (unsigned i=0; i<(*gModules).size(); i++) (*gModules)[i]->Init(args); if ((run_state == STATE_RUNNING)||(run_state == STATE_PAUSED)) { h->StartRun(run_number); } while (!h->fQuit) { #ifdef HAVE_THTTP_SERVER if (TARootHelper::fgHttpServer) { TARootHelper::fgHttpServer->ProcessRequests(); } #endif #ifdef HAVE_ROOT if (TARootHelper::fgApp) { gSystem->DispatchOneEvent(kTRUE); } #endif if (!TMidasOnline::instance()->poll(10)) break; } if (h->fRun.fRunInfo) { TAFlags flags = 0; h->fRun.EndRun(&flags); h->fRun.fRunInfo->fOdb = NULL; h->fRun.DeleteRun(); } for (unsigned i=0; i<(*gModules).size(); i++) (*gModules)[i]->Finish(); delete h; h = NULL; delete odb; odb = NULL; /* disconnect from experiment */ midas->disconnect(); return 0; } #endif #endif std::vector TARunInfo::fgFileList; int TARunInfo::fgCurrentFileIndex = 0; static int ProcessMidasFiles(const std::vector& files, const std::vector& args, int num_skip, int num_analyze, TMWriterInterface* writer, bool multithread, bool profiler, int queue_interval_check) { int number_of_missing_files = 0; 
TARunInfo::fgFileList.clear(); for (unsigned i=0; iInit(args); RunHandler run(args, multithread, profiler, queue_interval_check); bool done = false; for (TARunInfo::fgCurrentFileIndex = 0; TARunInfo::fgCurrentFileIndex < (int)TARunInfo::fgFileList.size(); TARunInfo::fgCurrentFileIndex++) { std::string filename = TARunInfo::fgFileList[TARunInfo::fgCurrentFileIndex]; TMReaderInterface *reader = TMNewReader(filename.c_str()); if (reader->fError) { printf("Could not open \"%s\", error: %s\n", filename.c_str(), reader->fErrorString.c_str()); delete reader; number_of_missing_files++; continue; } while (1) { TMEvent* event = TMReadEvent(reader); if (!event) // EOF break; if (event->error) { delete event; break; } if (event->event_id == 0x8000) // begin of run event { int runno = event->serial_number; if (run.fRunInfo) { if (run.fRunInfo->fRunNo == runno) { // next subrun file, nothing to do run.fRunInfo->fFileName = filename; run.NextSubrun(); } else { // file with a different run number TAFlags flags = 0; run.EndRun(&flags); if (flags & TAFlag_QUIT) { done = true; } run.DeleteRun(); } } if (!run.fRunInfo) { run.CreateRun(runno, filename.c_str()); run.fRunInfo->fOdb = MakeFileDumpOdb(event->GetEventData(), event->data_size); run.BeginRun(); } assert(run.fRunInfo); run.AnalyzeSpecialEvent(event); if (writer) TMWriteEvent(writer, event); } else if (event->event_id == 0x8001) // end of run event { //int runno = event->serial_number; run.AnalyzeSpecialEvent(event); if (writer) TMWriteEvent(writer, event); if (run.fRunInfo->fOdb) { delete run.fRunInfo->fOdb; run.fRunInfo->fOdb = NULL; } run.fRunInfo->fOdb = MakeFileDumpOdb(event->GetEventData(), event->data_size); } else if (event->event_id == 0x8002) // message event { run.AnalyzeSpecialEvent(event); if (writer) TMWriteEvent(writer, event); } else { if (!run.fRunInfo) { // create a fake begin of run run.CreateRun(0, filename.c_str()); run.fRunInfo->fOdb = MakeNullOdb(); run.BeginRun(); } if (num_skip > 0) { num_skip--; } else 
{ TAFlags flags = 0; run.AnalyzeEvent(event, &flags, writer); if (flags & TAFlag_QUIT) done = true; if (num_analyze > 0) { num_analyze--; if (num_analyze == 0) done = true; } } } delete event; if (done) break; #ifdef HAVE_ROOT if (TARootHelper::fgApp) { gSystem->DispatchOneEvent(kTRUE); } #endif } reader->Close(); delete reader; if (done) break; } #ifdef HAVE_THTTP_SERVER if (0 && TARootHelper::fgHttpServer) { while (1) { gSystem->DispatchOneEvent(kTRUE); //sleep(1); } } #endif if (run.fRunInfo) { TAFlags flags = 0; run.EndRun(&flags); if (flags & TAFlag_QUIT) done = true; run.DeleteRun(); } for (unsigned i=0; i<(*gModules).size(); i++) (*gModules)[i]->Finish(); if (number_of_missing_files) { printf("%d midas files were not openable\n",number_of_missing_files); return number_of_missing_files; } return 0; } static int ProcessDemoMode(const std::vector& args, int num_skip, int num_analyze, TMWriterInterface* writer, bool multithread, bool profiler, int queue_interval_check) { for (unsigned i=0; i<(*gModules).size(); i++) (*gModules)[i]->Init(args); RunHandler run(args, multithread, profiler, queue_interval_check); bool done = false; int runno = 1; for (unsigned i=0; true; i++) { char s[256]; snprintf(s, sizeof(s), "%03d", i); std::string filename = std::string("demo_subrun_") + s; if (!run.fRunInfo) { run.CreateRun(runno, filename.c_str()); run.fRunInfo->fOdb = MakeNullOdb(); run.BeginRun(); } // we do not generate a fake begin of run event... 
//run.AnalyzeSpecialEvent(event); // only switch subruns after the first subrun file if (i>0) { run.fRunInfo->fFileName = filename; run.NextSubrun(); } TMEvent* event = new TMEvent(); for (unsigned j=0; j<100; j++) { event->Init(0x0001, 0xFFFF, j+1, 0, 0); uint32_t test_data[] = { 0x11112222, 0x33334444, 0x55556666, 0x77778888 }; event->AddBank("TEST", TID_DWORD, (const char*)test_data, sizeof(test_data)); if (num_skip > 0) { num_skip--; } else { TAFlags flags = 0; run.AnalyzeEvent(event, &flags, writer); if (flags & TAFlag_QUIT) done = true; if (num_analyze > 0) { num_analyze--; if (num_analyze == 0) done = true; } } if (done) break; #ifdef HAVE_ROOT if (TARootHelper::fgApp) { gSystem->DispatchOneEvent(kTRUE); } #endif } delete event; // we do not generate a fake end of run event... //run.AnalyzeSpecialEvent(event); if (done) break; } if (run.fRunInfo) { TAFlags flags = 0; run.EndRun(&flags); run.DeleteRun(); } for (unsigned i=0; i<(*gModules).size(); i++) (*gModules)[i]->Finish(); return 0; } static bool gEnableShowMem = false; #if 0 static int ShowMem(const char* label) { if (!gEnableShowMem) return 0; FILE* fp = fopen("/proc/self/statm","r"); if (!fp) return 0; int mem = 0; fscanf(fp,"%d",&mem); fclose(fp); if (label) printf("memory at %s is %d\n", label, mem); return mem; } #endif class EventDumpModule: public TARunObject { public: EventDumpModule(TARunInfo* runinfo) : TARunObject(runinfo) { if (gTrace) printf("EventDumpModule::ctor, run %d\n", runinfo->fRunNo); } ~EventDumpModule() { if (gTrace) printf("EventDumpModule::dtor!\n"); } void BeginRun(TARunInfo* runinfo) { printf("EventDumpModule::BeginRun, run %d\n", runinfo->fRunNo); } void EndRun(TARunInfo* runinfo) { printf("EventDumpModule::EndRun, run %d\n", runinfo->fRunNo); } void NextSubrun(TARunInfo* runinfo) { printf("EventDumpModule::NextSubrun, run %d, file %s\n", runinfo->fRunNo, runinfo->fFileName.c_str()); } void PauseRun(TARunInfo* runinfo) { printf("EventDumpModule::PauseRun, run %d\n", 
// Mailbox for the most recent GUI control code; starts out as 0.
class ValueHolder
{
public:
   int fValue;

   ValueHolder() // ctor
      : fValue(0)
   {
   }
};
TGCompositeFrame* fButtonsFrame; ValueHolder* fHolder; TextButton* fNextButton; TextButton* fNextFlowButton; TextButton* fContinueButton; TextButton* fPauseButton; TextButton* fQuitButton; public: MainWindow(const TGWindow*w, int s1, int s2, ValueHolder* holder) // ctor : TGMainFrame(w, s1, s2) { if (gTrace) printf("MainWindow::ctor!\n"); fHolder = holder; //SetCleanup(kDeepCleanup); SetWindowName("ROOT Analyzer Control"); // layout the gui fMenu = new TGPopupMenu(gClient->GetRoot()); fMenu->AddEntry("New TBrowser", CTRL_TBROWSER); fMenu->AddEntry("-", 0); fMenu->AddEntry("Next", CTRL_NEXT); fMenu->AddEntry("NextFlow", CTRL_NEXT_FLOW); fMenu->AddEntry("Continue", CTRL_CONTINUE); fMenu->AddEntry("Pause", CTRL_PAUSE); fMenu->AddEntry("-", 0); fMenu->AddEntry("Quit", CTRL_QUIT); fMenuBarItemLayout = new TGLayoutHints(kLHintsTop|kLHintsLeft, 0, 4, 0, 0); fMenu->Associate(this); fMenuBar = new TGMenuBar(this, 1, 1, kRaisedFrame); fMenuBar->AddPopup("&Rootana", fMenu, fMenuBarItemLayout); fMenuBar->Layout(); AddFrame(fMenuBar, new TGLayoutHints(kLHintsTop|kLHintsLeft|kLHintsExpandX)); fButtonsFrame = new TGVerticalFrame(this); fNextButton = new TextButton(fButtonsFrame, "Next", holder, CTRL_NEXT); fNextFlowButton = new TextButton(fButtonsFrame, "Next Flow Event", holder, CTRL_NEXT_FLOW); fButtonsFrame->AddFrame(fNextButton, new TGLayoutHints(kLHintsExpandX, 1, 1, 1, 1)); fButtonsFrame->AddFrame(fNextFlowButton, new TGLayoutHints(kLHintsExpandX, 1, 1, 1, 1)); TGHorizontalFrame *hframe = new TGHorizontalFrame(fButtonsFrame); fContinueButton = new TextButton(hframe, " Continue ", holder, CTRL_CONTINUE); fPauseButton = new TextButton(hframe, " Pause ", holder, CTRL_PAUSE); hframe->AddFrame(fContinueButton, new TGLayoutHints(kLHintsExpandX, 1, 1, 1, 1)); hframe->AddFrame(fPauseButton, new TGLayoutHints(kLHintsExpandX, 1, 1, 1, 1)); fButtonsFrame->AddFrame(hframe, new TGLayoutHints(kLHintsExpandX)); fQuitButton = new TextButton(fButtonsFrame, "Quit ", holder, CTRL_QUIT); 
// Run object that pauses the analysis so that a user can step through
// events. Commands come either from the ROOT control window (HAVE_ROOT
// builds where a control window and TARootHelper::fgApp exist) or from a
// "manalyzer> " prompt on stdin.
class InteractiveModule: public TARunObject
{
public:
   bool fContinue; // "continue" was requested: Analyze() stops prompting until an event carries TAFlag_DISPLAY
   bool fNextFlow; // "next flow" was requested: the next prompt happens in AnalyzeFlowEvent()
   int fSkip;      // number of upcoming events that Analyze() lets through without prompting

#ifdef HAVE_ROOT
   static ValueHolder* fgHolder;     // receives the control code of the last GUI action, 0 when none
   static MainWindow *fgCtrlWindow;  // control window, created once by the first module instance
#endif

   // Reset the stepping state; with ROOT, create the shared holder and
   // (only when a TApplication exists) the control window on first use.
   InteractiveModule(TARunInfo* runinfo)
      : TARunObject(runinfo)
   {
      if (gTrace)
         printf("InteractiveModule::ctor, run %d\n", runinfo->fRunNo);
      fContinue = false;
      fNextFlow = false;
      fSkip = 0;
#ifdef HAVE_ROOT
      if (!fgHolder)
         fgHolder = new ValueHolder;
      if (!fgCtrlWindow && runinfo->fRoot->fgApp) {
         fgCtrlWindow = new MainWindow(gClient->GetRoot(), 200, 300, fgHolder);
      }
#endif
   }

   ~InteractiveModule()
   {
      if (gTrace)
         printf("InteractiveModule::dtor!\n");
   }

   void BeginRun(TARunInfo* runinfo)
   {
      printf("InteractiveModule::BeginRun, run %d\n", runinfo->fRunNo);
   }

   // With an active control window: disable the stepping buttons and keep
   // servicing the GUI/HTTP server until a quit, next or continue control
   // code arrives, or until MIDAS asks for shutdown. Without a control
   // window this only prints the banner.
   void EndRun(TARunInfo* runinfo)
   {
      printf("InteractiveModule::EndRun, run %d\n", runinfo->fRunNo);

#ifdef HAVE_ROOT
      if (fgCtrlWindow && runinfo->fRoot->fgApp) {
         fgCtrlWindow->fNextButton->SetEnabled(false);
         fgCtrlWindow->fNextFlowButton->SetEnabled(false);
         fgCtrlWindow->fContinueButton->SetEnabled(false);
         fgCtrlWindow->fPauseButton->SetEnabled(false);

         while (1) {
#ifdef HAVE_THTTP_SERVER
            if (TARootHelper::fgHttpServer) {
               TARootHelper::fgHttpServer->ProcessRequests();
            }
#endif
#ifdef HAVE_ROOT
            if (TARootHelper::fgApp) {
               gSystem->DispatchOneEvent(kTRUE);
            }
#endif
            // wait about 10 ms per iteration; in MIDAS builds the wait also
            // reports a shutdown request
#ifdef HAVE_MIDAS
#ifdef HAVE_TMFE
            TMFE* mfe = TMFE::Instance();
            mfe->Yield(0.010);
            if (mfe->fShutdownRequested) {
               return;
            }
#else
            if (!TMidasOnline::instance()->sleep(10)) {
               // FIXME: indicate that we should exit the analyzer
               return;
            }
#endif
#else
            gSystem->Sleep(10);
#endif

            // consume the pending control code, if any
            int ctrl = fgHolder->fValue;
            fgHolder->fValue = 0;

            switch (ctrl) {
            case CTRL_QUIT:
               return;
            case CTRL_NEXT:
               return;
            case CTRL_CONTINUE:
               return;
            }
         }
      }
#endif
   }

   void PauseRun(TARunInfo* runinfo)
   {
      printf("InteractiveModule::PauseRun, run %d\n", runinfo->fRunNo);
   }

   void ResumeRun(TARunInfo* runinfo)
   {
      printf("InteractiveModule::ResumeRun, run %d\n", runinfo->fRunNo);
   }

   // Block until the user issues a command.
   //
   // GUI path (control window active): loops until a control code is
   // received and always returns from inside the loop, so the stdin prompt
   // below is not reached in that case.
   // stdin path: reads a line and looks at its first character only.
   //
   // Sets TAFlag_QUIT in *flags on quit, EOF or MIDAS shutdown; sets
   // fContinue, fNextFlow or fSkip according to the command.
   void InteractiveLoop(TARunInfo* runinfo, TAFlags* flags)
   {
#ifdef HAVE_ROOT
      if (fgCtrlWindow && runinfo->fRoot->fgApp) {
         while (1) {
#ifdef HAVE_THTTP_SERVER
            if (TARootHelper::fgHttpServer) {
               TARootHelper::fgHttpServer->ProcessRequests();
            }
#endif
#ifdef HAVE_ROOT
            if (TARootHelper::fgApp) {
               gSystem->DispatchOneEvent(kTRUE);
            }
#endif
#ifdef HAVE_MIDAS
#ifdef HAVE_TMFE
            TMFE* mfe = TMFE::Instance();
            mfe->Yield(0.010);
            if (mfe->fShutdownRequested) {
               *flags |= TAFlag_QUIT;
               return;
            }
#else
            if (!TMidasOnline::instance()->sleep(10)) {
               *flags |= TAFlag_QUIT;
               return;
            }
#endif
#else
            gSystem->Sleep(10);
#endif

            // consume the pending control code; other values (including 0
            // and CTRL_PAUSE) keep the loop waiting
            int ctrl = fgHolder->fValue;
            fgHolder->fValue = 0;

            switch (ctrl) {
            case CTRL_QUIT:
               *flags |= TAFlag_QUIT;
               return;
            case CTRL_NEXT:
               return;
            case CTRL_NEXT_FLOW:
               fNextFlow = true;
               return;
            case CTRL_CONTINUE:
               fContinue = true;
               return;
            }
         }
      }
#endif

      while (1) {
         char str[256];
         fprintf(stdout, "manalyzer> ");
         fflush(stdout);
         char* s = fgets(str, sizeof(str)-1, stdin);

         if (s == NULL) {
            // EOF
            *flags |= TAFlag_QUIT;
            return;
         }

         printf("command [%s]\n", str);

         // 'h' and unrecognized commands fall through and prompt again
         if (str[0] == 'h') { // "help"
            printf("Interactive manalyzer commands:\n");
            printf(" q - quit\n");
            printf(" h - help\n");
            printf(" c - continue until next TAFlag_DISPLAY event\n");
            printf(" n - next event\n");
            printf(" aNNN - analyze N events, i.e. \"a10\"\n");
         } else if (str[0] == 'q') { // "quit"
            *flags |= TAFlag_QUIT;
            return;
         } else if (str[0] == 'n') { // "next"
            return;
         } else if (str[0] == 'c') { // "continue"
            fContinue = true;
            return;
         } else if (str[0] == 'a') { // "analyze" N events
            int num = atoi(str+1);
            printf("Analyzing %d events\n", num);
            if (num > 0) {
               // the event being released now counts as the first of the N
               fSkip = num-1;
            }
            return;
         }
      }
   }

   // Per-event hook: prints the event header and decides whether to stop
   // and wait for a user command. Always returns the flow unchanged.
   TAFlowEvent* Analyze(TARunInfo* runinfo, TMEvent* event, TAFlags* flags, TAFlowEvent* flow)
   {
      printf("InteractiveModule::Analyze, run %d, %s\n", runinfo->fRunNo, event->HeaderToString().c_str());

#ifdef HAVE_ROOT
      // react to quit/pause pressed while events were streaming
      if (fgHolder->fValue == CTRL_QUIT) {
         *flags |= TAFlag_QUIT;
         return flow;
      } else if (fgHolder->fValue == CTRL_PAUSE) {
         fContinue = false;
      }
#endif

      // in continue/next-flow mode only a TAFlag_DISPLAY event stops the stream
      if ((fContinue||fNextFlow) && !(*flags & TAFlag_DISPLAY)) {
         return flow;
      } else {
         fContinue = false;
      }

      if (fSkip > 0) {
         fSkip--;
         return flow;
      }

      InteractiveLoop(runinfo, flags);

      return flow;
   }

   // Flow-event hook: waits for a user command only when "next flow" was
   // requested or the flags carry TAFlag_DISPLAY. Always returns the flow
   // unchanged.
   TAFlowEvent* AnalyzeFlowEvent(TARunInfo* runinfo, TAFlags* flags, TAFlowEvent* flow)
   {
      printf("InteractiveModule::AnalyzeFlowEvent, run %d\n", runinfo->fRunNo);

#ifdef HAVE_ROOT
      if (fgHolder->fValue == CTRL_QUIT) {
         *flags |= TAFlag_QUIT;
         return flow;
      } else if (fgHolder->fValue == CTRL_PAUSE) {
         fContinue = false;
      }
#endif

      if ((!fNextFlow) && !(*flags & TAFlag_DISPLAY)) {
         return flow;
      }

      fNextFlow = false;

      InteractiveLoop(runinfo, flags);

      return flow;
   }

   // Special events are only traced (when gTrace is set).
   void AnalyzeSpecialEvent(TARunInfo* runinfo, TMEvent* event)
   {
      if (gTrace)
         printf("InteractiveModule::AnalyzeSpecialEvent, run %d, event serno %d, id 0x%04x, data size %d\n", runinfo->fRunNo, event->serial_number, (int)event->event_id, event->data_size);
   }
};
NULL; ValueHolder* InteractiveModule::fgHolder = NULL; #endif class InteractiveModuleFactory: public TAFactory { public: void Init(const std::vector &args) { if (gTrace) printf("InteractiveModuleFactory::Init!\n"); } void Finish() { if (gTrace) printf("InteractiveModuleFactory::Finish!\n"); } TARunObject* NewRunObject(TARunInfo* runinfo) { if (gTrace) printf("InteractiveModuleFactory::NewRunObject, run %d, file %s\n", runinfo->fRunNo, runinfo->fFileName.c_str()); return new InteractiveModule(runinfo); } }; ////////////////////////////////////////////////////////// // // main program // ////////////////////////////////////////////////////////// static void help() { printf("\nUsage: ./manalyzer.exe [-h] [-R8081] [-oOutputfile.mid] [file1 file2 ...] [-- arguments passed to modules ...]\n"); printf("\n"); printf("-h: print this help message\n"); printf("--demo: activate the demo mode, online connection or input file not needed, midas events are generated internally, add -e0 or -eNNN to set number of demo events \n"); printf("\n"); printf("-Hhostname: connect to MIDAS experiment on given host\n"); printf("-Eexptname: connect to this MIDAS experiment\n"); printf("--midas-progname SSS -- set analyzer's MIDAS program name, default is \"ana\"\n"); printf("--midas-hostname HOSTNAME[:PORT] -- connect to MIDAS mserver on given host and port\n"); printf("--midas-exptname EXPTNAME -- connect to given experiment\n"); printf("--midas-buffer BUFZZZ -- connect to given MIDAS event buffer\n"); printf("--midas-sampling SSS -- sample events from MIDAS event buffer: GET_ALL=get every event (will block the event writers, GET_NONBLOCKING=get as many as we can process, GET_RECENT=get recent events, see bm_receive_event(). 
Default is GET_NONBLOCKING\n"); printf("--midas-event-id III -- receive only events with matching event ID\n"); printf("--midas-trigger-mask 0xMASK -- receive only events with matching trigger mask\n"); printf("\n"); printf("-oOutputfile.mid: write selected events into this file\n"); printf("-Rnnnn: Start the ROOT THttpServer HTTP server on specified tcp port, use -R8081, access by firefox http://localhost:8081\n"); printf("-eNNN: Number of events to analyze, 0=unlimited\n"); printf("-sNNN: Number of events to skip before starting analysis\n"); printf("\n"); printf("--dump: activate the event dump module\n"); printf("\n"); printf("-t: Enable tracing of constructors, destructors and function calls\n"); printf("-m: Enable memory leak debugging\n"); printf("-g: Enable graphics display when processing data files\n"); printf("-i: Enable intractive mode\n"); printf("\n"); printf("--mt: Enable multithreaded mode. Extra multithread config settings:\n"); printf("--mtqlNNN: Module thread queue length (buffer). Default: %d\n", gDefaultMultithreadQueueLength); printf("--mtseNNN: Module thread sleep time with empty queue (usec). Default: %d\n", gDefaultMultithreadWaitEmpty); printf("--mtsfNNN: Module thread sleep time when next queue is full (usec). 
// Stand-in for C++20 std::string::starts_with(): true when the first
// strlen(prefix) characters of s are exactly prefix. An empty prefix
// matches every string.
static bool starts_with(const std::string& s, const char* prefix)
{
   const size_t prefix_len = strlen(prefix);
   // compare() clamps the range to the end of s, so a too-short s simply
   // compares unequal
   return s.compare(0, prefix_len, prefix) == 0;
}
new TApplication("manalyzer", NULL, NULL, 0, 0); } TARootHelper::fgDir = new TDirectory("manalyzer", "location of histograms"); TARootHelper::fgDir->cd(); #endif if (httpPort) { #ifdef HAVE_THTTP_SERVER char str[256]; snprintf(str, sizeof(str), "http:127.0.0.1:%d?cors", httpPort); THttpServer *s = new THttpServer(str); //s->SetTimer(100, kFALSE); TARootHelper::fgHttpServer = s; #else fprintf(stderr,"ERROR: No support for the THttpServer!\n"); #endif } for (unsigned i=0; i 0) { exit_state = ProcessMidasFiles(files, modargs, num_skip, num_analyze, writer, multithread, performance_profiler, snap_shot_queue_length); } else { #ifdef HAVE_MIDAS #ifdef HAVE_TMFE exit_state = ProcessMidasOnlineTmfe(modargs, midas_progname.c_str(), midas_hostname.c_str(), midas_exptname.c_str(), midas_buffer.c_str(), midas_event_id, midas_trigger_mask, midas_sampling.c_str(), num_analyze, writer, multithread, performance_profiler, snap_shot_queue_length); #else exit_state = ProcessMidasOnlineOld(modargs, midas_hostname.c_str(), midas_exptname.c_str(), num_analyze, writer, multithread, performance_profiler, snap_shot_queue_length); #endif #endif } if (writer) { writer->Close(); delete writer; writer = NULL; } return exit_state; } /* emacs * Local Variables: * tab-width: 8 * c-basic-offset: 3 * indent-tabs-mode: nil * End: */