in exec/src/klio_exec/commands/run.py [0:0]
def _set_google_cloud_options(self, options):
    """Fill in the Google Cloud view of ``options`` for this job.

    Sets the job name, copies the runtime ``update`` flag when it has been
    explicitly set (i.e. is not ``None``), and appends ``key=value`` labels
    to whatever labels are already present:

    * the cleaned klio-exec, klio-core and klio library versions,
    * one label per entry of ``DATAFLOW_LABEL_KEY_TO_OS_ENVIRON`` whose
      environment variable yields a non-empty cleaned value,
    * ``deployed_by``, taken from ``$USER`` or forced to ``ci`` when
      ``$CI`` equals "true" (case-insensitive).

    Args:
        options: pipeline options object exposing ``view_as``.
    """
    cloud_view = options.view_as(pipeline_options.GoogleCloudOptions)
    cloud_view.job_name = self.job_name

    update_flag = self.runtime_conf.update
    if update_flag is not None:
        cloud_view.update = update_flag

    # Reuse the existing label list when there is one so that it is
    # extended in place; otherwise start from an empty list.
    all_labels = cloud_view.labels or []

    clean = KlioPipeline._get_clean_label_value

    # Version labels: all values are cleaned and formatted before the
    # label list is touched.
    version_templates = (
        ("klio-exec={}", klio_exec_version),
        ("klio-core={}", klio_core_version),
        ("klio={}", klio_lib_version),
    )
    all_labels += [
        template.format(clean(version))
        for template, version in version_templates
    ]

    # Dataflow may not be able to handle duplicate keys; we should probably
    # deal with that here in some fashion (@lynn)
    for label_key, env_name in DATAFLOW_LABEL_KEY_TO_OS_ENVIRON.items():
        env_value = clean(os.environ.get(env_name, ""))
        if not env_value:
            # Unset (or cleaned-to-empty) variables contribute no label.
            continue
        all_labels.append("{}={}".format(label_key, env_value))

    local_user = os.environ.get("USER")
    running_in_ci = os.environ.get("CI", "").lower() == "true"
    # TODO: maybe provide way to allow something besides just "CI"
    deployer = "ci" if running_in_ci else local_user
    if deployer:
        all_labels.append("deployed_by={}".format(clean(deployer)))

    cloud_view.labels = all_labels