quickstarts/whereami/whereami_payload.py (160 lines of code) (raw):

import sys import socket import os from datetime import datetime import emoji import logging from logging.config import dictConfig import requests from requests.adapters import HTTPAdapter import urllib3 from urllib3 import Retry # gRPC stuff import grpc from six import b import whereami_pb2 import whereami_pb2_grpc METADATA_URL = 'http://metadata.google.internal/computeMetadata/v1/' METADATA_HEADERS = {'Metadata-Flavor': 'Google'} GRPC_SECURE_PORTS = ['443', '8443'] # when using gRPC, this list is checked when determining to use a secure or insecure channel # set up logging dictConfig({ 'version': 1, 'formatters': {'default': { 'format': '[%(asctime)s] %(levelname)s in %(module)s: %(message)s', }}, 'handlers': {'wsgi': { 'class': 'logging.StreamHandler', 'stream': 'ext://sys.stdout', 'formatter': 'default' }}, 'root': { 'level': 'INFO', 'handlers': ['wsgi'] } }) # set up emoji list emoji_list = list(emoji.EMOJI_DATA.keys()) class WhereamiPayload(object): def __init__(self): self.payload = {} self.gce_metadata = {} # this will cache the results from calling GCE metadata # configure retries for GCE metadata GET # we're doing this because, on GKE, metadata endpoint can take a few seconds to be available # see https://cloud.google.com/kubernetes-engine/docs/how-to/workload-identity#limitations # everything else session = requests.Session() adapter = HTTPAdapter(max_retries=Retry(connect=3, read=3, other=3, total=3, backoff_factor=5)) #, status_forcelist=[429, 500, 502, 503, 504])) session.mount("http://", adapter) session.mount("https://", adapter) try: # grab info from GCE metadata r = session.get(METADATA_URL + '?recursive=true', headers=METADATA_HEADERS) if r.ok: logging.info("Successfully accessed GCE metadata endpoint.") self.gce_metadata = r.json() except: logging.warning("Unable to access GCE metadata endpoint.") def build_payload(self, request_headers): # header propagation for HTTP calls to downward services # for Istio / Anthos Service Mesh def getForwardHeaders(request_headers): headers = {} incoming_headers = ['x-request-id', 'x-b3-traceid', 'x-b3-spanid', 'x-b3-parentspanid', 'x-b3-sampled', 'x-b3-flags', 'x-ot-span-context', 'x-cloud-trace-context', 'traceparent', 'grpc-trace-bin' ] for ihdr in incoming_headers: val = request_headers.get(ihdr) if val is not None: headers[ihdr] = val return headers # call HTTP backend (expect JSON reesponse) def call_http_backend(backend_service): try: r = requests.get(backend_service, headers=getForwardHeaders(request_headers)) if r.ok: backend_result = r.json() else: backend_result = None except: logging.warning(sys.exc_info()[0]) backend_result = None return backend_result # call gRPC backend def call_grpc_backend(backend_service): try: # assumes port number is appended to backend_service name if backend_service.split(':')[1] in GRPC_SECURE_PORTS: logging.info("Using gRPC secure channel.") channel = grpc.secure_channel(backend_service, grpc.ssl_channel_credentials()) else: logging.info("Using gRPC insecure channel.") channel = grpc.insecure_channel(backend_service) stub = whereami_pb2_grpc.WhereamiStub(channel) backend_result = stub.GetPayload( whereami_pb2.Empty()) except: backend_result = None logging.warning("Unable to capture backend result.") return backend_result # grab info from cached GCE metadata if len(self.gce_metadata): logging.info("Found cached GCE metadata.") # get project / zone info self.payload['project_id'] = self.gce_metadata['project']['projectId'] self.payload['zone'] = self.gce_metadata['instance']['zone'].split('/')[-1] # if we're running in GKE, we can also get cluster name try: self.payload['cluster_name'] = self.gce_metadata['instance']['attributes']['cluster-name'] except: logging.warning("Unable to capture GKE cluster name.") # if we're running on Google, grab the instance ID and default Google service account try: self.payload['gce_instance_id'] = str(self.gce_metadata['instance']['id']) # casting to str as value can be alphanumeric on Cloud Run except: logging.warning("Unable to capture GCE instance ID.") try: self.payload['gce_service_account'] = self.gce_metadata['instance']['serviceAccounts']['default']['email'] except: logging.warning("Unable to capture GCE service account.") else: logging.warning("GCE metadata unavailable.") # get node name via downward API if os.getenv('NODE_NAME'): self.payload['node_name'] = os.getenv('NODE_NAME') else: logging.warning("Unable to capture node name.") # get host header try: self.payload['host_header'] = request_headers.get('host') except: logging.warning("Unable to capture host header.") # get pod name, emoji & datetime self.payload['pod_name'] = socket.gethostname() self.payload['pod_name_emoji'] = emoji_list[hash( socket.gethostname()) % len(emoji_list)] self.payload['timestamp'] = datetime.now().replace( microsecond=0).isoformat() # get namespace, pod ip, and pod service account via downward API if os.getenv('POD_NAMESPACE'): self.payload['pod_namespace'] = os.getenv('POD_NAMESPACE') else: logging.warning("Unable to capture pod namespace.") if os.getenv('POD_IP'): self.payload['pod_ip'] = os.getenv('POD_IP') else: logging.warning("Unable to capture pod IP address.") if os.getenv('POD_SERVICE_ACCOUNT'): self.payload['pod_service_account'] = os.getenv( 'POD_SERVICE_ACCOUNT') else: logging.warning("Unable to capture pod KSA.") # get the whereami METADATA envvar if os.getenv('METADATA'): self.payload['metadata'] = os.getenv('METADATA') else: logging.warning("Unable to capture metadata environment variable.") # should we call a backend service? if os.getenv('BACKEND_ENABLED') == 'True': backend_service = os.getenv('BACKEND_SERVICE') logging.info("Attempting to call %s", backend_service) if os.getenv('GRPC_ENABLED') == "True": backend_result = call_grpc_backend(backend_service) if backend_result: self.payload['backend_result'] = backend_result else: self.payload['backend_result'] = call_http_backend(backend_service) echo_headers = os.getenv('ECHO_HEADERS') if echo_headers == 'True': try: self.payload['headers'] = {k: v for k, v in request_headers.items()} except: logging.warning("Unable to capture inbound headers.") return self.payload