Skip to content

Commit fb26da5

Browse files
authored
Merge pull request #23788 from mjibson/mv-lcw
sql: RETAIN HISTORY for materialized views
2 parents 6c050a6 + fae8081 commit fb26da5

File tree

15 files changed

+218
-11
lines changed

15 files changed

+218
-11
lines changed

src/adapter/src/catalog/open.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1909,6 +1909,7 @@ mod builtin_migration_tests {
19091909
resolved_ids: ResolvedIds(BTreeSet::from_iter(resolved_ids)),
19101910
cluster_id: ClusterId::User(1),
19111911
non_null_assertions: vec![],
1912+
custom_logical_compaction_window: None,
19121913
})
19131914
}
19141915
SimplifiedItem::Index { on } => {

src/adapter/src/catalog/state.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -934,6 +934,7 @@ impl CatalogState {
934934
resolved_ids,
935935
cluster_id: materialized_view.cluster_id,
936936
non_null_assertions: materialized_view.non_null_assertions,
937+
custom_logical_compaction_window: materialized_view.compaction_window,
937938
})
938939
}
939940
Plan::CreateIndex(CreateIndexPlan { index, .. }) => CatalogItem::Index(Index {

src/adapter/src/coord/sequencer/inner.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -955,6 +955,7 @@ impl Coordinator {
955955
column_names,
956956
cluster_id,
957957
non_null_assertions,
958+
compaction_window,
958959
},
959960
replace: _,
960961
drop_ids,
@@ -1032,6 +1033,7 @@ impl Coordinator {
10321033
resolved_ids,
10331034
cluster_id,
10341035
non_null_assertions,
1036+
custom_logical_compaction_window: compaction_window,
10351037
}),
10361038
owner_id: *session.current_role_id(),
10371039
});
@@ -1083,7 +1085,10 @@ impl Coordinator {
10831085
.unwrap_or_terminate("cannot fail to append");
10841086

10851087
coord
1086-
.initialize_storage_read_policies(vec![id], CompactionWindow::Default)
1088+
.initialize_storage_read_policies(
1089+
vec![id],
1090+
compaction_window.unwrap_or(CompactionWindow::Default),
1091+
)
10871092
.await;
10881093

10891094
if coord.catalog().state().system_config().enable_mz_notices() {

src/catalog/src/memory/objects.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -702,6 +702,7 @@ pub struct MaterializedView {
702702
pub resolved_ids: ResolvedIds,
703703
pub cluster_id: ClusterId,
704704
pub non_null_assertions: Vec<usize>,
705+
pub custom_logical_compaction_window: Option<CompactionWindow>,
705706
}
706707

707708
#[derive(Debug, Clone, Serialize)]
@@ -1094,8 +1095,8 @@ impl CatalogItem {
10941095
CatalogItem::Table(table) => table.custom_logical_compaction_window,
10951096
CatalogItem::Source(source) => source.custom_logical_compaction_window,
10961097
CatalogItem::Index(index) => index.custom_logical_compaction_window,
1097-
CatalogItem::MaterializedView(_)
1098-
| CatalogItem::Log(_)
1098+
CatalogItem::MaterializedView(mview) => mview.custom_logical_compaction_window,
1099+
CatalogItem::Log(_)
10991100
| CatalogItem::View(_)
11001101
| CatalogItem::Sink(_)
11011102
| CatalogItem::Type(_)

src/environmentd/tests/sql.rs

Lines changed: 109 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -93,10 +93,11 @@ use http::StatusCode;
9393
use itertools::Itertools;
9494
use mz_adapter::{TimestampContext, TimestampExplanation};
9595
use mz_environmentd::test_util::{
96-
self, try_get_explain_timestamp, MzTimestamp, PostgresErrorExt, TestServerWithRuntime,
97-
KAFKA_ADDRS,
96+
self, get_explain_timestamp, get_explain_timestamp_determination, try_get_explain_timestamp,
97+
MzTimestamp, PostgresErrorExt, TestServerWithRuntime, KAFKA_ADDRS,
9898
};
9999
use mz_ore::assert_contains;
100+
use mz_ore::collections::CollectionExt;
100101
use mz_ore::now::{NowFn, NOW_ZERO, SYSTEM_TIME};
101102
use mz_ore::result::ResultExt;
102103
use mz_ore::retry::Retry;
@@ -3588,3 +3589,109 @@ async fn test_explain_as_of() {
35883589
.await
35893590
.unwrap();
35903591
}
3592+
3593+
// Test that RETAIN HISTORY results in the since and upper being separated by the specified amount.
3594+
#[mz_ore::test(tokio::test(flavor = "multi_thread", worker_threads = 1))]
3595+
async fn test_retain_history() {
3596+
let server = test_util::TestHarness::default().start().await;
3597+
let client = server.connect().await.unwrap();
3598+
let sys_client = server
3599+
.connect()
3600+
.internal()
3601+
.user(&SYSTEM_USER.name)
3602+
.await
3603+
.unwrap();
3604+
3605+
// Must fail before flag set.
3606+
assert!(client
3607+
.batch_execute(
3608+
"CREATE MATERIALIZED VIEW v WITH (RETAIN HISTORY = FOR '2s') AS SELECT * FROM t",
3609+
)
3610+
.await
3611+
.is_err());
3612+
3613+
sys_client
3614+
.batch_execute("ALTER SYSTEM SET enable_logical_compaction_window = true")
3615+
.await
3616+
.unwrap();
3617+
3618+
client
3619+
.batch_execute("CREATE TABLE t (a INT4)")
3620+
.await
3621+
.unwrap();
3622+
client
3623+
.batch_execute("INSERT INTO t VALUES (1)")
3624+
.await
3625+
.unwrap();
3626+
3627+
assert_contains!(
3628+
client
3629+
.batch_execute(
3630+
"CREATE MATERIALIZED VIEW v WITH (RETAIN HISTORY = FOR '-2s') AS SELECT * FROM t",
3631+
)
3632+
.await
3633+
.unwrap_err()
3634+
.to_string(),
3635+
"invalid RETAIN HISTORY"
3636+
);
3637+
3638+
client
3639+
.batch_execute(
3640+
"CREATE MATERIALIZED VIEW v WITH (RETAIN HISTORY = FOR '5s') AS SELECT * FROM t",
3641+
)
3642+
.await
3643+
.unwrap();
3644+
3645+
// Test compaction and querying without an index present.
3646+
Retry::default()
3647+
.retry_async(|_| async {
3648+
let ts = get_explain_timestamp_determination("v", &client).await?;
3649+
let source = ts.sources.into_element();
3650+
let upper = source.write_frontier.into_element();
3651+
let since = source.read_frontier.into_element();
3652+
if upper.saturating_sub(since) < Timestamp::from(2000u64) {
3653+
anyhow::bail!("{upper} - {since} should be atleast 2s apart")
3654+
}
3655+
client
3656+
.query(
3657+
&format!(
3658+
"SELECT * FROM v AS OF {}-2000",
3659+
ts.determination.timestamp_context.timestamp_or_default()
3660+
),
3661+
&[],
3662+
)
3663+
.await?;
3664+
Ok(())
3665+
})
3666+
.await
3667+
.unwrap();
3668+
3669+
// With an index the AS OF query should fail because we haven't taught the planner about retain
3670+
// history yet.
3671+
client
3672+
.batch_execute("CREATE INDEX i ON v (a)")
3673+
.await
3674+
.unwrap();
3675+
3676+
let ts = get_explain_timestamp("v", &client).await;
3677+
assert_contains!(
3678+
client
3679+
.query(&format!("SELECT * FROM v AS OF {ts}-2000"), &[])
3680+
.await
3681+
.unwrap_err()
3682+
.to_string(),
3683+
"not valid for all inputs"
3684+
);
3685+
3686+
// Make sure we didn't fail just because the index didn't have enough time after creation.
3687+
tokio::time::sleep(Duration::from_secs(3)).await;
3688+
let ts = get_explain_timestamp("v", &client).await;
3689+
assert_contains!(
3690+
client
3691+
.query(&format!("SELECT * FROM v AS OF {ts}-2000"), &[])
3692+
.await
3693+
.unwrap_err()
3694+
.to_string(),
3695+
"not valid for all inputs"
3696+
);
3697+
}

src/sql-lexer/src/keywords.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,7 @@ Groups
164164
Having
165165
Header
166166
Headers
167+
History
167168
Hold
168169
Host
169170
Hour
@@ -309,6 +310,7 @@ Replication
309310
Reset
310311
Respect
311312
Restrict
313+
Retain
312314
Return
313315
Returning
314316
Revoke

src/sql-parser/src/ast/defs/ddl.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,12 +30,14 @@ use crate::ast::{AstInfo, Expr, Ident, OrderByExpr, UnresolvedItemName, WithOpti
3030
pub enum MaterializedViewOptionName {
3131
/// The `ASSERT NOT NULL [=] <ident>` option.
3232
AssertNotNull,
33+
RetainHistory,
3334
}
3435

3536
impl AstDisplay for MaterializedViewOptionName {
3637
fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
3738
match self {
3839
MaterializedViewOptionName::AssertNotNull => f.write_str("ASSERT NOT NULL"),
40+
MaterializedViewOptionName::RetainHistory => f.write_str("RETAIN HISTORY"),
3941
}
4042
}
4143
}

src/sql-parser/src/ast/defs/statement.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3129,6 +3129,7 @@ pub enum WithOptionValue<T: AstInfo> {
31293129
// Special cases.
31303130
ClusterReplicas(Vec<ReplicaDefinition<T>>),
31313131
ConnectionKafkaBroker(KafkaBroker<T>),
3132+
RetainHistoryFor(Value),
31323133
}
31333134

31343135
impl<T: AstInfo> AstDisplay for WithOptionValue<T> {
@@ -3137,7 +3138,9 @@ impl<T: AstInfo> AstDisplay for WithOptionValue<T> {
31373138
// When adding branches to this match statement, think about whether it is OK for us to collect
31383139
// the value as part of our telemetry. Check the data management policy to be sure!
31393140
match self {
3140-
WithOptionValue::Value(_) | WithOptionValue::Sequence(_) => {
3141+
WithOptionValue::Value(_)
3142+
| WithOptionValue::Sequence(_)
3143+
| WithOptionValue::RetainHistoryFor(_) => {
31413144
// These are redact-aware.
31423145
}
31433146
WithOptionValue::DataType(_)
@@ -3179,6 +3182,10 @@ impl<T: AstInfo> AstDisplay for WithOptionValue<T> {
31793182
WithOptionValue::ConnectionKafkaBroker(broker) => {
31803183
f.write_node(broker);
31813184
}
3185+
WithOptionValue::RetainHistoryFor(value) => {
3186+
f.write_str("FOR ");
3187+
f.write_node(value);
3188+
}
31823189
}
31833190
}
31843191
}

src/sql-parser/src/parser.rs

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3231,18 +3231,44 @@ impl<'a> Parser<'a> {
32313231
fn parse_materialized_view_option_name(
32323232
&mut self,
32333233
) -> Result<MaterializedViewOptionName, ParserError> {
3234-
self.expect_keywords(&[ASSERT, NOT, NULL])?;
3235-
Ok(MaterializedViewOptionName::AssertNotNull)
3234+
let option = self.expect_one_of_keywords(&[ASSERT, RETAIN])?;
3235+
let name = match option {
3236+
ASSERT => {
3237+
self.expect_keywords(&[NOT, NULL])?;
3238+
MaterializedViewOptionName::AssertNotNull
3239+
}
3240+
RETAIN => {
3241+
self.expect_keyword(HISTORY)?;
3242+
MaterializedViewOptionName::RetainHistory
3243+
}
3244+
_ => unreachable!(),
3245+
};
3246+
Ok(name)
32363247
}
32373248

32383249
fn parse_materialized_view_option(
32393250
&mut self,
32403251
) -> Result<MaterializedViewOption<Raw>, ParserError> {
32413252
let name = self.parse_materialized_view_option_name()?;
3253+
if name == MaterializedViewOptionName::RetainHistory {
3254+
return self.parse_materialized_view_option_retain_history();
3255+
}
32423256
let value = self.parse_optional_option_value()?;
32433257
Ok(MaterializedViewOption { name, value })
32443258
}
32453259

3260+
fn parse_materialized_view_option_retain_history(
3261+
&mut self,
3262+
) -> Result<MaterializedViewOption<Raw>, ParserError> {
3263+
let _ = self.consume_token(&Token::Eq);
3264+
self.expect_keyword(FOR)?;
3265+
let value = self.parse_value()?;
3266+
Ok(MaterializedViewOption {
3267+
name: MaterializedViewOptionName::RetainHistory,
3268+
value: Some(WithOptionValue::RetainHistoryFor(value)),
3269+
})
3270+
}
3271+
32463272
fn parse_create_index(&mut self) -> Result<Statement<Raw>, ParserError> {
32473273
let default_index = self.parse_keyword(DEFAULT);
32483274
self.expect_keyword(INDEX)?;

src/sql-parser/tests/testdata/ddl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -382,9 +382,9 @@ CREATE MATERIALIZED VIEW v IN CLUSTER [1] AS SELECT 1
382382
CreateMaterializedView(CreateMaterializedViewStatement { if_exists: Error, name: UnresolvedItemName([Ident("v")]), columns: [], in_cluster: Some(Resolved("1")), query: Query { ctes: Simple([]), body: Select(Select { distinct: None, projection: [Expr { expr: Value(Number("1")), alias: None }], from: [], selection: None, group_by: [], having: None, options: [] }), order_by: [], limit: None, offset: None }, with_options: [] })
383383

384384
parse-statement roundtrip
385-
CREATE OR REPLACE MATERIALIZED VIEW v WITH (ASSERT NOT NULL a, ASSERT NOT NULL = b) AS SELECT 1
385+
CREATE OR REPLACE MATERIALIZED VIEW v WITH (ASSERT NOT NULL a, ASSERT NOT NULL = b, RETAIN HISTORY = FOR '1s') AS SELECT 1
386386
----
387-
CREATE OR REPLACE MATERIALIZED VIEW v WITH (ASSERT NOT NULL = a, ASSERT NOT NULL = b) AS SELECT 1
387+
CREATE OR REPLACE MATERIALIZED VIEW v WITH (ASSERT NOT NULL = a, ASSERT NOT NULL = b, RETAIN HISTORY = FOR '1s') AS SELECT 1
388388

389389
parse-statement
390390
CREATE CONNECTION awsconn TO AWS (ACCESS KEY ID 'id', ENDPOINT 'endpoint', REGION 'region', SECRET ACCESS KEY 'key', TOKEN 'token')

src/sql/src/names.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1783,6 +1783,7 @@ impl<'a> Fold<Raw, Aug> for NameResolver<'a> {
17831783
.collect(),
17841784
),
17851785
ConnectionKafkaBroker(broker) => ConnectionKafkaBroker(self.fold_kafka_broker(broker)),
1786+
RetainHistoryFor(value) => RetainHistoryFor(self.fold_value(value)),
17861787
}
17871788
}
17881789

src/sql/src/plan.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1435,6 +1435,7 @@ pub struct MaterializedView {
14351435
pub column_names: Vec<ColumnName>,
14361436
pub cluster_id: ClusterId,
14371437
pub non_null_assertions: Vec<usize>,
1438+
pub compaction_window: Option<CompactionWindow>,
14381439
}
14391440

14401441
#[derive(Clone, Debug)]

src/sql/src/plan/statement/ddl.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2056,12 +2056,19 @@ pub fn plan_create_materialized_view(
20562056

20572057
let MaterializedViewOptionExtracted {
20582058
assert_not_null,
2059+
retain_history,
20592060
seen: _,
20602061
}: MaterializedViewOptionExtracted = stmt.with_options.try_into()?;
20612062

20622063
if !assert_not_null.is_empty() {
20632064
scx.require_feature_flag(&crate::session::vars::ENABLE_ASSERT_NOT_NULL)?;
20642065
}
2066+
let compaction_window = retain_history
2067+
.map(|cw| {
2068+
scx.require_feature_flag(&vars::ENABLE_LOGICAL_COMPACTION_WINDOW)?;
2069+
Ok::<_, PlanError>(cw.try_into()?)
2070+
})
2071+
.transpose()?;
20652072
let mut non_null_assertions = assert_not_null
20662073
.into_iter()
20672074
.map(normalize::column_name)
@@ -2150,6 +2157,7 @@ pub fn plan_create_materialized_view(
21502157
column_names,
21512158
cluster_id,
21522159
non_null_assertions,
2160+
compaction_window,
21532161
},
21542162
replace,
21552163
drop_ids,
@@ -2160,7 +2168,8 @@ pub fn plan_create_materialized_view(
21602168

21612169
generate_extracted_config!(
21622170
MaterializedViewOption,
2163-
(AssertNotNull, Ident, AllowMultiple)
2171+
(AssertNotNull, Ident, AllowMultiple),
2172+
(RetainHistory, Duration)
21642173
);
21652174

21662175
pub fn describe_create_sink(

src/sql/src/plan/with_options.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -446,6 +446,7 @@ impl<V: TryFromValue<Value>, T: AstInfo + std::fmt::Debug> TryFromValue<WithOpti
446446
},
447447
V::name()
448448
),
449+
WithOptionValue::RetainHistoryFor(v) => V::try_from_value(v),
449450
}
450451
}
451452
fn name() -> String {

0 commit comments

Comments
 (0)