Skip to content

Commit baff3ca

Browse files
author
dondish
committed
Every node is now allowed to store some extra data that all of the nodes will have access to (The data is immutable for now, maybe later a broadcast message of new data will be made).
1 parent 24c6488 commit baff3ca

File tree

4 files changed

+73
-53
lines changed

4 files changed

+73
-53
lines changed

cluster_test.go

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ func (t TestMessage) Type() string {
2020
}
2121

2222
func TestCreateSingleNodeCluster(t *testing.T) {
23-
master := CreateCluster("localhost:5555")
23+
master := CreateCluster("localhost:5555", nil)
2424
defer master.Close()
2525

2626
assert.Empty(t, master.Nodes, "the node should not be connected to other nodes implicitly")
@@ -29,19 +29,20 @@ func TestCreateSingleNodeCluster(t *testing.T) {
2929
}
3030

3131
func TestCreateTwoNodeCluster(t *testing.T) {
32-
master := CreateCluster("localhost:5556")
32+
master := CreateCluster("localhost:5556", nil)
3333
defer master.Close()
3434
time.Sleep(500 * time.Millisecond)
35-
node, err := JoinCluster("localhost:5557", "localhost:5556")
35+
node, err := JoinCluster("localhost:5557", "localhost:5556", nil)
3636

3737
if err != nil {
3838
fmt.Println("couldn't create node:", err)
3939
assert.NotNil(t, err, "There shouldn't be an error while closing the slave")
4040
}
4141

4242
defer node.Close()
43+
time.Sleep(500 * time.Millisecond)
4344
_, ok := node.Nodes.Load(0)
44-
assert.True(t, ok, 0, "the node should have master in its nodes map")
45+
assert.True(t, ok, "the node should have master in its nodes map")
4546
assert.True(t, node.Id == 1, "the node's id should be set to 1")
4647
assert.NotNil(t, master.Message, "the message channel should not be nil")
4748

@@ -72,10 +73,10 @@ func TestCreateTwoNodeCluster(t *testing.T) {
7273
}
7374

7475
func TestCreateMultiNodeCluster(t *testing.T) {
75-
master := CreateCluster("localhost:5558")
76+
master := CreateCluster("localhost:5558", nil)
7677
defer master.Close()
7778
time.Sleep(500 * time.Millisecond)
78-
node1, err := JoinCluster("localhost:5559", "localhost:5558")
79+
node1, err := JoinCluster("localhost:5559", "localhost:5558", nil)
7980

8081
if err != nil {
8182
fmt.Println("couldn't create node 1:", err)
@@ -84,7 +85,7 @@ func TestCreateMultiNodeCluster(t *testing.T) {
8485

8586
defer node1.Close()
8687
time.Sleep(500 * time.Millisecond)
87-
node2, err := JoinCluster("localhost:5560", "localhost:5558")
88+
node2, err := JoinCluster("localhost:5560", "localhost:5558", nil)
8889

8990
if err != nil {
9091
fmt.Println("couldn't create node 2:", err)

connection.go

Lines changed: 31 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
// TODO Add heartbeat mechanism
12
package go_cluster
23

34
import (
@@ -8,11 +9,12 @@ import (
89
)
910

1011
// Resembles a connection between two nodes.
11-
// The API makes it simpler to customize the connection.
1212
type Connection struct {
13-
Conn *net.TCPConn
13+
Conn *net.TCPConn // The connection
14+
Data interface{} // The remote node's extra data
1415
}
1516

17+
// The transport protocol
1618
type Transport struct {
1719
Type string
1820
Message Message
@@ -32,15 +34,17 @@ func connect(address string) (*Connection, error) {
3234
}
3335

3436
// Connects a node to a new node and sets up message handling
35-
func connectNewNode(id int, addr string, node *Node) {
37+
func connectNewNode(id int, addr string, data interface{}, node *Node) {
3638
if conn, err := connect(addr); err != nil {
3739
node.Log("error while connecting to a new node:", err)
3840
} else {
41+
conn.Data = data
3942
node.Nodes.Store(id, conn)
40-
msg := IdReqMessage{Id: node.Id}
43+
msg := GreetingMessage{Id: node.Id, Data: data}
4144
if err := conn.Write(msg); err != nil {
4245
node.Log("couldn't send the message to the new node:", err)
4346
}
47+
go handleMessages(conn, node, id)
4448
node.Log("Successfully connected to the new node!")
4549
}
4650
}
@@ -70,18 +74,19 @@ func handleIncoming(address string, node *Node) {
7074
continue
7175
}
7276
connection := &Connection{Conn: conn}
77+
node.Nodes.Store(node.NextId, connection)
7378
go handleMessages(connection, node, node.NextId)
74-
if err := connection.Write(ReadyMessage{Id: node.NextId}); err != nil {
79+
if err := connection.Write(ReadyMessage{Id: node.NextId, EntryId: node.Id}); err != nil {
7580
node.Log("failed to send ready message to ", conn.RemoteAddr().String(), ":", err.Error())
7681
continue
7782
}
78-
node.Nodes.Store(node.NextId, connection)
7983
node.NextId++
8084
}
8185
}
8286

8387
// Handles the messages incoming from the connection
8488
func handleMessages(connection *Connection, node *Node, remoteid int) {
89+
node.Log("Message handling started with", remoteid, "(note that this might not be the ")
8590
conn := connection.Conn
8691
dec := gob.NewDecoder(conn)
8792
for {
@@ -104,22 +109,36 @@ func handleMessages(connection *Connection, node *Node, remoteid int) {
104109
node.Message <- message
105110
}
106111
} else {
107-
if data.Type == "idreq" {
108-
remoteid = data.Message.(IdReqMessage).Id
112+
if data.Type == "readyreq" {
113+
if _, ok := node.Nodes.Load(remoteid); !ok {
114+
readymsg := data.Message.(ReadyMessage)
115+
node.Id = readymsg.Id
116+
remoteid = readymsg.EntryId
117+
node.NextId = node.Id + 1
118+
if err := connection.Write(IntroduceMessage{Addr: node.Addr, Data: data}); err != nil {
119+
node.Log("couldn't send an introduce message to the entry point:", err)
120+
os.Exit(1)
121+
}
122+
node.Nodes.Store(remoteid, connection)
123+
node.Log("Node Intialized!")
124+
}
125+
} else if data.Type == "greetreq" {
126+
remoteid = data.Message.(GreetingMessage).Id
109127
node.Log("Successfully connected to node", remoteid)
110128
node.Nodes.Store(remoteid, connection)
111-
} else if data.Type == "addrreq" {
129+
} else if data.Type == "introreq" {
112130
msg := NewNodeMessage{
113131
Id: remoteid,
114-
Addr: data.Message.(AddressMessage).Addr,
132+
Addr: data.Message.(IntroduceMessage).Addr,
133+
Data: data.Message.(IntroduceMessage).Data,
115134
}
116135
if err := node.Broadcast(msg, remoteid); err != nil {
117136
node.Log("error while broadcasting a new node:", err)
118137
}
119-
} else if data.Type == "newnode" {
138+
} else if data.Type == "newnodereq" {
120139
msg := data.Message.(NewNodeMessage)
121140
node.NextId = msg.Id + 1
122-
connectNewNode(msg.Id, msg.Addr, node)
141+
connectNewNode(msg.Id, msg.Addr, msg.Data, node)
123142
} else {
124143
node.Message <- data.Message
125144
}

message.go

Lines changed: 21 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package go_cluster
22

33
import (
44
"encoding/gob"
5-
"strconv"
65
)
76

87
// The Message interface, this is supposed to be customized (for example Msg is encoded in gob).
@@ -26,55 +25,59 @@ func (m ErrorMessage) Msg() interface{} {
2625

2726
// The message the master sends to a newly connected node with its id
2827
type ReadyMessage struct {
29-
Id int
28+
Id int
29+
EntryId int
3030
}
3131

3232
func (m ReadyMessage) Msg() interface{} {
33-
return strconv.Itoa(m.Id)
33+
return m
3434
}
3535

3636
func (m ReadyMessage) Type() string {
37-
return "ready"
37+
return "readyreq"
3838
}
3939

4040
// The message a node sends to the node it's newly connected to with its id to make authentication easier
41-
type IdReqMessage struct {
42-
Id int
41+
type GreetingMessage struct {
42+
Id int
43+
Data interface{}
4344
}
4445

45-
func (m IdReqMessage) Msg() interface{} {
46-
return strconv.Itoa(m.Id)
46+
func (m GreetingMessage) Msg() interface{} {
47+
return m
4748
}
4849

49-
func (m IdReqMessage) Type() string {
50-
return "idreq"
50+
func (m GreetingMessage) Type() string {
51+
return "greetreq"
5152
}
5253

5354
// The message the master sends when all nodes when a new node joins
5455
type NewNodeMessage struct {
55-
Id int // The Id
56-
Addr string // The address to connect to
56+
Id int // The Id
57+
Addr string // The address to connect to
58+
Data interface{} // The new node's data
5759
}
5860

5961
func (m NewNodeMessage) Msg() interface{} {
6062
return m
6163
}
6264

6365
func (m NewNodeMessage) Type() string {
64-
return "newnode"
66+
return "newnodereq"
6567
}
6668

67-
// The message a new node sends to the master with the incoming connections address
68-
type AddressMessage struct {
69+
// The message a new node sends to the master with its information
70+
type IntroduceMessage struct {
6971
Addr string
72+
Data interface{}
7073
}
7174

72-
func (m AddressMessage) Msg() interface{} {
75+
func (m IntroduceMessage) Msg() interface{} {
7376
return m.Addr
7477
}
7578

76-
func (m AddressMessage) Type() string {
77-
return "addrreq"
79+
func (m IntroduceMessage) Type() string {
80+
return "introreq"
7881
}
7982

8083
// Register the message type to gob

node.go

Lines changed: 13 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -11,59 +11,57 @@ import (
1111

1212
// The node is the general data type in go-cluster, it resembles a node in the cluster
1313
type Node struct {
14+
Addr string // Incoming connections address
1415
Ready bool // Whether is node is ready for connections
1516
Id int // This node's ID
16-
NextId int // (Master Only) Next client's id
17+
NextId int // Next client's id
1718
Message chan Message // The channel to forward messages to
1819
Nodes *sync.Map // A map that maps other node ids to their connections.
20+
Data interface{} // Allows the node to have extra information attached to it
1921
}
2022

2123
func Init() {
2224
gob.Register(ReadyMessage{})
2325
gob.Register(ErrorMessage{})
24-
gob.Register(IdReqMessage{})
26+
gob.Register(GreetingMessage{})
2527
gob.Register(NewNodeMessage{})
26-
gob.Register(AddressMessage{})
28+
gob.Register(IntroduceMessage{})
29+
gob.Register(Transport{})
2730
}
2831

2932
// Creates a new master node
3033
// This new node will introduce other nodes to each other.
31-
func CreateCluster(address string) *Node {
34+
func CreateCluster(address string, data interface{}) *Node {
3235
node := &Node{
36+
Addr: address,
3337
Ready: true,
3438
NextId: 1,
3539
Message: make(chan Message),
3640
Nodes: new(sync.Map),
41+
Data: data,
3742
}
3843
go handleIncoming(address, node)
3944
return node
4045
}
4146

4247
// Creates a new node and connects to the master
43-
func JoinCluster(address, maddress string) (*Node, error) {
48+
func JoinCluster(address, maddress string, data interface{}) (*Node, error) {
4449
node := &Node{
50+
Addr: address,
4551
Ready: true,
4652
Id: 1,
4753
NextId: 1,
4854
Message: make(chan Message),
4955
Nodes: new(sync.Map),
56+
Data: data,
5057
}
51-
// TODO add id
5258
if conn, err := connect(maddress); err != nil {
5359
return nil, err
5460
} else if conn == nil {
5561
return nil, errors.New("connection cannot be nil, unexpected error occurred")
5662
} else {
57-
node.Nodes.Store(0, conn)
58-
go handleMessages(conn, node, 0)
59-
readymsg := <-node.Message
60-
node.Id = readymsg.(ReadyMessage).Id
61-
node.NextId = node.Id + 1
62-
log.Println("Node Intialized! Id:", node.Id, "Address:", address)
6363
go handleIncoming(address, node)
64-
if err := node.Send(AddressMessage{Addr: address}, 0); err != nil {
65-
return nil, err
66-
}
64+
go handleMessages(conn, node, 0)
6765
return node, nil
6866
}
6967
}
@@ -105,7 +103,6 @@ func (n Node) Broadcast(message Message, except ...int) error {
105103
// Shuts down the node, if the node is the master it will broadcast the new master to all nodes before closing.
106104
// It accepts a channel that will receive a boolean when the node is shutdown.
107105
func (n *Node) Close() {
108-
// TODO new master broadcast
109106
n.Log("Shutting down...")
110107
n.Nodes.Range(func(id, conn interface{}) bool {
111108
if err := conn.(*Connection).Close(); err != nil {

0 commit comments

Comments
 (0)