
GH-37937: [C++][FlightRPC] Investigate using gRPC's generic API using gRPC's BidiReactor #49339

Draft
raulcd wants to merge 7 commits into apache:main from raulcd:raulcd-async-flight-poc


@raulcd
Member

@raulcd raulcd commented Feb 19, 2026

Warning

Do not merge: this is a PoC that is currently under discussion.

Rationale for this change

What changes are included in this PR?

Are these changes tested?

Are there any user-facing changes?

This PR includes breaking changes to public APIs. (If there are any breaking changes to public APIs, please explain which changes are breaking. If not, you can remove this.)

This PR contains a "Critical Fix". (If the changes fix either (a) a security vulnerability, (b) a bug that caused incorrect or invalid data to be produced, or (c) a bug that causes a crash (even when the API contract is upheld), please provide explanation. If not, you can remove this.)

@github-actions

⚠️ GitHub issue #37937 has been automatically assigned in GitHub to PR creator.

@raulcd
Member Author

raulcd commented Feb 19, 2026

Some initial discussion of this same branch happened on a PR in my fork; see more details here:
raulcd#97 (comment)

I am moving the PR here for visibility.

Member Author

@raulcd raulcd left a comment


@lidavidm @pitrou I've spent some time today learning more about our FlightDataSerialize method and how we could expose an API that uses a BufferVector instead, leaving gRPC out of the picture. This only covers the write path so far, but I wanted to share it early to confirm it is in line with what we had been discussing.

@github-actions github-actions bot added awaiting changes Awaiting changes awaiting change review Awaiting change review and removed awaiting committer review Awaiting committer review awaiting changes Awaiting changes labels Feb 19, 2026
raulcd added 5 commits March 13, 2026 10:07
…ges. Build new PayloadData::SerializeToBuffers method to retrieve a list of arrow::Buffers from a PayloadData. Internally, this function is a copy of what we had in FlightDataSerialize, but it uses arrow::Buffers instead of grpc::ByteBuffers. Move the logic to a single place and reuse it in FlightDataSerialize
…entation that consumes arrow::Buffers and triggers calls to a user-built listener once a RecordBatch has been read
@raulcd raulcd force-pushed the raulcd-async-flight-poc branch from 2deb7c8 to f88832c March 13, 2026 09:10
@raulcd
Member Author

raulcd commented Mar 13, 2026

@lidavidm @pitrou I've moved the serialization/deserialization logic from arrow/flight/transport/grpc/serialization_internal.cc to arrow/flight/serialization_internal.cc.
These functions now serialize a FlightPayload to an arrow::BufferVector and deserialize FlightData from an arrow::Buffer. They are still intended to be internal:

/// \brief Serialize a FlightPayload to a vector of buffers.
///
/// The first buffer contains the protobuf wire format header. Subsequent
/// buffers are zero-copy references to the IPC body buffers, with padding
/// buffers inserted as needed for 8-byte alignment.
arrow::Result<arrow::BufferVector> SerializePayloadToBuffers(
    const FlightPayload& payload);

/// \brief Deserialize FlightData from a contiguous buffer.
arrow::Result<FlightData> DeserializeFlightData(
    const std::shared_ptr<arrow::Buffer>& buffer);
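To make the layout described in the SerializePayloadToBuffers doc comment concrete, here is a self-contained sketch that uses only the standard library. It is not the PR's code: `Bytes`, `BufferPtr`, `BufferVec`, `LayoutPayload`, `AppendVarint` and `PaddingFor` are hypothetical stand-ins for the Arrow types and internals. It assumes the IPC body is carried in a length-delimited protobuf field numbered 1000 (which, to my knowledge, is how `data_body` is declared in Flight.proto) and that `encoded_fields` already holds the other encoded FlightData fields.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <utility>
#include <vector>

// Hypothetical stand-ins for arrow::Buffer / arrow::BufferVector.
using Bytes = std::vector<uint8_t>;
using BufferPtr = std::shared_ptr<const Bytes>;
using BufferVec = std::vector<BufferPtr>;

// Assumption: the IPC body is a length-delimited protobuf field numbered 1000.
constexpr uint64_t kBodyFieldNumber = 1000;
constexpr uint64_t kWireTypeLengthDelimited = 2;

// Appends `value` as a protobuf base-128 varint (7 bits per byte, low bits first).
void AppendVarint(uint64_t value, Bytes* out) {
  while (value >= 0x80) {
    out->push_back(static_cast<uint8_t>(value | 0x80));  // low 7 bits + continuation bit
    value >>= 7;
  }
  out->push_back(static_cast<uint8_t>(value));
}

// Number of zero bytes needed to round `size` up to a multiple of 8.
size_t PaddingFor(size_t size) { return (8 - size % 8) % 8; }

// Returns [header][body0][pad0][body1][pad1]..., omitting empty padding.
BufferVec LayoutPayload(const Bytes& encoded_fields, const BufferVec& body) {
  uint64_t body_length = 0;
  for (const auto& buf : body) {
    body_length += buf->size() + PaddingFor(buf->size());
  }

  // Header: the other encoded fields, then the body field's tag and length.
  Bytes header = encoded_fields;
  AppendVarint((kBodyFieldNumber << 3) | kWireTypeLengthDelimited, &header);
  AppendVarint(body_length, &header);

  BufferVec out;
  out.push_back(std::make_shared<Bytes>(std::move(header)));
  for (const auto& buf : body) {
    out.push_back(buf);  // zero-copy: shares the caller's buffer
    const size_t pad = PaddingFor(buf->size());
    if (pad > 0) {
      out.push_back(std::make_shared<Bytes>(pad, uint8_t{0}));
    }
  }
  return out;
}
```

Because the body entries are the caller's own `shared_ptr`s, no IPC body bytes are copied; only the small header and the padding buffers are freshly allocated, and padding is skipped when a body buffer's length is already a multiple of 8.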

The external API to use is either FlightPayload::SerializeToBuffers() or a new FlightMessageDecoder::Consume(std::shared_ptr<Buffer> buffer).
Serialization/deserialization on the gRPC transport side is still handled in arrow/flight/transport/grpc/serialization_internal.cc, but those functions are now thin wrappers around arrow/flight/serialization_internal.cc that only manage the grpc::Slice / grpc::ByteBuffer layer on top of it.
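The decoder side of this design is push-based: the transport hands each complete message to `Consume()`, and the decoder calls back into a user-supplied listener. The sketch below illustrates only that pattern, using a toy message type instead of real FlightData/IPC parsing. `ToyMessage`, `Listener`, `ToyDecoder` and `CollectingListener` are hypothetical names, not the PR's `FlightMessageDecoder` API. In a gRPC bidi reactor, a natural place to call `Consume()` would be the reactor's `OnReadDone` callback, after converting the received bytes into a buffer.

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hypothetical toy message; a real decoder would parse FlightData and IPC metadata.
struct ToyMessage {
  enum class Kind { kSchema, kRecordBatch };
  Kind kind;
  std::string payload;
};

// Hypothetical user-implemented listener, invoked by the decoder.
class Listener {
 public:
  virtual ~Listener() = default;
  virtual void OnSchema(const std::string& schema) = 0;
  virtual void OnRecordBatch(const std::string& batch) = 0;
};

// Push-based decoder: the caller feeds one complete message per call.
class ToyDecoder {
 public:
  explicit ToyDecoder(std::shared_ptr<Listener> listener)
      : listener_(std::move(listener)) {}

  // Returns false if the stream violates "schema first, then batches".
  bool Consume(const ToyMessage& msg) {
    if (!have_schema_) {
      if (msg.kind != ToyMessage::Kind::kSchema) return false;
      have_schema_ = true;
      listener_->OnSchema(msg.payload);
      return true;
    }
    if (msg.kind != ToyMessage::Kind::kRecordBatch) return false;
    listener_->OnRecordBatch(msg.payload);
    return true;
  }

 private:
  std::shared_ptr<Listener> listener_;
  bool have_schema_ = false;
};

// Example listener that records what it receives.
class CollectingListener : public Listener {
 public:
  void OnSchema(const std::string& schema) override { schema_ = schema; }
  void OnRecordBatch(const std::string& batch) override {
    batches_.push_back(batch);
  }
  std::string schema_;
  std::vector<std::string> batches_;
};
```

Keeping the decoder free of any transport types is what lets the same listener-based code sit behind gRPC or any other transport that can deliver whole messages as buffers.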

I've adapted the gRPC Async PoC to show how this would work with the bidi reactor.

The only point I'm not entirely clear on is whether gRPC-specific utilities should be exposed. This matters for users building their own bidi reactors (see the async gRPC example in this PR). If our stance is that users should manage gRPC themselves, those users need a way to convert between arrow::Buffer and grpc::ByteBuffer/Slice, which isn't trivial.

/// Convert an Arrow Buffer to a gRPC Slice.
arrow::Result<::grpc::Slice> SliceFromBuffer(const std::shared_ptr<arrow::Buffer>& buf);

/// Wrap a gRPC ByteBuffer as a zero-copy Arrow Buffer (and clear the ByteBuffer).
arrow::Status WrapGrpcBuffer(::grpc::ByteBuffer* grpc_buf,
                             std::shared_ptr<arrow::Buffer>* out);
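The key property of `WrapGrpcBuffer` as declared above is zero-copy ownership transfer: the resulting buffer must keep the transport's storage alive after the source object has been cleared. The following standard-library sketch shows that idea in isolation. A `std::vector<uint8_t>` stands in for the gRPC byte buffer and the hypothetical `View` class stands in for `arrow::Buffer`; neither is the real API, and `WrapTransportBuffer` is an illustrative name only.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <utility>
#include <vector>

// Hypothetical read-only view standing in for arrow::Buffer. It does not own
// the bytes directly; `owner_` keeps the real storage alive.
class View {
 public:
  View(const uint8_t* data, size_t size, std::shared_ptr<const void> owner)
      : data_(data), size_(size), owner_(std::move(owner)) {}
  const uint8_t* data() const { return data_; }
  size_t size() const { return size_; }

 private:
  const uint8_t* data_;
  size_t size_;
  std::shared_ptr<const void> owner_;
};

// Stand-in for wrapping a transport buffer: steal its storage, leave the
// source empty, and return a view over the same bytes without copying them.
std::shared_ptr<View> WrapTransportBuffer(std::vector<uint8_t>* src) {
  auto owner = std::make_shared<std::vector<uint8_t>>(std::move(*src));
  src->clear();  // a moved-from vector is only guaranteed valid, not empty
  return std::make_shared<View>(owner->data(), owner->size(), owner);
}
```

If `SliceFromBuffer` is also meant to be zero-copy, the same principle applies in reverse: the gRPC side would have to hold a reference to the `arrow::Buffer` until the slice is released.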

I'd like to move this forward. Would the best approach be an initial PR that only moves the serialization/deserialization logic to arrow::Buffer/BufferVector instead of grpc::ByteBuffer/Slice?
