
[Bug] the close() of the OSS stream may not guarantee the file is visible/readable on OSS immediately #7467

@MaxLinyun

Description

Search before asking

  • I searched in the issues and found nothing similar.

Paimon version

release 1.2

Compute Engine

Flink 1.16

Minimal reproduce step

Execute an INSERT SELECT statement on Flink that writes a large volume of data into a Paimon table with a high number of buckets, while the traffic load on the OSS bucket storing this Paimon table is relatively high.

What doesn't meet your expectations?

java.io.IOException: Could not perform checkpoint 1264 for operator Writer(write-only) : cthot_rank_paimon (49/80)#47.
at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1238)
at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:147)
at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.triggerCheckpoint(SingleCheckpointBarrierHandler.java:287)
at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.access$100(SingleCheckpointBarrierHandler.java:64)
at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler$ControllerImpl.triggerGlobalCheckpoint(SingleCheckpointBarrierHandler.java:488)
at org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.triggerGlobalCheckpoint(AbstractAlignedBarrierHandlerState.java:74)
at org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.barrierReceived(AbstractAlignedBarrierHandlerState.java:66)
at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.lambda$processBarrier$2(SingleCheckpointBarrierHandler.java:234)
at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.markCheckpointAlignedAndTransformState(SingleCheckpointBarrierHandler.java:262)
at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:231)
at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:181)
at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:159)
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:542)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:978)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:957)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:731)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:553)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: java.io.FileNotFoundException: oss://my_bucket/my_warehouse/my_db.db/my_table/bucket-46/data-13d72dd9-4dad-43c3-8ec8-713d7a00d033-1.parquet: No such file or directory!
at org.apache.paimon.flink.sink.StoreSinkWriteImpl.prepareCommit(StoreSinkWriteImpl.java:221)
at org.apache.paimon.flink.sink.TableWriteOperator.prepareCommit(TableWriteOperator.java:128)
at org.apache.paimon.flink.sink.RowDataStoreWriteOperator.prepareCommit(RowDataStoreWriteOperator.java:205)
at org.apache.paimon.flink.sink.PrepareCommitOperator.emitCommittables(PrepareCommitOperator.java:104)
at org.apache.paimon.flink.sink.PrepareCommitOperator.prepareSnapshotPreBarrier(PrepareCommitOperator.java:84)
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.prepareSnapshotPreBarrier(RegularOperatorChain.java:89)
at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:334)
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$13(StreamTask.java:1281)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1269)
at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1226)
... 22 more
Caused by: java.io.FileNotFoundException: oss://my_bucket/my_warehouse/my_db.db/my_table/bucket-46/data-13d72dd9-4dad-43c3-8ec8-713d7a00d033-1.parquet: No such file or directory!
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem.getFileStatus(AliyunOSSFileSystem.java:283)
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem.open(AliyunOSSFileSystem.java:590)
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FileSystem.open(FileSystem.java:976)
at org.apache.flink.fs.osshadoop.common.HadoopFileSystem.open(HadoopFileSystem.java:131)
at org.apache.flink.fs.osshadoop.common.HadoopFileSystem.open(HadoopFileSystem.java:37)
at org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.open(PluginFileSystemFactory.java:134)
at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.open(SafetyNetWrapperFileSystem.java:87)
at org.apache.paimon.flink.FlinkFileIO.newInputStream(FlinkFileIO.java:67)
at org.apache.paimon.format.parquet.ParquetInputFile.newStream(ParquetInputFile.java:57)
at org.apache.paimon.shade.org.apache.parquet.hadoop.ParquetFileReader.<init>(ParquetFileReader.java:257)
at org.apache.paimon.format.parquet.ParquetUtil.getParquetReader(ParquetUtil.java:86)
at org.apache.paimon.format.parquet.ParquetUtil.extractColumnStats(ParquetUtil.java:52)
at org.apache.paimon.format.parquet.ParquetSimpleStatsExtractor.extractWithFileInfo(ParquetSimpleStatsExtractor.java:78)
at org.apache.paimon.format.parquet.ParquetSimpleStatsExtractor.extract(ParquetSimpleStatsExtractor.java:71)
at org.apache.paimon.io.SimpleStatsProducer$2.extract(SimpleStatsProducer.java:93)
at org.apache.paimon.io.StatsCollectingSingleFileWriter.fieldStats(StatsCollectingSingleFileWriter.java:86)
at org.apache.paimon.io.KeyValueDataFileWriter.result(KeyValueDataFileWriter.java:159)
at org.apache.paimon.io.KeyValueDataFileWriter.result(KeyValueDataFileWriter.java:55)
at org.apache.paimon.io.RollingFileWriter.closeCurrentWriter(RollingFileWriter.java:135)
at org.apache.paimon.io.RollingFileWriter.close(RollingFileWriter.java:167)
at org.apache.paimon.mergetree.MergeTreeWriter.flushWriteBuffer(MergeTreeWriter.java:234)
at org.apache.paimon.mergetree.MergeTreeWriter.prepareCommit(MergeTreeWriter.java:253)
at org.apache.paimon.operation.AbstractFileStoreWrite.prepareCommit(AbstractFileStoreWrite.java:210)
at org.apache.paimon.operation.MemoryFileStoreWrite.prepareCommit(MemoryFileStoreWrite.java:152)
at org.apache.paimon.table.sink.TableWriteImpl.prepareCommit(TableWriteImpl.java:248)
at org.apache.paimon.flink.sink.StoreSinkWriteImpl.prepareCommit(StoreSinkWriteImpl.java:216)
... 32 more

Anything else?

The failure happens during the stats-collection phase, when RollingFileWriter closes the current writer: Paimon immediately re-opens the just-written Parquet file to extract field statistics, but even though close() on the OSS output stream has returned successfully, the file is not yet visible/readable on OSS, so the open fails with FileNotFoundException.
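One possible mitigation, sketched below, is a bounded retry with backoff around the re-open of a freshly written file. This is only an illustrative sketch, not Paimon's actual API: the helper name openWithRetry and the Callable-based shape are hypothetical; in Paimon the retry would have to wrap the open at the call site shown in the trace (ParquetUtil.getParquetReader via ParquetInputFile.newStream).

```java
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.concurrent.Callable;

/**
 * Hypothetical helper (not part of Paimon): retries opening a freshly
 * written object until the store reports it as visible, with simple
 * linear backoff, and rethrows if visibility never materializes.
 */
public class RetryOnVisibility {

    public static <T> T openWithRetry(Callable<T> open, int maxAttempts, long backoffMillis)
            throws Exception {
        FileNotFoundException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return open.call();
            } catch (FileNotFoundException e) {
                // Object not yet visible after close(); wait and retry.
                last = e;
                Thread.sleep(backoffMillis * attempt);
            }
        }
        throw new IOException(
                "file still not visible after " + maxAttempts + " attempts", last);
    }
}
```

An alternative direction would be for the FileIO implementation to verify object existence (e.g. via a status probe) after close() returns, before handing the path to the stats extractor; either way the retry must be bounded so a genuinely missing file still fails the checkpoint.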

Are you willing to submit a PR?

  • I'm willing to submit a PR!

Metadata

Labels: bug (Something isn't working)