diff --git a/warden3/contrib/connectors/hp-labrea/labrea-idea.py b/warden3/contrib/connectors/hp-labrea/labrea-idea.py index 14d15ffb4670c402bd93ff7944e862c942fde45a..78d3fcf245fbdd78688313ed6ae33f3a7e51bfe4 100755 --- a/warden3/contrib/connectors/hp-labrea/labrea-idea.py +++ b/warden3/contrib/connectors/hp-labrea/labrea-idea.py @@ -11,6 +11,9 @@ import signal import contextlib import uuid import json +import socket +import resource +import atexit import os.path as pth from collections import namedtuple try: @@ -68,7 +71,7 @@ class FileWatcher(object): def close(self): try: - if not self.f: + if self.f: self.f.close() except IOError: pass @@ -316,7 +319,50 @@ class IdeaGen(object): "Target": itargets, "Node": [inode] } - return json.dumps(idea) + return json.dumps(idea, ensure_ascii=True) + + +class Filer(object): + + def __init__(self, directory): + self.basedir = self._ensure_path(directory) + self.tmp = self._ensure_path(pth.join(self.basedir, "tmp")) + self.incoming = self._ensure_path(pth.join(self.basedir, "incoming")) + self.hostname = socket.gethostname() + self.pid = os.getpid() + + def _ensure_path(self, p): + try: + os.mkdir(p) + except OSError: + if not pth.isdir(p): + raise + return p + + def _get_new_name(self, fd=None): + (inode, device) = os.fstat(fd)[1:3] if fd else (0, 0) + return "%s.%d.%f.%d.%d" % ( + self.hostname, self.pid, time.time(), device, inode) + + def create_unique_file(self): + # First find and open name unique within tmp + tmpname = None + while not tmpname: + tmpname = self._get_new_name() + try: + fd = os.open(pth.join(self.tmp, tmpname), os.O_CREAT | os.O_RDWR | os.O_EXCL) + except OSError as e: + if e.errno != errno.EEXIST: + raise # other errors than duplicates should get noticed + tmpname = None + # Now we know the device/inode, rename to make unique within system + newname = self._get_new_name(fd) + os.rename(pth.join(self.tmp, tmpname), pth.join(self.tmp, newname)) + nf = os.fdopen(fd, "w") + return nf, newname + + def publish_file(self, short_name): + os.rename(pth.join(self.tmp, short_name), pth.join(self.incoming, short_name)) def daemonize( @@ -338,7 +384,10 @@ def daemonize( # Doublefork, split session if os.fork()>0: os._exit(0) - os.setsid() + try: + os.setsid() + except OSError: + pass if os.fork()>0: os._exit(0) # Setup signal handlers @@ -411,12 +460,30 @@ def get_args(): type="int", action="store", help="log file polling interval") + optp.add_option("-d", "--dir", + default=None, + dest="dir", + type="string", + action="store", + help="Target directory (mandatory)"), optp.add_option("-p", "--pid", default=pth.join("/var/run", pth.splitext(pth.basename(sys.argv[0]))[0] + ".pid"), dest="pid", type="string", action="store", help="create PID file with this name (default: %default)") + optp.add_option("-u", "--uid", + default=None, + dest="uid", + type="int", + action="store", + help="user id to run under") + optp.add_option("-g", "--gid", + default=None, + dest="gid", + type="int", + action="store", + help="group id to run under") optp.add_option("--realtime", default=True, dest="realtime", @@ -454,25 +521,30 @@ def reload_me(signum, frame): reload_flag = True def main(): - # safedir output - # daemon global reload_flag optp = get_args() opts, args = optp.parse_args() - if not args or opts.name is None: + if not args or opts.name is None or opts.dir is None: optp.print_help() sys.exit() - signal.signal(signal.SIGINT, terminate_me) - signal.signal(signal.SIGTERM, terminate_me) - signal.signal(signal.SIGHUP, reload_me) if opts.oneshot: files = [open(arg) for arg in args] else: + daemonize( + pidfile = opts.pid, + uid = opts.uid, + gid = opts.gid, + signals = { + signal.SIGINT: terminate_me, + signal.SIGTERM: terminate_me, + signal.SIGHUP: reload_me + }) files = [FileWatcher(arg) for arg in args] with contextlib.nested(*files): lines = itertools.izip(*files) ideagen = IdeaGen(name=opts.name, test=opts.test) contexts = [context(window=opts.window, timeout=opts.timeout, ideagen=ideagen) for context in [PingContextMgr, ConnectContextMgr, InboundContextMgr]] + filer = Filer(opts.dir) for line_set in lines: for line in line_set: if line: @@ -494,8 +566,11 @@ def main(): if timestamp is not None: for context in contexts: aggr.extend(context.process(event, timestamp)) - for a in aggr: - print(a) + for event in aggr: + f, name = filer.create_unique_file() + with f: + f.write(event) + filer.publish_file(name) if not running_flag: break if reload_flag: @@ -509,7 +584,10 @@ def main(): aggr = [] for context in contexts: aggr.extend(context.close()) - for a in aggr: - print(a) + for event in aggr: + f, name = filer.create_unique_file() + with f: + f.write(event) + filer.publish_file(name) main()