src/samplers/cpu/mod.rs (402 lines of code) (raw):

// Copyright 2019 Twitter, Inc. // Licensed under the Apache License, Version 2.0 // http://www.apache.org/licenses/LICENSE-2.0 use std::collections::{HashMap, HashSet}; use std::io::SeekFrom; use std::str::FromStr; use std::sync::{Arc, Mutex}; use async_trait::async_trait; #[cfg(feature = "bpf")] use bcc::perf_event::{Event, SoftwareEvent}; #[cfg(feature = "bpf")] use bcc::{PerfEvent, PerfEventArray}; use regex::Regex; use tokio::fs::File; use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncSeekExt, BufReader}; use crate::common::bpf::BPF; use crate::common::*; use crate::config::SamplerConfig; use crate::samplers::Common; use crate::*; mod config; mod stat; pub use config::*; pub use stat::*; #[allow(dead_code)] pub struct Cpu { common: Common, cpus: HashSet<String>, cstates: HashMap<String, String>, cstate_files: HashMap<String, HashMap<String, File>>, perf: Option<Arc<Mutex<BPF>>>, tick_duration: u64, proc_cpuinfo: Option<File>, proc_stat: Option<File>, statistics: Vec<CpuStatistic>, } pub fn nanos_per_tick() -> u64 { let ticks_per_second = sysconf::raw::sysconf(sysconf::raw::SysconfVariable::ScClkTck) .expect("Failed to get Clock Ticks per Second") as u64; SECOND / ticks_per_second } #[async_trait] impl Sampler for Cpu { type Statistic = CpuStatistic; fn new(common: Common) -> Result<Self, anyhow::Error> { let statistics = common.config().samplers().cpu().statistics(); #[allow(unused_mut)] let mut sampler = Self { common, cpus: HashSet::new(), cstates: HashMap::new(), cstate_files: HashMap::new(), perf: None, tick_duration: nanos_per_tick(), proc_cpuinfo: None, proc_stat: None, statistics, }; if sampler.sampler_config().enabled() { sampler.register(); } // we initialize perf last so we can delay if sampler.sampler_config().enabled() && sampler.sampler_config().perf_events() { #[cfg(feature = "bpf")] { if let Err(e) = sampler.initialize_bpf_perf() { if !sampler.common().config().general().fault_tolerant() { return Err(format_err!("bpf perf init failure: {}", e)); } } } } // delay by half the sample interval so that we land between perf // counter updates std::thread::sleep(std::time::Duration::from_micros( (1000 * sampler.interval()) as u64 / 2, )); Ok(sampler) } fn spawn(common: Common) { if common.config().samplers().cpu().enabled() { if let Ok(mut cpu) = Cpu::new(common.clone()) { common.runtime().spawn(async move { loop { let _ = cpu.sample().await; } }); } else if !common.config.fault_tolerant() { fatal!("failed to initialize cpu sampler"); } else { error!("failed to initialize cpu sampler"); } } } fn common(&self) -> &Common { &self.common } fn common_mut(&mut self) -> &mut Common { &mut self.common } fn sampler_config(&self) -> &dyn SamplerConfig<Statistic = Self::Statistic> { self.common.config().samplers().cpu() } async fn sample(&mut self) -> Result<(), std::io::Error> { if let Some(ref mut delay) = self.delay() { delay.tick().await; } if !self.sampler_config().enabled() { return Ok(()); } debug!("sampling"); // we do perf sampling first, since it is time critical to keep it // between underlying counter updates #[cfg(feature = "bpf")] { let r = self.sample_bpf_perf_counters(); self.map_result(r)?; } let r = self.sample_cpuinfo().await; self.map_result(r)?; let r = self.sample_cpu_usage().await; self.map_result(r)?; let r = self.sample_cstates().await; self.map_result(r)?; Ok(()) } } impl Cpu { #[cfg(feature = "bpf")] fn initialize_bpf_perf(&mut self) -> Result<(), std::io::Error> { let cpus = crate::common::hardware_threads().unwrap(); let interval = self.interval() as u64; let frequency = if interval > 1000 { 1 } else if interval == 0 { 1 } else { 1000 / interval }; let code = format!( "{}\n{}", format!("#define NUM_CPU {}", cpus), include_str!("perf.c").to_string() ); let mut perf_array_attached = false; if let Ok(mut bpf) = bcc::BPF::new(&code) { for statistic in &self.statistics { if let Some(table) = statistic.table() { if let Some(event) = statistic.event() { perf_array_attached = true; if PerfEventArray::new() .table(&format!("{}_array", table)) .event(event) .attach(&mut bpf) .is_err() { if !self.common().config().general().fault_tolerant() { fatal!("failed to initialize perf bpf for event: {:?}", event); } else { error!("failed to initialize perf bpf for event: {:?}", event); } } } } } debug!("attaching software event to drive perf counter sampling"); // if none of the perf array was attached, we do not need to attach the perf event. if perf_array_attached { if PerfEvent::new() .handler("do_count") .event(Event::Software(SoftwareEvent::CpuClock)) .sample_frequency(Some(frequency)) .attach(&mut bpf) .is_err() { if !self.common().config().general().fault_tolerant() { fatal!("failed to initialize perf bpf for cpu"); } else { error!("failed to initialize perf bpf for cpu"); } } } self.perf = Some(Arc::new(Mutex::new(BPF { inner: bpf }))); } else if !self.common().config().general().fault_tolerant() { fatal!("failed to initialize perf bpf"); } else { error!("failed to initialize perf bpf. skipping cpu perf telemetry"); } Ok(()) } async fn sample_cpu_usage(&mut self) -> Result<(), std::io::Error> { if self.proc_stat.is_none() { let file = File::open("/proc/stat").await?; self.proc_stat = Some(file); } if let Some(file) = &mut self.proc_stat { file.seek(SeekFrom::Start(0)).await?; let mut reader = BufReader::new(file); let mut result = HashMap::new(); let mut buf = String::new(); while reader.read_line(&mut buf).await? > 0 { result.extend(parse_proc_stat(&buf)); buf.clear(); } let time = Instant::now(); for stat in self.sampler_config().statistics() { if let Some(value) = result.get(&stat) { let _ = self .metrics() .record_counter(&stat, time, value * self.tick_duration); } } } Ok(()) } async fn sample_cpuinfo(&mut self) -> Result<(), std::io::Error> { if self.proc_cpuinfo.is_none() { let file = File::open("/proc/cpuinfo").await?; self.proc_cpuinfo = Some(file); } if let Some(file) = &mut self.proc_cpuinfo { file.seek(SeekFrom::Start(0)).await?; let mut reader = BufReader::new(file); let mut buf = String::new(); let mut result = Vec::new(); while reader.read_line(&mut buf).await? > 0 { if let Some(freq) = parse_frequency(&buf) { result.push(freq.ceil() as u64); } buf.clear(); } let time = Instant::now(); for frequency in result { let _ = self .metrics() .record_gauge(&CpuStatistic::Frequency, time, frequency); } } Ok(()) } #[cfg(feature = "bpf")] fn sample_bpf_perf_counters(&self) -> Result<(), std::io::Error> { if let Some(ref bpf) = self.perf { let bpf = bpf.lock().unwrap(); let time = Instant::now(); for stat in self.statistics.iter().filter(|s| s.table().is_some()) { if let Ok(table) = &(*bpf).inner.table(stat.table().unwrap()) { let map = crate::common::bpf::perf_table_to_map(table); let mut total = 0; for (_cpu, count) in map.iter() { total += count; } let _ = self.metrics().record_counter(stat, time, total); } } } Ok(()) } async fn sample_cstates(&mut self) -> Result<(), std::io::Error> { let mut result = HashMap::<CpuStatistic, u64>::new(); // populate the cpu cache if empty if self.cpus.is_empty() { let cpu_regex = Regex::new(r"^cpu\d+$").unwrap(); let mut cpu_dir = tokio::fs::read_dir("/sys/devices/system/cpu").await?; while let Some(cpu_entry) = cpu_dir.next_entry().await? { if let Ok(cpu_name) = cpu_entry.file_name().into_string() { if cpu_regex.is_match(&cpu_name) { self.cpus.insert(cpu_name.to_string()); } } } } // populate the cstate cache if empty if self.cstates.is_empty() { let state_regex = Regex::new(r"^state\d+$").unwrap(); for cpu in &self.cpus { // iterate through all cpuidle states let cpuidle_dir = format!("/sys/devices/system/cpu/{}/cpuidle", cpu); let mut cpuidle_dir = tokio::fs::read_dir(cpuidle_dir).await?; while let Some(cpuidle_entry) = cpuidle_dir.next_entry().await? { if let Ok(cpuidle_name) = cpuidle_entry.file_name().into_string() { if state_regex.is_match(&cpuidle_name) { // get the name of the state let name_file = format!( "/sys/devices/system/cpu/{}/cpuidle/{}/name", cpu, cpuidle_name ); let mut name_file = File::open(name_file).await?; let mut name_content = Vec::new(); name_file.read_to_end(&mut name_content).await?; if let Ok(name_string) = std::str::from_utf8(&name_content) { if let Some(Ok(state)) = name_string.split_whitespace().next().map(|v| v.parse()) { self.cstates.insert(cpuidle_name, state); } } } } } } } for cpu in &self.cpus { if !self.cstate_files.contains_key(cpu) { self.cstate_files.insert(cpu.to_string(), HashMap::new()); } if let Some(cpuidle_files) = self.cstate_files.get_mut(cpu) { for (cpuidle_name, state) in &self.cstates { if !cpuidle_files.contains_key(cpuidle_name) { let time_file = format!( "/sys/devices/system/cpu/{}/cpuidle/{}/time", cpu, cpuidle_name ); let file = File::open(time_file).await?; cpuidle_files.insert(cpuidle_name.to_string(), file); } if let Some(file) = cpuidle_files.get_mut(cpuidle_name) { file.seek(SeekFrom::Start(0)).await?; let mut reader = BufReader::new(file); if let Ok(time) = reader.read_u64().await { if let Some(state) = state.split('-').next() { let metric = match CState::from_str(state) { Ok(CState::C0) => CpuStatistic::CstateC0Time, Ok(CState::C1) => CpuStatistic::CstateC1Time, Ok(CState::C1E) => CpuStatistic::CstateC1ETime, Ok(CState::C2) => CpuStatistic::CstateC2Time, Ok(CState::C3) => CpuStatistic::CstateC3Time, Ok(CState::C6) => CpuStatistic::CstateC6Time, Ok(CState::C7) => CpuStatistic::CstateC7Time, Ok(CState::C8) => CpuStatistic::CstateC8Time, _ => continue, }; let counter = result.entry(metric).or_insert(0); *counter += time * MICROSECOND; } } } } } } let time = Instant::now(); for stat in &self.statistics { if let Some(value) = result.get(stat) { let _ = self.metrics().record_counter(stat, time, *value); } } Ok(()) } } fn parse_proc_stat(line: &str) -> HashMap<CpuStatistic, u64> { let mut result = HashMap::new(); for (id, part) in line.split_whitespace().enumerate() { match id { 0 => { if part != "cpu" { return result; } } 1 => { result.insert(CpuStatistic::UsageUser, part.parse().unwrap_or(0)); } 2 => { result.insert(CpuStatistic::UsageNice, part.parse().unwrap_or(0)); } 3 => { result.insert(CpuStatistic::UsageSystem, part.parse().unwrap_or(0)); } 4 => { result.insert(CpuStatistic::UsageIdle, part.parse().unwrap_or(0)); } 6 => { result.insert(CpuStatistic::UsageIrq, part.parse().unwrap_or(0)); } 7 => { result.insert(CpuStatistic::UsageSoftirq, part.parse().unwrap_or(0)); } 8 => { result.insert(CpuStatistic::UsageSteal, part.parse().unwrap_or(0)); } 9 => { result.insert(CpuStatistic::UsageGuest, part.parse().unwrap_or(0)); } 10 => { result.insert(CpuStatistic::UsageGuestNice, part.parse().unwrap_or(0)); } _ => {} } } result } fn parse_frequency(line: &str) -> Option<f64> { let mut split = line.split_whitespace(); if split.next() == Some("cpu") && split.next() == Some("MHz") { split.last().map(|v| v.parse().unwrap_or(0.0) * 1_000_000.0) } else { None } } #[cfg(test)] mod test { use super::*; #[test] fn test_parse_proc_stat() { let result = parse_proc_stat("cpu 131586 0 53564 8246483 35015 350665 4288 5632 0 0"); assert_eq!(result.len(), 9); assert_eq!(result.get(&CpuStatistic::UsageUser), Some(&131586)); assert_eq!(result.get(&CpuStatistic::UsageNice), Some(&0)); assert_eq!(result.get(&CpuStatistic::UsageSystem), Some(&53564)); } #[test] fn test_parse_frequency() { let result = parse_frequency("cpu MHz : 1979.685"); assert_eq!(result, Some(1_979_685_000.0)); } }