Skip to content
Snippets Groups Projects
Commit 9af24d63 authored by Pavel Kácha's avatar Pavel Kácha
Browse files

Keep queues shorter by expunging obsolete items as we go

parent 3ad7e517
No related branches found
No related tags found
No related merge requests found
...@@ -103,16 +103,16 @@ class WindowContextMgr(object): ...@@ -103,16 +103,16 @@ class WindowContextMgr(object):
def expire_queue(self, queue, time_key, window): def expire_queue(self, queue, time_key, window):
aggr_events = [] aggr_events = []
while queue: while queue:
ctx = queue[0][1] timestamp, ctx = queue[0]
if ctx not in self.contexts: if ctx not in self.contexts or timestamp < self.contexts[ctx][time_key]:
# event went away while processing another queue, so just clean up # event went away while processing another queue or obsolete record so just clean up
heapq.heappop(queue) heapq.heappop(queue)
elif self.contexts[ctx][time_key] < self.update_timestamp - window: elif timestamp < self.update_timestamp - window:
heapq.heappop(queue) heapq.heappop(queue)
closed = self.ctx_close(self.contexts[ctx]) closed = self.ctx_close(self.contexts[ctx])
if closed is not None: if closed is not None:
aggr_events.append(closed) aggr_events.append(closed)
del self.contexts[ctx] del self.contexts[ctx]
else: else:
break break
return aggr_events return aggr_events
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment