in luigi/contrib/dataproc.py [0:0]
def run(self):
    """Create the Dataproc cluster and block until it is running.

    Builds the cluster definition from this object's attributes
    (``gcloud_project_id``, ``gcloud_network``, ``gcloud_zone``,
    ``dataproc_cluster_name``, ``dataproc_region``, ``image_version``, the
    master/worker node types, disk sizes and instance counts), submits a
    ``clusters().create(...)`` request through ``self.dataproc_client``
    and then polls ``self._get_cluster_status()`` every 10 seconds.

    Returns:
        None, once the reported cluster state is ``RUNNING``.

    Raises:
        Exception: if the reported cluster state is ``ERROR``. The message
            is the status detail reported by the service when present,
            otherwise a generic message naming the cluster.
    """
    base_uri = "https://www.googleapis.com/compute/v1/projects/{}".format(self.gcloud_project_id)
    # Shared prefix for the zone itself and for the machine types inside it.
    zone_uri = base_uri + "/zones/" + self.gcloud_zone
    # Only send an image version when one was configured.
    software_config = {"imageVersion": self.image_version} if self.image_version else {}

    cluster_conf = {
        "clusterName": self.dataproc_cluster_name,
        "projectId": self.gcloud_project_id,
        "config": {
            "configBucket": "",
            "gceClusterConfig": {
                "networkUri": base_uri + "/global/networks/" + self.gcloud_network,
                "zoneUri": zone_uri,
                "serviceAccountScopes": [
                    "https://www.googleapis.com/auth/cloud-platform"
                ]
            },
            "masterConfig": {
                "numInstances": 1,
                "machineTypeUri": zone_uri + "/machineTypes/" + self.master_node_type,
                "diskConfig": {
                    "bootDiskSizeGb": self.master_disk_size,
                    "numLocalSsds": 0
                }
            },
            "workerConfig": {
                "numInstances": self.worker_normal_count,
                "machineTypeUri": zone_uri + "/machineTypes/" + self.worker_node_type,
                "diskConfig": {
                    "bootDiskSizeGb": self.worker_disk_size,
                    "numLocalSsds": 0
                }
            },
            "secondaryWorkerConfig": {
                "numInstances": self.worker_preemptible_count,
                "isPreemptible": True
            },
            "softwareConfig": software_config
        }
    }

    self.dataproc_client.projects().regions().clusters().create(
        projectId=self.gcloud_project_id,
        region=self.dataproc_region,
        body=cluster_conf,
    ).execute()

    while True:
        # Wait before the first poll as well: the create request was only just sent.
        time.sleep(10)
        cluster_status = self._get_cluster_status()
        status_info = cluster_status['status']
        status = status_info['state']
        logger.info("Creating new dataproc cluster: {} status: {}".format(self.dataproc_cluster_name, status))
        if status == 'RUNNING':
            break
        if status == 'ERROR':
            # The Dataproc ClusterStatus resource names this field "detail"
            # ("details" is the JobStatus spelling). Accept either, and never
            # let a missing key mask the real failure with a KeyError.
            detail = status_info.get('detail') or status_info.get('details')
            raise Exception(
                detail or "Dataproc cluster {} entered ERROR state".format(self.dataproc_cluster_name)
            )