#!/usr/bin/python
# -*- coding: utf-8 -*-
#
# Copyright (C) 2011-2015 Cesnet z.s.p.o
# Use of this source is governed by a 3-clause BSD-style license, see LICENSE file.
from warden_client import Client, Error, read_cfg
import json
import string
import os
import sys
import errno
import socket
import time
import logging
import signal
import argparse
import atexit
import resource
from os import path, mkdir
from random import choice, randint

# for py2/py3 compatibility
try:
    basestring
except NameError:
    basestring = str

class NamedFile(object):
    """ Wrapper class for file objects, which allows and tracks filename
        changes.
    """
    def __init__(self, pth, name, fd=None):
        self.name = name
        self.path = pth
        if fd:
            self.f = os.fdopen(fd, "w+b")
        else:
            self.f = None

    def __str__(self):
        return "%s(%s, %s)" % (type(self).__name__, self.path, self.name)

    def get_path(self, basepath=None, name=None):
        return path.join(basepath or self.path, name or self.name)

    def open(self, mode):
        return open(self.get_path(), mode)

    def moveto(self, destpath):
        os.rename(self.get_path(), self.get_path(basepath=destpath))
        self.path = destpath

    def rename(self, newname):
        os.rename(self.get_path(), self.get_path(name=newname))
        self.name = newname

    def remove(self):
        os.remove(self.get_path())

class SafeDir(object):
    """ Maildir-like directory for safe file exchange.
        - Producers are expected to drop files into "tmp" under a globally
          unique filename and rename them into "incoming" atomically
          (newfile method)
        - Workers pick files in "incoming", rename them into "tmp",
          do whatever they want, and either discard them or move them
          into the "errors" directory
    """
    def __init__(self, p):
        self.path = self._ensure_path(p)
        self.incoming = self._ensure_path(path.join(self.path, "incoming"))
        self.errors = self._ensure_path(path.join(self.path, "errors"))
        self.temp = self._ensure_path(path.join(self.path, "tmp"))
        self.hostname = socket.gethostname()
        self.pid = os.getpid()

    def __str__(self):
        return "%s(%s)" % (type(self).__name__, self.path)

    def _ensure_path(self, p):
        try:
            mkdir(p)
        except OSError:
            if not path.isdir(p):
                raise
        return p

    def _get_new_name(self, device=0, inode=0):
        return "%s.%d.%f.%d.%d.idea" % (
            self.hostname, self.pid, time.time(), device, inode)
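
    # Illustrative example of a generated name (hypothetical values): the
    # pattern above expands to "<hostname>.<pid>.<unix time>.<device>.<inode>.idea",
    # e.g. "collector1.4242.1433765213.673352.2049.1310733.idea".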
    def newfile(self):
        """ Creates a file with a unique filename within this SafeDir.
            - hostname takes care of network filesystems
            - pid distinguishes two daemons on one machine
              (we are not multithreaded, so this is enough)
            - time in the best supported precision narrows the window
              within the process
            - device/inode makes the file unique on the particular filesystem
            In fact, device/inode is itself enough for uniqueness, however
            if we mandate the wider format, users can use a simpler form with
            random numbers instead of device/inode, if they choose to,
            and it will still ensure reasonable uniqueness.
        """
        # Note: this simpler device/inode algorithm replaces the original,
        # which checked uniqueness among all directories by atomic links.

        # First find and open a name unique within tmp
        tmpname = None
        while not tmpname:
            tmpname = self._get_new_name()
            try:
                fd = os.open(path.join(self.temp, tmpname), os.O_CREAT | os.O_RDWR | os.O_EXCL)
            except OSError as e:
                if e.errno != errno.EEXIST:
                    raise   # errors other than duplicates should get noticed
                tmpname = None
        # Now we know device/inode, rename to make it unique within the system
        stat = os.fstat(fd)
        newname = self._get_new_name(stat.st_dev, stat.st_ino)
        nf = NamedFile(self.temp, tmpname, fd)
        nf.rename(newname)
        return nf

    def get_incoming(self):
        return [NamedFile(self.incoming, n) for n in os.listdir(self.incoming)]

    def get_incoming_cnt(self):
        """ Get the number of files in the incoming directory """
        return len(os.listdir(self.incoming))
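
# Usage sketch (illustrative, not executed by this module): a producer
# publishes an event by following the tmp -> incoming protocol described
# in the SafeDir docstring; the directory path is a hypothetical example.
#
#   sdir = SafeDir("/var/spool/warden_filer/receiver")
#   nf = sdir.newfile()                  # unique file, opened in "tmp"
#   with nf.f as f:
#       f.write(b'{"Format": "IDEA0"}')
#   nf.moveto(sdir.incoming)             # atomic rename publishes it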

def receiver(config, wclient, sdir, oneshot):
    poll_time = config.get("poll_time", 5)
    node = config.get("node", None)
    conf_filt = config.get("filter", {})
    file_limit = config.get("file_limit", None)
    wait_time = config.get("limit_wait_time", 5)
    filt = {}
    # Extract filter explicitly to be sure we have the right param names for getEvents
    for s in ("cat", "nocat", "tag", "notag", "group", "nogroup"):
        filt[s] = conf_filt.get(s, None)

    while running_flag:
        count_ok = count_err = 0
        limit_reached = False
        if file_limit:
            cnt_files = sdir.get_incoming_cnt()   # Count files in 'incoming' dir
            remain_to_limit = file_limit - cnt_files
            # Query server, but not for more events than what fits into the limit
            if remain_to_limit > 0:
                events = wclient.getEvents(count=remain_to_limit, **filt)
            else:
                events = []
            # Check whether the limit was reached
            if len(events) >= remain_to_limit:
                limit_reached = True
        else:
            events = wclient.getEvents(**filt)

        for event in events:
            if node:
                nodelist = event.setdefault("Node", [])
                nodelist.insert(0, node)
            try:
                nf = None
                nf = sdir.newfile()
                with nf.f as f:
                    data = json.dumps(event)
                    # json.dumps yields ASCII by default, so this is py2/py3 safe
                    f.write(data.encode("utf-8"))
                nf.moveto(sdir.incoming)
                count_ok += 1
            except Exception as e:
                Error(message="Error saving event", exc=sys.exc_info(), file=str(nf),
                      event_ids=[event.get("ID")], sdir=sdir.path).log(wclient.logger)
                count_err += 1

        if events:
            wclient.logger.info(
                "warden_filer: received %d, errors %d"
                % (count_ok, count_err))
        if limit_reached:
            wclient.logger.info("Limit on number of files in 'incoming' dir reached.")
        if oneshot:
            if not events or limit_reached:
                terminate_me(None, None)
        if limit_reached:
            time.sleep(wait_time)
        elif not events:
            time.sleep(poll_time)
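
# Backpressure sketch (illustrative numbers): with file_limit = 10000 and
# 9980 files already in "incoming", at most 20 events are requested from
# the server; once the directory is full, the receiver sleeps for
# limit_wait_time seconds instead of fetching more.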

def match_event(event, cat=None, nocat=None, tag=None, notag=None, group=None, nogroup=None):
    cat_match = tag_match = group_match = True
    if cat or nocat:
        event_cats = event.get("Category")
        event_full_cats = set(event_cats) | set(c.split(".", 1)[0] for c in event_cats)
        cat_match = set(cat or nocat) & event_full_cats
        cat_match = not cat_match if nocat else cat_match

    try:
        event_node = event.get("Node", [])[0]
    except IndexError:
        event_node = {}

    if tag or notag:
        event_tags = set(event_node.get("Type", []))
        tag_match = set(tag or notag) & event_tags
        tag_match = not tag_match if notag else tag_match

    if group or nogroup:
        event_name = event_node.get("Name")
        namesplit = event_name.split(".")
        allnames = set(".".join(namesplit[0:l]) for l in range(1, len(namesplit) + 1))
        group_match = set(group or nogroup) & allnames
        group_match = not group_match if nogroup else group_match

    return cat_match and tag_match and group_match
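
# Filter semantics sketch (hypothetical event with IDEA-style fields):
# parent categories are derived by splitting on the first dot, and Node
# names match on any dotted prefix.
#
#   event = {"Category": ["Abusive.Spam"],
#            "Node": [{"Name": "org.example.warden", "Type": ["Relay"]}]}
#   match_event(event, cat=["Abusive"])           # truthy - parent category matches
#   match_event(event, nogroup=["org.example"])   # falsy - excluded name prefix matches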

def get_dir_list(sdir, owait_poll_time, owait_timeout, nfchunk, oneshot):
    nflist = sdir.get_incoming()
    if oneshot and not nflist:
        terminate_me(None, None)
    timeout = time.time() + owait_timeout
    while len(nflist) < nfchunk and time.time() < timeout and running_flag:
        time.sleep(owait_poll_time)
        nflist = sdir.get_incoming()
    return nflist

def sender(config, wclient, sdir, oneshot):
    poll_time = config.get("poll_time", 5)
    owait_poll_time = config.get("owait_poll_time", 1)
    owait_timeout = config.get("owait_timeout", poll_time)
    node = config.get("node", None)
    # "done_dir" is referenced below but was never initialized; the config
    # key name here is an assumption
    done_dir = config.get("done_dir", None)
    conf_filt = config.get("filter", {})
    filt = {}
    # Extract filter explicitly to be sure we have the right param names for match_event
    for s in ("cat", "nocat", "tag", "notag", "group", "nogroup"):
        filt[s] = conf_filt.get(s, None)
    nfchunk = wclient.send_events_limit

    while running_flag:
        nflist = get_dir_list(sdir, owait_poll_time, owait_timeout, nfchunk, oneshot)
        if oneshot and not nflist:
            terminate_me(None, None)
        while running_flag and not nflist:
            # No new files, wait and try again
            time.sleep(poll_time)
            nflist = get_dir_list(sdir, owait_poll_time, owait_timeout, nfchunk, oneshot)

        # Loop over all chunks. However:
        # - omit the last loop, if there is less data than the optimal window;
        #   the next get_dir_list will still get it again, possibly together
        #   with new files, which may have appeared meanwhile
        # - unless it's the sole loop (so that at least _something_ gets sent)
        nfindex = 0
        while nfindex < len(nflist) and ((len(nflist) - nfindex >= nfchunk) or not nfindex):
            events = []
            nf_sent = []
            count_ok = count_err = count_unmatched = count_local = 0
            for nf in nflist[nfindex:nfindex + nfchunk]:
                # Prepare event array from files
                try:
                    nf.moveto(sdir.temp)
                except Exception:
                    continue    # Silently go to the next filename, somebody else might have interfered
                try:
                    with nf.open("rb") as fd:
                        data = fd.read().decode("utf-8")
                    event = json.loads(data)
                    if not match_event(event, **filt):
                        wclient.logger.debug("Unmatched event: %s" % data)
                        count_unmatched += 1
                        nf.remove()
                        continue
                    if node:
                        nodelist = event.setdefault("Node", [])
                        nodelist.insert(0, node)
                    events.append(event)
                    nf_sent.append(nf)
                except Exception as e:
                    Error(message="Error loading event", exc=sys.exc_info(), file=str(nf),
                          sdir=sdir.path).log(wclient.logger)
                    nf.moveto(sdir.errors)
                    count_local += 1

            res = wclient.sendEvents(events)
            if isinstance(res, Error):
                for e in res.errors:
                    errno = e["error"]
                    evlist = e.get("events", range(len(nf_sent)))   # None means all
                    for i in evlist:
                        if nf_sent[i]:
                            nf_sent[i].moveto(sdir.errors)
                            nf_sent[i] = None
                            count_err += 1

            # Cleanup of the rest - the successfully sent events
            for name in nf_sent:
                if name:
                    if done_dir:
                        name.moveto(done_dir)
                    else:
                        name.remove()
                    count_ok += 1
            wclient.logger.info(
                "warden_filer: saved %d, warden errors %d, local errors %d, unmatched %d"
                % (count_ok, count_err, count_local, count_unmatched))
            nfindex += nfchunk                      # skip to the next chunk of files
            nfchunk = wclient.send_events_limit     # might get changed by the server
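
# Chunking sketch (illustrative numbers): with send_events_limit = 500 and
# 1300 queued files, the inner loop sends files 0-499 and 500-999, then
# leaves the remaining 300 for the next get_dir_list() round - unless those
# 300 were the only files, in which case they are sent right away.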

def get_logger_files(logger):
    """ Return file objects of loggers """
    files = []
    for handler in logger.handlers:
        if hasattr(handler, 'stream') and hasattr(handler.stream, 'fileno'):
            files.append(handler.stream)
        if hasattr(handler, 'socket') and hasattr(handler.socket, 'fileno'):
            files.append(handler.socket)
    return files

def daemonize(
        work_dir=None, chroot_dir=None,
        umask=None, uid=None, gid=None,
        pidfile=None, files_preserve=[], signals={}):
    # Dirs, limits, users
    if chroot_dir is not None:
        os.chdir(chroot_dir)
        os.chroot(chroot_dir)
    if umask is not None:
        os.umask(umask)
    if work_dir is not None:
        os.chdir(work_dir)
    if gid is not None:
        os.setgid(gid)
    if uid is not None:
        os.setuid(uid)
    # Doublefork, split session
    if os.fork() > 0:
        os._exit(0)
    os.setsid()
    if os.fork() > 0:
        os._exit(0)
    # Setup signal handlers
    for (signum, handler) in signals.items():
        signal.signal(signum, handler)
    # Close descriptors
    descr_preserve = set(f.fileno() for f in files_preserve)
    maxfd = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
    if maxfd == resource.RLIM_INFINITY:
        maxfd = 65535
    for fd in range(maxfd, 3, -1):  # 3 means omit stdin, stdout, stderr
        if fd not in descr_preserve:
            try:
                os.close(fd)
            except Exception:
                pass
    # Redirect stdin, stdout, stderr to /dev/null
    devnull = os.open(os.devnull, os.O_RDWR)
    for fd in range(3):
        os.dup2(devnull, fd)
    # PID file
    if pidfile is not None:
        pidd = os.open(pidfile, os.O_RDWR | os.O_CREAT | os.O_EXCL | os.O_TRUNC)
        os.write(pidd, (str(os.getpid()) + "\n").encode())
        os.close(pidd)

        # Define and setup atexit closure
        @atexit.register
        def unlink_pid():
            try:
                os.unlink(pidfile)
            except Exception:
                pass
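
# Call sketch (hypothetical paths): daemonize while keeping the Warden
# client's log descriptors open and letting SIGTERM trigger a clean exit,
# mirroring the __main__ block below.
#
#   daemonize(
#       work_dir="/var/spool/warden_filer",
#       pidfile="/var/run/warden_filer.pid",
#       files_preserve=get_logger_files(wclient.logger),
#       signals={signal.SIGTERM: terminate_me})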

running_flag = True     # Daemon cleanly exits when set to False

def terminate_me(signum, frame):
    global running_flag
    running_flag = False

class DummyContext(object):
    """ In one shot mode we use this instead of DaemonContext """
    def __enter__(self): pass
    def __exit__(self, *exc): pass

def get_args():
    argp = argparse.ArgumentParser(
        description="Save Warden events as files or send files to Warden")
    argp.add_argument("func",
        choices=["sender", "receiver"],
        action="store",
        help="choose direction: sender picks up files and submits them to "
             "Warden, receiver pulls events from Warden and saves them as files")
    argp.add_argument("-c", "--config",
        default=path.splitext(__file__)[0] + ".cfg",
        dest="config",
        help="configuration file path")
    argp.add_argument("-o", "--oneshot",
        default=False,
        dest="oneshot",
        action="store_true",
        help="don't daemonise, run just once")
    argp.add_argument("-d", "--daemon",
        default=False,
        dest="daemon",
        action="store_true",
        help="daemonize")
    argp.add_argument("-p", "--pid_file",
        default=None,
        dest="pid_file",
        action="store",
        help="create PID file with this name")
    return argp.parse_args()
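
# Command-line sketch (flags as defined above; paths hypothetical):
#
#   warden_filer.py receiver --daemon --pid_file /var/run/warden_filer.pid
#   warden_filer.py sender --oneshot -c /etc/warden/warden_filer.cfg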

def get_configs():
    config = read_cfg(args.config)
    # Allow inline or external Warden config
    wconfig = config.get("warden", "warden_client.cfg")
    if isinstance(wconfig, basestring):
        wconfig = read_cfg(wconfig)
    fconfig = config.get(args.func, {})
    return wconfig, fconfig
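
# Configuration sketch (keys taken from the config.get() calls in this
# file; values are hypothetical). "warden" may be an inline object or a
# path to an external warden_client config:
#
#   {
#       "warden": "warden_client.cfg",
#       "receiver": {"dir": "/var/spool/warden_filer/receiver",
#                    "file_limit": 10000, "poll_time": 5},
#       "sender": {"dir": "/var/spool/warden_filer/sender",
#                  "done_dir": "/var/spool/warden_filer/done"}
#   }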

if __name__ == "__main__":
    args = get_args()
    function = sender if args.func == "sender" else receiver
    wconfig, fconfig = get_configs()
    wclient = Client(**wconfig)
    try:
        if args.daemon:
            daemonize(
                work_dir=fconfig.get("work_dir", "."),
                chroot_dir=fconfig.get("chroot_dir"),
                umask=fconfig.get("umask"),
                uid=fconfig.get("uid"),
                gid=fconfig.get("gid"),
                pidfile=args.pid_file,
                files_preserve=get_logger_files(wclient.logger),
                signals={
                    signal.SIGTERM: terminate_me,
                    signal.SIGINT: terminate_me,
                    signal.SIGHUP: signal.SIG_IGN,
                    signal.SIGTTIN: signal.SIG_IGN,
                    signal.SIGTTOU: signal.SIG_IGN})
        safe_dir = SafeDir(fconfig.get("dir", args.func))
        wclient.logger.info("Starting %s" % args.func)
        function(fconfig, wclient, safe_dir, args.oneshot)
        wclient.logger.info("Exiting %s" % args.func)
    except Exception as e:
        Error(message="%s daemon error" % args.func, exc=sys.exc_info()).log(wclient.logger)