Skip to content

Commit c5d9977

Browse files
committed
Implementing connected observable on all iterables
1 parent 831fd49 commit c5d9977

File tree

4 files changed

+170
-69
lines changed

4 files changed

+170
-69
lines changed

factory_connectable_test.go

+167-69
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,179 @@ package rxgo
33
import (
44
"context"
55
"fmt"
6-
"github.com/stretchr/testify/assert"
7-
"golang.org/x/sync/errgroup"
86
"reflect"
97
"sync"
108
"testing"
119
"time"
10+
11+
"github.com/stretchr/testify/assert"
12+
"golang.org/x/sync/errgroup"
1213
)
1314

15+
func Test_Connectable_IterableChannel_Single(t *testing.T) {
16+
ch := make(chan Item, 10)
17+
go func() {
18+
ch <- Of(1)
19+
ch <- Of(2)
20+
ch <- Of(3)
21+
close(ch)
22+
}()
23+
obs := &ObservableImpl{
24+
iterable: newChannelIterable(ch, WithPublishStrategy()),
25+
}
26+
testConnectableSingle(t, obs)
27+
}
28+
29+
func Test_Connectable_IterableChannel_Composed(t *testing.T) {
30+
ch := make(chan Item, 10)
31+
go func() {
32+
ch <- Of(1)
33+
ch <- Of(2)
34+
ch <- Of(3)
35+
close(ch)
36+
}()
37+
obs := &ObservableImpl{
38+
iterable: newChannelIterable(ch, WithPublishStrategy()),
39+
}
40+
testConnectableComposed(t, obs)
41+
}
42+
43+
func Test_Connectable_IterableChannel_WithoutConnect(t *testing.T) {
44+
ch := make(chan Item, 10)
45+
go func() {
46+
ch <- Of(1)
47+
ch <- Of(2)
48+
ch <- Of(3)
49+
close(ch)
50+
}()
51+
obs := &ObservableImpl{
52+
iterable: newChannelIterable(ch, WithPublishStrategy()),
53+
}
54+
testConnectableWithoutConnect(t, obs)
55+
}
56+
57+
func Test_Connectable_IterableCreate_Single(t *testing.T) {
58+
ctx, cancel := context.WithCancel(context.Background())
59+
defer cancel()
60+
obs := &ObservableImpl{
61+
iterable: newCreateIterable([]Producer{func(_ context.Context, ch chan<- Item) {
62+
ch <- Of(1)
63+
ch <- Of(2)
64+
ch <- Of(3)
65+
cancel()
66+
}}, WithPublishStrategy(), WithContext(ctx)),
67+
}
68+
testConnectableSingle(t, obs)
69+
}
70+
71+
func Test_Connectable_IterableCreate_Composed(t *testing.T) {
72+
ctx, cancel := context.WithCancel(context.Background())
73+
defer cancel()
74+
obs := &ObservableImpl{
75+
iterable: newCreateIterable([]Producer{func(_ context.Context, ch chan<- Item) {
76+
ch <- Of(1)
77+
ch <- Of(2)
78+
ch <- Of(3)
79+
cancel()
80+
}}, WithPublishStrategy(), WithContext(ctx)),
81+
}
82+
testConnectableComposed(t, obs)
83+
}
84+
85+
func Test_Connectable_IterableCreate_WithoutConnect(t *testing.T) {
86+
ctx, cancel := context.WithCancel(context.Background())
87+
defer cancel()
88+
obs := &ObservableImpl{
89+
iterable: newCreateIterable([]Producer{func(_ context.Context, ch chan<- Item) {
90+
ch <- Of(1)
91+
ch <- Of(2)
92+
ch <- Of(3)
93+
cancel()
94+
}}, WithPublishStrategy(), WithContext(ctx)),
95+
}
96+
testConnectableWithoutConnect(t, obs)
97+
}
98+
99+
func Test_Connectable_IterableDefer_Single(t *testing.T) {
100+
ctx, cancel := context.WithCancel(context.Background())
101+
defer cancel()
102+
obs := &ObservableImpl{
103+
iterable: newDeferIterable([]Producer{func(_ context.Context, ch chan<- Item) {
104+
ch <- Of(1)
105+
ch <- Of(2)
106+
ch <- Of(3)
107+
cancel()
108+
}}, WithPublishStrategy(), WithContext(ctx)),
109+
}
110+
testConnectableSingle(t, obs)
111+
}
112+
113+
func Test_Connectable_IterableDefer_Composed(t *testing.T) {
114+
ctx, cancel := context.WithCancel(context.Background())
115+
defer cancel()
116+
obs := &ObservableImpl{
117+
iterable: newDeferIterable([]Producer{func(_ context.Context, ch chan<- Item) {
118+
ch <- Of(1)
119+
ch <- Of(2)
120+
ch <- Of(3)
121+
cancel()
122+
}}, WithPublishStrategy(), WithContext(ctx)),
123+
}
124+
testConnectableComposed(t, obs)
125+
}
126+
127+
func Test_Connectable_IterableJust_Single(t *testing.T) {
128+
ctx, cancel := context.WithCancel(context.Background())
129+
defer cancel()
130+
obs := &ObservableImpl{
131+
iterable: newJustIterable([]interface{}{1, 2, 3}, WithPublishStrategy(), WithContext(ctx)),
132+
}
133+
testConnectableSingle(t, obs)
134+
}
135+
136+
func Test_Connectable_IterableJust_Composed(t *testing.T) {
137+
ctx, cancel := context.WithCancel(context.Background())
138+
defer cancel()
139+
obs := &ObservableImpl{
140+
iterable: newJustIterable([]interface{}{1, 2, 3}, WithPublishStrategy(), WithContext(ctx)),
141+
}
142+
testConnectableComposed(t, obs)
143+
}
144+
145+
func Test_Connectable_IterableRange_Single(t *testing.T) {
146+
ctx, cancel := context.WithCancel(context.Background())
147+
defer cancel()
148+
obs := &ObservableImpl{
149+
iterable: newRangeIterable(1, 2, WithPublishStrategy(), WithContext(ctx)),
150+
}
151+
testConnectableSingle(t, obs)
152+
}
153+
154+
func Test_Connectable_IterableRange_Composed(t *testing.T) {
155+
ctx, cancel := context.WithCancel(context.Background())
156+
defer cancel()
157+
obs := &ObservableImpl{
158+
iterable: newRangeIterable(1, 2, WithPublishStrategy(), WithContext(ctx)),
159+
}
160+
testConnectableComposed(t, obs)
161+
}
162+
163+
func Test_Connectable_IterableSlice_Single(t *testing.T) {
164+
ctx, cancel := context.WithCancel(context.Background())
165+
defer cancel()
166+
obs := &ObservableImpl{iterable: newSliceIterable([]Item{Of(1), Of(2), Of(3)},
167+
WithPublishStrategy(), WithContext(ctx))}
168+
testConnectableSingle(t, obs)
169+
}
170+
171+
func Test_Connectable_IterableSlice_Composed(t *testing.T) {
172+
ctx, cancel := context.WithCancel(context.Background())
173+
defer cancel()
174+
obs := &ObservableImpl{iterable: newSliceIterable([]Item{Of(1), Of(2), Of(3)},
175+
WithPublishStrategy(), WithContext(ctx))}
176+
testConnectableComposed(t, obs)
177+
}
178+
14179
func testConnectableSingle(t *testing.T, obs Observable) {
15180
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
16181
defer cancel()
@@ -85,70 +250,3 @@ func testConnectableWithoutConnect(t *testing.T, obs Observable) {
85250
defer cancel()
86251
Assert(ctx, t, obs, IsEmpty())
87252
}
88-
89-
func Test_Connectable_IterableChannel_Single(t *testing.T) {
90-
ch := make(chan Item, 10)
91-
go func() {
92-
ch <- Of(1)
93-
ch <- Of(2)
94-
ch <- Of(3)
95-
close(ch)
96-
}()
97-
testConnectableSingle(t, FromChannel(ch, WithPublishStrategy()))
98-
}
99-
100-
func Test_Connectable_IterableChannel_Composed(t *testing.T) {
101-
ch := make(chan Item, 10)
102-
go func() {
103-
ch <- Of(1)
104-
ch <- Of(2)
105-
ch <- Of(3)
106-
close(ch)
107-
}()
108-
testConnectableComposed(t, FromChannel(ch, WithPublishStrategy()))
109-
}
110-
111-
func Test_Connectable_IterableChannel_WithoutConnect(t *testing.T) {
112-
ch := make(chan Item, 10)
113-
go func() {
114-
ch <- Of(1)
115-
ch <- Of(2)
116-
ch <- Of(3)
117-
close(ch)
118-
}()
119-
obs := FromChannel(ch, WithPublishStrategy())
120-
testConnectableWithoutConnect(t, obs)
121-
}
122-
123-
func Test_Connectable_IterableCreate_Single(t *testing.T) {
124-
ctx, cancel := context.WithCancel(context.Background())
125-
defer cancel()
126-
testConnectableSingle(t, Create([]Producer{func(_ context.Context, ch chan<- Item) {
127-
ch <- Of(1)
128-
ch <- Of(2)
129-
ch <- Of(3)
130-
cancel()
131-
}}, WithPublishStrategy(), WithContext(ctx)))
132-
}
133-
134-
func Test_Connectable_IterableCreate_Composed(t *testing.T) {
135-
ctx, cancel := context.WithCancel(context.Background())
136-
defer cancel()
137-
testConnectableComposed(t, Create([]Producer{func(_ context.Context, ch chan<- Item) {
138-
ch <- Of(1)
139-
ch <- Of(2)
140-
ch <- Of(3)
141-
cancel()
142-
}}, WithPublishStrategy(), WithContext(ctx)))
143-
}
144-
145-
func Test_Connectable_IterableCreate_WithoutConnect(t *testing.T) {
146-
ctx, cancel := context.WithCancel(context.Background())
147-
defer cancel()
148-
testConnectableWithoutConnect(t, Create([]Producer{func(_ context.Context, ch chan<- Item) {
149-
ch <- Of(1)
150-
ch <- Of(2)
151-
ch <- Of(3)
152-
cancel()
153-
}}, WithPublishStrategy(), WithContext(ctx)))
154-
}

iterable_create.go

+1
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ func newCreateIterable(fs []Producer, opts ...Option) Iterable {
3636
next: next,
3737
}
3838
}
39+
3940
func (i *createIterable) Observe(opts ...Option) <-chan Item {
4041
mergedOptions := append(i.opts, opts...)
4142
option := parseOptions(mergedOptions...)

observable_operator.go

+1
Original file line numberDiff line numberDiff line change
@@ -541,6 +541,7 @@ func (o *ObservableImpl) BufferWithTimeOrCount(timespan Duration, count int, opt
541541
return customObservableOperator(f, opts...)
542542
}
543543

544+
// Connect instructs a connectable Observable to begin emitting items to its subscribers.
544545
func (o *ObservableImpl) Connect() Observable {
545546
o.Observe(connect())
546547
return o

options.go

+1
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,7 @@ func WithErrorStrategy(strategy OnErrorStrategy) Option {
144144
})
145145
}
146146

147+
// WithPublishStrategy converts an ordinary Observable into a connectable Observable.
147148
func WithPublishStrategy() Option {
148149
return newFuncOption(func(options *funcOption) {
149150
options.connectable = true

0 commit comments

Comments
 (0)