Skip to content
Snippets Groups Projects
Commit 8797c181 authored by Jan Mach's avatar Jan Mach
Browse files

Fix: Make sure all possibly uncommitted IDEA messages are committed during daemon shutdown.

Previously it was possible that during the daemon shutdown some IDEA messages could remain uncommitted when running in bulk commit mode. It was necessary to make use of 'stop' event to force commit and switch back to immediate commit mode for the rest of the events that still might be in the event queue (it is not guaranteed the stop event will be last in the queue, some of the previous events might still schedule some other event, which will be handled after the stop event). (Redmine issue: #4572)
parent 2ad7c6b6
No related branches found
No related tags found
No related merge requests found
...@@ -26,7 +26,7 @@ requests==2.22.0 ...@@ -26,7 +26,7 @@ requests==2.22.0
rrdtool==0.1.15 rrdtool==0.1.15
pyyaml==5.1.2 pyyaml==5.1.2
pydgets==0.9 pydgets==0.9
pyzenkit==0.57 pyzenkit==0.58
pynspect==0.17 pynspect==0.17
ipranges==0.1.10 ipranges==0.1.10
typedcols==0.1.13 typedcols==0.1.13
......
...@@ -49,6 +49,8 @@ class StorageDaemonComponent(pyzenkit.zendaemon.ZenDaemonComponent): ...@@ -49,6 +49,8 @@ class StorageDaemonComponent(pyzenkit.zendaemon.ZenDaemonComponent):
Daemon component capable of storing IDEA messages into database. Daemon component capable of storing IDEA messages into database.
""" """
EVENT_START = 'start' EVENT_START = 'start'
EVENT_STOP = 'stop'
EVENT_MSG_PROCESS = 'message_process' EVENT_MSG_PROCESS = 'message_process'
EVENT_DBH_COMMIT = 'dbh_commit' EVENT_DBH_COMMIT = 'dbh_commit'
EVENT_LOG_STATISTICS = 'log_statistics' EVENT_LOG_STATISTICS = 'log_statistics'
...@@ -80,13 +82,14 @@ class StorageDaemonComponent(pyzenkit.zendaemon.ZenDaemonComponent): ...@@ -80,13 +82,14 @@ class StorageDaemonComponent(pyzenkit.zendaemon.ZenDaemonComponent):
# Permit changing of default event mapping # Permit changing of default event mapping
self.event_map = kwargs.get('event_map', { self.event_map = kwargs.get('event_map', {
self.EVENT_START: self.EVENT_START, self.EVENT_START: self.EVENT_START,
self.EVENT_STOP: self.EVENT_STOP,
self.EVENT_MSG_PROCESS: self.EVENT_MSG_PROCESS, self.EVENT_MSG_PROCESS: self.EVENT_MSG_PROCESS,
self.EVENT_DBH_COMMIT: self.EVENT_DBH_COMMIT, self.EVENT_DBH_COMMIT: self.EVENT_DBH_COMMIT,
self.EVENT_LOG_STATISTICS: self.EVENT_LOG_STATISTICS self.EVENT_LOG_STATISTICS: self.EVENT_LOG_STATISTICS
}) })
def _event_insert_now(self, daemon, args): def _event_insert_now(self, daemon, args):
# Attempt to store IDEA message into database. # Attempt to store IDEA message into database with immediate commit.
self.event_service.insert_event(args['idea']) self.event_service.insert_event(args['idea'])
daemon.logger.info( daemon.logger.info(
"Component '{}': Stored message '{}':'{}' into database.".format( "Component '{}': Stored message '{}':'{}' into database.".format(
...@@ -96,8 +99,8 @@ class StorageDaemonComponent(pyzenkit.zendaemon.ZenDaemonComponent): ...@@ -96,8 +99,8 @@ class StorageDaemonComponent(pyzenkit.zendaemon.ZenDaemonComponent):
) )
) )
def _event_insert_later(self, daemon, args): def _event_insert_bulk(self, daemon, args):
# Attempt to store IDEA message into database. # Attempt to store IDEA message into database with delayed commit.
self.event_service.insert_event_bulkci(args['idea']) self.event_service.insert_event_bulkci(args['idea'])
daemon.logger.info( daemon.logger.info(
"Component '{}': Stored message '{}':'{}' into database (bulk mode).".format( "Component '{}': Stored message '{}':'{}' into database (bulk mode).".format(
...@@ -121,6 +124,18 @@ class StorageDaemonComponent(pyzenkit.zendaemon.ZenDaemonComponent): ...@@ -121,6 +124,18 @@ class StorageDaemonComponent(pyzenkit.zendaemon.ZenDaemonComponent):
self.events_uncommitted = 0 self.events_uncommitted = 0
self.last_commit = time.time() self.last_commit = time.time()
def _setup_insert_now(self, daemon):
self.commit_bulk = False
self.event_gateway = self._event_insert_now
def _setup_insert_bulk(self, daemon):
self.commit_bulk = True
self.event_gateway = self._event_insert_bulk
self.commit_bulkintv = daemon.c(CONFIG_COMMIT_BULKINTV)
self.commit_bulkthr = daemon.c(CONFIG_COMMIT_BULKTHR)
self.events_uncommitted = 0
self.last_commit = time.time()
def setup(self, daemon): def setup(self, daemon):
""" """
Perform component setup. Perform component setup.
...@@ -134,11 +149,7 @@ class StorageDaemonComponent(pyzenkit.zendaemon.ZenDaemonComponent): ...@@ -134,11 +149,7 @@ class StorageDaemonComponent(pyzenkit.zendaemon.ZenDaemonComponent):
) )
) )
if self.commit_bulk: if self.commit_bulk:
self.event_gateway = self._event_insert_later self._setup_insert_bulk(daemon)
self.commit_bulkintv = daemon.c(CONFIG_COMMIT_BULKINTV)
self.commit_bulkthr = daemon.c(CONFIG_COMMIT_BULKTHR)
self.events_uncommitted = 0
self.last_commit = time.time()
daemon.logger.info( daemon.logger.info(
"[STATUS] Component '{}': Using bulk commits with '{}' as enforced commit interval".format( "[STATUS] Component '{}': Using bulk commits with '{}' as enforced commit interval".format(
self.cid, self.cid,
...@@ -152,19 +163,23 @@ class StorageDaemonComponent(pyzenkit.zendaemon.ZenDaemonComponent): ...@@ -152,19 +163,23 @@ class StorageDaemonComponent(pyzenkit.zendaemon.ZenDaemonComponent):
) )
) )
else: else:
self.event_gateway = self._event_insert_now self._setup_insert_now(daemon)
def get_events(self): def get_events(self):
""" """
Get the list of event names and their appropriate callback handlers. Get the list of event names and their appropriate callback handlers.
""" """
return [ return [
{ {
'event': self.event_map[self.EVENT_START], 'event': self.event_map[self.EVENT_START],
'callback': self.cbk_event_start, 'callback': self.cbk_event_start,
'prepend': False 'prepend': False
}, },
{
'event': self.event_map[self.EVENT_STOP],
'callback': self.cbk_event_stop,
'prepend': False
},
{ {
'event': self.event_map[self.EVENT_MSG_PROCESS], 'event': self.event_map[self.EVENT_MSG_PROCESS],
'callback': self.cbk_event_message_process, 'callback': self.cbk_event_message_process,
...@@ -203,6 +218,24 @@ class StorageDaemonComponent(pyzenkit.zendaemon.ZenDaemonComponent): ...@@ -203,6 +218,24 @@ class StorageDaemonComponent(pyzenkit.zendaemon.ZenDaemonComponent):
return (daemon.FLAG_CONTINUE, args) return (daemon.FLAG_CONTINUE, args)
def cbk_event_stop(self, daemon, args):
"""
Stop the component.
"""
daemon.logger.debug(
"Component '{}': Stopping the component".format(
self.cid
)
)
# In case we are running in bulk commit mode.
if self.commit_bulk:
# Commit all currently pending IDEA messages.
self._commit_pending()
# Switch to immediate commit mode for the rest of the messages in the queue.
self._setup_insert_now(daemon)
return (daemon.FLAG_CONTINUE, args)
def cbk_event_message_process(self, daemon, args): def cbk_event_message_process(self, daemon, args):
""" """
Store the message into the persistent storage. Store the message into the persistent storage.
...@@ -255,11 +288,7 @@ class StorageDaemonComponent(pyzenkit.zendaemon.ZenDaemonComponent): ...@@ -255,11 +288,7 @@ class StorageDaemonComponent(pyzenkit.zendaemon.ZenDaemonComponent):
self.commit_bulkintv self.commit_bulkintv
) )
) )
self.event_service.commit_bulk() self._commit_pending()
self.inc_statistic(self.STATS_CNT_COMMIT_TIMEOUT, self.events_uncommitted)
self.inc_statistic(self.STATS_CNT_COMMITS_TIMEOUT)
self.events_uncommitted = 0
self.last_commit = time.time()
daemon.queue.schedule_after(self.commit_bulkintv, self.EVENT_DBH_COMMIT) daemon.queue.schedule_after(self.commit_bulkintv, self.EVENT_DBH_COMMIT)
return (daemon.FLAG_CONTINUE, args) return (daemon.FLAG_CONTINUE, args)
...@@ -284,3 +313,10 @@ class StorageDaemonComponent(pyzenkit.zendaemon.ZenDaemonComponent): ...@@ -284,3 +313,10 @@ class StorageDaemonComponent(pyzenkit.zendaemon.ZenDaemonComponent):
) )
) )
return (daemon.FLAG_CONTINUE, args) return (daemon.FLAG_CONTINUE, args)
def _commit_pending(self):
self.event_service.commit_bulk()
self.inc_statistic(self.STATS_CNT_COMMIT_TIMEOUT, self.events_uncommitted)
self.inc_statistic(self.STATS_CNT_COMMITS_TIMEOUT)
self.events_uncommitted = 0
self.last_commit = time.time()
Subproject commit bf1e3727bfc74038ff52e4d9e7715f1799a71d6d Subproject commit 0796c110b270be2491a7bfd1b0bb432539dc5bbc
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment