Skip to content

Commit 969a961

Browse files
authored
Init rate limiter and meter on ingester init (#4195)
1 parent 5afd086 commit 969a961

File tree

1 file changed

+13
-1
lines changed

1 file changed

+13
-1
lines changed

quickwit/quickwit-ingest/src/ingest_v2/ingester.rs

+13-1
Original file line numberDiff line numberDiff line change
@@ -207,7 +207,13 @@ impl Ingester {
207207
.unwrap_or(Position::Eof);
208208
let solo_shard =
209209
IngesterShard::new_solo(ShardState::Closed, Position::Eof, truncation_position);
210-
state_guard.shards.insert(queue_id, solo_shard);
210+
state_guard.shards.insert(queue_id.clone(), solo_shard);
211+
212+
let rate_limiter = RateLimiter::from_settings(self.rate_limiter_settings);
213+
let rate_meter = RateMeter::default();
214+
state_guard
215+
.rate_trackers
216+
.insert(queue_id, (rate_limiter, rate_meter));
211217
}
212218
Ok(())
213219
}
@@ -1009,6 +1015,8 @@ mod tests {
10091015
solo_shard_01.assert_replication_position(Position::Eof);
10101016
solo_shard_01.assert_truncation_position(Position::Eof);
10111017

1018+
assert!(state_guard.rate_trackers.contains_key(&queue_id_01));
1019+
10121020
state_guard
10131021
.mrecordlog
10141022
.assert_records_eq(&queue_id_01, .., &[(1, "\0\x02")]);
@@ -1019,6 +1027,8 @@ mod tests {
10191027
solo_shard_02.assert_replication_position(Position::Eof);
10201028
solo_shard_02.assert_truncation_position(0u64);
10211029

1030+
assert!(state_guard.rate_trackers.contains_key(&queue_id_02));
1031+
10221032
state_guard.mrecordlog.assert_records_eq(
10231033
&queue_id_02,
10241034
..,
@@ -1031,6 +1041,8 @@ mod tests {
10311041
solo_shard_03.assert_replication_position(Position::Eof);
10321042
solo_shard_03.assert_truncation_position(Position::Eof);
10331043

1044+
assert!(state_guard.rate_trackers.contains_key(&queue_id_02));
1045+
10341046
state_guard
10351047
.mrecordlog
10361048
.assert_records_eq(&queue_id_03, .., &[(0, "\0\x02")]);

0 commit comments

Comments
 (0)