func()

in sdks/go/pkg/beam/core/runtime/graphx/translate.go [732:894]


func (m *marshaller) expandReshuffle(edge NamedEdge) (string, error) {
	handleErr := func(err error) (string, error) {
		return "", errors.Wrapf(err, "failed to expand Reshuffle transform for edge: %v", edge)
	}
	id := edgeID(edge.Edge)
	kvCoder, err := makeUnionCoder()
	if err != nil {
		return handleErr(err)
	}
	kvCoderID, err := m.coders.Add(kvCoder)
	if err != nil {
		return handleErr(err)
	}
	gbkCoderID, err := m.coders.Add(coder.NewCoGBK(kvCoder.Components))
	if err != nil {
		return handleErr(err)
	}

	var subtransforms []string

	in := edge.Edge.Input[0]

	origInput, err := m.addNode(in.From)
	if err != nil {
		return handleErr(err)
	}
	// We need to preserve the old windowing/triggering here
	// for re-instatement after the GBK.
	preservedWSId := m.pcollections[origInput].GetWindowingStrategyId()

	// Get the windowing strategy from before:
	postReify := fmt.Sprintf("%v_%v_reifyts", nodeID(in.From), id)
	if _, err := m.makeNode(postReify, kvCoderID, in.From); err != nil {
		return handleErr(err)
	}

	// We need to replace postReify's windowing strategy with one appropriate
	// for reshuffles.
	{
		wfn := window.NewGlobalWindows()
		windowFn, err := makeWindowFn(wfn)
		if err != nil {
			return handleErr(err)
		}
		coderId, err := makeWindowCoder(wfn)
		if err != nil {
			return handleErr(err)
		}
		windowCoderId, err := m.coders.AddWindowCoder(coderId)
		if err != nil {
			return handleErr(err)
		}
		m.pcollections[postReify].WindowingStrategyId =
			m.internWindowingStrategy(&pipepb.WindowingStrategy{
				// Not segregated by time...
				WindowFn: windowFn,
				// ...output after every element is received...
				Trigger: &pipepb.Trigger{
					Trigger: &pipepb.Trigger_Always_{
						Always: &pipepb.Trigger_Always{},
					},
				},
				// ...and after outputing, discard the output elements...
				AccumulationMode: pipepb.AccumulationMode_DISCARDING,
				// ...and since every pane should have 1 element,
				// try to preserve the timestamp.
				OutputTime: pipepb.OutputTime_EARLIEST_IN_PANE,
				// Defaults copied from marshalWindowingStrategy.
				// TODO(BEAM-3304): migrate to user side operations once trigger support is in.
				EnvironmentId:   m.addDefaultEnv(),
				MergeStatus:     pipepb.MergeStatus_NON_MERGING,
				WindowCoderId:   windowCoderId,
				ClosingBehavior: pipepb.ClosingBehavior_EMIT_IF_NONEMPTY,
				AllowedLateness: 0,
				OnTimeBehavior:  pipepb.OnTimeBehavior_FIRE_ALWAYS,
			})
	}

	// Inputs (i)

	inputID := fmt.Sprintf("%v_reifyts", id)
	payload := &pipepb.ParDoPayload{
		DoFn: &pipepb.FunctionSpec{
			Urn: URNReshuffleInput,
			Payload: []byte(protox.MustEncodeBase64(&v1pb.TransformPayload{
				Urn: URNReshuffleInput,
			})),
		},
	}
	input := &pipepb.PTransform{
		UniqueName: inputID,
		Spec: &pipepb.FunctionSpec{
			Urn:     URNParDo,
			Payload: protox.MustEncode(payload),
		},
		Inputs:        map[string]string{"i0": nodeID(in.From)},
		Outputs:       map[string]string{"i0": postReify},
		EnvironmentId: m.addDefaultEnv(),
	}
	m.transforms[inputID] = input
	subtransforms = append(subtransforms, inputID)

	outNode := edge.Edge.Output[0].To

	// GBK

	gbkOut := fmt.Sprintf("%v_out", nodeID(outNode))
	if _, err := m.makeNode(gbkOut, gbkCoderID, outNode); err != nil {
		return handleErr(err)
	}

	gbkID := fmt.Sprintf("%v_gbk", id)
	gbk := &pipepb.PTransform{
		UniqueName: gbkID,
		Spec:       &pipepb.FunctionSpec{Urn: URNGBK},
		Inputs:     map[string]string{"i0": postReify},
		Outputs:    map[string]string{"i0": gbkOut},
	}
	m.transforms[gbkID] = gbk
	subtransforms = append(subtransforms, gbkID)

	// Expand

	outPCol, err := m.addNode(outNode)
	if err != nil {
		return handleErr(err)
	}
	m.pcollections[outPCol].WindowingStrategyId = preservedWSId

	outputID := fmt.Sprintf("%v_unreify", id)
	outputPayload := &pipepb.ParDoPayload{
		DoFn: &pipepb.FunctionSpec{
			Urn: URNReshuffleOutput,
			Payload: []byte(protox.MustEncodeBase64(&v1pb.TransformPayload{
				Urn: URNReshuffleOutput,
			})),
		},
	}
	output := &pipepb.PTransform{
		UniqueName: outputID,
		Spec: &pipepb.FunctionSpec{
			Urn:     URNParDo,
			Payload: protox.MustEncode(outputPayload),
		},
		Inputs:        map[string]string{"i0": gbkOut},
		Outputs:       map[string]string{"i0": nodeID(outNode)},
		EnvironmentId: m.addDefaultEnv(),
	}
	m.transforms[id] = output
	subtransforms = append(subtransforms, id)

	// Add composite for visualization, or runner optimization
	reshuffleID := fmt.Sprintf("%v_reshuffle", id)
	m.transforms[reshuffleID] = &pipepb.PTransform{
		UniqueName:    edge.Name,
		Subtransforms: subtransforms,
		Spec: &pipepb.FunctionSpec{
			Urn: URNReshuffle,
		},
		EnvironmentId: m.addDefaultEnv(),
	}
	return reshuffleID, nil
}