fn extract_execute_response()

in src/rust/engine/process_execution/src/remote.rs [376:541]


  ///
  /// Interprets the outcome of a remote execution request.
  ///
  /// The returned future resolves to:
  ///  * `Ok(FallibleExecuteProcessResult)` when the operation is done and the `ExecuteResponse`
  ///    nested inside it carries an `Ok` status;
  ///  * `Err(ExecutionError::NotFinished(name))` when the operation is not done yet;
  ///  * `Err(ExecutionError::MissingDigests(digests))` when the status is `FailedPrecondition`
  ///    and every violation is a `MISSING` violation with a `blobs/{fingerprint}/{size}` subject;
  ///  * `Err(ExecutionError::Fatal(message))` in every other case.
  ///
  /// Effects on `attempts`:
  ///  * If the response carries execution metadata, the timings derived from it (and the
  ///    cache-hit flag) are recorded on `attempts.current_attempt`.
  ///  * Only on success is `attempts.attempts` taken (leaving it empty) and, with the current
  ///    attempt appended, moved into the result. On every error path the recorded history is
  ///    left in place for the caller.
  ///
  fn extract_execute_response(
    &self,
    operation_or_status: OperationOrStatus,
    attempts: &mut ExecutionHistory,
  ) -> BoxFuture<FallibleExecuteProcessResult, ExecutionError> {
    trace!("Got operation response: {:?}", operation_or_status);

    let status = match operation_or_status {
      OperationOrStatus::Operation(mut operation) => {
        if !operation.get_done() {
          return future::err(ExecutionError::NotFinished(operation.take_name())).to_boxed();
        }
        if operation.has_error() {
          return future::err(ExecutionError::Fatal(format_error(&operation.get_error())))
            .to_boxed();
        }
        if !operation.has_response() {
          return future::err(ExecutionError::Fatal(
            "Operation finished but no response supplied".to_string(),
          ))
          .to_boxed();
        }

        // The operation's response is an opaque payload: decode it as an ExecuteResponse.
        let mut execute_response = bazel_protos::remote_execution::ExecuteResponse::new();
        try_future!(execute_response
          .merge_from_bytes(operation.get_response().get_value())
          .map_err(|e| ExecutionError::Fatal(format!("Invalid ExecuteResponse: {:?}", e))));
        trace!("Got (nested) execute response: {:?}", execute_response);

        if execute_response.get_result().has_execution_metadata() {
          let metadata = execute_response.get_result().get_execution_metadata();
          let enqueued = timespec_from(metadata.get_queued_timestamp());
          let worker_start = timespec_from(metadata.get_worker_start_timestamp());
          let input_fetch_start = timespec_from(metadata.get_input_fetch_start_timestamp());
          let input_fetch_completed = timespec_from(metadata.get_input_fetch_completed_timestamp());
          let execution_start = timespec_from(metadata.get_execution_start_timestamp());
          let execution_completed = timespec_from(metadata.get_execution_completed_timestamp());
          let output_upload_start = timespec_from(metadata.get_output_upload_start_timestamp());
          let output_upload_completed =
            timespec_from(metadata.get_output_upload_completed_timestamp());

          // Each phase is recorded independently: a span that fails to convert is logged and
          // skipped, rather than failing the whole response.
          match (worker_start - enqueued).to_std() {
            Ok(duration) => attempts.current_attempt.remote_queue = Some(duration),
            Err(err) => warn!("Got negative remote queue time: {}", err),
          }
          match (input_fetch_completed - input_fetch_start).to_std() {
            Ok(duration) => attempts.current_attempt.remote_input_fetch = Some(duration),
            Err(err) => warn!("Got negative remote input fetch time: {}", err),
          }
          match (execution_completed - execution_start).to_std() {
            Ok(duration) => attempts.current_attempt.remote_execution = Some(duration),
            Err(err) => warn!("Got negative remote execution time: {}", err),
          }
          match (output_upload_completed - output_upload_start).to_std() {
            Ok(duration) => attempts.current_attempt.remote_output_store = Some(duration),
            Err(err) => warn!("Got negative remote output store time: {}", err),
          }
          attempts.current_attempt.was_cache_hit = execute_response.cached_result;
        }

        let status = execute_response.take_status();
        if grpcio::RpcStatusCode::from(status.get_code()) == grpcio::RpcStatusCode::Ok {
          // Only consume the recorded history once we know it will be handed back in the result:
          // draining it before the status check would throw it away on every error path.
          let mut execution_attempts = std::mem::replace(&mut attempts.attempts, vec![]);
          execution_attempts.push(attempts.current_attempt);

          return self
            .extract_stdout(&execute_response)
            .join(self.extract_stderr(&execute_response))
            .join(self.extract_output_files(&execute_response))
            .and_then(move |((stdout, stderr), output_directory)| {
              Ok(FallibleExecuteProcessResult {
                stdout,
                stderr,
                exit_code: execute_response.get_result().get_exit_code(),
                output_directory,
                execution_attempts,
              })
            })
            .to_boxed();
        }
        status
      }
      OperationOrStatus::Status(status) => status,
    };

    match grpcio::RpcStatusCode::from(status.get_code()) {
      // An Ok status nested in an operation returned above, so this can only be a bare status
      // that reports Ok without any response to extract. Surface it as an error instead of
      // panicking on data we do not control.
      grpcio::RpcStatusCode::Ok => future::err(ExecutionError::Fatal(
        "Error from remote execution: received an Ok status without an execute response"
          .to_owned(),
      ))
      .to_boxed(),
      grpcio::RpcStatusCode::FailedPrecondition => {
        // Exactly one detail is expected. Note that the message below is also what is reported
        // when no details were supplied at all.
        if status.get_details().len() != 1 {
          return future::err(ExecutionError::Fatal(format!(
            "Received multiple details in FailedPrecondition ExecuteResponse's status field: {:?}",
            status.get_details()
          )))
          .to_boxed();
        }
        let details = status.get_details().get(0).unwrap();
        let mut precondition_failure = bazel_protos::error_details::PreconditionFailure::new();
        // The detail is only decoded if its type URL names a PreconditionFailure.
        if details.get_type_url()
          != format!(
            "type.googleapis.com/{}",
            precondition_failure.descriptor().full_name()
          )
        {
          return future::err(ExecutionError::Fatal(format!(
            "Received FailedPrecondition, but didn't know how to resolve it: {},\
             protobuf type {}",
            status.get_message(),
            details.get_type_url()
          )))
          .to_boxed();
        }
        try_future!(precondition_failure
          .merge_from_bytes(details.get_value())
          .map_err(|e| ExecutionError::Fatal(format!(
            "Error deserializing FailedPrecondition proto: {:?}",
            e
          ))));

        let mut missing_digests = Vec::with_capacity(precondition_failure.get_violations().len());

        for violation in precondition_failure.get_violations() {
          if violation.get_field_type() != "MISSING" {
            return future::err(ExecutionError::Fatal(format!(
              "Didn't know how to process PreconditionFailure violation: {:?}",
              violation
            )))
            .to_boxed();
          }
          // The subject of a MISSING violation must look like `blobs/{fingerprint}/{size}`.
          let parts: Vec<_> = violation.get_subject().split('/').collect();
          if parts.len() != 3 || parts[0] != "blobs" {
            return future::err(ExecutionError::Fatal(format!(
              "Received FailedPrecondition MISSING but didn't recognize subject {}",
              violation.get_subject()
            )))
            .to_boxed();
          }
          let digest = Digest(
            try_future!(Fingerprint::from_hex_string(parts[1]).map_err(|e| {
              ExecutionError::Fatal(format!("Bad digest in missing blob: {}: {}", parts[1], e))
            })),
            try_future!(parts[2]
              .parse::<usize>()
              .map_err(|e| ExecutionError::Fatal(format!(
                "Missing blob had bad size: {}: {}",
                parts[2], e
              )))),
          );
          missing_digests.push(digest);
        }
        // A PreconditionFailure without any violations gives the caller nothing to act on.
        if missing_digests.is_empty() {
          return future::err(ExecutionError::Fatal(
            "Error from remote execution: FailedPrecondition, but no details".to_owned(),
          ))
          .to_boxed();
        }
        future::err(ExecutionError::MissingDigests(missing_digests)).to_boxed()
      }
      code => future::err(ExecutionError::Fatal(format!(
        "Error from remote execution: {:?}: {:?}",
        code,
        status.get_message()
      )))
      .to_boxed(),
    }
  }