Skip to content
Snippets Groups Projects
Commit 34cb2199 authored by Pavel Kácha's avatar Pavel Kácha
Browse files

Added warden_filer daemon for sending and receiving events to/from

plain files in directory
parent d7923419
No related branches found
No related tags found
No related merge requests found
BSD License
Copyright © 2011-2015 Cesnet z.s.p.o
All rights reserved.
Redistribution and use in source and binary forms, with or without modification,
are permitted provided that the following conditions are met:
* Redistributions of source code must retain the above copyright notice,
this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright notice,
this list of conditions and the following disclaimer in the documentation
and/or other materials provided with the distribution.
* Neither the name of the Cesnet z.s.p.o nor the names of its
contributors may be used to endorse or promote products derived from this
software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE Cesnet z.s.p.o BE LIABLE FOR ANY DIRECT, INDIRECT,
INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA,
OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE
OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+---------------------------------+
| Warden Filer 0.1 for Warden 3.X |
+---------------------------------+
Content
A. Introduction
B. Dependencies
C. Usage
D. Configuration
E. Directories and locking issues
------------------------------------------------------------------------------
A. Introduction
Warden Filer (executable warden_filer.py) is daemon for easy handling of
Idea events transfer between plain local files and Warden server. The tool can
be instructed to run as one of two daemons - reader and sender.
In reader mode, Filer polls Warden server and saves incoming events as
plain files in directory.
In writer mode, Filer polls directory and sends out all new files out to
Warden server.
------------------------------------------------------------------------------
B. Dependencies
1. Platform
Python 2.7+
2. Python packages
python-daemon 1.5+, warden_client 3.0+
------------------------------------------------------------------------------
C. Usage
warden_filer.py [-h] [-c CONFIG] [--oneshot] {sender,receiver}
Save Warden events as files or send files to Warden
positional arguments:
{sender,receiver} choose direction: sender picks up files and submits
them to Warden, receiver pulls events from Warden
and saves them as files
optional arguments:
-h, --help show this help message and exit
-c CONFIG, --config CONFIG
configuration file path
--oneshot don't daemonise, run just once
CONFIG denotes path to configuration file, default is warden_filer.cfg in
current directory.
--oneshot prevents daemonizing, Filer just does its work once (fetches
available events or sends event files present in directory), but obeys
all other applicable options from configuration file (concerning logging,
filtering, directories, etc.)
Without --oneshot Filer goes to full unix daemon mode.
------------------------------------------------------------------------------
D. Configuration
Configuration is JSON object in file - however, lines starting with "#"
or "//" are allowed and will be ignored as comments. File must contain valid
JSON object, containing configuration. See also warden_filer.cfg as example.
warden - can contain Warden 3 configuration (see Warden doc), or path
to Warden configuration file
sender - configuration section for sender mode
dir - directory, whose "incoming" subdir will be checked for Idea
events to send out
node - o information about detector to be prepended into event Node
array (see Idea doc)
receiver - configuration section for receiver mode
dir - directory, whose "incoming" subdir will serve as target for events
filter - filter fields for Warden query (see Warden and Idea doc,
possible keys: cat, nocat, group, nogroup, tag, notag)
------------------------------------------------------------------------------
E. Directories and locking issues
Working directories are not just simple paths, but contain structure,
loosely mimicked from Maildir with slightly changed names to avoid first look
confusion. Simple path suffers locking issue: when one process saves file
there, another process has no way to know whether file is already complete
or not, and starting to read prematurely can lead to corrupted data read.
Also, two concurrent processes may decide to work on one file, stomping on
others legs. So, your scripts and tools inserting data or taking data from
working directories must obey simple protocols.
1. Inserting file
* Use "temp" subdirectory to create new file; filename is arbitrary, but
must be unique among all subdirectories.
* When done writing, rename the file into "incoming" subdir. Rename is
atomic operation, so for readers, file will appear either nonexistent
or complete.
For simple usage (bash scripts, etc.), just creating sufficiently random
filename in "temp" and then moving into "incoming" may be enough.
Concatenating $RANDOM couple of times will do. :)
For advanced or potentially concurrent usage inserting enough of unique
information into name is recommended - Filer itself uses hostname, pid,
unixtime, milliseconds, device number and file inode number to avoid
locking issues both on local and network based filesystems and to be
prepared for hight traffic.
2. Picking up file
* Rename the file to work with into "temp" directory.
* Do whatever you want with contents, and when finished, rename file back
into "incoming", or remove, or move somewhere else, or move into "errors"
directory, after all, it's your file.
Note that in concurrent environment file can disappear between directory
enumeration and attempt to rename - then just pick another one (and
repeat), someone was swifter.)
------------------------------------------------------------------------------
Copyright (C) 2011-2015 Cesnet z.s.p.o
{
// Warden config can be also referenced as:
// "warden": "/path/to/warden_client.cfg"
"warden": {
"url": "https://example.com/warden3",
"cafile": "tcs-ca-bundle.pem",
"timeout": 10,
"errlog": {"level": "debug"},
"filelog": {"level": "debug"},
"idstore": "myclient.id",
"name": "com.example.warden.test",
"secret": "SeCrEt"
},
"sender": {
// Maildir like directory, whose "incoming" subdir will be checked
// for Idea events to send out
"dir": "warden_sender",
// Optional information about detector to be prepended into Idea Node array
"node": {
"Name": "cz.example.warden.test",
"Type": ["External"],
"SW": ["warden_filer"],
"AggrWin": "00:05:00",
"Note": "Test warden_filer sender"
}
},
"receiver": {
// Maildir like directory, whose "incoming" will serve as target for events
"dir": "warden_receiver",
// Optional filter fields for Warden query
"filter": {
"cat": ["Test", "Recon.Scanning"],
"nocat": null,
"group": ["cz.cesnet"],
"nogroup": null,
"tag": null,
"notag": ["Honeypot"]
}
}
}
#!/usr/bin/python
# -*- coding: utf-8 -*-
#
# Copyright (C) 2011-2015 Cesnet z.s.p.o
# Use of this source is governed by a 3-clause BSD-style license, see LICENSE file.
from warden_client import Client, Error, read_cfg
import json
import string
import os
import sys
import errno
import socket
import time
import logging
import signal
import lockfile
import argparse
from os import path, mkdir
from random import choice, randint;
from daemon import DaemonContext
from daemon.pidlockfile import TimeoutPIDLockFile
class NamedFile(object):
""" Wrapper class for file objects, which allows and tracks filename
changes.
"""
def __init__(self, pth, name, fd=None):
self.name = name
self.path = pth
if fd:
self.f = os.fdopen(fd, "w+b")
else:
self.f = None
def __str__(self):
return "%s(%s, %s)" % (type(self).__name__, self.path, self.name)
def get_path(self, basepath=None, name=None):
return path.join(basepath or self.path, name or self.name)
def open(self, mode):
return open(self.get_path(), mode)
def moveto(self, destpath):
os.rename(self.get_path(), self.get_path(basepath=destpath))
self.path = destpath
def rename(self, newname):
os.rename(self.get_path(), self.get_path(name=newname))
self.name = newname
def remove(self):
os.remove(self.get_path())
class SafeDir(object):
""" Maildir like directory for safe file exchange.
- Producers are expected to drop files into "temp" under globally unique
filename and rename it into "incoming" atomically (newfile method)
- Workers pick files in "incoming", rename them into "temp",
do whatever they want, and either discard them or move into
"errors" directory
"""
def __init__(self, p):
self.path = self._ensure_path(p)
self.incoming = self._ensure_path(path.join(self.path, "incoming"))
self.errors = self._ensure_path(path.join(self.path, "errors"))
self.temp = self._ensure_path(path.join(self.path, "temp"))
self.hostname = socket.gethostname()
self.pid = os.getpid()
def __str__(self):
return "%s(%s)" % (type(self).__name__, self.path)
def _ensure_path(self, p):
try:
mkdir(p)
except OSError:
if not path.isdir(p):
raise
return p
def _get_new_name(self, device=0, inode=0):
return "%s.%d.%f.%d.%d" % (
self.hostname, self.pid, time.time(), device, inode)
def newfile(self):
""" Creates file with unique filename within this SafeDir.
- hostname takes care of network filesystems
- pid distinguishes two daemons on one machine
(we are not multithreaded, so this is enough)
- time in best precision supported narrows window within process
- device/inode makes file unique on particular filesystem
In fact, device/inode is itself enough for uniqueness, however
if we mandate wider format, users can use simpler form with
random numbers instead of device/inode, if they choose to,
and it will still ensure reasonable uniqueness.
"""
# Note: this simpler device/inode algorithm replaces original,
# which checked uniqueness among all directories by atomic
# links.
# First find and open name unique within temp
tmpname = None
while not tmpname:
tmpname = self._get_new_name()
try:
fd = os.open(path.join(self.temp, tmpname), os.O_CREAT | os.O_RDWR | os.O_EXCL)
except OSError as e:
if e.errno != errno.EEXIST:
raise # other errors than duplicates should get noticed
tmpname = None
# Now we know device/inode, rename to make unique within system
stat = os.fstat(fd)
newname = self._get_new_name(stat.st_dev, stat.st_ino)
nf = NamedFile(self.temp, tmpname, fd)
nf.rename(newname)
return nf
def get_incoming(self):
return [NamedFile(self.incoming, n) for n in os.listdir(self.incoming)]
def receiver(config, wclient, sdir, oneshot):
poll_time = config.get("poll_time", 5)
conf_filt = config.get("filter", {})
filt = {}
# Extract filter explicitly to be sure we have right param names for getEvents
for s in ("cat", "nocat", "tag", "notag", "group", "nogroup"):
filt[s] = conf_filt.get(s, None)
while running_flag:
events = wclient.getEvents(**filt)
count_ok = count_err = 0
while events:
for event in events:
try:
nf = None
nf = sdir.newfile()
with nf.f as f:
data = json.dumps(event)
f.write(data)
nf.moveto(sdir.incoming)
count_ok += 1
except Exception as e:
Error("Error saving event", wclient.logger, exc=sys.exc_info(),
detail={"file": str(nf), "event_id": event.get("ID"), "sdir": sdir.path})
count_err += 1
wclient.logger.info(
"warden_filer: received %d, errors %d"
% (count_ok, count_err))
if oneshot:
events = None
else:
events = wclient.getEvents(**filt)
if oneshot:
terminate_me(None, None)
else:
time.sleep(poll_time)
def sender(config, wclient, sdir, oneshot):
send_events_limit = config.get("send_events_limit", 500)
poll_time = config.get("poll_time", 5)
node = config.get("node", None)
while running_flag:
nflist = sdir.get_incoming()
if oneshot:
terminate_me(None, None)
while running_flag and not nflist:
time.sleep(poll_time)
nflist = sdir.get_incoming()
# count chunk iterations rounded up
count = len(nflist)
for i in range(0, count, send_events_limit):
# process one at most send_events_limit long chunk
events = []
nf_sent = []
for j in range(i, min(i+send_events_limit, count)):
nf = nflist[j]
# prepare event array from files
try:
nf.moveto(sdir.temp)
except Exception:
pass # Silently go to next filename, somebody else might have interfered
try:
with nf.open("rb") as fd:
data = fd.read()
event = json.loads(data)
if node:
nodelist = event.setdefault("Node", [])
nodelist.insert(0, node)
events.append(event)
nf_sent.append(nf)
except Exception as e:
Error("Error loading event", wclient.logger, exc=sys.exc_info(),
detail={"file": str(nf), "sdir": sdir.path})
nf.moveto(sdir.errors)
res = wclient.sendEvents(events)
count_ok = count_err = count_retry = 0
if isinstance(res, Error):
try:
errs = res.detail["errors"]
except (KeyError, AttributeError, TypeError):
errs = None
if errs:
# Event errors - move bad events into "errors"
for e in errs.iterkeys():
try:
idx = int(e)
except ValueError:
continue
nf_sent[idx].moveto(sdir.errors)
nf_sent[idx] = None
count_err += 1
else:
# Global errors - move all events back to "incoming" for attempt in next round
for idx in range(len(nf_sent)):
nf_sent[idx].moveto(sdir.incoming)
nf_sent[idx] = None
count_retry += 1
# Cleanup rest - succesfully sent events
for name in nf_sent:
if name:
name.remove()
count_ok += 1
wclient.logger.info(
"warden_filer: saved %d, errors %d, retreated %d"
% (count_ok, count_err, count_retry))
def get_logger_files(logger):
""" Return file objects of loggers """
files = []
for handler in logger.handlers:
if hasattr(handler, 'stream') and hasattr(handler.stream, 'fileno'):
files.append(handler.stream)
if hasattr(handler, 'socket') and hasattr(handler.socket, 'fileno'):
files.append(handler.socket)
return files
running_flag = True # Daemon cleanly exits when set to False
def terminate_me(signum, frame):
global running_flag
running_flag = False
class DummyContext(object):
""" In one shot mode we use this instead of DaemonContext """
def __enter__(self): pass
def __exit__(self, *exc): pass
def get_args():
argp = argparse.ArgumentParser(
description="Save Warden events as files or send files to Warden")
argp.add_argument("func",
choices=["sender", "receiver"],
action="store",
help="choose direction: sender picks up files and submits them to "
"Warden, receiver pulls events from Warden and saves them as files")
argp.add_argument("-c", "--config",
default=path.splitext(__file__)[0]+".cfg",
dest="config",
help="configuration file path")
argp.add_argument('--oneshot',
default=False,
dest="oneshot",
action="store_true",
help="don't daemonise, run just once")
return argp.parse_args()
def get_configs():
config = read_cfg(args.config)
# Allow inline or external Warden config
wconfig = config.get("warden", "warden_client.cfg")
if isinstance(wconfig, str):
wconfig = read_cfg(wconfig)
fconfig = config.get(args.func, {})
return wconfig, fconfig
if __name__ == "__main__":
args = get_args()
function = sender if args.func=="sender" else receiver
wconfig, fconfig = get_configs()
oneshot = args.oneshot
safe_dir = SafeDir(fconfig.get("dir", args.func))
wclient = Client(**wconfig)
if oneshot:
daemon = DummyContext()
else:
work_dir = fconfig.get("work_dir", ".")
chroot_dir = fconfig.get("chroot_dir")
umask = fconfig.get("umask", 0)
pid_file = fconfig.get("pid_file", "/var/run/warden_filer.pid")
uid = fconfig.get("uid")
gid = fconfig.get("gid")
daemon = DaemonContext(
working_directory = work_dir,
chroot_directory = chroot_dir,
umask = umask,
pidfile = TimeoutPIDLockFile(pid_file, acquire_timeout=0),
uid = uid,
gid = gid,
files_preserve = get_logger_files(wclient.logger),
signal_map = {
signal.SIGTERM: terminate_me,
signal.SIGINT: terminate_me,
signal.SIGHUP: None
}
)
try:
with daemon:
wclient.logger.info("Starting %s" % args.func)
function(fconfig, wclient, safe_dir, oneshot)
wclient.logger.info("Exiting %s" % args.func)
except lockfile.Error as e:
wclient.logger.critical("Error acquiring lockfile %s (%s)"
% (daemon.pidfile.lock_file, type(e).__name__))
except Exception:
wclient.logger.critical("%s daemon error" % args.func, exc_info=sys.exc_info())
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment