in scripts/autoscaling/aws/nodeup.py [0:0]
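# Minimal helper, assuming boolean flags arrive as strings on the command line:
# argparse's type=bool treats any non-empty string (including 'False') as True,
# so boolean arguments are converted explicitly instead
def str2bool(value):
    return str(value).lower() in ('true', 'yes', '1')
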
def main():
parser = argparse.ArgumentParser()
parser.add_argument("--ins_key", type=str, required=True)
parser.add_argument("--run_id", type=str, required=True)
parser.add_argument("--cluster_name", type=str, required=False)
parser.add_argument("--cluster_role", type=str, required=False)
parser.add_argument("--ins_type", type=str, default='m4.large')
parser.add_argument("--ins_hdd", type=int, default=30)
parser.add_argument("--ins_img", type=str, default='ami-f68f3899')
parser.add_argument("--ins_platform", type=str, default='linux')
parser.add_argument("--num_rep", type=int, default=250) # 250 x 3s = 12.5m
parser.add_argument("--time_rep", type=int, default=3)
parser.add_argument("--is_spot", type=bool, default=False)
parser.add_argument("--bid_price", type=float, default=1.0)
parser.add_argument("--kube_ip", type=str, required=True)
parser.add_argument("--kubeadm_token", type=str, required=True)
parser.add_argument("--kubeadm_cert_hash", type=str, required=True)
parser.add_argument("--kube_node_token", type=str, required=True)
parser.add_argument("--kube_cluster_name", type=str, required=False)
parser.add_argument("--kms_encyr_key_id", type=str, required=False)
parser.add_argument("--region_id", type=str, default=None)
parser.add_argument("--availability_zone", type=str, required=False)
parser.add_argument("--network_interface", type=str, required=False)
parser.add_argument("--performance_network", type=bool, required=False)
parser.add_argument("--subnet_id", type=str, required=False)
parser.add_argument("--security_groups", type=str, required=False)
parser.add_argument("--dedicated", type=bool, required=False)
parser.add_argument("--node_ssh_port", type=str, default='')
parser.add_argument("--docker_data_root", type=str, default='/ebs/docker')
parser.add_argument("--docker_storage_driver", type=str, default='')
parser.add_argument("--skip_system_images_load", type=str, default='')
parser.add_argument("--label", type=str, default=[], required=False, action='append')
parser.add_argument("--image", type=str, default=[], required=False, action='append')
parser.add_argument("--tags", type=str, default=[], required=False, action='append')
args, unknown = parser.parse_known_args()
ins_key = args.ins_key
run_id = args.run_id
ins_type = args.ins_type
ins_hdd = args.ins_hdd
ins_img = args.ins_img
ins_platform = args.ins_platform
    # The Java caller may pass the literal string 'null' instead of an empty parameter
if ins_platform == 'null':
ins_platform = 'linux'
num_rep = args.num_rep
time_rep = args.time_rep
is_spot = args.is_spot
bid_price = args.bid_price
cluster_name = args.cluster_name
cluster_role = args.cluster_role
kube_ip = args.kube_ip
kubeadm_token = args.kubeadm_token
kubeadm_cert_hash = args.kubeadm_cert_hash
kube_node_token = args.kube_node_token
kube_cluster_name = args.kube_cluster_name
kms_encyr_key_id = args.kms_encyr_key_id
region_id = args.region_id
availability_zone = args.availability_zone
network_interface = args.network_interface
performance_network = args.performance_network
security_groups = args.security_groups
subnet = args.subnet_id
is_dedicated = args.dedicated if args.dedicated else False
node_ssh_port = args.node_ssh_port
docker_data_root = args.docker_data_root
docker_storage_driver = args.docker_storage_driver
skip_system_images_load = args.skip_system_images_load
pre_pull_images = args.image
additional_labels = map_labels_to_dict(args.label)
pool_id = additional_labels.get(POOL_ID_KEY)
input_tags = build_tags_from_input(args.tags)
global_distribution_url = os.getenv('GLOBAL_DISTRIBUTION_URL',
default='https://cloud-pipeline-oss-builds.s3.us-east-1.amazonaws.com/')
if not kube_ip or not kubeadm_token:
raise RuntimeError('Kubernetes configuration is required to create a new node')
pipe_log_init(run_id)
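    # A server-side preference, if set, overrides the default node wait time;
    # the number of retry attempts is derived from it and the repeat timeout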
wait_time_sec = get_preference(NODE_WAIT_TIME_SEC)
if wait_time_sec and wait_time_sec.isdigit():
        num_rep = int(wait_time_sec) // time_rep
aws_region = get_aws_region(region_id)
boto3.setup_default_session(region_name=aws_region)
pipe_log('Started initialization of new calculation node in AWS region {}:\n'
'- RunID: {}\n'
'- Type: {}\n'
'- Disk: {}\n'
'- Image: {}\n'
'- Platform: {}\n'
'- IsSpot: {}\n'
'- BidPrice: {}\n'
'- Repeat attempts: {}\n'
             '- Repeat timeout: {}\n'
             '- Docker data root: {}\n'
             '- Docker storage driver: {}'.format(aws_region,
run_id,
ins_type,
ins_hdd,
ins_img,
ins_platform,
str(is_spot),
str(bid_price),
str(num_rep),
str(time_rep),
docker_data_root,
docker_storage_driver))
try:
        # Hacking max_attempts to get rid of
        # "An error occurred (RequestLimitExceeded) when calling the ... operation (reached max retries: 4)"
        # An official solution shall be provided with https://github.com/boto/botocore/pull/1260, waiting for its release
        # This workaround is applied to the old versions of botocore only
boto3_version = LooseVersion(boto3.__version__)
boto3_version_retries = LooseVersion("1.7")
pipe_log('Using boto3 version {}'.format(boto3.__version__))
ec2 = None
if boto3_version < boto3_version_retries:
try:
ec2 = boto3.client('ec2')
if hasattr(ec2.meta.events, "_unique_id_handlers"):
ec2.meta.events._unique_id_handlers['retry-config-ec2']['handler']._checker.__dict__['_max_attempts'] = BOTO3_RETRY_COUNT
except Exception as inner_exception:
pipe_log('Unable to modify retry config:\n{}'.format(str(inner_exception)))
else:
ec2 = boto3.client('ec2', config=Config(retries={'max_attempts': BOTO3_RETRY_COUNT}))
        # Set up the Kubernetes client: prefer the in-cluster service account
        # configuration and fall back to the kubeconfig file otherwise
        try:
            api = pykube.HTTPClient(pykube.KubeConfig.from_service_account())
        except Exception:
            api = pykube.HTTPClient(pykube.KubeConfig.from_file(KUBE_CONFIG_PATH))
        # TLS verification is disabled, as the cluster API may use a self-signed certificate
        api.session.verify = False
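        # Cloud Pipeline API endpoint and credentials are expected in the environment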
api_url = os.environ["API"]
api_token = os.environ["API_TOKEN"]
api_user = os.environ["API_USER"]
instance_additional_spec = None
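        # Query the allowed-instance rules: a matching rule may carry an additional
        # instance spec and a default AMI for the requested instance type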
allowed_instance = get_allowed_instance_image(aws_region, ins_type, ins_platform, ins_img, api_token, run_id)
        if allowed_instance and allowed_instance.get("instance_mask"):
            pipe_log('Found matching rule {instance_mask} for requested instance type {instance_type}'
                     .format(instance_mask=allowed_instance["instance_mask"], instance_type=ins_type))
            instance_additional_spec = allowed_instance.get("additional_spec")
if instance_additional_spec:
pipe_log('Additional custom instance configuration will be added: {}'.format(instance_additional_spec))
if not ins_img or ins_img == 'null':
            if allowed_instance and allowed_instance.get("instance_mask_ami"):
ins_img = allowed_instance["instance_mask_ami"]
pipe_log('Instance image was not provided explicitly, {instance_image} will be used (retrieved for {instance_mask}/{instance_type} rule)'.format(instance_image=allowed_instance["instance_mask_ami"],
instance_mask=allowed_instance["instance_mask"],
instance_type=ins_type))
else:
            pipe_log('Image {ami} specified in the configuration will be used'.format(ami=ins_img))
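        # Provisioning fallback chain: reuse an instance already associated with this RunID,
        # then pick up an existing spot request for it, and only then launch a new instance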
ins_id, ins_ip = verify_run_id(ec2, run_id)
if not ins_id:
ins_id, ins_ip = check_spot_request_exists(ec2, num_rep, run_id, time_rep, aws_region, pool_id,
input_tags)
if not ins_id:
ins_id, ins_ip = run_instance(api_url, api_token, api_user, bid_price, ec2, aws_region, ins_hdd, kms_encyr_key_id, ins_img, ins_platform, ins_key, ins_type, is_spot,
num_rep, run_id, pool_id, time_rep, kube_ip, kubeadm_token, kubeadm_cert_hash, kube_node_token, kube_cluster_name, api,
global_distribution_url, pre_pull_images, instance_additional_spec,
availability_zone, security_groups, subnet, network_interface, is_dedicated, node_ssh_port, performance_network, input_tags,
docker_data_root, docker_storage_driver, skip_system_images_load)
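        # Wait for the instance to start and register itself as a Kubernetes node, then label it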
check_instance(ec2, ins_id, run_id, num_rep, time_rep, api)
nodename = verify_regnode(ec2, ins_id, num_rep, time_rep, run_id, api)
label_node(nodename, run_id, api, cluster_name, cluster_role, aws_region, additional_labels)
pipe_log('Node created:\n'
'- {}\n'
'- {}'.format(ins_id, ins_ip))
# External process relies on this output
print(ins_id + "\t" + ins_ip + "\t" + nodename)
pipe_log('{} task finished'.format(NODEUP_TASK), status=TaskStatus.SUCCESS)
except Exception as e:
pipe_log('[ERROR] ' + str(e), status=TaskStatus.FAILURE)
        # Re-raise preserving the original traceback
        raise