# cost-optimization/gke-vpa-recommendations/metrics-exporter/utils.py
import config
import json
import logging
import os
import sys
import urllib.request
from google.cloud import monitoring_v3
from stackdriver_log_formatter import StackdriverLogFormatter
# Module-level logger named after this module; configured by setup_logging().
logger = logging.getLogger(__name__)
def get_interval(now, window):
    """Build a monitoring TimeInterval ending at *now*, spanning *window* seconds.

    Args:
        now: End of the interval as a (possibly fractional) Unix timestamp.
        window: Length of the interval, in whole seconds.

    Returns:
        A monitoring_v3.TimeInterval covering [now - window, now], with the
        fractional part of *now* carried into both endpoints as nanoseconds.
    """
    whole_seconds = int(now)
    fractional_nanos = int((now - whole_seconds) * 10 ** 9)
    return monitoring_v3.TimeInterval(
        {
            "end_time": {"seconds": whole_seconds, "nanos": fractional_nanos},
            "start_time": {"seconds": (whole_seconds - window), "nanos": fractional_nanos},
        }
    )
def get_aggregation(query):
    """Translate a query spec object into a monitoring Aggregation.

    Args:
        query: Object exposing `seconds_between_points`, `per_series_aligner`,
            `cross_series_reducer`, and `columns` attributes.

    Returns:
        A monitoring_v3.Aggregation built from those four fields.
    """
    spec = {
        "alignment_period": {"seconds": query.seconds_between_points},
        "per_series_aligner": query.per_series_aligner,
        "cross_series_reducer": query.cross_series_reducer,
        "group_by_fields": query.columns,
    }
    return monitoring_v3.Aggregation(spec)
def get_request_name():
    """Return the monitoring API resource name for the configured project."""
    return "projects/" + config.PROJECT_ID
def get_gcp_project_id():
    """Resolve the GCP project id.

    Resolution order: the PROJECT_ID environment variable, then the GCE
    metadata server, then the `project_id` field of the service-account
    key file named by GOOGLE_APPLICATION_CREDENTIALS (local runs).

    Returns:
        The project id string.

    Raises:
        ValueError: If no project id could be determined by any source.
        KeyError: If GOOGLE_APPLICATION_CREDENTIALS is unset, or the key
            file lacks a `project_id` entry, during the local fallback.
    """
    project_id = os.environ.get("PROJECT_ID", None)
    if project_id:
        return project_id
    project_id = get_project_id()
    if project_id:
        return project_id
    # Running locally: fall back to the service-account key file.
    with open(os.environ["GOOGLE_APPLICATION_CREDENTIALS"], "r") as fp:
        project_id = json.load(fp)["project_id"]
    if project_id:
        return project_id
    logger.error(
        "Unable to detect GCP project id, please set the 'PROJECT_ID' environment variable.")
    raise ValueError("Could not get a value for PROJECT_ID")
def get_project_id():
    """Query the GCE metadata server for the current project id.

    Returns:
        The project id string when running on GCP; None when the metadata
        server is unreachable (e.g. running outside GCP).
    """
    url = "http://metadata.google.internal/computeMetadata/v1/project/project-id"
    req = urllib.request.Request(url)
    req.add_header("Metadata-Flavor", "Google")
    try:
        # Use a context manager so the HTTP response (and its socket) is
        # always closed; the original left it open after read().
        with urllib.request.urlopen(req, timeout=1) as response:
            return response.read().decode()
    except Exception as e:
        # Deliberate best-effort probe: any failure means "not on GCP".
        logger.debug(
            f"Metadata service failed, not deployed in GCP. Returning None, {e}")
        return None
def is_deployed():
    """Return True when running on GCP, i.e. the metadata server answered.

    Returns:
        bool: True if get_project_id() found a project id, else False.
    """
    # Idiomatic boolean: replaces the verbose if/else True/False assignment.
    return get_project_id() is not None
def setup_logging():  # pragma: no cover
    """Configure root logging to stdout.

    The level comes from the LOGGING_LEVEL environment variable (default
    "info", upper-cased); when running on GCP the Stackdriver JSON
    formatter is attached to the handler.
    """
    stream_handler = logging.StreamHandler(sys.stdout)
    if is_deployed():
        stream_handler.setFormatter(StackdriverLogFormatter())
    logging.basicConfig(
        handlers=[stream_handler],
        format="%(asctime)s [%(threadName)-12.12s] [%(levelname)-5.5s] %(message)s",
        level=os.environ.get('LOGGING_LEVEL', "info").upper())