diff --git a/warden3/contrib/warden_filer/README b/warden3/contrib/warden_filer/README index 5dc7590271cd7c443ff01bc1bd8424a898ecf911..e1250acbf6cee7086e764a33b15672b4ec661980 100644 --- a/warden3/contrib/warden_filer/README +++ b/warden3/contrib/warden_filer/README @@ -104,6 +104,13 @@ JSON object, containing configuration. See also warden_filer.cfg as example. array (see Idea doc). Be careful here, you may ruin Idea messages by wrongly formatted data and they are not checked here in any way + poll_time - how often to check Warden server for new events (in seconds, + defaults to 5) + file_limit - limit number of files in "incoming" directory. When the limit + is reached, polling is paused for "limit_wait_time" seconds + limit_wait_time - wait this number of seconds if limit on number of files + is reached (defaults to 5) + Both the "sender" and "reciever" sections can also bear daemon configuration. diff --git a/warden3/contrib/warden_filer/warden_filer.cfg.dist b/warden3/contrib/warden_filer/warden_filer.cfg.dist index 0366e5c2b5096dba60d876a51f8e16d75589a99c..ab6b1e1947e0a3b5e642ff0f74dca8cda1285367 100644 --- a/warden3/contrib/warden_filer/warden_filer.cfg.dist +++ b/warden3/contrib/warden_filer/warden_filer.cfg.dist @@ -49,6 +49,8 @@ //"node": { // "Name": "cz.example.warden.test_receiver", // "Type": ["Relay"] - //} + //}, + // Optional limit on number of files in "incoming" directory + //"file_limit": 10000 } } diff --git a/warden3/contrib/warden_filer/warden_filer.py b/warden3/contrib/warden_filer/warden_filer.py index c7ef609726257519911a8cf8b229ed7c18a50001..b9d08cb376dbb3f474c48f79afb123bb643f1b1d 100755 --- a/warden3/contrib/warden_filer/warden_filer.py +++ b/warden3/contrib/warden_filer/warden_filer.py @@ -138,46 +138,73 @@ class SafeDir(object): return [NamedFile(self.incoming, n) for n in os.listdir(self.incoming)] + def get_incoming_cnt(self): + """Get number of files in the incoming directory""" + return len(os.listdir(self.incoming)) + + def receiver(config, wclient, sdir, oneshot): poll_time = config.get("poll_time", 5) node = config.get("node", None) conf_filt = config.get("filter", {}) + file_limit = config.get("file_limit", None) + wait_time = config.get("limit_wait_time", 5) filt = {} # Extract filter explicitly to be sure we have right param names for getEvents for s in ("cat", "nocat", "tag", "notag", "group", "nogroup"): filt[s] = conf_filt.get(s, None) while running_flag: - events = wclient.getEvents(**filt) count_ok = count_err = 0 - while events: - for event in events: - if node: - nodelist = event.setdefault("Node", []) - nodelist.insert(0, node) - try: - nf = None - nf = sdir.newfile() - with nf.f as f: - data = json.dumps(event) - f.write(data) - nf.moveto(sdir.incoming) - count_ok += 1 - except Exception as e: - Error(message="Error saving event", exc=sys.exc_info(), file=str(nf), - event_ids=[event.get("ID")], sdir=sdir.path).log(wclient.logger) - count_err += 1 + limit_reached = False + if file_limit: + cnt_files = sdir.get_incoming_cnt() # Count files in 'incoming' dir + remain_to_limit = file_limit - cnt_files + # Query server, but not for more events than what can fit into limit + if remain_to_limit > 0: + events = wclient.getEvents(count=remain_to_limit, **filt) + else: + events = [] + # Check whether limit was reached + if len(events) >= remain_to_limit: + limit_reached = True + else: + events = wclient.getEvents(**filt) + + for event in events: + if node: + nodelist = event.setdefault("Node", []) + nodelist.insert(0, node) + try: + nf = None + nf = sdir.newfile() + with nf.f as f: + data = json.dumps(event) + f.write(data) + nf.moveto(sdir.incoming) + count_ok += 1 + except Exception as e: + Error(message="Error saving event", exc=sys.exc_info(), file=str(nf), + event_ids=[event.get("ID")], sdir=sdir.path).log(wclient.logger) + count_err += 1 + + if events: wclient.logger.info( "warden_filer: received %d, errors %d" % (count_ok, count_err)) - events = wclient.getEvents(**filt) - count_ok = count_err = 0 + + if limit_reached: + wclient.logger.info("Limit on number of files in 'incoming' dir reached.") + if oneshot: - if not events: + if not events or limit_reached: terminate_me(None, None) else: - time.sleep(poll_time) + if limit_reached: + time.sleep(wait_time) + elif not events: + time.sleep(poll_time)