Skip to content

Commit

Permalink
Add RabbitMQ support
Browse files Browse the repository at this point in the history
  • Loading branch information
svix-gabriel committed Apr 18, 2023
1 parent 80e0cd3 commit f6c583c
Show file tree
Hide file tree
Showing 14 changed files with 906 additions and 38 deletions.
567 changes: 543 additions & 24 deletions server/Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions server/rabbit/enabled_plugins
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
[rabbitmq_management, rabbitmq_delayed_message_exchange].
Binary file not shown.
7 changes: 6 additions & 1 deletion server/run-tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,9 @@ SVIX_CACHE_TYPE="none" \
SVIX_REDIS_DSN="redis://localhost:6379" \
${TEST_COMMAND}


echo "*********** RUN 6 ***********"
SVIX_QUEUE_TYPE="rabbitmq" \
SVIX_CACHE_TYPE="redis" \
SVIX_REDIS_DSN="redis://localhost:6379" \
SVIX_RABBIT_DSN="amqp://xivs:xivs@localhost:5672/%2f" \
${TEST_COMMAND}
1 change: 1 addition & 0 deletions server/svix-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ urlencoding = "2.1.2"
strum_macros = "0.24"
strum = { version = "0.24", features = ["derive"] }
form_urlencoded = "1.1.0"
lapin = "2.1.1"

[dev-dependencies]
anyhow = "1.0.56"
5 changes: 3 additions & 2 deletions server/svix-server/development.env
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
# Example .env file for development
DATABASE_URL=postgresql://postgres:postgres@localhost:8079/postgres # For sqlx
DATABASE_URL="postgresql://postgres:postgres@localhost:8079/postgres" # For sqlx
SVIX_CACHE_TYPE=memory
SVIX_JWT_SECRET=x
SVIX_LOG_LEVEL=trace
SVIX_QUEUE_TYPE=redis
SVIX_REDIS_DSN=redis://localhost:8078
SVIX_RABBIT_DSN="amqp://xivs:[email protected]:5672/%2f"
SVIX_REDIS_DSN="redis://localhost:8078"
17 changes: 17 additions & 0 deletions server/svix-server/src/cfg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,9 @@ pub struct ConfigurationInner {
/// Maximum number of concurrent worker tasks to spawn (0 is unlimited)
pub worker_max_tasks: u16,

/// The address of the rabbitmq exchange
pub rabbit_dsn: Option<Arc<String>>,

#[serde(flatten)]
pub internal: InternalConfig,
}
Expand Down Expand Up @@ -216,6 +219,17 @@ fn validate_config_complete(
});
}
}
QueueType::RabbitMQ => {
if config.rabbit_dsn.is_none() {
return Err(ValidationError {
code: Cow::from("missing field"),
message: Some(Cow::from(
"The rabbit_dsn field must be set if the queue_type is `rabbitmq`",
)),
params: HashMap::new(),
});
}
}
}

Ok(())
Expand All @@ -239,6 +253,7 @@ impl ConfigurationInner {
QueueType::Memory => QueueBackend::Memory,
QueueType::Redis => QueueBackend::Redis(self.queue_dsn().expect(err)),
QueueType::RedisCluster => QueueBackend::RedisCluster(self.queue_dsn().expect(err)),
QueueType::RabbitMQ => QueueBackend::RabbitMq(self.rabbit_dsn.as_ref().expect(err)),
}
}

