// fn main()
//
// in src/proxy/momento/src/main.rs [106:232]

/// Entry point for the momento proxy.
///
/// Initialization order matters here: the panic hook is installed first so
/// any later failure is logged; the config is loaded before logging so the
/// log drain can be configured from it; validation runs after logging so
/// errors are both logged and flushed before the process exits.
///
/// Exit codes: 1 on config load/validation failure, 0 after `--stats`
/// listing, 101 from the panic hook.
fn main() -> Result<(), Box<dyn std::error::Error>> {
    // custom panic hook to terminate whole process after unwinding
    // (default behavior would only kill the panicking thread; a proxy with a
    // dead worker thread should not keep running)
    std::panic::set_hook(Box::new(|s| {
        error!("{}", s);
        println!("{:?}", Backtrace::new());
        std::process::exit(101);
    }));

    // parse command line options (clap 2.x builder API)
    let matches = App::new(env!("CARGO_BIN_NAME"))
        .version(env!("CARGO_PKG_VERSION"))
        .version_short("v")
        .long_about(
            "A proxy that supports a limited subset of the Memcache protocol on
            the client side and communicates with Momento over gRPC to fulfill
            the requests.

            This allows use of the Momento cache offering without code changes
            for existing software which uses Memcached.

            The supported commands are limited to: get/set",
        )
        .arg(
            Arg::with_name("stats")
                .short("s")
                .long("stats")
                .help("List all metrics in stats")
                .takes_value(false),
        )
        .arg(
            Arg::with_name("CONFIG")
                .help("Server configuration file")
                .index(1),
        )
        .get_matches();

    // load config from file; with no CONFIG argument, fall back to defaults.
    // Load errors go to stdout (not the logger) because logging is not yet
    // initialized at this point.
    let config = if let Some(file) = matches.value_of("CONFIG") {
        match MomentoProxyConfig::load(file) {
            Ok(c) => c,
            Err(e) => {
                println!("{}", e);
                std::process::exit(1);
            }
        }
    } else {
        Default::default()
    };

    // initialize logging; the drain must be flushed manually before any
    // process::exit below or buffered messages would be lost
    let mut log_drain = configure_logging(&config);

    // validate config parameters: each cache needs a TTL that won't overflow
    // a millisecond conversion and a parseable listen address
    for cache in config.caches() {
        let name = cache.cache_name();
        let ttl = cache.default_ttl();
        // u64::MAX / 1000 guards against overflow when the TTL (seconds) is
        // later scaled to milliseconds in a u64
        let limit = u64::MAX / 1000;
        if ttl.get() > limit {
            error!("default ttl of {ttl} for cache `{name}` is greater than {limit}");
            let _ = log_drain.flush();
            std::process::exit(1);
        }

        if let Err(e) = cache.socket_addr() {
            error!("listen address for cache `{name}` is not valid: {}", e);
            let _ = log_drain.flush();
            std::process::exit(1);
        }
    }

    // initialize metrics
    common::metrics::init();

    // output stats descriptions and exit if the `stats` option was provided
    if matches.is_present("stats") {
        println!("{:<31} {:<15} DESCRIPTION", "NAME", "TYPE");

        let mut metrics = Vec::new();

        // walk the metric registry and classify each entry by its concrete
        // type; metrics that can't be downcast to a known type are skipped
        for metric in &rustcommon_metrics::metrics() {
            let any = match metric.as_any() {
                Some(any) => any,
                None => {
                    continue;
                }
            };

            if any.downcast_ref::<Counter>().is_some() {
                metrics.push(format!("{:<31} counter", metric.name()));
            } else if any.downcast_ref::<Gauge>().is_some() {
                metrics.push(format!("{:<31} gauge", metric.name()));
            } else if any.downcast_ref::<Heatmap>().is_some() {
                // heatmaps are reported as one percentile metric per
                // configured percentile label
                for (label, _) in PERCENTILES {
                    let name = format!("{}_{}", metric.name(), label);
                    metrics.push(format!("{:<31} percentile", name));
                }
            } else {
                continue;
            }
        }

        // sort for stable, readable output
        metrics.sort();
        for metric in metrics {
            println!("{}", metric);
        }
        std::process::exit(0);
    }

    // build the tokio runtime that will drive the async proxy
    let mut runtime = Builder::new_multi_thread();

    // give each worker thread a unique, identifiable name
    // NOTE(review): the `pelikan_wrk_` prefix looks copied from another
    // binary — confirm whether this proxy's threads should carry a
    // momento-specific name instead
    runtime.thread_name_fn(|| {
        static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
        let id = ATOMIC_ID.fetch_add(1, Ordering::SeqCst);
        format!("pelikan_wrk_{}", id)
    });

    // honor an explicit worker thread count from the config, otherwise let
    // tokio pick its default
    if let Some(threads) = config.threads() {
        runtime.worker_threads(threads);
    }

    let runtime = runtime
        .enable_all()
        .build()
        .expect("failed to launch tokio runtime");

    // run the proxy to completion; its result becomes the process result
    runtime.block_on(async move { spawn(config, log_drain).await })
}