Skip to content

Latest commit

 

History

History
538 lines (468 loc) · 29.1 KB

ksqldb-jdbc-sink.adoc

File metadata and controls

538 lines (468 loc) · 29.1 KB

ksqlDB & Kafka Connect JDBC Sink in action

Create source connector (data generator)

CREATE SOURCE CONNECTOR SOURCE_DATA WITH (
    'connector.class'                = 'io.mdrogalis.voluble.VolubleSourceConnector',
    'value.converter'                = 'org.apache.kafka.connect.json.JsonConverter',
    'value.converter.schemas.enable' = 'false',
    'key.converter'                  = 'org.apache.kafka.connect.storage.StringConverter',

    'genkp.customers.with'                 = '#{Internet.uuid}',
    'genv.customers.name.with'             = '#{HitchhikersGuideToTheGalaxy.character}',
    'genv.customers.email.with'            = '#{Internet.emailAddress}',
    'genv.customers.location->city.with'   = '#{HitchhikersGuideToTheGalaxy.location}',
    'genv.customers.location->planet.with' = '#{HitchhikersGuideToTheGalaxy.planet}',
    'topic.customers.records.exactly'      = 10,

    'genkp.transactions.with'                = '#{Internet.uuid}',
    'genv.transactions.customer_id.matching' = 'customers.key',
    'genv.transactions.cost.with'            = '#{Commerce.price}',
    'genv.transactions.card_type.with'       = '#{Business.creditCardType}',
    'genv.transactions.item.with'            = '#{Beer.name}',
    'topic.transactions.throttle.ms'         = 500
);

Declare Stream & Table over the source topics

CREATE TABLE CUSTOMER (ROWKEY VARCHAR KEY,
                       NAME VARCHAR,
                       EMAIL VARCHAR,
                       LOCATION STRUCT<CITY VARCHAR,
                                       PLANET VARCHAR>)
                 WITH (KAFKA_TOPIC='customers',
                       VALUE_FORMAT='JSON');

CREATE STREAM TXNS (ROWKEY VARCHAR KEY,
                    CUSTOMER_ID VARCHAR,
                    COST DOUBLE,
                    CARD_TYPE VARCHAR,
                    ITEM VARCHAR)
              WITH (KAFKA_TOPIC='transactions',
                    VALUE_FORMAT='JSON');
ksql> SELECT * FROM CUSTOMER EMIT CHANGES LIMIT 2;
+---------------------+---------------------+---------------------+---------------------+---------------------+
|ROWTIME              |ROWKEY               |NAME                 |EMAIL                |LOCATION             |
+---------------------+---------------------+---------------------+---------------------+---------------------+
|1585353217743        |84e2c8d1-3ac6-461b-8a|Agrajag              |benny.skiles@hotmail.|{CITY=Lamuella, PLANE|
|                     |6c-2a765be6bd82      |                     |com                  |T=Nano}              |
|1585353217917        |ee4cb838-cbf3-4cb7-8d|Pasta Fasta          |peter.nikolaus@yahoo.|{CITY=Betelgeuse, PLA|
|                     |d2-b7b3a9e9da11      |                     |com                  |NET=Krikkit}         |
Limit Reached
Query terminated

ksql> SELECT * FROM TXNS EMIT CHANGES LIMIT 5;
+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|ROWTIME          |ROWKEY           |CUSTOMER_ID      |COST             |CARD_TYPE        |ITEM             |
+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|1585353217910    |1a14a135-d9c3-42b|84e2c8d1-3ac6-461|44.05            |visa             |Alpha King Pale A|
|                 |e-b692-9bfd58beec|b-8a6c-2a765be6bd|                 |                 |le               |
|                 |63               |82               |                 |                 |                 |
|1585353218247    |a01765a8-6d1e-40d|ee4cb838-cbf3-4cb|25.62            |visa             |Samuel Smith’s Im|
|                 |8-9bc2-8d8166e020|7-8dd2-b7b3a9e9da|                 |                 |perial IPA       |
|                 |14               |11               |                 |                 |                 |
|1585353218748    |d6047271-d28e-4c2|d83c7c98-c2f1-449|81.8             |jcb              |Storm King Stout |
|                 |7-bfc0-8dbf31855c|e-b80a-19c38d06b4|                 |                 |                 |
|                 |03               |76               |                 |                 |                 |
|1585353219247    |aaf68dbd-48d0-48e|9058ef36-3ab8-491|37.54            |visa             |Nugget Nectar    |
|                 |1-a1c2-c28d9365ae|a-8142-c059b74f06|                 |                 |                 |
|                 |e9               |f9               |                 |                 |                 |
|1585353219747    |a4ea8cb5-b89f-48e|9058ef36-3ab8-491|27.8             |jcb              |Racer 5 India Pal|
|                 |5-a023-7ab33a55b5|a-8142-c059b74f06|                 |                 |e Ale, Bear Repub|
|                 |9f               |f9               |                 |                 |lic Bre          |
Limit Reached
Query terminated

Enrich transactions with customer info

SET 'auto.offset.reset' = 'earliest';

CREATE STREAM TXNS_ENRICHED WITH (VALUE_FORMAT='AVRO') AS
    SELECT T.ROWTIME AS TXN_TS,
           T.ITEM,
           T.COST,
           T.CARD_TYPE,
           C.ROWKEY AS CUST_ID,
           C.NAME,
           C.EMAIL,
           C.LOCATION
      FROM TXNS T
             LEFT JOIN
           CUSTOMER C
             ON T.CUSTOMER_ID=C.ROWKEY;
ksql> SELECT * FROM TXNS_ENRICHED EMIT CHANGES LIMIT 2;
+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+
|ROWTIME     |ROWKEY      |TXN_TS      |ITEM        |COST        |CARD_TYPE   |CUST_ID     |NAME        |EMAIL       |LOCATION    |
+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+
|158535321791|84e2c8d1-3ac|158535321791|Alpha King P|44.05       |visa        |84e2c8d1-3ac|Agrajag     |benny.skiles|{CITY=Lamuel|
|0           |6-461b-8a6c-|0           |ale Ale     |            |            |6-461b-8a6c-|            |@hotmail.com|la, PLANET=N|
|            |2a765be6bd82|            |            |            |            |2a765be6bd82|            |            |ano}        |
|158535321824|ee4cb838-cbf|158535321824|Samuel Smith|25.62       |visa        |ee4cb838-cbf|Pasta Fasta |peter.nikola|{CITY=Betelg|
|7           |3-4cb7-8dd2-|7           |’s Imperial |            |            |3-4cb7-8dd2-|            |us@yahoo.com|euse, PLANET|
|            |b7b3a9e9da11|            |IPA         |            |            |b7b3a9e9da11|            |            |=Krikkit}   |
Limit Reached
Query terminated

Stream enriched transactions to database

CREATE SINK CONNECTOR SINK_TXNS_ENRICHED_PG WITH (
    'connector.class'     = 'io.confluent.connect.jdbc.JdbcSinkConnector',
    'connection.url'      = 'jdbc:postgresql://postgres:5432/',
    'connection.user'     = 'postgres',
    'connection.password' = 'postgres',
    'topics'              = 'TXNS_ENRICHED',
    'key.converter'       = 'org.apache.kafka.connect.storage.StringConverter',
    'auto.create'         = 'true',
    'auto.evolve'         = 'true'
  );

This doesn’t work:

ksql> DESCRIBE CONNECTOR SINK_TXNS_ENRICHED_PG;

Name                 : SINK_TXNS_ENRICHED_PG
Class                : io.confluent.connect.jdbc.JdbcSinkConnector
Type                 : sink
State                : RUNNING
WorkerId             : kafka-connect:8083

 Task ID | State  | Error Trace
---------------------------------------------------------------------------------------------------------------------------------------------
 0       | FAILED | org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:561)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.ConnectException: io.confluent.ksql.avro_schemas.KsqlDataSourceSchema_LOCATION (STRUCT) type doesn't have a mapping to the SQL database column type
        at io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.getSqlType(GenericDatabaseDialect.java:1753)
        at io.confluent.connect.jdbc.dialect.PostgreSqlDatabaseDialect.getSqlType(PostgreSqlDatabaseDialect.java:221)
        at io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.writeColumnSpec(GenericDatabaseDialect.java:1669)
        at io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.lambda$writeColumnsSpec$33(GenericDatabaseDialect.java:1658)
        at io.confluent.connect.jdbc.util.ExpressionBuilder.append(ExpressionBuilder.java:558)
        at io.confluent.connect.jdbc.util.ExpressionBuilder$BasicListBuilder.of(ExpressionBuilder.java:597)
        at io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.writeColumnsSpec(GenericDatabaseDialect.java:1660)
        at io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.buildCreateTableStatement(GenericDatabaseDialect.java:1583)
        at io.confluent.connect.jdbc.sink.DbStructure.create(DbStructure.java:91)
        at io.confluent.connect.jdbc.sink.DbStructure.createOrAmendIfNecessary(DbStructure.java:61)
        at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:120)
        at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:66)
        at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:74)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:539)
        ... 10 more

---------------------------------------------------------------------------------------------------------------------------------------------
ksql>

Use SMT to flatten LOCATION

DROP CONNECTOR SINK_TXNS_ENRICHED_PG;
CREATE SINK CONNECTOR SINK_TXNS_ENRICHED_PG WITH (
    'connector.class'     = 'io.confluent.connect.jdbc.JdbcSinkConnector',
    'connection.url'      = 'jdbc:postgresql://postgres:5432/',
    'connection.user'     = 'postgres',
    'connection.password' = 'postgres',
    'topics'              = 'TXNS_ENRICHED',
    'key.converter'       = 'org.apache.kafka.connect.storage.StringConverter',
    'auto.create'         = 'true',
    'auto.evolve'         = 'true',
    'table.name.format'   = '${topic}',
    'transforms'          = 'flatten',
    'transforms.flatten.type'= 'org.apache.kafka.connect.transforms.Flatten$Value'
  );

✅Table is created and populated in Postgres:

postgres=# \d+ "TXNS_ENRICHED"
                                        Table "public.TXNS_ENRICHED"
     Column      |       Type       | Collation | Nullable | Default | Storage  | Stats target | Description
-----------------+------------------+-----------+----------+---------+----------+--------------+-------------
 TXN_TS          | bigint           |           |          |         | plain    |              |
 ITEM            | text             |           |          |         | extended |              |
 COST            | double precision |           |          |         | plain    |              |
 CARD_TYPE       | text             |           |          |         | extended |              |
 CUST_ID         | text             |           |          |         | extended |              |
 NAME            | text             |           |          |         | extended |              |
 EMAIL           | text             |           |          |         | extended |              |
 LOCATION.CITY   | text             |           |          |         | extended |              |
 LOCATION.PLANET | text             |           |          |         | extended |              |
Access method: heap

postgres=# SELECT * FROM "TXNS_ENRICHED" LIMIT 2;
    TXN_TS     |            ITEM             | COST  | CARD_TYPE |               CUST_ID                |    NAME     |          EMAIL           | LOCATION.CITY | LOCATION.PLANET
---------------+-----------------------------+-------+-----------+--------------------------------------+-------------+--------------------------+---------------+-----------------
 1585353217910 | Alpha King Pale Ale         | 44.05 | visa      | 84e2c8d1-3ac6-461b-8a6c-2a765be6bd82 | Agrajag     | benny.skiles@hotmail.com | Lamuella      | Nano
 1585353218247 | Samuel Smith’s Imperial IPA | 25.62 | visa      | ee4cb838-cbf3-4cb7-8dd2-b7b3a9e9da11 | Pasta Fasta | peter.nikolaus@yahoo.com | Betelgeuse    | Krikkit
