Skip to content

Commit

Permalink
[FLINK-34918][table] Support ALTER CATALOG COMMENT syntax
Browse files Browse the repository at this point in the history
  • Loading branch information
liyubin117 authored and reswqa committed Jun 14, 2024
1 parent adfbbbd commit 2385cc0
Show file tree
Hide file tree
Showing 18 changed files with 418 additions and 13 deletions.
11 changes: 11 additions & 0 deletions docs/content.zh/docs/dev/table/sql/alter.md
Original file line number Diff line number Diff line change
Expand Up @@ -547,6 +547,7 @@ Language tag 用于指定 Flink runtime 如何执行这个函数。目前,只
ALTER CATALOG catalog_name
SET (key1=val1, ...)
| RESET (key1, ...)
| COMMENT 'comment'
```

### SET
Expand All @@ -571,4 +572,14 @@ ALTER CATALOG cat2 SET ('default-database'='db');
ALTER CATALOG cat2 RESET ('default-database');
```

### COMMENT

为指定的 catalog 设置注释。若注释已经存在,则使用新值覆盖旧值。

`COMMENT` 语句示例如下。

```sql
ALTER CATALOG cat2 COMMENT 'comment for catalog ''cat2''';
```

{{< top >}}
11 changes: 11 additions & 0 deletions docs/content/docs/dev/table/sql/alter.md
Original file line number Diff line number Diff line change
Expand Up @@ -549,6 +549,7 @@ Language tag to instruct flink runtime how to execute the function. Currently on
ALTER CATALOG catalog_name
SET (key1=val1, ...)
| RESET (key1, ...)
| COMMENT 'comment'
```

### SET
Expand All @@ -573,4 +574,14 @@ The following examples illustrate the usage of the `RESET` statements.
ALTER CATALOG cat2 RESET ('default-database');
```

### COMMENT

Set comment in the specified catalog. If the comment is already set in the catalog, override the old value with the new one.

The following examples illustrate the usage of the `COMMENT` statements.

```sql
ALTER CATALOG cat2 COMMENT 'comment for catalog ''cat2''';
```

{{< top >}}
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,36 @@ alter catalog cat2 reset ();
org.apache.flink.table.api.ValidationException: ALTER CATALOG RESET does not support empty key
!error

alter catalog cat2 comment 'comment for catalog ''cat2''';
[INFO] Execute statement succeeded.
!info

desc catalog extended cat2;
+-----------+----------------------------+
| info name | info value |
+-----------+----------------------------+
| name | cat2 |
| type | generic_in_memory |
| comment | comment for catalog 'cat2' |
+-----------+----------------------------+
3 rows in set
!ok

alter catalog cat2 comment '';
[INFO] Execute statement succeeded.
!info

desc catalog extended cat2;
+-----------+-------------------+
| info name | info value |
+-----------+-------------------+
| name | cat2 |
| type | generic_in_memory |
| comment | |
+-----------+-------------------+
3 rows in set
!ok

# ==========================================================================
# test database
# ==========================================================================
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,50 @@ alter catalog cat2 reset ();
org.apache.flink.table.api.ValidationException: ALTER CATALOG RESET does not support empty key
!error

alter catalog cat2 comment 'comment for catalog ''cat2''';
!output
+--------+
| result |
+--------+
| OK |
+--------+
1 row in set
!ok

desc catalog extended cat2;
!output
+-----------+----------------------------+
| info name | info value |
+-----------+----------------------------+
| name | cat2 |
| type | generic_in_memory |
| comment | comment for catalog 'cat2' |
+-----------+----------------------------+
3 rows in set
!ok

alter catalog cat2 comment '';
!output
+--------+
| result |
+--------+
| OK |
+--------+
1 row in set
!ok

desc catalog extended cat2;
!output
+-----------+-------------------+
| info name | info value |
+-----------+-------------------+
| name | cat2 |
| type | generic_in_memory |
| comment | |
+-----------+-------------------+
3 rows in set
!ok

