in sdks/go/pkg/beam/core/runtime/graphx/translate.go [732:894]
// expandReshuffle translates a Reshuffle edge into a composite PTransform
// (URN URNReshuffle) made of three subtransforms, in this order:
//
//  1. a ParDo (URNReshuffleInput) from the edge's input to an intermediate
//     PCollection that uses the union KV coder and a reshuffle-specific
//     windowing strategy (global windows, Always trigger, discarding panes);
//  2. a GroupByKey over that intermediate PCollection;
//  3. a ParDo (URNReshuffleOutput) from the GBK output to the edge's output,
//     whose PCollection is given back the input's original windowing strategy.
//
// It returns the ID of the composite transform. All errors, including an edge
// with no input or no output, are wrapped with the edge for context.
func (m *marshaller) expandReshuffle(edge NamedEdge) (string, error) {
	handleErr := func(err error) (string, error) {
		return "", errors.Wrapf(err, "failed to expand Reshuffle transform for edge: %v", edge)
	}
	// Only Input[0] and Output[0] are used below; fail with a descriptive
	// error instead of an index-out-of-range panic on a malformed edge.
	if len(edge.Edge.Input) == 0 || len(edge.Edge.Output) == 0 {
		return handleErr(errors.Errorf("reshuffle requires an input and an output, got %d inputs and %d outputs",
			len(edge.Edge.Input), len(edge.Edge.Output)))
	}

	id := edgeID(edge.Edge)
	kvCoder, err := makeUnionCoder()
	if err != nil {
		return handleErr(err)
	}
	kvCoderID, err := m.coders.Add(kvCoder)
	if err != nil {
		return handleErr(err)
	}
	gbkCoderID, err := m.coders.Add(coder.NewCoGBK(kvCoder.Components))
	if err != nil {
		return handleErr(err)
	}

	var subtransforms []string

	in := edge.Edge.Input[0]
	origInput, err := m.addNode(in.From)
	if err != nil {
		return handleErr(err)
	}
	// Preserve the input's windowing/triggering so it can be re-instated on
	// the output PCollection after the GBK.
	preservedWSID := m.pcollections[origInput].GetWindowingStrategyId()

	// Intermediate PCollection written by the reify ParDo and read by the GBK.
	postReify := fmt.Sprintf("%v_%v_reifyts", nodeID(in.From), id)
	if _, err := m.makeNode(postReify, kvCoderID, in.From); err != nil {
		return handleErr(err)
	}

	// Replace postReify's windowing strategy with one appropriate for
	// reshuffles.
	{
		wfn := window.NewGlobalWindows()
		windowFn, err := makeWindowFn(wfn)
		if err != nil {
			return handleErr(err)
		}
		wc, err := makeWindowCoder(wfn)
		if err != nil {
			return handleErr(err)
		}
		windowCoderID, err := m.coders.AddWindowCoder(wc)
		if err != nil {
			return handleErr(err)
		}
		m.pcollections[postReify].WindowingStrategyId =
			m.internWindowingStrategy(&pipepb.WindowingStrategy{
				// Not segregated by time...
				WindowFn: windowFn,
				// ...output after every element is received...
				Trigger: &pipepb.Trigger{
					Trigger: &pipepb.Trigger_Always_{
						Always: &pipepb.Trigger_Always{},
					},
				},
				// ...and after outputting, discard the output elements...
				AccumulationMode: pipepb.AccumulationMode_DISCARDING,
				// ...and since every pane should have 1 element,
				// try to preserve the timestamp.
				OutputTime: pipepb.OutputTime_EARLIEST_IN_PANE,
				// Defaults copied from marshalWindowingStrategy.
				// TODO(BEAM-3304): migrate to user side operations once trigger support is in.
				EnvironmentId:   m.addDefaultEnv(),
				MergeStatus:     pipepb.MergeStatus_NON_MERGING,
				WindowCoderId:   windowCoderID,
				ClosingBehavior: pipepb.ClosingBehavior_EMIT_IF_NONEMPTY,
				AllowedLateness: 0,
				OnTimeBehavior:  pipepb.OnTimeBehavior_FIRE_ALWAYS,
			})
	}

	// Inputs (i): reify ParDo, input PCollection -> postReify.
	inputID := fmt.Sprintf("%v_reifyts", id)
	payload := &pipepb.ParDoPayload{
		DoFn: &pipepb.FunctionSpec{
			Urn: URNReshuffleInput,
			Payload: []byte(protox.MustEncodeBase64(&v1pb.TransformPayload{
				Urn: URNReshuffleInput,
			})),
		},
	}
	input := &pipepb.PTransform{
		UniqueName: inputID,
		Spec: &pipepb.FunctionSpec{
			Urn:     URNParDo,
			Payload: protox.MustEncode(payload),
		},
		Inputs:        map[string]string{"i0": nodeID(in.From)},
		Outputs:       map[string]string{"i0": postReify},
		EnvironmentId: m.addDefaultEnv(),
	}
	m.transforms[inputID] = input
	subtransforms = append(subtransforms, inputID)

	outNode := edge.Edge.Output[0].To

	// GBK: postReify -> gbkOut.
	gbkOut := fmt.Sprintf("%v_out", nodeID(outNode))
	if _, err := m.makeNode(gbkOut, gbkCoderID, outNode); err != nil {
		return handleErr(err)
	}
	// makeNode is handed outNode, so gbkOut presumably picks up the user's
	// original windowing. A GroupByKey does not re-window, so its output must
	// carry the same (global) windowing strategy as its input, postReify;
	// otherwise the declared window coder would not match the windows the
	// grouped elements are actually in. The original windowing is restored on
	// the final output below.
	m.pcollections[gbkOut].WindowingStrategyId = m.pcollections[postReify].WindowingStrategyId

	gbkID := fmt.Sprintf("%v_gbk", id)
	gbk := &pipepb.PTransform{
		UniqueName: gbkID,
		Spec:       &pipepb.FunctionSpec{Urn: URNGBK},
		Inputs:     map[string]string{"i0": postReify},
		Outputs:    map[string]string{"i0": gbkOut},
	}
	m.transforms[gbkID] = gbk
	subtransforms = append(subtransforms, gbkID)

	// Expand: unreify ParDo, gbkOut -> edge output.
	outPCol, err := m.addNode(outNode)
	if err != nil {
		return handleErr(err)
	}
	m.pcollections[outPCol].WindowingStrategyId = preservedWSID

	outputID := fmt.Sprintf("%v_unreify", id)
	outputPayload := &pipepb.ParDoPayload{
		DoFn: &pipepb.FunctionSpec{
			Urn: URNReshuffleOutput,
			Payload: []byte(protox.MustEncodeBase64(&v1pb.TransformPayload{
				Urn: URNReshuffleOutput,
			})),
		},
	}
	output := &pipepb.PTransform{
		UniqueName: outputID,
		Spec: &pipepb.FunctionSpec{
			Urn:     URNParDo,
			Payload: protox.MustEncode(outputPayload),
		},
		Inputs:        map[string]string{"i0": gbkOut},
		Outputs:       map[string]string{"i0": nodeID(outNode)},
		EnvironmentId: m.addDefaultEnv(),
	}
	// NOTE(review): stored under the edge ID rather than outputID, presumably
	// so lookups by edge ID resolve to this transform — confirm before changing.
	m.transforms[id] = output
	subtransforms = append(subtransforms, id)

	// Add composite for visualization, or runner optimization.
	reshuffleID := fmt.Sprintf("%v_reshuffle", id)
	m.transforms[reshuffleID] = &pipepb.PTransform{
		UniqueName:    edge.Name,
		Subtransforms: subtransforms,
		Spec: &pipepb.FunctionSpec{
			Urn: URNReshuffle,
		},
		EnvironmentId: m.addDefaultEnv(),
	}
	return reshuffleID, nil
}