Skip to content

Commit

Permalink
Add a flag around the new snapshot-elision behaviour
Browse files Browse the repository at this point in the history
  • Loading branch information
bkirwi committed Feb 13, 2025
1 parent fbed4c8 commit 8f3f81d
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 2 deletions.
8 changes: 6 additions & 2 deletions src/storage-controller/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ use mz_storage_types::sources::{
GenericSourceConnection, IngestionDescription, SourceConnection, SourceData, SourceDesc,
SourceExport, SourceExportDataConfig,
};
use mz_storage_types::AlterCompatible;
use mz_storage_types::{dyncfgs, AlterCompatible};
use mz_txn_wal::metrics::Metrics as TxnMetrics;
use mz_txn_wal::txn_read::TxnsRead;
use mz_txn_wal::txns::TxnsHandle;
Expand Down Expand Up @@ -3156,10 +3156,14 @@ where
// Choose an as-of frontier for this execution of the sink. If the write frontier of the sink
// is strictly larger than its read hold, it must have at least written out its snapshot, and we can skip
// reading it; otherwise assume we may have to replay from the beginning.
let enable_snapshot_frontier =
dyncfgs::STORAGE_SINK_SNAPSHOT_FRONTIER.get(self.config().config_set());
let export_state = self.storage_collections.collection_frontiers(id)?;
let mut as_of = description.sink.as_of.clone();
as_of.join_assign(&export_state.implied_capability);
let with_snapshot = if PartialOrder::less_than(&as_of, &export_state.write_frontier) {
let with_snapshot = if enable_snapshot_frontier
&& PartialOrder::less_than(&as_of, &export_state.write_frontier)
{
false
} else {
description.sink.with_snapshot
Expand Down
8 changes: 8 additions & 0 deletions src/storage-types/src/dyncfgs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,13 @@ pub const STORAGE_SUSPEND_AND_RESTART_DELAY: Config<Duration> = Config::new(
"Delay interval when reconnecting to a source / sink after halt.",
);

/// If true, skip fetching the snapshot in the sink once the frontier has advanced.
pub const STORAGE_SINK_SNAPSHOT_FRONTIER: Config<bool> = Config::new(
"storage_sink_snapshot_frontier",
true,
"If true, skip fetching the snapshot in the sink once the frontier has advanced.",
);

/// Whether to mint reclock bindings based on the latest probed frontier or the currently ingested
/// frontier.
pub const STORAGE_RECLOCK_TO_LATEST: Config<bool> = Config::new(
Expand Down Expand Up @@ -267,6 +274,7 @@ pub fn all_dyncfgs(configs: ConfigSet) -> ConfigSet {
.add(&STORAGE_UPSERT_MAX_SNAPSHOT_BATCH_BUFFERING)
.add(&STORAGE_ROCKSDB_CLEANUP_TRIES)
.add(&STORAGE_SUSPEND_AND_RESTART_DELAY)
.add(&STORAGE_SINK_SNAPSHOT_FRONTIER)
.add(&STORAGE_RECLOCK_TO_LATEST)
.add(&STORAGE_USE_CONTINUAL_FEEDBACK_UPSERT)
}

0 comments on commit 8f3f81d

Please sign in to comment.