Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 45 additions & 2 deletions include/paimon/catalog/catalog.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,34 @@

#pragma once

#include <chrono>
#include <map>
#include <memory>
#include <optional>
#include <string>
#include <variant>
#include <vector>

#include "paimon/catalog/identifier.h"
#include "paimon/result.h"
#include "paimon/schema/schema.h"
#include "paimon/status.h"
#include "paimon/type_fwd.h"
#include "paimon/visibility.h"

struct ArrowSchema;

namespace paimon {
// Tag name or snapshot id
using Instant = std::variant<std::string, int64_t>;

class Database;
class Table;
class View;
class Schema;
class Snapshot;
class PartitionStatistics;
class Tag;
class Identifier;

/// This interface is responsible for reading and writing metadata such as database/table from a
/// paimon catalog.
class PAIMON_EXPORT Catalog {
Expand Down Expand Up @@ -99,6 +110,38 @@ class PAIMON_EXPORT Catalog {
/// status.
virtual Result<std::vector<std::string>> ListTables(const std::string& db_name) const = 0;

/// Drops a database.
///
/// @param name Name of the database to be dropped.
/// @param ignore_if_not_exists If true, no action is taken if the database does not exist.
/// @param cascade If true, drops all tables and functions in the database before dropping the
/// database.
/// @return A status indicating success or failure.
virtual Status DropDatabase(const std::string& name, bool ignore_if_not_exists,
bool cascade) = 0;

/// Drops a table.
///
/// @param identifier Identifier of the table to drop.
/// @param ignore_if_not_exists If true, no action is taken if the table does not exist.
/// @return A status indicating success or failure.
virtual Status DropTable(const Identifier& identifier, bool ignore_if_not_exists) = 0;

/// Renames a table.
///
/// @param from_table Current identifier of the table.
/// @param to_table New identifier for the table.
/// @param ignore_if_not_exists If true, no action is taken if the table does not exist.
/// @return A status indicating success or failure.
virtual Status RenameTable(const Identifier& from_table, const Identifier& to_table,
bool ignore_if_not_exists) = 0;

/// Gets a table.
///
/// @param identifier Identifier of the table to get.
/// @return A result containing the table, or an error status.
virtual Result<std::shared_ptr<Table>> GetTable(const Identifier& identifier) const = 0;

/// Checks whether a database with the specified name exists in the catalog.
///
/// @param db_name The name of the database to check for existence.
Expand Down
53 changes: 53 additions & 0 deletions include/paimon/catalog/database.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright 2026-present Alibaba Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include <map>
#include <memory>
#include <optional>
#include <string>
#include <vector>

#include "paimon/result.h"
#include "paimon/status.h"
#include "paimon/type_fwd.h"
#include "paimon/visibility.h"

struct ArrowSchema;

namespace paimon {

/// Interface of a database in a catalog.
class PAIMON_EXPORT Database {
public:
virtual ~Database() = default;

/// ================== Database Metadata =====================

/// A name to identify this database.
virtual std::string Name() const = 0;

/// Get the database-level options associated with this database.
/// @return Options
virtual const std::map<std::string, std::string>& Options() const = 0;

/// Get an optional comment describing the database.
/// @return The database comment if set, or std::nullopt otherwise.
virtual std::optional<std::string> Comment() const = 0;
};

} // namespace paimon
213 changes: 212 additions & 1 deletion src/paimon/core/catalog/file_system_catalog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

#include <cstring>
#include <optional>
#include <set>
#include <utility>

#include "arrow/c/bridge.h"
Expand All @@ -27,7 +28,8 @@
#include "paimon/common/utils/arrow/status_utils.h"
#include "paimon/common/utils/path_util.h"
#include "paimon/common/utils/string_utils.h"
#include "paimon/core/schema/schema_manager.h"
#include "paimon/core/utils/branch_manager.h"
#include "paimon/defs.h"
#include "paimon/fs/file_system.h"
#include "paimon/logging.h"
#include "paimon/result.h"
Expand Down Expand Up @@ -240,4 +242,213 @@ Result<std::shared_ptr<Schema>> FileSystemCatalog::LoadTableSchema(
return std::static_pointer_cast<Schema>(*latest_schema);
}

Result<std::shared_ptr<Table>> FileSystemCatalog::GetTable(const Identifier& identifier) const {
std::string table_path = GetTableLocation(identifier);
PAIMON_ASSIGN_OR_RAISE(bool exist, fs_->Exists(table_path));
if (!exist) {
return Status::NotExist(fmt::format("{} not exist", identifier.ToString()));
}
PAIMON_ASSIGN_OR_RAISE(std::optional<std::shared_ptr<TableSchema>> latest_schema,
TableSchemaExists(identifier));
if (!latest_schema) {
return Status::NotExist(
fmt::format("load table schema for {} failed", identifier.ToString()));
}
auto schema = std::static_pointer_cast<Schema>(*latest_schema);
return std::make_shared<Table>(schema, identifier.GetDatabaseName(), identifier.GetTableName());
}

Status FileSystemCatalog::DropDatabase(const std::string& name, bool ignore_if_not_exists,
bool cascade) {
if (IsSystemDatabase(name)) {
return Status::Invalid(fmt::format("Cannot drop system database {}.", name));
}

PAIMON_ASSIGN_OR_RAISE(bool exist, DatabaseExists(name));
if (!exist) {
if (ignore_if_not_exists) {
return Status::OK();
} else {
return Status::NotExist(fmt::format("database {} does not exist", name));
}
}

std::string db_path = NewDatabasePath(warehouse_, name);

if (cascade) {
// List all tables in the database and drop them
PAIMON_ASSIGN_OR_RAISE(std::vector<std::string> tables, ListTables(name));
for (const std::string& table_name : tables) {
Identifier table_id(name, table_name);
PAIMON_RETURN_NOT_OK(DropTable(table_id, false));
}
} else {
// Check if database is empty
PAIMON_ASSIGN_OR_RAISE(std::vector<std::string> tables, ListTables(name));
if (!tables.empty()) {
return Status::Invalid(
fmt::format("Cannot drop non-empty database {}. Use cascade=true to force.", name));
}
}

// Delete the database directory
PAIMON_RETURN_NOT_OK(fs_->Delete(db_path));
return Status::OK();
}

Result<std::vector<std::string>> FileSystemCatalog::GetSchemaExternalPaths(
const std::vector<std::shared_ptr<TableSchema>>& schemas) const {
std::set<std::string> external_paths_set;
for (const auto& schema : schemas) {
const auto& options = schema->Options();
auto iter = options.find(Options::DATA_FILE_EXTERNAL_PATHS);
if (iter != options.end() && !iter->second.empty()) {
auto paths = StringUtils::Split(iter->second, ",", /*ignore_empty=*/true);
for (const auto& path : paths) {
std::string trimmed_path = path;
StringUtils::Trim(&trimmed_path);
if (!trimmed_path.empty()) {
external_paths_set.insert(trimmed_path);
}
}
}
}
return std::vector<std::string>(external_paths_set.begin(), external_paths_set.end());
}

Result<std::vector<std::string>> FileSystemCatalog::GetTableBranches(
const std::string& table_path) const {
std::vector<std::string> branches;
std::string branch_dir = PathUtil::JoinPath(table_path, "branch");
PAIMON_ASSIGN_OR_RAISE(bool branch_dir_exists, fs_->Exists(branch_dir));
if (!branch_dir_exists) {
return branches;
}

std::vector<std::unique_ptr<BasicFileStatus>> file_status_list;
PAIMON_RETURN_NOT_OK(fs_->ListDir(branch_dir, &file_status_list));

for (const auto& file_status : file_status_list) {
if (file_status->IsDir()) {
std::string dir_name = PathUtil::GetName(file_status->GetPath());
// Branch directory name format: branch-{branch_name}
const std::string branch_prefix = BranchManager::BRANCH_PREFIX;
if (StringUtils::StartsWith(dir_name, branch_prefix, /*start_pos=*/0)) {
std::string branch_name = dir_name.substr(branch_prefix.length());
branches.push_back(branch_name);
}
}
}
return branches;
}

Status FileSystemCatalog::DropTableImpl(const Identifier& identifier,
const std::vector<std::string>& external_paths) {
std::string table_path = GetTableLocation(identifier);

// Delete external paths first
for (const auto& external_path : external_paths) {
PAIMON_ASSIGN_OR_RAISE(bool exists, fs_->Exists(external_path));
if (exists) {
PAIMON_RETURN_NOT_OK(fs_->Delete(external_path));
}
}

// Delete the table directory
PAIMON_RETURN_NOT_OK(fs_->Delete(table_path));
return Status::OK();
}

Status FileSystemCatalog::DropTable(const Identifier& identifier, bool ignore_if_not_exists) {
if (IsSystemTable(identifier)) {
return Status::Invalid(fmt::format("Cannot drop system table {}.", identifier.ToString()));
}

std::string table_path = GetTableLocation(identifier);
PAIMON_ASSIGN_OR_RAISE(bool exist, fs_->Exists(table_path));
if (!exist) {
if (ignore_if_not_exists) {
return Status::OK();
} else {
return Status::NotExist(fmt::format("table {} does not exist", identifier.ToString()));
}
}

// Check if table has valid schema (table exists)
PAIMON_ASSIGN_OR_RAISE(std::optional<std::shared_ptr<TableSchema>> latest_schema,
TableSchemaExists(identifier));
if (!latest_schema) {
if (ignore_if_not_exists) {
return Status::OK();
} else {
return Status::NotExist(fmt::format("table {} does not exist", identifier.ToString()));
}
}

// Collect external paths from all schemas
std::set<std::string> external_paths_set;

// Get external paths from main branch
SchemaManager schema_manager(fs_, table_path);
PAIMON_ASSIGN_OR_RAISE(std::vector<int64_t> schema_ids, schema_manager.ListAllIds());
std::vector<std::shared_ptr<TableSchema>> schemas;
for (int64_t id : schema_ids) {
PAIMON_ASSIGN_OR_RAISE(auto schema, schema_manager.ReadSchema(id));
schemas.push_back(schema);
}
PAIMON_ASSIGN_OR_RAISE(auto main_external_paths, GetSchemaExternalPaths(schemas));
external_paths_set.insert(main_external_paths.begin(), main_external_paths.end());

// Get external paths from all branches
PAIMON_ASSIGN_OR_RAISE(auto branches, GetTableBranches(table_path));
for (const auto& branch : branches) {
SchemaManager branch_schema_manager(fs_, table_path, branch);
PAIMON_ASSIGN_OR_RAISE(std::vector<int64_t> branch_schema_ids,
branch_schema_manager.ListAllIds());
std::vector<std::shared_ptr<TableSchema>> branch_schemas;
for (int64_t id : branch_schema_ids) {
PAIMON_ASSIGN_OR_RAISE(auto schema, branch_schema_manager.ReadSchema(id));
branch_schemas.push_back(schema);
}
PAIMON_ASSIGN_OR_RAISE(auto branch_external_paths, GetSchemaExternalPaths(branch_schemas));
external_paths_set.insert(branch_external_paths.begin(), branch_external_paths.end());
}

std::vector<std::string> external_paths(external_paths_set.begin(), external_paths_set.end());
return DropTableImpl(identifier, external_paths);
}

Status FileSystemCatalog::RenameTable(const Identifier& from_table, const Identifier& to_table,
bool ignore_if_not_exists) {
if (IsSystemTable(from_table) || IsSystemTable(to_table)) {
return Status::Invalid(fmt::format("Cannot rename system table {} or {}.",
from_table.ToString(), to_table.ToString()));
}

if (from_table.GetDatabaseName() != to_table.GetDatabaseName()) {
return Status::Invalid(
"Cannot rename table across databases. Cross-database rename is not supported.");
}

PAIMON_ASSIGN_OR_RAISE(bool from_exist, TableExists(from_table));
if (!from_exist) {
if (ignore_if_not_exists) {
return Status::OK();
} else {
return Status::NotExist(
fmt::format("source table {} does not exist", from_table.ToString()));
}
}

PAIMON_ASSIGN_OR_RAISE(bool to_exist, TableExists(to_table));
if (to_exist) {
return Status::Invalid(fmt::format("target table {} already exists", to_table.ToString()));
}

std::string from_path = GetTableLocation(from_table);
std::string to_path = GetTableLocation(to_table);
PAIMON_RETURN_NOT_OK(fs_->Rename(from_path, to_path));
return Status::OK();
}

} // namespace paimon
Loading
Loading