Skip to content

Commit 5d2ac17

Browse files
committed
Running HeartBeat & Replication...Yayyyyygit status
1 parent f6fc2b3 commit 5d2ac17

23 files changed

+826
-7
lines changed
675 Bytes
Binary file not shown.
Binary file not shown.
Binary file not shown.
677 Bytes
Binary file not shown.
Binary file not shown.
Binary file not shown.

Test Code/Storage-1/storage_replication.py

+13-7
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import socket
22
import pickle
33
import os
4-
from transfer import _send_msg, _recv_msg
4+
from transfer import _send_msg, _recv_msg, _wait_recv_msg
55
from info import mds_ip, monitor_ip, storage_ip, num_objects_per_file, max_num_objects_per_pg, MSG_SIZE, HEADERSIZE
66

77

@@ -39,7 +39,8 @@ def replicate_pg(soc):
3939
print(f"Socket created to send request to SAVE data at OSD {osd_id}")
4040

4141
ip_add = storage_ip[osd_id]["ip"]
42-
port = storage_ip[osd_id]["port"]
42+
port = storage_ip[osd_id]["port"] + 10
43+
# Remember replication is on normal port + 10
4344

4445
try :
4546
new_soc.connect((ip_add, port))
@@ -72,13 +73,18 @@ def replicate_pg(soc):
7273
pg_id = msg["pg_id"]
7374
pg = msg["pg"]
7475

75-
file = open("./data/"+ pg_id, 'wb')
76+
try :
77+
file = open("./data/"+ pg_id, 'wb')
78+
print("Replicated data is saved")
7679

77-
pg_dump = pickle.dump(pg)
78-
file.write(pg_dump)
80+
pg_dump = pickle.dump(pg)
81+
file.write(pg_dump)
82+
83+
except Exception as e:
84+
print(e)
7985

8086
msg = {"type":"ACK"}
81-
_send_msg(msg)
87+
_send_msg(c, msg)
8288
print("Write successful..send back Ack to the master osd")
8389

8490

@@ -87,5 +93,5 @@ def replicate_pg(soc):
8793

8894
c.close()
8995

90-
print(f"Exiting from pid os.getpid() ..")
96+
print(f"Exiting from pid {os.getpid()} ..")
9197
os._exit(1)
677 Bytes
Binary file not shown.
Binary file not shown.
Binary file not shown.

Test Code/Storage-2/data/pg_id1

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
�}�.

Test Code/Storage-2/info.py

+52
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
"""
Shared configuration constants for the storage / monitor test cluster.

@author: Kartik Saini
"""

# Metadata-server endpoints; both entries are blank placeholders in this file.
mds_ip = {
    "primary": {"ip": "", "port": 0},
    "backup": {"ip": "", "port": 0},
}

# Sizing limits for objects and placement groups (PGs).
num_objects_per_file = 1
max_num_objects_per_pg = 1

_port = 9999

# MSG_SIZE is the size argument the nodes pass to the receive helpers.
# HEADERSIZE is presumably the length-header size used by the transfer
# module -- TODO confirm against transfer.py.
MSG_SIZE = 1024
HEADERSIZE = 10

# Message types used by the heartbeat / gossip protocol:
#   ALIVE - sent to check a node's heartbeat
#   ACK   - sent in reply to a heartbeat
#   FAIL  - sent by a storage node to the monitor, together with the IP or
#           storage number of the node that failed

dir_tree = {
    "dir1": {},
    "dir2": {},
}

# Design notes on where cluster state lives:
#   PG   -> osds hash table, kept on the monitor
#   OSDs -> ip, port of the live osds, kept on the monitor
#   OSD  -> replicas, kept on each OSD for monitoring

# Update the storage node addresses whenever the deployment changes.
storage_1 = {"ip": "127.0.0.1", "port": 10001}
storage_2 = {"ip": "127.0.0.1", "port": 10002}
storage_3 = {"ip": "127.0.0.1", "port": 10003}
storage_4 = {"ip": "4.0.0.0", "port": 10004}

storage_ip = {
    "osd_id1": storage_1,
    "osd_id2": storage_2,
    "osd_id3": storage_3,
    "osd_id4": storage_4,
}

# Read/write (replication) traffic uses each node's port + 10.

