diff --git a/conf/requirements.pip b/conf/requirements.pip index 50b07cd6b6deb267a9260d416333a5135fafd27d..ab9c66c1026eae7b682f67b9d23fec8a1ffb9d6a 100644 --- a/conf/requirements.pip +++ b/conf/requirements.pip @@ -26,7 +26,7 @@ requests==2.22.0 rrdtool==0.1.15 pyyaml==5.1.2 pydgets==0.9 -pyzenkit==0.57 +pyzenkit==0.58 pynspect==0.17 ipranges==0.1.10 typedcols==0.1.13 diff --git a/lib/mentat/daemon/component/storage.py b/lib/mentat/daemon/component/storage.py index 7614935cf350a6a4d525b6eb50eeb8ced99a1dda..6e1439828fff3c88a72f04398178d91a8b55fd3f 100644 --- a/lib/mentat/daemon/component/storage.py +++ b/lib/mentat/daemon/component/storage.py @@ -49,6 +49,8 @@ class StorageDaemonComponent(pyzenkit.zendaemon.ZenDaemonComponent): Daemon component capable of storing IDEA messages into database. """ EVENT_START = 'start' + EVENT_STOP = 'stop' + EVENT_MSG_PROCESS = 'message_process' EVENT_DBH_COMMIT = 'dbh_commit' EVENT_LOG_STATISTICS = 'log_statistics' @@ -80,13 +82,14 @@ class StorageDaemonComponent(pyzenkit.zendaemon.ZenDaemonComponent): # Permit changing of default event mapping self.event_map = kwargs.get('event_map', { self.EVENT_START: self.EVENT_START, + self.EVENT_STOP: self.EVENT_STOP, self.EVENT_MSG_PROCESS: self.EVENT_MSG_PROCESS, self.EVENT_DBH_COMMIT: self.EVENT_DBH_COMMIT, self.EVENT_LOG_STATISTICS: self.EVENT_LOG_STATISTICS }) def _event_insert_now(self, daemon, args): - # Attempt to store IDEA message into database. + # Attempt to store IDEA message into database with immediate commit. self.event_service.insert_event(args['idea']) daemon.logger.info( "Component '{}': Stored message '{}':'{}' into database.".format( @@ -96,8 +99,8 @@ class StorageDaemonComponent(pyzenkit.zendaemon.ZenDaemonComponent): ) ) - def _event_insert_later(self, daemon, args): - # Attempt to store IDEA message into database. + def _event_insert_bulk(self, daemon, args): + # Attempt to store IDEA message into database with delayed commit. self.event_service.insert_event_bulkci(args['idea']) daemon.logger.info( "Component '{}': Stored message '{}':'{}' into database (bulk mode).".format( @@ -121,6 +124,18 @@ class StorageDaemonComponent(pyzenkit.zendaemon.ZenDaemonComponent): self.events_uncommitted = 0 self.last_commit = time.time() + def _setup_insert_now(self, daemon): + self.commit_bulk = False + self.event_gateway = self._event_insert_now + + def _setup_insert_bulk(self, daemon): + self.commit_bulk = True + self.event_gateway = self._event_insert_bulk + self.commit_bulkintv = daemon.c(CONFIG_COMMIT_BULKINTV) + self.commit_bulkthr = daemon.c(CONFIG_COMMIT_BULKTHR) + self.events_uncommitted = 0 + self.last_commit = time.time() + def setup(self, daemon): """ Perform component setup. @@ -134,11 +149,7 @@ class StorageDaemonComponent(pyzenkit.zendaemon.ZenDaemonComponent): ) ) if self.commit_bulk: - self.event_gateway = self._event_insert_later - self.commit_bulkintv = daemon.c(CONFIG_COMMIT_BULKINTV) - self.commit_bulkthr = daemon.c(CONFIG_COMMIT_BULKTHR) - self.events_uncommitted = 0 - self.last_commit = time.time() + self._setup_insert_bulk(daemon) daemon.logger.info( "[STATUS] Component '{}': Using bulk commits with '{}' as enforced commit interval".format( self.cid, @@ -152,19 +163,23 @@ class StorageDaemonComponent(pyzenkit.zendaemon.ZenDaemonComponent): ) ) else: - self.event_gateway = self._event_insert_now + self._setup_insert_now(daemon) def get_events(self): """ Get the list of event names and their appropriate callback handlers. """ return [ - { 'event': self.event_map[self.EVENT_START], 'callback': self.cbk_event_start, 'prepend': False }, + { + 'event': self.event_map[self.EVENT_STOP], + 'callback': self.cbk_event_stop, + 'prepend': False + }, { 'event': self.event_map[self.EVENT_MSG_PROCESS], 'callback': self.cbk_event_message_process, @@ -203,6 +218,24 @@ class StorageDaemonComponent(pyzenkit.zendaemon.ZenDaemonComponent): return (daemon.FLAG_CONTINUE, args) + def cbk_event_stop(self, daemon, args): + """ + Stop the component. + """ + daemon.logger.debug( + "Component '{}': Stopping the component".format( + self.cid + ) + ) + # In case we are running in bulk commit mode. + if self.commit_bulk: + # Commit all currently pending IDEA messages. + self._commit_pending() + # Switch to immediate commit mode for the rest of the messages in the queue. + self._setup_insert_now(daemon) + + return (daemon.FLAG_CONTINUE, args) + def cbk_event_message_process(self, daemon, args): """ Store the message into the persistent storage. @@ -255,11 +288,7 @@ class StorageDaemonComponent(pyzenkit.zendaemon.ZenDaemonComponent): self.commit_bulkintv ) ) - self.event_service.commit_bulk() - self.inc_statistic(self.STATS_CNT_COMMIT_TIMEOUT, self.events_uncommitted) - self.inc_statistic(self.STATS_CNT_COMMITS_TIMEOUT) - self.events_uncommitted = 0 - self.last_commit = time.time() + self._commit_pending() daemon.queue.schedule_after(self.commit_bulkintv, self.EVENT_DBH_COMMIT) return (daemon.FLAG_CONTINUE, args) @@ -284,3 +313,10 @@ class StorageDaemonComponent(pyzenkit.zendaemon.ZenDaemonComponent): ) ) return (daemon.FLAG_CONTINUE, args) + + def _commit_pending(self): + self.event_service.commit_bulk() + self.inc_statistic(self.STATS_CNT_COMMIT_TIMEOUT, self.events_uncommitted) + self.inc_statistic(self.STATS_CNT_COMMITS_TIMEOUT) + self.events_uncommitted = 0 + self.last_commit = time.time() diff --git a/submodules/pyzenkit b/submodules/pyzenkit index bf1e3727bfc74038ff52e4d9e7715f1799a71d6d..0796c110b270be2491a7bfd1b0bb432539dc5bbc 160000 --- a/submodules/pyzenkit +++ b/submodules/pyzenkit @@ -1 +1 @@ -Subproject commit bf1e3727bfc74038ff52e4d9e7715f1799a71d6d +Subproject commit 0796c110b270be2491a7bfd1b0bb432539dc5bbc