Skip to content

Commit cd97101

Browse files
committed
2 parents 2cd499c + c6d05af commit cd97101

File tree

4 files changed

+173
-114
lines changed

4 files changed

+173
-114
lines changed

Diff for: monitor/monitor_gossip.py

+31-28
Original file line numberDiff line numberDiff line change
@@ -1,40 +1,42 @@
11
import socket
22
import pickle
33
import os
4-
from transfer import _send_msg, _recv_msg, _wait_recv_msg
5-
from info import HEARTBEAT_PORT, num_objects_per_file, max_num_objects_per_pg, MSG_SIZE, HEADERSIZE
6-
7-
def __init__():
4+
import sys
85

9-
#This will help us to know if the storage is alive or not
10-
for i in range(4):
11-
self.storage_node[i] = True
6+
from monitor_replicate import recovery
127

8+
sys.path.insert(1, '../utils/')
9+
from transfer import _send_msg, _recv_msg, _wait_recv_msg
10+
from info import mds_ip, monitor_ip, storage_ip, num_objects_per_file, max_num_objects_per_pg, MSG_SIZE, HEADERSIZE
1311

14-
def recovery():
15-
#Will start the recovery protocol
16-
pass
1712

18-
def gossip(c, msg):
13+
def gossip(c, msg, live_osd):
1914
node_ip = msg["ip"]
15+
crash_osd_id = msg["osd_id"]
16+
17+
if live_osd[crash_osd_id] == False:
18+
return
19+
20+
print(f"Need to start the recovery protocol for {node_ip} {crash_osd_id}")
21+
22+
hashtable = {"pg_id1":[("osd_id1", True), ("osd_id2", True), ("osd_id4", True)]}
23+
# Use this in maincode
24+
# hashtable_file = open('hashtable', 'rb')
25+
# hashtable_dump = hashtable_file.read()
26+
# hashtable = pickle.load(hashtable_dump)
27+
# hashtable_file.close()
2028

21-
i = 0
22-
for node in storage_ip:
23-
if storage_ip[node] == node_ip:
24-
# self.storage_node[i] = False
2529

26-
print(f"Need to start the recovery protocol for {node_ip} storage node number {i+1}")
27-
recovery()
30+
live_osd[crash_osd_id] = False
31+
rf = 3
2832

29-
break
33+
hastable = recovery(crash_osd_id, hashtable, live_osd, rf)
34+
print(hashtable)
3035

31-
i += 1
3236

33-
res = {"type": "ACK"}
34-
_send_msg(c, res)
3537

3638

37-
def heartbeat_protocol(soc):
39+
def heartbeat_protocol(soc, live_osd):
3840
# Establish connection with client.
3941
c, addr = soc.accept()
4042
print(f"\nGot connection from {addr}")
@@ -56,7 +58,7 @@ def heartbeat_protocol(soc):
5658
elif msg["type"] == "FAIL":
5759
res = {"type": "ACK"}
5860
_send_msg(c, res)
59-
# gossip(c, msg)
61+
gossip(c, msg, live_osd)
6062

6163
c.close()
6264

@@ -66,13 +68,15 @@ def heartbeat_protocol(soc):
6668
return
6769

6870

71+
def run_main_monitor_gossip_cum_recovery():
72+
# Make sure to maintain this
73+
live_osd = {"osd_id1":True, "osd_id2":True, "osd_id3":True, "osd_id4":True}
6974

70-
if __name__ == "__main__":
7175
s = socket.socket()
7276
print ("Socket successfully created")
7377

7478
# reserve a port on your computer in our
75-
port = HEARTBEAT_PORT
79+
port = monitor_ip["primary"]["port"]
7680

7781
# Next bind to the port
7882
# we have not typed any ip in the ip field
@@ -83,14 +87,13 @@ def heartbeat_protocol(soc):
8387
print ("Monitor socket binded to %s" %(port))
8488

8589
# put the socket into listening mode
86-
s.listen(5)
87-
print ("socket is listening")
90+
s.listen(5)
8891

8992
# a forever loop until we interrupt it or
9093
# an error occurs
9194
while True:
9295

93-
heartbeat_protocol(s)
96+
heartbeat_protocol(s, live_osd)
9497

9598
s.close()
9699
# Close the connection with the client

Diff for: monitor/monitor_replicate.py

+22-13
Original file line numberDiff line numberDiff line change
@@ -1,38 +1,42 @@
11
import socket
22
import pickle
3-
from transfer import _send_msg, _recv_msg
3+
import time
4+
from transfer import _send_msg, _recv_msg, _wait_recv_msg
45
from info import mds_ip, monitor_ip, storage_ip, num_objects_per_file, max_num_objects_per_pg, MSG_SIZE, HEADERSIZE
56

67

78
def send_replicate_request(pg_id, master_osd, clone_osd):
89
soc = socket.socket()
910
soc.settimeout(5)
10-
print(f"Socket successfully created for replicating pg_id {pg_id} from master osd {master_osd} to clone osd {clone_osd}")
11+
print(f"Socket successfully created for replicating {pg_id} from master {master_osd} to clone {clone_osd}")
1112

1213
# reserve a port on your computer in our
1314
ip_add = storage_ip[master_osd]["ip"]
14-
port = storage_ip[master_osd]["port"]
15-
15+
port = storage_ip[master_osd]["port"] + 10
16+
1617
flag = -1
1718
try :
1819
soc.connect((ip_add, port))
19-
soc.timeout(None)
20+
soc.settimeout(None)
2021

21-
print(f"Connecting with osd {osd_id}...")
22+
print(f"Connecting with {master_osd} on port {port}...")
2223

23-
res = {"type": "REPLICATE", "pg_id" : pg_id, "osd" : osd_id}
24+
res = {"type": "REPLICATE", "pg_id" : pg_id, "osd_id" : clone_osd}
2425
_send_msg(soc, res)
2526
print("Message send for replication ")
26-
time.sleep(3)
27+
28+
# time.sleep(5)
2729

28-
msg = _recv_msg(c, MSG_SIZE)
30+
msg = _wait_recv_msg(soc, MSG_SIZE)
31+
print(msg)
2932
if msg == None:
3033
flag = 0
3134
elif msg["type"] == "ACK":
3235
flag = 1
3336

34-
except :
35-
print("Didn't Connect! [Timeout] Primary Monitor is Down")
37+
except Exception as e:
38+
print(e)
39+
print(f"Didn't Connect! [Timeout] master {master_osd} is down, port {port}")
3640

3741
soc.close()
3842
return flag
@@ -56,10 +60,12 @@ def replicate(pg_id, osd_id_master_list, osd_id_clone_dict):
5660
add_entry.append([clone, True])
5761
break
5862
elif ret == -1:
63+
print("may try send_replicate_request again or continue to next master")
5964
# may try send_replicate_request again
6065
# or continue to next master
6166
continue
6267
elif ret == 0:
68+
print("eans master osd server break before sending the ack")
6369
# means master osd server break before sending the ack
6470
continue
6571
else:
@@ -82,6 +88,7 @@ def recovery(crash_osd_id, hashtable, live_osd, replication_factor):
8288
rf = replication_factor
8389

8490
# Check the hashtable for pg_id dependency
91+
new_hashtable = hashtable
8592
for pg_id in hashtable.keys():
8693
list_of_replicated_osds = hashtable[pg_id]
8794
list_of_master = []
@@ -105,6 +112,8 @@ def recovery(crash_osd_id, hashtable, live_osd, replication_factor):
105112
# Thus clone_osd dict() will only have the osd that are live and
106113
# don't have that pg_id which was there is crashed node "crash_osd_id"
107114

108-
for entry in new_entries
109-
hashtable[pg_id].append(entry)
115+
for entry in new_entries:
116+
new_hashtable[pg_id].append(entry)
117+
110118

119+
return new_hashtable

Diff for: osd/storage_gossip.py

+47-21
Original file line numberDiff line numberDiff line change
@@ -3,28 +3,34 @@
33
import threading
44
import time
55
import os
6+
7+
from storage_replication import replicate_pg
8+
9+
sys.path.insert(1, '../utils/')
610
from transfer import _send_msg, _recv_msg, _wait_recv_msg
7-
from info import MONITOR_IPs, OSD_IPs, HEARTBEAT_PORT, num_objects_per_file, max_num_objects_per_pg, MSG_SIZE, HEADERSIZE
11+
from info import mds_ip, monitor_ip, storage_ip, num_objects_per_file, max_num_objects_per_pg, MSG_SIZE, HEADERSIZE
12+
13+
814

9-
STORAGE_ID = 1
15+
STORAGE_ID = "osd_id1"
1016

1117

12-
def report_monitor(node_ip, node_id):
18+
def report_monitor(node_ip, osd_id):
1319
flag = 0
1420
#Will call monitor to state about the down node
1521
soc = socket.socket()
1622
soc.settimeout(5)
1723
print ("Socket successfully created for Recovery: Primary")
1824

19-
monitor_1 = MONITOR_IPs["primary"]
25+
monitor_1 = monitor_ip["primary"]
2026

2127
try :
2228
soc.connect((monitor_1["ip"], monitor_1["port"]))
2329
# soc.timeout(None)
2430

2531
print(f"Connecting Primary monitor...")
2632

