From 7c5ddb734f76b486845a7c7b2b6d130481690470 Mon Sep 17 00:00:00 2001 From: Emmanuel Bosquet Date: Tue, 9 Jan 2024 12:46:48 +0100 Subject: [PATCH] draft: launch workers from CommandHub::server --- bin/src/command_v2/mod.rs | 26 +++++- bin/src/command_v2/server.rs | 141 +++++++++++++++++++++++---------- bin/src/command_v2/sessions.rs | 12 ++- bin/src/main.rs | 16 +++- lib/src/server.rs | 17 +++- 5 files changed, 157 insertions(+), 55 deletions(-) diff --git a/bin/src/command_v2/mod.rs b/bin/src/command_v2/mod.rs index 9841a79fc..db677774c 100644 --- a/bin/src/command_v2/mod.rs +++ b/bin/src/command_v2/mod.rs @@ -23,7 +23,8 @@ mod sessions; pub fn start_server( config: Config, command_socket_path: String, - workers: Vec, + // workers: Vec, + executable_path: String, ) -> anyhow::Result<()> { let path = PathBuf::from(&command_socket_path); @@ -38,12 +39,15 @@ pub fn start_server( Err(e) => { error!("could not create unix socket: {:?}", e); // the workers did not even get the configuration, we can kill them right away + + /* for worker in workers { error!("killing worker n°{} (PID {})", worker.id, worker.pid); let _ = kill(Pid::from_raw(worker.pid), Signal::SIGKILL).map_err(|e| { error!("could not kill worker: {:?}", e); }); } + */ bail!("couldn't start server"); } }; @@ -53,6 +57,8 @@ pub fn start_server( let _ = fs::remove_file(&path).map_err(|e2| { error!("could not remove the unix socket: {:?}", e2); }); + + /* // the workers did not even get the configuration, we can kill them right away for worker in workers { error!("killing worker n°{} (PID {})", worker.id, worker.pid); @@ -60,16 +66,30 @@ pub fn start_server( error!("could not kill worker: {:?}", e); }); } + */ bail!("couldn't start server"); } // Create a copy of the state path to load state later let saved_state_path = config.saved_state.clone(); + let worker_count = config.worker_count; + + let mut command_hub = CommandHub::new(unix_listener, config, executable_path)?; + + for _ in 0..worker_count { + 
command_hub.launch_new_worker(); + } - let mut command_hub = CommandHub::new(unix_listener, config)?; + /* for mut worker in workers { - command_hub.register_worker(worker.id, worker.pid, worker.worker_channel.take().unwrap()) + command_hub.register_worker( + worker.id, + worker.pid, + worker.worker_channel.take().unwrap(), + worker.scm_socket, + ) } + */ load_static_config(&mut command_hub.server); diff --git a/bin/src/command_v2/server.rs b/bin/src/command_v2/server.rs index 7bacf0fba..204c10441 100644 --- a/bin/src/command_v2/server.rs +++ b/bin/src/command_v2/server.rs @@ -18,17 +18,22 @@ use nix::{ use sozu_command_lib::{ channel::Channel, config::Config, - proto::command::{Request, ResponseStatus, RunState}, + proto::command::{request::RequestType, Request, ResponseStatus, RunState, Status}, ready::Ready, request::WorkerRequest, response::WorkerResponse, + scm_socket::{ScmSocket, Listeners}, state::ConfigState, }; -use crate::command_v2::sessions::{ - wants_to_write, ClientResult, ClientSession, WorkerResult, WorkerSession, +use crate::{ + command_v2::sessions::{ + wants_to_write, ClientResult, ClientSession, WorkerResult, WorkerSession, + }, + worker::fork_main_into_worker, }; +pub type WorkerId = u32; /// Gather messages and notifies when there are no more to read. 
#[allow(unused)] pub trait Gatherer { @@ -39,7 +44,7 @@ pub trait Gatherer { &mut self, server: &mut Server, client: &mut ClientSession, - worker_id: u32, + worker_id: WorkerId, message: WorkerResponse, ) -> bool { unimplemented!("The parent task expected no client"); @@ -49,7 +54,7 @@ pub trait Gatherer { fn on_message_no_client( &mut self, server: &mut Server, - worker_id: u32, + worker_id: WorkerId, message: WorkerResponse, ) -> bool { unimplemented!("The parent task expected a client"); @@ -70,6 +75,7 @@ pub trait GatheringTask: Debug { // TODO: add an on_failure argument to on_finish functions } +/// Contains a task and its execution timeout #[derive(Debug)] struct TaskContainer { job: Box, @@ -80,7 +86,7 @@ struct TaskContainer { pub struct DefaultGatherer { pub ok: usize, pub errors: usize, - pub responses: Vec<(u32, WorkerResponse)>, + pub responses: Vec<(WorkerId, WorkerResponse)>, pub expected_responses: usize, } @@ -94,7 +100,7 @@ impl Gatherer for DefaultGatherer { &mut self, server: &mut Server, client: &mut ClientSession, - worker_id: u32, + worker_id: WorkerId, message: WorkerResponse, ) -> bool { match message.status { @@ -109,7 +115,7 @@ impl Gatherer for DefaultGatherer { fn on_message_no_client( &mut self, server: &mut Server, - worker_id: u32, + worker_id: WorkerId, message: WorkerResponse, ) -> bool { match message.status { @@ -131,27 +137,6 @@ pub struct CommandHub { tasks: HashMap, } -/// Manages workers -/// Functions as an executer for tasks that have two steps: -/// - scatter to workers -/// - gather worker responses -/// - trigger a finishing function when all responses are gathered -#[derive(Debug)] -pub struct Server { - in_flight: HashMap, - next_client_id: u32, - next_session_token: usize, - next_task_id: usize, - poll: Poll, - queued_tasks: HashMap, - unix_listener: UnixListener, - pub workers: HashMap, - pub state: ConfigState, - frontends_count: usize, - backends_count: usize, - pub config: Config, -} - impl Deref for CommandHub { 
type Target = Server; @@ -166,9 +151,13 @@ impl DerefMut for CommandHub { } impl CommandHub { - pub fn new(unix_listener: UnixListener, config: Config) -> anyhow::Result { + pub fn new( + unix_listener: UnixListener, + config: Config, + executable_path: String, + ) -> anyhow::Result { Ok(Self { - server: Server::new(unix_listener, config)?, + server: Server::new(unix_listener, config, executable_path)?, clients: HashMap::new(), tasks: HashMap::new(), }) @@ -305,7 +294,7 @@ impl CommandHub { } } - fn handle_response(&mut self, worker_id: u32, response: WorkerResponse) { + fn handle_response(&mut self, worker_id: WorkerId, response: WorkerResponse) { let Some(task_id) = self.in_flight.get(&response.id).copied() else { error!("Go a response for an unknown task: {}", response); return; @@ -347,8 +336,35 @@ impl CommandHub { } } +/// Manages workers +/// Functions as an executor for tasks that have two steps: +/// - scatter to workers +/// - gather worker responses +/// - trigger a finishing function when all responses are gathered +#[derive(Debug)] +pub struct Server { + backends_count: usize, + pub config: Config, + executable_path: String, + frontends_count: usize, + in_flight: HashMap, + next_client_id: u32, // TODO: create a ClientId type + next_session_token: usize, + next_task_id: usize, // TODO: create a TaskId type + next_worker_id: WorkerId, // TODO: make WorkerId a proper newtype + poll: Poll, + queued_tasks: HashMap, + pub state: ConfigState, + unix_listener: UnixListener, + pub workers: HashMap, +} impl Server { - fn new(mut unix_listener: UnixListener, config: Config) -> anyhow::Result { + fn new( + mut unix_listener: UnixListener, + config: Config, + executable_path: String, + ) -> anyhow::Result { let poll = mio::Poll::new().with_context(|| "Poll::new() failed")?; poll.registry() .register( @@ -359,21 +375,54 @@ impl Server { .with_context(|| "should register the channel")?; Ok(Self { + backends_count: 0, + config, + executable_path, + frontends_count: 0,
in_flight: HashMap::new(), next_client_id: 0, next_session_token: 1, // 0 is reserved for the UnixListener next_task_id: 0, + next_worker_id: 0, poll, - state: Default::default(), queued_tasks: HashMap::new(), + state: ConfigState::new(), unix_listener, workers: HashMap::new(), - frontends_count: 0, - backends_count: 0, - config, }) } + pub fn launch_new_worker(&mut self) { + let worker_id = self.next_worker_id(); + let (worker_pid, main_to_worker_channel, main_to_worker_scm) = fork_main_into_worker( + &worker_id.to_string(), + &self.config, + self.executable_path.clone(), + &self.state, + Some(Listeners { + http: Vec::new(), + tls: Vec::new(), + tcp: Vec::new(), + }), + ) + .expect("could not fork main into new worker"); + + let worker_token = self.register_worker( + worker_id, + worker_pid, + main_to_worker_channel, + main_to_worker_scm, + ); + + self.workers + .get_mut(&worker_token) + .unwrap() + .send(&WorkerRequest { + id: worker_id.to_string(), + content: RequestType::Status(Status {}).into(), + }) + } + /// count backends and frontends in the cache, update gauge metrics pub fn update_counts(&mut self) { self.backends_count = self.state.count_backends(); @@ -400,6 +449,12 @@ impl Server { id } + fn next_worker_id(&mut self) -> WorkerId { + let id = self.next_worker_id; + self.next_worker_id += 1; + id + } + fn register(&mut self, token: Token, stream: &mut UnixStream) { self.poll .registry() @@ -409,14 +464,18 @@ impl Server { pub fn register_worker( &mut self, - id: u32, + worker_id: WorkerId, pid: pid_t, mut channel: Channel, - ) { + scm_socket: ScmSocket, + ) -> Token { let token = self.next_session_token(); self.register(token, &mut channel.sock); - self.workers - .insert(token, WorkerSession::new(channel, id, pid, token)); + self.workers.insert( + token, + WorkerSession::new(channel, worker_id, pid, token, scm_socket), + ); + token } /// Add a task in a queue to make it accessible until the next tick diff --git a/bin/src/command_v2/sessions.rs 
b/bin/src/command_v2/sessions.rs index ddc1abefb..065319e4a 100644 --- a/bin/src/command_v2/sessions.rs +++ b/bin/src/command_v2/sessions.rs @@ -9,8 +9,11 @@ use sozu_command_lib::{ ready::Ready, request::WorkerRequest, response::WorkerResponse, + scm_socket::ScmSocket, }; +use super::server::WorkerId; + /// Follows a client request from start to finish #[derive(Debug)] pub struct ClientSession { @@ -96,9 +99,10 @@ impl ClientSession { #[derive(Debug)] pub struct WorkerSession { pub channel: Channel, - pub id: u32, + pub id: WorkerId, pub pid: pid_t, pub run_state: RunState, + pub scm_socket: ScmSocket, pub token: Token, } @@ -113,9 +117,10 @@ pub enum WorkerResult { impl WorkerSession { pub fn new( mut channel: Channel, - id: u32, + id: WorkerId, pid: pid_t, token: Token, + scm_socket: ScmSocket, ) -> Self { channel.interest = Ready::READABLE | Ready::ERROR | Ready::HUP; Self { @@ -123,6 +128,7 @@ impl WorkerSession { id, pid, run_state: RunState::Running, + scm_socket, token, } } @@ -159,7 +165,7 @@ impl WorkerSession { RunState::Running | RunState::NotAnswering => RunState::NotAnswering, }; WorkerInfo { - id: self.id, + id: self.id as u32, pid: self.pid, run_state: run_state as i32, } diff --git a/bin/src/main.rs b/bin/src/main.rs index a7bb95d27..5baa32f99 100644 --- a/bin/src/main.rs +++ b/bin/src/main.rs @@ -125,17 +125,25 @@ fn start(args: &cli::Args) -> Result<(), anyhow::Error> { let executable_path = unsafe { get_executable_path().with_context(|| "Could not get executable path")? 
}; - let workers = - start_workers(executable_path, &config).with_context(|| "Failed at spawning workers")?; + + /* + let workers = start_workers(executable_path.clone(), &config) + .with_context(|| "Failed at spawning workers")?; if config.handle_process_affinity { set_workers_affinity(&workers); } + */ let command_socket_path = config.command_socket_path()?; - command_v2::start_server(config, command_socket_path, workers) - .with_context(|| "could not start Sozu")?; + command_v2::start_server( + config, + command_socket_path, + // workers, + executable_path, + ) + .with_context(|| "could not start Sozu")?; Ok(()) } diff --git a/lib/src/server.rs b/lib/src/server.rs index 5431d3c78..4abed266b 100644 --- a/lib/src/server.rs +++ b/lib/src/server.rs @@ -22,8 +22,8 @@ use sozu_command::{ request::RequestType, response_content::ContentType, ActivateListener, AddBackend, CertificatesWithFingerprints, Cluster, ClusterHashes, ClusterInformations, DeactivateListener, Event, HttpListenerConfig, HttpsListenerConfig, ListenerType, - LoadBalancingAlgorithms, LoadMetric, MetricsConfiguration, RemoveBackend, ResponseStatus, - TcpListenerConfig as CommandTcpListener, + LoadBalancingAlgorithms, LoadMetric, MetricsConfiguration, RemoveBackend, Request, + ResponseStatus, TcpListenerConfig as CommandTcpListener, }, ready::Ready, request::WorkerRequest, @@ -457,10 +457,19 @@ impl Server { let msg = server.channel.read_message(); debug!("got message: {:?}", msg); - if let Ok(msg) = msg { - if let Err(e) = server.channel.write_message(&WorkerResponse::ok(msg.id)) { + if let Ok(WorkerRequest { + id, + content: + Request { + request_type: Some(RequestType::Status(_)), + }, + }) = msg + { + if let Err(e) = server.channel.write_message(&WorkerResponse::ok(id)) { error!("Could not send an ok to the main process: {}", e); } + } else { + panic!("plz give me a status request first when I start, you sent me this instead: {:?}", msg); } server.unblock_channel(); }