diff --git a/hp-labrea/labrea-idea.py b/hp-labrea/labrea-idea.py
index 7e0904d3494abd90ed8f6a90e3bcb482b3be9782..167c65790f4064ddeb47b51dcd1937314c40f3f0 100755
--- a/hp-labrea/labrea-idea.py
+++ b/hp-labrea/labrea-idea.py
@@ -5,7 +5,6 @@ import os
import sys
import re
import time
-import heapq
import optparse
import itertools
import signal
@@ -14,6 +13,7 @@ import uuid
import json
import os.path as pth
from collections import namedtuple
+from collections import OrderedDict
class FileWatcher(object):
@@ -97,24 +97,23 @@ class WindowContextMgr(object):
self.window = window
self.timeout = timeout
self.ideagen = ideagen
- self.first_update_queue = []
- self.last_update_queue = []
+ self.first_update_queue = OrderedDict()
+ self.last_update_queue = OrderedDict()
- def expire_queue(self, queue, time_key, window):
+ def expire_queue(self, queue, window):
aggr_events = []
- while queue:
- timestamp, ctx = queue[0]
- if ctx not in self.contexts or timestamp < self.contexts[ctx][time_key]:
- # event went away while processing another queue or obsolete record so just clean up
- heapq.heappop(queue)
- elif timestamp < self.update_timestamp - window:
- heapq.heappop(queue)
- closed = self.ctx_close(self.contexts[ctx])
- if closed is not None:
- aggr_events.append(closed)
- del self.contexts[ctx]
- else:
+ ctx_to_del = []
+ for ctx, timestamp in queue.iteritems():
+ if timestamp >= self.update_timestamp - window:
break
+ closed = self.ctx_close(self.contexts[ctx])
+ if closed is not None:
+ aggr_events.append(closed)
+ ctx_to_del.append(ctx)
+ for ctx in ctx_to_del:
+ del self.contexts[ctx]
+ del self.first_update_queue[ctx]
+ del self.last_update_queue[ctx]
return aggr_events
def process(self, event=None, timestamp=None):
@@ -122,18 +121,20 @@ class WindowContextMgr(object):
aggr_events = []
- aggr_events.extend(self.expire_queue(self.first_update_queue, "first_update", self.window))
- aggr_events.extend(self.expire_queue(self.last_update_queue, "last_update", self.timeout))
+ aggr_events.extend(self.expire_queue(self.first_update_queue, self.window))
+ aggr_events.extend(self.expire_queue(self.last_update_queue, self.timeout))
if event is not None:
ctx = self.match(event)
if ctx is not None:
if ctx not in self.contexts:
self.contexts[ctx] = self.ctx_create(event)
- heapq.heappush(self.first_update_queue, (self.update_timestamp, ctx))
+ self.first_update_queue[ctx] = self.update_timestamp
+ self.last_update_queue[ctx] = self.update_timestamp
else:
self.ctx_append(self.contexts[ctx], event)
- heapq.heappush(self.last_update_queue, (self.update_timestamp, ctx))
+ del self.last_update_queue[ctx]
+ self.last_update_queue[ctx] = self.update_timestamp
return aggr_events
@@ -145,8 +146,8 @@ class WindowContextMgr(object):
if closed is not None:
aggr_events.append(closed)
self.contexts = {}
- self.first_update_queue = []
- self.last_update_queue = []
+ self.first_update_queue = OrderedDict()
+ self.last_update_queue = OrderedDict()
return aggr_events