3737#include " paimon/common/data/binary_row.h"
3838#include " paimon/common/data/binary_row_writer.h"
3939#include " paimon/common/factories/io_hook.h"
40- #include " paimon/common/fs/resolving_file_system.h"
4140#include " paimon/common/utils/path_util.h"
4241#include " paimon/common/utils/scope_guard.h"
4342#include " paimon/core/catalog/commit_table_request.h"
6261#include " paimon/fs/local/local_file_system_factory.h"
6362#include " paimon/memory/memory_pool.h"
6463#include " paimon/metrics.h"
65- #include " paimon/string_builder.h"
6664#include " paimon/testing/utils/binary_row_generator.h"
6765#include " paimon/testing/utils/io_exception_helper.h"
6866#include " paimon/testing/utils/testharness.h"
@@ -88,26 +86,27 @@ class GmockFileSystemFactory : public LocalFileSystemFactory {
8886
8987 Result<std::unique_ptr<FileSystem>> Create (
9088 const std::string& path, const std::map<std::string, std::string>& options) const override {
91- auto fs = std::make_unique<GmockFileSystem>();
89+ auto fs = std::make_unique<testing::NiceMock<GmockFileSystem>>();
90+ auto fs_ptr = fs.get ();
9291 using ::testing::A;
9392 using ::testing::Invoke;
9493
9594 ON_CALL (*fs, ListDir (A<const std::string&>(),
9695 A<std::vector<std::unique_ptr<BasicFileStatus>>*>()))
9796 .WillByDefault (
98- Invoke ([& ](const std::string& directory,
99- std::vector<std::unique_ptr<BasicFileStatus>>* file_status_list) {
100- return fs ->LocalFileSystem ::ListDir (directory, file_status_list);
97+ Invoke ([fs_ptr ](const std::string& directory,
98+ std::vector<std::unique_ptr<BasicFileStatus>>* file_status_list) {
99+ return fs_ptr ->LocalFileSystem ::ListDir (directory, file_status_list);
101100 }));
102101
103102 ON_CALL (*fs, ReadFile (A<const std::string&>(), A<std::string*>()))
104- .WillByDefault (Invoke ([& ](const std::string& path, std::string* content) {
105- return fs ->FileSystem ::ReadFile (path, content);
103+ .WillByDefault (Invoke ([fs_ptr ](const std::string& path, std::string* content) {
104+ return fs_ptr ->FileSystem ::ReadFile (path, content);
106105 }));
107106
108107 ON_CALL (*fs, AtomicStore (A<const std::string&>(), A<const std::string&>()))
109- .WillByDefault (Invoke ([& ](const std::string& path, const std::string& content) {
110- return fs ->FileSystem ::AtomicStore (path, content);
108+ .WillByDefault (Invoke ([fs_ptr ](const std::string& path, const std::string& content) {
109+ return fs_ptr ->FileSystem ::AtomicStore (path, content);
111110 }));
112111
113112 return fs;
@@ -364,33 +363,31 @@ TEST_F(FileStoreCommitImplTest, TestRESTCatalogCommit) {
364363 ASSERT_FALSE (exist);
365364}
366365
367- // TODO(jinli.zjw): fix disabled case
368- TEST_F (FileStoreCommitImplTest, DISABLED_TestCommitWithConflictSnapshotAndRetryTenTimes) {
366+ TEST_F (FileStoreCommitImplTest, TestCommitWithConflictSnapshotAndRetryTenTimes) {
369367 std::string test_data_path = paimon::test::GetDataDir () + " /orc/append_09.db/append_09/" ;
370368 auto dir = UniqueTestDirectory::Create ();
371369 std::string table_path = dir->Str ();
372370 ASSERT_TRUE (TestUtil::CopyDirectory (test_data_path, table_path));
371+ ASSERT_OK_AND_ASSIGN (std::shared_ptr<FileSystem> fs,
372+ FileSystemFactory::Get (" gmock_fs" , table_path, {}));
373373 CommitContextBuilder context_builder (table_path, " commit_user_1" );
374374 ASSERT_OK_AND_ASSIGN (std::unique_ptr<CommitContext> commit_context,
375375 context_builder.AddOption (Options::MANIFEST_FORMAT, " orc" )
376376 .AddOption (Options::MANIFEST_TARGET_FILE_SIZE, " 8mb" )
377377 .AddOption (Options::COMMIT_MAX_RETRIES, " 10" )
378- .AddOption (Options::FILE_SYSTEM, " gmock_fs " )
378+ .WithFileSystem (fs )
379379 .Finish ());
380380
381381 ASSERT_OK_AND_ASSIGN (auto commit, FileStoreCommit::Create (std::move (commit_context)));
382- auto commit_impl = dynamic_cast <FileStoreCommitImpl*>(commit.get ());
383382 std::string latest_hint = PathUtil::JoinPath (table_path, " snapshot/LATEST" );
384383
385- auto * resolving_fs =
386- dynamic_cast <ResolvingFileSystem*>(commit_impl->snapshot_manager_ ->fs_ .get ());
387- ASSERT_OK_AND_ASSIGN (auto real_fs, resolving_fs->GetRealFileSystem (table_path));
388- auto * mock_fs = dynamic_cast <GmockFileSystem*>(real_fs.get ());
384+ auto * mock_fs = dynamic_cast <GmockFileSystem*>(fs.get ());
389385 EXPECT_CALL (*mock_fs, ReadFile (testing::StrEq (latest_hint), testing::_))
390386 .WillRepeatedly (testing::Invoke ([](const std::string& path, std::string* content) {
391387 *content = " -1" ;
392388 return Status::OK ();
393389 }));
390+ EXPECT_CALL (*mock_fs, ListDir (testing::_, testing::_)).Times (testing::AnyNumber ());
394391 EXPECT_CALL (*mock_fs,
395392 ListDir (testing::StrEq (PathUtil::JoinPath (table_path, " snapshot" )), testing::_))
396393 .WillRepeatedly (
@@ -407,25 +404,23 @@ TEST_F(FileStoreCommitImplTest, DISABLED_TestCommitWithConflictSnapshotAndRetryT
407404 ASSERT_NOK (commit->Commit (msgs));
408405}
409406
410- TEST_F (FileStoreCommitImplTest, DISABLED_TestCommitWithConflictSnapshotAndRetryOnce ) {
407+ TEST_F (FileStoreCommitImplTest, TestCommitWithConflictSnapshotAndRetryOnce ) {
411408 std::string test_data_path = paimon::test::GetDataDir () + " /orc/append_09.db/append_09/" ;
412409 auto dir = UniqueTestDirectory::Create ();
413410 std::string table_path = dir->Str ();
414411 ASSERT_TRUE (TestUtil::CopyDirectory (test_data_path, table_path));
412+ ASSERT_OK_AND_ASSIGN (std::shared_ptr<FileSystem> fs,
413+ FileSystemFactory::Get (" gmock_fs" , table_path, {}));
415414 CommitContextBuilder context_builder (table_path, " commit_user_1" );
416415 ASSERT_OK_AND_ASSIGN (std::unique_ptr<CommitContext> commit_context,
417416 context_builder.AddOption (Options::MANIFEST_FORMAT, " orc" )
418417 .AddOption (Options::MANIFEST_TARGET_FILE_SIZE, " 8mb" )
419- .AddOption (Options::FILE_SYSTEM, " gmock_fs " )
418+ .WithFileSystem (fs )
420419 .Finish ());
421420
422421 ASSERT_OK_AND_ASSIGN (auto commit, FileStoreCommit::Create (std::move (commit_context)));
423- auto commit_impl = dynamic_cast <FileStoreCommitImpl*>(commit.get ());
424422 std::string latest_hint = PathUtil::JoinPath (table_path, " snapshot/LATEST" );
425- auto * resolving_fs =
426- dynamic_cast <ResolvingFileSystem*>(commit_impl->snapshot_manager_ ->fs_ .get ());
427- ASSERT_OK_AND_ASSIGN (auto real_fs, resolving_fs->GetRealFileSystem (table_path));
428- auto * mock_fs = dynamic_cast <GmockFileSystem*>(real_fs.get ());
423+ auto * mock_fs = dynamic_cast <GmockFileSystem*>(fs.get ());
429424 EXPECT_CALL (*mock_fs, ReadFile (testing::StrEq (latest_hint), testing::_))
430425 .WillRepeatedly (testing::Invoke ([](const std::string& path, std::string* content) {
431426 *content = " -1" ;
@@ -439,6 +434,7 @@ TEST_F(FileStoreCommitImplTest, DISABLED_TestCommitWithConflictSnapshotAndRetryO
439434 return mock_fs->FileSystem ::ReadFile (path, content);
440435 }));
441436
437+ EXPECT_CALL (*mock_fs, ListDir (testing::_, testing::_)).Times (testing::AnyNumber ());
442438 EXPECT_CALL (*mock_fs,
443439 ListDir (testing::StrEq (PathUtil::JoinPath (table_path, " snapshot" )), testing::_))
444440 .WillOnce (
@@ -468,25 +464,23 @@ TEST_F(FileStoreCommitImplTest, DISABLED_TestCommitWithConflictSnapshotAndRetryO
468464 ASSERT_TRUE (exist);
469465}
470466
471- TEST_F (FileStoreCommitImplTest,
472- DISABLED_TestCommitWithAtomicWriteSnapshotTimeoutAndActuallySucceed) {
467+ TEST_F (FileStoreCommitImplTest, TestCommitWithAtomicWriteSnapshotTimeoutAndActuallySucceed) {
473468 std::string test_data_path = paimon::test::GetDataDir () + " /orc/append_09.db/append_09/" ;
474469 auto dir = UniqueTestDirectory::Create ();
475470 std::string table_path = dir->Str ();
476471 ASSERT_TRUE (TestUtil::CopyDirectory (test_data_path, table_path));
472+ ASSERT_OK_AND_ASSIGN (std::shared_ptr<FileSystem> fs,
473+ FileSystemFactory::Get (" gmock_fs" , table_path, {}));
477474 CommitContextBuilder context_builder (table_path, " commit_user_1" );
478475 ASSERT_OK_AND_ASSIGN (std::unique_ptr<CommitContext> commit_context,
479476 context_builder.AddOption (Options::MANIFEST_FORMAT, " orc" )
480477 .AddOption (Options::MANIFEST_TARGET_FILE_SIZE, " 8mb" )
481- .AddOption (Options::FILE_SYSTEM, " gmock_fs " )
478+ .WithFileSystem (fs )
482479 .Finish ());
483480
484481 ASSERT_OK_AND_ASSIGN (auto commit, FileStoreCommit::Create (std::move (commit_context)));
485482 std::string new_snapshot_6 = PathUtil::JoinPath (table_path, " snapshot/snapshot-6" );
486- auto commit_impl = dynamic_cast <FileStoreCommitImpl*>(commit.get ());
487- auto * resolving_fs = dynamic_cast <ResolvingFileSystem*>(commit_impl->fs_ .get ());
488- ASSERT_OK_AND_ASSIGN (auto real_fs, resolving_fs->GetRealFileSystem (table_path));
489- auto * mock_fs = dynamic_cast <GmockFileSystem*>(real_fs.get ());
483+ auto * mock_fs = dynamic_cast <GmockFileSystem*>(fs.get ());
490484 EXPECT_CALL (*mock_fs, AtomicStore (testing::StrEq (new_snapshot_6), testing::_))
491485 .WillOnce (testing::Invoke ([&](const std::string& path, const std::string& content) {
492486 // to mock atomic store timeout actually succeed
@@ -507,22 +501,18 @@ TEST_F(FileStoreCommitImplTest,
507501
508502 CommitContextBuilder context_builder_2 (table_path, " commit_user_1" );
509503 ASSERT_OK_AND_ASSIGN (std::unique_ptr<CommitContext> commit_context_2,
510- context_builder .AddOption (Options::MANIFEST_FORMAT, " orc" )
504+ context_builder_2 .AddOption (Options::MANIFEST_FORMAT, " orc" )
511505 .AddOption (Options::MANIFEST_TARGET_FILE_SIZE, " 8mb" )
512- .AddOption (Options::FILE_SYSTEM, " gmock_fs " )
506+ .WithFileSystem (fs )
513507 .Finish ());
514508
515509 ASSERT_OK_AND_ASSIGN (auto commit_2, FileStoreCommit::Create (std::move (commit_context_2)));
516510 ASSERT_OK_AND_ASSIGN (int32_t num_committed, commit_2->FilterAndCommit ({{1 , msgs}}));
517511 ASSERT_EQ (0 , num_committed);
518- auto commit_impl_2 = dynamic_cast <FileStoreCommitImpl*>(commit_2.get ());
519- auto * resolving_fs_2 = dynamic_cast <ResolvingFileSystem*>(commit_impl_2->fs_ .get ());
520- ASSERT_OK_AND_ASSIGN (auto real_fs_2, resolving_fs_2->GetRealFileSystem (table_path));
521- auto * mock_fs_2 = dynamic_cast <GmockFileSystem*>(real_fs_2.get ());
522512 std::string new_snapshot_7 = PathUtil::JoinPath (table_path, " snapshot/snapshot-7" );
523- EXPECT_CALL (*mock_fs_2 , AtomicStore (testing::StrEq (new_snapshot_7), testing::_))
513+ EXPECT_CALL (*mock_fs , AtomicStore (testing::StrEq (new_snapshot_7), testing::_))
524514 .WillOnce (testing::Invoke ([&](const std::string& path, const std::string& content) {
525- return mock_fs_2 ->FileSystem ::AtomicStore (path, content);
515+ return mock_fs ->FileSystem ::AtomicStore (path, content);
526516 }));
527517 std::vector<std::shared_ptr<CommitMessage>> msgs_2 =
528518 GetCommitMessages (paimon::test::GetDataDir () +
0 commit comments