Skip to content

Commit

Permalink
Connection string config for replication.
Browse files Browse the repository at this point in the history
Co-authored-by: Petros Angelatos <[email protected]>
  • Loading branch information
jeff-davis and petrosagg committed May 25, 2021
1 parent 73b62cd commit aefa11c
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 1 deletion.
35 changes: 35 additions & 0 deletions tokio-postgres/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,16 @@ pub enum ChannelBinding {
Require,
}

/// Replication mode configuration.
#[derive(Debug, Copy, Clone, PartialEq)]
#[non_exhaustive]
pub enum ReplicationMode {
/// Physical replication.
Physical,
/// Logical replication.
Logical,
}

/// A host specification.
#[derive(Debug, Clone, PartialEq)]
pub enum Host {
Expand Down Expand Up @@ -159,6 +169,7 @@ pub struct Config {
pub(crate) keepalives_idle: Duration,
pub(crate) target_session_attrs: TargetSessionAttrs,
pub(crate) channel_binding: ChannelBinding,
pub(crate) replication_mode: Option<ReplicationMode>,
}

impl Default for Config {
Expand All @@ -184,6 +195,7 @@ impl Config {
keepalives_idle: Duration::from_secs(2 * 60 * 60),
target_session_attrs: TargetSessionAttrs::Any,
channel_binding: ChannelBinding::Prefer,
replication_mode: None,
}
}

Expand Down Expand Up @@ -387,6 +399,17 @@ impl Config {
self.channel_binding
}

/// Set replication mode.
pub fn replication_mode(&mut self, replication_mode: ReplicationMode) -> &mut Config {
self.replication_mode = Some(replication_mode);
self
}

/// Get replication mode.
pub fn get_replication_mode(&self) -> Option<ReplicationMode> {
self.replication_mode
}

fn param(&mut self, key: &str, value: &str) -> Result<(), Error> {
match key {
"user" => {
Expand Down Expand Up @@ -476,6 +499,17 @@ impl Config {
};
self.channel_binding(channel_binding);
}
"replication" => {
let mode = match value {
"off" => None,
"true" => Some(ReplicationMode::Physical),
"database" => Some(ReplicationMode::Logical),
_ => return Err(Error::config_parse(Box::new(InvalidValue("replication")))),
};
if let Some(mode) = mode {
self.replication_mode(mode);
}
}
key => {
return Err(Error::config_parse(Box::new(UnknownOption(
key.to_string(),
Expand Down Expand Up @@ -548,6 +582,7 @@ impl fmt::Debug for Config {
.field("keepalives_idle", &self.keepalives_idle)
.field("target_session_attrs", &self.target_session_attrs)
.field("channel_binding", &self.channel_binding)
.field("replication", &self.replication_mode)
.finish()
}
}
Expand Down
8 changes: 7 additions & 1 deletion tokio-postgres/src/connect_raw.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::codec::{BackendMessage, BackendMessages, FrontendMessage, PostgresCodec};
use crate::config::{self, Config};
use crate::config::{self, Config, ReplicationMode};
use crate::connect_tls::connect_tls;
use crate::maybe_tls_stream::MaybeTlsStream;
use crate::tls::{TlsConnect, TlsStream};
Expand Down Expand Up @@ -124,6 +124,12 @@ where
if let Some(application_name) = &config.application_name {
params.push(("application_name", &**application_name));
}
if let Some(replication_mode) = &config.replication_mode {
match replication_mode {
ReplicationMode::Physical => params.push(("replication", "true")),
ReplicationMode::Logical => params.push(("replication", "database")),
}
}

let mut buf = BytesMut::new();
frontend::startup_message(params, &mut buf).map_err(Error::encode)?;
Expand Down

0 comments on commit aefa11c

Please sign in to comment.