Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add RabbitMQ as message queue solution #895

Merged
merged 1 commit into from
Apr 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
567 changes: 543 additions & 24 deletions server/Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions server/rabbit/enabled_plugins
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
[rabbitmq_management, rabbitmq_delayed_message_exchange].
Binary file not shown.
7 changes: 6 additions & 1 deletion server/run-tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,9 @@ SVIX_CACHE_TYPE="none" \
SVIX_REDIS_DSN="redis://localhost:6379" \
${TEST_COMMAND}


echo "*********** RUN 6 ***********"
SVIX_QUEUE_TYPE="rabbitmq" \
SVIX_CACHE_TYPE="redis" \
SVIX_REDIS_DSN="redis://localhost:6379" \
SVIX_RABBIT_DSN="amqp://xivs:xivs@localhost:5672/%2f" \
${TEST_COMMAND}
1 change: 1 addition & 0 deletions server/svix-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ urlencoding = "2.1.2"
strum_macros = "0.24"
strum = { version = "0.24", features = ["derive"] }
form_urlencoded = "1.1.0"
lapin = "2.1.1"

[dev-dependencies]
anyhow = "1.0.56"
3 changes: 2 additions & 1 deletion server/svix-server/development.env
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
# Example .env file for development
DATABASE_URL=postgresql://postgres:postgres@localhost:8079/postgres # For sqlx
DATABASE_URL="postgresql://postgres:postgres@localhost:8079/postgres" # For sqlx
SVIX_CACHE_TYPE=memory
SVIX_JWT_SECRET=x
SVIX_LOG_LEVEL=trace
SVIX_QUEUE_TYPE=redis
SVIX_RABBIT_DSN="amqp://xivs:[email protected]:5672/%2f"
SVIX_REDIS_DSN=redis://localhost:8078
18 changes: 18 additions & 0 deletions server/svix-server/src/cfg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,10 @@ pub struct ConfigurationInner {
/// Maximum number of concurrent worker tasks to spawn (0 is unlimited)
pub worker_max_tasks: u16,

/// The address of the rabbitmq exchange
pub rabbit_dsn: Option<Arc<String>>,
pub rabbit_consumer_prefetch_size: Option<u16>,

#[serde(flatten)]
pub internal: InternalConfig,
}
Expand Down Expand Up @@ -216,6 +220,17 @@ fn validate_config_complete(
});
}
}
QueueType::RabbitMQ => {
if config.rabbit_dsn.is_none() {
return Err(ValidationError {
code: Cow::from("missing field"),
message: Some(Cow::from(
"The rabbit_dsn field must be set if the queue_type is `rabbitmq`",
)),
params: HashMap::new(),
});
}
}
}

Ok(())
Expand All @@ -239,6 +254,7 @@ impl ConfigurationInner {
QueueType::Memory => QueueBackend::Memory,
QueueType::Redis => QueueBackend::Redis(self.queue_dsn().expect(err)),
QueueType::RedisCluster => QueueBackend::RedisCluster(self.queue_dsn().expect(err)),
QueueType::RabbitMQ => QueueBackend::RabbitMq(self.rabbit_dsn.as_ref().expect(err)),
}
}