Expand Down Expand Up @@ -280,6 +295,7 @@ pub enum QueueBackend<'a> {
Memory,
Redis(&'a str),
RedisCluster(&'a str),
RabbitMq(&'a str),
}

#[derive(Debug, Eq, PartialEq)]
Expand Down Expand Up @@ -311,6 +327,7 @@ pub enum QueueType {
Memory,
Redis,
RedisCluster,
RabbitMQ,
}

#[derive(Clone, Debug, Deserialize)]
Expand Down
6 changes: 6 additions & 0 deletions server/svix-server/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,12 @@ impl<T> Traceable<T> for std::result::Result<T, TransactionError<Error>> {
}
}

impl<T> Traceable<T> for std::result::Result<T, lapin::Error> {
fn trace(self, location: &'static str) -> Result<T> {
self.map_err(|e| Error::queue(format!("{e:?}"), location))
}
}

#[derive(Debug, Clone)]
pub enum ErrorType {
/// A generic error
Expand Down
2 changes: 1 addition & 1 deletion server/svix-server/src/queue/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ pub async fn new_pair() -> (TaskQueueProducer, TaskQueueConsumer) {
)
}

#[derive(Clone)]
#[derive(Clone, Debug)]
pub struct MemoryQueueProducer {
tx: mpsc::UnboundedSender<TaskQueueDelivery>,
}
Expand Down
35 changes: 35 additions & 0 deletions server/svix-server/src/queue/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::{sync::Arc, time::Duration};

use axum::async_trait;
use chrono::{DateTime, Utc};
use lapin::options::{BasicAckOptions, BasicNackOptions};
use serde::{Deserialize, Serialize};
use strum::Display;
use svix_ksuid::*;
Expand All @@ -22,6 +23,7 @@ use self::{
};

pub mod memory;
pub mod rabbitmq;
pub mod redis;

const RETRY_SCHEDULE: &[Duration] = &[
Expand All @@ -48,6 +50,12 @@ pub async fn new_pair(
redis::new_pair(pool, prefix).await
}
QueueBackend::Memory => memory::new_pair().await,
QueueBackend::RabbitMq(dsn) => {
let prefix = prefix.unwrap_or("");
rabbitmq::new_pair(dsn, format!("{prefix}-message-queue"))
.await
.expect("can't connect to rabbit")
}
}
}

Expand Down Expand Up @@ -113,6 +121,7 @@ pub enum QueueTask {
pub enum TaskQueueProducer {
Memory(MemoryQueueProducer),
Redis(RedisQueueProducer),
RabbitMq(rabbitmq::Producer),
}

impl TaskQueueProducer {
Expand All @@ -123,6 +132,7 @@ impl TaskQueueProducer {
match self {
TaskQueueProducer::Memory(q) => q.send(task.clone(), delay).await,
TaskQueueProducer::Redis(q) => q.send(task.clone(), delay).await,
TaskQueueProducer::RabbitMq(q) => q.send(task.clone(), delay).await,
}
},
should_retry,
Expand All @@ -135,23 +145,28 @@ impl TaskQueueProducer {
pub enum TaskQueueConsumer {
Redis(RedisQueueConsumer),
Memory(MemoryQueueConsumer),
RabbitMq(rabbitmq::Consumer),
}

impl TaskQueueConsumer {
pub async fn receive_all(&mut self) -> Result<Vec<TaskQueueDelivery>> {
match self {
TaskQueueConsumer::Redis(q) => ctx!(q.receive_all().await),
TaskQueueConsumer::Memory(q) => ctx!(q.receive_all().await),
TaskQueueConsumer::RabbitMq(q) => ctx!(q.receive_all().await),
}
}
}

/// Used by TaskQueueDeliveries to Ack/Nack itself
#[derive(Debug)]
enum Acker {
Memory(MemoryQueueProducer),
Redis(Arc<RedisQueueInner>),
RabbitMQ(lapin::message::Delivery),
}

#[derive(Debug)]
pub struct TaskQueueDelivery {
pub id: String,
pub task: Arc<QueueTask>,
Expand All @@ -178,6 +193,15 @@ impl TaskQueueDelivery {
Acker::Redis(q) => {
ctx!(q.ack(&self).await)
}
Acker::RabbitMQ(delivery) => {
ctx!(
delivery
.ack(BasicAckOptions {
multiple: false // Only ack this message, not others
})
.await
)
}
}
},
should_retry,
Expand All @@ -198,6 +222,17 @@ impl TaskQueueDelivery {
Acker::Redis(q) => {
ctx!(q.nack(&self).await)
}
Acker::RabbitMQ(delivery) => {
// See https://www.rabbitmq.com/confirms.html#consumer-nacks-requeue
ctx!(
delivery
.nack(BasicNackOptions {
requeue: true,
multiple: false // Only nack this message, not others
})
.await
)
}
}
},
should_retry,
Expand Down
Loading

0 comments on commit f6c583c

Please sign in to comment.