-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathraft.go
158 lines (121 loc) · 2.93 KB
/
raft.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
package raft
import (
"fmt"
"log"
"net"
"sync"
"time"
)
// Raft is the main object of the node itself
type Raft struct {
config *Config
tcpListener *net.TCPListener
udpListener *net.UDPConn
stateLock sync.Mutex
self *State
stateCh chan int
nodes []*Node
nodeMap map[string]*Node
heartBeatLock sync.Mutex
heartBeat *time.Ticker
stopHeartBeat chan bool
electionTimeoutTickerLock sync.Mutex
electionTimeoutTicker *time.Ticker
votingTimeoutTickerLock sync.Mutex
votingTimeoutTicker *time.Ticker
hbChan chan *heartBeat // Altay check it
votes chan *Vote
stateChanged chan bool
joinStatus bool
}
func newRaft(conf *Config) (*Raft, error) {
tcplnAddr := fmt.Sprintf("%s:%d", conf.BindAddr, conf.BindTCPPort)
tcpln, err := net.Listen("tcp", tcplnAddr)
if err != nil {
return nil, fmt.Errorf("Failed to start TCP listener. Err: %s", err)
}
udplnAddr := fmt.Sprintf("%s:%d", conf.BindAddr, conf.BindUDPPort)
udpln, err := net.ListenPacket("udp", udplnAddr)
if err != nil {
return nil, fmt.Errorf("Failed to start UDP listener. Err: %s", err)
}
r := &Raft{
config: conf,
tcpListener: tcpln.(*net.TCPListener),
udpListener: udpln.(*net.UDPConn),
nodeMap: make(map[string]*Node),
hbChan: make(chan *heartBeat, 1),
votes: make(chan *Vote, 100),
stateChanged: make(chan bool),
stateCh: make(chan int),
joinStatus: false,
}
go r.ListenTCP()
go r.ListenUDP()
go r.handleState()
return r, nil
}
// Init func
func Init(conf *Config) (*Raft, error) {
r, err := newRaft(conf)
if err != nil {
return nil, err
}
st, err := createInitState(conf)
if err != nil {
return nil, err
}
r.self = st
r.stateCh <- r.self.state
return r, nil
}
func createInitState(conf *Config) (*State, error) {
// name := fmt.Sprintf("%s:%d&%d", conf.BindAddr, conf.BindTCPPort, conf.BindUDPPort)
state := &State{
state: Follower,
leaderID: "",
term: 0,
vote: &Vote{
Voter: "",
VoteStatus: UnknownVote,
},
}
return state, nil
}
// Join func
func Join(conf *Config, joinAddr string, port int) (*Raft, error) {
r, err := Init(conf)
if err != nil {
log.Printf("Failed to initiate the node. Err %s", err)
}
addr := getUDPAddr(joinAddr, port)
go r.joinToClusterByAddr(addr)
return r, nil
}
// Nodes ...
func (r *Raft) Nodes() []*Node {
return r.nodes
}
// Shutdown closes the listener and shuts down the Raft
func (r *Raft) Shutdown() {
r.tcpListener.Close()
r.udpListener.Close()
log.Println("Shutting down the Raft...")
}
// GetState temporary function
func (r *Raft) GetState() {
// fmt.Println("Machine name: ", r.config.Name)
//
// fmt.Printf("Nodes: \n")
// for num, node := range r.Nodes() {
// fmt.Printf("%d : %s\n", num, node.Name)
// }
// switch r.self.state {
// case Follower:
// fmt.Println("I am Follower")
// case Leader:
// fmt.Println("I am Leader")
// case Candidate:
// fmt.Println("I am Candidate")
// }
}