Select Git revision
datatest.py
-
ph authored
* added various corner case queries * abstract methods now raise correct NotImplementedError * added zlib compression of __data__ * added support for returning iterators, along with keeping relevant cursors around * limited sort test (heapsort stops after skip+limit) - python implementation too slow * in-memory dict index sort attempt (max 2 seconds difference to lmdb index) * pandas.Index sorted set attempt, however the necessary set ordering brings too much data into memory sets and defeats the sorted-set benefit * walk-sort - do not use a real sort, but walk the _whole_ reverse index for the sorted column and allow only result-set values to pass through; walking the whole index is much faster than sorting for big result sets, while negligible in impact for small queries! * preread_all possibility to load all result sets into memory for profiling * query range attempt with a python lambda stop condition, surprisingly fast * two-pass range - first takewhile over keys counts results, second islice takes the counted number of values
lmdb_index.py 5.52 KiB
#!/usr/bin/python
# -*- encoding: utf-8 -*-
import lmdb
import random
import functools
import operator
import itertools
import struct
from dumb_db import Index
from keyvalue_db import SubqueryTuple
def preread_func(x):
    """Eagerly materialize an iterable into a set, printing its size."""
    materialized = set(x)
    print("len %i" % len(materialized))
    return materialized
def preread_all(x):
    """Identity pass-through: leave result sets as lazy iterators.

    Swap this for ``preread_func`` (see the commented assignment below)
    to eagerly load every result set into memory, e.g. for profiling.
    Defined with ``def`` rather than a lambda assignment (PEP 8 E731).
    """
    return x
