# pipe-cli/mount/pipefuse/mpu.py (395 lines of code) (raw)
# Copyright 2017-2020 EPAM Systems, Inc. (https://www.epam.com/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import sys
from abc import abstractmethod, ABCMeta
from collections import namedtuple
import intervals
from pipefuse.fuseutils import MB, GB
# Deferred copy part request. Holds the copied byte range (start, end) along with the offset, part number,
# part path and keep flag which are forwarded to the wrapped multipart upload later on.
_CopyPart = namedtuple('CopyPart', ['start', 'end', 'offset', 'part_number', 'part_path', 'keep'])
class UnmanageableMultipartUploadException(RuntimeError):
    """
    Signals that a multipart upload cannot handle the requested operation,
    f.e. an attempt to reupload a chunk which has been uploaded already.
    """

    def __init__(self, *args):
        RuntimeError.__init__(self, *args)
class MultipartUpload:
    """
    Multipart upload interface.

    Declares the operations every multipart upload and multipart upload decorator has to provide.
    """
    __metaclass__ = ABCMeta

    @property
    @abstractmethod
    def path(self):
        """Path of the object the upload writes to."""

    @abstractmethod
    def initiate(self):
        """Starts the multipart upload."""

    @abstractmethod
    def upload_part(self, buf, offset=None, part_number=None, part_path=None, keep=False):
        """
        Uploads the given content as a single part.

        :param buf: Part content.
        :param offset: Offset of the part.
        :param part_number: Number of the part.
        :param part_path: Path of the part; its usage is implementation specific.
        :param keep: Implementation specific flag.
        """

    @abstractmethod
    def upload_copy_part(self, start, end, offset=None, part_number=None, part_path=None, keep=False):
        """
        Uploads a part using the start-end range of already existing content.

        :param start: Start of the copied range.
        :param end: End of the copied range.
        :param offset: Offset of the part.
        :param part_number: Number of the part.
        :param part_path: Path of the part; its usage is implementation specific.
        :param keep: Implementation specific flag.
        """

    @abstractmethod
    def complete(self):
        """Finishes the multipart upload."""

    @abstractmethod
    def abort(self):
        """Cancels the multipart upload."""
class MultipartUploadDecorator(MultipartUpload):
    """
    Base multipart upload decorator.

    Delegates every operation to the wrapped multipart upload without any changes.
    Subclasses override only the operations they have to alter.
    """

    def __init__(self, mpu):
        """
        :param mpu: Wrapping multipart upload.
        """
        self._mpu = mpu

    @property
    def path(self):
        """Path of the wrapped multipart upload."""
        return self._mpu.path

    def initiate(self):
        """Initiates the wrapped multipart upload."""
        self._mpu.initiate()

    def upload_part(self, buf, offset=None, part_number=None, part_path=None, keep=False):
        """
        Passes the part to the wrapped multipart upload.

        :param buf: Part content.
        :param offset: Offset of the part.
        :param part_number: Number of the part.
        :param part_path: Path of the part.
        :param keep: Flag passed through to the wrapped multipart upload.
        """
        # part_path and keep have to be forwarded as well, otherwise values set by an outer decorator
        # are silently replaced with the defaults of the wrapped multipart upload.
        self._mpu.upload_part(buf, offset, part_number, part_path, keep)

    def upload_copy_part(self, start, end, offset=None, part_number=None, part_path=None, keep=False):
        """
        Passes the copy part to the wrapped multipart upload.

        :param start: Start of the copied range.
        :param end: End of the copied range.
        :param offset: Offset of the part.
        :param part_number: Number of the part.
        :param part_path: Path of the part.
        :param keep: Flag passed through to the wrapped multipart upload.
        """
        # Same as for regular parts: nothing may be lost on the way to the wrapped multipart upload.
        self._mpu.upload_copy_part(start, end, offset, part_number, part_path, keep)

    def complete(self):
        """Completes the wrapped multipart upload."""
        self._mpu.complete()

    def abort(self):
        """Aborts the wrapped multipart upload."""
        self._mpu.abort()
class _PartialChunk:
    """
    Chunk which content may be collected from several writes.
    """
    __metaclass__ = ABCMeta

    @property
    @abstractmethod
    def offset(self):
        """Offset of the chunk."""

    @abstractmethod
    def append(self, offset, buf):
        """
        Adds content to the chunk.

        :param offset: Position inside of the chunk to write the content to.
        :param buf: Content to write.
        """

    @abstractmethod
    def is_full(self):
        """Tells whether the chunk has no missing regions."""

    @abstractmethod
    def missing_intervals(self):
        """Provides (start, end) pairs of the chunk regions which have not been written."""

    @abstractmethod
    def collect(self):
        """Provides the chunk content."""
class _IncompletePartialChunk(_PartialChunk):
    """
    Partial chunk which buffers written content and tracks the regions that are still missing.
    """

    def __init__(self, offset, size):
        """
        :param offset: Offset of the chunk. It is only stored and exposed via the offset property.
        :param size: Size of the chunk buffer.
        """
        self._offset = offset
        self._size = size
        self._buf = bytearray(size)
        # Regions written so far, in chunk relative positions.
        self._interval = intervals.empty()
        # The whole chunk region, used to cut the complement of the written regions.
        self._bounds_interval = intervals.closed(0, self._size)

    @property
    def offset(self):
        return self._offset

    def append(self, offset, buf):
        """
        Writes the content into the chunk buffer and marks its region as written.

        :param offset: Chunk relative position of the content.
        :param buf: Content to write.
        """
        region_end = offset + len(buf)
        self._buf[offset:region_end] = buf[:]
        self._interval |= intervals.closed(offset, region_end)

    def is_full(self):
        """Tells whether every region of the chunk has been written."""
        missing = self._missing_interval()
        return missing.is_empty()

    def missing_intervals(self):
        """Yields (start, end) chunk relative positions of the regions which have not been written."""
        for gap in self._missing_interval():
            yield gap.lower, gap.upper

    def _missing_interval(self):
        # Everything outside of the written regions, limited to the chunk bounds.
        not_written = self._interval.complement()
        return not_written.intersection(self._bounds_interval)

    def collect(self):
        """Returns the buffer content up to the upper bound of the written regions."""
        written_end = self._interval.upper
        return self._buf[:written_end]
class _CompletePartialChunk(_PartialChunk):
    """
    Marker of a chunk which is treated as full: it holds no content, ignores appends
    and reports no missing regions.
    """

    def __init__(self, offset):
        """
        :param offset: Offset of the chunk.
        """
        self._offset = offset

    @property
    def offset(self):
        return self._offset

    def append(self, offset, buf):
        """Does nothing, the chunk content is not stored."""
        return None

    def is_full(self):
        """Always true."""
        return True

    def missing_intervals(self):
        """Returns an empty list, nothing is missing."""
        return list()

    def collect(self):
        """Returns an empty buffer, the chunk content is not stored."""
        return bytearray()
class ChunkedMultipartUpload(MultipartUploadDecorator):

    def __init__(self, mpu, original_size, download, chunk_size, min_chunk, max_chunk):
        """
        Chunked multipart upload.

        Cuts all the incoming uploads into chunks of the given size.

        Has a limit on the maximum file size that can be written using chunked multipart upload.
        It can be calculated multiplying chunk size by max chunk number. F.e. for chunk size of 10MB and 10000 chunks
        it will be 100GB, for chunk size of 100MB and 10000 chunks it will be 1TB.

        Fills gaps between uploaded parts with the copy parts of unlimited size.

        :param mpu: Wrapping multipart upload.
        :param original_size: Destination file original size.
        :param download: Function that retrieves content from an object by its path, offset and length.
        :param chunk_size: Size of a single upload part.
        :param min_chunk: Minimum allowed chunk number.
        :param max_chunk: Maximum allowed chunk number.
        """
        super(ChunkedMultipartUpload, self).__init__(mpu)
        self._mpu = mpu
        self._original_size = original_size
        self._download = download
        self._chunk_size = chunk_size
        # Chunk number -> partial chunk. Holds an entry for every chunk which has been written to.
        self._partial_chunks = {}
        self._min_chunk = min_chunk
        self._max_chunk = max_chunk

    def upload_part(self, buf, offset=None, part_number=None, part_path=None, keep=False):
        """
        Cuts the content into chunk aligned pieces and uploads them.

        A piece that covers a whole chunk is uploaded right away. Any other piece is buffered in a partial chunk
        which is uploaded as soon as it becomes full or when the upload is completed.

        :param buf: Content to upload.
        :param offset: Offset of the content in the destination file.
        :param part_number: Ignored, part numbers are the chunk numbers resolved from the offset.
        :param part_path: Ignored.
        :param keep: Ignored.
        :raises UnmanageableMultipartUploadException: if a partial write targets a chunk that is already full.
        """
        chunk = self._resolve_chunk(offset)
        chunk_offset = self._chunk_offset(chunk)
        # Position of the content start inside of the first affected chunk.
        chunk_shift = offset - chunk_offset
        buf_shift = 0
        while buf_shift < len(buf):
            # The piece is partial if it starts in the middle of the chunk or ends before the chunk end.
            if chunk_shift or buf_shift + self._chunk_size - chunk_shift > len(buf):
                partial_chunk = self._partial_chunks.get(chunk, None)
                if not partial_chunk:
                    partial_chunk = _IncompletePartialChunk(chunk_offset, self._chunk_size)
                    self._partial_chunks[chunk] = partial_chunk
                if partial_chunk.is_full():
                    raise UnmanageableMultipartUploadException(
                        'Multipart upload chunk %s cannot be reuploaded for %s.' % (chunk, self.path))
                partial_chunk.append(chunk_shift, buf[buf_shift:buf_shift + self._chunk_size - chunk_shift])
                if partial_chunk.is_full():
                    self._mpu.upload_part(partial_chunk.collect(), partial_chunk.offset, chunk)
                    # The content is not needed anymore, only the fact that the chunk has been uploaded.
                    self._partial_chunks[chunk] = _CompletePartialChunk(partial_chunk.offset)
            else:
                chunk_buf = bytearray(self._chunk_size)
                chunk_buf[chunk_shift:self._chunk_size] = buf[buf_shift:buf_shift + self._chunk_size]
                self._mpu.upload_part(chunk_buf, chunk_offset, chunk)
                self._partial_chunks[chunk] = _CompletePartialChunk(chunk_offset)
            buf_shift += self._chunk_size - chunk_shift
            chunk += 1
            chunk_offset += self._chunk_size
            # Only the first piece may start in the middle of a chunk.
            chunk_shift = 0

    def _resolve_chunk(self, offset):
        """
        Finds the number of the chunk the offset belongs to using binary search
        between the minimum and the maximum allowed chunk numbers.
        """
        first_chunk = self._min_chunk
        last_chunk = self._max_chunk
        while last_chunk - first_chunk > 1:
            # Floor division keeps chunk numbers integer regardless of the division mode in use.
            mid_chunk = first_chunk + (last_chunk - first_chunk) // 2
            mid_chunk_offset = self._chunk_offset(mid_chunk)
            if offset > mid_chunk_offset:
                first_chunk = mid_chunk
            elif offset < mid_chunk_offset:
                last_chunk = mid_chunk
            else:
                return mid_chunk
        return last_chunk if offset >= self._chunk_offset(last_chunk) else first_chunk

    def _chunk_offset(self, chunk):
        """Returns the offset of the chunk start. Chunk numbers start from 1."""
        return (chunk - 1) * self._chunk_size

    def complete(self):
        """
        Uploads everything that has not been uploaded yet and completes the wrapped multipart upload.

        Gaps between the written chunks and the original file tail after the last written chunk are uploaded
        as copy parts. Missing regions of partial chunks are filled with the original file content within
        the original file size and with null bytes beyond it. The last written chunk is never padded with nulls.
        If no chunk has been written the whole original file is uploaded as a single copy part.
        """
        current_missing_chunk = 1
        for chunk in sorted(self._partial_chunks.keys()):
            if chunk > current_missing_chunk:
                missing_start = self._chunk_offset(current_missing_chunk)
                missing_end = self._chunk_offset(chunk)
                self._mpu.upload_copy_part(missing_start, missing_end, missing_start, current_missing_chunk)
            current_missing_chunk = chunk + 1
        # After the loop the counter points right behind the last written chunk. Unlike max() over the
        # written chunk numbers it is also defined when nothing has been written at all.
        last_missing_chunk = current_missing_chunk
        last_written_chunk = last_missing_chunk - 1
        last_chunk_end = self._chunk_offset(last_missing_chunk)
        if last_chunk_end < self._original_size:
            self._mpu.upload_copy_part(last_chunk_end, self._original_size, last_chunk_end, last_missing_chunk)
        for chunk_number, partial_chunk in self._partial_chunks.items():
            if partial_chunk.is_full():
                continue
            for missing_start, missing_end in partial_chunk.missing_intervals():
                actual_start = missing_start + partial_chunk.offset
                actual_end = missing_end + partial_chunk.offset
                if self._original_size <= actual_start:
                    # The region is completely beyond the original file.
                    if chunk_number < last_written_chunk:
                        partial_chunk.append(missing_start,
                                             bytearray(actual_end - actual_start))
                elif actual_start < self._original_size < actual_end:
                    # The region crosses the original file end.
                    partial_chunk.append(missing_start,
                                         self._download(self.path, actual_start, self._original_size - actual_start))
                    if chunk_number < last_written_chunk:
                        partial_chunk.append(missing_start + self._original_size - actual_start,
                                             bytearray(actual_end - self._original_size))
                else:
                    # The region is completely within the original file.
                    partial_chunk.append(missing_start,
                                         self._download(self.path, actual_start, actual_end - actual_start))
            chunk_buf = partial_chunk.collect()
            self._mpu.upload_part(chunk_buf, partial_chunk.offset, chunk_number)
        self._mpu.complete()
class SplittingMultipartCopyUpload(MultipartUploadDecorator):

    def __init__(self, mpu, min_part_size=5 * MB, max_part_size=5 * GB):
        """
        Splitting multipart copy upload.

        Splits copy upload parts into several ones to fit maximum upload part size limit.
        Also takes into the account minimum upload part size.

        :param mpu: Wrapping multipart upload.
        :param min_part_size: Minimum upload part size.
        :param max_part_size: Maximum upload part size.
        """
        super(SplittingMultipartCopyUpload, self).__init__(mpu)
        self._mpu = mpu
        self._min_part_size = min_part_size
        self._max_part_size = max_part_size

    def upload_copy_part(self, start, end, offset=None, part_number=None, part_path=None, keep=False):
        """
        Uploads the copy part as is if it fits the maximum part size, otherwise splits it into
        consecutive pieces with sequential part numbers starting from the given one.

        :param start: Start of the copied range.
        :param end: End of the copied range.
        :param offset: Offset of the part. If omitted the pieces of a split part use their range starts,
        which is how copy parts are produced by the other uploads of this module.
        :param part_number: Number of the part, or of its first piece if the part is split.
        :param part_path: Ignored.
        :param keep: Ignored.
        """
        copy_part_length = end - start
        if copy_part_length <= self._max_part_size:
            self._mpu.upload_copy_part(start, end, offset, part_number)
            return
        logging.debug('Splitting copy upload part %d into pieces for %s' % (part_number, self.path))
        remaining_length = copy_part_length
        # The copied range and the part offset are tracked separately: the range always comes from start,
        # it must not be derived from the offset.
        current_start = start
        current_offset = start if offset is None else offset
        actual_part_number = part_number
        while remaining_length > 0:
            part_size = self._resolve_part_size(remaining_length)
            self._mpu.upload_copy_part(current_start, current_start + part_size, current_offset,
                                       actual_part_number)
            remaining_length -= part_size
            current_start += part_size
            current_offset += part_size
            actual_part_number += 1

    def _resolve_part_size(self, remaining_length):
        """
        Resolves the size of the next piece.

        A remainder that fits the maximum part size is taken as a whole. Otherwise the piece is as large
        as possible while leaving at least the minimum part size for the following piece.
        """
        if remaining_length <= self._max_part_size:
            # Covers a remainder below the minimum part size as well. Subtracting the minimum part size
            # from it would give a negative piece size and the splitting loop would never finish.
            return remaining_length
        return min(self._max_part_size, remaining_length - self._min_part_size)
class OutOfBoundsSplittingMultipartCopyUpload(SplittingMultipartCopyUpload):

    def __init__(self, mpu, original_size, min_part_size, max_part_size):
        """
        Out of bounds splitting multipart copy upload.

        Applies the splitting of the parent upload only to the copy upload parts which end beyond
        the original file size. All the other copy upload parts are passed through untouched.

        :param mpu: Wrapping multipart upload.
        :param original_size: Destination file original size.
        :param min_part_size: Minimum upload part size.
        :param max_part_size: Maximum upload part size.
        """
        super(OutOfBoundsSplittingMultipartCopyUpload, self).__init__(mpu, min_part_size, max_part_size)
        self._original_size = original_size

    def upload_copy_part(self, start, end, offset=None, part_number=None, part_path=None, keep=False):
        """
        Passes in bounds copy parts to the wrapped upload and out of bounds ones to the splitting logic.
        """
        within_original = end <= self._original_size
        if within_original:
            self._mpu.upload_copy_part(start, end, offset, part_number)
            return
        splitting = super(OutOfBoundsSplittingMultipartCopyUpload, self)
        splitting.upload_copy_part(start, end, offset, part_number)
class OutOfBoundsFillingMultipartCopyUpload(MultipartUploadDecorator):

    def __init__(self, mpu, original_size, download):
        """
        Out of bounds filling multipart copy upload.

        Replaces the copy upload part regions located beyond the original file size with null bytes.
        Such copy parts are uploaded as regular parts.

        :param mpu: Wrapping multipart upload.
        :param original_size: Destination file original size.
        :param download: Function that retrieves content from the original file by its offset and length.
        """
        super(OutOfBoundsFillingMultipartCopyUpload, self).__init__(mpu)
        self._original_size = original_size
        self._download = download

    def upload_copy_part(self, start, end, offset=None, part_number=None, part_path=None, keep=False):
        """
        Uploads the copy part depending on its position relative to the original file end:
        nulls only if it starts at or after the end, original content padded with nulls
        if it crosses the end, a plain copy part otherwise.
        """
        original_size = self._original_size
        if start >= original_size:
            logging.debug('Filling out of bounds copy upload part %s whole %d-%d for %s'
                          % (part_number, start, end, self.path))
            nulls = bytearray(end - start)
            self._mpu.upload_part(nulls, offset, part_number)
            return
        # From here on the part starts before the original file end.
        if original_size < end:
            logging.debug('Filling out of bounds copy upload part %s region %d-%d with nulls for %s'
                          % (part_number, self._original_size, end, self.path))
            existing = self._download(self.path, start, original_size - start)
            padded = bytearray(end - start)
            padded[0:len(existing)] = existing
            self._mpu.upload_part(padded, offset, part_number)
            return
        self._mpu.upload_copy_part(start, end, offset, part_number)
