      Commit b719a75a by ph: added crude transaction support
      * added various corner case queries
      * abstract methods now raise correct NotImplementedError
      * added zlib compression of __data__
      * added support for returning iterators, along with keeping relevant
        cursors around
      * limited sort test (heapsort that stops after skip+limit items) - the
        pure-Python implementation is too slow
      * in-memory dict index sort attempt (at most a 2-second difference from
        the lmdb index)
      * pandas.Index sorted-set attempt; however, the required set ordering
        brings too much data into in-memory sets and defeats the benefit of a
        sorted set
      * walk-sort - do not use a real sort, but walk the _whole_ reverse index
        for the sorted column and let only values from the result set pass
        through; walking the whole index is much faster than sorting for big
        result sets, while its impact on small queries is negligible! (see the
        sketch after this list)
      * preread_all option to load all result sets into memory for profiling
      * query range attempt with python lambda stop condition, surprisingly fast
      * two-pass range - a first takewhile over the keys counts the results,
        then an islice takes that counted number of values
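
      A minimal sketch of the walk-sort idea referenced above, assuming the
      reverse index for the sorted column can be iterated as (key, row_id)
      pairs in key order; walk_sort, reverse_index_items and result_ids are
      illustrative names, not identifiers from this repository:

          import itertools

          def walk_sort(reverse_index_items, result_ids, skip=0, limit=None):
              # The reverse index is already ordered by the sort column, so
              # walking it and keeping only rows from the result set yields
              # the rows in sorted order without performing any sort.
              ordered = (rid for _key, rid in reverse_index_items
                         if rid in result_ids)
              stop = None if limit is None else skip + limit
              return itertools.islice(ordered, skip, stop)
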
    lmdb_index.py 5.52 KiB
    #!/usr/bin/python
    # -*- encoding: utf-8 -*-
    
    import lmdb
    import random
    import functools
    import operator
    import itertools
    import struct
    
    from dumb_db import Index
    from keyvalue_db import SubqueryTuple
    
    # Profiling hook: preread_all is an identity function by default, so query
    # results stream lazily from the lmdb cursors; switch the assignment below
    # to preread_func to materialize every result set in memory (and print its
    # size) while profiling.
    def preread_func(x):
        res = set(x)
        print "len %i" % len(res)
        return res

    preread_all = lambda x: x
    #preread_all = preread_func
    
    class LMDBIndex(Index):
    
    
        def __init__(self, name, env, dup=False):
            Index.__init__(self, name, env)
            self.dup = dup
            self.handle = self.env.open_db(self.name, dupsort=self.dup)
    
    
        # Integer keys are stored as zero-padded hex strings so that LMDB's
        # lexicographic byte ordering matches the numeric ordering of the keys.
        @staticmethod
        def int32_to_bin(i):
            return "%08x" % i


        @staticmethod
        def bin_to_int32(b):
            return int(b, 16)
    
    
        def insert_orig(self, txn, key=None, value=None):
            if key is None:
                key = "%016x" % random.randint(0, 2**64-1)  # may not be safe enough
            txn.put(key, value, db=self.handle, dupdata=self.dup, overwrite=True)
            return key, value
    
    
        def insert(self, txn, key=None, value=None):
            if key is None:
                max_key = self.query_max_key(txn)
                int_key = self.bin_to_int32(max_key) + 1
                key = self.int32_to_bin(int_key)
            # LMDB allows only one write transaction at a time, so there should be no race.
            txn.put(key, value, db=self.handle, dupdata=self.dup, overwrite=self.dup)
            return key, value
    
    
        def query_max_key(self, txn):
            with txn.cursor(db=self.handle) as crs:
                if crs.last():
                    return crs.key()
                else:
                    return self.int32_to_bin(0)
    
    
    # Note that we return an iterator, in the hope that further up the stack
    # the fast C implementation of the lmdb cursor will connect directly to
    # the fast C implementation of the Python set(), without any interpreted
    # code in between.
    
        def query_eq_all(self, txn, key):
            crs = txn.cursor(db=self.handle)
            print "eq_all"
            crs.set_key(key)
            return SubqueryTuple(
                set([crs]),
                preread_all(crs.iternext_dup(keys=False, values=True)),
            )
    
    
        def query_eq(self, txn, key):
            return txn.get(key, db=self.handle)
    
    
        def get_key_func(self, txn):
            return functools.partial(txn.get, db=self.handle)
    
    
        def query_ge(self, txn, key):
            crs = txn.cursor(db=self.handle)
            print "ge"
            crs.set_range(key)
            return SubqueryTuple(
                set([crs]),
                preread_all(crs.iternext(keys=False, values=True)),
            )
    
    
        def query_le(self, txn, key):
            # Reverse reading from underlying media may have limits.
            crs = txn.cursor(db=self.handle)
            print "le"
            crs.set_range(key)
            it = crs.iterprev(keys=False, values=True)
            try:
                next(it)
            except StopIteration:
                it = iter(())
            return SubqueryTuple(
                set([crs]),
                preread_all(it),
            )
    
    
        def query_range_orig(self, txn, key1, key2):
            crs = txn.cursor(db=self.handle)
            print "range"
            # Set lmdb cursor to lower range
            crs.set_range(key1)
            # Get iterator for _both_ keys and values
            # Keys are what is filtered on, values are event id's
            lmdb_iter = crs.iternext(keys=True, values=True)
            # An iterator wrapper that stops once the upper bound is reached.
            # Note that this is a Python lambda, which I don't know how to
            # replace with a faster implementation. :(
            stop_iter = itertools.takewhile(lambda x: x[0]<=key2, lmdb_iter)
            # Now return only id iterator
            res_iter = itertools.imap(operator.itemgetter(1), stop_iter)
            return SubqueryTuple(
                set([crs]),
                preread_all(res_iter),
            )
    
    
        def query_range(self, txn, key1, key2):
            # Bind the C-implemented operator.ge to key2 so the stop condition avoids a Python lambda
            comparator = functools.partial(operator.ge, key2)
            crs = txn.cursor(db=self.handle)
            print "range"
            # Set lmdb cursor to lower range
            crs.set_range(key1)
            # Get iterator over keys only (first pass just counts matching keys)
            # Keys are what is filtered on, values are event id's
            lmdb_iter = crs.iternext(keys=True, values=False)
            # Iterate while comparator returns True
            keys = itertools.takewhile(comparator, lmdb_iter)
            # Count length
            length = len(list(keys))
            # Set cursor back to lower range
            crs.set_range(key1)
            # Get iterator over values only (second pass fetches them)
            lmdb_iter = crs.iternext(keys=False, values=True)
            # Just fetch 'length' values
            res_iter = itertools.islice(lmdb_iter, 0, length)
            return SubqueryTuple(
                set([crs]),
                preread_all(res_iter),
            )
    
    
        def itervalues(self, txn):
            crs = txn.cursor(db=self.handle)
            print "all vals"
            lmdb_iter = crs.iternext(keys=False, values=True)
            return SubqueryTuple(
                set([crs]),
                preread_all(lmdb_iter),
            )
    
    
        def iteritems(self, txn):
            crs = txn.cursor(db=self.handle)
            print "all items"
            lmdb_iter = crs.iternext(keys=True, values=True)
            return SubqueryTuple(
                set([crs]),
                preread_all(lmdb_iter),
            )
    
    
        def clear(self, txn):
            txn.drop(db=self.handle, delete=False)
    
    
        def dump(self, txn):
            res = {}
            with txn.cursor(db=self.handle) as crs:
                crs.first()
                for key, value in crs.iternext(keys=True, values=True):
                    if self.dup:
                        res.setdefault(key, []).append(value)
                    else:
                        res[key] = value
            return {self.name: res}
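
    A minimal usage sketch for the module above (Python 2, like the module
    itself). It assumes dumb_db and keyvalue_db are importable, that the
    dumb_db.Index base class merely records `name` and `env`, and that
    SubqueryTuple unpacks to (cursors, value_iterator) as the calls above
    suggest; the path, keys and values are made up for illustration.

        import lmdb
        from lmdb_index import LMDBIndex

        env = lmdb.open("/tmp/index-demo", max_dbs=4)
        idx = LMDBIndex("by_user", env, dup=True)   # duplicate keys per user

        with env.begin(write=True) as txn:
            # values are the event ids this index points at
            idx.insert(txn, key="alice", value="0000000000000001")
            idx.insert(txn, key="alice", value="0000000000000002")
            idx.insert(txn, key="bob",   value="0000000000000003")

        with env.begin() as txn:
            cursors, values = idx.query_eq_all(txn, "alice")
            print sorted(values)   # ['0000000000000001', '0000000000000002']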