Skip to content
Snippets Groups Projects
Commit 27051e87 authored by Pavel Kácha's avatar Pavel Kácha
Browse files

* removed old hairy testing code

* remove random Idea generation
* added Mongo Idea import
* added simplified Mongo Idea to collated strings converter
* ensure binary keys/values
* change pickle to json (along with json_default support)
* simplified set operation
* fixed LMDBIndex reverse iterator
parent 9468e8f1
No related branches found
No related tags found
No related merge requests found
......@@ -7,6 +7,7 @@ import datetime
import random
import pprint
import cProfile
import sys
import lmdb
import MySQLdb
......@@ -15,118 +16,19 @@ from lmdb_index import LMDBIndex
from sql_db import SQLDB, SQLIndex
from sql_db import DummyEnv
# Notes:
# * gen_random_idea is not complete, only single ip addresses
# * gen_random_idea generates lexically comparable IP addresses (zero padded)
# it's simpler for testing purposes
# (in reality the proper conversion machinery would be necessary)
import mongo_idea_import
def __test__():
lmdb_env = lmdb.Environment(
def __main__():
# lmdb
env = lmdb.Environment(
"lmdb",
map_size=1024*1024*1024*1024, # 1TiB
max_dbs=128,
sync=False,
writemap=False)
db = KeyValueDB(
("name", "surname", "group.work", "group.home"),
idxfactory=LMDBIndex, env=lmdb_env)
db.clear()
db.insert(dict(name="Beda", surname="Travnicek", group=dict(work="ceo", home="slave")))
db.insert(dict(name=["Llamar", "Puno"], surname="Popovic", group=dict(work="hr", home="free")))
db.insert(dict(name="Zuno", surname="Popovic", group=[dict(work="hr", home="complicated"), dict(work="pr")]))
db.insert(dict(name="Zuno", surname="Cimbal", group=dict(work="nobody", home="complicated")))
pprint.pprint(db.dump())
pprint.pprint(db.query(
db.and_(
db.eq("group.work", "hr"),
db.eq("group.home", "complicated")
),
"surname"
))
pprint.pprint(db.query(
db.or_(
db.eq("group.work", "hr"),
db.eq("group.home", "complicated")
),
"surname"
))
pprint.pprint(db.query(
db.range("surname", "O", "S"),
"name"
))
def insert_idea(db, num=10):
    """Generate `num` random Idea events and insert each into `db`."""
    for _dummy in range(num):
        db.insert(gen_random_idea())
def ip4tolex(ipstr):
    """Zero-pad every octet of a dotted-quad IPv4 string so that
    lexicographic string ordering matches numeric address ordering."""
    octets = ipstr.split(".")
    padded = ["%03i" % int(octet) for octet in octets]
    return ".".join(padded)
def ip6tolex(ipstr):
    """Zero-pad every 16-bit group of a fully expanded IPv6 string so
    that lexicographic string ordering matches numeric ordering."""
    groups = [int(group, 16) for group in ipstr.split(":")]
    return ":".join("%04x" % group for group in groups)
def gen_random_idea(client_name="cz.example.warden.test"):
    """Generate one random IDEA0 test event as a plain dict.

    client_name -- value stored into Node.Name of the generated event.
    Only single IP addresses are generated, zero padded so they stay
    lexically comparable (see module notes above).
    """
    def format_timestamp():
        # Local time with a hardcoded +02:00 offset; good enough for testing.
        return datetime.datetime.now().isoformat() + "+02:00"
    def rand4ip():
        # Random zero-padded address from the 192.0.2.0/24 test block.
        return "192.000.002.%03i" % random.randint(1, 254)
    def rand6ip():
        # Random address under the 2001:0db8 prefix, zero-padded groups.
        return "2001:0db8:%s" % ":".join("%04x" % random.randint(0, 65535) for i in range(6))
    event = {
        "Format": "IDEA0",
        "ID": str(uuid.uuid4()),
        "CreateTime": format_timestamp(),
        "DetectTime": format_timestamp(),
        "Category": [random.choice(["Abusive.Spam","Abusive.Harassment","Malware","Fraud.Copyright","Test","Fraud.Phishing","Fraud.Scam"]) for dummy in range(random.randint(1, 3))],
        "Note": "Random event",
        "ConnCount": random.randint(0, 65535),
        "Source": [
            {
                "Type": ["Phishing"],
                "IP4": [rand4ip() for i in range(random.randrange(1, 5))],
                "IP6": [rand6ip() for i in range(random.randrange(1, 5))],
                "Hostname": ["example.com"],
                "Port": [random.randint(1, 65535) for i in range(random.randrange(1, 3))],
            }
        ],
        "Target": [
            {
                "IP4": [rand4ip() for i in range(random.randrange(1, 5))],
                "IP6": [rand6ip() for i in range(random.randrange(1, 5))],
                "Proto": ["tcp", "http"],
            }
        ],
        "Node": [
            {
                "Name": client_name,
                "Type": [random.choice(["Data", "Protocol", "Honeypot", "Heuristic", "Log"]) for dummy in range(random.randint(1, 3))],
                "SW": ["Kippo"],
            }
        ]
    }
    return event
def __main__():
# lmdb
#~ env = lmdb.Environment(
#~ "lmdb",
#~ map_size=1024*1024*1024*1024, # 1TiB
#~ max_dbs=128,
#~ sync=False,
#~ writemap=False)
#~ idxfactory = LMDBIndex
#~ dbfactory = KeyValueDB
idxfactory = LMDBIndex
dbfactory = KeyValueDB
#~ # dummy sql
#~ env = DummyEnv()
......@@ -134,45 +36,71 @@ def __main__():
#~ dbfactory = SQLDB
# sql
env = MySQLdb.connect(
host='localhost',
user='root',
passwd='l3nivec',
db='dumbdb')
idxfactory = SQLIndex
dbfactory = SQLDB
#~ env = MySQLdb.connect(
#~ host='localhost',
#~ user='root',
#~ passwd='l3nivec',
#~ db='dumbdb')
#~ idxfactory = SQLIndex
#~ dbfactory = SQLDB
# Random idea
#~ db = dbfactory(
#~ ("ID", "DetectTime", "Category", "Node.Name",
#~ "Source.IP4", "Source.IP6", "Target.IP4", "Target.IP6"),
#~ idxfactory=idxfactory, env=env)
# Idea from Mentat mongoexport
db = dbfactory(
("ID", "DetectTime", "Category", "Node.Name",
"Source.IP4", "Source.IP6", "Target.IP4", "Target.IP6"),
("ID", "DetectTime", "Category", "Node.Name", "Node.Type",
"Source.IP4.min", "Source.IP4.max", "Source.IP4.ip",
"Source.IP6.min", "Source.IP6.max", "Source.IP6.ip",
"Target.IP4.min", "Target.IP4.max", "Target.IP4.ip",
"Target.IP6.min", "Target.IP6.max", "Target.IP6.ip",
"Source.Type", "Target.Type",
"Source.Port", "Target.Port"),
idxfactory=idxfactory, env=env)
#~ db.clear()
#~ return
#~ insert_idea(db, 80000)
#~ env.commit()
#~ pprint.pprint(db.dump())
#~ return
#~ res = db.query(
#~ db.and_(
#~ db.range("Source.IP4", '192.000.002.000', '192.000.002.255'),
#~ db.range("Target.IP4", "192.000.002.000", "192.000.002.255"),
#~ db.eq("Node.Name", "cz.example.warden.test")
#~ ),
#~ "DetectTime"
#~ )
# Import
for i, l in enumerate(mongo_idea_import.get_events(sys.stdin)):
if not i%1000:
print i
sys.stdout.flush()
db.insert(l, mongo_idea_import.json_default)
res = db.query(
db.and_(
db.range("Source.IP4", '192.000.002.128', '192.000.002.255'),
db.range("Target.IP4", "192.000.002.120", "192.000.002.255"),
db.eq("Node.Name", "cz.example.warden.test")
db.range("Target.IP4.ip", '195.113.000.000', '195.113.255.255'),
db.range("Source.IP4.ip", "071.006.165.000", "071.006.165.255"),
db.eq("Node.Name", "cz.cesnet.mentat.warden_filer"),
db.eq("Node.Type", "Relay"),
db.eq("Target.Port", " 22")
),
order="DetectTime",
skip=0,
limit=30
#~ order=None
)
print len(res)
#~ res = db.query(
#~ db.eq("Source.IP4.ip", "071.006.165.200"),
#~ order="DetectTime",
#~ skip=0,
#~ limit=None
#~ )
#~ res = db.query(
#~ db.range("Source.IP4.ip", "000.000.000.000", "255.255.255.255"),
#~ order="DetectTime",
#~ skip=0,
#~ limit=None
#~ )
#~ pprint.pprint(res)
print len(res)
#~ cProfile.run("__main__()", sort="cumulative")
__main__()
......@@ -2,7 +2,7 @@
# -*- encoding: utf-8 -*-
import collections
import cPickle
import json
class Index(object):
......@@ -49,6 +49,15 @@ class DB(object):
rev = True
fwd = True
@staticmethod
def binarize_str(s):
    """Return `s` as UTF-8 encoded bytes when it is a unicode string;
    any other value (Python 2 str is already bytes) passes through
    unchanged.  Index keys and values must be binary."""
    return s.encode("utf-8") if isinstance(s, unicode) else s
def __init__(self, indices, idxfactory, rev=True, fwd=True, *args, **kwargs):
self.indices = indices
self.env = kwargs.get("env")
......@@ -64,15 +73,16 @@ class DB(object):
self.data = idxfactory("__data__", dup=False, *args, **kwargs)
def insert(self, data, json_default=None):
    """Store `data` in the data index and register it in all key indexes.

    data -- dict-like event to store; serialized as ASCII-safe JSON
    json_default -- optional `default` hook passed to json.dumps for
        objects that are not natively JSON serializable

    The diff artifact that kept the superseded `def insert(self, data)`
    header and the old non-binarized index inserts is removed here:
    each index value is inserted exactly once, always binarized.
    """
    # JSON (not pickle) keeps stored records portable and inspectable.
    uniq = self.data.insert(None, json.dumps(data, ensure_ascii = True, default = json_default))[0]
    for key in self.indices:
        values = self.get_value(data, key.split("."))
        for value in values:
            # Underlying indexes require binary keys/values.
            bin_value = self.binarize_str(value)
            if self.rev:
                self.revkeys[key].insert(bin_value, uniq)
            if self.fwd:
                self.fwdkeys[key].insert(uniq, bin_value)
def and_(self, *q):
......
#!/usr/bin/python
# -*- encoding: utf-8 -*-
import cPickle
import json
from dumb_db import DB
class KeyValueDB(DB):
# This is very naïve implementation. Users would benefit
# from exposing of underlying transaction api in some form.
rev = True
fwd = True
def and_(self, *q):
    """Intersect query result sets.

    q -- one or more sets of record ids; returns their intersection.
    The diff artifact duplicating the old reduce-style definition is
    collapsed into the single set.intersection form.
    """
    # FIXME - intersection knows more (could exploit index ordering)
    return set.intersection(*q)
def or_(self, *q):
    """Union of query result sets.

    q -- one or more sets of record ids; returns their union.
    The old reduce-style body made the final `return set.union(*q)`
    unreachable (and shadowed `q` in its loop); only the union form
    is kept.
    """
    return set.union(*q)
def query(self, q, order=None, reverse=False, skip=0, limit=1):
# There is a bottleneck in sorting = query_eq is a python method,
# not C optimization. Maybe we could somehow draw out txn.get, maybe
# through functools.partial (implemented in C)
if order is not None:
res = sorted(q, key=self.fwdkeys[order].query_eq, reverse=reverse)
else:
......@@ -38,5 +38,9 @@ class KeyValueDB(DB):
# Tough call, but let's assume big skip and small limit
# is more common.
res = res[skip:skip+limit]
return [cPickle.loads(self.data.query_eq(v)) for v in res]
# Here some form of cursor api would be appropriate - cursor would
# contain list of resulting IDs for free skipping and limiting
# while fetching only actually read data.
return [json.loads(self.data.query_eq(v)) for v in res]
#return [self.data.query_eq(v) for v in res]
......@@ -8,6 +8,7 @@ from dumb_db import Index
class LMDBIndex(Index):
def __init__(self, name, env, dup=False):
Index.__init__(self, name, env)
self.dup = dup
......@@ -47,16 +48,34 @@ class LMDBIndex(Index):
def query_le(self, key):
    """Return the set of ids stored under keys <= `key`.

    Fixes the diff artifact that left the old `it.next()` call in front
    of the new try/next block: the iterator was advanced twice, skipping
    one record and raising uncaught StopIteration on short results.
    """
    # Reverse reading from underlying media may have limits.
    # Another implementation could be to start from the very first
    # item and iterate until key is reached. However for problems
    # with this approach see comments in query_range.
    with self.env.begin(buffers=False) as txn:
        with txn.cursor(db=self.handle) as crs:
            crs.set_range(key)
            it = crs.iterprev(keys=False, values=True)
            # Discard the item at/after `key`; an empty iterator means
            # there is nothing <= key at all.
            try:
                next(it)
            except StopIteration:
                return set()
            return set(it)
def query_range(self, key1, key2):
    """Return ids with at least one indexed value >= key1 and at least
    one indexed value <= key2.

    Not quite correct: an event holding one value above both keys and
    another value below both keys still matches.
    Possible correct implementations:
    * fetch and intersect keys, not values, then get ids for resulting keys
    * take the query_ge iterator for key1 and consume keys until key2 is
      reached; the open problem is comparing against key2 without losing
      the iter->set C-level speed (maybe operator.lt/gt with
      itertools.takewhile or itertools.ifilter).
    """
    lower = self.query_ge(key1)
    upper = self.query_le(key2)
    return lower & upper
......
#!/usr/bin/python
# -*- encoding: utf-8 -*-
import sys
import os
import base64
# Ok, not cool, but fine for testing
sys.path.append(os.path.join(os.path.dirname(__file__), "..", "idea", "lib_python", "lib"))
import idea
from idea import base
from idea.base import unicode
import typedcol
import uuid
import re
import time
import struct
import datetime
import iprange
import json
from idea.lite import Version, MediaType, Charset, Encoding, Handle, ID
# Seconds between the NTP epoch (1900-01-01) and the Unix epoch (1970-01-01).
NTP_TO_EPOCH_DELTA = 2208988800

# Replace the strict idea.lite validators imported above with identity
# passthroughs -- validation is intentionally relaxed for this import tool.
Version = MediaType = Charset = Encoding = Handle = ID = Duration = URI = \
    NSID = MAC = Port = Netname = Hash = EventTag = ProtocolName = \
    ConfidenceFloat = SourceTargetTag = lambda x: x
def from_binary(b):
    """Decode the base64 payload of a Mongo extended-JSON "$binary" field
    and return the raw bytes."""
    payload = b["$binary"]
    return base64.b64decode(payload)
def Timestamp(t):
    """Convert a Mongo "$binary" 64-bit NTP timestamp into an ISO-8601
    UTC string with a trailing "Z"."""
    raw = from_binary(t)
    seconds, fraction = struct.unpack(">II", raw)
    # NTP stores whole seconds in the high word and a 2**-32 fraction
    # in the low word; shift from the NTP epoch to the Unix epoch.
    unix_stamp = seconds - NTP_TO_EPOCH_DELTA + float(fraction & 0xffffffff) / pow(2, 32)
    return datetime.datetime.utcfromtimestamp(unix_stamp).isoformat() + "Z"
def ip4_mongo_to_collate_str(ip):
    """Render a Mongo "$binary" IPv4 address as a zero-padded dotted
    quad so string collation matches numeric address order."""
    octets = struct.unpack(">4B", from_binary(ip))
    return ".".join("%03i" % octet for octet in octets)
def ip6_mongo_to_collate_str(ip):
    """Render a Mongo "$binary" IPv6 address as eight zero-padded hex
    groups so string collation matches numeric address order."""
    groups = struct.unpack(">8H", from_binary(ip))
    return ":".join("%04x" % group for group in groups)
def Net4(ip):
    """Convert a Mongo IP4 address/range structure into collatable
    strings, always filling all of "min", "max" and "ip" so queries can
    run against any of the three indexes."""
    res = {}
    # Convert whichever of the three fields are present in the source.
    for field in ("min", "max", "ip"):
        try:
            res[field] = ip4_mongo_to_collate_str(ip[field])
        except KeyError:
            pass
    if "ip" in res:
        # Single address: collapse the range bounds onto it.
        res["max"] = res["min"] = res["ip"]
    elif "min" in res and "max" not in res:
        res["ip"] = res["max"] = res["min"]
    elif "max" in res and "min" not in res:
        res["ip"] = res["min"] = res["max"]
    else:
        # NOTE(review): a structure carrying both "min" and "max" but no
        # "ip" also falls through to here and raises -- confirm intended.
        raise ValueError("Totally wrong IP4 address")
    return res
def Net6(ip):
    """Convert a Mongo IP6 address/range structure into collatable
    strings, always filling all of "min", "max" and "ip"."""
    res = {}
    # Convert whichever fields are present; NOTE(review): unlike Net4
    # this swallows any Exception, not just KeyError -- confirm intended.
    for field in ("min", "max", "ip"):
        try:
            res[field] = ip6_mongo_to_collate_str(ip[field])
        except Exception:
            pass
    if "ip" in res:
        # Single address: collapse the range bounds onto it.
        res["max"] = res["min"] = res["ip"]
    elif "min" in res and "max" not in res:
        res["ip"] = res["max"] = res["min"]
    elif "max" in res and "min" not in res:
        res["ip"] = res["min"] = res["max"]
    else:
        raise ValueError("Totally wrong IP6 address")
    return res
def Port(s):
    """Format a port number as a fixed-width, space-padded five character
    string so lexicographic comparison matches numeric ordering."""
    return "{0:5d}".format(int(s))
def SourceTargetTag(s):
    """Validate `s` against the Idea tag grammar; return it unchanged or
    raise TypeError when it does not match."""
    if not base.tag_re.match(s):
        raise TypeError("Wrong Type")
    return s

# Node and attachment tags share the same grammar.
NodeTag = SourceTargetTag
AttachmentTag = SourceTargetTag
# Mapping of Idea attribute type names to converter/validator callables.
# Most entries are identity passthroughs (see the stubs above); only the
# types that need conversion into collatable strings -- Timestamp, Net4,
# Net6, Port -- do real work during import.
idea_types = {
    "Boolean": bool,
    "Integer": int,
    "String": unicode,
    "Binary": str,
    "ConfidenceFloat": float,
    "Version": Version,
    "MediaType": MediaType,
    "Charset": Charset,
    "Encoding": Encoding,
    "Handle": Handle,
    "ID": ID,
    "Timestamp": Timestamp,
    "Duration": Duration,
    "URI": URI,
    "Net4": Net4,
    "Net6": Net6,
    "Port": Port,
    "NSID": NSID,
    "MAC": MAC,
    "Netname": Netname,
    "Hash": Hash,
    "EventTag": EventTag,
    "ProtocolName": ProtocolName,
    "SourceTargetTag": SourceTargetTag,
    "NodeTag": NodeTag,
    "AttachmentTag": AttachmentTag
}
# Values filled in when missing from the source event.
idea_defaults = {
    "Format": "IDEA0",
    "ID": lambda: uuid.uuid4()
}
# Derived list-of-type variants for the typedcol machinery.
idea_lists = base.list_types(idea_types)
class SourceTargetDict(typedcol.TypedDict):
    # Typed dict for Source/Target sections; unknown keys are kept as-is.
    allow_unknown = True
    typedef = base.source_target_dict_typedef(idea_types, idea_lists)

class AttachDict(typedcol.TypedDict):
    # Typed dict for Attach sections; unknown keys are kept as-is.
    allow_unknown = True
    typedef = base.attach_dict_typedef(idea_types, idea_lists)

class NodeDict(typedcol.TypedDict):
    # Typed dict for Node sections; unknown keys are kept as-is.
    allow_unknown = True
    typedef = base.node_dict_typedef(idea_types, idea_lists)

class Idea(base.IdeaBase):
    # Top-level Idea message using the relaxed idea_types converters above.
    typedef = base.idea_typedef(
        idea_types,
        idea_lists,
        idea_defaults,
        SourceTargetDict,
        AttachDict,
        NodeDict)
def get_events(f):
    """Yield Idea objects parsed from the file-like `f`, one JSON
    document per line (Mentat mongoexport format).  Lines that fail
    Idea validation are reported on stdout and skipped."""
    for l in f:
        data = json.loads(l)
        try:
            idea = Idea(data)
        except Exception as exc:
            # Report and skip invalid events instead of aborting the import.
            print "Wrong Idea"
            print l
            print exc
            continue
        try:
            # Mentat auxiliary fields: drop the raw message copy and
            # convert "ts" from a Mongo binary NTP stamp to an ISO string.
            del idea["msg_raw2"]
            idea["ts"] = Timestamp(idea["ts"])
        except Exception:
            # Best effort -- these fields may be absent.
            pass
        #~ print idea
        yield idea

# JSON serialization hook for Idea attribute types, reused by importers.
json_default = idea.lite.json_default
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment