in workflows/pipe-common/pipeline/autoscaling/awsprovider.py [0:0]
def __find_spot_instance(self, bid_price, run_id, pool_id, ins_img, ins_type, ins_key, ins_hdd,
                         kms_encyr_key_id, user_data_script, num_rep, time_rep, swap_size, is_dedicated):
    """Request a one-time spot instance and wait until it has a private IP address.

    The first entry returned by ``__get_spot_prices`` is used as the target availability
    zone. If a subnet is configured for that zone, the request is pinned to it. The method
    then polls EC2 until the spot request is visible, fulfilled, and the resulting instance
    reports a ``PrivateIpAddress``. It tags the spot request, the instance and, when
    ``resource_tags()`` is non-empty, the instance's EBS volumes.

    :param bid_price: maximum spot price; sent to EC2 as a string.
    :param run_id: run identifier used for tagging and for ``exit_if_spot_unavailable``.
    :param pool_id: pool identifier used for tagging.
    :param ins_img: image id (``ImageId``).
    :param ins_type: instance type.
    :param ins_key: key pair name (``KeyName``).
    :param ins_hdd: passed through to ``__get_block_devices``.
    :param kms_encyr_key_id: passed through to ``__get_block_devices``.
    :param user_data_script: user data text; UTF-8 + base64 encoded before sending.
    :param num_rep: polling attempt budget; one counter is shared by both wait stages.
    :param time_rep: seconds between polls; ``num_rep * time_rep`` also bounds the
        spot request's ``ValidUntil``.
    :param swap_size: passed through to ``__get_block_devices``.
    :param is_dedicated: when truthy, request ``dedicated`` tenancy.
    :return: tuple ``(instance_id, private_ip)``.
    :raises RuntimeError: if EC2 does not return a spot request id.
    :raises ClientError: for EC2 errors other than the handled limit / not-found cases.
    Exits the process via ``sys.exit(LIMIT_EXCEEDED_EXIT_CODE)`` when the spot/instance
    limit is exceeded.
    """
    utils.pipe_log('Creating spot request')
    utils.pipe_log('- Checking spot prices for current region...')
    spot_prices = self.__get_spot_prices(self.ec2, self.cloud_region, ins_type)
    allowed_networks = utils.get_networks_config(self.cloud_region)

    cheapest_zone = ''
    if not spot_prices:
        utils.pipe_log('- Unable to get prices for a spot of type {}, cheapest zone can not be determined'.format(ins_type))
    else:
        # The first entry is treated as the cheapest zone
        # (assumes __get_spot_prices returns prices sorted ascending - TODO confirm).
        cheapest_zone, _ = spot_prices[0]
        utils.pipe_log('- Prices for {} spots:\n'.format(ins_type) +
                       '\n'.join('{0}: {1:.5f}'.format(zone, price) for zone, price in spot_prices) + '\n' +
                       '{} zone will be used'.format(cheapest_zone))

    specifications = {
        'ImageId': ins_img,
        'InstanceType': ins_type,
        'KeyName': ins_key,
        'UserData': base64.b64encode(user_data_script.encode('utf-8')).decode('utf-8'),
        'BlockDeviceMappings': self.__get_block_devices(ins_img, ins_hdd, kms_encyr_key_id, swap_size=swap_size),
    }
    if allowed_networks and cheapest_zone in allowed_networks:
        subnet_id = allowed_networks[cheapest_zone]
        utils.pipe_log('- Networks list found, subnet {} in AZ {} will be used'.format(subnet_id, cheapest_zone))
        specifications['SubnetId'] = subnet_id
        specifications['SecurityGroupIds'] = utils.get_security_groups(self.cloud_region)
        specifications['Placement'] = {'AvailabilityZone': cheapest_zone}
    else:
        utils.pipe_log('- Networks list NOT found or cheapest zone can not be determined, default subnet in a random AZ will be used')
        specifications['SecurityGroupIds'] = utils.get_security_groups(self.cloud_region)
    if is_dedicated:
        # Merge into any existing Placement so the AvailabilityZone chosen above is kept
        # (replacing the whole dict used to drop it).
        specifications.setdefault('Placement', {})['Tenancy'] = 'dedicated'

    current_time = datetime.now(pytz.utc) + timedelta(seconds=10)
    try:
        response = self.ec2.request_spot_instances(
            SpotPrice=str(bid_price),
            InstanceCount=1,
            Type='one-time',
            ValidFrom=current_time,
            ValidUntil=current_time + timedelta(seconds=num_rep * time_rep),
            LaunchSpecification=specifications,
        )
    except ClientError as client_error:
        # `.message` does not exist on Python 3 exceptions; str() works on both 2 and 3
        # and botocore includes the error code in it, e.g. "(InstanceLimitExceeded)".
        error_text = str(client_error)
        if 'Max spot instance count exceeded' in error_text or \
                'InstanceLimitExceeded' in error_text:
            utils.pipe_log_warn(LIMIT_EXCEEDED_ERROR_MESSAGE)
            sys.exit(LIMIT_EXCEEDED_EXIT_CODE)
        raise

    # One retry budget is shared by the registration wait and the fulfillment wait.
    rep = 0
    ins_id = ''
    ins_ip = ''
    spot_requests = response.get('SpotInstanceRequests') or [{}]
    request_id = spot_requests[0].get('SpotInstanceRequestId')
    if not request_id:
        raise RuntimeError('Spot instance request did not return a SpotInstanceRequestId')
    utils.pipe_log('- Spot request was sent. SpotInstanceRequestId: {}. Waiting for spot request registration...'.format(request_id))

    # Stage 1: wait until the new request is visible to describe_spot_instance_requests.
    while rep <= num_rep:
        try:
            requests_list = self.ec2.describe_spot_instance_requests(SpotInstanceRequestIds=[request_id])
            if requests_list.get('SpotInstanceRequests'):
                break
        except ClientError as describe_error:
            # A just-created request may not be found yet; any other AWS error is fatal.
            if describe_error.response.get('Error', {}).get('Code') != 'InvalidSpotInstanceRequestID.NotFound':
                raise
        rep = self.__increment_or_fail(num_rep, rep,
                                       'Exceeded retry count ({}) while waiting for spot request {}'.format(num_rep, request_id),
                                       kill_instance_id_on_fail=ins_id)
        utils.pipe_log('- Spot request {} is not yet available. Still waiting...'.format(request_id))
        sleep(time_rep)

    utils.pipe_log('- Spot request {} is registered'.format(request_id))
    self.ec2.create_tags(
        Resources=[request_id],
        Tags=AWSInstanceProvider.run_id_tag(run_id, pool_id),
    )
    utils.pipe_log('- Spot request {} was tagged with RunID {}. Waiting for request fulfillment...'.format(request_id, run_id))

    # Stage 2: wait for fulfillment, then for the instance to be describable and get an IP.
    last_status = ''
    while rep <= num_rep:
        current_request = self.ec2.describe_spot_instance_requests(SpotInstanceRequestIds=[request_id])['SpotInstanceRequests'][0]
        status = current_request['Status']['Code']
        if status == 'fulfilled':
            ins_id = current_request['InstanceId']
            try:
                instance = self.ec2.describe_instances(InstanceIds=[ins_id])
            except ClientError as describe_ex:
                if describe_ex.response.get('Error', {}).get('Code') == 'InvalidInstanceID.NotFound':
                    utils.pipe_log('- Spot request {} is already fulfilled but instance id {} can not be found yet. Still waiting...'.format(request_id, ins_id))
                    rep = self.__increment_or_fail(num_rep, rep,
                                                   'Exceeded retry count ({}) for spot instance. Spot instance request status code: {}.'.format(num_rep, status),
                                                   kill_instance_id_on_fail=ins_id)
                    sleep(time_rep)
                    continue
                raise

            # An empty describe result is treated like "IP not assigned yet" instead of
            # failing with IndexError.
            instance_reservation = None
            reservations = instance.get('Reservations') if instance else None
            if reservations and reservations[0].get('Instances'):
                instance_reservation = reservations[0]['Instances'][0]
            if not instance_reservation or not instance_reservation.get('PrivateIpAddress'):
                utils.pipe_log('- Spot request {} is already fulfilled but PrivateIpAddress is not yet assigned. Still waiting...'.format(request_id))
                rep = self.__increment_or_fail(num_rep, rep,
                                               'Exceeded retry count ({}) for spot instance. Spot instance request status code: {}.'.format(num_rep, status),
                                               kill_instance_id_on_fail=ins_id)
                sleep(time_rep)
                continue

            ins_ip = instance_reservation['PrivateIpAddress']
            self.ec2.create_tags(
                Resources=[ins_id],
                Tags=AWSInstanceProvider.get_tags(run_id, pool_id),
            )
            ebs_tags = AWSInstanceProvider.resource_tags()
            if ebs_tags:
                for volume in instance_reservation.get('BlockDeviceMappings', []):
                    ebs = volume.get('Ebs')
                    if not ebs:
                        # Only EBS-backed mappings have a VolumeId to tag.
                        continue
                    self.ec2.create_tags(
                        Resources=[ebs['VolumeId']],
                        Tags=ebs_tags)
            utils.pipe_log('Instance is successfully created for spot request {}. ID: {}, IP: {}\n-'.format(request_id, ins_id, ins_ip))
            break

        last_status = status
        utils.pipe_log('- Spot request {} is not yet fulfilled. Still waiting...'.format(request_id))
        # TODO: review all this logic, it is difficult to read and maintain
        # On the last allowed attempt, give exit_if_spot_unavailable a chance to inspect
        # the status before the retry budget check below.
        if rep >= num_rep:
            self.exit_if_spot_unavailable(run_id, last_status)
        rep = self.__increment_or_fail(num_rep, rep,
                                       'Exceeded retry count ({}) for spot instance. Spot instance request status code: {}.'.format(num_rep, status),
                                       kill_instance_id_on_fail=ins_id)
        sleep(time_rep)

    # NOTE(review): this also runs after a successful fulfillment. In that case `last_status`
    # holds the last non-fulfilled status seen, or '' if the request was fulfilled on the
    # first poll. Confirm exit_if_spot_unavailable() cannot abort a run whose instance was
    # already created.
    self.exit_if_spot_unavailable(run_id, last_status)
    return ins_id, ins_ip