diff --git a/warden3/contrib/connectors/hp-labrea/labrea-idea.py b/warden3/contrib/connectors/hp-labrea/labrea-idea.py index 7e0904d3494abd90ed8f6a90e3bcb482b3be9782..167c65790f4064ddeb47b51dcd1937314c40f3f0 100755 --- a/warden3/contrib/connectors/hp-labrea/labrea-idea.py +++ b/warden3/contrib/connectors/hp-labrea/labrea-idea.py @@ -5,7 +5,6 @@ import os import sys import re import time -import heapq import optparse import itertools import signal @@ -14,6 +13,7 @@ import uuid import json import os.path as pth from collections import namedtuple +from collections import OrderedDict class FileWatcher(object): @@ -97,24 +97,23 @@ class WindowContextMgr(object): self.window = window self.timeout = timeout self.ideagen = ideagen - self.first_update_queue = [] - self.last_update_queue = [] + self.first_update_queue = OrderedDict() + self.last_update_queue = OrderedDict() - def expire_queue(self, queue, time_key, window): + def expire_queue(self, queue, window): aggr_events = [] - while queue: - timestamp, ctx = queue[0] - if ctx not in self.contexts or timestamp < self.contexts[ctx][time_key]: - # event went away while processing another queue or obsolete record so just clean up - heapq.heappop(queue) - elif timestamp < self.update_timestamp - window: - heapq.heappop(queue) - closed = self.ctx_close(self.contexts[ctx]) - if closed is not None: - aggr_events.append(closed) - del self.contexts[ctx] - else: + ctx_to_del = [] + for ctx, timestamp in queue.iteritems(): + if timestamp >= self.update_timestamp - window: break + closed = self.ctx_close(self.contexts[ctx]) + if closed is not None: + aggr_events.append(closed) + ctx_to_del.append(ctx) + for ctx in ctx_to_del: + del self.contexts[ctx] + del self.first_update_queue[ctx] + del self.last_update_queue[ctx] return aggr_events def process(self, event=None, timestamp=None): @@ -122,18 +121,20 @@ class WindowContextMgr(object): aggr_events = [] - aggr_events.extend(self.expire_queue(self.first_update_queue, "first_update", self.window)) - aggr_events.extend(self.expire_queue(self.last_update_queue, "last_update", self.timeout)) + aggr_events.extend(self.expire_queue(self.first_update_queue, self.window)) + aggr_events.extend(self.expire_queue(self.last_update_queue, self.timeout)) if event is not None: ctx = self.match(event) if ctx is not None: if ctx not in self.contexts: self.contexts[ctx] = self.ctx_create(event) - heapq.heappush(self.first_update_queue, (self.update_timestamp, ctx)) + self.first_update_queue[ctx] = self.update_timestamp + self.last_update_queue[ctx] = self.update_timestamp else: self.ctx_append(self.contexts[ctx], event) - heapq.heappush(self.last_update_queue, (self.update_timestamp, ctx)) + del self.last_update_queue[ctx] + self.last_update_queue[ctx] = self.update_timestamp return aggr_events @@ -145,8 +146,8 @@ class WindowContextMgr(object): if closed is not None: aggr_events.append(closed) self.contexts = {} - self.first_update_queue = [] - self.last_update_queue = [] + self.first_update_queue = OrderedDict() + self.last_update_queue = OrderedDict() return aggr_events