/********************************************************************\ Name: MIDAS.C Created by: Stefan Ritt Contents: MIDAS main library funcitons $Id$ \********************************************************************/ #undef NDEBUG // midas required assert() to be always enabled #include "midas.h" #include "msystem.h" #include "git-revision.h" #include "mstrlcpy.h" #include #include #include #include #include #include #include #include /**dox***************************************************************/ /** @file midas.c The main core C-code for Midas. */ /** \mainpage MIDAS code documentation Welcome to the doxygen-generated documentation for the MIDAS source code. This documentation is intended to be used as reference by MIDAS developers and advanced users. Documentation for new users, general information on MIDAS, examples, user discussion, mailing lists and forums, can be found through the MIDAS Wiki at http://midas.triumf.ca */ /** @defgroup cmfunctionc Common Functions (cm_xxx) */ /** @defgroup bmfunctionc Event Buffer Functions (bm_xxx) */ /** @defgroup msgfunctionc Message Functions (msg_xxx) */ /** @defgroup bkfunctionc Data Bank Functions (bk_xxx) */ /** @defgroup rpc_xxx RPC Functions (rpc_xxx) */ /** @defgroup rbfunctionc Ring Buffer Functions (rb_xxx) */ /**dox***************************************************************/ #ifndef DOXYGEN_SHOULD_SKIP_THIS /********************************************************************/ /* data type sizes */ static const int tid_size[] = { 0, /* tid == 0 not defined */ 1, /* TID_UINT8 unsigned byte 0 255 */ 1, /* TID_INT8 signed byte -128 127 */ 1, /* TID_CHAR single character 0 255 */ 2, /* TID_UINT16 two bytes 0 65535 */ 2, /* TID_INT16 signed word -32768 32767 */ 4, /* TID_UINT32 four bytes 0 2^32-1 */ 4, /* TID_INT32 signed dword -2^31 2^31-1 */ 4, /* TID_BOOL four bytes bool 0 1 */ 4, /* TID_FLOAT 4 Byte float format */ 8, /* TID_DOUBLE 8 Byte float format */ 4, /* TID_BITFIELD 32 Bits Bitfield 
0000... 11111... */ 0, /* TID_STRING zero terminated string */ 0, /* TID_ARRAY variable length array of unkown type */ 0, /* TID_STRUCT C structure */ 0, /* TID_KEY key in online database */ 0, /* TID_LINK link in online database */ 8, /* TID_INT64 8 bytes int -2^63 2^63-1 */ 8 /* TID_UINT64 8 bytes unsigned int 0 2^64-1 */ }; /* data type names */ static const char *tid_name_old[] = { "NULL", "BYTE", "SBYTE", "CHAR", "WORD", "SHORT", "DWORD", "INT", "BOOL", "FLOAT", "DOUBLE", "BITFIELD", "STRING", "ARRAY", "STRUCT", "KEY", "LINK", "INT64", "UINT64" }; static const char *tid_name[] = { "NULL", "UINT8", "INT8", "CHAR", "UINT16", "INT16", "UINT32", "INT32", "BOOL", "FLOAT", "DOUBLE", "BITFIELD", "STRING", "ARRAY", "STRUCT", "KEY", "LINK", "INT64", "UINT64" }; std::string cm_transition_name(int transition) { if (transition == TR_START) return "START"; if (transition == TR_STOP) return "STOP"; if (transition == TR_PAUSE) return "PAUSE"; if (transition == TR_RESUME) return "RESUME"; if (transition == TR_STARTABORT) return "STARTABORT"; if (transition == TR_DEFERRED) return "DEFERRED"; return msprintf("UNKNOWN TRANSITION %d", transition); } const char *mname[] = { "January", "February", "March", "April", "May", "June", "July", "August", "September", "October", "November", "December" }; /* Globals */ #ifdef OS_MSDOS extern unsigned _stklen = 60000U; #endif extern DATABASE *_database; extern INT _database_entries; // // locking rules for gBuffers and gBuffersMutex: // // - all access to gBuffers must be done while holding gBufferMutex // - while holding gBufferMutex: // - taking additional locks not permitted (no calling odb, no locking event buffers, etc) // - calling functions that can take additional locks not permitted (no calling db_xxx(), bm_xxx(), etc) // - calling functions that can come back recursively not permitted // // after obtaining a BUFFER*pbuf pointer from gBuffers: // // - holding gBuffersMutex is not required // - to access pbuf data, must hold 
buffer_mutex or call bm_lock_buffer() // - except for: // pbuf->attached - no need to hold a lock (std::atomic) // pbuf->buffer_name - no need to hold a lock (constant data, only changed by bm_open_buffer()) // // object life time: // // - gBuffers never shrinks // - new BUFFER objects are created by bm_open_buffer(), added to gBuffers when ready for use, pbuf->attached set to true // - bm_close_buffer() sets pbuf->attached to false // - BUFFER objects are never deleted to avoid race between delete and bm_send_event() & co // - BUFFER objects are never reused, bm_open_buffer() always creates a new object // - gBuffers[i] set to NULL are empty slots available for reuse // - closed buffers have corresponding gBuffers[i]->attached set to false // static std::mutex gBuffersMutex; // protects gBuffers vector itself, but not it's contents! static std::vector gBuffers; static INT _msg_buffer = 0; static EVENT_HANDLER *_msg_dispatch = NULL; /* Event request descriptor */ struct EventRequest { INT buffer_handle = 0; /* Buffer handle */ short int event_id = 0; /* same as in EVENT_HEADER */ short int trigger_mask = 0; /* same as in EVENT_HEADER */ EVENT_HANDLER* dispatcher = NULL; /* Dispatcher func. */ void clear() { buffer_handle = 0; event_id = 0; trigger_mask = 0; dispatcher = NULL; } }; static std::mutex _request_list_mutex; static std::vector _request_list; //static char *_tcp_buffer = NULL; //static INT _tcp_wp = 0; //static INT _tcp_rp = 0; //static INT _tcp_sock = 0; static MUTEX_T *_mutex_rpc = NULL; // mutex to protect RPC calls static void (*_debug_print)(const char *) = NULL; static INT _debug_mode = 0; static int _rpc_connect_timeout = 10000; // for use on a single machine it is best to restrict RPC access to localhost // by binding the RPC listener socket to the localhost IP address. 
static int disable_bind_rpc_to_localhost = 0; /* table for transition functions */ struct TRANS_TABLE { INT transition; INT sequence_number; INT (*func)(INT, char *); }; static std::mutex _trans_table_mutex; static std::vector _trans_table; static TRANS_TABLE _deferred_trans_table[] = { {TR_START, 0, NULL}, {TR_STOP, 0, NULL}, {TR_PAUSE, 0, NULL}, {TR_RESUME, 0, NULL}, {0, 0, NULL} }; static BOOL _rpc_registered = FALSE; static int _rpc_listen_socket = 0; static INT rpc_transition_dispatch(INT idx, void *prpc_param[]); void cm_ctrlc_handler(int sig); typedef struct { INT code; const char *string; } ERROR_TABLE; static const ERROR_TABLE _error_table[] = { {CM_WRONG_PASSWORD, "Wrong password"}, {CM_UNDEF_EXP, "Experiment not defined"}, {CM_UNDEF_ENVIRON, "\"exptab\" file not found and MIDAS_DIR or MIDAS_EXPTAB environment variable is not defined"}, {RPC_NET_ERROR, "Cannot connect to remote host"}, {0, NULL} }; typedef struct { void *adr; int size; char file[80]; int line; } DBG_MEM_LOC; static DBG_MEM_LOC *_mem_loc = NULL; static INT _n_mem = 0; struct TR_PARAM { INT transition; INT run_number; char *errstr; INT errstr_size; INT async_flag; INT debug_flag; std::atomic_int status{0}; std::atomic_bool finished{false}; std::atomic thread{NULL}; }; static TR_PARAM _trp; /*------------------------------------------------------------------*/ void *dbg_malloc(unsigned int size, char *file, int line) { FILE *f; void *adr; int i; adr = malloc(size); /* search for deleted entry */ for (i = 0; i < _n_mem; i++) if (_mem_loc[i].adr == NULL) break; if (i == _n_mem) { _n_mem++; if (!_mem_loc) _mem_loc = (DBG_MEM_LOC *) malloc(sizeof(DBG_MEM_LOC)); else _mem_loc = (DBG_MEM_LOC *) realloc(_mem_loc, sizeof(DBG_MEM_LOC) * _n_mem); } _mem_loc[i].adr = adr; _mem_loc[i].size = size; strcpy(_mem_loc[i].file, file); _mem_loc[i].line = line; f = fopen("mem.txt", "w"); for (i = 0; i < _n_mem; i++) if (_mem_loc[i].adr) fprintf(f, "%s:%d size=%d adr=%p\n", _mem_loc[i].file, _mem_loc[i].line, 
_mem_loc[i].size, _mem_loc[i].adr); fclose(f); return adr; } void *dbg_calloc(unsigned int size, unsigned int count, char *file, int line) { void *adr; adr = dbg_malloc(size * count, file, line); if (adr) memset(adr, 0, size * count); return adr; } void dbg_free(void *adr, char *file, int line) { FILE *f; int i; free(adr); for (i = 0; i < _n_mem; i++) if (_mem_loc[i].adr == adr) break; if (i < _n_mem) _mem_loc[i].adr = NULL; f = fopen("mem.txt", "w"); for (i = 0; i < _n_mem; i++) if (_mem_loc[i].adr) fprintf(f, "%s:%d %s:%d size=%d adr=%p\n", _mem_loc[i].file, _mem_loc[i].line, file, line, _mem_loc[i].size, _mem_loc[i].adr); fclose(f); } static std::vector split(const char* sep, const std::string& s) { unsigned sep_len = strlen(sep); std::vector v; std::string::size_type pos = 0; while (1) { std::string::size_type next = s.find(sep, pos); if (next == std::string::npos) { v.push_back(s.substr(pos)); break; } v.push_back(s.substr(pos, next-pos)); pos = next+sep_len; } return v; } static std::string join(const char* sep, const std::vector& v) { std::string s; for (unsigned i=0; i0) { s += sep; } s += v[i]; } return s; } bool ends_with_char(const std::string& s, char c) { if (s.length() < 1) return false; return s[s.length()-1] == c; } std::string msprintf(const char *format, ...) 
{ va_list ap, ap1; va_start(ap, format); va_copy(ap1, ap); size_t size = vsnprintf(nullptr, 0, format, ap1) + 1; char *buffer = (char *)malloc(size); if (!buffer) return ""; vsnprintf(buffer, size, format, ap); va_end(ap); std::string s(buffer); free(buffer); return s; } /********************************************************************\ * * * Common message functions * * * \********************************************************************/ typedef int (*MessagePrintCallback)(const char *); static std::atomic _message_print{puts}; static std::atomic_int _message_mask_system{MT_ALL}; static std::atomic_int _message_mask_user{MT_ALL}; /**dox***************************************************************/ #endif /* DOXYGEN_SHOULD_SKIP_THIS */ /**dox***************************************************************/ /** @addtogroup msgfunctionc * * @{ */ /********************************************************************/ /** Convert error code to string. Used after cm_connect_experiment to print error string in command line programs or windows programs. 
@param code Error code as defined in midas.h @param string Error string @return CM_SUCCESS */ std::string cm_get_error(INT code) { for (int i = 0; _error_table[i].code; i++) { if (_error_table[i].code == code) { return _error_table[i].string; } } return msprintf("unlisted status code %d", code); } /********************************************************************/ int cm_msg_early_init(void) { return CM_SUCCESS; } /********************************************************************/ int cm_msg_open_buffer(void) { //printf("cm_msg_open_buffer!\n"); if (_msg_buffer == 0) { int status = bm_open_buffer(MESSAGE_BUFFER_NAME, MESSAGE_BUFFER_SIZE, &_msg_buffer); if (status != BM_SUCCESS && status != BM_CREATED) { return status; } } return CM_SUCCESS; } /********************************************************************/ int cm_msg_close_buffer(void) { //printf("cm_msg_close_buffer!\n"); if (_msg_buffer) { bm_close_buffer(_msg_buffer); _msg_buffer = 0; } return CM_SUCCESS; } /********************************************************************/ /** Retrieve list of message facilities by searching logfiles on disk @param list List of facilities @return status SUCCESS */ INT EXPRT cm_msg_facilities(STRING_LIST *list) { std::string path; cm_msg_get_logfile("midas", 0, &path, NULL, NULL); /* extract directory name from full path name of midas.log */ size_t pos = path.rfind(DIR_SEPARATOR); if (pos != std::string::npos) { path.resize(pos); } else { path = ""; } //printf("cm_msg_facilities: path [%s]\n", path.c_str()); STRING_LIST flist; ss_file_find(path.c_str(), "*.log", &flist); for (size_t i = 0; i < flist.size(); i++) { const char *p = flist[i].c_str(); if (strchr(p, '_') == NULL && !(p[0] >= '0' && p[0] <= '9')) { size_t pos = flist[i].rfind('.'); if (pos != std::string::npos) { flist[i].resize(pos); } list->push_back(flist[i]); } } return SUCCESS; } /********************************************************************/ void cm_msg_get_logfile(const char *fac, time_t 
t, std::string* filename, std::string* linkname, std::string* linktarget) { HNDLE hDB; int status; status = cm_get_experiment_database(&hDB, NULL); // check for call to cm_msg() before MIDAS is fully initialized // or after MIDAS is partially shutdown. if (status != CM_SUCCESS) { if (filename) *filename = std::string(fac) + ".log"; if (linkname) *linkname = ""; if (linktarget) *linktarget = ""; return; } if (filename) *filename = ""; if (linkname) *linkname = ""; if (linktarget) *linktarget = ""; std::string facility; if (fac && fac[0]) facility = fac; else facility = "midas"; std::string message_format; db_get_value_string(hDB, 0, "/Logger/Message file date format", 0, &message_format, TRUE); if (message_format.find('%') != std::string::npos) { /* replace stings such as %y%m%d with current date */ struct tm tms; ss_tzset(); if (t == 0) time(&t); localtime_r(&t, &tms); char de[256]; de[0] = '_'; strftime(de + 1, sizeof(de)-1, strchr(message_format.c_str(), '%'), &tms); message_format = de; } std::string message_dir; db_get_value_string(hDB, 0, "/Logger/Message dir", 0, &message_dir, TRUE); if (message_dir.empty()) { db_get_value_string(hDB, 0, "/Logger/Data dir", 0, &message_dir, FALSE); if (message_dir.empty()) { message_dir = cm_get_path(); if (message_dir.empty()) { message_dir = ss_getcwd(); } } } // prepend experiment directory if (message_dir[0] != DIR_SEPARATOR) message_dir = cm_get_path() + message_dir; if (message_dir.back() != DIR_SEPARATOR) message_dir.push_back(DIR_SEPARATOR); if (filename) *filename = message_dir + facility + message_format + ".log"; if (!message_format.empty()) { if (linkname) *linkname = message_dir + facility + ".log"; if (linktarget) *linktarget = facility + message_format + ".log"; } } /********************************************************************/ /** Set message masks. When a message is generated by calling cm_msg(), it can got to two destinatinons. First a user defined callback routine and second to the "SYSMSG" buffer. 
A user defined callback receives all messages which satisfy the user_mask. \code int message_print(const char *msg) { char str[160]; memset(str, ' ', 159); str[159] = 0; if (msg[0] == '[') msg = strchr(msg, ']')+2; memcpy(str, msg, strlen(msg)); ss_printf(0, 20, str); return 0; } ... cm_set_msg_print(MT_ALL, MT_ALL, message_print); ... \endcode @param system_mask Bit masks for MERROR, MINFO etc. to send system messages. @param user_mask Bit masks for MERROR, MINFO etc. to send messages to the user callback. @param func Function which receives all printout. By setting "puts", messages are just printed to the screen. @return CM_SUCCESS */ INT cm_set_msg_print(INT system_mask, INT user_mask, int (*func)(const char *)) { _message_mask_system = system_mask; _message_mask_user = user_mask; _message_print = func; return BM_SUCCESS; } /********************************************************************/ /** Write message to logging file. Called by cm_msg. @attention May burn your fingers @param message_type Message type @param message Message string @param facility Message facility, filename in which messages will be written @return CM_SUCCESS */ INT cm_msg_log(INT message_type, const char *facility, const char *message) { INT status; if (rpc_is_remote()) { if (rpc_is_connected()) { status = rpc_call(RPC_CM_MSG_LOG, message_type, facility, message); if (status != RPC_SUCCESS) { fprintf(stderr, "cm_msg_log: Message \"%s\" not written to midas.log because rpc_call(RPC_CM_MSG_LOG) failed with status %d\n", message, status); } return status; } else { fprintf(stderr, "cm_msg_log: Message \"%s\" not written to midas.log, no connection to mserver\n", message); return RPC_NET_ERROR; } } if (message_type != MT_DEBUG) { std::string filename, linkname, linktarget; cm_msg_get_logfile(facility, 0, &filename, &linkname, &linktarget); #ifdef OS_LINUX if (!linkname.empty()) { //printf("cm_msg_log: filename [%s] linkname [%s] linktarget [%s]\n", filename.c_str(), linkname.c_str(), 
linktarget.c_str()); // If filename does not exist, user just switched from non-date format to date format. // In that case we must copy linkname to filename, otherwise messages might get lost. if (ss_file_exist(linkname.c_str()) && !ss_file_link_exist(linkname.c_str())) { ss_file_copy(linkname.c_str(), filename.c_str(), true); } unlink(linkname.c_str()); status = symlink(linktarget.c_str(), linkname.c_str()); if (status != 0) { fprintf(stderr, "cm_msg_log: Error: Cannot symlink message log file \'%s' to \'%s\', symlink() errno: %d (%s)\n", linktarget.c_str(), linkname.c_str(), errno, strerror(errno)); } } #endif int fh = open(filename.c_str(), O_WRONLY | O_CREAT | O_APPEND | O_LARGEFILE, 0644); if (fh < 0) { fprintf(stderr, "cm_msg_log: Message \"%s\" not written to midas.log because open(%s) failed with errno %d (%s)\n", message, filename.c_str(), errno, strerror(errno)); } else { struct timeval tv; struct tm tms; ss_tzset(); gettimeofday(&tv, NULL); localtime_r(&tv.tv_sec, &tms); char str[256]; strftime(str, sizeof(str), "%H:%M:%S", &tms); sprintf(str + strlen(str), ".%03d ", (int) (tv.tv_usec / 1000)); strftime(str + strlen(str), sizeof(str), "%G/%m/%d", &tms); std::string msg; msg += str; msg += " "; msg += message; msg += "\n"; /* avoid c++ complaint about comparison between unsigned size_t returned by msg.length() and signed ssize_t returned by write() */ ssize_t len = msg.length(); /* atomic write, no need to take a semaphore */ ssize_t wr = write(fh, msg.c_str(), len); if (wr < 0) { fprintf(stderr, "cm_msg_log: Message \"%s\" not written to \"%s\", write() error, errno %d (%s)\n", message, filename.c_str(), errno, strerror(errno)); } else if (wr != len) { fprintf(stderr, "cm_msg_log: Message \"%s\" not written to \"%s\", short write() wrote %d instead of %d bytes\n", message, filename.c_str(), (int)wr, (int)len); } close(fh); } } return CM_SUCCESS; } static std::string cm_msg_format(INT message_type, const char *filename, INT line, const char *routine, 
const char *format, va_list *argptr) { /* strip path */ const char* pc = filename + strlen(filename); while (*pc != '\\' && *pc != '/' && pc != filename) pc--; if (pc != filename) pc++; /* convert type to string */ std::string type_str; if (message_type & MT_ERROR) type_str += MT_ERROR_STR; if (message_type & MT_INFO) type_str += MT_INFO_STR; if (message_type & MT_DEBUG) type_str += MT_DEBUG_STR; if (message_type & MT_USER) type_str += MT_USER_STR; if (message_type & MT_LOG) type_str += MT_LOG_STR; if (message_type & MT_TALK) type_str += MT_TALK_STR; std::string message; /* print client name into string */ if (message_type == MT_USER) message = msprintf("[%s] ", routine); else { std::string name = rpc_get_name(); if (name.length() > 0) message = msprintf("[%s,%s] ", name.c_str(), type_str.c_str()); else message = ""; } /* preceed error messages with file and line info */ if (message_type == MT_ERROR) { message += msprintf("[%s:%d:%s,%s] ", pc, line, routine, type_str.c_str()); } else if (message_type == MT_USER) { message = msprintf("[%s,%s] ", routine, type_str.c_str()); } int bufsize = 1024; char* buf = (char*)malloc(bufsize); assert(buf); for (int i=0; i<10; i++) { va_list ap; va_copy(ap, *argptr); /* print argument list into message */ int n = vsnprintf(buf, bufsize-1, format, ap); //printf("vsnprintf [%s] %d %d\n", format, bufsize, n); if (n < bufsize) { break; } bufsize += 100; bufsize *= 2; buf = (char*)realloc(buf, bufsize); assert(buf); } message += buf; free(buf); return message; } static INT cm_msg_send_event(DWORD ts, INT message_type, const char *send_message) { //printf("cm_msg_send: ts %d, type %d, message [%s]\n", ts, message_type, send_message); /* send event if not of type MLOG */ if (message_type != MT_LOG) { if (_msg_buffer) { /* copy message to event */ size_t len = strlen(send_message); int event_length = sizeof(EVENT_HEADER) + len + 1; char event[event_length]; EVENT_HEADER *pevent = (EVENT_HEADER *) event; memcpy(event + 
sizeof(EVENT_HEADER), send_message, len + 1); /* setup the event header and send the message */ bm_compose_event(pevent, EVENTID_MESSAGE, (WORD) message_type, len + 1, 0); if (ts) pevent->time_stamp = ts; //printf("cm_msg_send_event: len %d, header %d, allocated %d, data_size %d, bm_send_event %p+%d\n", (int)len, (int)sizeof(EVENT_HEADER), event_length, pevent->data_size, pevent, (int)(pevent->data_size + sizeof(EVENT_HEADER))); bm_send_event(_msg_buffer, pevent, 0, BM_WAIT); } } return CM_SUCCESS; } struct msg_buffer_entry { DWORD ts; int message_type; std::string message; }; static std::deque gMsgBuf; static std::mutex gMsgBufMutex; /********************************************************************/ /** This routine can be called to process messages buffered by cm_msg(). Normally it is called from cm_yield() and cm_disconnect_experiment() to make sure all accumulated messages are processed. */ INT cm_msg_flush_buffer() { int i; //printf("cm_msg_flush_buffer!\n"); for (i = 0; i < 100; i++) { msg_buffer_entry e; { std::lock_guard lock(gMsgBufMutex); if (gMsgBuf.empty()) break; e = gMsgBuf.front(); gMsgBuf.pop_front(); // implicit unlock } /* log message */ cm_msg_log(e.message_type, "midas", e.message.c_str()); /* send message to SYSMSG */ int status = cm_msg_send_event(e.ts, e.message_type, e.message.c_str()); if (status != CM_SUCCESS) return status; } return CM_SUCCESS; } /********************************************************************/ /** This routine can be called whenever an internal error occurs or an informative message is produced. Different message types can be enabled or disabled by setting the type bits via cm_set_msg_print(). @attention Do not add the "\n" escape carriage control at the end of the formated line as it is already added by the client on the receiving side. \code ... 
cm_msg(MINFO, "my program", "This is a information message only); cm_msg(MERROR, "my program", "This is an error message with status:%d", my_status); cm_msg(MTALK, "my_program", My program is Done!"); ... \endcode @param message_type (See @ref midas_macro). @param filename Name of source file where error occured @param line Line number where error occured @param routine Routine name. @param format message to printout, ... Parameters like for printf() @return CM_SUCCESS */ INT cm_msg(INT message_type, const char *filename, INT line, const char *routine, const char *format, ...) { DWORD ts = ss_time(); /* print argument list into message */ std::string message; va_list argptr; va_start(argptr, format); message = cm_msg_format(message_type, filename, line, routine, format, &argptr); va_end(argptr); //printf("message [%s]\n", message); /* call user function if set via cm_set_msg_print */ MessagePrintCallback f = _message_print.load(); if (f != NULL && (message_type & _message_mask_user) != 0) { if (message_type != MT_LOG) { // do not print MLOG messages (*f)(message.c_str()); } } /* return if system mask is not set */ if ((message_type & _message_mask_system) == 0) { return CM_SUCCESS; } gMsgBufMutex.lock(); gMsgBuf.push_back(msg_buffer_entry{ts, message_type, message}); gMsgBufMutex.unlock(); return CM_SUCCESS; } /********************************************************************/ /** This routine is similar to @ref cm_msg(). It differs from cm_msg() only by the logging destination being a file given through the argument list i.e:\b facility @internal @attention Do not add the "\n" escape carriage control at the end of the formated line as it is already added by the client on the receiving side. The first arg in the following example uses the predefined macro MINFO which handles automatically the first 3 arguments of the function (see @ref midas_macro). \code ... cm_msg1(MINFO, "my_log_file", "my_program"," My message status:%d", status); ... 
//----- File my_log_file.log Thu Nov 8 17:59:28 2001 [my_program] My message status:1 \endcode @param message_type See @ref midas_macro. @param filename Name of source file where error occured @param line Line number where error occured @param facility Logging file name @param routine Routine name @param format message to printout, ... Parameters like for printf() @return CM_SUCCESS */ INT cm_msg1(INT message_type, const char *filename, INT line, const char *facility, const char *routine, const char *format, ...) { va_list argptr; std::string message; static BOOL in_routine = FALSE; /* avoid recursive calles */ if (in_routine) return 0; in_routine = TRUE; /* print argument list into message */ va_start(argptr, format); message = cm_msg_format(message_type, filename, line, routine, format, &argptr); va_end(argptr); /* call user function if set via cm_set_msg_print */ MessagePrintCallback f = _message_print.load(); if (f != NULL && (message_type & _message_mask_user) != 0) (*f)(message.c_str()); /* return if system mask is not set */ if ((message_type & _message_mask_system) == 0) { in_routine = FALSE; return CM_SUCCESS; } /* send message to SYSMSG */ cm_msg_send_event(0, message_type, message.c_str()); /* log message */ cm_msg_log(message_type, facility, message.c_str()); in_routine = FALSE; return CM_SUCCESS; } /********************************************************************/ /** Register a dispatch function for receiving system messages. - example code from mlxspeaker.c \code void receive_message(HNDLE hBuf, HNDLE id, EVENT_HEADER *header, void *message) { char str[256], *pc, *sp; // print message printf("%s\n", (char *)(message)); printf("evID:%x Mask:%x Serial:%i Size:%d\n" ,header->event_id ,header->trigger_mask ,header->serial_number ,header->data_size); pc = strchr((char *)(message),']')+2; ... // skip none talking message if (header->trigger_mask == MT_TALK || header->trigger_mask == MT_USER) ... } int main(int argc, char *argv[]) { ... 
// now connect to server status = cm_connect_experiment(host_name, exp_name, "Speaker", NULL); if (status != CM_SUCCESS) return 1; // Register callback for messages cm_msg_register(receive_message); ... } \endcode @param func Dispatch function. @return CM_SUCCESS or bm_open_buffer and bm_request_event return status */ INT cm_msg_register(EVENT_HANDLER *func) { INT status, id; // we should only come here after the message buffer // was opened by cm_connect_experiment() assert(_msg_buffer); _msg_dispatch = func; status = bm_request_event(_msg_buffer, EVENTID_ALL, TRIGGER_ALL, GET_NONBLOCKING, &id, func); return status; } static void add_message(char **messages, int *length, int *allocated, time_t tstamp, const char *new_message) { int new_message_length = strlen(new_message); int new_allocated = 1024 + 2 * ((*allocated) + new_message_length); char buf[100]; int buf_length; //printf("add_message: new message %d, length %d, new end: %d, allocated: %d, maybe reallocate size %d\n", new_message_length, *length, *length + new_message_length, *allocated, new_allocated); if (*length + new_message_length + 100 > *allocated) { *messages = (char *) realloc(*messages, new_allocated); assert(*messages != NULL); *allocated = new_allocated; } if (*length > 0) if ((*messages)[(*length) - 1] != '\n') { (*messages)[*length] = '\n'; // separator between messages (*length) += 1; } sprintf(buf, "%ld ", tstamp); buf_length = strlen(buf); memcpy(&((*messages)[*length]), buf, buf_length); (*length) += buf_length; memcpy(&((*messages)[*length]), new_message, new_message_length); (*length) += new_message_length; (*messages)[*length] = 0; // make sure string is NUL terminated } /* Retrieve message from an individual file. 
Internal use only */ static int cm_msg_retrieve1(const char *filename, time_t t, INT n_messages, char **messages, int *length, int *allocated, int *num_messages) { BOOL stop; int fh; char *p, str[1000]; struct stat stat_buf; time_t tstamp, tstamp_valid, tstamp_last; ss_tzset(); // required by localtime_r() *num_messages = 0; fh = open(filename, O_RDONLY | O_TEXT, 0644); if (fh < 0) { cm_msg(MERROR, "cm_msg_retrieve1", "Cannot open log file \"%s\", errno %d (%s)", filename, errno, strerror(errno)); return SS_FILE_ERROR; } /* read whole file into memory */ fstat(fh, &stat_buf); ssize_t size = stat_buf.st_size; /* if file is too big, only read tail of file */ ssize_t maxsize = 10 * 1024 * 1024; if (size > maxsize) { lseek(fh, -maxsize, SEEK_END); //printf("lseek status %d, errno %d (%s)\n", status, errno, strerror(errno)); size = maxsize; } char *buffer = (char *) malloc(size + 1); if (buffer == NULL) { cm_msg(MERROR, "cm_msg_retrieve1", "Cannot malloc %d bytes to read log file \"%s\", errno %d (%s)", (int) size, filename, errno, strerror(errno)); close(fh); return SS_FILE_ERROR; } ssize_t rd = read(fh, buffer, size); if (rd != size) { cm_msg(MERROR, "cm_msg_retrieve1", "Cannot read %d bytes from log file \"%s\", read() returned %d, errno %d (%s)", (int) size, filename, (int) rd, errno, strerror(errno)); close(fh); return SS_FILE_ERROR; } buffer[size] = 0; close(fh); p = buffer + size - 1; tstamp_last = tstamp_valid = 0; stop = FALSE; while (*p == '\n' || *p == '\r') p--; int n; for (n = 0; !stop && p > buffer;) { /* go to beginning of line */ int i; for (i = 0; p != buffer && (*p != '\n' && *p != '\r'); i++) p--; /* limit line length to sizeof(str) */ if (i >= (int) sizeof(str)) i = sizeof(str) - 1; if (p == buffer) { i++; memcpy(str, p, i); } else memcpy(str, p + 1, i); str[i] = 0; if (strchr(str, '\n')) *strchr(str, '\n') = 0; if (strchr(str, '\r')) *strchr(str, '\r') = 0; mstrlcat(str, "\n", sizeof(str)); // extract time tag time_t now; time(&now); struct tm tms; 
localtime_r(&now, &tms); // must call tzset() beforehand! if (str[0] >= '0' && str[0] <= '9') { // new format tms.tm_hour = atoi(str); tms.tm_min = atoi(str + 3); tms.tm_sec = atoi(str + 6); tms.tm_year = atoi(str + 13) - 1900; tms.tm_mon = atoi(str + 18) - 1; tms.tm_mday = atoi(str + 21); } else { // old format tms.tm_hour = atoi(str + 11); tms.tm_min = atoi(str + 14); tms.tm_sec = atoi(str + 17); tms.tm_year = atoi(str + 20) - 1900; for (i = 0; i < 12; i++) if (strncmp(str + 4, mname[i], 3) == 0) break; tms.tm_mon = i; tms.tm_mday = atoi(str + 8); } tstamp = ss_mktime(&tms); if (tstamp != -1) tstamp_valid = tstamp; // for new messages (n=0!), stop when t reached if (n_messages == 0) { if (tstamp_valid < t) break; } // for old messages, stop when all messages belonging to tstamp_last are sent if (n_messages != 0) { if (tstamp_last > 0 && tstamp_valid < tstamp_last) break; } if (t == 0 || tstamp == -1 || (n_messages > 0 && tstamp <= t) || (n_messages == 0 && tstamp >= t)) { n++; add_message(messages, length, allocated, tstamp, str); } while (*p == '\n' || *p == '\r') p--; if (n_messages == 1) stop = TRUE; else if (n_messages > 1) { // continue collecting messages until time stamp differs from current one if (n == n_messages) tstamp_last = tstamp_valid; // if all messages without time tags, just return after n if (n == n_messages && tstamp_valid == 0) break; } } free(buffer); *num_messages = n; return CM_SUCCESS; } /********************************************************************/ /** Retrieve old messages from log file @param facility Logging facility ("midas", "chat", "lazy", ...) @param t Return messages logged before and including time t, value 0 means start with newest messages @param min_messages Minimum number of messages to return @param messages messages, newest first, separated by \n characters. caller should free() this buffer at the end. 
@param num_messages Number of messages returned @return CM_SUCCESS */ INT cm_msg_retrieve2(const char *facility, time_t t, INT n_message, char **messages, int *num_messages) { std::string filename, linkname; INT n, i; time_t filedate; int length = 0; int allocated = 0; time(&filedate); cm_msg_get_logfile(facility, filedate, &filename, &linkname, NULL); //printf("facility %s, filename \"%s\" \"%s\"\n", facility, filename, linkname); // see if file exists, use linkname if not if (!linkname.empty()) { if (!ss_file_exist(filename.c_str())) filename = linkname; } if (ss_file_exist(filename.c_str())) { cm_msg_retrieve1(filename.c_str(), t, n_message, messages, &length, &allocated, &n); } else { n = 0; } /* if there is no symlink, then there is no additional log files to read */ if (linkname.empty()) { *num_messages = n; return CM_SUCCESS; } //printf("read more messages %d %d!\n", n, n_message); int missing = 0; while (n < n_message) { filedate -= 3600 * 24; // go one day back cm_msg_get_logfile(facility, filedate, &filename, NULL, NULL); //printf("read [%s] for time %d!\n", filename.c_str(), filedate); if (ss_file_exist(filename.c_str())) { cm_msg_retrieve1(filename.c_str(), t, n_message - n, messages, &length, &allocated, &i); n += i; missing = 0; } else { missing++; } // stop if ten consecutive files are not found if (missing > 10) break; } *num_messages = n; return CM_SUCCESS; } /********************************************************************/ /** Retrieve newest messages from "midas" facility log file @param n_message Number of messages to retrieve @param message buf_size bytes of messages, separated by \n characters. The returned number of bytes is normally smaller than the initial buf_size, since only full lines are returned. 
@param *buf_size Size of message buffer to fill @return CM_SUCCESS, CM_TRUNCATED */ INT cm_msg_retrieve(INT n_message, char *message, INT buf_size) { int status; char *messages = NULL; int num_messages = 0; if (rpc_is_remote()) return rpc_call(RPC_CM_MSG_RETRIEVE, n_message, message, buf_size); status = cm_msg_retrieve2("midas", 0, n_message, &messages, &num_messages); if (messages) { mstrlcpy(message, messages, buf_size); int len = strlen(messages); if (len > buf_size) status = CM_TRUNCATED; free(messages); } return status; } /**dox***************************************************************/ /** @} *//* end of msgfunctionc */ /**dox***************************************************************/ /** @addtogroup cmfunctionc * * @{ */ /********************************************************************/ /** Get time from MIDAS server and set local time. @param seconds Time in seconds @return CM_SUCCESS */ INT cm_synchronize(DWORD *seconds) { INT sec, status; /* if connected to server, get time from there */ if (rpc_is_remote()) { status = rpc_call(RPC_CM_SYNCHRONIZE, &sec); /* set local time */ if (status == CM_SUCCESS) ss_settime(sec); } /* return time to caller */ if (seconds != NULL) { *seconds = ss_time(); } return CM_SUCCESS; } /********************************************************************/ /** Get time from MIDAS server and set local time. @param str return time string @param buf_size Maximum size of str @return CM_SUCCESS */ INT cm_asctime(char *str, INT buf_size) { /* if connected to server, get time from there */ if (rpc_is_remote()) return rpc_call(RPC_CM_ASCTIME, str, buf_size); /* return local time */ mstrlcpy(str, ss_asctime().c_str(), buf_size); return CM_SUCCESS; } /********************************************************************/ /** Get time from MIDAS server and set local time. 
@return return time string */ std::string cm_asctime() { /* if connected to server, get time from there */ if (rpc_is_remote()) { char buf[256]; int status = rpc_call(RPC_CM_ASCTIME, buf, sizeof(buf)); if (status == CM_SUCCESS) { return buf; } else { return ""; } } /* return local time */ return ss_asctime(); } /********************************************************************/ /** Get time from ss_time on server. @param t string @return CM_SUCCESS */ INT cm_time(DWORD *t) { /* if connected to server, get time from there */ if (rpc_is_remote()) return rpc_call(RPC_CM_TIME, t); /* return local time */ *t = ss_time(); return CM_SUCCESS; } /**dox***************************************************************/ /** @} *//* end of cmfunctionc */ /********************************************************************\ * * * cm_xxx - Common Functions to buffer & database * * * \********************************************************************/ /* Globals */ static HNDLE _hKeyClient = 0; /* key handle for client in ODB */ static HNDLE _hDB = 0; /* Database handle */ static std::string _experiment_name; static std::string _client_name; static std::string _path_name; static INT _watchdog_timeout = DEFAULT_WATCHDOG_TIMEOUT; INT _semaphore_alarm = -1; INT _semaphore_elog = -1; INT _semaphore_history = -1; //INT _semaphore_msg = -1; /**dox***************************************************************/ /** @addtogroup cmfunctionc * * @{ */ /** Return version number of current MIDAS library as a string @return version number */ const char *cm_get_version() { return MIDAS_VERSION; } /** Return git revision number of current MIDAS library as a string @return revision number */ const char *cm_get_revision() { return GIT_REVISION; } /********************************************************************/ /** Set path to actual experiment. This function gets called by cm_connect_experiment if the connection is established to a local experiment (not through the TCP/IP server). 
The path is then used for all shared memory routines. @param path Pathname @return CM_SUCCESS */ INT cm_set_path(const char *path) { assert(path); assert(path[0] != 0); _path_name = path; if (_path_name.back() != DIR_SEPARATOR) { _path_name += DIR_SEPARATOR_STR; } //printf("cm_set_path [%s]\n", _path_name.c_str()); return CM_SUCCESS; } /********************************************************************/ /** Return the path name previously set with cm_set_path. @param path Pathname @return CM_SUCCESS */ INT cm_get_path(char *path, int path_size) { // check that we were not accidentally called // with the size of the pointer to a string // instead of the size of the string buffer assert(path_size != sizeof(char *)); assert(path); assert(_path_name.length() > 0); mstrlcpy(path, _path_name.c_str(), path_size); return CM_SUCCESS; } /********************************************************************/ /** Return the path name previously set with cm_set_path. @param path Pathname @return CM_SUCCESS */ std::string cm_get_path() { assert(_path_name.length() > 0); return _path_name; } /********************************************************************/ /* C++ wrapper for cm_get_path */ INT EXPRT cm_get_path_string(std::string *path) { assert(path != NULL); assert(_path_name.length() > 0); *path = _path_name; return CM_SUCCESS; } /********************************************************************/ /** Set name of the experiment @param name Experiment name @return CM_SUCCESS */ INT cm_set_experiment_name(const char *name) { _experiment_name = name; return CM_SUCCESS; } /********************************************************************/ /** Return the experiment name @param name Pointer to user string, size should be at least NAME_LENGTH @param name_size Size of user string @return CM_SUCCESS */ INT cm_get_experiment_name(char *name, int name_length) { mstrlcpy(name, _experiment_name.c_str(), name_length); return CM_SUCCESS; } 
/********************************************************************/ /** Return the experiment name @return experiment name */ std::string cm_get_experiment_name() { return _experiment_name; } /**dox***************************************************************/ /** @} *//* end of cmfunctionc */ /**dox***************************************************************/ /** @addtogroup cmfunctionc * * @{ */ #ifdef LOCAL_ROUTINES struct exptab_entry { std::string name; std::string directory; std::string user; }; struct exptab_struct { std::string filename; std::vector exptab; }; static exptab_struct _exptab; // contents of exptab file /** Scan the "exptab" file for MIDAS experiment names and save them for later use by rpc_server_accept(). The file is first searched under $MIDAS/exptab if present, then the directory from argv[0] is probed. @return CM_SUCCESS
CM_UNDEF_EXP exptab not found and MIDAS_DIR not set */ INT cm_read_exptab(exptab_struct *exptab) { exptab->exptab.clear(); /* MIDAS_DIR overrides exptab */ if (getenv("MIDAS_DIR")) { exptab->filename = "MIDAS_DIR"; exptab_entry e; if (getenv("MIDAS_EXPT_NAME")) { e.name = getenv("MIDAS_EXPT_NAME"); } else { e.name = "Default"; cm_msg(MERROR, "cm_read_exptab", "Experiments that use MIDAS_DIR must also set MIDAS_EXPT_NAME to the name of the experiment! Using experiment name \"%s\"", e.name.c_str()); } e.directory = getenv("MIDAS_DIR"); e.user = ""; exptab->exptab.push_back(e); return CM_SUCCESS; } /* default directory for different OSes */ #if defined (OS_WINNT) std::string str; if (getenv("SystemRoot")) str = getenv("SystemRoot"); else if (getenv("windir")) str = getenv("windir"); else str = ""; std::string alt_str = str; str += "\\system32\\exptab"; alt_str += "\\system\\exptab"; #elif defined (OS_UNIX) std::string str = "/etc/exptab"; std::string alt_str = "/exptab"; #else std::strint str = "exptab"; std::string alt_str = "exptab"; #endif /* MIDAS_EXPTAB overrides default directory */ if (getenv("MIDAS_EXPTAB")) { str = getenv("MIDAS_EXPTAB"); alt_str = getenv("MIDAS_EXPTAB"); } exptab->filename = str; /* read list of available experiments */ FILE* f = fopen(str.c_str(), "r"); if (f == NULL) { f = fopen(alt_str.c_str(), "r"); if (f == NULL) return CM_UNDEF_ENVIRON; exptab->filename = alt_str; } if (f != NULL) { do { char buf[256]; memset(buf, 0, sizeof(buf)); char* str = fgets(buf, sizeof(buf)-1, f); if (str == NULL) break; if (str[0] == 0) continue; // empty line if (str[0] == '#') continue; // comment line exptab_entry e; // following code emulates the function of this sprintf(): //sscanf(str, "%s %s %s", exptab[i].name, exptab[i].directory, exptab[i].user); // skip leading spaces while (*str && isspace(*str)) str++; char* p1 = str; char* p2 = str; while (*p2 && !isspace(*p2)) p2++; ssize_t len = p2-p1; if (len<1) continue; //printf("str %d [%s] p1 [%s] p2 %d 
[%s] len %d\n", *str, str, p1, *p2, p2, (int)len); e.name = std::string(p1, len); if (*p2 == 0) continue; str = p2; // skip leading spaces while (*str && isspace(*str)) str++; p1 = str; p2 = str; while (*p2 && !isspace(*p2)) p2++; len = p2-p1; if (len<1) continue; //printf("str %d [%s] p1 [%s] p2 %d [%s] len %d\n", *str, str, p1, *p2, p2, (int)len); e.directory = std::string(p1, len); if (*p2 == 0) continue; str = p2; // skip leading spaces while (*str && isspace(*str)) str++; p1 = str; p2 = str; while (*p2 && !isspace(*p2)) p2++; len = p2-p1; //printf("str %d [%s] p1 [%s] p2 %d [%s] len %d\n", *str, str, p1, *p2, p2, (int)len); e.user = std::string(p1, len); /* check for trailing directory separator */ if (!ends_with_char(e.directory, DIR_SEPARATOR)) { e.directory += DIR_SEPARATOR_STR; } exptab->exptab.push_back(e); } while (!feof(f)); fclose(f); } #if 0 cm_msg(MINFO, "cm_read_exptab", "Read exptab \"%s\":", exptab->filename.c_str()); for (unsigned j=0; jexptab.size(); j++) { cm_msg(MINFO, "cm_read_exptab", "entry %d, experiment \"%s\", directory \"%s\", user \"%s\"", j, exptab->exptab[j].name.c_str(), exptab->exptab[j].directory.c_str(), exptab->exptab[j].user.c_str()); } #endif return CM_SUCCESS; } /********************************************************************/ /** Return location of exptab file @param s Pointer to string buffer @param size Size of string buffer @return CM_SUCCESS */ int cm_get_exptab_filename(char *s, int size) { mstrlcpy(s, _exptab.filename.c_str(), size); return CM_SUCCESS; } std::string cm_get_exptab_filename() { return _exptab.filename; } /********************************************************************/ /** Return exptab information for given experiment @param s Pointer to string buffer @param size Size of string buffer @return CM_SUCCESS */ int cm_get_exptab(const char *expname, std::string* dir, std::string* user) { if (_exptab.exptab.size() == 0) { int status = cm_read_exptab(&_exptab); if (status != CM_SUCCESS) return 
status; } for (unsigned i = 0; i < _exptab.exptab.size(); i++) { if (_exptab.exptab[i].name == expname) { if (dir) *dir = _exptab.exptab[i].directory; if (user) *user = _exptab.exptab[i].user; return CM_SUCCESS; } } if (dir) *dir = ""; if (user) *user = ""; return CM_UNDEF_EXP; } /********************************************************************/ /** Return exptab information for given experiment @param s Pointer to string buffer @param size Size of string buffer @return CM_SUCCESS */ int cm_get_exptab(const char *expname, char *dir, int dir_size, char *user, int user_size) { std::string sdir, suser; int status = cm_get_exptab(expname, &sdir, &suser); if (status == CM_SUCCESS) { if (dir) mstrlcpy(dir, sdir.c_str(), dir_size); if (user) mstrlcpy(user, suser.c_str(), user_size); return CM_SUCCESS; } return CM_UNDEF_EXP; } #endif // LOCAL_ROUTINES /********************************************************************/ /** Delete client info from database @param hDB Database handle @param pid PID of entry to delete, zero for this process. @return CM_SUCCESS */ INT cm_delete_client_info(HNDLE hDB, INT pid) { /* only do it if local */ if (!rpc_is_remote()) { db_delete_client_info(hDB, pid); } return CM_SUCCESS; } /********************************************************************/ /** Check if a client with a /system/client/xxx entry has a valid entry in the ODB client table. If not, remove that client from the /system/client tree. 
@param hDB Handle to online database @param hKeyClient Handle to client key @return CM_SUCCESS, CM_NO_CLIENT */ INT cm_check_client(HNDLE hDB, HNDLE hKeyClient) { if (rpc_is_remote()) return rpc_call(RPC_CM_CHECK_CLIENT, hDB, hKeyClient); #ifdef LOCAL_ROUTINES return db_check_client(hDB, hKeyClient); #endif /*LOCAL_ROUTINES */ return CM_SUCCESS; } /********************************************************************/ /** Set client information in online database and return handle @param hDB Handle to online database @param hKeyClient returned key @param host_name server name @param client_name Name of this program as it will be seen by other clients. @param hw_type Type of byte order @param password MIDAS password @param watchdog_timeout Default watchdog timeout, can be overwritten by ODB setting /programs/\/Watchdog timeout @return CM_SUCCESS */ INT cm_set_client_info(HNDLE hDB, HNDLE *hKeyClient, const char *host_name, char *client_name, INT hw_type, const char *password, DWORD watchdog_timeout) { if (rpc_is_remote()) return rpc_call(RPC_CM_SET_CLIENT_INFO, hDB, hKeyClient, host_name, client_name, hw_type, password, watchdog_timeout); #ifdef LOCAL_ROUTINES { INT status, pid, data, i, idx, size; HNDLE hKey, hSubkey; char str[256], name[NAME_LENGTH], orig_name[NAME_LENGTH], pwd[NAME_LENGTH]; BOOL call_watchdog, allow; PROGRAM_INFO_STR(program_info_str); /* check security if password is present */ status = db_find_key(hDB, 0, "/Experiment/Security/Password", &hKey); if (hKey) { /* get password */ size = sizeof(pwd); db_get_data(hDB, hKey, pwd, &size, TID_STRING); /* first check allowed hosts list */ allow = FALSE; db_find_key(hDB, 0, "/Experiment/Security/Allowed hosts", &hKey); if (hKey && db_find_key(hDB, hKey, host_name, &hKey) == DB_SUCCESS) allow = TRUE; /* check allowed programs list */ db_find_key(hDB, 0, "/Experiment/Security/Allowed programs", &hKey); if (hKey && db_find_key(hDB, hKey, client_name, &hKey) == DB_SUCCESS) allow = TRUE; /* now check password 
*/ if (!allow && strcmp(password, pwd) != 0) { if (password[0]) cm_msg(MINFO, "cm_set_client_info", "Wrong password for host %s", host_name); return CM_WRONG_PASSWORD; } } /* make following operation atomic by locking database */ db_lock_database(hDB); /* check if entry with this pid exists already */ pid = ss_getpid(); sprintf(str, "System/Clients/%0d", pid); status = db_find_key(hDB, 0, str, &hKey); if (status == DB_SUCCESS) { db_set_mode(hDB, hKey, MODE_READ | MODE_WRITE | MODE_DELETE, TRUE); db_delete_key(hDB, hKey, TRUE); } if (strlen(client_name) >= NAME_LENGTH) client_name[NAME_LENGTH] = 0; strcpy(name, client_name); strcpy(orig_name, client_name); /* check if client name already exists */ status = db_find_key(hDB, 0, "System/Clients", &hKey); for (idx = 1; status != DB_NO_MORE_SUBKEYS; idx++) { for (i = 0;; i++) { status = db_enum_key(hDB, hKey, i, &hSubkey); if (status == DB_NO_MORE_SUBKEYS) break; if (status == DB_SUCCESS) { size = sizeof(str); status = db_get_value(hDB, hSubkey, "Name", str, &size, TID_STRING, FALSE); if (status != DB_SUCCESS) continue; } /* check if client is living */ if (cm_check_client(hDB, hSubkey) == CM_NO_CLIENT) continue; if (equal_ustring(str, name)) { sprintf(name, "%s%d", client_name, idx); break; } } } /* set name */ sprintf(str, "System/Clients/%0d/Name", pid); status = db_set_value(hDB, 0, str, name, NAME_LENGTH, 1, TID_STRING); if (status != DB_SUCCESS) { db_unlock_database(hDB); cm_msg(MERROR, "cm_set_client_info", "cannot set client name, db_set_value(%s) status %d", str, status); return status; } /* copy new client name */ strcpy(client_name, name); db_set_client_name(hDB, client_name); /* set also as rpc name */ rpc_set_name(client_name); /* use /system/clients/PID as root */ sprintf(str, "System/Clients/%0d", pid); db_find_key(hDB, 0, str, &hKey); /* set host name */ status = db_set_value(hDB, hKey, "Host", host_name, HOST_NAME_LENGTH, 1, TID_STRING); if (status != DB_SUCCESS) { db_unlock_database(hDB); return status; 
} /* set computer id */ status = db_set_value(hDB, hKey, "Hardware type", &hw_type, sizeof(hw_type), 1, TID_INT32); if (status != DB_SUCCESS) { db_unlock_database(hDB); return status; } /* set server port */ data = 0; status = db_set_value(hDB, hKey, "Server Port", &data, sizeof(INT), 1, TID_INT32); if (status != DB_SUCCESS) { db_unlock_database(hDB); return status; } /* lock client entry */ db_set_mode(hDB, hKey, MODE_READ, TRUE); /* get (set) default watchdog timeout */ size = sizeof(watchdog_timeout); sprintf(str, "/Programs/%s/Watchdog Timeout", orig_name); db_get_value(hDB, 0, str, &watchdog_timeout, &size, TID_INT32, TRUE); /* define /programs entry */ sprintf(str, "/Programs/%s", orig_name); db_create_record(hDB, 0, str, strcomb1(program_info_str).c_str()); /* save handle for ODB and client */ cm_set_experiment_database(hDB, hKey); /* save watchdog timeout */ cm_get_watchdog_params(&call_watchdog, NULL); cm_set_watchdog_params(call_watchdog, watchdog_timeout); /* end of atomic operations */ db_unlock_database(hDB); /* touch notify key to inform others */ data = 0; db_set_value(hDB, 0, "/System/Client Notify", &data, sizeof(data), 1, TID_INT32); *hKeyClient = hKey; } #endif /* LOCAL_ROUTINES */ return CM_SUCCESS; } /********************************************************************/ /** Get current client name @return current client name */ std::string cm_get_client_name() { INT status; HNDLE hDB, hKey; /* get root key of client */ cm_get_experiment_database(&hDB, &hKey); if (!hDB) { return "unknown"; } std::string name; status = db_get_value_string(hDB, hKey, "Name", 0, &name); if (status != DB_SUCCESS) { return "unknown"; } //printf("get client name: [%s]\n", name.c_str()); return name; } /********************************************************************/ /** Returns MIDAS environment variables. @attention This function can be used to evaluate the standard MIDAS environment variables before connecting to an experiment (see @ref Environment_variables). 
The usual way is that the host name and experiment name are first derived
from the environment variables MIDAS_SERVER_HOST and MIDAS_EXPT_NAME. They can
then be superseded by command line parameters with -h and -e flags.
\code
#include <stdio.h>
#include <midas.h>
int main(int argc, char *argv[])
{
  INT  status, i;
  char host_name[256], exp_name[32];

  // get default values from environment
  cm_get_environment(host_name, sizeof(host_name), exp_name, sizeof(exp_name));

  // parse command line parameters
  for (i=1 ; i<argc ; i++)
    {
    if (argv[i][0] == '-')
      {
      if (i+1 >= argc || argv[i+1][0] == '-')
        goto usage;
      if (argv[i][1] == 'e')
        strcpy(exp_name, argv[++i]);
      else if (argv[i][1] == 'h')
        strcpy(host_name, argv[++i]);
      else
        {
usage:
        printf("usage: test [-h Hostname] [-e Experiment]\n\n");
        return 1;
        }
      }
    }
  status = cm_connect_experiment(host_name, exp_name, "Test", NULL);
  if (status != CM_SUCCESS)
    return 1;
  ...do anything...
  cm_disconnect_experiment();
  return 0;
}
\endcode
@param host_name Contents of MIDAS_SERVER_HOST environment variable.
@param host_name_size Size of the host_name buffer
@param exp_name Contents of MIDAS_EXPT_NAME environment variable.
@param exp_name_size string length @return CM_SUCCESS */ INT cm_get_environment(char *host_name, int host_name_size, char *exp_name, int exp_name_size) { if (host_name) host_name[0] = 0; if (exp_name) exp_name[0] = 0; if (host_name && getenv("MIDAS_SERVER_HOST")) mstrlcpy(host_name, getenv("MIDAS_SERVER_HOST"), host_name_size); if (exp_name && getenv("MIDAS_EXPT_NAME")) mstrlcpy(exp_name, getenv("MIDAS_EXPT_NAME"), exp_name_size); return CM_SUCCESS; } INT cm_get_environment(std::string *host_name, std::string *exp_name) { if (host_name) *host_name = ""; if (exp_name) *exp_name = ""; if (host_name && getenv("MIDAS_SERVER_HOST")) *host_name = getenv("MIDAS_SERVER_HOST"); if (exp_name && getenv("MIDAS_EXPT_NAME")) *exp_name = getenv("MIDAS_EXPT_NAME"); return CM_SUCCESS; } #ifdef LOCAL_ROUTINES int cm_set_experiment_local(const char* exp_name) { std::string exp_name1; if ((exp_name != NULL) && (strlen(exp_name) > 0)) { exp_name1 = exp_name; } else { int status = cm_select_experiment_local(&exp_name1); if (status != CM_SUCCESS) return status; } std::string expdir, expuser; int status = cm_get_exptab(exp_name1.c_str(), &expdir, &expuser); if (status != CM_SUCCESS) { cm_msg(MERROR, "cm_set_experiment_local", "Experiment \"%s\" not found in exptab file \"%s\"", exp_name1.c_str(), cm_get_exptab_filename().c_str()); return CM_UNDEF_EXP; } if (!ss_dir_exist(expdir.c_str())) { cm_msg(MERROR, "cm_set_experiment_local", "Experiment \"%s\" directory \"%s\" does not exist", exp_name1.c_str(), expdir.c_str()); return CM_UNDEF_EXP; } cm_set_experiment_name(exp_name1.c_str()); cm_set_path(expdir.c_str()); return CM_SUCCESS; } #endif // LOCAL_ROUTINES /********************************************************************/ void cm_check_connect(void) { if (_hKeyClient) { cm_msg(MERROR, "cm_check_connect", "cm_disconnect_experiment not called at end of program"); cm_msg_flush_buffer(); } } /********************************************************************/ /** This function connects 
to an existing MIDAS experiment.
This must be the first call in a MIDAS application.
When a remote host is given, it opens three TCP connections to that host
(one for RPC calls, one to send events and one for hot-link notifications
from the remote host). It also writes client information into the ODB under
/System/Clients.
@attention All MIDAS applications should evaluate the MIDAS_SERVER_HOST
and MIDAS_EXPT_NAME environment variables as defaults to the host name and
experiment name (see @ref Environment_variables).
For that purpose, call cm_get_environment() before cm_connect_experiment().
If the command line parameters -h and -e are used, evaluate them between
cm_get_environment() and cm_connect_experiment().
The function cm_disconnect_experiment() must be called before a MIDAS
application exits.
\code
#include <stdio.h>
#include <midas.h>
int main(int argc, char *argv[])
{
  INT  status, i;
  char host_name[256], exp_name[32];

  // get default values from environment
  cm_get_environment(host_name, sizeof(host_name), exp_name, sizeof(exp_name));

  // parse command line parameters
  for (i=1 ; i<argc ; i++)
    {
    if (argv[i][0] == '-')
      {
      if (i+1 >= argc || argv[i+1][0] == '-')
        goto usage;
      if (argv[i][1] == 'e')
        strcpy(exp_name, argv[++i]);
      else if (argv[i][1] == 'h')
        strcpy(host_name, argv[++i]);
      else
        {
usage:
        printf("usage: test [-h Hostname] [-e Experiment]\n\n");
        return 1;
        }
      }
    }
  status = cm_connect_experiment(host_name, exp_name, "Test", NULL);
  if (status != CM_SUCCESS)
    return 1;
  ...do operations...
  cm_disconnect_experiment();
  return 0;
}
\endcode
@param host_name Specifies the host to connect to. Must be a valid IP host name.
  The string can be empty ("") or "local" to connect to the local computer.
@param exp_name Specifies the experiment to connect to.
  If this string is empty, the number of defined experiments is checked.
  If only one experiment is defined, the function automatically connects to it.
  If more than one experiment is defined, a list is presented and the user can
  interactively select one experiment.
@param client_name Client name of the calling program as it can be seen by
  others (like the scl command in ODBEdit).
@param func Callback function used to read in a password if security has been
  enabled. Command line applications pass NULL, which invokes the internal
  ss_getpass() function to read the password from the terminal. GUI
  environments (MS Windows, X Windows) can supply a function that opens a
  dialog box. The function must write the entered password into its argument,
  a buffer of 256 bytes.
@return CM_SUCCESS, CM_UNDEF_EXP, CM_SET_ERROR, RPC_NET_ERROR,
CM_VERSION_MISMATCH MIDAS library version different on local and remote computer */ INT cm_connect_experiment(const char *host_name, const char *exp_name, const char *client_name, void (*func)(char *)) { INT status; status = cm_connect_experiment1(host_name, exp_name, client_name, func, DEFAULT_ODB_SIZE, DEFAULT_WATCHDOG_TIMEOUT); cm_msg_flush_buffer(); if (status != CM_SUCCESS) { std::string s = cm_get_error(status); puts(s.c_str()); } return status; } /********************************************************************/ /** Connect to a MIDAS experiment (to the online database) on a specific host. @internal */ INT cm_connect_experiment1(const char *host_name, const char *default_exp_name, const char *client_name, void (*func)(char *), INT odb_size, DWORD watchdog_timeout) { INT status, size; char client_name1[NAME_LENGTH]; char password[NAME_LENGTH], str[256]; HNDLE hDB = 0, hKeyClient = 0; BOOL call_watchdog; ss_tzset(); // required for localtime_r() if (_hKeyClient) cm_disconnect_experiment(); cm_msg_early_init(); //cm_msg(MERROR, "cm_connect_experiment", "test cm_msg before connecting to experiment"); //cm_msg_flush_buffer(); rpc_set_name(client_name); /* check for local host */ if (equal_ustring(host_name, "local")) host_name = NULL; #ifdef OS_WINNT { WSADATA WSAData; /* Start windows sockets */ if (WSAStartup(MAKEWORD(1, 1), &WSAData) != 0) return RPC_NET_ERROR; } #endif std::string default_exp_name1; if (default_exp_name) default_exp_name1 = default_exp_name; /* connect to MIDAS server */ if (host_name && host_name[0]) { if (default_exp_name1.length() == 0) { status = cm_select_experiment_remote(host_name, &default_exp_name1); if (status != CM_SUCCESS) return status; } cm_set_experiment_name(default_exp_name1.c_str()); status = rpc_server_connect(host_name, default_exp_name1.c_str()); if (status != RPC_SUCCESS) return status; /* register MIDAS library functions */ status = rpc_register_functions(rpc_get_internal_list(1), NULL); if (status != RPC_SUCCESS) 
return status; } else { /* lookup path for *SHM files and save it */ #ifdef LOCAL_ROUTINES status = cm_set_experiment_local(default_exp_name1.c_str()); if (status != CM_SUCCESS) return status; default_exp_name1 = cm_get_experiment_name(); ss_suspend_init_odb_port(); INT semaphore_elog, semaphore_alarm, semaphore_history, semaphore_msg; /* create alarm and elog semaphores */ status = ss_semaphore_create("ALARM", &semaphore_alarm); if (status != SS_CREATED && status != SS_SUCCESS) { cm_msg(MERROR, "cm_connect_experiment", "Cannot create alarm semaphore"); return status; } status = ss_semaphore_create("ELOG", &semaphore_elog); if (status != SS_CREATED && status != SS_SUCCESS) { cm_msg(MERROR, "cm_connect_experiment", "Cannot create elog semaphore"); return status; } status = ss_semaphore_create("HISTORY", &semaphore_history); if (status != SS_CREATED && status != SS_SUCCESS) { cm_msg(MERROR, "cm_connect_experiment", "Cannot create history semaphore"); return status; } status = ss_semaphore_create("MSG", &semaphore_msg); if (status != SS_CREATED && status != SS_SUCCESS) { cm_msg(MERROR, "cm_connect_experiment", "Cannot create message semaphore"); return status; } cm_set_experiment_semaphore(semaphore_alarm, semaphore_elog, semaphore_history, semaphore_msg); #else return CM_UNDEF_EXP; #endif } //cm_msg(MERROR, "cm_connect_experiment", "test cm_msg before open ODB"); //cm_msg_flush_buffer(); /* open ODB */ if (odb_size == 0) odb_size = DEFAULT_ODB_SIZE; status = db_open_database("ODB", odb_size, &hDB, client_name); if (status != DB_SUCCESS && status != DB_CREATED) { cm_msg(MERROR, "cm_connect_experiment1", "cannot open database, db_open_database() status %d", status); return status; } //cm_msg(MERROR, "cm_connect_experiment", "test cm_msg after open ODB"); //cm_msg_flush_buffer(); int odb_timeout = db_set_lock_timeout(hDB, 0); size = sizeof(odb_timeout); status = db_get_value(hDB, 0, "/Experiment/ODB timeout", &odb_timeout, &size, TID_INT32, TRUE); if (status != 
DB_SUCCESS) { cm_msg(MERROR, "cm_connect_experiment1", "cannot get ODB /Experiment/ODB timeout, status %d", status); } if (odb_timeout > 0) { db_set_lock_timeout(hDB, odb_timeout); } BOOL protect_odb = FALSE; size = sizeof(protect_odb); status = db_get_value(hDB, 0, "/Experiment/Protect ODB", &protect_odb, &size, TID_BOOL, TRUE); if (status != DB_SUCCESS) { cm_msg(MERROR, "cm_connect_experiment1", "cannot get ODB /Experiment/Protect ODB, status %d", status); } if (protect_odb) { db_protect_database(hDB); } BOOL enable_core_dumps = FALSE; size = sizeof(enable_core_dumps); status = db_get_value(hDB, 0, "/Experiment/Enable core dumps", &enable_core_dumps, &size, TID_BOOL, TRUE); if (status != DB_SUCCESS) { cm_msg(MERROR, "cm_connect_experiment1", "cannot get ODB /Experiment/Enable core dumps, status %d", status); } if (enable_core_dumps) { #ifdef RLIMIT_CORE struct rlimit limit; limit.rlim_cur = RLIM_INFINITY; limit.rlim_max = RLIM_INFINITY; status = setrlimit(RLIMIT_CORE, &limit); if (status != 0) { cm_msg(MERROR, "cm_connect_experiment", "Cannot setrlimit(RLIMIT_CORE, RLIM_INFINITY), errno %d (%s)", errno, strerror(errno)); } #else #warning setrlimit(RLIMIT_CORE) is not available #endif } size = sizeof(disable_bind_rpc_to_localhost); status = db_get_value(hDB, 0, "/Experiment/Security/Enable non-localhost RPC", &disable_bind_rpc_to_localhost, &size, TID_BOOL, TRUE); if (status != DB_SUCCESS) { cm_msg(MERROR, "cm_connect_experiment1", "cannot get ODB /Experiment/Security/Enable non-localhost RPC, status %d", status); } std::string local_host_name; /* now setup client info */ if (!disable_bind_rpc_to_localhost) local_host_name = "localhost"; else local_host_name = ss_gethostname(); /* check watchdog timeout */ if (watchdog_timeout == 0) watchdog_timeout = DEFAULT_WATCHDOG_TIMEOUT; strcpy(client_name1, client_name); password[0] = 0; status = cm_set_client_info(hDB, &hKeyClient, local_host_name.c_str(), client_name1, rpc_get_hw_type(), password, watchdog_timeout); if 
(status == CM_WRONG_PASSWORD) { if (func == NULL) strcpy(str, ss_getpass("Password: ")); else func(str); strcpy(password, ss_crypt(str, "mi")); status = cm_set_client_info(hDB, &hKeyClient, local_host_name.c_str(), client_name1, rpc_get_hw_type(), password, watchdog_timeout); if (status != CM_SUCCESS) { /* disconnect */ if (rpc_is_remote()) rpc_server_disconnect(); cm_disconnect_experiment(); return status; } } //cm_msg(MERROR, "cm_connect_experiment", "test cm_msg after set client info"); //cm_msg_flush_buffer(); /* tell the rest of MIDAS that ODB is open for business */ cm_set_experiment_database(hDB, hKeyClient); //cm_msg(MERROR, "cm_connect_experiment", "test cm_msg after set experiment database"); //cm_msg_flush_buffer(); /* cm_msg_open_buffer() calls bm_open_buffer() calls ODB function * to get event buffer size, etc */ status = cm_msg_open_buffer(); if (status != CM_SUCCESS) { cm_msg(MERROR, "cm_connect_experiment1", "cannot open message buffer, cm_msg_open_buffer() status %d", status); return status; } //cm_msg(MERROR, "cm_connect_experiment", "test cm_msg after message system is ready"); //cm_msg_flush_buffer(); /* set experiment name in ODB if not present */ std::string current_name; db_get_value_string(hDB, 0, "/Experiment/Name", 0, ¤t_name, TRUE); if (current_name.length() == 0 || current_name == "Default") { db_set_value_string(hDB, 0, "/Experiment/Name", &default_exp_name1); } if (!rpc_is_remote()) { /* experiment path is only set for local connections */ /* set data dir in ODB */ std::string path = cm_get_path(); db_get_value_string(hDB, 0, "/Logger/Data dir", 0, &path, TRUE); } /* register server to be able to be called by other clients */ status = cm_register_server(); if (status != CM_SUCCESS) { cm_msg(MERROR, "cm_connect_experiment", "Cannot register RPC server, cm_register_server() status %d", status); if (!equal_ustring(client_name, "odbedit")) { return status; } } /* set watchdog timeout */ cm_get_watchdog_params(&call_watchdog, 
&watchdog_timeout); size = sizeof(watchdog_timeout); sprintf(str, "/Programs/%s/Watchdog Timeout", client_name); db_get_value(hDB, 0, str, &watchdog_timeout, &size, TID_INT32, TRUE); cm_set_watchdog_params(call_watchdog, watchdog_timeout); /* get final client name */ std::string xclient_name = rpc_get_name(); /* startup message is not displayed */ cm_msg(MLOG, "cm_connect_experiment", "Program %s on host %s started", xclient_name.c_str(), local_host_name.c_str()); /* enable system and user messages to stdout as default */ cm_set_msg_print(MT_ALL, MT_ALL, puts); /* call cm_check_connect when exiting */ atexit((void (*)(void)) cm_check_connect); /* register ctrl-c handler */ ss_ctrlc_handler(cm_ctrlc_handler); //cm_msg(MERROR, "cm_connect_experiment", "test cm_msg after connect to experiment is complete"); //cm_msg_flush_buffer(); return CM_SUCCESS; } #ifdef LOCAL_ROUTINES /********************************************************************/ /** Read exptab and return all defined experiments in *exp_name[MAX_EXPERIMENTS] @param host_name Internet host name. @param exp_name list of experiment names @return CM_SUCCESS, RPC_NET_ERROR */ INT cm_list_experiments_local(STRING_LIST *exp_names) { assert(exp_names != NULL); exp_names->clear(); if (_exptab.exptab.size() == 0) { int status = cm_read_exptab(&_exptab); if (status != CM_SUCCESS) return status; } for (unsigned i=0; i<_exptab.exptab.size(); i++) { exp_names->push_back(_exptab.exptab[i].name); } return CM_SUCCESS; } #endif // LOCAL_ROUTINES /********************************************************************/ /** Connect to a MIDAS server and return all defined experiments in *exp_name[MAX_EXPERIMENTS] @param host_name Internet host name. 
@param exp_name list of experiment names @return CM_SUCCESS, RPC_NET_ERROR */ INT cm_list_experiments_remote(const char *host_name, STRING_LIST *exp_names) { INT status; INT sock; int port = MIDAS_TCP_PORT; char hname[256]; char *s; assert(exp_names != NULL); exp_names->clear(); /* extract port number from host_name */ mstrlcpy(hname, host_name, sizeof(hname)); s = strchr(hname, ':'); if (s) { *s = 0; port = strtoul(s + 1, NULL, 0); } std::string errmsg; status = ss_socket_connect_tcp(hname, port, &sock, &errmsg); if (status != SS_SUCCESS) { cm_msg(MERROR, "cm_list_experiments_remote", "Cannot connect to \"%s\" port %d: %s", hname, port, errmsg.c_str()); return RPC_NET_ERROR; } /* request experiment list */ send(sock, "I", 2, 0); while (1) { char str[256]; status = recv_string(sock, str, sizeof(str), _rpc_connect_timeout); if (status < 0) return RPC_NET_ERROR; if (status == 0) break; exp_names->push_back(str); } ss_socket_close(&sock); return CM_SUCCESS; } #ifdef LOCAL_ROUTINES /********************************************************************/ /** Read exptab and select an experiment from the experiments available on this server @internal @param exp_name selected experiment name @return CM_SUCCESS */ INT cm_select_experiment_local(std::string *exp_name) { INT status; STRING_LIST expts; assert(exp_name != NULL); /* retrieve list of experiments and make selection */ status = cm_list_experiments_local(&expts); if (status != CM_SUCCESS) return status; if (expts.size() == 1) { *exp_name = expts[0]; } else if (expts.size() > 1) { printf("Available experiments on local computer:\n"); for (unsigned i = 0; i < expts.size(); i++) { printf("%d : %s\n", i, expts[i].c_str()); } while (1) { printf("Select number from 0 to %d: ", ((int)expts.size())-1); char str[32]; ss_gets(str, 32); int isel = atoi(str); if (isel < 0) continue; if (isel >= (int)expts.size()) continue; *exp_name = expts[isel]; break; } } else { return CM_UNDEF_EXP; } return CM_SUCCESS; } #endif // 
LOCAL_ROUTINES /********************************************************************/ /** Connect to a MIDAS server and select an experiment from the experiments available on this server @internal @param host_name Internet host name. @param exp_name selected experiment name @return CM_SUCCESS, RPC_NET_ERROR */ INT cm_select_experiment_remote(const char *host_name, std::string *exp_name) { INT status; STRING_LIST expts; assert(exp_name != NULL); /* retrieve list of experiments and make selection */ status = cm_list_experiments_remote(host_name, &expts); if (status != CM_SUCCESS) return status; if (expts.size() > 1) { printf("Available experiments on server %s:\n", host_name); for (unsigned i = 0; i < expts.size(); i++) { printf("%d : %s\n", i, expts[i].c_str()); } while (1) { printf("Select number from 0 to %d: ", ((int)expts.size())-1); char str[32]; ss_gets(str, 32); int isel = atoi(str); if (isel < 0) continue; if (isel >= (int)expts.size()) continue; *exp_name = expts[isel]; break; } } else { *exp_name = expts[0]; } return CM_SUCCESS; } /********************************************************************/ /** Connect to a MIDAS client of the current experiment @internal @param client_name Name of client to connect to. This name is set by the other client via the cm_connect_experiment call. 
@param hConn Connection handle @return CM_SUCCESS, CM_NO_CLIENT */ INT cm_connect_client(const char *client_name, HNDLE *hConn) { HNDLE hDB, hKeyRoot, hSubkey, hKey; INT status, i, length, port; char name[NAME_LENGTH], host_name[HOST_NAME_LENGTH]; /* find client entry in ODB */ cm_get_experiment_database(&hDB, &hKey); status = db_find_key(hDB, 0, "System/Clients", &hKeyRoot); if (status != DB_SUCCESS) return status; i = 0; do { /* search for client with specific name */ status = db_enum_key(hDB, hKeyRoot, i++, &hSubkey); if (status == DB_NO_MORE_SUBKEYS) return CM_NO_CLIENT; status = db_find_key(hDB, hSubkey, "Name", &hKey); if (status != DB_SUCCESS) return status; length = NAME_LENGTH; status = db_get_data(hDB, hKey, name, &length, TID_STRING); if (status != DB_SUCCESS) return status; if (equal_ustring(name, client_name)) { status = db_find_key(hDB, hSubkey, "Server Port", &hKey); if (status != DB_SUCCESS) return status; length = sizeof(INT); status = db_get_data(hDB, hKey, &port, &length, TID_INT32); if (status != DB_SUCCESS) return status; status = db_find_key(hDB, hSubkey, "Host", &hKey); if (status != DB_SUCCESS) return status; length = sizeof(host_name); status = db_get_data(hDB, hKey, host_name, &length, TID_STRING); if (status != DB_SUCCESS) return status; /* client found -> connect to its server port */ return rpc_client_connect(host_name, port, client_name, hConn); } } while (TRUE); } static void rpc_client_shutdown(); /********************************************************************/ /** Disconnect from a MIDAS client @param hConn Connection handle obtained via cm_connect_client() @param bShutdown If TRUE, disconnect from client and shut it down (exit the client program) by sending a RPC_SHUTDOWN message @return see rpc_client_disconnect() */ INT cm_disconnect_client(HNDLE hConn, BOOL bShutdown) { return rpc_client_disconnect(hConn, bShutdown); } /********************************************************************/ /** Disconnect from a MIDAS 
experiment.
@attention Should be the last call to a MIDAS library function in an
application before it exits. This function removes the client information
from the ODB, disconnects all TCP connections and frees all internal
allocated memory. See cm_connect_experiment() for example.

Steps performed, in this order: wait for a running transition to finish,
stop the watchdog thread, log a "stopped" message, close records/buffers and
clear the experiment database handle (remote and local connections take
different paths), shut down the RPC server unless running inside the
mserver, deregister the RPC functions, and flush the message buffer one last
time.
NOTE(review): the call order looks deliberate (e.g. the final flush comes
after everything that may still log); preserve it when editing.
@return CM_SUCCESS
*/
INT cm_disconnect_experiment(void)
{
   HNDLE hDB, hKey;

   //cm_msg(MERROR, "cm_disconnect_experiment", "test cm_msg before disconnect from experiment");
   //cm_msg_flush_buffer();

   /* wait on any transition thread */
   if (_trp.transition && !_trp.finished) {
      printf("Waiting for transition to finish...\n");
      // poll with ss_sleep() until the transition thread sets _trp.finished
      do {
         ss_sleep(10);
      } while (!_trp.finished);
   }

   /* stop the watchdog thread */
   cm_stop_watchdog_thread();

   /* send shutdown notification */
   std::string client_name = rpc_get_name();

   // host name used in the log message: "localhost" while the RPC server is
   // bound to localhost, otherwise the real host name
   std::string local_host_name;

   if (!disable_bind_rpc_to_localhost)
      local_host_name = "localhost";
   else {
      local_host_name = ss_gethostname();
      //if (strchr(local_host_name, '.'))
      //   *strchr(local_host_name, '.') = 0;
   }

   /* disconnect message not displayed */
   // (sent as MLOG, presumably logged only and not printed)
   cm_msg(MLOG, "cm_disconnect_experiment", "Program %s on host %s stopped", client_name.c_str(), local_host_name.c_str());
   cm_msg_flush_buffer();

   if (rpc_is_remote()) {
      // remote client: ODB and buffers live in the mserver
      if (rpc_is_connected()) {
         /* close open records */
         db_close_all_records();

         cm_msg_close_buffer();
      }

      rpc_client_shutdown();
      rpc_server_disconnect();

      cm_set_experiment_database(0, 0);
   } else {
      // local client: remove our client entry, then close buffers and ODB ourselves
      rpc_client_shutdown();

      /* delete client info */
      cm_get_experiment_database(&hDB, &hKey);

      if (hDB)
         cm_delete_client_info(hDB, 0);

      //cm_msg(MERROR, "cm_disconnect_experiment", "test cm_msg before close all buffers, close all databases");
      //cm_msg_flush_buffer();

      cm_msg_close_buffer();
      bm_close_all_buffers();
      db_close_all_databases();

      cm_set_experiment_database(0, 0);

      //cm_msg(MERROR, "cm_disconnect_experiment", "test cm_msg after close all buffers, close all databases");
      //cm_msg_flush_buffer();
   }

   // the mserver manages its own RPC server, everybody else shuts theirs down
   if (!rpc_is_mserver())
      rpc_server_shutdown();

   /* free RPC list */
   rpc_deregister_functions();

   //cm_msg(MERROR, "cm_disconnect_experiment", "test cm_msg before deleting the message ring buffer");
   //cm_msg_flush_buffer();

   /* last flush before we delete the message ring buffer */
   cm_msg_flush_buffer();

   //cm_msg(MERROR, "cm_disconnect_experiment", "test cm_msg after disconnect is completed");
   //cm_msg_flush_buffer();

   return CM_SUCCESS;
}

/********************************************************************/
/**
Set the handle to the ODB for the currently connected experiment.
Passing hDB == 0 marks the experiment as disconnected; after that
cm_get_experiment_database() returns CM_DB_ERROR.
@param hDB Database handle
@param hKeyClient Key handle of client structure
@return CM_SUCCESS
*/
INT cm_set_experiment_database(HNDLE hDB, HNDLE hKeyClient)
{
   //printf("cm_set_experiment_database: hDB %d, hKeyClient %d\n", hDB, hKeyClient);

   _hDB = hDB;
   _hKeyClient = hKeyClient;

   //if (hDB == 0) {
   //   rpc_set_server_option(RPC_ODB_HANDLE, 0);
   //}

   return CM_SUCCESS;
}

/**dox***************************************************************/
#ifndef DOXYGEN_SHOULD_SKIP_THIS

/********************************************************************/
INT cm_set_experiment_semaphore(INT semaphore_alarm, INT semaphore_elog, INT semaphore_history, INT semaphore_msg)
/********************************************************************\

  Routine: cm_set_experiment_semaphore

  Purpose: Set the handle to the experiment wide semaphores

  Input:
    INT    semaphore_alarm      Alarm semaphore
    INT    semaphore_elog       Elog semaphore
    INT    semaphore_history    History semaphore
    INT    semaphore_msg        Message semaphore (ignored, no longer stored)

  Output:
    none

  Function value:
    CM_SUCCESS              Successful completion

\********************************************************************/
{
   _semaphore_alarm = semaphore_alarm;
   _semaphore_elog = semaphore_elog;
   _semaphore_history = semaphore_history;
   //_semaphore_msg = semaphore_msg;

   return CM_SUCCESS;
}

/**dox***************************************************************/
#endif /* DOXYGEN_SHOULD_SKIP_THIS */

/********************************************************************/
/**
Get the handle to the ODB from the currently
connected experiment.
@attention This function returns the handle of the online database (ODB)
which can be used in future db_xxx() calls. The hkeyclient key handle can be
used to access the client information in the ODB. If the client key handle
is not needed, the parameter can be NULL.
\code
HNDLE hDB, hkeyclient;
 char  name[32];
 int   size;
 cm_get_experiment_database(&hDB, &hkeyclient);
 size = sizeof(name);
 db_get_value(hDB, hkeyclient, "Name", name, &size, TID_STRING, TRUE);
 printf("My name is %s\n", name);
\endcode
@param hDB Database handle (output, may be NULL). Set to 0 when no experiment
       database has been set via cm_set_experiment_database().
@param hKeyClient Key handle of this client's entry in the ODB, as stored by
       cm_set_experiment_database() (output, may be NULL). Set to 0 when no
       experiment database has been set.
@return CM_SUCCESS, or CM_DB_ERROR when no experiment database has been set
*/
INT cm_get_experiment_database(HNDLE *hDB, HNDLE *hKeyClient)
{
   if (_hDB) {
      //printf("cm_get_experiment_database %d %d\n", _hDB, _hKeyClient);
      if (hDB != NULL)
         *hDB = _hDB;
      if (hKeyClient != NULL)
         *hKeyClient = _hKeyClient;
      return CM_SUCCESS;
   } else {
      //printf("cm_get_experiment_database no init\n");
      if (hDB != NULL)
         *hDB = 0;
      if (hKeyClient != NULL)
         *hKeyClient = 0;
      return CM_DB_ERROR;
   }
}

/**dox***************************************************************/
#ifndef DOXYGEN_SHOULD_SKIP_THIS

/********************************************************************/
INT cm_get_experiment_semaphore(INT *semaphore_alarm, INT *semaphore_elog, INT *semaphore_history, INT *semaphore_msg)
/********************************************************************\

  Routine: cm_get_experiment_semaphore

  Purpose: Get the handle to the experiment wide semaphores.
           Any output pointer may be NULL, it is then not written.

  Input:
    none

  Output:
    INT    semaphore_alarm      Alarm semaphore
    INT    semaphore_elog       Elog semaphore
    INT    semaphore_history    History semaphore
    INT    semaphore_msg        Message semaphore; always -1, the
                                message semaphore is no longer stored

  Function value:
    CM_SUCCESS              Successful completion

\********************************************************************/
{
   if (semaphore_alarm)
      *semaphore_alarm = _semaphore_alarm;
   if (semaphore_elog)
      *semaphore_elog = _semaphore_elog;
   if (semaphore_history)
      *semaphore_history = _semaphore_history;
   //if (semaphore_msg)
   //   *semaphore_msg = _semaphore_msg;
   // _semaphore_msg is no longer kept, so report -1 for it
   if (semaphore_msg)
      *semaphore_msg = -1;

   return CM_SUCCESS;
}

/**dox***************************************************************/
#endif /* DOXYGEN_SHOULD_SKIP_THIS */

static int bm_validate_client_index(const BUFFER *buf, BOOL abort_if_invalid);
static BUFFER_CLIENT *bm_get_my_client(BUFFER *pbuf, BUFFER_HEADER *pheader);

#ifdef LOCAL_ROUTINES
static BUFFER* bm_get_buffer(const char *who, INT buffer_handle, int *pstatus);
static int bm_lock_buffer_read_cache(BUFFER *pbuf);
static int bm_lock_buffer_write_cache(BUFFER *pbuf);
static int bm_lock_buffer_mutex(BUFFER *pbuf);
static int xbm_lock_buffer(BUFFER *pbuf);
static void xbm_unlock_buffer(BUFFER *pbuf);

// Scoped lock for an event buffer.
//
// The constructor takes the buffer lock via xbm_lock_buffer() unless
// do_not_lock is set; a failed lock is recorded (is_error(), get_status())
// instead of being reported through an exception. The destructor releases
// the lock if it is still held. unlock()/relock() drop and re-take the lock
// inside the guarded scope. invalidate() (only allowed while unlocked)
// detaches the guard from the buffer so that neither the destructor nor
// get_pbuf() touch it afterwards — per the get_pbuf() assert, for the case
// where pbuf was deleted.
class bm_lock_buffer_guard
{
public:
   // when set, every operation prints a trace line to stdout
   bool fDebug = false;

public:
   bm_lock_buffer_guard(BUFFER* pbuf, bool do_not_lock=false) // ctor
   {
      assert(pbuf != NULL);
      fBuf = pbuf;
      if (do_not_lock) {
         // guard starts unlocked; the caller may relock() later
         if (fDebug)
            printf("lock_buffer_guard(%s) ctor without lock\n", fBuf->buffer_name);
         return;
      }
      if (fDebug)
         printf("lock_buffer_guard(%s) ctor\n", fBuf->buffer_name);
      int status = xbm_lock_buffer(fBuf);
      if (status != BM_SUCCESS) {
         fLocked = false;
         fError = true;
         fStatus = status;
      } else {
         fLocked = true;
      }
   }

   ~bm_lock_buffer_guard() // dtor
   {
      if (fInvalid) {
         // detached by invalidate(): the buffer must not be touched
         if (fDebug)
            printf("lock_buffer_guard(invalid) dtor\n");
      } else {
         assert(fBuf != NULL);
         if (fDebug)
            printf("lock_buffer_guard(%s) dtor, locked %d, error %d\n", fBuf->buffer_name, fLocked, fError);
         if (fLocked) {
            xbm_unlock_buffer(fBuf);
            fLocked = false;
            fError = false;
         }
         fBuf = NULL;
      }
   }

   // make object uncopyable
   bm_lock_buffer_guard(const bm_lock_buffer_guard&) = delete;
   bm_lock_buffer_guard& operator=(const bm_lock_buffer_guard&) = delete;

   // release the lock early; must currently be locked
   void unlock()
   {
      assert(fBuf != NULL);
      if (fDebug)
         printf("lock_buffer_guard(%s) unlock, locked %d, error %d\n", fBuf->buffer_name, fLocked, fError);
      assert(fLocked);
      xbm_unlock_buffer(fBuf);
      fLocked = false;
      fError = false;
   }

   // take the lock again after unlock() (or after a do_not_lock ctor);
   // returns true on success, otherwise the error status is recorded
   bool relock()
   {
      assert(fBuf != NULL);
      if (fDebug)
         printf("lock_buffer_guard(%s) relock, locked %d, error %d\n", fBuf->buffer_name, fLocked, fError);
      assert(!fLocked);
      int status = xbm_lock_buffer(fBuf);
      if (status != BM_SUCCESS) {
         fLocked = false;
         fError = true;
         fStatus = status;
      } else {
         fLocked = true;
      }
      return fLocked;
   }

   // detach from the buffer; must be unlocked
   void invalidate()
   {
      assert(fBuf != NULL);
      if (fDebug)
         printf("lock_buffer_guard(%s) invalidate, locked %d, error %d\n", fBuf->buffer_name, fLocked, fError);
      assert(!fLocked);
      fInvalid = true;
      fBuf = NULL;
   }

   bool is_locked() const
   {
      return fLocked;
   }

   bool is_error() const
   {
      return fError;
   }

   // status returned by the last failed xbm_lock_buffer(), 0 if none failed
   int get_status() const
   {
      return fStatus;
   }

   BUFFER* get_pbuf() const
   {
      assert(!fInvalid); // pbuf was deleted
      assert(fBuf); // we do not return NULL
      return fBuf;
   }

private:
   BUFFER* fBuf = NULL;
   bool fLocked = false;
   bool fError = false;
   bool fInvalid = false;
   int fStatus = 0;
};
#endif /* LOCAL_ROUTINES */

static INT bm_notify_client(const char *buffer_name, int s);

static INT bm_push_event(const char *buffer_name);

static void bm_defragment_event(HNDLE buffer_handle, HNDLE request_id,
                                EVENT_HEADER *pevent, void *pdata,
                                EVENT_HANDLER *dispatcher);

/********************************************************************/
/**
Sets the internal watchdog flags and the own timeout.
If call_watchdog is TRUE, the cm_watchdog routine is called periodically from
the system to show other clients that this application is "alive". On UNIX
systems, the alarm() timer is used which is then not available for user
purposes.

The timeout specifies the time after which the calling application should be
considered "dead" by other clients. Normally, the cm_watchdog() routine is
called periodically. If a client crashes, this does not occur any more. Then
other clients can detect this and clear all buffer and database entries of
this application so they are not blocked any more. If this application
should not be checked by others, the timeout can be specified as zero.
It might be useful for debugging purposes to do so, because if a debugger comes to a breakpoint and stops the application, the periodic call of cm_watchdog is disabled and the client looks like dead. If the timeout is not zero, but the watchdog is not called (call_watchdog == FALSE), the user must ensure to call cm_watchdog periodically with a period of WATCHDOG_INTERVAL milliseconds or less. An application which calles system routines which block the alarm signal for some time, might increase the timeout to the maximum expected blocking time before issuing the calls. One example is the logger doing Exabyte tape IO, which can take up to one minute. @param call_watchdog Call the cm_watchdog routine periodically @param timeout Timeout for this application in ms @return CM_SUCCESS */ INT cm_set_watchdog_params_local(BOOL call_watchdog, DWORD timeout) { #ifdef LOCAL_ROUTINES _watchdog_timeout = timeout; std::vector mybuffers; gBuffersMutex.lock(); mybuffers = gBuffers; gBuffersMutex.unlock(); /* set watchdog timeout of all open buffers */ for (BUFFER* pbuf : mybuffers) { if (!pbuf || !pbuf->attached) continue; bm_lock_buffer_guard pbuf_guard(pbuf); if (!pbuf_guard.is_locked()) continue; BUFFER_HEADER *pheader = pbuf->buffer_header; BUFFER_CLIENT *pclient = bm_get_my_client(pbuf, pheader); /* clear entry from client structure in buffer header */ pclient->watchdog_timeout = timeout; /* show activity */ pclient->last_activity = ss_millitime(); } /* set watchdog timeout for ODB */ db_set_watchdog_params(timeout); #endif /* LOCAL_ROUTINES */ return CM_SUCCESS; } INT cm_set_watchdog_params(BOOL call_watchdog, DWORD timeout) { /* set also local timeout to requested value (needed by cm_enable_watchdog()) */ _watchdog_timeout = timeout; if (rpc_is_remote()) { // we are connected remotely return rpc_call(RPC_CM_SET_WATCHDOG_PARAMS, call_watchdog, timeout); } else if (rpc_is_mserver()) { // we are the mserver RPC_SERVER_ACCEPTION* sa = rpc_get_mserver_acception(); if (sa) 
sa->watchdog_timeout = timeout; /* write timeout value to client enty in ODB */ HNDLE hDB, hKey; cm_get_experiment_database(&hDB, &hKey); if (hDB) { db_set_mode(hDB, hKey, MODE_READ | MODE_WRITE, TRUE); db_set_value(hDB, hKey, "Link timeout", &timeout, sizeof(timeout), 1, TID_INT32); db_set_mode(hDB, hKey, MODE_READ, TRUE); } return DB_SUCCESS; } else { return cm_set_watchdog_params_local(call_watchdog, timeout); } } /********************************************************************/ /** Return the current watchdog parameters @param call_watchdog Call the cm_watchdog routine periodically @param timeout Timeout for this application in seconds @return CM_SUCCESS */ INT cm_get_watchdog_params(BOOL *call_watchdog, DWORD *timeout) { if (call_watchdog) *call_watchdog = FALSE; if (timeout) *timeout = _watchdog_timeout; return CM_SUCCESS; } /********************************************************************/ /** Return watchdog information about specific client @param hDB ODB handle @param client_name ODB client name @param timeout Timeout for this application in seconds @param last Last time watchdog was called in msec @return CM_SUCCESS, CM_NO_CLIENT, DB_INVALID_HANDLE */ INT cm_get_watchdog_info(HNDLE hDB, const char *client_name, DWORD *timeout, DWORD *last) { if (rpc_is_remote()) return rpc_call(RPC_CM_GET_WATCHDOG_INFO, hDB, client_name, timeout, last); #ifdef LOCAL_ROUTINES return db_get_watchdog_info(hDB, client_name, timeout, last); #else /* LOCAL_ROUTINES */ return CM_SUCCESS; #endif /* LOCAL_ROUTINES */ } /**dox***************************************************************/ #ifndef DOXYGEN_SHOULD_SKIP_THIS /********************************************************************/ static void load_rpc_hosts(HNDLE hDB, HNDLE hKey, int index, void *info) { int status; int i, last; KEY key; int max_size; char *str; // if (index != -99) // cm_msg(MINFO, "load_rpc_hosts", "Reloading RPC hosts access control list via hotlink callback"); status = db_get_key(hDB, hKey, 
&key); if (status != DB_SUCCESS) return; //printf("clear rpc hosts!\n"); rpc_clear_allowed_hosts(); max_size = key.item_size; str = (char *) malloc(max_size); last = 0; for (i = 0; i < key.num_values; i++) { int size = max_size; status = db_get_data_index(hDB, hKey, str, &size, i, TID_STRING); if (status != DB_SUCCESS) break; if (strlen(str) < 1) // skip emties continue; if (str[0] == '#') // skip commented-out entries continue; //printf("add rpc hosts %d [%s]\n", i, str); rpc_add_allowed_host(str); last = i; } if (key.num_values - last < 10) { int new_size = last + 10; status = db_set_num_values(hDB, hKey, new_size); if (status != DB_SUCCESS) { cm_msg(MERROR, "load_rpc_hosts", "Cannot resize the RPC hosts access control list, db_set_num_values(%d) status %d", new_size, status); } } free(str); } static void init_rpc_hosts(HNDLE hDB) { int status; char buf[256]; int size, i; HNDLE hKey; strcpy(buf, "localhost"); size = sizeof(buf); status = db_get_value(hDB, 0, "/Experiment/Security/RPC hosts/Allowed hosts[0]", buf, &size, TID_STRING, TRUE); if (status != DB_SUCCESS) { cm_msg(MERROR, "init_rpc_hosts", "Cannot create the RPC hosts access control list, db_get_value() status %d", status); return; } size = sizeof(i); i = 0; status = db_get_value(hDB, 0, "/Experiment/Security/Disable RPC hosts check", &i, &size, TID_BOOL, TRUE); if (status != DB_SUCCESS) { cm_msg(MERROR, "init_rpc_hosts", "Cannot create \"Disable RPC hosts check\", db_get_value() status %d", status); return; } if (i != 0) // RPC hosts check is disabled return; status = db_find_key(hDB, 0, "/Experiment/Security/RPC hosts/Allowed hosts", &hKey); if (status != DB_SUCCESS || hKey == 0) { cm_msg(MERROR, "init_rpc_hosts", "Cannot find the RPC hosts access control list, db_find_key() status %d", status); return; } load_rpc_hosts(hDB, hKey, -99, NULL); status = db_watch(hDB, hKey, load_rpc_hosts, NULL); if (status != DB_SUCCESS) { cm_msg(MERROR, "init_rpc_hosts", "Cannot watch the RPC hosts access control list, 
db_watch() status %d", status); return; } } /********************************************************************/ INT cm_register_server(void) /********************************************************************\ Routine: cm_register_server Purpose: Register a server which can be called from other clients of a specific experiment. Input: none Output: none Function value: CM_SUCCESS Successful completion \********************************************************************/ { if (!_rpc_registered) { INT status; int size; HNDLE hDB, hKey; char name[NAME_LENGTH]; char str[256]; int port = 0; cm_get_experiment_database(&hDB, &hKey); size = sizeof(name); status = db_get_value(hDB, hKey, "Name", &name, &size, TID_STRING, FALSE); if (status != DB_SUCCESS) { cm_msg(MERROR, "cm_register_server", "cannot get client name, db_get_value() status %d", status); return status; } mstrlcpy(str, "/Experiment/Security/RPC ports/", sizeof(str)); mstrlcat(str, name, sizeof(str)); size = sizeof(port); status = db_get_value(hDB, 0, str, &port, &size, TID_UINT32, TRUE); if (status != DB_SUCCESS) { cm_msg(MERROR, "cm_register_server", "cannot get RPC port number, db_get_value(%s) status %d", str, status); return status; } int lport = 0; // actual port number assigned to us by the OS status = rpc_register_server(port, &_rpc_listen_socket, &lport); if (status != RPC_SUCCESS) { cm_msg(MERROR, "cm_register_server", "error, rpc_register_server(port=%d) status %d", port, status); return status; } _rpc_registered = TRUE; /* register MIDAS library functions */ rpc_register_functions(rpc_get_internal_list(1), NULL); /* store port number in ODB */ status = db_find_key(hDB, hKey, "Server Port", &hKey); if (status != DB_SUCCESS) { cm_msg(MERROR, "cm_register_server", "error, db_find_key(\"Server Port\") status %d", status); return status; } /* unlock database */ db_set_mode(hDB, hKey, MODE_READ | MODE_WRITE, TRUE); /* set value */ status = db_set_data(hDB, hKey, &lport, sizeof(INT), 1, TID_INT32); if 
(status != DB_SUCCESS) { cm_msg(MERROR, "cm_register_server", "error, db_set_data(\"Server Port\"=%d) status %d", port, status); return status; } /* lock database */ db_set_mode(hDB, hKey, MODE_READ, TRUE); init_rpc_hosts(hDB); } return CM_SUCCESS; } /**dox***************************************************************/ #endif /* DOXYGEN_SHOULD_SKIP_THIS */ /********************************************************************/ /** Registers a callback function for run transitions. This function internally registers the transition callback function and publishes its request for transition notification by writing a transition request to /System/Clients/\/Transition XXX. Other clients making a transition scan the transition requests of all clients and call their transition callbacks via RPC. Clients can register for transitions (Start/Stop/Pause/Resume) in a given sequence. All sequence numbers given in the registration are sorted on a transition and the clients are contacted in ascending order. By default, all programs register with a sequence number of 500. The logger however uses 200 for start, so that it can open files before the other clients are contacted, and 800 for stop, so that the files get closed when all other clients have gone already through the stop trantition. The callback function returns CM_SUCCESS if it can perform the transition or a value larger than one in case of error. An error string can be copied into the error variable. @attention The callback function will be called on transitions from inside the cm_yield() function which therefore must be contained in the main program loop. \code INT start(INT run_number, char *error) { if () { strcpy(error, "Cannot start because ..."); return 2; } printf("Starting run %d\n", run_number); return CM_SUCCESS; } main() { ... cm_register_transition(TR_START, start, 500); do { status = cm_yield(1000); } while (status != RPC_SHUTDOWN && status != SS_ABORT); ... 
} \endcode @param transition Transition to register for (see @ref state_transition) @param func Callback function. @param sequence_number Sequence number for that transition (1..1000) @return CM_SUCCESS */ INT cm_register_transition(INT transition, INT(*func)(INT, char *), INT sequence_number) { INT status; HNDLE hDB, hKey, hKeyTrans; KEY key; char str[256]; /* check for valid transition */ if (transition != TR_START && transition != TR_STOP && transition != TR_PAUSE && transition != TR_RESUME && transition != TR_STARTABORT) { cm_msg(MERROR, "cm_register_transition", "Invalid transition request \"%d\"", transition); return CM_INVALID_TRANSITION; } cm_get_experiment_database(&hDB, &hKey); rpc_register_function(RPC_RC_TRANSITION, rpc_transition_dispatch); /* register new transition request */ { std::lock_guard guard(_trans_table_mutex); for (size_t i = 0; i < _trans_table.size(); i++) { if (_trans_table[i].transition == transition && _trans_table[i].sequence_number == sequence_number) { cm_msg(MERROR, "cm_register_transition", "transition %s with sequence number %d is already registered", cm_transition_name(transition).c_str(), sequence_number); return CM_INVALID_TRANSITION; } } bool found = false; for (size_t i = 0; i < _trans_table.size(); i++) { if (!_trans_table[i].transition) { _trans_table[i].transition = transition; _trans_table[i].sequence_number = sequence_number; _trans_table[i].func = func; found = true; break; } } if (!found) { TRANS_TABLE tt; tt.transition = transition; tt.sequence_number = sequence_number; tt.func = func; _trans_table.push_back(tt); } // implicit unlock } sprintf(str, "Transition %s", cm_transition_name(transition).c_str()); /* unlock database */ db_set_mode(hDB, hKey, MODE_READ | MODE_WRITE | MODE_DELETE, TRUE); /* set value */ status = db_find_key(hDB, hKey, str, &hKeyTrans); if (!hKeyTrans) { status = db_set_value(hDB, hKey, str, &sequence_number, sizeof(INT), 1, TID_INT32); if (status != DB_SUCCESS) return status; } else { status = 
db_get_key(hDB, hKeyTrans, &key); if (status != DB_SUCCESS) return status; status = db_set_data_index(hDB, hKeyTrans, &sequence_number, sizeof(INT), key.num_values, TID_INT32); if (status != DB_SUCCESS) return status; } /* re-lock database */ db_set_mode(hDB, hKey, MODE_READ, TRUE); return CM_SUCCESS; } INT cm_deregister_transition(INT transition) { INT status; HNDLE hDB, hKey, hKeyTrans; char str[256]; /* check for valid transition */ if (transition != TR_START && transition != TR_STOP && transition != TR_PAUSE && transition != TR_RESUME && transition != TR_STARTABORT) { cm_msg(MERROR, "cm_deregister_transition", "Invalid transition request \"%d\"", transition); return CM_INVALID_TRANSITION; } cm_get_experiment_database(&hDB, &hKey); { std::lock_guard guard(_trans_table_mutex); /* remove existing transition request */ for (size_t i = 0; i < _trans_table.size(); i++) { if (_trans_table[i].transition == transition) { _trans_table[i].transition = 0; _trans_table[i].sequence_number = 0; _trans_table[i].func = NULL; } } // implicit unlock } sprintf(str, "Transition %s", cm_transition_name(transition).c_str()); /* unlock database */ db_set_mode(hDB, hKey, MODE_READ | MODE_WRITE | MODE_DELETE, TRUE); /* set value */ status = db_find_key(hDB, hKey, str, &hKeyTrans); if (hKeyTrans) { status = db_delete_key(hDB, hKeyTrans, FALSE); if (status != DB_SUCCESS) return status; } /* re-lock database */ db_set_mode(hDB, hKey, MODE_READ, TRUE); return CM_SUCCESS; } /********************************************************************/ /** Change the transition sequence for the calling program. @param transition TR_START, TR_PAUSE, TR_RESUME or TR_STOP. 
@param sequence_number New sequence number, should be between 1 and 1000 @return CM_SUCCESS */ INT cm_set_transition_sequence(INT transition, INT sequence_number) { INT status; HNDLE hDB, hKey; char str[256]; /* check for valid transition */ if (transition != TR_START && transition != TR_STOP && transition != TR_PAUSE && transition != TR_RESUME) { cm_msg(MERROR, "cm_set_transition_sequence", "Invalid transition request \"%d\"", transition); return CM_INVALID_TRANSITION; } { std::lock_guard guard(_trans_table_mutex); int count = 0; for (size_t i = 0; i < _trans_table.size(); i++) { if (_trans_table[i].transition == transition) { _trans_table[i].sequence_number = sequence_number; count++; } } if (count == 0) { cm_msg(MERROR, "cm_set_transition_sequence", "transition %s is not registered", cm_transition_name(transition).c_str()); return CM_INVALID_TRANSITION; } else if (count > 1) { cm_msg(MERROR, "cm_set_transition_sequence", "cannot change sequence number, transition %s is registered %d times", cm_transition_name(transition).c_str(), count); return CM_INVALID_TRANSITION; } /* Change local sequence number for this transition type */ for (size_t i = 0; i < _trans_table.size(); i++) { if (_trans_table[i].transition == transition) { _trans_table[i].sequence_number = sequence_number; } } // implicit unlock } cm_get_experiment_database(&hDB, &hKey); /* unlock database */ db_set_mode(hDB, hKey, MODE_READ | MODE_WRITE, TRUE); sprintf(str, "Transition %s", cm_transition_name(transition).c_str()); /* set value */ status = db_set_value(hDB, hKey, str, &sequence_number, sizeof(INT), 1, TID_INT32); if (status != DB_SUCCESS) return status; /* re-lock database */ db_set_mode(hDB, hKey, MODE_READ, TRUE); return CM_SUCCESS; } INT cm_set_client_run_state(INT state) { INT status; HNDLE hDB, hKey; KEY key; cm_get_experiment_database(&hDB, &hKey); /* check that hKey is still valid */ status = db_get_key(hDB, hKey, &key); if (status != DB_SUCCESS) { cm_msg(MERROR, 
"cm_set_client_run_state", "Cannot set client run state, client hKey %d into /System/Clients is not valid, maybe this client was removed by a watchdog timeout", hKey); return status; } /* unlock database */ db_set_mode(hDB, hKey, MODE_READ | MODE_WRITE, TRUE); /* set value */ status = db_set_value(hDB, hKey, "Run state", &state, sizeof(INT), 1, TID_INT32); if (status != DB_SUCCESS) return status; /* re-lock database */ db_set_mode(hDB, hKey, MODE_READ, TRUE); return CM_SUCCESS; } /**dox***************************************************************/ #ifndef DOXYGEN_SHOULD_SKIP_THIS static INT _requested_transition; static DWORD _deferred_transition_mask; /**dox***************************************************************/ #endif /* DOXYGEN_SHOULD_SKIP_THIS */ /********************************************************************/ /** Register a deferred transition handler. If a client is registered as a deferred transition handler, it may defer a requested transition by returning FALSE until a certain condition (like a motor reaches its end position) is reached. @param transition One of TR_xxx @param (*func) Function which gets called whenever a transition is requested. If it returns FALSE, the transition is not performed. 
@return CM_SUCCESS, or the status of the ODB call that failed
*/
INT cm_register_deferred_transition(INT transition, BOOL(*func)(INT, BOOL))
{
   INT status, size;
   HNDLE hDB, hKey;

   cm_get_experiment_database(&hDB, &hKey);

   /* store the handler in the deferred transition table; the table keeps the
      generic callback type and cm_check_deferred_transition() casts it back */
   for (int i = 0; _deferred_trans_table[i].transition; i++)
      if (_deferred_trans_table[i].transition == transition)
         _deferred_trans_table[i].func = reinterpret_cast<int (*)(int, char *)>(func);

   /* set new transition mask */
   _deferred_transition_mask |= transition;

   const std::string tr_key_name = "Transition " + cm_transition_name(transition) + " DEFERRED";

   /* unlock database */
   db_set_mode(hDB, hKey, MODE_READ | MODE_WRITE, TRUE);

   /* set value */
   int zero = 0;
   status = db_set_value(hDB, hKey, tr_key_name.c_str(), &zero, sizeof(INT), 1, TID_INT32);

   /* re-lock database, also when the write failed */
   db_set_mode(hDB, hKey, MODE_READ, TRUE);

   if (status != DB_SUCCESS)
      return status;

   /* hot link requested transition */
   size = sizeof(_requested_transition);
   db_get_value(hDB, 0, "/Runinfo/Requested Transition", &_requested_transition, &size, TID_INT32, TRUE);

   /* use a separate handle: on a failed lookup we must not hotlink the client key */
   HNDLE hKeyRequested = 0;
   status = db_find_key(hDB, 0, "/Runinfo/Requested Transition", &hKeyRequested);
   if (status == DB_SUCCESS)
      status = db_open_record(hDB, hKeyRequested, &_requested_transition, sizeof(INT), MODE_READ, NULL, NULL);

   if (status != DB_SUCCESS) {
      cm_msg(MERROR, "cm_register_deferred_transition", "Cannot hotlink /Runinfo/Requested Transition");
      return status;
   }

   return CM_SUCCESS;
}

/********************************************************************/
/**
Check for any deferred transition. If a deferred transition
handler has been registered via the
cm_register_deferred_transition function, this routine
should be called regularly. It checks if a transition
request is pending. If so, it calls the registered handler
if the transition should be done and then actually does
the transition.
@return CM_SUCCESS, \ Error from cm_transition() */ INT cm_check_deferred_transition() { INT i, status; char str[256]; static BOOL first; if (_requested_transition == 0) first = TRUE; if (_requested_transition & _deferred_transition_mask) { for (i = 0; _deferred_trans_table[i].transition; i++) if (_deferred_trans_table[i].transition == _requested_transition) break; if (_deferred_trans_table[i].transition == _requested_transition) { if (((BOOL(*)(INT, BOOL)) _deferred_trans_table[i].func)(_requested_transition, first)) { status = cm_transition(_requested_transition | TR_DEFERRED, 0, str, sizeof(str), TR_SYNC, FALSE); if (status != CM_SUCCESS) cm_msg(MERROR, "cm_check_deferred_transition", "Cannot perform deferred transition: %s", str); /* bypass hotlink and set _requested_transition directly to zero */ _requested_transition = 0; return status; } first = FALSE; } } return SUCCESS; } /**dox***************************************************************/ #ifndef DOXYGEN_SHOULD_SKIP_THIS /********************************************************************/ /**dox***************************************************************/ #endif /* DOXYGEN_SHOULD_SKIP_THIS */ struct TrClient { int transition = 0; int run_number = 0; int async_flag = 0; int debug_flag = 0; int sequence_number = 0; std::vector wait_for_index; std::string host_name; std::string client_name; int port = 0; std::string key_name; /* this client key name in /System/Clients */ std::atomic_int status{0}; std::thread* thread = NULL; std::string errorstr; DWORD init_time = 0; // time when tr_client created std::string waiting_for_client; // name of client we are waiting for DWORD connect_timeout = 0; DWORD connect_start_time = 0; // time when client rpc connection is started DWORD connect_end_time = 0; // time when client rpc connection is finished DWORD rpc_timeout = 0; DWORD rpc_start_time = 0; // time client rpc call is started DWORD rpc_end_time = 0; // time client rpc call is finished DWORD end_time = 0; 
// time client thread is finished TrClient() // ctor { // empty } ~TrClient() // dtor { //printf("TrClient::dtor: client \"%s\"\n", client_name); assert(thread == NULL); } void Print() const { printf("client \"%s\", transition %d, seqno %d, status %d", client_name.c_str(), transition, sequence_number, int(status)); if (wait_for_index.size() > 0) { printf(", wait for:"); for (size_t i=0; i& arg1, const std::unique_ptr& arg2) { return arg1->sequence_number < arg2->sequence_number; } /*------------------------------------------------------------------*/ struct TrState { int transition = 0; int run_number = 0; int async_flag = 0; int debug_flag = 0; int status = 0; std::string errorstr; DWORD start_time = 0; DWORD end_time = 0; std::vector> clients; }; /*------------------------------------------------------------------*/ static int tr_finish(HNDLE hDB, TrState* tr, int transition, int status, const char *errorstr) { DWORD end_time = ss_millitime(); if (transition != TR_STARTABORT) { db_set_value(hDB, 0, "/System/Transition/end_time", &end_time, sizeof(DWORD), 1, TID_UINT32); db_set_value(hDB, 0, "/System/Transition/status", &status, sizeof(INT), 1, TID_INT32); if (errorstr) { db_set_value(hDB, 0, "/System/Transition/error", errorstr, strlen(errorstr) + 1, 1, TID_STRING); } else if (status == CM_SUCCESS) { const char *buf = "Success"; db_set_value(hDB, 0, "/System/Transition/error", buf, strlen(buf) + 1, 1, TID_STRING); } else { char buf[256]; sprintf(buf, "status %d", status); db_set_value(hDB, 0, "/System/Transition/error", buf, strlen(buf) + 1, 1, TID_STRING); } } tr->status = status; tr->end_time = end_time; if (errorstr) { tr->errorstr = errorstr; } else { tr->errorstr = "(null)"; } return status; } /*------------------------------------------------------------------*/ static void write_tr_client_to_odb(HNDLE hDB, const TrClient *tr_client) { //printf("Writing client [%s] to ODB\n", tr_client->client_name.c_str()); int status; HNDLE hKey; if (tr_client->transition 
== TR_STARTABORT) { status = db_create_key(hDB, 0, "/System/Transition/TR_STARTABORT", TID_KEY); status = db_find_key(hDB, 0, "/System/Transition/TR_STARTABORT", &hKey); if (status != DB_SUCCESS) return; } else { status = db_create_key(hDB, 0, "/System/Transition/Clients", TID_KEY); status = db_find_key(hDB, 0, "/System/Transition/Clients", &hKey); if (status != DB_SUCCESS) return; } // same client_name can exist with different sequence numbers! std::string keyname = msprintf("%s_%d", tr_client->client_name.c_str(), tr_client->sequence_number); status = db_create_key(hDB, hKey, keyname.c_str(), TID_KEY); status = db_find_key(hDB, hKey, keyname.c_str(), &hKey); if (status != DB_SUCCESS) return; DWORD now = ss_millitime(); //int transition; //int run_number; //int async_flag; //int debug_flag; status = db_set_value(hDB, hKey, "sequence_number", &tr_client->sequence_number, sizeof(INT), 1, TID_INT32); status = db_set_value(hDB, hKey, "client_name", tr_client->client_name.c_str(), tr_client->client_name.length() + 1, 1, TID_STRING); status = db_set_value(hDB, hKey, "host_name", tr_client->host_name.c_str(), tr_client->host_name.length() + 1, 1, TID_STRING); status = db_set_value(hDB, hKey, "port", &tr_client->port, sizeof(INT), 1, TID_INT32); status = db_set_value(hDB, hKey, "init_time", &tr_client->init_time, sizeof(DWORD), 1, TID_UINT32); status = db_set_value(hDB, hKey, "waiting_for_client", tr_client->waiting_for_client.c_str(), tr_client->waiting_for_client.length() + 1, 1, TID_STRING); status = db_set_value(hDB, hKey, "connect_timeout", &tr_client->connect_timeout, sizeof(DWORD), 1, TID_UINT32); status = db_set_value(hDB, hKey, "connect_start_time", &tr_client->connect_start_time, sizeof(DWORD), 1, TID_UINT32); status = db_set_value(hDB, hKey, "connect_end_time", &tr_client->connect_end_time, sizeof(DWORD), 1, TID_UINT32); status = db_set_value(hDB, hKey, "rpc_timeout", &tr_client->rpc_timeout, sizeof(DWORD), 1, TID_UINT32); status = db_set_value(hDB, hKey, 
"rpc_start_time", &tr_client->rpc_start_time, sizeof(DWORD), 1, TID_UINT32); status = db_set_value(hDB, hKey, "rpc_end_time", &tr_client->rpc_end_time, sizeof(DWORD), 1, TID_UINT32); status = db_set_value(hDB, hKey, "end_time", &tr_client->end_time, sizeof(DWORD), 1, TID_UINT32); status = db_set_value(hDB, hKey, "status", &tr_client->status, sizeof(INT), 1, TID_INT32); status = db_set_value(hDB, hKey, "error", tr_client->errorstr.c_str(), tr_client->errorstr.length() + 1, 1, TID_STRING); status = db_set_value(hDB, hKey, "last_updated", &now, sizeof(DWORD), 1, TID_UINT32); } /*------------------------------------------------------------------*/ /* Perform a detached transition through the external "mtransition" program */ static int cm_transition_detach(INT transition, INT run_number, char *errstr, INT errstr_size, INT async_flag, INT debug_flag) { HNDLE hDB; int status; const char *args[100]; std::string path; char debug_arg[256]; char start_arg[256]; std::string expt_name; std::string mserver_hostname; int iarg = 0; cm_get_experiment_database(&hDB, NULL); const char *midassys = getenv("MIDASSYS"); if (midassys) { path += midassys; path += DIR_SEPARATOR_STR; path += "bin"; path += DIR_SEPARATOR_STR; } path += "mtransition"; args[iarg++] = path.c_str(); if (rpc_is_remote()) { /* if connected to mserver, pass connection info to mtransition */ mserver_hostname = rpc_get_mserver_hostname(); args[iarg++] = "-h"; args[iarg++] = mserver_hostname.c_str(); } /* get experiment name from ODB */ db_get_value_string(hDB, 0, "/Experiment/Name", 0, &expt_name, FALSE); if (expt_name.length() > 0) { args[iarg++] = "-e"; args[iarg++] = expt_name.c_str(); } if (debug_flag) { args[iarg++] = "-d"; sprintf(debug_arg, "%d", debug_flag); args[iarg++] = debug_arg; } if (transition == TR_STOP) args[iarg++] = "STOP"; else if (transition == TR_PAUSE) args[iarg++] = "PAUSE"; else if (transition == TR_RESUME) args[iarg++] = "RESUME"; else if (transition == TR_START) { args[iarg++] = "START"; 
sprintf(start_arg, "%d", run_number); args[iarg++] = start_arg; } args[iarg++] = NULL; #if 0 for (iarg = 0; args[iarg] != NULL; iarg++) { printf("arg[%d] [%s]\n", iarg, args[iarg]); } #endif status = ss_spawnv(P_DETACH, args[0], args); if (status != SS_SUCCESS) { if (errstr != NULL) { sprintf(errstr, "Cannot execute mtransition, ss_spawnv() returned %d", status); } return CM_SET_ERROR; } return CM_SUCCESS; } /*------------------------------------------------------------------*/ /* contact a client via RPC and execute the remote transition */ static int cm_transition_call(TrState* s, int idx) { INT old_timeout, status, i, t1, t0, size; HNDLE hDB; HNDLE hConn = -1; int connect_timeout = 10000; int timeout = 120000; cm_get_experiment_database(&hDB, NULL); assert(hDB); TrClient *tr_client = s->clients[idx].get(); tr_client->errorstr = ""; //tr_client->init_time = ss_millitime(); tr_client->waiting_for_client = ""; tr_client->connect_timeout = 0; tr_client->connect_start_time = 0; tr_client->connect_end_time = 0; tr_client->rpc_timeout = 0; tr_client->rpc_start_time = 0; tr_client->rpc_end_time = 0; tr_client->end_time = 0; write_tr_client_to_odb(hDB, tr_client); /* wait for predecessor if set */ if (tr_client->async_flag & TR_MTHREAD && !tr_client->wait_for_index.empty()) { while (1) { TrClient* wait_for = NULL; for (size_t i = 0; i < tr_client->wait_for_index.size(); i++) { int wait_for_index = tr_client->wait_for_index[i]; assert(wait_for_index >= 0); assert(wait_for_index < (int)s->clients.size()); TrClient *t = s->clients[wait_for_index].get(); if (!t) continue; if (t->status == 0) { wait_for = t; break; } if (t->status != SUCCESS && tr_client->transition != TR_STOP) { cm_msg(MERROR, "cm_transition_call", "Transition %d aborted: client \"%s\" returned status %d", tr_client->transition, t->client_name.c_str(), int(t->status)); tr_client->status = -1; tr_client->errorstr = msprintf("Aborted by failure of client \"%s\"", t->client_name.c_str()); tr_client->end_time = 
ss_millitime(); write_tr_client_to_odb(hDB, tr_client); return CM_SUCCESS; } } if (wait_for == NULL) break; tr_client->waiting_for_client = wait_for->client_name; write_tr_client_to_odb(hDB, tr_client); if (tr_client->debug_flag == 1) printf("Client \"%s\" waits for client \"%s\"\n", tr_client->client_name.c_str(), wait_for->client_name.c_str()); i = 0; size = sizeof(i); status = db_get_value(hDB, 0, "/Runinfo/Transition in progress", &i, &size, TID_INT32, FALSE); if (status == DB_SUCCESS && i == 0) { cm_msg(MERROR, "cm_transition_call", "Client \"%s\" transition %d aborted while waiting for client \"%s\": \"/Runinfo/Transition in progress\" was cleared", tr_client->client_name.c_str(), tr_client->transition, wait_for->client_name.c_str()); tr_client->status = -1; tr_client->errorstr = "Canceled"; tr_client->end_time = ss_millitime(); write_tr_client_to_odb(hDB, tr_client); return CM_SUCCESS; } ss_sleep(100); }; } tr_client->waiting_for_client[0] = 0; /* contact client if transition mask set */ if (tr_client->debug_flag == 1) printf("Connecting to client \"%s\" on host %s...\n", tr_client->client_name.c_str(), tr_client->host_name.c_str()); if (tr_client->debug_flag == 2) cm_msg(MINFO, "cm_transition_call", "cm_transition_call: Connecting to client \"%s\" on host %s...", tr_client->client_name.c_str(), tr_client->host_name.c_str()); /* get transition timeout for rpc connect */ size = sizeof(timeout); db_get_value(hDB, 0, "/Experiment/Transition connect timeout", &connect_timeout, &size, TID_INT32, TRUE); if (connect_timeout < 1000) connect_timeout = 1000; /* get transition timeout */ size = sizeof(timeout); db_get_value(hDB, 0, "/Experiment/Transition timeout", &timeout, &size, TID_INT32, TRUE); if (timeout < 1000) timeout = 1000; /* set our timeout for rpc_client_connect() */ //old_timeout = rpc_get_timeout(RPC_HNDLE_CONNECT); rpc_set_timeout(RPC_HNDLE_CONNECT, connect_timeout, &old_timeout); tr_client->connect_timeout = connect_timeout; 
tr_client->connect_start_time = ss_millitime(); write_tr_client_to_odb(hDB, tr_client); /* client found -> connect to its server port */ status = rpc_client_connect(tr_client->host_name.c_str(), tr_client->port, tr_client->client_name.c_str(), &hConn); rpc_set_timeout(RPC_HNDLE_CONNECT, old_timeout); tr_client->connect_end_time = ss_millitime(); write_tr_client_to_odb(hDB, tr_client); if (status != RPC_SUCCESS) { cm_msg(MERROR, "cm_transition_call", "cannot connect to client \"%s\" on host %s, port %d, status %d", tr_client->client_name.c_str(), tr_client->host_name.c_str(), tr_client->port, status); tr_client->errorstr = msprintf("Cannot connect to client \"%s\"", tr_client->client_name.c_str()); /* clients that do not respond to transitions are dead or defective, get rid of them. K.O. */ cm_shutdown(tr_client->client_name.c_str(), TRUE); cm_cleanup(tr_client->client_name.c_str(), TRUE); if (tr_client->transition != TR_STOP) { /* indicate abort */ i = 1; db_set_value(hDB, 0, "/Runinfo/Start abort", &i, sizeof(INT), 1, TID_INT32); i = 0; db_set_value(hDB, 0, "/Runinfo/Transition in progress", &i, sizeof(INT), 1, TID_INT32); } tr_client->status = status; tr_client->end_time = ss_millitime(); write_tr_client_to_odb(hDB, tr_client); return status; } if (tr_client->debug_flag == 1) printf("Connection established to client \"%s\" on host %s\n", tr_client->client_name.c_str(), tr_client->host_name.c_str()); if (tr_client->debug_flag == 2) cm_msg(MINFO, "cm_transition_call", "cm_transition: Connection established to client \"%s\" on host %s", tr_client->client_name.c_str(), tr_client->host_name.c_str()); /* call RC_TRANSITION on remote client with increased timeout */ //old_timeout = rpc_get_timeout(hConn); rpc_set_timeout(hConn, timeout, &old_timeout); tr_client->rpc_timeout = timeout; tr_client->rpc_start_time = ss_millitime(); write_tr_client_to_odb(hDB, tr_client); if (tr_client->debug_flag == 1) printf("Executing RPC transition client \"%s\" on host %s...\n", 
tr_client->client_name.c_str(), tr_client->host_name.c_str()); if (tr_client->debug_flag == 2) cm_msg(MINFO, "cm_transition_call", "cm_transition: Executing RPC transition client \"%s\" on host %s...", tr_client->client_name.c_str(), tr_client->host_name.c_str()); t0 = ss_millitime(); char errorstr[TRANSITION_ERROR_STRING_LENGTH]; errorstr[0] = 0; status = rpc_client_call(hConn, RPC_RC_TRANSITION, tr_client->transition, tr_client->run_number, errorstr, sizeof(errorstr), tr_client->sequence_number); tr_client->errorstr = errorstr; t1 = ss_millitime(); tr_client->rpc_end_time = ss_millitime(); write_tr_client_to_odb(hDB, tr_client); /* fix for clients returning 0 as error code */ if (status == 0) status = FE_ERR_HW; /* reset timeout */ rpc_set_timeout(hConn, old_timeout); //DWORD t2 = ss_millitime(); if (tr_client->debug_flag == 1) printf("RPC transition finished client \"%s\" on host \"%s\" in %d ms with status %d\n", tr_client->client_name.c_str(), tr_client->host_name.c_str(), t1 - t0, status); if (tr_client->debug_flag == 2) cm_msg(MINFO, "cm_transition_call", "cm_transition: RPC transition finished client \"%s\" on host \"%s\" in %d ms with status %d", tr_client->client_name.c_str(), tr_client->host_name.c_str(), t1 - t0, status); if (status == RPC_NET_ERROR || status == RPC_TIMEOUT) { tr_client->errorstr = msprintf("RPC network error or timeout from client \'%s\' on host \"%s\"", tr_client->client_name.c_str(), tr_client->host_name.c_str()); /* clients that do not respond to transitions are dead or defective, get rid of them. K.O. 
*/ cm_shutdown(tr_client->client_name.c_str(), TRUE); cm_cleanup(tr_client->client_name.c_str(), TRUE); } else if (status != CM_SUCCESS && tr_client->errorstr.empty()) { tr_client->errorstr = msprintf("Unknown error %d from client \'%s\' on host \"%s\"", status, tr_client->client_name.c_str(), tr_client->host_name.c_str()); } tr_client->status = status; tr_client->end_time = ss_millitime(); // write updated status and end_time to ODB write_tr_client_to_odb(hDB, tr_client); #if 0 printf("hconn %d cm_transition_call(%s) finished init %d connect %d end %d rpc %d end %d xxx %d end %d\n", hConn, tr_client->client_name.c_str(), tr_client->init_time - tr_client->init_time, tr_client->connect_start_time - tr_client->init_time, tr_client->connect_end_time - tr_client->init_time, tr_client->rpc_start_time - tr_client->init_time, tr_client->rpc_end_time - tr_client->init_time, t2 - tr_client->init_time, tr_client->end_time - tr_client->init_time); #endif return CM_SUCCESS; } /*------------------------------------------------------------------*/ static int cm_transition_call_direct(TrClient *tr_client) { HNDLE hDB; cm_get_experiment_database(&hDB, NULL); DWORD now = ss_millitime(); tr_client->errorstr = ""; //tr_client->init_time = now; tr_client->waiting_for_client = ""; tr_client->connect_timeout = 0; tr_client->connect_start_time = now; tr_client->connect_end_time = now; tr_client->rpc_timeout = 0; tr_client->rpc_start_time = 0; tr_client->rpc_end_time = 0; tr_client->end_time = 0; write_tr_client_to_odb(hDB, tr_client); // find registered handler // NB: this code should match same code in rpc_transition_dispatch() // NB: only use the first handler, this is how MIDAS always worked // NB: we could run all handlers, but we can return the status and error string of only one of them. 
_trans_table_mutex.lock(); size_t n = _trans_table.size(); _trans_table_mutex.unlock(); for (size_t i = 0; i < n; i++) { _trans_table_mutex.lock(); TRANS_TABLE tt = _trans_table[i]; _trans_table_mutex.unlock(); if (tt.transition == tr_client->transition && tt.sequence_number == tr_client->sequence_number) { /* call registered function */ if (tt.func) { if (tr_client->debug_flag == 1) printf("Calling local transition callback\n"); if (tr_client->debug_flag == 2) cm_msg(MINFO, "cm_transition_call_direct", "cm_transition: Calling local transition callback"); tr_client->rpc_start_time = ss_millitime(); write_tr_client_to_odb(hDB, tr_client); char errorstr[TRANSITION_ERROR_STRING_LENGTH]; errorstr[0] = 0; tr_client->status = tt.func(tr_client->run_number, errorstr); tr_client->errorstr = errorstr; tr_client->rpc_end_time = ss_millitime(); if (tr_client->debug_flag == 1) printf("Local transition callback finished, status %d\n", int(tr_client->status)); if (tr_client->debug_flag == 2) cm_msg(MINFO, "cm_transition_call_direct", "cm_transition: Local transition callback finished, status %d", int(tr_client->status)); tr_client->end_time = ss_millitime(); // write status and end_time to ODB write_tr_client_to_odb(hDB, tr_client); return tr_client->status; } } } cm_msg(MERROR, "cm_transition_call_direct", "no handler for transition %d with sequence number %d", tr_client->transition, tr_client->sequence_number); tr_client->status = CM_SUCCESS; tr_client->end_time = ss_millitime(); // write status and end_time to ODB write_tr_client_to_odb(hDB, tr_client); return CM_SUCCESS; } /********************************************************************/ /** Performs a run transition (Start/Stop/Pause/Resume). Synchronous/Asynchronous flag. If set to TR_ASYNC, the transition is done asynchronously, meaning that clients are connected and told to execute their callback routine, but no result is awaited. 
The return value is specified by the transition callback function on the
remote clients. If all callbacks can perform the transition, CM_SUCCESS is
returned. If one callback cannot perform the transition, the return value of
this callback is returned from cm_transition().

The async_flag is usually FALSE so that transition callbacks can block a run
transition in case of problems and return an error string. The only
exceptions are situations where a run transition is performed automatically
by a program which cannot block in a transition. For example, the logger can
cause a run stop when a disk is nearly full, but it cannot block in the
cm_transition() function since it has its own run stop callback which must
flush buffers and close disk files and tapes.
\code
...
    i = 1;
    db_set_value(hDB, 0, "/Runinfo/Transition in progress", &i, sizeof(INT), 1, TID_INT32);

      status = cm_transition(TR_START, new_run_number, str, sizeof(str), TR_SYNC, debug_flag);
      if (status != CM_SUCCESS)
      {
        // in case of error
        printf("Error: %s\n", str);
      }
    ...
\endcode
@param transition TR_START, TR_PAUSE, TR_RESUME or TR_STOP.
@param run_number New run number. If zero, use current run number plus one.
@param errstr Returned error string.
@param errstr_size Size of error string.
@param async_flag Synchronization flag (TR_SYNC: wait for completion, TR_ASYNC: return immediately).
@param debug_flag If 1 output debugging information, if 2 output via cm_msg().
@return CM_SUCCESS, \ error code from remote client */ static INT cm_transition2(INT transition, INT run_number, char *errstr, INT errstr_size, INT async_flag, INT debug_flag) { INT i, status, size, sequence_number, port, state; HNDLE hDB, hRootKey, hSubkey, hKey, hKeylocal, hKeyTrans; DWORD seconds; char tr_key_name[256]; KEY key; BOOL deferred; char xerrstr[TRANSITION_ERROR_STRING_LENGTH]; //printf("cm_transition2: transition %d, run_number %d, errstr %p, errstr_size %d, async_flag %d, debug_flag %d\n", transition, run_number, errstr, errstr_size, async_flag, debug_flag); /* if needed, use internal error string */ if (!errstr) { errstr = xerrstr; errstr_size = sizeof(xerrstr); } /* erase error string */ errstr[0] = 0; /* get key of local client */ cm_get_experiment_database(&hDB, &hKeylocal); deferred = (transition & TR_DEFERRED) > 0; transition &= ~TR_DEFERRED; /* check for valid transition */ if (transition != TR_START && transition != TR_STOP && transition != TR_PAUSE && transition != TR_RESUME && transition != TR_STARTABORT) { cm_msg(MERROR, "cm_transition", "Invalid transition request \"%d\"", transition); mstrlcpy(errstr, "Invalid transition request", errstr_size); return CM_INVALID_TRANSITION; } /* check if transition in progress */ if (!deferred) { i = 0; size = sizeof(i); db_get_value(hDB, 0, "/Runinfo/Transition in progress", &i, &size, TID_INT32, TRUE); if (i == 1) { if (errstr) { sprintf(errstr, "Start/Stop transition %d already in progress, please try again later\n", i); mstrlcat(errstr, "or set \"/Runinfo/Transition in progress\" manually to zero.\n", errstr_size); } cm_msg(MERROR, "cm_transition", "another transition is already in progress"); return CM_TRANSITION_IN_PROGRESS; } } /* indicate transition in progress */ i = transition; db_set_value(hDB, 0, "/Runinfo/Transition in progress", &i, sizeof(INT), 1, TID_INT32); /* clear run abort flag */ i = 0; db_set_value(hDB, 0, "/Runinfo/Start abort", &i, sizeof(INT), 1, TID_INT32); /* construct new 
transition state */ TrState s; s.transition = transition; s.run_number = run_number; s.async_flag = async_flag; s.debug_flag = debug_flag; s.status = 0; s.errorstr[0] = 0; s.start_time = ss_millitime(); s.end_time = 0; /* construct the ODB tree /System/Transition */ status = db_find_key(hDB, 0, "/System/Transition/TR_STARTABORT", &hKey); if (status == DB_SUCCESS) { db_delete_key(hDB, hKey, FALSE); } if (transition != TR_STARTABORT) { status = db_find_key(hDB, 0, "/System/Transition/Clients", &hKey); if (status == DB_SUCCESS) { db_delete_key(hDB, hKey, FALSE); } } if (transition != TR_STARTABORT) { db_set_value(hDB, 0, "/System/Transition/transition", &transition, sizeof(INT), 1, TID_INT32); db_set_value(hDB, 0, "/System/Transition/run_number", &run_number, sizeof(INT), 1, TID_INT32); db_set_value(hDB, 0, "/System/Transition/start_time", &s.start_time, sizeof(DWORD), 1, TID_UINT32); db_set_value(hDB, 0, "/System/Transition/end_time", &s.end_time, sizeof(DWORD), 1, TID_UINT32); status = 0; db_set_value(hDB, 0, "/System/Transition/status", &status, sizeof(INT), 1, TID_INT32); db_set_value(hDB, 0, "/System/Transition/error", "", 1, 1, TID_STRING); db_set_value(hDB, 0, "/System/Transition/deferred", "", 1, 1, TID_STRING); } /* check for alarms */ i = 0; size = sizeof(i); db_get_value(hDB, 0, "/Experiment/Prevent start on alarms", &i, &size, TID_BOOL, TRUE); if (i == TRUE && transition == TR_START) { al_check(); std::string alarms; if (al_get_alarms(&alarms) > 0) { cm_msg(MERROR, "cm_transition", "Run start abort due to alarms: %s", alarms.c_str()); mstrlcpy(errstr, "Cannot start run due to alarms: ", errstr_size); mstrlcat(errstr, alarms.c_str(), errstr_size); return tr_finish(hDB, &s, transition, AL_TRIGGERED, errstr); } } /* check for required programs */ i = 0; size = sizeof(i); db_get_value(hDB, 0, "/Experiment/Prevent start on required progs", &i, &size, TID_BOOL, TRUE); if (i == TRUE && transition == TR_START) { HNDLE hkeyroot, hkey; /* check /programs alarms */ 
db_find_key(hDB, 0, "/Programs", &hkeyroot); if (hkeyroot) { for (i = 0;; i++) { BOOL program_info_required = FALSE; status = db_enum_key(hDB, hkeyroot, i, &hkey); if (status == DB_NO_MORE_SUBKEYS) break; db_get_key(hDB, hkey, &key); /* don't check "execute on xxx" */ if (key.type != TID_KEY) continue; size = sizeof(program_info_required); status = db_get_value(hDB, hkey, "Required", &program_info_required, &size, TID_BOOL, TRUE); if (status != DB_SUCCESS) { cm_msg(MERROR, "cm_transition", "Cannot get program info required, status %d", status); continue; } if (program_info_required) { std::string name = rpc_get_name(); std::string str = name; str.resize(strlen(key.name)); if (!equal_ustring(str.c_str(), key.name) && cm_exist(key.name, FALSE) == CM_NO_CLIENT) { cm_msg(MERROR, "cm_transition", "Run start abort due to program \"%s\" not running", key.name); std::string serrstr = msprintf("Run start abort due to program \"%s\" not running", key.name); mstrlcpy(errstr, serrstr.c_str(), errstr_size); return tr_finish(hDB, &s, transition, AL_TRIGGERED, errstr); } } } } } /* do detached transition via mtransition tool */ if (async_flag & TR_DETACH) { status = cm_transition_detach(transition, run_number, errstr, errstr_size, async_flag, debug_flag); return tr_finish(hDB, &s, transition, status, errstr); } mstrlcpy(errstr, "Unknown error", errstr_size); if (debug_flag == 0) { size = sizeof(i); db_get_value(hDB, 0, "/Experiment/Transition debug flag", &debug_flag, &size, TID_INT32, TRUE); } /* if no run number is given, get it from ODB and increment it */ if (run_number == 0) { size = sizeof(run_number); status = db_get_value(hDB, 0, "Runinfo/Run number", &run_number, &size, TID_INT32, TRUE); assert(status == SUCCESS); if (transition == TR_START) { run_number++; } s.run_number = run_number; if (transition != TR_STARTABORT) { db_set_value(hDB, 0, "/System/Transition/run_number", &run_number, sizeof(INT), 1, TID_INT32); } } if (run_number <= 0) { cm_msg(MERROR, "cm_transition", 
"aborting on attempt to use invalid run number %d", run_number); abort(); } /* Set new run number in ODB */ if (transition == TR_START) { if (debug_flag == 1) printf("Setting run number %d in ODB\n", run_number); if (debug_flag == 2) cm_msg(MINFO, "cm_transition", "cm_transition: Setting run number %d in ODB", run_number); status = db_set_value(hDB, 0, "Runinfo/Run number", &run_number, sizeof(run_number), 1, TID_INT32); if (status != DB_SUCCESS) { cm_msg(MERROR, "cm_transition", "cannot set Runinfo/Run number in database, status %d", status); abort(); } } if (deferred) { if (debug_flag == 1) printf("Clearing /Runinfo/Requested transition\n"); if (debug_flag == 2) cm_msg(MINFO, "cm_transition", "cm_transition: Clearing /Runinfo/Requested transition"); /* remove transition request */ i = 0; db_set_value(hDB, 0, "/Runinfo/Requested transition", &i, sizeof(int), 1, TID_INT32); } else { status = db_find_key(hDB, 0, "System/Clients", &hRootKey); if (status != DB_SUCCESS) { cm_msg(MERROR, "cm_transition", "cannot find System/Clients entry in database"); if (errstr) mstrlcpy(errstr, "Cannot find /System/Clients in ODB", errstr_size); return tr_finish(hDB, &s, transition, status, errstr); } /* check if deferred transition already in progress */ size = sizeof(i); db_get_value(hDB, 0, "/Runinfo/Requested transition", &i, &size, TID_INT32, TRUE); if (i) { if (errstr) { mstrlcpy(errstr, "Deferred transition already in progress", errstr_size); mstrlcat(errstr, ", to cancel, set \"/Runinfo/Requested transition\" to zero", errstr_size); } return tr_finish(hDB, &s, transition, CM_TRANSITION_IN_PROGRESS, errstr); } std::string trname = cm_transition_name(transition); sprintf(tr_key_name, "Transition %s DEFERRED", trname.c_str()); /* search database for clients with deferred transition request */ for (i = 0, status = 0;; i++) { status = db_enum_key(hDB, hRootKey, i, &hSubkey); if (status == DB_NO_MORE_SUBKEYS) break; if (status == DB_SUCCESS) { size = sizeof(sequence_number); status 
= db_get_value(hDB, hSubkey, tr_key_name, &sequence_number, &size, TID_INT32, FALSE); /* if registered for deferred transition, set flag in ODB and return */ if (status == DB_SUCCESS) { char str[256]; size = NAME_LENGTH; db_get_value(hDB, hSubkey, "Name", str, &size, TID_STRING, TRUE); if (debug_flag == 1) printf("---- Transition %s deferred by client \"%s\" ----\n", trname.c_str(), str); if (debug_flag == 2) cm_msg(MINFO, "cm_transition", "cm_transition: ---- Transition %s deferred by client \"%s\" ----", trname.c_str(), str); if (debug_flag == 1) printf("Setting /Runinfo/Requested transition\n"); if (debug_flag == 2) cm_msg(MINFO, "cm_transition", "cm_transition: Setting /Runinfo/Requested transition"); /* /Runinfo/Requested transition is hot-linked by mfe.c and writing to it * will activate the deferred transition code in the frontend. * the transition itself will be run from the frontend via cm_transition(TR_DEFERRED) */ db_set_value(hDB, 0, "/Runinfo/Requested transition", &transition, sizeof(int), 1, TID_INT32); db_set_value(hDB, 0, "/System/Transition/deferred", str, strlen(str) + 1, 1, TID_STRING); if (errstr) sprintf(errstr, "Transition %s deferred by client \"%s\"", trname.c_str(), str); return tr_finish(hDB, &s, transition, CM_DEFERRED_TRANSITION, errstr); } } } } /* execute programs on start */ if (transition == TR_START) { char str[256]; str[0] = 0; size = sizeof(str); db_get_value(hDB, 0, "/Programs/Execute on start run", str, &size, TID_STRING, TRUE); if (str[0]) ss_system(str); db_find_key(hDB, 0, "/Programs", &hRootKey); if (hRootKey) { for (i = 0;; i++) { BOOL program_info_auto_start = FALSE; status = db_enum_key(hDB, hRootKey, i, &hKey); if (status == DB_NO_MORE_SUBKEYS) break; db_get_key(hDB, hKey, &key); /* don't check "execute on xxx" */ if (key.type != TID_KEY) continue; size = sizeof(program_info_auto_start); status = db_get_value(hDB, hKey, "Auto start", &program_info_auto_start, &size, TID_BOOL, TRUE); if (status != DB_SUCCESS) { 
cm_msg(MERROR, "cm_transition", "Cannot get program info auto start, status %d", status); continue; } if (program_info_auto_start) { char start_command[MAX_STRING_LENGTH]; start_command[0] = 0; size = sizeof(start_command); status = db_get_value(hDB, hKey, "Start command", &start_command, &size, TID_STRING, TRUE); if (status != DB_SUCCESS) { cm_msg(MERROR, "cm_transition", "Cannot get program info start command, status %d", status); continue; } if (start_command[0]) { cm_msg(MINFO, "cm_transition", "Auto Starting program \"%s\", command \"%s\"", key.name, start_command); ss_system(start_command); } } } } } /* execute programs on startabort */ if (transition == TR_STARTABORT) { /* make sure odb entry is always created, otherwise we only see it after the first aborted run start, maybe never */ std::string cmd; db_get_value_string(hDB, 0, "/Programs/Execute on start abort", 0, &cmd, TRUE, 256); if (!cmd.empty()) ss_system(cmd.c_str()); } /* set new start time in database */ if (transition == TR_START) { /* ASCII format */ std::string now = cm_asctime(); now.reserve(32); db_set_value(hDB, 0, "Runinfo/Start Time", now.c_str(), 32, 1, TID_STRING); /* reset stop time */ seconds = 0; db_set_value(hDB, 0, "Runinfo/Stop Time binary", &seconds, sizeof(seconds), 1, TID_UINT32); /* Seconds since 1.1.1970 */ cm_time(&seconds); db_set_value(hDB, 0, "Runinfo/Start Time binary", &seconds, sizeof(seconds), 1, TID_UINT32); } size = sizeof(state); status = db_get_value(hDB, 0, "Runinfo/State", &state, &size, TID_INT32, TRUE); /* set stop time in database */ if (transition == TR_STOP) { if (status != DB_SUCCESS) cm_msg(MERROR, "cm_transition", "cannot get Runinfo/State in database"); if (state != STATE_STOPPED) { /* stop time binary */ cm_time(&seconds); status = db_set_value(hDB, 0, "Runinfo/Stop Time binary", &seconds, sizeof(seconds), 1, TID_UINT32); if (status != DB_SUCCESS) cm_msg(MERROR, "cm_transition", "cannot set \"Runinfo/Stop Time binary\" in database"); /* stop time ascii 
*/ std::string now = cm_asctime(); now.reserve(32); status = db_set_value(hDB, 0, "Runinfo/Stop Time", now.c_str(), 32, 1, TID_STRING); if (status != DB_SUCCESS) cm_msg(MERROR, "cm_transition", "cannot set \"Runinfo/Stop Time\" in database"); } } status = db_find_key(hDB, 0, "System/Clients", &hRootKey); if (status != DB_SUCCESS) { cm_msg(MERROR, "cm_transition", "cannot find System/Clients entry in database"); if (errstr) mstrlcpy(errstr, "Cannot find /System/Clients in ODB", errstr_size); return tr_finish(hDB, &s, transition, status, errstr); } std::string trname = cm_transition_name(transition); /* check that all transition clients are alive */ for (int i = 0;;) { status = db_enum_key(hDB, hRootKey, i, &hSubkey); if (status != DB_SUCCESS) break; status = cm_check_client(hDB, hSubkey); if (status == DB_SUCCESS) { /* this client is alive. Check next one! */ i++; continue; } assert(status == CM_NO_CLIENT); /* start from scratch: removing odb entries as we iterate over them * does strange things to db_enum_key() */ i = 0; } /* check for broken RPC connections */ rpc_client_check(); if (debug_flag == 1) printf("---- Transition %s started ----\n", trname.c_str()); if (debug_flag == 2) cm_msg(MINFO, "cm_transition", "cm_transition: ---- Transition %s started ----", trname.c_str()); sprintf(tr_key_name, "Transition %s", trname.c_str()); /* search database for clients which registered for transition */ for (int i = 0, status = 0;; i++) { KEY subkey; status = db_enum_key(hDB, hRootKey, i, &hSubkey); if (status == DB_NO_MORE_SUBKEYS) break; status = db_get_key(hDB, hSubkey, &subkey); assert(status == DB_SUCCESS); if (status == DB_SUCCESS) { status = db_find_key(hDB, hSubkey, tr_key_name, &hKeyTrans); if (status == DB_SUCCESS) { db_get_key(hDB, hKeyTrans, &key); for (int j = 0; j < key.num_values; j++) { size = sizeof(sequence_number); status = db_get_data_index(hDB, hKeyTrans, &sequence_number, &size, j, TID_INT32); assert(status == DB_SUCCESS); TrClient *c = new TrClient; 
c->init_time = ss_millitime(); c->transition = transition; c->run_number = run_number; c->async_flag = async_flag; c->debug_flag = debug_flag; c->sequence_number = sequence_number; c->status = 0; c->key_name = subkey.name; /* get client info */ char client_name[NAME_LENGTH]; size = sizeof(client_name); db_get_value(hDB, hSubkey, "Name", client_name, &size, TID_STRING, TRUE); c->client_name = client_name; char host_name[HOST_NAME_LENGTH]; size = sizeof(host_name); db_get_value(hDB, hSubkey, "Host", host_name, &size, TID_STRING, TRUE); c->host_name = host_name; //printf("Found client [%s] name [%s] transition [%s], i=%d, j=%d\n", subkey.name, client_name, tr_key_name, i, j); if (hSubkey == hKeylocal && ((async_flag & TR_MTHREAD) == 0)) { /* remember own client */ c->port = 0; } else { size = sizeof(port); db_get_value(hDB, hSubkey, "Server Port", &port, &size, TID_INT32, TRUE); c->port = port; } /* check for duplicates */ bool found = false; for (size_t k=0; kclient_name == c->client_name) if (cc->host_name == c->host_name) if (cc->port == c->port) if (cc->sequence_number == c->sequence_number) found = true; } if (!found) { s.clients.push_back(std::unique_ptr(c)); c = NULL; } else { cm_msg(MERROR, "cm_transition", "transition %s: client \"%s\" is registered with sequence number %d more than once", trname.c_str(), c->client_name.c_str(), c->sequence_number); delete c; c = NULL; } } } } } std::sort(s.clients.begin(), s.clients.end(), tr_compare); /* set predecessor for multi-threaded transitions */ for (size_t idx = 0; idx < s.clients.size(); idx++) { if (s.clients[idx]->sequence_number == 0) { // sequence number 0 means "don't care" } else { /* find clients with smaller sequence number */ if (idx > 0) { for (size_t i = idx - 1; ; i--) { if (s.clients[i]->sequence_number < s.clients[idx]->sequence_number) { if (s.clients[i]->sequence_number > 0) { s.clients[idx]->wait_for_index.push_back(i); } } if (i==0) break; } } } } for (size_t idx = 0; idx < s.clients.size(); 
idx++) { write_tr_client_to_odb(hDB, s.clients[idx].get()); } #if 0 for (size_t idx = 0; idx < s.clients.size(); idx++) { printf("TrClient[%d]: ", int(idx)); s.clients[idx]->Print(); printf("\n"); } #endif /* contact ordered clients for transition -----------------------*/ status = CM_SUCCESS; for (size_t idx = 0; idx < s.clients.size(); idx++) { if (debug_flag == 1) printf("\n==== Found client \"%s\" with sequence number %d\n", s.clients[idx]->client_name.c_str(), s.clients[idx]->sequence_number); if (debug_flag == 2) cm_msg(MINFO, "cm_transition", "cm_transition: ==== Found client \"%s\" with sequence number %d", s.clients[idx]->client_name.c_str(), s.clients[idx]->sequence_number); if (async_flag & TR_MTHREAD) { status = CM_SUCCESS; assert(s.clients[idx]->thread == NULL); s.clients[idx]->thread = new std::thread(cm_transition_call, &s, idx); } else { if (s.clients[idx]->port == 0) { /* if own client call transition callback directly */ status = cm_transition_call_direct(s.clients[idx].get()); } else { /* if other client call transition via RPC layer */ status = cm_transition_call(&s, idx); } if (status == CM_SUCCESS && transition != TR_STOP) if (s.clients[idx]->status != SUCCESS) { cm_msg(MERROR, "cm_transition", "transition %s aborted: client \"%s\" returned status %d", trname.c_str(), s.clients[idx]->client_name.c_str(), int(s.clients[idx]->status)); break; } } if (status != CM_SUCCESS) break; } /* wait until all threads have finished */ for (size_t idx = 0; idx < s.clients.size(); idx++) { if (s.clients[idx]->thread) { // join() will wait forever until thread finishes s.clients[idx]->thread->join(); delete s.clients[idx]->thread; s.clients[idx]->thread = NULL; } } /* at this point, all per-client threads have stopped and it is safe to delete TrState and return */ i = 0; size = sizeof(i); status = db_get_value(hDB, 0, "/Runinfo/Transition in progress", &i, &size, TID_INT32, FALSE); if (status == DB_SUCCESS && i == 0) { cm_msg(MERROR, "cm_transition", 
"transition %s aborted: \"/Runinfo/Transition in progress\" was cleared", trname.c_str()); if (errstr != NULL) mstrlcpy(errstr, "Canceled", errstr_size); return tr_finish(hDB, &s, transition, CM_TRANSITION_CANCELED, "Canceled"); } /* search for any error */ for (size_t idx = 0; idx < s.clients.size(); idx++) if (s.clients[idx]->status != CM_SUCCESS) { status = s.clients[idx]->status; if (errstr) mstrlcpy(errstr, s.clients[idx]->errorstr.c_str(), errstr_size); s.errorstr = msprintf("Aborted by client \"%s\"", s.clients[idx]->client_name.c_str()); break; } if (transition != TR_STOP && status != CM_SUCCESS) { /* indicate abort */ i = 1; db_set_value(hDB, 0, "/Runinfo/Start abort", &i, sizeof(INT), 1, TID_INT32); i = 0; db_set_value(hDB, 0, "/Runinfo/Transition in progress", &i, sizeof(INT), 1, TID_INT32); return tr_finish(hDB, &s, transition, status, errstr); } if (debug_flag == 1) printf("\n---- Transition %s finished ----\n", trname.c_str()); if (debug_flag == 2) cm_msg(MINFO, "cm_transition", "cm_transition: ---- Transition %s finished ----", trname.c_str()); /* set new run state in database */ if (transition == TR_START || transition == TR_RESUME) state = STATE_RUNNING; if (transition == TR_PAUSE) state = STATE_PAUSED; if (transition == TR_STOP) state = STATE_STOPPED; if (transition == TR_STARTABORT) state = STATE_STOPPED; size = sizeof(state); status = db_set_value(hDB, 0, "Runinfo/State", &state, size, 1, TID_INT32); if (status != DB_SUCCESS) cm_msg(MERROR, "cm_transition", "cannot set Runinfo/State in database, db_set_value() status %d", status); /* send notification message */ if (transition == TR_START) cm_msg(MINFO, "cm_transition", "Run #%d started", run_number); if (transition == TR_STOP) cm_msg(MINFO, "cm_transition", "Run #%d stopped", run_number); if (transition == TR_PAUSE) cm_msg(MINFO, "cm_transition", "Run #%d paused", run_number); if (transition == TR_RESUME) cm_msg(MINFO, "cm_transition", "Run #%d resumed", run_number); if (transition == 
TR_STARTABORT) cm_msg(MINFO, "cm_transition", "Run #%d start aborted", run_number); /* lock/unlock ODB values if present */ db_find_key(hDB, 0, "/Experiment/Lock when running", &hKey); if (hKey) { if (state == STATE_STOPPED) db_set_mode(hDB, hKey, MODE_READ | MODE_WRITE | MODE_DELETE, TRUE); else db_set_mode(hDB, hKey, MODE_READ, TRUE); } /* flush online database */ if (transition == TR_STOP) db_flush_database(hDB); /* execute/stop programs on stop */ if (transition == TR_STOP) { std::string cmd; db_get_value_string(hDB, 0, "/Programs/Execute on stop run", 0, &cmd, TRUE, 256); if (!cmd.empty()) ss_system(cmd.c_str()); db_find_key(hDB, 0, "/Programs", &hRootKey); if (hRootKey) { for (i = 0;; i++) { BOOL program_info_auto_stop = FALSE; status = db_enum_key(hDB, hRootKey, i, &hKey); if (status == DB_NO_MORE_SUBKEYS) break; db_get_key(hDB, hKey, &key); /* don't check "execute on xxx" */ if (key.type != TID_KEY) continue; size = sizeof(program_info_auto_stop); status = db_get_value(hDB, hKey, "Auto stop", &program_info_auto_stop, &size, TID_BOOL, TRUE); if (status != DB_SUCCESS) { cm_msg(MERROR, "cm_transition", "Cannot get program info auto stop, status %d", status); continue; } if (program_info_auto_stop) { cm_msg(MINFO, "cm_transition", "Auto Stopping program \"%s\"", key.name); cm_shutdown(key.name, FALSE); } } } } /* indicate success */ i = 0; db_set_value(hDB, 0, "/Runinfo/Transition in progress", &i, sizeof(INT), 1, TID_INT32); if (errstr != NULL) mstrlcpy(errstr, "Success", errstr_size); return tr_finish(hDB, &s, transition, CM_SUCCESS, "Success"); } /*------------------------------------------------------------------*/ /* wrapper around cm_transition2() to send a TR_STARTABORT in case of failure */ static INT cm_transition1(INT transition, INT run_number, char *errstr, INT errstr_size, INT async_flag, INT debug_flag) { int status; status = cm_transition2(transition, run_number, errstr, errstr_size, async_flag, debug_flag); if (transition == TR_START && status 
!= CM_SUCCESS) { cm_msg(MERROR, "cm_transition", "Could not start a run: cm_transition() status %d, message \'%s\'", status, errstr); cm_transition2(TR_STARTABORT, run_number, NULL, 0, async_flag, debug_flag); } return status; } /*------------------------------------------------------------------*/ static INT tr_main_thread(void *param) { INT status; TR_PARAM *trp; trp = (TR_PARAM *) param; status = cm_transition1(trp->transition, trp->run_number, trp->errstr, trp->errstr_size, trp->async_flag, trp->debug_flag); trp->status = status; trp->finished = TRUE; return 0; } INT cm_transition_cleanup() { if (_trp.thread && !_trp.finished) { //printf("main transition thread did not finish yet!\n"); return CM_TRANSITION_IN_PROGRESS; } std::thread* t = _trp.thread.exchange(NULL); if (t) { t->join(); delete t; t = NULL; } return CM_SUCCESS; } /* wrapper around cm_transition1() for detached multi-threaded transitions */ INT cm_transition(INT transition, INT run_number, char *errstr, INT errstr_size, INT async_flag, INT debug_flag) { int mflag = async_flag & TR_MTHREAD; int sflag = async_flag & TR_SYNC; int status = cm_transition_cleanup(); if (status != CM_SUCCESS) { cm_msg(MERROR, "cm_transition", "previous transition did not finish yet"); return CM_TRANSITION_IN_PROGRESS; } /* get key of local client */ HNDLE hDB; cm_get_experiment_database(&hDB, NULL); bool deferred = (transition & TR_DEFERRED) > 0; INT trans_raw = (transition & ~TR_DEFERRED); /* check for valid transition */ if (trans_raw != TR_START && trans_raw != TR_STOP && trans_raw != TR_PAUSE && trans_raw != TR_RESUME && trans_raw != TR_STARTABORT) { cm_msg(MERROR, "cm_transition", "Invalid transition request \"%d\"", transition); if (errstr) { mstrlcpy(errstr, "Invalid transition request", errstr_size); } return CM_INVALID_TRANSITION; } /* check if transition in progress */ if (!deferred) { int i = 0; int size = sizeof(i); db_get_value(hDB, 0, "/Runinfo/Transition in progress", &i, &size, TID_INT32, TRUE); if (i == 
1) { if (errstr) { sprintf(errstr, "Start/Stop transition %d already in progress, please try again later\n", i); mstrlcat(errstr, "or set \"/Runinfo/Transition in progress\" manually to zero.\n", errstr_size); } cm_msg(MERROR, "cm_transition", "another transition is already in progress"); return CM_TRANSITION_IN_PROGRESS; } } if (mflag) { _trp.transition = transition; _trp.run_number = run_number; if (sflag) { /* in MTHREAD|SYNC mode, we wait until the main thread finishes and it is safe for it to write into errstr */ _trp.errstr = errstr; _trp.errstr_size = errstr_size; } else { /* in normal MTHREAD mode, we return right away and * if errstr is a local variable in the caller and they return too, * errstr becomes a stale reference and writing into it will corrupt the stack * in the mlogger, errstr is a local variable in "start_the_run", "stop_the_run" * and we definitely corrupt mlogger memory with out this: */ _trp.errstr = NULL; _trp.errstr_size = 0; } _trp.async_flag = async_flag; _trp.debug_flag = debug_flag; _trp.status = 0; _trp.finished = FALSE; if (errstr) *errstr = 0; // null error string //ss_thread_create(tr_main_thread, &_trp); std::thread* t = _trp.thread.exchange(new std::thread(tr_main_thread, &_trp)); assert(t==NULL); // previous thread should have been reaped by cm_transition_cleanup() if (sflag) { /* wait until main thread has finished */ do { ss_sleep(10); } while (!_trp.finished); std::thread* t = _trp.thread.exchange(NULL); if (t) { t->join(); delete t; t = NULL; } return _trp.status; } } else return cm_transition1(transition, run_number, errstr, errstr_size, async_flag, debug_flag); return CM_SUCCESS; } /**dox***************************************************************/ #ifndef DOXYGEN_SHOULD_SKIP_THIS /********************************************************************/ INT cm_dispatch_ipc(const char *message, int message_size, int client_socket) /********************************************************************\ Routine: 
cm_dispatch_ipc Purpose: Called from ss_suspend if an IPC message arrives Input: INT msg IPC message we got, MSG_ODB/MSG_BM INT p1, p2 Optional parameters int s Optional server socket Output: none Function value: CM_SUCCESS Successful completion \********************************************************************/ { if (message[0] == 'O') { HNDLE hDB, hKey, hKeyRoot; INT index; index = 0; sscanf(message + 2, "%d %d %d %d", &hDB, &hKeyRoot, &hKey, &index); if (client_socket) { return db_update_record_mserver(hDB, hKeyRoot, hKey, index, client_socket); } else { return db_update_record_local(hDB, hKeyRoot, hKey, index); } } /* message == "B" means "resume event sender" */ if (message[0] == 'B' && message[2] != ' ') { char str[NAME_LENGTH]; //printf("cm_dispatch_ipc: message [%s], s=%d\n", message, s); mstrlcpy(str, message + 2, sizeof(str)); if (strchr(str, ' ')) *strchr(str, ' ') = 0; if (client_socket) return bm_notify_client(str, client_socket); else return bm_push_event(str); } //printf("cm_dispatch_ipc: message [%s] ignored\n", message); return CM_SUCCESS; } /********************************************************************/ static BOOL _ctrlc_pressed = FALSE; void cm_ctrlc_handler(int sig) { if (_ctrlc_pressed) { printf("Received 2nd Ctrl-C, hard abort\n"); exit(0); } printf("Received Ctrl-C, aborting...\n"); _ctrlc_pressed = TRUE; ss_ctrlc_handler(cm_ctrlc_handler); } BOOL cm_is_ctrlc_pressed() { return _ctrlc_pressed; } void cm_ack_ctrlc_pressed() { _ctrlc_pressed = FALSE; } /********************************************************************/ int cm_exec_script(const char *odb_path_to_script) /********************************************************************\ Routine: cm_exec_script Purpose: Execute script from /Script tree exec_script is enabled by the tree /Script The /Script struct is composed of list of keys from which the name of the key is the button name and the sub-structure is a record as follow: /Script/ =