confidence-shared/shared.py (836 lines of code) (raw):

# This file was autogenerated by some hot garbage in the `uniffi` crate.
# Trust me, you don't want to mess with it!

# Common helper code.
#
# Ideally this would live in a separate .py file where it can be unittested etc
# in isolation, and perhaps even published as a re-useable package.
#
# However, it's important that the details of how this helper code works (e.g. the
# way that different builtin types are passed across the FFI) exactly match what's
# expected by the rust code on the other side of the interface. In practice right
# now that means coming from the exact same version of `uniffi` that was used to
# compile the rust component. The easiest way to ensure this is to bundle the Python
# helpers directly inline like we're doing here.

from __future__ import annotations
import os
import sys
import ctypes
import enum
import struct
import contextlib
import datetime
import threading
import itertools
import traceback
import typing
import asyncio
import platform

# Used for default argument values
_DEFAULT = object()


class _UniffiRustBuffer(ctypes.Structure):
    """ctypes layout of the buffer struct exchanged by value with the Rust library.

    Allocation, growth and release are delegated to the
    `ffi_confidence_shared_rustbuffer_*` functions of `_UniffiLib`.
    """

    _fields_ = [
        ("capacity", ctypes.c_uint64),
        ("len", ctypes.c_uint64),
        ("data", ctypes.POINTER(ctypes.c_char)),
    ]

    @staticmethod
    def default():
        """Return an empty buffer (zero capacity, zero length, NULL data)."""
        return _UniffiRustBuffer(0, 0, None)

    @staticmethod
    def alloc(size):
        """Ask the Rust side for a new buffer with room for `size` bytes."""
        return _rust_call(_UniffiLib.ffi_confidence_shared_rustbuffer_alloc, size)

    @staticmethod
    def reserve(rbuf, additional):
        """Ask the Rust side to grow `rbuf` by `additional` bytes; returns the new buffer."""
        return _rust_call(_UniffiLib.ffi_confidence_shared_rustbuffer_reserve, rbuf, additional)

    def free(self):
        """Hand the buffer back to the Rust side to be released."""
        return _rust_call(_UniffiLib.ffi_confidence_shared_rustbuffer_free, self)

    def __str__(self):
        return "_UniffiRustBuffer(capacity={}, len={}, data={})".format(
            self.capacity,
            self.len,
            self.data[0:self.len]
        )

    @contextlib.contextmanager
    def alloc_with_builder(*args):
        """Context-manager to allocate a buffer using a _UniffiRustBufferBuilder.

        The allocated buffer will be automatically freed if an error occurs, ensuring that
        we don't accidentally leak it.
        """
        builder = _UniffiRustBufferBuilder()
        try:
            yield builder
        except BaseException:
            # Deliberately broad (equivalent to a bare `except:`): the buffer must
            # be released whatever interrupted the caller, then the error re-raised.
            builder.discard()
            raise

    @contextlib.contextmanager
    def consume_with_stream(self):
        """Context-manager to consume a buffer using a _UniffiRustBufferStream.

        The _UniffiRustBuffer will be freed once the context-manager exits, ensuring that we don't
        leak it even if an error occurs.
        """
        try:
            s = _UniffiRustBufferStream.from_rust_buffer(self)
            yield s
            if s.remaining() != 0:
                raise RuntimeError("junk data left in buffer at end of consume_with_stream")
        finally:
            self.free()

    @contextlib.contextmanager
    def read_with_stream(self):
        """Context-manager to read a buffer using a _UniffiRustBufferStream.

        This is like consume_with_stream, but doesn't free the buffer afterwards.
        It should only be used with borrowed `_UniffiRustBuffer` data.
        """
        s = _UniffiRustBufferStream.from_rust_buffer(self)
        yield s
        if s.remaining() != 0:
            raise RuntimeError("junk data left in buffer at end of read_with_stream")


class _UniffiForeignBytes(ctypes.Structure):
    """ctypes layout of a (length, pointer) pair describing foreign-owned bytes."""

    _fields_ = [
        ("len", ctypes.c_int32),
        ("data", ctypes.POINTER(ctypes.c_char)),
    ]

    def __str__(self):
        return "_UniffiForeignBytes(len={}, data={})".format(self.len, self.data[0:self.len])


class _UniffiRustBufferStream:
    """
    Helper for structured reading of bytes from a _UniffiRustBuffer
    """

    def __init__(self, data, len):
        # `data` only needs to support slicing that yields bytes.
        self.data = data
        self.len = len
        self.offset = 0

    @classmethod
    def from_rust_buffer(cls, buf):
        return cls(buf.data, buf.len)

    def remaining(self):
        """Number of bytes not yet consumed."""
        return self.len - self.offset

    def _check_bounds(self, size):
        """Raise InternalError unless `size` bytes can be read at the current offset."""
        if size < 0:
            # A negative size would silently move the offset backwards.
            raise InternalError("negative read size")
        if self.offset + size > self.len:
            raise InternalError("read past end of rust buffer")

    def _unpack_from(self, size, format):
        """Decode one `struct`-formatted value of `size` bytes and advance past it."""
        self._check_bounds(size)
        value = struct.unpack(format, self.data[self.offset:self.offset+size])[0]
        self.offset += size
        return value

    def read(self, size):
        """Return the next `size` raw bytes and advance past them.

        Raises:
            InternalError: if `size` is negative or exceeds the remaining bytes.
        """
        self._check_bounds(size)
        data = self.data[self.offset:self.offset+size]
        self.offset += size
        return data

    # All multi-byte values are big-endian (">" struct prefix).

    def read_i8(self):
        return self._unpack_from(1, ">b")

    def read_u8(self):
        return self._unpack_from(1, ">B")

    def read_i16(self):
        return self._unpack_from(2, ">h")

    def read_u16(self):
        return self._unpack_from(2, ">H")

    def read_i32(self):
        return self._unpack_from(4, ">i")

    def read_u32(self):
        return self._unpack_from(4, ">I")

    def read_i64(self):
        return self._unpack_from(8, ">q")

    def read_u64(self):
        return self._unpack_from(8, ">Q")

    def read_float(self):
        v = self._unpack_from(4, ">f")
        return v

    def read_double(self):
        return self._unpack_from(8, ">d")


class _UniffiRustBufferBuilder:
    """
    Helper for structured writing of bytes into a _UniffiRustBuffer.
    """

    def __init__(self):
        self.rbuf = _UniffiRustBuffer.alloc(16)
        self.rbuf.len = 0

    def finalize(self):
        """Give up ownership of the buffer and return it; the builder is unusable afterwards."""
        rbuf = self.rbuf
        self.rbuf = None
        return rbuf

    def discard(self):
        """Free the buffer if it has not been finalized yet."""
        if self.rbuf is not None:
            rbuf = self.finalize()
            rbuf.free()

    @contextlib.contextmanager
    def _reserve(self, num_bytes):
        """Ensure room for `num_bytes`; bump `len` only if the body completes."""
        if self.rbuf.len + num_bytes > self.rbuf.capacity:
            self.rbuf = _UniffiRustBuffer.reserve(self.rbuf, num_bytes)
        yield None
        self.rbuf.len += num_bytes

    def _copy_in(self, payload):
        """Copy `payload` (bytes) to the current write position; space must be reserved."""
        if not payload:
            return
        base = ctypes.cast(self.rbuf.data, ctypes.c_void_p).value
        if base is None:
            raise ValueError("NULL pointer access")
        # One bulk copy instead of a per-byte Python loop.
        ctypes.memmove(base + self.rbuf.len, payload, len(payload))

    def _pack_into(self, size, format, value):
        with self._reserve(size):
            self._copy_in(struct.pack(format, value))

    def write(self, value):
        """Append the raw bytes of `value` (a sized iterable of byte values)."""
        size = len(value)  # rejects unsized input before bytes() could reinterpret it
        payload = bytes(value)
        with self._reserve(size):
            self._copy_in(payload)

    # All multi-byte values are big-endian (">" struct prefix).

    def write_i8(self, v):
        self._pack_into(1, ">b", v)

    def write_u8(self, v):
        self._pack_into(1, ">B", v)

    def write_i16(self, v):
        self._pack_into(2, ">h", v)

    def write_u16(self, v):
        self._pack_into(2, ">H", v)

    def write_i32(self, v):
        self._pack_into(4, ">i", v)

    def write_u32(self, v):
        self._pack_into(4, ">I", v)

    def write_i64(self, v):
        self._pack_into(8, ">q", v)

    def write_u64(self, v):
        self._pack_into(8, ">Q", v)

    def write_float(self, v):
        self._pack_into(4, ">f", v)

    def write_double(self, v):
        self._pack_into(8, ">d", v)

    def write_c_size_t(self, v):
        # Native size and byte order ("@N"), unlike the other writers.
        self._pack_into(ctypes.sizeof(ctypes.c_size_t) , "@N", v)


# A handful of classes and functions to support the generated data structures.
# This would be a good candidate for isolating in its own ffi-support lib.

class InternalError(Exception):
    pass


class _UniffiRustCallStatus(ctypes.Structure):
    """
    Error runtime.
    """

    _fields_ = [
        ("code", ctypes.c_int8),
        ("error_buf", _UniffiRustBuffer),
    ]

    # These match the values from the uniffi::rustcalls module
    CALL_SUCCESS = 0
    CALL_ERROR = 1
    CALL_UNEXPECTED_ERROR = 2

    @staticmethod
    def default():
        return _UniffiRustCallStatus(code=_UniffiRustCallStatus.CALL_SUCCESS, error_buf=_UniffiRustBuffer.default())

    def __str__(self):
        if self.code == _UniffiRustCallStatus.CALL_SUCCESS:
            return "_UniffiRustCallStatus(CALL_SUCCESS)"
        elif self.code == _UniffiRustCallStatus.CALL_ERROR:
            return "_UniffiRustCallStatus(CALL_ERROR)"
        elif self.code == _UniffiRustCallStatus.CALL_UNEXPECTED_ERROR:
            return "_UniffiRustCallStatus(CALL_UNEXPECTED_ERROR)"
        else:
            return "_UniffiRustCallStatus(<invalid code>)"


def _rust_call(fn, *args):
    # Call a rust function
    return _rust_call_with_error(None, fn, *args)


def _rust_call_with_error(error_ffi_converter, fn, *args):
    # Call a rust function and handle any errors
    #
    # This function is used for rust calls that return Result<> and therefore can set the CALL_ERROR status code.
    # error_ffi_converter must be set to the _UniffiConverter for the error class that corresponds to the result.
    call_status = _UniffiRustCallStatus.default()

    # The status struct is appended as a trailing by-reference out-argument.
    args_with_error = args + (ctypes.byref(call_status),)
    result = fn(*args_with_error)
    _uniffi_check_call_status(error_ffi_converter, call_status)
    return result


def _uniffi_check_call_status(error_ffi_converter, call_status):
    """Raise the Python exception matching `call_status`, or return None on success.

    Raises:
        InternalError: on an unexpected error, an unknown status code, or
            CALL_ERROR when no `error_ffi_converter` was supplied.
        Exception: whatever `error_ffi_converter.lift` produces for CALL_ERROR.
    """
    if call_status.code == _UniffiRustCallStatus.CALL_SUCCESS:
        pass
    elif call_status.code == _UniffiRustCallStatus.CALL_ERROR:
        if error_ffi_converter is None:
            # Nothing will lift (and thereby consume) the buffer, so free it here.
            call_status.error_buf.free()
            raise InternalError("_rust_call_with_error: CALL_ERROR, but error_ffi_converter is None")
        else:
            raise error_ffi_converter.lift(call_status.error_buf)
    elif call_status.code == _UniffiRustCallStatus.CALL_UNEXPECTED_ERROR:
        # When the rust code sees a panic, it tries to construct a _UniffiRustBuffer
        # with the message.  But if that code panics, then it just sends back
        # an empty buffer.
        if call_status.error_buf.len > 0:
            msg = _UniffiConverterString.lift(call_status.error_buf)
        else:
            msg = "Unknown rust panic"
        raise InternalError(msg)
    else:
        raise InternalError("Invalid _UniffiRustCallStatus code: {}".format(
            call_status.code))


def _uniffi_trait_interface_call(call_status, make_call, write_return_value):
    """Run `make_call` and pass its result to `write_return_value`.

    Any exception is reported through `call_status` as CALL_UNEXPECTED_ERROR
    (with `repr(e)` as the message) instead of propagating.
    """
    try:
        return write_return_value(make_call())
    except Exception as e:
        call_status.code = _UniffiRustCallStatus.CALL_UNEXPECTED_ERROR
        call_status.error_buf = _UniffiConverterString.lower(repr(e))


def _uniffi_trait_interface_call_with_error(call_status, make_call, write_return_value, error_type, lower_error):
    """Like `_uniffi_trait_interface_call`, but `error_type` exceptions become CALL_ERROR.

    The outer handler also covers failures raised by `lower_error` itself.
    """
    try:
        try:
            return write_return_value(make_call())
        except error_type as e:
            call_status.code = _UniffiRustCallStatus.CALL_ERROR
            call_status.error_buf = lower_error(e)
    except Exception as e:
        call_status.code = _UniffiRustCallStatus.CALL_UNEXPECTED_ERROR
        call_status.error_buf = _UniffiConverterString.lower(repr(e))


class _UniffiHandleMap:
    """
    A map where inserting, getting and removing data is synchronized with a lock.
    """

    def __init__(self):
        # Handles are ints produced by a monotonically increasing counter.
        self._map = {}  # handle (int) -> stored object
        self._lock = threading.Lock()
        self._counter = itertools.count()

    def insert(self, obj):
        """Store `obj` and return its newly allocated handle."""
        with self._lock:
            handle = next(self._counter)
            self._map[handle] = obj
            return handle

    def get(self, handle):
        """Return the object stored under `handle`; InternalError if unknown."""
        try:
            with self._lock:
                return self._map[handle]
        except KeyError:
            raise InternalError("UniffiHandleMap.get: Invalid handle")

    def remove(self, handle):
        """Remove and return the object stored under `handle`; InternalError if unknown."""
        try:
            with self._lock:
                return self._map.pop(handle)
        except KeyError:
            raise InternalError("UniffiHandleMap.remove: Invalid handle")

    def __len__(self):
        # Take the lock like every other accessor does.
        with self._lock:
            return len(self._map)


# Types conforming to `_UniffiConverterPrimitive` pass themselves directly over the FFI.
class _UniffiConverterPrimitive:
    """Converter for values that cross the FFI unchanged in both directions."""

    @classmethod
    def lift(cls, value):
        return value

    @classmethod
    def lower(cls, value):
        return value


class _UniffiConverterPrimitiveInt(_UniffiConverterPrimitive):
    """Base for integer converters.

    Subclasses are expected to provide `CLASS_NAME`, `VALUE_MIN` (inclusive)
    and `VALUE_MAX` (exclusive), which `check_lower` reads.
    """

    @classmethod
    def check_lower(cls, value):
        """Validate that `value` is integer-like and within the subclass's range.

        Raises:
            TypeError: if `value` has no usable `__index__` or it returns a non-int.
            ValueError: if the integer is outside `[VALUE_MIN, VALUE_MAX)`.
        """
        try:
            value = value.__index__()
        except Exception as err:
            # Chain explicitly so the underlying failure stays visible as __cause__.
            raise TypeError("'{}' object cannot be interpreted as an integer".format(type(value).__name__)) from err
        if not isinstance(value, int):
            raise TypeError("__index__ returned non-int (type {})".format(type(value).__name__))
        if not cls.VALUE_MIN <= value < cls.VALUE_MAX:
            raise ValueError("{} requires {} <= value < {}".format(cls.CLASS_NAME, cls.VALUE_MIN, cls.VALUE_MAX))


class _UniffiConverterPrimitiveFloat(_UniffiConverterPrimitive):
    """Base for floating-point converters."""

    @classmethod
    def check_lower(cls, value):
        """Validate that `value` converts to a float via `__float__`.

        Raises:
            TypeError: if `value` has no usable `__float__` or it returns a non-float.
        """
        try:
            value = value.__float__()
        except Exception as err:
            # Chain explicitly so the underlying failure stays visible as __cause__.
            raise TypeError("must be real number, not {}".format(type(value).__name__)) from err
        if not isinstance(value, float):
            raise TypeError("__float__ returned non-float (type {})".format(type(value).__name__))


# Helper class for wrapper types that will always go through a _UniffiRustBuffer.
# Classes should inherit from this and implement the `read` and `write` static methods.
class _UniffiConverterRustBuffer:
    @classmethod
    def lift(cls, rbuf):
        # consume_with_stream frees `rbuf` when the block exits.
        with rbuf.consume_with_stream() as stream:
            return cls.read(stream)

    @classmethod
    def lower(cls, value):
        # The builder's buffer is discarded automatically if `write` raises.
        with _UniffiRustBuffer.alloc_with_builder() as builder:
            cls.write(value, builder)
            return builder.finalize()


# Contains loading, initialization code, and the FFI Function declarations.
# Define some ctypes FFI types that we use in the library

# Function pointer for a Rust task, which is a callback function that takes an opaque pointer
_UNIFFI_RUST_TASK = ctypes.CFUNCTYPE(None, ctypes.c_void_p, ctypes.c_int8)


def _uniffi_future_callback_t(return_type):
    """
    Factory function to create callback function types for async functions
    """
    return ctypes.CFUNCTYPE(None, ctypes.c_uint64, return_type, _UniffiRustCallStatus)


def _uniffi_load_indirect():
    """
    This is how we find and load the dynamic library provided by the component.
    For now we just look it up by name, next to this `.py` file.
    """
    module_dir = os.path.dirname(__file__)
    if sys.platform == "darwin":
        name_template = "lib{}.dylib"
    elif sys.platform.startswith("win"):
        # As of python3.8, ctypes does not seem to search $PATH when loading DLLs.
        # We could use `os.add_dll_directory` to configure the search path, but
        # it doesn't feel right to mess with application-wide settings. Let's
        # assume that the `.dll` is next to the `.py` file and load by full path.
        name_template = os.path.join(module_dir, "{}.dll")
    else:
        # Anything else must be an ELF platform - Linux, *BSD, Solaris/illumos
        name_template = "lib{}.so"

    library_name = name_template.format("confidence_shared")
    return ctypes.cdll.LoadLibrary(os.path.join(module_dir, library_name))


def _uniffi_check_contract_api_version(lib):
    """Raise InternalError unless the library reports the contract version these bindings expect."""
    # Version from our ComponentInterface vs. the one reported by the dylib.
    expected_version = 26
    reported_version = lib.ffi_confidence_shared_uniffi_contract_version()
    if expected_version != reported_version:
        raise InternalError("UniFFI contract version mismatch: try cleaning and rebuilding your project")


def _uniffi_check_api_checksums(lib):
    """Raise InternalError if any API checksum exported by the library differs from the expected one."""
    expected_checksums = (
        ("uniffi_confidence_shared_checksum_method_confidence_get_flag_string", 21008),
        ("uniffi_confidence_shared_checksum_constructor_confidence_new", 14595),
    )
    for symbol, expected in expected_checksums:
        if getattr(lib, symbol)() != expected:
            raise InternalError("UniFFI API checksum mismatch: try cleaning and rebuilding your project")


# A ctypes library to expose the extern-C FFI definitions.
# This is an implementation detail which will be called internally by the public API.

_UniffiLib = _uniffi_load_indirect()

UNIFFI_RUST_FUTURE_CONTINUATION_CALLBACK = ctypes.CFUNCTYPE(None, ctypes.c_uint64, ctypes.c_int8)
UNIFFI_FOREIGN_FUTURE_FREE = ctypes.CFUNCTYPE(None, ctypes.c_uint64)
UNIFFI_CALLBACK_INTERFACE_FREE = ctypes.CFUNCTYPE(None, ctypes.c_uint64)


class UniffiForeignFuture(ctypes.Structure):
    _fields_ = [
        ("handle", ctypes.c_uint64),
        ("free", UNIFFI_FOREIGN_FUTURE_FREE),
    ]


# One result struct (return value + call status) and one completion-callback
# type per FFI return type.

class UniffiForeignFutureStructU8(ctypes.Structure):
    _fields_ = [
        ("return_value", ctypes.c_uint8),
        ("call_status", _UniffiRustCallStatus),
    ]


UNIFFI_FOREIGN_FUTURE_COMPLETE_U8 = ctypes.CFUNCTYPE(None, ctypes.c_uint64, UniffiForeignFutureStructU8)


class UniffiForeignFutureStructI8(ctypes.Structure):
    _fields_ = [
        ("return_value", ctypes.c_int8),
        ("call_status", _UniffiRustCallStatus),
    ]


UNIFFI_FOREIGN_FUTURE_COMPLETE_I8 = ctypes.CFUNCTYPE(None, ctypes.c_uint64, UniffiForeignFutureStructI8)


class UniffiForeignFutureStructU16(ctypes.Structure):
    _fields_ = [
        ("return_value", ctypes.c_uint16),
        ("call_status", _UniffiRustCallStatus),
    ]


UNIFFI_FOREIGN_FUTURE_COMPLETE_U16 = ctypes.CFUNCTYPE(None, ctypes.c_uint64, UniffiForeignFutureStructU16)


class UniffiForeignFutureStructI16(ctypes.Structure):
    _fields_ = [
        ("return_value", ctypes.c_int16),
        ("call_status", _UniffiRustCallStatus),
    ]


UNIFFI_FOREIGN_FUTURE_COMPLETE_I16 = ctypes.CFUNCTYPE(None, ctypes.c_uint64, UniffiForeignFutureStructI16)


class UniffiForeignFutureStructU32(ctypes.Structure):
    _fields_ = [
        ("return_value", ctypes.c_uint32),
        ("call_status", _UniffiRustCallStatus),
    ]


UNIFFI_FOREIGN_FUTURE_COMPLETE_U32 = ctypes.CFUNCTYPE(None, ctypes.c_uint64, UniffiForeignFutureStructU32)


class UniffiForeignFutureStructI32(ctypes.Structure):
    _fields_ = [
        ("return_value", ctypes.c_int32),
        ("call_status", _UniffiRustCallStatus),
    ]


UNIFFI_FOREIGN_FUTURE_COMPLETE_I32 = ctypes.CFUNCTYPE(None, ctypes.c_uint64, UniffiForeignFutureStructI32)


class UniffiForeignFutureStructU64(ctypes.Structure):
    _fields_ = [
        ("return_value", ctypes.c_uint64),
        ("call_status", _UniffiRustCallStatus),
    ]


UNIFFI_FOREIGN_FUTURE_COMPLETE_U64 = ctypes.CFUNCTYPE(None, ctypes.c_uint64, UniffiForeignFutureStructU64)


class UniffiForeignFutureStructI64(ctypes.Structure):
    _fields_ = [
        ("return_value", ctypes.c_int64),
        ("call_status", _UniffiRustCallStatus),
    ]


UNIFFI_FOREIGN_FUTURE_COMPLETE_I64 = ctypes.CFUNCTYPE(None, ctypes.c_uint64, UniffiForeignFutureStructI64)


class UniffiForeignFutureStructF32(ctypes.Structure):
    _fields_ = [
        ("return_value", ctypes.c_float),
        ("call_status", _UniffiRustCallStatus),
    ]


UNIFFI_FOREIGN_FUTURE_COMPLETE_F32 = ctypes.CFUNCTYPE(None, ctypes.c_uint64, UniffiForeignFutureStructF32)


class UniffiForeignFutureStructF64(ctypes.Structure):
    _fields_ = [
        ("return_value", ctypes.c_double),
        ("call_status", _UniffiRustCallStatus),
    ]


UNIFFI_FOREIGN_FUTURE_COMPLETE_F64 = ctypes.CFUNCTYPE(None, ctypes.c_uint64, UniffiForeignFutureStructF64)


class UniffiForeignFutureStructPointer(ctypes.Structure):
    _fields_ = [
        ("return_value", ctypes.c_void_p),
        ("call_status", _UniffiRustCallStatus),
    ]


UNIFFI_FOREIGN_FUTURE_COMPLETE_POINTER = ctypes.CFUNCTYPE(None, ctypes.c_uint64, UniffiForeignFutureStructPointer)


class UniffiForeignFutureStructRustBuffer(ctypes.Structure):
    _fields_ = [
        ("return_value", _UniffiRustBuffer),
        ("call_status", _UniffiRustCallStatus),
    ]


UNIFFI_FOREIGN_FUTURE_COMPLETE_RUST_BUFFER = ctypes.CFUNCTYPE(None, ctypes.c_uint64, UniffiForeignFutureStructRustBuffer)


class UniffiForeignFutureStructVoid(ctypes.Structure):
    _fields_ = [
        ("call_status", _UniffiRustCallStatus),
    ]


UNIFFI_FOREIGN_FUTURE_COMPLETE_VOID = ctypes.CFUNCTYPE(None, ctypes.c_uint64, UniffiForeignFutureStructVoid)

