diff --git a/warden3/contrib/warden_filer/warden_filer.cfg b/warden3/contrib/warden_filer/warden_filer.cfg index bf6e50c757e978bdb0c3f51e3f3aadf57e5c5fba..3ab68c7b1fc4b81c0758916bdce7ebf971d4d922 100644 --- a/warden3/contrib/warden_filer/warden_filer.cfg +++ b/warden3/contrib/warden_filer/warden_filer.cfg @@ -30,7 +30,6 @@ "Type": ["Relay"], "SW": ["warden_filer-sender"], "AggrWin": "00:05:00", - "Note": "Test warden_filer sender" } }, "receiver": { diff --git a/warden3/contrib/warden_filer/warden_filer.py b/warden3/contrib/warden_filer/warden_filer.py index bb5207f85317d64509493ece7527b9382ef34cfa..5c6c41132f6718b9d505bfd1019586bafd9bd963 100644 --- a/warden3/contrib/warden_filer/warden_filer.py +++ b/warden3/contrib/warden_filer/warden_filer.py @@ -21,6 +21,7 @@ from random import choice, randint; from daemon import DaemonContext from daemon.pidlockfile import TimeoutPIDLockFile +VERSION = "3.0-beta1" class NamedFile(object): """ Wrapper class for file objects, which allows and tracks filename @@ -213,7 +214,6 @@ def match_event(event, cat=None, nocat=None, tag=None, notag=None, group=None, n def sender(config, wclient, sdir, oneshot): - send_events_limit = config.get("send_events_limit", 500) poll_time = config.get("poll_time", 5) node = config.get("node", None) conf_filt = config.get("filter", {}) @@ -227,58 +227,63 @@ def sender(config, wclient, sdir, oneshot): if oneshot: terminate_me(None, None) while running_flag and not nflist: + # No new files, wait and try again time.sleep(poll_time) nflist = sdir.get_incoming() - # count chunk iterations rounded up - count = len(nflist) - - events = [] - nf_sent = [] - count_ok = count_err = count_unmatched = 0 - for nf in nflist: - # prepare event array from files - try: - nf.moveto(sdir.temp) - except Exception: - continue # Silently go to next filename, somebody else might have interfered - try: - with nf.open("rb") as fd: - data = fd.read() - event = json.loads(data) - if not match_event(event, **filt): - wclient.logger.debug("Unmatched event: %s" % data) - count_unmatched += 1 - nf.remove() - continue - if node: - nodelist = event.setdefault("Node", []) - nodelist.insert(0, node) - events.append(event) - nf_sent.append(nf) - except Exception as e: - Error(message="Error loading event", exc=sys.exc_info(), file=str(nf), - sdir=sdir.path).log(wclient.logger) - nf.moveto(sdir.errors) - - res = wclient.sendEvents(events) - - if isinstance(res, Error): - for e in res.errors: - errno = e["error"] - evlist = e.get("events", range(len(nf_sent))) # None means all - for i in evlist: - if nf_sent[i]: - nf_sent[i].moveto(sdir.errors) - nf_sent[i] = None - count_err += 1 - - # Cleanup rest - succesfully sent events - for name in nf_sent: - if name: - name.remove() - count_ok += 1 - wclient.logger.info( - "warden_filer: saved %d, errors %d, unmatched %d" % (count_ok, count_err, count_unmatched)) + + nfindex = 0 + nfchunk = wclient.send_events_limit + while nfindex<len(nflist): + events = [] + nf_sent = [] + count_ok = count_err = count_unmatched = 0 + for nf in nflist[nfindex:nfindex+nfchunk]: + # prepare event array from files + try: + nf.moveto(sdir.temp) + except Exception: + continue # Silently go to next filename, somebody else might have interfered + try: + with nf.open("rb") as fd: + data = fd.read() + event = json.loads(data) + if not match_event(event, **filt): + wclient.logger.debug("Unmatched event: %s" % data) + count_unmatched += 1 + nf.remove() + continue + if node: + nodelist = event.setdefault("Node", []) + nodelist.insert(0, node) + events.append(event) + nf_sent.append(nf) + except Exception as e: + Error(message="Error loading event", exc=sys.exc_info(), file=str(nf), + sdir=sdir.path).log(wclient.logger) + nf.moveto(sdir.errors) + + res = wclient.sendEvents(events) + + if isinstance(res, Error): + for e in res.errors: + errno = e["error"] + evlist = e.get("events", range(len(nf_sent))) # None means all + for i in evlist: + if nf_sent[i]: + nf_sent[i].moveto(sdir.errors) + nf_sent[i] = None + count_err += 1 + + # Cleanup rest - the succesfully sent events + for name in nf_sent: + if name: + name.remove() + count_ok += 1 + wclient.logger.info( + "warden_filer: saved %d, errors %d, unmatched %d" % (count_ok, count_err, count_unmatched)) + + nfindex += nfchunk # skip to next chunk of files + nfchunk = wclient.send_events_limit # might get changed by server