16
16
from transfer import _send_msg , _recv_msg , _wait_recv_msg
17
17
# from monitor_gossip import heartbeat_protocol
18
18
from info import MDS_IPs , MDS_PORT , WRITE_ACK_PORT , OSD_INACTIVE_STATUS_PORT , CLIENT_REQ_PORT , \
19
- RECV_PRIMARY_UPDATE_PORT , MSG_SIZE
19
+ RECV_PRIMARY_UPDATE_PORT , MSG_SIZE , MONITOR_IPs
20
20
21
21
# hashtable = {}
22
22
MDS_flags = {}
23
23
cluster_topology = {}
24
24
MDS_IP = MDS_IPs ["primary" ]["ip" ]
25
+ isPrimary = True
25
26
26
27
# pg_or_osd_list = pg_ids list, if update_type == "hashtable"
27
28
# osd_ids_list, else
@@ -32,7 +33,7 @@ def update_backup_monitor(update_type, pg_or_osd_ids_list, osd_list):
32
33
print ("primary update socket successfully created" )
33
34
34
35
try :
35
- primary_update_socket .connect ((MDS_IPs ["backup" ][ "ip " ], RECV_PRIMARY_UPDATE_PORT ))
36
+ primary_update_socket .connect ((MONITOR_IPs ["backup" ], RECV_PRIMARY_UPDATE_PORT ))
36
37
msg = {"update_type" : update_type , "pg_or_osd_ids_list" : pg_or_osd_ids_list , \
37
38
"osd_list" : osd_list }
38
39
_send_msg (primary_update_socket , msg )
@@ -46,62 +47,6 @@ def update_backup_monitor(update_type, pg_or_osd_ids_list, osd_list):
46
47
print (e )
47
48
primary_update_socket .close ()
48
49
49
- def recv_primary_update ():
50
- recv_primary_update_socket = socket .socket ()
51
- print ("recv primary update socket successfully created" )
52
- recv_primary_update_socket .setsockopt (socket .SOL_SOCKET , socket .SO_REUSEADDR , 1 )
53
-
54
- # reserve a port on your computer
55
- port = RECV_PRIMARY_UPDATE_PORT
56
-
57
- # Next bind to the port
58
- # we have not entered any ip in the ip field
59
- # instead we have inputted an empty string
60
- # this makes the server listen to requests
61
- # coming from other computers on the network
62
- recv_primary_update_socket .bind (('' , port ))
63
- print ("primary update socket bound to %s" % (port ))
64
-
65
- # put the socket into listening mode
66
- recv_primary_update_socket .listen (5 )
67
- print ("primary update socket is listening" )
68
-
69
- # a forever loop until we interrupt it or
70
- # an error occurs
71
- while True :
72
-
73
- # Establish connection with osd
74
- c , addr = recv_primary_update_socket .accept ()
75
- print ('Got connection from' , addr )
76
-
77
- # recv the update
78
- update = _recv_msg (c , 1024 )
79
- print (update )
80
- # hashtable = _read
81
-
82
- if update ["update_type" ] == "hash_table" :
83
- for i in range (len (update ["pg_or_osd_ids_list" ])):
84
- hashtable [update ["pg_or_osd_ids_list" ][i ]] = update ["osd_list" ][i ]
85
- else :
86
- for i in range (len (update ["pg_or_osd_ids_list" ])):
87
- cluster_topology [update ["pg_or_osd_ids_list" ][i ]] = update ["osd_list" ][i ]
88
-
89
- hashtable_file = open ('hashtable' , 'wb' )
90
- hashtable_dump = pickle .dumps (hashtable )
91
- hashtable_file .write (hashtable_dump )
92
- hashtable_file .close ()
93
-
94
- cluster_topology_file = open ('cluster_topology' , 'wb' )
95
- cluster_topology_dump = pickle .dumps (cluster_topology )
96
- cluster_topology_file .write (cluster_topology_dump )
97
- cluster_topology_file .close ()
98
- msg = {"status" :"SUCCESS" }
99
- _send_msg (c , msg )
100
- # send the acknowledgement
101
- c .close ()
102
-
103
- recv_primary_update_socket .close ()
104
-
105
50
def recv_write_acks ():
106
51
global hashtable , cluster_topology , MDS_IP , MDS_flags
107
52
@@ -164,10 +109,11 @@ def recv_write_acks():
164
109
165
110
cluster_topology [osd_id ]["free_space" ] = free_space
166
111
167
- # updating the backup
168
- update_backup_monitor ("hash_table" , [pg_id ], [hashtable [pg_id ]])
169
- update_backup_monitor ("cluster_topology" , [osd [0 ] for osd in hashtable [pg_id ]], \
170
- [cluster_topology [osd [0 ]] for osd in hashtable [pg_id ]])
112
+ if isPrimary :
113
+ # updating the backup
114
+ update_backup_monitor ("hash_table" , [pg_id ], [hashtable [pg_id ]])
115
+ update_backup_monitor ("cluster_topology" , [osd [0 ] for osd in hashtable [pg_id ]], \
116
+ [cluster_topology [osd [0 ]] for osd in hashtable [pg_id ]])
171
117
172
118
hashtable_file = open ('hashtable' , 'wb' )
173
119
cluster_topology_file = open ('cluster_topology' , 'wb' )
@@ -331,8 +277,9 @@ def recv_client_reqs():
331
277
addrs [osd_id ] = (cluster_topology [osd_id ]["ip" ], cluster_topology [osd_id ]["port" ])
332
278
osd_dict = {"osd_ids" : osd_ids , "addrs" : addrs }
333
279
response = {"osd_dict" : osd_dict , "status" :"SUCCESS" , "msg" : "written succefully" }
334
- # updating the backup(only hash_table)
335
- update_backup_monitor ("hash_table" , [pg_id ], [hashtable [pg_id ]])
280
+ if isPrimary :
281
+ # updating the backup(only hash_table)
282
+ update_backup_monitor ("hash_table" , [pg_id ], [hashtable [pg_id ]])
336
283
337
284
hashtable_file = open ('hashtable' , 'wb' )
338
285
hashtable_dump = pickle .dumps (hashtable )
@@ -355,6 +302,8 @@ def recv_client_reqs():
355
302
356
303
357
304
def main (argc , argv ):
305
+ global isPrimary
306
+
358
307
if argc < 2 :
359
308
print ('usage: python3 main.py <monitor_type>' ) # monitor_type = "primary" or "backup"
360
309
exit (- 1 )
@@ -399,7 +348,7 @@ def main(argc, argv):
399
348
# }
400
349
# }
401
350
402
- global hashtable , cluster_topology , MDS_flags
351
+ global hashtable , cluster_topology , MDS_flags , MDS_IP
403
352
404
353
hashtable_file = open ('hashtable' , 'rb' )
405
354
hashtable_dump = hashtable_file .read ()
@@ -418,28 +367,25 @@ def main(argc, argv):
418
367
MDS_flags_file .close ()
419
368
cluster_topology_file .close ()
420
369
421
- if isPrimary :
422
- ## THREADS
423
- # write_acks_thread : receives write acks from osds
424
- # client_reqs_thread : receives client reqs from the client and
425
- # sends back the osds' addresses
426
- # osd_inactive_status_thread : receives the osd_ids which are inactive
427
-
428
- write_acks_thread = threading .Thread (target = recv_write_acks )
429
- client_reqs_thread = threading .Thread (target = recv_client_reqs )
430
- osd_inactive_status_thread = threading .Thread (target = recv_inactive_osd )
431
-
432
- # starting the threads
433
- write_acks_thread .start ()
434
- client_reqs_thread .start ()
435
- osd_inactive_status_thread .start ()
436
-
437
- # closing the threads
438
- write_acks_thread .join ()
439
- client_reqs_thread .join ()
440
- osd_inactive_status_thread .join ()
441
- else :
442
- recv_primary_update ()
370
+ ## THREADS
371
+ # write_acks_thread : receives write acks from osds
372
+ # client_reqs_thread : receives client reqs from the client and
373
+ # sends back the osds' addresses
374
+ # osd_inactive_status_thread : receives the osd_ids which are inactive
375
+
376
+ write_acks_thread = threading .Thread (target = recv_write_acks )
377
+ client_reqs_thread = threading .Thread (target = recv_client_reqs )
378
+ osd_inactive_status_thread = threading .Thread (target = recv_inactive_osd )
379
+
380
+ # starting the threads
381
+ write_acks_thread .start ()
382
+ client_reqs_thread .start ()
383
+ osd_inactive_status_thread .start ()
384
+
385
+ # closing the threads
386
+ write_acks_thread .join ()
387
+ client_reqs_thread .join ()
388
+ osd_inactive_status_thread .join ()
443
389
444
390
def _read_hash ():
445
391
hashtable_file = open ('hashtable' , 'rb' )
0 commit comments