func()

in sdks/go/pkg/beam/runners/dataflow/dataflowlib/translate.go [106:290]


// translateTransform converts the pipeline transform identified by id into the
// Dataflow step(s) that implement it. trunk is the user-name prefix of the
// enclosing composite; when a composite is expanded, its subtransforms are
// translated with trunk extended by the composite's base name.
//
// Most primitives map to exactly one step. A ParDo additionally emits one
// side-input view step per side input, placed before the ParDo step itself.
// CombinePerKey and PubSub I/O are special-cased; other composites (including
// Reshuffle) are expanded into their subtransforms. An error is returned for
// unknown transform ids, malformed payloads and unexpected primitive URNs.
func (x *translator) translateTransform(trunk string, id string) ([]*df.Step, error) {
	t := x.comp.Transforms[id]
	if t == nil {
		return nil, errors.Errorf("unknown transform id %q", id)
	}

	prop := properties{
		UserName:   userName(trunk, t.UniqueName),
		OutputInfo: x.translateOutputs(t.Outputs),
	}

	urn := t.GetSpec().GetUrn()
	switch urn {
	case graphx.URNImpulse:
		// NOTE: The impulse []data value is encoded in a special way as a
		// URL Query-escaped windowed _unnested_ value. It is read back in
		// a nested context at runtime.
		var buf bytes.Buffer
		if err := exec.EncodeWindowedValueHeader(exec.MakeWindowEncoder(coder.NewGlobalWindow()), window.SingleGlobalWindow, mtime.ZeroTimestamp, &buf); err != nil {
			return nil, errors.Wrapf(err, "invalid Impulse, couldn't encode windowed value header for %v", t)
		}
		value := string(append(buf.Bytes(), t.GetSpec().Payload...))

		prop.Element = []string{url.QueryEscape(value)}
		return []*df.Step{x.newStep(id, impulseKind, prop)}, nil

	case graphx.URNParDo:
		var payload pipepb.ParDoPayload
		if err := proto.Unmarshal(t.Spec.Payload, &payload); err != nil {
			return nil, errors.Wrapf(err, "invalid ParDo payload for %v", t)
		}

		var steps []*df.Step
		// rem starts as all inputs; side inputs are removed below so that only
		// the main input remains.
		rem := reflectx.ShallowClone(t.Inputs).(map[string]string)

		prop.NonParallelInputs = make(map[string]*outputReference)
		for key, sideInput := range payload.SideInputs {
			// Side inputs require an additional conversion step, which must
			// come before the present one.
			delete(rem, key)

			pid := t.Inputs[key]
			pcol, ok := x.comp.Pcollections[pid]
			if !ok || pcol == nil {
				return nil, errors.Errorf("invalid ParDo, side input %q has no matching input PCollection in %v", key, t)
			}
			ref := x.pcollections[pid]
			c := x.translateCoder(pcol, pcol.CoderId)

			outputInfo := output{
				UserName:   "i0",
				OutputName: "i0",
				Encoding:   graphx.WrapIterable(c),
				// Only multimap side inputs request the indexed format.
				UseIndexedFormat: sideInput.GetAccessPattern().GetUrn() == graphx.URNMultimapSideInput,
			}

			side := &df.Step{
				Name: fmt.Sprintf("view%v_%v", id, key),
				Kind: sideInputKind,
				Properties: newMsg(properties{
					ParallelInput: ref,
					OutputInfo:    []output{outputInfo},
					UserName:      userName(trunk, fmt.Sprintf("AsView%v_%v", id, key)),
				}),
			}
			steps = append(steps, side)

			prop.NonParallelInputs[key] = newOutputReference(side.Name, "i0")
		}

		// Splittable DoFns carry a restriction coder that must be passed along.
		if rcid := payload.GetRestrictionCoderId(); rcid != "" {
			rc, err := x.coders.Coder(rcid)
			if err != nil {
				return nil, errors.Wrapf(err, "invalid splittable ParDoPayload, couldn't decode Restriction Coder %v", t)
			}
			enc, err := graphx.EncodeCoderRef(rc)
			if err != nil {
				return nil, errors.Wrapf(err, "invalid splittable ParDoPayload, couldn't encode Restriction Coder %v", t)
			}
			prop.RestrictionEncoder = enc
		}

		in := stringx.SingleValue(rem)

		prop.ParallelInput = x.pcollections[in]
		prop.SerializedFn = id // == reference into the proto pipeline
		return append(steps, x.newStep(id, parDoKind, prop)), nil

	case graphx.URNCombinePerKey:
		// Dataflow uses a GBK followed by a CombineValues to determine when it can lift.
		// To achieve this, we use the combine composite's subtransforms, and modify the
		// Combine ParDo with the CombineValues kind, set its SerializedFn to map to the
		// composite payload, and the accumulator coding.
		if len(t.Subtransforms) != 2 {
			return nil, errors.Errorf("invalid CombinePerKey, expected 2 subtransforms but got %d in %v", len(t.Subtransforms), t)
		}
		steps, err := x.translateTransforms(fmt.Sprintf("%v%v/", trunk, path.Base(t.UniqueName)), t.Subtransforms)
		if err != nil {
			return nil, errors.Wrapf(err, "invalid CombinePerKey, couldn't extract GBK from %v", t)
		}
		// steps[1] is the step rewritten below; guard the index so a malformed
		// composite yields an error rather than a panic.
		if len(steps) < 2 {
			return nil, errors.Errorf("invalid CombinePerKey, expected at least 2 steps from subtransforms but got %d in %v", len(steps), t)
		}
		var payload pipepb.CombinePayload
		if err := proto.Unmarshal(t.Spec.Payload, &payload); err != nil {
			return nil, errors.Wrapf(err, "invalid Combine payload for %v", t)
		}

		c, err := x.coders.Coder(payload.AccumulatorCoderId)
		if err != nil {
			return nil, errors.Wrapf(err, "invalid Combine payload , missing Accumulator Coder %v", t)
		}
		enc, err := graphx.EncodeCoderRef(c)
		if err != nil {
			return nil, errors.Wrapf(err, "invalid Combine payload, couldn't encode Accumulator Coder %v", t)
		}
		// Reuse the properties already produced for steps[1], overlaying them
		// onto prop, then retarget that step as a CombineValues step.
		if err := json.Unmarshal([]byte(steps[1].Properties), &prop); err != nil {
			return nil, errors.Wrapf(err, "invalid CombinePerKey, couldn't decode properties of step %v in %v", steps[1].Name, t)
		}
		prop.Encoding = enc
		prop.SerializedFn = id
		steps[1].Kind = combineKind
		steps[1].Properties = newMsg(prop)
		return steps, nil

	case graphx.URNReshuffle:
		return x.translateTransforms(fmt.Sprintf("%v%v/", trunk, path.Base(t.UniqueName)), t.Subtransforms)

	case graphx.URNFlatten:
		for _, in := range t.Inputs {
			prop.Inputs = append(prop.Inputs, x.pcollections[in])
		}
		return []*df.Step{x.newStep(id, flattenKind, prop)}, nil

	case graphx.URNGBK:
		in := stringx.SingleValue(t.Inputs)

		prop.ParallelInput = x.pcollections[in]
		prop.SerializedFn = encodeSerializedFn(x.extractWindowingStrategy(in))
		return []*df.Step{x.newStep(id, gbkKind, prop)}, nil

	case graphx.URNWindow:
		in := stringx.SingleValue(t.Inputs)
		out := stringx.SingleValue(t.Outputs)

		// The windowing strategy comes from the output PCollection, i.e. the
		// strategy being applied.
		prop.ParallelInput = x.pcollections[in]
		prop.SerializedFn = encodeSerializedFn(x.extractWindowingStrategy(out))
		return []*df.Step{x.newStep(id, windowIntoKind, prop)}, nil

	case pubsub_v1.PubSubPayloadURN:
		// Translate to native handling of PubSub I/O.

		var msg pubsub_v1.PubSubPayload
		if err := proto.Unmarshal(t.Spec.Payload, &msg); err != nil {
			return nil, errors.Wrap(err, "bad pubsub payload")
		}

		prop.Format = "pubsub"
		prop.PubSubTopic = msg.GetTopic()
		prop.PubSubSubscription = msg.GetSubscription()
		prop.PubSubIDLabel = msg.GetIdAttribute()
		prop.PubSubTimestampLabel = msg.GetTimestampAttribute()
		prop.PubSubWithAttributes = msg.GetWithAttributes()

		// A subscription, when present, takes precedence over the topic.
		if prop.PubSubSubscription != "" {
			prop.PubSubTopic = ""
		}

		switch msg.Op {
		case pubsub_v1.PubSubPayload_READ:
			return []*df.Step{x.newStep(id, readKind, prop)}, nil

		case pubsub_v1.PubSubPayload_WRITE:
			in := stringx.SingleValue(t.Inputs)

			prop.ParallelInput = x.pcollections[in]
			prop.Encoding = x.wrapCoder(x.comp.Pcollections[in], coder.NewBytes())
			return []*df.Step{x.newStep(id, writeKind, prop)}, nil

		default:
			return nil, errors.Errorf("bad pubsub op: %v", msg.Op)
		}

	default:
		// Unknown composites are expanded; unknown primitives are an error.
		if len(t.Subtransforms) > 0 {
			return x.translateTransforms(fmt.Sprintf("%v%v/", trunk, path.Base(t.UniqueName)), t.Subtransforms)
		}

		return nil, errors.Errorf("unexpected primitive urn: %v", t)
	}
}