in core/src/klio_core/config/core.py [0:0]
def __config_post_init__(self, config_dict):
    """Derive dependent options and attach user-supplied extras.

    Performed in this order:

    1. A truthy ``self.label`` is appended to ``self.labels``.
    2. ``self.max_num_workers`` is set to ``self.num_workers``, but never
       lower than 2.
    3. ``self.USER_ATTRIBS`` is reset to an empty list.
    4. ``self.worker_disk_type`` (when not ``None``) is normalized: a
       short name (``local-ssd``, ``pd-ssd``, ``pd-standard``) is expanded
       through ``WORKER_DISK_TYPE_URL`` using ``self.project`` and
       ``self.region``; any other value must already equal the URL built
       from its own last ``/``-separated segment.
    5. Every top-level key of ``config_dict`` that ``self._as_dict()``
       does not report is set as an attribute on ``self`` and recorded in
       ``self.USER_ATTRIBS``.

    Args:
        config_dict (dict): mapping whose top-level keys are compared
            against the declared config; presumably the raw
            ``pipeline_options`` section (TODO confirm against caller).

    Raises:
        ValueError: if ``worker_disk_type`` is neither a known short name
            nor the URL expected for this project and region.
    """
    if self.label:
        self.labels.append(self.label)

    self.max_num_workers = max(2, self.num_workers)
    self.USER_ATTRIBS = []

    short_disk_types = ("local-ssd", "pd-ssd", "pd-standard")

    def build_disk_type_url(short_name):
        # Expand a short disk type name for this job's project and region.
        return WORKER_DISK_TYPE_URL.format(
            project=self.project,
            region=self.region,
            disk_type=short_name,
        )

    # The configured value may be a short name or an already expanded URL.
    disk_type = self.worker_disk_type
    if disk_type is not None:
        if disk_type in short_disk_types:
            self.worker_disk_type = build_disk_type_url(disk_type)
        else:
            # NOTE(review): only the URL shape is verified here; the
            # trailing segment is not checked against the short names.
            trailing_segment = disk_type.rpartition("/")[2]
            if disk_type != build_disk_type_url(trailing_segment):
                raise ValueError(
                    "Invalid pipeline_options.worker_disk_type: '{}'".format(
                        self.worker_disk_type
                    )
                )

    declared_options = self._as_dict()
    for option_name, option_value in config_dict.items():
        if option_name in declared_options:
            continue
        # Undeclared (user-defined) options are attached as-is, first
        # level only. For example:
        #   pipeline_options:
        #     foo:
        #       key1: value1
        #       list1:
        #         - one
        #         - two
        # becomes:
        #   pipeline_options.foo ->
        #     {"key1": "value1", "list1": ["one", "two"]}
        setattr(self, option_name, option_value)
        # Keep track of user-set attributes so that as_dict() can re-add
        # them to whatever _as_dict() returns.
        self.USER_ATTRIBS.append({option_name: option_value})