func()

in sdks/go/pkg/beam/core/runtime/exec/translate.go [317:563]


// makeLink returns the execution Node for the transform that link id points at
// (id.to), building it on first use and caching it in b.links so later requests
// for the same link reuse the same unit. from is the id of the PCollection that
// feeds the link; it is only consulted by units that need the coder of their
// input (Inject and ReshuffleInput).
//
// Every newly built unit is also appended to b.units.
//
// A transform that cannot be translated (undecodable payload, unknown URN,
// unknown opcode, missing inject payload, ...) is reported through the error
// return; nothing in this function panics deliberately.
//
// NOTE(review): most units take out[0] as their single output without a length
// check, so a transform of those kinds with no outputs would still fail with an
// index-out-of-range panic — confirm the descriptor always provides one.
func (b *builder) makeLink(from string, id linkID) (Node, error) {
	if n, ok := b.links[id]; ok {
		return n, nil
	}

	// Process all incoming links for the edge and cache them. It thus doesn't matter
	// which exact link triggers the Node generation. The link caching is only needed
	// to process ParDo side inputs.

	transform := b.desc.GetTransforms()[id.to]
	urn := transform.GetSpec().GetUrn()
	payload := transform.GetSpec().GetPayload()

	// TODO(herohde) 1/25/2018: do we need to handle composites?

	// Build (or look up) the downstream nodes first; the unit created below
	// forwards its elements to them.
	out, err := b.makePCollections(unmarshalKeyedValues(transform.GetOutputs()))
	if err != nil {
		return nil, err
	}

	var u Node
	switch urn {
	case graphx.URNParDo,
		urnPerKeyCombinePre,
		urnPerKeyCombineMerge,
		urnPerKeyCombineExtract,
		urnPerKeyCombineConvert,
		urnPairWithRestriction,
		urnSplitAndSizeRestrictions,
		urnProcessSizedElementsAndRestrictions:
		// All of these carry a base64-encoded v1pb.TransformPayload; they differ
		// only in which proto wraps it.
		var data string
		switch urn {
		case graphx.URNParDo,
			urnPairWithRestriction,
			urnSplitAndSizeRestrictions,
			urnProcessSizedElementsAndRestrictions:
			var pardo pipepb.ParDoPayload
			if err := proto.Unmarshal(payload, &pardo); err != nil {
				return nil, errors.Wrapf(err, "invalid ParDo payload for %v", transform)
			}
			data = string(pardo.GetDoFn().GetPayload())
		case urnPerKeyCombinePre, urnPerKeyCombineMerge, urnPerKeyCombineExtract, urnPerKeyCombineConvert:
			var cmb pipepb.CombinePayload
			if err := proto.Unmarshal(payload, &cmb); err != nil {
				return nil, errors.Wrapf(err, "invalid CombinePayload payload for %v", transform)
			}
			data = string(cmb.GetCombineFn().GetPayload())
		default:
			// TODO(herohde) 12/4/2017: we see DoFns directly with Dataflow. Handle that
			// case here, for now, so that the harness can use this logic.
			//
			// With the URNs listed in the enclosing case this branch is currently
			// unreachable: every one of them is matched by a case above.

			data = string(payload)
		}

		// TODO(herohde) 1/28/2018: Once Dataflow's fully off the old way,
		// we can simply switch on the ParDo DoFn URN directly.

		var tp v1pb.TransformPayload
		if err := protox.DecodeBase64(data, &tp); err != nil {
			return nil, errors.Wrapf(err, "invalid transform payload for %v", transform)
		}

		switch tpUrn := tp.GetUrn(); tpUrn {
		case graphx.URNDoFn:
			op, fn, _, in, _, err := graphx.DecodeMultiEdge(tp.GetEdge())
			if err != nil {
				return nil, err
			}

			switch op {
			case graph.ParDo:
				dofn, err := graph.AsDoFn(fn, graph.MainUnknown)
				if err != nil {
					return nil, err
				}
				switch urn {
				case urnPairWithRestriction:
					u = &PairWithRestriction{UID: b.idgen.New(), Fn: dofn, Out: out[0]}
				case urnSplitAndSizeRestrictions:
					u = &SplitAndSizeRestrictions{UID: b.idgen.New(), Fn: dofn, Out: out[0]}
				default:
					n := &ParDo{UID: b.idgen.New(), Fn: dofn, Inbound: in, Out: out}
					n.PID = transform.GetUniqueName()

					// Index 0 is skipped; every further input gets a side input
					// adapter that reads through the state API.
					input := unmarshalKeyedValues(transform.GetInputs())
					for i := 1; i < len(input); i++ {
						// TODO(herohde) 8/8/2018: handle different windows, view_fn and window_mapping_fn.
						// For now, assume we don't need any information in the pardo payload.

						ec, wc, err := b.makeCoderForPCollection(input[i])
						if err != nil {
							return nil, err
						}

						sid := StreamID{
							Port:         Port{URL: b.desc.GetStateApiServiceDescriptor().GetUrl()},
							PtransformID: id.to,
						}
						sideInputID := fmt.Sprintf("i%v", i) // SideInputID (= local id, "iN")
						side := NewSideInputAdapter(sid, sideInputID, coder.NewW(ec, wc))
						n.Side = append(n.Side, side)
					}

					// Wrap the plain ParDo when it has to run as (or fall back
					// from) a splittable DoFn.
					u = n
					if urn == urnProcessSizedElementsAndRestrictions {
						u = &ProcessSizedElementsAndRestrictions{PDo: n, TfId: id.to}
					} else if dofn.IsSplittable() {
						u = &SdfFallback{PDo: n}
					}
				}

			case graph.Combine:
				cn := &Combine{UID: b.idgen.New(), Out: out[0]}
				cn.Fn, err = graph.AsCombineFn(fn)
				if err != nil {
					return nil, err
				}
				// The first inbound decides whether the combine is keyed; fail
				// cleanly instead of indexing into an empty slice.
				if len(in) == 0 {
					return nil, errors.Errorf("unexpected number of inbounds to combine %v: got 0, want at least 1", transform.GetUniqueName())
				}
				cn.UsesKey = typex.IsKV(in[0].Type)

				cn.PID = transform.GetUniqueName()

				switch urn {
				case urnPerKeyCombinePre:
					inputs := unmarshalKeyedValues(transform.GetInputs())
					if len(inputs) != 1 {
						return nil, errors.Errorf("unexpected sideinput to combine: got %d, want 1", len(inputs))
					}
					ec, _, err := b.makeCoderForPCollection(inputs[0])
					if err != nil {
						return nil, err
					}
					if !coder.IsKV(ec) {
						return nil, errors.Errorf("unexpected non-KV coder PCollection input to combine: %v", ec)
					}
					u = &LiftedCombine{Combine: cn, KeyCoder: ec.Components[0]}
				case urnPerKeyCombineMerge:
					u = &MergeAccumulators{Combine: cn}
				case urnPerKeyCombineExtract:
					u = &ExtractOutput{Combine: cn}
				case urnPerKeyCombineConvert:
					u = &ConvertToAccumulators{Combine: cn}
				default: // For unlifted combines
					u = cn
				}
			default:
				// The opcode is decoded from the payload, so treat an unknown one
				// as bad input rather than crashing the caller.
				return nil, errors.Errorf("unexpected opcode for %v: got %v, want ParDo or Combine", transform, op)
			}

		case graphx.URNIterableSideInputKey:
			u = &FixedKey{UID: b.idgen.New(), Key: []byte(iterableSideInputKey), Out: out[0]}

		case graphx.URNInject:
			c, _, err := b.makeCoderForPCollection(from)
			if err != nil {
				return nil, err
			}
			if !coder.IsKV(c) {
				return nil, errors.Errorf("unexpected inject coder: %v", c)
			}
			// Go through the nil-safe getter: a payload with the inject URN but
			// without the inject message must not dereference a nil pointer.
			inject := tp.GetInject()
			if inject == nil {
				return nil, errors.Errorf("missing inject payload for %v", transform)
			}
			u = &Inject{UID: b.idgen.New(), N: int(inject.GetN()), ValueEncoder: MakeElementEncoder(c.Components[1]), Out: out[0]}

		case graphx.URNExpand:
			// Pick the output PCollection by iterating over the outputs map; with
			// more than one entry the choice would depend on map iteration order.
			var pid string
			for _, col := range transform.GetOutputs() {
				pid = col
			}
			c, _, err := b.makeCoderForPCollection(pid)
			if err != nil {
				return nil, err
			}
			if !coder.IsCoGBK(c) {
				return nil, errors.Errorf("unexpected expand coder: %v", c)
			}

			// Component 0 is skipped; one decoder is built per remaining component.
			var decoders []ElementDecoder
			for _, dc := range c.Components[1:] {
				decoders = append(decoders, MakeElementDecoder(dc))
			}
			u = &Expand{UID: b.idgen.New(), ValueDecoders: decoders, Out: out[0]}

		case graphx.URNReshuffleInput:
			c, w, err := b.makeCoderForPCollection(from)
			if err != nil {
				return nil, err
			}
			u = &ReshuffleInput{UID: b.idgen.New(), Seed: rand.Int63(), Coder: coder.NewW(c, w), Out: out[0]}

		case graphx.URNReshuffleOutput:
			var pid string
			// There's only one output PCollection, and iterating through the map
			// is the only way to extract it.
			for _, col := range transform.GetOutputs() {
				pid = col
			}
			c, w, err := b.makeCoderForPCollection(pid)
			if err != nil {
				return nil, err
			}
			u = &ReshuffleOutput{UID: b.idgen.New(), Coder: coder.NewW(c, w), Out: out[0]}

		default:
			return nil, errors.Errorf("unexpected payload: %v", tp)
		}

	case graphx.URNWindow:
		var wp pipepb.WindowIntoPayload
		if err := proto.Unmarshal(payload, &wp); err != nil {
			return nil, errors.Wrapf(err, "invalid WindowInto payload for %v", transform)
		}
		wfn, err := unmarshalWindowFn(wp.GetWindowFn())
		if err != nil {
			return nil, err
		}
		u = &WindowInto{UID: b.idgen.New(), Fn: wfn, Out: out[0]}

	case graphx.URNFlatten:
		u = &Flatten{UID: b.idgen.New(), N: len(transform.Inputs), Out: out[0]}

		// Use the same flatten instance for all the inputs links to this transform.
		for i := 0; i < len(transform.Inputs); i++ {
			b.links[linkID{id.to, i}] = u
		}

	case urnDataSink:
		port, cid, err := unmarshalPort(payload)
		if err != nil {
			return nil, err
		}

		sink := &DataSink{UID: b.idgen.New()}
		sink.SID = StreamID{PtransformID: id.to, Port: port}
		sink.Coder, err = b.coders.Coder(cid) // Expected to be windowed coder
		if err != nil {
			return nil, err
		}
		if !coder.IsW(sink.Coder) {
			return nil, errors.Errorf("unwindowed coder %v on DataSink %v: %v", cid, id, sink.Coder)
		}
		u = sink

	default:
		// The URN is read from the transform spec, so an unknown one is reported
		// to the caller instead of panicking.
		return nil, errors.Errorf("unexpected transform URN: %v", urn)
	}

	b.links[id] = u
	b.units = append(b.units, u)
	return u, nil
}