public void go()

in flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java [1245:1482]


		/**
		 * Entry point of the spilling and merging thread.
		 *
		 * <p>The method proceeds in phases:
		 * <ol>
		 *   <li>Elements taken from the spill queue are cached until either the spilling marker or
		 *       the end-of-input marker is seen.</li>
		 *   <li>If the end-of-input marker came first (and large records, if any, can be sorted with
		 *       memory taken from the empty queue), the cached buffers are merged in memory and the
		 *       method returns.</li>
		 *   <li>Otherwise every buffer (cached ones first, via {@code takeNext}) is written to its own
		 *       channel, the channels are merged until at most {@code maxFanIn} remain, and a merging
		 *       iterator over the remaining channels (plus large records, if present) is set as the
		 *       result iterator.</li>
		 * </ol>
		 *
		 * <p>Whenever {@code isRunning()} turns false at one of the checkpoints below, the method
		 * returns without setting a result iterator.
		 *
		 * @throws IOException Thrown if the thread is interrupted while waiting for an element during
		 *                     the caching phase (the {@link InterruptedException} is attached as the
		 *                     cause), or if one of the invoked I/O operations fails.
		 */
		public void go() throws IOException {
			
			final Queue<CircularElement<E>> cache = new ArrayDeque<CircularElement<E>>();
			CircularElement<E> element;
			// becomes true when the end-of-input marker arrives before any spilling marker
			boolean cacheOnly = false;
			
			// ------------------- In-Memory Cache ------------------------
			// fill cache
			while (isRunning()) {
				// take next element from queue
				try {
					element = this.queues.spill.take();
				}
				catch (InterruptedException iex) {
					// keep the interruption as the cause, so that its stack trace is not lost
					throw new IOException("The spilling thread was interrupted.", iex);
				}

				if (element == SPILLING_MARKER) {
					// spilling was requested: leave caching mode with cacheOnly still false
					break;
				}
				else if (element == EOF_MARKER) {
					// all input was seen without a spilling request
					cacheOnly = true;
					break;
				}
				cache.add(element);
			}
			
			// check whether the thread was canceled
			if (!isRunning()) {
				return;
			}
			
			MutableObjectIterator<E> largeRecords = null;
			
			// check if we can stay in memory with the large record handler
			if (cacheOnly && largeRecordHandler != null && largeRecordHandler.hasData()) {
				List<MemorySegment> memoryForLargeRecordSorting = new ArrayList<MemorySegment>();
				
				// drain the queue of empty buffers: dispose each buffer and collect its memory
				// segments, so that they can be used for sorting the large records
				CircularElement<E> circElement;
				while ((circElement = this.queues.empty.poll()) != null) {
					circElement.buffer.dispose();
					memoryForLargeRecordSorting.addAll(circElement.memory);
				}
				
				if (memoryForLargeRecordSorting.isEmpty()) {
					// no memory could be collected for the large records: fall through to the
					// spilling and merging phases below
					cacheOnly = false;
					LOG.debug("Going to disk-based merge because of large records.");
					
				} else {
					LOG.debug("Sorting large records, to add them to in-memory merge.");
					largeRecords = largeRecordHandler.finishWriteAndSortKeys(memoryForLargeRecordSorting);
				}
			}
			
			// ------------------- In-Memory Merge ------------------------
			if (cacheOnly) {
				// operates on in-memory buffers only
				if (LOG.isDebugEnabled()) {
					LOG.debug("Initiating in memory merge.");
				}
				
				// one iterator per cached buffer, plus one slot for the large records iterator
				List<MutableObjectIterator<E>> iterators = new ArrayList<MutableObjectIterator<E>>(cache.size() + 1);
				
				// iterate buffers and collect a set of iterators
				for (CircularElement<E> cached : cache) {
					// note: the yielded iterator only operates on the buffer heap (and disregards the stack)
					iterators.add(cached.buffer.getIterator());
				}
				
				if (largeRecords != null) {
					iterators.add(largeRecords);
				}
				
				// release the remaining sort-buffers
				if (LOG.isDebugEnabled()) {
					LOG.debug("Releasing unused sort-buffer memory.");
				}
				disposeSortBuffers(true);
				
				// set lazy iterator: empty iterator for no input, the single iterator directly if
				// there is only one, and a merge over all iterators otherwise
				setResultIterator(iterators.isEmpty() ? EmptyMutableObjectIterator.<E>get() :
						iterators.size() == 1 ? iterators.get(0) : 
						new MergeIterator<E>(iterators, this.comparator));
				return;
			}
			
			// ------------------- Spilling Phase ------------------------
			
			final FileIOChannel.Enumerator enumerator = this.ioManager.createChannelEnumerator();
			// channels that actually received data, together with their block counts
			List<ChannelWithBlockCount> channelIDs = new ArrayList<ChannelWithBlockCount>();
			
			// loop as long as the thread is marked alive and we do not see the final element
			while (isRunning()) {
				try {
					element = takeNext(this.queues.spill, cache);
				}
				catch (InterruptedException iex) {
					// an interruption without shutdown is logged and the take is retried;
					// an interruption during shutdown ends the thread
					if (isRunning()) {
						LOG.error("Sorting thread was interrupted (without being shut down) while grabbing a buffer. " +
								"Retrying to grab buffer...");
						continue;
					} else {
						return;
					}
				}
				
				// check if we are still running
				if (!isRunning()) {
					return;
				}
				// check if this is the end-of-work buffer
				if (element == EOF_MARKER) {
					break;
				}
				
				// open next channel
				FileIOChannel.ID channel = enumerator.next();
				registerChannelToBeRemovedAtShudown(channel);

				// create writer
				final BlockChannelWriter<MemorySegment> writer = this.ioManager.createBlockChannelWriter(channel);
				registerOpenChannelToBeRemovedAtShudown(writer);
				final ChannelWriterOutputView output = new ChannelWriterOutputView(writer, this.writeMemory,
																			this.memManager.getPageSize());

				// write sort-buffer to channel
				if (LOG.isDebugEnabled()) {
					LOG.debug("Spilling buffer " + element.id + ".");
				}
				
				element.buffer.writeToOutput(output, largeRecordHandler);
				
				if (LOG.isDebugEnabled()) {
					LOG.debug("Spilled buffer " + element.id + ".");
				}

				output.close();
				unregisterOpenChannelToBeRemovedAtShudown(writer);
				
				// channels to which nothing was written are not considered for merging
				if (output.getBytesWritten() > 0) {
					channelIDs.add(new ChannelWithBlockCount(channel, output.getBlockCount()));
				}

				// pass empty sort-buffer to reading thread
				element.buffer.reset();
				this.queues.empty.add(element);
			}

			// done with the spilling
			if (LOG.isDebugEnabled()) {
				LOG.debug("Spilling done.");
				LOG.debug("Releasing sort-buffer memory.");
			}
			
			// clear the sort buffers, but do not return the memory to the manager, as we use it for merging
			disposeSortBuffers(false);

			
			// ------------------- Merging Phase ------------------------
			
			// make sure we have enough memory to merge and for large record handling
			// (note: this local variable shadows the field this.mergeReadMemory)
			List<MemorySegment> mergeReadMemory;
			
			if (largeRecordHandler != null && largeRecordHandler.hasData()) {
				
				List<MemorySegment> longRecMem;
				if (channelIDs.isEmpty()) {
					// only long records
					longRecMem = this.mergeReadMemory;
					mergeReadMemory = Collections.emptyList();
				}
				else {
					int maxMergedStreams = Math.min(this.maxFanIn, channelIDs.size());
					
					// target half of the merge read memory, split evenly across the streams,
					// bounded to [MIN_NUM_WRITE_BUFFERS, MAX_NUM_WRITE_BUFFERS] pages per stream
					// NOTE(review): because of the lower bound, maxMergedStreams * pagesPerStream can
					// exceed this.mergeReadMemory.size(), in which case the get(i) below would fail -
					// confirm that a sufficient minimum amount of memory is guaranteed elsewhere
					int pagesPerStream = Math.max(MIN_NUM_WRITE_BUFFERS,
							Math.min(MAX_NUM_WRITE_BUFFERS, this.mergeReadMemory.size() / 2 / maxMergedStreams));
					
					int totalMergeReadMemory = maxMergedStreams * pagesPerStream;
					
					// grab the merge memory
					mergeReadMemory = new ArrayList<MemorySegment>(totalMergeReadMemory);
					for (int i = 0; i < totalMergeReadMemory; i++) {
						mergeReadMemory.add(this.mergeReadMemory.get(i));
					}
					
					// the remainder of the memory goes to the long record sorter
					longRecMem = new ArrayList<MemorySegment>();
					for (int i = totalMergeReadMemory; i < this.mergeReadMemory.size(); i++) {
						longRecMem.add(this.mergeReadMemory.get(i));
					}
				}
				
				if (LOG.isDebugEnabled()) {
					LOG.debug("Sorting keys for large records.");
				}
				largeRecords = largeRecordHandler.finishWriteAndSortKeys(longRecMem);
			}
			else {
				// no large records: all merge read memory is available for merging
				mergeReadMemory = this.mergeReadMemory;
			}
			
			// merge channels until sufficient file handles are available
			while (isRunning() && channelIDs.size() > this.maxFanIn) {
				channelIDs = mergeChannelList(channelIDs, mergeReadMemory, this.writeMemory);
			}
			
			// from here on, we won't write again
			this.memManager.release(this.writeMemory);
			this.writeMemory.clear();
			
			// check if we have spilled some data at all
			if (channelIDs.isEmpty()) {
				// nothing was spilled: the result consists of the large records only, if there are any
				if (largeRecords == null) {
					setResultIterator(EmptyMutableObjectIterator.<E>get());
				} else {
					setResultIterator(largeRecords);
				}
			}
			else {
				if (LOG.isDebugEnabled()) {
					LOG.debug("Beginning final merge.");
				}
				
				// allocate the memory for the final merging step
				List<List<MemorySegment>> readBuffers = new ArrayList<List<MemorySegment>>(channelIDs.size());
				
				// allocate the read memory and register it to be released
				getSegmentsForReaders(readBuffers, mergeReadMemory, channelIDs.size());
				
				// get the readers and register them to be released
				setResultIterator(getMergingIterator(channelIDs, readBuffers, new ArrayList<FileIOChannel>(channelIDs.size()), largeRecords));
			}

			// done
			if (LOG.isDebugEnabled()) {
				LOG.debug("Spilling and merging thread done.");
			}
		}