in src/rust/engine/process_execution/src/remote.rs [129:299]
fn run(&self, req: ExecuteProcessRequest) -> BoxFuture<FallibleExecuteProcessResult, String> {
let operations_client = self.operations_client.clone();
let store = self.store.clone();
let execute_request_result = make_execute_request(
&req,
&self.instance_name,
&self.cache_key_gen_version,
self.platform_properties.clone(),
);
let ExecuteProcessRequest {
description,
timeout,
input_files,
..
} = req;
let description2 = description.clone();
match execute_request_result {
Ok((action, command, execute_request)) => {
let command_runner = self.clone();
let command_runner2 = self.clone();
let command_runner3 = self.clone();
let execute_request = Arc::new(execute_request);
let execute_request2 = execute_request.clone();
let futures_timer_thread = self.futures_timer_thread.clone();
let store2 = store.clone();
let mut history = ExecutionHistory::default();
self
.store_proto_locally(&command)
.join(self.store_proto_locally(&action))
.and_then(move |(command_digest, action_digest)| {
store2.ensure_remote_has_recursive(vec![command_digest, action_digest, input_files])
})
.and_then(move |summary| {
history.current_attempt += summary;
trace!(
"Executing remotely request: {:?} (command: {:?})",
execute_request,
command
);
command_runner
.oneshot_execute(&execute_request)
.join(future::ok(history))
})
.and_then(move |(operation, history)| {
let start_time = Instant::now();
future::loop_fn(
(history, operation, 0),
move |(mut history, operation, iter_num)| {
let description = description.clone();
let execute_request2 = execute_request2.clone();
let store = store.clone();
let operations_client = operations_client.clone();
let command_runner2 = command_runner2.clone();
let command_runner3 = command_runner3.clone();
let futures_timer_thread = futures_timer_thread.clone();
let f = command_runner2.extract_execute_response(operation, &mut history);
f.map(future::Loop::Break).or_else(move |value| {
match value {
ExecutionError::Fatal(err) => future::err(err).to_boxed(),
ExecutionError::MissingDigests(missing_digests) => {
let ExecutionHistory {
mut attempts,
current_attempt,
} = history;
trace!(
"Server reported missing digests ({:?}); trying to upload: {:?}",
current_attempt,
missing_digests,
);
attempts.push(current_attempt);
let history = ExecutionHistory {
attempts,
current_attempt: ExecutionStats::default(),
};
let execute_request = execute_request2.clone();
store
.ensure_remote_has_recursive(missing_digests)
.and_then(move |summary| {
let mut history = history;
history.current_attempt += summary;
command_runner2
.oneshot_execute(&execute_request)
.join(future::ok(history))
})
// Reset `iter_num` on `MissingDigests`
.map(|(operation, history)| future::Loop::Continue((history, operation, 0)))
.to_boxed()
}
ExecutionError::NotFinished(operation_name) => {
let mut operation_request =
bazel_protos::operations::GetOperationRequest::new();
operation_request.set_name(operation_name.clone());
let backoff_period = min(
CommandRunner::BACKOFF_MAX_WAIT_MILLIS,
(1 + iter_num) * CommandRunner::BACKOFF_INCR_WAIT_MILLIS,
);
// take the grpc result and cancel the op if too much time has passed.
let elapsed = start_time.elapsed();
if elapsed > timeout {
future::err(format!(
"Exceeded time out of {:?} with {:?} for operation {}, {}",
timeout, elapsed, operation_name, description
))
.to_boxed()
} else {
// maybe the delay here should be the min of remaining time and the backoff period
Delay::new_handle(
Instant::now() + Duration::from_millis(backoff_period),
futures_timer_thread.with(futures_timer::HelperThread::handle),
)
.map_err(move |e| {
format!(
"Future-Delay errored at operation result polling for {}, {}: {}",
operation_name, description, e
)
})
.and_then(move |_| {
future::done(
operations_client
.get_operation_opt(&operation_request, command_runner3.call_option())
.or_else(move |err| {
rpcerror_recover_cancelled(operation_request.take_name(), err)
})
.map(OperationOrStatus::Operation)
.map_err(rpcerror_to_string),
)
.map(move |operation| {
future::Loop::Continue((history, operation, iter_num + 1))
})
.to_boxed()
})
.to_boxed()
}
}
}
})
},
)
})
.map(move |resp| {
let mut attempts = String::new();
for (i, attempt) in resp.execution_attempts.iter().enumerate() {
attempts += &format!("\nAttempt {}: {:?}", i, attempt);
}
debug!(
"Finished remote exceution of {} after {} attempts: Stats: {}",
description2,
resp.execution_attempts.len(),
attempts
);
resp
})
.to_boxed()
}
Err(err) => future::err(err).to_boxed(),
}
}