public void invoke()

in flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java [100:289]


	public void invoke() throws Exception {
		// --------------------------------------------------------------------
		// Initialize
		// --------------------------------------------------------------------
		LOG.debug(getLogString("Start registering input and output"));

		// initialize OutputFormat
		initOutputFormat();

		// initialize input readers
		try {
			initInputReaders();
		} catch (Exception e) {
			throw new RuntimeException("Initializing the input streams failed" +
					(e.getMessage() == null ? "." : ": " + e.getMessage()), e);
		}

		LOG.debug(getLogString("Finished registering input and output"));

		// --------------------------------------------------------------------
		// Invoke
		// --------------------------------------------------------------------
		LOG.debug(getLogString("Starting data sink operator"));

		RuntimeContext ctx = createRuntimeContext();

		final Counter numRecordsIn;
		{
			Counter tmpNumRecordsIn;
			try {
				OperatorIOMetricGroup ioMetricGroup = ((OperatorMetricGroup) ctx.getMetricGroup()).getIOMetricGroup();
				ioMetricGroup.reuseInputMetricsForTask();
				ioMetricGroup.reuseOutputMetricsForTask();
				tmpNumRecordsIn = ioMetricGroup.getNumRecordsInCounter();
			} catch (Exception e) {
				LOG.warn("An exception occurred during the metrics setup.", e);
				tmpNumRecordsIn = new SimpleCounter();
			}
			numRecordsIn = tmpNumRecordsIn;
		}

		if(RichOutputFormat.class.isAssignableFrom(this.format.getClass())){
			((RichOutputFormat) this.format).setRuntimeContext(ctx);
			LOG.debug(getLogString("Rich Sink detected. Initializing runtime context."));
		}

		ExecutionConfig executionConfig = getExecutionConfig();

		boolean objectReuseEnabled = executionConfig.isObjectReuseEnabled();
		
		try {
			// initialize local strategies
			MutableObjectIterator<IT> input1;
			switch (this.config.getInputLocalStrategy(0)) {
			case NONE:
				// nothing to do
				localStrategy = null;
				input1 = reader;
				break;
			case SORT:
				// initialize sort local strategy
				try {
					// get type comparator
					TypeComparatorFactory<IT> compFact = this.config.getInputComparator(0,
							getUserCodeClassLoader());
					if (compFact == null) {
						throw new Exception("Missing comparator factory for local strategy on input " + 0);
					}
					
					// initialize sorter
					UnilateralSortMerger<IT> sorter = new UnilateralSortMerger<IT>(
							getEnvironment().getMemoryManager(), 
							getEnvironment().getIOManager(),
							this.reader, this, this.inputTypeSerializerFactory, compFact.createComparator(),
							this.config.getRelativeMemoryInput(0), this.config.getFilehandlesInput(0),
							this.config.getSpillingThresholdInput(0),
							this.config.getUseLargeRecordHandler(),
							this.getExecutionConfig().isObjectReuseEnabled());
					
					this.localStrategy = sorter;
					input1 = sorter.getIterator();
				} catch (Exception e) {
					throw new RuntimeException("Initializing the input processing failed" +
							(e.getMessage() == null ? "." : ": " + e.getMessage()), e);
				}
				break;
			default:
				throw new RuntimeException("Invalid local strategy for DataSinkTask");
			}
			
			// read the reader and write it to the output
			
			final TypeSerializer<IT> serializer = this.inputTypeSerializerFactory.getSerializer();
			final MutableObjectIterator<IT> input = input1;
			final OutputFormat<IT> format = this.format;


			// check if task has been canceled
			if (this.taskCanceled) {
				return;
			}

			LOG.debug(getLogString("Starting to produce output"));

			// open
			format.open(this.getEnvironment().getTaskInfo().getIndexOfThisSubtask(), this.getEnvironment().getTaskInfo().getNumberOfParallelSubtasks());

			if (objectReuseEnabled) {
				IT record = serializer.createInstance();

				// work!
				while (!this.taskCanceled && ((record = input.next(record)) != null)) {
					numRecordsIn.inc();
					format.writeRecord(record);
				}
			} else {
				IT record;

				// work!
				while (!this.taskCanceled && ((record = input.next()) != null)) {
					numRecordsIn.inc();
					format.writeRecord(record);
				}
			}
			
			// close. We close here such that a regular close throwing an exception marks a task as failed.
			if (!this.taskCanceled) {
				this.format.close();
				this.format = null;
			}
		}
		catch (Exception ex) {
			
			// make a best effort to clean up
			try {
				if (!cleanupCalled && format instanceof CleanupWhenUnsuccessful) {
					cleanupCalled = true;
					((CleanupWhenUnsuccessful) format).tryCleanupOnError();
				}
			}
			catch (Throwable t) {
				LOG.error("Cleanup on error failed.", t);
			}
			
			ex = ExceptionInChainedStubException.exceptionUnwrap(ex);

			if (ex instanceof CancelTaskException) {
				// forward canceling exception
				throw ex;
			}
			// drop, if the task was canceled
			else if (!this.taskCanceled) {
				if (LOG.isErrorEnabled()) {
					LOG.error(getLogString("Error in user code: " + ex.getMessage()), ex);
				}
				throw ex;
			}
		}
		finally {
			if (this.format != null) {
				// close format, if it has not been closed, yet.
				// This should only be the case if we had a previous error, or were canceled.
				try {
					this.format.close();
				}
				catch (Throwable t) {
					if (LOG.isWarnEnabled()) {
						LOG.warn(getLogString("Error closing the output format"), t);
					}
				}
			}
			// close local strategy if necessary
			if (localStrategy != null) {
				try {
					this.localStrategy.close();
				} catch (Throwable t) {
					LOG.error("Error closing local strategy", t);
				}
			}

			BatchTask.clearReaders(new MutableReader<?>[]{inputReader});
		}

		if (!this.taskCanceled) {
			LOG.debug(getLogString("Finished data sink operator"));
		}
		else {
			LOG.debug(getLogString("Data sink operator cancelled"));
		}
	}