fn run()

in src/rust/engine/process_execution/src/remote.rs [129:299]


  fn run(&self, req: ExecuteProcessRequest) -> BoxFuture<FallibleExecuteProcessResult, String> {
    let operations_client = self.operations_client.clone();

    let store = self.store.clone();
    let execute_request_result = make_execute_request(
      &req,
      &self.instance_name,
      &self.cache_key_gen_version,
      self.platform_properties.clone(),
    );

    let ExecuteProcessRequest {
      description,
      timeout,
      input_files,
      ..
    } = req;

    let description2 = description.clone();

    match execute_request_result {
      Ok((action, command, execute_request)) => {
        let command_runner = self.clone();
        let command_runner2 = self.clone();
        let command_runner3 = self.clone();
        let execute_request = Arc::new(execute_request);
        let execute_request2 = execute_request.clone();
        let futures_timer_thread = self.futures_timer_thread.clone();

        let store2 = store.clone();
        let mut history = ExecutionHistory::default();

        self
          .store_proto_locally(&command)
          .join(self.store_proto_locally(&action))
          .and_then(move |(command_digest, action_digest)| {
            store2.ensure_remote_has_recursive(vec![command_digest, action_digest, input_files])
          })
          .and_then(move |summary| {
            history.current_attempt += summary;
            trace!(
              "Executing remotely request: {:?} (command: {:?})",
              execute_request,
              command
            );
            command_runner
              .oneshot_execute(&execute_request)
              .join(future::ok(history))
          })
          .and_then(move |(operation, history)| {
            let start_time = Instant::now();

            future::loop_fn(
              (history, operation, 0),
              move |(mut history, operation, iter_num)| {
                let description = description.clone();

                let execute_request2 = execute_request2.clone();
                let store = store.clone();
                let operations_client = operations_client.clone();
                let command_runner2 = command_runner2.clone();
                let command_runner3 = command_runner3.clone();
                let futures_timer_thread = futures_timer_thread.clone();
                let f = command_runner2.extract_execute_response(operation, &mut history);
                f.map(future::Loop::Break).or_else(move |value| {
                  match value {
                    ExecutionError::Fatal(err) => future::err(err).to_boxed(),
                    ExecutionError::MissingDigests(missing_digests) => {
                      let ExecutionHistory {
                        mut attempts,
                        current_attempt,
                      } = history;

                      trace!(
                        "Server reported missing digests ({:?}); trying to upload: {:?}",
                        current_attempt,
                        missing_digests,
                      );

                      attempts.push(current_attempt);
                      let history = ExecutionHistory {
                        attempts,
                        current_attempt: ExecutionStats::default(),
                      };

                      let execute_request = execute_request2.clone();
                      store
                        .ensure_remote_has_recursive(missing_digests)
                        .and_then(move |summary| {
                          let mut history = history;
                          history.current_attempt += summary;
                          command_runner2
                            .oneshot_execute(&execute_request)
                            .join(future::ok(history))
                        })
                        // Reset `iter_num` on `MissingDigests`
                        .map(|(operation, history)| future::Loop::Continue((history, operation, 0)))
                        .to_boxed()
                    }
                    ExecutionError::NotFinished(operation_name) => {
                      let mut operation_request =
                        bazel_protos::operations::GetOperationRequest::new();
                      operation_request.set_name(operation_name.clone());

                      let backoff_period = min(
                        CommandRunner::BACKOFF_MAX_WAIT_MILLIS,
                        (1 + iter_num) * CommandRunner::BACKOFF_INCR_WAIT_MILLIS,
                      );

                      // take the grpc result and cancel the op if too much time has passed.
                      let elapsed = start_time.elapsed();

                      if elapsed > timeout {
                        future::err(format!(
                          "Exceeded time out of {:?} with {:?} for operation {}, {}",
                          timeout, elapsed, operation_name, description
                        ))
                        .to_boxed()
                      } else {
                        // maybe the delay here should be the min of remaining time and the backoff period
                        Delay::new_handle(
                          Instant::now() + Duration::from_millis(backoff_period),
                          futures_timer_thread.with(futures_timer::HelperThread::handle),
                        )
                        .map_err(move |e| {
                          format!(
                            "Future-Delay errored at operation result polling for {}, {}: {}",
                            operation_name, description, e
                          )
                        })
                        .and_then(move |_| {
                          future::done(
                            operations_client
                              .get_operation_opt(&operation_request, command_runner3.call_option())
                              .or_else(move |err| {
                                rpcerror_recover_cancelled(operation_request.take_name(), err)
                              })
                              .map(OperationOrStatus::Operation)
                              .map_err(rpcerror_to_string),
                          )
                          .map(move |operation| {
                            future::Loop::Continue((history, operation, iter_num + 1))
                          })
                          .to_boxed()
                        })
                        .to_boxed()
                      }
                    }
                  }
                })
              },
            )
          })
          .map(move |resp| {
            let mut attempts = String::new();
            for (i, attempt) in resp.execution_attempts.iter().enumerate() {
              attempts += &format!("\nAttempt {}: {:?}", i, attempt);
            }
            debug!(
              "Finished remote exceution of {} after {} attempts: Stats: {}",
              description2,
              resp.execution_attempts.len(),
              attempts
            );
            resp
          })
          .to_boxed()
      }
      Err(err) => future::err(err).to_boxed(),
    }
  }