Skip to content
Snippets Groups Projects
Commit 502b32b9 authored by Pavel Kácha's avatar Pavel Kácha
Browse files

Merge branch 'warden-3' of homeproj.cesnet.cz:warden into warden-3

parents 9520b11b a2078a6f
No related branches found
No related tags found
No related merge requests found
...@@ -104,6 +104,13 @@ JSON object, containing configuration. See also warden_filer.cfg as example. ...@@ -104,6 +104,13 @@ JSON object, containing configuration. See also warden_filer.cfg as example.
array (see Idea doc). Be careful here, you may ruin Idea array (see Idea doc). Be careful here, you may ruin Idea
messages by wrongly formatted data and they are not checked messages by wrongly formatted data and they are not checked
here in any way here in any way
poll_time - how often to check Warden server for new events (in seconds,
defaults to 5)
file_limit - limit number of files in "incoming" directory. When the limit
is reached, polling is paused for "limit_wait_time" seconds
limit_wait_time - wait this number of seconds if limit on number of files
is reached (defaults to 5)
Both the "sender" and "reciever" sections can also bear daemon Both the "sender" and "reciever" sections can also bear daemon
configuration. configuration.
......
...@@ -49,6 +49,8 @@ ...@@ -49,6 +49,8 @@
//"node": { //"node": {
// "Name": "cz.example.warden.test_receiver", // "Name": "cz.example.warden.test_receiver",
// "Type": ["Relay"] // "Type": ["Relay"]
//} //},
// Optional limit on number of files in "incoming" directory
//"file_limit": 10000
} }
} }
...@@ -138,20 +138,40 @@ class SafeDir(object): ...@@ -138,20 +138,40 @@ class SafeDir(object):
return [NamedFile(self.incoming, n) for n in os.listdir(self.incoming)] return [NamedFile(self.incoming, n) for n in os.listdir(self.incoming)]
def get_incoming_cnt(self):
"""Get number of files in the incoming directory"""
return len(os.listdir(self.incoming))
def receiver(config, wclient, sdir, oneshot): def receiver(config, wclient, sdir, oneshot):
poll_time = config.get("poll_time", 5) poll_time = config.get("poll_time", 5)
node = config.get("node", None) node = config.get("node", None)
conf_filt = config.get("filter", {}) conf_filt = config.get("filter", {})
file_limit = config.get("file_limit", None)
wait_time = config.get("limit_wait_time", 5)
filt = {} filt = {}
# Extract filter explicitly to be sure we have right param names for getEvents # Extract filter explicitly to be sure we have right param names for getEvents
for s in ("cat", "nocat", "tag", "notag", "group", "nogroup"): for s in ("cat", "nocat", "tag", "notag", "group", "nogroup"):
filt[s] = conf_filt.get(s, None) filt[s] = conf_filt.get(s, None)
while running_flag: while running_flag:
events = wclient.getEvents(**filt)
count_ok = count_err = 0 count_ok = count_err = 0
while events: limit_reached = False
if file_limit:
cnt_files = sdir.get_incoming_cnt() # Count files in 'incoming' dir
remain_to_limit = file_limit - cnt_files
# Query server, but not for more events than what can fit into limit
if remain_to_limit > 0:
events = wclient.getEvents(count=remain_to_limit, **filt)
else:
events = []
# Check whether limit was reached
if len(events) >= remain_to_limit:
limit_reached = True
else:
events = wclient.getEvents(**filt)
for event in events: for event in events:
if node: if node:
nodelist = event.setdefault("Node", []) nodelist = event.setdefault("Node", [])
...@@ -168,15 +188,22 @@ def receiver(config, wclient, sdir, oneshot): ...@@ -168,15 +188,22 @@ def receiver(config, wclient, sdir, oneshot):
Error(message="Error saving event", exc=sys.exc_info(), file=str(nf), Error(message="Error saving event", exc=sys.exc_info(), file=str(nf),
event_ids=[event.get("ID")], sdir=sdir.path).log(wclient.logger) event_ids=[event.get("ID")], sdir=sdir.path).log(wclient.logger)
count_err += 1 count_err += 1
if events:
wclient.logger.info( wclient.logger.info(
"warden_filer: received %d, errors %d" "warden_filer: received %d, errors %d"
% (count_ok, count_err)) % (count_ok, count_err))
events = wclient.getEvents(**filt)
count_ok = count_err = 0 if limit_reached:
wclient.logger.info("Limit on number of files in 'incoming' dir reached.")
if oneshot: if oneshot:
if not events: if not events or limit_reached:
terminate_me(None, None) terminate_me(None, None)
else: else:
if limit_reached:
time.sleep(wait_time)
elif not events:
time.sleep(poll_time) time.sleep(poll_time)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment