api/backend/python/xl-client/dial_xl/sheet.py (171 lines of code) (raw):

from typing import Iterable import aiohttp from dial_xl.calculate import FieldData from dial_xl.compile import FieldType, ParsingError from dial_xl.credentials import CredentialProvider from dial_xl.dynamic_field import DynamicField from dial_xl.events import Event, ObservableObserver, notify_observer from dial_xl.reader import _Reader from dial_xl.table import Table from dial_xl.utils import _auth_header class Sheet(ObservableObserver): __name: str __tables: list[Table | str] __after: str = "" __table_indices: dict[str, int] __parsing_errors: list[ParsingError] def __init__(self, name: str, parsing_errors: list[ParsingError]): self.__name = name self.__parsing_errors = parsing_errors self.__tables = [] self.__table_indices = {} @property def name(self) -> str: return self.__name @name.setter @notify_observer def name(self, value: str): """Set the name of the sheet and invalidates compilation/computation results and sheet parsing errors""" self.__name = value def get_table(self, name: str) -> Table: index = self._find_table(name) if index == -1: raise ValueError(f"Table '{name}' not found") return self.__tables[index] @notify_observer def add_table(self, table: Table): """Add a table to the sheet and invalidates compilation/computation results and sheet parsing errors""" table._attach(self) self.__table_indices[table.name] = len(self.__tables) self.__tables.append(table) @notify_observer def remove_table(self, name: str) -> Table: """Remove a table from the sheet and invalidates compilation/computation results and sheet parsing errors""" index = self._find_table(name) if index == -1: raise ValueError(f"Table '{name}' not found") table = self.__tables.pop(index) table._detach() self._update_table_indices() return table def _find_table(self, name: str) -> int: return self.__table_indices.get(name, -1) @property def table_names(self) -> Iterable[str]: """Enumerates table names""" return (table.name for table in self.tables) @property def tables(self) -> Iterable[Table]: """Enumerates tables""" return (table for table in self.__tables if isinstance(table, Table)) def _notify_before(self, event: Event): if self._observer: self._observer._notify_before(event) sender = event.sender if isinstance(sender, Table) and event.method_name == "name": self._on_table_rename(sender.name, event.kwargs["value"]) self._set_parsing_errors([]) def _on_table_rename(self, old_name: str, new_name: str): index = self._find_table(old_name) if index == -1: raise ValueError(f"Table '{old_name}' not found") if new_name in self.__table_indices: raise ValueError(f"Table '{new_name}' already exists") self.__table_indices[new_name] = self.__table_indices.pop(old_name) @property def parsing_errors(self): return self.__parsing_errors def to_dsl(self) -> str: """Converts the sheet to DSL format.""" return ( f"{''.join(t if isinstance(t, str) else t.to_dsl() for t in self.__tables)}" f"{self.__after}" ) @classmethod def _deserialize( cls, reader: _Reader, name: str, parsing_errors: list[ParsingError] ) -> "Sheet": result = cls(name, parsing_errors) for table_entity in reader.entity.get("tables", []): table_reader = reader.with_entity(table_entity) unparsed = table_reader.next_unparsed(lambda d: d["span"]["from"]) if unparsed: result.__tables.append(unparsed + table_reader.till_linebreak()) table = Table._deserialize(table_reader) result.__tables.append(table) table._attach(result) reader.position = table_reader.position result.__after = reader.before_next() result._update_table_indices() return result def _set_parsing_errors(self, errors: list[ParsingError]): self.__parsing_errors = errors def _update_table_indices(self): self.__table_indices = { table.name: index for index, table in enumerate(self.__tables) if isinstance(table, Table) } def _update_field_types(self, field_types: dict[str, dict[str, FieldType]]): for table_name in self.table_names: table = self.get_table(table_name) for field_name in table.field_names: field = table.get_field(field_name) field._set_field_type( field_types.get(table_name, {}).get(field_name) ) def _update_field_data( self, field_data: dict[str, dict[str, FieldData]], field_types: dict[str, dict[str, FieldType]], ): for table_name in self.table_names: table = self.get_table(table_name) table_data = field_data.get(table_name, {}) dynamic_fields: list[DynamicField] = [] for field_name in set(table_data.keys()).union(table.field_names): if table._find_field(field_name) == -1: field = DynamicField( field_name, field_types.get(table_name, {}).get(field_name), field_data.get(table_name, {}).get(field_name), ) dynamic_fields.append(field) else: field = table.get_field(field_name) field._set_field_data( field_data.get(table_name, {}).get(field_name) ) table._set_dynamic_fields(dynamic_fields) def _collect_values( json_data, key: str, collected: set[int] | None = None ) -> set[int]: if collected is None: collected = set() if isinstance(json_data, dict): if key in json_data: collected.add(json_data[key]) for value in json_data.values(): _collect_values(value, key, collected) elif isinstance(json_data, list): for item in json_data: _collect_values(item, key, collected) return collected async def _parse_sheet( rest_base_url: str, sheet_name: str, dsl: str, credentials: CredentialProvider, ) -> Sheet: async with aiohttp.ClientSession() as session: async with session.post( f"{rest_base_url}/v1/parse-sheet", headers=await _auth_header(credentials), data=dsl, ) as response: if response.status != 200: raise ValueError(f"Failed to parse sheet: {response.status}") sheet = await response.json() froms = list(_collect_values(sheet, "from")) froms.append(len(dsl)) froms.sort() reader = _Reader(dsl, froms, sheet, 0) return Sheet._deserialize( reader, sheet_name, [ParsingError._deserialize(error) for error in sheet["errors"]], )