Skip to content

Commit 6da2796

Browse files
yhl25vigith
andauthored
doc: reduce streaming (numaproj#1689)
Signed-off-by: Yashash H L <[email protected]> Signed-off-by: Vigith Maurice <[email protected]> Co-authored-by: Vigith Maurice <[email protected]>
1 parent aea4a32 commit 6da2796

File tree

4 files changed

+80
-2
lines changed

4 files changed

+80
-2
lines changed

docs/user-guide/user-defined-functions/reduce/windowing/fixed.md

+35-1
Original file line numberDiff line numberDiff line change
@@ -58,10 +58,44 @@ event time characteristic) of `2031-09-29T18:47:00Z` will belong to the window w
5858
`[2031-09-29T18:47:00Z, 2031-09-29T18:48:00Z)`
5959

6060
It is important to note that because of this property, for a constant throughput, the first window
61-
may contain fewer elements than other windows.
61+
may contain fewer elements than other windows.
6262

63+
Check the links below to see the UDF examples for different languages.
6364

65+
- [Python](https://github.com/numaproj/numaflow-python/tree/main/examples/reduce)
66+
- [Golang](https://github.com/numaproj/numaflow-go/tree/main/pkg/reducer/examples)
67+
- [Java](https://github.com/numaproj/numaflow-java/tree/main/examples/src/main/java/io/numaproj/numaflow/examples/reduce)
6468

6569

6670

71+
## Streaming Mode
72+
73+
Reduce can be enabled on streaming mode to stream messages or forward partial responses to the next vertex.
74+
This is useful for custom triggering, where we want to forward responses to the next vertex quickly,
75+
even before the fixed window closes. The close-of-book and a final triggering will still happen even if
76+
partial results have been emitted.
77+
78+
79+
To enable reduce streaming, set the `streaming` flag to `true` in the fixed window configuration.
80+
81+
```yaml
82+
vertices:
83+
- name: my-udf
84+
udf:
85+
groupBy:
86+
window:
87+
fixed:
88+
length: duration
89+
streaming: true # set streaming to true to enable reduce streamer
90+
```
91+
92+
Note: UDFs should use the ReduceStreamer functionality in the SDKs to use this feature.
93+
94+
Check the links below to see the UDF examples in streaming mode for different languages.
95+
96+
- [Python](https://github.com/numaproj/numaflow-python/tree/main/examples/reducestream)
97+
- [Golang](https://github.com/numaproj/numaflow-go/tree/main/pkg/reducestreamer/examples)
98+
- [Java](https://github.com/numaproj/numaflow-java/tree/main/examples/src/main/java/io/numaproj/numaflow/examples/reducestreamer/sum)
99+
100+
67101

docs/user-guide/user-defined-functions/reduce/windowing/session.md

+7
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,13 @@ later, meaning it's beyond the session gap of the previous window, initiating a
6868
`Event-4` and `Event-5`, and it closes 30 seconds after `Event-5` at `2031-09-29T18:47:40Z`, if no further events arrive
6969
for the key until the timeout.
7070

71+
Note: Streaming mode is by default enabled for session windows.
72+
73+
Check the links below to see the UDF examples for different languages. Currently, we have the SDK support for Golang and Java.
74+
75+
- [Golang](https://github.com/numaproj/numaflow-go/tree/main/pkg/sessionreducer)
76+
- [Java](https://github.com/numaproj/numaflow-java/tree/main/examples/src/main/java/io/numaproj/numaflow/examples/reducesession/counter)
77+
7178

7279

7380

docs/user-guide/user-defined-functions/reduce/windowing/sliding.md

+37
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,43 @@ window `present + sliding`)
7878
From the point above, it follows then that immediately upon startup, for the first window, fewer elements may get
7979
aggregated depending on the current _lateness_ of the data stream.
8080

81+
Check the links below to see the UDF examples for different languages.
82+
83+
- [Python](https://github.com/numaproj/numaflow-python/tree/main/examples/reduce)
84+
- [Golang](https://github.com/numaproj/numaflow-go/tree/main/pkg/reducer/examples)
85+
- [Java](https://github.com/numaproj/numaflow-java/tree/main/examples/src/main/java/io/numaproj/numaflow/examples/reduce)
86+
87+
88+
## Streaming Mode
89+
90+
Reduce can be enabled on streaming mode to stream messages or forward partial responses to the next vertex.
91+
This is useful for custom triggering, where we want to forward responses to the next vertex quickly,
92+
even before the fixed window closes. The close-of-book and a final triggering will still happen even if
93+
partial results have been emitted.
94+
95+
96+
To enable reduce streaming, set the `streaming` flag to `true` in the sliding window configuration.
97+
98+
```yaml
99+
vertices:
100+
- name: my-udf
101+
udf:
102+
groupBy:
103+
window:
104+
sliding:
105+
length: duration
106+
slide: duration
107+
streaming: true # set streaming to true to enable reduce streamer
108+
```
109+
110+
Note: UDFs should use the ReduceStreamer functionality in the SDKs to use this feature.
111+
112+
Check the links below to see the UDF examples in streaming mode for different languages.
113+
114+
- [Python](https://github.com/numaproj/numaflow-python/tree/main/examples/reducestream)
115+
- [Golang](https://github.com/numaproj/numaflow-go/tree/main/pkg/reducestreamer/examples)
116+
- [Java](https://github.com/numaproj/numaflow-java/tree/main/examples/src/main/java/io/numaproj/numaflow/examples/reducestreamer/sum)
117+
81118

82119

83120

pkg/udf/forward/forward_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -1671,7 +1671,7 @@ func validateMetrics(t *testing.T, batchSize int64) {
16711671

16721672
err := testutil.CollectAndCompare(metrics.ReadDataMessagesCount, strings.NewReader(metadata+expected), "forwarder_data_read_total")
16731673
if err != nil {
1674-
t.Errorf("unexpected collecting result:\n%s", err)
1674+
t.Errorf("unexpected collecting result: %v", err)
16751675
}
16761676

16771677
writeMetadata := `

0 commit comments

Comments
 (0)