# Update the monitor node addresses whenever the deployment changes.
# NOTE(review): primary and backup currently point at the same ip/port --
# confirm this is intended for local testing.
monitor_1 = {"ip": "127.0.0.1", "port": 4002}
monitor_2 = {"ip": "127.0.0.1", "port": 4002}
monitor_ip = {"primary": monitor_1, "backup": monitor_2}

Test Code/Storage-2/storage_gossip.py

+185
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,185 @@
1+
import socket
2+
import pickle
3+
import threading
4+
import time
5+
import os
6+
from transfer import _send_msg, _recv_msg, _wait_recv_msg
7+
from info import mds_ip, monitor_ip, storage_ip, num_objects_per_file, max_num_objects_per_pg, MSG_SIZE, HEADERSIZE
8+
9+
from storage_replication import replicate_pg
10+
11+
STORAGE_ID = "osd_id2"
12+
13+
14+
def _report_to_one_monitor(monitor, payload, label, reset_timeout):
    """Send ``payload`` to a single monitor and report whether it ACKed.

    Args:
        monitor: dict with "ip" and "port" keys identifying the monitor.
        payload: message dict handed to ``_send_msg``.
        label: human-readable monitor name used in the log lines.
        reset_timeout: when true, the 5 second connect timeout is removed
            once the connection is established.

    Returns:
        True when the reply is a message whose "type" is "ACK", else False.

    Raises:
        Whatever the socket or the transfer helpers raise; the socket is
        closed in every case before the exception propagates.
    """
    soc = socket.socket()
    soc.settimeout(5)
    print(f"Socket successfully created for Recovery: {label}")

    try:
        soc.connect((monitor["ip"], monitor["port"]))
        if reset_timeout:
            soc.settimeout(None)
        print(f"Connecting {label} monitor...")

        _send_msg(soc, payload)

        msg = _wait_recv_msg(soc, MSG_SIZE)
        print(msg)
        return msg is not None and msg["type"] == "ACK"
    finally:
        # Always release the descriptor, including on the ACK path.
        soc.close()


def report_monitor(node_ip, osd_id):
    """Tell the monitor that a storage node appears to be down.

    A FAIL message carrying ``node_ip`` and ``osd_id`` is sent to the primary
    monitor first. If the primary cannot be reached or does not answer with
    an ACK, the same message is sent to the backup monitor.

    Args:
        node_ip: address of the node being reported.
        osd_id: identifier of the node being reported (may be None).

    Returns:
        None. Failures are printed, never raised.
    """
    res = {"type": "FAIL", "ip": node_ip, "osd_id": osd_id}

    monitor_1 = monitor_ip["primary"]
    try:
        # The primary keeps its 5 second timeout for the whole exchange,
        # so a silent primary cannot stall the fail-over to the backup.
        if _report_to_one_monitor(monitor_1, res, "Primary", reset_timeout=False):
            return
    except Exception as e:
        print(e)
        print("Didn't Connect! [Timeout] Primary Monitor is Down")

    monitor_2 = monitor_ip["backup"]
    try:
        if _report_to_one_monitor(monitor_2, res, "Backup", reset_timeout=True):
            return
    except Exception as e:
        # Print the cause instead of silently discarding it.
        print(e)
        print("MAY GOD HELP US!! WE ARE DOOMED\n\n")
73+
74+
75+
76+
def recv_gossip():
    """Periodically probe every other storage node and report dead ones.

    Runs forever. Every round starts with a 10 second pause, then each node
    listed in ``storage_ip`` except this one (``STORAGE_ID``) is sent an
    ALIVE message. A node is reported through ``report_monitor`` when the
    connection fails, when no reply arrives, or when the reply type is not
    "ACK". There is a 3 second pause after each probed node.
    """
    while True:
        # Wait for 10 sec to run this protocol
        time.sleep(10)

        # Iterate the configured nodes rather than a hard-coded count, so
        # adding or removing an entry in storage_ip needs no change here.
        for curr_osd, node in storage_ip.items():
            if curr_osd == STORAGE_ID:
                continue

            node_ip = node["ip"]
            port = node["port"]

            soc = socket.socket()
            soc.settimeout(5)
            print(f"\n\nSocket successfully created for Gossip with osd {curr_osd}")

            alive = False
            try:
                soc.connect((node_ip, port))
                soc.settimeout(None)

                print(f"Connecting {node_ip} storage node number {curr_osd} port {port}")

                msg = {"type": "ALIVE"}
                _send_msg(soc, msg)

                rec = _wait_recv_msg(soc, MSG_SIZE)
                # Log the reply that was received, not the request sent.
                print(rec)
                if rec is None:
                    print(f"Didn't receive data to Storage {curr_osd} ip {node_ip}! [Timeout] ")
                else:
                    alive = rec["type"] == "ACK"

            except Exception as e:
                print(e)
                print(f"Didn't Connect to Storage {curr_osd} ip {node_ip}! [Timeout]")

            finally:
                soc.close()

            # Report outside the try block so that an error raised while
            # reporting cannot trigger a second report from the handler.
            if not alive:
                report_monitor(node_ip, curr_osd)

            time.sleep(3)
