Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions include/paimon/catalog/catalog.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,10 +100,8 @@ class PAIMON_EXPORT Catalog {
/// @note System tables will not be supported.
///
/// @param identifier The identifier (database and table name) of the table to load.
/// @return A result containing table schema if the table exists, or std::nullopt if it
/// doesn't, or an error status on failure.
virtual Result<std::optional<std::shared_ptr<Schema>>> LoadTableSchema(
const Identifier& identifier) const = 0;
/// @return A result containing the table schema if the table exists, or an error status if the
/// table does not exist or loading the schema fails.
virtual Result<std::shared_ptr<Schema>> LoadTableSchema(const Identifier& identifier) const = 0;
};

} // namespace paimon
34 changes: 13 additions & 21 deletions src/paimon/core/catalog/file_system_catalog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,6 @@ class Schema;
struct ArrowSchema;

namespace paimon {
class TableSchema;

FileSystemCatalog::FileSystemCatalog(const std::shared_ptr<FileSystem>& fs,
const std::string& warehouse)
: fs_(fs), warehouse_(warehouse), logger_(Logger::GetLogger("FileSystemCatalog")) {}
Expand Down Expand Up @@ -104,7 +102,9 @@ Status FileSystemCatalog::CreateTable(const Identifier& identifier, ArrowSchema*
return Status::Invalid(
fmt::format("database {} is not exist", identifier.GetDatabaseName()));
}
PAIMON_ASSIGN_OR_RAISE(bool table_exist, TableExists(identifier));
PAIMON_ASSIGN_OR_RAISE(std::optional<std::shared_ptr<TableSchema>> latest_schema,
TableSchemaExists(identifier));
bool table_exist = (latest_schema != std::nullopt);
if (table_exist) {
if (ignore_if_exists) {
return Status::OK();
Expand All @@ -128,17 +128,14 @@ Status FileSystemCatalog::CreateTable(const Identifier& identifier, ArrowSchema*
return Status::OK();
}

Result<bool> FileSystemCatalog::TableExists(const Identifier& identifier) const {
Result<std::optional<std::shared_ptr<TableSchema>>> FileSystemCatalog::TableSchemaExists(
const Identifier& identifier) const {
if (IsSystemTable(identifier)) {
return Status::NotImplemented("do not support checking TableExists for system table.");
return Status::NotImplemented(
"do not support checking TableSchemaExists for system table.");
}
SchemaManager schema_manager(fs_, NewDataTablePath(warehouse_, identifier));
PAIMON_ASSIGN_OR_RAISE(std::optional<std::shared_ptr<TableSchema>> latest_schema,
schema_manager.Latest());
if (latest_schema == std::nullopt) {
return false;
}
return true;
return schema_manager.Latest();
}

bool FileSystemCatalog::IsSystemDatabase(const std::string& db_name) {
Expand Down Expand Up @@ -212,19 +209,14 @@ Result<bool> FileSystemCatalog::TableExistsInFileSystem(const std::string& table
}
}

Result<std::optional<std::shared_ptr<Schema>>> FileSystemCatalog::LoadTableSchema(
Result<std::shared_ptr<Schema>> FileSystemCatalog::LoadTableSchema(
const Identifier& identifier) const {
if (IsSystemTable(identifier)) {
return Status::NotImplemented("do not support loading schema for system table.");
}
SchemaManager schema_manager(fs_, NewDataTablePath(warehouse_, identifier));
PAIMON_ASSIGN_OR_RAISE(std::optional<std::shared_ptr<TableSchema>> latest_schema,
schema_manager.Latest());
if (latest_schema.has_value()) {
std::shared_ptr<Schema> schema = std::make_shared<SchemaImpl>(*latest_schema);
return std::optional<std::shared_ptr<Schema>>(schema);
TableSchemaExists(identifier));
if (!latest_schema) {
return Status::NotExist(fmt::format("{} not exist", identifier.ToString()));
}
return std::optional<std::shared_ptr<Schema>>();
return std::make_shared<SchemaImpl>(*latest_schema);
}

} // namespace paimon
8 changes: 4 additions & 4 deletions src/paimon/core/catalog/file_system_catalog.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
struct ArrowSchema;

namespace paimon {

class TableSchema;
class FileSystem;
class Identifier;
class Logger;
Expand All @@ -49,8 +49,7 @@ class FileSystemCatalog : public Catalog {

Result<std::vector<std::string>> ListDatabases() const override;
Result<std::vector<std::string>> ListTables(const std::string& database_names) const override;
Result<std::optional<std::shared_ptr<Schema>>> LoadTableSchema(
const Identifier& identifier) const override;
Result<std::shared_ptr<Schema>> LoadTableSchema(const Identifier& identifier) const override;

private:
static std::string NewDatabasePath(const std::string& warehouse, const std::string& db_name);
Expand All @@ -59,7 +58,8 @@ class FileSystemCatalog : public Catalog {
static bool IsSpecifiedSystemTable(const Identifier& identifier);
static bool IsSystemTable(const Identifier& identifier);
Result<bool> DataBaseExists(const std::string& db_name) const;
Result<bool> TableExists(const Identifier& identifier) const;
Result<std::optional<std::shared_ptr<TableSchema>>> TableSchemaExists(
const Identifier& identifier) const;

Status CreateDatabaseImpl(const std::string& db_name,
const std::map<std::string, std::string>& options);
Expand Down
37 changes: 18 additions & 19 deletions src/paimon/core/catalog/file_system_catalog_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -180,10 +180,9 @@ TEST(FileSystemCatalogTest, TestCreateTableWithBlob) {
ASSERT_OK_AND_ASSIGN(std::vector<std::string> table_names, catalog.ListTables("db1"));
ASSERT_EQ(1, table_names.size());
ASSERT_EQ(table_names[0], "tbl1");
ASSERT_OK_AND_ASSIGN(std::optional<std::shared_ptr<Schema>> table_schema,
ASSERT_OK_AND_ASSIGN(std::shared_ptr<Schema> table_schema,
catalog.LoadTableSchema(Identifier("db1", "tbl1")));
ASSERT_TRUE(table_schema.has_value());
ASSERT_OK_AND_ASSIGN(auto arrow_schema, (*table_schema)->GetArrowSchema());
ASSERT_OK_AND_ASSIGN(auto arrow_schema, table_schema->GetArrowSchema());
auto loaded_schema = arrow::ImportSchema(arrow_schema.get()).ValueOrDie();
ASSERT_TRUE(typed_schema.Equals(loaded_schema));
ArrowSchemaRelease(&schema);
Expand Down Expand Up @@ -336,32 +335,32 @@ TEST(FileSystemCatalogTest, TestValidateTableSchema) {
ASSERT_OK(catalog.CreateTable(Identifier("db1", "tbl1"), &schema, {"f1"}, {}, options,
/*ignore_if_exists=*/false));

ASSERT_OK_AND_ASSIGN(std::optional<std::shared_ptr<Schema>> table_schema,
catalog.LoadTableSchema(Identifier("db0", "tbl0")));
ASSERT_FALSE(table_schema.has_value());
ASSERT_OK_AND_ASSIGN(table_schema, catalog.LoadTableSchema(Identifier("db1", "tbl1")));
ASSERT_TRUE(table_schema.has_value());
ASSERT_EQ(0, (*table_schema)->Id());
ASSERT_EQ(3, (*table_schema)->HighestFieldId());
ASSERT_EQ(1, (*table_schema)->PartitionKeys().size());
ASSERT_EQ(0, (*table_schema)->PrimaryKeys().size());
ASSERT_EQ(-1, (*table_schema)->NumBuckets());
ASSERT_FALSE((*table_schema)->Comment().has_value());
std::vector<std::string> field_names = (*table_schema)->FieldNames();
ASSERT_NOK_WITH_MSG(catalog.LoadTableSchema(Identifier("db0", "tbl0")),
"Identifier{database=\'db0\', table=\'tbl0\'} not exist");
ASSERT_OK_AND_ASSIGN(std::shared_ptr<Schema> table_schema,
catalog.LoadTableSchema(Identifier("db1", "tbl1")));
ASSERT_EQ(0, table_schema->Id());
ASSERT_EQ(3, table_schema->HighestFieldId());
ASSERT_EQ(1, table_schema->PartitionKeys().size());
ASSERT_EQ(0, table_schema->PrimaryKeys().size());
ASSERT_EQ(-1, table_schema->NumBuckets());
ASSERT_FALSE(table_schema->Comment().has_value());
std::vector<std::string> field_names = table_schema->FieldNames();
std::vector<std::string> expected_field_names = {"f0", "f1", "f2", "f3"};
ASSERT_EQ(field_names, expected_field_names);

ASSERT_OK_AND_ASSIGN(auto arrow_schema, (*table_schema)->GetArrowSchema());
ASSERT_OK_AND_ASSIGN(auto arrow_schema, table_schema->GetArrowSchema());
auto loaded_schema = arrow::ImportSchema(arrow_schema.get()).ValueOrDie();
ASSERT_TRUE(typed_schema.Equals(loaded_schema));

ASSERT_OK_AND_ASSIGN(auto fs, FileSystemFactory::Get("local", dir->Str(), {}));
ASSERT_OK(fs->Delete(PathUtil::JoinPath(dir->Str(), "db1.db/tbl1/schema/schema-0")));
ASSERT_OK_AND_ASSIGN(table_schema, catalog.LoadTableSchema(Identifier("db1", "tbl1")));
ASSERT_FALSE(table_schema.has_value());

ASSERT_NOK_WITH_MSG(catalog.LoadTableSchema(Identifier("db1", "tbl1")),
"Identifier{database=\'db1\', table=\'tbl1\'} not exist");

ASSERT_NOK_WITH_MSG(catalog.LoadTableSchema(Identifier("db1", "tbl$11")),
"do not support loading schema for system table.");
"do not support checking TableSchemaExists for system table.");
ArrowSchemaRelease(&schema);
}

Expand Down
Loading