Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ set(BUSTUB_THIRD_PARTY_INCLUDE_DIR
${PROJECT_SOURCE_DIR}/third_party/libpg_query/include
${PROJECT_SOURCE_DIR}/third_party/argparse/include
${PROJECT_SOURCE_DIR}/third_party/cpp_random_distributions
${PROJECT_SOURCE_DIR}/third_party/csv2/include
)

include_directories(${BUSTUB_SRC_INCLUDE_DIR} ${BUSTUB_TEST_INCLUDE_DIR} ${BUSTUB_THIRD_PARTY_INCLUDE_DIR})
Expand Down
1 change: 1 addition & 0 deletions src/binder/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ add_library(
bind_create.cpp
bind_insert.cpp
bind_select.cpp
bind_copy.cpp
bind_variable.cpp
bound_statement.cpp
fmt_impl.cpp
Expand Down
53 changes: 53 additions & 0 deletions src/binder/bind_copy.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
#include <iterator>
#include <memory>
#include <optional>
#include <string>

#include "binder/binder.h"
#include "binder/bound_expression.h"
#include "binder/bound_order_by.h"
#include "binder/bound_table_ref.h"
#include "binder/expressions/bound_column_ref.h"
#include "binder/expressions/bound_constant.h"
#include "binder/statement/copy_statement.h"
#include "binder/statement/delete_statement.h"
#include "binder/statement/insert_statement.h"
#include "binder/statement/select_statement.h"
#include "binder/statement/update_statement.h"
#include "binder/tokens.h"
#include "common/exception.h"
#include "common/macros.h"
#include "common/util/string_util.h"
#include "nodes/parsenodes.hpp"
#include "type/value_factory.h"

namespace bustub {

/**
 * Entry point for binding a COPY statement.
 *
 * Dispatches on the direction flag of the parse node: `is_from` selects BindCopyFrom(),
 * anything else goes to BindCopyTo().
 *
 * @param pg_stmt parse-tree node of the COPY statement
 * @return the bound statement produced by the selected binder
 */
auto Binder::BindCopy(duckdb_libpgquery::PGCopyStmt *pg_stmt) -> std::unique_ptr<CopyStatement> {
  return pg_stmt->is_from ? BindCopyFrom(pg_stmt) : BindCopyTo(pg_stmt);
}

/**
 * Binds a `COPY <table> [(col, ...)] FROM '<file>'` statement.
 *
 * The target table is bound via BindBaseTableRef() using the relation name from the parse node
 * (std::nullopt is passed as the second argument). Every named entry of the optional column list
 * is resolved against that table; entries without a name are skipped.
 *
 * @param pg_stmt parse-tree node of the COPY statement
 * @return a CopyStatement with `is_from` set to true
 * @throws NotImplementedException if the statement carries no file name (e.g. `COPY ... FROM STDIN`)
 */
auto Binder::BindCopyFrom(duckdb_libpgquery::PGCopyStmt *pg_stmt) -> std::unique_ptr<CopyStatement> {
  // The parser leaves `filename` null for STDIN/STDOUT. Constructing the std::string parameter of
  // CopyStatement from a null pointer is undefined behavior, so reject that form up front.
  if (pg_stmt->filename == nullptr) {
    throw NotImplementedException("COPY FROM STDIN is not supported");
  }

  auto table = BindBaseTableRef(pg_stmt->relation->relname, std::nullopt);

  std::vector<std::unique_ptr<BoundColumnRef>> exprs;
  if (pg_stmt->attlist != nullptr) {
    for (auto col_node = pg_stmt->attlist->head; col_node != nullptr; col_node = col_node->next) {
      auto target = reinterpret_cast<duckdb_libpgquery::PGResTarget *>(col_node->data.ptr_value);
      if (target->name == nullptr) {
        continue;
      }
      // NOTE(review): the resolver's result is stored as-is; confirm whether it can be null for an
      // unknown column name and, if so, whether that should be reported here.
      exprs.emplace_back(ResolveColumnRefFromBaseTableRef(*table, std::vector{std::string{target->name}}));
    }
  }
  return std::make_unique<CopyStatement>(std::move(table), std::move(exprs), pg_stmt->filename, true);
}

/**
 * Binds a `COPY ... TO` statement. Not supported yet: this function always throws.
 *
 * @param pg_stmt parse-tree node of the COPY statement (currently unused)
 * @throws NotImplementedException unconditionally
 */
auto Binder::BindCopyTo(duckdb_libpgquery::PGCopyStmt *pg_stmt) -> std::unique_ptr<CopyStatement> {
// TODO: bind the source query; the original note points at pg_stmt->query (SelectNode).
throw NotImplementedException("copyTo() not implemented");
}
}; // namespace bustub
1 change: 1 addition & 0 deletions src/binder/statement/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ add_library(
bustub_statement
OBJECT
create_statement.cpp
copy_statement.cpp
delete_statement.cpp
explain_statement.cpp
index_statement.cpp
Expand Down
28 changes: 28 additions & 0 deletions src/binder/statement/copy_statement.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
#include "binder/statement/copy_statement.h"
#include "binder/bound_statement.h"
#include "catalog/column.h"
#include "common/enums/statement_type.h"
#include "common/util/string_util.h"
#include "fmt/ranges.h"

#include <utility>

namespace bustub {

/**
 * Constructs a bound COPY statement.
 *
 * @param table bound reference to the table taking part in the copy
 * @param columns bound column references from the statement's column list (may be empty)
 * @param file_path path of the file named in the statement
 * @param is_from true for `COPY ... FROM`, false otherwise
 *
 * The format is switched to CSV when the stored path ends with ".csv"; otherwise it stays at
 * its default (CopyFileFormat::NONE).
 */
CopyStatement::CopyStatement(std::unique_ptr<BoundBaseTableRef> table,
                             std::vector<std::unique_ptr<BoundColumnRef>> columns, std::string file_path, bool is_from)
    : BoundStatement(StatementType::COPY_STATEMENT),
      table_(std::move(table)),
      columns_(std::move(columns)),
      file_path_(std::move(file_path)),
      is_from_(is_from) {
  // Inspect the member, not the parameter: `file_path` was moved from in the initializer list
  // above, so its contents are unspecified at this point.
  if (StringUtil::EndsWith(file_path_, ".csv")) {
    SetCSVFormat();
  }
}

/**
 * Renders this statement as text.
 * Only the table reference and the file path are included; the column list, direction flag and
 * file format are not part of the output.
 */
auto CopyStatement::ToString() const -> std::string {
return fmt::format("BoundCopy {{ table={}, filename={} }}", table_, file_path_);
}

}; // namespace bustub
2 changes: 1 addition & 1 deletion src/binder/statement/delete_statement.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ DeleteStatement::DeleteStatement(std::unique_ptr<BoundBaseTableRef> table, std::
: BoundStatement(StatementType::DELETE_STATEMENT), table_(std::move(table)), expr_(std::move(expr)) {}

/** Renders this statement as text from the dereferenced table reference and expression. */
// NOTE(review): this hunk shows one deleted and one added line, so both versions of the return
// statement appear below; the second one ("BoundDelete") is the replacement text.
auto DeleteStatement::ToString() const -> std::string {
return fmt::format("Delete {{ table={}, expr={} }}", *table_, *expr_);
return fmt::format("BoundDelete {{ table={}, expr={} }}", *table_, *expr_);
}

} // namespace bustub
2 changes: 2 additions & 0 deletions src/binder/transformer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ auto Binder::BindStatement(duckdb_libpgquery::PGNode *stmt) -> std::unique_ptr<B
return BindCreate(reinterpret_cast<duckdb_libpgquery::PGCreateStmt *>(stmt));
case duckdb_libpgquery::T_PGInsertStmt:
return BindInsert(reinterpret_cast<duckdb_libpgquery::PGInsertStmt *>(stmt));
case duckdb_libpgquery::T_PGCopyStmt:
return BindCopy(reinterpret_cast<duckdb_libpgquery::PGCopyStmt *>(stmt));
case duckdb_libpgquery::T_PGSelectStmt:
return BindSelect(reinterpret_cast<duckdb_libpgquery::PGSelectStmt *>(stmt));
case duckdb_libpgquery::T_PGExplainStmt:
Expand Down
1 change: 1 addition & 0 deletions src/execution/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ add_library(
bustub_execution
OBJECT
aggregation_executor.cpp
copy_from_executor.cpp
delete_executor.cpp
executor_factory.cpp
filter_executor.cpp
Expand Down
43 changes: 43 additions & 0 deletions src/execution/copy_from_executor.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@

#include "execution/executors/copy_from_executor.h"
#include <csv2/reader.hpp>
#include "csv2/parameters.hpp"

namespace bustub {

/**
 * Creates a copy-from executor.
 *
 * Stores the plan and looks up the target table in the catalog by the plan's table OID.
 *
 * @param exec_ctx executor context, forwarded to AbstractExecutor
 * @param plan the copy-from plan node to execute
 */
CopyFromExecutor::CopyFromExecutor(ExecutorContext *exec_ctx, const CopyFromPlanNode *plan)
    : AbstractExecutor(exec_ctx),
      plan_(plan),
      // `plan_` is declared before `table_info_`, so it is already initialized here.
      table_info_(exec_ctx_->GetCatalog()->GetTable(plan_->TableOid())) {
  // File-format selection is not wired up yet; kept for reference:
  // if (plan_->file_type_ == 1) {
  //   file_ = std::make_shared<CSVFileFormat>(plan_->file_type_, ',', true);
  // } else {
  //   file_ = nullptr;
  // }
}

/** Performs no setup; the file is opened and read inside Next(). `read_finished_` is not reset here. */
void CopyFromExecutor::Init() {}

/**
 * Reads the plan's file as CSV in a single pass.
 *
 * The first call maps the file (comma delimiter, double-quote quoting, first row treated as a
 * header, whitespace trimmed), walks every row and cell, marks the read as finished and returns
 * true. Every later call returns false.
 *
 * NOTE(review): cell values are read into a temporary string and then dropped; neither `tuple`
 * nor `rid` is written, and a failed mmap is not reported (the call still returns true).
 *
 * @param tuple unused
 * @param rid unused
 * @return true on the first call, false afterwards
 */
auto CopyFromExecutor::Next([[maybe_unused]] Tuple *tuple, RID *rid) -> bool {
  if (read_finished_) {
    return false;
  }

  using CsvReader = csv2::Reader<csv2::delimiter<','>, csv2::quote_character<'"'>, csv2::first_row_is_header<true>,
                                 csv2::trim_policy::trim_whitespace>;
  CsvReader reader;

  const bool mapped = reader.mmap(plan_->file_path_);
  if (mapped) {
    // Walk each record of the file ...
    for (const auto record : reader) {
      // ... and materialize each field as a string.
      for (const auto field : record) {
        std::string text;
        field.read_value(text);
      }
    }
  }

  read_finished_ = true;
  return true;
}

} // namespace bustub
7 changes: 7 additions & 0 deletions src/execution/executor_factory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#include "execution/executors/abstract_executor.h"
#include "execution/executors/aggregation_executor.h"
#include "execution/executors/copy_from_executor.h"
#include "execution/executors/delete_executor.h"
#include "execution/executors/filter_executor.h"
#include "execution/executors/hash_join_executor.h"
Expand All @@ -34,6 +35,7 @@
#include "execution/executors/topn_executor.h"
#include "execution/executors/update_executor.h"
#include "execution/executors/values_executor.h"
#include "execution/plans/copy_from_plan.h"
#include "execution/plans/filter_plan.h"
#include "execution/plans/mock_scan_plan.h"
#include "execution/plans/projection_plan.h"
Expand All @@ -58,6 +60,11 @@ auto ExecutorFactory::CreateExecutor(ExecutorContext *exec_ctx, const AbstractPl
return std::make_unique<IndexScanExecutor>(exec_ctx, dynamic_cast<const IndexScanPlanNode *>(plan.get()));
}

// Create a new copyfrom executor
case PlanType::CopyFrom: {
return std::make_unique<CopyFromExecutor>(exec_ctx, dynamic_cast<const CopyFromPlanNode *>(plan.get()));
}

// Create a new insert executor
case PlanType::Insert: {
auto insert_plan = dynamic_cast<const InsertPlanNode *>(plan.get());
Expand Down
7 changes: 7 additions & 0 deletions src/include/binder/binder.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
#include <string>

#include "binder/simplified_token.h"
#include "binder/statement/copy_statement.h"
#include "binder/statement/select_statement.h"
#include "binder/statement/set_show_statement.h"
#include "binder/tokens.h"
Expand Down Expand Up @@ -184,6 +185,12 @@ class Binder {

auto BindInsert(duckdb_libpgquery::PGInsertStmt *pg_stmt) -> std::unique_ptr<InsertStatement>;

auto BindCopy(duckdb_libpgquery::PGCopyStmt *pg_stmt) -> std::unique_ptr<CopyStatement>;

auto BindCopyFrom(duckdb_libpgquery::PGCopyStmt *pg_stmt) -> std::unique_ptr<CopyStatement>;

auto BindCopyTo(duckdb_libpgquery::PGCopyStmt *pg_stmt) -> std::unique_ptr<CopyStatement>;

auto BindValuesList(duckdb_libpgquery::PGList *list) -> std::unique_ptr<BoundExpressionListRef>;

auto BindLimitCount(duckdb_libpgquery::PGNode *root) -> std::unique_ptr<BoundExpression>;
Expand Down
41 changes: 41 additions & 0 deletions src/include/binder/statement/copy_statement.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
#pragma once

#include <memory>
#include <string>

#include "binder/bound_expression.h"
#include "binder/bound_statement.h"
#include "binder/expressions/bound_column_ref.h"
#include "binder/statement/insert_statement.h"
#include "binder/table_ref/bound_base_table_ref.h"
#include "catalog/column.h"

namespace bustub {

/** File format tag attached to a COPY statement. */
enum class CopyFileFormat : uint8_t {
// No format selected; this is the default value of CopyStatement::format_.
NONE = 0,
// Comma-separated values; selected through CopyStatement::SetCSVFormat().
CSV = 1,
// NOTE(review): nothing in view sets this value; presumably a ".tbl" delimited file — confirm.
TBL = 2,
};

/**
 * Bound representation of a COPY statement: the table involved, an optional column list,
 * the file path, the copy direction and the file format.
 */
class CopyStatement : public BoundStatement {
public:
/**
 * @param table bound reference to the table taking part in the copy
 * @param columns bound column references from the statement's column list
 * @param file_path path of the file named in the statement
 * @param is_from true for `COPY ... FROM`
 */
explicit CopyStatement(std::unique_ptr<BoundBaseTableRef> table, std::vector<std::unique_ptr<BoundColumnRef>> columns,
std::string file_path, bool is_from);

/** Marks the file format as CSV. */
void SetCSVFormat() { format_ = CopyFileFormat::CSV; }

/** Bound table the statement copies into or out of. */
std::unique_ptr<BoundBaseTableRef> table_;

/** Columns named in the statement; empty when no column list was given. */
std::vector<std::unique_ptr<BoundColumnRef>> columns_;

/** Path of the file named in the statement. */
std::string file_path_;

/** Direction flag: true for `COPY ... FROM`. */
bool is_from_;

/** File format; stays NONE until SetCSVFormat() is called. */
CopyFileFormat format_{CopyFileFormat::NONE};

/** @return a textual rendering of this statement */
auto ToString() const -> std::string override;
};

} // namespace bustub
4 changes: 4 additions & 0 deletions src/include/common/enums/statement_type.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ enum class StatementType : uint8_t {
INSERT_STATEMENT, // insert statement type
UPDATE_STATEMENT, // update statement type
CREATE_STATEMENT, // create statement type
COPY_STATEMENT, // copy statement type
DELETE_STATEMENT, // delete statement type
EXPLAIN_STATEMENT, // explain statement type
DROP_STATEMENT, // drop statement type
Expand All @@ -51,6 +52,9 @@ struct fmt::formatter<bustub::StatementType> : formatter<string_view> {
case bustub::StatementType::INSERT_STATEMENT:
name = "Insert";
break;
case bustub::StatementType::COPY_STATEMENT:
name = "Copy";
break;
case bustub::StatementType::UPDATE_STATEMENT:
name = "Update";
break;
Expand Down
54 changes: 54 additions & 0 deletions src/include/execution/executors/copy_from_executor.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
#pragma once

#include <memory>
#include <utility>
#include <vector>

#include "execution/executor_context.h"
#include "execution/executors/abstract_executor.h"
#include "execution/plans/copy_from_plan.h"
#include "storage/table/tuple.h"

namespace bustub {

/**
 * Polymorphic base for file-format descriptors; carries only a numeric format tag.
 */
struct BaseFileFormat {
  /** @param file_type numeric tag identifying the format */
  explicit BaseFileFormat(uint8_t file_type) : file_type_{file_type} {}

  /** Virtual so that derived descriptors can be destroyed through a base pointer. */
  virtual ~BaseFileFormat() = default;

  /** Numeric tag identifying the format. */
  uint8_t file_type_;
};

/**
 * Format descriptor for CSV files: adds a field delimiter and a header flag to the base tag.
 */
struct CSVFileFormat : BaseFileFormat {
  /**
   * @param file_type numeric tag forwarded to BaseFileFormat
   * @param delimiter field separator character
   * @param has_header whether the first row of the file is a header
   */
  CSVFileFormat(uint8_t file_type, uint8_t delimiter, bool has_header)
      : BaseFileFormat{file_type}, delimiter_{delimiter}, has_header_{has_header} {}

  ~CSVFileFormat() override = default;

  /** Field separator; the in-class default is a space, but the constructor always overrides it. */
  uint8_t delimiter_ = ' ';

  /** True when the first row of the file is a header. */
  bool has_header_;
};

/**
* CopyFromExecutor executes a copy on a table from file.
*/
/**
 * Executor for a copy-from plan node: reads the file named by the plan on behalf of a table
 * looked up from the catalog.
 */
class CopyFromExecutor : public AbstractExecutor {
public:
/**
 * @param exec_ctx executor context
 * @param plan the copy-from plan node to execute
 */
CopyFromExecutor(ExecutorContext *exec_ctx, const CopyFromPlanNode *plan);

/** Initialize the copy. */
void Init() override;

/**
 * Runs the copy.
 * @param tuple unused
 * @param rid tuple identifier out-parameter of the executor interface
 * @return whether this call did the read
 */
auto Next([[maybe_unused]] Tuple *tuple, RID *rid) -> bool override;

/** @return the output schema of the underlying plan node */
auto GetOutputSchema() const -> const Schema & override { return plan_->OutputSchema(); };

private:
/** The copy-from plan node being executed. */
const CopyFromPlanNode *plan_;

/** Catalog metadata of the target table. */
const bustub::TableInfo *table_info_;

/** Set once the file has been read, so the read happens only once. */
bool read_finished_{false};
// Placeholder for a future file-format descriptor:
// std::shared_ptr<BaseFileFormat> file_;
};

} // namespace bustub
3 changes: 2 additions & 1 deletion src/include/execution/plans/abstract_plan.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ enum class PlanType {
Sort,
TopN,
MockScan,
InitCheck
InitCheck,
CopyFrom,
};

class AbstractPlanNode;
Expand Down
Loading