You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
Copy file name to clipboardExpand all lines: server/docs/design/bidi-producer-consumers-streaming.md
+39-39Lines changed: 39 additions & 39 deletions
Original file line number
Diff line number
Diff line change
@@ -4,9 +4,9 @@
4
4
5
5
A primary use case of the `hedera-block-node` is to stream live BlockItems (see Terms section) from a producer
6
6
(e.g. Consensus Node) to N consumers (e.g. Mirror Node) with the lowest possible latency while correctly preserving the
7
-
order of the BlockItems. This document outlines several possible strategies to implement this use case and the design
7
+
order of the BlockItems. This document outlines several possible strategies to implement this use case and the design
8
8
of the recommended approach. All strategies rely on the Helidon 4.x.x server implementations of HTTP/2 and gRPC
9
-
services to ingest BlockItem data from a producer and then to stream the same BlockItems to downstream consumers. It
9
+
services to ingest BlockItem data from a producer and then to stream the same BlockItems to downstream consumers. It
10
10
does this by defining bidirectional gRPC streaming services based on protobuf definitions.
11
11
12
12
Helidon provides well-defined APIs and extension points to implement business logic for these services. The main entry
@@ -27,18 +27,18 @@ point for custom logic is an implementation of `GrpcService`.
27
27
### Terms
28
28
29
29
**BlockItem** - The BlockItem is the primary data structure passed between the producer, the `hedera-block-node`
30
-
and consumers. A defined sequence of BlockItems represent a Block when stored on the `hedera-block-node`.
30
+
and consumers. A defined sequence of BlockItems represent a Block when stored on the `hedera-block-node`.
31
31
32
32
**Bidirectional Streaming** - Bidirectional streaming is an [HTTP/2 feature](https://datatracker.ietf.org/doc/html/rfc9113#name-streams-and-multiplexing) allowing both a client and a server to emit
33
-
a continuous stream of frames without waiting for responses. In this way, gRPC services can be used to efficiently
33
+
a continuous stream of frames without waiting for responses. In this way, gRPC services can be used to efficiently
34
34
transmit a continuous flow of BlockItem messages while the HTTP/2 connection is open.
35
35
36
36
**Producer StreamObserver** - The Producer StreamObserver is a custom implementation of the [gRPC StreamObserver
37
-
interface](https://github.com/grpc/grpc-java/blob/0ff3f8e4ac4c265e91b4a0379a32cf25a0a2b2f7/stub/src/main/java/io/grpc/stub/StreamObserver.java#L45) used by Helidon. It is initialized by the BlockItemStreamService (see Entities section). Helidon invokes the Producer
37
+
interface](https://github.com/grpc/grpc-java/blob/0ff3f8e4ac4c265e91b4a0379a32cf25a0a2b2f7/stub/src/main/java/io/grpc/stub/StreamObserver.java#L45) used by Helidon. It is initialized by the BlockItemStreamService (see Entities section). Helidon invokes the Producer
38
38
StreamObserver at runtime when the producer sends a new BlockItem to the `StreamSink` gRPC service.
39
39
40
40
**Consumer StreamObserver** - The Consumer StreamObserver is a custom implementation of the [gRPC StreamObserver
41
-
interface](https://github.com/grpc/grpc-java/blob/0ff3f8e4ac4c265e91b4a0379a32cf25a0a2b2f7/stub/src/main/java/io/grpc/stub/StreamObserver.java#L45) used by Helidon. It is initialized by the BlockItemStreamService (see Entities section). Helidon invokes the Consumer
41
+
interface](https://github.com/grpc/grpc-java/blob/0ff3f8e4ac4c265e91b4a0379a32cf25a0a2b2f7/stub/src/main/java/io/grpc/stub/StreamObserver.java#L45) used by Helidon. It is initialized by the BlockItemStreamService (see Entities section). Helidon invokes the Consumer
42
42
StreamObserver at runtime when the downstream consumer of the `StreamSource` gRPC service sends HTTP/2 responses to
43
43
sent BlockItems.
44
44
@@ -61,45 +61,45 @@ The Block Node gRPC Streaming Services API is now aligned with the names and sim
61
61
## Approaches:
62
62
63
63
All the following approaches require integrating with Helidon 4.x.x gRPC services to implement the bidirectional
64
-
streaming API methods defined above. The following objects are used in all approaches:
64
+
streaming API methods defined above. The following objects are used in all approaches:
65
65
66
66
`BlockItemStreamService` is a custom implementation of the Helidon gRPC `GrpcService`. It is responsible for binding
67
67
the Helidon routing mechanism to the gRPC streaming methods called by producers and consumers.
68
68
69
69
`ProducerBlockItemObserver` is a custom implementation of the Helidon gRPC `StreamObserver` interface.
70
70
`BlockItemStreamService` instantiates a new `ProducerBlockItemObserver` instance when the `StreamSink` gRPC method is
71
-
called by a producer. Thereafter, Helidon invokes `ProducerBlockItemObserver` methods to receive the latest BlockItem
71
+
called by a producer. Thereafter, Helidon invokes `ProducerBlockItemObserver` methods to receive the latest BlockItem
72
72
from the producer and return BlockItemResponses via a bidirectional stream.
73
73
74
74
`ConsumerBlockItemObserver` is also a custom implementation of the Helidon gRPC `StreamObserver` interface.
75
75
`BlockItemStreamService` instantiates a new `ConsumerBlockItemObserver` instance when the `StreamSource` gRPC method
76
-
is called by each consumer. The `ConsumerBlockItemObserver` wraps an instance of `StreamObserver` provided by Helidon
77
-
when the connection is established. The `ConsumerBlockItemObserver` uses the `StreamObserver` to send the latest
78
-
BlockItem to the downstream consumer. Helidon invokes `ConsumerBlockItemObserver` methods to deliver BlockItemResponses
76
+
is called by each consumer. The `ConsumerBlockItemObserver` wraps an instance of `StreamObserver` provided by Helidon
77
+
when the connection is established. The `ConsumerBlockItemObserver` uses the `StreamObserver` to send the latest
78
+
BlockItem to the downstream consumer. Helidon invokes `ConsumerBlockItemObserver` methods to deliver BlockItemResponses
79
79
from the consumer in receipt of BlockItems.
80
80
81
81
82
82
### Approach 1: Directly passing BlockItems from `ProducerBlockItemObserver` to N `ConsumerBlockItemObserver`s.
83
83
84
84
Directly passing BlockItems from the `ProducerBlockItemObserver` to N `ConsumerBlockItemObserver`s without storing
85
85
BlockItems in an intermediate data structure. This approach was the basis for one of the first implementations of gRPC
86
-
Live Streaming (see [BlockNode Issue 21](https://github.com/hashgraph/hedera-block-node/issues/21)). Unfortunately, this approach has the following problems:
86
+
Live Streaming (see [BlockNode Issue 21](https://github.com/hashgraph/hedera-block-node/issues/21)). Unfortunately, this approach has the following problems:
87
87
88
88
Drawbacks:
89
89
1) Each `ProducerBlockItemObserver` must iterate over the list of subscribed consumers to pass the BlockItem to each
90
90
`ConsumerBlockItemObserver` before saving the BlockItem to disk and issuing a BlockItemResponse back to the producer.
91
91
The linear scaling of consumers will aggregate latency resulting in the last consumer in the list to be penalized
92
92
with the sum of the latencies of all consumers before it.
93
93
2) Dynamically subscribing/unsubscribing `ConsumerBlockItemObserver`s while deterministically broadcasting BlockItems
94
-
to each consumer in the correct order complicates and slows down the process. It requires thread-safe data
94
+
to each consumer in the correct order complicates and slows down the process. It requires thread-safe data
95
95
structures and synchronization on all reads and writes to ensure new/removed subscribers do not disrupt the
96
96
iteration order of the `ConsumerBlockItemObserver`s.
97
97
98
-
### Approach 2: Use a shared data structure between `ProducerBlockItemObserver` and `ConsumerBlockItemObserver`s. Consumers busy-wait for new BlockItems.
98
+
### Approach 2: Use a shared data structure between `ProducerBlockItemObserver` and `ConsumerBlockItemObserver`s. Consumers busy-wait for new BlockItems.
99
99
100
100
Alternatively, if `ProducerBlockItemObserver`s store BlockItems in a shared data structure before immediately returning
101
101
a response to the producer, the BlockItem is then immediately available for all `ConsumerBlockItemObserver`s to read
102
-
asynchronously. Consumers can repeatedly poll the shared data structure for new BlockItems. This approach has the
102
+
asynchronously. Consumers can repeatedly poll the shared data structure for new BlockItems. This approach has the
103
103
following consequences:
104
104
105
105
Advantages:
@@ -113,22 +113,22 @@ Drawbacks:
113
113
up or down.
114
114
3) While prototyping this approach, it appeared that `ConsumerBlockItemObserver`s using a busy-wait to watch for new
115
115
BlockItems impaired the ability of the Helidon Virtual Thread instance to process the inbound responses from the
116
-
downstream consumer in a timely way. The aggressive behavior of the busy-wait could complicate future use cases
116
+
downstream consumer in a timely way. The aggressive behavior of the busy-wait could complicate future use cases
### Approach 3: Use a shared data structure between `ProducerBlockItemObserver` and `ConsumerBlockItemObserver`s. Use downstream consumer BlockItemResponses to drive the process of sending new BlockItems.
120
+
### Approach 3: Use a shared data structure between `ProducerBlockItemObserver` and `ConsumerBlockItemObserver`s. Use downstream consumer BlockItemResponses to drive the process of sending new BlockItems.
121
121
122
122
With this approach, the `ProducerBlockItemObserver` will store BlockItems in a shared data structure before immediately
123
-
returning a BlockItemResponse to the producer. However, rather than using a busy-wait to poll for new BlockItems,
123
+
returning a BlockItemResponse to the producer. However, rather than using a busy-wait to poll for new BlockItems,
124
124
`ConsumerBlockItemObserver`s will send new BlockItems only upon receipt of BlockItemResponses from previously sent
125
-
BlockItems. When Helidon invokes `onNext()` with a BlockItemResponse, the `ConsumerBlockItemObserver` (using an
125
+
BlockItems. When Helidon invokes `onNext()` with a BlockItemResponse, the `ConsumerBlockItemObserver` (using an
126
126
internal counter) will calculate and send all newest BlockItems available from the shared data structure to the
127
-
downstream consumer. In this way, the downstream consumer responses will drive the process of sending new BlockItems.
127
+
downstream consumer. In this way, the downstream consumer responses will drive the process of sending new BlockItems.
128
128
129
129
Advantages:
130
130
1) It will not consume CPU resources polling.
131
-
2) It will not hijack the thread from responding to the downstream consumer. Rather, it uses the interaction with the
131
+
2) It will not hijack the thread from responding to the downstream consumer. Rather, it uses the interaction with the
132
132
consumer to trigger sending the newest BlockItems downstream.
133
133
3) The shared data structure will need to be concurrent but, after the initial write operation, all subsequent reads
134
134
should not require synchronization.
@@ -141,10 +141,10 @@ Drawbacks:
141
141
BlockItemResponses. Given, the latency of a network request/response round-trip, this approach will likely be far
142
142
too slow to be considered effective even when sending a batch of all the latest BlockItems.
143
143
144
-
### Approach 4: Shared data structure between producer and consumer services. Leveraging the LMAX Disruptor library to manage inter-process pub/sub message-passing between producer and consumers via RingBuffer.
144
+
### Approach 4: Shared data structure between producer and consumer services. Leveraging the LMAX Disruptor library to manage inter-process pub/sub message-passing between producer and consumers via RingBuffer.
145
145
146
146
The LMAX Disruptor library is a high-performance inter-process pub/sub message passing library that could be used to
147
-
efficiently pass BlockItems between a `ProducerBlockItemObserver` and `ConsumerBlockItemObserver`s. The Disruptor
147
+
efficiently pass BlockItems between a `ProducerBlockItemObserver` and `ConsumerBlockItemObserver`s. The Disruptor
148
148
library is designed to minimize latency as well as CPU cycles to by not blocking while maintaining concurrency
149
149
guarantees.
150
150
@@ -161,64 +161,64 @@ Drawbacks:
161
161
effectively.
162
162
2) Leveraging the Disruptor library requires the communication between the `ProducerBlockItemObserver` and
163
163
`ConsumerBlockItemObserver`s to be affiliated by subscribing/unsubscribing the downstream consumers to receive the
164
-
latest BlockItems from the producer via the Disruptor RingBuffer. The process of managing these subscriptions to
164
+
latest BlockItems from the producer via the Disruptor RingBuffer. The process of managing these subscriptions to
165
165
the RingBuffer can be complex.
166
166
167
167
---
168
168
169
169
## Design
170
170
171
171
Given the goals and the proposed approaches, Approach #4 has significant advantages and fewer significant drawbacks.
172
-
Using the LMAX Disruptor offers low latency and CPU consumption via a well-maintained and tested API. The RingBuffer
172
+
Using the LMAX Disruptor offers low latency and CPU consumption via a well-maintained and tested API. The RingBuffer
173
173
intermediate data structure should serve to decouple the producer bidirectional stream from the consumer bidirectional
174
-
streams. Please see the following Entities section and Diagrams for a visual representation of the design.
174
+
streams. Please see the following Entities section and Diagrams for a visual representation of the design.
175
175
176
176
### Producer Registration Flow
177
177
178
178
At boot time, the `BlockItemStreamService` will initialize the `StreamMediator` with the LMAX Disruptor RingBuffer.
179
179
180
180
When a producer calls the `StreamSink` gRPC method, the `BlockItemStreamService` will create a new
181
181
`ProducerBlockItemObserver` instance for Helidon to invoke during the lifecycle of the bidirectional connection to the
182
-
upstream producer. The `ProducerBlockItemObserver` is constructed with a reference to the `StreamMediator` and to
182
+
upstream producer. The `ProducerBlockItemObserver` is constructed with a reference to the `StreamMediator` and to
183
183
the `ResponseStreamObserver` managed by Helidon for transmitting BlockItemResponses to the producer.
184
184
See the Producer Registration Flow diagram for more details.
185
185
186
186
### Consumer Registration Flow
187
187
188
188
When a consumer calls the `StreamSource` gRPC method, the `BlockItemStreamService` will create a new
189
189
`ConsumerBlockItemObserver` instance for Helidon to invoke during the lifecycle of the bidirectional connection to the
190
-
downstream consumer. The `ConsumerBlockItemObserver` is constructed with a reference to the `StreamMediator` and to
191
-
the `ResponseStreamObserver` managed by Helidon for transmitting BlockItemResponses to the downstream consumer. The
190
+
downstream consumer. The `ConsumerBlockItemObserver` is constructed with a reference to the `StreamMediator` and to
191
+
the `ResponseStreamObserver` managed by Helidon for transmitting BlockItemResponses to the downstream consumer. The
192
192
`BlockItemStreamService` will also subscribe the `ConsumerBlockItemObserver` to the `StreamMediator` to receive the
193
193
streaming BlockItems from the producer.
194
194
195
195
196
196
### Runtime Streaming
197
197
198
198
At runtime, the `ProducerBlockItemObserver` will receive the latest BlockItem from the producer via Helidon and will
199
-
invoke publishEvent(BlockItem) on the `StreamMediator` to write the BlockItem to the RingBuffer. The
199
+
invoke publishEvent(BlockItem) on the `StreamMediator` to write the BlockItem to the RingBuffer. The
200
200
`ProducerBlockItemObserver` will then persist the BlockItem and return a BlockItemResponse to the producer via
201
201
its reference to `ResponseStreamObserver`.
202
202
203
203
Asynchronously, the RingBuffer will invoke the onEvent(BlockItem) method of all the subscribed
204
-
`ConsumerBlockItemObserver`s passing them the latest BlockItem. The `ConsumerBlockItemObserver` will then transmit
205
-
the BlockItem downstream to the consumer via its reference to the `ResponseStreamObserver`. Downstream consumers will
206
-
respond with a BlockItemResponse. Helidon will call the onNext() method of the `ConsumerBlockItemObserver` with the
204
+
`ConsumerBlockItemObserver`s passing them the latest BlockItem. The `ConsumerBlockItemObserver` will then transmit
205
+
the BlockItem downstream to the consumer via its reference to the `ResponseStreamObserver`. Downstream consumers will
206
+
respond with a BlockItemResponse. Helidon will call the onNext() method of the `ConsumerBlockItemObserver` with the
207
207
BlockItemResponse.
208
208
209
209
BlockItems sent to the `ConsumerBlockItemObserver` via the RingBuffer and BlockItemResponses passed by Helidon from
210
-
the downstream consumer are used to refresh internal timeouts maintained by the `ConsumerBlockItemObserver`. If a
210
+
the downstream consumer are used to refresh internal timeouts maintained by the `ConsumerBlockItemObserver`. If a
211
211
configurable timeout threshold is exceeded, the `ConsumerBlockItemObserver` will unsubscribe itself from the
212
-
`StreamMediator`. This mechanism is necessary because producers and consumers may not send HTTP/2 `End Stream` DATA
213
-
frames to terminate their bidirectional connection. Moreover, Helidon does not throw an exception back up to
214
-
`ConsumerBlockItemObserver` when the downstream consumer disconnects. Internal timeouts ensure objects are not
212
+
`StreamMediator`. This mechanism is necessary because producers and consumers may not send HTTP/2 `End Stream` DATA
213
+
frames to terminate their bidirectional connection. Moreover, Helidon does not throw an exception back up to
214
+
`ConsumerBlockItemObserver` when the downstream consumer disconnects. Internal timeouts ensure objects are not
215
215
permanently subscribed to the `StreamMediator`.
216
216
217
217
### Entities
218
218
219
219
**BlockItemStreamService** - The BlockItemStreamService is a custom implementation of the Helidon gRPC GrpcService.
220
220
It is responsible for initializing the StreamMediator and instantiating ProducerBlockItemObserver and
221
-
ConsumerBlockItemObserver instances on-demand when the gRPC API is called by producers and consumers. It is
221
+
ConsumerBlockItemObserver instances on-demand when the gRPC API is called by producers and consumers. It is
222
222
the primary binding between the Helidon routing mechanisms and the `hedera-block-node` custom business logic.
223
223
224
224
**StreamObserver** - StreamObserver is the main interface through which Helidon 4.x.x invokes custom business logic
@@ -238,7 +238,7 @@ the producer and consumers.
238
238
The RingBuffer is a fixed-sized array of ConsumerBlockItemObservers that is managed by the Disruptor library.
239
239
240
240
**EventHandler** - The EventHandler is an integration interface provided by the Disruptor library as a mechanism to
241
-
invoke callback logic when a new BlockItem is written to the RingBuffer. The EventHandler is responsible for passing
241
+
invoke callback logic when a new BlockItem is written to the RingBuffer. The EventHandler is responsible for passing
242
242
the latest BlockItem to the ConsumerBlockItemObserver when it is available in the RingBuffer.
243
243
244
244
**ConsumerBlockItemObserver** - A custom implementation of StreamObserver called by Helidon which is responsible for:
0 commit comments