scripts/populate_gcp_shapes.py (232 lines of code) (raw):

# Populate the r8s GCP Shape / Shape Price MongoDB collections.
#
# SHAPE action: lists Compute Engine machine types in ZONES via the GCP API
# and creates any Shape documents that do not exist yet.
# PRICE action: queries the Infracost pricing GraphQL API for on-demand
# Compute Instance prices per region and upserts ShapePrice documents.
import argparse
import concurrent
import concurrent.futures
import os
import sys
from pathlib import Path

import requests
from mongoengine import DoesNotExist

dir_path = Path(os.path.dirname(os.path.realpath(__file__))).parent
src_path = os.path.join(dir_path, 'src')
# Make the project's ``src`` directory importable before importing project
# modules (``commons``, ``models``, ``services``).
sys.path.append(src_path)

from commons.log_helper import get_logger

_LOG = get_logger('scripts-populate-gcp-shapes')

INFRACOST_API_URL = 'https://pricing.api.infracost.io/graphql'
# Seconds to wait for a single Infracost request; without a timeout a stalled
# connection would block a worker thread indefinitely.
INFRACOST_REQUEST_TIMEOUT = 60

# Zones whose machine types are listed for the SHAPE action.
ZONES = [
    'us-east1-b', 'us-east1-c', 'us-east1-d',
    'us-east4-c', 'us-east4-b', 'us-east4-a',
    'us-central1-c', 'us-central1-a', 'us-central1-f', 'us-central1-b',
    'us-west1-b', 'us-west1-c', 'us-west1-a',
]

# GraphQL query for on-demand Compute Instance prices; ``%s`` is the region.
QUERY_TEMPLATE = \
    """{
  products(
    filter: {
      vendorName: "gcp",
      service: "Compute Engine",
      productFamily: "Compute Instance",
      region: "%s",
      attributeFilters: [
      ]
    },
  ) {
    attributes { key, value }
    prices(
      filter: {
        purchaseOption: "on_demand"
      },
    ) { USD }
  }
}
"""

# Regions parsed by the PRICE action when --price_region is not given.
GCP_REGIONS = ["us-west1", "us-west2", "us-west3", "us-west4", "us-central1",
               "us-east1", "us-east4", "northamerica-northeast1",
               "southamerica-east1", "europe-west2", "europe-west1",
               "europe-west4", "europe-west6", "europe-west3",
               "europe-north1", "asia-south1", "asia-southeast1",
               "asia-southeast2", "asia-east2", "asia-east1",
               "asia-northeast1", "asia-northeast2", "australia-southeast1",
               "asia-northeast3"]

ACTION_PRICE = 'PRICE'
ACTION_SHAPE = 'SHAPE'
DEFAULT_CONCURRENT_WORKERS = 7


def parse_args():
    """Parse and validate command-line arguments.

    :return: dict of parsed argument values keyed by their destination name.
        ``price_region`` is always a non-empty list (all GCP_REGIONS when the
        option was not given).
    """
    parser = argparse.ArgumentParser(
        description='Script for r8s GCP Shape/Shape Price '
                    'collections population')
    parser.add_argument('-uri', '--r8s_mongodb_connection_uri',
                        help='MongoDB Connection string', required=True)
    parser.add_argument('--action', choices=[ACTION_SHAPE, ACTION_PRICE],
                        required=True, action='append',
                        help='Determines whether Shape Specs or '
                             'pricing data will be parsed.')
    parser.add_argument('-gac', '--GOOGLE_APPLICATION_CREDENTIALS',
                        help='Absolute path to Google application credentials '
                             'file. Required for SHAPE action',
                        required=False)
    parser.add_argument('-token', '--infracost_api_key',
                        help='Infracost API token used to query instance '
                             'prices. Required for PRICE action',
                        required=False)
    # default=None on purpose: with action='append' argparse appends user
    # values onto the default list object itself, so a list default would
    # both keep every default region and mutate GCP_REGIONS.
    parser.add_argument('-pr', '--price_region', action='append',
                        required=False, default=None,
                        help='List of GCP regions to populate price for. '
                             'If not specified, all GCP regions will '
                             'be parsed. Required for \'PRICE\' action')
    parser.add_argument('-cw', '--concurrent_workers', type=int,
                        help='Number of concurrent workers for price parsing',
                        required=False, default=DEFAULT_CONCURRENT_WORKERS)
    args = parser.parse_args()

    if ACTION_PRICE in args.action and not args.infracost_api_key:
        parser.error('--infracost_api_key is required for PRICE action')
    if args.concurrent_workers < 1:
        parser.error('--concurrent_workers must be a positive integer')
    if not args.price_region:
        args.price_region = list(GCP_REGIONS)
    return dict(vars(args))


def export_args(**kwargs):
    """Export every string-valued argument as an environment variable.

    Non-string values (lists, ints, None) are skipped.
    """
    for key, value in kwargs.items():
        if isinstance(value, str):
            os.environ[key] = value


def create_shapes_for_zone(zone):
    """Create Shape documents for machine types in ``zone`` not yet stored.

    Existing shapes (matched by name only) are left untouched.

    :param zone: GCP zone name, e.g. ``'us-east1-b'``.
    """
    # Imported lazily, after export_args() has put the connection settings
    # into the environment. NOTE(review): presumably ``models`` reads them at
    # import time -- confirm.
    import google.auth
    from googleapiclient import discovery
    from models.shape import Shape, CloudEnum

    _LOG.debug('Initializing GCP credentials')
    credentials, project = google.auth.default()
    service = discovery.build('compute', 'v1', credentials=credentials)

    _LOG.debug(f'Updating shapes, using zone \'{zone}\'')
    created_count = 0
    request = service.machineTypes().list(project=project, zone=zone)
    while request is not None:
        response = request.execute()
        # A page with no machine types may omit 'items' entirely.
        machine_types = response.get('items', [])
        total = len(machine_types)
        for index, machine_type in enumerate(machine_types, start=1):
            _LOG.debug(f'Processing {index}/{total}')
            machine_type_name = machine_type.get('name')
            try:
                Shape.objects.get(name=machine_type_name)
            except DoesNotExist:
                _LOG.debug(
                    f'Shape \'{machine_type_name}\' does not exist yet, '
                    f'creating.')
                shape = Shape(
                    name=machine_type_name,
                    cloud=CloudEnum.CLOUD_GOOGLE,
                    cpu=machine_type.get('guestCpus'),
                    # NOTE(review): floor division drops fractional GB
                    # (e.g. 3840 MB -> 3); confirm whether Shape.memory
                    # accepts fractional values before changing this.
                    memory=machine_type.get('memoryMb') // 1024,
                    family_type=get_family_type(machine_type_name)
                )
                shape.save()
                created_count += 1
        request = service.machineTypes().list_next(
            previous_request=request, previous_response=response)
    _LOG.debug(f'Created shapes for zone \'{zone}\': {created_count}')


def get_family_type(machine_type_name):
    """Map a GCP machine type name to its family type by name prefix.

    :param machine_type_name: machine type name, e.g. ``'c2-standard-4'``.
    :return: family type label; ``'General-purpose workloads'`` when no
        known prefix matches.
    """
    name_type_mapping = {
        "c2": "Compute-optimized",
        "c2d": "Compute-optimized",
        "m1": "Memory-optimized",
        "m2": "Memory-optimized",
        "m3": "Memory-optimized",
        "a2": "Accelerator-optimized",
        "g2": "Accelerator-optimized"
    }
    default = 'General-purpose workloads'
    for prefix, family_type in name_type_mapping.items():
        if machine_type_name.startswith(prefix):
            return family_type
    return default


def create_prices(region, connection_uri, api_key):
    """Fetch on-demand instance prices for ``region`` and upsert them.

    Failures (network errors, non-200 status, unparseable body) are logged
    and the region is skipped, so other regions keep being processed.

    :param region: GCP region name.
    :param connection_uri: MongoDB connection string, exported to the env.
    :param api_key: Infracost API key.
    :return: number of shape prices created or updated (0 on failure).
    """
    os.environ['r8s_mongodb_connection_uri'] = connection_uri
    try:
        response = requests.post(
            url=INFRACOST_API_URL,
            json={"query": QUERY_TEMPLATE % region},
            headers={'x-api-key': api_key},
            timeout=INFRACOST_REQUEST_TIMEOUT
        )
    except requests.RequestException as e:
        _LOG.error(f'Failed to query Infracost for region {region}: {e}')
        return 0
    if response.status_code != 200:
        _LOG.error(f'Unsuccessful response from Infracost obtained. Status: '
                   f'{response.status_code}. Content: {response.content}')
        return 0
    try:
        payload = response.json()
    except ValueError:
        _LOG.error(f'Infracost returned a non-JSON body for region {region}: '
                   f'{response.content}')
        return 0
    if payload.get('errors'):
        _LOG.error(f'Infracost returned errors for region {region}: '
                   f'{payload["errors"]}')
    # GraphQL error responses may carry "data": null, so guard against None
    # at each level rather than relying on .get defaults.
    products = (payload.get('data') or {}).get('products') or []

    processed_count = 0
    for product in products:
        product_name = get_product_name(product_data=product)
        product_price = get_product_price(product_data=product,
                                          product_name=product_name)
        if product_name and product_price:
            shape_price = create_shape_price(
                product_name=product_name,
                product_price=product_price,
                region=region
            )
            if shape_price:
                processed_count += 1
    _LOG.debug(f'Processed {processed_count} prices for region {region}')
    return processed_count


