diff --git a/src/app.rs b/src/app.rs index ef06170..97df388 100644 --- a/src/app.rs +++ b/src/app.rs @@ -706,7 +706,7 @@ impl App { // Check if we have a pending confirmation for this exact session if let Some((idx, ts)) = self.kill_confirm.take() { if idx == self.selected && ts.elapsed().as_secs() < 2 { - // Confirmed — verify PID still runs expected binary before killing + // Confirmed — verify PID still runs a killable agent before killing let pid = session.pid; let verified = std::process::Command::new("ps") .args(["-p", &pid.to_string(), "-o", "command="]) @@ -714,7 +714,7 @@ impl App { .ok() .map(|output| { let cmd = String::from_utf8_lossy(&output.stdout).trim().to_string(); - is_supported_agent_command(&cmd) + is_killable_agent_command(&cmd) }) .unwrap_or(false); if !verified { @@ -1083,6 +1083,11 @@ fn is_supported_agent_command(cmd: &str) -> bool { || crate::collector::process::cmd_has_binary(cmd, "opencode") } +fn is_killable_agent_command(cmd: &str) -> bool { + is_supported_agent_command(cmd) + && !(crate::collector::process::cmd_has_binary(cmd, "codex") && cmd.contains(" app-server")) +} + #[cfg(test)] mod tests { use super::*; @@ -1175,4 +1180,13 @@ mod tests { assert!(is_supported_agent_command("/opt/homebrew/bin/opencode")); assert!(!is_supported_agent_command("node server.js")); } + + #[test] + fn killable_agent_command_rejects_codex_app_server() { + assert!(is_killable_agent_command("codex --resume abc")); + assert!(is_killable_agent_command("/usr/local/bin/claude")); + assert!(!is_killable_agent_command( + "/Applications/Codex.app/Contents/Resources/codex app-server --analytics-default-enabled" + )); + } } diff --git a/src/collector/codex.rs b/src/collector/codex.rs index 222c8f6..3dcc8e1 100644 --- a/src/collector/codex.rs +++ b/src/collector/codex.rs @@ -4,7 +4,7 @@ use crate::model::{ MAX_CHAT_MESSAGES, }; use serde_json::Value; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::fs; use std::io::{BufRead, BufReader, Read}; use std::path::{Path, PathBuf}; @@ -29,6 +29,13 @@ pub struct CodexCollector { pub last_rate_limit: Option, } +#[derive(Clone, Copy)] +struct CodexProcessContext { + pid: Option, + is_exec: bool, + owns_process_tree: bool, +} + impl CodexCollector { pub fn new() -> Self { let home = dirs::home_dir().unwrap_or_default(); @@ -63,8 +70,11 @@ impl CodexCollector { for (pid, jsonl_path) in &pid_to_jsonl { let is_exec = pid_is_exec.get(pid).copied().unwrap_or(false); if let Some((session, rl)) = self.load_session_with_rate_limit( - Some(*pid), - is_exec, + CodexProcessContext { + pid: Some(*pid), + is_exec, + owns_process_tree: true, + }, jsonl_path, &shared.process_info, &shared.children_map, @@ -85,6 +95,46 @@ impl CodexCollector { } } + let desktop_pids = Self::find_codex_desktop_pids_from_shared( + &shared.process_info, + &shared.mcp_server_pids, + ); + let desktop_pid_to_rollouts = super::mcp::map_pid_to_rollouts(&desktop_pids); + + // Codex Desktop app-server keeps old rollout fds open. Only fresh + // Desktop-originated rollouts represent currently active threads. + for (pid, path) in Self::active_desktop_rollouts( + desktop_pid_to_rollouts, + &seen_jsonl, + &shared.mcp_owned_rollouts, + super::mcp::ACTIVE_MTIME_SECS, + ) { + if let Some((session, rl)) = self.load_session_with_rate_limit( + CodexProcessContext { + pid: Some(pid), + is_exec: false, + owns_process_tree: false, + }, + &path, + &shared.process_info, + &shared.children_map, + &shared.ports, + ) { + seen_jsonl.insert(path); + if let Some(new_rl) = rl { + let newer = self + .last_rate_limit + .as_ref() + .is_none_or(|old| new_rl.updated_at > old.updated_at); + if newer { + super::rate_limit::write_codex_cache(&new_rl); + self.last_rate_limit = Some(new_rl); + } + } + sessions.push(session); + } + } + // Recently finished sessions: scan today's JSONL files not owned by any running process. // This ensures Codex sessions transition to Done instead of vanishing. if let Some(recent_dir) = Self::today_session_dir(&self.sessions_dir) { @@ -121,8 +171,11 @@ impl CodexCollector { } } if let Some((session, rl)) = self.load_session_with_rate_limit( - None, - false, + CodexProcessContext { + pid: None, + is_exec: false, + owns_process_tree: false, + }, &path, &shared.process_info, &shared.children_map, @@ -162,10 +215,56 @@ impl CodexCollector { } } + fn is_active_desktop_rollout(path: &Path, active_mtime_secs: u64) -> bool { + let Ok(meta) = fs::metadata(path) else { + return false; + }; + let Ok(modified) = meta.modified() else { + return false; + }; + let age = std::time::SystemTime::now() + .duration_since(modified) + .unwrap_or_default(); + if age.as_secs() >= active_mtime_secs { + return false; + } + + parse_codex_jsonl(path).is_some_and(|result| result.is_codex_desktop()) + } + + fn active_desktop_rollouts( + pid_to_rollouts: HashMap>, + seen_jsonl: &HashSet, + mcp_owned_rollouts: &HashSet, + active_mtime_secs: u64, + ) -> Vec<(u32, PathBuf)> { + let mut candidates: Vec<(u32, PathBuf)> = pid_to_rollouts + .into_iter() + .flat_map(|(pid, paths)| paths.into_iter().map(move |path| (pid, path))) + .collect(); + candidates.sort_by_key(|(_, path)| { + std::cmp::Reverse( + fs::metadata(path) + .and_then(|meta| meta.modified()) + .unwrap_or(std::time::UNIX_EPOCH), + ) + }); + + let mut emitted = HashSet::new(); + candidates + .into_iter() + .filter(|(_, path)| { + !seen_jsonl.contains(path) + && !mcp_owned_rollouts.contains(path) + && emitted.insert(path.clone()) + && Self::is_active_desktop_rollout(path, active_mtime_secs) + }) + .collect() + } + fn load_session_with_rate_limit( &self, - pid: Option, - is_exec: bool, + process_ctx: CodexProcessContext, jsonl_path: &Path, process_info: &HashMap, children_map: &HashMap>, @@ -173,9 +272,13 @@ impl CodexCollector { ) -> Option<(AgentSession, Option)> { let result = parse_codex_jsonl(jsonl_path)?; - let proc = pid.and_then(|p| process_info.get(&p)); - let mem_mb = proc.map(|p| p.rss_kb / 1024).unwrap_or(0); - let display_pid = pid.unwrap_or(0); + let proc = process_ctx.pid.and_then(|p| process_info.get(&p)); + let mem_mb = if process_ctx.owns_process_tree { + proc.map(|p| p.rss_kb / 1024).unwrap_or(0) + } else { + 0 + }; + let display_pid = process_ctx.pid.unwrap_or(0); let project_name = process::last_path_segment(&result.cwd) .unwrap_or("?") @@ -189,12 +292,13 @@ impl CodexCollector { // Mirrors Claude: trust the trailing-event-is-user signal alone. // Codex tool outputs flow through response_item, not user_message, // so model_generating only flips on real prompts. - let status = if !pid_alive || (is_exec && result.task_complete) { + let status = if !pid_alive || (process_ctx.is_exec && result.task_complete) { SessionStatus::Done } else { - let has_active_child = pid.is_some_and(|p| { - process::has_active_descendant(p, children_map, process_info, 5.0) - }); + let has_active_child = process_ctx.owns_process_tree + && process_ctx.pid.is_some_and(|p| { + process::has_active_descendant(p, children_map, process_info, 5.0) + }); if has_active_child || result.pending_since_ms > 0 { SessionStatus::Executing } else if result.model_generating { @@ -209,7 +313,7 @@ impl CodexCollector { // For interactive sessions, task_complete fires after every turn — ignore it. let current_tasks = if !result.current_task.is_empty() { vec![result.current_task] - } else if !pid_alive || (is_exec && result.task_complete) { + } else if !pid_alive || (process_ctx.is_exec && result.task_complete) { vec!["finished".to_string()] } else if matches!(status, SessionStatus::Waiting) { vec!["waiting for input".to_string()] @@ -227,7 +331,7 @@ impl CodexCollector { // Children: collect all descendants recursively (not just direct children) // so we catch grandchild processes that listen on ports. let mut children = Vec::new(); - if let Some(p) = pid { + if let (true, Some(p)) = (process_ctx.owns_process_tree, process_ctx.pid) { let mut stack: Vec = children_map.get(&p).cloned().unwrap_or_default(); let mut visited = std::collections::HashSet::new(); while let Some(cpid) = stack.pop() { @@ -305,7 +409,7 @@ impl CodexCollector { /// are reported via the MCP servers panel instead. fn find_codex_pids_from_shared( process_info: &HashMap, - mcp_server_pids: &std::collections::HashSet, + mcp_server_pids: &HashSet, ) -> Vec<(u32, bool)> { let mut pids = Vec::new(); for (pid, info) in process_info { @@ -315,7 +419,7 @@ impl CodexCollector { let cmd = &info.command; let is_exec = cmd.contains(" exec"); let is_codex = process::cmd_has_binary(cmd, "codex"); - if is_codex && !cmd.contains("app-server") && !cmd.contains("grep") { + if is_codex && !cmd.contains(" app-server") && !cmd.contains("grep") { pids.push((*pid, is_exec)); } } @@ -341,6 +445,29 @@ impl CodexCollector { pids } + /// Find Codex Desktop app-server host PIDs. Desktop is kept separate from + /// CLI discovery because a single app-server PID can hold many rollout fds. + fn find_codex_desktop_pids_from_shared( + process_info: &HashMap, + mcp_server_pids: &HashSet, + ) -> Vec { + let mut pids = Vec::new(); + for (pid, info) in process_info { + if mcp_server_pids.contains(pid) { + continue; + } + let cmd = &info.command; + if process::cmd_has_binary(cmd, "codex") + && cmd.contains(" app-server") + && !cmd.contains("grep") + { + pids.push(*pid); + } + } + pids.sort_unstable(); + pids + } + /// Map codex PIDs to their open rollout-*.jsonl files. /// /// On Linux, scans /proc/{pid}/fd symlinks directly (no process spawn). @@ -460,6 +587,7 @@ impl super::AgentCollector for CodexCollector { struct CodexJSONLResult { session_id: String, cwd: String, + originator: String, started_at: u64, model: String, /// Reasoning effort setting from turn_context: "minimal" | "low" | "medium" | "high". @@ -496,6 +624,12 @@ struct CodexJSONLResult { thinking_since_ms: u64, } +impl CodexJSONLResult { + fn is_codex_desktop(&self) -> bool { + self.originator == "Codex Desktop" + } +} + fn event_timestamp_ms(val: &Value) -> Option { val["timestamp"] .as_str() @@ -647,6 +781,7 @@ fn parse_codex_jsonl(path: &Path) -> Option { let mut result = CodexJSONLResult { session_id: String::new(), cwd: String::new(), + originator: String::new(), started_at: 0, model: String::from("-"), effort: String::new(), @@ -726,6 +861,9 @@ fn parse_codex_jsonl(path: &Path) -> Option { if let Some(cwd) = payload["cwd"].as_str() { result.cwd = cwd.to_string(); } + if let Some(originator) = payload["originator"].as_str() { + result.originator = originator.to_string(); + } if let Some(ver) = payload["cli_version"].as_str() { result.version = ver.to_string(); } @@ -801,7 +939,7 @@ fn parse_codex_jsonl(path: &Path) -> Option { // Plus plans: primary=5h(300min), secondary=7d(10080min). // Free plans: primary=7d(10080min), secondary=null. let rl = &payload["rate_limits"]; - if rl.is_object() { + if rl.is_object() && is_account_level_codex_rate_limit(rl) { let event_secs = val["timestamp"] .as_str() .and_then(|ts| chrono::DateTime::parse_from_rfc3339(ts).ok()) @@ -1003,12 +1141,19 @@ fn parse_codex_jsonl(path: &Path) -> Option { Some(result) } +fn is_account_level_codex_rate_limit(rate_limits: &Value) -> bool { + matches!(rate_limits["limit_id"].as_str(), Some("codex") | None) +} + #[cfg(test)] mod tests { use super::*; + use std::fs::File; use std::io::Write; + use std::time::{Duration, SystemTime}; const SESSION_META: &str = r#"{"type":"session_meta","timestamp":"2026-03-28T15:00:00Z","payload":{"id":"sess-123","cwd":"/home/user/project","cli_version":"0.1.5","timestamp":"2026-03-28T15:00:00Z","git":{"branch":"feature/x"}}}"#; + const DESKTOP_SESSION_META: &str = r#"{"type":"session_meta","timestamp":"2026-03-28T15:00:00Z","payload":{"id":"desktop-123","cwd":"/home/user/project","originator":"Codex Desktop","cli_version":"0.131.0-alpha.9","timestamp":"2026-03-28T15:00:00Z","git":{"branch":"feature/x"}}}"#; fn write_lines(file: &mut tempfile::NamedTempFile, lines: &[&str]) { for line in lines { @@ -1017,7 +1162,6 @@ mod tests { file.flush().unwrap(); } - #[cfg(windows)] fn proc_info(pid: u32, ppid: u32, command: &str) -> ProcInfo { ProcInfo { pid, @@ -1028,6 +1172,34 @@ mod tests { } } + fn owned_process(pid: u32) -> CodexProcessContext { + CodexProcessContext { + pid: Some(pid), + is_exec: false, + owns_process_tree: true, + } + } + + fn host_process(pid: u32) -> CodexProcessContext { + CodexProcessContext { + pid: Some(pid), + is_exec: false, + owns_process_tree: false, + } + } + + fn write_jsonl(path: &Path, lines: &[&str]) { + let mut file = File::create(path).unwrap(); + for line in lines { + writeln!(file, "{}", line).unwrap(); + } + file.flush().unwrap(); + } + + fn set_modified(path: &Path, when: SystemTime) { + File::open(path).unwrap().set_modified(when).unwrap(); + } + #[cfg(windows)] #[test] fn find_codex_pids_windows_keeps_real_child_over_wrappers() { @@ -1065,6 +1237,173 @@ mod tests { assert_eq!(pids, vec![(30, false)]); } + #[test] + fn find_codex_pids_excludes_app_server() { + let mut process_info = HashMap::new(); + process_info.insert(10, proc_info(10, 1, "codex --resume abc")); + process_info.insert( + 20, + proc_info( + 20, + 1, + "/Applications/Codex.app/Contents/Resources/codex app-server --analytics-default-enabled", + ), + ); + + let pids = CodexCollector::find_codex_pids_from_shared(&process_info, &HashSet::new()); + + assert_eq!(pids, vec![(10, false)]); + } + + #[test] + fn find_codex_pids_keeps_cli_with_app_server_in_path() { + let mut process_info = HashMap::new(); + process_info.insert( + 10, + proc_info(10, 1, "codex --cd /home/user/app-server --resume abc"), + ); + + let pids = CodexCollector::find_codex_pids_from_shared(&process_info, &HashSet::new()); + + assert_eq!(pids, vec![(10, false)]); + } + + #[test] + fn find_codex_desktop_pids_detects_app_servers() { + let mut process_info = HashMap::new(); + process_info.insert( + 10, + proc_info( + 10, + 1, + "/Applications/Codex.app/Contents/Resources/codex app-server --analytics-default-enabled", + ), + ); + process_info.insert(20, proc_info(20, 1, "codex app-server --listen stdio://")); + + let pids = + CodexCollector::find_codex_desktop_pids_from_shared(&process_info, &HashSet::new()); + + assert_eq!(pids, vec![10, 20]); + } + + #[test] + fn find_codex_desktop_pids_ignores_mcp_and_non_codex() { + let mut process_info = HashMap::new(); + process_info.insert(10, proc_info(10, 1, "codex mcp-server")); + process_info.insert(20, proc_info(20, 1, "node app-server")); + process_info.insert(30, proc_info(30, 1, "grep codex app-server")); + process_info.insert(40, proc_info(40, 1, "codex app-server --listen stdio://")); + let mut mcp = HashSet::new(); + mcp.insert(10); + + let pids = CodexCollector::find_codex_desktop_pids_from_shared(&process_info, &mcp); + + assert_eq!(pids, vec![40]); + } + + #[test] + fn desktop_rollout_filter_requires_originator() { + let mut desktop = tempfile::NamedTempFile::new().unwrap(); + write_lines(&mut desktop, &[DESKTOP_SESSION_META]); + let mut cli = tempfile::NamedTempFile::new().unwrap(); + write_lines(&mut cli, &[SESSION_META]); + + assert!(CodexCollector::is_active_desktop_rollout( + desktop.path(), + super::super::mcp::ACTIVE_MTIME_SECS + )); + assert!(!CodexCollector::is_active_desktop_rollout( + cli.path(), + super::super::mcp::ACTIVE_MTIME_SECS + )); + } + + #[test] + fn active_desktop_rollouts_filters_stale_seen_and_cli_files() { + let temp = tempfile::tempdir().unwrap(); + let active = temp.path().join("rollout-active.jsonl"); + let stale = temp.path().join("rollout-stale.jsonl"); + let cli = temp.path().join("rollout-cli.jsonl"); + let seen = temp.path().join("rollout-seen.jsonl"); + write_jsonl(&active, &[DESKTOP_SESSION_META]); + write_jsonl(&stale, &[DESKTOP_SESSION_META]); + write_jsonl(&cli, &[SESSION_META]); + write_jsonl(&seen, &[DESKTOP_SESSION_META]); + set_modified(&stale, SystemTime::now() - Duration::from_secs(31 * 60)); + + let mut pid_to_rollouts = HashMap::new(); + pid_to_rollouts.insert( + 99, + vec![active.clone(), stale, cli, seen.clone(), active.clone()], + ); + let seen_jsonl = HashSet::from([seen]); + + let rollouts = CodexCollector::active_desktop_rollouts( + pid_to_rollouts, + &seen_jsonl, + &HashSet::new(), + super::super::mcp::ACTIVE_MTIME_SECS, + ); + + assert_eq!(rollouts, vec![(99, active)]); + } + + #[test] + fn desktop_rollout_selection_loads_active_session_with_host_pid() { + let temp = tempfile::tempdir().unwrap(); + let active = temp.path().join("rollout-active.jsonl"); + let stale = temp.path().join("rollout-stale.jsonl"); + write_jsonl(&active, &[DESKTOP_SESSION_META]); + write_jsonl(&stale, &[DESKTOP_SESSION_META]); + set_modified(&stale, SystemTime::now() - Duration::from_secs(31 * 60)); + + let mut pid_to_rollouts = HashMap::new(); + pid_to_rollouts.insert(99, vec![active.clone(), stale]); + let rollouts = CodexCollector::active_desktop_rollouts( + pid_to_rollouts, + &HashSet::new(), + &HashSet::new(), + super::super::mcp::ACTIVE_MTIME_SECS, + ); + + let collector = CodexCollector::new(); + let mut process_info = HashMap::new(); + process_info.insert( + 99, + proc_info( + 99, + 1, + "/Applications/Codex.app/Contents/Resources/codex app-server --analytics-default-enabled", + ), + ); + process_info.insert(100, proc_info(100, 99, "cargo test")); + let children_map = HashMap::from([(99, vec![100])]); + let ports = HashMap::from([(100, vec![3000])]); + let sessions: Vec = rollouts + .iter() + .filter_map(|(pid, path)| { + collector + .load_session_with_rate_limit( + host_process(*pid), + path, + &process_info, + &children_map, + &ports, + ) + .map(|(session, _)| session) + }) + .collect(); + + assert_eq!(sessions.len(), 1); + assert_eq!(sessions[0].pid, 99); + assert_eq!(sessions[0].session_id, "desktop-123"); + assert_eq!(sessions[0].agent_cli, "codex"); + assert_eq!(sessions[0].status, SessionStatus::Waiting); + assert_eq!(sessions[0].mem_mb, 0); + assert!(sessions[0].children.is_empty()); + } + #[test] fn test_parse_codex_session_meta() { let mut file = tempfile::NamedTempFile::new().unwrap(); @@ -1128,6 +1467,24 @@ mod tests { assert_eq!(rl.seven_day_pct, Some(14.0)); } + #[test] + fn test_parse_codex_rate_limits_ignores_model_specific_limits() { + let mut file = tempfile::NamedTempFile::new().unwrap(); + write_lines( + &mut file, + &[ + SESSION_META, + r#"{"type":"event_msg","timestamp":"2026-03-28T15:01:00Z","payload":{"type":"token_count","info":{"total_token_usage":{"input_tokens":1,"output_tokens":1},"last_token_usage":{"input_tokens":1,"output_tokens":1}},"rate_limits":{"limit_id":"codex","primary":{"used_percent":25.0,"window_minutes":300,"resets_at":1774686045},"secondary":{"used_percent":4.0,"window_minutes":10080,"resets_at":1775186466}}}}"#, + r#"{"type":"event_msg","timestamp":"2026-03-28T15:01:01Z","payload":{"type":"token_count","info":{"total_token_usage":{"input_tokens":1,"output_tokens":1},"last_token_usage":{"input_tokens":1,"output_tokens":1}},"rate_limits":{"limit_id":"codex_bengalfox","limit_name":"GPT-5.3-Codex-Spark","primary":{"used_percent":0.0,"window_minutes":300,"resets_at":1774686045},"secondary":{"used_percent":0.0,"window_minutes":10080,"resets_at":1775186466}}}}"#, + ], + ); + + let result = parse_codex_jsonl(file.path()).unwrap(); + let rl = result.rate_limit.expect("account rate_limit should remain"); + assert_eq!(rl.five_hour_pct, Some(25.0)); + assert_eq!(rl.seven_day_pct, Some(4.0)); + } + #[test] fn test_parse_codex_cache_read_fallback_field_name() { let mut file = tempfile::NamedTempFile::new().unwrap(); @@ -1284,8 +1641,7 @@ mod tests { let (session, _) = collector .load_session_with_rate_limit( - Some(42), - false, + owned_process(42), file.path(), &process_info, &HashMap::new(), @@ -1333,8 +1689,7 @@ mod tests { let (session, _) = collector .load_session_with_rate_limit( - Some(42), - false, + owned_process(42), file.path(), &process_info, &HashMap::new(), @@ -1376,8 +1731,7 @@ mod tests { let (session, _) = collector .load_session_with_rate_limit( - Some(42), - false, + owned_process(42), file.path(), &process_info, &HashMap::new(), @@ -1421,8 +1775,7 @@ mod tests { let (session, _) = collector .load_session_with_rate_limit( - Some(42), - false, + owned_process(42), file.path(), &process_info, &HashMap::new(), diff --git a/src/collector/mcp.rs b/src/collector/mcp.rs index e4a240d..b76e09e 100644 --- a/src/collector/mcp.rs +++ b/src/collector/mcp.rs @@ -5,12 +5,12 @@ use std::path::{Path, PathBuf}; use std::process::Command; use std::time::SystemTime; -/// Active-thread mtime threshold: a rollout written within the last +/// Active-thread mtime threshold: a rollout written within the last 30 minutes /// ACTIVE_MTIME_SECS counts as "active". File-descriptor presence alone /// would overcount — `codex mcp-server` keeps fds open for hours after /// a thread last wrote (so it can resume on demand), so we need a /// freshness signal in addition to fd presence. -pub const ACTIVE_MTIME_SECS: u64 = 30; +pub const ACTIVE_MTIME_SECS: u64 = 30 * 60; /// One open `rollout-*.jsonl` fd held by an mcp-server process. #[derive(Clone, Debug)] @@ -359,7 +359,7 @@ mod tests { let now = SystemTime::now(); let stale = McpRollout { path: PathBuf::from("/x"), - mtime: Some(now - std::time::Duration::from_secs(120)), + mtime: Some(now - std::time::Duration::from_secs(31 * 60)), size_bytes: 0, }; let fresh = McpRollout {