fn write()

in src/rust/engine/testutil/mock/src/cas.rs [328:458]


  fn write(
    &self,
    ctx: grpcio::RpcContext<'_>,
    stream: grpcio::RequestStream<bazel_protos::bytestream::WriteRequest>,
    sink: grpcio::ClientStreamingSink<bazel_protos::bytestream::WriteResponse>,
  ) {
    check_auth!(self, ctx, sink);

    let always_errors = self.always_errors;
    let write_message_sizes = self.write_message_sizes.clone();
    let blobs = self.blobs.clone();
    let instance_name = self.instance_name();
    ctx.spawn(
      stream
        .collect()
        .into_future()
        .and_then(move |reqs| {
          let mut maybe_resource_name = None;
          let mut want_next_offset = 0;
          let mut bytes = Bytes::new();
          for req in reqs {
            match maybe_resource_name {
              None => maybe_resource_name = Some(req.get_resource_name().to_owned()),
              Some(ref resource_name) => {
                if resource_name != req.get_resource_name() {
                  return Err(grpcio::Error::RpcFailure(grpcio::RpcStatus::new(
                    grpcio::RpcStatusCode::InvalidArgument,
                    Some(format!(
                      "All resource names in stream must be the same. Got {} but earlier saw {}",
                      req.get_resource_name(),
                      resource_name
                    )),
                  )));
                }
              }
            }
            if req.get_write_offset() != want_next_offset {
              return Err(grpcio::Error::RpcFailure(grpcio::RpcStatus::new(
                grpcio::RpcStatusCode::InvalidArgument,
                Some(format!(
                  "Missing chunk. Expected next offset {}, got next offset: {}",
                  want_next_offset,
                  req.get_write_offset()
                )),
              )));
            }
            want_next_offset += req.get_data().len() as i64;
            write_message_sizes.lock().push(req.get_data().len());
            bytes.extend(req.get_data());
          }
          Ok((maybe_resource_name, bytes))
        })
        .map_err(move |err: grpcio::Error| match err {
          grpcio::Error::RpcFailure(status) => status,
          e => grpcio::RpcStatus::new(grpcio::RpcStatusCode::Unknown, Some(format!("{:?}", e))),
        })
        .and_then(
          move |(maybe_resource_name, bytes)| match maybe_resource_name {
            None => Err(grpcio::RpcStatus::new(
              grpcio::RpcStatusCode::InvalidArgument,
              Some("Stream saw no messages".to_owned()),
            )),
            Some(resource_name) => {
              let parts: Vec<_> = resource_name.splitn(6, '/').collect();
              if parts.len() != 6
                || parts.get(0) != Some(&instance_name.as_ref())
                || parts.get(1) != Some(&"uploads")
                || parts.get(3) != Some(&"blobs")
              {
                return Err(grpcio::RpcStatus::new(
                  grpcio::RpcStatusCode::InvalidArgument,
                  Some(format!("Bad resource name: {}", resource_name)),
                ));
              }
              let fingerprint = match Fingerprint::from_hex_string(parts[4]) {
                Ok(f) => f,
                Err(err) => {
                  return Err(grpcio::RpcStatus::new(
                    grpcio::RpcStatusCode::InvalidArgument,
                    Some(format!(
                      "Bad fingerprint in resource name: {}: {}",
                      parts[4], err
                    )),
                  ));
                }
              };
              let size = match parts[5].parse::<usize>() {
                Ok(s) => s,
                Err(err) => {
                  return Err(grpcio::RpcStatus::new(
                    grpcio::RpcStatusCode::InvalidArgument,
                    Some(format!("Bad size in resource name: {}: {}", parts[5], err)),
                  ));
                }
              };
              if size != bytes.len() {
                return Err(grpcio::RpcStatus::new(
                  grpcio::RpcStatusCode::InvalidArgument,
                  Some(format!(
                    "Size was incorrect: resource name said size={} but got {}",
                    size,
                    bytes.len()
                  )),
                ));
              }

              if always_errors {
                return Err(grpcio::RpcStatus::new(
                  grpcio::RpcStatusCode::Internal,
                  Some("StubCAS is configured to always fail".to_owned()),
                ));
              }

              {
                let mut blobs = blobs.lock();
                blobs.insert(fingerprint, bytes);
              }

              let mut response = bazel_protos::bytestream::WriteResponse::new();
              response.set_committed_size(size as i64);
              Ok(response)
            }
          },
        )
        .then(move |result| match result {
          Ok(resp) => sink.success(resp),
          Err(err) => sink.fail(err),
        })
        .then(move |_| Ok(())),
    );
  }