def cp()

in pipe-cli/src/utilities/datastorage_operations.py [0:0]


    def cp(cls, source, destination, recursive, force, exclude, include, quiet, tags, file_list, symlinks,
           additional_options, threads, io_threads, on_unsafe_chars, on_unsafe_chars_replacement,
           on_empty_files, on_failures, clean=False, skip_existing=False, sync_newer=False, verify_destination=False,
           checksum_algorithm='md5', checksum_skip=False):

        # Check if any external extension should handle this call
        if ExtensionHandlerRegistry.accept('storage', 'cp', locals()):
            sys.exit(0)

        source_wrapper = DataStorageWrapper.get_wrapper(source, symlinks)
        destination_wrapper = DataStorageWrapper.get_wrapper(destination)
        files_to_copy = []

        if source_wrapper is None:
            click.echo('Could not resolve path {}'.format(source), err=True)
            sys.exit(1)
        if destination_wrapper is None:
            click.echo('Could not resolve path {}'.format(destination), err=True)
            sys.exit(1)

        source_type = source_wrapper.get_type()
        destination_type = destination_wrapper.get_type()
        logging.debug('Transferring files {} -> {}...'.format(source_type, destination_type))

        if source_type in [WrapperType.STREAM] or destination_type in [WrapperType.STREAM]:
            quiet = True

        if not source_wrapper.exists():
            click.echo("Source {} doesn't exist".format(source), err=True)
            sys.exit(1)
        if not recursive and not source_wrapper.is_file():
            click.echo('Flag --recursive (-r) is required to copy folders.', err=True)
            sys.exit(1)
        if recursive and source_type in [WrapperType.STREAM]:
            click.echo('Flag --recursive (-r) is not supported for {} sources.'.format(source_type), err=True)
            sys.exit(1)
        if recursive and destination_type in [WrapperType.STREAM]:
            click.echo('Flag --recursive (-r) is not supported for {} destinations.'.format(destination_type), err=True)
            sys.exit(1)
        if clean and source_type in [WrapperType.STREAM, WrapperType.HTTP, WrapperType.FTP]:
            click.echo('Cannot perform \'mv\' operation due to deletion remote files '
                       'is not supported for {} sources.'.format(source_type), err=True)
            sys.exit(1)
        if file_list and source_type in [WrapperType.STREAM]:
            click.echo('Option --file-list (-l) is not supported for {} sources.'.format(source_type), err=True)
            sys.exit(1)
        if skip_existing and sync_newer and source_type in [WrapperType.STREAM, WrapperType.HTTP, WrapperType.FTP]:
            click.echo('Option --sync-newer is not supported for {} sources.'.format(source_type), err=True)
            sys.exit(1)
        if file_list and source_wrapper.is_file():
            click.echo('Option --file-list (-l) allowed for folders copy only.', err=True)
            sys.exit(1)
        if file_list and not os.path.exists(file_list):
            click.echo('Specified --file-list file does not exist.', err=True)
            sys.exit(1)
        if file_list:
            files_to_copy = cls.__get_file_to_copy(file_list, source_wrapper.path)
        if threads and not recursive:
            click.echo('-n (--threads) is allowed for folders only.', err=True)
            sys.exit(1)
        if threads and platform.system() == 'Windows':
            click.echo('-n (--threads) is not supported for Windows OS', err=True)
            sys.exit(1)
        relative = os.path.basename(source) if source_wrapper.is_file() else None
        if not force and not verify_destination and not destination_wrapper.is_empty(relative=relative):
            click.echo('The destination already exists. Specify --force (-f) flag to overwrite data or '
                       '--verify-destination (-vd) flag to enable existence check for each destination path.',
                       err=True)
            sys.exit(1)
        if not checksum_algorithm == 'md5' and source_type not in [WrapperType.LOCAL]:
            click.echo('Checksum algorithm {} is not supported for {} sources.'
                       .format(checksum_algorithm, source_type), err=True)
            sys.exit(1)
        if not checksum_algorithm == 'md5' and destination_type not in [WrapperType.S3]:
            click.echo('Checksum algorithm {} is not supported for {} destinations.'
                       .format(checksum_algorithm, source_type), err=True)
            sys.exit(1)
        if checksum_skip and source_type not in [WrapperType.LOCAL]:
            click.echo('Option --checksum-skip is not supported for {} sources.'.format(source_type), err=True)
            sys.exit(1)
        if checksum_skip and destination_type not in [WrapperType.S3]:
            click.echo('Option --checksum-skip is not supported for {} destinations.'
                       .format(source_type), err=True)
            sys.exit(1)

        # append slashes to path to correctly determine file/folder type
        if not source_wrapper.is_file():
            if not source_wrapper.is_local() and not source_wrapper.path.endswith('/'):
                source_wrapper.path = source_wrapper.path + '/'
            if destination_wrapper.is_local() and not destination_wrapper.path.endswith(os.path.sep):
                destination_wrapper.path = destination_wrapper.path + os.path.sep
            if not destination_wrapper.is_local() and not destination_wrapper.path.endswith('/'):
                destination_wrapper.path = destination_wrapper.path + '/'

        # copying a file to a remote destination, we need to set folder/file flag correctly
        if source_wrapper.is_file() and not destination_wrapper.is_local() and not destination.endswith('/'):
            destination_wrapper.is_file_flag = True

        command = 'mv' if clean else 'cp'
        permission_to_check = os.R_OK if command == 'cp' else os.W_OK

        audit_ctx = auditing()
        manager = DataStorageWrapper.get_operation_manager(source_wrapper, destination_wrapper,
                                                           events=audit_ctx.container, command=command)

        batch_allowed = not verify_destination and not file_list and (source_wrapper.get_type() == WrapperType.LOCAL
                                                                      or source_wrapper.get_type() == WrapperType.S3)
        if batch_allowed:
            cls._transfer_batch_items(threads, manager, source_wrapper, destination_wrapper, audit_ctx, clean, quiet,
                                      tags, io_threads, on_failures, checksum_algorithm, checksum_skip,
                                      permission_to_check, include, exclude, force, skip_existing, sync_newer,
                                      verify_destination, on_unsafe_chars, on_unsafe_chars_replacement, on_empty_files)
            sys.exit(0)
        items = files_to_copy if file_list else source_wrapper.get_items(quiet=quiet)
        if source_type not in [WrapperType.STREAM]:
            items = cls._filter_items(items, manager, source_wrapper, destination_wrapper, permission_to_check,
                                      include, exclude, force, quiet, skip_existing, sync_newer, verify_destination,
                                      on_unsafe_chars, on_unsafe_chars_replacement, on_empty_files)
        if threads:
            cls._multiprocess_transfer_items(items, threads, manager, source_wrapper, destination_wrapper,
                                             audit_ctx, clean, quiet, tags, io_threads, on_failures, checksum_algorithm,
                                             checksum_skip)
        else:
            cls._transfer_items_with_audit_ctx(items, manager, source_wrapper, destination_wrapper,
                                               audit_ctx, clean, quiet, tags, io_threads, on_failures,
                                               checksum_algorithm=checksum_algorithm, checksum_skip=checksum_skip)