Skip to content

Commit ac4f595

Browse files
dawidwysZakelly
authored andcommitted
[FLINK-37222] Do not reuse views across TableEnvironments in SQL client
1 parent b88a6ca commit ac4f595

File tree

3 files changed

+74
-2
lines changed

3 files changed

+74
-2
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.table.gateway.service.context;
20+
21+
import org.apache.flink.table.api.TableEnvironment;
22+
import org.apache.flink.table.catalog.CatalogBaseTable;
23+
import org.apache.flink.table.catalog.CatalogView;
24+
import org.apache.flink.table.catalog.GenericInMemoryCatalog;
25+
import org.apache.flink.table.catalog.ObjectPath;
26+
import org.apache.flink.table.catalog.QueryOperationCatalogView;
27+
import org.apache.flink.table.catalog.ResolvedCatalogView;
28+
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
29+
import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
30+
31+
import java.util.Optional;
32+
33+
/**
34+
* An in-memory catalog that can be reused across different {@link TableEnvironment}. The SQL client
35+
* works against {@link TableEnvironment} design and reuses some of the components (e.g.
36+
* CatalogManager), but not all (e.g. Planner) which causes e.g. views registered in an in-memory
37+
* catalog to fail. This class is a workaround not to keep Planner bound parts of a view reused
38+
* across different {@link TableEnvironment}.
39+
*/
40+
public class EnvironmentReusableInMemoryCatalog extends GenericInMemoryCatalog {
41+
public EnvironmentReusableInMemoryCatalog(String name, String defaultDatabase) {
42+
super(name, defaultDatabase);
43+
}
44+
45+
@Override
46+
public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists)
47+
throws TableAlreadyExistException, DatabaseNotExistException {
48+
CatalogBaseTable tableToRegister =
49+
extractView(table)
50+
.flatMap(QueryOperationCatalogView::getOriginalView)
51+
.map(v -> (CatalogBaseTable) v)
52+
.orElse(table);
53+
super.createTable(tablePath, tableToRegister, ignoreIfExists);
54+
}
55+
56+
private Optional<QueryOperationCatalogView> extractView(CatalogBaseTable table) {
57+
if (table instanceof ResolvedCatalogView) {
58+
final CatalogView origin = ((ResolvedCatalogView) table).getOrigin();
59+
if (origin instanceof QueryOperationCatalogView) {
60+
return Optional.of((QueryOperationCatalogView) origin);
61+
}
62+
return Optional.empty();
63+
} else if (table instanceof QueryOperationCatalogView) {
64+
return Optional.of((QueryOperationCatalogView) table);
65+
}
66+
return Optional.empty();
67+
}
68+
}

flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/SessionContext.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@
3030
import org.apache.flink.table.catalog.CatalogManager;
3131
import org.apache.flink.table.catalog.CatalogStoreHolder;
3232
import org.apache.flink.table.catalog.FunctionCatalog;
33-
import org.apache.flink.table.catalog.GenericInMemoryCatalog;
3433
import org.apache.flink.table.factories.CatalogStoreFactory;
3534
import org.apache.flink.table.factories.FactoryUtil;
3635
import org.apache.flink.table.factories.TableFactoryUtil;
@@ -446,7 +445,7 @@ private static CatalogManager buildCatalogManager(
446445
catalogStore.config(),
447446
catalogStore.classLoader()))
448447
.orElse(
449-
new GenericInMemoryCatalog(
448+
new EnvironmentReusableInMemoryCatalog(
450449
defaultCatalogName, settings.getBuiltInDatabaseName()));
451450
}
452451
defaultCatalog.open();

flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/QueryOperationCatalogView.java

+5
Original file line numberDiff line numberDiff line change
@@ -116,4 +116,9 @@ public String getExpandedQuery() {
116116
public boolean supportsShowCreateView() {
117117
return originalView != null;
118118
}
119+
120+
@Internal
121+
public Optional<CatalogView> getOriginalView() {
122+
return Optional.ofNullable(originalView);
123+
}
119124
}

0 commit comments

Comments
 (0)