Skip to content

Commit 11e131d

Browse files
committed
concurrency: add lock free link list
Change-Id: Ic22d4071b1e00e360bef14e69d949c535aa8e081
1 parent 6de4339 commit 11e131d

File tree

1 file changed

+157
-0
lines changed

1 file changed

+157
-0
lines changed

go/basic/concurrency/linked_queue.go

+157
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,157 @@
1+
/*
2+
lock free linked queue.
3+
ref:
4+
1. http://ddrv.cn/a/591069
5+
2. https://coolshell.cn/articles/8239.html
6+
*/
7+
8+
package concurrent
9+
10+
import (
11+
"sync"
12+
"sync/atomic"
13+
"unsafe"
14+
)
15+
16+
type LinkedQueueNode struct {
17+
Value interface{}
18+
Next *LinkedQueueNode
19+
}
20+
21+
func (node *LinkedQueueNode) casNext(oldV, newV *LinkedQueueNode) bool {
22+
return atomic.CompareAndSwapPointer(
23+
(*unsafe.Pointer)(unsafe.Pointer(&node.Next)),
24+
unsafe.Pointer(oldV),
25+
unsafe.Pointer(newV),
26+
)
27+
}
28+
29+
func (node *LinkedQueueNode) loadNext() *LinkedQueueNode {
30+
return (*LinkedQueueNode)(atomic.LoadPointer(
31+
(*unsafe.Pointer)(unsafe.Pointer(&node.Next)),
32+
))
33+
}
34+
35+
type LinkedQueue struct {
36+
head *LinkedQueueNode
37+
tail *LinkedQueueNode
38+
size int64
39+
m sync.Mutex
40+
}
41+
42+
func NewLinkedQueue() *LinkedQueue {
43+
dummy := &LinkedQueueNode{}
44+
dummy.Value = nil
45+
dummy.Next = nil
46+
return &LinkedQueue{ // like container/list, use same node
47+
head: dummy,
48+
tail: dummy,
49+
}
50+
}
51+
52+
func (queue *LinkedQueue) casTail(oldV, newV *LinkedQueueNode) bool {
53+
return atomic.CompareAndSwapPointer(
54+
(*unsafe.Pointer)(unsafe.Pointer(&queue.tail)),
55+
unsafe.Pointer(oldV),
56+
unsafe.Pointer(newV),
57+
)
58+
}
59+
60+
func (queue *LinkedQueue) casHead(oldV, newV *LinkedQueueNode) bool {
61+
return atomic.CompareAndSwapPointer(
62+
(*unsafe.Pointer)(unsafe.Pointer(&queue.head)),
63+
unsafe.Pointer(oldV),
64+
unsafe.Pointer(newV),
65+
)
66+
}
67+
68+
func (queue *LinkedQueue) loadHead() *LinkedQueueNode {
69+
return (*LinkedQueueNode)(atomic.LoadPointer(
70+
(*unsafe.Pointer)(unsafe.Pointer(&queue.head)),
71+
))
72+
}
73+
74+
func (queue *LinkedQueue) loadTail() *LinkedQueueNode {
75+
return (*LinkedQueueNode)(atomic.LoadPointer(
76+
(*unsafe.Pointer)(unsafe.Pointer(&queue.tail)),
77+
))
78+
}
79+
80+
func (queue *LinkedQueue) Enqueue(v interface{}) bool {
81+
newNode := &LinkedQueueNode{Value: v, Next: nil}
82+
var tail, next *LinkedQueueNode
83+
for {
84+
// use atomic load and cas
85+
tail = queue.loadTail()
86+
next = tail.loadNext()
87+
if tail == queue.loadTail() { // double check
88+
if next == nil { // queue tail
89+
if tail.casNext(next, newNode) { // link to queue
90+
break
91+
}
92+
} else {
93+
queue.casTail(tail, next) // move tail pointer to real tail
94+
}
95+
}
96+
}
97+
98+
queue.casTail(tail, newNode) // failure is ok, another thread has update
99+
atomic.AddInt64(&queue.size, 1)
100+
return true
101+
}
102+
103+
func (queue *LinkedQueue) Dequeue() interface{} {
104+
var head, tail, first *LinkedQueueNode
105+
for {
106+
// use atomic load and cas
107+
head = queue.loadHead() // dummy
108+
tail = queue.loadTail() // dummy
109+
first = head.loadNext() // nil
110+
if head == queue.loadHead() { // double check
111+
if first == nil { // empty list
112+
return nil
113+
}
114+
if head == tail { // empty list
115+
queue.casTail(tail, first) // move tail to real pointer
116+
continue
117+
}
118+
if queue.casHead(head, first) {
119+
break
120+
}
121+
}
122+
}
123+
124+
atomic.AddInt64(&queue.size, -1)
125+
return first.Value
126+
}
127+
128+
func (queue *LinkedQueue) Size() int64 {
129+
return atomic.LoadInt64(&queue.size)
130+
}
131+
132+
func (queue *LinkedQueue) EnqueueWithLock(v interface{}) bool {
133+
newNode := &LinkedQueueNode{Value: v, Next: nil}
134+
queue.m.Lock()
135+
defer queue.m.Unlock()
136+
tail := queue.tail
137+
tail.Next = newNode
138+
queue.tail = newNode
139+
queue.size += 1
140+
return true
141+
}
142+
143+
func (queue *LinkedQueue) DequeueWithLock() interface{} {
144+
var head, tail, first *LinkedQueueNode
145+
queue.m.Lock()
146+
defer queue.m.Unlock()
147+
head = queue.head
148+
tail = queue.tail
149+
first = head.Next
150+
if head == tail {
151+
return nil
152+
}
153+
queue.head = first
154+
head.Next = nil
155+
queue.size -= 1
156+
return first.Value
157+
}

0 commit comments

Comments
 (0)