in sdks/go/pkg/beam/core/runtime/graphx/translate.go [282:512]
// addMultiEdge translates a single graph edge into one or more PTransforms
// stored in m.transforms and returns the IDs of the transforms it registered.
//
// If a transform with the edge's ID is already present, that ID is returned
// and nothing else is done. CoGBK edges with more than one input, Reshuffle
// edges, and External edges without a Payload are delegated to expandCoGBK,
// expandReshuffle and expandCrossLanguage respectively, and the single ID
// returned by the helper is passed through.
//
// For every other edge the input and output nodes are added to the pipeline
// and a PTransform is built whose spec depends on the edge's opcode. ParDo
// edges additionally get one keying ParDo transform per side input; the IDs of
// those transforms precede the edge's own ID in the returned slice.
//
// Every error is wrapped with the edge being translated, and the returned
// slice is nil whenever the error is non-nil.
func (m *marshaller) addMultiEdge(edge NamedEdge) ([]string, error) {
	handleErr := func(err error) ([]string, error) {
		return nil, errors.Wrapf(err, "failed to add input kind: %v", edge)
	}

	id := edgeID(edge.Edge)
	if _, exists := m.transforms[id]; exists {
		return []string{id}, nil
	}

	// Edges that are expanded by dedicated helpers return early.
	switch {
	case edge.Edge.Op == graph.CoGBK && len(edge.Edge.Input) > 1:
		cogbkID, err := m.expandCoGBK(edge)
		if err != nil {
			return handleErr(err)
		}
		return []string{cogbkID}, nil
	case edge.Edge.Op == graph.Reshuffle:
		reshuffleID, err := m.expandReshuffle(edge)
		if err != nil {
			return handleErr(err)
		}
		return []string{reshuffleID}, nil
	case edge.Edge.Op == graph.External:
		if edge.Edge.External != nil && edge.Edge.External.Expanded != nil {
			m.needsExpansion = true
		}
		// External edges that do carry a Payload fall through to the generic
		// translation below.
		if edge.Edge.Payload == nil {
			// Named xlangID so the edgeID function is not shadowed.
			xlangID, err := m.expandCrossLanguage(edge)
			if err != nil {
				return handleErr(err)
			}
			return []string{xlangID}, nil
		}
	}

	// Default wiring: inputs and outputs are both tagged "i<index>".
	inputs := make(map[string]string)
	for i, in := range edge.Edge.Input {
		if _, err := m.addNode(in.From); err != nil {
			return handleErr(err)
		}
		inputs[fmt.Sprintf("i%v", i)] = nodeID(in.From)
	}
	outputs := make(map[string]string)
	for i, out := range edge.Edge.Output {
		if _, err := m.addNode(out.To); err != nil {
			return handleErr(err)
		}
		outputs[fmt.Sprintf("i%v", i)] = nodeID(out.To)
	}

	var annotations map[string][]byte

	// allPIDs tracks additional PTransformIDs generated for the pipeline
	var allPIDs []string

	var spec *pipepb.FunctionSpec
	switch edge.Edge.Op {
	case graph.Impulse:
		spec = &pipepb.FunctionSpec{Urn: URNImpulse}

	case graph.ParDo:
		si := make(map[string]*pipepb.SideInput)
		for i, in := range edge.Edge.Input {
			switch in.Kind {
			case graph.Main:
				// ignore: not a side input

			case graph.Singleton, graph.Slice, graph.Iter, graph.ReIter:
				// The only supported form of side input is MultiMap, but we
				// want just iteration. So we must manually add a fixed key,
				// "", even if the input is already KV.

				out := fmt.Sprintf("%v_keyed%v_%v", nodeID(in.From), edgeID(edge.Edge), i)
				coderID, err := m.coders.Add(makeBytesKeyedCoder(in.From.Coder))
				if err != nil {
					return handleErr(err)
				}
				if _, err := m.makeNode(out, coderID, in.From); err != nil {
					return handleErr(err)
				}

				keyPayload := &pipepb.ParDoPayload{
					DoFn: &pipepb.FunctionSpec{
						Urn: URNIterableSideInputKey,
						Payload: []byte(protox.MustEncodeBase64(&v1pb.TransformPayload{
							Urn: URNIterableSideInputKey,
						})),
					},
				}

				// The keying transform reads the original node and writes the
				// newly made keyed node.
				keyedID := fmt.Sprintf("%v_keyed%v", edgeID(edge.Edge), i)
				keyed := &pipepb.PTransform{
					UniqueName: keyedID,
					Spec: &pipepb.FunctionSpec{
						Urn:     URNParDo,
						Payload: protox.MustEncode(keyPayload),
					},
					Inputs:        map[string]string{"i0": nodeID(in.From)},
					Outputs:       map[string]string{"i0": out},
					EnvironmentId: m.addDefaultEnv(),
				}
				m.transforms[keyedID] = keyed
				allPIDs = append(allPIDs, keyedID)

				// Fixup input map
				inputKey := fmt.Sprintf("i%v", i)
				inputs[inputKey] = out

				// NOTE(review): "foo" and "bar" look like placeholder URNs;
				// confirm whether anything downstream reads them.
				si[inputKey] = &pipepb.SideInput{
					AccessPattern: &pipepb.FunctionSpec{
						Urn: URNMultimapSideInput,
					},
					ViewFn: &pipepb.FunctionSpec{
						Urn: "foo",
					},
					WindowMappingFn: &pipepb.FunctionSpec{
						Urn: "bar",
					},
				}

			case graph.Map, graph.MultiMap:
				// Routed through handleErr so the failure names the edge.
				return handleErr(errors.Errorf("side input kind %v not implemented", in.Kind))

			default:
				return handleErr(errors.Errorf("unexpected input kind: %v", in.Kind))
			}
		}

		mustEncodeMultiEdge, err := mustEncodeMultiEdgeBase64(edge.Edge)
		if err != nil {
			return handleErr(err)
		}

		payload := &pipepb.ParDoPayload{
			DoFn: &pipepb.FunctionSpec{
				Urn:     URNDoFn,
				Payload: []byte(mustEncodeMultiEdge),
			},
			SideInputs: si,
		}
		if edge.Edge.DoFn.IsSplittable() {
			coderID, err := m.coders.Add(edge.Edge.RestrictionCoder)
			if err != nil {
				return handleErr(err)
			}
			payload.RestrictionCoderId = coderID
			m.requirements[URNRequiresSplittableDoFn] = true
		}
		spec = &pipepb.FunctionSpec{Urn: URNParDo, Payload: protox.MustEncode(payload)}
		annotations = edge.Edge.DoFn.Annotations()

	case graph.Combine:
		// Combine edges are emitted with a ParDo spec wrapping the encoded edge.
		mustEncodeMultiEdge, err := mustEncodeMultiEdgeBase64(edge.Edge)
		if err != nil {
			return handleErr(err)
		}
		payload := &pipepb.ParDoPayload{
			DoFn: &pipepb.FunctionSpec{
				Urn:     URNDoFn,
				Payload: []byte(mustEncodeMultiEdge),
			},
		}
		spec = &pipepb.FunctionSpec{Urn: URNParDo, Payload: protox.MustEncode(payload)}

	case graph.Flatten:
		spec = &pipepb.FunctionSpec{Urn: URNFlatten}

	case graph.CoGBK:
		// Only CoGBK edges with at most one input reach this point; the
		// others were expanded above.
		spec = &pipepb.FunctionSpec{Urn: URNGBK}

	case graph.WindowInto:
		windowFn, err := makeWindowFn(edge.Edge.WindowFn)
		if err != nil {
			return handleErr(err)
		}
		payload := &pipepb.WindowIntoPayload{
			WindowFn: windowFn,
		}
		spec = &pipepb.FunctionSpec{Urn: URNWindow, Payload: protox.MustEncode(payload)}

	case graph.External:
		// Payload is non-nil here: payload-less External edges returned above.
		pyld := edge.Edge.Payload
		spec = &pipepb.FunctionSpec{Urn: pyld.URN, Payload: pyld.Data}

		// When the payload supplies tag maps, they replace the default
		// "i<index>" wiring built earlier.
		if len(pyld.InputsMap) != 0 {
			if got, want := len(pyld.InputsMap), len(edge.Edge.Input); got != want {
				return handleErr(errors.Errorf("mismatch'd counts between External tags (%v) and inputs (%v)", got, want))
			}
			inputs = make(map[string]string)
			for tag, in := range InboundTagToNode(pyld.InputsMap, edge.Edge.Input) {
				if _, err := m.addNode(in); err != nil {
					return handleErr(err)
				}
				inputs[tag] = nodeID(in)
			}
		}

		if len(pyld.OutputsMap) != 0 {
			if got, want := len(pyld.OutputsMap), len(edge.Edge.Output); got != want {
				return handleErr(errors.Errorf("mismatch'd counts between External tags (%v) and outputs (%v)", got, want))
			}
			outputs = make(map[string]string)
			for tag, out := range OutboundTagToNode(pyld.OutputsMap, edge.Edge.Output) {
				if _, err := m.addNode(out); err != nil {
					return handleErr(err)
				}
				outputs[tag] = nodeID(out)
			}
		}

	default:
		err := errors.Errorf("unexpected opcode: %v", edge.Edge.Op)
		return handleErr(err)
	}

	// GBK and Impulse transforms are left with an empty environment ID; every
	// other transform is assigned the default environment.
	var transformEnvID string
	if spec.Urn != URNGBK && spec.Urn != URNImpulse {
		transformEnvID = m.addDefaultEnv()
	}

	transform := &pipepb.PTransform{
		UniqueName:    edge.Name,
		Spec:          spec,
		Inputs:        inputs,
		Outputs:       outputs,
		EnvironmentId: transformEnvID,
		Annotations:   annotations,
	}
	m.transforms[id] = transform
	allPIDs = append(allPIDs, id)
	return allPIDs, nil
}