syndicate/core/resources/helper.py (120 lines of code) (raw):
"""
Copyright 2018 EPAM Systems, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import json
from syndicate.commons.log_helper import get_logger
from syndicate.core.conf.processor import GLOBAL_AWS_SERVICES
from typing import TypeVar, Optional
from datetime import datetime
T = TypeVar('T')
_LOG = get_logger(__name__)
def validate_params(name, meta, required_params):
existing_parameters = list(meta.keys())
parameters_string = ', '.join(required_params)
existing_parameters_string = ', '.join(existing_parameters)
for each in required_params:
if each not in existing_parameters:
raise AssertionError(
'All required parameters must be specified! Resource: {0}'
' Required parameters: {1}. Given parameters: {2}'.format(
name, parameters_string, existing_parameters_string))
def validate_date(name, date_str):
"""
Checks if the provided date in `date_str` is in ISO 8601 or Unix timestamp format
"""
try:
datetime.fromisoformat(date_str)
unix_timestamp = int(date_str)
datetime.utcfromtimestamp(unix_timestamp)
return date_str
except Exception as e:
raise AssertionError('Invalid date format: {0}. Resource: {1}'
' Error message: {2}'.format(
date_str, name, e))
def check_region_available(region_name, available_regions, res_meta=None):
if region_name in available_regions:
return True
if res_meta:
res_type = res_meta['resource_type']
raise AssertionError(
"Region {0} isn't available for resource {1}.".format(region_name,
res_type))
else:
raise AssertionError("Region {0} isn't available.".format(region_name))
def create_args_for_multi_region(args, available_regions):
from syndicate.core import CONFIG
new_region_args = []
for arg_set in args:
name = arg_set['name']
meta = arg_set['meta']
region = meta.get('region')
if region is None:
item = arg_set.copy()
item['region'] = CONFIG.region
new_region_args.append(item)
elif isinstance(region, str):
if region == 'all':
for each in available_regions:
item = arg_set.copy()
item['region'] = each
new_region_args.append(item)
else:
if check_region_available(region, available_regions, meta):
item = arg_set.copy()
item['region'] = region
new_region_args.append(item)
elif isinstance(region, list):
for each in region:
if check_region_available(each, available_regions, meta):
item = arg_set.copy()
item['region'] = each
new_region_args.append(item)
else:
raise AssertionError(
'Invalid value region: {0}. Resource: {1}.'.format(region,
name))
return new_region_args
def chunks(l, n):
for i in range(0, len(l), n):
yield l[i:i + n]
def resolve_dynamic_identifier(to_replace, resource_meta):
"""
Replaces keys from 'to_replace' with values from it inside json built
from 'resource_meta'
:type to_replace: dict
:type resource_meta: dict
"""
raw_json = json.dumps(resource_meta)
for name, value in to_replace.items():
raw_json = raw_json.replace(name, value)
return json.loads(raw_json)
def build_description_obj(response, name, meta):
resource_type = meta['resource_type']
obj = {
'resource_name': name,
'resource_meta': meta,
'description': response
}
if resource_type not in GLOBAL_AWS_SERVICES:
from syndicate.core import CONFIG
obj['resource_meta']['region'] = meta.get('region', CONFIG.region)
return obj
def assert_required_params(required_params_names, all_params):
"""
Raises error if there is at least one missing parameter.
:param required_params_names:
:param all_params:
:return:
"""
missing = [param for param in required_params_names if
param not in all_params.keys()]
if missing:
raise AssertionError(f'Missing required parameters: {missing}')
def assert_possible_values(iterable: list, possible: list):
if not set(iterable).issubset(set(possible)):
message = f'Incorrect values in given iteramble: {iterable}. ' \
f'Must be a subset of these: {possible}'
_LOG.error(message)
raise AssertionError(message)
def filter_dict_by_shape(d, shape):
new_d = {}
for attribute, value in shape.items():
if value is None:
new_d[attribute] = d.get(attribute)
if isinstance(value, list):
new_d[attribute] = filter_list_by_shape(d.get(attribute), value)
if isinstance(value, dict):
new_d[attribute] = filter_dict_by_shape(d.get(attribute), value)
return new_d
def filter_list_by_shape(lst, shape):
if not shape[0] or not lst:
return lst
new_lst = []
if isinstance(shape[0], dict):
for d in lst:
new_lst.append(filter_dict_by_shape(d, shape[0]))
return new_lst
def if_updated(new: T, old: T) -> Optional[T]:
"""
If `new` differs from `old` it `new` will be returned.
If it does not, None is returned. They must be comparable
"""
return new if new != old else None