Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[native] Add Apache Arrow Flight Connector #24504

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions presto-native-execution/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ option(PRESTO_ENABLE_TESTING "Enable tests" ON)

option(PRESTO_ENABLE_JWT "Enable JWT (JSON Web Token) authentication" OFF)

option(PRESTO_ENABLE_ARROW_FLIGHT_CONNECTOR "Enable Arrow Flight connector" OFF)

# Set all Velox options below
add_compile_definitions(FOLLY_HAVE_INT128_T=1)

Expand Down
9 changes: 9 additions & 0 deletions presto-native-execution/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,15 @@ follow these steps:
* For development, use `make debug` to build a non-optimized debug version.
* Use `make unittest` to build and run tests.

#### Arrow Flight Connector
To enable Arrow Flight connector support, add the following to the extra CMake flags:
`EXTRA_CMAKE_FLAGS = -DPRESTO_ENABLE_ARROW_FLIGHT_CONNECTOR=ON`

The Arrow Flight connector requires the Arrow Flight library. You can install this dependency
by running the following script from the `presto/presto-native-execution` directory:

`./scripts/setup-adapters.sh arrow_flight`

### Makefile Targets
A reminder of the available Makefile targets can be obtained using `make help`
```
Expand Down
3 changes: 2 additions & 1 deletion presto-native-execution/presto_cpp/main/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ add_subdirectory(types)
add_subdirectory(http)
add_subdirectory(common)
add_subdirectory(thrift)
add_subdirectory(connectors)

add_library(
presto_server_lib
Expand All @@ -29,7 +30,6 @@ add_library(
QueryContextManager.cpp
ServerOperation.cpp
SignalHandler.cpp
SystemConnector.cpp
SessionProperties.cpp
TaskManager.cpp
TaskResource.cpp
Expand All @@ -48,6 +48,7 @@ target_link_libraries(
presto_common
presto_exception
presto_function_metadata
presto_connector
presto_http
presto_operators
presto_velox_conversion
Expand Down
54 changes: 5 additions & 49 deletions presto-native-execution/presto_cpp/main/PrestoServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,12 @@
#include "presto_cpp/main/PeriodicMemoryChecker.h"
#include "presto_cpp/main/PeriodicTaskManager.h"
#include "presto_cpp/main/SignalHandler.h"
#include "presto_cpp/main/SystemConnector.h"
#include "presto_cpp/main/TaskResource.h"
#include "presto_cpp/main/common/ConfigReader.h"
#include "presto_cpp/main/common/Counters.h"
#include "presto_cpp/main/common/Utils.h"
#include "presto_cpp/main/connectors/Registration.h"
#include "presto_cpp/main/connectors/SystemConnector.h"
#include "presto_cpp/main/http/HttpConstants.h"
#include "presto_cpp/main/http/filters/AccessLogFilter.h"
#include "presto_cpp/main/http/filters/HttpEndpointLatencyFilter.h"
Expand All @@ -48,13 +49,11 @@
#include "velox/common/memory/MmapAllocator.h"
#include "velox/common/memory/SharedArbitrator.h"
#include "velox/connectors/Connector.h"
#include "velox/connectors/hive/HiveConnector.h"
#include "velox/connectors/hive/HiveDataSink.h"
#include "velox/connectors/hive/storage_adapters/abfs/RegisterAbfsFileSystem.h"
#include "velox/connectors/hive/storage_adapters/gcs/RegisterGcsFileSystem.h"
#include "velox/connectors/hive/storage_adapters/hdfs/RegisterHdfsFileSystem.h"
#include "velox/connectors/hive/storage_adapters/s3fs/RegisterS3FileSystem.h"
#include "velox/connectors/tpch/TpchConnector.h"
#include "velox/dwio/dwrf/RegisterDwrfReader.h"
#include "velox/dwio/dwrf/RegisterDwrfWriter.h"
#include "velox/dwio/orc/reader/OrcReader.h"
Expand Down Expand Up @@ -88,7 +87,6 @@ constexpr char const* kHttps = "https";
constexpr char const* kTaskUriFormat =
"{}://{}:{}"; // protocol, address and port
constexpr char const* kConnectorName = "connector.name";
constexpr char const* kHiveHadoop2ConnectorName = "hive-hadoop2";

protocol::NodeState convertNodeState(presto::NodeState nodeState) {
switch (nodeState) {
Expand Down Expand Up @@ -254,33 +252,9 @@ void PrestoServer::run() {
registerMemoryArbitrators();
registerShuffleInterfaceFactories();
registerCustomOperators();
registerConnectorFactories();

// Register Velox connector factory for iceberg.
// The iceberg catalog is handled by the hive connector factory.
velox::connector::registerConnectorFactory(
std::make_shared<velox::connector::hive::HiveConnectorFactory>(
"iceberg"));

registerPrestoToVeloxConnector(
std::make_unique<HivePrestoToVeloxConnector>("hive"));
registerPrestoToVeloxConnector(
std::make_unique<HivePrestoToVeloxConnector>("hive-hadoop2"));
registerPrestoToVeloxConnector(
std::make_unique<IcebergPrestoToVeloxConnector>("iceberg"));
registerPrestoToVeloxConnector(
std::make_unique<TpchPrestoToVeloxConnector>("tpch"));
// Presto server uses system catalog or system schema in other catalogs
// in different places in the code. All these resolve to the SystemConnector.
// Depending on where the operator or column is used, different prefixes can
// be used in the naming. So the protocol class is mapped
// to all the different prefixes for System tables/columns.
registerPrestoToVeloxConnector(
std::make_unique<SystemPrestoToVeloxConnector>("$system"));
registerPrestoToVeloxConnector(
std::make_unique<SystemPrestoToVeloxConnector>("system"));
registerPrestoToVeloxConnector(
std::make_unique<SystemPrestoToVeloxConnector>("$system@system"));

// Register Presto connector factories and connectors
presto::registerConnectors();

velox::exec::OutputBufferManager::initialize({});
initializeVeloxMemory();
Expand Down Expand Up @@ -1165,24 +1139,6 @@ PrestoServer::getAdditionalHttpServerFilters() {
return filters;
}

/// Registers the Velox connector factories for hive, hive-hadoop2 and tpch
/// when they are not already present.
///
/// The hive-hadoop2 factory is registered inside the same guard as the hive
/// factory, i.e. only when no factory is registered under the hive name; the
/// tpch factory has its own guard.
void PrestoServer::registerConnectorFactories() {
// These checks for connector factories can be removed after we remove the
// registrations from the Velox library.
if (!velox::connector::hasConnectorFactory(
velox::connector::hive::HiveConnectorFactory::kHiveConnectorName)) {
velox::connector::registerConnectorFactory(
std::make_shared<velox::connector::hive::HiveConnectorFactory>());
velox::connector::registerConnectorFactory(
std::make_shared<velox::connector::hive::HiveConnectorFactory>(
kHiveHadoop2ConnectorName));
}
if (!velox::connector::hasConnectorFactory(
velox::connector::tpch::TpchConnectorFactory::kTpchConnectorName)) {
velox::connector::registerConnectorFactory(
std::make_shared<velox::connector::tpch::TpchConnectorFactory>());
}
}

std::vector<std::string> PrestoServer::registerConnectors(
const fs::path& configDirectoryPath) {
static const std::string kPropertiesExtension = ".properties";
Expand Down
2 changes: 0 additions & 2 deletions presto-native-execution/presto_cpp/main/PrestoServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,6 @@ class PrestoServer {

virtual void unregisterFileReadersAndWriters();

virtual void registerConnectorFactories();

/// Invoked by presto shutdown procedure to unregister connectors.
virtual void unregisterConnectors();

Expand Down
21 changes: 13 additions & 8 deletions presto-native-execution/presto_cpp/main/QueryContextManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,14 +60,17 @@ void updateFromSystemConfigs(
}

std::unordered_map<std::string, std::unordered_map<std::string, std::string>>
toConnectorConfigs(const protocol::SessionRepresentation& session) {
toConnectorConfigs(const protocol::TaskUpdateRequest& taskUpdateRequest) {
std::unordered_map<std::string, std::unordered_map<std::string, std::string>>
connectorConfigs;
for (const auto& entry : session.catalogProperties) {
connectorConfigs.insert(
{entry.first,
std::unordered_map<std::string, std::string>(
entry.second.begin(), entry.second.end())});
for (const auto& entry : taskUpdateRequest.session.catalogProperties) {
auto sessionProperties = std::unordered_map<std::string, std::string>(
entry.second.begin(), entry.second.end());
sessionProperties.insert(
taskUpdateRequest.extraCredentials.begin(),
taskUpdateRequest.extraCredentials.end());
sessionProperties.insert({"user", taskUpdateRequest.session.user});
connectorConfigs.insert({entry.first, sessionProperties});
}

return connectorConfigs;
Expand Down Expand Up @@ -106,9 +109,11 @@ QueryContextManager::QueryContextManager(
std::shared_ptr<velox::core::QueryCtx>
QueryContextManager::findOrCreateQueryCtx(
const protocol::TaskId& taskId,
const protocol::SessionRepresentation& session) {
const protocol::TaskUpdateRequest& taskUpdateRequest) {
return findOrCreateQueryCtx(
taskId, toVeloxConfigs(session), toConnectorConfigs(session));
taskId,
toVeloxConfigs(taskUpdateRequest.session),
toConnectorConfigs(taskUpdateRequest));
}

std::shared_ptr<core::QueryCtx> QueryContextManager::findOrCreateQueryCtx(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ class QueryContextManager {

std::shared_ptr<velox::core::QueryCtx> findOrCreateQueryCtx(
const protocol::TaskId& taskId,
const protocol::SessionRepresentation& session);
const protocol::TaskUpdateRequest& taskUpdateRequest);

/// Calls the given functor for every present query context.
void visitAllContexts(std::function<void(
Expand Down
4 changes: 2 additions & 2 deletions presto-native-execution/presto_cpp/main/TaskResource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ proxygen::RequestHandler* TaskResource::createOrUpdateBatchTask(

auto queryCtx =
taskManager_.getQueryContextManager()->findOrCreateQueryCtx(
taskId, updateRequest.session);
taskId, updateRequest);

VeloxBatchQueryPlanConverter converter(
shuffleName,
Expand Down Expand Up @@ -340,7 +340,7 @@ proxygen::RequestHandler* TaskResource::createOrUpdateTask(

queryCtx =
taskManager_.getQueryContextManager()->findOrCreateQueryCtx(
taskId, updateRequest.session);
taskId, updateRequest);

VeloxInteractiveQueryPlanConverter converter(queryCtx.get(), pool_);
planFragment = converter.toVeloxQueryPlan(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Library holding the connector registration entry point, the Presto-to-Velox
# connector adapters and the system connector.
add_library(presto_connector Registration.cpp PrestoToVeloxConnector.cpp
                             SystemConnector.cpp)

if(PRESTO_ENABLE_ARROW_FLIGHT_CONNECTOR)
  add_subdirectory(arrow_flight)
  # Registration.cpp wraps every Arrow Flight include and registration call in
  # `#ifdef PRESTO_ENABLE_ARROW_FLIGHT_CONNECTOR`. A CMake option is not a
  # preprocessor macro, so define it explicitly on the target that compiles
  # Registration.cpp; otherwise the connector is built but never registered.
  target_compile_definitions(presto_connector
                             PRIVATE PRESTO_ENABLE_ARROW_FLIGHT_CONNECTOR)
  target_link_libraries(presto_connector presto_flight_connector)
endif()

target_link_libraries(presto_connector presto_types)
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,14 @@
* limitations under the License.
*/

#include "presto_cpp/main/types/PrestoToVeloxConnector.h"
#include "presto_cpp/main/connectors/PrestoToVeloxConnector.h"
#include "presto_cpp/main/types/PrestoToVeloxExpr.h"
#include "presto_cpp/main/types/TypeParser.h"
#include "presto_cpp/presto_protocol/connector/hive/HiveConnectorProtocol.h"
#include "presto_cpp/presto_protocol/connector/iceberg/IcebergConnectorProtocol.h"
#include "presto_cpp/presto_protocol/connector/tpch/TpchConnectorProtocol.h"

#include <velox/type/fbhive/HiveTypeParser.h>
#include <velox/velox/type/fbhive/HiveTypeParser.h>
#include "velox/connectors/hive/HiveConnector.h"
#include "velox/connectors/hive/HiveConnectorSplit.h"
#include "velox/connectors/hive/HiveDataSink.h"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@
*/
#pragma once

#include "PrestoToVeloxExpr.h"
#include "presto_cpp/main/types/TypeParser.h"
#include "presto_cpp/presto_protocol/connector/hive/presto_protocol_hive.h"
#include "presto_cpp/presto_protocol/core/ConnectorProtocol.h"
#include "velox/connectors/Connector.h"
Expand All @@ -25,6 +23,8 @@
namespace facebook::presto {

class PrestoToVeloxConnector;
class TypeParser;
class VeloxExprConverter;

void registerPrestoToVeloxConnector(
std::unique_ptr<const PrestoToVeloxConnector> connector);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "presto_cpp/main/connectors/Registration.h"
#include "presto_cpp/main/connectors/SystemConnector.h"

#ifdef PRESTO_ENABLE_ARROW_FLIGHT_CONNECTOR
#include "presto_cpp/main/connectors/arrow_flight/ArrowFlightConnector.h"
#include "presto_cpp/main/connectors/arrow_flight/ArrowPrestoToVeloxConnector.h"
#endif

#include "velox/connectors/hive/HiveConnector.h"
#include "velox/connectors/tpch/TpchConnector.h"

namespace facebook::presto {
namespace {

constexpr char const* kHiveHadoop2ConnectorName = "hive-hadoop2";
constexpr char const* kIcebergConnectorName = "iceberg";

/// Registers the Velox connector factories used by Presto: hive, hive-hadoop2,
/// tpch, iceberg and, when PRESTO_ENABLE_ARROW_FLIGHT_CONNECTOR is defined,
/// Arrow Flight.
///
/// Each factory is looked up under its own name before being registered, so a
/// factory that is already present is left alone and a missing one is always
/// added, regardless of which other factories exist.
void registerConnectorFactories() {
  // These checks for connector factories can be removed after we remove the
  // registrations from the Velox library.
  if (!velox::connector::hasConnectorFactory(
          velox::connector::hive::HiveConnectorFactory::kHiveConnectorName)) {
    velox::connector::registerConnectorFactory(
        std::make_shared<velox::connector::hive::HiveConnectorFactory>());
  }

  // Guarded by its own name rather than by the presence of the hive factory:
  // if only the hive factory were pre-registered, a shared guard would skip
  // hive-hadoop2 entirely.
  if (!velox::connector::hasConnectorFactory(kHiveHadoop2ConnectorName)) {
    velox::connector::registerConnectorFactory(
        std::make_shared<velox::connector::hive::HiveConnectorFactory>(
            kHiveHadoop2ConnectorName));
  }

  if (!velox::connector::hasConnectorFactory(
          velox::connector::tpch::TpchConnectorFactory::kTpchConnectorName)) {
    velox::connector::registerConnectorFactory(
        std::make_shared<velox::connector::tpch::TpchConnectorFactory>());
  }

  // Register Velox connector factory for iceberg.
  // The iceberg catalog is handled by the hive connector factory.
  if (!velox::connector::hasConnectorFactory(kIcebergConnectorName)) {
    velox::connector::registerConnectorFactory(
        std::make_shared<velox::connector::hive::HiveConnectorFactory>(
            kIcebergConnectorName));
  }

#ifdef PRESTO_ENABLE_ARROW_FLIGHT_CONNECTOR
  if (!velox::connector::hasConnectorFactory(
          ArrowFlightConnectorFactory::kArrowFlightConnectorName)) {
    velox::connector::registerConnectorFactory(
        std::make_shared<ArrowFlightConnectorFactory>());
  }
#endif
}
} // namespace

void registerConnectors() {
registerConnectorFactories();

registerPrestoToVeloxConnector(std::make_unique<HivePrestoToVeloxConnector>(
velox::connector::hive::HiveConnectorFactory::kHiveConnectorName));
registerPrestoToVeloxConnector(
std::make_unique<HivePrestoToVeloxConnector>(kHiveHadoop2ConnectorName));
registerPrestoToVeloxConnector(
std::make_unique<IcebergPrestoToVeloxConnector>(kIcebergConnectorName));
registerPrestoToVeloxConnector(std::make_unique<TpchPrestoToVeloxConnector>(
velox::connector::tpch::TpchConnectorFactory::kTpchConnectorName));

// Presto server uses system catalog or system schema in other catalogs
// in different places in the code. All these resolve to the SystemConnector.
// Depending on where the operator or column is used, different prefixes can
// be used in the naming. So the protocol class is mapped
// to all the different prefixes for System tables/columns.
registerPrestoToVeloxConnector(
std::make_unique<SystemPrestoToVeloxConnector>("$system"));
registerPrestoToVeloxConnector(
std::make_unique<SystemPrestoToVeloxConnector>("system"));
registerPrestoToVeloxConnector(
std::make_unique<SystemPrestoToVeloxConnector>("$system@system"));

#ifdef PRESTO_ENABLE_ARROW_FLIGHT_CONNECTOR
registerPrestoToVeloxConnector(std::make_unique<ArrowPrestoToVeloxConnector>(
ArrowFlightConnectorFactory::kArrowFlightConnectorName));
#endif
}
} // namespace facebook::presto
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once

namespace facebook::presto {

/// Registers the Velox connector factories and the PrestoToVelox connectors
/// for the hive, hive-hadoop2, iceberg, tpch and system catalogs. The Arrow
/// Flight connector is registered as well when the build defines
/// PRESTO_ENABLE_ARROW_FLIGHT_CONNECTOR.
void registerConnectors();

} // namespace facebook::presto
Loading
Loading