protected UnilateralSortMerger()

in flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java [236:429]


	/**
	 * Creates the sorter: validates arguments, partitions the given memory pages into
	 * write buffers, large-record buffers and sort buffers, instantiates the in-memory
	 * sorters, wires up the circular buffer-exchange queues, and finally creates and
	 * starts the reading / sorting / spilling threads.
	 *
	 * <p>Note: the {@code memory} list is adopted as {@code this.sortReadMemory} and is
	 * mutated below (pages are removed from its tail for the write and large-record
	 * buffers), so the caller must not reuse it afterwards.
	 *
	 * @param memoryManager the memory manager the pages were allocated from (used here
	 *        only for the page size and passed on to the spilling thread / large-record handler)
	 * @param memory all memory pages available to this sorter; split up internally
	 * @param ioManager the I/O manager for spilling; may be null only if {@code noSpillingMemory} is true
	 * @param input the iterator producing the records to sort
	 * @param parentTask the owning task, passed to the worker threads; must not be null
	 * @param serializerFactory factory for the record serializer
	 * @param comparator comparator defining the sort order; duplicated for the large-record handler
	 * @param numSortBuffers number of sort buffers to use, or a value &lt; 1 to let the
	 *        sorter decide (2 buffers above 100 MiB of sort memory, otherwise 1)
	 * @param maxNumFileHandles maximal merge fan-in; must be at least 2, and may be
	 *        reduced below if too little memory is available for the merge read buffers
	 * @param startSpillingFraction fraction of the sort memory after which spilling starts
	 * @param noSpillingMemory if true, no pages are reserved for spill-write buffers
	 * @param handleLargeRecords if true, pages are reserved for a {@link LargeRecordHandler}
	 * @param objectReuseEnabled whether object reuse is enabled for this sorter
	 * @param inMemorySorterFactory factory creating one in-memory sorter per sort buffer
	 * @throws IOException if setting up the threads / spilling facilities fails
	 *         — NOTE(review): no direct throw is visible in this constructor; presumably
	 *         declared for the thread factory methods. Confirm against subclasses.
	 */
	protected UnilateralSortMerger(
			MemoryManager memoryManager,
			List<MemorySegment> memory,
			IOManager ioManager,
			MutableObjectIterator<E> input,
			AbstractInvokable parentTask,
			TypeSerializerFactory<E> serializerFactory,
			TypeComparator<E> comparator,
			int numSortBuffers,
			int maxNumFileHandles,
			float startSpillingFraction,
			boolean noSpillingMemory,
			boolean handleLargeRecords,
			boolean objectReuseEnabled,
			InMemorySorterFactory<E> inMemorySorterFactory) throws IOException {
		// sanity checks (the ioManager may only be absent when nothing will ever be spilled)
		if (memoryManager == null || (ioManager == null && !noSpillingMemory) || serializerFactory == null || comparator == null) {
			throw new NullPointerException();
		}
		if (parentTask == null) {
			throw new NullPointerException("Parent Task must not be null.");
		}
		if (maxNumFileHandles < 2) {
			throw new IllegalArgumentException("Merger cannot work with less than two file handles.");
		}
		
		this.memoryManager = memoryManager;
		this.objectReuseEnabled = objectReuseEnabled;

		// adjust the memory quotas to the page size
		final int numPagesTotal = memory.size();

		if (numPagesTotal < MIN_NUM_WRITE_BUFFERS + MIN_NUM_SORT_MEM_SEGMENTS) {
			throw new IllegalArgumentException("Too little memory provided to sorter to perform task. " +
				"Required are at least " + (MIN_NUM_WRITE_BUFFERS + MIN_NUM_SORT_MEM_SEGMENTS) + 
				" pages. Current page size is " + memoryManager.getPageSize() + " bytes.");
		}
		
		// determine how many buffers to use for writing
		final int numWriteBuffers;
		final int numLargeRecordBuffers;
		
		if (noSpillingMemory && !handleLargeRecords) {
			// neither spilling nor large-record handling: all pages go to sorting
			numWriteBuffers = 0;
			numLargeRecordBuffers = 0;
		}
		else {
			// each "consumer" of auxiliary write buffers: 1 for spilling, 2 for large records
			int numConsumers = (noSpillingMemory ? 0 : 1) + (handleLargeRecords ? 2 : 0);
			
			// determine how many buffers we have when we do a full merge with maximal fan-in 
			final int minBuffersForMerging = maxNumFileHandles + numConsumers * MIN_NUM_WRITE_BUFFERS;

			if (minBuffersForMerging > numPagesTotal) {
				// memory is tight: give each consumer only the minimum and shrink the merge fan-in instead
				numWriteBuffers = noSpillingMemory ? 0 : MIN_NUM_WRITE_BUFFERS;
				numLargeRecordBuffers = handleLargeRecords ? 2*MIN_NUM_WRITE_BUFFERS : 0;

				maxNumFileHandles = numPagesTotal - numConsumers * MIN_NUM_WRITE_BUFFERS;
				if (LOG.isDebugEnabled()) {
					LOG.debug("Reducing maximal merge fan-in to " + maxNumFileHandles + " due to limited memory availability during merge");
				}
			}
			else {
				// we are free to choose. make sure that we do not eat up too much memory for writing
				// (roughly 1% of the total pages, split across the consumers)
				final int fractionalAuxBuffers = numPagesTotal / (numConsumers * 100);
				
				if (fractionalAuxBuffers >= MAX_NUM_WRITE_BUFFERS) {
					numWriteBuffers = noSpillingMemory ? 0 : MAX_NUM_WRITE_BUFFERS;
					numLargeRecordBuffers = handleLargeRecords ? 2*MAX_NUM_WRITE_BUFFERS : 0;
				}
				else {
					numWriteBuffers = noSpillingMemory ? 0 :
							Math.max(MIN_NUM_WRITE_BUFFERS, fractionalAuxBuffers);	// at least the lower bound
					
					numLargeRecordBuffers = handleLargeRecords ? 
							Math.max(2*MIN_NUM_WRITE_BUFFERS, fractionalAuxBuffers) // at least the lower bound
							: 0;
				}
			}
		}
		
		// everything not reserved above is sort memory
		final int sortMemPages = numPagesTotal - numWriteBuffers - numLargeRecordBuffers;
		final long sortMemory = ((long) sortMemPages) * memoryManager.getPageSize();
		
		// decide how many sort buffers to use, unless the caller fixed the number
		if (numSortBuffers < 1) {
			if (sortMemory > 100 * 1024 * 1024) {
				// more than 100 MiB: use two buffers so sorting and spilling can overlap
				numSortBuffers = 2;
			}
			else {
				numSortBuffers = 1;
			}
		}
		final int numSegmentsPerSortBuffer = sortMemPages / numSortBuffers;
		
		if (LOG.isDebugEnabled()) {
			LOG.debug(String.format("Instantiating sorter with %d pages of sorting memory (="
					+ "%d bytes total) divided over %d sort buffers (%d pages per buffer). Using %d" 
					+ " buffers for writing sorted results and merging maximally %d streams at once. "
					+ "Using %d memory segments for large record spilling.",
					sortMemPages, sortMemory, numSortBuffers, numSegmentsPerSortBuffer, numWriteBuffers,
					maxNumFileHandles, numLargeRecordBuffers));
		}
		
		
		this.sortReadMemory = memory;
		this.writeMemory = new ArrayList<MemorySegment>(numWriteBuffers);
		
		final TypeSerializer<E> serializer = serializerFactory.getSerializer();
		
		// move some pages from the sort memory to the write memory
		if (numWriteBuffers > 0) {
			for (int i = 0; i < numWriteBuffers; i++) {
				this.writeMemory.add(this.sortReadMemory.remove(this.sortReadMemory.size() - 1));
			}
		}
		// carve out the large-record pages and hand them to a dedicated handler
		if (numLargeRecordBuffers > 0) {
			List<MemorySegment> mem = new ArrayList<MemorySegment>();
			for (int i = 0; i < numLargeRecordBuffers; i++) {
				mem.add(this.sortReadMemory.remove(this.sortReadMemory.size() - 1));
			}
			
			this.largeRecordHandler = new LargeRecordHandler<E>(serializer, comparator.duplicate(), 
					ioManager, memoryManager, mem, parentTask, maxNumFileHandles);
		}
		else {
			this.largeRecordHandler = null;
		}
		
		// circular queues pass buffers between the threads
		final CircularQueues<E> circularQueues = new CircularQueues<E>();

		inMemorySorters = new ArrayList<>(numSortBuffers);
		
		// allocate the sort buffers and fill empty queue with them
		final Iterator<MemorySegment> segments = this.sortReadMemory.iterator();
		for (int i = 0; i < numSortBuffers; i++)
		{
			// grab some memory; the last buffer takes all remaining segments
			// (Integer.MAX_VALUE makes the loop run until the iterator is exhausted)
			final List<MemorySegment> sortSegments = new ArrayList<MemorySegment>(numSegmentsPerSortBuffer);
			for (int k = (i == numSortBuffers - 1 ? Integer.MAX_VALUE : numSegmentsPerSortBuffer); k > 0 && segments.hasNext(); k--) {
				sortSegments.add(segments.next());
			}
			
			final InMemorySorter<E> inMemorySorter = inMemorySorterFactory.create(sortSegments);
			inMemorySorters.add(inMemorySorter);

			// add to empty queue
			CircularElement<E> element = new CircularElement<E>(i, inMemorySorter, sortSegments);
			circularQueues.empty.add(element);
		}

		// exception handling: any I/O error in a worker thread surfaces through the
		// result iterator and shuts the sorter down
		ExceptionHandler<IOException> exceptionHandler = new ExceptionHandler<IOException>() {
			public void handleException(IOException exception) {
				// forward exception
				if (!closed) {
					setResultIteratorException(exception);
					close();
				}
			}
		};
		
		// create sets that track the channels we need to clean up when closing the sorter
		this.channelsToDeleteAtShutdown = new HashSet<FileIOChannel.ID>(64);
		this.openChannels = new HashSet<FileIOChannel>(64);

		// start the thread that reads the input channels
		this.readThread = getReadingThread(exceptionHandler, input, circularQueues, largeRecordHandler,
				parentTask, serializer, ((long) (startSpillingFraction * sortMemory)));

		// start the thread that sorts the buffers
		this.sortThread = getSortingThread(exceptionHandler, circularQueues, parentTask);

		// start the thread that handles spilling to secondary storage
		this.spillThread = getSpillingThread(exceptionHandler, circularQueues, parentTask, 
				memoryManager, ioManager, serializerFactory, comparator, this.sortReadMemory, this.writeMemory, 
				maxNumFileHandles);
		
		// propagate the context class loader to the spawned threads
		// (null checks: subclasses may return null from the thread factory methods)
		ClassLoader contextLoader = Thread.currentThread().getContextClassLoader();
		if (contextLoader != null) {
			if (this.readThread != null) {
				this.readThread.setContextClassLoader(contextLoader);
			}
			if (this.sortThread != null) {
				this.sortThread.setContextClassLoader(contextLoader);
			}
			if (this.spillThread != null) {
				this.spillThread.setContextClassLoader(contextLoader);
			}
		}
		
		// must be last: all state above must be fully initialized before the threads run
		startThreads();
	}