Skip to content

Commit 9eb0dbf

Browse files
committed
add blocking replication API
Add sync (blocking) wrappers for the async replication API.
1 parent 7f6e040 commit 9eb0dbf

File tree

8 files changed

+394
-2
lines changed

8 files changed

+394
-2
lines changed

Diff for: postgres/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ bytes = "1.0"
3535
fallible-iterator = "0.2"
3636
futures = "0.3"
3737
tokio-postgres = { version = "0.7.1", path = "../tokio-postgres" }
38+
postgres-protocol = { version = "0.6.1", path = "../postgres-protocol" }
3839

3940
tokio = { version = "1.0", features = ["rt", "time"] }
4041
log = "0.4"

Diff for: postgres/src/client.rs

+10-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use crate::connection::Connection;
22
use crate::{
3-
CancelToken, Config, CopyInWriter, CopyOutReader, Notifications, RowIter, Statement,
3+
CancelToken, Config, CopyBoth, CopyInWriter, CopyOutReader, Notifications, RowIter, Statement,
44
ToStatement, Transaction, TransactionBuilder,
55
};
66
use std::task::Poll;
@@ -395,6 +395,15 @@ impl Client {
395395
Ok(CopyOutReader::new(self.connection.as_ref(), stream))
396396
}
397397

398+
/// Executes a CopyBoth query, returning a combined Stream+Sink type to read and write copy
399+
/// data.
400+
pub fn copy_both_simple(&mut self, query: &str) -> Result<CopyBoth<'_>, Error> {
401+
let stream = self
402+
.connection
403+
.block_on(self.client.copy_both_simple(query))?;
404+
Ok(CopyBoth::new(self.connection.as_ref(), stream))
405+
}
406+
398407
/// Executes a sequence of SQL statements using the simple query protocol.
399408
///
400409
/// Statements should be separated by semicolons. If an error occurs, execution of the sequence will stop at that

Diff for: postgres/src/copy_both.rs

+77
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
use crate::connection::ConnectionRef;
2+
use crate::lazy_pin::LazyPin;
3+
use bytes::{Buf, Bytes, BytesMut};
4+
use futures::{SinkExt, StreamExt};
5+
use std::io::{self, BufRead, Read};
6+
use tokio_postgres::{CopyBothDuplex, Error};
7+
8+
/// The reader/writer returned by the `copy_both_simple` method.
9+
pub struct CopyBoth<'a> {
10+
pub(crate) connection: ConnectionRef<'a>,
11+
pub(crate) stream_sink: LazyPin<CopyBothDuplex<Bytes>>,
12+
buf: BytesMut,
13+
cur: Bytes,
14+
}
15+
16+
impl<'a> CopyBoth<'a> {
17+
pub(crate) fn new(
18+
connection: ConnectionRef<'a>,
19+
duplex: CopyBothDuplex<Bytes>,
20+
) -> CopyBoth<'a> {
21+
CopyBoth {
22+
connection,
23+
stream_sink: LazyPin::new(duplex),
24+
buf: BytesMut::new(),
25+
cur: Bytes::new(),
26+
}
27+
}
28+
29+
/// Completes the copy, returning the number of rows written.
30+
///
31+
/// If this is not called, the copy will be aborted.
32+
pub fn finish(mut self) -> Result<u64, Error> {
33+
self.flush_inner()?;
34+
self.connection.block_on(self.stream_sink.pinned().finish())
35+
}
36+
37+
fn flush_inner(&mut self) -> Result<(), Error> {
38+
if self.buf.is_empty() {
39+
return Ok(());
40+
}
41+
42+
self.connection
43+
.block_on(self.stream_sink.pinned().send(self.buf.split().freeze()))
44+
}
45+
}
46+
47+
impl Read for CopyBoth<'_> {
48+
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
49+
let b = self.fill_buf()?;
50+
let len = usize::min(buf.len(), b.len());
51+
buf[..len].copy_from_slice(&b[..len]);
52+
self.consume(len);
53+
Ok(len)
54+
}
55+
}
56+
57+
impl BufRead for CopyBoth<'_> {
58+
fn fill_buf(&mut self) -> io::Result<&[u8]> {
59+
while !self.cur.has_remaining() {
60+
let mut stream = self.stream_sink.pinned();
61+
match self
62+
.connection
63+
.block_on(async { stream.next().await.transpose() })
64+
{
65+
Ok(Some(cur)) => self.cur = cur,
66+
Err(e) => return Err(io::Error::new(io::ErrorKind::Other, e)),
67+
Ok(None) => break,
68+
};
69+
}
70+
71+
Ok(&self.cur)
72+
}
73+
74+
fn consume(&mut self, amt: usize) {
75+
self.cur.advance(amt);
76+
}
77+
}

Diff for: postgres/src/lib.rs

+5
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ pub use tokio_postgres::{
7272
pub use crate::cancel_token::CancelToken;
7373
pub use crate::client::*;
7474
pub use crate::config::Config;
75+
pub use crate::copy_both::CopyBoth;
7576
pub use crate::copy_in_writer::CopyInWriter;
7677
pub use crate::copy_out_reader::CopyOutReader;
7778
#[doc(no_inline)]
@@ -92,14 +93,18 @@ mod cancel_token;
9293
mod client;
9394
pub mod config;
9495
mod connection;
96+
mod copy_both;
9597
mod copy_in_writer;
9698
mod copy_out_reader;
9799
mod generic_client;
98100
mod lazy_pin;
99101
pub mod notifications;
102+
pub mod replication;
100103
mod row_iter;
101104
mod transaction;
102105
mod transaction_builder;
103106

107+
#[cfg(test)]
108+
mod replication_test;
104109
#[cfg(test)]
105110
mod test;

Diff for: postgres/src/replication.rs

+164
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,164 @@
1+
//! Utilities for working with the PostgreSQL replication copy both format.
2+
3+
use crate::connection::ConnectionRef;
4+
use crate::lazy_pin::LazyPin;
5+
use crate::{CopyBoth, Error};
6+
use bytes::Bytes;
7+
use fallible_iterator::FallibleIterator;
8+
use futures::Stream;
9+
use postgres_protocol::message::backend::{LogicalReplicationMessage, ReplicationMessage};
10+
use std::task::Poll;
11+
use std::time::SystemTime;
12+
use tokio_postgres::replication::{LogicalReplicationStream, ReplicationStream};
13+
use tokio_postgres::types::PgLsn;
14+
15+
/// A type which deserializes the postgres replication protocol.
16+
///
17+
/// This type can be used with both physical and logical replication to get
18+
/// access to the byte content of each replication message.
19+
///
20+
/// This is the sync (blocking) version of [`ReplicationStream`]
21+
pub struct ReplicationIter<'a> {
22+
connection: ConnectionRef<'a>,
23+
stream: LazyPin<ReplicationStream>,
24+
}
25+
26+
impl<'a> ReplicationIter<'a> {
27+
/// Creates a new `ReplicationIter`.
28+
pub fn new(copyboth: CopyBoth<'a>) -> Self {
29+
let unpinned_copyboth = copyboth
30+
.stream_sink
31+
.into_unpinned()
32+
.expect("copy-both stream has already been used");
33+
let stream = ReplicationStream::new(unpinned_copyboth);
34+
Self {
35+
connection: copyboth.connection,
36+
stream: LazyPin::new(stream),
37+
}
38+
}
39+
40+
/// Send standby update to server.
41+
pub fn standby_status_update(
42+
&mut self,
43+
write_lsn: PgLsn,
44+
flush_lsn: PgLsn,
45+
apply_lsn: PgLsn,
46+
timestamp: SystemTime,
47+
reply: u8,
48+
) -> Result<(), Error> {
49+
self.connection.block_on(
50+
self.stream
51+
.pinned()
52+
.standby_status_update(write_lsn, flush_lsn, apply_lsn, timestamp, reply),
53+
)
54+
}
55+
56+
/// Send hot standby feedback message to server.
57+
pub fn hot_standby_feedback(
58+
&mut self,
59+
timestamp: SystemTime,
60+
global_xmin: u32,
61+
global_xmin_epoch: u32,
62+
catalog_xmin: u32,
63+
catalog_xmin_epoch: u32,
64+
) -> Result<(), Error> {
65+
self.connection
66+
.block_on(self.stream.pinned().hot_standby_feedback(
67+
timestamp,
68+
global_xmin,
69+
global_xmin_epoch,
70+
catalog_xmin,
71+
catalog_xmin_epoch,
72+
))
73+
}
74+
}
75+
76+
impl<'a> FallibleIterator for ReplicationIter<'a> {
77+
type Item = ReplicationMessage<Bytes>;
78+
type Error = Error;
79+
80+
fn next(&mut self) -> Result<Option<Self::Item>, Self::Error> {
81+
let pinstream = &mut self.stream;
82+
83+
self.connection
84+
.poll_block_on(|cx, _, _| match pinstream.pinned().poll_next(cx) {
85+
Poll::Ready(x) => Poll::Ready(x.transpose()),
86+
Poll::Pending => Poll::Pending,
87+
})
88+
}
89+
}
90+
91+
/// A type which deserializes the postgres logical replication protocol. This
92+
/// type gives access to a high level representation of the changes in
93+
/// transaction commit order.
94+
///
95+
/// This is the sync (blocking) version of [`LogicalReplicationStream`]
96+
pub struct LogicalReplicationIter<'a> {
97+
connection: ConnectionRef<'a>,
98+
stream: LazyPin<LogicalReplicationStream>,
99+
}
100+
101+
impl<'a> LogicalReplicationIter<'a> {
102+
/// Creates a new `ReplicationThing`.
103+
pub fn new(copyboth: CopyBoth<'a>) -> Self {
104+
let unpinned_copyboth = copyboth
105+
.stream_sink
106+
.into_unpinned()
107+
.expect("copy-both stream has already been used");
108+
let stream = LogicalReplicationStream::new(unpinned_copyboth);
109+
Self {
110+
connection: copyboth.connection,
111+
stream: LazyPin::new(stream),
112+
}
113+
}
114+
115+
/// Send standby update to server.
116+
pub fn standby_status_update(
117+
&mut self,
118+
write_lsn: PgLsn,
119+
flush_lsn: PgLsn,
120+
apply_lsn: PgLsn,
121+
timestamp: SystemTime,
122+
reply: u8,
123+
) -> Result<(), Error> {
124+
self.connection.block_on(
125+
self.stream
126+
.pinned()
127+
.standby_status_update(write_lsn, flush_lsn, apply_lsn, timestamp, reply),
128+
)
129+
}
130+
131+
/// Send hot standby feedback message to server.
132+
pub fn hot_standby_feedback(
133+
&mut self,
134+
timestamp: SystemTime,
135+
global_xmin: u32,
136+
global_xmin_epoch: u32,
137+
catalog_xmin: u32,
138+
catalog_xmin_epoch: u32,
139+
) -> Result<(), Error> {
140+
self.connection
141+
.block_on(self.stream.pinned().hot_standby_feedback(
142+
timestamp,
143+
global_xmin,
144+
global_xmin_epoch,
145+
catalog_xmin,
146+
catalog_xmin_epoch,
147+
))
148+
}
149+
}
150+
151+
impl<'a> FallibleIterator for LogicalReplicationIter<'a> {
152+
type Item = ReplicationMessage<LogicalReplicationMessage>;
153+
type Error = Error;
154+
155+
fn next(&mut self) -> Result<Option<Self::Item>, Self::Error> {
156+
let pinstream = &mut self.stream;
157+
158+
self.connection
159+
.poll_block_on(|cx, _, _| match pinstream.pinned().poll_next(cx) {
160+
Poll::Ready(x) => Poll::Ready(x.transpose()),
161+
Poll::Pending => Poll::Pending,
162+
})
163+
}
164+
}

0 commit comments

Comments
 (0)