cost-optimization/gke-vpa-recommendations/metrics-exporter/main.py
# Copyright 2023 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import asyncio
import logging
import time
import warnings

from google.api_core.exceptions import GoogleAPICallError
from google.cloud import bigquery_storage_v1
from google.cloud import monitoring_v3
from google.cloud.bigquery_storage_v1 import types
from google.cloud.bigquery_storage_v1 import writer
from google.protobuf import descriptor_pb2

import bigquery_schema_pb2
import config
import utils
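
# Silence the end-user-credentials warning raised under Application Default
# Credentials so it does not clutter the exporter's logs.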
warnings.filterwarnings(
"ignore",
"Your application has authenticated using end user credentials")
async def get_gke_metrics(metric_name, query, namespace, start_time, client):
    """
    Retrieves Google Kubernetes Engine (GKE) metrics from Cloud Monitoring.

    Parameters:
        metric_name (str): The name of the metric to retrieve.
        query: Query configuration for the metric (filter, window, aggregation).
        namespace (str): The Kubernetes namespace to filter the time series on.
        start_time (float): Epoch timestamp used to build the query interval.
        client: A monitoring_v3.MetricServiceClient instance.

    Returns:
        list: Serialized bigquery_schema_pb2.Record rows, one per time series.
    """
interval = utils.get_interval(start_time, query.window)
aggregation = utils.get_aggregation(query)
project_name = utils.get_request_name()
rows = []
try:
        results = client.list_time_series(
            request={
                "name": project_name,
                "filter": f'metric.type = "{query.metric}" AND resource.label.namespace_name = "{namespace}"',
                "interval": interval,
                "view": monitoring_v3.ListTimeSeriesRequest.TimeSeriesView.FULL,
                "aggregation": aggregation})
        logging.info("Building rows from metric results")
for result in results:
row = bigquery_schema_pb2.Record()
label = result.resource.labels
metadata = result.metadata.system_labels.fields
metric_label = result.metric.labels
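            # Controller identity lives in different label sets depending on the
            # metric: HPA metrics expose it via metric labels (targetref_*), VPA
            # metrics via resource labels, and other container metrics via the
            # top_level_controller_* system metadata labels.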
if "hpa" in metric_name:
controller_name = metric_label['targetref_name']
controller_type = metric_label['targetref_kind']
container_name = metric_label['container_name']
elif "vpa" in metric_name:
controller_name = label['controller_name']
controller_type = label['controller_kind']
container_name = metric_label['container_name']
else:
controller_name = metadata['top_level_controller_name'].string_value
controller_type = metadata['top_level_controller_type'].string_value
container_name = label['container_name']
row.run_date = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(start_time))
row.metric_name = metric_name
row.project_id = label['project_id']
row.location = label['location']
row.cluster_name = label['cluster_name']
row.namespace_name = label['namespace_name']
row.controller_name = controller_name
row.controller_type = controller_type
row.container_name = container_name
for point in result.points:
new_point = row.points_array.add()
new_point.metric_timestamp = point.interval.start_time.strftime('%Y-%m-%d %H:%M:%S.%f')
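                # A point carries either a double or an int64 payload depending
                # on the metric's value type; fall back to int64 when the double
                # field is unset.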
new_point.metric_value = point.value.double_value or float(point.value.int64_value)
rows.append(row.SerializeToString())
    except GoogleAPICallError as error:
        logging.error(f'Google API call error: {error}')
    except Exception as error:
        logging.error(f'Unexpected error: {error}')
return rows
async def write_to_bigquery(write_client, rows):
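    """Streams serialized Record rows to BigQuery through a PENDING write stream,
    then finalizes and commits the stream so the rows become visible."""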
parent = write_client.table_path(config.PROJECT_ID, config.BIGQUERY_DATASET, config.BIGQUERY_TABLE)
write_stream = types.WriteStream()
write_stream.type_ = types.WriteStream.Type.PENDING
write_stream = write_client.create_write_stream(
parent=parent, write_stream=write_stream
)
stream_name = write_stream.name
# Create a template with fields needed for the first request.
request_template = types.AppendRowsRequest()
# The initial request must contain the stream name.
request_template.write_stream = stream_name
# So that BigQuery knows how to parse the serialized_rows, generate a
# protocol buffer representation of your message descriptor.
proto_schema = types.ProtoSchema()
proto_descriptor = descriptor_pb2.DescriptorProto()
bigquery_schema_pb2.Record.DESCRIPTOR.CopyToProto(proto_descriptor)
proto_schema.proto_descriptor = proto_descriptor
proto_data = types.AppendRowsRequest.ProtoData()
proto_data.writer_schema = proto_schema
request_template.proto_rows = proto_data
# Some stream types support an unbounded number of requests. Construct an
# AppendRowsStream to send an arbitrary number of requests to a stream.
append_rows_stream = writer.AppendRowsStream(write_client, request_template)
# Create a batch of row data by appending proto2 serialized bytes to the
# serialized_rows repeated field.
proto_rows = types.ProtoRows()
for row in rows:
proto_rows.serialized_rows.append(row)
request = types.AppendRowsRequest()
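    # This is the first (and only) append on the freshly created stream, so it
    # starts at offset 0; an explicit offset lets the server detect duplicates.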
request.offset = 0
proto_data = types.AppendRowsRequest.ProtoData()
proto_data.rows = proto_rows
request.proto_rows = proto_data
    # Block on the returned future so append errors surface before the commit.
    append_rows_stream.send(request).result()
# Shutdown background threads and close the streaming connection.
append_rows_stream.close()
# A PENDING type stream must be "finalized" before being committed. No new
# records can be written to the stream after this method has been called.
write_client.finalize_write_stream(name=write_stream.name)
# Commit the stream you created earlier.
batch_commit_write_streams_request = types.BatchCommitWriteStreamsRequest()
batch_commit_write_streams_request.parent = parent
batch_commit_write_streams_request.write_streams = [write_stream.name]
write_client.batch_commit_write_streams(batch_commit_write_streams_request)
logging.info(f"Writes to stream: '{write_stream.name}' have been committed.")
async def run_pipeline(namespace, client, bqclient, start_time):
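    """Fetches every configured metric for a namespace and writes the results to BigQuery."""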
for metric, query in config.MQL_QUERY.items():
logging.info(f'Retrieving {metric} for namespace {namespace}...')
rows_to_insert = await get_gke_metrics(metric, query, namespace, start_time, client)
if rows_to_insert:
await write_to_bigquery(bqclient, rows_to_insert)
else:
            logging.info(f'{metric} returned no data for {namespace}; skipping')
    logging.info(f"Run for namespace {namespace} completed")
def get_namespaces(client, start_time):
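    """Returns the list of namespaces matching config.NS_QUERY within the query window."""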
namespaces = set()
query = config.NS_QUERY
interval = utils.get_interval(start_time, query.window)
aggregation = utils.get_aggregation(query)
project_name = utils.get_request_name()
try:
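        # Only series metadata is needed to enumerate namespaces, so request the
        # HEADERS view and skip the point data.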
results = client.list_time_series(
request={
"name": project_name,
"filter": f'metric.type = "{query.metric}" AND {config.namespace_filter}',
"interval": interval,
"view": monitoring_v3.ListTimeSeriesRequest.TimeSeriesView.HEADERS,
"aggregation": aggregation})
logging.info("Building Row of Namespace results")
for result in results:
label = result.resource.labels
namespaces.add(label['namespace_name'])
return list(namespaces)
except GoogleAPICallError as error:
logging.error(f'Google API call error: {error}')
except Exception as error:
logging.error(f'Unexpected error: {error}')
return list(namespaces)
if __name__ == "__main__":
utils.setup_logging()
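    # Capture one reference timestamp up front so every query uses the same
    # interval and every row written in this run carries the same run_date.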
start_time = time.time()
try:
client = monitoring_v3.MetricServiceClient()
bqclient = bigquery_storage_v1.BigQueryWriteClient()
    except Exception as error:
        logging.error(f'Google client connection error: {error}')
        raise
monitor_namespaces = get_namespaces(client, start_time)
namespace_count = len(monitor_namespaces)
logging.debug(f"Discovered {namespace_count} namespaces to query")
    if namespace_count > 0:
for namespace in monitor_namespaces:
asyncio.run(
run_pipeline(
namespace,
client=client,
bqclient=bqclient,
start_time=start_time))
else:
        logging.error("No monitored namespaces discovered; ending job")