diff --git a/hp-labrea/labrea-idea.py b/hp-labrea/labrea-idea.py
index 14d15ffb4670c402bd93ff7944e862c942fde45a..78d3fcf245fbdd78688313ed6ae33f3a7e51bfe4 100755
--- a/hp-labrea/labrea-idea.py
+++ b/hp-labrea/labrea-idea.py
@@ -11,6 +11,9 @@ import signal
import contextlib
import uuid
import json
+import socket
+import resource
+import atexit
import os.path as pth
from collections import namedtuple
try:
@@ -68,7 +71,7 @@ class FileWatcher(object):
def close(self):
try:
- if not self.f:
+ if self.f:
self.f.close()
except IOError:
pass
@@ -316,7 +319,50 @@ class IdeaGen(object):
"Target": itargets,
"Node": [inode]
}
- return json.dumps(idea)
+ return json.dumps(idea, ensure_ascii=True)
+
+
+class Filer(object):
+
+ def __init__(self, directory):
+ self.basedir = self._ensure_path(directory)
+ self.tmp = self._ensure_path(pth.join(self.basedir, "tmp"))
+ self.incoming = self._ensure_path(pth.join(self.basedir, "incoming"))
+ self.hostname = socket.gethostname()
+ self.pid = os.getpid()
+
+ def _ensure_path(self, p):
+ try:
+ os.mkdir(p)
+ except OSError:
+ if not pth.isdir(p):
+ raise
+ return p
+
+ def _get_new_name(self, fd=None):
+ (inode, device) = os.fstat(fd)[1:3] if fd else (0, 0)
+ return "%s.%d.%f.%d.%d" % (
+ self.hostname, self.pid, time.time(), device, inode)
+
+ def create_unique_file(self):
+ # First find and open name unique within tmp
+ tmpname = None
+ while not tmpname:
+ tmpname = self._get_new_name()
+ try:
+ fd = os.open(pth.join(self.tmp, tmpname), os.O_CREAT | os.O_RDWR | os.O_EXCL)
+ except OSError as e:
+ if e.errno != errno.EEXIST:
+ raise # other errors than duplicates should get noticed
+ tmpname = None
+ # Now we know the device/inode, rename to make unique within system
+ newname = self._get_new_name(fd)
+ os.rename(pth.join(self.tmp, tmpname), pth.join(self.tmp, newname))
+ nf = os.fdopen(fd, "w")
+ return nf, newname
+
+ def publish_file(self, short_name):
+ os.rename(pth.join(self.tmp, short_name), pth.join(self.incoming, short_name))
def daemonize(
@@ -338,7 +384,10 @@ def daemonize(
# Doublefork, split session
if os.fork()>0:
os._exit(0)
- os.setsid()
+ try:
+ os.setsid()
+ except OSError:
+ pass
if os.fork()>0:
os._exit(0)
# Setup signal handlers
@@ -411,12 +460,30 @@ def get_args():
type="int",
action="store",
help="log file polling interval")
+ optp.add_option("-d", "--dir",
+ default=None,
+ dest="dir",
+ type="string",
+ action="store",
+ help="Target directory (mandatory)"),
optp.add_option("-p", "--pid",
default=pth.join("/var/run", pth.splitext(pth.basename(sys.argv[0]))[0] + ".pid"),
dest="pid",
type="string",
action="store",
help="create PID file with this name (default: %default)")
+ optp.add_option("-u", "--uid",
+ default=None,
+ dest="uid",
+ type="int",
+ action="store",
+ help="user id to run under")
+ optp.add_option("-g", "--gid",
+ default=None,
+ dest="gid",
+ type="int",
+ action="store",
+ help="group id to run under")
optp.add_option("--realtime",
default=True,
dest="realtime",
@@ -454,25 +521,30 @@ def reload_me(signum, frame):
reload_flag = True
def main():
- # safedir output
- # daemon
global reload_flag
optp = get_args()
opts, args = optp.parse_args()
- if not args or opts.name is None:
+ if not args or opts.name is None or opts.dir is None:
optp.print_help()
sys.exit()
- signal.signal(signal.SIGINT, terminate_me)
- signal.signal(signal.SIGTERM, terminate_me)
- signal.signal(signal.SIGHUP, reload_me)
if opts.oneshot:
files = [open(arg) for arg in args]
else:
+ daemonize(
+ pidfile = opts.pid,
+ uid = opts.uid,
+ gid = opts.gid,
+ signals = {
+ signal.SIGINT: terminate_me,
+ signal.SIGTERM: terminate_me,
+ signal.SIGHUP: reload_me
+ })
files = [FileWatcher(arg) for arg in args]
with contextlib.nested(*files):
lines = itertools.izip(*files)
ideagen = IdeaGen(name=opts.name, test=opts.test)
contexts = [context(window=opts.window, timeout=opts.timeout, ideagen=ideagen) for context in [PingContextMgr, ConnectContextMgr, InboundContextMgr]]
+ filer = Filer(opts.dir)
for line_set in lines:
for line in line_set:
if line:
@@ -494,8 +566,11 @@ def main():
if timestamp is not None:
for context in contexts:
aggr.extend(context.process(event, timestamp))
- for a in aggr:
- print(a)
+ for event in aggr:
+ f, name = filer.create_unique_file()
+ with f:
+ f.write(event)
+ filer.publish_file(name)
if not running_flag:
break
if reload_flag:
@@ -509,7 +584,10 @@ def main():
aggr = []
for context in contexts:
aggr.extend(context.close())
- for a in aggr:
- print(a)
+ for event in aggr:
+ f, name = filer.create_unique_file()
+ with f:
+ f.write(event)
+ filer.publish_file(name)
main()