Skip to content

Commit 49fc114

Browse files
committed
Refine create and close operators
1 parent 24db51b commit 49fc114

13 files changed

+194
-191
lines changed

assert.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -233,7 +233,9 @@ loop:
233233
for _, v := range got {
234234
delete(m, v)
235235
}
236-
assert.Equal(t, 0, len(m))
236+
if len(m) != 0 {
237+
assert.Fail(t, "missing elements", "%v", got)
238+
}
237239
}
238240
if checkHasItem, value := ass.itemToBeChecked(); checkHasItem {
239241
length := len(got)

doc/create.md

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,10 @@ Create an Observable from scratch by calling observer methods programmatically.
99
## Example
1010

1111
```go
12-
observable := rxgo.Create([]rxgo.Producer{func(ctx context.Context, next chan<- rxgo.Item, done func()) {
12+
observable := rxgo.Create([]rxgo.Producer{func(ctx context.Context, next chan<- rxgo.Item) {
1313
next <- rxgo.Of(1)
1414
next <- rxgo.Of(2)
1515
next <- rxgo.Of(3)
16-
done()
1716
}})
1817
```
1918

@@ -25,12 +24,6 @@ Output:
2524
3
2625
```
2726

28-
There are two ways to close the Observable:
29-
* Closing the `next` channel.
30-
* Calling the `done()` function.
31-
32-
Yet, as we can pass multiple producers, using the `done()` function is the recommended approach.
33-
3427
## Options
3528

3629
### WithBufferedChannel

doc/defer.md

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,10 @@ do not create the Observable until the observer subscribes, and create a fresh O
99
## Example
1010

1111
```go
12-
observable := rxgo.Defer([]rxgo.Producer{func(ctx context.Context, next chan<- rxgo.Item, done func()) {
12+
observable := rxgo.Defer([]rxgo.Producer{func(ctx context.Context, next chan<- rxgo.Item) {
1313
next <- rxgo.Of(1)
1414
next <- rxgo.Of(2)
1515
next <- rxgo.Of(3)
16-
done()
1716
}})
1817
```
1918

@@ -25,12 +24,6 @@ Output:
2524
3
2625
```
2726

28-
There are two ways to close the Observable:
29-
* Closing the `next` channel.
30-
* Calling the `done()` function.
31-
32-
Yet, as we can pass multiple producers, using the `done()` function is the recommended approach.
33-
3427
## Options
3528

3629
### WithBufferedChannel

factory_test.go

Lines changed: 45 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -84,76 +84,97 @@ func Test_Concat_OneEmptyObservable(t *testing.T) {
8484
}
8585

8686
func Test_Create(t *testing.T) {
87-
obs := Create([]Producer{func(ctx context.Context, next chan<- Item, done func()) {
87+
obs := Create([]Producer{func(ctx context.Context, next chan<- Item) {
8888
next <- Of(1)
8989
next <- Of(2)
9090
next <- Of(3)
91-
done()
9291
}})
9392
Assert(context.Background(), t, obs, HasItems(1, 2, 3), HasNoError())
9493
}
9594

9695
func Test_Create_SingleDup(t *testing.T) {
97-
obs := Create([]Producer{func(ctx context.Context, next chan<- Item, done func()) {
96+
obs := Create([]Producer{func(ctx context.Context, next chan<- Item) {
9897
next <- Of(1)
9998
next <- Of(2)
10099
next <- Of(3)
101-
done()
102100
}})
103101
Assert(context.Background(), t, obs, HasItems(1, 2, 3), HasNoError())
104102
Assert(context.Background(), t, obs, IsEmpty(), HasNoError())
105103
}
106104

105+
func Test_Create_ContextCancelled(t *testing.T) {
106+
closed1 := make(chan struct{})
107+
ctx, cancel := context.WithCancel(context.Background())
108+
Create([]Producer{
109+
func(ctx context.Context, next chan<- Item) {
110+
cancel()
111+
}, func(ctx context.Context, next chan<- Item) {
112+
<-ctx.Done()
113+
closed1 <- struct{}{}
114+
},
115+
}, WithContext(ctx)).Run()
116+
117+
select {
118+
case <-time.Tick(time.Second):
119+
assert.FailNow(t, "producer not closed")
120+
case <-closed1:
121+
}
122+
}
123+
107124
func Test_Defer(t *testing.T) {
108-
obs := Defer([]Producer{func(ctx context.Context, next chan<- Item, done func()) {
125+
obs := Defer([]Producer{func(ctx context.Context, next chan<- Item) {
109126
next <- Of(1)
110127
next <- Of(2)
111128
next <- Of(3)
112-
done()
113129
}})
114130
Assert(context.Background(), t, obs, HasItems(1, 2, 3), HasNoError())
115131
}
116132

117133
func Test_Defer_Multiple(t *testing.T) {
118-
obs := Defer([]Producer{func(ctx context.Context, next chan<- Item, done func()) {
134+
obs := Defer([]Producer{func(ctx context.Context, next chan<- Item) {
119135
next <- Of(1)
120136
next <- Of(2)
121-
done()
122-
}, func(ctx context.Context, next chan<- Item, done func()) {
137+
}, func(ctx context.Context, next chan<- Item) {
123138
next <- Of(10)
124139
next <- Of(20)
125-
done()
126140
}})
127141
Assert(context.Background(), t, obs, HasItemsNoOrder(1, 2, 10, 20), HasNoError())
128142
}
129143

130-
func Test_Defer_Close(t *testing.T) {
131-
obs := Defer([]Producer{func(ctx context.Context, next chan<- Item, done func()) {
132-
next <- Of(1)
133-
next <- Of(2)
134-
next <- Of(3)
135-
done()
136-
}})
137-
Assert(context.Background(), t, obs, HasItems(1, 2, 3), HasNoError())
144+
func Test_Defer_ContextCancelled(t *testing.T) {
145+
closed1 := make(chan struct{})
146+
ctx, cancel := context.WithCancel(context.Background())
147+
Defer([]Producer{
148+
func(ctx context.Context, next chan<- Item) {
149+
cancel()
150+
}, func(ctx context.Context, next chan<- Item) {
151+
<-ctx.Done()
152+
closed1 <- struct{}{}
153+
},
154+
}, WithContext(ctx)).Run()
155+
156+
select {
157+
case <-time.Tick(time.Second):
158+
assert.FailNow(t, "producer not closed")
159+
case <-closed1:
160+
}
138161
}
139162

140163
func Test_Defer_SingleDup(t *testing.T) {
141-
obs := Defer([]Producer{func(ctx context.Context, next chan<- Item, done func()) {
164+
obs := Defer([]Producer{func(ctx context.Context, next chan<- Item) {
142165
next <- Of(1)
143166
next <- Of(2)
144167
next <- Of(3)
145-
done()
146168
}})
147169
Assert(context.Background(), t, obs, HasItems(1, 2, 3), HasNoError())
148170
Assert(context.Background(), t, obs, HasItems(1, 2, 3), HasNoError())
149171
}
150172

151173
func Test_Defer_ComposedDup(t *testing.T) {
152-
obs := Defer([]Producer{func(ctx context.Context, next chan<- Item, done func()) {
174+
obs := Defer([]Producer{func(ctx context.Context, next chan<- Item) {
153175
next <- Of(1)
154176
next <- Of(2)
155177
next <- Of(3)
156-
done()
157178
}}).Map(func(_ context.Context, i interface{}) (_ interface{}, _ error) {
158179
return i.(int) + 1, nil
159180
}).Map(func(_ context.Context, i interface{}) (_ interface{}, _ error) {
@@ -164,11 +185,10 @@ func Test_Defer_ComposedDup(t *testing.T) {
164185
}
165186

166187
func Test_Defer_ComposedDup_EagerObservation(t *testing.T) {
167-
obs := Defer([]Producer{func(ctx context.Context, next chan<- Item, done func()) {
188+
obs := Defer([]Producer{func(ctx context.Context, next chan<- Item) {
168189
next <- Of(1)
169190
next <- Of(2)
170191
next <- Of(3)
171-
done()
172192
}}).Map(func(_ context.Context, i interface{}) (_ interface{}, _ error) {
173193
return i.(int) + 1, nil
174194
}, WithObservationStrategy(Eager)).Map(func(_ context.Context, i interface{}) (_ interface{}, _ error) {
@@ -181,11 +201,10 @@ func Test_Defer_ComposedDup_EagerObservation(t *testing.T) {
181201
}
182202

183203
func Test_Defer_Error(t *testing.T) {
184-
obs := Defer([]Producer{func(ctx context.Context, next chan<- Item, done func()) {
204+
obs := Defer([]Producer{func(ctx context.Context, next chan<- Item) {
185205
next <- Of(1)
186206
next <- Of(2)
187207
next <- Error(errFoo)
188-
done()
189208
}})
190209
Assert(context.Background(), t, obs, HasItems(1, 2), HasError(errFoo))
191210
}

item.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,9 +81,9 @@ func (i Item) SendBlocking(ch chan<- Item) {
8181
ch <- i
8282
}
8383

84-
// SendCtx sends an item and blocks until it is sent or a context canceled.
84+
// SendContext sends an item and blocks until it is sent or a context canceled.
8585
// It returns a boolean to indicate whether the item was sent.
86-
func (i Item) SendCtx(ctx context.Context, ch chan<- Item) bool {
86+
func (i Item) SendContext(ctx context.Context, ch chan<- Item) bool {
8787
select {
8888
case <-ctx.Done():
8989
return false

item_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ func Test_Item_SendContext_True(t *testing.T) {
4343
defer close(ch)
4444
ctx, cancel := context.WithCancel(context.Background())
4545
defer cancel()
46-
assert.True(t, Of(5).SendCtx(ctx, ch))
46+
assert.True(t, Of(5).SendContext(ctx, ch))
4747
}
4848

4949
func Test_Item_SendNonBlocking(t *testing.T) {

iterable_create.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,13 @@ func newCreateIterable(fs []Producer, opts ...Option) Iterable {
1515
ctx := option.buildContext()
1616

1717
wg := sync.WaitGroup{}
18-
done := func() {
19-
wg.Done()
20-
}
2118
for _, f := range fs {
19+
f := f
2220
wg.Add(1)
23-
go f(ctx, next, done)
21+
go func() {
22+
defer wg.Done()
23+
f(ctx, next)
24+
}()
2425
}
2526
go func() {
2627
wg.Wait()

iterable_defer.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,13 @@ func (i *deferIterable) Observe(opts ...Option) <-chan Item {
2222
ctx := option.buildContext()
2323

2424
wg := sync.WaitGroup{}
25-
done := func() {
26-
wg.Done()
27-
}
2825
for _, f := range i.f {
26+
f := f
2927
wg.Add(1)
30-
go f(ctx, next, done)
28+
go func() {
29+
defer wg.Done()
30+
f(ctx, next)
31+
}()
3132
}
3233
go func() {
3334
wg.Wait()

observable.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ type ObservableImpl struct {
8888
}
8989

9090
func defaultErrorFuncOperator(ctx context.Context, item Item, dst chan<- Item, operatorOptions operatorOptions) {
91-
item.SendCtx(ctx, dst)
91+
item.SendContext(ctx, dst)
9292
operatorOptions.stop()
9393
}
9494

@@ -297,7 +297,7 @@ func runPar(ctx context.Context, next chan Item, iterable Iterable, operatorFact
297297
case item, ok := <-observe:
298298
if !ok {
299299
if !bypassGather {
300-
Of(op).SendCtx(ctx, gather)
300+
Of(op).SendContext(ctx, gather)
301301
}
302302
return
303303
}

0 commit comments

Comments
 (0)