fn main()

in src/replay.rs [39:228]


fn main() {
    // initialize logging
    let log = LogBuilder::new()
        .output(Box::new(Stdout::new()))
        .log_queue_depth(1024)
        .single_message_size(4096)
        .build()
        .expect("failed to initialize log");

    let log = MultiLogBuilder::new()
        .level_filter(LevelFilter::Info)
        .default(log)
        .build()
        .start();

    // process command line arguments
    // TODO(bmartin): consider moving to a file based config
    let matches = App::new("rpc-replay")
        .version("0.0.0")
        .author("Brian Martin <bmartin@twitter.com>")
        .about("Replay cache logs")
        .arg(
            Arg::with_name("trace")
                .long("trace")
                .value_name("FILE")
                .help("zstd compressed cache trace")
                .takes_value(true)
                .required(true),
        )
        .arg(
            Arg::with_name("binary-trace")
                .long("binary-trace")
                .help("indicates the trace is in the binary format")
                .takes_value(false)
                .required(false),
        )
        .arg(
            Arg::with_name("endpoint")
                .long("endpoint")
                .value_name("HOST:PORT")
                .help("server endpoint to send traffic to")
                .takes_value(true)
                .required(true),
        )
        .arg(
            Arg::with_name("speed")
                .long("speed")
                .value_name("FLOAT")
                .help("replay speed as a multiplier relative to realtime")
                .takes_value(true),
        )
        .arg(
            Arg::with_name("rate")
                .long("rate")
                .value_name("INT")
                .help("replay speed in requests/s")
                .takes_value(true),
        )
        .arg(
            Arg::with_name("poolsize")
                .long("poolsize")
                .value_name("INT")
                .help("number of connections to open from each worker")
                .takes_value(true),
        )
        .arg(
            Arg::with_name("workers")
                .long("workers")
                .value_name("INT")
                .help("number of client worker threads")
                .takes_value(true),
        )
        .arg(
            Arg::with_name("tls-chain")
                .long("tls-chain")
                .value_name("FILE")
                .help("TLS certificate chain")
                .takes_value(true),
        )
        .arg(
            Arg::with_name("tls-key")
                .long("tls-key")
                .value_name("FILE")
                .help("TLS private key")
                .takes_value(true),
        )
        .arg(
            Arg::with_name("tls-cert")
                .long("tls-cert")
                .value_name("FILE")
                .help("TLS certificate")
                .takes_value(true),
        )
        .get_matches();

    if matches.is_present("speed") && matches.is_present("rate") {
        fatal!("invalid configuration: 'speed' and 'rate' cannot be used together");
    }

    // config value parsing and defaults
    let trace = matches.value_of("trace").unwrap();
    let endpoint = matches.value_of("endpoint").unwrap();
    let speed: Option<f64> = matches
        .value_of("speed")
        .map(|v| v.parse().expect("invalid value for 'speed'"));
    let rate: Option<usize> = matches
        .value_of("rate")
        .map(|v| v.parse().expect("invalid value for 'rate'"));
    let poolsize: usize = matches
        .value_of("poolsize")
        .unwrap_or("1")
        .parse()
        .expect("invalid value for 'poolsize'");
    let workers: usize = matches
        .value_of("workers")
        .unwrap_or("1")
        .parse()
        .expect("invalid value for 'workers'");
    let binary = matches.is_present("binary-trace");

    // configure tls connector
    let key = matches.value_of("tls-key");
    let cert = matches.value_of("tls-cert");
    let chain = matches.value_of("tls-chain");

    let tls = if key.is_some() && cert.is_some() && chain.is_some() {
        let mut builder = SslConnector::builder(SslMethod::tls_client())
            .expect("failed to initialize TLS client");
        builder.set_verify(SslVerifyMode::NONE);
        builder
            .set_certificate_file(cert.unwrap(), SslFiletype::PEM)
            .expect("failed to set TLS cert");
        let pem = std::fs::read(chain.unwrap()).expect("failed to read certificate chain");
        let chain = X509::stack_from_pem(&pem).expect("bad certificate chain");
        for cert in chain {
            builder
                .add_extra_chain_cert(cert)
                .expect("bad certificate in chain");
        }
        builder
            .set_private_key_file(key.unwrap(), SslFiletype::PEM)
            .expect("failed to set TLS key");
        let connector = builder.build();
        Some(connector)
    } else if key.is_none() && cert.is_none() && chain.is_none() {
        None
    } else {
        fatal!("incomplete TLS config");
    };

    // lookup socket address
    let sockaddr = endpoint.to_socket_addrs().unwrap().next().unwrap();

    // initialize work queue
    let work = Queue::with_capacity(1024 * 1024); // arbitrarily large

    let request_heatmap = Some(Arc::new(AtomicHeatmap::<u64, AtomicU64>::new(
        1_000_000,
        3,
        Duration::from_secs(60),
        Duration::from_millis(1000),
    )));

    // spawn admin
    let mut admin = Admin::for_replay(None, log);
    admin.set_request_heatmap(request_heatmap.clone());
    let _admin_thread = std::thread::spawn(move || admin.run());

    // spawn workers
    for _ in 0..workers {
        let mut worker = Worker::new(
            sockaddr,
            poolsize,
            tls.clone(),
            work.clone(),
            request_heatmap.clone(),
        );
        std::thread::spawn(move || worker.run());
    }

    let controller: Box<dyn Controller> = if let Some(rate) = rate {
        Box::new(RateController::new(rate as u64, workers as u64))
    } else {
        let speed = speed.unwrap_or(1.0);
        Box::new(SpeedController::new(speed))
    };

    let mut generator = Generator::new(trace, work, binary, controller);
    generator.run()
}