118+
119+
120+
121+
def send_heartbeat():
    """Answer heartbeat probes coming from the other nodes.

    Listens on this node's configured port (``storage_ip[STORAGE_ID]``) and
    forks one child per accepted connection. The child reads one message,
    replies with an ACK when the message type is "ALIVE", reports the peer
    through ``report_monitor`` when nothing was received, and then exits.
    The parent goes straight back to accepting connections.

    NOTE(review): children are never waited on in this function -- confirm
    whether they are reaped elsewhere.
    """
    s = socket.socket()
    print("Socket successfully created for Heartbeat")
    port = storage_ip[STORAGE_ID]["port"]
    s.bind(('', port))
    print("Socket binded to %s" % (port))

    # put the socket into listening mode
    s.listen(5)

    while True:
        # Establish connection with client.
        c, addr = s.accept()
        print(f"\nGot connection from {addr}")

        n = os.fork()
        if n != 0:
            # Parent: the child owns the conversation. Drop our copy of the
            # descriptor so it does not leak on every connection.
            c.close()
            continue

        # Child process from here on. It must never fall back into the
        # accept loop, so os._exit is reached on every path.
        try:
            print(F"Inside child process {os.getpid()}")
            msg = _recv_msg(c, MSG_SIZE)
            print(msg)

            if msg is None:
                print(f"Didn't receive data from ip {addr}! [Timeout] ")
                # addr is an (ip, port) pair; report only the ip, which is
                # what the other report_monitor callers pass.
                report_monitor(addr[0], None)

            elif msg["type"] == "ALIVE":
                res = {"type": "ACK"}
                _send_msg(c, res)

        except Exception as e:
            print(e)

        finally:
            try:
                c.close()
                # os._exit does not flush stdio buffers, so flush here.
                print(f"Exiting from pid {os.getpid()} ..\n\n", flush=True)
            finally:
                os._exit(1)
157+
158+
159+
def read_write_pg_replication():
    """Serve replication requests from other nodes, forever.

    Opens a listening socket on this node's configured port plus 10 (the
    read/write port noted in the info file) and hands that socket to
    ``replicate_pg`` in an endless loop.
    """
    listener = socket.socket()
    print("Socket successfully created for Read-Write only for replication")

    # Read/write requests are sent on "port + 10", as noted in the info file.
    rw_port = storage_ip[STORAGE_ID]["port"] + 10
    listener.bind(('', rw_port))
    print(f"Socket binded to {rw_port}")

    # Put the socket into listening mode.
    listener.listen(5)

    while True:
        replicate_pg(listener)
176+
177+
178+
if __name__ == "__main__":
    # One thread each for the heartbeat responder, the gossip prober and
    # the replication listener. All are created first, then started in
    # that order.
    workers = [
        threading.Thread(target=entry_point)
        for entry_point in (send_heartbeat, recv_gossip, read_write_pg_replication)
    ]
    for worker in workers:
        worker.start()
