func()

in sdks/go/pkg/beam/core/runtime/graphx/translate.go [282:512]


// addMultiEdge translates a single graph edge into one or more PTransforms
// registered in m.transforms and returns the IDs of every transform that was
// added for it.
//
// An edge whose ID is already present in m.transforms is not processed again;
// its existing ID is returned. CoGBK edges with more than one input, Reshuffle
// edges, and External edges without a payload are handed to expandCoGBK,
// expandReshuffle and expandCrossLanguage respectively. Every other edge is
// converted in place: its input and output nodes are added, an op-specific
// FunctionSpec is built, and the resulting PTransform is stored under the
// edge's ID.
//
// For ParDo edges, each Singleton, Slice, Iter or ReIter side input gets an
// additional keying ParDo inserted in front of it. The IDs of those extra
// transforms precede the edge's own ID in the returned slice.
//
// Every error returned from this function is wrapped with the edge being
// translated.
func (m *marshaller) addMultiEdge(edge NamedEdge) ([]string, error) {
	// handleErr attaches the edge to err so failures can be traced back to
	// the transform that was being translated.
	handleErr := func(err error) ([]string, error) {
		return nil, errors.Wrapf(err, "failed to add input kind: %v", edge)
	}
	id := edgeID(edge.Edge)
	if _, exists := m.transforms[id]; exists {
		return []string{id}, nil
	}

	// Ops that need a dedicated expansion return from this switch. Anything
	// that falls through is translated by the generic path below.
	switch {
	case edge.Edge.Op == graph.CoGBK && len(edge.Edge.Input) > 1:
		cogbkID, err := m.expandCoGBK(edge)
		if err != nil {
			return handleErr(err)
		}
		return []string{cogbkID}, nil
	case edge.Edge.Op == graph.Reshuffle:
		reshuffleID, err := m.expandReshuffle(edge)
		if err != nil {
			return handleErr(err)
		}
		return []string{reshuffleID}, nil
	case edge.Edge.Op == graph.External:
		if edge.Edge.External != nil && edge.Edge.External.Expanded != nil {
			m.needsExpansion = true
		}
		if edge.Edge.Payload == nil {
			expandedID, err := m.expandCrossLanguage(edge)
			if err != nil {
				return handleErr(err)
			}
			return []string{expandedID}, nil
		}
	}

	// Default wiring: inputs and outputs are tagged by position ("i0", "i1", ...).
	// The ParDo and External cases below may overwrite these entries.
	inputs := make(map[string]string)
	for i, in := range edge.Edge.Input {
		if _, err := m.addNode(in.From); err != nil {
			return handleErr(err)
		}
		inputs[fmt.Sprintf("i%v", i)] = nodeID(in.From)
	}
	outputs := make(map[string]string)
	for i, out := range edge.Edge.Output {
		if _, err := m.addNode(out.To); err != nil {
			return handleErr(err)
		}
		outputs[fmt.Sprintf("i%v", i)] = nodeID(out.To)
	}
	var annotations map[string][]byte

	// allPIds tracks additional PTransformIDs generated for the pipeline
	var allPIds []string
	var spec *pipepb.FunctionSpec
	switch edge.Edge.Op {
	case graph.Impulse:
		spec = &pipepb.FunctionSpec{Urn: URNImpulse}

	case graph.ParDo:
		si := make(map[string]*pipepb.SideInput)
		for i, in := range edge.Edge.Input {
			switch in.Kind {
			case graph.Main:
				// ignore: not a side input

			case graph.Singleton, graph.Slice, graph.Iter, graph.ReIter:
				// The only supported form of side input is MultiMap, but we
				// want just iteration. So we must manually add a fixed key,
				// "", even if the input is already KV.

				tag := fmt.Sprintf("i%v", i)

				// out is the ID of the intermediate, keyed PCollection that
				// sits between the original input and this ParDo.
				out := fmt.Sprintf("%v_keyed%v_%v", nodeID(in.From), edgeID(edge.Edge), i)
				coderID, err := m.coders.Add(makeBytesKeyedCoder(in.From.Coder))
				if err != nil {
					return handleErr(err)
				}
				if _, err := m.makeNode(out, coderID, in.From); err != nil {
					return handleErr(err)
				}

				payload := &pipepb.ParDoPayload{
					DoFn: &pipepb.FunctionSpec{
						Urn: URNIterableSideInputKey,
						Payload: []byte(protox.MustEncodeBase64(&v1pb.TransformPayload{
							Urn: URNIterableSideInputKey,
						})),
					},
				}

				// The keying transform reads the original input and writes
				// the keyed PCollection.
				keyedID := fmt.Sprintf("%v_keyed%v", edgeID(edge.Edge), i)
				keyed := &pipepb.PTransform{
					UniqueName: keyedID,
					Spec: &pipepb.FunctionSpec{
						Urn:     URNParDo,
						Payload: protox.MustEncode(payload),
					},
					Inputs:        map[string]string{"i0": nodeID(in.From)},
					Outputs:       map[string]string{"i0": out},
					EnvironmentId: m.addDefaultEnv(),
				}
				m.transforms[keyedID] = keyed
				allPIds = append(allPIds, keyedID)

				// Fixup input map
				inputs[tag] = out

				// NOTE(review): "foo" and "bar" look like placeholder URNs —
				// confirm whether any consumer reads ViewFn/WindowMappingFn.
				si[tag] = &pipepb.SideInput{
					AccessPattern: &pipepb.FunctionSpec{
						Urn: URNMultimapSideInput,
					},
					ViewFn: &pipepb.FunctionSpec{
						Urn: "foo",
					},
					WindowMappingFn: &pipepb.FunctionSpec{
						Urn: "bar",
					},
				}

			case graph.Map, graph.MultiMap:
				return handleErr(errors.Errorf("not implemented"))

			default:
				return handleErr(errors.Errorf("unexpected input kind: %v", in.Kind))
			}
		}

		mustEncodeMultiEdge, err := mustEncodeMultiEdgeBase64(edge.Edge)
		if err != nil {
			return handleErr(err)
		}

		payload := &pipepb.ParDoPayload{
			DoFn: &pipepb.FunctionSpec{
				Urn:     URNDoFn,
				Payload: []byte(mustEncodeMultiEdge),
			},
			SideInputs: si,
		}
		if edge.Edge.DoFn.IsSplittable() {
			// Splittable DoFns carry a restriction coder and mark the
			// pipeline as requiring splittable DoFn support.
			coderID, err := m.coders.Add(edge.Edge.RestrictionCoder)
			if err != nil {
				return handleErr(err)
			}
			payload.RestrictionCoderId = coderID
			m.requirements[URNRequiresSplittableDoFn] = true
		}
		spec = &pipepb.FunctionSpec{Urn: URNParDo, Payload: protox.MustEncode(payload)}
		annotations = edge.Edge.DoFn.Annotations()

	case graph.Combine:
		// Combine is emitted with the ParDo URN and a ParDoPayload wrapping
		// the encoded edge.
		mustEncodeMultiEdge, err := mustEncodeMultiEdgeBase64(edge.Edge)
		if err != nil {
			return handleErr(err)
		}
		payload := &pipepb.ParDoPayload{
			DoFn: &pipepb.FunctionSpec{
				Urn:     URNDoFn,
				Payload: []byte(mustEncodeMultiEdge),
			},
		}
		spec = &pipepb.FunctionSpec{Urn: URNParDo, Payload: protox.MustEncode(payload)}

	case graph.Flatten:
		spec = &pipepb.FunctionSpec{Urn: URNFlatten}

	case graph.CoGBK:
		// Only CoGBK edges with at most one input reach this point; the rest
		// were expanded above.
		spec = &pipepb.FunctionSpec{Urn: URNGBK}

	case graph.WindowInto:
		windowFn, err := makeWindowFn(edge.Edge.WindowFn)
		if err != nil {
			return handleErr(err)
		}
		payload := &pipepb.WindowIntoPayload{
			WindowFn: windowFn,
		}
		spec = &pipepb.FunctionSpec{Urn: URNWindow, Payload: protox.MustEncode(payload)}

	case graph.External:
		// Payload is non-nil here: External edges with a nil payload already
		// returned via expandCrossLanguage above.
		pyld := edge.Edge.Payload
		spec = &pipepb.FunctionSpec{Urn: pyld.URN, Payload: pyld.Data}

		// When the payload names its inputs, replace the positional tags with
		// the payload's tags.
		if len(pyld.InputsMap) != 0 {
			if got, want := len(pyld.InputsMap), len(edge.Edge.Input); got != want {
				return handleErr(errors.Errorf("mismatch'd counts between External tags (%v) and inputs (%v)", got, want))
			}
			inputs = make(map[string]string)
			for tag, in := range InboundTagToNode(pyld.InputsMap, edge.Edge.Input) {
				if _, err := m.addNode(in); err != nil {
					return handleErr(err)
				}
				inputs[tag] = nodeID(in)
			}
		}

		// Same treatment for named outputs.
		if len(pyld.OutputsMap) != 0 {
			if got, want := len(pyld.OutputsMap), len(edge.Edge.Output); got != want {
				return handleErr(errors.Errorf("mismatch'd counts between External tags (%v) and outputs (%v)", got, want))
			}
			outputs = make(map[string]string)
			for tag, out := range OutboundTagToNode(pyld.OutputsMap, edge.Edge.Output) {
				if _, err := m.addNode(out); err != nil {
					return handleErr(err)
				}
				outputs[tag] = nodeID(out)
			}
		}

	default:
		return handleErr(errors.Errorf("unexpected opcode: %v", edge.Edge.Op))
	}

	// GBK and Impulse transforms are emitted with an empty environment ID;
	// every other transform is assigned the default environment.
	var transformEnvID string
	if spec.Urn != URNGBK && spec.Urn != URNImpulse {
		transformEnvID = m.addDefaultEnv()
	}

	transform := &pipepb.PTransform{
		UniqueName:    edge.Name,
		Spec:          spec,
		Inputs:        inputs,
		Outputs:       outputs,
		EnvironmentId: transformEnvID,
		Annotations:   annotations,
	}
	m.transforms[id] = transform
	allPIds = append(allPIds, id)
	return allPIds, nil
}