in cli/src/klio_cli/commands/job/profile.py [0:0]
def _get_gcs_blobs(self):
location = self.gcs_location.split("gs://")[1:][0]
bucket_name, *folders = location.split("/")
prefix = "/".join(folders)
client = storage.Client()
try:
bucket = client.get_bucket(bucket_name)
except Exception:
logging.error(
"Error getting bucket for profiling data", exc_info=True
)
raise SystemExit(1)
try:
# wrap all in try since `list_blobs` is a generator
blobs = bucket.list_blobs(prefix=prefix)
for blob in blobs:
if blob.time_created < self.since:
continue
if blob.time_created > self.until:
continue
yield blob
except Exception:
logging.error(
"Error getting profiling data from {}".format(bucket_name),
exc_info=True,
)
raise SystemExit(1)