From d5649db57f5250f488cc85bad374b72a83336f75 Mon Sep 17 00:00:00 2001 From: Akram Yakubov Date: Mon, 24 Feb 2025 21:46:49 +0500 Subject: [PATCH] Handle double quote in z prefix resources (#1412) --- .../PgsqlKafkaKeyAvroSchemaTemplate.java | 4 +- .../PgsqlKafkaValueAvroSchemaTemplate.java | 4 +- .../stream/PgsqlKafkaProxyFactory.java | 4 +- .../binding/pgsql/parser/PgsqlParser.java | 4 +- .../listener/SqlAlterZtableTopicListener.java | 2 +- .../listener/SqlCreateZfunctionListener.java | 2 +- .../SqlCreateZtableTopicListener.java | 14 +- .../listener/SqlCreateZviewListener.java | 2 +- .../parser/listener/SqlDropListener.java | 2 +- .../{CreateTable.java => CreateZtable.java} | 4 +- .../{TableColumn.java => ZtableColumn.java} | 2 +- .../binding/pgsql/parser/PgsqlParserTest.java | 119 ++++---- .../client.rpt | 261 ++++++++++++++++++ .../server.rpt | 259 +++++++++++++++++ .../client.rpt | 54 ++++ .../server.rpt | 56 ++++ .../risingwave/streams/EffectiveIT.java | 9 + .../binding/risingwave/streams/PgsqlIT.java | 9 + .../macro/RisingwaveCreateZtableMacro.java | 12 +- .../stream/RisingwaveProxyFactory.java | 4 +- .../risingwave/internal/stream/ProxyIT.java | 10 + 21 files changed, 758 insertions(+), 79 deletions(-) rename incubator/binding-pgsql/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/parser/model/{CreateTable.java => CreateZtable.java} (92%) rename incubator/binding-pgsql/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/parser/model/{TableColumn.java => ZtableColumn.java} (96%) create mode 100644 incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.ztable.with.double.quote.name/client.rpt create mode 100644 incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.ztable.with.double.quote.name/server.rpt create mode 100644 incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/pgsql/create.ztable.with.double.quote.name/client.rpt create mode 100644 incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/pgsql/create.ztable.with.double.quote.name/server.rpt diff --git a/incubator/binding-pgsql-kafka/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/kafka/internal/schema/PgsqlKafkaKeyAvroSchemaTemplate.java b/incubator/binding-pgsql-kafka/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/kafka/internal/schema/PgsqlKafkaKeyAvroSchemaTemplate.java index a20eeebfb0..56ae3bf9de 100644 --- a/incubator/binding-pgsql-kafka/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/kafka/internal/schema/PgsqlKafkaKeyAvroSchemaTemplate.java +++ b/incubator/binding-pgsql-kafka/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/kafka/internal/schema/PgsqlKafkaKeyAvroSchemaTemplate.java @@ -17,7 +17,7 @@ import java.util.List; import java.util.stream.Collectors; -import io.aklivity.zilla.runtime.binding.pgsql.parser.model.CreateTable; +import io.aklivity.zilla.runtime.binding.pgsql.parser.model.CreateZtable; public class PgsqlKafkaKeyAvroSchemaTemplate extends PgsqlKafkaAvroSchemaTemplate { @@ -31,7 +31,7 @@ public PgsqlKafkaKeyAvroSchemaTemplate( public String generate( String database, - CreateTable command) + CreateZtable command) { final String newNamespace = namespace.replace(DATABASE_PLACEHOLDER, database); diff --git a/incubator/binding-pgsql-kafka/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/kafka/internal/schema/PgsqlKafkaValueAvroSchemaTemplate.java b/incubator/binding-pgsql-kafka/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/kafka/internal/schema/PgsqlKafkaValueAvroSchemaTemplate.java index 4fed9421e9..0aea978840 100644 --- a/incubator/binding-pgsql-kafka/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/kafka/internal/schema/PgsqlKafkaValueAvroSchemaTemplate.java +++ b/incubator/binding-pgsql-kafka/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/kafka/internal/schema/PgsqlKafkaValueAvroSchemaTemplate.java @@ -26,7 +26,7 @@ import jakarta.json.JsonValue; import io.aklivity.zilla.runtime.binding.pgsql.parser.model.Alter; -import io.aklivity.zilla.runtime.binding.pgsql.parser.model.CreateTable; +import io.aklivity.zilla.runtime.binding.pgsql.parser.model.CreateZtable; public class PgsqlKafkaValueAvroSchemaTemplate extends PgsqlKafkaAvroSchemaTemplate { @@ -39,7 +39,7 @@ public PgsqlKafkaValueAvroSchemaTemplate( } public String generate( - CreateTable command) + CreateZtable command) { final String newNamespace = namespace.replace(DATABASE_PLACEHOLDER, command.schema()); diff --git a/incubator/binding-pgsql-kafka/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/kafka/internal/stream/PgsqlKafkaProxyFactory.java b/incubator/binding-pgsql-kafka/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/kafka/internal/stream/PgsqlKafkaProxyFactory.java index 3d6bbccc59..3314a7f357 100644 --- a/incubator/binding-pgsql-kafka/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/kafka/internal/stream/PgsqlKafkaProxyFactory.java +++ b/incubator/binding-pgsql-kafka/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/kafka/internal/stream/PgsqlKafkaProxyFactory.java @@ -55,7 +55,7 @@ import io.aklivity.zilla.runtime.binding.pgsql.kafka.internal.types.stream.WindowFW; import io.aklivity.zilla.runtime.binding.pgsql.parser.PgsqlParser; import io.aklivity.zilla.runtime.binding.pgsql.parser.model.Alter; -import io.aklivity.zilla.runtime.binding.pgsql.parser.model.CreateTable; +import io.aklivity.zilla.runtime.binding.pgsql.parser.model.CreateZtable; import io.aklivity.zilla.runtime.binding.pgsql.parser.model.Drop; import io.aklivity.zilla.runtime.engine.EngineContext; import io.aklivity.zilla.runtime.engine.binding.BindingHandler; @@ -1336,7 +1336,7 @@ private void decodeCreateTopicCommand( } else if (server.commandsProcessed == 0) { - final CreateTable createTopic = parser.parseCreateTable(statement); + final CreateZtable createTopic = parser.parseCreateTable(statement); final String schema = createTopic.schema(); final String topic = createTopic.name(); diff --git a/incubator/binding-pgsql/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/parser/PgsqlParser.java b/incubator/binding-pgsql/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/parser/PgsqlParser.java index 6d35805734..0d60d0078c 100644 --- a/incubator/binding-pgsql/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/parser/PgsqlParser.java +++ b/incubator/binding-pgsql/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/parser/PgsqlParser.java @@ -39,8 +39,8 @@ import io.aklivity.zilla.runtime.binding.pgsql.parser.listener.SqlShowListener; import io.aklivity.zilla.runtime.binding.pgsql.parser.model.Alter; import io.aklivity.zilla.runtime.binding.pgsql.parser.model.CreateFunction; -import io.aklivity.zilla.runtime.binding.pgsql.parser.model.CreateTable; import io.aklivity.zilla.runtime.binding.pgsql.parser.model.CreateZfunction; +import io.aklivity.zilla.runtime.binding.pgsql.parser.model.CreateZtable; import io.aklivity.zilla.runtime.binding.pgsql.parser.model.CreateZview; import io.aklivity.zilla.runtime.binding.pgsql.parser.model.Drop; @@ -88,7 +88,7 @@ public String parseCommand( return commandListener.command(); } - public CreateTable parseCreateTable( + public CreateZtable parseCreateTable( String sql) { parser(sql, createTableListener); diff --git a/incubator/binding-pgsql/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/parser/listener/SqlAlterZtableTopicListener.java b/incubator/binding-pgsql/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/parser/listener/SqlAlterZtableTopicListener.java index a51ad3b0b7..565023aec7 100644 --- a/incubator/binding-pgsql/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/parser/listener/SqlAlterZtableTopicListener.java +++ b/incubator/binding-pgsql/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/parser/listener/SqlAlterZtableTopicListener.java @@ -61,7 +61,7 @@ public void enterRoot( public void enterQualified_name( PostgreSqlParser.Qualified_nameContext ctx) { - String text = ctx.getText(); + String text = ctx.getText().replace("\"", ""); String[] split = text.split(SCHEMA_PATTERN); schema = split.length > 1 ? split[0] : PUBLIC_SCHEMA_NAME; name = split.length > 1 ? split[1] : text; diff --git a/incubator/binding-pgsql/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/parser/listener/SqlCreateZfunctionListener.java b/incubator/binding-pgsql/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/parser/listener/SqlCreateZfunctionListener.java index d75c31b56b..fc942acdf9 100644 --- a/incubator/binding-pgsql/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/parser/listener/SqlCreateZfunctionListener.java +++ b/incubator/binding-pgsql/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/parser/listener/SqlCreateZfunctionListener.java @@ -99,7 +99,7 @@ public void enterRoot( public void enterCreatezfunctionstmt( PostgreSqlParser.CreatezfunctionstmtContext ctx) { - String text = ctx.func_name().getText(); + String text = ctx.func_name().getText().replace("\"", ""); String[] split = text.split(SCHEMA_PATTERN); schema = split.length > 1 ? split[0] : PUBLIC_SCHEMA_NAME; name = split.length > 1 ? split[1] : text; diff --git a/incubator/binding-pgsql/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/parser/listener/SqlCreateZtableTopicListener.java b/incubator/binding-pgsql/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/parser/listener/SqlCreateZtableTopicListener.java index 7b81eef3df..307ab446cc 100644 --- a/incubator/binding-pgsql/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/parser/listener/SqlCreateZtableTopicListener.java +++ b/incubator/binding-pgsql/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/parser/listener/SqlCreateZtableTopicListener.java @@ -23,15 +23,15 @@ import io.aklivity.zilla.runtime.binding.pgsql.parser.PostgreSqlParser; import io.aklivity.zilla.runtime.binding.pgsql.parser.PostgreSqlParserBaseListener; -import io.aklivity.zilla.runtime.binding.pgsql.parser.model.CreateTable; -import io.aklivity.zilla.runtime.binding.pgsql.parser.model.TableColumn; +import io.aklivity.zilla.runtime.binding.pgsql.parser.model.CreateZtable; +import io.aklivity.zilla.runtime.binding.pgsql.parser.model.ZtableColumn; public class SqlCreateZtableTopicListener extends PostgreSqlParserBaseListener { private static final String PUBLIC_SCHEMA_NAME = "public"; private static final String SCHEMA_PATTERN = "\\."; - private final List columns; + private final List columns; private final Set primaryKeys; private final TokenStream tokens; @@ -46,9 +46,9 @@ public SqlCreateZtableTopicListener( this.tokens = tokens; } - public CreateTable table() + public CreateZtable table() { - return new CreateTable(schema, name, columns, primaryKeys); + return new CreateZtable(schema, name, columns, primaryKeys); } @Override @@ -65,7 +65,7 @@ public void enterRoot( public void enterQualified_name( PostgreSqlParser.Qualified_nameContext ctx) { - String text = ctx.getText(); + String text = ctx.getText().replace("\"", ""); String[] split = text.split(SCHEMA_PATTERN); schema = split.length > 1 ? split[0] : PUBLIC_SCHEMA_NAME; name = split.length > 1 ? split[1] : text; @@ -112,7 +112,7 @@ private void addColumn( constraints.add(tokens.getText(constraint.colconstraintelem()).toUpperCase()); } } - columns.add(new TableColumn(columnName, dataType, constraints)); + columns.add(new ZtableColumn(columnName, dataType, constraints)); } private void addPrimaryKey( diff --git a/incubator/binding-pgsql/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/parser/listener/SqlCreateZviewListener.java b/incubator/binding-pgsql/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/parser/listener/SqlCreateZviewListener.java index 35fd02f513..81835c2de9 100644 --- a/incubator/binding-pgsql/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/parser/listener/SqlCreateZviewListener.java +++ b/incubator/binding-pgsql/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/parser/listener/SqlCreateZviewListener.java @@ -55,7 +55,7 @@ public void enterRoot( public void enterCreatezviewstmt( PostgreSqlParser.CreatezviewstmtContext ctx) { - String text = ctx.create_mv_target().qualified_name().getText(); + String text = ctx.create_mv_target().qualified_name().getText().replace("\"", ""); String[] split = text.split(SCHEMA_PATTERN); schema = split.length > 1 ? split[0] : PUBLIC_SCHEMA_NAME; name = split.length > 1 ? split[1] : text; diff --git a/incubator/binding-pgsql/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/parser/listener/SqlDropListener.java b/incubator/binding-pgsql/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/parser/listener/SqlDropListener.java index 9b736164f3..a9021a6157 100644 --- a/incubator/binding-pgsql/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/parser/listener/SqlDropListener.java +++ b/incubator/binding-pgsql/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/parser/listener/SqlDropListener.java @@ -50,7 +50,7 @@ public void enterDropstmt( { ctx.any_name_list().any_name().forEach(n -> { - String text = n.getText(); + String text = n.getText().replace("\"", ""); String[] split = text.split(SCHEMA_PATTERN); String schema = split.length > 1 ? split[0] : PUBLIC_SCHEMA_NAME; String name = split.length > 1 ? split[1] : text; diff --git a/incubator/binding-pgsql/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/parser/model/CreateTable.java b/incubator/binding-pgsql/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/parser/model/CreateZtable.java similarity index 92% rename from incubator/binding-pgsql/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/parser/model/CreateTable.java rename to incubator/binding-pgsql/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/parser/model/CreateZtable.java index 203599010f..1909b20081 100644 --- a/incubator/binding-pgsql/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/parser/model/CreateTable.java +++ b/incubator/binding-pgsql/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/parser/model/CreateZtable.java @@ -17,10 +17,10 @@ import java.util.List; import java.util.Set; -public record CreateTable( +public record CreateZtable( String schema, String name, - List columns, + List columns, Set primaryKeys) { } diff --git a/incubator/binding-pgsql/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/parser/model/TableColumn.java b/incubator/binding-pgsql/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/parser/model/ZtableColumn.java similarity index 96% rename from incubator/binding-pgsql/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/parser/model/TableColumn.java rename to incubator/binding-pgsql/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/parser/model/ZtableColumn.java index e4dd1a3ed7..04b96d8587 100644 --- a/incubator/binding-pgsql/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/parser/model/TableColumn.java +++ b/incubator/binding-pgsql/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/parser/model/ZtableColumn.java @@ -16,7 +16,7 @@ import java.util.List; -public record TableColumn( +public record ZtableColumn( String name, String type, List constraints) diff --git a/incubator/binding-pgsql/src/test/java/io/aklivity/zilla/runtime/binding/pgsql/parser/PgsqlParserTest.java b/incubator/binding-pgsql/src/test/java/io/aklivity/zilla/runtime/binding/pgsql/parser/PgsqlParserTest.java index bfcb952b4d..bef3b11dc6 100644 --- a/incubator/binding-pgsql/src/test/java/io/aklivity/zilla/runtime/binding/pgsql/parser/PgsqlParserTest.java +++ b/incubator/binding-pgsql/src/test/java/io/aklivity/zilla/runtime/binding/pgsql/parser/PgsqlParserTest.java @@ -26,8 +26,8 @@ import io.aklivity.zilla.runtime.binding.pgsql.parser.model.Alter; import io.aklivity.zilla.runtime.binding.pgsql.parser.model.CreateFunction; -import io.aklivity.zilla.runtime.binding.pgsql.parser.model.CreateTable; import io.aklivity.zilla.runtime.binding.pgsql.parser.model.CreateZfunction; +import io.aklivity.zilla.runtime.binding.pgsql.parser.model.CreateZtable; import io.aklivity.zilla.runtime.binding.pgsql.parser.model.CreateZview; import io.aklivity.zilla.runtime.binding.pgsql.parser.model.Drop; import io.aklivity.zilla.runtime.binding.pgsql.parser.model.Operation; @@ -47,10 +47,10 @@ public void setUp() public void shouldCreateZtableWithPrimaryKey() { String sql = "CREATE ZTABLE test (id INT PRIMARY KEY, name VARCHAR(100));"; - CreateTable createTable = parser.parseCreateTable(sql); + CreateZtable createZtable = parser.parseCreateTable(sql); - assertNotNull(createTable); - assertTrue(createTable.primaryKeys().contains("id")); + assertNotNull(createZtable); + assertTrue(createZtable.primaryKeys().contains("id")); } @Test @@ -66,7 +66,7 @@ name VARCHAR(100), PRIMARY KEY (id, name) );"""; - CreateTable table = parser.parseCreateTable(sql); + CreateZtable table = parser.parseCreateTable(sql); assertNotNull(table); assertEquals(2, table.primaryKeys().size()); @@ -75,6 +75,27 @@ PRIMARY KEY (id, name) assertTrue(table.primaryKeys().contains("name")); } + @Test + public void shouldCreateZtableParseWithDoubleQuotedName() + { + String sql = """ + CREATE ZTABLE "example_table" ( + id INT, + name VARCHAR(100), + age INT, + PRIMARY KEY (id, name) + );"""; + + CreateZtable table = parser.parseCreateTable(sql); + + assertNotNull(table); + assertEquals("example_table", table.name()); + assertEquals(2, table.primaryKeys().size()); + assertEquals(3, table.columns().size()); + assertTrue(table.primaryKeys().contains("id")); + assertTrue(table.primaryKeys().contains("name")); + } + @Test public void shouldCreateZtableWithPrimaryKeyAsAggregate() { @@ -85,29 +106,29 @@ name VARCHAR(100), age INT, PRIMARY KEY (id, name) );"""; - CreateTable createTable = parser.parseCreateTable(sql); + CreateZtable createZtable = parser.parseCreateTable(sql); - assertNotNull(createTable); - assertEquals(2, createTable.primaryKeys().size()); - assertEquals(3, createTable.columns().size()); - assertTrue(createTable.primaryKeys().contains("id")); - assertTrue(createTable.primaryKeys().contains("name")); + assertNotNull(createZtable); + assertEquals(2, createZtable.primaryKeys().size()); + assertEquals(3, createZtable.columns().size()); + assertTrue(createZtable.primaryKeys().contains("id")); + assertTrue(createZtable.primaryKeys().contains("name")); } @Test public void shouldCreateTableName() { String sql = "CREATE ZTABLE test (id INT);"; - CreateTable createTable = parser.parseCreateTable(sql); + CreateZtable createZtable = parser.parseCreateTable(sql); - assertEquals("test", createTable.name()); + assertEquals("test", createZtable.name()); } @Test public void shouldCreateZtableNameWithDoublePrecisionTypeField() { String sql = "CREATE ZTABLE test (id DOUBLE PRECISION);"; - CreateTable table = parser.parseCreateTable(sql); + CreateZtable table = parser.parseCreateTable(sql); assertEquals("test", table.name()); assertEquals("DOUBLE PRECISION", table.columns().get(0).type()); } @@ -116,7 +137,7 @@ public void shouldCreateZtableNameWithDoublePrecisionTypeField() public void shouldCreateTableColumns() { String sql = "CREATE ZTABLE test (id INT, name VARCHAR(100));"; - CreateTable table = parser.parseCreateTable(sql); + CreateZtable table = parser.parseCreateTable(sql); assertEquals(2, table.columns().size()); assertEquals("INT", table.columns().get(0).type()); @@ -128,7 +149,7 @@ public void shouldCreateTableColumns() public void shouldParseCreateZtableCompositePrimaryKey() { String sql = "CREATE ZTABLE test (id INT, name VARCHAR(100), PRIMARY KEY (id, name));"; - CreateTable table = parser.parseCreateTable(sql); + CreateZtable table = parser.parseCreateTable(sql); assertEquals(2, table.primaryKeys().size()); assertTrue(table.primaryKeys().contains("id")); @@ -139,19 +160,19 @@ public void shouldParseCreateZtableCompositePrimaryKey() public void shouldHandleEmptyCreateZtable() { String sql = "CREATE ZTABLE test ();"; - CreateTable createTable = parser.parseCreateTable(sql); + CreateZtable createZtable = parser.parseCreateTable(sql); - assertEquals(0, createTable.columns().size()); - assertEquals(0, createTable.primaryKeys().size()); + assertEquals(0, createZtable.columns().size()); + assertEquals(0, createZtable.primaryKeys().size()); } @Test public void shouldHandleEmptySql() { String sql = ""; - CreateTable createTable = parser.parseCreateTable(sql); + CreateZtable createZtable = parser.parseCreateTable(sql); - assertNotNull(createTable); + assertNotNull(createZtable); } @Test @@ -188,9 +209,9 @@ public void shouldHandleInvalidCreateZView() public void shouldHandleInvalidZCreateZtable() { String sql = "CREATE ZTABLE test"; - CreateTable createTable = parser.parseCreateTable(sql); + CreateZtable createZtable = parser.parseCreateTable(sql); - assertNull(createTable.name()); + assertNull(createZtable.name()); } @Test @@ -286,24 +307,24 @@ public void shouldHandleInvalidCreateFunction() public void shouldCreateZtableWithUniqueConstraint() { String sql = "CREATE ZTABLE test (id INT UNIQUE, name VARCHAR(100));"; - CreateTable createTable = parser.parseCreateTable(sql); + CreateZtable createZtable = parser.parseCreateTable(sql); - assertNotNull(createTable); - assertEquals(2, createTable.columns().size()); - assertEquals("id", createTable.columns().get(0).name()); - assertEquals("name", createTable.columns().get(1).name()); + assertNotNull(createZtable); + assertEquals(2, createZtable.columns().size()); + assertEquals("id", createZtable.columns().get(0).name()); + assertEquals("name", createZtable.columns().get(1).name()); } @Test public void shouldParseCreateZtableWithCheckConstraint() { String sql = "CREATE ZTABLE test (id INT, name VARCHAR(100), CHECK (id > 0));"; - CreateTable createTable = parser.parseCreateTable(sql); + CreateZtable createZtable = parser.parseCreateTable(sql); - assertNotNull(createTable); - assertEquals(2, createTable.columns().size()); - assertEquals("id", createTable.columns().get(0).name()); - assertEquals("name", createTable.columns().get(1).name()); + assertNotNull(createZtable); + assertEquals(2, createZtable.columns().size()); + assertEquals("id", createZtable.columns().get(0).name()); + assertEquals("name", createZtable.columns().get(1).name()); } @Test @@ -317,37 +338,37 @@ public void shouldHandleInvalidCreateTableWithMissingColumns() public void shouldCreateZtableWithDefaultValues() { String sql = "CREATE ZTABLE test (id INT DEFAULT 0, name VARCHAR(100));"; - CreateTable createTable = parser.parseCreateTable(sql); + CreateZtable createZtable = parser.parseCreateTable(sql); - assertNotNull(createTable); - assertEquals(2, createTable.columns().size()); - assertEquals("INT", createTable.columns().get(0).type()); - assertEquals("VARCHAR(100)", createTable.columns().get(1).type()); + assertNotNull(createZtable); + assertEquals(2, createZtable.columns().size()); + assertEquals("INT", createZtable.columns().get(0).type()); + assertEquals("VARCHAR(100)", createZtable.columns().get(1).type()); } @Test public void shouldCreateZtableWithNotNullConstraint() { String sql = "CREATE ZTABLE test (id INT NOT NULL, name VARCHAR(100) NOT NULL);"; - CreateTable createTable = parser.parseCreateTable(sql); + CreateZtable createZtable = parser.parseCreateTable(sql); - assertNotNull(createTable); - assertEquals(2, createTable.columns().size()); - assertEquals("id", createTable.columns().get(0).name()); - assertEquals("name", createTable.columns().get(1).name()); + assertNotNull(createZtable); + assertEquals(2, createZtable.columns().size()); + assertEquals("id", createZtable.columns().get(0).name()); + assertEquals("name", createZtable.columns().get(1).name()); } @Test public void shouldCreateZtableWithMultipleConstraints() { String sql = "CREATE ZTABLE test (id INT PRIMARY KEY, name VARCHAR(100) UNIQUE, age INT CHECK (age > 0));"; - CreateTable createTable = parser.parseCreateTable(sql); + CreateZtable createZtable = parser.parseCreateTable(sql); - assertNotNull(createTable); - assertEquals(3, createTable.columns().size()); - assertTrue(createTable.primaryKeys().contains("id")); - assertEquals("name", createTable.columns().get(1).name()); - assertEquals("age", createTable.columns().get(2).name()); + assertNotNull(createZtable); + assertEquals(3, createZtable.columns().size()); + assertTrue(createZtable.primaryKeys().contains("id")); + assertEquals("name", createZtable.columns().get(1).name()); + assertEquals("age", createZtable.columns().get(2).name()); } @Test diff --git a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.ztable.with.double.quote.name/client.rpt b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.ztable.with.double.quote.name/client.rpt new file mode 100644 index 0000000000..6687ebe106 --- /dev/null +++ b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.ztable.with.double.quote.name/client.rpt @@ -0,0 +1,261 @@ +# +# Copyright 2021-2024 Aklivity Inc +# +# Licensed under the Aklivity Community License (the "License"); you may not use +# this file except in compliance with the License. You may obtain a copy of the +# License at +# +# https://www.aklivity.io/aklivity-community-license/ +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OF ANY KIND, either express or implied. See the License for the +# specific language governing permissions and limitations under the License. +# + +connect "zilla://streams/app1" + option zilla:window 8192 + option zilla:transmission "duplex" + +write zilla:begin.ext ${pgsql:beginEx() + .typeId(zilla:id("pgsql")) + .parameter("user", "postgres") + .parameter("database", "dev") + .parameter("application_name", "psql") + .parameter("client_encoding", "UTF8") + .build()} + +connected + +write zilla:data.ext ${pgsql:dataEx() + .typeId(zilla:id("pgsql")) + .query() + .build() + .build()} +write "CREATE SOURCE IF NOT EXISTS cities_source (*)\n" + "WITH (\n" + " connector='kafka',\n" + " properties.bootstrap.server='localhost:9092',\n" + " topic='public.cities',\n" + " scan.startup.mode='latest',\n" + " scan.startup.timestamp.millis='140000000'\n" + ") FORMAT PLAIN ENCODE AVRO (\n" + " schema.registry = 'http://localhost:8081'\n" + ");" + [0x00] +write flush + +read advised zilla:flush ${pgsql:flushEx() + .typeId(zilla:id("pgsql")) + .completion() + .tag("CREATE_SOURCE") + .build() + .build()} + +read advised zilla:flush ${pgsql:flushEx() + .typeId(zilla:id("pgsql")) + .ready() + .status("IDLE") + .build() + .build()} + +write zilla:data.ext ${pgsql:dataEx() + .typeId(zilla:id("pgsql")) + .query() + .build() + .build()} +write "CREATE MATERIALIZED VIEW IF NOT EXISTS zb_catalog.cities_view AS SELECT * FROM cities_source;" + [0x00] +write flush + +read advised zilla:flush ${pgsql:flushEx() + .typeId(zilla:id("pgsql")) + .completion() + .tag("CREATE_MATERIALIZED_VIEW") + .build() + .build()} + +read advised zilla:flush ${pgsql:flushEx() + .typeId(zilla:id("pgsql")) + .ready() + .status("IDLE") + .build() + .build()} + +write zilla:data.ext ${pgsql:dataEx() + .typeId(zilla:id("pgsql")) + .query() + .build() + .build()} +write "CREATE TABLE IF NOT EXISTS cities " + "(id VARCHAR, name VARCHAR, description VARCHAR, PRIMARY KEY (id));" + [0x00] +write flush + +read advised zilla:flush ${pgsql:flushEx() + .typeId(zilla:id("pgsql")) + .completion() + .tag("CREATE_TABLE") + .build() + .build()} + +read advised zilla:flush ${pgsql:flushEx() + .typeId(zilla:id("pgsql")) + .ready() + .status("IDLE") + .build() + .build()} + +write zilla:data.ext ${pgsql:dataEx() + .typeId(zilla:id("pgsql")) + .query() + .build() + .build()} +write "GRANT ALL PRIVILEGES ON TABLE public.cities TO zillabase;" + [0x00] +write flush + +read advised zilla:flush ${pgsql:flushEx() + .typeId(zilla:id("pgsql")) + .completion() + .tag("GRANT_PRIVILEGES") + .build() + .build()} + +read advised zilla:flush ${pgsql:flushEx() + .typeId(zilla:id("pgsql")) + .ready() + .status("IDLE") + .build() + .build()} + +write zilla:data.ext ${pgsql:dataEx() + .typeId(zilla:id("pgsql")) + .query() + .build() + .build()} +write "CREATE SINK zb_catalog.cities_view_sink INTO cities FROM zb_catalog.cities_view;" + [0x00] +write flush + +read advised zilla:flush ${pgsql:flushEx() + .typeId(zilla:id("pgsql")) + .completion() + .tag("CREATE_SINK") + .build() + .build()} + +read advised zilla:flush ${pgsql:flushEx() + .typeId(zilla:id("pgsql")) + .ready() + .status("IDLE") + .build() + .build()} + +write zilla:data.ext ${pgsql:dataEx() + .typeId(zilla:id("pgsql")) + .query() + .build() + .build()} +write "CREATE SINK zb_catalog.cities_sink\n" + "FROM cities\n" + "WITH (\n" + " connector='kafka',\n" + " properties.bootstrap.server='localhost:9092',\n" + " topic='public.cities',\n" + " primary_key='id'\n" + ") FORMAT UPSERT ENCODE AVRO (\n" + " schema.registry='http://localhost:8081'\n" + ") KEY ENCODE TEXT;" + [0x00] +write flush + +read advised zilla:flush ${pgsql:flushEx() + .typeId(zilla:id("pgsql")) + .completion() + .tag("CREATE_SINK") + .build() + .build()} + +read advised zilla:flush ${pgsql:flushEx() + .typeId(zilla:id("pgsql")) + .ready() + .status("IDLE") + .build() + .build()} + +write zilla:data.ext ${pgsql:dataEx() + .typeId(zilla:id("pgsql")) + .query() + .build() + .build()} +write "INSERT INTO zb_catalog.ztables (name, sql) VALUES " + "('cities', 'CREATE ZTABLE IF NOT EXISTS \"cities\" (id VARCHAR, name VARCHAR, description VARCHAR, PRIMARY KEY (id));');" + [0x00] +write flush + +read advised zilla:flush ${pgsql:flushEx() + .typeId(zilla:id("pgsql")) + .completion() + .tag("INSERT 0 2") + .build() + .build()} + +read advised zilla:flush ${pgsql:flushEx() + .typeId(zilla:id("pgsql")) + .ready() + .status("IDLE") + .build() + .build()} + +connect "zilla://streams/app1" + option zilla:window 8192 + option zilla:transmission "duplex" + +write zilla:begin.ext ${pgsql:beginEx() + .typeId(zilla:id("pgsql")) + .parameter("user", "zillabase") + .parameter("database", "dev") + .parameter("application_name", "psql") + .parameter("client_encoding", "UTF8") + .build()} + +connected + +connect "zilla://streams/app2" + option zilla:window 8192 + option zilla:transmission "duplex" + +write zilla:begin.ext ${pgsql:beginEx() + .typeId(zilla:id("pgsql")) + .parameter("user", "postgres") + .parameter("database", "dev") + .parameter("application_name", "psql") + .parameter("client_encoding", "UTF8") + .build()} + +connected + +write zilla:data.ext ${pgsql:dataEx() + .typeId(zilla:id("pgsql")) + .query() + .build() + .build()} +write "CREATE TOPIC IF NOT EXISTS cities " + "(id VARCHAR, name VARCHAR, description VARCHAR, PRIMARY KEY (id));" + [0x00] +write flush + +read advised zilla:flush ${pgsql:flushEx() + .typeId(zilla:id("pgsql")) + .completion() + .tag("CREATE_TOPIC") + .build() + .build()} + +read advised zilla:flush ${pgsql:flushEx() + .typeId(zilla:id("pgsql")) + .ready() + .status("IDLE") + .build() + .build()} diff --git a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.ztable.with.double.quote.name/server.rpt b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.ztable.with.double.quote.name/server.rpt new file mode 100644 index 0000000000..2242dedbb9 --- /dev/null +++ b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.ztable.with.double.quote.name/server.rpt @@ -0,0 +1,259 @@ +# +# Copyright 2021-2024 Aklivity Inc +# +# Licensed under the Aklivity Community License (the "License"); you may not use +# this file except in compliance with the License. You may obtain a copy of the +# License at +# +# https://www.aklivity.io/aklivity-community-license/ +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OF ANY KIND, either express or implied. See the License for the +# specific language governing permissions and limitations under the License. +# + +accept "zilla://streams/app1" + option zilla:window 8192 + option zilla:transmission "duplex" + +accepted + +read zilla:begin.ext ${pgsql:beginEx() + .typeId(zilla:id("pgsql")) + .parameter("user", "postgres") + .parameter("database", "dev") + .parameter("application_name", "psql") + .parameter("client_encoding", "UTF8") + .build()} + +connected + +read await CREATE_TOPIC_COMPLETED + +read zilla:data.ext ${pgsql:dataEx() + .typeId(zilla:id("pgsql")) + .query() + .build() + .build()} +read "CREATE SOURCE IF NOT EXISTS cities_source (*)\n" + "WITH (\n" + " connector='kafka',\n" + " properties.bootstrap.server='localhost:9092',\n" + " topic='public.cities',\n" + " scan.startup.mode='latest',\n" + " scan.startup.timestamp.millis='140000000'\n" + ") FORMAT PLAIN ENCODE AVRO (\n" + " schema.registry = 'http://localhost:8081'\n" + ");" + [0x00] + +write advise zilla:flush ${pgsql:flushEx() + .typeId(zilla:id("pgsql")) + .completion() + .tag("CREATE_SOURCE") + .build() + .build()} + +write advise zilla:flush ${pgsql:flushEx() + .typeId(zilla:id("pgsql")) + .ready() + .status("IDLE") + .build() + .build()} + +read zilla:data.ext ${pgsql:dataEx() + .typeId(zilla:id("pgsql")) + .query() + .build() + .build()} +read "CREATE MATERIALIZED VIEW IF NOT EXISTS zb_catalog.cities_view AS SELECT * FROM cities_source;" + [0x00] + +write advise zilla:flush ${pgsql:flushEx() + .typeId(zilla:id("pgsql")) + .completion() + .tag("CREATE_MATERIALIZED_VIEW") + .build() + .build()} + +write advise zilla:flush ${pgsql:flushEx() + .typeId(zilla:id("pgsql")) + .ready() + .status("IDLE") + .build() + .build()} + +read zilla:data.ext ${pgsql:dataEx() + .typeId(zilla:id("pgsql")) + .query() + .build() + .build()} +read "CREATE TABLE IF NOT EXISTS cities " + "(id VARCHAR, name VARCHAR, description VARCHAR, PRIMARY KEY (id));" + [0x00] + +write advise zilla:flush ${pgsql:flushEx() + .typeId(zilla:id("pgsql")) + .completion() + .tag("CREATE_TABLE") + .build() + .build()} + +write advise zilla:flush ${pgsql:flushEx() + .typeId(zilla:id("pgsql")) + .ready() + .status("IDLE") + .build() + .build()} + +read zilla:data.ext ${pgsql:dataEx() + .typeId(zilla:id("pgsql")) + .query() + .build() + .build()} +read "GRANT ALL PRIVILEGES ON TABLE public.cities TO zillabase;" + [0x00] + +write advise zilla:flush ${pgsql:flushEx() + .typeId(zilla:id("pgsql")) + .completion() + .tag("GRANT_PRIVILEGES") + .build() + .build()} + +write advise zilla:flush ${pgsql:flushEx() + .typeId(zilla:id("pgsql")) + .ready() + .status("IDLE") + .build() + .build()} + +read zilla:data.ext ${pgsql:dataEx() + .typeId(zilla:id("pgsql")) + .query() + .build() + .build()} +read "CREATE SINK zb_catalog.cities_view_sink INTO cities FROM zb_catalog.cities_view;" + [0x00] + +write advise zilla:flush ${pgsql:flushEx() + .typeId(zilla:id("pgsql")) + .completion() + .tag("CREATE_SINK") + .build() + .build()} + +write advise zilla:flush ${pgsql:flushEx() + .typeId(zilla:id("pgsql")) + .ready() + .status("IDLE") + .build() + .build()} + +read zilla:data.ext ${pgsql:dataEx() + .typeId(zilla:id("pgsql")) + .query() + .build() + .build()} +read "CREATE SINK zb_catalog.cities_sink\n" + "FROM cities\n" + "WITH (\n" + " connector='kafka',\n" + " properties.bootstrap.server='localhost:9092',\n" + " topic='public.cities',\n" + " primary_key='id'\n" + ") FORMAT UPSERT ENCODE AVRO (\n" + " schema.registry='http://localhost:8081'\n" + ") KEY ENCODE TEXT;" + [0x00] + +write advise zilla:flush ${pgsql:flushEx() + .typeId(zilla:id("pgsql")) + .completion() + .tag("CREATE_SINK") + .build() + .build()} + +write advise zilla:flush ${pgsql:flushEx() + .typeId(zilla:id("pgsql")) + .ready() + .status("IDLE") + .build() + .build()} + +read zilla:data.ext ${pgsql:dataEx() + .typeId(zilla:id("pgsql")) + .query() + .build() + .build()} +read "INSERT INTO zb_catalog.ztables (name, sql) VALUES " + "('cities', 'CREATE ZTABLE IF NOT EXISTS \"cities\" (id VARCHAR, name VARCHAR, description VARCHAR, PRIMARY KEY (id));');" + [0x00] + +write advise zilla:flush ${pgsql:flushEx() + .typeId(zilla:id("pgsql")) + .completion() + .tag("INSERT 0 2") + .build() + .build()} + +write advise zilla:flush ${pgsql:flushEx() + .typeId(zilla:id("pgsql")) + .ready() + .status("IDLE") + .build() + .build()} + +accepted + +read zilla:begin.ext ${pgsql:beginEx() + .typeId(zilla:id("pgsql")) + .parameter("user", "zillabase") + .parameter("database", "dev") + .parameter("application_name", "psql") + .parameter("client_encoding", "UTF8") + .build()} + +connected + +accept "zilla://streams/app2" + option zilla:window 8192 + option zilla:transmission "duplex" + +accepted + +read zilla:begin.ext ${pgsql:beginEx() + .typeId(zilla:id("pgsql")) + .parameter("user", "postgres") + .parameter("database", "dev") + .parameter("application_name", "psql") + .parameter("client_encoding", "UTF8") + .build()} + +connected + +read zilla:data.ext ${pgsql:dataEx() + .typeId(zilla:id("pgsql")) + .query() + .build() + .build()} +read "CREATE TOPIC IF NOT EXISTS cities " + "(id VARCHAR, name VARCHAR, description VARCHAR, PRIMARY KEY (id));" + [0x00] + +write advise zilla:flush ${pgsql:flushEx() + .typeId(zilla:id("pgsql")) + .completion() + .tag("CREATE_TOPIC") + .build() + .build()} + +write notify CREATE_TOPIC_COMPLETED + +write advise zilla:flush ${pgsql:flushEx() + .typeId(zilla:id("pgsql")) + .ready() + .status("IDLE") + .build() + .build()} diff --git a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/pgsql/create.ztable.with.double.quote.name/client.rpt b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/pgsql/create.ztable.with.double.quote.name/client.rpt new file mode 100644 index 0000000000..27b7739bac --- /dev/null +++ b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/pgsql/create.ztable.with.double.quote.name/client.rpt @@ -0,0 +1,54 @@ +# +# Copyright 2021-2024 Aklivity Inc +# +# Licensed under the Aklivity Community License (the "License"); you may not use +# this file except in compliance with the License. You may obtain a copy of the +# License at +# +# https://www.aklivity.io/aklivity-community-license/ +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OF ANY KIND, either express or implied. See the License for the +# specific language governing permissions and limitations under the License. +# + +connect "zilla://streams/app0" + option zilla:window 8192 + option zilla:transmission "duplex" + +write zilla:begin.ext ${pgsql:beginEx() + .typeId(zilla:id("pgsql")) + .parameter("user", "zillabase") + .parameter("database", "dev") + .parameter("application_name", "psql") + .parameter("client_encoding", "UTF8") + .build()} + +connected + +write zilla:data.ext ${pgsql:dataEx() + .typeId(zilla:id("pgsql")) + .query() + .build() + .build()} +write "CREATE ZTABLE IF NOT EXISTS \"cities\" " + "(id VARCHAR, name VARCHAR, description VARCHAR, PRIMARY KEY (id));" + [0x00] + +write flush + +read advised zilla:flush ${pgsql:flushEx() + .typeId(zilla:id("pgsql")) + .completion() + .tag("CREATE_ZTABLE") + .build() + .build()} + +read advised zilla:flush ${pgsql:flushEx() + .typeId(zilla:id("pgsql")) + .ready() + .status("IDLE") + .build() + .build()} + diff --git a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/pgsql/create.ztable.with.double.quote.name/server.rpt b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/pgsql/create.ztable.with.double.quote.name/server.rpt new file mode 100644 index 0000000000..4a7e12b676 --- /dev/null +++ b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/pgsql/create.ztable.with.double.quote.name/server.rpt @@ -0,0 +1,56 @@ +# +# Copyright 2021-2024 Aklivity Inc +# +# Licensed under the Aklivity Community License (the "License"); you may not use +# this file except in compliance with the License. You may obtain a copy of the +# License at +# +# https://www.aklivity.io/aklivity-community-license/ +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OF ANY KIND, either express or implied. See the License for the +# specific language governing permissions and limitations under the License. +# + +property serverAddress "zilla://streams/app0" + +accept ${serverAddress} + option zilla:window 8192 + option zilla:transmission "duplex" + +accepted + +read zilla:begin.ext ${pgsql:beginEx() + .typeId(zilla:id("pgsql")) + .parameter("user", "zillabase") + .parameter("database", "dev") + .parameter("application_name", "psql") + .parameter("client_encoding", "UTF8") + .build()} + +connected + +read zilla:data.ext ${pgsql:dataEx() + .typeId(zilla:id("pgsql")) + .query() + .build() + .build()} +read "CREATE ZTABLE IF NOT EXISTS \"cities\" " + "(id VARCHAR, name VARCHAR, description VARCHAR, PRIMARY KEY (id));" + [0x00] + +write advise zilla:flush ${pgsql:flushEx() + .typeId(zilla:id("pgsql")) + .completion() + .tag("CREATE_ZTABLE") + .build() + .build()} + +write advise zilla:flush ${pgsql:flushEx() + .typeId(zilla:id("pgsql")) + .ready() + .status("IDLE") + .build() + .build()} + diff --git a/incubator/binding-risingwave.spec/src/test/java/io/aklivity/zilla/specs/binding/risingwave/streams/EffectiveIT.java b/incubator/binding-risingwave.spec/src/test/java/io/aklivity/zilla/specs/binding/risingwave/streams/EffectiveIT.java index fb8aef9429..dc31c26406 100644 --- a/incubator/binding-risingwave.spec/src/test/java/io/aklivity/zilla/specs/binding/risingwave/streams/EffectiveIT.java +++ b/incubator/binding-risingwave.spec/src/test/java/io/aklivity/zilla/specs/binding/risingwave/streams/EffectiveIT.java @@ -46,6 +46,15 @@ public void shouldCreateZtableWithPrimaryKey() throws Exception k3po.finish(); } + @Test + @Specification({ + "${app}/create.ztable.with.double.quote.name/client", + "${app}/create.ztable.with.double.quote.name/server" }) + public void shouldCreateZtableWithDoubleQuoteName() throws Exception + { + k3po.finish(); + } + @Test @Specification({ "${app}/create.zview/client", diff --git a/incubator/binding-risingwave.spec/src/test/java/io/aklivity/zilla/specs/binding/risingwave/streams/PgsqlIT.java b/incubator/binding-risingwave.spec/src/test/java/io/aklivity/zilla/specs/binding/risingwave/streams/PgsqlIT.java index 96b5d93902..d160dc098d 100644 --- a/incubator/binding-risingwave.spec/src/test/java/io/aklivity/zilla/specs/binding/risingwave/streams/PgsqlIT.java +++ b/incubator/binding-risingwave.spec/src/test/java/io/aklivity/zilla/specs/binding/risingwave/streams/PgsqlIT.java @@ -46,6 +46,15 @@ public void shouldCreateZtableWithPrimaryKey() throws Exception k3po.finish(); } + @Test + @Specification({ + "${app}/create.ztable.with.double.quote.name/client", + "${app}/create.ztable.with.double.quote.name/server" }) + public void shouldCreateZtableWithDoubleQuoteName() throws Exception + { + k3po.finish(); + } + @Test @Specification({ "${app}/create.zview/client", diff --git a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/macro/RisingwaveCreateZtableMacro.java b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/macro/RisingwaveCreateZtableMacro.java index db624cf55c..b2b7236541 100644 --- a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/macro/RisingwaveCreateZtableMacro.java +++ b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/macro/RisingwaveCreateZtableMacro.java @@ -22,8 +22,8 @@ import org.agrona.collections.Object2ObjectHashMap; -import io.aklivity.zilla.runtime.binding.pgsql.parser.model.CreateTable; -import io.aklivity.zilla.runtime.binding.pgsql.parser.model.TableColumn; +import io.aklivity.zilla.runtime.binding.pgsql.parser.model.CreateZtable; +import io.aklivity.zilla.runtime.binding.pgsql.parser.model.ZtableColumn; import io.aklivity.zilla.runtime.binding.risingwave.internal.stream.RisingwaveCompletionCommand; import io.aklivity.zilla.runtime.binding.risingwave.internal.types.stream.PgsqlFlushExFW; @@ -47,7 +47,7 @@ public class RisingwaveCreateZtableMacro extends RisingwaveMacroBase private final StringBuilder includeBuilder; private final String systemSchema; private final String user; - private final CreateTable command; + private final CreateZtable command; public RisingwaveCreateZtableMacro( String bootstrapServer, @@ -56,7 +56,7 @@ public RisingwaveCreateZtableMacro( String systemSchema, String user, String sql, - CreateTable command, + CreateZtable command, RisingwaveMacroHandler handler) { super(sql, handler); @@ -163,7 +163,7 @@ public void onStarted( String table = command.name(); includeBuilder.setLength(0); - List includes = command.columns().stream() + List includes = command.columns().stream() .filter(column -> column.constraints().stream() .anyMatch(ZILLA_MAPPINGS::containsKey)) .collect(Collectors.toCollection(ArrayList::new)); @@ -239,7 +239,7 @@ public void onStarted( String name = command.name(); String select = "*"; - List includes = command.columns().stream() + List includes = command.columns().stream() .filter(column -> column.constraints().stream() .anyMatch(ZILLA_MAPPINGS::containsKey)) .collect(Collectors.toCollection(ArrayList::new)); diff --git a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/stream/RisingwaveProxyFactory.java b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/stream/RisingwaveProxyFactory.java index d61492d92c..7e22749c22 100644 --- a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/stream/RisingwaveProxyFactory.java +++ b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/stream/RisingwaveProxyFactory.java @@ -39,8 +39,8 @@ import io.aklivity.zilla.runtime.binding.pgsql.parser.PgsqlParser; import io.aklivity.zilla.runtime.binding.pgsql.parser.model.Alter; import io.aklivity.zilla.runtime.binding.pgsql.parser.model.CreateFunction; -import io.aklivity.zilla.runtime.binding.pgsql.parser.model.CreateTable; import io.aklivity.zilla.runtime.binding.pgsql.parser.model.CreateZfunction; +import io.aklivity.zilla.runtime.binding.pgsql.parser.model.CreateZtable; import io.aklivity.zilla.runtime.binding.pgsql.parser.model.CreateZview; import io.aklivity.zilla.runtime.binding.pgsql.parser.model.Drop; import io.aklivity.zilla.runtime.binding.pgsql.parser.model.Operation; @@ -1692,7 +1692,7 @@ private void decodeCreateZtableCommand( { if (server.macroState == null) { - final CreateTable command = parser.parseCreateTable(statement); + final CreateZtable command = parser.parseCreateTable(statement); RisingwaveBindingConfig binding = server.binding; RisingwaveCreateZtableMacro machine = new RisingwaveCreateZtableMacro( diff --git a/incubator/binding-risingwave/src/test/java/io/aklivity/zilla/runtime/binding/risingwave/internal/stream/ProxyIT.java b/incubator/binding-risingwave/src/test/java/io/aklivity/zilla/runtime/binding/risingwave/internal/stream/ProxyIT.java index 605cd812b1..b8e273440b 100644 --- a/incubator/binding-risingwave/src/test/java/io/aklivity/zilla/runtime/binding/risingwave/internal/stream/ProxyIT.java +++ b/incubator/binding-risingwave/src/test/java/io/aklivity/zilla/runtime/binding/risingwave/internal/stream/ProxyIT.java @@ -59,6 +59,16 @@ public void shouldCreateZtableWithPrimaryKey() throws Exception k3po.finish(); } + @Test + @Configuration("proxy.yaml") + @Specification({ + "${pgsql}/create.ztable.with.double.quote.name/client", + "${effective}/create.ztable.with.double.quote.name/server" }) + public void shouldCreateZtableWithDoubleQuoteName() throws Exception + { + k3po.finish(); + } + @Test @Configuration("proxy.risingwave.yaml") @Specification({