Skip to content
Snippets Groups Projects
Commit 46ca1c21 authored by Pavel Kácha's avatar Pavel Kácha
Browse files

Added daemonization and file saving

parent 74544671
No related branches found
No related tags found
No related merge requests found
...@@ -11,6 +11,9 @@ import signal ...@@ -11,6 +11,9 @@ import signal
import contextlib import contextlib
import uuid import uuid
import json import json
import socket
import resource
import atexit
import os.path as pth import os.path as pth
from collections import namedtuple from collections import namedtuple
try: try:
...@@ -68,7 +71,7 @@ class FileWatcher(object): ...@@ -68,7 +71,7 @@ class FileWatcher(object):
def close(self): def close(self):
try: try:
if not self.f: if self.f:
self.f.close() self.f.close()
except IOError: except IOError:
pass pass
...@@ -316,7 +319,50 @@ class IdeaGen(object): ...@@ -316,7 +319,50 @@ class IdeaGen(object):
"Target": itargets, "Target": itargets,
"Node": [inode] "Node": [inode]
} }
return json.dumps(idea) return json.dumps(idea, ensure_ascii=True)
class Filer(object):
def __init__(self, directory):
self.basedir = self._ensure_path(directory)
self.tmp = self._ensure_path(pth.join(self.basedir, "tmp"))
self.incoming = self._ensure_path(pth.join(self.basedir, "incoming"))
self.hostname = socket.gethostname()
self.pid = os.getpid()
def _ensure_path(self, p):
try:
os.mkdir(p)
except OSError:
if not pth.isdir(p):
raise
return p
def _get_new_name(self, fd=None):
(inode, device) = os.fstat(fd)[1:3] if fd else (0, 0)
return "%s.%d.%f.%d.%d" % (
self.hostname, self.pid, time.time(), device, inode)
def create_unique_file(self):
# First find and open name unique within tmp
tmpname = None
while not tmpname:
tmpname = self._get_new_name()
try:
fd = os.open(pth.join(self.tmp, tmpname), os.O_CREAT | os.O_RDWR | os.O_EXCL)
except OSError as e:
if e.errno != errno.EEXIST:
raise # other errors than duplicates should get noticed
tmpname = None
# Now we know the device/inode, rename to make unique within system
newname = self._get_new_name(fd)
os.rename(pth.join(self.tmp, tmpname), pth.join(self.tmp, newname))
nf = os.fdopen(fd, "w")
return nf, newname
def publish_file(self, short_name):
os.rename(pth.join(self.tmp, short_name), pth.join(self.incoming, short_name))
def daemonize( def daemonize(
...@@ -338,7 +384,10 @@ def daemonize( ...@@ -338,7 +384,10 @@ def daemonize(
# Doublefork, split session # Doublefork, split session
if os.fork()>0: if os.fork()>0:
os._exit(0) os._exit(0)
os.setsid() try:
os.setsid()
except OSError:
pass
if os.fork()>0: if os.fork()>0:
os._exit(0) os._exit(0)
# Setup signal handlers # Setup signal handlers
...@@ -411,12 +460,30 @@ def get_args(): ...@@ -411,12 +460,30 @@ def get_args():
type="int", type="int",
action="store", action="store",
help="log file polling interval") help="log file polling interval")
optp.add_option("-d", "--dir",
default=None,
dest="dir",
type="string",
action="store",
help="Target directory (mandatory)"),
optp.add_option("-p", "--pid", optp.add_option("-p", "--pid",
default=pth.join("/var/run", pth.splitext(pth.basename(sys.argv[0]))[0] + ".pid"), default=pth.join("/var/run", pth.splitext(pth.basename(sys.argv[0]))[0] + ".pid"),
dest="pid", dest="pid",
type="string", type="string",
action="store", action="store",
help="create PID file with this name (default: %default)") help="create PID file with this name (default: %default)")
optp.add_option("-u", "--uid",
default=None,
dest="uid",
type="int",
action="store",
help="user id to run under")
optp.add_option("-g", "--gid",
default=None,
dest="gid",
type="int",
action="store",
help="group id to run under")
optp.add_option("--realtime", optp.add_option("--realtime",
default=True, default=True,
dest="realtime", dest="realtime",
...@@ -454,25 +521,30 @@ def reload_me(signum, frame): ...@@ -454,25 +521,30 @@ def reload_me(signum, frame):
reload_flag = True reload_flag = True
def main(): def main():
# safedir output
# daemon
global reload_flag global reload_flag
optp = get_args() optp = get_args()
opts, args = optp.parse_args() opts, args = optp.parse_args()
if not args or opts.name is None: if not args or opts.name is None or opts.dir is None:
optp.print_help() optp.print_help()
sys.exit() sys.exit()
signal.signal(signal.SIGINT, terminate_me)
signal.signal(signal.SIGTERM, terminate_me)
signal.signal(signal.SIGHUP, reload_me)
if opts.oneshot: if opts.oneshot:
files = [open(arg) for arg in args] files = [open(arg) for arg in args]
else: else:
daemonize(
pidfile = opts.pid,
uid = opts.uid,
gid = opts.gid,
signals = {
signal.SIGINT: terminate_me,
signal.SIGTERM: terminate_me,
signal.SIGHUP: reload_me
})
files = [FileWatcher(arg) for arg in args] files = [FileWatcher(arg) for arg in args]
with contextlib.nested(*files): with contextlib.nested(*files):
lines = itertools.izip(*files) lines = itertools.izip(*files)
ideagen = IdeaGen(name=opts.name, test=opts.test) ideagen = IdeaGen(name=opts.name, test=opts.test)
contexts = [context(window=opts.window, timeout=opts.timeout, ideagen=ideagen) for context in [PingContextMgr, ConnectContextMgr, InboundContextMgr]] contexts = [context(window=opts.window, timeout=opts.timeout, ideagen=ideagen) for context in [PingContextMgr, ConnectContextMgr, InboundContextMgr]]
filer = Filer(opts.dir)
for line_set in lines: for line_set in lines:
for line in line_set: for line in line_set:
if line: if line:
...@@ -494,8 +566,11 @@ def main(): ...@@ -494,8 +566,11 @@ def main():
if timestamp is not None: if timestamp is not None:
for context in contexts: for context in contexts:
aggr.extend(context.process(event, timestamp)) aggr.extend(context.process(event, timestamp))
for a in aggr: for event in aggr:
print(a) f, name = filer.create_unique_file()
with f:
f.write(event)
filer.publish_file(name)
if not running_flag: if not running_flag:
break break
if reload_flag: if reload_flag:
...@@ -509,7 +584,10 @@ def main(): ...@@ -509,7 +584,10 @@ def main():
aggr = [] aggr = []
for context in contexts: for context in contexts:
aggr.extend(context.close()) aggr.extend(context.close())
for a in aggr: for event in aggr:
print(a) f, name = filer.create_unique_file()
with f:
f.write(event)
filer.publish_file(name)
main() main()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment