in flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java [1245:1482]
public void go() throws IOException {
    /*
     * Work loop of the spilling and merging thread. It proceeds in up to four phases:
     *
     *  1. In-memory cache: sorted buffers taken from the spill queue are collected until
     *     either SPILLING_MARKER (spilling is required) or EOF_MARKER (all input was seen
     *     before any spilling was requested) arrives.
     *  2. In-memory merge: only if EOF_MARKER ended phase 1. The result iterator is built
     *     directly over the cached buffers (plus the sorted large records, if any) and the
     *     method returns.
     *  3. Spilling: every further buffer is written to its own channel and then handed back
     *     through the empty queue, until EOF_MARKER arrives.
     *  4. Merging: channels are merged until at most maxFanIn remain, then the result
     *     iterator is set to the final merge over the remaining channels.
     *
     * The method returns early, without setting a result iterator, when isRunning() turns
     * false during phases 1 or 3.
     *
     * Throws IOException if the thread is interrupted while filling the cache (the
     * InterruptedException is attached as the cause); the channel, merge and large record
     * calls made here may presumably also raise it.
     */
    final Queue<CircularElement<E>> cache = new ArrayDeque<CircularElement<E>>();
    CircularElement<E> element;
    boolean cacheOnly = false;

    // ------------------- In-Memory Cache ------------------------

    // fill cache
    while (isRunning()) {
        // take next element from queue
        try {
            element = this.queues.spill.take();
        }
        catch (InterruptedException iex) {
            // keep the interruption as the cause so that its stack trace is not lost
            throw new IOException("The spilling thread was interrupted.", iex);
        }

        if (element == SPILLING_MARKER) {
            // spilling was requested: leave the cache phase, cacheOnly stays false
            break;
        }
        else if (element == EOF_MARKER) {
            // end of input reached without a spill request: everything is in the cache
            cacheOnly = true;
            break;
        }
        cache.add(element);
    }

    // check whether the thread was canceled
    if (!isRunning()) {
        return;
    }

    MutableObjectIterator<E> largeRecords = null;

    // check if we can stay in memory with the large record handler
    if (cacheOnly && largeRecordHandler != null && largeRecordHandler.hasData()) {
        List<MemorySegment> memoryForLargeRecordSorting = new ArrayList<MemorySegment>();

        // collect the memory of all sort buffers that are currently in the empty queue;
        // their buffers are disposed first
        CircularElement<E> circElement;
        while ((circElement = this.queues.empty.poll()) != null) {
            circElement.buffer.dispose();
            memoryForLargeRecordSorting.addAll(circElement.memory);
        }

        if (memoryForLargeRecordSorting.isEmpty()) {
            // no memory could be obtained for the large records: fall back to the
            // spilling and merging phases below
            cacheOnly = false;
            LOG.debug("Going to disk-based merge because of large records.");
        } else {
            LOG.debug("Sorting large records, to add them to in-memory merge.");
            largeRecords = largeRecordHandler.finishWriteAndSortKeys(memoryForLargeRecordSorting);
        }
    }

    // ------------------- In-Memory Merge ------------------------
    if (cacheOnly) {
        // operates on in-memory buffers only
        if (LOG.isDebugEnabled()) {
            LOG.debug("Initiating in memory merge.");
        }

        // one iterator per cached buffer, plus one slot for the large records
        List<MutableObjectIterator<E>> iterators = new ArrayList<MutableObjectIterator<E>>(cache.size() + 1);

        // iterate buffers and collect a set of iterators
        for (CircularElement<E> cached : cache) {
            // note: the yielded iterator only operates on the buffer heap (and disregards the stack)
            iterators.add(cached.buffer.getIterator());
        }

        if (largeRecords != null) {
            iterators.add(largeRecords);
        }

        // release the remaining sort-buffers
        if (LOG.isDebugEnabled()) {
            LOG.debug("Releasing unused sort-buffer memory.");
        }
        disposeSortBuffers(true);

        // set lazy iterator: empty input, a single source, or a merge over all sources
        setResultIterator(iterators.isEmpty() ? EmptyMutableObjectIterator.<E>get() :
                iterators.size() == 1 ? iterators.get(0) :
                new MergeIterator<E>(iterators, this.comparator));
        return;
    }

    // ------------------- Spilling Phase ------------------------

    final FileIOChannel.Enumerator enumerator = this.ioManager.createChannelEnumerator();
    List<ChannelWithBlockCount> channelIDs = new ArrayList<ChannelWithBlockCount>();

    // loop as long as the thread is marked alive and we do not see the final element
    while (isRunning()) {
        try {
            // presumably drains the local cache before blocking on the spill queue
            // (takeNext is defined outside this block; confirm there)
            element = takeNext(this.queues.spill, cache);
        }
        catch (InterruptedException iex) {
            if (isRunning()) {
                // interrupted without a shutdown request: log and try again
                LOG.error("Sorting thread was interrupted (without being shut down) while grabbing a buffer. " +
                        "Retrying to grab buffer...");
                continue;
            } else {
                return;
            }
        }

        // check if we are still running
        if (!isRunning()) {
            return;
        }
        // check if this is the end-of-work buffer
        if (element == EOF_MARKER) {
            break;
        }

        // open next channel
        FileIOChannel.ID channel = enumerator.next();
        registerChannelToBeRemovedAtShudown(channel);

        // create writer
        final BlockChannelWriter<MemorySegment> writer = this.ioManager.createBlockChannelWriter(channel);
        registerOpenChannelToBeRemovedAtShudown(writer);
        final ChannelWriterOutputView output = new ChannelWriterOutputView(writer, this.writeMemory,
                this.memManager.getPageSize());

        // write sort-buffer to channel
        if (LOG.isDebugEnabled()) {
            LOG.debug("Spilling buffer " + element.id + ".");
        }
        element.buffer.writeToOutput(output, largeRecordHandler);
        if (LOG.isDebugEnabled()) {
            LOG.debug("Spilled buffer " + element.id + ".");
        }

        output.close();
        unregisterOpenChannelToBeRemovedAtShudown(writer);

        // only channels that actually received data take part in the merge
        if (output.getBytesWritten() > 0) {
            channelIDs.add(new ChannelWithBlockCount(channel, output.getBlockCount()));
        }

        // pass empty sort-buffer to reading thread
        element.buffer.reset();
        this.queues.empty.add(element);
    }

    // done with the spilling
    if (LOG.isDebugEnabled()) {
        LOG.debug("Spilling done.");
        LOG.debug("Releasing sort-buffer memory.");
    }

    // clear the sort buffers, but do not return the memory to the manager, as we use it for merging
    disposeSortBuffers(false);

    // ------------------- Merging Phase ------------------------

    // make sure we have enough memory to merge and for large record handling
    // (this local list shadows the field this.mergeReadMemory from here on)
    List<MemorySegment> mergeReadMemory;

    if (largeRecordHandler != null && largeRecordHandler.hasData()) {

        List<MemorySegment> longRecMem;
        if (channelIDs.isEmpty()) {
            // only long records: they get all of the merge memory
            longRecMem = this.mergeReadMemory;
            mergeReadMemory = Collections.emptyList();
        }
        else {
            // split the merge memory between the channel readers and the large record sorter:
            // each merged stream gets (half of the pages / number of streams) pages, clamped
            // to the range [MIN_NUM_WRITE_BUFFERS, MAX_NUM_WRITE_BUFFERS]
            int maxMergedStreams = Math.min(this.maxFanIn, channelIDs.size());

            int pagesPerStream = Math.max(MIN_NUM_WRITE_BUFFERS,
                    Math.min(MAX_NUM_WRITE_BUFFERS, this.mergeReadMemory.size() / 2 / maxMergedStreams));

            int totalMergeReadMemory = maxMergedStreams * pagesPerStream;

            // grab the merge memory
            // NOTE(review): assumes this.mergeReadMemory holds at least totalMergeReadMemory
            // segments; the lower clamp above could otherwise exceed its size - confirm
            mergeReadMemory = new ArrayList<MemorySegment>(totalMergeReadMemory);
            for (int i = 0; i < totalMergeReadMemory; i++) {
                mergeReadMemory.add(this.mergeReadMemory.get(i));
            }

            // the remainder of the memory goes to the long record sorter
            longRecMem = new ArrayList<MemorySegment>();
            for (int i = totalMergeReadMemory; i < this.mergeReadMemory.size(); i++) {
                longRecMem.add(this.mergeReadMemory.get(i));
            }
        }

        if (LOG.isDebugEnabled()) {
            LOG.debug("Sorting keys for large records.");
        }
        largeRecords = largeRecordHandler.finishWriteAndSortKeys(longRecMem);
    }
    else {
        // no large records: all merge memory is available for reading the channels
        mergeReadMemory = this.mergeReadMemory;
    }

    // merge channels until sufficient file handles are available
    while (isRunning() && channelIDs.size() > this.maxFanIn) {
        channelIDs = mergeChannelList(channelIDs, mergeReadMemory, this.writeMemory);
    }

    // from here on, we won't write again
    this.memManager.release(this.writeMemory);
    this.writeMemory.clear();

    // check if we have spilled some data at all
    if (channelIDs.isEmpty()) {
        // nothing on disk: the result is either empty or consists of the large records only
        if (largeRecords == null) {
            setResultIterator(EmptyMutableObjectIterator.<E>get());
        } else {
            setResultIterator(largeRecords);
        }
    }
    else {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Beginning final merge.");
        }

        // allocate the memory for the final merging step
        List<List<MemorySegment>> readBuffers = new ArrayList<List<MemorySegment>>(channelIDs.size());

        // allocate the read memory and register it to be released
        getSegmentsForReaders(readBuffers, mergeReadMemory, channelIDs.size());

        // get the readers and register them to be released
        setResultIterator(getMergingIterator(channelIDs, readBuffers, new ArrayList<FileIOChannel>(channelIDs.size()), largeRecords));
    }

    // done
    if (LOG.isDebugEnabled()) {
        LOG.debug("Spilling and merging thread done.");
    }
}