in sdks/go/pkg/beam/runners/dataflow/dataflowlib/translate.go [106:290]
// translateTransform converts the pipeline transform identified by id into
// the Dataflow steps that implement it.
//
// trunk is the user-name prefix accumulated from the enclosing composites; it
// is passed to userName for this transform's own step and extended with the
// base of the transform's unique name when descending into subtransforms.
//
// The translation is selected by the transform's URN:
//   - Impulse, Flatten, GBK and Window each produce a single step.
//   - ParDo produces one view step per side input followed by the ParDo step.
//   - CombinePerKey translates its two subtransforms and then rewrites the
//     second resulting step into a CombineValues step.
//   - Reshuffle and any other transform that has subtransforms are expanded
//     by translating their subtransforms.
//   - PubSub payloads become a native read or write step.
//
// An error is returned if the transform is unknown, a payload cannot be
// decoded, a required coder cannot be resolved or encoded, or the transform
// has an unrecognized URN and no subtransforms.
func (x *translator) translateTransform(trunk string, id string) ([]*df.Step, error) {
	t := x.comp.Transforms[id]
	if t == nil {
		// Fail with an error instead of a nil dereference on t.UniqueName.
		return nil, errors.Errorf("unknown transform id: %v", id)
	}

	prop := properties{
		UserName:   userName(trunk, t.UniqueName),
		OutputInfo: x.translateOutputs(t.Outputs),
	}

	urn := t.GetSpec().GetUrn()
	switch urn {
	case graphx.URNImpulse:
		// NOTE: The impulse []data value is encoded in a special way as a
		// URL Query-escaped windowed _unnested_ value. It is read back in
		// a nested context at runtime.
		var buf bytes.Buffer
		if err := exec.EncodeWindowedValueHeader(exec.MakeWindowEncoder(coder.NewGlobalWindow()), window.SingleGlobalWindow, mtime.ZeroTimestamp, &buf); err != nil {
			return nil, err
		}
		value := string(append(buf.Bytes(), t.GetSpec().Payload...))
		prop.Element = []string{url.QueryEscape(value)}
		return []*df.Step{x.newStep(id, impulseKind, prop)}, nil

	case graphx.URNParDo:
		var payload pipepb.ParDoPayload
		if err := proto.Unmarshal(t.Spec.Payload, &payload); err != nil {
			return nil, errors.Wrapf(err, "invalid ParDo payload for %v", t)
		}

		var steps []*df.Step
		// rem tracks the inputs that are not side inputs; exactly one
		// (the parallel input) is expected to remain after the loop.
		rem := reflectx.ShallowClone(t.Inputs).(map[string]string)

		prop.NonParallelInputs = make(map[string]*outputReference)
		for key, sideInput := range payload.SideInputs {
			// Side input require an additional conversion step, which must
			// be before the present one.
			delete(rem, key)

			pid := t.Inputs[key]
			pcol := x.comp.Pcollections[pid]
			if pcol == nil {
				// Fail with an error instead of a nil dereference on
				// pcol.CoderId when the pipeline is inconsistent.
				return nil, errors.Errorf("invalid ParDo, side input %v refers to unknown PCollection %q in %v", key, pid, t)
			}
			ref := x.pcollections[pid]
			c := x.translateCoder(pcol, pcol.CoderId)

			outputInfo := output{
				UserName:   "i0",
				OutputName: "i0",
				Encoding:   graphx.WrapIterable(c),
			}
			if graphx.URNMultimapSideInput == sideInput.GetAccessPattern().GetUrn() {
				outputInfo.UseIndexedFormat = true
			}

			side := &df.Step{
				Name: fmt.Sprintf("view%v_%v", id, key),
				Kind: sideInputKind,
				Properties: newMsg(properties{
					ParallelInput: ref,
					OutputInfo:    []output{outputInfo},
					UserName:      userName(trunk, fmt.Sprintf("AsView%v_%v", id, key)),
				}),
			}
			steps = append(steps, side)
			prop.NonParallelInputs[key] = newOutputReference(side.Name, "i0")
		}

		// A non-empty restriction coder id is forwarded to the step as
		// the encoded restriction coder.
		if rcid := payload.GetRestrictionCoderId(); rcid != "" {
			rc, err := x.coders.Coder(rcid)
			if err != nil {
				return nil, err
			}
			enc, err := graphx.EncodeCoderRef(rc)
			if err != nil {
				return nil, errors.Wrapf(err, "invalid splittable ParDoPayload, couldn't encode Restriction Coder %v", t)
			}
			prop.RestrictionEncoder = enc
		}

		in := stringx.SingleValue(rem)
		prop.ParallelInput = x.pcollections[in]
		prop.SerializedFn = id // == reference into the proto pipeline
		return append(steps, x.newStep(id, parDoKind, prop)), nil

	case graphx.URNCombinePerKey:
		// Dataflow uses a GBK followed by a CombineValues to determine when it can lift.
		// To achieve this, we use the combine composite's subtransforms, and modify the
		// Combine ParDo with the CombineValues kind, set its SerializedFn to map to the
		// composite payload, and the accumulator coding.
		if len(t.Subtransforms) != 2 {
			return nil, errors.Errorf("invalid CombinePerKey, expected 2 subtransforms but got %d in %v", len(t.Subtransforms), t)
		}
		steps, err := x.translateTransforms(fmt.Sprintf("%v%v/", trunk, path.Base(t.UniqueName)), t.Subtransforms)
		if err != nil {
			return nil, errors.Wrapf(err, "invalid CombinePerKey, couldn't extract GBK from %v", t)
		}
		if len(steps) < 2 {
			// steps[1] is rewritten below; guard against an out-of-range
			// index if the subtransforms expanded to fewer steps.
			return nil, errors.Errorf("invalid CombinePerKey, expected at least 2 steps but got %d in %v", len(steps), t)
		}

		var payload pipepb.CombinePayload
		if err := proto.Unmarshal(t.Spec.Payload, &payload); err != nil {
			return nil, errors.Wrapf(err, "invalid Combine payload for %v", t)
		}
		c, err := x.coders.Coder(payload.AccumulatorCoderId)
		if err != nil {
			return nil, errors.Wrapf(err, "invalid Combine payload, missing Accumulator Coder %v", t)
		}
		enc, err := graphx.EncodeCoderRef(c)
		if err != nil {
			return nil, errors.Wrapf(err, "invalid Combine payload, couldn't encode Accumulator Coder %v", t)
		}

		// Start from the already-translated properties of the second step
		// and overlay the combine-specific fields.
		if err := json.Unmarshal([]byte(steps[1].Properties), &prop); err != nil {
			return nil, errors.Wrapf(err, "invalid CombinePerKey, couldn't decode step properties for %v", t)
		}
		prop.Encoding = enc
		prop.SerializedFn = id
		steps[1].Kind = combineKind
		steps[1].Properties = newMsg(prop)
		return steps, nil

	case graphx.URNReshuffle:
		return x.translateTransforms(fmt.Sprintf("%v%v/", trunk, path.Base(t.UniqueName)), t.Subtransforms)

	case graphx.URNFlatten:
		for _, in := range t.Inputs {
			prop.Inputs = append(prop.Inputs, x.pcollections[in])
		}
		return []*df.Step{x.newStep(id, flattenKind, prop)}, nil

	case graphx.URNGBK:
		in := stringx.SingleValue(t.Inputs)
		prop.ParallelInput = x.pcollections[in]
		// GBK takes its windowing strategy from the input PCollection.
		prop.SerializedFn = encodeSerializedFn(x.extractWindowingStrategy(in))
		return []*df.Step{x.newStep(id, gbkKind, prop)}, nil

	case graphx.URNWindow:
		in := stringx.SingleValue(t.Inputs)
		out := stringx.SingleValue(t.Outputs)
		prop.ParallelInput = x.pcollections[in]
		// WindowInto takes its windowing strategy from the output PCollection.
		prop.SerializedFn = encodeSerializedFn(x.extractWindowingStrategy(out))
		return []*df.Step{x.newStep(id, windowIntoKind, prop)}, nil

	case pubsub_v1.PubSubPayloadURN:
		// Translate to native handling of PubSub I/O.
		var msg pubsub_v1.PubSubPayload
		if err := proto.Unmarshal(t.Spec.Payload, &msg); err != nil {
			return nil, errors.Wrap(err, "bad pubsub payload")
		}

		prop.Format = "pubsub"
		prop.PubSubTopic = msg.GetTopic()
		prop.PubSubSubscription = msg.GetSubscription()
		prop.PubSubIDLabel = msg.GetIdAttribute()
		prop.PubSubTimestampLabel = msg.GetTimestampAttribute()
		prop.PubSubWithAttributes = msg.GetWithAttributes()

		// When a subscription is given, the topic is cleared so that only
		// the subscription is sent.
		if prop.PubSubSubscription != "" {
			prop.PubSubTopic = ""
		}

		switch msg.Op {
		case pubsub_v1.PubSubPayload_READ:
			return []*df.Step{x.newStep(id, readKind, prop)}, nil

		case pubsub_v1.PubSubPayload_WRITE:
			in := stringx.SingleValue(t.Inputs)
			prop.ParallelInput = x.pcollections[in]
			prop.Encoding = x.wrapCoder(x.comp.Pcollections[in], coder.NewBytes())
			return []*df.Step{x.newStep(id, writeKind, prop)}, nil

		default:
			return nil, errors.Errorf("bad pubsub op: %v", msg.Op)
		}

	default:
		// Unrecognized URNs are treated as composites when they have
		// subtransforms; otherwise they are unsupported primitives.
		if len(t.Subtransforms) > 0 {
			return x.translateTransforms(fmt.Sprintf("%v%v/", trunk, path.Base(t.UniqueName)), t.Subtransforms)
		}
		return nil, errors.Errorf("unexpected primitive urn: %v", t)
	}
}