src/exposition/kafka.rs (38 lines of code) (raw):

// Copyright 2019 Twitter, Inc. // Licensed under the Apache License, Version 2.0 // http://www.apache.org/licenses/LICENSE-2.0 use std::convert::TryInto; use std::sync::Arc; use std::time::{Duration, Instant}; use crate::*; use kafka::producer::{Producer, Record}; use crate::config::Config; use crate::exposition::MetricsSnapshot; pub struct KafkaProducer { snapshot: MetricsSnapshot, producer: Producer, topic: String, interval: Duration, } impl KafkaProducer { pub fn new(config: Arc<Config>, metrics: Arc<Metrics>) -> Self { Self { snapshot: MetricsSnapshot::new(metrics, config.general().reading_suffix()), producer: Producer::from_hosts(config.exposition().kafka().hosts()) .create() .unwrap(), topic: config.exposition().kafka().topic().unwrap(), interval: Duration::from_millis( config.exposition().kafka().interval().try_into().unwrap(), ), } } pub fn run(&mut self) { let start = Instant::now(); self.snapshot.refresh(); let _ = self .producer .send(&Record::from_value(&self.topic, self.snapshot.json(false))); let stop = Instant::now(); if start + self.interval > stop { std::thread::sleep(self.interval - (stop - start)); } } }