in flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java [196:440]
/**
 * Work loop of the spilling thread: consumes buffers from the spill queue and
 * publishes the result iterator via {@code setResultIterator}.
 *
 * <p>The method proceeds in phases:
 * <ol>
 *   <li><b>Cache</b>: elements are taken from {@code queues.spill} and collected in a
 *       local cache until either the spilling marker or the end marker is seen.</li>
 *   <li><b>In-memory merge</b>: if the end marker was seen first, the result iterator is
 *       built directly from the cached buffers (empty, single-buffer, or a
 *       {@code MergeIterator}) and the method returns. The combiner is neither opened
 *       nor invoked on this path.</li>
 *   <li><b>Spilling</b>: the combiner is opened, then every buffer obtained through
 *       {@code takeNext} is written to a freshly created channel. Runs of adjacent
 *       records for which {@code buffer.compare} returns 0 are passed through the
 *       combiner; records without an equal neighbor are copied unchanged. The buffer is
 *       then reset and added to {@code queues.empty}. Afterwards the combiner is
 *       closed.</li>
 *   <li><b>Merging</b>: while there are more than {@code maxFanIn} channels,
 *       {@code mergeChannelList} is applied. The write memory is then released and a
 *       merge iterator over the remaining channels (or an empty iterator if nothing was
 *       spilled) becomes the result.</li>
 * </ol>
 *
 * <p>If {@code isRunning()} turns false at one of the checked points, the method returns
 * without setting a result iterator.
 *
 * @throws IOException if the combiner fails in {@code open()} or {@code close()}, if any
 *         exception is raised inside the combine/write loop, or if one of the I/O
 *         helpers called here throws it
 */
