in sdks/go/pkg/beam/core/runtime/harness/harness.go [262:462]
// handleInstruction dispatches a single instruction request to the handling
// code for whichever payload is set on it and returns the response to send
// back. Every response carries the request's instruction id. Failures are not
// returned as Go errors; they are returned as the response built by fail.
//
// Handled payloads are Register, ProcessBundle, ProcessBundleProgress,
// ProcessBundleSplit, MonitoringInfos and HarnessMonitoringInfos. A request
// with none of these set produces an "Unexpected request" failure response.
//
// Bookkeeping on c (descriptors, plans, active, inactive, failed) is only
// touched while holding c.mu.
func (c *control) handleInstruction(ctx context.Context, req *fnpb.InstructionRequest) *fnpb.InstructionResponse {
	instID := instructionID(req.GetInstructionId())
	ctx = setInstID(ctx, instID)

	switch {
	case req.GetRegister() != nil:
		msg := req.GetRegister()

		// Remember every descriptor in the request, keyed by its id.
		c.mu.Lock()
		for _, desc := range msg.GetProcessBundleDescriptor() {
			c.descriptors[bundleDescriptorID(desc.GetId())] = desc
		}
		c.mu.Unlock()

		return &fnpb.InstructionResponse{
			InstructionId: string(instID),
			Response: &fnpb.InstructionResponse_Register{
				Register: &fnpb.RegisterResponse{},
			},
		}

	case req.GetProcessBundle() != nil:
		msg := req.GetProcessBundle()

		// NOTE: the harness sends a 0-length process bundle request to sources (changed?)

		bdID := bundleDescriptorID(msg.GetProcessBundleDescriptorId())
		log.Debugf(ctx, "PB [%v]: %v", instID, msg)
		plan, err := c.getOrCreatePlan(bdID)

		c.mu.Lock()
		c.inactive.Remove(instID)
		if err != nil {
			// There is no plan to run. Do not register a nil plan as active:
			// nothing below would ever remove it again. Instead record the
			// failure and retire the instruction using the same bookkeeping
			// that follows a failed Execute.
			// NOTE(review): progress/split lookups for this instruction now
			// depend on how getPlanOrResponse treats c.failed — confirm.
			c.failed[instID] = err
			if removed, ok := c.inactive.Insert(instID); ok {
				delete(c.failed, removed) // Also GC old failed bundles.
			}
			c.mu.Unlock()
			return fail(ctx, instID, "Failed: %v", err)
		}
		// Make the plan active.
		c.active[instID] = plan
		c.mu.Unlock()

		data := NewScopedDataManager(c.data, instID)
		state := NewScopedStateReader(c.state, instID)
		err = plan.Execute(ctx, string(instID), exec.DataContext{Data: data, State: state})
		data.Close()
		state.Close()

		// Collect monitoring data before the plan is handed back for reuse.
		mons, pylds := monitoring(plan)

		// Move the plan back to the candidate state
		c.mu.Lock()
		// Mark the instruction as failed.
		if err != nil {
			c.failed[instID] = err
		}
		c.plans[bdID] = append(c.plans[bdID], plan)
		delete(c.active, instID)
		if removed, ok := c.inactive.Insert(instID); ok {
			delete(c.failed, removed) // Also GC old failed bundles.
		}
		c.mu.Unlock()

		if err != nil {
			return fail(ctx, instID, "process bundle failed for instruction %v using plan %v : %v", instID, bdID, err)
		}

		return &fnpb.InstructionResponse{
			InstructionId: string(instID),
			Response: &fnpb.InstructionResponse_ProcessBundle{
				ProcessBundle: &fnpb.ProcessBundleResponse{
					MonitoringData:  pylds,
					MonitoringInfos: mons,
				},
			},
		}

	case req.GetProcessBundleProgress() != nil:
		msg := req.GetProcessBundleProgress()

		// ref is the instruction being asked about; instID is this request.
		ref := instructionID(msg.GetInstructionId())

		plan, resp := c.getPlanOrResponse(ctx, "progress", instID, ref)
		if resp != nil {
			return resp
		}
		// No plan and no ready-made response: answer with empty progress.
		if plan == nil {
			return &fnpb.InstructionResponse{
				InstructionId: string(instID),
				Response: &fnpb.InstructionResponse_ProcessBundleProgress{
					ProcessBundleProgress: &fnpb.ProcessBundleProgressResponse{},
				},
			}
		}

		mons, pylds := monitoring(plan)

		return &fnpb.InstructionResponse{
			InstructionId: string(instID),
			Response: &fnpb.InstructionResponse_ProcessBundleProgress{
				ProcessBundleProgress: &fnpb.ProcessBundleProgressResponse{
					MonitoringData:  pylds,
					MonitoringInfos: mons,
				},
			},
		}

	case req.GetProcessBundleSplit() != nil:
		msg := req.GetProcessBundleSplit()

		log.Debugf(ctx, "PB Split: %v", msg)
		// ref is the instruction to split; instID is this request.
		ref := instructionID(msg.GetInstructionId())

		plan, resp := c.getPlanOrResponse(ctx, "split", instID, ref)
		if resp != nil {
			return resp
		}
		// No plan and no ready-made response: answer with an empty split.
		if plan == nil {
			return &fnpb.InstructionResponse{
				InstructionId: string(instID),
				Response: &fnpb.InstructionResponse_ProcessBundleSplit{
					ProcessBundleSplit: &fnpb.ProcessBundleSplitResponse{},
				},
			}
		}

		// Get the desired splits for the root FnAPI read operation.
		ds := msg.GetDesiredSplits()[plan.SourcePTransformID()]
		if ds == nil {
			return fail(ctx, instID, "failed to split: desired splits for root of %v was empty.", ref)
		}
		sr, err := plan.Split(exec.SplitPoints{
			Splits:  ds.GetAllowedSplitPoints(),
			Frac:    ds.GetFractionOfRemainder(),
			BufSize: ds.GetEstimatedInputElements(),
		})
		if err != nil {
			return fail(ctx, instID, "unable to split %v: %v", ref, err)
		}

		// Primary and residual roots are only reported when the split result
		// has both; otherwise both stay nil. len of a nil slice is 0, so no
		// separate nil check is needed.
		var pRoots []*fnpb.BundleApplication
		var rRoots []*fnpb.DelayedBundleApplication
		if len(sr.PS) > 0 && len(sr.RS) > 0 {
			pRoots = make([]*fnpb.BundleApplication, len(sr.PS))
			for i, p := range sr.PS {
				pRoots[i] = &fnpb.BundleApplication{
					TransformId: sr.TId,
					InputId:     sr.InId,
					Element:     p,
				}
			}
			rRoots = make([]*fnpb.DelayedBundleApplication, len(sr.RS))
			for i, r := range sr.RS {
				rRoots[i] = &fnpb.DelayedBundleApplication{
					Application: &fnpb.BundleApplication{
						TransformId: sr.TId,
						InputId:     sr.InId,
						Element:     r,
					},
				}
			}
		}

		return &fnpb.InstructionResponse{
			InstructionId: string(instID),
			Response: &fnpb.InstructionResponse_ProcessBundleSplit{
				ProcessBundleSplit: &fnpb.ProcessBundleSplitResponse{
					ChannelSplits: []*fnpb.ProcessBundleSplitResponse_ChannelSplit{{
						TransformId:          plan.SourcePTransformID(),
						LastPrimaryElement:   sr.PI,
						FirstResidualElement: sr.RI,
					}},
					PrimaryRoots:  pRoots,
					ResidualRoots: rRoots,
				},
			},
		}

	case req.GetMonitoringInfos() != nil:
		msg := req.GetMonitoringInfos()
		// Expand the requested short ids via shortIdsToInfos.
		return &fnpb.InstructionResponse{
			InstructionId: string(instID),
			Response: &fnpb.InstructionResponse_MonitoringInfos{
				MonitoringInfos: &fnpb.MonitoringInfosMetadataResponse{
					MonitoringInfo: shortIdsToInfos(msg.GetMonitoringInfoId()),
				},
			},
		}

	case req.GetHarnessMonitoringInfos() != nil:
		return &fnpb.InstructionResponse{
			InstructionId: string(instID),
			Response: &fnpb.InstructionResponse_HarnessMonitoringInfos{
				HarnessMonitoringInfos: &fnpb.HarnessMonitoringInfosResponse{
					// TODO(BEAM-11092): Populate with non-bundle metrics data.
					MonitoringData: map[string][]byte{},
				},
			},
		}

	default:
		return fail(ctx, instID, "Unexpected request: %v", req)
	}
}