[Bug] [connector-cassandra] NullPointerException when a timestamp field in the Cassandra source is null #9849

@aaawuanjun

Description

Search before asking

  • I have searched the issues and found no similar issues.

What happened

When a column in the Cassandra source table has the timestamp type and its value is null, the job fails with a NullPointerException.

SeaTunnel Version

2.3.3 and dev

SeaTunnel Config

The content of test1.config is as follows:


env {
    execution.parallelism = 1
    job.mode = "BATCH"
    checkpoint.interval = 10000
}
source {
    Cassandra {
        host = "192.168.0.100:9042"
        keyspace = "store"
        username = "admin"
        password = "123456"
        datacenter = "datacenter1"
        cql = "select userid,item_count,last_update_timestamp from shopping_cart"
    }
}

sink {
    doris {
        fenodes = "192.168.0.101:8030"
        username = "root"
        password = "123456"
        database = "cassandradb"
        table = "shopping_cart"
        table.identifier = "cassandradb.shopping_cart"
        sink.label-prefix = "seatunnel_offline"
        sink.enable-2pc = "false"
        doris.config = {
            format = "json",
            read_json_by_line = "true"
        }
        sink.properties.columns = "userid,item_count,last_update_timestamp"
    }
}

Running Command

bin/seatunnel.sh --config test1.config -e local

Error Exception

Exception in thread "main" org.apache.seatunnel.core.starter.exception.CommandExecuteException: SeaTunnel job executed failed
        at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:191)
        at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
        at org.apache.seatunnel.core.starter.seatunnel.SeaTunnelClient.main(SeaTunnelClient.java:34)
Caused by: org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: java.lang.NullPointerException
        at java.util.Objects.requireNonNull(Objects.java:203)
        at org.apache.seatunnel.connectors.seatunnel.cassandra.util.TypeConvertUtil.buildSeaTunnelRow(TypeConvertUtil.java:185)
        at org.apache.seatunnel.connectors.seatunnel.cassandra.source.CassandraSourceReader.lambda$pollNext$0(CassandraSourceReader.java:73)
        at java.lang.Iterable.forEach(Iterable.java:75)
        at org.apache.seatunnel.connectors.seatunnel.cassandra.source.CassandraSourceReader.pollNext(CassandraSourceReader.java:73)
        at org.apache.seatunnel.engine.server.task.flow.SourceFlowLifeCycle.collect(SourceFlowLifeCycle.java:150)
        at org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.collect(SourceSeaTunnelTask.java:95)
        at org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:168)
        at org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.call(SourceSeaTunnelTask.java:100)
        at org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:613)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)

        at org.apache.seatunnel.engine.client.job.ClientJobProxy.waitForJobComplete(ClientJobProxy.java:122)
        at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:184)
        ... 2 more
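
The trace shows the failure coming from `Objects.requireNonNull` inside `TypeConvertUtil.buildSeaTunnelRow`, so one possible direction is to pass a null value through instead of rejecting it. The following is only a minimal sketch of that idea, not the connector's actual code: `NullSafeTimestamp` and `toTimestamp` are hypothetical names, and it assumes the column value arrives as a `java.time.Instant` that is null when the CQL value is NULL.

```java
import java.sql.Timestamp;
import java.time.Instant;

public class NullSafeTimestamp {

    // Hypothetical helper (not part of SeaTunnel): converts a nullable Instant
    // to a nullable Timestamp instead of calling Objects.requireNonNull on it.
    public static Timestamp toTimestamp(Instant value) {
        // Assumption: a CQL NULL is represented as a null Instant by the driver.
        return value == null ? null : Timestamp.from(value);
    }

    public static void main(String[] args) {
        // A null column value stays null and no exception is thrown.
        System.out.println(toTimestamp(null));
        // A non-null value is converted as usual (epoch is 0 ms).
        System.out.println(toTimestamp(Instant.EPOCH).getTime());
    }
}
```

Whether a null timestamp should be forwarded to the sink as null or replaced by a default is a design decision for the connector; the sketch simply keeps the null.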

Zeta or Flink or Spark Version

No response

Java or Scala Version

No response

Screenshots

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct
