diff --git a/misc/dbt-materialize/tests/adapter/test_constraints.py b/misc/dbt-materialize/tests/adapter/test_constraints.py index 306b6db75f2bd..3cd08e30077c3 100644 --- a/misc/dbt-materialize/tests/adapter/test_constraints.py +++ b/misc/dbt-materialize/tests/adapter/test_constraints.py @@ -129,7 +129,7 @@ def test_ddl_enforcement(self, project): fetch="one", ) assert ( - 'WITH (ASSERT NOT NULL = "a", ASSERT NOT NULL = "b")' + 'ASSERT NOT NULL = "a", ASSERT NOT NULL = "b"' in nullability_assertions_ddl[1] ) diff --git a/misc/python/materialize/checks/all_checks/null_value.py b/misc/python/materialize/checks/all_checks/null_value.py index afb90baaaf0a3..523062d9f29ed 100644 --- a/misc/python/materialize/checks/all_checks/null_value.py +++ b/misc/python/materialize/checks/all_checks/null_value.py @@ -70,7 +70,7 @@ def validate(self) -> Testdrive: > SHOW CREATE MATERIALIZED VIEW null_value_view2; - materialize.public.null_value_view2 "CREATE MATERIALIZED VIEW \\"materialize\\".\\"public\\".\\"null_value_view2\\" IN CLUSTER \\"default\\" AS SELECT \\"f1\\", \\"f2\\", NULL FROM \\"materialize\\".\\"public\\".\\"null_value_table\\" WHERE \\"f1\\" IS NULL OR \\"f1\\" IS NOT NULL OR \\"f1\\" = NULL" + materialize.public.null_value_view2 "CREATE MATERIALIZED VIEW \\"materialize\\".\\"public\\".\\"null_value_view2\\" IN CLUSTER \\"default\\" WITH (REFRESH = ON COMMIT) AS SELECT \\"f1\\", \\"f2\\", NULL FROM \\"materialize\\".\\"public\\".\\"null_value_table\\" WHERE \\"f1\\" IS NULL OR \\"f1\\" IS NOT NULL OR \\"f1\\" = NULL" > SELECT * FROM null_value_view2; diff --git a/misc/python/materialize/checks/all_checks/text_bytea_types.py b/misc/python/materialize/checks/all_checks/text_bytea_types.py index e8207c060adaf..b3d920b692337 100644 --- a/misc/python/materialize/checks/all_checks/text_bytea_types.py +++ b/misc/python/materialize/checks/all_checks/text_bytea_types.py @@ -53,7 +53,7 @@ def validate(self) -> Testdrive: dedent( """ > SHOW CREATE MATERIALIZED VIEW string_bytea_types_view1; - materialize.public.string_bytea_types_view1 "CREATE MATERIALIZED VIEW \\"materialize\\".\\"public\\".\\"string_bytea_types_view1\\" IN CLUSTER \\"default\\" AS SELECT \\"text_col\\", \\"bytea_col\\", 'това'::\\"pg_catalog\\".\\"text\\", '\\\\xAAAA'::\\"pg_catalog\\".\\"bytea\\" FROM \\"materialize\\".\\"public\\".\\"text_bytea_types_table\\" WHERE \\"text_col\\" >= ''::\\"pg_catalog\\".\\"text\\" AND \\"bytea_col\\" >= ''::\\"pg_catalog\\".\\"bytea\\"" + materialize.public.string_bytea_types_view1 "CREATE MATERIALIZED VIEW \\"materialize\\".\\"public\\".\\"string_bytea_types_view1\\" IN CLUSTER \\"default\\" WITH (REFRESH = ON COMMIT) AS SELECT \\"text_col\\", \\"bytea_col\\", 'това'::\\"pg_catalog\\".\\"text\\", '\\\\xAAAA'::\\"pg_catalog\\".\\"bytea\\" FROM \\"materialize\\".\\"public\\".\\"text_bytea_types_table\\" WHERE \\"text_col\\" >= ''::\\"pg_catalog\\".\\"text\\" AND \\"bytea_col\\" >= ''::\\"pg_catalog\\".\\"bytea\\"" > SELECT text_col, text, LENGTH(bytea_col), LENGTH(bytea) FROM string_bytea_types_view1; aaaa това 2 2 diff --git a/misc/python/materialize/checks/all_checks/top_k.py b/misc/python/materialize/checks/all_checks/top_k.py index a54944cccfe4a..9d3284e645ea6 100644 --- a/misc/python/materialize/checks/all_checks/top_k.py +++ b/misc/python/materialize/checks/all_checks/top_k.py @@ -60,14 +60,14 @@ def validate(self) -> Testdrive: dedent( """ > SHOW CREATE MATERIALIZED VIEW basic_topk_view1; - materialize.public.basic_topk_view1 "CREATE MATERIALIZED VIEW \\"materialize\\".\\"public\\".\\"basic_topk_view1\\" IN CLUSTER \\"default\\" AS SELECT \\"f1\\", \\"pg_catalog\\".\\"count\\"(\\"f1\\") FROM \\"materialize\\".\\"public\\".\\"basic_topk_table\\" GROUP BY \\"f1\\" ORDER BY \\"f1\\" DESC NULLS LAST LIMIT 2" + materialize.public.basic_topk_view1 "CREATE MATERIALIZED VIEW \\"materialize\\".\\"public\\".\\"basic_topk_view1\\" IN CLUSTER \\"default\\" WITH (REFRESH = ON COMMIT) AS SELECT \\"f1\\", \\"pg_catalog\\".\\"count\\"(\\"f1\\") FROM \\"materialize\\".\\"public\\".\\"basic_topk_table\\" GROUP BY \\"f1\\" ORDER BY \\"f1\\" DESC NULLS LAST LIMIT 2" > SELECT * FROM basic_topk_view1; 2 32 3 48 > SHOW CREATE MATERIALIZED VIEW basic_topk_view2; - materialize.public.basic_topk_view2 "CREATE MATERIALIZED VIEW \\"materialize\\".\\"public\\".\\"basic_topk_view2\\" IN CLUSTER \\"default\\" AS SELECT \\"f1\\", \\"pg_catalog\\".\\"count\\"(\\"f1\\") FROM \\"materialize\\".\\"public\\".\\"basic_topk_table\\" GROUP BY \\"f1\\" ORDER BY \\"f1\\" ASC NULLS FIRST LIMIT 2" + materialize.public.basic_topk_view2 "CREATE MATERIALIZED VIEW \\"materialize\\".\\"public\\".\\"basic_topk_view2\\" IN CLUSTER \\"default\\" WITH (REFRESH = ON COMMIT) AS SELECT \\"f1\\", \\"pg_catalog\\".\\"count\\"(\\"f1\\") FROM \\"materialize\\".\\"public\\".\\"basic_topk_table\\" GROUP BY \\"f1\\" ORDER BY \\"f1\\" ASC NULLS FIRST LIMIT 2" > SELECT * FROM basic_topk_view2; 1 16 @@ -123,14 +123,14 @@ def validate(self) -> Testdrive: dedent( """ > SHOW CREATE MATERIALIZED VIEW monotonic_topk_view1; - materialize.public.monotonic_topk_view1 "CREATE MATERIALIZED VIEW \\"materialize\\".\\"public\\".\\"monotonic_topk_view1\\" IN CLUSTER \\"default\\" AS SELECT \\"f1\\", \\"pg_catalog\\".\\"count\\"(\\"f1\\") FROM \\"materialize\\".\\"public\\".\\"monotonic_topk_source\\" GROUP BY \\"f1\\" ORDER BY \\"f1\\" DESC NULLS LAST LIMIT 2" + materialize.public.monotonic_topk_view1 "CREATE MATERIALIZED VIEW \\"materialize\\".\\"public\\".\\"monotonic_topk_view1\\" IN CLUSTER \\"default\\" WITH (REFRESH = ON COMMIT) AS SELECT \\"f1\\", \\"pg_catalog\\".\\"count\\"(\\"f1\\") FROM \\"materialize\\".\\"public\\".\\"monotonic_topk_source\\" GROUP BY \\"f1\\" ORDER BY \\"f1\\" DESC NULLS LAST LIMIT 2" > SELECT * FROM monotonic_topk_view1; E 5 D 4 > SHOW CREATE MATERIALIZED VIEW monotonic_topk_view2; - materialize.public.monotonic_topk_view2 "CREATE MATERIALIZED VIEW \\"materialize\\".\\"public\\".\\"monotonic_topk_view2\\" IN CLUSTER \\"default\\" AS SELECT \\"f1\\", \\"pg_catalog\\".\\"count\\"(\\"f1\\") FROM \\"materialize\\".\\"public\\".\\"monotonic_topk_source\\" GROUP BY \\"f1\\" ORDER BY \\"f1\\" ASC NULLS FIRST LIMIT 2" + materialize.public.monotonic_topk_view2 "CREATE MATERIALIZED VIEW \\"materialize\\".\\"public\\".\\"monotonic_topk_view2\\" IN CLUSTER \\"default\\" WITH (REFRESH = ON COMMIT) AS SELECT \\"f1\\", \\"pg_catalog\\".\\"count\\"(\\"f1\\") FROM \\"materialize\\".\\"public\\".\\"monotonic_topk_source\\" GROUP BY \\"f1\\" ORDER BY \\"f1\\" ASC NULLS FIRST LIMIT 2" > SELECT * FROM monotonic_topk_view2; A 1 @@ -186,13 +186,13 @@ def validate(self) -> Testdrive: dedent( """ > SHOW CREATE MATERIALIZED VIEW monotonic_top1_view1; - materialize.public.monotonic_top1_view1 "CREATE MATERIALIZED VIEW \\"materialize\\".\\"public\\".\\"monotonic_top1_view1\\" IN CLUSTER \\"default\\" AS SELECT \\"f1\\", \\"pg_catalog\\".\\"count\\"(\\"f1\\") FROM \\"materialize\\".\\"public\\".\\"monotonic_top1_source\\" GROUP BY \\"f1\\" ORDER BY \\"f1\\" DESC NULLS LAST LIMIT 1" + materialize.public.monotonic_top1_view1 "CREATE MATERIALIZED VIEW \\"materialize\\".\\"public\\".\\"monotonic_top1_view1\\" IN CLUSTER \\"default\\" WITH (REFRESH = ON COMMIT) AS SELECT \\"f1\\", \\"pg_catalog\\".\\"count\\"(\\"f1\\") FROM \\"materialize\\".\\"public\\".\\"monotonic_top1_source\\" GROUP BY \\"f1\\" ORDER BY \\"f1\\" DESC NULLS LAST LIMIT 1" > SELECT * FROM monotonic_top1_view1; D 5 > SHOW CREATE MATERIALIZED VIEW monotonic_top1_view2; - materialize.public.monotonic_top1_view2 "CREATE MATERIALIZED VIEW \\"materialize\\".\\"public\\".\\"monotonic_top1_view2\\" IN CLUSTER \\"default\\" AS SELECT \\"f1\\", \\"pg_catalog\\".\\"count\\"(\\"f1\\") FROM \\"materialize\\".\\"public\\".\\"monotonic_top1_source\\" GROUP BY \\"f1\\" ORDER BY \\"f1\\" ASC NULLS FIRST LIMIT 1" + materialize.public.monotonic_top1_view2 "CREATE MATERIALIZED VIEW \\"materialize\\".\\"public\\".\\"monotonic_top1_view2\\" IN CLUSTER \\"default\\" WITH (REFRESH = ON COMMIT) AS SELECT \\"f1\\", \\"pg_catalog\\".\\"count\\"(\\"f1\\") FROM \\"materialize\\".\\"public\\".\\"monotonic_top1_source\\" GROUP BY \\"f1\\" ORDER BY \\"f1\\" ASC NULLS FIRST LIMIT 1" > SELECT * FROM monotonic_top1_view2; A 1 diff --git a/src/adapter/src/catalog/builtin_table_updates.rs b/src/adapter/src/catalog/builtin_table_updates.rs index b1853976a5de2..a06e443537fe0 100644 --- a/src/adapter/src/catalog/builtin_table_updates.rs +++ b/src/adapter/src/catalog/builtin_table_updates.rs @@ -735,7 +735,12 @@ impl CatalogState { diff: Diff, ) -> Vec { let create_stmt = mz_sql::parse::parse(&view.create_sql) - .unwrap_or_else(|_| panic!("create_sql cannot be invalid: {}", view.create_sql)) + .unwrap_or_else(|e| { + panic!( + "create_sql cannot be invalid: `{}` --- error: `{}`", + view.create_sql, e + ) + }) .into_element() .ast; let query = match &create_stmt { @@ -777,7 +782,12 @@ impl CatalogState { diff: Diff, ) -> Vec { let create_stmt = mz_sql::parse::parse(&mview.create_sql) - .unwrap_or_else(|_| panic!("create_sql cannot be invalid: {}", mview.create_sql)) + .unwrap_or_else(|e| { + panic!( + "create_sql cannot be invalid: `{}` --- error: `{}`", + mview.create_sql, e + ) + }) .into_element() .ast; let query = match &create_stmt { @@ -875,7 +885,12 @@ impl CatalogState { let mut updates = vec![]; let create_stmt = mz_sql::parse::parse(&index.create_sql) - .unwrap_or_else(|_| panic!("create_sql cannot be invalid: {}", index.create_sql)) + .unwrap_or_else(|e| { + panic!( + "create_sql cannot be invalid: `{}` --- error: `{}`", + index.create_sql, e + ) + }) .into_element() .ast; diff --git a/src/adapter/src/catalog/migrate.rs b/src/adapter/src/catalog/migrate.rs index b1e1cecd6b5ee..fd20658959cc5 100644 --- a/src/adapter/src/catalog/migrate.rs +++ b/src/adapter/src/catalog/migrate.rs @@ -15,6 +15,10 @@ use mz_ore::collections::CollectionExt; use mz_ore::now::{EpochMillis, NowFn}; use mz_sql::ast::display::AstDisplay; use mz_sql::ast::Raw; +use mz_sql_parser::ast::{ + CreateMaterializedViewStatement, MaterializedViewOption, MaterializedViewOptionName, + RefreshOptionValue, Statement, +}; use mz_storage_types::configuration::StorageConfiguration; use mz_storage_types::connections::ConnectionContext; use mz_storage_types::sources::GenericSourceConnection; @@ -70,8 +74,8 @@ pub(crate) async fn migrate( // Perform per-item AST migrations. let conn_cat = state.for_system_session(); - rewrite_items(tx, &conn_cat, |_tx, _conn_cat, _item| { - let _catalog_version = catalog_version.clone(); + rewrite_items(tx, &conn_cat, |_tx, _conn_cat, stmt| { + let catalog_version = catalog_version.clone(); Box::pin(async move { // Add per-item AST migrations below. // @@ -87,6 +91,10 @@ pub(crate) async fn migrate( // Migration functions may also take `tx` as input to stage // arbitrary changes to the catalog. + if catalog_version <= Version::new(0, 79, u64::MAX) { + ast_rewrite_create_materialized_view_refresh_options_0_80_0(stmt)?; + } + Ok(()) }) }) @@ -244,6 +252,37 @@ async fn ast_rewrite_postgres_source_timeline_id_0_80_0( Ok(()) } +fn ast_rewrite_create_materialized_view_refresh_options_0_80_0( + stmt: &mut Statement, +) -> Result<(), anyhow::Error> { + use mz_sql::ast::visit_mut::VisitMut; + use mz_sql::ast::WithOptionValue; + + struct Rewriter; + + impl<'ast> VisitMut<'ast, Raw> for Rewriter { + fn visit_create_materialized_view_statement_mut( + &mut self, + node: &'ast mut CreateMaterializedViewStatement, + ) { + if !node + .with_options + .iter() + .any(|option| matches!(option.name, MaterializedViewOptionName::Refresh)) + { + node.with_options.push(MaterializedViewOption { + name: MaterializedViewOptionName::Refresh, + value: Some(WithOptionValue::Refresh(RefreshOptionValue::OnCommit)), + }) + } + } + } + + Rewriter.visit_statement_mut(stmt); + + Ok(()) +} + fn _add_to_audit_log( tx: &mut Transaction, event_type: mz_audit_log::EventType, diff --git a/src/adapter/src/catalog/open.rs b/src/adapter/src/catalog/open.rs index 2971bc107bd9b..be953c68b74e9 100644 --- a/src/adapter/src/catalog/open.rs +++ b/src/adapter/src/catalog/open.rs @@ -1904,6 +1904,7 @@ mod builtin_migration_tests { resolved_ids: ResolvedIds(BTreeSet::from_iter(resolved_ids)), cluster_id: ClusterId::User(1), non_null_assertions: vec![], + refresh_schedule: None, }) } SimplifiedItem::Index { on } => { diff --git a/src/adapter/src/catalog/state.rs b/src/adapter/src/catalog/state.rs index 9c408965f59a5..21a7aab11e327 100644 --- a/src/adapter/src/catalog/state.rs +++ b/src/adapter/src/catalog/state.rs @@ -931,6 +931,7 @@ impl CatalogState { resolved_ids, cluster_id: materialized_view.cluster_id, non_null_assertions: materialized_view.non_null_assertions, + refresh_schedule: materialized_view.refresh_schedule, }) } Plan::CreateIndex(CreateIndexPlan { index, .. }) => CatalogItem::Index(Index { diff --git a/src/adapter/src/coord/command_handler.rs b/src/adapter/src/coord/command_handler.rs index 7106053ecc7c9..70f266237098c 100644 --- a/src/adapter/src/coord/command_handler.rs +++ b/src/adapter/src/coord/command_handler.rs @@ -27,16 +27,21 @@ use mz_sql::ast::{ CopyRelation, CopyStatement, InsertSource, Query, Raw, SetExpr, Statement, SubscribeStatement, }; use mz_sql::catalog::RoleAttributes; -use mz_sql::names::{PartialItemName, ResolvedIds}; +use mz_sql::names::{Aug, PartialItemName, ResolvedIds}; use mz_sql::plan::{ AbortTransactionPlan, CommitTransactionPlan, CreateRolePlan, Params, Plan, TransactionType, }; +use mz_sql::pure::{ + materialized_view_option_contains_temporal, purify_create_materialized_view_options, +}; use mz_sql::rbac; use mz_sql::rbac::CREATE_ITEM_USAGE; use mz_sql::session::user::User; use mz_sql::session::vars::{ EndTransactionAction, OwnedVarInput, Var, STATEMENT_LOGGING_SAMPLE_RATE, }; +use mz_sql_parser::ast::CreateMaterializedViewStatement; +use mz_storage_types::sources::Timeline; use opentelemetry::trace::TraceContextExt; use tokio::sync::{mpsc, oneshot, watch}; use tracing::{debug_span, Instrument}; @@ -53,7 +58,7 @@ use crate::notice::AdapterNotice; use crate::session::{Session, TransactionOps, TransactionStatus}; use crate::util::{ClientTransmitter, ResultExt}; use crate::webhook::{AppendWebhookResponse, AppendWebhookValidator}; -use crate::{catalog, metrics, ExecuteContext}; +use crate::{catalog, metrics, ExecuteContext, TimelineContext}; use super::ExecuteContextExtra; @@ -614,7 +619,7 @@ impl Coordinator { let catalog = self.catalog(); let catalog = catalog.for_session(ctx.session()); let original_stmt = stmt.clone(); - let (stmt, resolved_ids) = match mz_sql::names::resolve(&catalog, stmt) { + let (stmt, mut resolved_ids) = match mz_sql::names::resolve(&catalog, stmt) { Ok(resolved) => resolved, Err(e) => return ctx.retire(Err(e.into())), }; @@ -683,6 +688,87 @@ impl Coordinator { "CREATE SUBSOURCE statements", ))), + Statement::CreateMaterializedView(mut cmvs) => { + // (This won't be the same timestamp as the system table inserts, unfortunately.) + let mz_now = if cmvs + .with_options + .iter() + .any(materialized_view_option_contains_temporal) + { + let timeline_context = + match self.validate_timeline_context(resolved_ids.0.clone()) { + Ok(tc) => tc, + Err(e) => return ctx.retire(Err(e)), + }; + let timeline = match timeline_context { + TimelineContext::TimelineDependent(timeline) => timeline, + TimelineContext::TimestampDependent + | TimelineContext::TimestampIndependent => { + // We default to EpochMilliseconds, similarly to `determine_timestamp_for`. + // Note that we didn't accurately decide whether we are TimestampDependent + // or TimestampIndependent, because for this we'd need to also check whether + // `query.contains_temporal()`, similarly to how `peek_stage_validate` does. + // However, this doesn't matter here, as we are just going to default to + // EpochMilliseconds in both cases. + Timeline::EpochMilliseconds + } + }; + Some(self.get_timestamp_oracle(&timeline).read_ts().await) + // TODO: It might be good to take into account `least_valid_read` in addition to + // the oracle's `read_ts`, but there are two problems: + // 1. At this point, we don't know which indexes would be used. We could do an + // overestimation here by grabbing the ids of all indexes that are on ids + // involved in the query. (We'd have to recursively follow view definitions, + // similarly to `validate_timeline_context`.) + // 2. For a peak, when the `least_valid_read` is later than the oracle's + // `read_ts`, then the peak doesn't return before it completes at the chosen + // timestamp. However, for a CRATE MATERIALIZED VIEW statement, it's not clear + // whether we want to make it block until the chosen time. If it doesn't block, + // then the initial refresh wouldn't be linearized with the CREATE MATERIALIZED + // VIEW statement. + // + // Note: The Adapter is usually keeping a read hold of all objects at the oracle + // read timestamp, so `least_valid_read` usually won't actually be later than + // the oracle's `read_ts`. (see `Coordinator::advance_timelines`) + // + // Note 2: If we choose a timestamp here that is earlier than + // `least_valid_read`, that is somewhat bad, but not catastrophic: The only + // bad thing that happens is that we won't perform that refresh that was + // specified to be at `mz_now()` (which is usually the initial refresh) + // (similarly to how we don't perform refreshes that were specified to be in the + // past). + } else { + None + }; + + let owned_catalog = self.owned_catalog(); + let catalog = owned_catalog.for_session(ctx.session()); + + purify_create_materialized_view_options( + catalog, + mz_now, + &mut cmvs, + &mut resolved_ids, + ); + + let purified_stmt = + Statement::CreateMaterializedView(CreateMaterializedViewStatement:: { + if_exists: cmvs.if_exists, + name: cmvs.name, + columns: cmvs.columns, + in_cluster: cmvs.in_cluster, + query: cmvs.query, + with_options: cmvs.with_options, + }); + + // (Purifying CreateMaterializedView doesn't happen async, so no need to send + // `Message::PurifiedStatementReady` here.) + match self.plan_statement(ctx.session(), purified_stmt, ¶ms, &resolved_ids) { + Ok(plan) => self.sequence_plan(ctx, plan, resolved_ids).await, + Err(e) => ctx.retire(Err(e)), + } + } + // All other statements are handled immediately. _ => match self.plan_statement(ctx.session(), stmt, ¶ms, &resolved_ids) { Ok(plan) => self.sequence_plan(ctx, plan, resolved_ids).await, diff --git a/src/adapter/src/coord/sequencer/inner.rs b/src/adapter/src/coord/sequencer/inner.rs index 86517181356e4..9bc744812266f 100644 --- a/src/adapter/src/coord/sequencer/inner.rs +++ b/src/adapter/src/coord/sequencer/inner.rs @@ -35,6 +35,7 @@ use mz_repr::explain::json::json_string; use mz_repr::explain::{ ExplainFormat, ExprHumanizer, ExprHumanizerExt, TransientItem, UsedIndexes, }; +use mz_repr::refresh_schedule::RefreshSchedule; use mz_repr::role_id::RoleId; use mz_repr::{ColumnName, Datum, Diff, GlobalId, RelationDesc, Row, RowArena, Timestamp}; use mz_sql::ast::{ExplainStage, IndexOptionName}; @@ -964,6 +965,7 @@ impl Coordinator { column_names, cluster_id, non_null_assertions, + refresh_schedule, }, replace: _, drop_ids, @@ -971,6 +973,12 @@ impl Coordinator { ambiguous_columns, } = plan; + if refresh_schedule.is_some() { + return Err(AdapterError::Unsupported( + "REFRESH options other than ON COMMIT", + )); + } + self.ensure_cluster_can_host_compute_item(&name, cluster_id)?; // Validate any references in the materialized view's expression. We do @@ -1041,6 +1049,7 @@ impl Coordinator { resolved_ids, cluster_id, non_null_assertions, + refresh_schedule: refresh_schedule.clone(), }), owner_id: *session.current_role_id(), }); @@ -3309,6 +3318,7 @@ impl Coordinator { cluster_id, broken, non_null_assertions, + refresh_schedule, } => { // Please see the docs on `explain_query_optimizer_pipeline` above. self.explain_create_materialized_view_optimizer_pipeline( @@ -3318,6 +3328,7 @@ impl Coordinator { cluster_id, broken, non_null_assertions, + refresh_schedule, &config, root_dispatch, ) @@ -3649,6 +3660,7 @@ impl Coordinator { target_cluster_id: ClusterId, broken: bool, non_null_assertions: Vec, + _refresh_schedule: Option, explain_config: &mz_repr::explain::ExplainConfig, _root_dispatch: tracing::Dispatch, ) -> Result< diff --git a/src/catalog/src/memory/objects.rs b/src/catalog/src/memory/objects.rs index deee438e89168..be19ab85eb93a 100644 --- a/src/catalog/src/memory/objects.rs +++ b/src/catalog/src/memory/objects.rs @@ -29,6 +29,7 @@ use mz_controller_types::{ClusterId, ReplicaId}; use mz_expr::{CollectionPlan, MirScalarExpr, OptimizedMirRelationExpr}; use mz_ore::collections::CollectionExt; use mz_repr::adt::mz_acl_item::{AclMode, PrivilegeMap}; +use mz_repr::refresh_schedule::RefreshSchedule; use mz_repr::role_id::RoleId; use mz_repr::{GlobalId, RelationDesc}; use mz_sql::ast::display::AstDisplay; @@ -678,6 +679,7 @@ pub struct MaterializedView { pub resolved_ids: ResolvedIds, pub cluster_id: ClusterId, pub non_null_assertions: Vec, + pub refresh_schedule: Option, } #[derive(Debug, Clone, Serialize)] diff --git a/src/repr/src/lib.rs b/src/repr/src/lib.rs index d5df57cf98139..3eb7036bf1650 100644 --- a/src/repr/src/lib.rs +++ b/src/repr/src/lib.rs @@ -104,6 +104,7 @@ pub mod explain; pub mod fixed_length; pub mod global_id; pub mod namespaces; +pub mod refresh_schedule; pub mod role_id; pub mod stats; pub mod strconv; diff --git a/src/repr/src/refresh_schedule.rs b/src/repr/src/refresh_schedule.rs new file mode 100644 index 0000000000000..7ffd5edd308b9 --- /dev/null +++ b/src/repr/src/refresh_schedule.rs @@ -0,0 +1,35 @@ +// Copyright Materialize, Inc. and contributors. All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use crate::adt::interval::Interval; +use crate::Timestamp; +use serde::{Deserialize, Serialize}; + +#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)] +pub struct RefreshSchedule { + // `REFRESH EVERY`s + pub everies: Vec, + // `REFRESH AT`s + pub ats: Vec, +} + +impl RefreshSchedule { + pub fn empty() -> RefreshSchedule { + RefreshSchedule { + everies: Vec::new(), + ats: Vec::new(), + } + } +} + +#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)] +pub struct RefreshEvery { + pub interval: Interval, + pub aligned_to: Timestamp, +} diff --git a/src/sql-lexer/src/keywords.txt b/src/sql-lexer/src/keywords.txt index 16c7e31891873..03debeb632cb2 100644 --- a/src/sql-lexer/src/keywords.txt +++ b/src/sql-lexer/src/keywords.txt @@ -27,6 +27,7 @@ Access Add Addresses Aggregate +Aligned All Alter And @@ -92,6 +93,7 @@ Create Createcluster Createdb Createrole +Creation Cross Csv Current @@ -132,6 +134,7 @@ Enforced Envelope Error Escape +Every Except Execute Exists diff --git a/src/sql-parser/src/ast/defs/ddl.rs b/src/sql-parser/src/ast/defs/ddl.rs index a7f74436ea6fa..2f35160cc3be8 100644 --- a/src/sql-parser/src/ast/defs/ddl.rs +++ b/src/sql-parser/src/ast/defs/ddl.rs @@ -30,12 +30,15 @@ use crate::ast::{AstInfo, Expr, Ident, OrderByExpr, UnresolvedItemName, WithOpti pub enum MaterializedViewOptionName { /// The `ASSERT NOT NULL [=] ` option. AssertNotNull, + /// The `REFRESH [=] ...` option. + Refresh, } impl AstDisplay for MaterializedViewOptionName { fn fmt(&self, f: &mut AstFormatter) { match self { MaterializedViewOptionName::AssertNotNull => f.write_str("ASSERT NOT NULL"), + MaterializedViewOptionName::Refresh => f.write_str("REFRESH"), } } } diff --git a/src/sql-parser/src/ast/defs/statement.rs b/src/sql-parser/src/ast/defs/statement.rs index 3eb4b93b783b6..23393badee12c 100644 --- a/src/sql-parser/src/ast/defs/statement.rs +++ b/src/sql-parser/src/ast/defs/statement.rs @@ -3129,6 +3129,7 @@ pub enum WithOptionValue { // Special cases. ClusterReplicas(Vec>), ConnectionKafkaBroker(KafkaBroker), + Refresh(RefreshOptionValue), } impl AstDisplay for WithOptionValue { @@ -3137,7 +3138,9 @@ impl AstDisplay for WithOptionValue { // When adding branches to this match statement, think about whether it is OK for us to collect // the value as part of our telemetry. Check the data management policy to be sure! match self { - WithOptionValue::Value(_) | WithOptionValue::Sequence(_) => { + WithOptionValue::Value(_) + | WithOptionValue::Sequence(_) + | WithOptionValue::Refresh(_) => { // These are redact-aware. } WithOptionValue::DataType(_) @@ -3179,11 +3182,57 @@ impl AstDisplay for WithOptionValue { WithOptionValue::ConnectionKafkaBroker(broker) => { f.write_node(broker); } + WithOptionValue::Refresh(RefreshOptionValue::OnCommit) => { + f.write_str("ON COMMIT"); + } + WithOptionValue::Refresh(RefreshOptionValue::AtCreation) => { + f.write_str("AT CREATION"); + } + WithOptionValue::Refresh(RefreshOptionValue::At(RefreshAtOptionValue { time })) => { + f.write_str("AT "); + f.write_node(time); + } + WithOptionValue::Refresh(RefreshOptionValue::Every(RefreshEveryOptionValue { + interval, + aligned_to, + })) => { + f.write_str("EVERY '"); + f.write_str(interval); + f.write_str("'"); + if let Some(aligned_to) = aligned_to { + f.write_str(" ALIGNED TO "); + f.write_node(aligned_to) + } + } } } } impl_display_t!(WithOptionValue); +#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)] +pub enum RefreshOptionValue { + OnCommit, + AtCreation, + At(RefreshAtOptionValue), + Every(RefreshEveryOptionValue), +} + +#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)] +pub struct RefreshAtOptionValue { + // We need an Expr because we want to support `mz_now()`. + pub time: Expr, +} + +#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)] +pub struct RefreshEveryOptionValue { + // The following is a String and not an IntervalValue, because that starts with the keyword + // INTERVAL, but that is not needed here, since the only thing that can come here is an + // interval, so no need to indicate this with an extra keyword. + pub interval: String, + // We need an Expr because we want to support `mz_now()`. + pub aligned_to: Option>, +} + #[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)] pub enum TransactionMode { AccessMode(TransactionAccessMode), diff --git a/src/sql-parser/src/parser.rs b/src/sql-parser/src/parser.rs index 96bac7ef4d94a..b6f41a6e4df6b 100644 --- a/src/sql-parser/src/parser.rs +++ b/src/sql-parser/src/parser.rs @@ -3218,18 +3218,102 @@ impl<'a> Parser<'a> { fn parse_materialized_view_option_name( &mut self, ) -> Result { - self.expect_keywords(&[ASSERT, NOT, NULL])?; - Ok(MaterializedViewOptionName::AssertNotNull) + let name = match self.expect_one_of_keywords(&[ASSERT, REFRESH])? { + ASSERT => { + self.expect_keywords(&[NOT, NULL])?; + MaterializedViewOptionName::AssertNotNull + } + REFRESH => MaterializedViewOptionName::Refresh, + _ => unreachable!(), + }; + Ok(name) } fn parse_materialized_view_option( &mut self, ) -> Result, ParserError> { let name = self.parse_materialized_view_option_name()?; - let value = self.parse_optional_option_value()?; + let value = match name { + MaterializedViewOptionName::Refresh => { + Some(self.parse_materialized_view_refresh_option_value()?) + } + _ => self.parse_optional_option_value()?, + }; Ok(MaterializedViewOption { name, value }) } + fn parse_materialized_view_refresh_option_value( + &mut self, + ) -> Result, ParserError> { + let _ = self.consume_token(&Token::Eq); + + if self.parse_keyword(ON) { + self.expect_keyword(COMMIT)?; + Ok(WithOptionValue::Refresh(RefreshOptionValue::OnCommit)) + } else if self.parse_keyword(AT) { + if self.parse_keyword(CREATION) { + Ok(WithOptionValue::Refresh(RefreshOptionValue::AtCreation)) + } else { + Ok(WithOptionValue::Refresh(RefreshOptionValue::At( + RefreshAtOptionValue { + time: self.parse_expr()?, + }, + ))) + } + } else if self.parse_keyword(EVERY) { + match self + .parse_value() + .map_err(|ParserError { message, pos }| ParserError { + message: "Error parsing interval of REFRESH EVERY: ".to_string() + &message, + pos, + })? { + Value::String(interval) => { + let aligned_to = if self.parse_keywords(&[ALIGNED, TO]) { + Some(self.parse_expr()?) + } else { + None + }; + Ok(WithOptionValue::Refresh(RefreshOptionValue::Every( + RefreshEveryOptionValue { + interval, + aligned_to, + }, + ))) + } + v @ Value::Interval(_) => { + parser_err!( + self, + self.peek_prev_pos(), + format!( + "Invalid value for REFRESH EVERY: `{v}`. The value should be a \ + string parseable as an interval, e.g., '1 day'. The INTERVAL keyword \ + should NOT be present!" + ) + ) + } + v => { + parser_err!( + self, + self.peek_prev_pos(), + format!( + "Invalid value for REFRESH EVERY: `{v}`. The value should be a \ + string parseable as an interval, e.g., '1 day'." + ) + ) + } + } + } else { + parser_err!( + self, + self.peek_prev_pos(), + format!( + "Invalid REFRESH option value. Expected ON COMMIT, AT, or EVERY. Instead got {}.", + self.peek_token().map(|token| token.to_string()).unwrap_or("".to_string()), + ) + ) + } + } + fn parse_create_index(&mut self) -> Result, ParserError> { let default_index = self.parse_keyword(DEFAULT); self.expect_keyword(INDEX)?; diff --git a/src/sql-parser/tests/testdata/ddl b/src/sql-parser/tests/testdata/ddl index 79458599c6b65..8f764936f9730 100644 --- a/src/sql-parser/tests/testdata/ddl +++ b/src/sql-parser/tests/testdata/ddl @@ -381,6 +381,20 @@ CREATE MATERIALIZED VIEW v IN CLUSTER [1] AS SELECT 1 => CreateMaterializedView(CreateMaterializedViewStatement { if_exists: Error, name: UnresolvedItemName([Ident("v")]), columns: [], in_cluster: Some(Resolved("1")), query: Query { ctes: Simple([]), body: Select(Select { distinct: None, projection: [Expr { expr: Value(Number("1")), alias: None }], from: [], selection: None, group_by: [], having: None, options: [] }), order_by: [], limit: None, offset: None }, with_options: [] }) +parse-statement +CREATE MATERIALIZED VIEW v WITH (REFRESH EVERY '1 day', ASSERT NOT NULL x) AS SELECT * FROM t; +---- +CREATE MATERIALIZED VIEW v WITH (REFRESH = EVERY '1 day', ASSERT NOT NULL = x) AS SELECT * FROM t +=> +CreateMaterializedView(CreateMaterializedViewStatement { if_exists: Error, name: UnresolvedItemName([Ident("v")]), columns: [], in_cluster: None, query: Query { ctes: Simple([]), body: Select(Select { distinct: None, projection: [Wildcard], from: [TableWithJoins { relation: Table { name: Name(UnresolvedItemName([Ident("t")])), alias: None }, joins: [] }], selection: None, group_by: [], having: None, options: [] }), order_by: [], limit: None, offset: None }, with_options: [MaterializedViewOption { name: Refresh, value: Some(Refresh(Every(RefreshEveryOptionValue { interval: "1 day", aligned_to: None }))) }, MaterializedViewOption { name: AssertNotNull, value: Some(Ident(Ident("x"))) }] }) + +parse-statement +CREATE OR REPLACE MATERIALIZED VIEW v IN CLUSTER [1] WITH (REFRESH EVERY '1 day' ALIGNED TO '2023-12-11 11:00', ASSERT NOT NULL x, REFRESH AT mz_now(), REFRESH ON COMMIT, REFRESH = AT CREATION) AS SELECT * FROM t; +---- +CREATE OR REPLACE MATERIALIZED VIEW v IN CLUSTER [1] WITH (REFRESH = EVERY '1 day' ALIGNED TO '2023-12-11 11:00', ASSERT NOT NULL = x, REFRESH = AT mz_now(), REFRESH = ON COMMIT, REFRESH = AT CREATION) AS SELECT * FROM t +=> +CreateMaterializedView(CreateMaterializedViewStatement { if_exists: Replace, name: UnresolvedItemName([Ident("v")]), columns: [], in_cluster: Some(Resolved("1")), query: Query { ctes: Simple([]), body: Select(Select { distinct: None, projection: [Wildcard], from: [TableWithJoins { relation: Table { name: Name(UnresolvedItemName([Ident("t")])), alias: None }, joins: [] }], selection: None, group_by: [], having: None, options: [] }), order_by: [], limit: None, offset: None }, with_options: [MaterializedViewOption { name: Refresh, value: Some(Refresh(Every(RefreshEveryOptionValue { interval: "1 day", aligned_to: Some(Value(String("2023-12-11 11:00"))) }))) }, MaterializedViewOption { name: AssertNotNull, value: Some(Ident(Ident("x"))) }, MaterializedViewOption { name: Refresh, value: Some(Refresh(At(RefreshAtOptionValue { time: Function(Function { name: Name(UnresolvedItemName([Ident("mz_now")])), args: Args { args: [], order_by: [] }, filter: None, over: None, distinct: false }) }))) }, MaterializedViewOption { name: Refresh, value: Some(Refresh(OnCommit)) }, MaterializedViewOption { name: Refresh, value: Some(Refresh(AtCreation)) }] }) + parse-statement roundtrip CREATE OR REPLACE MATERIALIZED VIEW v WITH (ASSERT NOT NULL a, ASSERT NOT NULL = b) AS SELECT 1 ---- diff --git a/src/sql/src/catalog.rs b/src/sql/src/catalog.rs index 8c5ab5f42bf80..dc8444e488f50 100644 --- a/src/sql/src/catalog.rs +++ b/src/sql/src/catalog.rs @@ -196,7 +196,9 @@ pub trait SessionCatalog: fmt::Debug + ExprHumanizer + Send + Sync + ConnectionR cluster_replica_name: &'b QualifiedReplica, ) -> Result<&dyn CatalogClusterReplica<'a>, CatalogError>; - /// Resolves a partially-specified item name. + /// Resolves a partially-specified item name, that is NOT a function or + /// type. (For resolving functions or types, please use resolve_function + /// or resolve_type.) /// /// If the partial name has a database component, it searches only the /// specified database; otherwise, it searches the active database. If the diff --git a/src/sql/src/names.rs b/src/sql/src/names.rs index b3f41dc0fda27..1eb4dfcc5b110 100644 --- a/src/sql/src/names.rs +++ b/src/sql/src/names.rs @@ -1783,6 +1783,7 @@ impl<'a> Fold for NameResolver<'a> { .collect(), ), ConnectionKafkaBroker(broker) => ConnectionKafkaBroker(self.fold_kafka_broker(broker)), + Refresh(refresh) => Refresh(self.fold_refresh_option_value(refresh)), } } diff --git a/src/sql/src/plan.rs b/src/sql/src/plan.rs index 357fb48054dae..df1b0607554bb 100644 --- a/src/sql/src/plan.rs +++ b/src/sql/src/plan.rs @@ -39,6 +39,7 @@ use mz_ore::now::{self, NOW_ZERO}; use mz_pgcopy::CopyFormatParams; use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem}; use mz_repr::explain::{ExplainConfig, ExplainFormat}; +use mz_repr::refresh_schedule::RefreshSchedule; use mz_repr::role_id::RoleId; use mz_repr::{ColumnName, Diff, GlobalId, RelationDesc, Row, ScalarType}; use mz_sql_parser::ast::{ @@ -861,6 +862,7 @@ pub enum ExplaineeStatement { /// Broken flag (see [`ExplaineeStatement::broken()`]). broken: bool, non_null_assertions: Vec, + refresh_schedule: Option, }, /// The object to be explained is a CREATE INDEX. CreateIndex { @@ -1431,6 +1433,7 @@ pub struct MaterializedView { pub column_names: Vec, pub cluster_id: ClusterId, pub non_null_assertions: Vec, + pub refresh_schedule: Option, } #[derive(Clone, Debug)] diff --git a/src/sql/src/plan/expr.rs b/src/sql/src/plan/expr.rs index d1d25b937a44d..5b2f2c0b9a7de 100644 --- a/src/sql/src/plan/expr.rs +++ b/src/sql/src/plan/expr.rs @@ -3032,6 +3032,28 @@ impl HirScalarExpr { } }) } + + /// Attempts to simplify this expression to a literal MzTimestamp. + /// + /// Returns `None` if this expression cannot be simplified, e.g. because it + /// contains non-literal values. + /// + /// TODO: Make this (and the other similar fns above) return Result, so that we can show the + /// error when it fails. (E.g., there can be non-trivial cast errors.) + /// + /// # Panics + /// + /// Panics if this expression does not have type [`ScalarType::MzTimestamp`]. + pub fn into_literal_mz_timestamp(self) -> Option { + self.simplify_to_literal().and_then(|row| { + let datum = row.unpack_first(); + if datum.is_null() { + None + } else { + Some(datum.unwrap_mz_timestamp()) + } + }) + } } impl VisitChildren for HirScalarExpr { diff --git a/src/sql/src/plan/statement.rs b/src/sql/src/plan/statement.rs index 06a0e063350ac..28f55874a1389 100644 --- a/src/sql/src/plan/statement.rs +++ b/src/sql/src/plan/statement.rs @@ -240,7 +240,8 @@ pub fn describe( /// Planning is a pure, synchronous function and so requires that the provided /// `stmt` does does not depend on any external state. Statements that rely on /// external state must remove that state prior to calling this function via -/// [`crate::pure::purify_statement`]. +/// [`crate::pure::purify_statement`] or +/// [`crate::pure::purify_create_materialized_view_options`]. /// /// TODO: sinks do not currently obey this rule, which is a bug /// diff --git a/src/sql/src/plan/statement/ddl.rs b/src/sql/src/plan/statement/ddl.rs index a06e33a262c81..9c6862fa1120b 100644 --- a/src/sql/src/plan/statement/ddl.rs +++ b/src/sql/src/plan/statement/ddl.rs @@ -15,6 +15,7 @@ use std::collections::{BTreeMap, BTreeSet}; use std::fmt::Write; use std::iter; +use std::str::FromStr; use std::time::Duration; use itertools::{Either, Itertools}; @@ -25,8 +26,10 @@ use mz_ore::cast::{CastFrom, TryCastFrom}; use mz_ore::collections::HashSet; use mz_ore::str::StrExt; use mz_proto::RustType; +use mz_repr::adt::interval::Interval; use mz_repr::adt::mz_acl_item::{MzAclItem, PrivilegeMap}; use mz_repr::adt::system::Oid; +use mz_repr::refresh_schedule::{RefreshEvery, RefreshSchedule}; use mz_repr::role_id::RoleId; use mz_repr::{strconv, ColumnName, ColumnType, GlobalId, RelationDesc, RelationType, ScalarType}; use mz_sql_parser::ast::display::comma_separated; @@ -39,8 +42,8 @@ use mz_sql_parser::ast::{ CreateConnectionOption, CreateConnectionOptionName, CreateConnectionType, CreateTypeListOption, CreateTypeListOptionName, CreateTypeMapOption, CreateTypeMapOptionName, DeferredItemName, DocOnIdentifier, DocOnSchema, DropOwnedStatement, MaterializedViewOption, - MaterializedViewOptionName, SetRoleVar, UnresolvedItemName, UnresolvedObjectName, - UnresolvedSchemaName, Value, + MaterializedViewOptionName, RefreshAtOptionValue, RefreshEveryOptionValue, RefreshOptionValue, + SetRoleVar, UnresolvedItemName, UnresolvedObjectName, UnresolvedSchemaName, Value, }; use mz_sql_parser::ident; use mz_storage_types::connections::inline::{ConnectionAccess, ReferencedConnection}; @@ -94,7 +97,7 @@ use crate::names::{ use crate::normalize::{self, ident}; use crate::plan::error::PlanError; use crate::plan::expr::ColumnRef; -use crate::plan::query::{scalar_type_from_catalog, ExprContext, QueryLifetime}; +use crate::plan::query::{plan_expr, scalar_type_from_catalog, ExprContext, QueryLifetime}; use crate::plan::scope::Scope; use crate::plan::statement::ddl::connection::{INALTERABLE_OPTIONS, MUTUALLY_EXCLUSIVE_SETS}; use crate::plan::statement::{scl, StatementContext, StatementDesc}; @@ -118,6 +121,7 @@ use crate::plan::{ WebhookHeaders, WebhookValidation, }; use crate::session::vars; +use crate::session::vars::ENABLE_REFRESH_EVERY_MVS; mod connection; @@ -2086,9 +2090,125 @@ pub fn plan_create_materialized_view( let MaterializedViewOptionExtracted { assert_not_null, + refresh, seen: _, }: MaterializedViewOptionExtracted = stmt.with_options.try_into()?; + let refresh_schedule = { + let mut refresh_schedule = RefreshSchedule::empty(); + let mut on_commits_seen = 0; + for refresh_option_value in refresh { + if !matches!(refresh_option_value, RefreshOptionValue::OnCommit) { + scx.require_feature_flag(&ENABLE_REFRESH_EVERY_MVS)?; + } + match refresh_option_value { + RefreshOptionValue::OnCommit => { + on_commits_seen += 1; + } + RefreshOptionValue::AtCreation => { + unreachable!("REFRESH AT CREATION should have been purified away") + } + RefreshOptionValue::At(RefreshAtOptionValue { mut time }) => { + transform_ast::transform(scx, &mut time)?; // Desugar the expression + let ecx = &ExprContext { + qcx: &QueryContext::root(scx, QueryLifetime::OneShot), + name: "REFRESH AT", + scope: &Scope::empty(), + relation_type: &RelationType::empty(), + allow_aggregates: false, + allow_subqueries: false, + allow_parameters: false, + allow_windows: false, + }; + let hir = plan_expr(ecx, &time)?.cast_to( + ecx, + CastContext::Implicit, + &ScalarType::MzTimestamp, + )?; + // (mz_now was purified away to a literal earlier) + let timestamp = hir + .into_literal_mz_timestamp(). + ok_or_else(|| sql_err!( + "REFRESH AT argument must be an expression that can be simplified and/or cast \ + to a constant whose type is mz_timestamp (calling mz_now() is allowed)" + ))?; + refresh_schedule.ats.push(timestamp); + } + RefreshOptionValue::Every(RefreshEveryOptionValue { + interval, + aligned_to, + }) => { + let interval = Interval::from_str(interval.as_str())?; + if interval.as_microseconds() <= 0 { + sql_bail!("REFRESH interval must be positive; got: {}", interval); + } + if interval.as_microseconds() > Interval::new(0, 27, 0).as_microseconds() { + // This limitation is because we want Intervals to be cleanly convertable + // to a unix epoch timestamp difference. When it's at least 1 month, then + // this is not true anymore, because months have variable lengths. + // See `Timestamp::round_up`. + sql_bail!( + "REFRESH interval too big: {}. Currently, only intervals not larger than 27 days are supported.", + interval + ); + } + + let mut aligned_to = + aligned_to.expect("ALIGNED TO should have been filled in by purification"); + + // Desugar the `aligned_to` expression + transform_ast::transform(scx, &mut aligned_to)?; + + let ecx = &ExprContext { + qcx: &QueryContext::root(scx, QueryLifetime::OneShot), + name: "REFRESH EVERY ... ALIGNED TO", + scope: &Scope::empty(), + relation_type: &RelationType::empty(), + allow_aggregates: false, + allow_subqueries: false, + allow_parameters: false, + allow_windows: false, + }; + let aligned_to_hir = plan_expr(ecx, &aligned_to)?.cast_to( + ecx, + CastContext::Implicit, + &ScalarType::MzTimestamp, + )?; + // (mz_now was purified away to a literal earlier) + let aligned_to_const = aligned_to_hir + .into_literal_mz_timestamp() + .ok_or_else(|| + sql_err!( + "REFRESH EVERY ... ALIGNED TO argument must be an expression that can be simplified and/or cast \ + to a constant whose type is mz_timestamp (calling mz_now() is allowed)" + ))?; + + refresh_schedule.everies.push(RefreshEvery { + interval, + aligned_to: aligned_to_const, + }); + } + } + } + + if on_commits_seen > 1 { + sql_bail!("REFRESH ON COMMIT can be given only once"); + } + if on_commits_seen > 0 && refresh_schedule != RefreshSchedule::empty() { + sql_bail!("REFRESH ON COMMIT is not compatible with any of the other REFRESH options"); + } + // Note: Seeing no REFRESH options at all (not even REFRESH ON COMMIT) should be acceptable: + // even though purification inserts REFRESH ON COMMIT if no other REFRESH option was given, + // we can't rely on this behavior in planning, because this won't happen for old + // materialized views that were created before this feature was introduced. + + if refresh_schedule == RefreshSchedule::empty() { + None + } else { + Some(refresh_schedule) + } + }; + if !assert_not_null.is_empty() { scx.require_feature_flag(&crate::session::vars::ENABLE_ASSERT_NOT_NULL)?; } @@ -2180,6 +2300,7 @@ pub fn plan_create_materialized_view( column_names, cluster_id, non_null_assertions, + refresh_schedule, }, replace, drop_ids, @@ -2190,7 +2311,8 @@ pub fn plan_create_materialized_view( generate_extracted_config!( MaterializedViewOption, - (AssertNotNull, Ident, AllowMultiple) + (AssertNotNull, Ident, AllowMultiple), + (Refresh, RefreshOptionValue, AllowMultiple) ); pub fn describe_create_sink( diff --git a/src/sql/src/plan/statement/dml.rs b/src/sql/src/plan/statement/dml.rs index fc6dc777c145e..985f8dc21235e 100644 --- a/src/sql/src/plan/statement/dml.rs +++ b/src/sql/src/plan/statement/dml.rs @@ -371,6 +371,7 @@ pub fn plan_explain_plan( column_names, cluster_id, non_null_assertions, + refresh_schedule, .. }, .. @@ -386,6 +387,7 @@ pub fn plan_explain_plan( cluster_id, broken, non_null_assertions, + refresh_schedule, }) } Explainee::CreateIndex(mut stmt, broken) => { diff --git a/src/sql/src/plan/with_options.rs b/src/sql/src/plan/with_options.rs index d222319d10251..a04c6873ed57c 100644 --- a/src/sql/src/plan/with_options.rs +++ b/src/sql/src/plan/with_options.rs @@ -10,7 +10,7 @@ //! Provides tooling to handle `WITH` options. use mz_repr::{strconv, GlobalId}; -use mz_sql_parser::ast::{Ident, KafkaBroker, ReplicaDefinition}; +use mz_sql_parser::ast::{Ident, KafkaBroker, RefreshOptionValue, ReplicaDefinition}; use mz_storage_types::connections::StringOrSecret; use serde::{Deserialize, Serialize}; use std::time::Duration; @@ -432,9 +432,12 @@ impl, T: AstInfo + std::fmt::Debug> TryFromValue sql_bail!( + | WithOptionValue::ConnectionKafkaBroker(_) + | WithOptionValue::Refresh(_) => sql_bail!( "incompatible value types: cannot convert {} to {}", match v { + WithOptionValue::Value(_) => unreachable!(), + WithOptionValue::Ident(_) => unreachable!(), WithOptionValue::Sequence(_) => "sequences", WithOptionValue::Item(_) => "object references", WithOptionValue::UnresolvedItemName(_) => "object names", @@ -442,7 +445,7 @@ impl, T: AstInfo + std::fmt::Debug> TryFromValue "data types", WithOptionValue::ClusterReplicas(_) => "cluster replicas", WithOptionValue::ConnectionKafkaBroker(_) => "connection kafka brokers", - _ => unreachable!(), + WithOptionValue::Refresh(_) => "refresh option values", }, V::name() ), @@ -509,3 +512,23 @@ impl ImpliedValue for Vec> { sql_bail!("must provide a kafka broker") } } + +impl TryFromValue> for RefreshOptionValue { + fn try_from_value(v: WithOptionValue) -> Result { + if let WithOptionValue::Refresh(r) = v { + Ok(r) + } else { + sql_bail!("cannot use value `{}` for a refresh option", v) + } + } + + fn name() -> String { + "refresh option value".to_string() + } +} + +impl ImpliedValue for RefreshOptionValue { + fn implied_value() -> Result { + sql_bail!("must provide a refresh option value") + } +} diff --git a/src/sql/src/pure.rs b/src/sql/src/pure.rs index 0299b6e2b8afc..3a810c93149fd 100644 --- a/src/sql/src/pure.rs +++ b/src/sql/src/pure.rs @@ -25,16 +25,20 @@ use mz_ore::str::StrExt; use mz_postgres_util::replication::WalLevel; use mz_postgres_util::PostgresError; use mz_proto::RustType; -use mz_repr::{strconv, GlobalId}; +use mz_repr::{strconv, GlobalId, Timestamp}; use mz_sql_parser::ast::display::AstDisplay; +use mz_sql_parser::ast::visit::{visit_function, Visit}; +use mz_sql_parser::ast::visit_mut::{visit_expr_mut, VisitMut}; use mz_sql_parser::ast::{ AlterSourceAction, AlterSourceAddSubsourceOptionName, AlterSourceStatement, AvroDocOn, - CreateSinkConnection, CreateSinkStatement, CreateSubsourceOption, CreateSubsourceOptionName, - CsrConfigOption, CsrConfigOptionName, CsrConnection, CsrSeedAvro, CsrSeedProtobuf, - CsrSeedProtobufSchema, DbzMode, DeferredItemName, DocOnIdentifier, DocOnSchema, Envelope, - Ident, KafkaConfigOption, KafkaConfigOptionName, KafkaConnection, KafkaSourceConnection, - PgConfigOption, PgConfigOptionName, RawItemName, ReaderSchemaSelectionStrategy, Statement, - UnresolvedItemName, + CreateMaterializedViewStatement, CreateSinkConnection, CreateSinkStatement, + CreateSubsourceOption, CreateSubsourceOptionName, CsrConfigOption, CsrConfigOptionName, + CsrConnection, CsrSeedAvro, CsrSeedProtobuf, CsrSeedProtobufSchema, DbzMode, DeferredItemName, + DocOnIdentifier, DocOnSchema, Envelope, Expr, Function, FunctionArgs, Ident, KafkaConfigOption, + KafkaConfigOptionName, KafkaConnection, KafkaSourceConnection, MaterializedViewOption, + MaterializedViewOptionName, PgConfigOption, PgConfigOptionName, RawItemName, + ReaderSchemaSelectionStrategy, RefreshAtOptionValue, RefreshEveryOptionValue, + RefreshOptionValue, Statement, UnresolvedItemName, }; use mz_storage_types::configuration::StorageConfiguration; use mz_storage_types::connections::inline::IntoInlineConnection; @@ -57,7 +61,10 @@ use crate::ast::{ }; use crate::catalog::{CatalogItemType, ErsatzCatalog, SessionCatalog}; use crate::kafka_util::KafkaConfigOptionExtracted; -use crate::names::{Aug, ResolvedColumnName, ResolvedItemName}; +use crate::names::{ + Aug, FullItemName, PartialItemName, ResolvedColumnName, ResolvedDataType, ResolvedIds, + ResolvedItemName, +}; use crate::plan::error::PlanError; use crate::plan::statement::ddl::load_generator_ast_to_generator; use crate::plan::StatementContext; @@ -1475,3 +1482,239 @@ async fn compile_proto( message_name, }) } + +const MZ_NOW_NAME: &str = "mz_now"; +const MZ_NOW_SCHEMA: &str = "mz_catalog"; + +/// Purifies a CREATE MATERIALIZED VIEW statement. Additionally, it adjusts `resolved_ids` if +/// references to ids appear or disappear during the purification. +pub fn purify_create_materialized_view_options( + catalog: impl SessionCatalog, + mz_now: Option, + cmvs: &mut CreateMaterializedViewStatement, + resolved_ids: &mut ResolvedIds, +) { + // 0. Preparations: + // Prepare an expression that calls `mz_now()`, which we can insert in various later steps. + let (mz_now_id, mz_now_expr) = { + let item = catalog + .resolve_function(&PartialItemName { + database: None, + schema: Some(MZ_NOW_SCHEMA.to_string()), + item: MZ_NOW_NAME.to_string(), + }) + .expect("we should be able to resolve mz_now"); + ( + item.id(), + Expr::Function(Function { + name: ResolvedItemName::Item { + id: item.id(), + qualifiers: item.name().qualifiers.clone(), + full_name: catalog.resolve_full_name(item.name()), + print_id: false, + }, + args: FunctionArgs::Args { + args: Vec::new(), + order_by: Vec::new(), + }, + filter: None, + over: None, + distinct: false, + }), + ) + }; + // Prepare the `mz_timestamp` type. + let (mz_timestamp_id, mz_timestamp_type) = { + let item = catalog.get_system_type("mz_timestamp"); + let full_name = catalog.resolve_full_name(item.name()); + ( + item.id(), + ResolvedDataType::Named { + id: item.id(), + qualifiers: item.name().qualifiers.clone(), + full_name, + modifiers: vec![], + print_id: true, + }, + ) + }; + + let mut introduced_mz_timestamp = false; + + for option in cmvs.with_options.iter_mut() { + // 1. Purify `REFRESH AT CREATION` to `REFRESH AT mz_now()`. + if matches!( + option.value, + Some(WithOptionValue::Refresh(RefreshOptionValue::AtCreation)) + ) { + option.value = Some(WithOptionValue::Refresh(RefreshOptionValue::At( + RefreshAtOptionValue { + time: mz_now_expr.clone(), + }, + ))); + } + + // 2. If `REFRESH EVERY` doesn't have a `STARTING AT`, then add `STARTING AT mz_now()`. + if let Some(WithOptionValue::Refresh(RefreshOptionValue::Every( + RefreshEveryOptionValue { aligned_to, .. }, + ))) = &mut option.value + { + if aligned_to.is_none() { + *aligned_to = Some(mz_now_expr.clone()); + } + } + + // 3. Substitute `mz_now()` with the timestamp chosen for the CREATE MATERIALIZED VIEW + // statement. (This has to happen after the above steps, which might introduce `mz_now()`.) + match &mut option.value { + Some(WithOptionValue::Refresh(RefreshOptionValue::At(RefreshAtOptionValue { + time, + }))) => { + let mut visitor = MzNowPurifierVisitor::new(mz_now, mz_timestamp_type.clone()); + visitor.visit_expr_mut(time); + introduced_mz_timestamp |= visitor.introduced_mz_timestamp; + } + Some(WithOptionValue::Refresh(RefreshOptionValue::Every( + RefreshEveryOptionValue { + interval: _, + aligned_to: Some(aligned_to), + }, + ))) => { + let mut visitor = MzNowPurifierVisitor::new(mz_now, mz_timestamp_type.clone()); + visitor.visit_expr_mut(aligned_to); + introduced_mz_timestamp |= visitor.introduced_mz_timestamp; + } + _ => {} + } + } + + // 4. If the user didn't give any REFRESH option, then default to ON COMMIT. + if !cmvs.with_options.iter().any(|o| { + matches!( + o, + MaterializedViewOption { + value: Some(WithOptionValue::Refresh(..)), + .. + } + ) + }) { + cmvs.with_options.push(MaterializedViewOption { + name: MaterializedViewOptionName::Refresh, + value: Some(WithOptionValue::Refresh(RefreshOptionValue::OnCommit)), + }) + } + + // 5. Attend to `resolved_ids`: The purification might have + // - added references to `mz_timestamp`; + // - removed references to `mz_now`. + if introduced_mz_timestamp { + resolved_ids.0.insert(mz_timestamp_id); + } + // Even though we always remove `mz_now()` from the `with_options`, there might be `mz_now()` + // remaining in the main query expression of the MV, so let's visit the entire statement to look + // for `mz_now()` everywhere. + let mut visitor = ExprContainsTemporalVisitor::new(); + visitor.visit_create_materialized_view_statement(cmvs); + if !visitor.contains_temporal { + resolved_ids.0.remove(&mz_now_id); + } +} + +/// Returns true if the [MaterializedViewOption] either already involves `mz_now()` or will involve +/// after purification. +pub fn materialized_view_option_contains_temporal(mvo: &MaterializedViewOption) -> bool { + match &mvo.value { + Some(WithOptionValue::Refresh(RefreshOptionValue::At(RefreshAtOptionValue { time }))) => { + let mut visitor = ExprContainsTemporalVisitor::new(); + visitor.visit_expr(time); + visitor.contains_temporal + } + Some(WithOptionValue::Refresh(RefreshOptionValue::Every(RefreshEveryOptionValue { + interval: _, + aligned_to: Some(aligned_to), + }))) => { + let mut visitor = ExprContainsTemporalVisitor::new(); + visitor.visit_expr(aligned_to); + visitor.contains_temporal + } + Some(WithOptionValue::Refresh(RefreshOptionValue::Every(RefreshEveryOptionValue { + interval: _, + aligned_to: None, + }))) => { + // For a `REFRESH EVERY` without a `STARTING AT`, purification will default the + // `STARTING AT` to `mz_now()`. + true + } + Some(WithOptionValue::Refresh(RefreshOptionValue::AtCreation)) => { + // `REFRESH AT CREATION` will be purified to `REFRESH AT mz_now()`. + true + } + _ => false, + } +} + +/// Determines whether the AST involves `mz_now()`. +struct ExprContainsTemporalVisitor { + pub contains_temporal: bool, +} + +impl ExprContainsTemporalVisitor { + pub fn new() -> ExprContainsTemporalVisitor { + ExprContainsTemporalVisitor { + contains_temporal: false, + } + } +} + +impl Visit<'_, Aug> for ExprContainsTemporalVisitor { + fn visit_function(&mut self, func: &Function) { + self.contains_temporal |= func.name.full_item_name().item == MZ_NOW_NAME; + visit_function(self, func); + } +} + +struct MzNowPurifierVisitor { + pub mz_now: Option, + pub mz_timestamp_type: ResolvedDataType, + pub introduced_mz_timestamp: bool, +} + +impl MzNowPurifierVisitor { + pub fn new( + mz_now: Option, + mz_timestamp_type: ResolvedDataType, + ) -> MzNowPurifierVisitor { + MzNowPurifierVisitor { + mz_now, + mz_timestamp_type, + introduced_mz_timestamp: false, + } + } +} + +impl VisitMut<'_, Aug> for MzNowPurifierVisitor { + fn visit_expr_mut(&mut self, expr: &'_ mut Expr) { + match expr { + Expr::Function(Function { + name: + ResolvedItemName::Item { + full_name: FullItemName { item, .. }, + .. + }, + .. + }) if item == &MZ_NOW_NAME.to_string() => { + let mz_now = self.mz_now.expect( + "we should have chosen a timestamp if the expression contains mz_now()", + ); + // We substitute `mz_now()` with number + a cast to `mz_timestamp`. The cast is to + // not alter the type of the expression. + *expr = Expr::Cast { + expr: Box::new(Expr::Value(Value::Number(mz_now.to_string()))), + data_type: self.mz_timestamp_type.clone(), + }; + self.introduced_mz_timestamp = true; + } + _ => visit_expr_mut(self, expr), + } + } +} diff --git a/src/sql/src/session/vars.rs b/src/sql/src/session/vars.rs index bd7a9c8001729..0fd67381a35a7 100644 --- a/src/sql/src/session/vars.rs +++ b/src/sql/src/session/vars.rs @@ -2080,6 +2080,13 @@ feature_flags!( internal: true, enable_for_item_parsing: true, }, + { + name: enable_refresh_every_mvs, + desc: "REFRESH EVERY materialized views", + default: false, + internal: true, + enable_for_item_parsing: true, + }, ); /// Represents the input to a variable. diff --git a/test/sqllogictest/materialized_views.slt b/test/sqllogictest/materialized_views.slt index 31c8649acaf2b..e1fd1aa3e1801 100644 --- a/test/sqllogictest/materialized_views.slt +++ b/test/sqllogictest/materialized_views.slt @@ -370,14 +370,14 @@ query TT colnames SHOW CREATE MATERIALIZED VIEW mv ---- name create_sql -materialize.public.mv CREATE␠MATERIALIZED␠VIEW␠"materialize"."public"."mv"␠IN␠CLUSTER␠"default"␠AS␠SELECT␠1 +materialize.public.mv CREATE␠MATERIALIZED␠VIEW␠"materialize"."public"."mv"␠IN␠CLUSTER␠"default"␠WITH␠(REFRESH␠=␠ON␠COMMIT)␠AS␠SELECT␠1 # Test: SHOW CREATE MATERIALIZED VIEW as mz_support simple conn=mz_introspection,user=mz_support SHOW CREATE MATERIALIZED VIEW mv ---- -materialize.public.mv,CREATE MATERIALIZED VIEW "materialize"."public"."mv" IN CLUSTER "default" AS SELECT 1 +materialize.public.mv,CREATE MATERIALIZED VIEW "materialize"."public"."mv" IN CLUSTER "default" WITH (REFRESH = ON COMMIT) AS SELECT 1 COMPLETE 1 # Test: SHOW MATERIALIZED VIEWS @@ -645,7 +645,123 @@ SELECT * FROM mv_assertion_at_begin ORDER BY x; 4 NULL 6 7 8 NULL -# More Cleanup +# ------------------------------------------------------------------ +# REFRESH options (see also in materialized-view-refresh-options.td) +# ------------------------------------------------------------------ + +# Planning/parsing errors + +# Should be disabled by default. +query error db error: ERROR: REFRESH EVERY materialized views is not supported +CREATE MATERIALIZED VIEW mv_bad WITH (ASSERT NOT NULL x, REFRESH EVERY '8 seconds') AS SELECT * FROM t2; + +simple conn=mz_system,user=mz_system +ALTER SYSTEM SET enable_refresh_every_mvs = true +---- +COMPLETE 0 + +query error Invalid REFRESH option value\. Expected ON COMMIT, AT, or EVERY\. Instead got number "5"\. +CREATE MATERIALIZED VIEW mv_bad WITH (REFRESH 5) AS SELECT 1; + +query error db error: ERROR: REFRESH ON COMMIT can be given only once +CREATE MATERIALIZED VIEW mv_bad WITH (REFRESH ON COMMIT, REFRESH ON COMMIT) AS SELECT 1; + +query error db error: ERROR: REFRESH ON COMMIT is not compatible with any of the other REFRESH options +CREATE MATERIALIZED VIEW mv_bad WITH (REFRESH ON COMMIT, REFRESH EVERY '1 day') AS SELECT 1; + +query error db error: ERROR: REFRESH AT does not support implicitly casting from record\(f1: integer,f2: integer\) to mz_timestamp +CREATE MATERIALIZED VIEW mv_bad WITH (REFRESH AT row(1,2)) AS SELECT 1; + +query error db error: ERROR: REFRESH AT argument must be an expression that can be simplified and/or cast to a constant whose type is mz_timestamp \(calling mz_now\(\) is allowed\) +CREATE MATERIALIZED VIEW mv_bad WITH (REFRESH AT 'aaaa') AS SELECT 1; + +query error db error: ERROR: column "ccc" does not exist +CREATE MATERIALIZED VIEW mv_bad WITH (REFRESH AT ccc) AS SELECT 1 as ccc; + +query error db error: ERROR: REFRESH AT argument must be an expression that can be simplified and/or cast to a constant whose type is mz_timestamp \(calling mz_now\(\) is allowed\) +CREATE MATERIALIZED VIEW mv_bad WITH (REFRESH AT now()) AS SELECT 1; + +query error db error: ERROR: REFRESH AT argument must be an expression that can be simplified and/or cast to a constant whose type is mz_timestamp \(calling mz_now\(\) is allowed\) +CREATE MATERIALIZED VIEW mv_bad WITH (REFRESH AT now()::mz_timestamp) AS SELECT 1; + +query error db error: ERROR: greatest types mz_timestamp and timestamp with time zone cannot be matched +CREATE MATERIALIZED VIEW mv_bad WITH (REFRESH AT greatest(mz_now(), now())) AS SELECT 1; + +query error db error: ERROR: REFRESH AT argument must be an expression that can be simplified and/or cast to a constant whose type is mz_timestamp \(calling mz_now\(\) is allowed\) +CREATE MATERIALIZED VIEW mv_bad WITH (REFRESH AT greatest(mz_now(), now()::mz_timestamp)) AS SELECT 1; + +query error db error: ERROR: aggregate functions are not allowed in REFRESH AT \(function pg_catalog\.sum\) +CREATE MATERIALIZED VIEW mv_bad WITH (REFRESH AT sum(5)) AS SELECT 1; + +query error db error: ERROR: REFRESH AT does not allow subqueries +CREATE MATERIALIZED VIEW mv_bad WITH (REFRESH AT (SELECT 1)) AS SELECT 1; + +query error db error: ERROR: window functions are not allowed in REFRESH AT \(function pg_catalog\.lag\) +CREATE MATERIALIZED VIEW mv_bad WITH (REFRESH AT lag(7) OVER ()) AS SELECT 1; + +query error Invalid value for REFRESH EVERY: `42`\. The value should be a string parseable as an interval, e\.g\., '1 day'\. +CREATE MATERIALIZED VIEW mv_bad WITH (REFRESH EVERY 42) AS SELECT 1; + +query error db error: ERROR: invalid input syntax for type interval: unknown units dayy: "1 dayy" +CREATE MATERIALIZED VIEW mv_bad WITH (REFRESH EVERY '1 dayy') AS SELECT 1; + +query error Invalid value for REFRESH EVERY: `INTERVAL '1 day'`\. The value should be a string parseable as an interval, e\.g\., '1 day'\. The INTERVAL keyword should NOT be present! +CREATE MATERIALIZED VIEW mv_bad WITH (REFRESH EVERY INTERVAL '1 day') AS SELECT 1; + +query error db error: ERROR: REFRESH interval must be positive; got: \-00:01:00 +CREATE MATERIALIZED VIEW mv_bad WITH (REFRESH EVERY '-1 minutes') AS SELECT 1; + +query error db error: ERROR: REFRESH interval too big: 28 days\. Currently, only intervals not larger than 27 days are supported\. +CREATE MATERIALIZED VIEW mv_bad WITH (REFRESH EVERY '28 days') AS SELECT 1; + +query error db error: ERROR: REFRESH EVERY \.\.\. ALIGNED TO argument must be an expression that can be simplified and/or cast to a constant whose type is mz_timestamp \(calling mz_now\(\) is allowed\) +CREATE MATERIALIZED VIEW mv_bad WITH (REFRESH EVERY '1 day' ALIGNED TO now()) AS SELECT 1; + +query error db error: ERROR: REFRESH EVERY \.\.\. ALIGNED TO argument must be an expression that can be simplified and/or cast to a constant whose type is mz_timestamp \(calling mz_now\(\) is allowed\) +CREATE MATERIALIZED VIEW mv_bad WITH (REFRESH EVERY '1 day' ALIGNED TO now()::mz_timestamp) AS SELECT 1; + +query error db error: ERROR: greatest types mz_timestamp and timestamp with time zone cannot be matched +CREATE MATERIALIZED VIEW mv_bad WITH (REFRESH EVERY '1 day' ALIGNED TO greatest(mz_now(), now())) AS SELECT 1; + +query error db error: ERROR: REFRESH EVERY \.\.\. ALIGNED TO argument must be an expression that can be simplified and/or cast to a constant whose type is mz_timestamp \(calling mz_now\(\) is allowed\) +CREATE MATERIALIZED VIEW mv_bad WITH (REFRESH EVERY '1 day' ALIGNED TO greatest(mz_now(), now()::mz_timestamp)) AS SELECT 1; + +query error db error: ERROR: aggregate functions are not allowed in REFRESH EVERY \.\.\. ALIGNED TO \(function pg_catalog\.sum\) +CREATE MATERIALIZED VIEW mv_bad WITH (REFRESH EVERY '1 day' ALIGNED TO sum(5)) AS SELECT 1; + +query error db error: ERROR: REFRESH EVERY \.\.\. ALIGNED TO does not allow subqueries +CREATE MATERIALIZED VIEW mv_bad WITH (REFRESH EVERY '1 day' ALIGNED TO (SELECT 1)) AS SELECT 1; + +query error db error: ERROR: window functions are not allowed in REFRESH EVERY \.\.\. ALIGNED TO \(function pg_catalog\.lag\) +CREATE MATERIALIZED VIEW mv_bad WITH (REFRESH EVERY '1 day' ALIGNED TO lag(7) OVER ()) AS SELECT 1; + +query error Error parsing interval of REFRESH EVERY: Unsupported value: RParen +CREATE MATERIALIZED VIEW mv_bad WITH (REFRESH EVERY) AS SELECT * FROM t2; + +query error Error parsing interval of REFRESH EVERY: Unsupported value: Comma +CREATE MATERIALIZED VIEW mv_bad WITH (REFRESH EVERY, ASSERT NOT NULL x) AS SELECT * FROM t2; + +query error Expected right parenthesis, found REFRESH +CREATE MATERIALIZED VIEW mv_bad WITH (ASSERT NOT NULL x REFRESH EVERY '8 seconds') AS SELECT * FROM t2; + +query error Error parsing interval of REFRESH EVERY: No value parser for keyword ASSERT +CREATE MATERIALIZED VIEW mv_bad WITH (REFRESH EVERY ASSERT NOT NULL x) AS SELECT * FROM t2; + +# Test that we call `transform_ast::transform`. (This has an `Expr::Nested`, which needs to be desugared, or we panic.) +query error db error: ERROR: REFRESH options other than ON COMMIT are not supported +CREATE MATERIALIZED VIEW mv_desugar1 WITH (REFRESH AT (mz_now())) AS SELECT * FROM t2; + +# Same with ALIGNED TO +query error db error: ERROR: REFRESH options other than ON COMMIT are not supported +CREATE MATERIALIZED VIEW mv_desugar2 WITH (REFRESH EVERY '1 day' ALIGNED TO (mz_now())) AS SELECT * FROM t2; statement ok -DROP TABLE t2 CASCADE +CREATE MATERIALIZED VIEW mv_on_commit WITH (REFRESH ON COMMIT) AS SELECT * FROM t2; + +query III +SELECT 1000*x, 1000*y, 1000*z +FROM mv_on_commit; +---- +7000 8000 NULL +4000 NULL 6000 +1000 2000 3000 diff --git a/test/sqllogictest/rename.slt b/test/sqllogictest/rename.slt index 57f2dd40156d5..dc0c0113da491 100644 --- a/test/sqllogictest/rename.slt +++ b/test/sqllogictest/rename.slt @@ -159,7 +159,7 @@ query TT SHOW CREATE MATERIALIZED VIEW grand_friend.mv1; ---- materialize.grand_friend.mv1 -CREATE MATERIALIZED VIEW "materialize"."grand_friend"."mv1" IN CLUSTER "default" AS SELECT "x" FROM "materialize"."enemy"."v1" +CREATE MATERIALIZED VIEW "materialize"."grand_friend"."mv1" IN CLUSTER "default" WITH (REFRESH = ON COMMIT) AS SELECT "x" FROM "materialize"."enemy"."v1" statement ok CREATE TABLE a1.t (y text); diff --git a/test/testdrive/materializations.td b/test/testdrive/materializations.td index 4c4ce7bf852a4..e52128c0f3fc1 100644 --- a/test/testdrive/materializations.td +++ b/test/testdrive/materializations.td @@ -79,7 +79,7 @@ data_view > SHOW CREATE MATERIALIZED VIEW test1 name create_sql ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- -materialize.public.test1 "CREATE MATERIALIZED VIEW \"materialize\".\"public\".\"test1\" IN CLUSTER \"default\" AS SELECT \"b\", \"pg_catalog\".\"sum\"(\"a\") FROM \"materialize\".\"public\".\"data\" GROUP BY \"b\"" +materialize.public.test1 "CREATE MATERIALIZED VIEW \"materialize\".\"public\".\"test1\" IN CLUSTER \"default\" WITH (REFRESH = ON COMMIT) AS SELECT \"b\", \"pg_catalog\".\"sum\"(\"a\") FROM \"materialize\".\"public\".\"data\" GROUP BY \"b\"" # Materialized view can be built on a not-materialized view. > CREATE MATERIALIZED VIEW test2 AS