Commit 5877c9fb authored by Pavel Kácha

* --daemon is now explicit option

* --pid_file is now on command line, instead of in cfg file (better supports startup shell scripts)
* filer now supports done_dir, where processed messages get moved instead of being removed
* sender can now wait opportunistically if the number of event files to process is lower than send_events_limit; governed by the owait_* cfg file options
parent 950b8941
+---------------------------------+
| Warden Filer 0.1 for Warden 3.X |
+---------------------------------+
+---------------------------------------+
| Warden Filer 3.0-beta1 for Warden 3.X |
+---------------------------------------+
Content
......@@ -49,15 +49,21 @@ C. Usage
-c CONFIG, --config CONFIG
configuration file path
--oneshot don't daemonise, run just once
-d, --daemon daemonize
-p PID_FILE, --pid_file PID_FILE
create PID file with this name
CONFIG denotes path to configuration file, default is warden_filer.cfg in
current directory.
--oneshot prevents daemonizing, Filer just does its work once (fetches
available events or sends event files present in directory), but obeys
all other applicable options from configuration file (concerning logging,
filtering, directories, etc.)
Without --oneshot Filer goes to full unix daemon mode.
--oneshot instructs Filer to just do its work once (fetch available events
or send event files present in directory), but obeys all other applicable
options from configuration file (concerning logging, filtering, directories,
etc.)
--daemon instructs Filer to go to full unix daemon mode. Without it,
Filer just stays in the foreground.
--pid_file makes Filer create the usual PID file. Without it, no PID
file gets created.
------------------------------------------------------------------------------
D. Configuration
......@@ -71,12 +77,25 @@ JSON object, containing configuration. See also warden_filer.cfg as example.
sender - configuration section for sender mode
dir - directory, whose "incoming" subdir will be checked for Idea
events to send out
done_dir - directory, into which the messages will be moved after
successful sending. If not set, processed messages will get
deleted, which is default, and usually what you want. Note that
this is just a regular directory; no special locking precautions
are taken and no subdirectories are created here. However, if
"done_dir" is on the same filesystem as "dir", the move is an
atomic rename.
filter - filter fields (same as in Warden query, see Warden and Idea
doc, possible keys: cat, nocat, group, nogroup, tag, notag),
unmatched events get discarded and deleted
doc, possible keys: cat, nocat, group, nogroup, tag, notag),
unmatched events get discarded and deleted
node - information about detector to be prepended into event Node
array (see Idea doc). Note that Warden server may require it
to correspond with client registration
array (see Idea doc). Note that Warden server may require it to
correspond with client registration
poll_time - how often to check incoming directory (in seconds, defaults
to 5)
owait_timeout - how long to opportunistically wait for possible new
incoming files when number of files to process is less than
send_events_limit (in seconds, defaults to poll_time)
owait_poll_time - how often to check incoming directory during
opportunistic timeout (in seconds, defaults to 1)
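
A minimal sketch of how the owait_* options described above interact, assuming Python 3. The paths and values in SENDER_CFG are made up for illustration, and wait_for_files is a hypothetical helper, not Filer's own function; it also leaves out the shutdown flag that the real sender checks. Only the option names and their defaults come from the text above.

```python
import json
import time

# Hypothetical sender section; paths and values are illustrative only.
SENDER_CFG = json.loads("""
{
    "dir": "/var/spool/warden_sender",
    "done_dir": "/var/spool/warden_sender_done",
    "poll_time": 5,
    "owait_timeout": 10,
    "owait_poll_time": 1
}
""")

def wait_for_files(list_incoming, limit, cfg, sleep=time.sleep, clock=time.time):
    """Re-list the incoming directory until at least `limit` files are
    present or owait_timeout expires, whichever comes first."""
    poll_time = cfg.get("poll_time", 5)
    owait_poll_time = cfg.get("owait_poll_time", 1)
    # owait_timeout falls back to poll_time when not configured
    owait_timeout = cfg.get("owait_timeout", poll_time)
    files = list_incoming()
    deadline = clock() + owait_timeout
    while len(files) < limit and clock() < deadline:
        sleep(owait_poll_time)
        files = list_incoming()
    return files
```

With these example values, a sender that finds fewer files than send_events_limit would re-check the directory every second for up to ten seconds before sending whatever it has.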
receiver - configuration section for receiver mode
dir - directory, whose "incoming" subdir will serve as target for events
filter - filter fields for Warden query (see Warden and Idea doc,
......@@ -86,6 +105,14 @@ JSON object, containing configuration. See also warden_filer.cfg as example.
messages by wrongly formatted data and they are not checked
here in any way
Both the "sender" and "receiver" sections can also bear daemon
configuration.
work_dir - directory the daemon should chdir into
chroot_dir - confine daemon into chroot directory
umask - explicitly set umask for created files
uid, gid - uid/gid, under which daemon will run
------------------------------------------------------------------------------
E. Directories and locking issues
......@@ -95,13 +122,19 @@ confusion. Simple path suffers locking issue: when one process saves file
there, another process has no way to know whether file is already complete
or not, and starting to read prematurely can lead to corrupted data read.
Also, two concurrent processes may decide to work on one file, stomping on
others legs. So, your scripts and tools inserting data or taking data from
working directories must obey simple protocols.
each other's legs.
So, your scripts and tools inserting data or taking data from working
directories must obey simple protocols, which use atomic "rename" to avoid
locking issues.
Also, your directory (and its structure) _must_ reside on the same
filesystem to keep "rename" atomic. _Never_ try to mount some of the
subdirectories ("temp", "incoming", "errors") from another filesystem.
1. Inserting file
* Use "temp" subdirectory to create new file; filename is arbitrary, but
must be unique among all subdirectories.
* The file you want to create _must_ be created in the "temp" subdirectory
first, _not_ "incoming". Filename is arbitrary, but must be unique among
all subdirectories.
* When done writing, rename the file into "incoming" subdir. Rename is
atomic operation, so for readers, file will appear either nonexistent
......@@ -115,7 +148,7 @@ working directories must obey simple protocols.
information into name is recommended - Filer itself uses hostname, pid,
unixtime, milliseconds, device number and file inode number to avoid
locking issues both on local and network based filesystems and to be
prepared for hight traffic.
prepared for high traffic.
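
The insertion protocol above can be sketched roughly as follows, assuming Python 3 and an existing base directory with "temp" and "incoming" subdirectories on one filesystem. The function insert_event and its naming scheme (hostname, pid, nanosecond timestamp) are hypothetical and simpler than what Filer itself uses.

```python
import os
import socket
import time

def insert_event(base_dir, data):
    """Write `data` (bytes) into base_dir/temp, then rename the finished
    file into base_dir/incoming. Returns the final path."""
    # Hypothetical naming scheme: hostname, pid and a nanosecond timestamp
    name = "%s.%d.%d.idea" % (socket.gethostname(), os.getpid(), time.time_ns())
    tmp_path = os.path.join(base_dir, "temp", name)
    final_path = os.path.join(base_dir, "incoming", name)
    # "x" mode refuses to overwrite an existing file of the same name
    with open(tmp_path, "xb") as f:
        f.write(data)
        f.flush()
        os.fsync(f.fileno())
    # Readers see the file in "incoming" either complete or not at all
    os.rename(tmp_path, final_path)
    return final_path
```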
2. Picking up file
......@@ -123,11 +156,11 @@ working directories must obey simple protocols.
* Do whatever you want with contents, and when finished, rename file back
into "incoming", or remove, or move somewhere else, or move into "errors"
directory, after all, it's your file.
directory - what suits your needs, after all, it's your file.
Note that in concurrent environment file can disappear between directory
enumeration and attempt to rename - then just pick another one (and
repeat), someone was swifter.)
possibly repeat), someone was swifter.
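
A rough sketch of the pick-up side, assuming Python 3. It assumes the file is claimed by renaming it from "incoming" into "temp" (the exact target is not shown in this excerpt), and pick_up is a hypothetical helper name. A file that vanishes between listing and rename is simply skipped, as the note above suggests.

```python
import os

def pick_up(base_dir):
    """Claim one file from base_dir/incoming by renaming it into
    base_dir/temp. Returns the claimed path, or None if nothing was claimed."""
    incoming = os.path.join(base_dir, "incoming")
    for name in sorted(os.listdir(incoming)):
        src = os.path.join(incoming, name)
        dst = os.path.join(base_dir, "temp", name)
        try:
            os.rename(src, dst)
        except FileNotFoundError:
            # Someone was swifter; try the next candidate
            continue
        return dst
    return None
```

Once claimed, the file belongs to the caller, who can remove it, rename it back into "incoming", or move it into "errors".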
------------------------------------------------------------------------------
Copyright (C) 2011-2015 Cesnet z.s.p.o
......@@ -213,30 +213,50 @@ def match_event(event, cat=None, nocat=None, tag=None, notag=None, group=None, n
def get_dir_list(sdir, owait_poll_time, owait_timeout, nfchunk, oneshot):
nflist = sdir.get_incoming()
if oneshot and not nflist:
terminate_me(None, None)
timeout = time.time() + owait_timeout
while len(nflist)<nfchunk and time.time()<timeout and running_flag:
time.sleep(owait_poll_time)
nflist = sdir.get_incoming()
return nflist
def sender(config, wclient, sdir, oneshot):
poll_time = config.get("poll_time", 5)
owait_poll_time = config.get("owait_poll_time", 1)
owait_timeout = config.get("owait_timeout", poll_time)
node = config.get("node", None)
done_dir = config.get("done_dir", None)
conf_filt = config.get("filter", {})
filt = {}
# Extract filter explicitly to be sure we have right param names for match_event
for s in ("cat", "nocat", "tag", "notag", "group", "nogroup"):
filt[s] = conf_filt.get(s, None)
nfchunk = wclient.send_events_limit
while running_flag:
nflist = sdir.get_incoming()
if oneshot:
nflist = get_dir_list(sdir, owait_poll_time, owait_timeout, nfchunk, oneshot)
if oneshot and not nflist:
terminate_me(None, None)
while running_flag and not nflist:
# No new files, wait and try again
time.sleep(poll_time)
nflist = sdir.get_incoming()
nflist = get_dir_list(sdir, owait_poll_time, owait_timeout, nfchunk, oneshot)
# Loop over all chunks. However:
# - omit the last loop, if there is less data than the optimal window;
# next get_dir_list will still get it again, possibly together with
# new files, which may have appeared meanwhile
# - unless it's the sole loop (so that at least _something_ gets sent)
nfindex = 0
nfchunk = wclient.send_events_limit
while nfindex<len(nflist):
while nfindex<len(nflist) and ((len(nflist)-nfindex>=nfchunk) or not nfindex):
events = []
nf_sent = []
count_ok = count_err = count_unmatched = 0
count_ok = count_err = count_unmatched = count_local = 0
for nf in nflist[nfindex:nfindex+nfchunk]:
# prepare event array from files
try:
......@@ -261,6 +281,7 @@ def sender(config, wclient, sdir, oneshot):
Error(message="Error loading event", exc=sys.exc_info(), file=str(nf),
sdir=sdir.path).log(wclient.logger)
nf.moveto(sdir.errors)
count_local += 1
res = wclient.sendEvents(events)
......@@ -277,10 +298,13 @@ def sender(config, wclient, sdir, oneshot):
# Cleanup rest - the successfully sent events
for name in nf_sent:
if name:
name.remove()
if done_dir:
name.moveto(done_dir)
else:
name.remove()
count_ok += 1
wclient.logger.info(
"warden_filer: saved %d, errors %d, unmatched %d" % (count_ok, count_err, count_unmatched))
"warden_filer: saved %d, warden errors %d, local errors %d, unmatched %d" % (count_ok, count_err, count_local, count_unmatched))
nfindex += nfchunk # skip to next chunk of files
nfchunk = wclient.send_events_limit # might get changed by server
......@@ -326,11 +350,21 @@ def get_args():
default=path.splitext(__file__)[0]+".cfg",
dest="config",
help="configuration file path")
argp.add_argument('--oneshot',
argp.add_argument("-o", "--oneshot",
default=False,
dest="oneshot",
action="store_true",
help="don't daemonise, run just once")
argp.add_argument("-d", "--daemon",
default=False,
dest="daemon",
action="store_true",
help="daemonize")
argp.add_argument("-p", "--pid_file",
default=None,
dest="pid_file",
action="store",
help="create PID file with this name")
return argp.parse_args()
......@@ -357,18 +391,17 @@ if __name__ == "__main__":
wconfig, fconfig = get_configs()
oneshot = args.oneshot
safe_dir = SafeDir(fconfig.get("dir", args.func))
wclient = Client(**wconfig)
if oneshot:
if not args.daemon:
daemon = DummyContext()
else:
work_dir = fconfig.get("work_dir", ".")
chroot_dir = fconfig.get("chroot_dir")
umask = fconfig.get("umask", 0)
pid_file = fconfig.get("pid_file", "/var/run/warden_filer.pid")
pid_file = args.pid_file
uid = fconfig.get("uid")
gid = fconfig.get("gid")
......@@ -376,7 +409,7 @@ if __name__ == "__main__":
working_directory = work_dir,
chroot_directory = chroot_dir,
umask = umask,
pidfile = TimeoutPIDLockFile(pid_file, acquire_timeout=0),
pidfile = TimeoutPIDLockFile(pid_file, acquire_timeout=0) if pid_file else None,
uid = uid,
gid = gid,
files_preserve = get_logger_files(wclient.logger),
......@@ -390,7 +423,7 @@ if __name__ == "__main__":
try:
with daemon:
wclient.logger.info("Starting %s" % args.func)
function(fconfig, wclient, safe_dir, oneshot)
function(fconfig, wclient, safe_dir, args.oneshot)
wclient.logger.info("Exiting %s" % args.func)
except lockfile.Error as e:
wclient.logger.critical("Error acquiring lockfile %s (%s)"
......
......@@ -25,8 +25,9 @@ ACTION="$1"
case "$ACTION" in
start)
mkdir -p "${PID%/*}"
log_daemon_msg "Starting $SERVICE_NAME" "$SERVICE_NAME"
start_daemon -p "$PID" "$DAEMON_PATH" -c "$CONFIG" "$FUNC"
start_daemon -p "$PID" "$DAEMON_PATH" -c "$CONFIG" --pid_file "$PID" --daemon "$FUNC"
log_end_msg $?
;;
stop)
......
......@@ -25,8 +25,9 @@ ACTION="$1"
case "$ACTION" in
start)
mkdir -p "${PID%/*}"
log_daemon_msg "Starting $SERVICE_NAME" "$SERVICE_NAME"
start_daemon -p "$PID" "$DAEMON_PATH" -c "$CONFIG" "$FUNC"
start_daemon -p "$PID" "$DAEMON_PATH" -c "$CONFIG" --pid_file "$PID" --daemon "$FUNC"
log_end_msg $?
;;
stop)
......