Skip to content

Commit 277aebb

Browse files
DanielePalaiaDanielePalaia
andauthored
expose store_offset and query_offset in consumer (#203)
* expose store_offset * replacing assert with better return value * adding an OffsetNotFound unit test * adding a non-fake offset store/query offset * implemented code review suggestion --------- Co-authored-by: DanielePalaia <[email protected]>
1 parent 1a44633 commit 277aebb

File tree

4 files changed

+178
-1
lines changed

4 files changed

+178
-1
lines changed

src/consumer.rs

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,12 @@ use std::{
1414
use rabbitmq_stream_protocol::{
1515
commands::subscribe::OffsetSpecification, message::Message, ResponseKind,
1616
};
17+
1718
use tokio::sync::mpsc::{channel, Receiver, Sender};
1819
use tracing::trace;
1920

21+
use crate::error::ConsumerStoreOffsetError;
22+
2023
use crate::{
2124
client::{MessageHandler, MessageResult},
2225
error::{ConsumerCloseError, ConsumerCreateError, ConsumerDeliveryError},
@@ -28,6 +31,8 @@ use rand::{seq::SliceRandom, SeedableRng};
2831

2932
/// API for consuming RabbitMQ stream messages
3033
pub struct Consumer {
34+
// Mandatory in case of manual offset tracking
35+
name: Option<String>,
3136
receiver: Receiver<Result<Delivery, ConsumerDeliveryError>>,
3237
internal: Arc<ConsumerInternal>,
3338
}
@@ -50,6 +55,7 @@ impl ConsumerInternal {
5055

5156
/// Builder for [`Consumer`]
5257
pub struct ConsumerBuilder {
58+
pub(crate) consumer_name: Option<String>,
5359
pub(crate) environment: Environment,
5460
pub(crate) offset_specification: OffsetSpecification,
5561
}
@@ -110,6 +116,7 @@ impl ConsumerBuilder {
110116

111117
if response.is_ok() {
112118
Ok(Consumer {
119+
name: self.consumer_name,
113120
receiver: rx,
114121
internal: consumer,
115122
})
@@ -125,6 +132,11 @@ impl ConsumerBuilder {
125132
self.offset_specification = offset_specification;
126133
self
127134
}
135+
136+
pub fn name(mut self, consumer_name: &str) -> Self {
137+
self.consumer_name = Some(String::from(consumer_name));
138+
self
139+
}
128140
}
129141

130142
impl Consumer {
@@ -137,6 +149,30 @@ impl Consumer {
137149
pub fn is_closed(&self) -> bool {
138150
self.internal.is_closed()
139151
}
152+
153+
pub async fn store_offset(&self, offset: u64) -> Result<(), ConsumerStoreOffsetError> {
154+
if let Some(name) = &self.name {
155+
self.internal
156+
.client
157+
.store_offset(name.as_str(), self.internal.stream.as_str(), offset)
158+
.await
159+
.map(Ok)?
160+
} else {
161+
Err(ConsumerStoreOffsetError::NameMissing)
162+
}
163+
}
164+
165+
pub async fn query_offset(&self) -> Result<u64, ConsumerStoreOffsetError> {
166+
if let Some(name) = &self.name {
167+
self.internal
168+
.client
169+
.query_offset(name.clone(), self.internal.stream.as_str())
170+
.await
171+
.map(Ok)?
172+
} else {
173+
Err(ConsumerStoreOffsetError::NameMissing)
174+
}
175+
}
140176
}
141177

142178
impl Stream for Consumer {

src/environment.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ impl Environment {
5151
/// Returns a builder for creating a consumer
5252
pub fn consumer(&self) -> ConsumerBuilder {
5353
ConsumerBuilder {
54+
consumer_name: None,
5455
environment: self.clone(),
5556
offset_specification: OffsetSpecification::Next,
5657
}

src/error.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,14 @@ pub enum ClientError {
2323
RequestError(ResponseCode),
2424
}
2525

26+
#[derive(Error, Debug)]
27+
pub enum ConsumerStoreOffsetError {
28+
#[error("Failed to store offset, missing consumer name")]
29+
NameMissing,
30+
#[error(transparent)]
31+
Client(#[from] ClientError),
32+
}
33+
2634
#[derive(Error, Debug)]
2735
pub enum ProtocolError {
2836
#[error("Encode Error {0:?}")]

tests/integration/consumer_test.rs

Lines changed: 133 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,16 @@ use crate::common::TestEnvironment;
44
use fake::{Fake, Faker};
55
use futures::StreamExt;
66
use rabbitmq_stream_client::{
7-
error::{ConsumerCloseError, ConsumerDeliveryError, ProducerCloseError},
7+
error::{
8+
ClientError, ConsumerCloseError, ConsumerDeliveryError, ConsumerStoreOffsetError,
9+
ProducerCloseError,
10+
},
811
types::{Delivery, Message, OffsetSpecification},
912
Consumer, NoDedup, Producer,
1013
};
1114

15+
use rabbitmq_stream_protocol::ResponseCode;
16+
1217
#[tokio::test(flavor = "multi_thread")]
1318
async fn consumer_test() {
1419
let env = TestEnvironment::create().await;
@@ -164,3 +169,130 @@ async fn consumer_create_stream_not_existing_error() {
164169
_ => panic!("Should be StreamNotFound error"),
165170
}
166171
}
172+
173+
#[tokio::test(flavor = "multi_thread")]
174+
async fn consumer_store_and_query_offset_test() {
175+
let env = TestEnvironment::create().await;
176+
let consumer = env
177+
.env
178+
.consumer()
179+
.name("test-name")
180+
.offset(OffsetSpecification::Next)
181+
.build(&env.stream)
182+
.await
183+
.unwrap();
184+
185+
let offset: u64 = Faker.fake();
186+
187+
consumer.store_offset(offset).await.unwrap();
188+
189+
let response = consumer.query_offset().await.unwrap();
190+
191+
assert_eq!(offset, response);
192+
}
193+
194+
#[tokio::test(flavor = "multi_thread")]
195+
async fn consumer_query_missing_offset_test() {
196+
let env = TestEnvironment::create().await;
197+
let consumer = env
198+
.env
199+
.consumer()
200+
.name("test-name-new")
201+
.offset(OffsetSpecification::Next)
202+
.build(&env.stream)
203+
.await
204+
.unwrap();
205+
206+
assert!(matches!(
207+
consumer.query_offset().await,
208+
Err(ConsumerStoreOffsetError::Client(ClientError::RequestError(
209+
ResponseCode::OffsetNotFound
210+
))),
211+
))
212+
}
213+
214+
#[tokio::test(flavor = "multi_thread")]
215+
async fn consumer_store_and_query_offset_missing_name_test() {
216+
let env = TestEnvironment::create().await;
217+
let consumer = env
218+
.env
219+
.consumer()
220+
.offset(OffsetSpecification::Next)
221+
.build(&env.stream)
222+
.await
223+
.unwrap();
224+
225+
let offset: u64 = Faker.fake();
226+
227+
assert!(matches!(
228+
consumer.store_offset(offset).await,
229+
Err(ConsumerStoreOffsetError::NameMissing),
230+
));
231+
232+
assert!(matches!(
233+
consumer.query_offset().await,
234+
Err(ConsumerStoreOffsetError::NameMissing),
235+
));
236+
}
237+
238+
#[tokio::test(flavor = "multi_thread")]
239+
async fn consumer_test_with_store_offset() {
240+
let env = TestEnvironment::create().await;
241+
let reference: String = Faker.fake();
242+
let offset_to_store = 4;
243+
244+
let message_count = 10;
245+
let mut producer = env
246+
.env
247+
.producer()
248+
.name(&reference)
249+
.build(&env.stream)
250+
.await
251+
.unwrap();
252+
253+
let mut consumer_store = env
254+
.env
255+
.consumer()
256+
.offset(OffsetSpecification::Next)
257+
.name("consumer-1")
258+
.build(&env.stream)
259+
.await
260+
.unwrap();
261+
262+
for n in 0..message_count {
263+
let _ = producer
264+
.send_with_confirm(Message::builder().body(format!("message{}", n)).build())
265+
.await
266+
.unwrap();
267+
}
268+
269+
for i in 0..message_count {
270+
let delivery = consumer_store.next().await.unwrap();
271+
272+
// Store an offset
273+
if i == offset_to_store {
274+
//Store the 5th element produced
275+
let result = consumer_store
276+
.store_offset(delivery.unwrap().offset())
277+
.await;
278+
}
279+
}
280+
281+
consumer_store.handle().close().await.unwrap();
282+
283+
let mut consumer_query = env
284+
.env
285+
.consumer()
286+
.offset(OffsetSpecification::First)
287+
.name("consumer-1")
288+
.build(&env.stream)
289+
.await
290+
.unwrap();
291+
292+
let offset = consumer_query.query_offset().await.unwrap();
293+
294+
assert!(offset == offset_to_store);
295+
296+
consumer_query.handle().close().await.unwrap();
297+
producer.close().await.unwrap();
298+
}

0 commit comments

Comments
 (0)