def __find_spot_instance()

in workflows/pipe-common/pipeline/autoscaling/awsprovider.py [0:0]


    def __find_spot_instance(self, bid_price, run_id, pool_id, ins_img, ins_type, ins_key, ins_hdd,
                             kms_encyr_key_id, user_data_script, num_rep, time_rep, swap_size, is_dedicated):
        """Request a one-time spot instance and wait until it has a private IP address.

        When a subnet is configured for the cheapest availability zone (by current spot price),
        the request is pinned to that subnet/AZ; otherwise AWS chooses the default subnet.

        :param bid_price: maximum spot price, sent to AWS as ``SpotPrice`` (stringified).
        :param run_id: run identifier used to tag the spot request and the instance.
        :param pool_id: pool identifier passed to the tagging helpers together with ``run_id``.
        :param ins_img: AMI id (``ImageId``).
        :param ins_type: instance type (``InstanceType``); also used to look up spot prices.
        :param ins_key: key pair name (``KeyName``).
        :param ins_hdd: forwarded to ``__get_block_devices``.
        :param kms_encyr_key_id: forwarded to ``__get_block_devices``.
        :param user_data_script: user data script, base64-encoded into ``UserData``.
        :param num_rep: retry budget shared by both wait phases (request registration and
                        request fulfillment); also sizes the request validity window.
        :param time_rep: seconds to sleep between retries.
        :param swap_size: forwarded to ``__get_block_devices``.
        :param is_dedicated: when truthy, request ``dedicated`` placement tenancy.
        :return: tuple ``(instance_id, private_ip)``.
        :raises RuntimeError: if AWS does not return a SpotInstanceRequestId.
        :raises ClientError: for AWS API errors other than the handled "not found yet" cases.
        Exits the process with ``LIMIT_EXCEEDED_EXIT_CODE`` when the spot/instance limit is hit.
        """
        utils.pipe_log('Creating spot request')

        utils.pipe_log('- Checking spot prices for current region...')
        spot_prices = self.__get_spot_prices(self.ec2, self.cloud_region, ins_type)

        allowed_networks = utils.get_networks_config(self.cloud_region)
        cheapest_zone = ''
        if not spot_prices:
            utils.pipe_log('- Unable to get prices for a spot of type {}, cheapest zone can not be determined'.format(ins_type))
        else:
            # __get_spot_prices is assumed to return (zone, price) pairs sorted cheapest-first;
            # the first entry is used as the cheapest zone.
            cheapest_zone, _ = spot_prices[0]
            utils.pipe_log('- Prices for {} spots:\n'.format(ins_type) +
                           '\n'.join('{0}: {1:.5f}'.format(zone, price) for zone, price in spot_prices) + '\n' +
                           '{} zone will be used'.format(cheapest_zone))

        specifications = {
            'ImageId': ins_img,
            'InstanceType': ins_type,
            'KeyName': ins_key,
            'UserData': base64.b64encode(user_data_script.encode('utf-8')).decode('utf-8'),
            'BlockDeviceMappings': self.__get_block_devices(ins_img, ins_hdd, kms_encyr_key_id, swap_size=swap_size),
        }
        if allowed_networks and cheapest_zone in allowed_networks:
            subnet_id = allowed_networks[cheapest_zone]
            utils.pipe_log('- Networks list found, subnet {} in AZ {} will be used'.format(subnet_id, cheapest_zone))
            specifications['SubnetId'] = subnet_id
            specifications['Placement'] = {'AvailabilityZone': cheapest_zone}
        else:
            utils.pipe_log('- Networks list NOT found or cheapest zone can not be determined, default subnet in a random AZ will be used')
        specifications['SecurityGroupIds'] = utils.get_security_groups(self.cloud_region)

        if is_dedicated:
            # Merge into any existing Placement so an AvailabilityZone chosen above is kept
            # instead of being overwritten by a Tenancy-only dict.
            specifications.setdefault('Placement', {})['Tenancy'] = 'dedicated'

        # The request becomes valid 10 seconds from now and expires after the whole retry
        # budget (num_rep * time_rep seconds).
        current_time = datetime.now(pytz.utc) + timedelta(seconds=10)

        try:
            response = self.ec2.request_spot_instances(
                SpotPrice=str(bid_price),
                InstanceCount=1,
                Type='one-time',
                ValidFrom=current_time,
                ValidUntil=current_time + timedelta(seconds=num_rep * time_rep),
                LaunchSpecification=specifications,
            )
        except ClientError as client_error:
            # str() works on Python 2 and 3; BaseException.message does not exist on Python 3.
            error_message = str(client_error)
            if 'Max spot instance count exceeded' in error_message or \
                    'InstanceLimitExceeded' in error_message:
                utils.pipe_log_warn(LIMIT_EXCEEDED_ERROR_MESSAGE)
                sys.exit(LIMIT_EXCEEDED_EXIT_CODE)
            raise

        # The retry counter is shared by both wait loops below, so num_rep bounds them together.
        rep = 0
        ins_id = ''
        ins_ip = ''

        # Use .get() so a malformed response reaches the explicit RuntimeError below
        # instead of failing with a KeyError/IndexError.
        spot_requests = response.get('SpotInstanceRequests') or [{}]
        request_id = spot_requests[0].get('SpotInstanceRequestId')

        if not request_id:
            raise RuntimeError('Spot instance request did not return a SpotInstanceRequestId')

        utils.pipe_log('- Spot request was sent. SpotInstanceRequestId: {}. Waiting for spot request registration...'.format(request_id))

        # A freshly created request may not be visible to describe calls yet: it is either
        # reported as InvalidSpotInstanceRequestID.NotFound or returned as an empty list. Poll until it shows up.
        while rep <= num_rep:
            try:
                requests_list = self.ec2.describe_spot_instance_requests(SpotInstanceRequestIds=[request_id])
                if requests_list.get('SpotInstanceRequests'):
                    break
            except ClientError as describe_error:
                if describe_error.response.get('Error', {}).get('Code') != "InvalidSpotInstanceRequestID.NotFound":
                    raise

            rep = self.__increment_or_fail(num_rep, rep,
                                           'Exceeded retry count ({}) while waiting for spot request {}'.format(num_rep, request_id),
                                           kill_instance_id_on_fail=ins_id)

            utils.pipe_log('- Spot request {} is not yet available. Still waiting...'.format(request_id))
            sleep(time_rep)

        utils.pipe_log('- Spot request {} is registered'.format(request_id))
        self.ec2.create_tags(
            Resources=[request_id],
            Tags=AWSInstanceProvider.run_id_tag(run_id, pool_id),
        )

        utils.pipe_log('- Spot request {} was tagged with RunID {}. Waiting for request fulfillment...'.format(request_id, run_id))

        # Poll until the request is fulfilled AND the resulting instance is describable with
        # a private IP. last_status keeps the most recent non-fulfilled status code.
        last_status = ''
        while rep <= num_rep:
            current_request = self.ec2.describe_spot_instance_requests(SpotInstanceRequestIds=[request_id])['SpotInstanceRequests'][0]
            status = current_request['Status']['Code']
            if status == 'fulfilled':
                ins_id = current_request['InstanceId']
                try:
                    instance = self.ec2.describe_instances(InstanceIds=[ins_id])
                except ClientError as describe_ex:
                    if describe_ex.response.get('Error', {}).get('Code') != "InvalidInstanceID.NotFound":
                        raise
                    utils.pipe_log('- Spot request {} is already fulfilled but instance id {} can not be found yet. Still waiting...'.format(request_id, ins_id))
                    rep = self.__increment_or_fail(num_rep, rep,
                                                   'Exceeded retry count ({}) for spot instance. Spot instance request status code: {}.'.format(num_rep, status),
                                                   kill_instance_id_on_fail=ins_id)
                    sleep(time_rep)
                    continue

                # An empty reservation list is treated like a missing IP: not ready yet.
                instance_reservation = None
                reservations = instance.get('Reservations') or []
                if reservations and reservations[0].get('Instances'):
                    instance_reservation = reservations[0]['Instances'][0]
                if not instance_reservation or not instance_reservation.get('PrivateIpAddress'):
                    utils.pipe_log('- Spot request {} is already fulfilled but PrivateIpAddress is not yet assigned. Still waiting...'.format(request_id))
                    rep = self.__increment_or_fail(num_rep, rep,
                                                   'Exceeded retry count ({}) for spot instance. Spot instance request status code: {}.'.format(num_rep, status),
                                                   kill_instance_id_on_fail=ins_id)
                    sleep(time_rep)
                    continue

                ins_ip = instance_reservation['PrivateIpAddress']
                self.ec2.create_tags(
                    Resources=[ins_id],
                    Tags=AWSInstanceProvider.get_tags(run_id, pool_id),
                )

                ebs_tags = AWSInstanceProvider.resource_tags()
                if ebs_tags:
                    for volume in instance_reservation['BlockDeviceMappings']:
                        self.ec2.create_tags(
                            Resources=[volume['Ebs']['VolumeId']],
                            Tags=ebs_tags)

                utils.pipe_log('Instance is successfully created for spot request {}. ID: {}, IP: {}\n-'.format(request_id, ins_id, ins_ip))
                break
            last_status = status
            utils.pipe_log('- Spot request {} is not yet fulfilled. Still waiting...'.format(request_id))
            # TODO: review all this logic, it is difficult to read and maintain
            if rep >= num_rep:
                self.exit_if_spot_unavailable(run_id, last_status)
            rep = self.__increment_or_fail(num_rep, rep,
                                           'Exceeded retry count ({}) for spot instance. Spot instance request status code: {}.'.format(num_rep, status),
                                           kill_instance_id_on_fail=ins_id)
            sleep(time_rep)

        # Only consult the spot status when no instance was obtained. After a successful break,
        # last_status still holds the last non-fulfilled status seen before fulfillment, and
        # passing it on could abort a run whose instance is already up.
        if not ins_ip:
            self.exit_if_spot_unavailable(run_id, last_status)

        return ins_id, ins_ip