Skip to content

Commit

Permalink
draft: launch workers from CommandHub::server
Browse files Browse the repository at this point in the history
  • Loading branch information
Keksoj committed Jan 9, 2024
1 parent 6ea31b2 commit 7c5ddb7
Show file tree
Hide file tree
Showing 5 changed files with 157 additions and 55 deletions.
26 changes: 23 additions & 3 deletions bin/src/command_v2/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ mod sessions;
pub fn start_server(
config: Config,
command_socket_path: String,
workers: Vec<Worker>,
// workers: Vec<Worker>,
executable_path: String,
) -> anyhow::Result<()> {
let path = PathBuf::from(&command_socket_path);

Expand All @@ -38,12 +39,15 @@ pub fn start_server(
Err(e) => {
error!("could not create unix socket: {:?}", e);
// the workers did not even get the configuration, we can kill them right away

/*
for worker in workers {
error!("killing worker n°{} (PID {})", worker.id, worker.pid);
let _ = kill(Pid::from_raw(worker.pid), Signal::SIGKILL).map_err(|e| {
error!("could not kill worker: {:?}", e);
});
}
*/
bail!("couldn't start server");
}
};
Expand All @@ -53,23 +57,39 @@ pub fn start_server(
let _ = fs::remove_file(&path).map_err(|e2| {
error!("could not remove the unix socket: {:?}", e2);
});

/*
// the workers did not even get the configuration, we can kill them right away
for worker in workers {
error!("killing worker n°{} (PID {})", worker.id, worker.pid);
let _ = kill(Pid::from_raw(worker.pid), Signal::SIGKILL).map_err(|e| {
error!("could not kill worker: {:?}", e);
});
}
*/
bail!("couldn't start server");
}

// Create a copy of the state path to load state later
let saved_state_path = config.saved_state.clone();
let worker_count = config.worker_count;

let mut command_hub = CommandHub::new(unix_listener, config, executable_path)?;

for _ in 0..worker_count {
command_hub.launch_new_worker();
}

let mut command_hub = CommandHub::new(unix_listener, config)?;
/*
for mut worker in workers {
command_hub.register_worker(worker.id, worker.pid, worker.worker_channel.take().unwrap())
command_hub.register_worker(
worker.id,
worker.pid,
worker.worker_channel.take().unwrap(),
worker.scm_socket,
)
}
*/

load_static_config(&mut command_hub.server);

Expand Down
141 changes: 100 additions & 41 deletions bin/src/command_v2/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,22 @@ use nix::{
use sozu_command_lib::{
channel::Channel,
config::Config,
proto::command::{Request, ResponseStatus, RunState},
proto::command::{request::RequestType, Request, ResponseStatus, RunState, Status},
ready::Ready,
request::WorkerRequest,
response::WorkerResponse,
scm_socket::{ScmSocket, Listeners},
state::ConfigState,
};

use crate::command_v2::sessions::{
wants_to_write, ClientResult, ClientSession, WorkerResult, WorkerSession,
use crate::{
command_v2::sessions::{
wants_to_write, ClientResult, ClientSession, WorkerResult, WorkerSession,
},
worker::fork_main_into_worker,
};

/// Identifies a worker managed by the main process (currently a plain u32).
pub type WorkerId = u32;
/// Gather messages and notifies when there are no more to read.
#[allow(unused)]
pub trait Gatherer {
Expand All @@ -39,7 +44,7 @@ pub trait Gatherer {
&mut self,
server: &mut Server,
client: &mut ClientSession,
worker_id: u32,
worker_id: WorkerId,
message: WorkerResponse,
) -> bool {
unimplemented!("The parent task expected no client");
Expand All @@ -49,7 +54,7 @@ pub trait Gatherer {
fn on_message_no_client(
&mut self,
server: &mut Server,
worker_id: u32,
worker_id: WorkerId,
message: WorkerResponse,
) -> bool {
unimplemented!("The parent task expected a client");
Expand All @@ -70,6 +75,7 @@ pub trait GatheringTask: Debug {
// TODO: add an on_failure argument to on_finish functions
}

/// Contains a task and its execution timeout
#[derive(Debug)]
struct TaskContainer {
job: Box<dyn GatheringTask>,
Expand All @@ -80,7 +86,7 @@ struct TaskContainer {
pub struct DefaultGatherer {
pub ok: usize,
pub errors: usize,
pub responses: Vec<(u32, WorkerResponse)>,
pub responses: Vec<(WorkerId, WorkerResponse)>,
pub expected_responses: usize,
}

Expand All @@ -94,7 +100,7 @@ impl Gatherer for DefaultGatherer {
&mut self,
server: &mut Server,
client: &mut ClientSession,
worker_id: u32,
worker_id: WorkerId,
message: WorkerResponse,
) -> bool {
match message.status {
Expand All @@ -109,7 +115,7 @@ impl Gatherer for DefaultGatherer {
fn on_message_no_client(
&mut self,
server: &mut Server,
worker_id: u32,
worker_id: WorkerId,
message: WorkerResponse,
) -> bool {
match message.status {
Expand All @@ -131,27 +137,6 @@ pub struct CommandHub {
tasks: HashMap<usize, TaskContainer>,
}

/// Manages workers
/// Functions as an executer for tasks that have two steps:
/// - scatter to workers
/// - gather worker responses
/// - trigger a finishing function when all responses are gathered
#[derive(Debug)]
pub struct Server {
in_flight: HashMap<String, usize>,
next_client_id: u32,
next_session_token: usize,
next_task_id: usize,
poll: Poll,
queued_tasks: HashMap<usize, TaskContainer>,
unix_listener: UnixListener,
pub workers: HashMap<Token, WorkerSession>,
pub state: ConfigState,
frontends_count: usize,
backends_count: usize,
pub config: Config,
}

impl Deref for CommandHub {
type Target = Server;

Expand All @@ -166,9 +151,13 @@ impl DerefMut for CommandHub {
}

impl CommandHub {
pub fn new(unix_listener: UnixListener, config: Config) -> anyhow::Result<Self> {
pub fn new(
unix_listener: UnixListener,
config: Config,
executable_path: String,
) -> anyhow::Result<Self> {
Ok(Self {
server: Server::new(unix_listener, config)?,
server: Server::new(unix_listener, config, executable_path)?,
clients: HashMap::new(),
tasks: HashMap::new(),
})
Expand Down Expand Up @@ -305,7 +294,7 @@ impl CommandHub {
}
}

fn handle_response(&mut self, worker_id: u32, response: WorkerResponse) {
fn handle_response(&mut self, worker_id: WorkerId, response: WorkerResponse) {
let Some(task_id) = self.in_flight.get(&response.id).copied() else {
error!("Go a response for an unknown task: {}", response);
return;
Expand Down Expand Up @@ -347,8 +336,35 @@ impl CommandHub {
}
}

/// Manages workers.
///
/// Functions as an executor for tasks that have three steps:
/// - scatter requests to workers
/// - gather worker responses
/// - trigger a finishing function when all responses are gathered
#[derive(Debug)]
pub struct Server {
    // cached backend count from `state`, refreshed by update_counts()
    backends_count: usize,
    pub config: Config,
    // path to the current executable, used when forking new workers
    executable_path: String,
    // cached frontend count from `state`, refreshed by update_counts()
    frontends_count: usize,
    // maps an in-flight request id to the id of the task awaiting its response
    in_flight: HashMap<String, usize>,
    next_client_id: u32, // TODO: create a ClientId type
    // 0 is reserved for the UnixListener, so this starts at 1
    next_session_token: usize,
    next_task_id: usize, // TODO: create a TaskId type
    next_worker_id: WorkerId,
    poll: Poll,
    // tasks queued here become executable on the next tick
    queued_tasks: HashMap<usize, TaskContainer>,
    pub state: ConfigState,
    unix_listener: UnixListener,
    pub workers: HashMap<Token, WorkerSession>,
}

impl Server {
fn new(mut unix_listener: UnixListener, config: Config) -> anyhow::Result<Self> {
fn new(
mut unix_listener: UnixListener,
config: Config,
executable_path: String,
) -> anyhow::Result<Self> {
let poll = mio::Poll::new().with_context(|| "Poll::new() failed")?;
poll.registry()
.register(
Expand All @@ -359,21 +375,54 @@ impl Server {
.with_context(|| "should register the channel")?;

Ok(Self {
backends_count: 0,
config,
executable_path,
frontends_count: 0,
in_flight: HashMap::new(),
next_client_id: 0,
next_session_token: 1, // 0 is reserved for the UnixListener
next_task_id: 0,
next_worker_id: 0,
poll,
state: Default::default(),
queued_tasks: HashMap::new(),
state: ConfigState::new(),
unix_listener,
workers: HashMap::new(),
frontends_count: 0,
backends_count: 0,
config,
})
}

/// Fork a new worker process, register its channel, and send it an initial
/// Status request — presumably a liveness probe of the fresh channel; TODO confirm.
///
/// NOTE(review): panics (via expect) if the fork fails; consider returning
/// an error to the caller instead, since this runs in the main process.
pub fn launch_new_worker(&mut self) {
    let worker_id = self.next_worker_id();
    // fork_main_into_worker yields the child's pid, a command channel,
    // and an scm socket (presumably for fd passing — verify in worker.rs)
    let (worker_pid, main_to_worker_channel, main_to_worker_scm) = fork_main_into_worker(
        &worker_id.to_string(),
        &self.config,
        self.executable_path.clone(),
        &self.state,
        // the new worker starts with empty listener lists
        Some(Listeners {
            http: Vec::new(),
            tls: Vec::new(),
            tcp: Vec::new(),
        }),
    )
    .expect("could not fork main into new worker");

    let worker_token = self.register_worker(
        worker_id,
        worker_pid,
        main_to_worker_channel,
        main_to_worker_scm,
    );

    // the session was just inserted under this token, so get_mut cannot fail
    self.workers
        .get_mut(&worker_token)
        .unwrap()
        .send(&WorkerRequest {
            id: worker_id.to_string(),
            content: RequestType::Status(Status {}).into(),
        })
}

/// count backends and frontends in the cache, update gauge metrics
pub fn update_counts(&mut self) {
self.backends_count = self.state.count_backends();
Expand All @@ -400,6 +449,12 @@ impl Server {
id
}

/// Hand out a fresh worker id and advance the internal counter.
fn next_worker_id(&mut self) -> WorkerId {
    let fresh = self.next_worker_id;
    self.next_worker_id = fresh + 1;
    fresh
}

fn register(&mut self, token: Token, stream: &mut UnixStream) {
self.poll
.registry()
Expand All @@ -409,14 +464,18 @@ impl Server {

pub fn register_worker(
&mut self,
id: u32,
worker_id: WorkerId,
pid: pid_t,
mut channel: Channel<WorkerRequest, WorkerResponse>,
) {
scm_socket: ScmSocket,
) -> Token {
let token = self.next_session_token();
self.register(token, &mut channel.sock);
self.workers
.insert(token, WorkerSession::new(channel, id, pid, token));
self.workers.insert(
token,
WorkerSession::new(channel, worker_id, pid, token, scm_socket),
);
token
}

/// Add a task in a queue to make it accessible until the next tick
Expand Down
12 changes: 9 additions & 3 deletions bin/src/command_v2/sessions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,11 @@ use sozu_command_lib::{
ready::Ready,
request::WorkerRequest,
response::WorkerResponse,
scm_socket::ScmSocket,
};

use super::server::WorkerId;

/// Follows a client request from start to finish
#[derive(Debug)]
pub struct ClientSession {
Expand Down Expand Up @@ -96,9 +99,10 @@ impl ClientSession {
#[derive(Debug)]
pub struct WorkerSession {
pub channel: Channel<WorkerRequest, WorkerResponse>,
pub id: u32,
pub id: WorkerId,
pub pid: pid_t,
pub run_state: RunState,
pub scm_socket: ScmSocket,
pub token: Token,
}

Expand All @@ -113,16 +117,18 @@ pub enum WorkerResult {
impl WorkerSession {
pub fn new(
mut channel: Channel<WorkerRequest, WorkerResponse>,
id: u32,
id: WorkerId,
pid: pid_t,
token: Token,
scm_socket: ScmSocket,
) -> Self {
channel.interest = Ready::READABLE | Ready::ERROR | Ready::HUP;
Self {
channel,
id,
pid,
run_state: RunState::Running,
scm_socket,
token,
}
}
Expand Down Expand Up @@ -159,7 +165,7 @@ impl WorkerSession {
RunState::Running | RunState::NotAnswering => RunState::NotAnswering,
};
WorkerInfo {
id: self.id,
id: self.id as u32,
pid: self.pid,
run_state: run_state as i32,
}
Expand Down
Loading

0 comments on commit 7c5ddb7

Please sign in to comment.