Skip to content
Snippets Groups Projects
Commit 950b8941 authored by Pavel Kácha's avatar Pavel Kácha
Browse files

* Filer now works on send_event_limit sized chunks to avoid running off memory of timing out

* Shorter example config
parent 1743762d
No related branches found
No related tags found
No related merge requests found
...@@ -30,7 +30,6 @@ ...@@ -30,7 +30,6 @@
"Type": ["Relay"], "Type": ["Relay"],
"SW": ["warden_filer-sender"], "SW": ["warden_filer-sender"],
"AggrWin": "00:05:00", "AggrWin": "00:05:00",
"Note": "Test warden_filer sender"
} }
}, },
"receiver": { "receiver": {
......
...@@ -21,6 +21,7 @@ from random import choice, randint; ...@@ -21,6 +21,7 @@ from random import choice, randint;
from daemon import DaemonContext from daemon import DaemonContext
from daemon.pidlockfile import TimeoutPIDLockFile from daemon.pidlockfile import TimeoutPIDLockFile
VERSION = "3.0-beta1"
class NamedFile(object): class NamedFile(object):
""" Wrapper class for file objects, which allows and tracks filename """ Wrapper class for file objects, which allows and tracks filename
...@@ -213,7 +214,6 @@ def match_event(event, cat=None, nocat=None, tag=None, notag=None, group=None, n ...@@ -213,7 +214,6 @@ def match_event(event, cat=None, nocat=None, tag=None, notag=None, group=None, n
def sender(config, wclient, sdir, oneshot): def sender(config, wclient, sdir, oneshot):
send_events_limit = config.get("send_events_limit", 500)
poll_time = config.get("poll_time", 5) poll_time = config.get("poll_time", 5)
node = config.get("node", None) node = config.get("node", None)
conf_filt = config.get("filter", {}) conf_filt = config.get("filter", {})
...@@ -227,58 +227,63 @@ def sender(config, wclient, sdir, oneshot): ...@@ -227,58 +227,63 @@ def sender(config, wclient, sdir, oneshot):
if oneshot: if oneshot:
terminate_me(None, None) terminate_me(None, None)
while running_flag and not nflist: while running_flag and not nflist:
# No new files, wait and try again
time.sleep(poll_time) time.sleep(poll_time)
nflist = sdir.get_incoming() nflist = sdir.get_incoming()
# count chunk iterations rounded up
count = len(nflist) nfindex = 0
nfchunk = wclient.send_events_limit
events = [] while nfindex<len(nflist):
nf_sent = [] events = []
count_ok = count_err = count_unmatched = 0 nf_sent = []
for nf in nflist: count_ok = count_err = count_unmatched = 0
# prepare event array from files for nf in nflist[nfindex:nfindex+nfchunk]:
try: # prepare event array from files
nf.moveto(sdir.temp) try:
except Exception: nf.moveto(sdir.temp)
continue # Silently go to next filename, somebody else might have interfered except Exception:
try: continue # Silently go to next filename, somebody else might have interfered
with nf.open("rb") as fd: try:
data = fd.read() with nf.open("rb") as fd:
event = json.loads(data) data = fd.read()
if not match_event(event, **filt): event = json.loads(data)
wclient.logger.debug("Unmatched event: %s" % data) if not match_event(event, **filt):
count_unmatched += 1 wclient.logger.debug("Unmatched event: %s" % data)
nf.remove() count_unmatched += 1
continue nf.remove()
if node: continue
nodelist = event.setdefault("Node", []) if node:
nodelist.insert(0, node) nodelist = event.setdefault("Node", [])
events.append(event) nodelist.insert(0, node)
nf_sent.append(nf) events.append(event)
except Exception as e: nf_sent.append(nf)
Error(message="Error loading event", exc=sys.exc_info(), file=str(nf), except Exception as e:
sdir=sdir.path).log(wclient.logger) Error(message="Error loading event", exc=sys.exc_info(), file=str(nf),
nf.moveto(sdir.errors) sdir=sdir.path).log(wclient.logger)
nf.moveto(sdir.errors)
res = wclient.sendEvents(events)
res = wclient.sendEvents(events)
if isinstance(res, Error):
for e in res.errors: if isinstance(res, Error):
errno = e["error"] for e in res.errors:
evlist = e.get("events", range(len(nf_sent))) # None means all errno = e["error"]
for i in evlist: evlist = e.get("events", range(len(nf_sent))) # None means all
if nf_sent[i]: for i in evlist:
nf_sent[i].moveto(sdir.errors) if nf_sent[i]:
nf_sent[i] = None nf_sent[i].moveto(sdir.errors)
count_err += 1 nf_sent[i] = None
count_err += 1
# Cleanup rest - succesfully sent events
for name in nf_sent: # Cleanup rest - the succesfully sent events
if name: for name in nf_sent:
name.remove() if name:
count_ok += 1 name.remove()
wclient.logger.info( count_ok += 1
"warden_filer: saved %d, errors %d, unmatched %d" % (count_ok, count_err, count_unmatched)) wclient.logger.info(
"warden_filer: saved %d, errors %d, unmatched %d" % (count_ok, count_err, count_unmatched))
nfindex += nfchunk # skip to next chunk of files
nfchunk = wclient.send_events_limit # might get changed by server
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment