-
Notifications
You must be signed in to change notification settings - Fork 63
/
Copy pathunbounded_chan.go
118 lines (104 loc) · 3.1 KB
/
unbounded_chan.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
package chanx
import (
"context"
"sync/atomic"
)
// UnboundedChan is a channel pair backed by a growable buffer, so that
// writes to In are not held up when readers of Out fall behind.
//
// In accepts values from any number of writers and Out serves any number of
// readers. Closing In is optional.
type UnboundedChan[T any] struct {
	// bufCount tracks how many values are currently parked in buffer. It is
	// accessed with sync/atomic. Keep it first: 64-bit atomic operations need
	// 64-bit alignment on 32-bit platforms, which the first word of an
	// allocated struct is guaranteed to have.
	bufCount int64

	// In is the write side of the channel.
	In chan<- T

	// Out is the read side of the channel.
	Out <-chan T

	// buffer holds values that could not be handed to Out immediately.
	buffer *RingBuffer[T]
}
// Len reports the sum of the values queued in In, held in the buffer, and
// queued in Out.
// The three parts are sampled one after another, so the result is only an
// approximation meant for rough monitoring,
// see https://github.com/smallnest/chanx/issues/7.
func (c *UnboundedChan[T]) Len() int {
	pending := len(c.In)
	buffered := c.BufLen()
	ready := len(c.Out)
	return pending + buffered + ready
}
// BufLen reports how many values are currently held in the buffer, read
// atomically from the internal counter.
// The value may already be out of date when it is returned, so treat it as
// an approximation only,
// see https://github.com/smallnest/chanx/issues/7.
func (c *UnboundedChan[T]) BufLen() int {
	n := atomic.LoadInt64(&c.bufCount)
	return int(n)
}
// NewUnboundedChan builds an UnboundedChan whose In channel, Out channel and
// buffer all start with the same capacity, initCapacity.
// In can be written to by multiple writers and Out can be read by multiple
// readers. Closing In is optional.
func NewUnboundedChan[T any](ctx context.Context, initCapacity int) *UnboundedChan[T] {
	// One capacity is used for all three parts; NewUnboundedChanSize sets
	// them individually.
	uc := NewUnboundedChanSize[T](ctx, initCapacity, initCapacity, initCapacity)
	return uc
}
// NewUnboundedChanSize works like NewUnboundedChan but takes a separate
// initial capacity for each of In, Out and the buffer.
// It starts a goroutine running process, which forwards values from In to
// Out for the returned channel.
func NewUnboundedChanSize[T any](ctx context.Context, initInCapacity, initOutCapacity, initBufCapacity int) *UnboundedChan[T] {
	var (
		in  = make(chan T, initInCapacity)
		out = make(chan T, initOutCapacity)
	)

	// The struct exposes the directional views of the channels; the
	// forwarding goroutine keeps the bidirectional ones.
	uc := &UnboundedChan[T]{
		In:     in,
		Out:    out,
		buffer: NewRingBuffer[T](initBufCapacity),
	}

	go process(ctx, in, out, uc)
	return uc
}
// process is the forwarding loop started by NewUnboundedChanSize. It moves
// values from in to out and parks them in ch.buffer whenever out cannot take
// a value immediately.
//
// out is closed when process returns, which happens when ctx is cancelled or
// when in has been closed and every buffered value has been handed to out.
// Values still in the buffer when ctx is cancelled are not delivered.
//
// ch.bufCount mirrors the number of buffered values and is only touched
// atomically so that BufLen can read it concurrently.
func process[T any](ctx context.Context, in, out chan T, ch *UnboundedChan[T]) {
	defer close(out)

	// drain hands the remaining buffered values to out once in is closed. It
	// gives up early if ctx is cancelled.
	drain := func() {
		for !ch.buffer.IsEmpty() {
			select {
			// Peek rather than Pop in the send operand: the operand is
			// evaluated when the select is entered, so popping there would
			// drop the value and leave bufCount stale if ctx.Done() won.
			case out <- ch.buffer.Peek():
				ch.buffer.Pop()
				atomic.AddInt64(&ch.bufCount, -1)
			case <-ctx.Done():
				return
			}
		}

		ch.buffer.Reset()
		atomic.StoreInt64(&ch.bufCount, 0)
	}

	for {
		select {
		case <-ctx.Done():
			return
		case val, ok := <-in:
			if !ok { // in is closed
				drain()
				return
			}

			if atomic.LoadInt64(&ch.bufCount) > 0 {
				// Older values are still buffered; queue behind them so
				// that ordering is preserved.
				ch.buffer.Write(val)
				atomic.AddInt64(&ch.bufCount, 1)
			} else {
				// Nothing is buffered: try to hand the value straight to
				// out without blocking.
				select {
				case out <- val:
					continue
				default:
				}

				// out is full, so start buffering.
				ch.buffer.Write(val)
				atomic.AddInt64(&ch.bufCount, 1)
			}

			// Buffering mode: keep accepting new values into the buffer
			// while feeding out from its head, until the buffer is empty.
			for !ch.buffer.IsEmpty() {
				select {
				case <-ctx.Done():
					return
				case val, ok := <-in:
					if !ok { // in is closed
						drain()
						return
					}
					ch.buffer.Write(val)
					atomic.AddInt64(&ch.bufCount, 1)
				case out <- ch.buffer.Peek():
					// The head was delivered; only now remove it.
					ch.buffer.Pop()
					atomic.AddInt64(&ch.bufCount, -1)

					// After a burst the buffer has grown past its initial
					// size; reset it once it is empty (presumably to
					// release the extra capacity — see RingBuffer.Reset).
					if ch.buffer.IsEmpty() && ch.buffer.size > ch.buffer.initialSize {
						ch.buffer.Reset()
						atomic.StoreInt64(&ch.bufCount, 0)
					}
				}
			}
		}
	}
}