forked from vectordotdev/vector
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathamqp.rs
102 lines (94 loc) · 3.29 KB
/
amqp.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
//! Functionality supporting both the `[crate::sources::amqp]` source and `[crate::sinks::amqp]` sink.
use lapin::tcp::{OwnedIdentity, OwnedTLSConfig};
use vector_lib::configurable::configurable_component;
/// AMQP connection options.
#[configurable_component]
#[derive(Clone, Debug)]
pub(crate) struct AmqpConfig {
/// URI for the AMQP server.
///
/// The URI has the format of
/// `amqp://<user>:<password>@<host>:<port>/<vhost>?timeout=<seconds>`.
///
/// The default vhost can be specified by using a value of `%2f`.
///
/// To connect over TLS, a scheme of `amqps` can be specified instead. For example,
/// `amqps://...`. Additional TLS settings, such as client certificate verification, can be
/// configured under the `tls` section.
#[configurable(metadata(
docs::examples = "amqp://user:[email protected]:5672/%2f?timeout=10",
))]
pub(crate) connection_string: String,
#[configurable(derived)]
pub(crate) tls: Option<crate::tls::TlsConfig>,
}
impl Default for AmqpConfig {
fn default() -> Self {
Self {
connection_string: "amqp://127.0.0.1/%2f".to_string(),
tls: None,
}
}
}
/// Polls the connection until a connection can be made.
/// Gives up after 5 attempts.
#[cfg(feature = "amqp-integration-tests")]
#[cfg(test)]
pub(crate) async fn await_connection(connection: &AmqpConfig) {
let mut pause = tokio::time::Duration::from_millis(1);
let mut attempts = 0;
loop {
let connection = connection.clone();
if connection.connect().await.is_ok() {
return;
}
attempts += 1;
if attempts == 5 {
return;
}
tokio::time::sleep(pause).await;
pause *= 2;
}
}
impl AmqpConfig {
pub(crate) async fn connect(
&self,
) -> Result<(lapin::Connection, lapin::Channel), Box<dyn std::error::Error + Send + Sync>> {
let addr = self.connection_string.clone();
let conn = match &self.tls {
Some(tls) => {
let cert_chain = if let Some(ca) = &tls.ca_file {
Some(tokio::fs::read_to_string(ca.to_owned()).await?)
} else {
None
};
let identity = if let Some(identity) = &tls.key_file {
let der = tokio::fs::read(identity.to_owned()).await?;
Some(OwnedIdentity {
der,
password: tls
.key_pass
.as_ref()
.map(|s| s.to_string())
.unwrap_or_else(String::default),
})
} else {
None
};
let tls_config = OwnedTLSConfig {
identity,
cert_chain,
};
lapin::Connection::connect_with_config(
&addr,
lapin::ConnectionProperties::default(),
tls_config,
)
.await
}
None => lapin::Connection::connect(&addr, lapin::ConnectionProperties::default()).await,
}?;
let channel = conn.create_channel().await?;
Ok((conn, channel))
}
}