# ==========================================================================
# test database
# ==========================================================================
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
"org.apache.flink.sql.parser.ddl.SqlAlterCatalog"
"org.apache.flink.sql.parser.ddl.SqlAlterCatalogOptions"
"org.apache.flink.sql.parser.ddl.SqlAlterCatalogReset"
"org.apache.flink.sql.parser.ddl.SqlAlterCatalogComment"
"org.apache.flink.sql.parser.ddl.SqlAlterDatabase"
"org.apache.flink.sql.parser.ddl.SqlAlterFunction"
"org.apache.flink.sql.parser.ddl.SqlAlterMaterializedTable"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ SqlAlterCatalog SqlAlterCatalog() :
SqlParserPos startPos;
SqlIdentifier catalogName;
SqlNodeList propertyList = SqlNodeList.EMPTY;
SqlNode comment = null;
}
{
<ALTER> <CATALOG> { startPos = getPos(); }
Expand All @@ -186,6 +187,14 @@ SqlAlterCatalog SqlAlterCatalog() :
catalogName,
propertyList);
}
|
<COMMENT>
comment = StringLiteral()
{
return new SqlAlterCatalogComment(startPos.plus(getPos()),
catalogName,
comment);
}
)
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.sql.parser.ddl;

import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlWriter;
import org.apache.calcite.sql.parser.SqlParserPos;
import org.apache.calcite.util.ImmutableNullableList;

import java.util.List;

import static java.util.Objects.requireNonNull;

