Skip to content
Snippets Groups Projects
Commit 3842b361 authored by Pavel Kácha's avatar Pavel Kácha
Browse files

Reworked heapq queues to OrderedDicts to save memory

parent e0ac8277
No related branches found
No related tags found
No related merge requests found
......@@ -5,7 +5,6 @@ import os
import sys
import re
import time
import heapq
import optparse
import itertools
import signal
......@@ -14,6 +13,7 @@ import uuid
import json
import os.path as pth
from collections import namedtuple
from collections import OrderedDict
class FileWatcher(object):
......@@ -97,24 +97,23 @@ class WindowContextMgr(object):
self.window = window
self.timeout = timeout
self.ideagen = ideagen
self.first_update_queue = []
self.last_update_queue = []
self.first_update_queue = OrderedDict()
self.last_update_queue = OrderedDict()
def expire_queue(self, queue, time_key, window):
def expire_queue(self, queue, window):
aggr_events = []
while queue:
timestamp, ctx = queue[0]
if ctx not in self.contexts or timestamp < self.contexts[ctx][time_key]:
# event went away while processing another queue or obsolete record so just clean up
heapq.heappop(queue)
elif timestamp < self.update_timestamp - window:
heapq.heappop(queue)
closed = self.ctx_close(self.contexts[ctx])
if closed is not None:
aggr_events.append(closed)
del self.contexts[ctx]
else:
ctx_to_del = []
for ctx, timestamp in queue.iteritems():
if timestamp >= self.update_timestamp - window:
break
closed = self.ctx_close(self.contexts[ctx])
if closed is not None:
aggr_events.append(closed)
ctx_to_del.append(ctx)
for ctx in ctx_to_del:
del self.contexts[ctx]
del self.first_update_queue[ctx]
del self.last_update_queue[ctx]
return aggr_events
def process(self, event=None, timestamp=None):
......@@ -122,18 +121,20 @@ class WindowContextMgr(object):
aggr_events = []
aggr_events.extend(self.expire_queue(self.first_update_queue, "first_update", self.window))
aggr_events.extend(self.expire_queue(self.last_update_queue, "last_update", self.timeout))
aggr_events.extend(self.expire_queue(self.first_update_queue, self.window))
aggr_events.extend(self.expire_queue(self.last_update_queue, self.timeout))
if event is not None:
ctx = self.match(event)
if ctx is not None:
if ctx not in self.contexts:
self.contexts[ctx] = self.ctx_create(event)
heapq.heappush(self.first_update_queue, (self.update_timestamp, ctx))
self.first_update_queue[ctx] = self.update_timestamp
self.last_update_queue[ctx] = self.update_timestamp
else:
self.ctx_append(self.contexts[ctx], event)
heapq.heappush(self.last_update_queue, (self.update_timestamp, ctx))
del self.last_update_queue[ctx]
self.last_update_queue[ctx] = self.update_timestamp
return aggr_events
......@@ -145,8 +146,8 @@ class WindowContextMgr(object):
if closed is not None:
aggr_events.append(closed)
self.contexts = {}
self.first_update_queue = []
self.last_update_queue = []
self.first_update_queue = OrderedDict()
self.last_update_queue = OrderedDict()
return aggr_events
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment