in src/proxy/momento/src/main.rs [106:232]
fn main() -> Result<(), Box<dyn std::error::Error>> {
// custom panic hook to terminate whole process after unwinding
std::panic::set_hook(Box::new(|s| {
error!("{}", s);
println!("{:?}", Backtrace::new());
std::process::exit(101);
}));
// parse command line options
let matches = App::new(env!("CARGO_BIN_NAME"))
.version(env!("CARGO_PKG_VERSION"))
.version_short("v")
.long_about(
"A proxy that supports a limited subset of the Memcache protocol on
the client side and communicates with Momento over gRPC to fulfill
the requests.
This allows use of the Momento cache offering without code changes
for existing software which uses Memcached.
The supported commands are limited to: get/set",
)
.arg(
Arg::with_name("stats")
.short("s")
.long("stats")
.help("List all metrics in stats")
.takes_value(false),
)
.arg(
Arg::with_name("CONFIG")
.help("Server configuration file")
.index(1),
)
.get_matches();
// load config from file
let config = if let Some(file) = matches.value_of("CONFIG") {
match MomentoProxyConfig::load(file) {
Ok(c) => c,
Err(e) => {
println!("{}", e);
std::process::exit(1);
}
}
} else {
Default::default()
};
// initialize logging
let mut log_drain = configure_logging(&config);
// validate config parameters
for cache in config.caches() {
let name = cache.cache_name();
let ttl = cache.default_ttl();
let limit = u64::MAX / 1000;
if ttl.get() > limit {
error!("default ttl of {ttl} for cache `{name}` is greater than {limit}");
let _ = log_drain.flush();
std::process::exit(1);
}
if let Err(e) = cache.socket_addr() {
error!("listen address for cache `{name}` is not valid: {}", e);
let _ = log_drain.flush();
std::process::exit(1);
}
}
// initialize metrics
common::metrics::init();
// output stats descriptions and exit if the `stats` option was provided
if matches.is_present("stats") {
println!("{:<31} {:<15} DESCRIPTION", "NAME", "TYPE");
let mut metrics = Vec::new();
for metric in &rustcommon_metrics::metrics() {
let any = match metric.as_any() {
Some(any) => any,
None => {
continue;
}
};
if any.downcast_ref::<Counter>().is_some() {
metrics.push(format!("{:<31} counter", metric.name()));
} else if any.downcast_ref::<Gauge>().is_some() {
metrics.push(format!("{:<31} gauge", metric.name()));
} else if any.downcast_ref::<Heatmap>().is_some() {
for (label, _) in PERCENTILES {
let name = format!("{}_{}", metric.name(), label);
metrics.push(format!("{:<31} percentile", name));
}
} else {
continue;
}
}
metrics.sort();
for metric in metrics {
println!("{}", metric);
}
std::process::exit(0);
}
let mut runtime = Builder::new_multi_thread();
runtime.thread_name_fn(|| {
static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
let id = ATOMIC_ID.fetch_add(1, Ordering::SeqCst);
format!("pelikan_wrk_{}", id)
});
if let Some(threads) = config.threads() {
runtime.worker_threads(threads);
}
let runtime = runtime
.enable_all()
.build()
.expect("failed to launch tokio runtime");
runtime.block_on(async move { spawn(config, log_drain).await })
}