in src/rust/engine/process_execution/src/remote.rs [376:541]
/// Interprets the outcome of a remote execution poll.
///
/// `operation_or_status` is either a long-running `Operation` wrapping a serialized
/// `ExecuteResponse`, or a bare status. `attempts` is the history of this execution:
/// timing and cache-hit information from the response is written into
/// `attempts.current_attempt`.
///
/// Returns a future resolving to:
///  * `Ok(FallibleExecuteProcessResult)` when the response status is Ok. stdout, stderr and the
///    output directory are extracted concurrently, and `attempts.attempts` is drained into the
///    result's `execution_attempts` (followed by the current attempt).
///  * `Err(ExecutionError::NotFinished(name))` when the operation is not done yet.
///  * `Err(ExecutionError::MissingDigests(..))` when the status is FailedPrecondition and every
///    violation is a `MISSING` violation whose subject has the form `blobs/<hex>/<size>`.
///  * `Err(ExecutionError::Fatal(..))` for every other failure or malformed response.
///
/// On any `Err` outcome `attempts.attempts` is left untouched.
fn extract_execute_response(
  &self,
  operation_or_status: OperationOrStatus,
  attempts: &mut ExecutionHistory,
) -> BoxFuture<FallibleExecuteProcessResult, ExecutionError> {
  trace!("Got operation response: {:?}", operation_or_status);

  let status = match operation_or_status {
    OperationOrStatus::Operation(mut operation) => {
      if !operation.get_done() {
        return future::err(ExecutionError::NotFinished(operation.take_name())).to_boxed();
      }
      if operation.has_error() {
        return future::err(ExecutionError::Fatal(format_error(&operation.get_error())))
          .to_boxed();
      }
      if !operation.has_response() {
        return future::err(ExecutionError::Fatal(
          "Operation finished but no response supplied".to_string(),
        ))
        .to_boxed();
      }

      let mut execute_response = bazel_protos::remote_execution::ExecuteResponse::new();
      try_future!(execute_response
        .merge_from_bytes(operation.get_response().get_value())
        .map_err(|e| ExecutionError::Fatal(format!("Invalid ExecuteResponse: {:?}", e))));
      trace!("Got (nested) execute response: {:?}", execute_response);

      // Record how long each remote phase took. A negative interval cannot be converted to a
      // std Duration; it is logged and the corresponding stat is left as it was.
      if execute_response.get_result().has_execution_metadata() {
        let metadata = execute_response.get_result().get_execution_metadata();
        let enqueued = timespec_from(metadata.get_queued_timestamp());
        let worker_start = timespec_from(metadata.get_worker_start_timestamp());
        let input_fetch_start = timespec_from(metadata.get_input_fetch_start_timestamp());
        let input_fetch_completed = timespec_from(metadata.get_input_fetch_completed_timestamp());
        let execution_start = timespec_from(metadata.get_execution_start_timestamp());
        let execution_completed = timespec_from(metadata.get_execution_completed_timestamp());
        let output_upload_start = timespec_from(metadata.get_output_upload_start_timestamp());
        let output_upload_completed =
          timespec_from(metadata.get_output_upload_completed_timestamp());

        match (worker_start - enqueued).to_std() {
          Ok(duration) => attempts.current_attempt.remote_queue = Some(duration),
          Err(err) => warn!("Got negative remote queue time: {}", err),
        }
        match (input_fetch_completed - input_fetch_start).to_std() {
          Ok(duration) => attempts.current_attempt.remote_input_fetch = Some(duration),
          Err(err) => warn!("Got negative remote input fetch time: {}", err),
        }
        match (execution_completed - execution_start).to_std() {
          Ok(duration) => attempts.current_attempt.remote_execution = Some(duration),
          Err(err) => warn!("Got negative remote execution time: {}", err),
        }
        match (output_upload_completed - output_upload_start).to_std() {
          Ok(duration) => attempts.current_attempt.remote_output_store = Some(duration),
          Err(err) => warn!("Got negative remote output store time: {}", err),
        }
      }
      // `cached_result` is a field of the response itself, not of the execution metadata, so
      // record it whether or not the server supplied timing metadata.
      attempts.current_attempt.was_cache_hit = execute_response.cached_result;

      let status = execute_response.take_status();
      if grpcio::RpcStatusCode::from(status.get_code()) == grpcio::RpcStatusCode::Ok {
        // Only consume the attempt history once we know it will be handed to the caller inside
        // the result; on error paths the history must stay in `attempts`.
        let mut execution_attempts = std::mem::replace(&mut attempts.attempts, vec![]);
        execution_attempts.push(attempts.current_attempt);

        return self
          .extract_stdout(&execute_response)
          .join(self.extract_stderr(&execute_response))
          .join(self.extract_output_files(&execute_response))
          .and_then(move |((stdout, stderr), output_directory)| {
            Ok(FallibleExecuteProcessResult {
              stdout,
              stderr,
              exit_code: execute_response.get_result().get_exit_code(),
              output_directory,
              execution_attempts,
            })
          })
          .to_boxed();
      }
      status
    }
    OperationOrStatus::Status(status) => status,
  };

  match grpcio::RpcStatusCode::from(status.get_code()) {
    // The Operation path above returns early on Ok, so an Ok code can only arrive here via
    // `OperationOrStatus::Status`. There is no result to extract in that case; report it as an
    // error rather than panicking on remote-supplied data.
    grpcio::RpcStatusCode::Ok => future::err(ExecutionError::Fatal(format!(
      "Received an Ok status from remote execution without an execution response: {:?}",
      status.get_message()
    )))
    .to_boxed(),
    grpcio::RpcStatusCode::FailedPrecondition => {
      let all_details = status.get_details();
      if all_details.len() != 1 {
        return future::err(ExecutionError::Fatal(format!(
          "Expected exactly one detail in FailedPrecondition ExecuteResponse's status field, \
           but received {}: {:?}",
          all_details.len(),
          all_details
        )))
        .to_boxed();
      }
      // Length was checked just above, so index 0 exists.
      let details = &all_details[0];

      let mut precondition_failure = bazel_protos::error_details::PreconditionFailure::new();
      // The detail is a protobuf `Any`; only a PreconditionFailure payload is understood.
      if details.get_type_url()
        != format!(
          "type.googleapis.com/{}",
          precondition_failure.descriptor().full_name()
        )
      {
        return future::err(ExecutionError::Fatal(format!(
          "Received FailedPrecondition, but didn't know how to resolve it: {},\
           protobuf type {}",
          status.get_message(),
          details.get_type_url()
        )))
        .to_boxed();
      }
      try_future!(precondition_failure
        .merge_from_bytes(details.get_value())
        .map_err(|e| ExecutionError::Fatal(format!(
          "Error deserializing FailedPrecondition proto: {:?}",
          e
        ))));

      let mut missing_digests = Vec::with_capacity(precondition_failure.get_violations().len());
      for violation in precondition_failure.get_violations() {
        if violation.get_field_type() != "MISSING" {
          return future::err(ExecutionError::Fatal(format!(
            "Didn't know how to process PreconditionFailure violation: {:?}",
            violation
          )))
          .to_boxed();
        }
        // Subjects are expected to look like `blobs/<fingerprint hex>/<size in bytes>`.
        let parts: Vec<_> = violation.get_subject().split('/').collect();
        if parts.len() != 3 || parts[0] != "blobs" {
          return future::err(ExecutionError::Fatal(format!(
            "Received FailedPrecondition MISSING but didn't recognize subject {}",
            violation.get_subject()
          )))
          .to_boxed();
        }
        let digest = Digest(
          try_future!(Fingerprint::from_hex_string(parts[1]).map_err(|e| {
            ExecutionError::Fatal(format!("Bad digest in missing blob: {}: {}", parts[1], e))
          })),
          try_future!(parts[2]
            .parse::<usize>()
            .map_err(|e| ExecutionError::Fatal(format!(
              "Missing blob had bad size: {}: {}",
              parts[2], e
            )))),
        );
        missing_digests.push(digest);
      }

      if missing_digests.is_empty() {
        return future::err(ExecutionError::Fatal(
          "Error from remote execution: FailedPrecondition, but no details".to_owned(),
        ))
        .to_boxed();
      }
      future::err(ExecutionError::MissingDigests(missing_digests)).to_boxed()
    }
    code => future::err(ExecutionError::Fatal(format!(
      "Error from remote execution: {:?}: {:?}",
      code,
      status.get_message()
    )))
    .to_boxed(),
  }
}