21
21
import java .nio .charset .Charset ;
22
22
import java .util .ArrayList ;
23
23
import java .util .List ;
24
- import java .util .function .Supplier ;
25
24
26
25
import org .reactivestreams .Publisher ;
27
26
import reactor .core .publisher .Flux ;
28
27
import reactor .core .publisher .Mono ;
29
28
30
29
import org .springframework .core .io .buffer .DataBuffer ;
30
+ import org .springframework .core .io .buffer .DataBufferFactory ;
31
31
32
32
/**
33
33
* A {@link Writer} that can write a {@link Flux} (or {@link Publisher}) to a data buffer.
37
37
*/
38
38
class FluxWriter extends Writer {
39
39
40
- private final Supplier < DataBuffer > factory ;
40
+ private final DataBufferFactory factory ;
41
41
42
42
private final Charset charset ;
43
43
44
- private List <String > current = new ArrayList <>();
45
-
46
44
private List <Object > accumulated = new ArrayList <>();
47
45
48
- FluxWriter (Supplier <DataBuffer > factory ) {
49
- this (factory , Charset .defaultCharset ());
50
- }
51
-
52
- FluxWriter (Supplier <DataBuffer > factory , Charset charset ) {
46
+ FluxWriter (DataBufferFactory factory , Charset charset ) {
53
47
this .factory = factory ;
54
48
this .charset = charset ;
55
49
}
56
50
57
- public Publisher <? extends Publisher <? extends DataBuffer >> getBuffers () {
51
+ @ SuppressWarnings ("unchecked" )
52
+ public Flux <? extends Publisher <? extends DataBuffer >> getBuffers () {
58
53
Flux <String > buffers = Flux .empty ();
59
- if (!this .current .isEmpty ()) {
60
- this .accumulated .add (new ArrayList <>(this .current ));
61
- this .current .clear ();
62
- }
54
+ List <String > chunks = new ArrayList <>();
63
55
for (Object thing : this .accumulated ) {
64
56
if (thing instanceof Publisher ) {
65
- @ SuppressWarnings ("unchecked" )
66
- Publisher <String > publisher = (Publisher <String >) thing ;
67
- buffers = buffers .concatWith (publisher );
57
+ buffers = concatValues (chunks , buffers );
58
+ buffers = buffers .concatWith ((Publisher <String >) thing );
68
59
}
69
60
else {
70
- @ SuppressWarnings ("unchecked" )
71
- List <String > list = (List <String >) thing ;
72
- buffers = buffers .concatWithValues (list .toArray (new String [0 ]));
61
+ chunks .add ((String ) thing );
73
62
}
74
63
}
75
- return buffers .map ((string ) -> Mono .just (buffer ().write (string , this .charset )));
64
+ buffers = concatValues (chunks , buffers );
65
+ return buffers .map ((string ) -> Mono .fromCallable (() ->
66
+ this .factory .allocateBuffer ().write (string , this .charset )));
67
+ }
68
+
69
+ private Flux <String > concatValues (List <String > chunks , Flux <String > buffers ) {
70
+ if (!chunks .isEmpty ()) {
71
+ buffers = buffers .concatWithValues (chunks .toArray (new String [0 ]));
72
+ chunks .clear ();
73
+ }
74
+ return buffers ;
76
75
}
77
76
78
77
@ Override
79
78
public void write (char [] cbuf , int off , int len ) throws IOException {
80
- this .current .add (new String (cbuf , off , len ));
79
+ this .accumulated .add (new String (cbuf , off , len ));
81
80
}
82
81
83
82
@ Override
@@ -92,23 +91,8 @@ public void release() {
92
91
// TODO: maybe implement this and call it on error
93
92
}
94
93
95
- private DataBuffer buffer () {
96
- return this .factory .get ();
97
- }
98
-
99
94
public void write (Object thing ) {
100
- if (thing instanceof Publisher ) {
101
- if (!this .current .isEmpty ()) {
102
- this .accumulated .add (new ArrayList <>(this .current ));
103
- this .current .clear ();
104
- }
105
- this .accumulated .add (thing );
106
- }
107
- else {
108
- if (thing instanceof String ) {
109
- this .current .add ((String ) thing );
110
- }
111
- }
95
+ this .accumulated .add (thing );
112
96
}
113
97
114
98
}
0 commit comments