
Commit 92f1864

Fix primary key constraint (#66)
1 parent 9a9a9eb commit 92f1864

File tree: 9 files changed (+18 −94 lines)

.github/workflows/integration-tests.yml

Lines changed: 8 additions & 1 deletion
@@ -59,11 +59,18 @@ jobs:
           kubectl describe kafkas -n kafka
           kubectl describe flinkdeployments
           kubectl describe subscriptions
+      - name: Capture Flink Job Logs
+        if: always()
+        run: |
+          kubectl logs $(kubectl get pods -l component=jobmanager -o name) --since=0s || echo "skipped."
+          kubectl logs $(kubectl get pods -l component=taskmanager -o name) --since=0s || echo "skipped."
       - name: Capture Hoptimator Operator Logs
         if: always()
         run: kubectl logs $(kubectl get pods -l app=hoptimator-operator -o name)
       - name: Capture Flink Operator Logs
         if: always()
         run: kubectl logs $(kubectl get pods -l app.kubernetes.io/name=flink-kubernetes-operator -o name)
+      - name: Capture Flink Job Logs
+        if: always()
+        run: kubectl logs $(kubectl get pods -l app.kubernetes.io/name=flink-kubernetes-operator -o name)
 
-

Makefile

Lines changed: 1 addition & 2 deletions
@@ -25,8 +25,7 @@ quickstart: build deploy-dev-environment deploy
 deploy-dev-environment:
 	kubectl create -f https://github.com/jetstack/cert-manager/releases/download/v1.8.2/cert-manager.yaml || echo "skipping"
 	kubectl create namespace kafka || echo "skipping"
-	kubectl create namespace mysql || echo "skipping"
-	helm repo add flink-operator-repo https://downloads.apache.org/flink/flink-kubernetes-operator-1.6.1/
+	helm repo add flink-operator-repo https://downloads.apache.org/flink/flink-kubernetes-operator-1.9.0/
 	helm upgrade --install --atomic --set webhook.create=false flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator
 	kubectl apply -f "https://strimzi.io/install/latest?namespace=kafka" -n kafka
 	kubectl wait --for=condition=Established=True crds/kafkas.kafka.strimzi.io

deploy/dev/mysql.yaml

Lines changed: 0 additions & 62 deletions
This file was deleted.

deploy/samples/subscriptions.yaml

Lines changed: 2 additions & 2 deletions
@@ -2,9 +2,9 @@
 apiVersion: hoptimator.linkedin.com/v1alpha1
 kind: Subscription
 metadata:
-  name: products
+  name: names
 spec:
-  sql: SELECT "quantity", "product_id" AS KEY FROM INVENTORY."products_on_hand"
+  sql: SELECT NAME, NAME AS KEY FROM DATAGEN.PERSON
   database: RAWKAFKA
 
 
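For context, the updated sample subscription corresponds roughly to the pipeline SQL below (a sketch, assuming the RAWKAFKA database materializes a subscription into a Kafka topic named after the subscription; the INSERT INTO form mirrors the !insert test in etc/integration-tests.sql and is not the literal planner output):

-- Hypothetical pipeline for the "names" subscription: NAME is the record
-- payload, and the KEY alias becomes the Kafka record key.
INSERT INTO RAWKAFKA."names"
SELECT NAME, NAME AS KEY FROM DATAGEN.PERSON;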

etc/integration-tests.sql

Lines changed: 2 additions & 8 deletions
@@ -15,12 +15,6 @@ SELECT * FROM DATAGEN.COMPANY;
 !insert into RAWKAFKA."test-sink" SELECT AGE AS PAYLOAD, NAME AS KEY FROM DATAGEN.PERSON
 SELECT * FROM RAWKAFKA."test-sink" LIMIT 5;
 
--- MySQL CDC tables
-SELECT * FROM INVENTORY."products_on_hand" LIMIT 1;
-
--- Test check command
-!check not empty SELECT * FROM INVENTORY."products_on_hand";
-
--- MySQL CDC -> Kafka (via sample subscription "products")
-SELECT * FROM RAWKAFKA."products" LIMIT 1;
+-- read from sample subscription "names"
+SELECT * FROM RAWKAFKA."names" LIMIT 1;
 
etc/readiness-probe.sql

Lines changed: 0 additions & 1 deletion
@@ -3,5 +3,4 @@
 
 SELECT * FROM DATAGEN.PERSON;
 SELECT * FROM DATAGEN.COMPANY;
-SELECT * FROM INVENTORY."products_on_hand" LIMIT 1;
 SELECT * FROM RAWKAFKA."test-sink" LIMIT 0;

hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/ScriptImplementor.java

Lines changed: 3 additions & 3 deletions
@@ -170,7 +170,7 @@ public SqlNode visit(SqlCall call) {
  * Implements a CREATE TABLE...WITH... DDL statement.
  *
  * N.B. the following magic:
- * - field 'KEY' is treated as a PRIMARY KEY
+ * - field 'PRIMARY_KEY' is treated as a PRIMARY KEY
  */
 class ConnectorImplementor implements ScriptImplementor {
   private final String database;
@@ -192,9 +192,9 @@ public void implement(SqlWriter w) {
     (new CompoundIdentifierImplementor(database, name)).implement(w);
     SqlWriter.Frame frame1 = w.startList("(", ")");
     (new RowTypeSpecImplementor(rowType)).implement(w);
-    if (rowType.getField("KEY", true, false) != null) {
+    if (rowType.getField("PRIMARY_KEY", true, false) != null) {
       w.sep(",");
-      w.literal("PRIMARY KEY (KEY) NOT ENFORCED");
+      w.literal("PRIMARY KEY (PRIMARY_KEY) NOT ENFORCED");
     }
     w.endList(frame1);
     // TODO support PARTITIONED BY for Tables that support it
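The practical effect on the generated DDL, sketched below with illustrative column types and a placeholder WITH clause (not the exact writer output): a column named KEY no longer triggers a primary key constraint, which the plain kafka connector cannot accept, while a column literally named PRIMARY_KEY still would.

-- Before (a KEY column forced the constraint):
--   CREATE TABLE `RAWKAFKA`.`names` (`NAME` VARCHAR, `KEY` VARCHAR,
--     PRIMARY KEY (KEY) NOT ENFORCED) WITH (...);
-- After (no PRIMARY_KEY column, so no constraint is emitted):
CREATE TABLE `RAWKAFKA`.`names` (`NAME` VARCHAR, `KEY` VARCHAR) WITH (...);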

hoptimator-kafka-adapter/src/main/java/com/linkedin/hoptimator/catalog/kafka/RawKafkaSchemaFactory.java

Lines changed: 2 additions & 1 deletion
@@ -31,8 +31,9 @@ public Schema create(SchemaPlus parentSchema, String name, Map<String, Object> o
         .with("KEY", DataType.VARCHAR);
     ConfigProvider connectorConfigProvider = ConfigProvider.from(clientConfig)
         .withPrefix("properties.")
-        .with("connector", "upsert-kafka")
+        .with("connector", "kafka")
         .with("key.format", "csv")
+        .with("key.fields", "KEY")
         .with("value.format", "csv")
         .with("value.fields-include", "EXCEPT_KEY")
         .with("topic", x -> x);
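For reference, these connector options surface in the generated Flink DDL roughly as follows (a sketch; the topic value comes from the table name, and the properties.* client settings are omitted):

-- Illustrative WITH clause for a RAWKAFKA table named "names". Replacing
-- upsert-kafka (which requires a PRIMARY KEY) with the plain kafka
-- connector needs key.fields to route the KEY column into the record key.
CREATE TABLE ... WITH (
  'connector' = 'kafka',
  'key.format' = 'csv',
  'key.fields' = 'KEY',
  'value.format' = 'csv',
  'value.fields-include' = 'EXCEPT_KEY',
  'topic' = 'names'
);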

test-model.yaml

Lines changed: 0 additions & 14 deletions
@@ -14,17 +14,3 @@ schemas:
       bootstrap.servers: one-kafka-bootstrap.kafka.svc:9092
       group.id: hoptimator-test
       auto.offset.reset: earliest
-
-  - name: INVENTORY
-    type: custom
-    factory: com.linkedin.hoptimator.catalog.mysql.MySqlCdcSchemaFactory
-    operand:
-      username: root
-      password: debezium
-      hostname: mysql.mysql.svc.cluster.local
-      port: 3306
-      database: inventory
-      urlSuffix: "?useUnicode=true&characterEncoding=UTF-8&useSSL=false"
-      connectorConfig:
-        scan.incremental.snapshot.enabled: false
-
