Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions docs/configuration/settings.md

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -2574,6 +2574,49 @@ object KyuubiConf {
.checkValues(EngineType)
.createWithDefault(EngineType.SPARK_SQL.toString)

val ENGINE_PROFILE: OptionalConfigEntry[String] =
Comment thread
iamlapa marked this conversation as resolved.
buildConf("kyuubi.engine.profile")
.doc("The named engine profile to use for this session. Engine profiles are declared by" +
" the administrator under `kyuubi.engine.profile.<name>.*` in `kyuubi-defaults.conf`," +
" each bundling an engine type, environment variables, Kyuubi session variables and" +
" engine-native config. The profile is resolved at session open time with precedence:" +
" this explicit session parameter, then the user/group default profile, then the" +
" per-engine-type default profile. If a resolved profile name is not defined, opening" +
" the session either fails or logs warning depending on the value of the" +
" `kyuubi.engine.profiles.unknown.strategy` configuration option." +
" When no profile resolves, behavior is unchanged.")
.version("1.10.1")
.stringConf
.createOptional

// `kyuubi.engine.<TYPE>.profile.default` is parameterized by engine type, so one optional
// entry is registered per known EngineType, keyed by the upper-case engine type name.
val ENGINE_DEFAULT_PROFILE: Map[String, OptionalConfigEntry[String]] =
EngineType.values.toSeq.map { engineType =>
engineType.toString ->
buildConf(s"kyuubi.engine.$engineType.profile.default")
.doc(s"The default engine profile to use for the $engineType engine type when a" +
s" session does not explicitly request a profile and no user/group default profile" +
s" applies. The referenced profile must be declared under" +
s" `kyuubi.engine.profile.<name>.*`.")
.version("1.10.1")
.stringConf
.createOptional
}.toMap

val ENGINE_PROFILES_UNKNOWN_STRATEGY: ConfigEntry[String] =
buildConf("kyuubi.engine.profiles.unknown.strategy")
.doc("The strategy when a session resolves to an engine profile that is not defined under" +
" `kyuubi.engine.profile.<name>.*`. <ul>" +
"<li>FAIL - fail opening the session with an error.</li>" +
"<li>LOG - log a warning and continue without applying any profile.</li>" +
"</ul>")
.version("1.10.1")
.stringConf
.transformToUpperCase
.checkValues(Set("FAIL", "LOG"))
.createWithDefault("FAIL")

val ENGINE_POOL_IGNORE_SUBDOMAIN: ConfigEntry[Boolean] =
buildConf("kyuubi.engine.pool.ignoreSubdomain")
.doc(s"Whether to ignore ${ENGINE_SHARE_LEVEL_SUBDOMAIN.key}" +
Expand Down Expand Up @@ -3093,7 +3136,8 @@ object KyuubiConf {
.toSet()
.createWithDefault(Set(
"kyuubi.backend.server.event.kafka.",
"kyuubi.metadata.store.jdbc.datasource."))
"kyuubi.metadata.store.jdbc.datasource.",
"kyuubi.engine.profile."))

val ENGINE_SPARK_SHOW_PROGRESS: ConfigEntry[Boolean] =
buildConf("kyuubi.session.engine.spark.showProgress")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ object KyuubiReservedKeys {
final val KYUUBI_ENGINE_SUBMIT_TIME_KEY = "kyuubi.engine.submit.time"
final val KYUUBI_ENGINE_CREDENTIALS_KEY = "kyuubi.engine.credentials"
final val KYUUBI_ENGINE_APP_MGR_INFO_KEY = "kyuubi.engine.appMgrInfo"
// The resolved engine profile name for a session. Uses the `profileName` (no dot before the
// name) spelling so it never collides with the `kyuubi.engine.profile.<name>.*` definition
// namespace. Surfaced as a service discovery attribute for Web UI grouping.
final val KYUUBI_ENGINE_PROFILE_NAME_KEY = "kyuubi.engine.profileName"
final val KYUUBI_SESSION_HANDLE_KEY = "kyuubi.session.handle"
final val KYUUBI_SESSION_ALIVE_PROBE = "kyuubi.session.alive.probe"
final val KYUUBI_SESSION_ENGINE_LAUNCH_HANDLE_GUID =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ abstract class AbstractFrontendService(name: String)
}

override def attributes: Map[String, String] = {
conf.getAll.filter(_._1 == KyuubiReservedKeys.KYUUBI_ENGINE_APP_MGR_INFO_KEY)
conf.getAll.filter { case (key, _) =>
key == KyuubiReservedKeys.KYUUBI_ENGINE_APP_MGR_INFO_KEY ||
key == KyuubiReservedKeys.KYUUBI_ENGINE_PROFILE_NAME_KEY
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -237,4 +237,30 @@ class KyuubiConfSuite extends KyuubiFunSuite {
assert(kyuubiConf.getUserDefaults("kyuubi").getAll.size == 0)
assert(kyuubiConf.getUserDefaults("user").getAll.size == 0)
}

test("engine profile declarations are server-only and not passed to the engine") {
val kyuubiConf = KyuubiConf(false)
kyuubiConf.set("kyuubi.engine.profile.pg.type", "JDBC")
kyuubiConf.set("kyuubi.engine.profile.pg.env.SECRET_TOKEN", "env-secret")
kyuubiConf.set("kyuubi.engine.profile.pg.session.engine.trino.connection.url", "http://host")
kyuubiConf.set(
"kyuubi.engine.profile.pg.conf.kyuubi.engine.jdbc.connection.password",
"conf-secret")

// a plain server default that is not a profile declaration must still pass through
kyuubiConf.set("spark.some.tuning", "1g")

val userConf = kyuubiConf.getUserDefaults("user")

assert(userConf.getOption("spark.some.tuning").contains("1g"))
assert(!userConf.getAll.keys.exists(_.startsWith("kyuubi.engine.profile.")))
assert(!userConf.getAll.values.exists(_.contains("env-secret")))
assert(!userConf.getAll.values.exists(_.contains("conf-secret")))

// check that the profile-name attribute (published for discovery) still flows through
assert(kyuubiConf.getOption(KyuubiReservedKeys.KYUUBI_ENGINE_PROFILE_NAME_KEY).isEmpty)
kyuubiConf.set(KyuubiReservedKeys.KYUUBI_ENGINE_PROFILE_NAME_KEY, "pg")
assert(kyuubiConf.getUserDefaults("kyuubi")
.getOption(KyuubiReservedKeys.KYUUBI_ENGINE_PROFILE_NAME_KEY).contains("pg"))
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kyuubi.client.api.v1.dto;

import java.util.Collections;
import java.util.List;
import java.util.Objects;
import org.apache.commons.lang3.builder.ReflectionToStringBuilder;
import org.apache.commons.lang3.builder.ToStringStyle;

/** Active engine instances grouped by engine profile name and engine type. */
public class EngineProfileGroup {

private String profile;
private String engineType;
private String version;
private int instanceCount;
private String status;
private List<Engine> engines;

public EngineProfileGroup() {}

public EngineProfileGroup(
String profile,
String engineType,
String version,
int instanceCount,
String status,
List<Engine> engines) {
this.profile = profile;
this.engineType = engineType;
this.version = version;
this.instanceCount = instanceCount;
this.status = status;
this.engines = engines;
}

public String getProfile() {
return profile;
}

public void setProfile(String profile) {
this.profile = profile;
}

public String getEngineType() {
return engineType;
}

public void setEngineType(String engineType) {
this.engineType = engineType;
}

public String getVersion() {
return version;
}

public void setVersion(String version) {
this.version = version;
}

public int getInstanceCount() {
return instanceCount;
}

public void setInstanceCount(int instanceCount) {
this.instanceCount = instanceCount;
}

public String getStatus() {
return status;
}

public void setStatus(String status) {
this.status = status;
}

public List<Engine> getEngines() {
if (null == engines) {
return Collections.emptyList();
}
return engines;
}

public void setEngines(List<Engine> engines) {
this.engines = engines;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
EngineProfileGroup that = (EngineProfileGroup) o;
return getInstanceCount() == that.getInstanceCount()
&& Objects.equals(getProfile(), that.getProfile())
&& Objects.equals(getEngineType(), that.getEngineType())
&& Objects.equals(getVersion(), that.getVersion())
&& Objects.equals(getStatus(), that.getStatus())
&& Objects.equals(getEngines(), that.getEngines());
}

@Override
public int hashCode() {
return Objects.hash(
getProfile(), getEngineType(), getVersion(), getInstanceCount(), getStatus(), getEngines());
}

@Override
public String toString() {
return ReflectionToStringBuilder.toString(this, ToStringStyle.JSON_STYLE);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kyuubi.engine

/**
* A materialized named engine profile, declared under
* `kyuubi.engine.profile.<name>.*`. The profile bundles, for a single named engine
* configuration, the engine type, environment variables, Kyuubi session variables and
* engine-native configs - already mapped to their effective Kyuubi config keys.
*
* All [[conf]] keys are applied to a session as defaults: any conflicting option the client
* supplies (e.g. via the JDBC url) overrides the profile.
*
* @param name the profile name
* @param conf the materialized engine-conf key-value pairs
*/
case class EngineProfile(name: String, conf: Map[String, String])
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kyuubi.engine

import java.util.Locale

import scala.collection.mutable

import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf._

/**
* An immutable registry of all [[EngineProfile]]s declared under `kyuubi.engine.profile.<name>.*`.
*
* This owns the engine-profile configuration logic: reading the profile declarations from a
* [[KyuubiConf]] and materializing them into [[EngineProfile]]s. Engine profiles are loaded from
* `kyuubi-defaults.conf` at startup and never change at runtime, so [[EngineProfileRegistry.apply]]
* materializes them once (validating each) and the result is reused for every session open and
* REST listing, rather than being re-parsed each time.
*/
class EngineProfileRegistry private (profiles: Map[String, EngineProfile]) {

/** Names of all declared engine profiles. */
def names: Set[String] = profiles.keySet

/** The materialized profile for `name`, or None if no such profile is declared. */
def get(name: String): Option[EngineProfile] = profiles.get(name)
}

object EngineProfileRegistry {

// The profile declaration prefix, e.g. `kyuubi.engine.profile`.
private val PROFILE_PREFIX = ENGINE_PROFILE.key

/**
* Materialize all engine profiles declared in `conf` into an immutable registry. Fails fast with
* [[IllegalArgumentException]] if any profile has an invalid engine type or unknown bucket.
*/
def apply(conf: KyuubiConf): EngineProfileRegistry = {
val profiles = profileNames(conf)
.map(name => name -> buildProfile(conf, name))
.toMap
new EngineProfileRegistry(profiles)
}

/** Names of all engine profiles declared under `kyuubi.engine.profile.<name>.*`. */
private def profileNames(conf: KyuubiConf): Set[String] = {
val prefix = s"$PROFILE_PREFIX."
conf.getAll.keys.collect {
case k if k.startsWith(prefix) => k.stripPrefix(prefix).split("\\.", 2).head
}.toSet
}

/**
* Materialize one named engine profile into effective engine-conf keys. Fails fast with
* [[IllegalArgumentException]] on an invalid engine type or an unknown profile bucket.
*
* Bucket mapping for a `kyuubi.engine.profile.<name>.<bucket>[.<rest>]` key:
* - `type` -> `kyuubi.engine.type` (validated against [[EngineType]])
* - `env.<VAR>` -> `kyuubi.engineEnv.<VAR>`
* - `session.<rest>` -> `kyuubi.session.<rest>` (Kyuubi session variable)
* - `conf.<rest>` -> `<rest>` verbatim (engine-native property)
*
* All materialized keys are applied to a session as defaults; conflicting client conf wins.
*/
private def buildProfile(conf: KyuubiConf, name: String): EngineProfile = {
val raw = conf.getAllWithPrefix(s"$PROFILE_PREFIX.$name", "")
val profileConf = mutable.Map[String, String]()
raw.foreach { case (suffix, value) =>
val parts = suffix.split("\\.", 2)
parts.head match {
case "type" =>
require(
parts.length == 1,
s"Invalid engine profile key '$PROFILE_PREFIX.$name.$suffix':" +
s" 'type' does not accept a sub-key.")
val engineType = value.toUpperCase(Locale.ROOT)
require(
EngineType.values.exists(_.toString == engineType),
s"Invalid engine type '$value' in profile '$name', expected one of" +
s" ${EngineType.values.mkString("[", ", ", "]")}.")
profileConf += (ENGINE_TYPE.key -> engineType)
case "env" =>
require(
parts.length == 2,
s"Invalid engine profile key '$PROFILE_PREFIX.$name.$suffix':" +
s" 'env' requires an environment variable name, e.g. env.SPARK_HOME.")
profileConf += (s"$KYUUBI_ENGINE_ENV_PREFIX.${parts(1)}" -> value)
case "session" =>
require(
parts.length == 2,
s"Invalid engine profile key '$PROFILE_PREFIX.$name.$suffix':" +
s" 'session' requires a Kyuubi session config key.")
profileConf += (s"kyuubi.session.${parts(1)}" -> value)
case "conf" =>
require(
parts.length == 2,
s"Invalid engine profile key '$PROFILE_PREFIX.$name.$suffix':" +
s" 'conf' requires an engine config key.")
profileConf += (parts(1) -> value)
case bucket =>
throw new IllegalArgumentException(
s"Unknown engine profile bucket '$bucket' in key '$PROFILE_PREFIX.$name.$suffix'." +
s" Expected one of: type, env.<VAR>, session.<key>, conf.<key>.")
}
}
EngineProfile(name, profileConf.toMap)
}
}
Loading
Loading