cli/src/klio_cli/commands/message/publish.py (102 lines of code) (raw):
# Copyright 2019-2020 Spotify AB
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import functools
import logging
import emoji
from google.api_core import exceptions as gapi_exceptions
from google.cloud import pubsub
from klio_core.proto.v1beta1 import klio_pb2
def _create_publisher(topic):
    """Return a callable that publishes data to ``topic``.

    A Pub/Sub publisher client is created and asked for the topic before
    anything is published, so a missing topic is reported up front.

    Args:
        topic (str): the Pub/Sub topic to publish to.

    Returns:
        functools.partial: the client's ``publish`` method with ``topic``
            already bound; call it as ``publish(data=...)``.

    Raises:
        SystemExit: with exit code 1 when Pub/Sub reports the topic as
            not found. Any other exception propagates unchanged.
    """
    publisher_client = pubsub.PublisherClient()
    try:
        publisher_client.get_topic(request={"topic": topic})
    except gapi_exceptions.NotFound:
        template = (
            ":persevere: Topic '{}' not found. Is there a running job "
            "subscribed to this topic? or is there a typo in the configured "
            "topic?"
        )
        logging.error(emoji.emojize(template.format(topic), language="alias"))
        raise SystemExit(1)
    # No catch-all handler: unexpected errors simply propagate to the caller.
    return functools.partial(publisher_client.publish, topic=topic)
def _get_current_klio_job(config):
    """Build a ``KlioJob`` proto describing the job in ``config``.

    The job name is taken from ``config.job_name`` and the GCP project
    from ``config.pipeline_options.project``.

    Returns:
        klio_pb2.KlioJob: the populated job message.
    """
    current_job = klio_pb2.KlioJob()
    current_job.job_name = config.job_name
    current_job.gcp_project = config.pipeline_options.project
    return current_job
# [batch dev] TODO: rename entity_id variable in this module
def _create_pubsub_message(entity_id, job, force, ping, top_down, msg_version):
    """Build and serialize a ``KlioMessage`` for a single entity ID.

    How the payload and recipients are recorded depends on ``msg_version``:

    * ``1``: ``entity_id`` is stored in ``data.entity_id``; unless
      ``top_down`` is set, ``job`` is added to ``metadata.downstream``.
    * ``2``: ``entity_id`` is UTF-8 encoded into ``data.element``; unless
      ``top_down`` is set, ``job`` is added as a limited recipient,
      otherwise the message is marked as intended for anyone.

    For any other version no data or recipient fields are populated.

    Args:
        entity_id (str): the payload to send.
        job (klio_pb2.KlioJob): job to restrict the message to when
            ``top_down`` is false.
        force (bool): value for ``metadata.force``.
        ping (bool): value for ``metadata.ping``.
        top_down (bool): when true, do not restrict the message to ``job``.
        msg_version (int): value for the message's ``version`` field.

    Returns:
        The message serialized with ``SerializeToString()``.
    """
    message = klio_pb2.KlioMessage()
    message.version = msg_version
    metadata = message.metadata

    if msg_version == 1:
        message.data.entity_id = entity_id
        if not top_down:
            metadata.downstream.extend([job])
    elif msg_version == 2:
        message.data.element = bytes(entity_id, "utf-8")
        recipients = metadata.intended_recipients
        if top_down:
            # Explicitly mark the empty ``anyone`` sub-message as present.
            recipients.anyone.SetInParent()
        else:
            recipients.limited.recipients.extend([job])

    metadata.ping = ping
    metadata.force = force
    return message.SerializeToString()
def _publish_messages(
    config, entity_ids, force, ping, top_down, allow_non_klio, msg_version
):
    """Publish one message per entity ID to the job's first input topic.

    ``force`` is declared before ``ping`` so that the parameter names match
    what each positional slot actually controls: the third positional
    argument ends up in ``metadata.force`` and the fourth in
    ``metadata.ping``. The values are forwarded to
    ``_create_pubsub_message`` by keyword so the two flags cannot be
    transposed by accident.

    Args:
        config: job configuration; ``config.job_config.events.inputs[0]``
            supplies the topic.
        entity_ids (iterable of str): payloads to publish.
        force (bool): value for each Klio message's ``force`` flag.
        ping (bool): value for each Klio message's ``ping`` flag.
        top_down (bool): forwarded to ``_create_pubsub_message``.
        allow_non_klio (bool): when true, publish the raw UTF-8 encoded
            entity ID instead of a serialized ``KlioMessage``.
        msg_version (int): Klio message version to build.

    Returns:
        tuple(list, list): entity IDs that were published successfully and
            entity IDs whose publish attempt raised.
    """
    current_job = _get_current_klio_job(config)
    publish = _create_publisher(config.job_config.events.inputs[0].topic)

    success_ids = []
    fail_ids = []

    for entity_id in entity_ids:
        if allow_non_klio:
            # TODO: should rename argument to something more abstract (@lynn)
            message = entity_id.encode("utf-8")
        else:
            message = _create_pubsub_message(
                entity_id,
                current_job,
                force=force,
                ping=ping,
                top_down=top_down,
                msg_version=msg_version,
            )

        try:
            future = publish(data=message)
            # block until message is published or exception is raised
            future.result()
        except Exception as e:
            # Best effort: record the failure and carry on with the rest.
            msg = "Failed to publish message for entity '%s': %s" % (
                entity_id,
                e,
            )
            logging.warning(msg)
            fail_ids.append(entity_id)
        else:
            success_ids.append(entity_id)

    return success_ids, fail_ids
def publish_messages(
    config,
    entity_ids,
    force=False,
    ping=False,
    top_down=False,
    allow_non_klio=False,
    msg_version=None,
):
    """Publish ``entity_ids`` to the job's input topic and log a summary.

    Args:
        config: job configuration providing ``job_name``, ``version`` and
            ``job_config.events.inputs``.
        entity_ids (list of str): payloads to publish.
        force (bool): ``force`` flag for the published messages.
        ping (bool): ``ping`` flag for the published messages.
        top_down (bool): whether messages are sent without being
            restricted to this job.
        allow_non_klio (bool): publish raw payloads rather than Klio
            messages.
        msg_version (int or None): Klio message version; falls back to
            ``config.version`` when ``None``.

    Raises:
        SystemExit: with exit code 1 when the job has no input topics
            configured.
    """
    # [batch dev] TODO: if we use KlioConfig, we don't have to find the
    # version
    # [batch dev] maintaining backwards compatibility if this code path
    # is executed by some other way other than via cli.py::publish.
    if msg_version is None:
        msg_version = config.version

    inputs = config.job_config.events.inputs
    if not inputs:
        no_topic_msg = "No input topics configured for {} :-1:".format(
            config.job_name
        )
        logging.error(emoji.emojize(no_topic_msg, language="alias"))
        raise SystemExit(1)

    # [batch dev] should we support multiple inputs in the future?
    input_topic = inputs[0].topic
    logging.info(
        "Publishing {} messages to {}'s input topic {}".format(
            len(entity_ids), config.job_name, input_topic
        )
    )

    # Passed positionally: ``force`` is the third argument, ``ping`` the
    # fourth.
    published, failed = _publish_messages(
        config, entity_ids, force, ping, top_down, allow_non_klio, msg_version
    )

    if published:
        summary = ":boom: Successfully published {} messages.".format(
            len(published)
        )
        logging.info(emoji.emojize(summary, language="alias"))

    if failed:
        summary = (
            ":persevere: Failed to publish the following {} entity "
            "IDs: {}".format(len(failed), ", ".join(failed))
        )
        logging.warning(emoji.emojize(summary, language="alias"))