Skip to content

Commit f6fc2b3

Browse files
committed
Final Running Replication Code
1 parent 0da95c6 commit f6fc2b3

11 files changed

+40
-28
lines changed
-675 Bytes
Binary file not shown.
Binary file not shown.
Binary file not shown.

Test Code/Monitor/info.py

+6-6
Original file line numberDiff line numberDiff line change
@@ -36,17 +36,17 @@
3636
"""
3737

3838
# Update all the stroage node IP everytime
39-
storage_1 = {"ip":"127.0.0.1", "port":7001}
40-
storage_2 = {"ip":"127.0.0.1", "port":7002}
41-
storage_3 = {"ip":"127.0.0.1", "port":7003}
42-
storage_4 = {"ip":"4.0.0.0", "port":7004}
39+
storage_1 = {"ip":"127.0.0.1", "port":10001}
40+
storage_2 = {"ip":"127.0.0.1", "port":10002}
41+
storage_3 = {"ip":"127.0.0.1", "port":10003}
42+
storage_4 = {"ip":"4.0.0.0", "port":10004}
4343

4444
storage_ip = {"osd_id1":storage_1,"osd_id2":storage_2,"osd_id3":storage_3,"osd_id4":storage_4}
4545

4646
# add 10 to port to get read/write IP
4747

4848

4949
# Update all the monitor node IP
50-
monitor_1 = {"ip":"127.0.0.1", "port":5002}
51-
monitor_2 = {"ip":"127.0.0.1", "port":5002}
50+
monitor_1 = {"ip":"127.0.0.1", "port":4002}
51+
monitor_2 = {"ip":"127.0.0.1", "port":4002}
5252
monitor_ip = {"primary": monitor_1, "backup": monitor_2}

Test Code/Monitor/monitor_replicate.py

+14-7
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import socket
22
import pickle
3+
import time
34
from transfer import _send_msg, _recv_msg, _wait_recv_msg
45
from info import mds_ip, monitor_ip, storage_ip, num_objects_per_file, max_num_objects_per_pg, MSG_SIZE, HEADERSIZE
56

@@ -12,26 +13,30 @@ def send_replicate_request(pg_id, master_osd, clone_osd):
1213
# reserve a port on your computer in our
1314
ip_add = storage_ip[master_osd]["ip"]
1415
port = storage_ip[master_osd]["port"] + 10
15-
16+
1617
flag = -1
1718
try :
1819
soc.connect((ip_add, port))
19-
soc.timeout(None)
20+
soc.settimeout(None)
2021

21-
print(f"Connecting with osd {osd_id}...")
22+
print(f"Connecting with {master_osd} on port {port}...")
2223

23-
res = {"type": "REPLICATE", "pg_id" : pg_id, "osd" : osd_id}
24+
res = {"type": "REPLICATE", "pg_id" : pg_id, "osd_id" : clone_osd}
2425
_send_msg(soc, res)
2526
print("Message send for replication ")
27+
28+
# time.sleep(5)
2629

27-
msg = _wait_recv_msg(c, MSG_SIZE)
30+
msg = _wait_recv_msg(soc, MSG_SIZE)
31+
print(msg)
2832
if msg == None:
2933
flag = 0
3034
elif msg["type"] == "ACK":
3135
flag = 1
3236

33-
except :
34-
print("Didn't Connect! [Timeout] Master OSD is Down")
37+
except Exception as e:
38+
print(e)
39+
print(f"Didn't Connect! [Timeout] master {master_osd} is down, port {port}")
3540

3641
soc.close()
3742
return flag
@@ -55,10 +60,12 @@ def replicate(pg_id, osd_id_master_list, osd_id_clone_dict):
5560
add_entry.append([clone, True])
5661
break
5762
elif ret == -1:
63+
print("may try send_replicate_request again or continue to next master")
5864
# may try send_replicate_request again
5965
# or continue to next master
6066
continue
6167
elif ret == 0:
68+
print("eans master osd server break before sending the ack")
6269
# means master osd server break before sending the ack
6370
continue
6471
else:
-677 Bytes
Binary file not shown.
Binary file not shown.
Binary file not shown.

Test Code/Storage-1/data/pg_id1

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
�}�.

Test Code/Storage-1/info.py

+6-6
Original file line numberDiff line numberDiff line change
@@ -36,17 +36,17 @@
3636
"""
3737

3838
# Update all the stroage node IP everytime
39-
storage_1 = {"ip":"127.0.0.1", "port":7001}
40-
storage_2 = {"ip":"127.0.0.1", "port":7002}
41-
storage_3 = {"ip":"127.0.0.1", "port":7003}
42-
storage_4 = {"ip":"4.0.0.0", "port":7004}
39+
storage_1 = {"ip":"127.0.0.1", "port":10001}
40+
storage_2 = {"ip":"127.0.0.1", "port":10002}
41+
storage_3 = {"ip":"127.0.0.1", "port":10003}
42+
storage_4 = {"ip":"4.0.0.0", "port":10004}
4343

4444
storage_ip = {"osd_id1":storage_1,"osd_id2":storage_2,"osd_id3":storage_3,"osd_id4":storage_4}
4545

4646
# add 10 to port to get read/write IP
4747

4848

4949
# Update all the monitor node IP
50-
monitor_1 = {"ip":"127.0.0.1", "port":5002}
51-
monitor_2 = {"ip":"127.0.0.1", "port":5002}
50+
monitor_1 = {"ip":"127.0.0.1", "port":4002}
51+
monitor_2 = {"ip":"127.0.0.1", "port":4002}
5252
monitor_ip = {"primary": monitor_1, "backup": monitor_2}

Test Code/Storage-1/storage_replication.py

+13-9
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ def replicate_pg(soc):
99
# This will be always true loop that will run in parallel
1010

1111
c, addr = soc.accept()
12-
print ('Got connection from', addr )
12+
print (f"Got connection from {addr} to do the replication")
1313

1414
n = os.fork()
1515

@@ -19,6 +19,8 @@ def replicate_pg(soc):
1919

2020
if msg == None:
2121
print(f"Didn't receive data! [Timeout] {addr}")
22+
msg = {"type":"SEND AGAIN"}
23+
_send_msg(c, msg)
2224

2325
elif msg["type"] == "REPLICATE":
2426
# Can do one thing here
@@ -39,17 +41,19 @@ def replicate_pg(soc):
3941
ip_add = storage_ip[osd_id]["ip"]
4042
port = storage_ip[osd_id]["port"]
4143

42-
new_soc.connect((ip_add, port))
43-
print(f"Connection made with {ip_add} on {port}")
44+
try :
45+
new_soc.connect((ip_add, port))
46+
print(f"Connection made with {ip_add} on {port}")
4447

45-
msg = {"type":"SAVE", "pg_id":pg_id, "pg": pg}
46-
_send_msg(new_soc, msg)
47-
print("Msg send to SAVE the data")
48+
msg = {"type":"SAVE", "pg_id":pg_id, "pg": pg}
49+
_send_msg(new_soc, msg)
50+
print("Msg send to SAVE the data")
4851

49-
time.sleep(3)
52+
msg = _wait_recv_msg(new_soc, MSG_SIZE)
53+
print("Msg received !")
5054

51-
msg = _recv_msg(new_soc, MSG_SIZE)
52-
print("Msg received !")
55+
except Exception as e:
56+
print(e)
5357

5458
if msg == None:
5559
res = {"type": "REPLICATION FAIL"}

0 commit comments

Comments
 (0)