syndicate/connection/application_autoscaling_connection.py (159 lines of code) (raw):
"""
Copyright 2018 EPAM Systems, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import boto3
from syndicate.commons.log_helper import get_logger
from syndicate.connection.helper import apply_methods_decorator, retry
_LOG = get_logger(__name__)
@apply_methods_decorator(retry())
class ApplicationAutoscaling(object):
def __init__(self, region=None, aws_access_key_id=None,
aws_secret_access_key=None, aws_session_token=None):
self.region = region
self.aws_access_key_id = aws_access_key_id
self.aws_secret_access_key = aws_secret_access_key
self.aws_session_token = aws_session_token
self.client = boto3.client('application-autoscaling', region,
aws_access_key_id=aws_access_key_id,
aws_secret_access_key=aws_secret_access_key,
aws_session_token=aws_session_token)
_LOG.debug('Opened new Application autoscaling connection.')
def register_target(self, service_namespace, resource_id,
scalable_dimension, min_capacity=None,
max_capacity=None, role_arn=None):
params = dict(ServiceNamespace=service_namespace,
ResourceId=resource_id,
ScalableDimension=scalable_dimension)
if min_capacity:
params['MinCapacity'] = int(min_capacity)
if max_capacity:
params['MaxCapacity'] = int(max_capacity)
if role_arn:
params['RoleARN'] = role_arn
self.client.register_scalable_target(**params)
def put_step_scaling_policy(self, policy_name, service_namespace,
resource_id,
scalable_dimension,
adjustment_type=None,
metric_interval_lower_bound=None,
metric_interval_upper_bound=None,
scaling_adjustment=None,
min_adjustment_magnitude=None,
cooldown=None,
metric_aggregation_type=None):
params = dict(PolicyName=policy_name,
ServiceNamespace=service_namespace,
ResourceId=resource_id,
ScalableDimension=scalable_dimension,
PolicyType='StepScaling')
step_scaling_config_dict = dict()
if adjustment_type:
step_scaling_config_dict['AdjustmentType'] = adjustment_type
if min_adjustment_magnitude:
step_scaling_config_dict[
'MinAdjustmentMagnitude'] = min_adjustment_magnitude
if cooldown:
step_scaling_config_dict['Cooldown'] = cooldown
if metric_aggregation_type:
step_scaling_config_dict[
'MetricAggregationType'] = metric_aggregation_type
step_adjustment_dict = dict()
if metric_interval_lower_bound:
step_adjustment_dict[
'MetricIntervalLowerBound'] = metric_interval_lower_bound
if metric_interval_upper_bound:
step_adjustment_dict[
'MetricIntervalUpperBound'] = metric_interval_upper_bound
if scaling_adjustment:
step_adjustment_dict['ScalingAdjustment'] = scaling_adjustment
if step_adjustment_dict:
step_scaling_config_dict['StepAdjustments'] = step_adjustment_dict
if step_scaling_config_dict:
params['StepScalingPolicyConfiguration'] = step_scaling_config_dict
return self.client.put_scaling_policy(**params)
def put_target_scaling_policy(self, policy_name, service_namespace,
resource_id, scalable_dimension,
target_value=None,
predefined_metric_type=None,
resource_label=None, metric_name=None,
namespace=None, dimensions=None,
statistic=None, unit=None,
scale_out_cooldown=None,
scale_in_cooldown=None):
params = dict(PolicyName=policy_name,
ServiceNamespace=service_namespace,
ResourceId=resource_id,
ScalableDimension=scalable_dimension,
PolicyType='TargetTrackingScaling')
target_scaling_config_dict = dict()
if target_value:
target_scaling_config_dict['TargetValue'] = target_value
predefined_config_dict = dict()
if predefined_metric_type:
predefined_config_dict[
'PredefinedMetricType'] = predefined_metric_type
if resource_label:
predefined_config_dict['ResourceLabel'] = resource_label
if predefined_config_dict:
target_scaling_config_dict[
'PredefinedMetricSpecification'] = predefined_config_dict
customized_config_dict = dict()
if metric_name:
customized_config_dict['MetricName'] = metric_name
if namespace:
customized_config_dict['Namespace'] = namespace
if dimensions:
customized_config_dict['Dimensions'] = dimensions
if statistic:
customized_config_dict['Statistic'] = statistic
if unit:
customized_config_dict['Unit'] = unit
if customized_config_dict:
params['CustomizedMetricSpecification'] = customized_config_dict
if scale_out_cooldown:
target_scaling_config_dict['ScaleOutCooldown'] = scale_out_cooldown
if scale_in_cooldown:
target_scaling_config_dict['ScaleInCooldown'] = scale_in_cooldown
if target_scaling_config_dict:
params[
'TargetTrackingScalingPolicyConfiguration'] = target_scaling_config_dict
return self.client.put_scaling_policy(**params)
def deregister_scalable_target(self, service_namespace, resource_id,
scalable_dimension):
params = dict(ResourceId=resource_id,
ScalableDimension=scalable_dimension,
ServiceNamespace=service_namespace)
return self.client.deregister_scalable_target(**params)
def describe_scalable_targets(self, service_namespace, resources_ids=None,
scalable_dimension=None):
params = dict(ServiceNamespace=service_namespace)
if resources_ids:
params['ResourceIds'] = resources_ids
if scalable_dimension:
params['ScalableDimension'] = scalable_dimension
targets = []
response = self.client.describe_scalable_targets(**params)
token = response.get('NextToken')
targets.extend(response.get('ScalableTargets'))
while token:
params['NextToken'] = token
response = self.client.describe_scalable_targets(**params)
token = response.get('NextToken')
targets.extend(response.get('ScalableTargets'))
return targets
def describe_scaling_policies(self, service_namespace, policy_names,
resource_id=None, scalable_dimension=None):
params = dict(ServiceNamespace=service_namespace)
if policy_names:
params['PolicyNames'] = policy_names
if scalable_dimension:
params['ScalableDimension'] = scalable_dimension
if resource_id:
params['ResourceId'] = resource_id
targets = []
response = self.client.describe_scaling_policies(**params)
token = response.get('NextToken')
targets.extend(response.get('ScalingPolicies'))
while token:
params['NextToken'] = token
response = self.client.describe_scaling_policies(**params)
token = response.get('NextToken')
targets.extend(response.get('ScalingPolicies'))
return targets