src/samplers/interrupt/mod.rs (292 lines of code) (raw):
// Copyright 2019 Twitter, Inc.
// Licensed under the Apache License, Version 2.0
// http://www.apache.org/licenses/LICENSE-2.0
use std::collections::HashMap;
#[cfg(feature = "bpf")]
use std::collections::HashSet;
use std::sync::{Arc, Mutex};
use async_trait::async_trait;
use tokio::fs::File;
use tokio::io::{AsyncBufReadExt, AsyncSeekExt, BufReader, SeekFrom};
use crate::common::bpf::*;
use crate::config::SamplerConfig;
use crate::samplers::Common;
use crate::*;
mod config;
mod stat;
pub use config::*;
pub use stat::*;
/// Sampler for system interrupt counters, read from `/proc/interrupts` and,
/// when built with the `bpf` feature, from BPF tables.
// `bpf` and `bpf_last` are only read from code compiled under
// `cfg(feature = "bpf")`, hence the dead_code allowance.
#[allow(dead_code)]
pub struct Interrupt {
/// Compiled BPF program; populated by `initialize_bpf`, otherwise `None`.
bpf: Option<Arc<Mutex<BPF>>>,
/// Time of the last BPF table read, used to limit BPF sampling to once per window.
bpf_last: Arc<Mutex<Instant>>,
/// Shared sampler state (config, runtime, metrics, hardware info).
common: Common,
/// Lazily-opened handle to `/proc/interrupts`, rewound and reused on each sample.
proc_interrupts: Option<File>,
/// Statistics this sampler was configured to record.
statistics: Vec<InterruptStatistic>,
}
#[async_trait]
impl Sampler for Interrupt {
type Statistic = InterruptStatistic;
fn new(common: Common) -> Result<Self, anyhow::Error> {
let fault_tolerant = common.config.general().fault_tolerant();
let statistics = common.config().samplers().interrupt().statistics();
#[allow(unused_mut)]
let mut sampler = Self {
bpf: None,
bpf_last: Arc::new(Mutex::new(Instant::now())),
common,
proc_interrupts: None,
statistics,
};
if let Err(e) = sampler.initialize_bpf() {
error!("failed to initialize bpf: {}", e);
if !fault_tolerant {
return Err(e);
}
}
if sampler.sampler_config().enabled() {
sampler.register();
}
Ok(sampler)
}
fn spawn(common: Common) {
if common.config().samplers().interrupt().enabled() {
if let Ok(mut interrupt) = Interrupt::new(common.clone()) {
common.runtime().spawn(async move {
loop {
let _ = interrupt.sample().await;
}
});
} else if !common.config.fault_tolerant() {
fatal!("failed to initialize interrupt sampler");
} else {
error!("failed to initialize interrupt sampler");
}
}
}
fn common(&self) -> &Common {
&self.common
}
fn common_mut(&mut self) -> &mut Common {
&mut self.common
}
fn sampler_config(&self) -> &dyn SamplerConfig<Statistic = Self::Statistic> {
self.common.config().samplers().interrupt()
}
async fn sample(&mut self) -> Result<(), std::io::Error> {
if let Some(ref mut delay) = self.delay() {
delay.tick().await;
}
if !self.sampler_config().enabled() {
return Ok(());
}
debug!("sampling");
self.sample_interrupt().await?;
#[cfg(feature = "bpf")]
self.map_result(self.sample_bpf())?;
Ok(())
}
}
impl Interrupt {
#[cfg(feature = "bpf")]
fn bpf_enabled(&self) -> bool {
if self.sampler_config().bpf() {
for statistic in &self.statistics {
if statistic.bpf_table().is_some() {
return true;
}
}
}
false
}
fn initialize_bpf(&mut self) -> Result<(), anyhow::Error> {
#[cfg(feature = "bpf")]
{
if self.enabled() && self.bpf_enabled() {
debug!("initializing bpf");
let code = include_str!("bpf.c");
let code = code.replace(
"VALUE_TO_INDEX2_FUNC",
include_str!("../../common/value_to_index2.c"),
);
let mut bpf = bcc::BPF::new(&code)?;
// collect the set of probes required from the statistics enabled.
let mut probes = HashSet::new();
for statistic in &self.statistics {
for probe in statistic.bpf_probes_required() {
probes.insert(probe);
}
}
// load + attach the kernel probes that are required to the bpf instance.
for probe in probes {
if self.common.config.fault_tolerant() {
if let Err(e) = probe.try_attach_to_bpf(&mut bpf) {
warn!("skipping {} with error: {}", probe.name, e);
}
} else {
probe.try_attach_to_bpf(&mut bpf)?;
}
}
self.bpf = Some(Arc::new(Mutex::new(BPF { inner: bpf })))
}
}
Ok(())
}
async fn sample_interrupt(&mut self) -> Result<(), std::io::Error> {
if self.proc_interrupts.is_none() {
let file = File::open("/proc/interrupts").await?;
self.proc_interrupts = Some(file);
}
let mut result = HashMap::<InterruptStatistic, u64>::new();
let mut cores: Option<usize> = None;
if let Some(file) = &mut self.proc_interrupts {
file.seek(SeekFrom::Start(0)).await?;
let mut reader = BufReader::new(file);
let mut line = String::new();
loop {
line.clear();
if reader.read_line(&mut line).await? == 0 {
break;
}
let parts: Vec<&str> = line.split_whitespace().collect();
if cores.is_none() {
cores = Some(parts.len());
continue;
}
let mut sum = 0;
let mut node0 = 0;
let mut node1 = 0;
let cores = cores.unwrap();
for i in 0..cores {
let count = parts.get(i + 1).unwrap_or(&"0").parse().unwrap_or(0);
sum += count;
let node = self.common.hardware_info().get_numa(i as u64).unwrap_or(0);
match node {
0 => node0 += count,
1 => node1 += count,
_ => {}
}
}
let stat = match parts.get(0) {
Some(&"NMI:") => InterruptStatistic::NonMaskable,
Some(&"LOC:") => InterruptStatistic::LocalTimer,
Some(&"SPU:") => InterruptStatistic::Spurious,
Some(&"PMI:") => InterruptStatistic::PerformanceMonitoring,
Some(&"RES:") => InterruptStatistic::Rescheduling,
Some(&"TLB:") => InterruptStatistic::TlbShootdowns,
Some(&"TRM:") => InterruptStatistic::ThermalEvent,
Some(&"MCE:") => InterruptStatistic::MachineCheckException,
_ => match parts.last() {
Some(&"timer") => InterruptStatistic::Timer,
Some(&"rtc0") => InterruptStatistic::RealTimeClock,
Some(&"vmd") => {
if let Some(previous) = result.get_mut(&InterruptStatistic::Node0Nvme) {
*previous += node0;
} else {
result.insert(InterruptStatistic::Node0Nvme, node0);
}
if let Some(previous) = result.get_mut(&InterruptStatistic::Node1Nvme) {
*previous += node1;
} else {
result.insert(InterruptStatistic::Node1Nvme, node1);
}
InterruptStatistic::Nvme
}
Some(label) => {
if label.starts_with("mlx")
|| label.starts_with("eth")
|| label.starts_with("enp")
{
if let Some(previous) =
result.get_mut(&InterruptStatistic::Node0Network)
{
*previous += node0;
} else {
result.insert(InterruptStatistic::Node0Network, node0);
}
if let Some(previous) =
result.get_mut(&InterruptStatistic::Node1Network)
{
*previous += node1;
} else {
result.insert(InterruptStatistic::Node1Network, node1);
}
InterruptStatistic::Network
} else if label.starts_with("nvme") {
if let Some(previous) =
result.get_mut(&InterruptStatistic::Node0Nvme)
{
*previous += node0;
} else {
result.insert(InterruptStatistic::Node0Nvme, node0);
}
if let Some(previous) =
result.get_mut(&InterruptStatistic::Node1Nvme)
{
*previous += node1;
} else {
result.insert(InterruptStatistic::Node1Nvme, node1);
}
InterruptStatistic::Nvme
} else {
continue;
}
}
None => {
continue;
}
},
};
if let Some(previous) = result.get_mut(&stat) {
*previous += sum;
} else {
result.insert(stat, sum);
}
if let Some(previous) = result.get_mut(&InterruptStatistic::Total) {
*previous += sum;
} else {
result.insert(InterruptStatistic::Total, sum);
}
if let Some(previous) = result.get_mut(&InterruptStatistic::Node0Total) {
*previous += node0;
} else {
result.insert(InterruptStatistic::Node0Total, node0);
}
if let Some(previous) = result.get_mut(&InterruptStatistic::Node1Total) {
*previous += node1;
} else {
result.insert(InterruptStatistic::Node1Total, node1);
}
}
}
let time = Instant::now();
for stat in &self.statistics {
if let Some(value) = result.get(stat) {
let _ = self.metrics().record_counter(stat, time, *value);
}
}
Ok(())
}
#[cfg(feature = "bpf")]
fn sample_bpf(&self) -> Result<(), std::io::Error> {
if self.bpf_last.lock().unwrap().elapsed()
>= Duration::from_secs(self.general_config().window() as u64)
{
if let Some(ref bpf) = self.bpf {
let bpf = bpf.lock().unwrap();
let time = Instant::now();
for statistic in self.statistics.iter().filter(|s| s.bpf_table().is_some()) {
if let Ok(mut table) = (*bpf).inner.table(statistic.bpf_table().unwrap()) {
for (&value, &count) in &map_from_table(&mut table) {
if count > 0 {
let _ = self.metrics().record_bucket(
statistic,
time,
value * crate::MICROSECOND,
count,
);
}
}
}
}
}
*self.bpf_last.lock().unwrap() = Instant::now();
}
Ok(())
}
}