public void go() throws IOException {
// ------------------- In-Memory Cache ------------------------
// holds the buffers received before it is known whether spilling is required
final Queue<CircularElement<E>> cache = new ArrayDeque<CircularElement<E>>();
CircularElement<E> element;
// becomes true only if the end marker is seen before the spilling marker
boolean cacheOnly = false;
// fill cache
while (isRunning()) {
// take next element from queue
try {
element = this.queues.spill.take();
}
catch (InterruptedException iex) {
// an interrupt while still running is logged and the take is retried;
// an interrupt after shutdown ends the thread's work
if (isRunning()) {
LOG.error("Sorting thread was interrupted (without being shut down) while grabbing a buffer. " +
"Retrying to grab buffer...");
continue;
} else {
return;
}
}
// markers are sentinel elements and are recognized by reference identity
if (element == spillingMarker()) {
// spilling is required: stop caching and continue with the spilling phase
break;
}
else if (element == endMarker()) {
// all data arrived without a spilling marker: everything is in the cache
cacheOnly = true;
break;
}
cache.add(element);
}
// check whether the thread was canceled
if (!isRunning()) {
return;
}
// ------------------- In-Memory Merge ------------------------
if (cacheOnly) {
// operates on in-memory segments only; the combiner is not used on this path
if (LOG.isDebugEnabled()) {
LOG.debug("Initiating in memory merge.");
}
List<MutableObjectIterator<E>> iterators = new ArrayList<MutableObjectIterator<E>>(cache.size());
// iterate buffers and collect a set of iterators
for (CircularElement<E> cached : cache) {
iterators.add(cached.buffer.getIterator());
}
// release the remaining sort-buffers
// NOTE(review): presumably 'true' also returns the memory to the memory manager
// (the spilling path below passes 'false' to keep it) - confirm in disposeSortBuffers
if (LOG.isDebugEnabled()) {
LOG.debug("Releasing unused sort-buffer memory.");
}
disposeSortBuffers(true);
// set lazy iterator: empty iterator for no buffers, the buffer's own iterator for
// exactly one buffer, otherwise a merge over all cached buffers
MutableObjectIterator<E> resIter = iterators.isEmpty() ? EmptyMutableObjectIterator.<E>get() :
iterators.size() == 1 ? iterators.get(0) :
new MergeIterator<E>(iterators, this.comparator);
setResultIterator(resIter);
return;
}
// ------------------- Spilling Phase ------------------------
final GroupCombineFunction<E, E> combineStub = CombiningUnilateralSortMerger.this.combineStub;
// now that we are actually spilling, take the combiner, and open it
// (a fresh, empty Configuration is used when no UDF configuration was provided)
// NOTE(review): the matching closeFunction call further down is only reached when the
// spill loop ends normally; the early returns and exception paths below skip it
try {
Configuration conf = CombiningUnilateralSortMerger.this.udfConfig;
FunctionUtils.openFunction (combineStub, (conf == null ? new Configuration() : conf));
}
catch (Throwable t) {
throw new IOException("The user-defined combiner failed in its 'open()' method.", t);
}
final FileIOChannel.Enumerator enumerator = this.ioManager.createChannelEnumerator();
// one entry per spilled channel, together with the number of blocks written to it
List<ChannelWithBlockCount> channelIDs = new ArrayList<ChannelWithBlockCount>();
// loop as long as the thread is marked alive and we do not see the final element
while (isRunning()) {
try {
// presumably serves the cached buffers first and then falls back to the spill
// queue - confirm in takeNext
element = takeNext(this.queues.spill, cache);
}
catch (InterruptedException iex) {
if (isRunning()) {
LOG.error("Sorting thread was interrupted (without being shut down) while grabbing a buffer. " +
"Retrying to grab buffer...");
continue;
} else {
return;
}
}
// check if we are still running
if (!isRunning()) {
return;
}
// check if this is the end-of-work buffer
if (element == endMarker()) {
break;
}
// open next channel
FileIOChannel.ID channel = enumerator.next();
registerChannelToBeRemovedAtShudown(channel);
if (LOG.isDebugEnabled()) {
LOG.debug("Creating temp file " + channel.toString() + '.');
}
// create writer
final BlockChannelWriter<MemorySegment> writer = this.ioManager.createBlockChannelWriter(channel);
// the writer stays registered until the output has been closed successfully below
registerOpenChannelToBeRemovedAtShudown(writer);
final ChannelWriterOutputView output = new ChannelWriterOutputView(writer, this.writeMemory,
this.memManager.getPageSize());
// write sort-buffer to channel
if (LOG.isDebugEnabled()) {
LOG.debug("Combining buffer " + element.id + '.');
}
// set up the combining helpers
final InMemorySorter<E> buffer = element.buffer;
final CombineValueIterator<E> iter = new CombineValueIterator<E>(
buffer, this.serializer.createInstance(), this.objectReuseEnabled);
final WriterCollector<E> collector = new WriterCollector<E>(output, this.serializer);
// i is the index of the record currently examined; stop is the index of the last
// record in the buffer (-1 for an empty buffer)
int i = 0;
int stop = buffer.size() - 1;
try {
while (i < stop) {
int seqStart = i;
// advance i while record i and its successor compare as equal; afterwards
// the records seqStart..i form one run of equal keys
while (i < stop && 0 == buffer.compare(i, i + 1)) {
i++;
}
if (i == seqStart) {
// no duplicate key, no need to combine. simply copy
buffer.writeToOutput(output, seqStart, 1);
} else {
// get the iterator over the values
// (presumably both bounds are inclusive - confirm in CombineValueIterator.set)
iter.set(seqStart, i);
// call the combiner to combine
combineStub.combine(iter, collector);
}
// step past the run that was just handled
i++;
}
}
catch (Exception ex) {
// NOTE(review): this also wraps exceptions thrown by buffer.writeToOutput above,
// which are then reported under the "combiner user code" message
throw new IOException("An error occurred in the combiner user code.", ex);
}
// write the last pair, if it has not yet been included in the last iteration
// (i == stop means the final record was not part of the preceding run; if the last
// run ended at 'stop', i is stop + 1 here, and for an empty buffer stop is -1)
if (i == stop) {
buffer.writeToOutput(output, stop, 1);
}
// done combining and writing out
if (LOG.isDebugEnabled()) {
LOG.debug("Combined and spilled buffer " + element.id + ".");
}
output.close();
unregisterOpenChannelToBeRemovedAtShudown(writer);
// remember the channel and its block count for the merging phase
channelIDs.add(new ChannelWithBlockCount(channel, output.getBlockCount()));
// pass empty sort-buffer to reading thread
element.buffer.reset();
this.queues.empty.add(element);
}
// done with the spilling
if (LOG.isDebugEnabled()) {
LOG.debug("Spilling done.");
LOG.debug("Releasing sort-buffer memory.");
}
// clear the sort buffers, but do not return the memory to the manager, as we use it for merging
disposeSortBuffers(false);
if (LOG.isDebugEnabled()) {
LOG.debug("Closing combiner user code.");
}
// close the user code
try {
FunctionUtils.closeFunction(combineStub);
}
catch (Throwable t) {
throw new IOException("The user-defined combiner failed in its 'close()' method.", t);
}
if (LOG.isDebugEnabled()) {
LOG.debug("User code closed.");
}
// ------------------- Merging Phase ------------------------
// merge channels until sufficient file handles are available
// NOTE(review): this loop also ends when isRunning() turns false; the code below does
// not re-check isRunning() before releasing memory and setting up the final merge
while (isRunning() && channelIDs.size() > this.maxFanIn) {
channelIDs = mergeChannelList(channelIDs, this.mergeReadMemory, this.writeMemory);
}
// from here on, we won't write again
this.memManager.release(this.writeMemory);
this.writeMemory.clear();
// check if we have spilled some data at all
if (channelIDs.isEmpty()) {
setResultIterator(EmptyMutableObjectIterator.<E>get());
}
else {
if (LOG.isDebugEnabled()) {
LOG.debug("Beginning final merge.");
}
// allocate the memory for the final merging step
List<List<MemorySegment>> readBuffers = new ArrayList<List<MemorySegment>>(channelIDs.size());
// allocate the read memory and register it to be released
getSegmentsForReaders(readBuffers, this.mergeReadMemory, channelIDs.size());
// get the readers and register them to be released
// (the meaning of the trailing null argument is not visible here - see getMergingIterator)
final MergeIterator<E> mergeIterator = getMergingIterator(
channelIDs, readBuffers, new ArrayList<FileIOChannel>(channelIDs.size()), null);
// set the target for the user iterator
// the merge iterator is handed out directly; no combining wrapper is applied to the
// final merge here
setResultIterator(mergeIterator);
}
// done
if (LOG.isDebugEnabled()) {
LOG.debug("Spilling and merging thread done.");
}
}