From 8797c18191533481cb98d2f4ce55d98c65303426 Mon Sep 17 00:00:00 2001
From: Jan Mach <jan.mach@cesnet.cz>
Date: Sat, 11 Jan 2020 11:25:01 +0100
Subject: [PATCH] Fix: Make sure all possibly uncommitted IDEA messages are
 committed during daemon shutdown.

Previously it was possible that during the daemon shutdown some IDEA messages could remain uncommitted when running in bulk commit mode. It was necessary to make use of 'stop' event to force commit and switch back to immediate commit mode for the rest of the events that still might be in the event queue (it is not guaranteed the stop event will be last in the queue, some of the previous events might still schedule some other event, which will be handled after the stop event). (Redmine issue: #4572)
---
 conf/requirements.pip                  |  2 +-
 lib/mentat/daemon/component/storage.py | 66 ++++++++++++++++++++------
 submodules/pyzenkit                    |  2 +-
 3 files changed, 53 insertions(+), 17 deletions(-)

diff --git a/conf/requirements.pip b/conf/requirements.pip
index 50b07cd6..ab9c66c1 100644
--- a/conf/requirements.pip
+++ b/conf/requirements.pip
@@ -26,7 +26,7 @@ requests==2.22.0
 rrdtool==0.1.15
 pyyaml==5.1.2
 pydgets==0.9
-pyzenkit==0.57
+pyzenkit==0.58
 pynspect==0.17
 ipranges==0.1.10
 typedcols==0.1.13
diff --git a/lib/mentat/daemon/component/storage.py b/lib/mentat/daemon/component/storage.py
index 7614935c..6e143982 100644
--- a/lib/mentat/daemon/component/storage.py
+++ b/lib/mentat/daemon/component/storage.py
@@ -49,6 +49,8 @@ class StorageDaemonComponent(pyzenkit.zendaemon.ZenDaemonComponent):
     Daemon component capable of storing IDEA messages into database.
     """
     EVENT_START          = 'start'
+    EVENT_STOP           = 'stop'
+
     EVENT_MSG_PROCESS    = 'message_process'
     EVENT_DBH_COMMIT     = 'dbh_commit'
     EVENT_LOG_STATISTICS = 'log_statistics'
@@ -80,13 +82,14 @@ class StorageDaemonComponent(pyzenkit.zendaemon.ZenDaemonComponent):
         # Permit changing of default event mapping
         self.event_map = kwargs.get('event_map', {
             self.EVENT_START:          self.EVENT_START,
+            self.EVENT_STOP:           self.EVENT_STOP,
             self.EVENT_MSG_PROCESS:    self.EVENT_MSG_PROCESS,
             self.EVENT_DBH_COMMIT:     self.EVENT_DBH_COMMIT,
             self.EVENT_LOG_STATISTICS: self.EVENT_LOG_STATISTICS
         })
 
     def _event_insert_now(self, daemon, args):
-        # Attempt to store IDEA message into database.
+        # Attempt to store IDEA message into database with immediate commit.
         self.event_service.insert_event(args['idea'])
         daemon.logger.info(
             "Component '{}': Stored message '{}':'{}' into database.".format(
@@ -96,8 +99,8 @@ class StorageDaemonComponent(pyzenkit.zendaemon.ZenDaemonComponent):
             )
         )
 
-    def _event_insert_later(self, daemon, args):
-        # Attempt to store IDEA message into database.
+    def _event_insert_bulk(self, daemon, args):
+        # Attempt to store IDEA message into database with delayed commit.
         self.event_service.insert_event_bulkci(args['idea'])
         daemon.logger.info(
             "Component '{}': Stored message '{}':'{}' into database (bulk mode).".format(
@@ -121,6 +124,18 @@ class StorageDaemonComponent(pyzenkit.zendaemon.ZenDaemonComponent):
             self.events_uncommitted = 0
             self.last_commit = time.time()
 
+    def _setup_insert_now(self, daemon):
+        self.commit_bulk = False
+        self.event_gateway = self._event_insert_now
+
+    def _setup_insert_bulk(self, daemon):
+        self.commit_bulk = True
+        self.event_gateway = self._event_insert_bulk
+        self.commit_bulkintv = daemon.c(CONFIG_COMMIT_BULKINTV)
+        self.commit_bulkthr = daemon.c(CONFIG_COMMIT_BULKTHR)
+        self.events_uncommitted = 0
+        self.last_commit = time.time()
+
     def setup(self, daemon):
         """
         Perform component setup.
@@ -134,11 +149,7 @@ class StorageDaemonComponent(pyzenkit.zendaemon.ZenDaemonComponent):
             )
         )
         if self.commit_bulk:
-            self.event_gateway = self._event_insert_later
-            self.commit_bulkintv = daemon.c(CONFIG_COMMIT_BULKINTV)
-            self.commit_bulkthr = daemon.c(CONFIG_COMMIT_BULKTHR)
-            self.events_uncommitted = 0
-            self.last_commit = time.time()
+            self._setup_insert_bulk(daemon)
             daemon.logger.info(
                 "[STATUS] Component '{}': Using bulk commits with '{}' as enforced commit interval".format(
                     self.cid,
@@ -152,19 +163,23 @@ class StorageDaemonComponent(pyzenkit.zendaemon.ZenDaemonComponent):
                 )
             )
         else:
-            self.event_gateway = self._event_insert_now
+            self._setup_insert_now(daemon)
 
     def get_events(self):
         """
         Get the list of event names and their appropriate callback handlers.
         """
         return [
-
             {
                 'event': self.event_map[self.EVENT_START],
                 'callback': self.cbk_event_start,
                 'prepend': False
             },
+            {
+                'event': self.event_map[self.EVENT_STOP],
+                'callback': self.cbk_event_stop,
+                'prepend': False
+            },
             {
                 'event': self.event_map[self.EVENT_MSG_PROCESS],
                 'callback': self.cbk_event_message_process,
@@ -203,6 +218,24 @@ class StorageDaemonComponent(pyzenkit.zendaemon.ZenDaemonComponent):
 
         return (daemon.FLAG_CONTINUE, args)
 
+    def cbk_event_stop(self, daemon, args):
+        """
+        Stop the component.
+        """
+        daemon.logger.debug(
+            "Component '{}': Stopping the component".format(
+                self.cid
+            )
+        )
+        # In case we are running in bulk commit mode.
+        if self.commit_bulk:
+            # Commit all currently pending IDEA messages.
+            self._commit_pending()
+            # Switch to immediate commit mode for the rest of the messages in the queue.
+            self._setup_insert_now(daemon)
+
+        return (daemon.FLAG_CONTINUE, args)
+
     def cbk_event_message_process(self, daemon, args):
         """
         Store the message into the persistent storage.
@@ -255,11 +288,7 @@ class StorageDaemonComponent(pyzenkit.zendaemon.ZenDaemonComponent):
                     self.commit_bulkintv
                 )
             )
-            self.event_service.commit_bulk()
-            self.inc_statistic(self.STATS_CNT_COMMIT_TIMEOUT, self.events_uncommitted)
-            self.inc_statistic(self.STATS_CNT_COMMITS_TIMEOUT)
-            self.events_uncommitted = 0
-            self.last_commit = time.time()
+            self._commit_pending()
 
         daemon.queue.schedule_after(self.commit_bulkintv, self.EVENT_DBH_COMMIT)
         return (daemon.FLAG_CONTINUE, args)
@@ -284,3 +313,10 @@ class StorageDaemonComponent(pyzenkit.zendaemon.ZenDaemonComponent):
             )
         )
         return (daemon.FLAG_CONTINUE, args)
+
+    def _commit_pending(self):
+        self.event_service.commit_bulk()
+        self.inc_statistic(self.STATS_CNT_COMMIT_TIMEOUT, self.events_uncommitted)
+        self.inc_statistic(self.STATS_CNT_COMMITS_TIMEOUT)
+        self.events_uncommitted = 0
+        self.last_commit = time.time()
diff --git a/submodules/pyzenkit b/submodules/pyzenkit
index bf1e3727..0796c110 160000
--- a/submodules/pyzenkit
+++ b/submodules/pyzenkit
@@ -1 +1 @@
-Subproject commit bf1e3727bfc74038ff52e4d9e7715f1799a71d6d
+Subproject commit 0796c110b270be2491a7bfd1b0bb432539dc5bbc
-- 
GitLab