Skip to content

Commit

Permalink
[FLINK-31223][sqlgateway] Pass all flink parameters to the SqlGateway…
Browse files Browse the repository at this point in the history
…RestEndpoint.
  • Loading branch information
reswqa committed Apr 25, 2024
1 parent 3553bce commit 0e59da2
Show file tree
Hide file tree
Showing 6 changed files with 124 additions and 16 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.client;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.table.gateway.rest.util.SqlGatewayRestEndpointExtension;
import org.apache.flink.table.gateway.service.utils.SqlGatewayServiceExtension;

import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.stream.Collectors;

import static org.assertj.core.api.Assertions.assertThat;

/** Test that {@link SqlClient} works normally when SSL is enabled. */
class SqlClientSSLTest extends SqlClientTestBase {
@RegisterExtension
@Order(1)
public static final SqlGatewayServiceExtension SQL_GATEWAY_SERVICE_EXTENSION =
new SqlGatewayServiceExtension(Configuration::new);

@RegisterExtension
@Order(2)
private static final SqlGatewayRestEndpointExtension SQL_GATEWAY_REST_ENDPOINT_EXTENSION =
new SqlGatewayRestEndpointExtension(
SQL_GATEWAY_SERVICE_EXTENSION::getService, SqlClientSSLTest::withSSL);

private static final String truststorePath = getTestResource("ssl/local127.truststore");

private static final String keystorePath = getTestResource("ssl/local127.keystore");

@Test
void testEmbeddedMode() throws Exception {
String[] args = new String[] {"embedded"};
String actual = runSqlClient(args, String.join("\n", "SET;", "QUIT;"), false);
assertThat(actual).contains(SecurityOptions.SSL_REST_ENABLED.key(), "true");
}

@Test
void testGatewayMode() throws Exception {
String[] args =
new String[] {
"gateway",
"-e",
InetSocketAddress.createUnresolved(
SQL_GATEWAY_REST_ENDPOINT_EXTENSION.getTargetAddress(),
SQL_GATEWAY_REST_ENDPOINT_EXTENSION.getTargetPort())
.toString()
};
String actual = runSqlClient(args, String.join("\n", "SET;", "QUIT;"), false);
assertThat(actual).contains(SecurityOptions.SSL_REST_ENABLED.key(), "true");
}

private static void withSSL(Configuration configuration) {
configuration.set(SecurityOptions.SSL_REST_ENABLED, true);
configuration.set(SecurityOptions.SSL_REST_TRUSTSTORE, truststorePath);
configuration.set(SecurityOptions.SSL_REST_TRUSTSTORE_PASSWORD, "password");
configuration.set(SecurityOptions.SSL_REST_KEYSTORE, keystorePath);
configuration.set(SecurityOptions.SSL_REST_KEYSTORE_PASSWORD, "password");
configuration.set(SecurityOptions.SSL_REST_KEY_PASSWORD, "password");
}

@Override
protected void writeConfigOptionsToConfYaml(Path confYamlPath) throws IOException {
Configuration configuration = new Configuration();
withSSL(configuration);
Files.write(
confYamlPath,
configuration.toMap().entrySet().stream()
.map(entry -> entry.getKey() + ": " + entry.getValue())
.collect(Collectors.toList()));
}

private static String getTestResource(final String fileName) {
final ClassLoader classLoader = ClassLoader.getSystemClassLoader();
final URL resource = classLoader.getResource(fileName);
if (resource == null) {
throw new IllegalArgumentException(
String.format("Test resource %s does not exist", fileName));
}
return resource.getFile();
}
}
Binary file not shown.
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ interface Context {
/**
* Get a map contains all flink configurations.
*
* @return The copy of flink configurations in the form of map, modify this map will not
* @return The copy of flink configurations in the form of map, modifying this map will not
* influence the original configuration object.
*/
Map<String, String> getFlinkConfigurationOptions();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.apache.flink.table.gateway.api.endpoint.SqlGatewayEndpointFactoryUtils;
import org.apache.flink.table.gateway.api.utils.SqlGatewayException;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
Expand All @@ -48,34 +47,35 @@ public SqlGatewayEndpoint createSqlGatewayEndpoint(Context context) {
SqlGatewayEndpointFactoryUtils.createEndpointFactoryHelper(this, context);
// Check that ADDRESS must be set
endpointFactoryHelper.validate();
Configuration config = rebuildRestEndpointOptions(context.getEndpointOptions());
Configuration config =
rebuildRestEndpointOptions(
context.getEndpointOptions(), context.getFlinkConfigurationOptions());
try {
return new SqlGatewayRestEndpoint(config, context.getSqlGatewayService());
} catch (Exception e) {
throw new SqlGatewayException("Cannot start the rest endpoint.", e);
}
}

public static Configuration rebuildRestEndpointOptions(Map<String, String> configMap) {
Map<String, String> effectiveConfigMap = new HashMap<>(configMap);
public static Configuration rebuildRestEndpointOptions(
Map<String, String> endpointConfigMap, Map<String, String> flinkConfigMap) {
flinkConfigMap.put(RestOptions.ADDRESS.key(), endpointConfigMap.get(ADDRESS.key()));

effectiveConfigMap.put(RestOptions.ADDRESS.key(), configMap.get(ADDRESS.key()));

if (configMap.containsKey(BIND_ADDRESS.key())) {
effectiveConfigMap.put(
RestOptions.BIND_ADDRESS.key(), configMap.get(BIND_ADDRESS.key()));
if (endpointConfigMap.containsKey(BIND_ADDRESS.key())) {
flinkConfigMap.put(
RestOptions.BIND_ADDRESS.key(), endpointConfigMap.get(BIND_ADDRESS.key()));
}

// we need to override RestOptions.PORT anyway, to use a different default value
effectiveConfigMap.put(
flinkConfigMap.put(
RestOptions.PORT.key(),
configMap.getOrDefault(PORT.key(), PORT.defaultValue().toString()));
endpointConfigMap.getOrDefault(PORT.key(), PORT.defaultValue().toString()));

if (configMap.containsKey(BIND_PORT.key())) {
effectiveConfigMap.put(RestOptions.BIND_PORT.key(), configMap.get(BIND_PORT.key()));
if (endpointConfigMap.containsKey(BIND_PORT.key())) {
flinkConfigMap.put(RestOptions.BIND_PORT.key(), endpointConfigMap.get(BIND_PORT.key()));
}

return Configuration.fromMap(effectiveConfigMap);
return Configuration.fromMap(flinkConfigMap);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ public static Configuration getBaseConfig(Configuration flinkConf) {
new SqlGatewayEndpointFactoryUtils.DefaultEndpointFactoryContext(
null, flinkConf, getEndpointConfig(flinkConf, IDENTIFIER));

return rebuildRestEndpointOptions(context.getEndpointOptions());
return rebuildRestEndpointOptions(
context.getEndpointOptions(), context.getFlinkConfigurationOptions());
}

/** Create the configuration generated from config.yaml. */
Expand Down

0 comments on commit 0e59da2

Please sign in to comment.