From 03f26fabf1ede16f867a69ce86b13a06e21e6252 Mon Sep 17 00:00:00 2001 From: Caleb Metz <135133572+cmetz100@users.noreply.github.com> Date: Tue, 7 Jan 2025 10:08:41 -0500 Subject: [PATCH] Reformat procfs scraping to handle errors (#1178) * Extracted per process operations into a function to better handle unknown errors Signed-off-by: Caleb Metz * Make clippy happy Signed-off-by: Caleb Metz * Added explicit error handling to pid exe, comm, and cmdline reads Signed-off-by: Caleb Metz * Removed comments Signed-off-by: Caleb Metz --------- Signed-off-by: Caleb Metz --- lading/src/observer/linux/procfs.rs | 309 ++++++++++++++++------------ 1 file changed, 177 insertions(+), 132 deletions(-) diff --git a/lading/src/observer/linux/procfs.rs b/lading/src/observer/linux/procfs.rs index b89f1de15..81ba24564 100644 --- a/lading/src/observer/linux/procfs.rs +++ b/lading/src/observer/linux/procfs.rs @@ -91,7 +91,6 @@ impl Sampler { let mut processes: VecDeque = VecDeque::with_capacity(16); // an arbitrary smallish number processes.push_back(Process::new(self.parent.pid())?); - // BEGIN pid loop while let Some(process) = processes.pop_back() { // Search for child processes. This is done by querying for every // thread of `process` and inspecting each child of the thread. Note @@ -118,158 +117,204 @@ impl Sampler { } } } + // Update the process_info map to only hold processes seen by the current poll call. + self.process_info.retain(|pid, _| pids.contains(pid)); let pid = process.pid(); - - // `/proc/{pid}/status` - let status = match process.status() { - Ok(status) => status, - Err(e) => { - warn!("Couldn't read status: {:?}", e); - // The pid may have exited since we scanned it or we may not - // have sufficient permission. - continue; - } - }; - if status.tgid != pid { - // This is a thread, not a process and we do not wish to scan it. - continue; + if let Err(e) = self.handle_process(process, &mut aggr).await { + warn!("Encountered uncaught error when handling `/proc/{pid}/`: {e}"); } + } - // If we haven't seen this process before, initialize its ProcessInfo. - match self.process_info.entry(pid) { - Entry::Occupied(_) => { /* Already initialized */ } - Entry::Vacant(entry) => { - let exe = proc_exe(pid).await?; - let comm = proc_comm(pid).await?; - let cmdline = proc_cmdline(pid).await?; - let pid_s = format!("{pid}"); - let stat_sampler = stat::Sampler::new(); + gauge!("total_rss_bytes").set(aggr.rss as f64); + gauge!("total_pss_bytes").set(aggr.pss as f64); - entry.insert(ProcessInfo { - cmdline, - exe, - comm, - pid_s, - stat_sampler, - }); - } + Ok(()) + } + + #[allow( + clippy::similar_names, + clippy::too_many_lines, + clippy::cast_sign_loss, + clippy::cast_possible_truncation, + clippy::cast_possible_wrap + )] + async fn handle_process( + &mut self, + process: Process, + aggr: &mut memory::smaps_rollup::Aggregator, + ) -> Result<(), Error> { + let pid = process.pid(); + + // `/proc/{pid}/status` + let status = match process.status() { + Ok(status) => status, + Err(e) => { + warn!("Couldn't read status: {:?}", e); + // The pid may have exited since we scanned it or we may not + // have sufficient permission. + return Ok(()); } + }; + if status.tgid != pid { + // This is a thread, not a process and we do not wish to scan it. + return Ok(()); + } - // SAFETY: We've just inserted this pid into the map. - let pinfo = self - .process_info - .get_mut(&pid) - .expect("catastrophic programming error"); + // If we haven't seen this process before, initialize its ProcessInfo. + match self.process_info.entry(pid) { + Entry::Occupied(_) => { /* Already initialized */ } + Entry::Vacant(entry) => { + let exe = match proc_exe(pid).await { + Ok(exe) => exe, + Err(e) => { + warn!("Couldn't read exe for pid {}: {:?}", pid, e); + // The pid may have exited since we scanned it or we may not + // have sufficient permission. + return Ok(()); + } + }; + let comm = match proc_comm(pid).await { + Ok(comm) => comm, + Err(e) => { + warn!("Couldn't read comm for pid {}: {:?}", pid, e); + // The pid may have exited since we scanned it or we may not + // have sufficient permission. + return Ok(()); + } + }; + let cmdline = match proc_cmdline(pid).await { + Ok(cmdline) => cmdline, + Err(e) => { + warn!("Couldn't read cmdline for pid {}: {:?}", pid, e); + // The pid may have exited since we scanned it or we may not + // have sufficient permission. + return Ok(()); + } + }; + let pid_s = format!("{pid}"); + let stat_sampler = stat::Sampler::new(); + entry.insert(ProcessInfo { + cmdline, + exe, + comm, + pid_s, + stat_sampler, + }); + } + } - let labels: [(&'static str, String); 4] = [ - ("pid", pinfo.pid_s.clone()), - ("exe", pinfo.exe.clone()), - ("cmdline", pinfo.cmdline.clone()), - ("comm", pinfo.comm.clone()), - ]; + // SAFETY: We've just inserted this pid into the map. + let pinfo = self + .process_info + .get_mut(&pid) + .expect("catastrophic programming error"); - // `/proc/{pid}/status` - report_status_field!(status, labels, vmrss); - report_status_field!(status, labels, rssanon); - report_status_field!(status, labels, rssfile); - report_status_field!(status, labels, rssshmem); - report_status_field!(status, labels, vmdata); - report_status_field!(status, labels, vmstk); - report_status_field!(status, labels, vmexe); - report_status_field!(status, labels, vmlib); + let labels: [(&'static str, String); 4] = [ + ("pid", pinfo.pid_s.clone()), + ("exe", pinfo.exe.clone()), + ("cmdline", pinfo.cmdline.clone()), + ("comm", pinfo.comm.clone()), + ]; - let uptime = uptime::poll().await?; + // `/proc/{pid}/status` + report_status_field!(status, labels, vmrss); + report_status_field!(status, labels, rssanon); + report_status_field!(status, labels, rssfile); + report_status_field!(status, labels, rssshmem); + report_status_field!(status, labels, vmdata); + report_status_field!(status, labels, vmstk); + report_status_field!(status, labels, vmexe); + report_status_field!(status, labels, vmlib); - // `/proc/{pid}/stat`, most especially per-process CPU data. - if let Err(e) = pinfo.stat_sampler.poll(pid, uptime, &labels).await { - // We don't want to bail out entirely if we can't read stats - // which will happen if we don't have permissions or, more - // likely, the process has exited. - warn!("Couldn't process `/proc/{pid}/stat`: {e}"); - continue; - } + let uptime = uptime::poll().await?; - // `/proc/{pid}/smaps` - match memory::smaps::Regions::from_pid(pid) { - Ok(memory_regions) => { - for (pathname, measures) in memory_regions.aggregate_by_pathname() { - let labels: [(&'static str, String); 5] = [ - ("pid", pinfo.pid_s.clone()), - ("exe", pinfo.exe.clone()), - ("cmdline", pinfo.cmdline.clone()), - ("comm", pinfo.comm.clone()), - ("pathname", pathname), - ]; - gauge!("smaps.rss.by_pathname", &labels).set(measures.rss as f64); - gauge!("smaps.pss.by_pathname", &labels).set(measures.pss as f64); - gauge!("smaps.swap.by_pathname", &labels).set(measures.swap as f64); - gauge!("smaps.size.by_pathname", &labels).set(measures.size as f64); + // `/proc/{pid}/stat`, most especially per-process CPU data. + if let Err(e) = pinfo.stat_sampler.poll(pid, uptime, &labels).await { + // We don't want to bail out entirely if we can't read stats + // which will happen if we don't have permissions or, more + // likely, the process has exited. + warn!("Couldn't process `/proc/{pid}/stat`: {e}"); + return Ok(()); + } - if let Some(m) = measures.private_clean { - gauge!("smaps.private_clean.by_pathname", &labels).set(m as f64); - } - if let Some(m) = measures.private_dirty { - gauge!("smaps.private_dirty.by_pathname", &labels).set(m as f64); - } - if let Some(m) = measures.shared_clean { - gauge!("smaps.shared_clean.by_pathname", &labels).set(m as f64); - } - if let Some(m) = measures.shared_dirty { - gauge!("smaps.shared_dirty.by_pathname", &labels).set(m as f64); - } - if let Some(m) = measures.referenced { - gauge!("smaps.referenced.by_pathname", &labels).set(m as f64); - } - if let Some(m) = measures.anonymous { - gauge!("smaps.anonymous.by_pathname", &labels).set(m as f64); - } - if let Some(m) = measures.lazy_free { - gauge!("smaps.lazy_free.by_pathname", &labels).set(m as f64); - } - if let Some(m) = measures.anon_huge_pages { - gauge!("smaps.anon_huge_pages.by_pathname", &labels).set(m as f64); - } - if let Some(m) = measures.shmem_pmd_mapped { - gauge!("smaps.shmem_pmd_mapped.by_pathname", &labels).set(m as f64); - } - if let Some(m) = measures.shared_hugetlb { - gauge!("smaps.shared_hugetlb.by_pathname", &labels).set(m as f64); - } - if let Some(m) = measures.private_hugetlb { - gauge!("smaps.private_hugetlb.by_pathname", &labels).set(m as f64); - } - if let Some(m) = measures.file_pmd_mapped { - gauge!("smaps.file_pmd_mapped.by_pathname", &labels).set(m as f64); - } - if let Some(m) = measures.locked { - gauge!("smaps.locked.by_pathname", &labels).set(m as f64); - } - if let Some(m) = measures.swap_pss { - gauge!("smaps.swap_pss.by_pathname", &labels).set(m as f64); - } + // `/proc/{pid}/smaps` + match memory::smaps::Regions::from_pid(pid) { + Ok(memory_regions) => { + for (pathname, measures) in memory_regions.aggregate_by_pathname() { + let labels: [(&'static str, String); 5] = [ + ("pid", pinfo.pid_s.clone()), + ("exe", pinfo.exe.clone()), + ("cmdline", pinfo.cmdline.clone()), + ("comm", pinfo.comm.clone()), + ("pathname", pathname), + ]; + gauge!("smaps.rss.by_pathname", &labels).set(measures.rss as f64); + gauge!("smaps.pss.by_pathname", &labels).set(measures.pss as f64); + gauge!("smaps.swap.by_pathname", &labels).set(measures.swap as f64); + gauge!("smaps.size.by_pathname", &labels).set(measures.size as f64); + + if let Some(m) = measures.private_clean { + gauge!("smaps.private_clean.by_pathname", &labels).set(m as f64); + } + if let Some(m) = measures.private_dirty { + gauge!("smaps.private_dirty.by_pathname", &labels).set(m as f64); + } + if let Some(m) = measures.shared_clean { + gauge!("smaps.shared_clean.by_pathname", &labels).set(m as f64); + } + if let Some(m) = measures.shared_dirty { + gauge!("smaps.shared_dirty.by_pathname", &labels).set(m as f64); + } + if let Some(m) = measures.referenced { + gauge!("smaps.referenced.by_pathname", &labels).set(m as f64); + } + if let Some(m) = measures.anonymous { + gauge!("smaps.anonymous.by_pathname", &labels).set(m as f64); + } + if let Some(m) = measures.lazy_free { + gauge!("smaps.lazy_free.by_pathname", &labels).set(m as f64); + } + if let Some(m) = measures.anon_huge_pages { + gauge!("smaps.anon_huge_pages.by_pathname", &labels).set(m as f64); + } + if let Some(m) = measures.shmem_pmd_mapped { + gauge!("smaps.shmem_pmd_mapped.by_pathname", &labels).set(m as f64); + } + if let Some(m) = measures.shared_hugetlb { + gauge!("smaps.shared_hugetlb.by_pathname", &labels).set(m as f64); + } + if let Some(m) = measures.private_hugetlb { + gauge!("smaps.private_hugetlb.by_pathname", &labels).set(m as f64); + } + if let Some(m) = measures.file_pmd_mapped { + gauge!("smaps.file_pmd_mapped.by_pathname", &labels).set(m as f64); + } + if let Some(m) = measures.locked { + gauge!("smaps.locked.by_pathname", &labels).set(m as f64); + } + if let Some(m) = measures.swap_pss { + gauge!("smaps.swap_pss.by_pathname", &labels).set(m as f64); } - } - Err(err) => { - // We don't want to bail out entirely if we can't read stats - // which will happen if we don't have permissions or, more - // likely, the process has exited. - warn!("Couldn't process `/proc/{pid}/smaps`: {err}"); } } - - // `/proc/{pid}/smaps_rollup` - if let Err(err) = memory::smaps_rollup::poll(pid, &labels, &mut aggr).await { - // We don't want to bail out entirely if we can't read smap rollup + Err(err) => { + // We don't want to bail out entirely if we can't read stats // which will happen if we don't have permissions or, more // likely, the process has exited. - warn!("Couldn't process `/proc/{pid}/smaps_rollup`: {err}"); + warn!("Couldn't process `/proc/{pid}/smaps`: {err}"); + return Ok(()); } } - // END pid loop - gauge!("total_rss_bytes").set(aggr.rss as f64); - gauge!("total_pss_bytes").set(aggr.pss as f64); + // `/proc/{pid}/smaps_rollup` + if let Err(err) = memory::smaps_rollup::poll(pid, &labels, aggr).await { + // We don't want to bail out entirely if we can't read smap rollup + // which will happen if we don't have permissions or, more + // likely, the process has exited. + warn!("Couldn't process `/proc/{pid}/smaps_rollup`: {err}"); + return Ok(()); + } Ok(()) }