def run()

in luigi/contrib/dataproc.py [0:0]


    def run(self):
        """Create the Dataproc cluster described by this task and block until it is RUNNING.

        Builds a ``clusters.create`` request body from the task's parameters, submits it
        through ``self.dataproc_client``, then polls the cluster status until it reports
        ``RUNNING``.

        Raises:
            Exception: if the cluster reports the ``ERROR`` state while starting.
        """
        cluster_conf = self._build_cluster_conf()

        self.dataproc_client.projects().regions().clusters() \
            .create(projectId=self.gcloud_project_id, region=self.dataproc_region, body=cluster_conf).execute()

        self._wait_until_cluster_running()

    def _build_cluster_conf(self):
        """Return the ``clusters.create`` request body built from this task's parameters."""
        base_uri = "https://www.googleapis.com/compute/v1/projects/{}".format(self.gcloud_project_id)
        # Shared prefix of the zone URI and both machine-type URIs.
        zone_uri = base_uri + "/zones/" + self.gcloud_zone
        # Only pin an image version when one was requested; otherwise send an empty
        # softwareConfig (presumably letting Dataproc choose its default image).
        software_config = {"imageVersion": self.image_version} if self.image_version else {}

        return {
            "clusterName": self.dataproc_cluster_name,
            "projectId": self.gcloud_project_id,
            "config": {
                "configBucket": "",
                "gceClusterConfig": {
                    "networkUri": base_uri + "/global/networks/" + self.gcloud_network,
                    "zoneUri": zone_uri,
                    "serviceAccountScopes": [
                        "https://www.googleapis.com/auth/cloud-platform"
                    ]
                },
                "masterConfig": {
                    "numInstances": 1,
                    "machineTypeUri": zone_uri + "/machineTypes/" + self.master_node_type,
                    "diskConfig": {
                        "bootDiskSizeGb": self.master_disk_size,
                        "numLocalSsds": 0
                    }
                },
                "workerConfig": {
                    "numInstances": self.worker_normal_count,
                    "machineTypeUri": zone_uri + "/machineTypes/" + self.worker_node_type,
                    "diskConfig": {
                        "bootDiskSizeGb": self.worker_disk_size,
                        "numLocalSsds": 0
                    }
                },
                "secondaryWorkerConfig": {
                    "numInstances": self.worker_preemptible_count,
                    "isPreemptible": True
                },
                "softwareConfig": software_config
            }
        }

    def _wait_until_cluster_running(self, poll_interval=10, timeout=None):
        """Poll the cluster status until it reports ``RUNNING``.

        Args:
            poll_interval: seconds to sleep before each status check (default 10).
            timeout: optional upper bound, in seconds, on the total wait. ``None``
                (the default) waits indefinitely.

        Raises:
            Exception: if the cluster reports the ``ERROR`` state, or if ``timeout``
                elapses before the cluster reaches ``RUNNING``.
        """
        deadline = None if timeout is None else time.time() + timeout
        while True:
            time.sleep(poll_interval)
            cluster_status = self._get_cluster_status()
            status_info = cluster_status['status']
            state = status_info['state']
            logger.info("Creating new dataproc cluster: %s status: %s", self.dataproc_cluster_name, state)
            if state == 'RUNNING':
                return
            if state == 'ERROR':
                # The Dataproc REST ClusterStatus resource names this field `detail`.
                # Indexing `['details']` directly raised KeyError and hid the real
                # failure reason, so accept either spelling and fall back to the whole
                # status dict when neither is present.
                raise Exception(status_info.get('detail') or status_info.get('details') or status_info)
            if deadline is not None and time.time() >= deadline:
                raise Exception(
                    "Timed out after {}s waiting for dataproc cluster {} to start; last status: {}".format(
                        timeout, self.dataproc_cluster_name, state))