#!/usr/bin/python
# -*- coding: utf-8 -*-
#
# Copyright (C) 2011-2013 Cesnet z.s.p.o
# Use of this source is governed by a 3-clause BSD-style license, see LICENSE file.

import json
import string
from base64 import b64encode
from hashlib import sha1
from math import trunc
from os import path
from pprint import pprint
from random import randint, randrange, choice, random
from time import time, gmtime
from uuid import uuid4

from warden_client import Client, Error, read_cfg
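def get_precise_timestamp(t=None):
    """Return *t* (or the current time) as an ISO 8601 UTC string with microseconds.

    Args:
        t: seconds since the epoch as returned by ``time.time()``; ``None``
           (the default, and the original behaviour) means "now".

    Returns:
        A string of the form ``YYYY-MM-DDThh:mm:ss.uuuuuuZ``, the value the
        IDEA generators below put into their ``*Time`` fields.
    """
    if t is None:
        t = time()
    # Fractional part truncated (not rounded) to whole microseconds, so the
    # field can never overflow into 1000000 and roll the second.
    us = trunc((t - trunc(t)) * 1000000)
    g = gmtime(t)
    # %06d keeps the microsecond field zero-padded: the former %0d printed
    # us=5 as ".5Z" (half a second) instead of ".000005Z".
    return '%04d-%02d-%02dT%02d:%02d:%02d.%06dZ' % (g[0:6] + (us,))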
def gen_min_idea():
    """Build the minimal IDEA0 event this test client sends.

    Only four top-level keys are set: the format tag, a fresh random UUID
    as ``ID``, the current UTC time (with microseconds) as ``DetectTime``
    and a single ``Test`` category.
    """
    event = {}
    event["Format"] = "IDEA0"
    event["ID"] = str(uuid4())
    event["DetectTime"] = get_precise_timestamp()
    event["Category"] = ["Test"]
    return event

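def gen_random_idea():
    """Build a synthetic IDEA0 event with most optional fields populated.

    All addresses and names come from documentation ranges (192.0.2.0/24,
    2001:DB8::/32, example.com), so nothing generated here points at a real
    host.  ``random`` rather than ``secrets`` is deliberate: these are
    fixtures for exercising the client/server path, not security material.

    Returns:
        A JSON-serialisable dict in IDEA0 layout with one Source, one Target,
        one Attach (referenced from the Source via ``AttachHand``) and one
        Node.  The attachment's ``Size`` and ``Hash`` are derived from the
        generated ``Content`` so the three fields agree.
    """

    def geniprange(gen):
        """Wrap an address generator into a generator of "lo-hi" range strings."""

        def iprange():
            # Draw until the two endpoints differ, then order them.
            lo = hi = 0
            while lo == hi:
                lo, hi = gen(), gen()
            if hi < lo:
                lo, hi = hi, lo
            return "%s-%s" % (lo, hi)

        return iprange

    def rand4ip():
        return "192.0.2.%d" % randint(1, 254)

    def rand4cidr():
        # Host bits are left as generated; this is test input, not a
        # normalised network address.
        return "192.0.2.%d/%d" % (randint(1, 254), randint(24, 31))

    def randip4():
        # Single address, address range or CIDR block, equally likely.
        return choice([rand4ip, geniprange(rand4ip), rand4cidr])()

    def rand6ip():
        return "2001:DB8:%s" % ":".join("%x" % randint(0, 65535) for _ in range(6))

    def rand6cidr():
        # Zero to five extra 16-bit groups follow the /32 prefix; the prefix
        # length grows by 16 per group.
        m = randint(0, 5)
        groups = ":".join("%x" % randint(0, 65535) for _ in range(m))
        return "2001:DB8%s%s::/%d" % (":" if m else "", groups, (m + 2) * 16)

    def randip6():
        return choice([rand6ip, geniprange(rand6ip), rand6cidr])()

    def randstr(charlist=string.ascii_letters, maxlen=32, minlen=1):
        # ascii_letters instead of the locale-dependent Python 2
        # string.letters, so the output is always plain ASCII.
        return ''.join(choice(charlist) for _ in range(randint(minlen, maxlen)))

    def fake_cve():
        return "cve:CVE-%s-%s" % (randstr(string.digits, 4), randstr())

    # Generate the attachment body once so Size, Hash and Content describe
    # the same bytes (previously Size was a fixed 46 and Hash random hex).
    content = randstr()
    raw = content.encode('ascii')
    encoded = b64encode(raw).decode('ascii')

    event = {
        "Format": "IDEA0",
        "ID": str(uuid4()),
        "CreateTime": get_precise_timestamp(),
        "DetectTime": get_precise_timestamp(),
        "WinStartTime": get_precise_timestamp(),
        "WinEndTime": get_precise_timestamp(),
        "EventTime": get_precise_timestamp(),
        "CeaseTime": get_precise_timestamp(),
        "Category": ["Test"],
        "Ref": [fake_cve(), "http://www.example.com/%s" % randstr()],
        "Confidence": random(),
        "Note": "Random event",
        "ConnCount": randint(0, 65535),
        "Source": [
            {
                "Type": ["Phishing"],
                "IP4": [randip4() for _ in range(randrange(1, 5))],
                "IP6": [randip6() for _ in range(randrange(1, 5))],
                "Hostname": ["example.com"],
                "Port": [randint(1, 65535) for _ in range(randrange(1, 3))],
                "AttachHand": ["att1"],
                "Netname": ["arin:TEST-NET-1"]
            }
        ],
        "Target": [
            {
                "IP4": [randip4() for _ in range(randrange(1, 5))],
                "IP6": [randip6() for _ in range(randrange(1, 5))],
                "URL": ["http://example.com/%s" % randstr()],
                "Proto": ["tcp", "http"],
                "Netname": ["arin:TEST-NET-1"]
            }
        ],
        "Attach": [
            {
                "Handle": "att1",
                "FileName": [randstr()],
                "Type": ["Malware"],
                "ContentType": "application/octet-stream",
                # Digest of the decoded content, in the "algo:hex" form the
                # original used for its placeholder.
                "Hash": ["sha1:%s" % sha1(raw).hexdigest()],
                # Taken as the length of the decoded content.
                "Size": len(raw),
                "Ref": [fake_cve()],
                "ContentEncoding": "base64",
                "Content": encoded
            }
        ],
        "Node": [
            {
                "Name": "com.example.test-node",
                "Tags": ["Protocol", "Honeypot"],
                "SW": ["Kippo"],
                "AggrWin": "00:05:00"
            }
        ]
    }

    return event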


def main():
    """Exercise the Warden client end to end: fetch, send, then query the server.

    Connection settings are read from ``warden_client.cfg``; the path is
    relative, so it is presumably resolved against the working directory
    the script is started from (check ``read_cfg``).  The commented block
    shows the equivalent inline keyword arguments.
    """
    wclient = Client(**read_cfg("warden_client.cfg"))
    # Also inline arguments are possible:
    # wclient = Client(
    #     url  = 'https://warden.example.com/warden3',
    #     keyfile  = '/opt/warden3/etc/key.pem',
    #     certfile = '/opt/warden3/etc/cert.pem',
    #     cafile = '/opt/warden3/etc/tcs-ca-bundle.pem',
    #     timeout=10,
    #     errlog={"level": "debug"},
    #     filelog={"level": "debug"},
    #     idstore="MyClient.id",
    #     name="MyClient")

    print("=== Getting 10 events ===")
    started = time()
    events = wclient.getEvents(count=10)
    print("Time: %f" % (time() - started))
    # NOTE(review): unlike getInfo/getDebug below, this result is iterated
    # without an Error check -- confirm getEvents never returns an Error.
    for event in events:
        print(event)
    if events:
        print(len(events))

    print("=== Sending 500 events ===")
    started = time()
    result = wclient.sendEvents([gen_random_idea() for _ in range(500)])
    if result:
        print(result)
    print("Time: %f" % (time() - started))

    print("=== Server info ===")
    info = wclient.getInfo()
    if not isinstance(info, Error):
        pprint(info)

    print("=== Debug ===")
    debug = wclient.getDebug()
    if not isinstance(debug, Error):
        pprint(debug)


# Run the end-to-end exercise only when executed as a script, so merely
# importing this module (e.g. for the generators) does not contact a server.
if __name__ == "__main__":
    main()