sparkey/__init__.py (385 lines of code) (raw):

#!/usr/bin/python
# -*- coding: utf-8 -*-
#
# Copyright 2012-2020 Spotify AB
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""ctypes bindings for the native sparkey library.

Exposes log writers/readers (L{LogWriter}, L{LogReader}) and hash based
readers/writers (L{HashReader}, L{HashWriter}) on top of the C functions
loaded from the shared library.
"""

from builtins import object
import ctypes
import ctypes.util
import future

libsparkey = ctypes.cdll.LoadLibrary(ctypes.util.find_library("sparkey"))


# Some constants
class Compression(object):
    NONE = 0
    SNAPPY = 1


class IterState(object):
    NEW = 0
    ACTIVE = 1
    CLOSED = 2
    INVALID = 3


class IterType(object):
    PUT = 0
    DELETE = 1


class SparkeyException(Exception):
    """Raised for failing library calls and for misuse of this module."""
    pass


def _format(func, ret, *args):
    """Declare the ctypes prototype of func and return func itself.

    @param func: foreign function to annotate.
    @param ret: ctypes result type (or None).
    @param args: ctypes argument types, in order.
    """
    func.restype = ret
    func.argtypes = tuple(args)
    return func


def _ctypes_wrapper(func, ret, *args):
    """Wrap a foreign function that reports errors through its return code.

    The returned callable returns None on success.

    @raise SparkeyException: (from the returned callable) if the foreign
                             function returns a non-zero code. The message
                             is whatever sparkey_errstring reports for it.
    """
    fn = _format(func, ret, *args)

    def wrapper(*a, **kw):
        code = fn(*a, **kw)
        if code != 0:
            raise SparkeyException(_errstring(code))

    return wrapper


_ptr = ctypes.c_void_p
_str = ctypes.c_char_p
_byref = ctypes.byref
_create_string_buffer = ctypes.create_string_buffer
_c_ulonglong = ctypes.c_ulonglong

_errstring = _format(libsparkey.sparkey_errstring, _str, ctypes.c_int)

_logwriter_create = _ctypes_wrapper(libsparkey.sparkey_logwriter_create,
                                    ctypes.c_int, _ptr, _str, ctypes.c_int,
                                    ctypes.c_int)
_logwriter_append = _ctypes_wrapper(libsparkey.sparkey_logwriter_append,
                                    ctypes.c_int, _ptr, _str)
_logwriter_close = _ctypes_wrapper(libsparkey.sparkey_logwriter_close,
                                   ctypes.c_int, _ptr)
_logwriter_flush = _ctypes_wrapper(libsparkey.sparkey_logwriter_flush,
                                   ctypes.c_int, _ptr)
_logwriter_put = _ctypes_wrapper(libsparkey.sparkey_logwriter_put,
                                 ctypes.c_int, _ptr, _c_ulonglong, _str,
                                 _c_ulonglong, _str)
_logwriter_delete = _ctypes_wrapper(libsparkey.sparkey_logwriter_delete,
                                    ctypes.c_int, _ptr, _c_ulonglong, _str)

_logreader_open = _ctypes_wrapper(libsparkey.sparkey_logreader_open,
                                  ctypes.c_int, _ptr, _str)
_logreader_close = _format(libsparkey.sparkey_logreader_close, None, _ptr)

_logiter_close = _format(libsparkey.sparkey_logiter_close, None, _ptr)
_logiter_create = _ctypes_wrapper(libsparkey.sparkey_logiter_create,
                                  ctypes.c_int, _ptr, _ptr)
_logiter_next = _ctypes_wrapper(libsparkey.sparkey_logiter_next,
                                ctypes.c_int, _ptr, _ptr)
_logiter_state = _format(libsparkey.sparkey_logiter_state, ctypes.c_int,
                         _ptr)
_logiter_type = _format(libsparkey.sparkey_logiter_type, ctypes.c_int, _ptr)
_logiter_keylen = _format(libsparkey.sparkey_logiter_keylen, _c_ulonglong,
                          _ptr)
_logiter_valuelen = _format(libsparkey.sparkey_logiter_valuelen,
                            _c_ulonglong, _ptr)
_logiter_fill_key = _ctypes_wrapper(libsparkey.sparkey_logiter_fill_key,
                                    ctypes.c_int, _ptr, _ptr, _c_ulonglong,
                                    _str, ctypes.POINTER(_c_ulonglong))
_logiter_fill_value = _ctypes_wrapper(libsparkey.sparkey_logiter_fill_value,
                                      ctypes.c_int, _ptr, _ptr, _c_ulonglong,
                                      _str, ctypes.POINTER(_c_ulonglong))

_hash_write = _ctypes_wrapper(libsparkey.sparkey_hash_write, ctypes.c_int,
                              _str, _str, ctypes.c_int)
_hash_open = _ctypes_wrapper(libsparkey.sparkey_hash_open, ctypes.c_int,
                             _ptr, _str, _str)
_hash_close = _format(libsparkey.sparkey_hash_close, None, _ptr)
_hash_getreader = _format(libsparkey.sparkey_hash_getreader, _ptr, _ptr)
_logiter_hashnext = _ctypes_wrapper(libsparkey.sparkey_logiter_hashnext,
                                    ctypes.c_int, _ptr, _ptr)
_hash_get = _ctypes_wrapper(libsparkey.sparkey_hash_get, ctypes.c_int, _ptr,
                            _str, _c_ulonglong, _ptr)
_hash_numentries = _format(libsparkey.sparkey_hash_numentries, _c_ulonglong,
                           _ptr)

if str == bytes:
    # Python 2: str and bytes are the same type, nothing to convert.
    def _to_bytes(s, name):
        """Validate that s is a string and return it unchanged.

        @raise SparkeyException: if s is not a string; name is used in the
                                 message.
        """
        if not isinstance(s, (str, future.types.newstr)):
            raise SparkeyException(name + " must be a string")
        return s

    def _to_str(b, name):
        """Validate that b is a string (or None) and return it unchanged.

        @raise SparkeyException: if b is neither None nor a string.
        """
        if b is None:
            return None
        if not isinstance(b, str):
            raise SparkeyException(name + " must be a string")
        return b
else:
    # Python 3: text is encoded/decoded as UTF-8 at the library boundary.
    def _to_bytes(s, name):
        """Return s as bytes, encoding str as UTF-8.

        @raise SparkeyException: if s is neither bytes nor str; name is used
                                 in the message.
        """
        if isinstance(s, bytes):
            return s
        if not isinstance(s, str):
            raise SparkeyException(name + " must be a string")
        return s.encode('utf-8')

    def _to_str(b, name):
        """Return b as str, decoding bytes as UTF-8. None is passed through.

        @raise SparkeyException: if b is neither None, str nor bytes.
        """
        if b is None:
            return None
        if isinstance(b, str):
            return b
        if not isinstance(b, bytes):
            raise SparkeyException(name + " must be bytes")
        return b.decode('utf-8')


class LogWriter(object):
    def __init__(self, filename, mode='NEW',
                 compression_type=Compression.NONE,
                 compression_block_size=0):
        """Creates or appends a log file.

        Types of keys and values can be strings or bytes.
        For Python 2, this is the same thing.
        For Python 3, strings will be encoded as UTF-8

        This is not threadsafe, don't write to the same file from
        multiple threads or processes.

        @param filename: file to create or append to.

        @param mode: one of two modes:
                     - NEW: creates the file regardless of whether it
                       already exists or not.
                     - APPEND: appends to the log if it exists, otherwise
                       raises an exception.

        @param compression_type: one of two types:
                                 - NONE: keys and values are written as is,
                                   and each key-value pair is considered a
                                   block of its own.
                                 - SNAPPY: compression is done on a block
                                   level of at most compression_block_size
                                   uncompressed bytes. Each block may
                                   contain multiple key/value pairs and it
                                   may split keys or values over block
                                   borders.

        @param compression_block_size: mandatory unless compression is
                                       NONE. This indicates how large the
                                       maximum block may be. To get good
                                       compression and performance, this
                                       should be a fairly small multiple of
                                       expected key + value size.

        @raise SparkeyException: if mode is invalid, filename has the wrong
                                 type or the library call fails.
        """
        # Set first so that close() / __del__ are safe even if we raise
        # below; the handle is only stored once the library call succeeded.
        self._log = None
        filename = _to_bytes(filename, "filename")
        log = _ptr()
        if mode == 'NEW':
            _logwriter_create(_byref(log), filename, compression_type,
                              compression_block_size)
        elif mode == 'APPEND':
            _logwriter_append(_byref(log), filename)
        else:
            raise SparkeyException("Invalid mode %s, expected 'NEW' or "
                                   "'APPEND'" % (mode,))
        self._log = log

    def __del__(self):
        self.close()

    def close(self):
        """Closes the writer (if not already closed).

        Also flushes all pending changes from memory to file.
        """
        log = self._log
        if log is not None:
            # Clear the attribute first so a failing close is not retried.
            self._log = None
            _logwriter_close(_byref(log))

    def _assert_open(self):
        if self._log is None:
            raise SparkeyException("Writer is closed")

    def flush(self):
        """Flushes all pending changes from memory to file."""
        self._assert_open()
        _logwriter_flush(self._log)

    def __setitem__(self, key, value):
        """Equivalent to put(key, value)"""
        self.put(key, value)

    def put(self, key, value):
        """Append the key-value pair to the log.

        @param key: type must be bytes or string
        @param value: type must be bytes or string
        """
        self._assert_open()
        key = _to_bytes(key, "key")
        value = _to_bytes(value, "value")
        _logwriter_put(self._log, len(key), key, len(value), value)

    def __delitem__(self, key):
        """del writer[key] is equivalent to delete(key) (see L{delete})"""
        self.delete(key)

    def delete(self, key):
        """Appends a delete operation of key to the log.

        @param key: type must be bytes or string
        """
        self._assert_open()
        key = _to_bytes(key, "key")
        _logwriter_delete(self._log, len(key), key)


class LogReader(object):
    def __init__(self, filename):
        """Opens a file for log iteration.

        @param filename: file to open.
        @raise SparkeyException: if filename has the wrong type or the
                                 library fails to open the file.
        """
        # Set first so that close() / __del__ are safe even if we raise.
        self._log = None
        filename = _to_bytes(filename, "filename")
        log = _ptr()
        _logreader_open(_byref(log), filename)
        self._log = log

    def __del__(self):
        self.close()

    def close(self):
        """Safely closes the log reader."""
        log = self._log
        if log is not None:
            self._log = None
            _logreader_close(_byref(log))

    def __iter__(self):
        """Creates a new iterator for this log reader.

        @returntype: L{LogIter}
        """
        return LogIter(self)

    def _assert_open(self):
        if self._log is None:
            raise SparkeyException("Reader is closed")


def _iter_res(iterator, log):
    """Read the entry the native iterator currently points at.

    @param iterator: native log iterator handle.
    @param log: native log reader handle the iterator belongs to.
    @return: (key, value, type) where key and value are bytes and type is
             one of the L{IterType} constants.
    @raise StopIteration: if the iterator is not in the ACTIVE state.
    @raise SparkeyException: if the library fills fewer or more bytes than
                             the length it reported.
    """
    state = _logiter_state(iterator)
    if state != IterState.ACTIVE:
        raise StopIteration()

    type_ = _logiter_type(iterator)

    keylen = _logiter_keylen(iterator)
    string_buffer = _create_string_buffer(keylen)
    length = _c_ulonglong()
    _logiter_fill_key(iterator, log, keylen, string_buffer, _byref(length))
    if length.value != keylen:
        raise SparkeyException("Invalid keylen, expected %s but got %s" %
                               (keylen, length.value))
    # .raw keeps embedded NUL bytes, unlike .value.
    key = string_buffer.raw

    valuelen = _logiter_valuelen(iterator)
    string_buffer = _create_string_buffer(valuelen)
    _logiter_fill_value(iterator, log, valuelen, string_buffer,
                        _byref(length))
    if length.value != valuelen:
        raise SparkeyException("Invalid valuelen, expected %s but got %s" %
                               (valuelen, length.value))
    value = string_buffer.raw

    return key, value, type_


class LogIter(object):
    def __init__(self, logreader):
        """Internal function.

        Use iter(logreader) or just "for key, value, type in logreader:"
        instead.
        """
        # Set first so that close() / __del__ are safe even if we raise.
        self._iter = None
        self._log = None
        logreader._assert_open()
        iterator = _ptr()
        _logiter_create(_byref(iterator), logreader._log)
        self._iter = iterator
        self._log = logreader

    def __del__(self):
        self.close()

    def close(self):
        """Safely closes the iterator."""
        if self._iter is not None:
            _logiter_close(_byref(self._iter))
            self._iter = None

    def __iter__(self):
        return self

    def _assert_open(self):
        if self._iter is None or self._log is None:
            raise SparkeyException("Iterator is closed")
        self._log._assert_open()

    def next(self):
        """Return next element in the log.

        @return: (key, value, type) if there are remaining elements.
                 key and value are bytes (str on Python 2) and type is a
                 L{IterType}.

        @raise StopIteration: if there are no more entries in the log.
        """
        self._assert_open()
        _logiter_next(self._iter, self._log._log)
        return _iter_res(self._iter, self._log._log)

    def __next__(self):
        return self.next()


def writehash(hashfile, logfile, hash_size=0):
    """Write a hash file based on the contents in the log file.

    If the log file hasn't been changed since the existing hashfile was
    created, this is a no-op.

    @param hashfile: file to create. If it already exists, it will
                     atomically be updated.

    @param logfile: file to read from. It must exist.

    @param hash_size: Valid values are 0, 4, 8. 0 means autoselect hash
                      size. 4 is 32 bit hash, 8 is 64 bit hash.
    """
    hashfile = _to_bytes(hashfile, "hashfile")
    logfile = _to_bytes(logfile, "logfile")
    _hash_write(hashfile, logfile, hash_size)


class HashReader(object):
    """This is a reader that supports both iteration and random lookups."""

    def __init__(self, hashfile, logfile):
        """Opens a hash file and log file for reading.

        @param hashfile: Hash file to open, must exist and be associated
                         with the log file.

        @param logfile: Log file to open, must exist.
        """
        # Set first so that close() / __del__ are safe even if we raise.
        self._reader = None
        self._iter = None
        hashfile = _to_bytes(hashfile, "hashfile")
        logfile = _to_bytes(logfile, "logfile")
        reader = _ptr()
        _hash_open(_byref(reader), hashfile, logfile)
        self._reader = reader
        # Private iterator used as the cursor for random lookups.
        self._iter = HashIterator(self)

    def __del__(self):
        self.close()

    def close(self):
        """Safely close the reader."""
        reader = self._reader
        if reader is not None:
            _hash_close(_byref(reader))
            self._reader = None
        if self._iter is not None:
            self._iter.close()
            self._iter = None

    def __iter__(self):
        """Equivalent to L{iteritems}"""
        return self.iteritems()

    def iteritems(self):
        """Iterate through all live entries.

        @returntype: L{HashIterator}
        """
        return HashIterator(self)

    def _assert_open(self):
        if self._reader is None:
            raise SparkeyException("HashReader is closed")

    def __getitem__(self, key):
        """reader[key] throws KeyError exception when key doesn't exist,
        otherwise is equivalent to reader.get(key) (see L{get})

        @param key: for the item

        **Note** in python 3 this always returns a bytes object, use
        getAsString(key) to return a String version.
        """
        value = self.get(key)
        if value is None:
            raise KeyError(key)
        return value

    def __contains__(self, key):
        """Return True if key has a live entry in the hash.

        @param key: type must be bytes or string
        """
        self._assert_open()
        # Convert like get() does: the C function takes a byte buffer and
        # the length must be the encoded length, not the character count.
        key = _to_bytes(key, "key")
        iterator = self._iter._iter
        _hash_get(self._reader, key, len(key), iterator)
        return _logiter_state(iterator) == IterState.ACTIVE

    def has_key(self, key):
        """Equivalent to "key in reader"."""
        return self.__contains__(key)

    def get(self, key):
        """Retrieve the value associated with the key

        @param key: type must be bytes or string
        @returns: bytes representing the value associated with the key, or
                  None if the key does not exist.
        @raise SparkeyException: if the reader is closed.
        """
        # Without this check a closed reader fails on self._iter being None.
        self._assert_open()
        return self._iter.get(key)

    def getAsString(self, key):
        """Retrieve the value associated with the key

        @param key: type must be bytes or string
        @returns: a string representing the value associated with the key,
                  or None if the key does not exist.
        """
        return _to_str(self.get(key), "value")

    def __len__(self):
        """Return the number of entries reported by the hash file.

        @raise SparkeyException: if the reader is closed.
        """
        # Never hand a NULL reader to the library.
        self._assert_open()
        return _hash_numentries(self._reader)


class HashIterator(object):
    def __init__(self, hashreader):
        """Internal function: use iter(hashreader) instead."""
        # Set first so that close() / __del__ are safe even if we raise.
        self._iter = None
        self._log = None
        self._hashreader = None
        hashreader._assert_open()
        iterator = _ptr()
        log = _hash_getreader(hashreader._reader)
        _logiter_create(_byref(iterator), log)
        self._iter = iterator
        self._log = log
        self._hashreader = hashreader

    def __del__(self):
        self.close()

    def close(self):
        """Safely closes the iterator."""
        if self._iter is not None:
            _logiter_close(_byref(self._iter))
            self._hashreader = None
            self._log = None
            self._iter = None

    def __iter__(self):
        return self

    def next(self):
        """Return next live entry in the log.

        @return: (key, value) if there are remaining elements.
                 key and value are bytes (str on Python 2).

        @raise StopIteration: if there are no more live entries in the log.
        """
        self._assert_open()
        _logiter_hashnext(self._iter, self._hashreader._reader)
        key, value, _ = _iter_res(self._iter, self._log)
        return key, value

    def __next__(self):
        return self.next()

    def _assert_open(self):
        if self._hashreader is None:
            raise SparkeyException("Iterator is closed")
        self._hashreader._assert_open()

    def __getitem__(self, key):
        """Like L{get}, but raises KeyError when key does not exist."""
        value = self.get(key)
        if value is None:
            raise KeyError(key)
        return value

    def get(self, key):
        """Get the value associated with the key

        Note that this repositions the iterator on the looked up entry.

        @param key: type must be bytes or string
        @returns: bytes representing the value associated with the key, or
                  None if the key does not exist.
        """
        key = _to_bytes(key, "key")
        self._assert_open()
        iterator = self._iter
        log = self._log
        _hash_get(self._hashreader._reader, key, len(key), iterator)

        state = _logiter_state(iterator)
        if state != IterState.ACTIVE:
            return None

        type_ = _logiter_type(iterator)
        assert type_ == IterType.PUT

        valuelen = _logiter_valuelen(iterator)
        string_buffer = _create_string_buffer(valuelen)
        clen = _c_ulonglong()
        _logiter_fill_value(iterator, log, valuelen, string_buffer,
                            _byref(clen))
        if clen.value != valuelen:
            raise SparkeyException("Invalid valuelen, expected %s but got %s"
                                   % (valuelen, clen.value))
        # .raw keeps embedded NUL bytes, unlike .value.
        return string_buffer.raw

    def getAsString(self, key):
        """Retrieve the value associated with the key

        @param key: type must be bytes or string
        @returns: a string representing the value associated with the key,
                  or None if the key does not exist.
        """
        return _to_str(self.get(key), "value")


class HashWriter(object):
    def __init__(self, hashfile, logfile, mode='NEW',
                 compression_type=Compression.NONE,
                 compression_block_size=0, hash_size=0):
        """Creates a new writer.

        Does everything that L{LogWriter} does, but also writes the hash
        file.

        @param hashfile: filename of hash file

        @param logfile: filename of log file

        @param mode: Same as in L{LogWriter.__init__}

        @param compression_type: Same as in L{LogWriter.__init__}

        @param compression_block_size: Same as in L{LogWriter.__init__}

        @param hash_size: Valid values are 0, 4, 8. 0 means autoselect hash
                          size. 4 is 32 bit hash, 8 is 64 bit hash.
        """
        # Set first so that destroy() / __del__ are safe even if opening
        # the log writer raises.
        self._logwriter = None
        self._reader = None
        self._hashfile = hashfile
        self._logfile = logfile
        self._hash_size = hash_size
        self._logwriter = LogWriter(logfile, mode, compression_type,
                                    compression_block_size)

    def _assert_open(self):
        if self._logwriter is None:
            raise SparkeyException("Writer is closed")

    def __setitem__(self, key, value):
        """Equivalent to writer.put(key, value), see L{put}"""
        self.put(key, value)

    def put(self, k, v):
        """Append the key-value pair to the log.

        @param k: the key, type must be bytes or string
        @param v: the value, type must be bytes or string
        @raise SparkeyException: if the writer is closed.
        """
        self._assert_open()
        self._logwriter.put(k, v)

    def __delitem__(self, key):
        """Equivalent to writer.delete(key), see L{delete}"""
        self.delete(key)

    def delete(self, k):
        """Appends a delete operation of key to the log.

        @param k: the key, type must be bytes or string
        @raise SparkeyException: if the writer is closed.
        """
        self._assert_open()
        self._logwriter.delete(k)

    def flush(self):
        """Flushes all log writes, and also rebuilds the hash."""
        self._assert_open()
        self._logwriter.flush()
        # NOTE(review): a reader created by _init_reader() before this call
        # is kept as is; it presumably keeps serving the previous hash
        # contents -- confirm whether it should be reopened here.
        writehash(self._hashfile, self._logfile, self._hash_size)

    def __del__(self):
        self.destroy()

    def destroy(self):
        """Closes the writer without rebuilding the hash file.

        The underlying L{LogWriter} and any reader opened for lookups are
        closed. Use L{close} instead to also rewrite the hash file.
        """
        if self._logwriter is not None:
            self._logwriter.close()
            self._logwriter = None
        self._close_reader()
        self._hashfile = None
        self._logfile = None

    def finish(self):
        """Equivalent to L{close}"""
        self.close()

    def close(self):
        """Flushes pending log writes from memory to disk, rewrites the
        hash file and closes the writer.
        """
        if self._logwriter is not None:
            self.flush()
        self.destroy()

    # Reader related code
    def _close_reader(self):
        if self._reader is not None:
            self._reader.close()
            self._reader = None

    def _init_reader(self):
        # The reader is created lazily on first lookup/iteration and reused.
        if self._reader is None:
            self._reader = HashReader(self._hashfile, self._logfile)
        return self._reader

    def __iter__(self):
        """Equivalent to L{iteritems}"""
        return self.iteritems()

    def iteritems(self):
        """Iterate through all entries that have been flushed.

        @returns: L{HashIterator}
        """
        self._assert_open()
        return self._init_reader().iteritems()

    def __getitem__(self, key):
        """Equivalent to writer.get(key), see L{get}"""
        self._assert_open()
        return self._init_reader().get(key)

    def get(self, key):
        """Performs a hash lookup of a key.

        Only finds things that were flushed to the hash.

        @param key: type must be bytes or string

        @returns: bytes representing the value associated with the key, or
                  None if the key does not exist in the hash.
        """
        self._assert_open()
        return self._init_reader().get(key)

    def getAsString(self, key):
        """Performs a hash lookup of a key.

        Only finds things that were flushed to the hash.

        @param key: type must be bytes or string

        @returns: a string representing the value associated with the key,
                  or None if the key does not exist in the hash.
        """
        return _to_str(self.get(key), "value")