Skip to content

Commit ea311af

Browse files
committed
Disposed type
1 parent c5d9977 commit ea311af

6 files changed

+87
-26
lines changed

Diff for: factory_connectable_test.go

+36
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,24 @@ func Test_Connectable_IterableChannel_Composed(t *testing.T) {
4040
testConnectableComposed(t, obs)
4141
}
4242

43+
func Test_Connectable_IterableChannel_Disposed(t *testing.T) {
44+
ch := make(chan Item, 10)
45+
go func() {
46+
ch <- Of(1)
47+
ch <- Of(2)
48+
ch <- Of(3)
49+
close(ch)
50+
}()
51+
obs := &ObservableImpl{
52+
iterable: newChannelIterable(ch, WithPublishStrategy()),
53+
}
54+
obs.Connect()()
55+
ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
56+
defer cancel()
57+
time.Sleep(50 * time.Millisecond)
58+
Assert(ctx, t, obs, IsEmpty())
59+
}
60+
4361
func Test_Connectable_IterableChannel_WithoutConnect(t *testing.T) {
4462
ch := make(chan Item, 10)
4563
go func() {
@@ -82,6 +100,24 @@ func Test_Connectable_IterableCreate_Composed(t *testing.T) {
82100
testConnectableComposed(t, obs)
83101
}
84102

103+
func Test_Connectable_IterableCreate_Disposed(t *testing.T) {
104+
ctx, cancel := context.WithCancel(context.Background())
105+
defer cancel()
106+
obs := &ObservableImpl{
107+
iterable: newCreateIterable([]Producer{func(_ context.Context, ch chan<- Item) {
108+
ch <- Of(1)
109+
ch <- Of(2)
110+
ch <- Of(3)
111+
cancel()
112+
}}, WithPublishStrategy(), WithContext(ctx)),
113+
}
114+
obs.Connect()()
115+
ctx, cancel = context.WithTimeout(context.Background(), 550*time.Millisecond)
116+
defer cancel()
117+
time.Sleep(50 * time.Millisecond)
118+
Assert(ctx, t, obs, IsEmpty())
119+
}
120+
85121
func Test_Connectable_IterableCreate_WithoutConnect(t *testing.T) {
86122
ctx, cancel := context.WithCancel(context.Background())
87123
defer cancel()

Diff for: iterable_channel.go

+22-11
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package rxgo
22

33
import (
4+
"context"
45
"sync"
56
)
67

@@ -29,7 +30,7 @@ func (i *channelIterable) Observe(opts ...Option) <-chan Item {
2930
}
3031

3132
if option.isConnectOperation() {
32-
i.connect()
33+
i.connect(option.buildContext())
3334
return nil
3435
}
3536

@@ -40,27 +41,37 @@ func (i *channelIterable) Observe(opts ...Option) <-chan Item {
4041
return ch
4142
}
4243

43-
func (i *channelIterable) connect() {
44+
func (i *channelIterable) connect(ctx context.Context) {
4445
i.mutex.Lock()
4546
if !i.producerAlreadyCreated {
46-
go i.produce()
47+
go i.produce(ctx)
4748
i.producerAlreadyCreated = true
4849
}
4950
i.mutex.Unlock()
5051
}
5152

52-
func (i *channelIterable) produce() {
53-
for item := range i.next {
53+
func (i *channelIterable) produce(ctx context.Context) {
54+
defer func() {
5455
i.mutex.RLock()
5556
for _, subscriber := range i.subscribers {
56-
subscriber <- item
57+
close(subscriber)
5758
}
5859
i.mutex.RUnlock()
59-
}
60+
}()
6061

61-
i.mutex.RLock()
62-
for _, subscriber := range i.subscribers {
63-
close(subscriber)
62+
for {
63+
select {
64+
case <-ctx.Done():
65+
return
66+
case item, ok := <-i.next:
67+
if !ok {
68+
return
69+
}
70+
i.mutex.RLock()
71+
for _, subscriber := range i.subscribers {
72+
subscriber <- item
73+
}
74+
i.mutex.RUnlock()
75+
}
6476
}
65-
i.mutex.RUnlock()
6677
}

Diff for: iterable_create.go

+22-11
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package rxgo
22

33
import (
4+
"context"
45
"sync"
56
)
67

@@ -46,7 +47,7 @@ func (i *createIterable) Observe(opts ...Option) <-chan Item {
4647
}
4748

4849
if option.isConnectOperation() {
49-
i.connect()
50+
i.connect(option.buildContext())
5051
return nil
5152
}
5253

@@ -57,27 +58,37 @@ func (i *createIterable) Observe(opts ...Option) <-chan Item {
5758
return ch
5859
}
5960

60-
func (i *createIterable) connect() {
61+
func (i *createIterable) connect(ctx context.Context) {
6162
i.mutex.Lock()
6263
if !i.producerAlreadyCreated {
63-
go i.produce()
64+
go i.produce(ctx)
6465
i.producerAlreadyCreated = true
6566
}
6667
i.mutex.Unlock()
6768
}
6869

69-
func (i *createIterable) produce() {
70-
for item := range i.next {
70+
func (i *createIterable) produce(ctx context.Context) {
71+
defer func() {
7172
i.mutex.RLock()
7273
for _, subscriber := range i.subscribers {
73-
subscriber <- item
74+
close(subscriber)
7475
}
7576
i.mutex.RUnlock()
76-
}
77+
}()
7778

78-
i.mutex.RLock()
79-
for _, subscriber := range i.subscribers {
80-
close(subscriber)
79+
for {
80+
select {
81+
case <-ctx.Done():
82+
return
83+
case item, ok := <-i.next:
84+
if !ok {
85+
return
86+
}
87+
i.mutex.RLock()
88+
for _, subscriber := range i.subscribers {
89+
subscriber <- item
90+
}
91+
i.mutex.RUnlock()
92+
}
8193
}
82-
i.mutex.RUnlock()
8394
}

Diff for: observable.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ import (
1010
// Observable is the standard interface for Observables.
1111
type Observable interface {
1212
Iterable
13-
Connect() Observable
1413
All(predicate Predicate, opts ...Option) Single
1514
AverageFloat32(opts ...Option) Single
1615
AverageFloat64(opts ...Option) Single
@@ -23,6 +22,7 @@ type Observable interface {
2322
BufferWithCount(count int, opts ...Option) Observable
2423
BufferWithTime(timespan Duration, opts ...Option) Observable
2524
BufferWithTimeOrCount(timespan Duration, count int, opts ...Option) Observable
25+
Connect() Disposable
2626
Contains(equal Predicate, opts ...Option) Single
2727
Count(opts ...Option) Single
2828
Debounce(timespan Duration, opts ...Option) Observable

Diff for: observable_operator.go

+4-3
Original file line numberDiff line numberDiff line change
@@ -542,9 +542,10 @@ func (o *ObservableImpl) BufferWithTimeOrCount(timespan Duration, count int, opt
542542
}
543543

544544
// Connect instructs a connectable Observable to begin emitting items to its subscribers.
545-
func (o *ObservableImpl) Connect() Observable {
546-
o.Observe(connect())
547-
return o
545+
func (o *ObservableImpl) Connect() Disposable {
546+
ctx, cancel := context.WithCancel(context.Background())
547+
o.Observe(WithContext(ctx), connect())
548+
return Disposable(cancel)
548549
}
549550

550551
// Contains determines whether an Observable emits a particular item or not.

Diff for: types.go

+2
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@ type (
3737
Supplier func(ctx context.Context) Item
3838
// Disposed is a notification channel indicating when an Observable is closed.
3939
Disposed <-chan struct{}
40+
// Disposable is a function to be called in order to dispose a subscription.
41+
Disposable context.CancelFunc
4042

4143
// NextFunc handles a next item in a stream.
4244
NextFunc func(interface{})

0 commit comments

Comments
 (0)