Skip to content

Commit f0812e0

Browse files
authored
Updated tier watcher (#24)
1 parent c9e2d99 commit f0812e0

File tree

3 files changed

+32
-38
lines changed

3 files changed

+32
-38
lines changed

.gitignore

+1-1
Original file line numberDiff line numberDiff line change
@@ -5,4 +5,4 @@ cert
55
docker-compose.yml
66
.env*
77
env-crd.yaml
8-
tiers.toml
8+
tiers*

proxy/src/config.rs

+12-3
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
1-
use std::{env, path::PathBuf};
1+
use std::{env, path::PathBuf, time::Duration};
22

33
#[derive(Debug, Clone)]
44
pub struct Config {
55
pub proxy_addr: String,
66
pub proxy_namespace: String,
77
pub proxy_tiers_path: PathBuf,
8+
pub proxy_tiers_poll_interval: Duration,
89
pub prometheus_addr: String,
910
pub ssl_crt_path: String,
1011
pub ssl_key_path: String,
@@ -17,8 +18,16 @@ impl Config {
1718
proxy_addr: env::var("PROXY_ADDR").expect("PROXY_ADDR must be set"),
1819
proxy_namespace: env::var("PROXY_NAMESPACE").expect("PROXY_NAMESPACE must be set"),
1920
proxy_tiers_path: env::var("PROXY_TIERS_PATH")
20-
.map(|v| v.into())
21-
.expect("PROXY_TIERS_PATH must be set"),
21+
.map(|v| v.into())
22+
.expect("PROXY_TIERS_PATH must be set"),
23+
proxy_tiers_poll_interval: env::var("PROXY_TIERS_POLL_INTERVAL")
24+
.map(|v| {
25+
Duration::from_secs(
26+
v.parse::<u64>()
27+
.expect("PROXY_TIERS_POLL_INTERVAL must be a number in seconds. eg: 2"),
28+
)
29+
})
30+
.unwrap_or(Duration::from_secs(2)),
2231
prometheus_addr: env::var("PROMETHEUS_ADDR").expect("PROMETHEUS_ADDR must be set"),
2332
ssl_crt_path: env::var("SSL_CRT_PATH").expect("SSL_CRT_PATH must be set"),
2433
ssl_key_path: env::var("SSL_KEY_PATH").expect("SSL_KEY_PATH must be set"),

proxy/src/tiers.rs

+19-34
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,9 @@
1-
use std::error::Error;
2-
use std::{fs, sync::Arc};
3-
41
use async_trait::async_trait;
5-
use notify::{Event, RecommendedWatcher, RecursiveMode, Watcher};
2+
use notify::{PollWatcher, RecursiveMode, Watcher};
63
use pingora::{server::ShutdownWatch, services::background::BackgroundService};
74
use serde_json::Value;
8-
use tokio::runtime::{Handle, Runtime};
5+
use std::error::Error;
6+
use std::{fs, sync::Arc};
97
use tracing::{error, info, warn};
108

119
use crate::{config::Config, State, Tier};
@@ -42,16 +40,6 @@ impl TierBackgroundService {
4240
}
4341
}
4442

45-
fn runtime_handle() -> Handle {
46-
match Handle::try_current() {
47-
Ok(h) => h,
48-
Err(_) => {
49-
let rt = Runtime::new().unwrap();
50-
rt.handle().clone()
51-
}
52-
}
53-
}
54-
5543
#[async_trait]
5644
impl BackgroundService for TierBackgroundService {
5745
async fn start(&self, mut _shutdown: ShutdownWatch) {
@@ -60,19 +48,13 @@ impl BackgroundService for TierBackgroundService {
6048
return;
6149
}
6250

63-
let (tx, mut rx) = tokio::sync::mpsc::channel::<Event>(1);
51+
let (tx, rx) = std::sync::mpsc::channel();
6452

65-
let watcher_result = RecommendedWatcher::new(
66-
move |result: Result<Event, notify::Error>| {
67-
let event = result.unwrap();
68-
if event.kind.is_modify() {
69-
runtime_handle().block_on(async {
70-
tx.send(event).await.unwrap();
71-
});
72-
}
73-
},
74-
notify::Config::default(),
75-
);
53+
let watcher_config = notify::Config::default()
54+
.with_compare_contents(true)
55+
.with_poll_interval(self.config.proxy_tiers_poll_interval);
56+
57+
let watcher_result = PollWatcher::new(tx, watcher_config);
7658
if let Err(err) = watcher_result {
7759
error!(error = err.to_string(), "error to watcher tier");
7860
return;
@@ -85,14 +67,17 @@ impl BackgroundService for TierBackgroundService {
8567
return;
8668
}
8769

88-
loop {
89-
if rx.recv().await.is_some() {
90-
if let Err(err) = self.update_tiers().await {
91-
error!(error = err.to_string(), "error to update tiers");
92-
continue;
93-
}
70+
for result in rx {
71+
match result {
72+
Ok(_event) => {
73+
if let Err(err) = self.update_tiers().await {
74+
error!(error = err.to_string(), "error to update tiers");
75+
continue;
76+
}
9477

95-
info!("tiers modified");
78+
info!("tiers modified");
79+
}
80+
Err(err) => error!(error = err.to_string(), "watch error"),
9681
}
9782
}
9883
}

0 commit comments

Comments
 (0)