16
16
from transfer import _send_msg , _recv_msg , _wait_recv_msg
17
17
from monitor_gossip import heartbeat_protocol
18
18
from info import MDS_IPs , MDS_PORT , WRITE_ACK_PORT , OSD_INACTIVE_STATUS_PORT , CLIENT_REQ_PORT , \
19
- RECV_PRIMARY_UPDATE
19
+ RECV_PRIMARY_UPDATE_PORT , MSG_SIZE
20
20
21
21
hashtable = {}
22
22
MDS_flags = {}
28
28
# osd_list = osd_ids list corresponding to pg_id, if update_type == "hashtable"
29
29
# osd_data, else
30
30
def update_backup_monitor (update_type , pg_or_osd_ids_list , osd_list ):
31
- # if update_type == "hash_table":
32
- # for i in range(len(pg_or_osd_ids_list)):
33
- # hashtable[pg_or_osd_ids_list[i]] = osd_list[i]
34
- # else:
35
- # for i in range(len(pg_or_osd_ids_list)):
36
- # cluster_topology[pg_or_osd_ids_list[i]] = osd_list[i]
37
- pass
31
+ primary_update_socket = socket .socket ()
32
+ print ("primary update socket successfully created" )
33
+
34
+ try :
35
+ primary_update_socket .connect ((MDS_IPs ["backup" ], RECV_PRIMARY_UPDATE_PORT ))
36
+ msg = {"update_type" : update_type , "pg_or_osd_ids_list" : pg_or_osd_ids_list , \
37
+ "osd_list" : osd_list }
38
+ _send_msg (primary_update_socket , msg )
39
+ response = _wait_recv_msg (primary_update_socket , MSG_SIZE )
40
+ primary_update_socket .close ()
41
+ if response ["status" ] == "SUCCESS" :
42
+ print ("written to monitor backup successfully" )
43
+ else :
44
+ print ("write to monitor backup failed" )
45
+ except Exception as e :
46
+ print (e )
47
+ primary_update_socket .close ()
38
48
39
49
def recv_primary_update ():
40
- # recv_primary_update_socket = socket.socket()
41
- # print ("primary update socket successfully created")
42
- # recv_primary_update_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
43
-
44
- # # reserve a port on your computer
45
- # port = RECV_PRIMARY_UPDATE
46
-
47
- # # Next bind to the port
48
- # # we have not entered any ip in the ip field
49
- # # instead we have inputted an empty string
50
- # # this makes the server listen to requests
51
- # # coming from other computers on the network
52
- # recv_primary_update_socket.bind(('', port))
53
- # print ("primary update socket bound to %s" %(port))
54
-
55
- # # put the socket into listening mode
56
- # recv_primary_update_socket.listen(5)
57
- # print ("socket is listening")
58
-
59
- # # a forever loop until we interrupt it or
60
- # # an error occurs
61
- # while True:
62
-
63
- # # Establish connection with osd
64
- # c, addr = recv_primary_update_socket.accept()
65
- # print ('Got connection from', addr)
66
- pass
50
+ recv_primary_update_socket = socket .socket ()
51
+ print ("recv primary update socket successfully created" )
52
+ recv_primary_update_socket .setsockopt (socket .SOL_SOCKET , socket .SO_REUSEADDR , 1 )
53
+
54
+ # reserve a port on your computer
55
+ port = RECV_PRIMARY_UPDATE_PORT
56
+
57
+ # Next bind to the port
58
+ # we have not entered any ip in the ip field
59
+ # instead we have inputted an empty string
60
+ # this makes the server listen to requests
61
+ # coming from other computers on the network
62
+ recv_primary_update_socket .bind (('' , port ))
63
+ print ("primary update socket bound to %s" % (port ))
64
+
65
+ # put the socket into listening mode
66
+ recv_primary_update_socket .listen (5 )
67
+ print ("primary update socket is listening" )
68
+
69
+ # a forever loop until we interrupt it or
70
+ # an error occurs
71
+ while True :
72
+
73
+ # Establish connection with osd
74
+ c , addr = recv_primary_update_socket .accept ()
75
+ print ('Got connection from' , addr )
76
+
77
+ # recv the update
78
+ update = _recv_msg (c , 1024 )
79
+ print (update )
80
+
81
+ if update ["update_type" ] == "hash_table" :
82
+ for i in range (len (update ["pg_or_osd_ids_list" ])):
83
+ hashtable [update ["pg_or_osd_ids_list" ][i ]] = update ["osd_list" ][i ]
84
+ else :
85
+ for i in range (len (update ["pg_or_osd_ids_list" ])):
86
+ cluster_topology [update ["pg_or_osd_ids_list" ][i ]] = update ["osd_list" ][i ]
87
+
88
+ hashtable_file = open ('hashtable' , 'wb' )
89
+ hashtable_dump = pickle .dumps (hashtable )
90
+ hashtable_file .write (hashtable_dump )
91
+ hashtable_file .close ()
92
+
93
+ cluster_topology_file = open ('cluster_topology' , 'wb' )
94
+ cluster_topology_dump = pickle .dumps (cluster_topology )
95
+ cluster_topology_file .write (cluster_topology_dump )
96
+ cluster_topology_file .close ()
97
+
98
+ # send the acknowledgement
99
+ c .close ()
100
+
101
+ recv_primary_update_socket .close ()
67
102
68
103
def recv_write_acks ():
69
104
global hashtable , cluster_topology , MDS_IP , MDS_flags
@@ -272,20 +307,28 @@ def recv_client_reqs():
272
307
273
308
i = 0
274
309
for osd_id in cluster_topology :
275
- if cluster_topology [osd_id ]["free_space" ] > size :
310
+ if cluster_topology [osd_id ]["free_space" ] > size and cluster_topology [ osd_id ][ "status" ] == 0 :
276
311
hashtable [pg_id ].append ([osd_id , 0 ])
277
312
i = i + 1
278
313
279
314
if (i > 2 ):
280
315
break
281
-
316
+
317
+ if i < 3 :
318
+ print ("less than two osds are free/alive" )
319
+ response = {"status" :"ERROR" , "msg" : "sufficent storage not available" }
320
+ _send_msg (c , response )
321
+ c .close ()
322
+ continue
323
+
324
+
282
325
osd_ids = [osd [0 ] for osd in hashtable [pg_id ]]
283
326
284
327
addrs = {}
285
328
for osd_id in osd_ids :
286
329
addrs [osd_id ] = (cluster_topology [osd_id ]["ip" ], cluster_topology [osd_id ]["port" ])
287
- osds_dict = {"osd_ids" : osd_ids , "addrs" : addrs }
288
-
330
+ osd_dict = {"osd_ids" : osd_ids , "addrs" : addrs }
331
+ response = { "osd_dict" : osd_dict , "status" : "SUCCESS" , "msg" : "written succefully" }
289
332
# updating the backup(only hash_table)
290
333
# update_backup_monitor("hash_table", [pg_id], [hashtable[pg_id]])
291
334
@@ -294,7 +337,7 @@ def recv_client_reqs():
294
337
hashtable_file .write (hashtable_dump )
295
338
hashtable_file .close ()
296
339
297
- _send_msg (c , osds_dict )
340
+ _send_msg (c , response )
298
341
299
342
elif req ["type" ] == "READ" :
300
343
pg_id = req ["pg_id" ]
0 commit comments