in pipe-cli/src/utilities/pipeline_run_operations.py [0:0]
def run(cls, pipeline, config, parameters, yes, run_params, instance_disk, instance_type, docker_image,
cmd_template, timeout, quiet, instance_count, cores, sync, price_type=None, region_id=None,
parent_node=None, non_pause=None, friendly_url=None,
status_notifications=False,
status_notifications_status=None, status_notifications_recipient=None,
status_notifications_subject=None, status_notifications_body=None,
run_as_user=None):
all_user_roles = UserOperationsManager().get_all_user_roles()
# Preserving old style impersonation for admin users. Specified user token is generated and used
# for impersonation rather than run as capability which is used for non-admin users.
if run_as_user and ROLE_ADMIN in all_user_roles:
UserTokenOperations().set_user_token(run_as_user)
run_as_user = None
# All pipeline run parameters can be specified as options, e.g. --read1 /path/to/reads.fastq
# In this case - runs_params_dict will contain keys-values for each option, e.g. {'--read1': '/path/to/reads.fastq'}
# So they can be addressed with run_params_dict['--read1']
# This approach is used because we do not know parameters list beforehand
run_params_dict = dict([(k.strip('-'), v) for k, v in zip(run_params[::2], run_params[1::2])])
cls._validate_run_params(run_params_dict, all_user_roles)
if instance_count == 0:
instance_count = None
if non_pause and not price_type == PriceType.ON_DEMAND:
click.echo("--non-pause option supported for on-demand runs only and will be ignored")
non_pause = None
if price_type == PriceType.ON_DEMAND and non_pause is None:
non_pause = False
# Calculate instance_type and instance_count if only cores specified
if not instance_count and cores:
nodes_spec = ClusterManager.calculate_cluster_from_cores(cores, core_type=instance_type)
instance_count = nodes_spec["count"]
if instance_count > 1:
instance_count -= 1
else:
instance_count = None
instance_type = nodes_spec["name"]
if friendly_url:
friendly_url = cls._build_pretty_url(friendly_url)
try:
if not pipeline and docker_image and cls.required_args_missing(parent_node, instance_type, instance_disk,
cmd_template):
instance_disk, instance_type, cmd_template = cls.load_missing_args(docker_image, instance_disk,
instance_type, cmd_template)
if pipeline:
parts = pipeline.split('@')
pipeline_name = parts[0]
if not quiet:
click.echo('Fetching pipeline info...', nl=False)
pipeline_model = Pipeline.get(
pipeline_name,
load_versions=False,
load_storage_rules=False,
load_run_parameters=len(parts) == 1,
config_name=config)
if not quiet:
click.echo('done.', nl=True)
pipeline_run_parameters = pipeline_model.current_version.run_parameters
if len(parts) > 1:
if not quiet:
click.echo('Fetching parameters...', nl=False)
pipeline_run_parameters = Pipeline.load_run_parameters(pipeline_model.identifier, parts[1],
config_name=config)
if not quiet:
click.echo('done.', nl=True)
if parameters:
cls.print_pipeline_parameters_info(pipeline_model, pipeline_run_parameters)
else:
if not quiet:
click.echo('Evaluating estimated price...', nl=False)
run_price = Pipeline.get_estimated_price(pipeline_model.identifier,
pipeline_run_parameters.version,
instance_type,
instance_disk,
config_name=config,
price_type=price_type,
region_id=region_id)
click.echo('done.', nl=True)
price_table = prettytable.PrettyTable()
price_table.field_names = ["key", "value"]
price_table.align = "l"
price_table.set_style(12)
price_table.header = False
instance_type = instance_type or run_price.instance_type
price_table.add_row(['Price per hour ({}, hdd {})'.format(run_price.instance_type,
run_price.instance_disk),
'{} $'.format(round(run_price.price_per_hour, 2))])
if run_price.minimum_time_price is not None and run_price.minimum_time_price > 0:
price_table.add_row(['Minimum price',
'{} $'.format(round(run_price.minimum_time_price, 2))])
if run_price.average_time_price is not None and run_price.average_time_price > 0:
price_table.add_row(['Average price',
'{} $'.format(round(run_price.average_time_price, 2))])
if run_price.maximum_time_price is not None and run_price.maximum_time_price > 0:
price_table.add_row(['Maximum price',
'{} $'.format(round(run_price.maximum_time_price, 2))])
click.echo()
click.echo(price_table)
click.echo()
if not quiet:
cls._check_gpu_and_cuda_compatibility(instance_type,
pipeline_run_parameters=pipeline_run_parameters)
# Checking if user provided required parameters:
wrong_parameters = False
for parameter in pipeline_run_parameters.parameters:
if parameter.required and not run_params_dict.get(parameter.name) and parameter.value is None:
if not quiet:
click.echo('"{}" parameter is required'.format(parameter.name), err=True)
else:
click.echo(parameter.name)
sys.exit(1)
wrong_parameters = True
elif run_params_dict.get(parameter.name) is not None:
parameter.value = run_params_dict.get(parameter.name)
for user_parameter in run_params_dict.keys():
custom_parameter = True
for parameter in pipeline_run_parameters.parameters:
if parameter.name.lower() == user_parameter.lower():
custom_parameter = False
break
if custom_parameter:
pipeline_run_parameters.parameters.append(PipelineRunParameterModel(user_parameter,
run_params_dict.get(
user_parameter),
None,
False))
if not wrong_parameters:
if not yes:
click.confirm('Are you sure you want to schedule a run of {}?'.format(pipeline), abort=True)
pipeline_run_model = Pipeline.launch_pipeline(pipeline_model.identifier,
pipeline_run_parameters.version,
pipeline_run_parameters.parameters,
instance_disk,
instance_type,
docker_image,
cmd_template,
timeout,
config_name=config,
instance_count=instance_count,
price_type=price_type,
region_id=region_id,
parent_node=parent_node,
non_pause=non_pause,
friendly_url=friendly_url,
status_notifications=status_notifications,
status_notifications_status=status_notifications_status,
status_notifications_recipient=status_notifications_recipient,
status_notifications_subject=status_notifications_subject,
status_notifications_body=status_notifications_body,
run_as_user=run_as_user)
pipeline_run_id = pipeline_run_model.identifier
if not quiet:
click.echo('"{}" pipeline run scheduled with RunId: {}'.format(
cls.build_image_name(pipeline_model.name, pipeline_run_parameters.version),
pipeline_run_id))
if sync:
pipeline_processed_status = cls.get_pipeline_processed_status(pipeline_run_id)
click.echo('Pipeline run {} completed with status {}'
.format(pipeline_run_id, pipeline_processed_status))
if pipeline_processed_status != 'SUCCESS':
sys.exit(1)
else:
click.echo(pipeline_run_id)
if sync:
pipeline_processed_status = cls.get_pipeline_processed_status(pipeline_run_id)
click.echo(pipeline_processed_status)
if pipeline_processed_status != 'SUCCESS':
sys.exit(1)
elif parameters:
if not quiet:
click.echo('You must specify pipeline for listing parameters', err=True)
elif docker_image is None or cls.required_args_missing(parent_node, instance_type, instance_disk,
cmd_template):
if not quiet:
click.echo('Docker image, instance type, instance disk and cmd template '
'are required parameters if pipeline was not provided.')
else:
required_parameters = []
if docker_image is None:
required_parameters.append('docker_image')
if instance_type is None:
required_parameters.append('instance_type')
if instance_disk is None:
required_parameters.append('instance_disk')
if cmd_template is None:
required_parameters.append('cmd_template')
click.echo(', '.join(required_parameters))
sys.exit(1)
else:
if not quiet:
cls._check_gpu_and_cuda_compatibility(instance_type, docker_image=docker_image)
if not yes:
click.confirm('Are you sure you want to schedule a run?', abort=True)
pipeline_run_model = Pipeline.launch_command(instance_disk,
instance_type,
docker_image,
cmd_template,
run_params_dict,
timeout,
instance_count=instance_count,
price_type=price_type,
region_id=region_id,
parent_node=parent_node,
non_pause=non_pause,
friendly_url=friendly_url,
status_notifications=status_notifications,
status_notifications_status=status_notifications_status,
status_notifications_recipient=status_notifications_recipient,
status_notifications_subject=status_notifications_subject,
status_notifications_body=status_notifications_body,
run_as_user=run_as_user)
pipeline_run_id = pipeline_run_model.identifier
if not quiet:
click.echo('Pipeline run scheduled with RunId: {}'.format(pipeline_run_id))
if sync:
pipeline_processed_status = cls.get_pipeline_processed_status(pipeline_run_id)
click.echo('Pipeline run {} completed with status {}'
.format(pipeline_run_id, pipeline_processed_status))
if pipeline_processed_status != 'SUCCESS':
sys.exit(1)
else:
click.echo(pipeline_run_id)
if sync:
pipeline_processed_status = cls.get_pipeline_processed_status(pipeline_run_id)
click.echo(pipeline_processed_status)
if pipeline_processed_status != 'SUCCESS':
sys.exit(1)
except click.exceptions.Abort:
sys.exit(0)