(2 rows)

Note that the TXN_TS is a bigint (epoch), not an actual timestamp type.

Use SMT to handle timestamp

DROP CONNECTOR SINK_TXNS_ENRICHED_PG;
CREATE SINK CONNECTOR SINK_TXNS_ENRICHED_PG WITH (
    'connector.class'     = 'io.confluent.connect.jdbc.JdbcSinkConnector',
    'connection.url'      = 'jdbc:postgresql://postgres:5432/',
    'connection.user'     = 'postgres',
    'connection.password' = 'postgres',
    'topics'              = 'TXNS_ENRICHED',
    'key.converter'       = 'org.apache.kafka.connect.storage.StringConverter',
    'auto.create'         = 'true',
    'auto.evolve'         = 'true',
    'table.name.format'   = '${topic}',
    'transforms'          = 'flatten,setTimestampType',
    'transforms.flatten.type'= 'org.apache.kafka.connect.transforms.Flatten$Value',
    'transforms.setTimestampType.type'= 'org.apache.kafka.connect.transforms.TimestampConverter$Value',
    'transforms.setTimestampType.field'= 'TXN_TS',
    'transforms.setTimestampType.target.type' ='Timestamp'
);

Connector doesn’t work though (Status is WARNING):

ksql> SHOW CONNECTORS;

 Connector Name        | Type   | Class                                       | Status
------------------------------------------------------------------------------------------------------------
 SOURCE_DATA           | SOURCE | io.mdrogalis.voluble.VolubleSourceConnector | RUNNING (1/1 tasks RUNNING)
 SINK_TXNS_ENRICHED_PG | SINK   | io.confluent.connect.jdbc.JdbcSinkConnector | WARNING (0/1 tasks RUNNING)
------------------------------------------------------------------------------------------------------------

Check the error:

ksql> DESCRIBE CONNECTOR SINK_TXNS_ENRICHED_PG;

Name                 : SINK_TXNS_ENRICHED_PG
Class                : io.confluent.connect.jdbc.JdbcSinkConnector
Type                 : sink
State                : RUNNING
WorkerId             : kafka-connect:8083

 Task ID | State  | Error Trace
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 0       | FAILED | org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:561)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.ConnectException: java.sql.SQLException: java.sql.BatchUpdateException: Batch entry 0 INSERT INTO "TXNS_ENRICHED"("TXN_TS","ITEM","COST","CARD_TYPE","CUST_ID","NAME","EMAIL","LOCATION.CITY","LOCATION.PLANET") VALUES('2020-03-27 23:58:30.431+00','Shakespeare Oatmeal',69.72,'mastercard','ee4cb838-cbf3-4cb7-8dd2-b7b3a9e9da11','Pasta Fasta','[email protected]','Betelgeuse','Krikkit') was aborted: ERROR: invalid input syntax for type bigint: "2020-03-27 23:58:30.431+00"  Call getNextException to see other errors in the batch.
org.postgresql.util.PSQLException: ERROR: invalid input syntax for type bigint: "2020-03-27 23:58:30.431+00"
org.postgresql.util.PSQLException: ERROR: invalid input syntax for type bigint: "2020-03-27 23:58:30.431+00"

        at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:87)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:539)
        ... 10 more