class DownloadingMultipartCopyUpload(MultipartUploadDecorator):

    def __init__(self, mpu, download):
        """
        Downloading multipart copy upload.

        Turns every copy part into a regular part: the copied range is downloaded
        and its content is uploaded to the wrapped multipart upload.

        :param mpu: Wrapping multipart upload.
        :param download: Function that retrieves content from an object by its path, offset and length.
        """
        super(DownloadingMultipartCopyUpload, self).__init__(mpu)
        self._download = download

    def upload_copy_part(self, start, end, offset=None, part_number=None, part_path=None, keep=False):
        """
        Downloads the start-end range of the upload path and uploads it as a regular part.
        """
        length = end - start
        content = self._download(self.path, start, length)
        self._mpu.upload_part(content, offset, part_number, part_path, keep)
class AppendOptimizedCompositeMultipartCopyUpload(MultipartUploadDecorator):

    def __init__(self, mpu, original_size, chunk_size, download):
        """
        Append optimized composite multipart copy upload.

        Uses original object as a pre uploaded composite part in case of append writes.
        In order to do so it adjusts the first of the already uploaded chunks.

        Uploads copy parts as regular parts using content of the original file.

        :param mpu: Wrapping composite multipart upload.
        :param original_size: Destination file original size.
        :param chunk_size: Size of a single upload part.
        :param download: Function that retrieves content from an object by its path, offset and length.
        """
        super(AppendOptimizedCompositeMultipartCopyUpload, self).__init__(mpu)
        self._mpu = mpu
        self._original_size = original_size
        self._chunk_size = chunk_size
        self._download = download
        # Copy parts are only collected while uploading, they are processed on complete.
        self._copy_parts = []
        # Number and offset of the uploaded part with the lowest part number.
        # sys.maxsize is a larger-than-any-part-number sentinel which, unlike sys.maxint, exists in Python 3 too.
        self._first_chunk = sys.maxsize
        self._first_chunk_offset = 0

    def upload_part(self, buf, offset=None, part_number=None, part_path=None, keep=False):
        """
        Uploads the part to the wrapped multipart upload and remembers the part with the lowest number.
        """
        self._mpu.upload_part(buf, offset, part_number, part_path, keep)
        if part_number < self._first_chunk:
            self._first_chunk = part_number
            self._first_chunk_offset = offset

    def upload_copy_part(self, start, end, offset=None, part_number=None, part_path=None, keep=False):
        """
        Remembers the copy part. Nothing is uploaded until the upload is completed.
        """
        self._copy_parts.append(_CopyPart(start, end, offset, part_number, part_path, keep))

    def complete(self):
        """
        Uploads the collected copy parts and completes the wrapped multipart upload.

        If the copy parts form a prefix of the original file and the first uploaded part can be adjusted,
        the whole original object is used as a single copy part. Otherwise every collected copy part
        is downloaded and uploaded as a regular part.
        """
        prefix_copy_part = self._extract_prefix()
        if prefix_copy_part and self._adjust_first_uploaded_part(self._copy_parts[-1].end):
            self._upload_copy_part(prefix_copy_part)
        else:
            self._upload_parts(self._copy_parts)
        self._mpu.complete()

    def _extract_prefix(self):
        """
        Returns a copy part that covers the whole original object if the collected copy parts form
        a continuous region from the file start which ends at most one chunk before the original file end.
        Returns None otherwise.
        """
        if self._copy_parts:
            sorted_copy_parts = sorted(self._copy_parts, key=lambda copy_part: copy_part.offset)
            prefix_end = 0
            for copy_part in sorted_copy_parts:
                if copy_part.offset != prefix_end:
                    break
                prefix_end += copy_part.end - copy_part.start
            if prefix_end and prefix_end + self._chunk_size >= self._original_size:
                return _CopyPart(0, self._original_size, offset=0, part_number=1, part_path=self.path, keep=True)
        return None

    def _adjust_first_uploaded_part(self, prefix_end):
        """
        Cuts the beginning of the first uploaded part which duplicates the end of the original object.

        :param prefix_end: End of the copy parts prefix.
        :return: False if the first uploaded part differs from the original content in the overlapping
        region, so the original object cannot replace it. True otherwise.
        """
        # Number of bytes the original object has beyond the prefix end.
        diff_offset = self._original_size - prefix_end
        if diff_offset > 0:
            part_path = self._mpu.composite_part_path(self._first_chunk)
            original_part = self._download(self.path, self._first_chunk_offset, self._chunk_size)
            uploaded_part = self._download(part_path, 0, self._chunk_size)
            if original_part[:diff_offset] == uploaded_part[:diff_offset]:
                self._mpu.upload_part(uploaded_part[diff_offset:],
                                      offset=self._first_chunk_offset, part_number=self._first_chunk,
                                      part_path=part_path)
            else:
                return False
        return True

    def _upload_copy_part(self, copy_part):
        """Uploads the copy part to the wrapped multipart upload as a copy part."""
        self._mpu.upload_copy_part(copy_part.start, copy_part.end, offset=copy_part.offset,
                                   part_number=copy_part.part_number, part_path=copy_part.part_path,
                                   keep=copy_part.keep)

    def _upload_parts(self, copy_parts):
        """Downloads the content of every copy part and uploads it as a regular part."""
        for copy_part in copy_parts:
            self._mpu.upload_part(self._download(self.path, copy_part.start, copy_part.end - copy_part.start),
                                  offset=copy_part.offset, part_number=copy_part.part_number,
                                  part_path=copy_part.part_path, keep=copy_part.keep)
