fn execute()

in src/rust/engine/fs/fs_util/src/main.rs [255:541]


fn execute(top_match: &clap::ArgMatches<'_>) -> Result<(), ExitError> {
  let store_dir = top_match
    .value_of("local-store-path")
    .map(PathBuf::from)
    .unwrap_or_else(Store::default_path);
  let mut runtime = tokio::runtime::Runtime::new().unwrap();
  let (store, store_has_remote) = {
    let (store_result, store_has_remote) = match top_match.values_of("server-address") {
      Some(cas_address) => {
        let chunk_size =
          value_t!(top_match.value_of("chunk-bytes"), usize).expect("Bad chunk-bytes flag");

        let root_ca_certs = if let Some(path) = top_match.value_of("root-ca-cert-file") {
          Some(
            std::fs::read(path)
              .map_err(|err| format!("Error reading root CA certs file {}: {}", path, err))?,
          )
        } else {
          None
        };

        let oauth_bearer_token =
          if let Some(path) = top_match.value_of("oauth-bearer-token-file") {
            Some(std::fs::read_to_string(path).map_err(|err| {
              format!("Error reading oauth bearer token from {:?}: {}", path, err)
            })?)
          } else {
            None
          };

        // Randomize CAS address order to avoid thundering herds from common config.
        let mut cas_addresses = cas_address.map(str::to_owned).collect::<Vec<_>>();
        cas_addresses.shuffle(&mut rand::thread_rng());

        (
          Store::with_remote(
            &store_dir,
            &cas_addresses,
            top_match
              .value_of("remote-instance-name")
              .map(str::to_owned),
            &root_ca_certs,
            oauth_bearer_token,
            value_t!(top_match.value_of("thread-count"), usize).expect("Invalid thread count"),
            chunk_size,
            // This deadline is really only in place because otherwise DNS failures
            // leave this hanging forever.
            //
            // Make fs_util have a very long deadline (because it's not configurable,
            // like it is inside pants) until we switch to Tower (where we can more
            // carefully control specific components of timeouts).
            //
            // See https://github.com/pantsbuild/pants/pull/6433 for more context.
            Duration::from_secs(30 * 60),
            // TODO: Take a command line arg.
            fs::BackoffConfig::new(
              std::time::Duration::from_secs(1),
              1.2,
              std::time::Duration::from_secs(20),
            )?,
            value_t!(top_match.value_of("rpc-attempts"), usize).expect("Bad rpc-attempts flag"),
            futures_timer::TimerHandle::default(),
          ),
          true,
        )
      }
      None => (Store::local_only(&store_dir), false),
    };
    let store = store_result.map_err(|e| {
      format!(
        "Failed to open/create store for directory {:?}: {}",
        store_dir, e
      )
    })?;
    (store, store_has_remote)
  };

  match top_match.subcommand() {
    ("file", Some(sub_match)) => {
      match sub_match.subcommand() {
        ("cat", Some(args)) => {
          let fingerprint = Fingerprint::from_hex_string(args.value_of("fingerprint").unwrap())?;
          let size_bytes = args
            .value_of("size_bytes")
            .unwrap()
            .parse::<usize>()
            .expect("size_bytes must be a non-negative number");
          let digest = Digest(fingerprint, size_bytes);
          let write_result = store
            .load_file_bytes_with(digest, |bytes| io::stdout().write_all(&bytes).unwrap())
            .wait()?;
          write_result.ok_or_else(|| {
            ExitError(
              format!("File with digest {:?} not found", digest),
              ExitCode::NotFound,
            )
          })
        }
        ("save", Some(args)) => {
          let path = PathBuf::from(args.value_of("path").unwrap());
          // Canonicalize path to guarantee that a relative path has a parent.
          let posix_fs = make_posix_fs(
            path
              .canonicalize()
              .map_err(|e| format!("Error canonicalizing path {:?}: {:?}", path, e))?
              .parent()
              .ok_or_else(|| format!("File being saved must have parent but {:?} did not", path))?,
          );
          let file = runtime
            .block_on(posix_fs.stat(PathBuf::from(path.file_name().unwrap())))
            .unwrap();
          match file {
            fs::Stat::File(f) => {
              let digest = fs::OneOffStoreFileByDigest::new(store.clone(), Arc::new(posix_fs))
                .store_by_digest(f)
                .wait()
                .unwrap();

              let report = ensure_uploaded_to_remote(&store, store_has_remote, digest);
              print_upload_summary(args.value_of("output-mode"), &report);

              Ok(())
            }
            o => Err(
              format!(
                "Tried to save file {:?} but it was not a file, was a {:?}",
                path, o
              )
              .into(),
            ),
          }
        }
        (_, _) => unimplemented!(),
      }
    }
    ("directory", Some(sub_match)) => match sub_match.subcommand() {
      ("materialize", Some(args)) => {
        let destination = PathBuf::from(args.value_of("destination").unwrap());
        let fingerprint = Fingerprint::from_hex_string(args.value_of("fingerprint").unwrap())?;
        let size_bytes = args
          .value_of("size_bytes")
          .unwrap()
          .parse::<usize>()
          .expect("size_bytes must be a non-negative number");
        let digest = Digest(fingerprint, size_bytes);
        store
          .materialize_directory(destination, digest)
          .wait()
          .map_err(|err| {
            if err.contains("not found") {
              ExitError(err, ExitCode::NotFound)
            } else {
              err.into()
            }
          })
      }
      ("save", Some(args)) => {
        let posix_fs = Arc::new(make_posix_fs(args.value_of("root").unwrap()));
        let store_copy = store.clone();
        let digest = posix_fs
          .expand(fs::PathGlobs::create(
            &args
              .values_of("globs")
              .unwrap()
              .map(str::to_string)
              .collect::<Vec<String>>(),
            &[],
            // By using `Ignore`, we assume all elements of the globs will definitely expand to
            // something here, or we don't care. Is that a valid assumption?
            fs::StrictGlobMatching::Ignore,
            fs::GlobExpansionConjunction::AllMatch,
          )?)
          .map_err(|e| format!("Error expanding globs: {:?}", e))
          .and_then(move |paths| {
            Snapshot::from_path_stats(
              store_copy.clone(),
              &fs::OneOffStoreFileByDigest::new(store_copy, posix_fs),
              paths,
            )
          })
          .map(|snapshot| snapshot.digest)
          .wait()?;

        let report = ensure_uploaded_to_remote(&store, store_has_remote, digest);
        print_upload_summary(args.value_of("output-mode"), &report);

        Ok(())
      }
      ("cat-proto", Some(args)) => {
        let fingerprint = Fingerprint::from_hex_string(args.value_of("fingerprint").unwrap())?;
        let size_bytes = args
          .value_of("size_bytes")
          .unwrap()
          .parse::<usize>()
          .expect("size_bytes must be a non-negative number");
        let digest = Digest(fingerprint, size_bytes);
        let proto_bytes = match args.value_of("output-format").unwrap() {
          "binary" => store
            .load_directory(digest)
            .wait()
            .map(|maybe_d| maybe_d.map(|d| d.write_to_bytes().unwrap())),
          "text" => store
            .load_directory(digest)
            .wait()
            .map(|maybe_p| maybe_p.map(|p| format!("{:?}\n", p).as_bytes().to_vec())),
          "recursive-file-list" => expand_files(store, digest).map(|maybe_v| {
            maybe_v
              .map(|v| {
                v.into_iter()
                  .map(|(name, _digest)| format!("{}\n", name))
                  .collect::<Vec<String>>()
                  .join("")
              })
              .map(String::into_bytes)
          }),
          "recursive-file-list-with-digests" => expand_files(store, digest).map(|maybe_v| {
            maybe_v
              .map(|v| {
                v.into_iter()
                  .map(|(name, digest)| format!("{} {} {}\n", name, digest.0, digest.1))
                  .collect::<Vec<String>>()
                  .join("")
              })
              .map(String::into_bytes)
          }),
          format => Err(format!(
            "Unexpected value of --output-format arg: {}",
            format
          )),
        }?;
        match proto_bytes {
          Some(bytes) => {
            io::stdout().write_all(&bytes).unwrap();
            Ok(())
          }
          None => Err(ExitError(
            format!("Directory with digest {:?} not found", digest),
            ExitCode::NotFound,
          )),
        }
      }
      (_, _) => unimplemented!(),
    },
    ("cat", Some(args)) => {
      let fingerprint = Fingerprint::from_hex_string(args.value_of("fingerprint").unwrap())?;
      let size_bytes = args
        .value_of("size_bytes")
        .unwrap()
        .parse::<usize>()
        .expect("size_bytes must be a non-negative number");
      let digest = Digest(fingerprint, size_bytes);
      let v = match store.load_file_bytes_with(digest, |bytes| bytes).wait()? {
        None => store
          .load_directory(digest)
          .map(|maybe_dir| {
            maybe_dir.map(|dir| {
              Bytes::from(
                dir
                  .write_to_bytes()
                  .expect("Error serializing Directory proto"),
              )
            })
          })
          .wait()?,
        some => some,
      };
      match v {
        Some(bytes) => {
          io::stdout().write_all(&bytes).unwrap();
          Ok(())
        }
        None => Err(ExitError(
          format!("Digest {:?} not found", digest),
          ExitCode::NotFound,
        )),
      }
    }
    ("gc", Some(args)) => {
      let target_size_bytes = value_t!(args.value_of("target-size-bytes"), usize)
        .expect("--target-size-bytes must be passed as a non-negative integer");
      store.garbage_collect(target_size_bytes, fs::ShrinkBehavior::Compact)?;
      Ok(())
    }

    (_, _) => unimplemented!(),
  }
}