def run_populate_prices(regions: list, workers: int, connection_uri: str,
                        api_key: str):
    """Run ``create_prices`` for every region on a thread pool.

    An exception raised inside a worker is re-raised here by
    ``future.result()``.
    """
    _LOG.debug('Populating GCP Prices data')
    with concurrent.futures.ThreadPoolExecutor(max_workers=workers) \
            as executor:
        futures = {
            executor.submit(create_prices, region=region, api_key=api_key,
                            connection_uri=connection_uri): region
            for region in regions
        }
        for future in concurrent.futures.as_completed(futures):
            _LOG.debug(f"Thread finished for region {futures[future]}: "
                       f"{future.result()}")


def create_shape_price(product_name, product_price, region):
    """Create a DEFAULT-customer ShapePrice, or update the existing one.

    On a uniqueness conflict the existing document (looked up by name,
    customer and region) is updated only if its on-demand price differs.

    :return: the saved or existing ShapePrice document.
    """
    from models.shape_price import ShapePrice
    from models.shape import CloudEnum
    from mongoengine import NotUniqueError
    try:
        shape_price = ShapePrice(
            customer="DEFAULT",
            cloud=CloudEnum.CLOUD_GOOGLE.value,
            name=product_name,
            region=region,
            on_demand=product_price
        )
        shape_price.save()
        _LOG.debug(f'{product_name}:{region} Saved')
        return shape_price
    except NotUniqueError:
        _LOG.debug(f'Shape price \'{product_name}\' already exist, replacing.')
        old_shape = ShapePrice.objects.get(
            name=product_name, customer='DEFAULT', region=region)
        if old_shape.on_demand != product_price:
            old_shape.on_demand = product_price
            old_shape.save()
            _LOG.debug(f'{product_name}:{region} Updated')
        return old_shape


def get_product_name(product_data):
    """Return the ``machineType`` attribute value of an Infracost product.

    :return: the machine type name, or None if the attribute is absent.
    """
    attributes = product_data.get('attributes') or []
    for attribute in attributes:
        if attribute.get('key') == 'machineType':
            return attribute.get('value')
    return None


def get_product_price(product_data, product_name):
    """Return the first USD price of an Infracost product as a float.

    A warning is logged when several prices are present; only the first is
    used.

    :return: price as float, or None when no price or no USD value exists.
    """
    prices = product_data.get('prices') or []
    if len(prices) > 1:
        _LOG.warning(f'Several prices obtained for product \'{product_name}\'')
    if not prices:
        return None
    usd = prices[0].get('USD')
    return float(usd) if usd is not None else None


def update_last_update_date():
    """Record the GCP shape update date via SettingsService and print it."""
    from services.setting_service import SettingsService
    from models.base_model import CloudEnum
    setting_service = SettingsService()
    setting = setting_service.update_shape_update_date(
        cloud=CloudEnum.CLOUD_GOOGLE.value
    )
    print(f"Updated setting: {setting.value}")


def main():
    """Script entry point: parse args, run the requested actions."""
    _LOG.info("Parsing arguments")
    parameters = parse_args()
    _LOG.info('Exporting env variables')
    export_args(**parameters)

    allowed_actions = parameters.get('action')
    if ACTION_SHAPE in allowed_actions:
        _LOG.info('Populating shapes')
        for index, zone in enumerate(ZONES, start=1):
            _LOG.info(f'Processing {index}/{len(ZONES)} zone: {zone}')
            create_shapes_for_zone(
                zone=zone
            )
    if ACTION_PRICE in allowed_actions:
        _LOG.info('Populating Prices')
        run_populate_prices(
            regions=parameters.get('price_region') or GCP_REGIONS,
            workers=parameters.get('concurrent_workers',
                                   DEFAULT_CONCURRENT_WORKERS),
            connection_uri=parameters.get('r8s_mongodb_connection_uri'),
            api_key=parameters.get('infracost_api_key')
        )
    # NOTE(review): runs after whichever actions were requested; confirm it
    # should not be limited to a specific action.
    update_last_update_date()


if __name__ == '__main__':
    main()