Caused by: java.sql.SQLException: java.sql.BatchUpdateException: Batch entry 0 INSERT INTO "TXNS_ENRICHED"("TXN_TS","ITEM","COST","CARD_TYPE","CUST_ID","NAME","EMAIL","LOCATION.CITY","LOCATION.PLANET") VALUES('2020-03-27 23:58:30.431+00','Shakespeare Oatmeal',69.72,'mastercard','ee4cb838-cbf3-4cb7-8dd2-b7b3a9e9da11','Pasta Fasta','[email protected]','Betelgeuse','Krikkit') was aborted: ERROR: invalid input syntax for type bigint: "2020-03-27 23:58:30.431+00"  Call getNextException to see other errors in the batch.
org.postgresql.util.PSQLException: ERROR: invalid input syntax for type bigint: "2020-03-27 23:58:30.431+00"
org.postgresql.util.PSQLException: ERROR: invalid input syntax for type bigint: "2020-03-27 23:58:30.431+00"

        ... 12 more

-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
ksql>

Now that the TXN_TS is coming through as a timestamp, the Postgres INSERT is failing because we’re trying to write it to a bigint field:

ERROR: invalid input syntax for type bigint: "2020-03-27 23:58:30.431+00"

So here we’ll ditch the previous table, and instead populate a new one (taking advantage of table.name.format to modify the target table name) using all of the existing data in the source Kafka topic:

DROP CONNECTOR SINK_TXNS_ENRICHED_PG;
CREATE SINK CONNECTOR SINK_TXNS_ENRICHED_PG_01 WITH (
    'connector.class'     = 'io.confluent.connect.jdbc.JdbcSinkConnector',
    'connection.url'      = 'jdbc:postgresql://postgres:5432/',
    'connection.user'     = 'postgres',
    'connection.password' = 'postgres',
    'topics'              = 'TXNS_ENRICHED',
    'key.converter'       = 'org.apache.kafka.connect.storage.StringConverter',
    'auto.create'         = 'true',
    'auto.evolve'         = 'true',
    'table.name.format'   = '${topic}-01',
    'transforms'          = 'flatten,setTimestampType',
    'transforms.flatten.type'= 'org.apache.kafka.connect.transforms.Flatten$Value',
    'transforms.setTimestampType.type'= 'org.apache.kafka.connect.transforms.TimestampConverter$Value',
    'transforms.setTimestampType.field'= 'TXN_TS',
    'transforms.setTimestampType.target.type' ='Timestamp'
);

Now Postgres table is built and populated with Timestamp column:

postgres=# \d+ "TXNS_ENRICHED-01"
                                            Table "public.TXNS_ENRICHED-01"
     Column      |            Type             | Collation | Nullable | Default | Storage  | Stats target | Description
-----------------+-----------------------------+-----------+----------+---------+----------+--------------+-------------
 TXN_TS          | timestamp without time zone |           |          |         | plain    |              |
 ITEM            | text                        |           |          |         | extended |              |
 COST            | double precision            |           |          |         | plain    |              |
 CARD_TYPE       | text                        |           |          |         | extended |              |
 CUST_ID         | text                        |           |          |         | extended |              |
 NAME            | text                        |           |          |         | extended |              |
 EMAIL           | text                        |           |          |         | extended |              |
 LOCATION.CITY   | text                        |           |          |         | extended |              |
 LOCATION.PLANET | text                        |           |          |         | extended |              |
Access method: heap

postgres=# SELECT * FROM "TXNS_ENRICHED-01" LIMIT 2;
         TXN_TS          |            ITEM             | COST  | CARD_TYPE |               CUST_ID                |    NAME     |          EMAIL           | LOCATION.CITY | LOCATION.PLANET
-------------------------+-----------------------------+-------+-----------+--------------------------------------+-------------+--------------------------+---------------+-----------------
 2020-03-27 23:53:37.91  | Alpha King Pale Ale         | 44.05 | visa      | 84e2c8d1-3ac6-461b-8a6c-2a765be6bd82 | Agrajag     | benny.skiles@hotmail.com | Lamuella      | Nano
 2020-03-27 23:53:38.247 | Samuel Smith’s Imperial IPA | 25.62 | visa      | ee4cb838-cbf3-4cb7-8dd2-b7b3a9e9da11 | Pasta Fasta | peter.nikolaus@yahoo.com | Betelgeuse    | Krikkit
(2 rows)

Build aggregate (beers sold per hour)

SET 'auto.offset.reset' = 'earliest';

CREATE TABLE BEERS_HOUR_AGG WITH (VALUE_FORMAT='AVRO') AS
    SELECT WINDOWSTART AS WINDOW_START_TS,
           ITEM,
           SUM(COST) AS TOTAL_SOLD,
           COUNT(*) AS NUMBER_SOLD
      FROM TXNS WINDOW TUMBLING (SIZE 1 HOUR)
      GROUP BY ITEM;
  • Push query (stream of aggregate changes)

    SELECT TIMESTAMPTOSTRING(WINDOW_START_TS,'yyyy-MM-dd HH:mm:ss') AS WINDOW_START_TS,
           ITEM,
           TOTAL_SOLD,
           NUMBER_SOLD
      FROM BEERS_HOUR_AGG
      EMIT CHANGES;
  • Pull query (aggregate current state)

    SELECT TIMESTAMPTOSTRING(WINDOW_START_TS,'yyyy-MM-dd HH:mm:ss') AS WINDOW_START_TS,
           TOTAL_SOLD,
           NUMBER_SOLD
      FROM BEERS_HOUR_AGG
     WHERE ROWKEY='Duvel';

Build aggregate (customer summary) to push to database

SET 'auto.offset.reset' = 'earliest';

CREATE TABLE CUSTOMER_SUMMARY WITH (VALUE_FORMAT='AVRO') AS
    SELECT CUST_ID,
           NAME,
           MAX(TXN_TS) AS MOST_RECENT_ORDER_TS,
           COUNT(*) AS NUM_ORDERS,
           COUNT_DISTINCT(ITEM) AS UNIQUE_ITEMS,
           SUM(COST) AS TOTAL_COST
      FROM TXNS_ENRICHED
      GROUP BY CUST_ID, NAME;

Stream ksqlDB to database

CREATE SINK CONNECTOR SINK_CUSTOMER_SUMMARY_PG_01 WITH (
    'connector.class'     = 'io.confluent.connect.jdbc.JdbcSinkConnector',
    'connection.url'      = 'jdbc:postgresql://postgres:5432/',
    'connection.user'     = 'postgres',
    'connection.password' = 'postgres',
    'topics'              = 'CUSTOMER_SUMMARY',
    'key.converter'       = 'org.apache.kafka.connect.storage.StringConverter',
    'auto.create'         = 'true',
    'auto.evolve'         = 'true',
    'table.name.format'   = '${topic}-01',
    'transforms'          = 'setTimestampType',
    'transforms.setTimestampType.type'= 'org.apache.kafka.connect.transforms.TimestampConverter$Value',
    'transforms.setTimestampType.field'= 'MOST_RECENT_ORDER_TS',
    'transforms.setTimestampType.target.type' ='Timestamp'
);

This is fine but we get new rows in the database each time the aggregate changes, even for the same key.

postgres=# SELECT * FROM "CUSTOMER_SUMMARY-01" WHERE "NAME" = 'Vroomfondel' ;
               CUST_ID                |    NAME     |  MOST_RECENT_ORDER_TS   | NUM_ORDERS | UNIQUE_ITEMS |     TOTAL_COST
--------------------------------------+-------------+-------------------------+------------+--------------+--------------------
 d83c7c98-c2f1-449e-b80a-19c38d06b476 | Vroomfondel | 2020-03-27 23:58:57.398 |         64 |           38 |            3520.09
 d83c7c98-c2f1-449e-b80a-19c38d06b476 | Vroomfondel | 2020-03-28 00:07:23.898 |        179 |           53 |            9851.54
 d83c7c98-c2f1-449e-b80a-19c38d06b476 | Vroomfondel | 2020-03-28 00:07:46.378 |        184 |           53 | 10213.160000000002
 d83c7c98-c2f1-449e-b80a-19c38d06b476 | Vroomfondel | 2020-03-28 00:07:51.878 |        185 |           53 | 10261.670000000002

Use upsert to push table state from ksqlDB to database

CREATE SINK CONNECTOR SINK_CUSTOMER_SUMMARY_PG_02 WITH (
    'connector.class'     = 'io.confluent.connect.jdbc.JdbcSinkConnector',
    'connection.url'      = 'jdbc:postgresql://postgres:5432/',
    'connection.user'     = 'postgres',
    'connection.password' = 'postgres',
    'topics'              = 'CUSTOMER_SUMMARY',
    'key.converter'       = 'org.apache.kafka.connect.storage.StringConverter',
    'auto.create'         = 'true',
    'auto.evolve'         = 'true',
    'insert.mode'         = 'upsert',
    'pk.mode'             = 'record_value',
    'pk.fields'           = 'CUST_ID,NAME',
    'table.name.format'   = '${topic}-02',
    'transforms'          = 'setTimestampType',
    'transforms.setTimestampType.type'= 'org.apache.kafka.connect.transforms.TimestampConverter$Value',
    'transforms.setTimestampType.field'= 'MOST_RECENT_ORDER_TS',
    'transforms.setTimestampType.target.type' ='Timestamp'
);

Important settings:

  • insert.mode is upsert (not the default insert)

  • pk.mode is record_value which says we’re going to define the target table’s primary key based on field(s) from the record’s value

  • pk.fields specifies which field(s) from the record’s value we’d like to use as the PK in the target table (for a ksqlDB aggregate table this is going to be whichever columns you declared for the GROUP BY)

Note target database table is created by the sink with a primary key:

postgres=# \d+ "CUSTOMER_SUMMARY-02"
                                             Table "public.CUSTOMER_SUMMARY-02"
        Column        |            Type             | Collation | Nullable | Default | Storage  | Stats target | Description
----------------------+-----------------------------+-----------+----------+---------+----------+--------------+-------------
 CUST_ID              | text                        |           | not null |         | extended |              |
 NAME                 | text                        |           | not null |         | extended |              |
 MOST_RECENT_ORDER_TS | timestamp without time zone |           |          |         | plain    |              |
 NUM_ORDERS           | bigint                      |           |          |         | plain    |              |
 UNIQUE_ITEMS         | bigint                      |           |          |         | plain    |              |
 TOTAL_COST           | double precision            |           |          |         | plain    |              |
Indexes:
    "CUSTOMER_SUMMARY-02_pkey" PRIMARY KEY, btree ("CUST_ID", "NAME")
Access method: heap

One row per unique key with aggregate values updated in-place:

postgres=# SELECT * FROM "CUSTOMER_SUMMARY-02" WHERE "NAME" = 'Vroomfondel' ;
               CUST_ID                |    NAME     |  MOST_RECENT_ORDER_TS   | NUM_ORDERS | UNIQUE_ITEMS |     TOTAL_COST
--------------------------------------+-------------+-------------------------+------------+--------------+--------------------
 d83c7c98-c2f1-449e-b80a-19c38d06b476 | Vroomfondel | 2020-03-28 00:13:03.072 |        254 |           53 | 14009.210000000005
(1 row)

postgres=# SELECT * FROM "CUSTOMER_SUMMARY-02" WHERE "NAME" = 'Vroomfondel' ;
               CUST_ID                |    NAME     |  MOST_RECENT_ORDER_TS   | NUM_ORDERS | UNIQUE_ITEMS |     TOTAL_COST
--------------------------------------+-------------+-------------------------+------------+--------------+--------------------
 d83c7c98-c2f1-449e-b80a-19c38d06b476 | Vroomfondel | 2020-03-28 00:13:18.038 |        257 |           53 | 14122.780000000002
(1 row)