# Signatures of the `Confidence` object entry points.
_UniffiLib.uniffi_confidence_shared_fn_clone_confidence.argtypes = (
    ctypes.c_void_p,
    ctypes.POINTER(_UniffiRustCallStatus),
)
_UniffiLib.uniffi_confidence_shared_fn_clone_confidence.restype = ctypes.c_void_p
_UniffiLib.uniffi_confidence_shared_fn_free_confidence.argtypes = (
    ctypes.c_void_p,
    ctypes.POINTER(_UniffiRustCallStatus),
)
_UniffiLib.uniffi_confidence_shared_fn_free_confidence.restype = None
_UniffiLib.uniffi_confidence_shared_fn_constructor_confidence_new.argtypes = (
    ctypes.POINTER(_UniffiRustCallStatus),
)
_UniffiLib.uniffi_confidence_shared_fn_constructor_confidence_new.restype = ctypes.c_void_p
_UniffiLib.uniffi_confidence_shared_fn_method_confidence_get_flag_string.argtypes = (
    ctypes.c_void_p,
    _UniffiRustBuffer,
    _UniffiRustBuffer,
)
_UniffiLib.uniffi_confidence_shared_fn_method_confidence_get_flag_string.restype = ctypes.c_uint64

# Signatures of the RustBuffer management functions.
_UniffiLib.ffi_confidence_shared_rustbuffer_alloc.argtypes = (
    ctypes.c_uint64,
    ctypes.POINTER(_UniffiRustCallStatus),
)
_UniffiLib.ffi_confidence_shared_rustbuffer_alloc.restype = _UniffiRustBuffer
_UniffiLib.ffi_confidence_shared_rustbuffer_from_bytes.argtypes = (
    _UniffiForeignBytes,
    ctypes.POINTER(_UniffiRustCallStatus),
)
_UniffiLib.ffi_confidence_shared_rustbuffer_from_bytes.restype = _UniffiRustBuffer
_UniffiLib.ffi_confidence_shared_rustbuffer_free.argtypes = (
    _UniffiRustBuffer,
    ctypes.POINTER(_UniffiRustCallStatus),
)
_UniffiLib.ffi_confidence_shared_rustbuffer_free.restype = None
_UniffiLib.ffi_confidence_shared_rustbuffer_reserve.argtypes = (
    _UniffiRustBuffer,
    ctypes.c_uint64,
    ctypes.POINTER(_UniffiRustCallStatus),
)
_UniffiLib.ffi_confidence_shared_rustbuffer_reserve.restype = _UniffiRustBuffer

# Signatures of the rust-future functions (one poll/cancel/free/complete set per return type).
_UniffiLib.ffi_confidence_shared_rust_future_poll_u8.argtypes = (
    ctypes.c_uint64,
    UNIFFI_RUST_FUTURE_CONTINUATION_CALLBACK,
    ctypes.c_uint64,
)
_UniffiLib.ffi_confidence_shared_rust_future_poll_u8.restype = None
# ctypes signatures for the rust-future entry points, continued.
# Each return type has poll / cancel / free / complete functions; only
# `complete_*` takes a call-status pointer and returns a value.

# u8 (cancel / free / complete)
_UniffiLib.ffi_confidence_shared_rust_future_cancel_u8.argtypes = (ctypes.c_uint64,)
_UniffiLib.ffi_confidence_shared_rust_future_cancel_u8.restype = None
_UniffiLib.ffi_confidence_shared_rust_future_free_u8.argtypes = (ctypes.c_uint64,)
_UniffiLib.ffi_confidence_shared_rust_future_free_u8.restype = None
_UniffiLib.ffi_confidence_shared_rust_future_complete_u8.argtypes = (
    ctypes.c_uint64,
    ctypes.POINTER(_UniffiRustCallStatus),
)
_UniffiLib.ffi_confidence_shared_rust_future_complete_u8.restype = ctypes.c_uint8

# i8
_UniffiLib.ffi_confidence_shared_rust_future_poll_i8.argtypes = (
    ctypes.c_uint64,
    UNIFFI_RUST_FUTURE_CONTINUATION_CALLBACK,
    ctypes.c_uint64,
)
_UniffiLib.ffi_confidence_shared_rust_future_poll_i8.restype = None
_UniffiLib.ffi_confidence_shared_rust_future_cancel_i8.argtypes = (ctypes.c_uint64,)
_UniffiLib.ffi_confidence_shared_rust_future_cancel_i8.restype = None
_UniffiLib.ffi_confidence_shared_rust_future_free_i8.argtypes = (ctypes.c_uint64,)
_UniffiLib.ffi_confidence_shared_rust_future_free_i8.restype = None
_UniffiLib.ffi_confidence_shared_rust_future_complete_i8.argtypes = (
    ctypes.c_uint64,
    ctypes.POINTER(_UniffiRustCallStatus),
)
_UniffiLib.ffi_confidence_shared_rust_future_complete_i8.restype = ctypes.c_int8

# u16
_UniffiLib.ffi_confidence_shared_rust_future_poll_u16.argtypes = (
    ctypes.c_uint64,
    UNIFFI_RUST_FUTURE_CONTINUATION_CALLBACK,
    ctypes.c_uint64,
)
_UniffiLib.ffi_confidence_shared_rust_future_poll_u16.restype = None
_UniffiLib.ffi_confidence_shared_rust_future_cancel_u16.argtypes = (ctypes.c_uint64,)
_UniffiLib.ffi_confidence_shared_rust_future_cancel_u16.restype = None
_UniffiLib.ffi_confidence_shared_rust_future_free_u16.argtypes = (ctypes.c_uint64,)
_UniffiLib.ffi_confidence_shared_rust_future_free_u16.restype = None
_UniffiLib.ffi_confidence_shared_rust_future_complete_u16.argtypes = (
    ctypes.c_uint64,
    ctypes.POINTER(_UniffiRustCallStatus),
)
_UniffiLib.ffi_confidence_shared_rust_future_complete_u16.restype = ctypes.c_uint16
# ctypes signatures for the rust-future entry points: i16, u32 and the start of i32.

