in cli/src/klio_cli/commands/job/verify.py [0:0]
def _verify_inputs(self):
    """Verify the data and event inputs configured for the job.

    Data inputs are read from ``self.klio_config.job_config.data.inputs``;
    each one that has a ``location`` is checked with
    ``self._verify_gcs_bucket``. Event inputs are read from
    ``self.klio_config.job_config.events.inputs``; only those whose
    ``name`` is ``"pubsub"`` are considered, and each one that has a
    ``subscription`` is checked with
    ``self._verify_subscription_and_topic``.

    A missing (``None``) inputs collection is treated like an empty one.

    Returns:
        bool: ``True`` when no bucket, topic or subscription was counted
        as unverified, ``False`` otherwise.
    """
    unverified_bucket_count = 0
    unverified_topic_count = 0
    unverified_sub_count = 0

    # Fall back to an empty list so that a ``None`` value produces the
    # warning below followed by a no-op loop, not a ``TypeError``.
    data_inputs = self.klio_config.job_config.data.inputs or []
    if not data_inputs:
        logging.warning("Your job has no data inputs, is that expected?")

    logging.info("Verifying your data inputs...")
    for _input in data_inputs:
        input_gcs = _input.location
        if input_gcs is None:
            # NOTE(review): an input without a location is only logged; it
            # is not counted as unverified, so the method can still return
            # True. Confirm this is intended.
            message = "There is no data_location for {}".format(_input)
            logging.error(message)
            continue
        if not self._verify_gcs_bucket(input_gcs):
            unverified_bucket_count += 1

    event_inputs = self.klio_config.job_config.events.inputs or []
    if not event_inputs:
        logging.warning("Your job has no event inputs, is that expected?")

    logging.info("Verifying your event inputs...")
    pubsub_event_inputs = [
        _i for _i in event_inputs if _i.name == "pubsub"
    ]
    for _input in pubsub_event_inputs:
        input_topic = _input.topic
        input_subscription = _input.subscription
        if input_subscription is None:
            # NOTE(review): as with data inputs, a missing subscription is
            # logged but does not affect the return value.
            logging.error(
                "There is no subscription associated with {}.".format(
                    _input
                )
            )
            continue
        # Presumably the helper returns (topic_ok, subscription_ok); a falsy
        # entry is counted as unverified, mirroring the bucket check above.
        verified_topic, verified_sub = self._verify_subscription_and_topic(
            input_subscription, input_topic,
        )
        if not verified_topic:
            unverified_topic_count += 1
        if not verified_sub:
            unverified_sub_count += 1

    # Topic/subscription totals are only reported when there was at least
    # one pubsub event input to check.
    if pubsub_event_inputs:
        logging.info(
            "You have {} unverified input buckets, {} unverified input "
            "topics and {} unverified input subscriptions".format(
                unverified_bucket_count,
                unverified_topic_count,
                unverified_sub_count,
            )
        )
    else:
        logging.info(
            "You have {} unverified input buckets".format(
                unverified_bucket_count,
            )
        )

    total_unverified = (
        unverified_bucket_count
        + unverified_topic_count
        + unverified_sub_count
    )
    return total_unverified == 0