-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmessage.go
80 lines (64 loc) · 1.34 KB
/
message.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
package main
import (
"fmt"
"sync"
)
type Message struct {
r int
s int
v V
sender int
receiver int
}
func (m *Message) String() string {
return fmt.Sprintf("(r:%v, s:%v, v:%v)", m.r, m.s, m.v)
}
const NullPos = 2
const MsgTypes = 3
type MessageQueue struct {
messagesR1 [][MsgTypes]uint64
messagesR2 [][MsgTypes]uint64
enoughMsg uint64
enoughMsgCondR1 []*sync.Cond
enoughMsgCondR2 []*sync.Cond
}
func (mq *MessageQueue) HasEnoughMsgs(r int, s int) bool {
msgs := mq.messagesR1
if r == 2 {
msgs = mq.messagesR2
}
return msgs[s][0]+msgs[s][1]+msgs[s][NullPos] >= mq.enoughMsg
}
func (mq *MessageQueue) Enqueue(msg *Message) {
r, s := msg.r, msg.s
msgs := mq.messagesR1
enoughMsgCond := mq.enoughMsgCondR1[s]
if r == 2 {
msgs = mq.messagesR2
enoughMsgCond = mq.enoughMsgCondR2[s]
}
i := msg.v
if msg.v == NULL {
i = NullPos
}
enoughMsgCond.L.Lock()
msgs[s][i] += 1
enoughMsgCond.L.Unlock()
if mq.HasEnoughMsgs(r, s) {
enoughMsgCond.Broadcast()
}
}
func (mq *MessageQueue) DequeueEnoughMsg(r int, s int) [MsgTypes]uint64 {
msgs := mq.messagesR1
enoughMsgCond := mq.enoughMsgCondR1[s]
if r == 2 {
msgs = mq.messagesR2
enoughMsgCond = mq.enoughMsgCondR2[s]
}
enoughMsgCond.L.Lock()
defer enoughMsgCond.L.Unlock()
for !mq.HasEnoughMsgs(r, s) {
enoughMsgCond.Wait()
}
return msgs[s]
}