Skip to content
Snippets Groups Projects
Commit 778f2383 authored by Pavel Kácha's avatar Pavel Kácha
Browse files

Keep queues shorter by expunging obsolete items as we go

parent 683169fd
No related branches found
No related tags found
No related merge requests found
......@@ -103,11 +103,11 @@ class WindowContextMgr(object):
def expire_queue(self, queue, time_key, window):
aggr_events = []
while queue:
ctx = queue[0][1]
if ctx not in self.contexts:
# event went away while processing another queue, so just clean up
timestamp, ctx = queue[0]
if ctx not in self.contexts or timestamp < self.contexts[ctx][time_key]:
# event went away while processing another queue or obsolete record so just clean up
heapq.heappop(queue)
elif self.contexts[ctx][time_key] < self.update_timestamp - window:
elif timestamp < self.update_timestamp - window:
heapq.heappop(queue)
closed = self.ctx_close(self.contexts[ctx])
if closed is not None:
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment