in scripts/autoscaling/aws/nodeup.py [0:0]
def find_spot_instance(ec2, aws_region, bid_price, run_id, pool_id, ins_img, ins_type, ins_key,
                       ins_hdd, kms_encyr_key_id, user_data_script, num_rep, time_rep, swap_size, kube_client,
                       instance_additional_spec, availability_zone, security_groups, subnet, network_interface,
                       is_dedicated, performance_network, input_tags):
    """Request a one-time spot instance and wait until it is fulfilled and has a private IP.

    The function builds a launch specification, submits the spot request, waits for the
    request to be registered and fulfilled, tags the request, the instance and its volumes,
    and finally makes a best-effort attempt to adjust the instance metadata options.

    Args:
        ec2: EC2 client used for all spot/instance/tag API calls.
        aws_region: Region passed to the price, network, security group and tag helpers.
        bid_price: Value sent (stringified) as the ``SpotPrice`` of the request.
        run_id: Run identifier used for tagging and for ``exit_if_spot_unavailable``.
        pool_id: Pool identifier used for tagging.
        ins_img: Value of ``ImageId`` in the launch specification.
        ins_type: Value of ``InstanceType`` in the launch specification.
        ins_key: Value of ``KeyName`` in the launch specification.
        ins_hdd: Passed to ``get_block_devices``.
        kms_encyr_key_id: Passed to ``get_block_devices``.
        user_data_script: Text that is UTF-8 encoded and base64-encoded into ``UserData``.
        num_rep: Maximum number of retries; shared by the registration and fulfillment waits.
        time_rep: Seconds to sleep between retries. ``num_rep * time_rep`` is also the
            validity period of the spot request.
        swap_size: Passed to ``get_block_devices``.
        kube_client: Forwarded to ``increment_or_fail`` while waiting for the private IP.
        instance_additional_spec: Optional dict merged into the launch specification.
        availability_zone: Optional desired availability zone.
        security_groups: Passed to ``get_security_groups``.
        subnet: Optional subnet, resolved via ``get_specified_subnet``.
        network_interface: Optional network interface, resolved via
            ``fetch_network_interface_info`` and attached at device index 0.
        is_dedicated: When truthy, ``Placement.Tenancy`` is set to ``dedicated``.
        performance_network: When truthy (and no network interface is given), an ``efa``
            interface is requested.
        input_tags: Optional extra tags applied to the instance and its volumes.

    Returns:
        Tuple ``(ins_id, ins_ip)`` with the instance id and its private IP address.

    Raises:
        RuntimeError: The spot request response carried no ``SpotInstanceRequestId``.
        ClientError: Any AWS error other than the ones explicitly waited out.
        SystemExit: Via ``sys.exit(LIMIT_EXCEEDED_EXIT_CODE)`` when the spot instance
            limit is exceeded.
    """
    pipe_log('Creating spot request')
    pipe_log('- Checking spot prices for current region...')
    spot_prices = get_spot_prices(ec2, aws_region, ins_type)
    allowed_networks = get_networks_config(ec2, aws_region, ins_type)

    cheapest_zone = ''
    if not spot_prices:
        pipe_log('- Unable to get prices for a spot of type {}, cheapest zone can not be determined'.format(ins_type))
    else:
        cheapest_zone = _select_spot_zone(spot_prices, availability_zone)
        pipe_log('- Prices for {} spots:\n'.format(ins_type) +
                 '\n'.join('{0}: {1:.5f}'.format(zone, price) for zone, price in spot_prices) + '\n' +
                 '{} zone will be used'.format(cheapest_zone))

    specifications = {
        'ImageId': ins_img,
        'InstanceType': ins_type,
        'KeyName': ins_key,
        'UserData': base64.b64encode(user_data_script.encode('utf-8')).decode('utf-8'),
        'BlockDeviceMappings': get_block_devices(ec2, ins_img, ins_type, ins_hdd, kms_encyr_key_id, swap_size),
    }

    # An explicitly requested subnet wins over the per-zone networks configuration.
    subnet_id = None
    if subnet:
        subnet_id = get_specified_subnet(subnet, availability_zone)
    elif allowed_networks and cheapest_zone in allowed_networks:
        subnet_id = allowed_networks[cheapest_zone]
        pipe_log('- Networks list found, subnet {} in AZ {} will be used'.format(subnet_id, cheapest_zone))

    if network_interface:
        if subnet_id:
            pipe_log('- Network interface specified. Desired subnet id {} will be ignored'.format(subnet_id))
        # Only the resolved interface id is used below; the subnet and AZ are implied by it.
        network_interface, subnet_id, _ = fetch_network_interface_info(ec2, network_interface, availability_zone, allowed_networks)
        specifications.update({
            "NetworkInterfaces": [
                {
                    "DeviceIndex": 0,
                    "NetworkInterfaceId": network_interface
                }
            ],
        })
    elif performance_network:
        pipe_log('- Performance network requested.')
        if not subnet or not subnet_id:
            pipe_log('- Subnet is not specified, trying to get a random one...')
            subnet_id = get_random_subnet(ec2)
            pipe_log('- Subnet: {} will be used.'.format(subnet_id))
        if subnet_id:
            specifications.update({
                "NetworkInterfaces": [
                    {
                        'DeleteOnTermination': True,
                        'DeviceIndex': 0,
                        'SubnetId': subnet_id,
                        'Groups': get_security_groups(aws_region, security_groups),
                        'InterfaceType': 'efa'
                    }
                ]
            })
        else:
            pipe_log('- Cannot define subnet to be launched in, will skip performance network setup and continue with default options...')
            pipe_log('- Default subnet in random AZ will be used')
            specifications.update({'SecurityGroupIds': get_security_groups(aws_region, security_groups)})
    elif subnet_id:
        specifications.update({
            'SubnetId': subnet_id,
            'SecurityGroupIds': get_security_groups(aws_region, security_groups)
        })
        if cheapest_zone:
            specifications['Placement'] = {'AvailabilityZone': cheapest_zone}
    else:
        pipe_log('- Networks list NOT found or cheapest zone can not be determined, default subnet in a random AZ will be used')
        specifications['SecurityGroupIds'] = get_security_groups(aws_region, security_groups)

    if instance_additional_spec:
        specifications.update(instance_additional_spec)

    if is_dedicated:
        # Merge into a copy instead of replacing the whole dict, so an AvailabilityZone chosen
        # above (or Placement keys coming from instance_additional_spec) is not discarded and
        # the caller's dict is not mutated.
        placement = dict(specifications.get('Placement') or {})
        placement['Tenancy'] = "dedicated"
        specifications['Placement'] = placement

    # The request becomes valid slightly in the future and expires once the retry budget is spent.
    current_time = datetime.now(pytz.utc) + timedelta(seconds=10)
    response = None
    try:
        response = ec2.request_spot_instances(
            SpotPrice=str(bid_price),
            InstanceCount=1,
            Type='one-time',
            ValidFrom=current_time,
            ValidUntil=current_time + timedelta(seconds=num_rep * time_rep),
            LaunchSpecification=specifications,
        )
    except ClientError as client_error:
        # ``message`` is not available on Python 3 exceptions; fall back to the string form
        # so the limit check itself cannot fail with AttributeError.
        error_message = getattr(client_error, 'message', None) or str(client_error)
        if 'Max spot instance count exceeded' in error_message or \
                'InstanceLimitExceeded' in error_message:
            pipe_log_warn(LIMIT_EXCEEDED_ERROR_MASSAGE)
            sys.exit(LIMIT_EXCEEDED_EXIT_CODE)
        raise

    rep = 0
    ins_id = ''
    ins_ip = ''

    request_id = response['SpotInstanceRequests'][0]['SpotInstanceRequestId']
    if not request_id:
        raise RuntimeError('Spot instance request did not return a SpotInstanceRequestId')

    pipe_log('- Spot request was sent. SpotInstanceRequestId: {}. Waiting for spot request registration...'.format(request_id))

    # Await for spot request registration (sometimes SpotInstanceRequestId is not returned immediately)
    while rep <= num_rep:
        try:
            requests_list = ec2.describe_spot_instance_requests(SpotInstanceRequestIds=[request_id])
            if len(requests_list['SpotInstanceRequests']) > 0:
                break
        except ClientError as describe_error:
            # Only "request not found yet" is waited out; everything else is a real failure.
            if describe_error.response['Error']['Code'] != "InvalidSpotInstanceRequestID.NotFound":
                raise
        rep = increment_or_fail(num_rep,
                                rep,
                                'Exceeded retry count ({}) while waiting for spot request {}'.format(num_rep, request_id))
        pipe_log('- Spot request {} is not yet available. Still waiting...'.format(request_id))
        sleep(time_rep)

    pipe_log('- Spot request {} is registered'.format(request_id))
    ec2.create_tags(
        Resources=[request_id],
        Tags=run_id_tag(run_id, pool_id),
    )
    pipe_log('- Spot request {} was tagged with RunID {}. Waiting for request fulfillment...'.format(request_id, run_id))

    # NOTE: ``rep`` is intentionally not reset here in the original code, so both waits
    # share one retry budget.
    last_status = ''
    while rep <= num_rep:
        current_request = ec2.describe_spot_instance_requests(SpotInstanceRequestIds=[request_id])['SpotInstanceRequests'][0]
        status = current_request['Status']['Code']
        last_status = status
        if status == 'fulfilled':
            ins_id = current_request['InstanceId']
            instance = None
            try:
                instance = ec2.describe_instances(InstanceIds=[ins_id])
            except ClientError as describe_ex:
                if describe_ex.response['Error']['Code'] != "InvalidInstanceID.NotFound":
                    raise
                pipe_log('- Spot request {} is already fulfilled but instance id {} can not be found yet. Still waiting...'.format(request_id, ins_id))
                rep = increment_or_fail(num_rep, rep,
                                        'Exceeded retry count ({}) for spot instance. Spot instance request status code: {}.'
                                        .format(num_rep, status))
                sleep(time_rep)
                continue

            instance_reservation = instance['Reservations'][0]['Instances'][0] if instance else None
            if not instance_reservation or not instance_reservation.get('PrivateIpAddress'):
                pipe_log('- Spot request {} is already fulfilled but PrivateIpAddress is not yet assigned. Still waiting...'.format(request_id))
                # Here the instance already exists, so it is handed over to increment_or_fail
                # together with the clients in case the retry budget is exhausted.
                rep = increment_or_fail(num_rep, rep,
                                        'Exceeded retry count ({}) for spot instance. Spot instance request status code: {}.'
                                        .format(num_rep, status),
                                        ec2_client=ec2,
                                        kill_instance_id_on_fail=ins_id,
                                        kube_client=kube_client)
                sleep(time_rep)
                continue

            ins_ip = instance_reservation['PrivateIpAddress']
            ec2.create_tags(
                Resources=[ins_id],
                Tags=get_tags(run_id, aws_region, pool_id, input_tags),
            )

            # Work on a copy so the list returned by resource_tags is never mutated, and
            # tolerate an empty/None result from it.
            ebs_tags = list(resource_tags(aws_region) or [])
            if input_tags:
                ebs_tags.extend(input_tags)
            if ebs_tags:
                for volume in instance_reservation['BlockDeviceMappings']:
                    ec2.create_tags(
                        Resources=[volume['Ebs']['VolumeId']],
                        Tags=ebs_tags)

            _try_set_instance_metadata_options(ec2, ins_id, request_id, num_rep, time_rep)

            pipe_log('Instance is successfully created for spot request {}. ID: {}, IP: {}\n-'.format(request_id, ins_id, ins_ip))
            break

        pipe_log('- Spot request {} is not yet fulfilled. Still waiting...'.format(request_id))
        # TODO: review all this logic, it is difficult to read and maintain
        # On the last attempt give exit_if_spot_unavailable a chance to react to the status
        # before increment_or_fail reports the exhausted retry budget.
        if rep >= num_rep:
            exit_if_spot_unavailable(run_id, last_status)
        rep = increment_or_fail(num_rep, rep,
                                'Exceeded retry count ({}) for spot instance. Spot instance request status code: {}.'
                                .format(num_rep, status))
        sleep(time_rep)

    exit_if_spot_unavailable(run_id, last_status)
    return ins_id, ins_ip


