in flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java [236:429]
protected UnilateralSortMerger(
MemoryManager memoryManager,
List<MemorySegment> memory,
IOManager ioManager,
MutableObjectIterator<E> input,
AbstractInvokable parentTask,
TypeSerializerFactory<E> serializerFactory,
TypeComparator<E> comparator,
int numSortBuffers,
int maxNumFileHandles,
float startSpillingFraction,
boolean noSpillingMemory,
boolean handleLargeRecords,
boolean objectReuseEnabled,
InMemorySorterFactory<E> inMemorySorterFactory) throws IOException {
// sanity checks
if (memoryManager == null || (ioManager == null && !noSpillingMemory) || serializerFactory == null || comparator == null) {
throw new NullPointerException();
}
if (parentTask == null) {
throw new NullPointerException("Parent Task must not be null.");
}
if (maxNumFileHandles < 2) {
throw new IllegalArgumentException("Merger cannot work with less than two file handles.");
}
this.memoryManager = memoryManager;
this.objectReuseEnabled = objectReuseEnabled;
// adjust the memory quotas to the page size
final int numPagesTotal = memory.size();
if (numPagesTotal < MIN_NUM_WRITE_BUFFERS + MIN_NUM_SORT_MEM_SEGMENTS) {
throw new IllegalArgumentException("Too little memory provided to sorter to perform task. " +
"Required are at least " + (MIN_NUM_WRITE_BUFFERS + MIN_NUM_SORT_MEM_SEGMENTS) +
" pages. Current page size is " + memoryManager.getPageSize() + " bytes.");
}
// determine how many buffers to use for writing
final int numWriteBuffers;
final int numLargeRecordBuffers;
if (noSpillingMemory && !handleLargeRecords) {
numWriteBuffers = 0;
numLargeRecordBuffers = 0;
}
else {
int numConsumers = (noSpillingMemory ? 0 : 1) + (handleLargeRecords ? 2 : 0);
// determine how many buffers we have when we do a full mere with maximal fan-in
final int minBuffersForMerging = maxNumFileHandles + numConsumers * MIN_NUM_WRITE_BUFFERS;
if (minBuffersForMerging > numPagesTotal) {
numWriteBuffers = noSpillingMemory ? 0 : MIN_NUM_WRITE_BUFFERS;
numLargeRecordBuffers = handleLargeRecords ? 2*MIN_NUM_WRITE_BUFFERS : 0;
maxNumFileHandles = numPagesTotal - numConsumers * MIN_NUM_WRITE_BUFFERS;
if (LOG.isDebugEnabled()) {
LOG.debug("Reducing maximal merge fan-in to " + maxNumFileHandles + " due to limited memory availability during merge");
}
}
else {
// we are free to choose. make sure that we do not eat up too much memory for writing
final int fractionalAuxBuffers = numPagesTotal / (numConsumers * 100);
if (fractionalAuxBuffers >= MAX_NUM_WRITE_BUFFERS) {
numWriteBuffers = noSpillingMemory ? 0 : MAX_NUM_WRITE_BUFFERS;
numLargeRecordBuffers = handleLargeRecords ? 2*MAX_NUM_WRITE_BUFFERS : 0;
}
else {
numWriteBuffers = noSpillingMemory ? 0 :
Math.max(MIN_NUM_WRITE_BUFFERS, fractionalAuxBuffers); // at least the lower bound
numLargeRecordBuffers = handleLargeRecords ?
Math.max(2*MIN_NUM_WRITE_BUFFERS, fractionalAuxBuffers) // at least the lower bound
: 0;
}
}
}
final int sortMemPages = numPagesTotal - numWriteBuffers - numLargeRecordBuffers;
final long sortMemory = ((long) sortMemPages) * memoryManager.getPageSize();
// decide how many sort buffers to use
if (numSortBuffers < 1) {
if (sortMemory > 100 * 1024 * 1024) {
numSortBuffers = 2;
}
else {
numSortBuffers = 1;
}
}
final int numSegmentsPerSortBuffer = sortMemPages / numSortBuffers;
if (LOG.isDebugEnabled()) {
LOG.debug(String.format("Instantiating sorter with %d pages of sorting memory (="
+ "%d bytes total) divided over %d sort buffers (%d pages per buffer). Using %d"
+ " buffers for writing sorted results and merging maximally %d streams at once. "
+ "Using %d memory segments for large record spilling.",
sortMemPages, sortMemory, numSortBuffers, numSegmentsPerSortBuffer, numWriteBuffers,
maxNumFileHandles, numLargeRecordBuffers));
}
this.sortReadMemory = memory;
this.writeMemory = new ArrayList<MemorySegment>(numWriteBuffers);
final TypeSerializer<E> serializer = serializerFactory.getSerializer();
// move some pages from the sort memory to the write memory
if (numWriteBuffers > 0) {
for (int i = 0; i < numWriteBuffers; i++) {
this.writeMemory.add(this.sortReadMemory.remove(this.sortReadMemory.size() - 1));
}
}
if (numLargeRecordBuffers > 0) {
List<MemorySegment> mem = new ArrayList<MemorySegment>();
for (int i = 0; i < numLargeRecordBuffers; i++) {
mem.add(this.sortReadMemory.remove(this.sortReadMemory.size() - 1));
}
this.largeRecordHandler = new LargeRecordHandler<E>(serializer, comparator.duplicate(),
ioManager, memoryManager, mem, parentTask, maxNumFileHandles);
}
else {
this.largeRecordHandler = null;
}
// circular queues pass buffers between the threads
final CircularQueues<E> circularQueues = new CircularQueues<E>();
inMemorySorters = new ArrayList<>(numSortBuffers);
// allocate the sort buffers and fill empty queue with them
final Iterator<MemorySegment> segments = this.sortReadMemory.iterator();
for (int i = 0; i < numSortBuffers; i++)
{
// grab some memory
final List<MemorySegment> sortSegments = new ArrayList<MemorySegment>(numSegmentsPerSortBuffer);
for (int k = (i == numSortBuffers - 1 ? Integer.MAX_VALUE : numSegmentsPerSortBuffer); k > 0 && segments.hasNext(); k--) {
sortSegments.add(segments.next());
}
final InMemorySorter<E> inMemorySorter = inMemorySorterFactory.create(sortSegments);
inMemorySorters.add(inMemorySorter);
// add to empty queue
CircularElement<E> element = new CircularElement<E>(i, inMemorySorter, sortSegments);
circularQueues.empty.add(element);
}
// exception handling
ExceptionHandler<IOException> exceptionHandler = new ExceptionHandler<IOException>() {
public void handleException(IOException exception) {
// forward exception
if (!closed) {
setResultIteratorException(exception);
close();
}
}
};
// create sets that track the channels we need to clean up when closing the sorter
this.channelsToDeleteAtShutdown = new HashSet<FileIOChannel.ID>(64);
this.openChannels = new HashSet<FileIOChannel>(64);
// start the thread that reads the input channels
this.readThread = getReadingThread(exceptionHandler, input, circularQueues, largeRecordHandler,
parentTask, serializer, ((long) (startSpillingFraction * sortMemory)));
// start the thread that sorts the buffers
this.sortThread = getSortingThread(exceptionHandler, circularQueues, parentTask);
// start the thread that handles spilling to secondary storage
this.spillThread = getSpillingThread(exceptionHandler, circularQueues, parentTask,
memoryManager, ioManager, serializerFactory, comparator, this.sortReadMemory, this.writeMemory,
maxNumFileHandles);
// propagate the context class loader to the spawned threads
ClassLoader contextLoader = Thread.currentThread().getContextClassLoader();
if (contextLoader != null) {
if (this.readThread != null) {
this.readThread.setContextClassLoader(contextLoader);
}
if (this.sortThread != null) {
this.sortThread.setContextClassLoader(contextLoader);
}
if (this.spillThread != null) {
this.spillThread.setContextClassLoader(contextLoader);
}
}
startThreads();
}