# i16
_UniffiLib.ffi_confidence_shared_rust_future_poll_i16.argtypes = (
    ctypes.c_uint64,
    UNIFFI_RUST_FUTURE_CONTINUATION_CALLBACK,
    ctypes.c_uint64,
)
_UniffiLib.ffi_confidence_shared_rust_future_poll_i16.restype = None
_UniffiLib.ffi_confidence_shared_rust_future_cancel_i16.argtypes = (ctypes.c_uint64,)
_UniffiLib.ffi_confidence_shared_rust_future_cancel_i16.restype = None
_UniffiLib.ffi_confidence_shared_rust_future_free_i16.argtypes = (ctypes.c_uint64,)
_UniffiLib.ffi_confidence_shared_rust_future_free_i16.restype = None
_UniffiLib.ffi_confidence_shared_rust_future_complete_i16.argtypes = (
    ctypes.c_uint64,
    ctypes.POINTER(_UniffiRustCallStatus),
)
_UniffiLib.ffi_confidence_shared_rust_future_complete_i16.restype = ctypes.c_int16

# u32
_UniffiLib.ffi_confidence_shared_rust_future_poll_u32.argtypes = (
    ctypes.c_uint64,
    UNIFFI_RUST_FUTURE_CONTINUATION_CALLBACK,
    ctypes.c_uint64,
)
_UniffiLib.ffi_confidence_shared_rust_future_poll_u32.restype = None
_UniffiLib.ffi_confidence_shared_rust_future_cancel_u32.argtypes = (ctypes.c_uint64,)
_UniffiLib.ffi_confidence_shared_rust_future_cancel_u32.restype = None
_UniffiLib.ffi_confidence_shared_rust_future_free_u32.argtypes = (ctypes.c_uint64,)
_UniffiLib.ffi_confidence_shared_rust_future_free_u32.restype = None
_UniffiLib.ffi_confidence_shared_rust_future_complete_u32.argtypes = (
    ctypes.c_uint64,
    ctypes.POINTER(_UniffiRustCallStatus),
)
_UniffiLib.ffi_confidence_shared_rust_future_complete_u32.restype = ctypes.c_uint32

# i32 (poll / cancel / free argtypes; the free restype is assigned by the next statement in the file)
_UniffiLib.ffi_confidence_shared_rust_future_poll_i32.argtypes = (
    ctypes.c_uint64,
    UNIFFI_RUST_FUTURE_CONTINUATION_CALLBACK,
    ctypes.c_uint64,
)
_UniffiLib.ffi_confidence_shared_rust_future_poll_i32.restype = None
_UniffiLib.ffi_confidence_shared_rust_future_cancel_i32.argtypes = (ctypes.c_uint64,)
_UniffiLib.ffi_confidence_shared_rust_future_cancel_i32.restype = None
_UniffiLib.ffi_confidence_shared_rust_future_free_i32.argtypes = (ctypes.c_uint64,)
# ctypes signatures for the rust-future entry points: rest of i32, u64, i64 and f32 poll.

# i32 (free restype / complete)
_UniffiLib.ffi_confidence_shared_rust_future_free_i32.restype = None
_UniffiLib.ffi_confidence_shared_rust_future_complete_i32.argtypes = (
    ctypes.c_uint64,
    ctypes.POINTER(_UniffiRustCallStatus),
)
_UniffiLib.ffi_confidence_shared_rust_future_complete_i32.restype = ctypes.c_int32

# u64
_UniffiLib.ffi_confidence_shared_rust_future_poll_u64.argtypes = (
    ctypes.c_uint64,
    UNIFFI_RUST_FUTURE_CONTINUATION_CALLBACK,
    ctypes.c_uint64,
)
_UniffiLib.ffi_confidence_shared_rust_future_poll_u64.restype = None
_UniffiLib.ffi_confidence_shared_rust_future_cancel_u64.argtypes = (ctypes.c_uint64,)
_UniffiLib.ffi_confidence_shared_rust_future_cancel_u64.restype = None
_UniffiLib.ffi_confidence_shared_rust_future_free_u64.argtypes = (ctypes.c_uint64,)
_UniffiLib.ffi_confidence_shared_rust_future_free_u64.restype = None
_UniffiLib.ffi_confidence_shared_rust_future_complete_u64.argtypes = (
    ctypes.c_uint64,
    ctypes.POINTER(_UniffiRustCallStatus),
)
_UniffiLib.ffi_confidence_shared_rust_future_complete_u64.restype = ctypes.c_uint64

# i64
_UniffiLib.ffi_confidence_shared_rust_future_poll_i64.argtypes = (
    ctypes.c_uint64,
    UNIFFI_RUST_FUTURE_CONTINUATION_CALLBACK,
    ctypes.c_uint64,
)
_UniffiLib.ffi_confidence_shared_rust_future_poll_i64.restype = None
_UniffiLib.ffi_confidence_shared_rust_future_cancel_i64.argtypes = (ctypes.c_uint64,)
_UniffiLib.ffi_confidence_shared_rust_future_cancel_i64.restype = None
_UniffiLib.ffi_confidence_shared_rust_future_free_i64.argtypes = (ctypes.c_uint64,)
_UniffiLib.ffi_confidence_shared_rust_future_free_i64.restype = None
_UniffiLib.ffi_confidence_shared_rust_future_complete_i64.argtypes = (
    ctypes.c_uint64,
    ctypes.POINTER(_UniffiRustCallStatus),
)
_UniffiLib.ffi_confidence_shared_rust_future_complete_i64.restype = ctypes.c_int64

# f32 (poll only in this span)
_UniffiLib.ffi_confidence_shared_rust_future_poll_f32.argtypes = (
    ctypes.c_uint64,
    UNIFFI_RUST_FUTURE_CONTINUATION_CALLBACK,
    ctypes.c_uint64,
)
_UniffiLib.ffi_confidence_shared_rust_future_poll_f32.restype = None
# ctypes signatures for the rust-future entry points: rest of f32, f64 and pointer.

# f32 (cancel / free / complete)
_UniffiLib.ffi_confidence_shared_rust_future_cancel_f32.argtypes = (ctypes.c_uint64,)
_UniffiLib.ffi_confidence_shared_rust_future_cancel_f32.restype = None
_UniffiLib.ffi_confidence_shared_rust_future_free_f32.argtypes = (ctypes.c_uint64,)
_UniffiLib.ffi_confidence_shared_rust_future_free_f32.restype = None
_UniffiLib.ffi_confidence_shared_rust_future_complete_f32.argtypes = (
    ctypes.c_uint64,
    ctypes.POINTER(_UniffiRustCallStatus),
)
_UniffiLib.ffi_confidence_shared_rust_future_complete_f32.restype = ctypes.c_float

