in src/rust/engine/testutil/mock/src/cas.rs [328:458]
fn write(
&self,
ctx: grpcio::RpcContext<'_>,
stream: grpcio::RequestStream<bazel_protos::bytestream::WriteRequest>,
sink: grpcio::ClientStreamingSink<bazel_protos::bytestream::WriteResponse>,
) {
check_auth!(self, ctx, sink);
let always_errors = self.always_errors;
let write_message_sizes = self.write_message_sizes.clone();
let blobs = self.blobs.clone();
let instance_name = self.instance_name();
ctx.spawn(
stream
.collect()
.into_future()
.and_then(move |reqs| {
let mut maybe_resource_name = None;
let mut want_next_offset = 0;
let mut bytes = Bytes::new();
for req in reqs {
match maybe_resource_name {
None => maybe_resource_name = Some(req.get_resource_name().to_owned()),
Some(ref resource_name) => {
if resource_name != req.get_resource_name() {
return Err(grpcio::Error::RpcFailure(grpcio::RpcStatus::new(
grpcio::RpcStatusCode::InvalidArgument,
Some(format!(
"All resource names in stream must be the same. Got {} but earlier saw {}",
req.get_resource_name(),
resource_name
)),
)));
}
}
}
if req.get_write_offset() != want_next_offset {
return Err(grpcio::Error::RpcFailure(grpcio::RpcStatus::new(
grpcio::RpcStatusCode::InvalidArgument,
Some(format!(
"Missing chunk. Expected next offset {}, got next offset: {}",
want_next_offset,
req.get_write_offset()
)),
)));
}
want_next_offset += req.get_data().len() as i64;
write_message_sizes.lock().push(req.get_data().len());
bytes.extend(req.get_data());
}
Ok((maybe_resource_name, bytes))
})
.map_err(move |err: grpcio::Error| match err {
grpcio::Error::RpcFailure(status) => status,
e => grpcio::RpcStatus::new(grpcio::RpcStatusCode::Unknown, Some(format!("{:?}", e))),
})
.and_then(
move |(maybe_resource_name, bytes)| match maybe_resource_name {
None => Err(grpcio::RpcStatus::new(
grpcio::RpcStatusCode::InvalidArgument,
Some("Stream saw no messages".to_owned()),
)),
Some(resource_name) => {
let parts: Vec<_> = resource_name.splitn(6, '/').collect();
if parts.len() != 6
|| parts.get(0) != Some(&instance_name.as_ref())
|| parts.get(1) != Some(&"uploads")
|| parts.get(3) != Some(&"blobs")
{
return Err(grpcio::RpcStatus::new(
grpcio::RpcStatusCode::InvalidArgument,
Some(format!("Bad resource name: {}", resource_name)),
));
}
let fingerprint = match Fingerprint::from_hex_string(parts[4]) {
Ok(f) => f,
Err(err) => {
return Err(grpcio::RpcStatus::new(
grpcio::RpcStatusCode::InvalidArgument,
Some(format!(
"Bad fingerprint in resource name: {}: {}",
parts[4], err
)),
));
}
};
let size = match parts[5].parse::<usize>() {
Ok(s) => s,
Err(err) => {
return Err(grpcio::RpcStatus::new(
grpcio::RpcStatusCode::InvalidArgument,
Some(format!("Bad size in resource name: {}: {}", parts[5], err)),
));
}
};
if size != bytes.len() {
return Err(grpcio::RpcStatus::new(
grpcio::RpcStatusCode::InvalidArgument,
Some(format!(
"Size was incorrect: resource name said size={} but got {}",
size,
bytes.len()
)),
));
}
if always_errors {
return Err(grpcio::RpcStatus::new(
grpcio::RpcStatusCode::Internal,
Some("StubCAS is configured to always fail".to_owned()),
));
}
{
let mut blobs = blobs.lock();
blobs.insert(fingerprint, bytes);
}
let mut response = bazel_protos::bytestream::WriteResponse::new();
response.set_committed_size(size as i64);
Ok(response)
}
},
)
.then(move |result| match result {
Ok(resp) => sink.success(resp),
Err(err) => sink.fail(err),
})
.then(move |_| Ok(())),
);
}