Skip to content

Commit f9e024d

Browse files
committed
add http service, plan parsing, catalog serialization
Signed-off-by: Alex Chi <[email protected]>
1 parent c5dfacc commit f9e024d

File tree

7 files changed

+273
-0
lines changed

7 files changed

+273
-0
lines changed

CMakeLists.txt

+1
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,7 @@ set(BUSTUB_THIRD_PARTY_INCLUDE_DIR
139139
${PROJECT_SOURCE_DIR}/third_party/argparse/include
140140
${PROJECT_SOURCE_DIR}/third_party/cpp_random_distributions
141141
${PROJECT_SOURCE_DIR}/third_party/backward-cpp/include
142+
${PROJECT_SOURCE_DIR}/third_party/json/include
142143
)
143144

144145
include_directories(${BUSTUB_SRC_INCLUDE_DIR} ${BUSTUB_TEST_INCLUDE_DIR} ${BUSTUB_THIRD_PARTY_INCLUDE_DIR})

src/common/bustub_instance.cpp

+44
Original file line numberDiff line numberDiff line change
@@ -251,6 +251,50 @@ auto BustubInstance::ExecuteSql(const std::string &sql, ResultWriter &writer,
251251
}
252252
}
253253

254+
auto BustubInstance::ExecutePlan(const AbstractPlanNodeRef &plan, ResultWriter &writer) -> bool {
255+
auto txn = txn_manager_->Begin();
256+
try {
257+
auto result = ExecutePlanTxn(plan, txn, writer);
258+
txn_manager_->Commit(txn);
259+
delete txn;
260+
return result;
261+
} catch (bustub::Exception &ex) {
262+
txn_manager_->Abort(txn);
263+
delete txn;
264+
throw ex;
265+
}
266+
}
267+
268+
auto BustubInstance::ExecutePlanTxn(const AbstractPlanNodeRef &plan, Transaction *txn, ResultWriter &writer) -> bool {
269+
// Execute the query.
270+
auto exec_ctx = MakeExecutorContext(txn, false);
271+
std::vector<Tuple> result_set{};
272+
auto is_successful = execution_engine_->Execute(plan, &result_set, txn, exec_ctx.get());
273+
274+
// Return the result set as a vector of string.
275+
auto schema = plan->OutputSchema();
276+
277+
// Generate header for the result set.
278+
writer.BeginTable(false);
279+
writer.BeginHeader();
280+
for (const auto &column : schema.GetColumns()) {
281+
writer.WriteHeaderCell(column.GetName());
282+
}
283+
writer.EndHeader();
284+
285+
// Transforming result set into strings.
286+
for (const auto &tuple : result_set) {
287+
writer.BeginRow();
288+
for (uint32_t i = 0; i < schema.GetColumnCount(); i++) {
289+
writer.WriteCell(tuple.GetValue(&schema, i).ToString());
290+
}
291+
writer.EndRow();
292+
}
293+
writer.EndTable();
294+
295+
return is_successful;
296+
}
297+
254298
auto BustubInstance::ExecuteSqlTxn(const std::string &sql, ResultWriter &writer, Transaction *txn,
255299
std::shared_ptr<CheckOptions> check_options) -> bool {
256300
if (!sql.empty() && sql[0] == '\\') {

src/include/common/bustub_instance.h

+4
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
#include "common/config.h"
2727
#include "common/util/string_util.h"
2828
#include "execution/check_options.h"
29+
#include "execution/plans/abstract_plan.h"
2930
#include "libfort/lib/fort.hpp"
3031
#include "type/value.h"
3132

@@ -265,6 +266,9 @@ class BustubInstance {
265266
/** Get the current transaction. */
266267
auto CurrentManagedTxn() -> Transaction *;
267268

269+
auto ExecutePlan(const AbstractPlanNodeRef &plan, ResultWriter &writer) -> bool;
270+
auto ExecutePlanTxn(const AbstractPlanNodeRef &plan, Transaction *txn, ResultWriter &writer) -> bool;
271+
268272
/**
269273
* FOR TEST ONLY. Generate test tables in this BusTub instance.
270274
* It's used in the shell to predefine some tables, as we don't support

third_party/CMakeLists.txt

+4
Original file line numberDiff line numberDiff line change
@@ -17,3 +17,7 @@ add_subdirectory(utf8proc)
1717
add_subdirectory(backward-cpp)
1818

1919
add_subdirectory(readerwriterqueue)
20+
21+
add_subdirectory(json)
22+
23+
add_subdirectory(cpp-httplib)

tools/CMakeLists.txt

+1
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ add_subdirectory(terrier_bench)
88
add_subdirectory(bpm_bench)
99
add_subdirectory(btree_bench)
1010
add_subdirectory(htable_bench)
11+
add_subdirectory(http-server)
1112

1213
add_backward(shell)
1314
add_backward(nc-shell)

tools/http-server/CMakeLists.txt

+5
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
set(HTTP_SERVER_SOURCES http-server.cpp)
2+
add_executable(http-server ${HTTP_SERVER_SOURCES})
3+
4+
target_link_libraries(http-server bustub)
5+
set_target_properties(http-server PROPERTIES OUTPUT_NAME bustub-http-server)

tools/http-server/http-server.cpp

+214
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,214 @@
1+
#include <iostream>
2+
#include <memory>
3+
#include <nlohmann/json.hpp>
4+
#include <sstream>
5+
#include <string>
6+
#include <vector>
7+
#include "binder/table_ref/bound_join_ref.h"
8+
#include "catalog/column.h"
9+
#include "catalog/schema.h"
10+
#include "common/bustub_instance.h"
11+
#include "common/exception.h"
12+
#include "cpp-httplib/httplib.h"
13+
#include "execution/expressions/abstract_expression.h"
14+
#include "execution/expressions/column_value_expression.h"
15+
#include "execution/expressions/comparison_expression.h"
16+
#include "execution/plans/abstract_plan.h"
17+
#include "execution/plans/nested_loop_join_plan.h"
18+
#include "execution/plans/projection_plan.h"
19+
#include "execution/plans/seq_scan_plan.h"
20+
#include "type/type.h"
21+
#include "type/type_id.h"
22+
23+
using json = nlohmann::json;
24+
25+
/*
26+
auto TransformType(const std::string &ft) -> bustub::TypeId {
27+
if (ft == "INTEGER") {
28+
return bustub::TypeId::INTEGER;
29+
}
30+
if (ft == "BOOLEAN") {
31+
return bustub::TypeId::BOOLEAN;
32+
}
33+
throw bustub::Exception("unsupported field type");
34+
}
35+
36+
auto TransformExpr(json expr, const std::vector<bustub::AbstractPlanNodeRef> &children)
37+
-> bustub::AbstractExpressionRef {
38+
bustub::TypeId out_type = TransformType(expr["outType"]);
39+
if (expr.contains("op")) {
40+
json op = expr["op"];
41+
std::string name = op["name"];
42+
std::vector<json> operands_json = expr["operands"];
43+
std::vector<bustub::AbstractExpressionRef> operands;
44+
operands.reserve(operands_json.size());
45+
for (const auto &operand_json : operands_json) {
46+
operands.emplace_back(TransformExpr(operand_json, children));
47+
}
48+
if (name == "=") {
49+
return std::make_shared<bustub::ComparisonExpression>(operands[0], operands[1], bustub::ComparisonType::Equal);
50+
}
51+
throw bustub::Exception("unsupported op");
52+
}
53+
54+
// column value expression
55+
std::string name = expr["name"];
56+
size_t input = expr["input"];
57+
size_t i = 0;
58+
for (; i < children.size(); i++) {
59+
size_t cnt = children[i]->OutputSchema().GetColumnCount();
60+
if (input < cnt) {
61+
break;
62+
}
63+
input -= cnt;
64+
}
65+
return std::make_shared<bustub::ColumnValueExpression>(i, input, out_type);
66+
}
67+
68+
auto TransformRootRel(bustub::BustubInstance &bustub, const std::map<std::string, json> &rels, json root_rel)
69+
-> bustub::AbstractPlanNodeRef {
70+
std::string rel_op = root_rel["relOp"];
71+
std::vector<std::string> inputs = root_rel["inputs"];
72+
std::vector<bustub::AbstractPlanNodeRef> input_plan_nodes;
73+
for (const auto &input : inputs) {
74+
auto input_rel = rels.find(input)->second;
75+
auto input_plan_node = TransformRootRel(bustub, rels, input_rel);
76+
input_plan_nodes.emplace_back(std::move(input_plan_node));
77+
}
78+
std::vector<std::string> fields = root_rel["fields"];
79+
std::vector<std::string> field_types = root_rel["fieldTypes"];
80+
std::vector<bustub::Column> columns;
81+
for (size_t i = 0; i < fields.size(); i++) {
82+
auto ft = field_types[i];
83+
columns.emplace_back(fields[i], TransformType(ft));
84+
}
85+
bustub::SchemaRef schema = std::make_shared<bustub::Schema>(columns);
86+
if (rel_op == "org.apache.calcite.interpreter.Bindables$BindableJoin") {
87+
std::string join_type = root_rel["joinType"];
88+
bustub::JoinType jt;
89+
if (join_type == "inner") {
90+
jt = bustub::JoinType::INNER;
91+
} else {
92+
throw bustub::Exception("unsupported join type");
93+
}
94+
auto predicate = TransformExpr(root_rel["condition"], input_plan_nodes);
95+
return std::make_shared<bustub::NestedLoopJoinPlanNode>(std::move(schema), input_plan_nodes[0], input_plan_nodes[1],
96+
predicate, jt);
97+
}
98+
if (rel_op == "org.apache.calcite.interpreter.Bindables$BindableTableScan") {
99+
std::string table_name = root_rel["table"][0];
100+
auto table_info = bustub.catalog_->GetTable(table_name);
101+
return std::make_shared<bustub::SeqScanPlanNode>(std::move(schema), table_info->oid_, table_name);
102+
}
103+
if (rel_op == "org.apache.calcite.interpreter.Bindables$BindableProject") {
104+
std::vector<bustub::AbstractExpressionRef> exprs;
105+
std::vector<json> exprs_json = root_rel["exprs"];
106+
exprs.reserve(exprs_json.size());
107+
for (const auto &expr_json : exprs_json) {
108+
exprs.emplace_back(TransformExpr(expr_json, input_plan_nodes));
109+
}
110+
return std::make_shared<bustub::ProjectionPlanNode>(std::move(schema), exprs, input_plan_nodes[0]);
111+
}
112+
throw bustub::Exception("unsupported rel type");
113+
}
114+
*/
115+
116+
auto BuildPlanNodeFromJson(bustub::BustubInstance &bustub, json plan) -> bustub::AbstractPlanNodeRef {
117+
std::map<std::string, json> rels;
118+
std::vector<json> json_rels = plan["rels"];
119+
for (const auto &rel : json_rels) {
120+
rels[rel["id"]] = rel;
121+
}
122+
json root_rel = *json_rels.rbegin();
123+
// return TransformRootRel(bustub, rels, root_rel);
124+
return nullptr;
125+
}
126+
127+
// NOLINTNEXTLINE
128+
auto main(int argc, char **argv) -> int {
129+
auto bustub = std::make_unique<bustub::BustubInstance>();
130+
131+
// HTTP
132+
httplib::Server svr;
133+
134+
svr.set_exception_handler([](const auto &req, auto &res, std::exception_ptr ep) {
135+
std::string exception;
136+
try {
137+
std::rethrow_exception(ep);
138+
} catch (bustub::Exception &e) {
139+
exception = e.what();
140+
} catch (std::exception &e) {
141+
exception = e.what();
142+
} catch (...) { // See the following NOTE
143+
exception = "unknown exception";
144+
}
145+
res.set_content(exception, "text/plain");
146+
res.status = 500;
147+
});
148+
149+
svr.Post("/sql", [&bustub](const httplib::Request &req, httplib::Response &res) {
150+
std::stringstream ss;
151+
bustub::SimpleStreamWriter writer(ss, false);
152+
json data = json::parse(req.body);
153+
std::cerr << "SQL request: " << data["sql"] << std::endl;
154+
bustub->ExecuteSql(data["sql"], writer);
155+
res.set_content(ss.str(), "text/plain");
156+
});
157+
158+
svr.Post("/plan", [&bustub](const httplib::Request &req, httplib::Response &res) {
159+
std::stringstream ss;
160+
bustub::SimpleStreamWriter writer(ss, false);
161+
std::cerr << "Plan request:";
162+
json json_plan = json::parse(req.body);
163+
std::cerr << json_plan << std::endl;
164+
auto plan = BuildPlanNodeFromJson(*bustub, json_plan);
165+
if (plan == nullptr) {
166+
std::cerr << "unsupported plan" << std::endl;
167+
res.set_content("unsupported plan", "text/plain");
168+
res.status = 500;
169+
} else {
170+
std::cerr << "BusTub plan:" << std::endl << plan->ToString(true) << std::endl;
171+
bustub->ExecutePlan(plan, writer);
172+
res.set_content(ss.str(), "text/plain");
173+
}
174+
});
175+
176+
svr.Get("/catalog", [&bustub](const httplib::Request &req, httplib::Response &res) {
177+
std::cerr << "Catalog request" << std::endl;
178+
auto tables = bustub->catalog_->GetTableNames();
179+
std::vector<json> catalog;
180+
for (const auto &tbl_name : tables) {
181+
auto table = bustub->catalog_->GetTable(tbl_name);
182+
std::vector<json> schema;
183+
for (size_t c = 0; c < table->schema_.GetColumnCount(); c++) {
184+
auto col = table->schema_.GetColumn(c);
185+
switch (col.GetType()) {
186+
case bustub::TypeId::BIGINT: {
187+
schema.emplace_back(std::map<std::string, std::string>{{"name", col.GetName()}, {"type", "bigint"}});
188+
break;
189+
}
190+
case bustub::TypeId::INTEGER: {
191+
schema.emplace_back(std::map<std::string, std::string>{{"name", col.GetName()}, {"type", "integer"}});
192+
break;
193+
}
194+
case bustub::TypeId::VARCHAR: {
195+
schema.emplace_back(std::map<std::string, std::string>{{"name", col.GetName()}, {"type", "varchar"}});
196+
break;
197+
}
198+
default:
199+
throw bustub::Exception("unsupported column type");
200+
}
201+
}
202+
catalog.emplace_back(std::map<std::string, json>{{std::string("name"), json(table->name_)},
203+
{std::string("oid"), json(table->oid_)},
204+
{std::string("schema"), json(schema)}});
205+
}
206+
res.set_content(json(std::map<std::string, json>{{"catalog", catalog}}).dump(), "text/plain");
207+
});
208+
209+
std::cerr << "BusTub HTTP server listening" << std::endl;
210+
211+
svr.listen("127.0.0.1", 23333);
212+
213+
return 0;
214+
}

0 commit comments

Comments
 (0)