def run()

in luigi/contrib/kubernetes.py [0:0]


    def run(self):
        """
        Render a Kubernetes ``batch/v1`` Job manifest for this task, submit
        it, and then track it.

        The manifest is assembled from attributes of the task:

        * ``uu_name`` - used as the name of both the Job and its pod template.
        * ``job_uuid`` - stored in the ``luigi_task_id`` label of the Job.
        * ``backoff_limit`` - copied to ``spec.backoffLimit``.
        * ``spec_schema`` - used as the pod spec; ``restartPolicy`` defaults
          to ``"Never"`` when the schema does not set it.
        * ``labels`` - merged into both the Job labels and the pod template
          labels; a user label with the same key as a built-in label
          (``spawned_by``, ``luigi_task_id``) replaces the built-in value.
        * ``kubernetes_namespace`` / ``active_deadline_seconds`` - only added
          to the manifest when they are not ``None``.

        The object returned by ``spec_schema`` is never modified: the
        ``restartPolicy`` default is applied to a shallow copy.
        """
        self._init_kubernetes()
        job_name = self.uu_name

        # Shallow-copy the user's pod spec so that injecting the default
        # restartPolicy below does not leak back into whatever object
        # ``spec_schema`` returned (it may be shared between runs).
        pod_spec = dict(self.spec_schema)
        # Add default restartPolicy if not specified
        pod_spec.setdefault("restartPolicy", "Never")

        # Built-in labels go first so that user labels can override them.
        job_labels = {
            "spawned_by": "luigi",
            "luigi_task_id": self.job_uuid,
        }
        job_labels.update(self.labels)
        pod_labels = {}
        pod_labels.update(self.labels)

        # Render job
        job_json = {
            "apiVersion": "batch/v1",
            "kind": "Job",
            "metadata": {
                "name": job_name,
                "labels": job_labels,
            },
            "spec": {
                "backoffLimit": self.backoff_limit,
                "template": {
                    "metadata": {
                        "name": job_name,
                        "labels": pod_labels,
                    },
                    "spec": pod_spec,
                },
            },
        }
        # Optional fields are left out entirely rather than sent as null.
        if self.kubernetes_namespace is not None:
            job_json["metadata"]["namespace"] = self.kubernetes_namespace
        if self.active_deadline_seconds is not None:
            job_json["spec"]["activeDeadlineSeconds"] = \
                self.active_deadline_seconds

        # Submit job
        self.__logger.info("Submitting Kubernetes Job: %s", job_name)
        job = Job(self.__kube_api, job_json)
        job.create()
        # Track the Job (wait while active)
        self.__logger.info("Start tracking Kubernetes Job: %s", job_name)
        self.__track_job()