class CompositeMultipartUpload(MultipartUpload):

    def __init__(self, bucket, path, new_mpu, mv, max_composite_parts=32):
        """
        Composite object multipart upload.

        Uploads each part as an independent file then merges all the files into a single composite one.
        Takes into the account maximum allowed composite object parts.

        :param bucket: Destination bucket name.
        :param path: Destination bucket relative path.
        :param new_mpu: Function that instantiates new multipart upload.
        :param mv: Function that moves object within bucket.
        :param max_composite_parts: Number of allowed composite object parts.
        """
        self._bucket = bucket
        self._path = path
        self._new_mpu = new_mpu
        self._mv = mv
        self._max_composite_parts = max_composite_parts
        self._bucket_object = None
        self._blob_object = None
        # Upload number -> nested multipart upload that collects up to max composite parts number of parts.
        self._mpus = {}

    @property
    def path(self):
        return self._path

    def initiate(self):
        """Does nothing, nested multipart uploads are initiated on demand."""
        pass

    def upload_part(self, buf, offset=None, part_number=None, part_path=None, keep=False):
        """
        Uploads the part to the nested multipart upload its number belongs to.
        The given part path is ignored, the part path is generated from the part number.
        """
        mpu_number = self._mpu_number(part_number)
        mpu = self._get_mpu(mpu_number)
        mpu.upload_part(buf, offset, part_number, self._part_path(mpu_number, part_number), keep)

    def _mpu_number(self, part_number):
        """Returns the number of the nested multipart upload the part belongs to. Numbers start from 1."""
        # Floor division keeps the number integer regardless of the division mode in use,
        # which matters for the %d formatted paths and the nested uploads dict keys.
        return (part_number - 1) // self._max_composite_parts + 1

    def _get_mpu(self, mpu_number):
        """Returns the nested multipart upload with the given number, creating and initiating it if needed."""
        mpu = self._mpus.get(mpu_number, None)
        if not mpu:
            mpu_path = self._mpu_path(mpu_number)
            mpu = self._new_mpu(mpu_path)
            mpu.initiate()
            self._mpus[mpu_number] = mpu
        return mpu

    def upload_copy_part(self, start, end, offset=None, part_number=None, part_path=None, keep=False):
        """Uploads the copy part to the nested multipart upload its number belongs to."""
        mpu_number = self._mpu_number(part_number)
        mpu = self._get_mpu(mpu_number)
        mpu.upload_copy_part(start, end, offset, part_number, part_path, keep)

    def complete(self):
        """Completes all the nested multipart uploads and composes them into the destination object."""
        for mpu in self._mpus.values():
            mpu.complete()
        self._compose([(mpu_number, self._mpus[mpu_number]) for mpu_number in sorted(self._mpus.keys())])

    def _compose(self, mpus):
        """
        Merges the given uploads into a single object and moves it to the destination path.

        The uploads are merged in batches which never exceed the maximum number of composite parts.
        Starting from the second batch the result of the previous merge takes the first place of the batch.

        :param mpus: List of (upload number, multipart upload) pairs ordered by upload number.
        """
        if not mpus:
            return
        remaining_mpus = list(mpus)
        composed_mpu = None
        while remaining_mpus:
            if composed_mpu:
                merging_mpus = remaining_mpus[:self._max_composite_parts - 1]
                composed_mpu = self._merge([(0, composed_mpu)] + merging_mpus)
                remaining_mpus = remaining_mpus[self._max_composite_parts - 1:]
            else:
                merging_mpus = remaining_mpus[:self._max_composite_parts]
                composed_mpu = self._merge(merging_mpus)
                remaining_mpus = remaining_mpus[self._max_composite_parts:]
            # Merged uploads are not tracked anymore, so that abort does not touch them.
            for mpu_number, _ in merging_mpus:
                del self._mpus[mpu_number]
        self._mv(composed_mpu.path, self._path)

    def _merge(self, mpus):
        """
        Creates a new multipart upload, adds the object of every given upload to it as a copy part
        and completes it.

        :param mpus: List of (upload number, multipart upload) pairs to merge.
        :return: Completed merged multipart upload.
        """
        merged_mpu = self._new_mpu(self._merged_mpu_path(mpus[0][0], mpus[-1][0]))
        merged_mpu.initiate()
        for mpu_number, mpu in mpus:
            merged_mpu.upload_copy_part(None, None, None, mpu_number, mpu.path)
        merged_mpu.complete()
        return merged_mpu

    def abort(self):
        """Aborts all the tracked nested multipart uploads."""
        for mpu in self._mpus.values():
            mpu.abort()

    def composite_part_path(self, part_number):
        """Returns the path the part with the given number is uploaded to."""
        return self._part_path(self._mpu_number(part_number), part_number)

    def _mpu_path(self, mpu_number):
        return '%s_%s.tmp/%d' % (self._path, self._hashed_path(), mpu_number)

    def _part_path(self, mpu_number, part_number):
        return '%s_%s.tmp/%d.%d' % (self._path, self._hashed_path(), mpu_number, part_number)

    def _merged_mpu_path(self, left_mpu_number, right_mpu_number):
        return '%s_%s.tmp/%d:%d' % (self._path, self._hashed_path(), left_mpu_number, right_mpu_number)

    def _hashed_path(self):
        """Returns the absolute value of the destination path hash as a string."""
        return str(abs(hash(self._path)))
class TruncatingMultipartCopyUpload(MultipartUploadDecorator):

    def __init__(self, mpu, length, min_part_number=1):
        """
        Truncating multipart copy upload.

        Cuts the file down to the given length, which has to be smaller than the original one,
        by uploading its first length bytes as a single copy part.

        :param mpu: Wrapping multipart upload.
        :param length: Target size of the truncating file.
        :param min_part_number: Minimal allowed part number. Same as minimum allowed chunk number.
        """
        super(TruncatingMultipartCopyUpload, self).__init__(mpu)
        self._length = length
        self._min_part_number = min_part_number

    def complete(self):
        """
        Uploads the range from the file start to the target length as a copy part
        and completes the wrapped multipart upload.
        """
        wrapped = self._mpu
        truncated_range = dict(start=0, end=self._length, offset=0, part_number=self._min_part_number)
        wrapped.upload_copy_part(**truncated_range)
        wrapped.complete()