Expand Down Expand Up @@ -280,6 +296,7 @@ pub enum QueueBackend<'a> {
Memory,
Redis(&'a str),
RedisCluster(&'a str),
RabbitMq(&'a str),
}

#[derive(Debug, Eq, PartialEq)]
Expand Down Expand Up @@ -311,6 +328,7 @@ pub enum QueueType {
Memory,
Redis,
RedisCluster,
RabbitMQ,
}

#[derive(Clone, Debug, Deserialize)]
Expand Down
6 changes: 6 additions & 0 deletions server/svix-server/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,12 @@ impl<T> Traceable<T> for std::result::Result<T, TransactionError<Error>> {
}
}

impl<T> Traceable<T> for std::result::Result<T, lapin::Error> {
fn trace(self, location: &'static str) -> Result<T> {
self.map_err(|e| Error::queue(format!("{e:?}"), location))
}
}

#[derive(Debug, Clone)]
pub enum ErrorType {
/// A generic error
Expand Down
2 changes: 1 addition & 1 deletion server/svix-server/src/queue/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ pub async fn new_pair() -> (TaskQueueProducer, TaskQueueConsumer) {
)
}

#[derive(Clone)]
#[derive(Clone, Debug)]
pub struct MemoryQueueProducer {
tx: mpsc::UnboundedSender<TaskQueueDelivery>,
}
Expand Down
38 changes: 38 additions & 0 deletions server/svix-server/src/queue/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::{sync::Arc, time::Duration};

use axum::async_trait;
use chrono::{DateTime, Utc};
use lapin::options::{BasicAckOptions, BasicNackOptions};
use serde::{Deserialize, Serialize};
use strum::Display;
use svix_ksuid::*;
Expand All @@ -22,6 +23,7 @@ use self::{
};

pub mod memory;
pub mod rabbitmq;
pub mod redis;

const RETRY_SCHEDULE: &[Duration] = &[
Expand All @@ -48,6 +50,15 @@ pub async fn new_pair(
redis::new_pair(pool, prefix).await
}
QueueBackend::Memory => memory::new_pair().await,
QueueBackend::RabbitMq(dsn) => {
let prefix = prefix.unwrap_or("");
let queue = format!("{prefix}-message-queue");
// Default to a prefetch_size of 1, as it's the safest (least likely to starve consumers)
let prefetch_size = cfg.rabbit_consumer_prefetch_size.unwrap_or(1);
rabbitmq::new_pair(dsn, queue, prefetch_size)
.await
.expect("can't connect to rabbit")
}
}
}

Expand Down Expand Up @@ -113,6 +124,7 @@ pub enum QueueTask {
pub enum TaskQueueProducer {
Memory(MemoryQueueProducer),
Redis(RedisQueueProducer),
RabbitMq(rabbitmq::Producer),
}

impl TaskQueueProducer {
Expand All @@ -123,6 +135,7 @@ impl TaskQueueProducer {
match self {
TaskQueueProducer::Memory(q) => q.send(task.clone(), delay).await,
TaskQueueProducer::Redis(q) => q.send(task.clone(), delay).await,
TaskQueueProducer::RabbitMq(q) => q.send(task.clone(), delay).await,
}
},
should_retry,
Expand All @@ -135,23 +148,28 @@ impl TaskQueueProducer {
pub enum TaskQueueConsumer {
Redis(RedisQueueConsumer),
Memory(MemoryQueueConsumer),
RabbitMq(rabbitmq::Consumer),
}

impl TaskQueueConsumer {
pub async fn receive_all(&mut self) -> Result<Vec<TaskQueueDelivery>> {
match self {
TaskQueueConsumer::Redis(q) => ctx!(q.receive_all().await),
TaskQueueConsumer::Memory(q) => ctx!(q.receive_all().await),
TaskQueueConsumer::RabbitMq(q) => ctx!(q.receive_all().await),
}
}
}

/// Used by TaskQueueDeliveries to Ack/Nack itself
#[derive(Debug)]
enum Acker {
Memory(MemoryQueueProducer),
Redis(Arc<RedisQueueInner>),
RabbitMQ(lapin::message::Delivery),
}

#[derive(Debug)]
pub struct TaskQueueDelivery {
pub id: String,
pub task: Arc<QueueTask>,
Expand All @@ -178,6 +196,15 @@ impl TaskQueueDelivery {
Acker::Redis(q) => {
ctx!(q.ack(&self).await)
}
Acker::RabbitMQ(delivery) => {
ctx!(
delivery
.ack(BasicAckOptions {
multiple: false // Only ack this message, not others
})
.await
)
}
}
},
should_retry,
Expand All @@ -198,6 +225,17 @@ impl TaskQueueDelivery {
Acker::Redis(q) => {
ctx!(q.nack(&self).await)
}
Acker::RabbitMQ(delivery) => {
// See https://www.rabbitmq.com/confirms.html#consumer-nacks-requeue
ctx!(
delivery
.nack(BasicNackOptions {
requeue: true,
multiple: false // Only nack this message, not others
})
.await
)
}
}
},
should_retry,
Expand Down
Loading