in core/src/klio_core/dataflow.py [0:0]
def find_job_by_name(self, job_name, gcp_project, region=None):
    """Search Dataflow for a job given its name and GCP project.

    Args:
        job_name (str): Name of Dataflow job.
        gcp_project (str): GCP project in which to search.
        region (str): Region in which to search. Defaults to
            searching all regions in
            :attr:`klio_core.variables.DATAFLOW_REGIONS`.
    Returns:
        dict or None: If found, ``dict`` of job summary results. Otherwise,
        ``None``.
    """
    if not region:
        regions = variables.DATAFLOW_REGIONS
    else:
        regions = (region,)
    base_request = self.client.projects().locations().jobs()
    all_matching_jobs = []
    # TODO: no batch requesting from Google's side, but should add
    # threading to send multiple requests concurrently. @lynn
    for region in regions:
        # Note: the parameter `view="JOB_VIEW_ALL"` does not return
        # the same information in this `.list()` call as it
        # does in the `.get()` call in `get_job_detail` below.
        request = base_request.list(
            projectId=gcp_project, location=region, filter="ACTIVE"
        )
        try:
            response = request.execute()
        # general catch all since the handling would be the same no matter
        # of the exception
        except Exception as e:
            self.logger.warning(
                "Error listing active jobs in project '%s' in region '%s':"
                " %s" % (gcp_project, region, e)
            )
            continue
        # Collect every active job in this region whose name matches;
        # a region with no jobs simply contributes nothing.
        for result in response.get("jobs", []):
            if result["name"] == job_name:
                all_matching_jobs.append(result)
    # Note: job names are unique within regions, but not across
    # regions :grimace:
    if len(all_matching_jobs) > 1:
        # Bug fix: the format string previously had no arguments, so the
        # literal '%s' placeholders were logged. Pass them as lazy
        # logging args instead.
        self.logger.info(
            "More than one parent job found for job name '%s' under "
            "project '%s'. Selecting one at random.",
            job_name,
            gcp_project,
        )
        return random.choice(all_matching_jobs)
    if all_matching_jobs:
        return all_matching_jobs[0]
    # No matching job in any searched region (documented contract).
    return None