Skip to content

Commit 503ba7c

Browse files
authored
Subscription "hints" (#37)
* Add hints to Subscription CRD * Add Subscription hints and default values * License select files under Apache 2 * Rename magic template variable "sql" to "pipeline.sql" * Limit hints to sink resources
1 parent da4bdb0 commit 503ba7c

File tree

32 files changed

+233
-102
lines changed

32 files changed

+233
-102
lines changed

bin/hoptimator

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
#!/bin/sh
22

3-
kubectl exec -it hoptimator -c hoptimator -- ./hoptimator --isolation=TRANSACTION_NONE -n "" -p "" -u "jdbc:calcite:model=/etc/config/model.yaml" -nn hoptimator "$@"
3+
kubectl exec -it hoptimator -c hoptimator -- ./hoptimator -n "" -p "" -u "jdbc:calcite:model=/etc/config/model.yaml" "$@"

deploy/samples/subscriptions.yaml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,4 +6,5 @@ metadata:
66
spec:
77
sql: SELECT "quantity", "product_id" AS KEY FROM INVENTORY."products_on_hand"
88
database: RAWKAFKA
9-
9+
hints:
10+
numPartitions: "2"

deploy/subscriptions.crd.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,11 @@ spec:
3838
database:
3939
description: The database in which to create the output/sink table.
4040
type: string
41+
hints:
42+
description: Hints to adapters, which may disregard them.
43+
type: object
44+
additionalProperties:
45+
type: string
4146
required:
4247
- sql
4348
- database

hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/Resource.java

Lines changed: 38 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,11 @@ public Set<String> keys() {
9797

9898
/** Render this Resource using the given TemplateFactory */
9999
public String render(TemplateFactory templateFactory) {
100-
return templateFactory.get(this).render(this);
100+
try {
101+
return templateFactory.get(this).render(this);
102+
} catch (Exception e) {
103+
throw new RuntimeException("Error rendering " + template, e);
104+
}
101105
}
102106

103107
public String render(Template template) {
@@ -148,65 +152,66 @@ public interface Environment {
148152
Environment EMPTY = new SimpleEnvironment();
149153
Environment PROCESS = new ProcessEnvironment();
150154

151-
String get(String key);
155+
String getOrDefault(String key, String defaultValue);
152156
}
153157

154158
/** Basic Environment implementation */
155159
public static class SimpleEnvironment implements Environment {
156-
private final Map<String, String> vars;
157-
158-
public SimpleEnvironment(Map<String, String> vars) {
159-
this.vars = vars;
160-
}
160+
private final Map<String, String> vars = new HashMap<>();
161161

162162
public SimpleEnvironment() {
163-
this(new HashMap<>());
164163
}
165164

166-
public void export(String property, String value) {
165+
protected void export(String property, String value) {
167166
vars.put(property, value);
168167
}
169168

170-
public SimpleEnvironment(Properties properties) {
171-
this.vars = new HashMap<>();
172-
for (Map.Entry<Object, Object> entry : properties.entrySet()) {
173-
this.vars.put(entry.getKey().toString(), entry.getValue().toString());
174-
}
169+
protected void exportAll(Map<String, String> properties) {
170+
vars.putAll(properties);
175171
}
176172

177173
public SimpleEnvironment with(String key, String value) {
178174
Map<String, String> newVars = new HashMap<>();
179175
newVars.putAll(vars);
180176
newVars.put(key, value);
181-
return new SimpleEnvironment(newVars);
177+
return new SimpleEnvironment(){{
178+
exportAll(newVars);
179+
}};
182180
}
183181

184182
@Override
185-
public String get(String key) {
186-
if (!vars.containsKey(key)) {
183+
public String getOrDefault(String key, String defaultValue) {
184+
if (defaultValue == null && !vars.containsKey(key)) {
187185
throw new IllegalArgumentException("No variable '" + key + "' found in the environment");
188186
}
189-
return vars.get(key);
187+
return vars.getOrDefault(key, defaultValue);
190188
}
191189
}
192190

193-
/** Returns "{{key}}" for any key */
191+
/** Returns "{{key}}" for any key without a default */
194192
public static class DummyEnvironment implements Environment {
195193
@Override
196-
public String get(String key) {
197-
return "{{" + key + "}}";
194+
public String getOrDefault(String key, String defaultValue) {
195+
if (defaultValue != null) {
196+
return defaultValue;
197+
} else {
198+
return "{{" + key + "}}";
199+
}
198200
}
199201
}
200202

201203
/** Provides access to the process's environment variables */
202204
public static class ProcessEnvironment implements Environment {
203205

204206
@Override
205-
public String get(String key) {
207+
public String getOrDefault(String key, String defaultValue) {
206208
String value = System.getenv(key);
207209
if (value == null) {
208210
value = System.getProperty(key);
209211
}
212+
if (value == null) {
213+
value = defaultValue;
214+
}
210215
if (value == null) {
211216
throw new IllegalArgumentException("Missing system property `" + key + "`");
212217
}
@@ -222,11 +227,12 @@ public interface Template {
222227
/**
223228
* Replaces `{{var}}` in a template file with the corresponding variable.
224229
*
225-
* Resource-scoped variables take precedence over Environment-scoped variables.
230+
* Resource-scoped variables take precedence over Environment-scoped
231+
* variables. Default values can supplied with `{{var:default}}`.
226232
*
227-
* If `var` contains multiple lines, the behavior depends on context; specifically,
228-
* whether the pattern appears within a list or comment (prefixed with `-` or `#`).
229-
* For example, if the template includes:
233+
* If `var` contains multiple lines, the behavior depends on context;
234+
* specifically, whether the pattern appears within a list or comment
235+
* (prefixed with `-` or `#`). For example, if the template includes:
230236
*
231237
* - {{var}}
232238
*
@@ -235,8 +241,8 @@ public interface Template {
235241
* - value line 1
236242
* - value line 2
237243
*
238-
* To avoid this behavior (and just get a multiline string), use one of YAML's multiline
239-
* markers, e.g.
244+
* To avoid this behavior (and just get a multiline string), use one of
245+
* YAML's multiline markers, e.g.
240246
*
241247
* - |
242248
* {{var}}
@@ -255,17 +261,18 @@ public SimpleTemplate(Environment env, String template) {
255261
@Override
256262
public String render(Resource resource) {
257263
StringBuffer sb = new StringBuffer();
258-
Pattern p = Pattern.compile("([\\s\\-\\#]*)\\{\\{\\s*([\\w_\\-\\.]+)\\s*\\}\\}");
264+
Pattern p = Pattern.compile("([\\s\\-\\#]*)\\{\\{\\s*([\\w_\\-\\.]+)\\s*(:([\\w_\\-\\.]+))?\\s*\\}\\}");
259265
Matcher m = p.matcher(template);
260266
while (m.find()) {
261267
String prefix = m.group(1);
262268
if (prefix == null) {
263269
prefix = "";
264270
}
265271
String key = m.group(2);
266-
String value = resource.getOrDefault(key, () -> env.get(key));
272+
String defaultValue = m.group(4);
273+
String value = resource.getOrDefault(key, () -> env.getOrDefault(key, defaultValue));
267274
if (value == null) {
268-
throw new IllegalArgumentException("No value for key " + key);
275+
throw new IllegalArgumentException(template + " has no value for key " + key + ".");
269276
}
270277
String quotedPrefix = Matcher.quoteReplacement(prefix);
271278
String quotedValue = Matcher.quoteReplacement(value);

hoptimator-cli/run.sh

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,5 +5,6 @@ java \
55
--add-opens java.base/java.util=ALL-UNNAMED \
66
--add-opens java.base/java.time=ALL-UNNAMED \
77
-classpath "/opt/plugins/*/lib/*:./hoptimator-cli-all.jar" \
8+
-Dorg.slf4j.simpleLogger.defaultLogLevel=error \
89
$JAVA_OPTS \
9-
com.linkedin.hoptimator.HoptimatorCliApp --verbose=true -nn hoptimator "$@"
10+
com.linkedin.hoptimator.HoptimatorCliApp --verbose=true -nn hoptimator --isolation=TRANSACTION_NONE "$@"

hoptimator-flink-adapter/src/main/resources/SqlJob.yaml.template

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
apiVersion: flink.apache.org/v1beta1
22
kind: FlinkDeployment
33
metadata:
4-
namespace: {{namespace}}
5-
name: {{name}}-flink-job
4+
namespace: {{pipeline.namespace}}
5+
name: {{pipeline.name}}-flink-job
66
spec:
77
image: docker.io/library/hoptimator-flink-runner
88
imagePullPolicy: Never
@@ -21,7 +21,7 @@ spec:
2121
job:
2222
entryClass: com.linkedin.hoptimator.flink.runner.FlinkRunner
2323
args:
24-
- {{sql}}
24+
- {{pipeline.sql}}
2525
jarURI: local:///opt/hoptimator-flink-runner.jar
2626
parallelism: 1
2727
upgradeMode: stateless

hoptimator-kafka-adapter/src/main/java/com/linkedin/hoptimator/catalog/kafka/KafkaTopic.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,9 @@
77
import java.util.Map;
88

99
class KafkaTopic extends Resource {
10-
public KafkaTopic(String name, Integer numPartitions,
11-
Map<String, String> clientOverrides) {
10+
public KafkaTopic(String topicName, Map<String, String> clientOverrides) {
1211
super("KafkaTopic");
13-
export("name", name);
14-
export("numPartitions", Optional.ofNullable(numPartitions)
15-
.map(x -> Integer.toString(x)).orElse("null"));
12+
export("topicName", topicName);
1613
export("clientOverrides", clientOverrides);
1714
}
1815
}

hoptimator-kafka-adapter/src/main/java/com/linkedin/hoptimator/catalog/kafka/RawKafkaSchemaFactory.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,11 +44,12 @@ public Schema create(SchemaPlus parentSchema, String name, Map<String, Object> o
4444
};
4545
ConfigProvider topicConfigProvider = ConfigProvider.from(clientConfig);
4646
TableResolver resolver = x -> rowType.rel();
47-
Integer numPartitions = (Integer) operand.get("numPartitions");
47+
4848
ResourceProvider resources = ResourceProvider.empty()
49-
.with(x -> new KafkaTopic(x, numPartitions, topicConfigProvider.config(x)))
49+
.with(x -> new KafkaTopic(x, topicConfigProvider.config(x)))
5050
.readWith(x -> new KafkaTopicAcl(x, principal, "Read"))
5151
.writeWith(x -> new KafkaTopicAcl(x, principal, "Write"));
52+
5253
Database database = new Database(name, tableLister, resolver, connectorConfigProvider,
5354
resources);
5455
return new DatabaseSchema(database);
Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
apiVersion: hoptimator.linkedin.com/v1alpha1
22
kind: KafkaTopic
33
metadata:
4-
name: {{name}}
5-
namespace: {{namespace}}
4+
name: {{topicName}}
5+
namespace: {{pipeline.namespace}}
66
spec:
7-
topicName: {{name}}
8-
numPartitions: {{numPartitions}}
7+
topicName: {{topicName}}
8+
numPartitions: {{numPartitions:null}}
99
clientOverrides:
1010
{{clientOverrides}}
Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
apiVersion: hoptimator.linkedin.com/v1alpha1
22
kind: Acl
33
metadata:
4-
name: {{name}}-acl-{{id}}
5-
namespace: {{namespace}}
4+
name: {{topicName}}-acl-{{id}}
5+
namespace: {{pipeline.namespace}}
66
spec:
77
resource:
88
kind: KafkaTopic
9-
name: {{name}}
9+
name: {{topicName}}
1010
method: {{method}}
1111
principal: {{principal}}

hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1Acl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
* Access control rule (colloquially, an Acl)
3232
*/
3333
@ApiModel(description = "Access control rule (colloquially, an Acl)")
34-
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-19T17:12:58.614Z[Etc/UTC]")
34+
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-21T22:04:16.918Z[Etc/UTC]")
3535
public class V1alpha1Acl implements io.kubernetes.client.common.KubernetesObject {
3636
public static final String SERIALIZED_NAME_API_VERSION = "apiVersion";
3737
@SerializedName(SERIALIZED_NAME_API_VERSION)

hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1AclList.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232
* AclList is a list of Acl
3333
*/
3434
@ApiModel(description = "AclList is a list of Acl")
35-
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-19T17:12:58.614Z[Etc/UTC]")
35+
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-21T22:04:16.918Z[Etc/UTC]")
3636
public class V1alpha1AclList implements io.kubernetes.client.common.KubernetesListObject {
3737
public static final String SERIALIZED_NAME_API_VERSION = "apiVersion";
3838
@SerializedName(SERIALIZED_NAME_API_VERSION)

hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1AclSpec.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
* A set of related ACL rules.
3030
*/
3131
@ApiModel(description = "A set of related ACL rules.")
32-
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-19T17:12:58.614Z[Etc/UTC]")
32+
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-21T22:04:16.918Z[Etc/UTC]")
3333
public class V1alpha1AclSpec {
3434
/**
3535
* The resource access method.

hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1AclSpecResource.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
* The resource being controlled.
2929
*/
3030
@ApiModel(description = "The resource being controlled.")
31-
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-19T17:12:58.614Z[Etc/UTC]")
31+
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-21T22:04:16.918Z[Etc/UTC]")
3232
public class V1alpha1AclSpecResource {
3333
public static final String SERIALIZED_NAME_KIND = "kind";
3434
@SerializedName(SERIALIZED_NAME_KIND)

hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1AclStatus.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
* Status, as set by the operator.
2929
*/
3030
@ApiModel(description = "Status, as set by the operator.")
31-
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-19T17:12:58.614Z[Etc/UTC]")
31+
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-21T22:04:16.918Z[Etc/UTC]")
3232
public class V1alpha1AclStatus {
3333
public static final String SERIALIZED_NAME_MESSAGE = "message";
3434
@SerializedName(SERIALIZED_NAME_MESSAGE)

hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopic.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
* Kafka Topic
3232
*/
3333
@ApiModel(description = "Kafka Topic")
34-
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-19T17:12:58.614Z[Etc/UTC]")
34+
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-21T22:04:16.918Z[Etc/UTC]")
3535
public class V1alpha1KafkaTopic implements io.kubernetes.client.common.KubernetesObject {
3636
public static final String SERIALIZED_NAME_API_VERSION = "apiVersion";
3737
@SerializedName(SERIALIZED_NAME_API_VERSION)

hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicList.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232
* KafkaTopicList is a list of KafkaTopic
3333
*/
3434
@ApiModel(description = "KafkaTopicList is a list of KafkaTopic")
35-
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-19T17:12:58.614Z[Etc/UTC]")
35+
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-21T22:04:16.918Z[Etc/UTC]")
3636
public class V1alpha1KafkaTopicList implements io.kubernetes.client.common.KubernetesListObject {
3737
public static final String SERIALIZED_NAME_API_VERSION = "apiVersion";
3838
@SerializedName(SERIALIZED_NAME_API_VERSION)

hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicSpec.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@
3333
* Desired Kafka topic configuration.
3434
*/
3535
@ApiModel(description = "Desired Kafka topic configuration.")
36-
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-19T17:12:58.614Z[Etc/UTC]")
36+
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-21T22:04:16.918Z[Etc/UTC]")
3737
public class V1alpha1KafkaTopicSpec {
3838
public static final String SERIALIZED_NAME_CLIENT_CONFIGS = "clientConfigs";
3939
@SerializedName(SERIALIZED_NAME_CLIENT_CONFIGS)

hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicSpecClientConfigs.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
/**
2929
* V1alpha1KafkaTopicSpecClientConfigs
3030
*/
31-
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-19T17:12:58.614Z[Etc/UTC]")
31+
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-21T22:04:16.918Z[Etc/UTC]")
3232
public class V1alpha1KafkaTopicSpecClientConfigs {
3333
public static final String SERIALIZED_NAME_CONFIG_MAP_REF = "configMapRef";
3434
@SerializedName(SERIALIZED_NAME_CONFIG_MAP_REF)

hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicSpecConfigMapRef.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
* Reference to a ConfigMap to use for AdminClient configuration.
2929
*/
3030
@ApiModel(description = "Reference to a ConfigMap to use for AdminClient configuration.")
31-
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-19T17:12:58.614Z[Etc/UTC]")
31+
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-21T22:04:16.918Z[Etc/UTC]")
3232
public class V1alpha1KafkaTopicSpecConfigMapRef {
3333
public static final String SERIALIZED_NAME_NAME = "name";
3434
@SerializedName(SERIALIZED_NAME_NAME)

hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicStatus.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
* Current state of the topic.
2929
*/
3030
@ApiModel(description = "Current state of the topic.")
31-
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-19T17:12:58.614Z[Etc/UTC]")
31+
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-21T22:04:16.918Z[Etc/UTC]")
3232
public class V1alpha1KafkaTopicStatus {
3333
public static final String SERIALIZED_NAME_MESSAGE = "message";
3434
@SerializedName(SERIALIZED_NAME_MESSAGE)

hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1Subscription.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
* Hoptimator Subscription
3232
*/
3333
@ApiModel(description = "Hoptimator Subscription")
34-
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-19T17:12:58.614Z[Etc/UTC]")
34+
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-21T22:04:16.918Z[Etc/UTC]")
3535
public class V1alpha1Subscription implements io.kubernetes.client.common.KubernetesObject {
3636
public static final String SERIALIZED_NAME_API_VERSION = "apiVersion";
3737
@SerializedName(SERIALIZED_NAME_API_VERSION)

hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1SubscriptionList.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232
* SubscriptionList is a list of Subscription
3333
*/
3434
@ApiModel(description = "SubscriptionList is a list of Subscription")
35-
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-19T17:12:58.614Z[Etc/UTC]")
35+
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-21T22:04:16.918Z[Etc/UTC]")
3636
public class V1alpha1SubscriptionList implements io.kubernetes.client.common.KubernetesListObject {
3737
public static final String SERIALIZED_NAME_API_VERSION = "apiVersion";
3838
@SerializedName(SERIALIZED_NAME_API_VERSION)

0 commit comments

Comments
 (0)