api/backend/python/xl-client/dial_xl/table.py (241 lines of code) (raw):
from typing import Iterable
from dial_xl.decorator import Decorator
from dial_xl.doc_string import _DocLine, _DocString
from dial_xl.dynamic_field import DynamicField
from dial_xl.events import Event, ObservableObserver, notify_observer
from dial_xl.field import Field
from dial_xl.overrides import Overrides
from dial_xl.reader import _Reader
from dial_xl.utils import _escape_table_name, _unescape_table_name
class Table(ObservableObserver):
__before: str = ""
__doc_string: _DocString
__decorators: list[Decorator]
__prefix: str = "table "
__name: str
__after_name: str = "\n"
__fields: list[Field | str]
__after_fields: str = ""
__overrides: Overrides | None = None
__after: str = ""
__dynamic_fields: dict[str, DynamicField] = []
__field_indices: dict[str, int]
__decorator_indices: dict[str, int]
def __init__(self, name: str):
self.__name = _escape_table_name(name)
self.__doc_string = _DocString([], _DocLine)
self.__decorators = []
self.__fields = []
self.__field_indices = {}
self.__decorator_indices = {}
self.__dynamic_fields = {}
@property
def name(self) -> str:
return _unescape_table_name(self.__name)
@name.setter
@notify_observer
def name(self, value: str):
"""Set the name of the table and invalidates compilation/computation results and sheet parsing errors"""
self.__name = _escape_table_name(value)
@property
def overrides(self) -> Overrides | None:
return self.__overrides
@overrides.setter
@notify_observer
def overrides(self, value: Overrides | None):
"""Set the override of the table and invalidates compilation/computation results and sheet parsing errors"""
if self.__overrides is not None:
self.__overrides._detach()
self.__overrides = value
if self.__overrides is not None:
self.__overrides._attach(self)
def to_dsl(self) -> str:
"""Converts the table to DSL format."""
return (
f"{self.__before}"
f"{self.__doc_string.to_dsl()}"
f"{''.join(decorator.to_dsl() for decorator in self.__decorators)}"
f"{self.__prefix}"
f"{self.__name}"
f"{self.__after_name}"
f"{''.join(f if isinstance(f, str) else f.to_dsl() for f in self.__fields)}"
f"{self.__after_fields}"
f"{self.__overrides.to_dsl() if self.__overrides else ''}"
f"{self.__after}"
)
@property
def doc_string(self) -> str:
return self.__doc_string.text
@doc_string.setter
def doc_string(self, value: str):
self.__doc_string.text = value
def get_field(self, name: str) -> Field:
index = self._find_field(name)
if index == -1:
raise ValueError(f"Field '{name}' not found")
return self.__fields[index]
@notify_observer
def add_field(self, field: Field):
"""Add a field to the table and invalidates compilation/computation results and sheet parsing errors"""
if field.name in self.__field_indices:
raise ValueError(f"Field '{field.name}' already exists")
field._attach(self)
self.__fields.append(field)
self.__field_indices[field.name] = len(self.__fields) - 1
@notify_observer
def remove_field(self, name: str) -> Field:
"""Remove a field from the table and invalidates compilation/computation results and sheet parsing errors"""
index = self._find_field(name)
if index == -1:
raise ValueError(f"Field '{name}' not found")
field = self.__fields.pop(index)
field._detach()
self._update_field_indices()
return field
def _find_field(self, name: str) -> int:
return self.__field_indices.get(name, -1)
@notify_observer
def swap_fields(self, name1: str, name2: str):
index1 = self._find_field(name1)
index2 = self._find_field(name2)
if index1 == -1:
raise ValueError(f"Field '{name1}' not found")
if index2 == -1:
raise ValueError(f"Field '{name2}' not found")
self.__fields[index1], self.__fields[index2] = (
self.__fields[index2],
self.__fields[index1],
)
self.__field_indices[name1], self.__field_indices[name2] = (
self.__field_indices[name2],
self.__field_indices[name1],
)
@property
def field_names(self) -> Iterable[str]:
"""Enumerates field names"""
return (field.name for field in self.fields)
@property
def dynamic_field_names(self) -> Iterable[str]:
"""Enumerates dynamic field names"""
return self.__dynamic_fields.keys()
@property
def fields(self) -> Iterable[Field]:
"""Enumerates fields"""
return (field for field in self.__fields if isinstance(field, Field))
@property
def dynamic_fields(self) -> Iterable[DynamicField]:
"""Enumerates dynamic fields"""
return self.__dynamic_fields.values()
def get_dynamic_field(self, name: str) -> DynamicField:
if name not in self.__dynamic_fields:
raise ValueError(f"Dynamic field '{name}' not found")
return self.__dynamic_fields[name]
def get_decorator(self, name: str) -> Decorator:
index = self._find_decorator(name)
if index == -1:
raise ValueError(f"Decorator '{name}' not found")
return self.__decorators[index]
@notify_observer
def add_decorator(self, decorator: Decorator):
"""Add a decorator to the table and invalidates compilation/computation results and sheet parsing errors"""
if decorator.name in self.__decorator_indices:
raise ValueError(f"Decorator '{decorator.name}' already exists")
decorator._attach(self)
self.__decorators.append(decorator)
self.__decorator_indices[decorator.name] = len(self.__decorators) - 1
@notify_observer
def remove_decorator(self, name: str) -> Decorator:
"""Remove a decorator from the table and invalidates compilation/computation results and sheet parsing errors"""
index = self._find_decorator(name)
if index == -1:
raise ValueError(f"Decorator '{name}' not found")
decorator = self.__decorators.pop(index)
decorator._detach()
self._update_decorator_indices()
return decorator
def _find_decorator(self, name: str) -> int:
return self.__decorator_indices.get(name, -1)
def _notify_before(self, event: Event):
if self._observer:
self._observer._notify_before(event)
sender = event.sender
if (
isinstance(sender, Decorator)
and event.method_name == "name"
and sender._observer == self
):
self._on_decorator_rename(sender.name, event.kwargs["value"])
elif isinstance(sender, Field) and event.method_name == "name":
self._on_field_rename(sender.name, event.kwargs["value"])
def _on_field_rename(self, old_name: str, new_name: str):
index = self._find_field(old_name)
if index == -1:
raise ValueError(f"Field '{old_name}' not found")
if new_name in self.__field_indices:
raise ValueError(f"Field '{new_name}' already exists")
self.__field_indices[new_name] = self.__field_indices.pop(old_name)
def _on_decorator_rename(self, old_name: str, new_name: str):
index = self._find_decorator(old_name)
if index == -1:
raise ValueError(f"Decorator '{old_name}' not found")
if new_name in self.__decorator_indices:
raise ValueError(f"Decorator '{new_name}' already exists")
self.__decorator_indices[new_name] = self.__decorator_indices.pop(
old_name
)
def _update_decorator_indices(self):
self.__decorator_indices = {
decorator.name: index
for index, decorator in enumerate(self.__decorators)
}
def _update_field_indices(self):
self.__field_indices = {
field.name: index
for index, field in enumerate(self.__fields)
if isinstance(field, Field)
}
def _set_dynamic_fields(self, dynamic_fields: list[DynamicField]):
self.__dynamic_fields = {field.name: field for field in dynamic_fields}
@property
def decorator_names(self) -> Iterable[str]:
"""Enumerates decorator names"""
return (decorator.name for decorator in self.__decorators)
@property
def decorators(self) -> Iterable[Decorator]:
"""Enumerates decorators"""
return (decorator for decorator in self.__decorators)
@classmethod
def _deserialize(cls, reader: _Reader) -> "Table":
result = cls("")
result.__before = reader.next(lambda d: d["span"]["from"])
if reader.entity.get("docs"):
docs: list[_DocLine] = []
for index, doc_entity in enumerate(reader.entity.get("docs", [])):
doc_reader = reader.with_entity(doc_entity)
doc = _DocLine._deserialize(doc_reader)
docs.append(doc)
reader.position = doc_reader.position
result.__doc_string = _DocString(docs, _DocLine)
for index, decorator_entity in enumerate(
reader.entity.get("decorators", [])
):
decorator_reader = reader.with_entity(decorator_entity)
decorator = Decorator._deserialize(decorator_reader)
result.__decorators.append(decorator)
decorator._attach(result)
reader.position = decorator_reader.position
result.__prefix = reader.next(lambda d: d["name"]["span"]["from"])
result.__name = reader.next(lambda d: d["name"]["span"]["to"])
result.__after_name = reader.till_linebreak()
for index, field_entity in enumerate(reader.entity.get("fields", [])):
field_reader = reader.with_entity(field_entity)
unparsed = field_reader.next_unparsed(lambda d: d["span"]["from"])
if unparsed:
result.__fields.append(unparsed + field_reader.till_linebreak())
field = Field._deserialize(field_reader)
result.__fields.append(field)
field._attach(result)
reader.position = field_reader.position
if reader.entity.get("overrides"):
overrides_reader = reader.with_entity(reader.entity["overrides"])
unparsed = overrides_reader.next_unparsed(
lambda d: d["span"]["from"]
)
if unparsed:
result.__after_fields = (
unparsed + overrides_reader.till_linebreak()
)
result.__overrides = Overrides._deserialize(overrides_reader)
result.__overrides._attach(result)
reader.position = overrides_reader.position
result.__after = reader.next(lambda d: d["span"]["to"])
result._update_decorator_indices()
result._update_field_indices()
return result