#preread_all = preread_func
class LMDBIndex(Index):
    """Secondary index stored in a named LMDB sub-database.

    Keys are fixed-width hex strings so that lexicographic byte order
    matches numeric order; values are opaque record ids.  Scan/range
    queries return a SubqueryTuple of (set of open cursors, iterator of
    values): the caller keeps the cursors alive while consuming the
    iterator, then closes them.
    """

    def __init__(self, name, env, dup=False):
        # dup=True opens the sub-db with dupsort, letting one key hold
        # several values (a non-unique index).
        Index.__init__(self, name, env)
        self.dup = dup
        self.handle = self.env.open_db(self.name, dupsort=self.dup)

    @staticmethod
    def int32_to_bin(i):
        # Zero-padded 8-digit hex keeps byte order equal to numeric order.
        return "%08x" % i

    @staticmethod
    def bin_to_int32(b):
        # Inverse of int32_to_bin.
        return int(b, 16)

    def insert_orig(self, txn, key=None, value=None):
        # Older insert strategy: random 64-bit hex key when none given.
        if key is None:
            key = "%016x" % random.randint(0, 2**64-1) # may not be safe enough
        txn.put(key, value, db=self.handle, dupdata=self.dup, overwrite=True)
        return key, value

    def insert(self, txn, key=None, value=None):
        # Auto-generate the next sequential key from the current maximum.
        if key is None:
            max_key = self.query_max_key(txn)
            int_key = self.bin_to_int32(max_key) + 1
            key = self.int32_to_bin(int_key)
        # Lmdb allows for only one writable transaction, so there should be no race.
        txn.put(key, value, db=self.handle, dupdata=self.dup, overwrite=self.dup)
        return key, value

    def query_max_key(self, txn):
        # Highest existing key, or "00000000" for an empty index.
        with txn.cursor(db=self.handle) as crs:
            if crs.last():
                return crs.key()
            else:
                return self.int32_to_bin(0)

    # Note that we return iterator in hope that up in the stack the
    # fast C implementation of lmdb cursor will connect directly
    # to the fast C implementation of the Python set(), without any
    # interpreted code intervention.
    def query_eq_all(self, txn, key):
        # All duplicate values stored under `key` (dupsort index).
        # NOTE(review): set_key's boolean result is ignored -- if the
        # key is absent, what iternext_dup then yields depends on the
        # cursor position; confirm callers only pass existing keys.
        crs = txn.cursor(db=self.handle)
        print "eq_all"
        crs.set_key(key)
        return SubqueryTuple(
            set([crs]),
            preread_all(crs.iternext_dup(keys=False, values=True)),
        )

    def query_eq(self, txn, key):
        # Single-value lookup; for a dupsort db this returns only the
        # first duplicate (or None when the key is absent).
        return txn.get(key, db=self.handle)

    def get_key_func(self, txn):
        # Bound getter usable as a plain function: f(key) -> value.
        return functools.partial(txn.get, db=self.handle)

    def query_ge(self, txn, key):
        # Values for all keys >= `key`, in ascending key order.
        crs = txn.cursor(db=self.handle)
        print "ge"
        crs.set_range(key)
        return SubqueryTuple(
            set([crs]),
            preread_all(crs.iternext(keys=False, values=True)),
        )

    def query_le(self, txn, key):
        # Values for keys below `key`, in descending key order.
        # Reverse reading from underlying media may have limits.
        crs = txn.cursor(db=self.handle)
        print "le"
        # set_range stops at the first key >= `key`; iterprev yields the
        # current element first, so that element is discarded below to
        # drop the key lying above the bound.
        crs.set_range(key)
        it = crs.iterprev(keys=False, values=True)
        try:
            next(it)
        except StopIteration:
            # Nothing at or after the bound position: empty result.
            it = iter(())
        # NOTE(review): when `key` exists exactly, the unconditional
        # skip above discards that key's value too, making this behave
        # as "<" rather than "<=" for exact matches -- confirm intent.
        return SubqueryTuple(
            set([crs]),
            preread_all(it),
        )

    def query_range_orig(self, txn, key1, key2):
        # One-pass inclusive range [key1, key2]: filter (key, value)
        # pairs with a Python-level predicate.
        crs = txn.cursor(db=self.handle)
        print "range"
        # Set lmdb cursor to lower range
        crs.set_range(key1)
        # Get iterator for _both_ keys and values
        # Keys are what is filtered on, values are event id's
        lmdb_iter = crs.iternext(keys=True, values=True)
        # Iterator reader, which stops when upper bound is reached
        # Note that here is Python lambda, which I don't know how
        # to replace with faster implementation. :(
        stop_iter = itertools.takewhile(lambda x: x[0]<=key2, lmdb_iter)
        # Now return only id iterator
        res_iter = itertools.imap(operator.itemgetter(1), stop_iter)
        return SubqueryTuple(
            set([crs]),
            preread_all(res_iter),
        )

    def query_range(self, txn, key1, key2):
        # Two-pass inclusive range [key1, key2]: first pass counts the
        # matching keys, second pass islices exactly that many values --
        # both passes run in C, avoiding a per-item Python lambda.
        # Create possibly C wrapped C implementation of comparison
        # (operator.ge(key2, k) is True while k <= key2).
        comparator = functools.partial(operator.ge, key2)
        crs = txn.cursor(db=self.handle)
        print "range"
        # Set lmdb cursor to lower range
        crs.set_range(key1)
        # First pass: iterate over keys only
        # Keys are what is filtered on, values are event id's
        lmdb_iter = crs.iternext(keys=True, values=False)
        # Iterate while comparator returns True
        keys = itertools.takewhile(comparator, lmdb_iter)
        # Count length
        length = len(list(keys))
        # Set cursor back to lower range
        crs.set_range(key1)
        # Second pass: iterate over values only
        lmdb_iter = crs.iternext(keys=False, values=True)
        # Just fetch 'length' values
        res_iter = itertools.islice(lmdb_iter, 0, length)
        return SubqueryTuple(
            set([crs]),
            preread_all(res_iter),
        )

    def itervalues(self, txn):
        # Full scan over all values, ascending key order.
        crs = txn.cursor(db=self.handle)
        print "all vals"
        lmdb_iter = crs.iternext(keys=False, values=True)
        return SubqueryTuple(
            set([crs]),
            preread_all(lmdb_iter),
        )

    def iteritems(self, txn):
        # Full scan over all (key, value) pairs, ascending key order.
        crs = txn.cursor(db=self.handle)
        print "all items"
        lmdb_iter = crs.iternext(keys=True, values=True)
        return SubqueryTuple(
            set([crs]),
            preread_all(lmdb_iter),
        )

    def clear(self, txn):
        # Delete every entry but keep the sub-database itself.
        txn.drop(db=self.handle, delete=False)

    def dump(self, txn):
        """Return the whole index as {index_name: {key: value-or-list}}.

        For a dupsort index each key maps to the list of its duplicate
        values; otherwise each key maps to its single value.
        """
        res = {}
        with txn.cursor(db=self.handle) as crs:
            crs.first()
            for key, value in crs.iternext(keys=True, values=True):
                if self.dup:
                    res.setdefault(key, []).append(value)
                else:
                    res[key] = value
        return {self.name: res}