def _select_spot_zone(spot_prices, availability_zone):
    """Choose the availability zone for the spot request from ``(zone, price)`` pairs.

    Returns ``availability_zone`` when it is present among the priced zones; otherwise the
    zone of the first ``spot_prices`` entry (presumably the cheapest one - the ordering is
    defined by ``get_spot_prices``; confirm there). ``spot_prices`` must be non-empty.
    """
    if availability_zone:
        pipe_log('- Desired availability zone {} was specified, trying to use it'.format(availability_zone))
        # A dedicated loop variable is used so that an unmatched search does not leave the
        # last iterated zone selected by accident.
        for zone, _ in spot_prices:
            if zone == availability_zone:
                return availability_zone
    first_zone, _ = spot_prices[0]
    return first_zone


def _try_set_instance_metadata_options(ec2, ins_id, request_id, num_rep, time_rep):
    """Wait for the instance to reach RUNNING and adjust its metadata options (best effort).

    Any failure, including a timeout while waiting for RUNNING, is logged as a warning and
    does not interrupt instance creation.
    """
    # FIXME: 'modify_instance_metadata_options' shall be added to the pipe-common/autoscaling/awsprovider.py
    try:
        pipe_log('- Waiting for instance {} (spot request {}) to become RUNNING before setting IMDSv2'.format(ins_id, request_id))
        ins_status = PENDING
        ins_status_rep = 0
        while ins_status_rep <= num_rep and ins_status != RUNNING:
            ins_status = get_current_status(ec2, ins_id)
            ins_status_rep += 1
            # No need to wait once the instance is already RUNNING.
            if ins_status != RUNNING:
                sleep(time_rep)
        if ins_status == RUNNING:
            pipe_log('- Tying to set IMDSv2 for instance {} (spot request {})'.format(ins_id, request_id))
            # NOTE(review): HttpTokens='optional' does not make session tokens mandatory -
            # confirm this is the intended setting for "IMDSv2".
            ec2.modify_instance_metadata_options(
                InstanceId=ins_id,
                HttpTokens='optional',
                HttpPutResponseHopLimit=2,
                HttpEndpoint='enabled'
            )
        else:
            raise RuntimeError('Time out error while waiting for the instance transition to RUNNING state')
    except Exception as modify_metadata_ex:
        # Deliberately broad: this step must never fail the node creation.
        pipe_log_warn('- [WARN] Cannot set IMDSv2 for instance {} (spot request {}):\n{}'.format(ins_id, request_id, str(modify_metadata_ex)))