in src/rust/engine/fs/fs_util/src/main.rs [255:541]
fn execute(top_match: &clap::ArgMatches<'_>) -> Result<(), ExitError> {
let store_dir = top_match
.value_of("local-store-path")
.map(PathBuf::from)
.unwrap_or_else(Store::default_path);
let mut runtime = tokio::runtime::Runtime::new().unwrap();
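  // Build the Store: local-only by default, or additionally backed by a remote CAS when
  // --server-address is passed.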
let (store, store_has_remote) = {
let (store_result, store_has_remote) = match top_match.values_of("server-address") {
      Some(cas_addresses) => {
let chunk_size =
value_t!(top_match.value_of("chunk-bytes"), usize).expect("Bad chunk-bytes flag");
let root_ca_certs = if let Some(path) = top_match.value_of("root-ca-cert-file") {
Some(
std::fs::read(path)
.map_err(|err| format!("Error reading root CA certs file {}: {}", path, err))?,
)
} else {
None
};
let oauth_bearer_token =
if let Some(path) = top_match.value_of("oauth-bearer-token-file") {
Some(std::fs::read_to_string(path).map_err(|err| {
format!("Error reading oauth bearer token from {:?}: {}", path, err)
})?)
} else {
None
};
          // Randomize the CAS address order so that many processes sharing the same config
          // don't all hit the same server first (a thundering herd).
          let mut cas_addresses = cas_addresses.map(str::to_owned).collect::<Vec<_>>();
          cas_addresses.shuffle(&mut rand::thread_rng());
(
Store::with_remote(
&store_dir,
&cas_addresses,
top_match
.value_of("remote-instance-name")
.map(str::to_owned),
&root_ca_certs,
oauth_bearer_token,
            value_t!(top_match.value_of("thread-count"), usize).expect("Bad thread-count flag"),
chunk_size,
// This deadline is really only in place because otherwise DNS failures
// leave this hanging forever.
//
            // Give fs_util a very long deadline (the deadline is not configurable here the
            // way it is inside pants) until we switch to Tower, where we can more carefully
            // control specific components of timeouts.
//
// See https://github.com/pantsbuild/pants/pull/6433 for more context.
Duration::from_secs(30 * 60),
// TODO: Take a command line arg.
fs::BackoffConfig::new(
std::time::Duration::from_secs(1),
1.2,
std::time::Duration::from_secs(20),
)?,
value_t!(top_match.value_of("rpc-attempts"), usize).expect("Bad rpc-attempts flag"),
futures_timer::TimerHandle::default(),
),
true,
)
}
None => (Store::local_only(&store_dir), false),
};
let store = store_result.map_err(|e| {
format!(
"Failed to open/create store for directory {:?}: {}",
store_dir, e
)
})?;
(store, store_has_remote)
};
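  // Dispatch on the subcommand tree: `file`, `directory`, `cat`, or `gc`.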
match top_match.subcommand() {
("file", Some(sub_match)) => {
match sub_match.subcommand() {
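      // `file cat`: write the bytes of the file identified by the given fingerprint and
      // size to stdout.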
("cat", Some(args)) => {
let fingerprint = Fingerprint::from_hex_string(args.value_of("fingerprint").unwrap())?;
let size_bytes = args
.value_of("size_bytes")
.unwrap()
.parse::<usize>()
.expect("size_bytes must be a non-negative number");
let digest = Digest(fingerprint, size_bytes);
let write_result = store
.load_file_bytes_with(digest, |bytes| io::stdout().write_all(&bytes).unwrap())
.wait()?;
write_result.ok_or_else(|| {
ExitError(
format!("File with digest {:?} not found", digest),
ExitCode::NotFound,
)
})
}
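      // `file save`: stat the given path, store the file's contents by digest, and ensure
      // it is uploaded to the remote CAS if one is configured.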
("save", Some(args)) => {
let path = PathBuf::from(args.value_of("path").unwrap());
// Canonicalize path to guarantee that a relative path has a parent.
let posix_fs = make_posix_fs(
path
.canonicalize()
.map_err(|e| format!("Error canonicalizing path {:?}: {:?}", path, e))?
.parent()
.ok_or_else(|| format!("File being saved must have parent but {:?} did not", path))?,
);
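        // The PosixFS is rooted at the file's parent directory, so stat just the file name
        // relative to that root.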
let file = runtime
.block_on(posix_fs.stat(PathBuf::from(path.file_name().unwrap())))
.unwrap();
match file {
fs::Stat::File(f) => {
let digest = fs::OneOffStoreFileByDigest::new(store.clone(), Arc::new(posix_fs))
.store_by_digest(f)
.wait()
.unwrap();
let report = ensure_uploaded_to_remote(&store, store_has_remote, digest);
print_upload_summary(args.value_of("output-mode"), &report);
Ok(())
}
          o => Err(
            format!(
              "Tried to save file {:?}, but it was not a file; it was a {:?}",
              path, o
            )
            .into(),
          ),
}
}
(_, _) => unimplemented!(),
}
}
("directory", Some(sub_match)) => match sub_match.subcommand() {
("materialize", Some(args)) => {
let destination = PathBuf::from(args.value_of("destination").unwrap());
let fingerprint = Fingerprint::from_hex_string(args.value_of("fingerprint").unwrap())?;
let size_bytes = args
.value_of("size_bytes")
.unwrap()
.parse::<usize>()
.expect("size_bytes must be a non-negative number");
let digest = Digest(fingerprint, size_bytes);
store
.materialize_directory(destination, digest)
.wait()
.map_err(|err| {
if err.contains("not found") {
ExitError(err, ExitCode::NotFound)
} else {
err.into()
}
})
}
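      // `directory save`: expand the given globs relative to the root, snapshot the
      // matched files into the store, and ensure the result is uploaded to the remote CAS
      // if one is configured.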
("save", Some(args)) => {
let posix_fs = Arc::new(make_posix_fs(args.value_of("root").unwrap()));
let store_copy = store.clone();
let digest = posix_fs
.expand(fs::PathGlobs::create(
&args
.values_of("globs")
.unwrap()
.map(str::to_string)
.collect::<Vec<String>>(),
&[],
              // By using `Ignore`, we assume that all elements of the globs will definitely
              // expand to something here, or that we don't care if they don't. Is that a
              // valid assumption?
fs::StrictGlobMatching::Ignore,
fs::GlobExpansionConjunction::AllMatch,
)?)
.map_err(|e| format!("Error expanding globs: {:?}", e))
.and_then(move |paths| {
Snapshot::from_path_stats(
store_copy.clone(),
&fs::OneOffStoreFileByDigest::new(store_copy, posix_fs),
paths,
)
})
.map(|snapshot| snapshot.digest)
.wait()?;
let report = ensure_uploaded_to_remote(&store, store_has_remote, digest);
print_upload_summary(args.value_of("output-mode"), &report);
Ok(())
}
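      // `directory cat-proto`: print the Directory identified by the given digest, in the
      // rendering selected by the output-format arg.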
("cat-proto", Some(args)) => {
let fingerprint = Fingerprint::from_hex_string(args.value_of("fingerprint").unwrap())?;
let size_bytes = args
.value_of("size_bytes")
.unwrap()
.parse::<usize>()
.expect("size_bytes must be a non-negative number");
let digest = Digest(fingerprint, size_bytes);
let proto_bytes = match args.value_of("output-format").unwrap() {
"binary" => store
.load_directory(digest)
.wait()
.map(|maybe_d| maybe_d.map(|d| d.write_to_bytes().unwrap())),
"text" => store
.load_directory(digest)
.wait()
.map(|maybe_p| maybe_p.map(|p| format!("{:?}\n", p).as_bytes().to_vec())),
"recursive-file-list" => expand_files(store, digest).map(|maybe_v| {
maybe_v
.map(|v| {
v.into_iter()
.map(|(name, _digest)| format!("{}\n", name))
.collect::<Vec<String>>()
.join("")
})
.map(String::into_bytes)
}),
"recursive-file-list-with-digests" => expand_files(store, digest).map(|maybe_v| {
maybe_v
.map(|v| {
v.into_iter()
.map(|(name, digest)| format!("{} {} {}\n", name, digest.0, digest.1))
.collect::<Vec<String>>()
.join("")
})
.map(String::into_bytes)
}),
format => Err(format!(
"Unexpected value of --output-format arg: {}",
format
)),
}?;
match proto_bytes {
Some(bytes) => {
io::stdout().write_all(&bytes).unwrap();
Ok(())
}
None => Err(ExitError(
format!("Directory with digest {:?} not found", digest),
ExitCode::NotFound,
)),
}
}
(_, _) => unimplemented!(),
},
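    // `cat`: interpret the digest as a file first; if no file matches, fall back to the
    // serialized Directory proto with the same digest.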
("cat", Some(args)) => {
let fingerprint = Fingerprint::from_hex_string(args.value_of("fingerprint").unwrap())?;
let size_bytes = args
.value_of("size_bytes")
.unwrap()
.parse::<usize>()
.expect("size_bytes must be a non-negative number");
let digest = Digest(fingerprint, size_bytes);
let v = match store.load_file_bytes_with(digest, |bytes| bytes).wait()? {
None => store
.load_directory(digest)
.map(|maybe_dir| {
maybe_dir.map(|dir| {
Bytes::from(
dir
.write_to_bytes()
.expect("Error serializing Directory proto"),
)
})
})
.wait()?,
some => some,
};
match v {
Some(bytes) => {
io::stdout().write_all(&bytes).unwrap();
Ok(())
}
None => Err(ExitError(
format!("Digest {:?} not found", digest),
ExitCode::NotFound,
)),
}
}
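    // `gc`: shrink the local store to approximately the target size, compacting the
    // underlying storage.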
("gc", Some(args)) => {
let target_size_bytes = value_t!(args.value_of("target-size-bytes"), usize)
.expect("--target-size-bytes must be passed as a non-negative integer");
store.garbage_collect(target_size_bytes, fs::ShrinkBehavior::Compact)?;
Ok(())
}
(_, _) => unimplemented!(),
}
}