Skip to content

Commit de0e459

Browse files
VladLazarconradludgate
authored andcommitted
backend: add interpreted record replication message type (#32)
This is to be used by the safekeeper -> pageserver protocol.
1 parent 54821c5 commit de0e459

File tree

1 file changed

+47
-0
lines changed

1 file changed

+47
-0
lines changed

postgres-protocol/src/message/backend.rs

+47
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ pub const READY_FOR_QUERY_TAG: u8 = b'Z';
3939
// replication message tags
4040
pub const XLOG_DATA_TAG: u8 = b'w';
4141
pub const PRIMARY_KEEPALIVE_TAG: u8 = b'k';
42+
pub const INTERPRETED_WAL_RECORD_TAG: u8 = b'0';
4243

4344
// logical replication message tags
4445
const BEGIN_TAG: u8 = b'B';
@@ -325,6 +326,7 @@ impl Message {
325326
pub enum ReplicationMessage<D> {
326327
XLogData(XLogDataBody<D>),
327328
PrimaryKeepAlive(PrimaryKeepAliveBody),
329+
RawInterpretedWalRecords(RawInterpretedWalRecordsBody<D>),
328330
}
329331

330332
impl ReplicationMessage<Bytes> {
@@ -370,6 +372,21 @@ impl ReplicationMessage<Bytes> {
370372
reply,
371373
})
372374
}
375+
INTERPRETED_WAL_RECORD_TAG => {
376+
let streaming_lsn = buf.read_u64::<BigEndian>()?;
377+
let commit_lsn = buf.read_u64::<BigEndian>()?;
378+
let next_record_lsn = match buf.read_u64::<BigEndian>()? {
379+
0 => None,
380+
lsn => Some(lsn),
381+
};
382+
383+
ReplicationMessage::RawInterpretedWalRecords(RawInterpretedWalRecordsBody {
384+
streaming_lsn,
385+
commit_lsn,
386+
next_record_lsn,
387+
data: buf.read_all(),
388+
})
389+
}
373390
tag => {
374391
return Err(io::Error::new(
375392
io::ErrorKind::InvalidInput,
@@ -950,6 +967,36 @@ impl<D> XLogDataBody<D> {
950967
}
951968
}
952969

970+
#[derive(Debug)]
971+
pub struct RawInterpretedWalRecordsBody<D> {
972+
streaming_lsn: u64,
973+
commit_lsn: u64,
974+
next_record_lsn: Option<u64>,
975+
data: D,
976+
}
977+
978+
impl<D> RawInterpretedWalRecordsBody<D> {
979+
#[inline]
980+
pub fn streaming_lsn(&self) -> u64 {
981+
self.streaming_lsn
982+
}
983+
984+
#[inline]
985+
pub fn commit_lsn(&self) -> u64 {
986+
self.commit_lsn
987+
}
988+
989+
#[inline]
990+
pub fn next_record_lsn(&self) -> Option<u64> {
991+
self.next_record_lsn
992+
}
993+
994+
#[inline]
995+
pub fn data(&self) -> &D {
996+
&self.data
997+
}
998+
}
999+
9531000
#[derive(Debug)]
9541001
pub struct PrimaryKeepAliveBody {
9551002
wal_end: u64,

0 commit comments

Comments
 (0)