/** ALTER CATALOG catalog_name COMMENT 'comment'. */
public class SqlAlterCatalogComment extends SqlAlterCatalog {

private final SqlNode comment;

public SqlAlterCatalogComment(
SqlParserPos position, SqlIdentifier catalogName, SqlNode comment) {
super(position, catalogName);
this.comment = requireNonNull(comment, "comment cannot be null");
}

@Override
public List<SqlNode> getOperandList() {
return ImmutableNullableList.of(catalogName, comment);
}

public SqlNode getComment() {
return comment;
}

@Override
public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
super.unparse(writer, leftPrec, rightPrec);
writer.keyword("COMMENT");
comment.unparse(writer, leftPrec, rightPrec);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ void testAlterCatalog() {
sql("alter catalog a set ('k1'='v1', 'k2'='v2')")
.ok("ALTER CATALOG `A` SET (\n" + " 'k1' = 'v1',\n" + " 'k2' = 'v2'\n" + ")");
sql("alter catalog a reset ('k1')").ok("ALTER CATALOG `A` RESET (\n" + " 'k1'\n" + ")");
sql("alter catalog a comment 'comment1'").ok("ALTER CATALOG `A` COMMENT 'comment1'");
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.api.CatalogNotExistException;
import org.apache.flink.table.api.EnvironmentSettings;
Expand Down Expand Up @@ -334,33 +333,31 @@ public void createCatalog(String catalogName, CatalogDescriptor catalogDescripto
* Alters a catalog under the given name. The catalog name must be unique.
*
* @param catalogName the given catalog name under which to alter the given catalog
* @param catalogUpdater catalog configuration updater to alter catalog
* @param catalogChange catalog change to update the underlying catalog descriptor
* @throws CatalogException If the catalog neither exists in the catalog store nor in the
* initialized catalogs, or if an error occurs while creating the catalog or storing the
* {@link CatalogDescriptor}
*/
public void alterCatalog(String catalogName, Consumer<Configuration> catalogUpdater)
public void alterCatalog(String catalogName, CatalogChange catalogChange)
throws CatalogException {
checkArgument(
!StringUtils.isNullOrWhitespaceOnly(catalogName),
"Catalog name cannot be null or empty.");
checkNotNull(catalogUpdater, "Catalog configuration updater cannot be null.");
checkNotNull(catalogChange, "Catalog change cannot be null.");

CatalogStore catalogStore = catalogStoreHolder.catalogStore();
Optional<CatalogDescriptor> oldCatalogDescriptor = getCatalogDescriptor(catalogName);
Optional<CatalogDescriptor> oldDescriptorOpt = getCatalogDescriptor(catalogName);

if (catalogStore.contains(catalogName) && oldCatalogDescriptor.isPresent()) {
Configuration conf = oldCatalogDescriptor.get().getConfiguration();
catalogUpdater.accept(conf);
CatalogDescriptor newCatalogDescriptor = CatalogDescriptor.of(catalogName, conf);
Catalog newCatalog = initCatalog(catalogName, newCatalogDescriptor);
if (catalogStore.contains(catalogName) && oldDescriptorOpt.isPresent()) {
CatalogDescriptor newDescriptor = catalogChange.applyChange(oldDescriptorOpt.get());
Catalog newCatalog = initCatalog(catalogName, newDescriptor);
catalogStore.removeCatalog(catalogName, false);
if (catalogs.containsKey(catalogName)) {
catalogs.get(catalogName).close();
}
newCatalog.open();
catalogs.put(catalogName, newCatalog);
catalogStoreHolder.catalogStore().storeCatalog(catalogName, newCatalogDescriptor);
catalogStoreHolder.catalogStore().storeCatalog(catalogName, newDescriptor);
} else {
throw new CatalogException(
String.format("Catalog %s does not exist in the catalog store.", catalogName));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.operations.ddl;

import org.apache.flink.annotation.Internal;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.api.internal.TableResultImpl;
import org.apache.flink.table.api.internal.TableResultInternal;
import org.apache.flink.table.catalog.CatalogChange;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.utils.EncodingUtils;

import static org.apache.flink.util.Preconditions.checkNotNull;

/** Operation to describe a ALTER CATALOG COMMENT statement. */
@Internal
public class AlterCatalogCommentOperation implements AlterOperation {

private final String catalogName;
private final String comment;

public AlterCatalogCommentOperation(String catalogName, String comment) {
this.catalogName = checkNotNull(catalogName);
this.comment = comment;
}

public String getCatalogName() {
return catalogName;
}

public String getComment() {
return comment;
}

@Override
public String asSummaryString() {
return String.format(
"ALTER CATALOG %s COMMENT '%s'",
catalogName, EncodingUtils.escapeSingleQuotes(comment));
}

@Override
public TableResultInternal execute(Context ctx) {
try {
ctx.getCatalogManager()
.alterCatalog(catalogName, new CatalogChange.CatalogCommentChange(comment));

return TableResultImpl.TABLE_RESULT_OK;
} catch (CatalogException e) {
throw new ValidationException(
String.format("Could not execute %s", asSummaryString()), e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.api.internal.TableResultImpl;
import org.apache.flink.table.api.internal.TableResultInternal;
import org.apache.flink.table.catalog.CatalogChange;
import org.apache.flink.table.catalog.exceptions.CatalogException;

import java.util.Collections;
Expand All @@ -34,6 +35,7 @@
/** Operation to describe a ALTER CATALOG SET statement. */
@Internal
public class AlterCatalogOptionsOperation implements AlterOperation {

private final String catalogName;
private final Map<String, String> properties;

Expand Down Expand Up @@ -69,7 +71,9 @@ public TableResultInternal execute(Context ctx) {
try {
ctx.getCatalogManager()
.alterCatalog(
catalogName, conf -> conf.addAll(Configuration.fromMap(properties)));
catalogName,
new CatalogChange.CatalogConfigurationChange(
conf -> conf.addAll(Configuration.fromMap(properties))));

return TableResultImpl.TABLE_RESULT_OK;
} catch (CatalogException e) {
Expand Down
Loading

0 comments on commit 2385cc0

Please sign in to comment.