From 9af24d63e3b6435a98dfbc47a600f3dc092228c2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pavel=20K=C3=A1cha?= <ph@cesnet.cz> Date: Tue, 2 May 2017 11:56:49 +0000 Subject: [PATCH] Keep queues shorter by expunging obsolete items as we go --- warden3/contrib/connectors/hp-labrea/labrea-idea.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/warden3/contrib/connectors/hp-labrea/labrea-idea.py b/warden3/contrib/connectors/hp-labrea/labrea-idea.py index 1d383c0..3bb9556 100755 --- a/warden3/contrib/connectors/hp-labrea/labrea-idea.py +++ b/warden3/contrib/connectors/hp-labrea/labrea-idea.py @@ -103,16 +103,16 @@ class WindowContextMgr(object): def expire_queue(self, queue, time_key, window): aggr_events = [] while queue: - ctx = queue[0][1] - if ctx not in self.contexts: - # event went away while processing another queue, so just clean up + timestamp, ctx = queue[0] + if ctx not in self.contexts or timestamp < self.contexts[ctx][time_key]: + # event went away while processing another queue or obsolete record so just clean up heapq.heappop(queue) - elif self.contexts[ctx][time_key] < self.update_timestamp - window: + elif timestamp < self.update_timestamp - window: heapq.heappop(queue) closed = self.ctx_close(self.contexts[ctx]) if closed is not None: aggr_events.append(closed) - del self.contexts[ctx] + del self.contexts[ctx] else: break return aggr_events -- GitLab