in src/replay.rs [39:228]
/// Entry point for the `rpc-replay` binary.
///
/// Sets up logging, parses the command line, optionally builds a TLS
/// connector, resolves the target endpoint, spawns the admin thread and the
/// requested number of worker threads, and finally drives the trace generator
/// on the calling thread.
///
/// Invalid combinations of options are reported through `fatal!`; malformed
/// values, unreadable TLS files, and an unresolvable endpoint abort with a
/// descriptive panic message.
fn main() {
    // initialize logging
    let log = LogBuilder::new()
        .output(Box::new(Stdout::new()))
        .log_queue_depth(1024)
        .single_message_size(4096)
        .build()
        .expect("failed to initialize log");
    let log = MultiLogBuilder::new()
        .level_filter(LevelFilter::Info)
        .default(log)
        .build()
        .start();

    // process command line arguments
    // TODO(bmartin): consider moving to a file based config
    let matches = App::new("rpc-replay")
        .version("0.0.0")
        .author("Brian Martin <bmartin@twitter.com>")
        .about("Replay cache logs")
        .arg(
            Arg::with_name("trace")
                .long("trace")
                .value_name("FILE")
                .help("zstd compressed cache trace")
                .takes_value(true)
                .required(true),
        )
        .arg(
            Arg::with_name("binary-trace")
                .long("binary-trace")
                .help("indicates the trace is in the binary format")
                .takes_value(false)
                .required(false),
        )
        .arg(
            Arg::with_name("endpoint")
                .long("endpoint")
                .value_name("HOST:PORT")
                .help("server endpoint to send traffic to")
                .takes_value(true)
                .required(true),
        )
        .arg(
            Arg::with_name("speed")
                .long("speed")
                .value_name("FLOAT")
                .help("replay speed as a multiplier relative to realtime")
                .takes_value(true),
        )
        .arg(
            Arg::with_name("rate")
                .long("rate")
                .value_name("INT")
                .help("replay speed in requests/s")
                .takes_value(true),
        )
        .arg(
            Arg::with_name("poolsize")
                .long("poolsize")
                .value_name("INT")
                .help("number of connections to open from each worker")
                .takes_value(true),
        )
        .arg(
            Arg::with_name("workers")
                .long("workers")
                .value_name("INT")
                .help("number of client worker threads")
                .takes_value(true),
        )
        .arg(
            Arg::with_name("tls-chain")
                .long("tls-chain")
                .value_name("FILE")
                .help("TLS certificate chain")
                .takes_value(true),
        )
        .arg(
            Arg::with_name("tls-key")
                .long("tls-key")
                .value_name("FILE")
                .help("TLS private key")
                .takes_value(true),
        )
        .arg(
            Arg::with_name("tls-cert")
                .long("tls-cert")
                .value_name("FILE")
                .help("TLS certificate")
                .takes_value(true),
        )
        .get_matches();

    // 'speed' and 'rate' select different pacing controllers below, so only
    // one of them may be given.
    if matches.is_present("speed") && matches.is_present("rate") {
        fatal!("invalid configuration: 'speed' and 'rate' cannot be used together");
    }

    // config value parsing and defaults
    // 'trace' and 'endpoint' are declared `required(true)`, so these unwraps
    // cannot fail once argument parsing has succeeded.
    let trace = matches.value_of("trace").unwrap();
    let endpoint = matches.value_of("endpoint").unwrap();
    let speed: Option<f64> = matches
        .value_of("speed")
        .map(|v| v.parse().expect("invalid value for 'speed'"));
    let rate: Option<usize> = matches
        .value_of("rate")
        .map(|v| v.parse().expect("invalid value for 'rate'"));
    let poolsize: usize = matches
        .value_of("poolsize")
        .unwrap_or("1")
        .parse()
        .expect("invalid value for 'poolsize'");
    let workers: usize = matches
        .value_of("workers")
        .unwrap_or("1")
        .parse()
        .expect("invalid value for 'workers'");
    // With zero workers the spawn loop below would start no thread holding a
    // handle to the work queue, so nothing would consume the replayed
    // requests. Reject that up front rather than starting a replay that
    // cannot make progress.
    if workers == 0 {
        fatal!("invalid configuration: 'workers' must be at least 1");
    }
    let binary = matches.is_present("binary-trace");

    // configure tls connector
    // TLS is all-or-nothing: key, cert, and chain must either all be present
    // (TLS enabled) or all be absent (plaintext).
    let key = matches.value_of("tls-key");
    let cert = matches.value_of("tls-cert");
    let chain = matches.value_of("tls-chain");
    let tls = match (key, cert, chain) {
        (Some(key), Some(cert), Some(chain)) => {
            let mut builder = SslConnector::builder(SslMethod::tls_client())
                .expect("failed to initialize TLS client");
            // NOTE(review): the verify mode is explicitly set to NONE here;
            // confirm this is intended for the environments this tool targets.
            builder.set_verify(SslVerifyMode::NONE);
            builder
                .set_certificate_file(cert, SslFiletype::PEM)
                .expect("failed to set TLS cert");
            let pem = std::fs::read(chain).expect("failed to read certificate chain");
            let chain = X509::stack_from_pem(&pem).expect("bad certificate chain");
            for cert in chain {
                builder
                    .add_extra_chain_cert(cert)
                    .expect("bad certificate in chain");
            }
            builder
                .set_private_key_file(key, SslFiletype::PEM)
                .expect("failed to set TLS key");
            Some(builder.build())
        }
        (None, None, None) => None,
        _ => {
            fatal!("incomplete TLS config");
        }
    };

    // lookup socket address
    // Only the first resolved address is used.
    let sockaddr = endpoint
        .to_socket_addrs()
        .expect("failed to resolve 'endpoint'")
        .next()
        .expect("'endpoint' did not resolve to any address");

    // initialize work queue
    let work = Queue::with_capacity(1024 * 1024); // arbitrarily large

    // heatmap handle shared between the admin thread and every worker
    let request_heatmap = Some(Arc::new(AtomicHeatmap::<u64, AtomicU64>::new(
        1_000_000,
        3,
        Duration::from_secs(60),
        Duration::from_millis(1000),
    )));

    // spawn admin
    let mut admin = Admin::for_replay(None, log);
    admin.set_request_heatmap(request_heatmap.clone());
    let _admin_thread = std::thread::spawn(move || admin.run());

    // spawn workers
    // Worker threads are detached: their join handles are intentionally
    // dropped and the process lifetime is governed by the generator below.
    for _ in 0..workers {
        let mut worker = Worker::new(
            sockaddr,
            poolsize,
            tls.clone(),
            work.clone(),
            request_heatmap.clone(),
        );
        std::thread::spawn(move || worker.run());
    }

    // An explicit request rate takes precedence; otherwise replay relative to
    // the trace's own timing, defaulting to realtime (1.0x).
    let controller: Box<dyn Controller> = if let Some(rate) = rate {
        Box::new(RateController::new(rate as u64, workers as u64))
    } else {
        let speed = speed.unwrap_or(1.0);
        Box::new(SpeedController::new(speed))
    };

    // run the generator on the main thread
    let mut generator = Generator::new(trace, work, binary, controller);
    generator.run()
}