public List getAlternativePlans()

in flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/TwoInputNode.java [299:523]


	/**
	 * Enumerates the candidate plans for this two-input node, costs and prunes them, and caches
	 * the result so that later calls return the same list without re-enumerating.
	 *
	 * <p>Every candidate of the first predecessor is combined with every candidate of the second
	 * predecessor. For each branch-compatible pair, one channel per input is set up for each
	 * interesting global property of that input. Whenever the two channels satisfy one of the
	 * operator's global property pairs and a descriptor accepts the combination, the local
	 * candidates are generated via {@code addLocalCandidates}.
	 *
	 * @param estimator the cost estimator; passed on to the predecessors and broadcast sources,
	 *                  handed to {@code addLocalCandidates}, and used to cost each resulting candidate
	 * @return the costed and pruned candidate plans (also stored in {@code cachedPlans})
	 * @throws CompilerException if the enumeration produced no candidate at all
	 */
	public List<PlanNode> getAlternativePlans(CostEstimator estimator) {
		// serve repeated calls from the cache
		if (this.cachedPlans != null) {
			return this.cachedPlans;
		}

		// set when a fully replicated child had to be dropped; only used to choose the error message
		boolean replicatedChildSkipped = false;

		// recursively obtain the candidates of the two regular inputs
		final List<? extends PlanNode> firstCandidates = getFirstPredecessorNode().getAlternativePlans(estimator);
		final List<? extends PlanNode> secondCandidates = getSecondPredecessorNode().getAlternativePlans(estimator);

		// the global properties that are interesting on each input connection
		final Set<RequestedGlobalProperties> interesting1 = this.input1.getInterestingProperties().getGlobalProperties();
		final Set<RequestedGlobalProperties> interesting2 = this.input2.getInterestingProperties().getGlobalProperties();

		// for every broadcast connection, wrap each candidate of its source in a named BROADCAST channel
		final List<Set<? extends NamedChannel>> broadcastChannelSets = new ArrayList<Set<? extends NamedChannel>>();
		final List<DagConnection> bcConnections = getBroadcastConnections();
		final List<String> bcNames = getBroadcastConnectionNames();

		for (int idx = 0; idx < bcConnections.size(); idx++) {
			final DagConnection bcConnection = bcConnections.get(idx);
			final String bcName = bcNames.get(idx);
			final List<PlanNode> bcCandidates = bcConnection.getSource().getAlternativePlans(estimator);

			final HashSet<NamedChannel> namedChannels = new HashSet<NamedChannel>(bcCandidates.size());
			for (PlanNode bcCandidate : bcCandidates) {
				final NamedChannel named = new NamedChannel(bcName, bcCandidate);
				final DataExchangeMode bcExchange = DataExchangeMode.select(
						bcConnection.getDataExchangeMode(),
						ShipStrategyType.BROADCAST,
						bcConnection.isBreakingPipeline());
				named.setShipStrategy(ShipStrategyType.BROADCAST, bcExchange);
				namedChannels.add(named);
			}
			broadcastChannelSets.add(namedChannels);
		}

		// gather the distinct global and local property pairs over all operator descriptors
		final Set<GlobalPropertiesPair> globalPairSet = new HashSet<GlobalPropertiesPair>();
		final Set<LocalPropertiesPair> localPairSet = new HashSet<LocalPropertiesPair>();
		for (OperatorDescriptorDual descriptor : getProperties()) {
			globalPairSet.addAll(descriptor.getPossibleGlobalProperties());
			localPairSet.addAll(descriptor.getPossibleLocalProperties());
		}
		final GlobalPropertiesPair[] globalPairs = globalPairSet.toArray(new GlobalPropertiesPair[globalPairSet.size()]);
		final LocalPropertiesPair[] localPairs = localPairSet.toArray(new LocalPropertiesPair[localPairSet.size()]);

		// kept as ArrayList because trimToSize() is called on it below
		final ArrayList<PlanNode> candidates = new ArrayList<PlanNode>();

		final ExecutionMode mode1 = this.input1.getDataExchangeMode();
		final ExecutionMode mode2 = this.input2.getDataExchangeMode();

		// an input "changes parallelism" when its producer runs with a different parallelism than this node
		final int ownParallelism = getParallelism();
		final boolean parallelismChange1 = ownParallelism != getFirstPredecessorNode().getParallelism();
		final boolean parallelismChange2 = ownParallelism != getSecondPredecessorNode().getParallelism();

		final boolean breaksPipeline1 = this.input1.isBreakingPipeline();
		final boolean breaksPipeline2 = this.input2.isBreakingPipeline();

		// pairwise enumeration: child of input 1 x child of input 2 x interesting properties of both inputs
		for (PlanNode firstChild : firstCandidates) {

			if (firstChild.getGlobalProperties().isFullyReplicated()) {
				if (parallelismChange1) {
					// a fully replicated input cannot be used across a parallelism change; drop this child
					replicatedChildSkipped = true;
					continue;
				}
				// same parallelism: pin the connection to FORWARD (this changes the connection itself)
				this.input1.setShipStrategy(ShipStrategyType.FORWARD);
			}

			for (PlanNode secondChild : secondCandidates) {

				if (secondChild.getGlobalProperties().isFullyReplicated()) {
					if (parallelismChange2) {
						// a fully replicated input cannot be used across a parallelism change; drop this child
						replicatedChildSkipped = true;
						continue;
					}
					// same parallelism: pin the connection to FORWARD (this changes the connection itself)
					this.input2.setShipStrategy(ShipStrategyType.FORWARD);
				}

				// only combine children that agree on the candidate chosen at shared branches
				if (!areBranchCompatible(firstChild, secondChild)) {
					continue;
				}

				for (RequestedGlobalProperties wanted1 : interesting1) {
					// candidate channel for the first input, using the connection's materialization mode
					final Channel channel1 = new Channel(firstChild, this.input1.getMaterializationMode());
					if (this.input1.getShipStrategy() != null) {
						// the connection already carries a fixed ship strategy (e.g. a compiler hint); apply it
						final ShipStrategyType fixedType = this.input1.getShipStrategy();
						final DataExchangeMode exchange = DataExchangeMode.select(mode1, fixedType, breaksPipeline1);
						if (this.keys1 == null) {
							channel1.setShipStrategy(fixedType, exchange);
						} else {
							channel1.setShipStrategy(fixedType, this.keys1.toFieldList(), exchange);
						}

						if (parallelismChange1) {
							channel1.adjustGlobalPropertiesForFullParallelismChange();
						}
					} else {
						// no fixed strategy: let the interesting property parameterize the channel
						wanted1.parameterizeChannel(channel1, parallelismChange1, mode1, breaksPipeline1);

						// under a parallelism change, a non-network strategy cannot keep the global
						// properties, so they are reset
						if (parallelismChange1 && !channel1.getShipStrategy().isNetworkStrategy()) {
							channel1.getGlobalProperties().reset();
						}
					}

					for (RequestedGlobalProperties wanted2 : interesting2) {
						// candidate channel for the second input, using the connection's materialization mode
						final Channel channel2 = new Channel(secondChild, this.input2.getMaterializationMode());
						if (this.input2.getShipStrategy() != null) {
							// the connection already carries a fixed ship strategy (e.g. a compiler hint); apply it
							final ShipStrategyType fixedType = this.input2.getShipStrategy();
							final DataExchangeMode exchange = DataExchangeMode.select(mode2, fixedType, breaksPipeline2);
							if (this.keys2 == null) {
								channel2.setShipStrategy(fixedType, exchange);
							} else {
								channel2.setShipStrategy(fixedType, this.keys2.toFieldList(), exchange);
							}

							if (parallelismChange2) {
								channel2.adjustGlobalPropertiesForFullParallelismChange();
							}
						} else {
							// no fixed strategy: let the interesting property parameterize the channel
							wanted2.parameterizeChannel(channel2, parallelismChange2, mode2, breaksPipeline2);

							// under a parallelism change, a non-network strategy cannot keep the global
							// properties, so they are reset
							if (parallelismChange2 && !channel2.getShipStrategy().isNetworkStrategy()) {
								channel2.getGlobalProperties().reset();
							}
						}

						/* ********************************************************************
						 * NOTE: Depending on how we proceed with different partitioning,
						 *       we might at some point need a compatibility check between
						 *       the pairs of global properties.
						 * *******************************************************************/

						// look for the first (pair, descriptor) combination both channels satisfy;
						// at most one combination is used per channel pair
						pairSearch:
						for (GlobalPropertiesPair pair : globalPairs) {
							if (!pair.getProperties1().isMetBy(channel1.getGlobalProperties())
									|| !pair.getProperties2().isMetBy(channel2.getGlobalProperties())) {
								continue;
							}

							for (OperatorDescriptorDual descriptor : getProperties()) {
								if (!descriptor.areCompatible(pair.getProperties1(), pair.getProperties2(),
										channel1.getGlobalProperties(), channel2.getGlobalProperties())) {
									continue;
								}

								// channel1 is reused across all iterations of the inner property loop, so
								// the requirement is recorded on a copy; channel2 is fresh per iteration
								final Channel channel1Copy = channel1.clone();
								channel1Copy.setRequiredGlobalProps(pair.getProperties1());
								channel2.setRequiredGlobalProps(pair.getProperties2());

								// valid combination found: generate the local candidates for it
								addLocalCandidates(channel1Copy, channel2, broadcastChannelSets, wanted1, wanted2,
										candidates, localPairs, estimator);
								break pairSearch;
							}
						}

						// with a fixed strategy on input 2 the interesting property is not consulted,
						// so further iterations would only repeat the same channel
						if (this.input2.getShipStrategy() != null) {
							break;
						}
					}

					// with a fixed strategy on input 1 the interesting property is not consulted,
					// so further iterations would only repeat the same channel
					if (this.input1.getShipStrategy() != null) {
						break;
					}
				}
			}
		}

		if (candidates.isEmpty()) {
			final String likelyReason = replicatedChildSkipped
					? ". Most likely reason: Invalid use of replicated input."
					: ". Most likely reason: Too restrictive plan hints.";
			throw new CompilerException("No plan meeting the requirements could be created @ " + this + likelyReason);
		}

		// attach costs to every candidate, then prune
		for (PlanNode candidate : candidates) {
			estimator.costOperator(candidate);
		}
		prunePlanAlternatives(candidates);
		candidates.trimToSize();

		this.cachedPlans = candidates;
		return candidates;
	}