in pipe-cli/src/utilities/datastorage_operations.py [0:0]
def cp(cls, source, destination, recursive, force, exclude, include, quiet, tags, file_list, symlinks,
       additional_options, threads, io_threads, on_unsafe_chars, on_unsafe_chars_replacement,
       on_empty_files, on_failures, clean=False, skip_existing=False, sync_newer=False, verify_destination=False,
       checksum_algorithm='md5', checksum_skip=False):
    """Copy (or, with ``clean=True``, move) data from ``source`` to ``destination``.

    The call is first offered to ``ExtensionHandlerRegistry``; if an extension accepts it,
    the process exits with code 0. Otherwise both paths are resolved to storage wrappers,
    the option combination is validated and the transfer is started. Every validation
    failure prints a message to stderr and terminates the process with exit code 1.

    :param source: Source path, resolved with ``DataStorageWrapper.get_wrapper``.
    :param destination: Destination path, resolved with ``DataStorageWrapper.get_wrapper``.
    :param recursive: Required when the source is not a file; rejected for stream
        sources and destinations.
    :param force: When set, the "destination already exists" check is skipped.
    :param exclude: Forwarded to item filtering.
    :param include: Forwarded to item filtering.
    :param quiet: Forwarded to listing/filtering/transfer; forced to ``True`` when either
        side is a stream.
    :param tags: Forwarded to the transfer routines.
    :param file_list: Path to an existing local file describing the items to copy
        (parsed by ``cls.__get_file_to_copy``); only allowed for non-stream folder sources.
    :param symlinks: Forwarded to ``DataStorageWrapper.get_wrapper`` for the source.
    :param additional_options: Not referenced directly in this method; it is visible to
        extension handlers through the ``locals()`` snapshot.
    :param threads: Enables the multiprocess transfer; requires ``recursive`` and is
        rejected on Windows.
    :param io_threads: Forwarded to the transfer routines.
    :param on_unsafe_chars: Forwarded to item filtering.
    :param on_unsafe_chars_replacement: Forwarded to item filtering.
    :param on_empty_files: Forwarded to item filtering.
    :param on_failures: Forwarded to the transfer routines.
    :param clean: When ``True`` the operation is treated as ``mv`` instead of ``cp``.
    :param skip_existing: Forwarded to item filtering.
    :param sync_newer: Forwarded to item filtering.
    :param verify_destination: Skips the up-front destination emptiness check and
        disables the batch transfer path; forwarded to item filtering.
    :param checksum_algorithm: Anything other than ``'md5'`` is accepted only for
        local sources and S3 destinations.
    :param checksum_skip: Accepted only for local sources and S3 destinations.
    :return: ``None`` after a non-batch transfer; the batch path ends with ``sys.exit(0)``.
    """
    # Check if any external extension should handle this call.
    # NOTE: locals() must be evaluated before any local variable is assigned so that
    # the snapshot contains only the arguments of this call.
    if ExtensionHandlerRegistry.accept('storage', 'cp', locals()):
        sys.exit(0)

    source_wrapper = DataStorageWrapper.get_wrapper(source, symlinks)
    destination_wrapper = DataStorageWrapper.get_wrapper(destination)
    files_to_copy = []
    if source_wrapper is None:
        click.echo('Could not resolve path {}'.format(source), err=True)
        sys.exit(1)
    if destination_wrapper is None:
        click.echo('Could not resolve path {}'.format(destination), err=True)
        sys.exit(1)

    source_type = source_wrapper.get_type()
    destination_type = destination_wrapper.get_type()
    logging.debug('Transferring files {} -> {}...'.format(source_type, destination_type))

    # Stream transfers always run in quiet mode.
    if source_type in [WrapperType.STREAM] or destination_type in [WrapperType.STREAM]:
        quiet = True

    # --- Validation of the requested option combination (order defines which error is reported) ---
    if not source_wrapper.exists():
        click.echo("Source {} doesn't exist".format(source), err=True)
        sys.exit(1)
    if not recursive and not source_wrapper.is_file():
        click.echo('Flag --recursive (-r) is required to copy folders.', err=True)
        sys.exit(1)
    if recursive and source_type in [WrapperType.STREAM]:
        click.echo('Flag --recursive (-r) is not supported for {} sources.'.format(source_type), err=True)
        sys.exit(1)
    if recursive and destination_type in [WrapperType.STREAM]:
        click.echo('Flag --recursive (-r) is not supported for {} destinations.'.format(destination_type), err=True)
        sys.exit(1)
    if clean and source_type in [WrapperType.STREAM, WrapperType.HTTP, WrapperType.FTP]:
        click.echo('Cannot perform \'mv\' operation due to deletion remote files '
                   'is not supported for {} sources.'.format(source_type), err=True)
        sys.exit(1)
    if file_list and source_type in [WrapperType.STREAM]:
        click.echo('Option --file-list (-l) is not supported for {} sources.'.format(source_type), err=True)
        sys.exit(1)
    # NOTE(review): this guard fires only when BOTH skip_existing and sync_newer are set,
    # while the message mentions --sync-newer only -- confirm this is intended.
    if skip_existing and sync_newer and source_type in [WrapperType.STREAM, WrapperType.HTTP, WrapperType.FTP]:
        click.echo('Option --sync-newer is not supported for {} sources.'.format(source_type), err=True)
        sys.exit(1)
    if file_list and source_wrapper.is_file():
        click.echo('Option --file-list (-l) allowed for folders copy only.', err=True)
        sys.exit(1)
    if file_list and not os.path.exists(file_list):
        click.echo('Specified --file-list file does not exist.', err=True)
        sys.exit(1)
    if file_list:
        files_to_copy = cls.__get_file_to_copy(file_list, source_wrapper.path)
    if threads and not recursive:
        click.echo('-n (--threads) is allowed for folders only.', err=True)
        sys.exit(1)
    if threads and platform.system() == 'Windows':
        click.echo('-n (--threads) is not supported for Windows OS', err=True)
        sys.exit(1)

    # For a single file only the entry with the same base name is checked at the destination.
    relative = os.path.basename(source) if source_wrapper.is_file() else None
    if not force and not verify_destination and not destination_wrapper.is_empty(relative=relative):
        click.echo('The destination already exists. Specify --force (-f) flag to overwrite data or '
                   '--verify-destination (-vd) flag to enable existence check for each destination path.',
                   err=True)
        sys.exit(1)

    if checksum_algorithm != 'md5' and source_type not in [WrapperType.LOCAL]:
        click.echo('Checksum algorithm {} is not supported for {} sources.'
                   .format(checksum_algorithm, source_type), err=True)
        sys.exit(1)
    if checksum_algorithm != 'md5' and destination_type not in [WrapperType.S3]:
        # Report the destination type here (the source type was reported previously by mistake).
        click.echo('Checksum algorithm {} is not supported for {} destinations.'
                   .format(checksum_algorithm, destination_type), err=True)
        sys.exit(1)
    if checksum_skip and source_type not in [WrapperType.LOCAL]:
        click.echo('Option --checksum-skip is not supported for {} sources.'.format(source_type), err=True)
        sys.exit(1)
    if checksum_skip and destination_type not in [WrapperType.S3]:
        # Report the destination type here (the source type was reported previously by mistake).
        click.echo('Option --checksum-skip is not supported for {} destinations.'
                   .format(destination_type), err=True)
        sys.exit(1)

    # append slashes to path to correctly determine file/folder type
    if not source_wrapper.is_file():
        if not source_wrapper.is_local() and not source_wrapper.path.endswith('/'):
            source_wrapper.path = source_wrapper.path + '/'
        if destination_wrapper.is_local() and not destination_wrapper.path.endswith(os.path.sep):
            destination_wrapper.path = destination_wrapper.path + os.path.sep
        if not destination_wrapper.is_local() and not destination_wrapper.path.endswith('/'):
            destination_wrapper.path = destination_wrapper.path + '/'

    # copying a file to a remote destination, we need to set folder/file flag correctly
    if source_wrapper.is_file() and not destination_wrapper.is_local() and not destination.endswith('/'):
        destination_wrapper.is_file_flag = True

    command = 'mv' if clean else 'cp'
    # A plain copy checks read access, a move checks write access.
    permission_to_check = os.R_OK if command == 'cp' else os.W_OK
    audit_ctx = auditing()
    manager = DataStorageWrapper.get_operation_manager(source_wrapper, destination_wrapper,
                                                       events=audit_ctx.container, command=command)

    # Batch transfer is used only for local/S3 sources without --verify-destination and --file-list.
    batch_allowed = not verify_destination and not file_list and (source_wrapper.get_type() == WrapperType.LOCAL
                                                                  or source_wrapper.get_type() == WrapperType.S3)
    if batch_allowed:
        # The filtering options are handed over to the batch helper; the process ends afterwards.
        cls._transfer_batch_items(threads, manager, source_wrapper, destination_wrapper, audit_ctx, clean, quiet,
                                  tags, io_threads, on_failures, checksum_algorithm, checksum_skip,
                                  permission_to_check, include, exclude, force, skip_existing, sync_newer,
                                  verify_destination, on_unsafe_chars, on_unsafe_chars_replacement, on_empty_files)
        sys.exit(0)

    items = files_to_copy if file_list else source_wrapper.get_items(quiet=quiet)
    # Items of stream sources are transferred without filtering.
    if source_type not in [WrapperType.STREAM]:
        items = cls._filter_items(items, manager, source_wrapper, destination_wrapper, permission_to_check,
                                  include, exclude, force, quiet, skip_existing, sync_newer, verify_destination,
                                  on_unsafe_chars, on_unsafe_chars_replacement, on_empty_files)
    if threads:
        cls._multiprocess_transfer_items(items, threads, manager, source_wrapper, destination_wrapper,
                                         audit_ctx, clean, quiet, tags, io_threads, on_failures, checksum_algorithm,
                                         checksum_skip)
    else:
        cls._transfer_items_with_audit_ctx(items, manager, source_wrapper, destination_wrapper,
                                           audit_ctx, clean, quiet, tags, io_threads, on_failures,
                                           checksum_algorithm=checksum_algorithm, checksum_skip=checksum_skip)