# f64
_UniffiLib.ffi_confidence_shared_rust_future_poll_f64.argtypes = (
    ctypes.c_uint64,
    UNIFFI_RUST_FUTURE_CONTINUATION_CALLBACK,
    ctypes.c_uint64,
)
_UniffiLib.ffi_confidence_shared_rust_future_poll_f64.restype = None
_UniffiLib.ffi_confidence_shared_rust_future_cancel_f64.argtypes = (ctypes.c_uint64,)
_UniffiLib.ffi_confidence_shared_rust_future_cancel_f64.restype = None
_UniffiLib.ffi_confidence_shared_rust_future_free_f64.argtypes = (ctypes.c_uint64,)
_UniffiLib.ffi_confidence_shared_rust_future_free_f64.restype = None
_UniffiLib.ffi_confidence_shared_rust_future_complete_f64.argtypes = (
    ctypes.c_uint64,
    ctypes.POINTER(_UniffiRustCallStatus),
)
_UniffiLib.ffi_confidence_shared_rust_future_complete_f64.restype = ctypes.c_double

# pointer (the complete restype is assigned by the next statement in the file)
_UniffiLib.ffi_confidence_shared_rust_future_poll_pointer.argtypes = (
    ctypes.c_uint64,
    UNIFFI_RUST_FUTURE_CONTINUATION_CALLBACK,
    ctypes.c_uint64,
)
_UniffiLib.ffi_confidence_shared_rust_future_poll_pointer.restype = None
_UniffiLib.ffi_confidence_shared_rust_future_cancel_pointer.argtypes = (ctypes.c_uint64,)
_UniffiLib.ffi_confidence_shared_rust_future_cancel_pointer.restype = None
_UniffiLib.ffi_confidence_shared_rust_future_free_pointer.argtypes = (ctypes.c_uint64,)
_UniffiLib.ffi_confidence_shared_rust_future_free_pointer.restype = None
_UniffiLib.ffi_confidence_shared_rust_future_complete_pointer.argtypes = (
    ctypes.c_uint64,
    ctypes.POINTER(_UniffiRustCallStatus),
)
_UniffiLib.ffi_confidence_shared_rust_future_complete_pointer.restype = ctypes.c_void_p

# rust-future functions returning a RustBuffer
_UniffiLib.ffi_confidence_shared_rust_future_poll_rust_buffer.argtypes = (
    ctypes.c_uint64,
    UNIFFI_RUST_FUTURE_CONTINUATION_CALLBACK,
    ctypes.c_uint64,
)
_UniffiLib.ffi_confidence_shared_rust_future_poll_rust_buffer.restype = None
_UniffiLib.ffi_confidence_shared_rust_future_cancel_rust_buffer.argtypes = (
    ctypes.c_uint64,
)
_UniffiLib.ffi_confidence_shared_rust_future_cancel_rust_buffer.restype = None
_UniffiLib.ffi_confidence_shared_rust_future_free_rust_buffer.argtypes = (
    ctypes.c_uint64,
)
_UniffiLib.ffi_confidence_shared_rust_future_free_rust_buffer.restype = None
_UniffiLib.ffi_confidence_shared_rust_future_complete_rust_buffer.argtypes = (
    ctypes.c_uint64,
    ctypes.POINTER(_UniffiRustCallStatus),
)
_UniffiLib.ffi_confidence_shared_rust_future_complete_rust_buffer.restype = _UniffiRustBuffer

# rust-future functions returning nothing
_UniffiLib.ffi_confidence_shared_rust_future_poll_void.argtypes = (
    ctypes.c_uint64,
    UNIFFI_RUST_FUTURE_CONTINUATION_CALLBACK,
    ctypes.c_uint64,
)
_UniffiLib.ffi_confidence_shared_rust_future_poll_void.restype = None
_UniffiLib.ffi_confidence_shared_rust_future_cancel_void.argtypes = (
    ctypes.c_uint64,
)
_UniffiLib.ffi_confidence_shared_rust_future_cancel_void.restype = None
_UniffiLib.ffi_confidence_shared_rust_future_free_void.argtypes = (
    ctypes.c_uint64,
)
_UniffiLib.ffi_confidence_shared_rust_future_free_void.restype = None
_UniffiLib.ffi_confidence_shared_rust_future_complete_void.argtypes = (
    ctypes.c_uint64,
    ctypes.POINTER(_UniffiRustCallStatus),
)
_UniffiLib.ffi_confidence_shared_rust_future_complete_void.restype = None

# Checksum and contract-version queries take no arguments.
_UniffiLib.uniffi_confidence_shared_checksum_method_confidence_get_flag_string.argtypes = (
)
_UniffiLib.uniffi_confidence_shared_checksum_method_confidence_get_flag_string.restype = ctypes.c_uint16
_UniffiLib.uniffi_confidence_shared_checksum_constructor_confidence_new.argtypes = (
)
_UniffiLib.uniffi_confidence_shared_checksum_constructor_confidence_new.restype = ctypes.c_uint16
_UniffiLib.ffi_confidence_shared_uniffi_contract_version.argtypes = (
)
_UniffiLib.ffi_confidence_shared_uniffi_contract_version.restype = ctypes.c_uint32

# Fail at import time if the loaded library does not match these bindings.
_uniffi_check_contract_api_version(_UniffiLib)
_uniffi_check_api_checksums(_UniffiLib)

# Public interface members begin here.


class _UniffiConverterString:
    """Converts `str` values to and from their UTF-8 representation in a rust buffer."""

    @staticmethod
    def check_lower(value):
        """Return `value` unchanged if it is a `str`; raise TypeError otherwise."""
        if isinstance(value, str):
            return value
        raise TypeError("argument must be str, not {}".format(type(value).__name__))

    @staticmethod
    def read(buf):
        """Read a length-prefixed (i32) UTF-8 string from the stream `buf`."""
        byte_count = buf.read_i32()
        if byte_count < 0:
            raise InternalError("Unexpected negative string length")
        return buf.read(byte_count).decode("utf-8")

    @staticmethod
    def write(value, buf):
        """Write `value` to the builder `buf` as an i32 length followed by UTF-8 bytes."""
        encoded = value.encode("utf-8")
        buf.write_i32(len(encoded))
        buf.write(encoded)

    @staticmethod
    def lift(buf):
        """Decode the whole rust buffer `buf` as UTF-8, consuming (freeing) it."""
        with buf.consume_with_stream() as stream:
            raw = stream.read(stream.remaining())
            return raw.decode("utf-8")

    @staticmethod
    def lower(value):
        """Return a new rust buffer holding the UTF-8 bytes of `value` (no length prefix)."""
        with _UniffiRustBuffer.alloc_with_builder() as builder:
            builder.write(value.encode("utf-8"))
            return builder.finalize()