27-
res = {"type": "FAIL", "ip" : node_ip, "id" : node_id}
33+
res = {"type": "FAIL", "ip" : node_ip, "osd_id" : osd_id}
2834
_send_msg(soc, res)
2935

3036
msg = _wait_recv_msg(soc, MSG_SIZE)
@@ -47,14 +53,14 @@ def report_monitor(node_ip, node_id):
4753
print ("Socket successfully created for Recovery: Backup")
4854

4955

50-
monitor_2 = MONITOR_IPs["backup"]
56+
monitor_2 = monitor_ip["backup"]
5157

5258
try :
5359
soc.connect((monitor_2["ip"], monitor_2["port"]))
5460
soc.settimeout(None)
5561
print(f"Connecting Backup monitor...")
5662

57-
res = {"type": "FAIL", "ip" : node_ip, "id" : node_id}
63+
res = {"type": "FAIL", "ip" : node_ip, "osd_id" : osd_id}
5864
_send_msg(soc, res)
5965

6066
msg = _wait_recv_msg(soc, MSG_SIZE)
@@ -79,35 +85,36 @@ def recv_gossip():
7985

8086
i=0
8187
for i in range(4):
82-
if i+1 != STORAGE_ID:
83-
node_ip = OSD_IPs[i+1]["ip"]
84-
port = OSD_IPs[i+1]["port"]
88+
curr_osd = "osd_id" + str(i+1)
89+
if curr_osd != STORAGE_ID:
90+
node_ip = storage_ip[curr_osd]["ip"]
91+
port = storage_ip[curr_osd]["port"]
8592

8693
soc = socket.socket()
8794
soc.settimeout(5)
88-
print(f"\n\nSocket successfully created for Gossip with osd {i+1}")
95+
print(f"\n\nSocket successfully created for Gossip with osd {curr_osd}")
8996

9097
try :
9198
soc.connect((node_ip, port))
9299
soc.settimeout(None)
93100

94-
print(f"Connecting {node_ip} storage node number {i+1} port {port}")
101+
print(f"Connecting {node_ip} storage node number {curr_osd} port {port}")
95102

96103
msg = {"type": "ALIVE"}
97104
_send_msg(soc, msg)
98105

99106
rec = _wait_recv_msg(soc, MSG_SIZE)
100107
print(msg)
101108
if rec == None:
102-
print(f"Didn't receive data to Storage {i+1} ip {node_ip}! [Timeout] ")
103-
report_monitor(node_ip, i+1)
109+
print(f"Didn't receive data to Storage {curr_osd} ip {node_ip}! [Timeout] ")
110+
report_monitor(node_ip, curr_osd)
104111

105112
elif rec["type"] != "ACK":
106-
report_monitor(node_ip, i+1)
113+
report_monitor(node_ip, curr_osd)
107114

108115
except :
109-
print(f"Didn't Connect to Storage {i+1} ip {node_ip}! [Timeout]")
110-
report_monitor(node_ip, i+1)
116+
print(f"Didn't Connect to Storage {curr_osd} ip {node_ip}! [Timeout]")
117+
report_monitor(node_ip, curr_osd)
111118

112119
soc.close()
113120

@@ -121,13 +128,12 @@ def send_heartbeat():
121128

122129
s = socket.socket()
123130
print ("Socket successfully created for Heartbeat")
124-
port = HEARTBEAT_PORT
131+
port = storage_ip[STORAGE_ID]["port"]
125132
s.bind(('', port))
126133
print ("Socket binded to %s" %(port))
127134

128135
# put the socket into listening mode
129136
s.listen(5)
130-
print ("Socket is listening")
131137

132138
while True:
133139
# Establish connection with client.
@@ -154,9 +160,29 @@ def send_heartbeat():
154160
os._exit(1)
155161

156162

157-
if __name__ == "__main__":
163+
def read_write_pg_replication():
164+
#This will check for incoming messages
165+
#from other nodes to replicate certain data
166+
167+
s = socket.socket()
168+
print ("Socket successfully created for Read-Write only for replication")
169+
# As mentioned in the info file
170+
# Read/write request will be send on "port + 10"
171+
port = storage_ip[STORAGE_ID]["port"] + 10
172+
s.bind(('', port))
173+
print ("Socket binded to %s" %(port))
174+
175+
# put the socket into listening mode
176+
s.listen(5)
177+
178+
while True:
179+
replicate_pg(s)
180+
158181

182+
def _run_main_osd_gossip_cum_recovery():
159183
p1 = threading.Thread(target=send_heartbeat)
160184
p2 = threading.Thread(target=recv_gossip)
185+
p3 = threading.Thread(target=read_write_pg_replication)
161186
p1.start()
162-
p2.start()
187+
p2.start()
188+
p3.start()

0 commit comments

Comments
 (0)