in luigi/contrib/kubernetes.py [0:0]
def run(self):
self._init_kubernetes()
# Render job
job_json = {
"apiVersion": "batch/v1",
"kind": "Job",
"metadata": {
"name": self.uu_name,
"labels": {
"spawned_by": "luigi",
"luigi_task_id": self.job_uuid
}
},
"spec": {
"backoffLimit": self.backoff_limit,
"template": {
"metadata": {
"name": self.uu_name,
"labels": {}
},
"spec": self.spec_schema
}
}
}
if self.kubernetes_namespace is not None:
job_json['metadata']['namespace'] = self.kubernetes_namespace
if self.active_deadline_seconds is not None:
job_json['spec']['activeDeadlineSeconds'] = \
self.active_deadline_seconds
# Update user labels
job_json['metadata']['labels'].update(self.labels)
job_json['spec']['template']['metadata']['labels'].update(self.labels)
# Add default restartPolicy if not specified
if "restartPolicy" not in self.spec_schema:
job_json["spec"]["template"]["spec"]["restartPolicy"] = "Never"
# Submit job
self.__logger.info("Submitting Kubernetes Job: " + self.uu_name)
job = Job(self.__kube_api, job_json)
job.create()
# Track the Job (wait while active)
self.__logger.info("Start tracking Kubernetes Job: " + self.uu_name)
self.__track_job()