func()

in sdks/go/pkg/beam/core/runtime/harness/harness.go [262:462]


func (c *control) handleInstruction(ctx context.Context, req *fnpb.InstructionRequest) *fnpb.InstructionResponse {
	instID := instructionID(req.GetInstructionId())
	ctx = setInstID(ctx, instID)

	switch {
	case req.GetRegister() != nil:
		msg := req.GetRegister()

		c.mu.Lock()
		for _, desc := range msg.GetProcessBundleDescriptor() {
			c.descriptors[bundleDescriptorID(desc.GetId())] = desc
		}
		c.mu.Unlock()

		return &fnpb.InstructionResponse{
			InstructionId: string(instID),
			Response: &fnpb.InstructionResponse_Register{
				Register: &fnpb.RegisterResponse{},
			},
		}

	case req.GetProcessBundle() != nil:
		msg := req.GetProcessBundle()

		// NOTE: the harness sends a 0-length process bundle request to sources (changed?)

		bdID := bundleDescriptorID(msg.GetProcessBundleDescriptorId())
		log.Debugf(ctx, "PB [%v]: %v", instID, msg)
		plan, err := c.getOrCreatePlan(bdID)

		// Make the plan active.
		c.mu.Lock()
		c.inactive.Remove(instID)
		c.active[instID] = plan
		c.mu.Unlock()

		if err != nil {
			return fail(ctx, instID, "Failed: %v", err)
		}

		data := NewScopedDataManager(c.data, instID)
		state := NewScopedStateReader(c.state, instID)
		err = plan.Execute(ctx, string(instID), exec.DataContext{Data: data, State: state})
		data.Close()
		state.Close()

		mons, pylds := monitoring(plan)
		// Move the plan back to the candidate state
		c.mu.Lock()
		// Mark the instruction as failed.
		if err != nil {
			c.failed[instID] = err
		}
		c.plans[bdID] = append(c.plans[bdID], plan)
		delete(c.active, instID)

		if removed, ok := c.inactive.Insert(instID); ok {
			delete(c.failed, removed) // Also GC old failed bundles.
		}
		c.mu.Unlock()

		if err != nil {
			return fail(ctx, instID, "process bundle failed for instruction %v using plan %v : %v", instID, bdID, err)
		}

		return &fnpb.InstructionResponse{
			InstructionId: string(instID),
			Response: &fnpb.InstructionResponse_ProcessBundle{
				ProcessBundle: &fnpb.ProcessBundleResponse{
					MonitoringData:  pylds,
					MonitoringInfos: mons,
				},
			},
		}

	case req.GetProcessBundleProgress() != nil:
		msg := req.GetProcessBundleProgress()

		ref := instructionID(msg.GetInstructionId())

		plan, resp := c.getPlanOrResponse(ctx, "progress", instID, ref)
		if resp != nil {
			return resp
		}
		if plan == nil && resp == nil {
			return &fnpb.InstructionResponse{
				InstructionId: string(instID),
				Response: &fnpb.InstructionResponse_ProcessBundleProgress{
					ProcessBundleProgress: &fnpb.ProcessBundleProgressResponse{},
				},
			}
		}

		mons, pylds := monitoring(plan)

		return &fnpb.InstructionResponse{
			InstructionId: string(instID),
			Response: &fnpb.InstructionResponse_ProcessBundleProgress{
				ProcessBundleProgress: &fnpb.ProcessBundleProgressResponse{
					MonitoringData:  pylds,
					MonitoringInfos: mons,
				},
			},
		}

	case req.GetProcessBundleSplit() != nil:
		msg := req.GetProcessBundleSplit()

		log.Debugf(ctx, "PB Split: %v", msg)
		ref := instructionID(msg.GetInstructionId())

		plan, resp := c.getPlanOrResponse(ctx, "split", instID, ref)
		if resp != nil {
			return resp
		}
		if plan == nil {
			return &fnpb.InstructionResponse{
				InstructionId: string(instID),
				Response: &fnpb.InstructionResponse_ProcessBundleSplit{
					ProcessBundleSplit: &fnpb.ProcessBundleSplitResponse{},
				},
			}
		}

		// Get the desired splits for the root FnAPI read operation.
		ds := msg.GetDesiredSplits()[plan.SourcePTransformID()]
		if ds == nil {
			return fail(ctx, instID, "failed to split: desired splits for root of %v was empty.", ref)
		}
		sr, err := plan.Split(exec.SplitPoints{
			Splits:  ds.GetAllowedSplitPoints(),
			Frac:    ds.GetFractionOfRemainder(),
			BufSize: ds.GetEstimatedInputElements(),
		})

		if err != nil {
			return fail(ctx, instID, "unable to split %v: %v", ref, err)
		}

		var pRoots []*fnpb.BundleApplication
		var rRoots []*fnpb.DelayedBundleApplication
		if sr.PS != nil && len(sr.PS) > 0 && sr.RS != nil && len(sr.RS) > 0 {
			pRoots = make([]*fnpb.BundleApplication, len(sr.PS))
			for i, p := range sr.PS {
				pRoots[i] = &fnpb.BundleApplication{
					TransformId: sr.TId,
					InputId:     sr.InId,
					Element:     p,
				}
			}
			rRoots = make([]*fnpb.DelayedBundleApplication, len(sr.RS))
			for i, r := range sr.RS {
				rRoots[i] = &fnpb.DelayedBundleApplication{
					Application: &fnpb.BundleApplication{
						TransformId: sr.TId,
						InputId:     sr.InId,
						Element:     r,
					},
				}
			}
		}

		return &fnpb.InstructionResponse{
			InstructionId: string(instID),
			Response: &fnpb.InstructionResponse_ProcessBundleSplit{
				ProcessBundleSplit: &fnpb.ProcessBundleSplitResponse{
					ChannelSplits: []*fnpb.ProcessBundleSplitResponse_ChannelSplit{{
						TransformId:          plan.SourcePTransformID(),
						LastPrimaryElement:   sr.PI,
						FirstResidualElement: sr.RI,
					}},
					PrimaryRoots:  pRoots,
					ResidualRoots: rRoots,
				},
			},
		}
	case req.GetMonitoringInfos() != nil:
		msg := req.GetMonitoringInfos()
		return &fnpb.InstructionResponse{
			InstructionId: string(instID),
			Response: &fnpb.InstructionResponse_MonitoringInfos{
				MonitoringInfos: &fnpb.MonitoringInfosMetadataResponse{
					MonitoringInfo: shortIdsToInfos(msg.GetMonitoringInfoId()),
				},
			},
		}
	case req.GetHarnessMonitoringInfos() != nil:
		return &fnpb.InstructionResponse{
			InstructionId: string(instID),
			Response: &fnpb.InstructionResponse_HarnessMonitoringInfos{
				HarnessMonitoringInfos: &fnpb.HarnessMonitoringInfosResponse{
					// TODO(BEAM-11092): Populate with non-bundle metrics data.
					MonitoringData: map[string][]byte{},
				},
			},
		}

	default:
		return fail(ctx, instID, "Unexpected request: %v", req)
	}
}