Skip to content

Commit 1de351a

Browse files
authored
Fix limiter connections (#46)
* fix(proxy): fixed update active connections * chore(examples): incresed max connection to tier 1
1 parent 584b9cc commit 1de351a

File tree

3 files changed

+22
-12
lines changed

3 files changed

+22
-12
lines changed

examples/manifest.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -211,7 +211,7 @@ data:
211211
212212
[[tiers]]
213213
name = "1"
214-
max_connections = 1
214+
max_connections = 2
215215
[[tiers.rates]]
216216
interval = "1s"
217217
limit = 1024

proxy/src/main.rs

+14-5
Original file line numberDiff line numberDiff line change
@@ -106,21 +106,30 @@ pub struct Consumer {
106106
active_connections: usize,
107107
}
108108
impl Consumer {
109-
pub async fn inc_connections(&mut self, state: Arc<State>) {
110-
self.active_connections += 1;
109+
pub async fn inc_connections(&self, state: Arc<State>) {
111110
state
112111
.consumers
113112
.write()
114113
.await
115-
.insert(self.key.clone(), self.clone());
114+
.entry(self.key.clone())
115+
.and_modify(|consumer| consumer.active_connections += 1);
116116
}
117117
pub async fn dec_connections(&mut self, state: Arc<State>) {
118-
self.active_connections -= 1;
119118
state
120119
.consumers
121120
.write()
122121
.await
123-
.insert(self.key.clone(), self.clone());
122+
.entry(self.key.clone())
123+
.and_modify(|consumer| consumer.active_connections -= 1);
124+
}
125+
pub async fn get_active_connections(&self, state: Arc<State>) -> usize {
126+
state
127+
.consumers
128+
.read()
129+
.await
130+
.get(&self.key)
131+
.map(|consumer| consumer.active_connections)
132+
.unwrap_or_default()
124133
}
125134
}
126135
impl Display for Consumer {

proxy/src/proxy.rs

+7-6
Original file line numberDiff line numberDiff line change
@@ -94,18 +94,19 @@ impl ProxyApp {
9494

9595
match event {
9696
DuplexEvent::ClientRead(0) | DuplexEvent::InstanceRead(0) => {
97-
info!(
98-
consumer = ctx.consumer.to_string(),
99-
active_connections = ctx.consumer.active_connections,
100-
"client disconnected"
101-
);
102-
10397
ctx.consumer.dec_connections(self.state.clone()).await;
10498
state.metrics.dec_total_connections(
10599
&ctx.consumer,
106100
&ctx.namespace,
107101
&ctx.instance,
108102
);
103+
104+
let active_connections =
105+
ctx.consumer.get_active_connections(state.clone()).await;
106+
info!(
107+
consumer = ctx.consumer.to_string(),
108+
active_connections, "client disconnected"
109+
);
109110
return Ok(());
110111
}
111112
DuplexEvent::ClientRead(bytes) => {

0 commit comments

Comments
 (0)