Skip to content

Commit

Permalink
[native] Add Arrow Flight Connector
Browse files Browse the repository at this point in the history
The native Arrow Flight connector can be used to connect to any Arrow Flight
enabled Data Source. The metadata layer is handled by the Presto coordinator
and does not need to be re-implemented in C++. Any Java connector that inherits
from `presto-base-arrow-flight` can use this connector as it's counterpart for
the Prestissimo layer.

Different Arrow-Flight enabled data sources can differ in authentication styles.
A plugin-style interface is provided to handle such cases with custom
authentication code by extending `arrow_flight::auth::Authenticator`.

RFC: https://github.com/prestodb/rfcs/blob/main/RFC-0004-arrow-flight-connector.md#prestissimo-implementation

Co-authored-by: Ashwin Kumar <[email protected]>
Co-authored-by: Rijin-N <[email protected]>
Co-authored-by: Nischay Yadav <[email protected]>
  • Loading branch information
3 people authored and BryanCutler committed Feb 17, 2025
1 parent 83e9709 commit cc96d0a
Show file tree
Hide file tree
Showing 68 changed files with 3,416 additions and 70 deletions.
2 changes: 2 additions & 0 deletions presto-native-execution/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ option(PRESTO_ENABLE_TESTING "Enable tests" ON)

option(PRESTO_ENABLE_JWT "Enable JWT (JSON Web Token) authentication" OFF)

option(PRESTO_ENABLE_ARROW_FLIGHT_CONNECTOR "Enable Arrow Flight connector" OFF)

# Set all Velox options below
add_compile_definitions(FOLLY_HAVE_INT128_T=1)

Expand Down
9 changes: 9 additions & 0 deletions presto-native-execution/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,15 @@ follow these steps:
* For development, use `make debug` to build a non-optimized debug version.
* Use `make unittest` to build and run tests.

#### Arrow Flight Connector
To enable Arrow Flight connector support, add to the extra cmake flags:
`EXTRA_CMAKE_FLAGS = -DPRESTO_ENABLE_ARROW_FLIGHT_CONNECTOR=ON`

The Arrow Flight connector requires the Arrow Flight library. You can install this dependency
by running the following script from the `presto/presto-native-execution` directory:

`./scripts/setup-adapters.sh arrow_flight`

### Makefile Targets
A reminder of the available Makefile targets can be obtained using `make help`
```
Expand Down
3 changes: 2 additions & 1 deletion presto-native-execution/presto_cpp/main/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ add_subdirectory(types)
add_subdirectory(http)
add_subdirectory(common)
add_subdirectory(thrift)
add_subdirectory(connectors)

add_library(
presto_server_lib
Expand All @@ -29,7 +30,6 @@ add_library(
QueryContextManager.cpp
ServerOperation.cpp
SignalHandler.cpp
SystemConnector.cpp
SessionProperties.cpp
TaskManager.cpp
TaskResource.cpp
Expand All @@ -48,6 +48,7 @@ target_link_libraries(
presto_common
presto_exception
presto_function_metadata
presto_connector
presto_http
presto_operators
presto_velox_conversion
Expand Down
54 changes: 5 additions & 49 deletions presto-native-execution/presto_cpp/main/PrestoServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,12 @@
#include "presto_cpp/main/PeriodicMemoryChecker.h"
#include "presto_cpp/main/PeriodicTaskManager.h"
#include "presto_cpp/main/SignalHandler.h"
#include "presto_cpp/main/SystemConnector.h"
#include "presto_cpp/main/TaskResource.h"
#include "presto_cpp/main/common/ConfigReader.h"
#include "presto_cpp/main/common/Counters.h"
#include "presto_cpp/main/common/Utils.h"
#include "presto_cpp/main/connectors/Registration.h"
#include "presto_cpp/main/connectors/SystemConnector.h"
#include "presto_cpp/main/http/HttpConstants.h"
#include "presto_cpp/main/http/filters/AccessLogFilter.h"
#include "presto_cpp/main/http/filters/HttpEndpointLatencyFilter.h"
Expand All @@ -48,13 +49,11 @@
#include "velox/common/memory/MmapAllocator.h"
#include "velox/common/memory/SharedArbitrator.h"
#include "velox/connectors/Connector.h"
#include "velox/connectors/hive/HiveConnector.h"
#include "velox/connectors/hive/HiveDataSink.h"
#include "velox/connectors/hive/storage_adapters/abfs/RegisterAbfsFileSystem.h"
#include "velox/connectors/hive/storage_adapters/gcs/RegisterGcsFileSystem.h"
#include "velox/connectors/hive/storage_adapters/hdfs/RegisterHdfsFileSystem.h"
#include "velox/connectors/hive/storage_adapters/s3fs/RegisterS3FileSystem.h"
#include "velox/connectors/tpch/TpchConnector.h"
#include "velox/dwio/dwrf/RegisterDwrfReader.h"
#include "velox/dwio/dwrf/RegisterDwrfWriter.h"
#include "velox/dwio/parquet/RegisterParquetReader.h"
Expand Down Expand Up @@ -87,7 +86,6 @@ constexpr char const* kHttps = "https";
constexpr char const* kTaskUriFormat =
"{}://{}:{}"; // protocol, address and port
constexpr char const* kConnectorName = "connector.name";
constexpr char const* kHiveHadoop2ConnectorName = "hive-hadoop2";

protocol::NodeState convertNodeState(presto::NodeState nodeState) {
switch (nodeState) {
Expand Down Expand Up @@ -253,33 +251,9 @@ void PrestoServer::run() {
registerMemoryArbitrators();
registerShuffleInterfaceFactories();
registerCustomOperators();
registerConnectorFactories();

// Register Velox connector factory for iceberg.
// The iceberg catalog is handled by the hive connector factory.
velox::connector::registerConnectorFactory(
std::make_shared<velox::connector::hive::HiveConnectorFactory>(
"iceberg"));

registerPrestoToVeloxConnector(
std::make_unique<HivePrestoToVeloxConnector>("hive"));
registerPrestoToVeloxConnector(
std::make_unique<HivePrestoToVeloxConnector>("hive-hadoop2"));
registerPrestoToVeloxConnector(
std::make_unique<IcebergPrestoToVeloxConnector>("iceberg"));
registerPrestoToVeloxConnector(
std::make_unique<TpchPrestoToVeloxConnector>("tpch"));
// Presto server uses system catalog or system schema in other catalogs
// in different places in the code. All these resolve to the SystemConnector.
// Depending on where the operator or column is used, different prefixes can
// be used in the naming. So the protocol class is mapped
// to all the different prefixes for System tables/columns.
registerPrestoToVeloxConnector(
std::make_unique<SystemPrestoToVeloxConnector>("$system"));
registerPrestoToVeloxConnector(
std::make_unique<SystemPrestoToVeloxConnector>("system"));
registerPrestoToVeloxConnector(
std::make_unique<SystemPrestoToVeloxConnector>("$system@system"));

// Register Presto connector factories and connectors
presto::registerConnectors();

velox::exec::OutputBufferManager::initialize({});
initializeVeloxMemory();
Expand Down Expand Up @@ -1164,24 +1138,6 @@ PrestoServer::getAdditionalHttpServerFilters() {
return filters;
}

void PrestoServer::registerConnectorFactories() {
// These checks for connector factories can be removed after we remove the
// registrations from the Velox library.
if (!velox::connector::hasConnectorFactory(
velox::connector::hive::HiveConnectorFactory::kHiveConnectorName)) {
velox::connector::registerConnectorFactory(
std::make_shared<velox::connector::hive::HiveConnectorFactory>());
velox::connector::registerConnectorFactory(
std::make_shared<velox::connector::hive::HiveConnectorFactory>(
kHiveHadoop2ConnectorName));
}
if (!velox::connector::hasConnectorFactory(
velox::connector::tpch::TpchConnectorFactory::kTpchConnectorName)) {
velox::connector::registerConnectorFactory(
std::make_shared<velox::connector::tpch::TpchConnectorFactory>());
}
}

std::vector<std::string> PrestoServer::registerConnectors(
const fs::path& configDirectoryPath) {
static const std::string kPropertiesExtension = ".properties";
Expand Down
2 changes: 0 additions & 2 deletions presto-native-execution/presto_cpp/main/PrestoServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,6 @@ class PrestoServer {

virtual void unregisterFileReadersAndWriters();

virtual void registerConnectorFactories();

/// Invoked by presto shutdown procedure to unregister connectors.
virtual void unregisterConnectors();

Expand Down
18 changes: 18 additions & 0 deletions presto-native-execution/presto_cpp/main/connectors/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
add_library(presto_connector Registration.cpp PrestoToVeloxConnector.cpp
SystemConnector.cpp)

if(PRESTO_ENABLE_ARROW_FLIGHT_CONNECTOR)
add_subdirectory(arrow_flight)
target_link_libraries(presto_connector presto_flight_connector)
endif()
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@
* limitations under the License.
*/

#include "presto_cpp/main/types/PrestoToVeloxConnector.h"
#include "presto_cpp/main/connectors/PrestoToVeloxConnector.h"
#include "presto_cpp/presto_protocol/connector/hive/HiveConnectorProtocol.h"
#include "presto_cpp/presto_protocol/connector/iceberg/IcebergConnectorProtocol.h"
#include "presto_cpp/presto_protocol/connector/tpch/TpchConnectorProtocol.h"

#include <velox/type/fbhive/HiveTypeParser.h>
#include <velox/velox/type/fbhive/HiveTypeParser.h>
#include "velox/connectors/hive/HiveConnector.h"
#include "velox/connectors/hive/HiveConnectorSplit.h"
#include "velox/connectors/hive/HiveDataSink.h"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
*/
#pragma once

#include "PrestoToVeloxExpr.h"
#include "presto_cpp/main/types/PrestoToVeloxExpr.h"
#include "presto_cpp/main/types/TypeParser.h"
#include "presto_cpp/presto_protocol/connector/hive/presto_protocol_hive.h"
#include "presto_cpp/presto_protocol/core/ConnectorProtocol.h"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "presto_cpp/main/connectors/Registration.h"
#include "presto_cpp/main/connectors/SystemConnector.h"

#ifdef PRESTO_ENABLE_ARROW_FLIGHT_CONNECTOR
#include "presto_cpp/main/connectors/arrow_flight/ArrowFlightConnector.h"
#include "presto_cpp/main/connectors/arrow_flight/ArrowPrestoToVeloxConnector.h"
#endif

#include "velox/connectors/hive/HiveConnector.h"
#include "velox/connectors/tpch/TpchConnector.h"

namespace facebook::presto {
namespace {

constexpr char const* kHiveHadoop2ConnectorName = "hive-hadoop2";
constexpr char const* kIcebergConnectorName = "iceberg";

void registerConnectorFactories() {
// These checks for connector factories can be removed after we remove the
// registrations from the Velox library.
if (!velox::connector::hasConnectorFactory(
velox::connector::hive::HiveConnectorFactory::kHiveConnectorName)) {
velox::connector::registerConnectorFactory(
std::make_shared<velox::connector::hive::HiveConnectorFactory>());
velox::connector::registerConnectorFactory(
std::make_shared<velox::connector::hive::HiveConnectorFactory>(
kHiveHadoop2ConnectorName));
}
if (!velox::connector::hasConnectorFactory(
velox::connector::tpch::TpchConnectorFactory::kTpchConnectorName)) {
velox::connector::registerConnectorFactory(
std::make_shared<velox::connector::tpch::TpchConnectorFactory>());
}

// Register Velox connector factory for iceberg.
// The iceberg catalog is handled by the hive connector factory.
if (!velox::connector::hasConnectorFactory(kIcebergConnectorName)) {
velox::connector::registerConnectorFactory(
std::make_shared<velox::connector::hive::HiveConnectorFactory>(
kIcebergConnectorName));
}

#ifdef PRESTO_ENABLE_ARROW_FLIGHT_CONNECTOR
if (!velox::connector::hasConnectorFactory(
ArrowFlightConnectorFactory::kArrowFlightConnectorName)) {
velox::connector::registerConnectorFactory(
std::make_shared<ArrowFlightConnectorFactory>());
}
#endif
}
} // namespace

void registerConnectors() {
registerConnectorFactories();

registerPrestoToVeloxConnector(std::make_unique<HivePrestoToVeloxConnector>(
velox::connector::hive::HiveConnectorFactory::kHiveConnectorName));
registerPrestoToVeloxConnector(
std::make_unique<HivePrestoToVeloxConnector>(kHiveHadoop2ConnectorName));
registerPrestoToVeloxConnector(
std::make_unique<IcebergPrestoToVeloxConnector>(kIcebergConnectorName));
registerPrestoToVeloxConnector(std::make_unique<TpchPrestoToVeloxConnector>(
velox::connector::tpch::TpchConnectorFactory::kTpchConnectorName));

// Presto server uses system catalog or system schema in other catalogs
// in different places in the code. All these resolve to the SystemConnector.
// Depending on where the operator or column is used, different prefixes can
// be used in the naming. So the protocol class is mapped
// to all the different prefixes for System tables/columns.
registerPrestoToVeloxConnector(
std::make_unique<SystemPrestoToVeloxConnector>("$system"));
registerPrestoToVeloxConnector(
std::make_unique<SystemPrestoToVeloxConnector>("system"));
registerPrestoToVeloxConnector(
std::make_unique<SystemPrestoToVeloxConnector>("$system@system"));

#ifdef PRESTO_ENABLE_ARROW_FLIGHT_CONNECTOR
registerPrestoToVeloxConnector(std::make_unique<ArrowPrestoToVeloxConnector>(
ArrowFlightConnectorFactory::kArrowFlightConnectorName));
#endif
}
} // namespace facebook::presto
20 changes: 20 additions & 0 deletions presto-native-execution/presto_cpp/main/connectors/Registration.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once

namespace facebook::presto {

void registerConnectors();

} // namespace facebook::presto
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "presto_cpp/main/SystemConnector.h"
#include "presto_cpp/main/connectors/SystemConnector.h"
#include "presto_cpp/main/PrestoTask.h"
#include "presto_cpp/main/TaskManager.h"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,12 @@
*/
#pragma once

#include "presto_cpp/main/SystemSplit.h"
#include "presto_cpp/main/types/PrestoToVeloxConnector.h"
#include "presto_cpp/main/connectors/PrestoToVeloxConnector.h"
#include "presto_cpp/main/connectors/SystemSplit.h"

#include "velox/connectors/Connector.h"

namespace facebook::presto {

class TaskManager;

class SystemColumnHandle : public velox::connector::ColumnHandle {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "presto_cpp/main/connectors/arrow_flight/ArrowFlightConfig.h"

namespace facebook::presto {

std::string ArrowFlightConfig::authenticatorName() {
return config_->get<std::string>(kAuthenticatorName, "none");
}

std::optional<std::string> ArrowFlightConfig::defaultServerHostname() {
return static_cast<std::optional<std::string>>(
config_->get<std::string>(kDefaultServerHost));
}

std::optional<uint16_t> ArrowFlightConfig::defaultServerPort() {
return static_cast<std::optional<uint16_t>>(
config_->get<uint16_t>(kDefaultServerPort));
}

bool ArrowFlightConfig::defaultServerSslEnabled() {
return config_->get<bool>(kDefaultServerSslEnabled, false);
}

bool ArrowFlightConfig::serverVerify() {
return config_->get<bool>(kServerVerify, true);
}

std::optional<std::string> ArrowFlightConfig::serverSslCertificate() {
return static_cast<std::optional<std::string>>(
config_->get<std::string>(kServerSslCertificate));
}

} // namespace facebook::presto
Loading

0 comments on commit cc96d0a

Please sign in to comment.