in sdks/go/pkg/beam/core/runtime/exec/translate.go [317:563]
// makeLink returns the Node for the link id, building the unit for transform
// id.to on first use and serving it from b.links afterwards.
//
// from is handed to makeCoderForPCollection when building Inject and
// ReshuffleInput units, i.e. it is treated as the id of the PCollection that
// feeds the link. The kind of unit is chosen from the transform's spec URN and,
// for the ParDo and per-key Combine URNs, from the URN of the Go
// TransformPayload embedded in the function payload. A newly built unit is
// cached under id and appended to b.units; a Flatten is additionally cached
// under every input link of its transform so all inputs share one instance.
//
// Malformed descriptors are reported as errors rather than panics: payloads
// that fail to decode, an unexpected URN or opcode, a missing inject payload,
// a missing output or input where one is required, and coders of the wrong
// shape.
func (b *builder) makeLink(from string, id linkID) (Node, error) {
	if n, ok := b.links[id]; ok {
		return n, nil
	}

	// Process all incoming links for the edge and cache them. It thus doesn't matter
	// which exact link triggers the Node generation. The link caching is only needed
	// to process ParDo side inputs.

	transform := b.desc.GetTransforms()[id.to]
	urn := transform.GetSpec().GetUrn()
	payload := transform.GetSpec().GetPayload()

	// TODO(herohde) 1/25/2018: do we need to handle composites?

	out, err := b.makePCollections(unmarshalKeyedValues(transform.GetOutputs()))
	if err != nil {
		return nil, err
	}

	// firstOut returns the transform's first output node. Units that forward to
	// a single output go through it so that a transform without outputs yields
	// an error instead of an index-out-of-range panic.
	firstOut := func() (Node, error) {
		if len(out) == 0 {
			return nil, errors.Errorf("transform %v with URN %v has no outputs, want at least 1", id.to, urn)
		}
		return out[0], nil
	}

	var u Node
	switch urn {
	case graphx.URNParDo,
		urnPerKeyCombinePre,
		urnPerKeyCombineMerge,
		urnPerKeyCombineExtract,
		urnPerKeyCombineConvert,
		urnPairWithRestriction,
		urnSplitAndSizeRestrictions,
		urnProcessSizedElementsAndRestrictions:
		// Extract the serialized Go function payload. The two cases below cover
		// every URN admitted by the enclosing case, so data is always set.
		var data string
		switch urn {
		case graphx.URNParDo,
			urnPairWithRestriction,
			urnSplitAndSizeRestrictions,
			urnProcessSizedElementsAndRestrictions:
			var pardo pipepb.ParDoPayload
			if err := proto.Unmarshal(payload, &pardo); err != nil {
				return nil, errors.Wrapf(err, "invalid ParDo payload for %v", transform)
			}
			data = string(pardo.GetDoFn().GetPayload())
		case urnPerKeyCombinePre, urnPerKeyCombineMerge, urnPerKeyCombineExtract, urnPerKeyCombineConvert:
			var cmb pipepb.CombinePayload
			if err := proto.Unmarshal(payload, &cmb); err != nil {
				return nil, errors.Wrapf(err, "invalid CombinePayload payload for %v", transform)
			}
			data = string(cmb.GetCombineFn().GetPayload())
		}

		// TODO(herohde) 1/28/2018: Once Dataflow's fully off the old way,
		// we can simply switch on the ParDo DoFn URN directly.

		var tp v1pb.TransformPayload
		if err := protox.DecodeBase64(data, &tp); err != nil {
			return nil, errors.Wrapf(err, "invalid transform payload for %v", transform)
		}

		switch tpUrn := tp.GetUrn(); tpUrn {
		case graphx.URNDoFn:
			op, fn, _, in, _, err := graphx.DecodeMultiEdge(tp.GetEdge())
			if err != nil {
				return nil, err
			}

			switch op {
			case graph.ParDo:
				dofn, err := graph.AsDoFn(fn, graph.MainUnknown)
				if err != nil {
					return nil, err
				}
				switch urn {
				case urnPairWithRestriction:
					o, err := firstOut()
					if err != nil {
						return nil, err
					}
					u = &PairWithRestriction{UID: b.idgen.New(), Fn: dofn, Out: o}
				case urnSplitAndSizeRestrictions:
					o, err := firstOut()
					if err != nil {
						return nil, err
					}
					u = &SplitAndSizeRestrictions{UID: b.idgen.New(), Fn: dofn, Out: o}
				default:
					// A ParDo receives all outputs, so no output guard is applied here.
					n := &ParDo{UID: b.idgen.New(), Fn: dofn, Inbound: in, Out: out}
					n.PID = transform.GetUniqueName()

					// Inputs past the first are wired up as side inputs.
					input := unmarshalKeyedValues(transform.GetInputs())
					for i := 1; i < len(input); i++ {
						// TODO(herohde) 8/8/2018: handle different windows, view_fn and window_mapping_fn.
						// For now, assume we don't need any information in the pardo payload.

						ec, wc, err := b.makeCoderForPCollection(input[i])
						if err != nil {
							return nil, err
						}

						sid := StreamID{
							Port:         Port{URL: b.desc.GetStateApiServiceDescriptor().GetUrl()},
							PtransformID: id.to,
						}
						sideInputID := fmt.Sprintf("i%v", i) // SideInputID (= local id, "iN")
						side := NewSideInputAdapter(sid, sideInputID, coder.NewW(ec, wc))
						n.Side = append(n.Side, side)
					}

					// Wrap the ParDo when it runs as (or falls back from) a splittable DoFn.
					u = n
					if urn == urnProcessSizedElementsAndRestrictions {
						u = &ProcessSizedElementsAndRestrictions{PDo: n, TfId: id.to}
					} else if dofn.IsSplittable() {
						u = &SdfFallback{PDo: n}
					}
				}

			case graph.Combine:
				o, err := firstOut()
				if err != nil {
					return nil, err
				}
				cn := &Combine{UID: b.idgen.New(), Out: o}
				cn.Fn, err = graph.AsCombineFn(fn)
				if err != nil {
					return nil, err
				}
				if len(in) == 0 {
					return nil, errors.Errorf("combine edge for transform %v has no inputs, want at least 1", id.to)
				}
				cn.UsesKey = typex.IsKV(in[0].Type)
				cn.PID = transform.GetUniqueName()

				switch urn {
				case urnPerKeyCombinePre:
					inputs := unmarshalKeyedValues(transform.GetInputs())
					if len(inputs) != 1 {
						return nil, errors.Errorf("unexpected sideinput to combine: got %d, want 1", len(inputs))
					}
					ec, _, err := b.makeCoderForPCollection(inputs[0])
					if err != nil {
						return nil, err
					}
					if !coder.IsKV(ec) {
						return nil, errors.Errorf("unexpected non-KV coder PCollection input to combine: %v", ec)
					}
					u = &LiftedCombine{Combine: cn, KeyCoder: ec.Components[0]}
				case urnPerKeyCombineMerge:
					u = &MergeAccumulators{Combine: cn}
				case urnPerKeyCombineExtract:
					u = &ExtractOutput{Combine: cn}
				case urnPerKeyCombineConvert:
					u = &ConvertToAccumulators{Combine: cn}
				default: // For unlifted combines
					u = cn
				}

			default:
				// The opcode is decoded from the runner-supplied payload, so report
				// it as an error instead of panicking.
				return nil, errors.Errorf("opcode should be one of ParDo or Combine, but it is: %v", op)
			}

		case graphx.URNIterableSideInputKey:
			o, err := firstOut()
			if err != nil {
				return nil, err
			}
			u = &FixedKey{UID: b.idgen.New(), Key: []byte(iterableSideInputKey), Out: o}

		case graphx.URNInject:
			// Guard against a payload that carries the inject URN but no inject
			// message; the generated getters are nil-safe.
			inject := tp.GetInject()
			if inject == nil {
				return nil, errors.Errorf("missing inject payload for %v", transform)
			}
			c, _, err := b.makeCoderForPCollection(from)
			if err != nil {
				return nil, err
			}
			if !coder.IsKV(c) {
				return nil, errors.Errorf("unexpected inject coder: %v", c)
			}
			o, err := firstOut()
			if err != nil {
				return nil, err
			}
			u = &Inject{UID: b.idgen.New(), N: int(inject.GetN()), ValueEncoder: MakeElementEncoder(c.Components[1]), Out: o}

		case graphx.URNExpand:
			// Iterating the outputs map is the only way to get at an output id;
			// the last id visited is the one used.
			var pid string
			for _, col := range transform.GetOutputs() {
				pid = col
			}
			c, _, err := b.makeCoderForPCollection(pid)
			if err != nil {
				return nil, err
			}
			if !coder.IsCoGBK(c) {
				return nil, errors.Errorf("unexpected expand coder: %v", c)
			}

			// One decoder per component after the first.
			var decoders []ElementDecoder
			for _, dc := range c.Components[1:] {
				decoders = append(decoders, MakeElementDecoder(dc))
			}
			o, err := firstOut()
			if err != nil {
				return nil, err
			}
			u = &Expand{UID: b.idgen.New(), ValueDecoders: decoders, Out: o}

		case graphx.URNReshuffleInput:
			c, w, err := b.makeCoderForPCollection(from)
			if err != nil {
				return nil, err
			}
			o, err := firstOut()
			if err != nil {
				return nil, err
			}
			u = &ReshuffleInput{UID: b.idgen.New(), Seed: rand.Int63(), Coder: coder.NewW(c, w), Out: o}

		case graphx.URNReshuffleOutput:
			var pid string
			// There's only one output PCollection, and iterating through the map
			// is the only way to extract it.
			for _, col := range transform.GetOutputs() {
				pid = col
			}
			c, w, err := b.makeCoderForPCollection(pid)
			if err != nil {
				return nil, err
			}
			o, err := firstOut()
			if err != nil {
				return nil, err
			}
			u = &ReshuffleOutput{UID: b.idgen.New(), Coder: coder.NewW(c, w), Out: o}

		default:
			// Pass the message by pointer: avoids copying the proto struct and
			// formats it through its String method.
			return nil, errors.Errorf("unexpected payload: %v", &tp)
		}

	case graphx.URNWindow:
		var wp pipepb.WindowIntoPayload
		if err := proto.Unmarshal(payload, &wp); err != nil {
			return nil, errors.Wrapf(err, "invalid WindowInto payload for %v", transform)
		}
		wfn, err := unmarshalWindowFn(wp.GetWindowFn())
		if err != nil {
			return nil, err
		}
		o, err := firstOut()
		if err != nil {
			return nil, err
		}
		u = &WindowInto{UID: b.idgen.New(), Fn: wfn, Out: o}

	case graphx.URNFlatten:
		o, err := firstOut()
		if err != nil {
			return nil, err
		}
		u = &Flatten{UID: b.idgen.New(), N: len(transform.Inputs), Out: o}

		// Use the same flatten instance for all the inputs links to this transform.
		for i := 0; i < len(transform.Inputs); i++ {
			b.links[linkID{id.to, i}] = u
		}

	case urnDataSink:
		port, cid, err := unmarshalPort(payload)
		if err != nil {
			return nil, err
		}

		sink := &DataSink{UID: b.idgen.New()}
		sink.SID = StreamID{PtransformID: id.to, Port: port}
		sink.Coder, err = b.coders.Coder(cid) // Expected to be windowed coder
		if err != nil {
			return nil, err
		}
		if !coder.IsW(sink.Coder) {
			return nil, errors.Errorf("unwindowed coder %v on DataSink %v: %v", cid, id, sink.Coder)
		}
		u = sink

	default:
		// The URN comes from the runner-supplied descriptor (and is empty when
		// id.to is not a known transform), so surface it as an error.
		return nil, errors.Errorf("unexpected transform URN: %v", urn)
	}

	b.links[id] = u
	b.units = append(b.units, u)
	return u, nil
}