Improve Handling of Missing Database Key Fields in Pulsar IO JDBC Sink #23953
Labels
type/enhancement
Search before asking
Motivation
When using the Pulsar IO JDBC Sink with PostgreSQL and Avro schemas, the sink fails if the database table has a column with no corresponding field in the schema/message.
I noticed this issue when trying to integrate with a table whose primary key is populated by the db, i.e. using a regular db sequence. The only workaround I have found is to have my app call the db sequence and set the ID in the message before passing it to the sink. This seems like a basic use case, and supporting it would be a very useful feature.
Solution
Improve the JDBC Sink to handle cases where messages lack db fields, by introducing additional configuration options:
Option 1: missingKeyHandlingMode
- FAIL (default): Keep existing behavior, failing when a db field is missing from the message.
- IGNORE: Use db default value.
Option 2: ignoreKeys
- CSV of message keys to not attempt to map to a db column.
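To make the proposal concrete, here is a minimal sketch of how the two options might influence the generated INSERT statement. It is not taken from the pulsar-io-jdbc code base: the class `MissingKeyInsertSketch`, the method `buildInsert`, the `InsertPlan` holder, and the `users` table in the usage note are all hypothetical, and the message is modelled as a plain `Map` rather than a Pulsar `GenericRecord`. The idea is simply that a column the message does not carry is left out of the column list, so the database applies its own default (for example a sequence).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch only; none of these names exist in pulsar-io-jdbc. */
final class MissingKeyInsertSketch {

    /** Mirrors the proposed missingKeyHandlingMode option (assumed values). */
    enum MissingKeyHandlingMode { FAIL, IGNORE }

    /** SQL text plus the values to bind, in placeholder order. */
    static final class InsertPlan {
        final String sql;
        final List<Object> parameters;

        InsertPlan(String sql, List<Object> parameters) {
            this.sql = sql;
            this.parameters = parameters;
        }
    }

    static InsertPlan buildInsert(String table,
                                  List<String> tableColumns,
                                  Map<String, Object> messageFields,
                                  MissingKeyHandlingMode mode,
                                  Set<String> ignoreKeys) {
        List<String> boundColumns = new ArrayList<>();
        List<Object> parameters = new ArrayList<>();

        for (String column : tableColumns) {
            // Proposed ignoreKeys: never try to map these names.
            if (ignoreKeys.contains(column)) {
                continue;
            }
            if (!messageFields.containsKey(column)) {
                if (mode == MissingKeyHandlingMode.FAIL) {
                    // Existing behaviour: a missing field is an error.
                    throw new IllegalArgumentException(
                            "Message has no field for column: " + column);
                }
                // IGNORE: omit the column so the db default is used.
                continue;
            }
            boundColumns.add(column);
            parameters.add(messageFields.get(column));
        }

        if (boundColumns.isEmpty()) {
            throw new IllegalArgumentException("No columns left to insert");
        }

        // One "?" placeholder per column that is actually bound.
        String placeholders =
                String.join(", ", Collections.nCopies(boundColumns.size(), "?"));
        String sql = "INSERT INTO " + table
                + " (" + String.join(", ", boundColumns) + ")"
                + " VALUES (" + placeholders + ")";
        return new InsertPlan(sql, parameters);
    }
}
```

With columns `id, name, email`, a message carrying only `name` and `email`, and mode `IGNORE`, this sketch produces `INSERT INTO users (name, email) VALUES (?, ?)`, leaving `id` to the database. Because every placeholder in the statement has a bound value, this would also avoid the "No value specified for parameter 1" failure shown further down.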
Alternatives
No response
Anything else?
Current errors
Setup
id integer - with no constraints.
org.apache.avro.AvroRuntimeException: Not a valid schema field: id
    at org.apache.avro.generic.GenericData$Record.get(GenericData.java:282) ~[java-instance.jar:?]
    at org.apache.pulsar.client.impl.schema.generic.GenericAvroRecord.getField(GenericAvroRecord.java:48) ~[pulsar-client-original-4.0.2.jar:4.0.2]
    at org.apache.pulsar.io.jdbc.BaseJdbcAutoSchemaSink.lambda$createMutation$1(BaseJdbcAutoSchemaSink.java:146) ~[pulsar-io-jdbc-core-4.0.2.jar:?]
I also tested with excludeNonDeclaredFields set to true, which gives a different error:
2025-02-08T10:02:47,417+0000 [pool-5-thread-1] ERROR org.apache.pulsar.io.jdbc.JdbcAbstractSink - Got exception No value specified for parameter 1. after 0 ms, failing 1 messages
org.postgresql.util.PSQLException: No value specified for parameter 1.
    at org.postgresql.core.v3.SimpleParameterList.checkAllParametersSet(SimpleParameterList.java:339) ~[postgresql-42.5.5.jar:42.5.5]
    at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:340) ~[postgresql-42.5.5.jar:42.5.5]
    at org.postgresql.jdbc.PgStatement.executeInternal(PgStatement.java:496) ~[postgresql-42.5.5.jar:42.5.5]
Are you willing to submit a PR?