+97
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
import socket
2+
import pickle
3+
import os
4+
from transfer import _send_msg, _recv_msg, _wait_recv_msg
5+
from info import mds_ip, monitor_ip, storage_ip, num_objects_per_file, max_num_objects_per_pg, MSG_SIZE, HEADERSIZE
6+
7+
8+
def _push_pg_to_osd(pg_id, osd_id):
    """Read the local copy of ``pg_id`` and ask ``osd_id`` to SAVE it.

    Returns:
        The reply received from the target OSD, or None when the local file
        could not be read, the OSD could not be reached, or the exchange
        failed in any other way.
    """
    try:
        # NOTE(review): pg_id comes from a network message and is joined
        # into a path unchecked -- confirm peers are trusted.
        with open("./data/" + pg_id, 'rb') as file:
            pg = pickle.load(file)
    except Exception as e:
        print(e)
        return None

    ## Connect to this osd_id with new socket
    new_soc = socket.socket()
    print(f"Socket created to send request to SAVE data at OSD {osd_id}")

    try:
        ip_add = storage_ip[osd_id]["ip"]
        # Remember replication is on normal port + 10
        port = storage_ip[osd_id]["port"] + 10

        new_soc.connect((ip_add, port))
        print(f"Connection made with {ip_add} on {port}")

        _send_msg(new_soc, {"type": "SAVE", "pg_id": pg_id, "pg": pg})
        print("Msg send to SAVE the data")

        reply = _wait_recv_msg(new_soc, MSG_SIZE)
        print("Msg received !")
        return reply

    except Exception as e:
        print(e)
        return None

    finally:
        new_soc.close()


def _save_pg(pg_id, pg):
    """Pickle ``pg`` into ``./data/<pg_id>``; return True on success."""
    try:
        # NOTE(review): pg_id comes from a network message and is joined
        # into a path unchecked -- confirm peers are trusted.
        with open("./data/" + pg_id, 'wb') as file:
            # pickle.dump needs the target file as its second argument.
            pickle.dump(pg, file)
    except Exception as e:
        print(e)
        return False

    print("Replicated data is saved")
    return True


def replicate_pg(soc):
    """Accept one replication request on ``soc`` and serve it in a child.

    The parent process returns right after forking. The child reads one
    message and acts on its "type":

    * nothing received -- replies {"type": "SEND AGAIN"};
    * "REPLICATE" -- loads placement group ``pg_id`` from ``./data`` and
      sends it in a SAVE message to ``osd_id`` on that node's port + 10.
      The requester gets {"type": "ACK"} only when the target answered
      ACK, otherwise {"type": "REPLICATION FAIL"};
    * "SAVE" -- pickles the received ``pg`` into ``./data/<pg_id>`` and
      replies ACK on success or REPLICATION FAIL when the write failed;
    * anything else -- prints an error.

    The child always terminates through ``os._exit``.

    Args:
        soc: a listening socket; ``accept`` is called on it exactly once.
    """
    c, addr = soc.accept()
    print(f"Got connection from {addr} to do the replication")

    n = os.fork()

    if n != 0:
        # Parent: the child owns the conversation. Drop our copy of the
        # descriptor so it does not leak on every request.
        c.close()
        return

    # Child process from here on. It must never return into the caller's
    # accept loop, so os._exit is reached on every path.
    try:
        msg = _recv_msg(c, MSG_SIZE)
        print(msg)

        if msg is None:
            print(f"Didn't receive data! [Timeout] {addr}")
            _send_msg(c, {"type": "SEND AGAIN"})

        elif msg["type"] == "REPLICATE":
            # The peer's answer lives in its own variable, so a failed
            # connect can no longer be mistaken for a pending request.
            reply = _push_pg_to_osd(msg["pg_id"], msg["osd_id"])

            if isinstance(reply, dict) and reply.get("type") == "ACK":
                _send_msg(c, {"type": "ACK"})
                print("Success Msg send to the monitor")
            else:
                _send_msg(c, {"type": "REPLICATION FAIL"})
                print("Fail Msg send to the monitor")

        elif msg["type"] == "SAVE":
            # This will take request from other osd to save some data
            # This is basic replication strategy
            if _save_pg(msg["pg_id"], msg["pg"]):
                _send_msg(c, {"type": "ACK"})
                print("Write successful..send back Ack to the master osd")
            else:
                # Never acknowledge a write that did not reach disk.
                _send_msg(c, {"type": "REPLICATION FAIL"})
                print("Write failed..send back failure to the master osd")

        else:
            print("[ERROR] Check the code in replication")

    except Exception as e:
        print(e)

    finally:
        try:
            c.close()
            # os._exit does not flush stdio buffers, so flush here.
            print(f"Exiting from pid {os.getpid()} ..", flush=True)
        finally:
            os._exit(1)

0 commit comments

Comments
 (0)