Skip to content

Commit ba4fa73

Browse files
goffrieConvex, Inc.
authored and
Convex, Inc.
committed
Thread prev_ts through DocumentUpdate (#33295)
This adds prev_ts to the old_document of `tx.apply_validated_write`, requiring the caller to thread the ts from whatever document they queried. This is then threaded all the way through DocumentUpdate and into the persistence layer. In particular, the caller passes a `WriteTimestamp`, which may be `Pending`; `Writes::update` is responsible for collapsing the prev_ts chain in that case. Some users of DocumentUpdate - primarily around index updates - do not care about the prev_ts. I have split those into a separate type, `IndexDocumentUpdate`. There is a push safety consideration because we serialize the write set and send it around. As such I have marked the prev_ts as optional almost everywhere to account for the case where it gets lost during an RPC. This can get cleaned up after a push cycle. GitOrigin-RevId: f6cca7c59dce42d159780486d30142ebdae7d3c3
1 parent 1c4c8c5 commit ba4fa73

File tree

9 files changed

+261
-82
lines changed

9 files changed

+261
-82
lines changed

crates/common/src/document.rs

+131-4
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ use packed_value::{
2828
};
2929
use pb::common::{
3030
DocumentUpdate as DocumentUpdateProto,
31+
DocumentUpdateWithPrevTs as DocumentUpdateWithPrevTsProto,
3132
ResolvedDocument as ResolvedDocumentProto,
3233
};
3334
#[cfg(any(test, feature = "testing"))]
@@ -588,18 +589,91 @@ impl ResolvedDocument {
588589

589590
#[derive(Clone, Debug, Eq, PartialEq)]
590591
#[cfg_attr(any(test, feature = "testing"), derive(proptest_derive::Arbitrary))]
591-
pub struct DocumentUpdate {
592+
pub struct DocumentUpdateWithPrevTs {
592593
pub id: ResolvedDocumentId,
593-
pub old_document: Option<ResolvedDocument>,
594+
/// The old document and its timestamp in the document log.
595+
/// The timestamp will become the update's `prev_ts`.
596+
// TODO: make the timestamp non-optional after everything has pushed
597+
pub old_document: Option<(ResolvedDocument, Option<Timestamp>)>,
594598
pub new_document: Option<ResolvedDocument>,
595599
}
596600

597-
impl HeapSize for DocumentUpdate {
601+
impl DocumentUpdateWithPrevTs {
602+
/// Checks if two DocumentUpdates are almost equal, ignoring the case where
603+
/// one has a missing old timestamp
604+
// TODO: remove this once the old timestamp is non-optional
605+
pub fn eq_ignoring_none_old_ts(&self, other: &DocumentUpdateWithPrevTs) -> bool {
606+
self.id == other.id
607+
&& self.old_document.as_ref().map(|(d, _)| d)
608+
== other.old_document.as_ref().map(|(d, _)| d)
609+
&& self
610+
.old_document
611+
.as_ref()
612+
.map(|(_, ts)| ts)
613+
.zip(other.old_document.as_ref().map(|(_, ts)| ts))
614+
.is_none_or(|(a, b)| a == b)
615+
&& self.new_document == other.new_document
616+
}
617+
}
618+
619+
impl HeapSize for DocumentUpdateWithPrevTs {
598620
fn heap_size(&self) -> usize {
599621
self.old_document.heap_size() + self.new_document.heap_size()
600622
}
601623
}
602624

625+
impl TryFrom<DocumentUpdateWithPrevTs> for DocumentUpdateWithPrevTsProto {
626+
type Error = anyhow::Error;
627+
628+
fn try_from(
629+
DocumentUpdateWithPrevTs {
630+
id,
631+
old_document,
632+
new_document,
633+
}: DocumentUpdateWithPrevTs,
634+
) -> anyhow::Result<Self> {
635+
let (old_document, old_ts) = old_document.unzip();
636+
Ok(Self {
637+
id: Some(id.into()),
638+
old_document: old_document.map(|d| d.try_into()).transpose()?,
639+
old_ts: old_ts.flatten().map(|ts| ts.into()),
640+
new_document: new_document.map(|d| d.try_into()).transpose()?,
641+
})
642+
}
643+
}
644+
645+
impl TryFrom<DocumentUpdateWithPrevTsProto> for DocumentUpdateWithPrevTs {
646+
type Error = anyhow::Error;
647+
648+
fn try_from(
649+
DocumentUpdateWithPrevTsProto {
650+
id,
651+
old_document,
652+
old_ts,
653+
new_document,
654+
}: DocumentUpdateWithPrevTsProto,
655+
) -> anyhow::Result<Self> {
656+
let id = id
657+
.context("Document updates missing document id")?
658+
.try_into()?;
659+
Ok(Self {
660+
id,
661+
old_document: old_document
662+
.map(|d| anyhow::Ok((d.try_into()?, old_ts.map(Timestamp::try_from).transpose()?)))
663+
.transpose()?,
664+
new_document: new_document.map(|d| d.try_into()).transpose()?,
665+
})
666+
}
667+
}
668+
669+
#[derive(Clone, Debug, Eq, PartialEq)]
670+
#[cfg_attr(any(test, feature = "testing"), derive(proptest_derive::Arbitrary))]
671+
pub struct DocumentUpdate {
672+
pub id: ResolvedDocumentId,
673+
pub old_document: Option<ResolvedDocument>,
674+
pub new_document: Option<ResolvedDocument>,
675+
}
676+
603677
impl TryFrom<DocumentUpdate> for DocumentUpdateProto {
604678
type Error = anyhow::Error;
605679

@@ -639,6 +713,51 @@ impl TryFrom<DocumentUpdateProto> for DocumentUpdate {
639713
}
640714
}
641715

716+
impl From<DocumentUpdateWithPrevTs> for DocumentUpdate {
717+
fn from(update: DocumentUpdateWithPrevTs) -> Self {
718+
Self {
719+
id: update.id,
720+
old_document: update.old_document.map(|(d, _)| d),
721+
new_document: update.new_document,
722+
}
723+
}
724+
}
725+
726+
/// Either a [`DocumentUpdate`] or a [`DocumentUpdateWithPrevTs`]
727+
pub trait DocumentUpdateRef {
728+
fn id(&self) -> ResolvedDocumentId;
729+
fn old_document(&self) -> Option<&ResolvedDocument>;
730+
fn new_document(&self) -> Option<&ResolvedDocument>;
731+
}
732+
733+
impl DocumentUpdateRef for DocumentUpdateWithPrevTs {
734+
fn id(&self) -> ResolvedDocumentId {
735+
self.id
736+
}
737+
738+
fn old_document(&self) -> Option<&ResolvedDocument> {
739+
self.old_document.as_ref().map(|(d, _)| d)
740+
}
741+
742+
fn new_document(&self) -> Option<&ResolvedDocument> {
743+
self.new_document.as_ref()
744+
}
745+
}
746+
747+
impl DocumentUpdateRef for DocumentUpdate {
748+
fn id(&self) -> ResolvedDocumentId {
749+
self.id
750+
}
751+
752+
fn old_document(&self) -> Option<&ResolvedDocument> {
753+
self.old_document.as_ref()
754+
}
755+
756+
fn new_document(&self) -> Option<&ResolvedDocument> {
757+
self.new_document.as_ref()
758+
}
759+
}
760+
642761
impl DeveloperDocument {
643762
pub fn new(
644763
id: DeveloperDocumentId,
@@ -899,13 +1018,15 @@ mod tests {
8991018

9001019
use super::{
9011020
DocumentUpdateProto,
1021+
DocumentUpdateWithPrevTsProto,
9021022
ResolvedDocumentProto,
9031023
};
9041024
use crate::{
9051025
assert_obj,
9061026
document::{
9071027
CreationTime,
9081028
DocumentUpdate,
1029+
DocumentUpdateWithPrevTs,
9091030
ResolvedDocument,
9101031
},
9111032
paths::FieldPath,
@@ -951,7 +1072,13 @@ mod tests {
9511072

9521073

9531074
#[test]
954-
fn test_document_update_proto_roundtrips(left in any::<DocumentUpdate>()) {
1075+
fn test_document_update_proto_roundtrips(left in any::<DocumentUpdateWithPrevTs>()) {
1076+
assert_roundtrips::<DocumentUpdateWithPrevTs, DocumentUpdateWithPrevTsProto>(left);
1077+
}
1078+
1079+
1080+
#[test]
1081+
fn test_index_document_update_proto_roundtrips(left in any::<DocumentUpdate>()) {
9551082
assert_roundtrips::<DocumentUpdate, DocumentUpdateProto>(left);
9561083
}
9571084
}

crates/database/src/bootstrap_model/import_facing.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -187,7 +187,7 @@ impl<'a, RT: Runtime> ImportFacingModel<'a, RT> {
187187
.table_mapping()
188188
.tablet_namespace(table_id.tablet_id)?;
189189

190-
let existing_doc = self.tx.get(id).await?;
190+
let existing_doc = self.tx.get_with_ts(id).await?;
191191

192192
let creation_time_field = FieldName::from(CREATION_TIME_FIELD.clone());
193193
let creation_time = if let Some(ConvexValue::Float64(f)) = value.get(&creation_time_field) {
@@ -236,7 +236,7 @@ impl<'a, RT: Runtime> ImportFacingModel<'a, RT> {
236236
}
237237

238238
let id = ResolvedDocumentId::new(table_id.tablet_id, developer_id);
239-
let existing_doc = self.tx.get(id).await?;
239+
let existing_doc = self.tx.get_with_ts(id).await?;
240240

241241
self.tx.apply_validated_write(id, existing_doc, None)?;
242242

crates/database/src/committer.rs

+13-11
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ use common::{
2020
ComponentPath,
2121
},
2222
document::{
23-
DocumentUpdate,
23+
DocumentUpdateWithPrevTs,
2424
ParsedDocument,
2525
ResolvedDocument,
2626
},
@@ -657,7 +657,7 @@ impl<RT: Runtime> Committer<RT> {
657657
commit_ts,
658658
ordered_updates
659659
.into_iter()
660-
.map(|(id, update)| (id, PackedDocumentUpdate::pack(update)))
660+
.map(|(id, update)| (id, PackedDocumentUpdate::pack(update.into())))
661661
.collect(),
662662
write_source,
663663
);
@@ -673,7 +673,7 @@ impl<RT: Runtime> Committer<RT> {
673673
fn compute_writes(
674674
&self,
675675
commit_ts: Timestamp,
676-
ordered_updates: &Vec<(ResolvedDocumentId, DocumentUpdate)>,
676+
ordered_updates: &Vec<(ResolvedDocumentId, DocumentUpdateWithPrevTs)>,
677677
) -> anyhow::Result<(
678678
Vec<ValidatedDocumentWrite>,
679679
BTreeSet<(Timestamp, DatabaseIndexUpdate)>,
@@ -696,6 +696,10 @@ impl<RT: Runtime> Committer<RT> {
696696
document: document_update.new_document.clone(),
697697
},
698698
doc_in_vector_index,
699+
prev_ts: document_update
700+
.old_document
701+
.as_ref()
702+
.and_then(|&(_, ts)| ts),
699703
});
700704
}
701705
let index_writes = index_writes
@@ -736,14 +740,11 @@ impl<RT: Runtime> Committer<RT> {
736740
let timer = metrics::commit_persistence_write_timer();
737741
let document_writes = document_writes
738742
.into_iter()
739-
.map(|write| {
740-
DocumentLogEntry {
741-
ts: write.commit_ts,
742-
id: write.id,
743-
value: write.write.document,
744-
// TODO: fill in prev_ts
745-
prev_ts: None,
746-
}
743+
.map(|write| DocumentLogEntry {
744+
ts: write.commit_ts,
745+
id: write.id,
746+
value: write.write.document,
747+
prev_ts: write.prev_ts,
747748
})
748749
.collect();
749750
persistence
@@ -999,6 +1000,7 @@ struct ValidatedDocumentWrite {
9991000
id: InternalDocumentId,
10001001
write: DocumentWrite,
10011002
doc_in_vector_index: DocInVectorIndex,
1003+
prev_ts: Option<Timestamp>,
10021004
}
10031005

10041006
#[derive(Clone)]

crates/database/src/snapshot_manager.rs

+5-5
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use common::{
1212
ComponentPath,
1313
},
1414
document::{
15-
DocumentUpdate,
15+
DocumentUpdateRef,
1616
ResolvedDocument,
1717
},
1818
knobs::MAX_TRANSACTION_WINDOW,
@@ -211,13 +211,13 @@ pub struct Snapshot {
211211
impl Snapshot {
212212
pub(crate) fn update(
213213
&mut self,
214-
document_update: &DocumentUpdate,
214+
document_update: &impl DocumentUpdateRef,
215215
commit_ts: Timestamp,
216216
) -> anyhow::Result<(Vec<DatabaseIndexUpdate>, DocInVectorIndex)> {
217217
block_in_place(|| {
218-
let removal = document_update.old_document.as_ref();
219-
let insertion = document_update.new_document.as_ref();
220-
let document_id = document_update.id;
218+
let removal = document_update.old_document();
219+
let insertion = document_update.new_document();
220+
let document_id = document_update.id();
221221
let table_update = self
222222
.table_registry
223223
.update(

0 commit comments

Comments
 (0)