Skip to content

Commit

Permalink
ref(system): Only return metrics for alive services (#4526)
Browse files Browse the repository at this point in the history
Only return metrics for services which have not terminated.

Also cleans up the `start_in` method a bit, the variable naming was
pretty bad and the instance id allocation/tracking can be moved to the
group, making the `start_in` much easier to understand.
  • Loading branch information
Dav1dde authored Feb 26, 2025
1 parent ca911ea commit 7eea540
Showing 1 changed file with 29 additions and 19 deletions.
48 changes: 29 additions & 19 deletions relay-system/src/service/registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,38 +104,24 @@ impl Inner {
let task_id = TaskId::from(&service);
let group = self.services.entry(task_id).or_default();

// Cleanup group, evicting all terminated services, while we're at it.
group.instances.retain(|s| !s.handle.is_finished());

let id = ServiceId {
task: task_id,
instance_id: group.next_instance_id,
};
group.next_instance_id += 1;

// Services are allowed to process as much work as possible before yielding to other,
// lower priority tasks. We want to prioritize service backlogs over creating more work
// for these services.
let future = tokio::task::unconstrained(service.future);
let future = ServiceMonitor::wrap(future);
let metrics = Arc::clone(future.metrics());

let jh = crate::runtime::spawn_in(handle, task_id, future);
let (sjh, sjhe) = crate::service::status::split(jh);
let task_handle = crate::runtime::spawn_in(handle, task_id, future);
let (status_handle, handle) = crate::service::status::split(task_handle);

let service = ServiceInstance {
instance_id: id.instance_id,
metrics,
handle: sjh,
};
group.instances.push(service);
group.add(metrics, status_handle);

sjhe
handle
}

fn metrics(&self) -> impl Iterator<Item = (ServiceId, ServiceMetrics)> + '_ {
self.services.iter().flat_map(|(task_id, group)| {
group.instances.iter().map(|service| {
group.iter().map(|service| {
let id = ServiceId {
task: *task_id,
instance_id: service.instance_id,
Expand Down Expand Up @@ -176,6 +162,30 @@ struct ServiceGroup {
instances: Vec<ServiceInstance>,
}

impl ServiceGroup {
/// Adds a started service to the service group.
pub fn add(&mut self, metrics: Arc<RawMetrics>, handle: ServiceStatusJoinHandle) {
// Cleanup the group, evicting all finished services, while we're at it.
self.instances.retain(|s| !s.handle.is_finished());

let instance_id = self.next_instance_id;
self.next_instance_id += 1;

let service = ServiceInstance {
instance_id,
metrics,
handle,
};

self.instances.push(service);
}

/// Returns an iterator over all currently alive services.
pub fn iter(&self) -> impl Iterator<Item = &ServiceInstance> {
self.instances.iter().filter(|s| !s.handle.is_finished())
}
}

/// Collection of metadata the registry tracks per service instance.
#[derive(Debug)]
struct ServiceInstance {
Expand Down

0 comments on commit 7eea540

Please sign in to comment.