Skip to content

Commit 67fda04

Browse files
authored
Load from JDBC (#71)
* Support loading Calcite schemas * Support loading arbitrary JDBC connections
1 parent a400db2 commit 67fda04

File tree

5 files changed

+63
-32
lines changed

5 files changed

+63
-32
lines changed

deploy/dev/kafka.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ metadata:
2323
namespace: kafka
2424
spec:
2525
kafka:
26-
version: 3.6.1
26+
version: 3.8.0
2727
replicas: 1
2828
listeners:
2929
- name: plain
@@ -40,7 +40,7 @@ spec:
4040
transaction.state.log.min.isr: 1
4141
default.replication.factor: 1
4242
min.insync.replicas: 1
43-
inter.broker.protocol.version: "3.4"
43+
inter.broker.protocol.version: "3.8"
4444
allow.everyone.if.no.acl.found: true
4545
storage:
4646
type: ephemeral

deploy/hoptimator-operator-deployment.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ spec:
1919
- name: hoptimator-operator
2020
image: docker.io/library/hoptimator
2121
imagePullPolicy: Never
22-
command: ["./hoptimator-operator-integration/bin/hoptimator-operator-integration", "/etc/config/model.yaml"]
22+
command: ["./hoptimator-operator-integration/bin/hoptimator-operator-integration", "jdbc:calcite:model=/etc/config/model.yaml"]
2323
volumeMounts:
2424
- name: config-volume
2525
mountPath: /etc/config

hoptimator-cli/src/main/java/com/linkedin/hoptimator/HoptimatorCliApp.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ public void execute(String line, DispatchCallback dispatchCallback) {
137137

138138
String connectionUrl = sqlline.getConnectionMetadata().getUrl();
139139
try {
140-
HoptimatorPlanner planner = HoptimatorPlanner.fromModelFile(connectionUrl, properties);
140+
HoptimatorPlanner planner = HoptimatorPlanner.fromJdbc(connectionUrl, properties);
141141
RelNode plan = planner.logical(sql);
142142
String avroSchema = AvroConverter.avro("OutputNamespace", "OutputName", plan.getRowType()).toString(true);
143143
sqlline.output(avroSchema);
@@ -205,7 +205,7 @@ public void execute(String line, DispatchCallback dispatchCallback) {
205205
String connectionUrl = sqlline.getConnectionMetadata().getUrl();
206206
try {
207207
InsertInto insertInto = parseInsertInto(sql);
208-
HoptimatorPlanner planner = HoptimatorPlanner.fromModelFile(connectionUrl, properties);
208+
HoptimatorPlanner planner = HoptimatorPlanner.fromJdbc(connectionUrl, properties);
209209
PipelineRel plan = planner.pipeline(insertInto.query);
210210
PipelineRel.Implementor impl = new PipelineRel.Implementor(plan);
211211
HopTable sink = planner.database(insertInto.database)
@@ -280,7 +280,7 @@ public void execute(String line, DispatchCallback dispatchCallback) {
280280
String connectionUrl = sqlline.getConnectionMetadata().getUrl();
281281
try {
282282
InsertInto insertInto = parseInsertInto(sql);
283-
HoptimatorPlanner planner = HoptimatorPlanner.fromModelFile(connectionUrl, properties);
283+
HoptimatorPlanner planner = HoptimatorPlanner.fromJdbc(connectionUrl, properties);
284284
PipelineRel plan = planner.pipeline(insertInto.query);
285285
sqlline.output("PLAN:");
286286
sqlline.output(plan.explain());
@@ -383,7 +383,7 @@ public void execute(String line, DispatchCallback dispatchCallback) {
383383
throw new IllegalArgumentException("Expected one of 'not', 'empty', or 'value'");
384384
}
385385

386-
HoptimatorPlanner planner = HoptimatorPlanner.fromModelFile(connectionUrl, properties);
386+
HoptimatorPlanner planner = HoptimatorPlanner.fromJdbc(connectionUrl, properties);
387387
PipelineRel plan = planner.pipeline(query);
388388
PipelineRel.Implementor impl = new PipelineRel.Implementor(plan);
389389
String pipelineSql = impl.query().sql(MysqlSqlDialect.DEFAULT);
@@ -475,7 +475,7 @@ public void execute(String line, DispatchCallback dispatchCallback) {
475475
String connectionUrl = sqlline.getConnectionMetadata().getUrl();
476476
try {
477477
InsertInto insertInto = parseInsertInto(sql);
478-
HoptimatorPlanner planner = HoptimatorPlanner.fromModelFile(connectionUrl, properties);
478+
HoptimatorPlanner planner = HoptimatorPlanner.fromJdbc(connectionUrl, properties);
479479
PipelineRel plan = planner.pipeline(insertInto.query);
480480
PipelineRel.Implementor impl = new PipelineRel.Implementor(plan);
481481
HopTable sink = planner.database(insertInto.database)
@@ -607,7 +607,7 @@ public void execute(String line, DispatchCallback dispatchCallback) {
607607
String connectionUrl = sqlline.getConnectionMetadata().getUrl();
608608
try {
609609
InsertInto insertInto = parseInsertInto(sql);
610-
HoptimatorPlanner planner = HoptimatorPlanner.fromModelFile(connectionUrl, properties);
610+
HoptimatorPlanner planner = HoptimatorPlanner.fromJdbc(connectionUrl, properties);
611611
PipelineRel plan = planner.pipeline(insertInto.query);
612612
PipelineRel.Implementor impl = new PipelineRel.Implementor(plan);
613613
HopTable sink = planner.database(insertInto.database)

hoptimator-operator/src/main/java/com/linkedin/hoptimator/operator/HoptimatorOperatorApp.java

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -33,17 +33,17 @@
3333
public class HoptimatorOperatorApp {
3434
private static final Logger log = LoggerFactory.getLogger(HoptimatorOperatorApp.class);
3535

36-
final String modelPath;
36+
final String url;
3737
final String namespace;
3838
final ApiClient apiClient;
3939
final Predicate<V1alpha1Subscription> subscriptionFilter;
4040
final Properties properties;
4141
final Resource.Environment environment;
4242

4343
/** This constructor is likely to evolve and break. */
44-
public HoptimatorOperatorApp(String modelPath, String namespace, ApiClient apiClient,
44+
public HoptimatorOperatorApp(String url, String namespace, ApiClient apiClient,
4545
Predicate<V1alpha1Subscription> subscriptionFilter, Properties properties) {
46-
this.modelPath = modelPath;
46+
this.url = url;
4747
this.namespace = namespace;
4848
this.apiClient = apiClient;
4949
this.subscriptionFilter = subscriptionFilter;
@@ -53,7 +53,7 @@ public HoptimatorOperatorApp(String modelPath, String namespace, ApiClient apiCl
5353

5454
public static void main(String[] args) throws Exception {
5555
if (args.length < 1) {
56-
throw new IllegalArgumentException("Missing model file argument.");
56+
throw new IllegalArgumentException("Missing JDBC URL argument.");
5757
}
5858

5959
Options options = new Options();
@@ -76,18 +76,17 @@ public static void main(String[] args) throws Exception {
7676
return;
7777
}
7878

79-
String modelFileInput = cmd.getArgs()[0];
79+
String urlInput = cmd.getArgs()[0];
8080
String namespaceInput = cmd.getOptionValue("namespace", "default");
8181

82-
new HoptimatorOperatorApp(modelFileInput, namespaceInput, Config.defaultClient(), null,
82+
new HoptimatorOperatorApp(urlInput, namespaceInput, Config.defaultClient(), null,
8383
new Properties()).run();
8484
}
8585

8686
public void run() throws Exception {
87-
HoptimatorPlanner.Factory plannerFactory = HoptimatorPlanner.Factory.fromModelFile(modelPath,
88-
properties);
87+
HoptimatorPlanner.Factory plannerFactory = HoptimatorPlanner.Factory.fromJdbc(url, properties);
8988

90-
// ensure model file works, and that static classes are initialized in the main thread
89+
// ensure JDBC connection works, and that static classes are initialized in the main thread
9190
HoptimatorPlanner planner = plannerFactory.makePlanner();
9291

9392
apiClient.setHttpClient(apiClient.getHttpClient().newBuilder()

hoptimator-planner/src/main/java/com/linkedin/hoptimator/planner/HoptimatorPlanner.java

Lines changed: 46 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,12 @@
11
package com.linkedin.hoptimator.planner;
22

3+
import org.apache.calcite.adapter.jdbc.JdbcSchema;
4+
import org.apache.calcite.adapter.jdbc.JdbcCatalogSchema;
35
import org.apache.calcite.config.CalciteConnectionConfig;
46
import org.apache.calcite.config.CalciteConnectionConfigImpl;
57
import org.apache.calcite.jdbc.Driver;
68
import org.apache.calcite.jdbc.CalciteConnection;
9+
import org.apache.calcite.model.ModelHandler;
710
import org.apache.calcite.rel.RelCollationTraitDef;
811
import org.apache.calcite.rel.RelNode;
912
import org.apache.calcite.rel.rules.CoreRules;
@@ -16,7 +19,6 @@
1619
import org.apache.calcite.sql.SqlNode;
1720
import org.apache.calcite.schema.Schema;
1821
import org.apache.calcite.schema.SchemaPlus;
19-
import org.apache.calcite.model.ModelHandler;
2022
import org.apache.calcite.tools.RuleSet;
2123
import org.apache.calcite.tools.RuleSets;
2224
import org.apache.calcite.tools.Frameworks;
@@ -26,11 +28,14 @@
2628
import com.linkedin.hoptimator.catalog.Database;
2729
import com.linkedin.hoptimator.catalog.DatabaseSchema;
2830

31+
import java.io.IOException;
2932
import java.util.Arrays;
3033
import java.util.ArrayList;
3134
import java.util.List;
3235
import java.util.Properties;
3336
import java.util.NoSuchElementException;
37+
import java.sql.SQLException;
38+
import javax.sql.DataSource;
3439

3540
/** A one-shot stateful object, which creates Pipelines from SQL. */
3641
public class HoptimatorPlanner {
@@ -67,8 +72,20 @@ public class HoptimatorPlanner {
6772
public interface Factory {
6873
HoptimatorPlanner makePlanner() throws Exception;
6974

70-
static Factory fromModelFile(String filePath, Properties properties) {
71-
return () -> HoptimatorPlanner.fromModelFile(filePath, properties);
75+
static Factory fromSchema(String catalog, Schema schema) {
76+
return () -> HoptimatorPlanner.fromSchema(catalog, schema);
77+
}
78+
79+
static Factory fromDataSource(String catalog, DataSource dataSource) {
80+
return () -> HoptimatorPlanner.fromDataSource(catalog, dataSource);
81+
}
82+
83+
static Factory fromJdbc(String url, String catalog, String username, String password) {
84+
return () -> HoptimatorPlanner.fromJdbc(url, catalog, username, password);
85+
}
86+
87+
static Factory fromJdbc(String url, Properties properties) {
88+
return () -> HoptimatorPlanner.fromJdbc(url, properties);
7289
}
7390
}
7491

@@ -131,23 +148,38 @@ public Database database(String name) {
131148
return ((DatabaseSchema) subSchema).database();
132149
}
133150

134-
public static HoptimatorPlanner fromModelFile(String filePath, Properties properties) throws Exception {
135-
String uri = filePath;
136-
if (uri.startsWith("jdbc:calcite:model=")) {
137-
uri = uri.substring("jdbc:calcite:model=".length());
138-
}
151+
public static HoptimatorPlanner fromSchema(String name, Schema schema) {
152+
SchemaPlus rootSchema = Frameworks.createRootSchema(true);
153+
rootSchema.add(name == null ? "ROOT" : name, schema);
154+
return new HoptimatorPlanner(rootSchema);
155+
}
156+
157+
public static HoptimatorPlanner fromDataSource(String catalog, DataSource dataSource) {
158+
Schema schema = JdbcCatalogSchema.create(null, catalog, dataSource, catalog);
159+
return fromSchema(catalog, schema);
160+
}
161+
162+
public static HoptimatorPlanner fromModelFile(String filePath, Properties properties)
163+
throws SQLException, IOException {
139164
Driver driver = new Driver();
140165
CalciteConnectionConfig connectionConfig = new CalciteConnectionConfigImpl(properties);
141166
CalciteConnection connection = (CalciteConnection) driver.connect("jdbc:calcite:", properties);
142167
SchemaPlus schema = connection.getRootSchema();
143-
ModelHandler modelHandler = new ModelHandler(connection, uri); // side-effect: modifies connection
168+
ModelHandler modelHandler = new ModelHandler(connection, filePath); // side-effect: modifies connection
144169
return new HoptimatorPlanner(schema);
145170
}
146171

147-
// for testing purposes
148-
static HoptimatorPlanner fromSchema(String name, Schema schema) {
149-
SchemaPlus rootSchema = Frameworks.createRootSchema(true);
150-
rootSchema.add(name, schema);
151-
return new HoptimatorPlanner(rootSchema);
172+
public static HoptimatorPlanner fromJdbc(String url, String catalog, String username, String password) {
173+
DataSource dataSource = JdbcSchema.dataSource(url, null, username, password);
174+
return fromDataSource(catalog, dataSource);
175+
}
176+
177+
public static HoptimatorPlanner fromJdbc(String url, Properties properties) throws SQLException, IOException {
178+
if (url.startsWith("jdbc:calcite:model=")) {
179+
return fromModelFile(url.substring("jdbc:calcite:model=".length()), properties);
180+
} else {
181+
return fromJdbc(url, properties.getProperty("catalog"), properties.getProperty("username"),
182+
properties.getProperty("password"));
183+
}
152184
}
153185
}

0 commit comments

Comments
 (0)