Skip to content
Snippets Groups Projects
Commit 5529ae73 authored by Pavel Kácha's avatar Pavel Kácha
Browse files

Added daemonization and file saving

parent f0b177bb
No related branches found
No related tags found
No related merge requests found
......@@ -11,6 +11,9 @@ import signal
import contextlib
import uuid
import json
import socket
import resource
import atexit
import os.path as pth
from collections import namedtuple
try:
......@@ -68,7 +71,7 @@ class FileWatcher(object):
def close(self):
try:
if not self.f:
if self.f:
self.f.close()
except IOError:
pass
......@@ -316,7 +319,50 @@ class IdeaGen(object):
"Target": itargets,
"Node": [inode]
}
return json.dumps(idea)
return json.dumps(idea, ensure_ascii=True)
class Filer(object):
def __init__(self, directory):
self.basedir = self._ensure_path(directory)
self.tmp = self._ensure_path(pth.join(self.basedir, "tmp"))
self.incoming = self._ensure_path(pth.join(self.basedir, "incoming"))
self.hostname = socket.gethostname()
self.pid = os.getpid()
def _ensure_path(self, p):
try:
os.mkdir(p)
except OSError:
if not pth.isdir(p):
raise
return p
def _get_new_name(self, fd=None):
(inode, device) = os.fstat(fd)[1:3] if fd else (0, 0)
return "%s.%d.%f.%d.%d" % (
self.hostname, self.pid, time.time(), device, inode)
def create_unique_file(self):
# First find and open name unique within tmp
tmpname = None
while not tmpname:
tmpname = self._get_new_name()
try:
fd = os.open(pth.join(self.tmp, tmpname), os.O_CREAT | os.O_RDWR | os.O_EXCL)
except OSError as e:
if e.errno != errno.EEXIST:
raise # other errors than duplicates should get noticed
tmpname = None
# Now we know the device/inode, rename to make unique within system
newname = self._get_new_name(fd)
os.rename(pth.join(self.tmp, tmpname), pth.join(self.tmp, newname))
nf = os.fdopen(fd, "w")
return nf, newname
def publish_file(self, short_name):
os.rename(pth.join(self.tmp, short_name), pth.join(self.incoming, short_name))
def daemonize(
......@@ -338,7 +384,10 @@ def daemonize(
# Doublefork, split session
if os.fork()>0:
os._exit(0)
try:
os.setsid()
except OSError:
pass
if os.fork()>0:
os._exit(0)
# Setup signal handlers
......@@ -411,12 +460,30 @@ def get_args():
type="int",
action="store",
help="log file polling interval")
optp.add_option("-d", "--dir",
default=None,
dest="dir",
type="string",
action="store",
help="Target directory (mandatory)"),
optp.add_option("-p", "--pid",
default=pth.join("/var/run", pth.splitext(pth.basename(sys.argv[0]))[0] + ".pid"),
dest="pid",
type="string",
action="store",
help="create PID file with this name (default: %default)")
optp.add_option("-u", "--uid",
default=None,
dest="uid",
type="int",
action="store",
help="user id to run under")
optp.add_option("-g", "--gid",
default=None,
dest="gid",
type="int",
action="store",
help="group id to run under")
optp.add_option("--realtime",
default=True,
dest="realtime",
......@@ -454,25 +521,30 @@ def reload_me(signum, frame):
reload_flag = True
def main():
# safedir output
# daemon
global reload_flag
optp = get_args()
opts, args = optp.parse_args()
if not args or opts.name is None:
if not args or opts.name is None or opts.dir is None:
optp.print_help()
sys.exit()
signal.signal(signal.SIGINT, terminate_me)
signal.signal(signal.SIGTERM, terminate_me)
signal.signal(signal.SIGHUP, reload_me)
if opts.oneshot:
files = [open(arg) for arg in args]
else:
daemonize(
pidfile = opts.pid,
uid = opts.uid,
gid = opts.gid,
signals = {
signal.SIGINT: terminate_me,
signal.SIGTERM: terminate_me,
signal.SIGHUP: reload_me
})
files = [FileWatcher(arg) for arg in args]
with contextlib.nested(*files):
lines = itertools.izip(*files)
ideagen = IdeaGen(name=opts.name, test=opts.test)
contexts = [context(window=opts.window, timeout=opts.timeout, ideagen=ideagen) for context in [PingContextMgr, ConnectContextMgr, InboundContextMgr]]
filer = Filer(opts.dir)
for line_set in lines:
for line in line_set:
if line:
......@@ -494,8 +566,11 @@ def main():
if timestamp is not None:
for context in contexts:
aggr.extend(context.process(event, timestamp))
for a in aggr:
print(a)
for event in aggr:
f, name = filer.create_unique_file()
with f:
f.write(event)
filer.publish_file(name)
if not running_flag:
break
if reload_flag:
......@@ -509,7 +584,10 @@ def main():
aggr = []
for context in contexts:
aggr.extend(context.close())
for a in aggr:
print(a)
for event in aggr:
f, name = filer.create_unique_file()
with f:
f.write(event)
filer.publish_file(name)
main()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment