Replies: 2 comments 1 reply
-
in the postgres side, the table was like this
and yaml config like this
|
Beta Was this translation helpful? Give feedback.
-
I think the issue is that you're sending messages like Can you replicate this issue with a test message like this? bin/pulsar-client produce pulsar-postgres-jdbc-sink-topic -m '{"name": "test"}' -s "\n" -n 100 |
Beta Was this translation helpful? Give feedback.
-
Hey all, i have create some pipeline on apache pulsar using Python Function. but i got stuck when i want to insert topic to my Pulsar IO sink to postgre 13.
my topic is at
default/public/pulsar-postgres-jdbc-sink-topic
and use this schema
{ "type": "AVRO", "schema": "{\"type\":\"record\",\"name\":\"Test\",\"fields\":[{\"name\":\"id\",\"type\":[\"null\",\"int\"]},{\"name\":\"name\",\"type\":[\"null\",\"string\"]}]}", "properties": {} }
exactly like this in this tutorial
https://pulsar.apache.org/docs/3.1.x/io-quickstart/#connect-pulsar-to-postgresql
but when inserting any data it always return null value for example {"id"=123, "name":"John"}
it return like this
2023-10-04T06:10:51,870+0000 [pool-3-thread-1] ERROR org.apache.pulsar.io.jdbc.JdbcAbstractSink - Got exception ERROR: null value in column "id" of relation "jdbc_sink" violates not-null constraint Detail: Failing row contains (null, null). after 2 ms, failing 1 messages org.postgresql.util.PSQLException: ERROR: null value in column "id" of relation "jdbc_sink" violates not-null constraint Detail: Failing row contains (null, null). at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2676) ~[postgresql-42.5.1.jar:42.5.1] at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2366) ~[postgresql-42.5.1.jar:42.5.1] at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:356) ~[postgresql-42.5.1.jar:42.5.1] at org.postgresql.jdbc.PgStatement.executeInternal(PgStatement.java:496) ~[postgresql-42.5.1.jar:42.5.1] at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:413) ~[postgresql-42.5.1.jar:42.5.1] at org.postgresql.jdbc.PgPreparedStatement.executeWithFlags(PgPreparedStatement.java:190) ~[postgresql-42.5.1.jar:42.5.1] at org.postgresql.jdbc.PgPreparedStatement.execute(PgPreparedStatement.java:177) ~[postgresql-42.5.1.jar:42.5.1] at org.apache.pulsar.io.jdbc.JdbcAbstractSink.flush(JdbcAbstractSink.java:289) ~[pulsar-io-jdbc-core-3.1.0.jar:?] at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) ~[?:?] at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305) ~[?:?] at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305) ~[?:?] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[?:?] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[?:?] at java.lang.Thread.run(Thread.java:833) ~[?:?] 2023-10-04T06:11:39,223+0000 [pulsar-timer-9-1] INFO org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - [testSinkEpoch] [public/default/test] [715e6] Prefetched messages: 0 --- Consume throughput received: 0.17 msgs/s --- 0.00 Mbit/s --- Ack sent rate: 0.00 ack/s --- Failed messages: 0 --- batch messages: 0 ---Failed acks: 0
how i can solve this?
will appreciate any help/assist. thanks~
Beta Was this translation helpful? Give feedback.
All reactions