public void go()

in flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java [196:440]


		/**
		 * Entry point of the spilling thread: consumes the elements arriving on the spill queue and
		 * publishes the final result iterator via {@code setResultIterator}.
		 *
		 * <p>The method proceeds in phases:
		 * <ol>
		 *   <li><b>In-memory cache:</b> elements taken from {@code queues.spill} are collected until
		 *       either the spilling marker or the end marker is seen.</li>
		 *   <li><b>In-memory merge:</b> if the end marker arrived first, the iterators of the cached
		 *       buffers are merged directly (the combiner is not used on this path), the result
		 *       iterator is set and the method returns.</li>
		 *   <li><b>Spilling:</b> otherwise the combiner is opened and every further buffer is written
		 *       to its own temp channel. Runs of adjacent records that compare equal are handed to
		 *       {@code combineStub.combine}; records without an equal neighbor are copied as-is.
		 *       The reset buffer is then added to {@code queues.empty}.</li>
		 *   <li><b>Merging:</b> the spilled channels are merged while more than {@code maxFanIn}
		 *       remain, and a merge iterator over the remaining channels becomes the result.</li>
		 * </ol>
		 *
		 * <p>The method returns early, without setting a result iterator, when {@code isRunning()}
		 * is found to be false during the cache or the spilling phase.
		 *
		 * @throws IOException if the user-defined combiner fails in {@code open()}, while combining,
		 *         or in {@code close()} (the original throwable is attached as the cause). I/O
		 *         failures of the channel and merge helpers presumably propagate as well - those
		 *         helpers are not visible here.
		 */
		public void go() throws IOException {
			// ------------------- In-Memory Cache ------------------------
			
			// elements received before we know whether spilling is needed at all
			final Queue<CircularElement<E>> cache = new ArrayDeque<CircularElement<E>>();
			CircularElement<E> element;
			// set to true when the end marker arrives before any spilling marker
			boolean cacheOnly = false;
			
			// fill cache
			while (isRunning()) {
				// take next element from queue
				try {
					element = this.queues.spill.take();
				}
				catch (InterruptedException iex) {
					// an interrupt while still running is logged and the take is retried;
					// an interrupt after shutdown ends the thread
					if (isRunning()) {
						LOG.error("Sorting thread was interrupted (without being shut down) while grabbing a buffer. " +
								"Retrying to grab buffer...");
						continue;
					} else {
						return;
					}
				}
				// markers are recognized by reference identity and are never added to the cache
				if (element == spillingMarker()) {
					break;
				}
				else if (element == endMarker()) {
					cacheOnly = true;
					break;
				}
				cache.add(element);
			}
			
			// check whether the thread was canceled
			if (!isRunning()) {
				return;
			}
			
			// ------------------- In-Memory Merge ------------------------
			if (cacheOnly) {
				/* operates on in-memory segments only */
				if (LOG.isDebugEnabled()) {
					LOG.debug("Initiating in memory merge.");
				}
				
				List<MutableObjectIterator<E>> iterators = new ArrayList<MutableObjectIterator<E>>(cache.size());
								
				// iterate buffers and collect a set of iterators
				for (CircularElement<E> cached : cache) {
					iterators.add(cached.buffer.getIterator());
				}
				
				// release the remaining sort-buffers
				if (LOG.isDebugEnabled()) {
					LOG.debug("Releasing unused sort-buffer memory.");
				}
				disposeSortBuffers(true);
				
				// set lazy iterator: empty for no buffers, the buffer's own iterator for exactly one,
				// a merge over all of them otherwise
				MutableObjectIterator<E> resIter = iterators.isEmpty() ? EmptyMutableObjectIterator.<E>get() :
						iterators.size() == 1 ? iterators.get(0) : 
						new MergeIterator<E>(iterators, this.comparator);
				
				setResultIterator(resIter);
				return;
			}
			
			// ------------------- Spilling Phase ------------------------
			
			final GroupCombineFunction<E, E> combineStub = CombiningUnilateralSortMerger.this.combineStub;
			
			// now that we are actually spilling, take the combiner, and open it
			try {
				// fall back to an empty configuration when none was supplied
				Configuration conf = CombiningUnilateralSortMerger.this.udfConfig;
				FunctionUtils.openFunction (combineStub, (conf == null ? new Configuration() : conf));
			}
			catch (Throwable t) {
				throw new IOException("The user-defined combiner failed in its 'open()' method.", t);
			}
			
			final FileIOChannel.Enumerator enumerator = this.ioManager.createChannelEnumerator();
			// one entry per spilled buffer: the channel and the number of blocks written to it
			List<ChannelWithBlockCount> channelIDs = new ArrayList<ChannelWithBlockCount>();

			
			// loop as long as the thread is marked alive and we do not see the final element
			while (isRunning())	{
				try {
					// NOTE(review): takeNext is defined elsewhere; presumably it serves the cached
					// elements before taking from the spill queue - confirm against its definition
					element = takeNext(this.queues.spill, cache);
				}
				catch (InterruptedException iex) {
					// same policy as in the cache phase: retry while running, exit otherwise
					if (isRunning()) {
						LOG.error("Sorting thread was interrupted (without being shut down) while grabbing a buffer. " +
								"Retrying to grab buffer...");
						continue;
					} else {
						return;
					}
				}
				
				// check if we are still running
				if (!isRunning()) {
					return;
				}
				// check if this is the end-of-work buffer
				if (element == endMarker()) {
					break;
				}
				
				// open next channel
				FileIOChannel.ID channel = enumerator.next();
				// presumably lets the temp file be deleted on shutdown - helper not visible here
				registerChannelToBeRemovedAtShudown(channel);
				
				if (LOG.isDebugEnabled()) {
					LOG.debug("Creating temp file " + channel.toString() + '.');
				}

				// create writer
				final BlockChannelWriter<MemorySegment> writer = this.ioManager.createBlockChannelWriter(channel);
				// the writer stays registered only while it is open; it is unregistered after close below
				registerOpenChannelToBeRemovedAtShudown(writer);
				final ChannelWriterOutputView output = new ChannelWriterOutputView(writer, this.writeMemory,
																			this.memManager.getPageSize());

				// write sort-buffer to channel
				if (LOG.isDebugEnabled()) {
					LOG.debug("Combining buffer " + element.id + '.');
				}

				// set up the combining helpers
				final InMemorySorter<E> buffer = element.buffer;
				final CombineValueIterator<E> iter = new CombineValueIterator<E>(
						buffer, this.serializer.createInstance(), this.objectReuseEnabled);
				final WriterCollector<E> collector = new WriterCollector<E>(output, this.serializer);

				// i walks the buffer by index; stop is the index of the last record
				// (-1 for an empty buffer, in which case nothing below writes anything)
				int i = 0;
				int stop = buffer.size() - 1;

				try {
					while (i < stop) {
						// advance i to the last index of the run of records comparing equal to seqStart
						int seqStart = i;
						while (i < stop && 0 == buffer.compare(i, i + 1)) {
							i++;
						}
	
						if (i == seqStart) {
							// no duplicate key, no need to combine. simply copy
							buffer.writeToOutput(output, seqStart, 1);
						} else {
							// get the iterator over the values
							iter.set(seqStart, i);
							// call the combiner to combine
							combineStub.combine(iter, collector);
						}
						// step past the run (or the single record) just handled
						i++;
					}
				}
				catch (Exception ex) {
					// NOTE(review): the try block also encloses the plain writeToOutput copy above, so
					// an exception raised there is reported with this combiner message as well
					throw new IOException("An error occurred in the combiner user code.", ex);
				}

				// write the last pair, if it has not yet been included in the last iteration
				// (i equals stop only when the last record was not consumed as part of a run;
				// after a run that reaches the last record, i is stop + 1)
				if (i == stop) {
					buffer.writeToOutput(output, stop, 1);
				}

				// done combining and writing out
				if (LOG.isDebugEnabled()) {
					LOG.debug("Combined and spilled buffer " + element.id + ".");
				}

				output.close();
				unregisterOpenChannelToBeRemovedAtShudown(writer);
				
				// remember the channel and its block count for the merging phase
				channelIDs.add(new ChannelWithBlockCount(channel, output.getBlockCount()));

				// pass empty sort-buffer to reading thread
				element.buffer.reset();
				this.queues.empty.add(element);
			}

			// done with the spilling
			if (LOG.isDebugEnabled()) {
				LOG.debug("Spilling done.");
				LOG.debug("Releasing sort-buffer memory.");
			}
			
			// clear the sort buffers, but do not return the memory to the manager, as we use it for merging
			disposeSortBuffers(false);
			
			
			if (LOG.isDebugEnabled()) {
				LOG.debug("Closing combiner user code.");
			}
			
			// close the user code
			try {
				FunctionUtils.closeFunction(combineStub);
			}
			catch (Throwable t) {
				throw new IOException("The user-defined combiner failed in its 'close()' method.", t);
			}
			
			if (LOG.isDebugEnabled()) {
				LOG.debug("User code closed.");
			}

			// ------------------- Merging Phase ------------------------

			// merge channels until sufficient file handles are available
			// NOTE(review): the loop also ends when isRunning() becomes false, but there is no
			// early return afterwards - the statements below still execute in that case
			while (isRunning() && channelIDs.size() > this.maxFanIn) {
				channelIDs = mergeChannelList(channelIDs, this.mergeReadMemory, this.writeMemory);
			}
			
			// from here on, we won't write again
			this.memManager.release(this.writeMemory);
			this.writeMemory.clear();
			
			// check if we have spilled some data at all
			if (channelIDs.isEmpty()) {
				setResultIterator(EmptyMutableObjectIterator.<E>get());
			}
			else {
				if (LOG.isDebugEnabled()) {
					LOG.debug("Beginning final merge.");
				}
				
				// allocate the memory for the final merging step
				List<List<MemorySegment>> readBuffers = new ArrayList<List<MemorySegment>>(channelIDs.size());
				
				// allocate the read memory and register it to be released
				getSegmentsForReaders(readBuffers, this.mergeReadMemory, channelIDs.size());
				
				// get the readers and register them to be released
				final MergeIterator<E> mergeIterator = getMergingIterator(
						channelIDs, readBuffers, new ArrayList<FileIOChannel>(channelIDs.size()), null);
				
				// set the target for the user iterator: the merge iterator is handed out directly,
				// no combining iterator is wrapped around it here
				setResultIterator(mergeIterator);
			}

			// done
			if (LOG.isDebugEnabled()) {
				LOG.debug("Spilling and merging thread done.");
			}
		}