class ConfidenceProtocol(typing.Protocol):
    def get_flag_string(self, flag_key: "str",default_value: "str"):
        raise NotImplementedError


class Confidence:
    """Python handle for a `Confidence` object living on the Rust side."""

    _pointer: ctypes.c_void_p

    def __init__(self, ):
        self._pointer = _rust_call(_UniffiLib.uniffi_confidence_shared_fn_constructor_confidence_new,)

    def __del__(self):
        # `_pointer` may be missing if the instance was only partially initialized.
        ptr = getattr(self, "_pointer", None)
        if ptr is None:
            return
        _rust_call(_UniffiLib.uniffi_confidence_shared_fn_free_confidence, ptr)

    def _uniffi_clone_pointer(self):
        return _rust_call(_UniffiLib.uniffi_confidence_shared_fn_clone_confidence, self._pointer)

    # Used by alternative constructors or any methods which return this type.
    @classmethod
    def _make_instance_(cls, pointer):
        # Skip __init__ (which would construct a new Rust object) and wrap
        # the pointer we were given instead.
        instance = cls.__new__(cls)
        instance._pointer = pointer
        return instance

    async def get_flag_string(self, flag_key: "str",default_value: "str") -> "str":
        _UniffiConverterString.check_lower(flag_key)
        _UniffiConverterString.check_lower(default_value)

        # Start the Rust future; it is driven to completion (and freed) below.
        rust_future = _UniffiLib.uniffi_confidence_shared_fn_method_confidence_get_flag_string(
            self._uniffi_clone_pointer(),
            _UniffiConverterString.lower(flag_key),
            _UniffiConverterString.lower(default_value)
        )
        return await _uniffi_rust_call_async(
            rust_future,
            _UniffiLib.ffi_confidence_shared_rust_future_poll_rust_buffer,
            _UniffiLib.ffi_confidence_shared_rust_future_complete_rust_buffer,
            _UniffiLib.ffi_confidence_shared_rust_future_free_rust_buffer,
            # lift function
            _UniffiConverterString.lift,
            # Error FFI converter
            None,
        )


class _UniffiConverterTypeConfidence:
    """Converts `Confidence` instances to and from raw pointer values."""

    @staticmethod
    def lift(value: int):
        return Confidence._make_instance_(value)

    @staticmethod
    def check_lower(value: Confidence):
        if isinstance(value, Confidence):
            return
        raise TypeError("Expected Confidence instance, {} found".format(type(value).__name__))

    @staticmethod
    def lower(value: ConfidenceProtocol):
        if isinstance(value, Confidence):
            return value._uniffi_clone_pointer()
        raise TypeError("Expected Confidence instance, {} found".format(type(value).__name__))

    @classmethod
    def read(cls, buf: _UniffiRustBuffer):
        raw_pointer = buf.read_u64()
        if raw_pointer == 0:
            raise InternalError("Raw pointer value was null")
        return cls.lift(raw_pointer)

    @classmethod
    def write(cls, value: ConfidenceProtocol, buf: _UniffiRustBuffer):
        buf.write_u64(cls.lower(value))


# Async support
# RustFuturePoll values
_UNIFFI_RUST_FUTURE_POLL_READY = 0
_UNIFFI_RUST_FUTURE_POLL_MAYBE_READY = 1

# Stores futures for _uniffi_continuation_callback
_UniffiContinuationHandleMap = _UniffiHandleMap()

# Set the event loop to use for async functions
#
# This is needed if some async functions run outside of the eventloop, for example:
#     - A non-eventloop thread is spawned, maybe from `EventLoop.run_in_executor` or maybe from the
#       Rust code spawning its own thread.
#     - The Rust code calls an async callback method from a sync callback function, using something
#       like `pollster` to block on the async call.
#
# In this case, we need an event loop to run the Python async function, but there's no eventloop set
# for the thread.  Use `uniffi_set_event_loop` to force an eventloop to be used in this case.
UNIFFI_GLOBAL_EVENT_LOOP = None


def uniffi_set_event_loop(eventloop: asyncio.BaseEventLoop):
    """Force `eventloop` to be used for all async calls made through these bindings."""
    global UNIFFI_GLOBAL_EVENT_LOOP
    UNIFFI_GLOBAL_EVENT_LOOP = eventloop


def _uniffi_get_event_loop():
    """Return the globally configured loop, falling back to the running loop."""
    if UNIFFI_GLOBAL_EVENT_LOOP is None:
        return asyncio.get_running_loop()
    return UNIFFI_GLOBAL_EVENT_LOOP


# Continuation callback for async functions
# lift the return value or error and resolve the future, causing the async function to resume.
@UNIFFI_RUST_FUTURE_CONTINUATION_CALLBACK
def _uniffi_continuation_callback(future_ptr, poll_code):
    loop, pending = _UniffiContinuationHandleMap.remove(future_ptr)
    # Resolve the future via the loop's thread-safe scheduling call.
    loop.call_soon_threadsafe(_uniffi_set_future_result, pending, poll_code)


def _uniffi_set_future_result(future, poll_code):
    if future.cancelled():
        return
    future.set_result(poll_code)


async def _uniffi_rust_call_async(rust_future, ffi_poll, ffi_complete, ffi_free, lift_func, error_ffi_converter):
    """Drive `rust_future` to completion and return its lifted result.

    The Rust future is always released with `ffi_free`, whether the call
    succeeds, raises, or is cancelled.
    """
    try:
        loop = _uniffi_get_event_loop()

        # Poll at least once, and keep polling until Rust reports
        # _UNIFFI_RUST_FUTURE_POLL_READY.
        poll_code = _UNIFFI_RUST_FUTURE_POLL_MAYBE_READY
        while poll_code != _UNIFFI_RUST_FUTURE_POLL_READY:
            pending = loop.create_future()
            ffi_poll(
                rust_future,
                _uniffi_continuation_callback,
                _UniffiContinuationHandleMap.insert((loop, pending)),
            )
            poll_code = await pending

        completed = _rust_call_with_error(error_ffi_converter, ffi_complete, rust_future)
        return lift_func(completed)
    finally:
        ffi_free(rust_future)


__all__ = [
    "InternalError",
    "Confidence",
    "uniffi_set_event_loop",
]