Skip to content

Commit 831fd49

Browse files
committed
Create connectable observable
1 parent 308be13 commit 831fd49

File tree

2 files changed

+81
-5
lines changed

2 files changed

+81
-5
lines changed

factory_connectable_test.go

+34-1
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,39 @@ func Test_Connectable_IterableChannel_WithoutConnect(t *testing.T) {
116116
ch <- Of(3)
117117
close(ch)
118118
}()
119-
obs := FromChannel(ch, WithPublishStrategy(), WithBufferedChannel(10))
119+
obs := FromChannel(ch, WithPublishStrategy())
120120
testConnectableWithoutConnect(t, obs)
121121
}
122+
123+
func Test_Connectable_IterableCreate_Single(t *testing.T) {
124+
ctx, cancel := context.WithCancel(context.Background())
125+
defer cancel()
126+
testConnectableSingle(t, Create([]Producer{func(_ context.Context, ch chan<- Item) {
127+
ch <- Of(1)
128+
ch <- Of(2)
129+
ch <- Of(3)
130+
cancel()
131+
}}, WithPublishStrategy(), WithContext(ctx)))
132+
}
133+
134+
func Test_Connectable_IterableCreate_Composed(t *testing.T) {
135+
ctx, cancel := context.WithCancel(context.Background())
136+
defer cancel()
137+
testConnectableComposed(t, Create([]Producer{func(_ context.Context, ch chan<- Item) {
138+
ch <- Of(1)
139+
ch <- Of(2)
140+
ch <- Of(3)
141+
cancel()
142+
}}, WithPublishStrategy(), WithContext(ctx)))
143+
}
144+
145+
func Test_Connectable_IterableCreate_WithoutConnect(t *testing.T) {
146+
ctx, cancel := context.WithCancel(context.Background())
147+
defer cancel()
148+
testConnectableWithoutConnect(t, Create([]Producer{func(_ context.Context, ch chan<- Item) {
149+
ch <- Of(1)
150+
ch <- Of(2)
151+
ch <- Of(3)
152+
cancel()
153+
}}, WithPublishStrategy(), WithContext(ctx)))
154+
}

iterable_create.go

+47-4
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,11 @@ import (
55
)
66

77
type createIterable struct {
8-
opts []Option
9-
next <-chan Item
8+
next <-chan Item
9+
opts []Option
10+
subscribers []chan Item
11+
mutex sync.RWMutex
12+
producerAlreadyCreated bool
1013
}
1114

1215
func newCreateIterable(fs []Producer, opts ...Option) Iterable {
@@ -33,7 +36,47 @@ func newCreateIterable(fs []Producer, opts ...Option) Iterable {
3336
next: next,
3437
}
3538
}
39+
func (i *createIterable) Observe(opts ...Option) <-chan Item {
40+
mergedOptions := append(i.opts, opts...)
41+
option := parseOptions(mergedOptions...)
3642

37-
func (i *createIterable) Observe(_ ...Option) <-chan Item {
38-
return i.next
43+
if !option.isConnectable() {
44+
return i.next
45+
}
46+
47+
if option.isConnectOperation() {
48+
i.connect()
49+
return nil
50+
}
51+
52+
ch := option.buildChannel()
53+
i.mutex.Lock()
54+
i.subscribers = append(i.subscribers, ch)
55+
i.mutex.Unlock()
56+
return ch
57+
}
58+
59+
func (i *createIterable) connect() {
60+
i.mutex.Lock()
61+
if !i.producerAlreadyCreated {
62+
go i.produce()
63+
i.producerAlreadyCreated = true
64+
}
65+
i.mutex.Unlock()
66+
}
67+
68+
func (i *createIterable) produce() {
69+
for item := range i.next {
70+
i.mutex.RLock()
71+
for _, subscriber := range i.subscribers {
72+
subscriber <- item
73+
}
74+
i.mutex.RUnlock()
75+
}
76+
77+
i.mutex.RLock()
78+
for _, subscriber := range i.subscribers {
79+
close(subscriber)
80+
}
81+
i.mutex.RUnlock()
3982
}

0 commit comments

Comments
 (0)