def get_daemon()

in workflows/pipe-common/scripts/autoscale_grid_engine.py [0:0]


def get_daemon():
    params = GridEngineParameters()

    grid_engine_type = params.autoscaling_advanced.grid_engine.get() \
        or (GridEngineType.KUBE if params.queue.kube_grid_engine.get()
            else GridEngineType.SLURM if params.queue.slurm_grid_engine.get()
            else GridEngineType.SGE)

    api_url = os.environ['API']

    cluster_owner = os.getenv('OWNER', 'root')
    cluster_hostfile = os.environ['DEFAULT_HOSTFILE']
    cluster_master_run_id = os.environ['RUN_ID']
    cluster_master_name = os.getenv('HOSTNAME', 'pipeline-' + str(cluster_master_run_id))
    cluster_work_dir = params.autoscaling_advanced.work_dir.get()

    queue_name = params.queue.queue_name.get()
    queue_name_short = (queue_name if not queue_name.endswith('.q') else queue_name[:-2])

    logging_dir = params.autoscaling.log_dir.get()
    logging_verbose = params.autoscaling.log_verbose.get()
    logging_level_run = params.autoscaling_advanced.logging_level_run.get()
    logging_level_file = params.autoscaling_advanced.logging_level_file.get()
    logging_level_console = params.autoscaling_advanced.logging_level_console.get()
    logging_format = params.autoscaling_advanced.logging_format.get()
    logging_task = (params.autoscaling_advanced.log_task.get()
                    or '{type}Autoscaling-{queue}'
                       .format(type=grid_engine_type, queue=queue_name_short))
    logging_file = os.path.join(logging_dir,
                                'autoscaler.{type}.{queue}.log'
                                .format(type=grid_engine_type.lower(), queue=queue_name))

    if logging_verbose:
        logging_level_run = 'DEBUG'

    # TODO: Git rid of CloudPipelineAPI usage in favor of PipelineAPI
    api = PipelineAPI(api_url=api_url, log_dir=logging_dir, token=RefreshingToken())

    mkdir(os.path.dirname(logging_file))

    logging_formatter = logging.Formatter(logging_format)

    logging_logger_root = logging.getLogger()
    logging_logger_root.setLevel(logging.WARNING)

    logging_logger = logging.getLogger(name=logging_task)
    logging_logger.setLevel(logging.DEBUG)

    if not logging_logger.handlers:
        console_handler = logging.StreamHandler(sys.stdout)
        console_handler.setLevel(logging_level_console)
        console_handler.setFormatter(logging_formatter)
        logging_logger.addHandler(console_handler)

        file_handler = logging.FileHandler(logging_file)
        file_handler.setLevel(logging_level_file)
        file_handler.setFormatter(logging_formatter)
        logging_logger.addHandler(file_handler)

    logger = RunLogger(api=api, run_id=cluster_master_run_id)
    logger = TaskLogger(task=logging_task, inner=logger)
    logger = LevelLogger(level=logging_level_run, inner=logger)
    logger = LocalLogger(logger=logging_logger, inner=logger)
    logger = ResilientLogger(inner=logger, fallback=LocalLogger(logger=logging_logger))

    # todo: Get rid of Logger usage in favor of logger
    Logger.inner = logger

    common_utils = ScaleCommonUtils()

    static_instance_cpus = int(os.getenv('CLOUD_PIPELINE_NODE_CORES', multiprocessing.cpu_count()))
    static_instance_number = int(os.getenv('node_count', 0))
    static_instance_type = params.autoscaling_advanced.static_instance_type.get()

    instance_cloud_provider = CloudProvider(params.autoscaling_advanced.instance_cloud_provider.get())
    instance_region_id = params.autoscaling_advanced.instance_region_id.get()
    instance_type = params.autoscaling.instance_type.get()
    instance_disk = params.autoscaling.instance_disk.get()
    instance_image = params.autoscaling.instance_image.get()
    instance_price_type = params.autoscaling.price_type.get()
    instance_cmd_template = params.autoscaling.cmd_template.get()
    instance_owner_param = params.autoscaling_advanced.instance_owner_param.get()
    instance_inheritable_params = (params.autoscaling_advanced.instance_inheritable_params.get() or '').split(',')
    instance_inheritable_param_prefixes = \
        (params.autoscaling_advanced.instance_inheritable_param_prefixes.get() or '').split(',')

    autoscale = params.autoscaling.autoscale.get()
    autoscale_instance_number = params.autoscaling.autoscaling_hosts_number.get()

    descending_autoscale = params.autoscaling.descending_autoscale.get()

    hybrid_autoscale = params.autoscaling.hybrid_autoscale.get()
    hybrid_instance_cores = params.autoscaling.hybrid_instance_cores.get()
    hybrid_instance_family = params.autoscaling.hybrid_instance_family.get() \
                             or common_utils.extract_family_from_instance_type(instance_cloud_provider, instance_type)

    polling_delay = params.autoscaling_advanced.polling_delay.get()

    scale_up_strategy = params.autoscaling.scale_up_strategy.get()
    scale_up_batch_size = params.autoscaling.scale_up_batch_size.get()
    scale_up_polling_delay = params.autoscaling.scale_up_polling_delay.get()
    scale_up_unavail_delay = params.autoscaling.scale_up_unavail_delay.get()
    scale_up_unavail_count_insufficient = params.autoscaling.scale_up_unavail_count_insufficient.get()
    scale_up_unavail_count_failure = params.autoscaling.scale_up_unavail_count_failure.get()

    scale_up_timeout = int(resolve_preference(api, 'ge.autoscaling.scale.up.timeout', default=30))
    scale_up_polling_timeout = int(resolve_preference(api, 'ge.autoscaling.scale.up.polling.timeout', default=900))

    scale_down_batch_size = params.autoscaling.scale_down_batch_size.get()
    scale_down_timeout = int(resolve_preference(api, 'ge.autoscaling.scale.down.timeout', default=30))
    scale_down_idle_timeout = params.autoscaling.scale_down_idle_timeout.get()
    scale_down_invalid_timeout = params.autoscaling.scale_down_invalid_timeout.get()

    active_timeout = params.autoscaling_advanced.active_timeout.get()

    dry_init = params.autoscaling_advanced.dry_init.get()
    dry_run = params.autoscaling_advanced.dry_run.get()

    event_ttl = params.autoscaling_advanced.event_ttl.get()

    custom_requirements = params.autoscaling_advanced.custom_requirements.get()
    custom_requirements_purge = params.autoscaling_advanced.custom_requirements_purge.get()

    node_mem_reservations = params.autoscaling_advanced.node_mem_reservations.get()

    queue_static = params.queue.queue_static.get()
    queue_default = params.queue.queue_default.get()
    queue_hostlist_name = params.queue.hostlist_name.get()
    queue_reserved_cpu = params.queue.hosts_free_cores.get()
    queue_master_cpu = params.queue.master_cores.get() or static_instance_cpus
    queue_gpu_resource_name = params.queue.gpu_resource_name.get()
    queue_mem_resource_name = params.queue.mem_resource_name.get()
    queue_exc_resource_name = params.queue.exc_resource_name.get()

    host_storage_file = os.path.join(cluster_work_dir, '.autoscaler.%s.storage' % queue_name)
    host_storage_static_file = os.path.join(cluster_work_dir, '.autoscaler.%s.static.storage' % queue_name)

    Logger.info('Initiating grid engine autoscaling...')

    if not autoscale:
        Logger.info('Using non autoscaling mode...')
        autoscale_instance_number = 0

    if dry_init:
        Logger.info('Using dry init mode...')

    if dry_run:
        Logger.info('Using dry run mode...')

    clock = Clock()
    # TODO: Git rid of CmdExecutor usage in favor of CloudPipelineExecutor implementation
    cmd_executor = CmdExecutor()

    reserved_supply = ResourceSupply(cpu=queue_reserved_cpu)

    event_manager = GridEngineEventManager(ttl=event_ttl, clock=clock)
    worker_recorder = CloudPipelineWorkerRecorder(api=api, event_manager=event_manager, clock=clock)
    availability_manager = InstanceAvailabilityManager(event_manager=event_manager, clock=clock,
                                                       unavail_delay=scale_up_unavail_delay,
                                                       unavail_count_insufficient=scale_up_unavail_count_insufficient,
                                                       unavail_count_failure=scale_up_unavail_count_failure)
    cloud_instance_provider = CloudPipelineInstanceProvider(api=api, region_id=instance_region_id,
                                                            price_type=instance_price_type)
    if node_mem_reservations:
        cloud_instance_provider = CloudPipelineReservationInstanceProvider(
            inner=cloud_instance_provider,
            kube_mem_ratio=float(resolve_preference(api, 'cluster.node.kube.mem.ratio', default=0.025)),
            kube_mem_min_mib=int(resolve_preference(api, 'cluster.node.kube.mem.min.mib', default=256)),
            kube_mem_max_mib=int(resolve_preference(api, 'cluster.node.kube.mem.max.mib', default=1024)),
            system_mem_ratio=float(resolve_preference(api, 'cluster.node.system.mem.ratio', default=0.025)),
            system_mem_min_mib=int(resolve_preference(api, 'cluster.node.system.mem.min.mib', default=256)),
            system_mem_max_mib=int(resolve_preference(api, 'cluster.node.system.mem.max.mib', default=1024)),
            extra_mem_ratio=float(resolve_preference(api, 'cluster.node.extra.mem.ratio', default=0.05)),
            extra_mem_min_mib=int(resolve_preference(api, 'cluster.node.extra.mem.min.mib', default=512)),
            extra_mem_max_mib=int(resolve_preference(api, 'cluster.node.extra.mem.max.mib', default=sys.maxsize)))
    default_instance_provider = DefaultInstanceProvider(inner=cloud_instance_provider,
                                                        instance_type=instance_type)
    static_instance_provider = DefaultInstanceProvider(inner=cloud_instance_provider,
                                                       instance_type=static_instance_type)

    descending_instances = default_instance_provider.provide()
    if not descending_instances:
        raise ValidationError('Parameter {name} has invalid value {value}. '
                              'Such instance type is not available. '
                              'Please specify available instance type. \n\n'
                              '{name}\n{help}'
                              .format(name=params.autoscaling.instance_type.name, value=instance_type,
                                      help=params.autoscaling.instance_type.help))
    descending_instance = descending_instances.pop()
    descending_instance_cores = descending_instance.cpu if descending_instance else 0
    descending_instance_type = descending_instance.name if descending_instance else ''
    descending_instance_family = common_utils.extract_family_from_instance_type(instance_cloud_provider,
                                                                                descending_instance_type)

    if hybrid_autoscale and hybrid_instance_family:
        Logger.info('Using hybrid autoscaling of {} instances...'.format(hybrid_instance_family))
        instance_provider = FamilyInstanceProvider(inner=cloud_instance_provider,
                                                   instance_cloud_provider=instance_cloud_provider,
                                                   instance_family=hybrid_instance_family,
                                                   common_utils=common_utils)
        if not instance_provider.provide():
            raise ValidationError('Parameter {name} has invalid value {value}. '
                                  'Such instance type family is not available. '
                                  'Please specify available instance type family. \n\n'
                                  '{name}\n{help}'
                                  .format(name=params.autoscaling.hybrid_instance_family.name,
                                          value=hybrid_instance_family,
                                          help=params.autoscaling.hybrid_instance_family.help))
        if hybrid_instance_cores:
            Logger.info('Using instances with no more than {} cpus...'.format(hybrid_instance_cores))
            instance_provider = SizeLimitingInstanceProvider(inner=instance_provider,
                                                             max_instance_cores=hybrid_instance_cores)
        if not instance_provider.provide():
            raise ValidationError('Parameter {name} has invalid value {value}. '
                                  'There are no such instance types available. '
                                  'Please specify a different value. \n\n'
                                  '{name}\n'
                                  '{help}'
                                  .format(name=params.autoscaling.hybrid_instance_cores.name,
                                          value=hybrid_instance_cores,
                                          help=params.autoscaling.hybrid_instance_cores.help))
    elif descending_autoscale and descending_instance_family and descending_instance_cores:
        Logger.info('Using descending autoscaling of {} instances...'.format(descending_instance_type))
        instance_provider = FamilyInstanceProvider(inner=cloud_instance_provider,
                                                   instance_cloud_provider=instance_cloud_provider,
                                                   instance_family=descending_instance_family,
                                                   common_utils=common_utils)
        if not instance_provider.provide():
            raise ValidationError('Parameter {name} has invalid value {value}. '
                                  'Such instance type\'s family is not available. '
                                  'Please specify different instance type. \n\n'
                                  '{name}\n{help}'
                                  .format(name=params.autoscaling.instance_type.name,
                                          value=instance_type,
                                          help=params.autoscaling.instance_type.help))
        if descending_instance_cores:
            Logger.info('Using instances with no more than {} cpus...'.format(descending_instance_cores))
            instance_provider = SizeLimitingInstanceProvider(inner=instance_provider,
                                                             max_instance_cores=descending_instance_cores)
        if not instance_provider.provide():
            raise ValidationError('Parameter {name} has invalid value {value}. '
                                  'There are no such instance types available. '
                                  'Please specify different instance type. \n\n'
                                  '{name}\n{help}'
                                  .format(name=params.autoscaling.instance_type.name,
                                          value=instance_type,
                                          help=params.autoscaling.instance_type.help))
        instance_provider = DescendingInstanceProvider(inner=instance_provider)
    else:
        Logger.info('Using default autoscaling of {} instances...'.format(instance_type))
        instance_provider = default_instance_provider

    if scale_up_unavail_delay:
        Logger.info('Using only available instances...')
        instance_provider = AvailableInstanceProvider(inner=instance_provider,
                                                      availability_manager=availability_manager)

    if scale_up_strategy == 'cpu-capacity':
        Logger.info('Selecting instances using cpu capacity strategy...')
        instance_selector = CpuCapacityInstanceSelector(instance_provider=instance_provider,
                                                        reserved_supply=reserved_supply)
    elif scale_up_strategy == 'naive-cpu-capacity':
        Logger.info('Selecting instances using fractional cpu capacity strategy...')
        instance_selector = NaiveCpuCapacityInstanceSelector(instance_provider=instance_provider,
                                                             reserved_supply=reserved_supply)
    else:
        Logger.info('Selecting instances using default strategy...')
        instance_selector = BackwardCompatibleInstanceSelector(instance_provider=instance_provider,
                                                               reserved_supply=reserved_supply,
                                                               batch_size=scale_up_batch_size)

    instances = instance_provider.provide()
    if not instances:
        raise ValidationError('Grid engine autoscaler configuration is invalid. '
                              'There are no required instance types available. '
                              'Please use different configuration parameters.')

    static_instances = static_instance_provider.provide()
    if not static_instances:
        raise ValidationError('Parameter {name} has invalid value {value}. '
                              'Such instance type is not available. '
                              'Please specify available instance type. \n\n'
                              '{name}\n{help}'
                              .format(name=params.autoscaling_advanced.static_instance_type.name,
                                      value=static_instance_type,
                                      help=params.autoscaling_advanced.static_instance_type.help))
    static_instance = static_instances.pop()

    instance_supplies = [ResourceSupply.of(available_instance) - reserved_supply for available_instance in instances]
    biggest_instance_supply = sorted(instance_supplies, key=lambda supply: supply.cpu).pop()
    static_instance_supply = ResourceSupply.of(static_instance) - reserved_supply
    master_instance_supply = copy.deepcopy(static_instance_supply)
    master_instance_supply.cpu = queue_master_cpu - queue_reserved_cpu \
        if queue_master_cpu - queue_reserved_cpu > 0 \
        else queue_master_cpu
    cluster_supply = biggest_instance_supply * autoscale_instance_number
    if queue_static:
        cluster_supply += master_instance_supply + static_instance_supply * static_instance_number

    if grid_engine_type == GridEngineType.SLURM:
        grid_engine = SlurmGridEngine(cmd_executor=cmd_executor)
        job_preprocessor = DoNothingGridEngineJobProcessor()
        job_validator = SlurmJobValidator(grid_engine=grid_engine, instance_max_supply=biggest_instance_supply,
                                          cluster_max_supply=cluster_supply)
        demand_selector = SlurmDemandSelector(grid_engine=grid_engine)
        launch_adapter = SlurmLaunchAdapter()
    elif grid_engine_type == GridEngineType.KUBE:
        kube_client = get_kube_client()
        resource_parser = KubeResourceParser()
        grid_engine = KubeGridEngine(kube=kube_client, resource_parser=resource_parser, owner=cluster_owner)
        job_preprocessor = DoNothingGridEngineJobProcessor()
        job_validator = KubeJobValidator(grid_engine=grid_engine, instance_max_supply=biggest_instance_supply,
                                         cluster_max_supply=cluster_supply)
        demand_selector = KubeDefaultDemandSelector(grid_engine=grid_engine)
        launch_adapter = KubeLaunchAdapter()
    else:
        grid_engine = SunGridEngine(cmd_executor=cmd_executor, queue=queue_name, hostlist=queue_hostlist_name,
                                    queue_default=queue_default,
                                    gpu_resource_name=queue_gpu_resource_name,
                                    mem_resource_name=queue_mem_resource_name,
                                    exc_resource_name=queue_exc_resource_name)
        if custom_requirements_purge:
            job_preprocessor = SunGridEngineCustomRequestsPurgeJobProcessor(
                cmd_executor=cmd_executor,
                gpu_resource_name=queue_gpu_resource_name,
                mem_resource_name=queue_mem_resource_name,
                exc_resource_name=queue_exc_resource_name,
                dry_run=dry_run)
        else:
            job_preprocessor = DoNothingGridEngineJobProcessor()
        job_validator = SunGridEngineJobValidator(grid_engine=grid_engine,
                                                  instance_max_supply=biggest_instance_supply,
                                                  cluster_max_supply=cluster_supply)
        demand_selector = SunGridEngineDefaultDemandSelector(grid_engine=grid_engine)
        if custom_requirements:
            demand_selector = SunGridEngineGlobalDemandSelector(inner=demand_selector, grid_engine=grid_engine)
        launch_adapter = SunGridEngineLaunchAdapter(queue=queue_name, hostlist=queue_hostlist_name)

    host_storage = FileSystemHostStorage(cmd_executor=cmd_executor, storage_file=host_storage_file, clock=clock)
    host_storage = ThreadSafeHostStorage(host_storage)
    static_host_storage = FileSystemHostStorage(cmd_executor=cmd_executor, storage_file=host_storage_static_file,
                                                clock=clock)
    init_static_hosts(default_hostfile=cluster_hostfile, static_host_storage=static_host_storage, clock=clock,
                      active_timeout=active_timeout, static_hosts_enabled=queue_static and static_instance_number)

    if queue_static:
        Logger.info('Using static workers:\n{}\n{}'
                    .format('- {} {} ({} cpu, {} gpu, {} mem, {} exc)'
                            .format(cluster_master_name, static_instance.name,
                                    master_instance_supply.cpu,
                                    master_instance_supply.gpu,
                                    master_instance_supply.mem,
                                    master_instance_supply.exc),
                            '\n'.join('- {} {} ({} cpu, {} gpu, {} mem, {} exc)'
                                      .format(host, static_instance.name,
                                              static_instance_supply.cpu,
                                              static_instance_supply.gpu,
                                              static_instance_supply.mem,
                                              static_instance_supply.exc)
                                      for host in static_host_storage.load_hosts()
                                      if host != cluster_master_name))
                    .strip())
    Logger.info('Using autoscaling instance types:\n{}'
                .format('\n'.join('- {} ({} cpu, {} gpu, {} mem, {} exc)'
                                  .format(instance.name,
                                          instance_supply.cpu,
                                          instance_supply.gpu,
                                          instance_supply.mem,
                                          instance_supply.exc)
                                  for instance, instance_supply
                                  in zip(instances, instance_supplies))))

    instance_launch_params = fetch_instance_launch_params(api, launch_adapter, cluster_master_run_id,
                                                          instance_inheritable_params,
                                                          instance_inheritable_param_prefixes)

    worker_tags_handler = CloudPipelineWorkerTagsHandler(api=api, active_timeout=active_timeout,
                                                         active_tag=grid_engine_type + '_IN_USE',
                                                         host_storage=host_storage,
                                                         static_host_storage=static_host_storage, clock=clock,
                                                         common_utils=common_utils, dry_run=dry_run)
    scale_up_handler = GridEngineScaleUpHandler(cmd_executor=cmd_executor, api=api, grid_engine=grid_engine,
                                                launch_adapter=launch_adapter,
                                                host_storage=host_storage,
                                                parent_run_id=cluster_master_run_id,
                                                instance_disk=instance_disk, instance_image=instance_image,
                                                cmd_template=instance_cmd_template,
                                                price_type=instance_price_type, region_id=instance_region_id,
                                                queue=queue_name, hostlist=queue_hostlist_name,
                                                owner_param_name=instance_owner_param,
                                                polling_delay=scale_up_polling_delay,
                                                polling_timeout=scale_up_polling_timeout,
                                                instance_launch_params=instance_launch_params,
                                                clock=clock)
    if dry_run:
        scale_up_handler = DoNothingScaleUpHandler()
    scale_up_orchestrator = GridEngineScaleUpOrchestrator(scale_up_handler=scale_up_handler,
                                                          grid_engine=grid_engine,
                                                          host_storage=host_storage,
                                                          static_host_storage=static_host_storage,
                                                          worker_tags_handler=worker_tags_handler,
                                                          instance_selector=instance_selector,
                                                          worker_recorder=worker_recorder,
                                                          batch_size=scale_up_batch_size,
                                                          polling_delay=scale_up_polling_delay,
                                                          clock=clock)
    scale_down_handler = GridEngineScaleDownHandler(cmd_executor=cmd_executor, api=api, grid_engine=grid_engine,
                                                    common_utils=common_utils)
    if dry_run:
        scale_down_handler = DoNothingScaleDownHandler()
    scale_down_orchestrator = GridEngineScaleDownOrchestrator(scale_down_handler=scale_down_handler,
                                                              grid_engine=grid_engine,
                                                              host_storage=host_storage,
                                                              batch_size=scale_down_batch_size)
    worker_validator_handlers = [
        CloudPipelineWorkerValidatorHandler(api=api, common_utils=common_utils),
    ]

    if grid_engine_type == GridEngineType.SLURM:
        worker_validator_handlers.extend([
            GracePeriodWorkerValidatorHandler(inner=grid_engine, grace_period=scale_down_invalid_timeout, clock=clock)
        ])
    elif grid_engine_type == GridEngineType.KUBE:
        worker_validator_handlers.extend([
            GracePeriodWorkerValidatorHandler(inner=grid_engine, grace_period=scale_down_invalid_timeout, clock=clock)
        ])
    else:
        worker_validator_handlers.extend([
            SunGridEngineHostWorkerValidatorHandler(cmd_executor=cmd_executor),
            GracePeriodWorkerValidatorHandler(
                inner=SunGridEngineStateWorkerValidatorHandler(cmd_executor=cmd_executor, queue=queue_name),
                grace_period=scale_down_invalid_timeout,
                clock=clock)
        ])

    worker_validator = CloudPipelineWorkerValidator(cmd_executor=cmd_executor, api=api, host_storage=host_storage,
                                                    grid_engine=grid_engine, scale_down_handler=scale_down_handler,
                                                    handlers=worker_validator_handlers,
                                                    common_utils=common_utils, dry_run=dry_run)
    autoscaler = GridEngineAutoscaler(grid_engine=grid_engine,
                                      job_preprocessor=job_preprocessor,
                                      job_validator=job_validator,
                                      demand_selector=demand_selector,
                                      cmd_executor=cmd_executor,
                                      scale_up_orchestrator=scale_up_orchestrator,
                                      scale_down_orchestrator=scale_down_orchestrator,
                                      host_storage=host_storage, static_host_storage=static_host_storage,
                                      scale_up_timeout=scale_up_timeout,
                                      scale_down_timeout=scale_down_timeout,
                                      max_additional_hosts=autoscale_instance_number,
                                      idle_timeout=scale_down_idle_timeout, clock=clock)
    daemon = GridEngineAutoscalingDaemon(autoscaler=autoscaler, worker_validator=worker_validator,
                                         worker_tags_handler=worker_tags_handler, polling_timeout=polling_delay)
    if dry_init:
        daemon = DoNothingAutoscalingDaemon()
    return daemon