Skip to content

Commit 772ae1a

Browse files
authored
feat(transaction): Remove current_table, updates, and requirements from Transaction (#1451)
## Which issue does this PR close? Related Issue - #1382 Closes: - #1437 ## What changes are included in this PR? This PR wraps up the effort to make Transaction API + TransactionAction retryable! With `Transaction` holds retryable `actions`, it no longer needs to hold staging variables like `current_table`, `updates`, or `requirements`. These can be generated within `do_commit`. - Remove current_table, updates, and requirements from Transaction ## Are these changes tested? Existing unit tests
1 parent e37b9bb commit 772ae1a

File tree

4 files changed

+71
-90
lines changed

4 files changed

+71
-90
lines changed

crates/iceberg/src/table.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -162,8 +162,10 @@ pub struct Table {
162162
}
163163

164164
impl Table {
165-
pub(crate) fn with_metadata(&mut self, metadata: TableMetadataRef) {
165+
/// Sets the [`Table`] metadata and returns an updated instance with the new metadata applied.
166+
pub(crate) fn with_metadata(mut self, metadata: TableMetadataRef) -> Self {
166167
self.metadata = metadata;
168+
self
167169
}
168170

169171
/// Returns a TableBuilder to build a table

crates/iceberg/src/transaction/append.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,18 +33,16 @@ use crate::transaction::{ActionCommit, TransactionAction};
3333
pub struct FastAppendAction {
3434
check_duplicate: bool,
3535
// below are properties used to create SnapshotProducer when commit
36-
snapshot_id: i64,
3736
commit_uuid: Option<Uuid>,
3837
key_metadata: Option<Vec<u8>>,
3938
snapshot_properties: HashMap<String, String>,
4039
added_data_files: Vec<DataFile>,
4140
}
4241

4342
impl FastAppendAction {
44-
pub(crate) fn new(snapshot_id: i64) -> Self {
43+
pub(crate) fn new() -> Self {
4544
Self {
4645
check_duplicate: true,
47-
snapshot_id,
4846
commit_uuid: None,
4947
key_metadata: None,
5048
snapshot_properties: HashMap::default(),
@@ -95,7 +93,7 @@ impl TransactionAction for FastAppendAction {
9593
}
9694

9795
let snapshot_producer = SnapshotProducer::new(
98-
self.snapshot_id,
96+
table,
9997
self.commit_uuid.unwrap_or_else(Uuid::now_v7),
10098
self.key_metadata.clone(),
10199
self.snapshot_properties.clone(),

crates/iceberg/src/transaction/mod.rs

Lines changed: 42 additions & 83 deletions
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,8 @@ mod update_location;
2828
mod update_properties;
2929
mod upgrade_format_version;
3030

31-
use std::mem::discriminant;
3231
use std::sync::Arc;
3332

34-
use uuid::Uuid;
35-
3633
use crate::error::Result;
3734
use crate::table::Table;
3835
use crate::transaction::action::BoxedTransactionAction;
@@ -45,67 +42,49 @@ use crate::{Catalog, TableCommit, TableRequirement, TableUpdate};
4542

4643
/// Table transaction.
4744
pub struct Transaction {
48-
base_table: Table,
49-
current_table: Table,
45+
table: Table,
5046
actions: Vec<BoxedTransactionAction>,
51-
updates: Vec<TableUpdate>,
52-
requirements: Vec<TableRequirement>,
5347
}
5448

5549
impl Transaction {
5650
/// Creates a new transaction.
5751
pub fn new(table: &Table) -> Self {
5852
Self {
59-
base_table: table.clone(),
60-
current_table: table.clone(),
53+
table: table.clone(),
6154
actions: vec![],
62-
updates: vec![],
63-
requirements: vec![],
6455
}
6556
}
6657

67-
fn update_table_metadata(&mut self, updates: &[TableUpdate]) -> Result<()> {
68-
let mut metadata_builder = self.current_table.metadata().clone().into_builder(None);
58+
fn update_table_metadata(table: Table, updates: &[TableUpdate]) -> Result<Table> {
59+
let mut metadata_builder = table.metadata().clone().into_builder(None);
6960
for update in updates {
7061
metadata_builder = update.clone().apply(metadata_builder)?;
7162
}
7263

73-
self.current_table
74-
.with_metadata(Arc::new(metadata_builder.build()?.metadata));
75-
76-
Ok(())
64+
Ok(table.with_metadata(Arc::new(metadata_builder.build()?.metadata)))
7765
}
7866

67+
/// Applies an [`ActionCommit`] to the given [`Table`], returning a new [`Table`] with updated metadata.
68+
/// Also appends any derived [`TableUpdate`]s and [`TableRequirement`]s to the provided vectors.
7969
fn apply(
80-
&mut self,
81-
updates: Vec<TableUpdate>,
82-
requirements: Vec<TableRequirement>,
83-
) -> Result<()> {
70+
table: Table,
71+
mut action_commit: ActionCommit,
72+
existing_updates: &mut Vec<TableUpdate>,
73+
existing_requirements: &mut Vec<TableRequirement>,
74+
) -> Result<Table> {
75+
let updates = action_commit.take_updates();
76+
let requirements = action_commit.take_requirements();
77+
8478
for requirement in &requirements {
85-
requirement.check(Some(self.current_table.metadata()))?;
79+
requirement.check(Some(table.metadata()))?;
8680
}
8781

88-
self.update_table_metadata(&updates)?;
89-
90-
self.updates.extend(updates);
91-
92-
// For the requirements, it does not make sense to add a requirement more than once
93-
// For example, you cannot assert that the current schema has two different IDs
94-
for new_requirement in requirements {
95-
if self
96-
.requirements
97-
.iter()
98-
.map(discriminant)
99-
.all(|d| d != discriminant(&new_requirement))
100-
{
101-
self.requirements.push(new_requirement);
102-
}
103-
}
82+
let updated_table = Self::update_table_metadata(table, &updates)?;
10483

105-
// # TODO
106-
// Support auto commit later.
84+
existing_updates.extend(updates);
85+
existing_requirements.extend(requirements);
10786

108-
Ok(())
87+
Ok(updated_table)
10988
}
11089

11190
/// Sets table to a new version.
@@ -118,31 +97,9 @@ impl Transaction {
11897
UpdatePropertiesAction::new()
11998
}
12099

121-
fn generate_unique_snapshot_id(&self) -> i64 {
122-
let generate_random_id = || -> i64 {
123-
let (lhs, rhs) = Uuid::new_v4().as_u64_pair();
124-
let snapshot_id = (lhs ^ rhs) as i64;
125-
if snapshot_id < 0 {
126-
-snapshot_id
127-
} else {
128-
snapshot_id
129-
}
130-
};
131-
let mut snapshot_id = generate_random_id();
132-
while self
133-
.current_table
134-
.metadata()
135-
.snapshots()
136-
.any(|s| s.snapshot_id() == snapshot_id)
137-
{
138-
snapshot_id = generate_random_id();
139-
}
140-
snapshot_id
141-
}
142-
143100
/// Creates a fast append action.
144101
pub fn fast_append(&self) -> FastAppendAction {
145-
FastAppendAction::new(self.generate_unique_snapshot_id())
102+
FastAppendAction::new()
146103
}
147104

148105
/// Creates replace sort order action.
@@ -157,41 +114,43 @@ impl Transaction {
157114

158115
/// Commit transaction.
159116
pub async fn commit(mut self, catalog: &dyn Catalog) -> Result<Table> {
160-
if self.actions.is_empty() && self.updates.is_empty() {
117+
if self.actions.is_empty() {
161118
// nothing to commit
162-
return Ok(self.base_table.clone());
119+
return Ok(self.table.clone());
163120
}
164121

165122
self.do_commit(catalog).await
166123
}
167124

168125
async fn do_commit(&mut self, catalog: &dyn Catalog) -> Result<Table> {
169-
let base_table_identifier = self.base_table.identifier().to_owned();
126+
let refreshed = catalog.load_table(self.table.identifier()).await?;
170127

171-
let refreshed = catalog.load_table(&base_table_identifier.clone()).await?;
172-
173-
if self.base_table.metadata() != refreshed.metadata()
174-
|| self.base_table.metadata_location() != refreshed.metadata_location()
128+
if self.table.metadata() != refreshed.metadata()
129+
|| self.table.metadata_location() != refreshed.metadata_location()
175130
{
176131
// current base is stale, use refreshed as base and re-apply transaction actions
177-
self.base_table = refreshed.clone();
132+
self.table = refreshed.clone();
178133
}
179134

180-
let current_table = self.base_table.clone();
181-
182-
for action in self.actions.clone() {
183-
let mut action_commit = action.commit(&current_table).await?;
184-
// apply changes to current_table
185-
self.apply(
186-
action_commit.take_updates(),
187-
action_commit.take_requirements(),
135+
let mut current_table = self.table.clone();
136+
let mut existing_updates: Vec<TableUpdate> = vec![];
137+
let mut existing_requirements: Vec<TableRequirement> = vec![];
138+
139+
for action in &self.actions {
140+
let action_commit = Arc::clone(action).commit(&current_table).await?;
141+
// apply action commit to current_table
142+
current_table = Self::apply(
143+
current_table,
144+
action_commit,
145+
&mut existing_updates,
146+
&mut existing_requirements,
188147
)?;
189148
}
190149

191150
let table_commit = TableCommit::builder()
192-
.ident(base_table_identifier)
193-
.updates(self.updates.clone())
194-
.requirements(self.requirements.clone())
151+
.ident(self.table.identifier().to_owned())
152+
.updates(existing_updates)
153+
.requirements(existing_requirements)
195154
.build();
196155

197156
catalog.update_table(table_commit).await

crates/iceberg/src/transaction/snapshot.rs

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -74,14 +74,14 @@ pub(crate) struct SnapshotProducer {
7474

7575
impl SnapshotProducer {
7676
pub(crate) fn new(
77-
snapshot_id: i64,
77+
table: &Table,
7878
commit_uuid: Uuid,
7979
key_metadata: Option<Vec<u8>>,
8080
snapshot_properties: HashMap<String, String>,
8181
added_data_files: Vec<DataFile>,
8282
) -> Self {
8383
Self {
84-
snapshot_id,
84+
snapshot_id: Self::generate_unique_snapshot_id(table),
8585
commit_uuid,
8686
key_metadata,
8787
snapshot_properties,
@@ -155,6 +155,28 @@ impl SnapshotProducer {
155155
Ok(())
156156
}
157157

158+
fn generate_unique_snapshot_id(table: &Table) -> i64 {
159+
let generate_random_id = || -> i64 {
160+
let (lhs, rhs) = Uuid::new_v4().as_u64_pair();
161+
let snapshot_id = (lhs ^ rhs) as i64;
162+
if snapshot_id < 0 {
163+
-snapshot_id
164+
} else {
165+
snapshot_id
166+
}
167+
};
168+
let mut snapshot_id = generate_random_id();
169+
170+
while table
171+
.metadata()
172+
.snapshots()
173+
.any(|s| s.snapshot_id() == snapshot_id)
174+
{
175+
snapshot_id = generate_random_id();
176+
}
177+
snapshot_id
178+
}
179+
158180
fn new_manifest_output(&mut self, table: &Table) -> Result<OutputFile> {
159181
let new_manifest_path = format!(
160182
"{}/{}/{}-m{}.{}",

0 commit comments

Comments
 (0)