Skip to content

Commit 59b0b17

Browse files
committed
vivek's demand
1 parent 5d2ac17 commit 59b0b17

File tree

6 files changed

+104
-48
lines changed

6 files changed

+104
-48
lines changed

client/client.py

+36-12
Original file line numberDiff line numberDiff line change
@@ -33,19 +33,20 @@ def __init__(self, tree):
3333
# self.latest_file_id = tree["last_file_id"]
3434
self.processing = tree["processing"]
3535
self.logged_in = True
36-
self.gui()
3736
self.start_update_handler = False # True if some file is processing for upload
3837
self.update_interval = 2 # 2 sec interval for checking updates on file upload
3938
# self.write_update_recv = False
4039

40+
self.gui()
41+
42+
4143
def update_handler(self):
4244

4345
# if n == 0:
4446
print("update handler started..")
4547
if self.start_update_handler == True:
4648

4749
while True:
48-
time.sleep(self.update_interval)
4950
if len(self.processing) == 0:
5051
self.start_update_handler = False
5152
break
@@ -81,17 +82,24 @@ def update_handler(self):
8182

8283
for filename in file_written: #populate listbox again
8384
self.listbox.insert(END, filename)
84-
self.window.mainloop()
85+
tk.mainloop()
86+
# self.window.mainloop()
8587
elif response["status"] == "NO_UPD":
8688
print(response["msg"])
89+
time.sleep(self.update_interval)
90+
8791

8892
except Exception as e:
8993
s.close()
9094
print("Update failed")
9195
print(e)
96+
time.sleep(self.update_interval)
97+
9298

9399
finally:
94100
print("Exiting login..")
101+
time.sleep(self.update_interval)
102+
95103

96104

97105
def upload(self, file_path):
@@ -100,6 +108,10 @@ def upload(self, file_path):
100108
return
101109

102110
file_id, pg_list = self._chunker(file_path)
111+
112+
if file_id < 0:
113+
return
114+
103115
filename = os.path.basename(file_path)
104116
## send to OSD using sockets ; ask IP to monitor
105117
## using _write function
@@ -133,16 +145,19 @@ def download(self, file_id):
133145
print("DOWNLOAD failed")
134146

135147
res = -1
136-
148+
print(pg.pg_id)
149+
print(len(pg.object_list))
137150
for obj in pg.object_list:
151+
print("pg received..")
138152
if file_id == obj.file_id:
139153
data = obj.data
154+
print("writing file in disk..")
140155
file = open("downloads/"+file_name, "wb")
141156
file.write(data)
142157
file.close()
143158

144159
# print(self.dir_tree)
145-
self._print("[DOWNLOAD]", "Succesful")
160+
self._popup("Update", "Download Succesful "+str(file_name))
146161

147162

148163
def _read(self, pg_id):
@@ -155,7 +170,7 @@ def _read(self, pg_id):
155170

156171
s.connect((ip, port))
157172

158-
msg = {"type":"READ", "PG_id":pg_id}
173+
msg = {"type":"READ", "pg_id":pg_id}
159174
# d_msg = pickle.dumps(msg)
160175

161176
_send_msg(s, msg)
@@ -170,7 +185,7 @@ def _read(self, pg_id):
170185
# write on OSD
171186
s = socket.socket()
172187
s.connect(osd_addr)
173-
data_msg = {"type":"READ", "PG_id":pg_id}
188+
data_msg = {"type":"READ", "pg_id":pg_id}
174189
_send_msg(s, data_msg)
175190

176191
osd_response = _wait_recv_msg(s, 1024)
@@ -181,10 +196,12 @@ def _read(self, pg_id):
181196

182197
if osd_response["pg_id"] == pg_id and osd_response["res"] == "SUCCESS":
183198
s.close()
184-
return 0, osd_response["PG"]
199+
print("PG received from OSD writing on disk")
200+
return 0, osd_response["pg"]
185201

186202
else:
187203
s.close()
204+
print("Error - PG not received from OSD")
188205
return -2, None
189206

190207
def _write(self, pg, pg_data):
@@ -197,7 +214,7 @@ def _write(self, pg, pg_data):
197214

198215
s.connect((ip, port))
199216

200-
msg = {"type":"WRITE", "pg_id":pg.pg_id, "size":sys.getsizeof(pg)}
217+
msg = {"type":"WRITE", "pg_id":pg.pg_id, "size":(sys.getsizeof(pg)/float(1<<20))}
201218
# d_msg = pickle.dumps(msg)
202219

203220
_send_msg(s, msg)
@@ -213,8 +230,9 @@ def _write(self, pg, pg_data):
213230
osd_addr = osd_dict["addrs"][osd_dict["osd_ids"][0]]
214231
s = socket.socket()
215232
s.connect(osd_addr)
216-
data_msg = {"type":"CLIENT_WRITE", "PG":pg, "client_id":self.username, "client_addr":"", "osd_dict":osd_dict}
233+
data_msg = {"type":"CLIENT_WRITE", "pg":pg,"size":(sys.getsizeof(pg)/float(1<<20)), "client_id":self.username, "client_addr":"", "osd_dict":osd_dict}
217234
_send_msg(s, data_msg)
235+
print(len(pg.object_list))
218236

219237
### Add server to receive response
220238
osd_response = _wait_recv_msg(s, 1024)
@@ -234,8 +252,14 @@ def _write(self, pg, pg_data):
234252
return -2
235253

236254
def _chunker(self, file_path):
237-
file = open(file_path, 'rb')
238-
data = file.read()
255+
data = None
256+
try:
257+
file = open(file_path, 'rb')
258+
data = file.read()
259+
file.close()
260+
except Exception as e:
261+
print(e)
262+
return -1, None
239263
#size = sys.getsizeof(file)
240264
# print(data)
241265
file_ids = []

mds/mds.py

+6-6
Original file line numberDiff line numberDiff line change
@@ -136,10 +136,10 @@ def dispatch_backup(self):
136136
res["status"] = "SUCCESS"
137137

138138
elif msg["update_type"] == "WRITE_SUCCESS":
139-
username = msg["user"]
139+
username = msg["update"]["user"]
140140
tree = self._read_tree(username)
141141

142-
pg_id = msg["pg_id"]
142+
pg_id = msg["update"]["pg_id"]
143143
direc = tree["processing"][pg_id][0]
144144
file_id = tree["processing"][pg_id][1]
145145
filename = tree["processing"][pg_id][2]
@@ -150,9 +150,9 @@ def dispatch_backup(self):
150150
res["status"] = "SUCCESS"
151151

152152
elif msg["update_type"] == "UPDATE_PROCESSING":
153-
username = msg["username"]
154-
pg_written = msg["pg_written"]
155-
pg_wait = msg["pg_wait"]
153+
username = msg["update"]["username"]
154+
pg_written = msg["update"]["pg_written"]
155+
pg_wait = msg["update"]["pg_wait"]
156156

157157
tree = self._read_tree(username)
158158

@@ -177,7 +177,7 @@ def dispatch_backup(self):
177177

178178
def update_handle(self, msg): # update will come from Monitor in cluster
179179
res = {"status":"", "pg_id":msg["PG_ID"], "msg":""}
180-
if msg["status"] == "SUCESS":
180+
if msg["status"] == "SUCCESS":
181181
username = msg["client_id"]
182182
tree = self._read_tree(username)
183183
pg_id = msg["PG_ID"]

monitor/init.py

+18-9
Original file line numberDiff line numberDiff line change
@@ -16,46 +16,55 @@ def main():
1616
hashtable_file.write(hashtable_dump)
1717
hashtable_file.close()
1818

19+
MDS_flags = { # 0 - SUCCESS, 1 - ERROR
20+
# "pg_id1":0,
21+
# "pg_id2":0
22+
}
23+
MDS_flags_dump = pickle.dumps(MDS_flags)
24+
MDS_flags_file = open('MDS_flags', 'wb')
25+
MDS_flags_file.write(MDS_flags_dump)
26+
MDS_flags_file.close()
27+
1928
# status = 0(ALIVE), 1(DOWN), 2(OUT)
2029
cluster_topology = {
2130
1:
2231
{
2332
"ip":OSD_IPs[1],
24-
"port":READ_WRITE_PORT,
33+
"port":1207,#READ_WRITE_PORT,
2534
"free_space":1000,
2635
"status":0,
2736
"friends_update":False,
28-
"friends":{}
37+
"friends":set()
2938
},
3039

3140
2:
3241
{
3342
"ip":OSD_IPs[2],
34-
"port":READ_WRITE_PORT,
43+
"port":1208,#READ_WRITE_PORT,
3544
"free_space":1000,
3645
"status":0,
3746
"friends_update":False,
38-
"friends":{}
47+
"friends":set()
3948
},
4049

4150
3:
4251
{
4352
"ip":OSD_IPs[3],
44-
"port":READ_WRITE_PORT,
53+
"port":1209,#READ_WRITE_PORT,
4554
"free_space":1000,
4655
"status":0,
4756
"friends_update":False,
48-
"friends":{}
57+
"friends":set()
4958
},
5059

5160
4:
5261
{
5362
"ip":OSD_IPs[4],
54-
"port":READ_WRITE_PORT,
63+
"port":1210,#READ_WRITE_PORT,
5564
"free_space":1000,
5665
"status":0,
5766
"friends_update":False,
58-
"friends":{}
67+
"friends":set()
5968
}
6069

6170
# "osd_id5":
@@ -84,4 +93,4 @@ def main():
8493
cluster_topology_file.close()
8594

8695
if __name__ == '__main__':
87-
main()
96+
main()

monitor/main.py

+26-8
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,9 @@
1919
RECV_PRIMARY_UPDATE
2020

2121
hashtable = {}
22+
MDS_flags = {}
2223
cluster_topology = {}
23-
MDS_IP = MDS_IPs["primary"]
24+
MDS_IP = MDS_IPs["primary"]["ip"]
2425

2526
# pg_or_osd_list = pg_ids list, if update_type == "hashtable"
2627
# osd_ids_list, else
@@ -65,7 +66,7 @@ def recv_primary_update():
6566
pass
6667

6768
def recv_write_acks():
68-
global hashtable, cluster_topology, MDS_IP
69+
global hashtable, cluster_topology, MDS_IP, MDS_flags
6970

7071
write_ack_socket = socket.socket()
7172
print ("write ack socket successfully created")
@@ -129,7 +130,7 @@ def recv_write_acks():
129130
# updating the backup
130131
update_backup_monitor("hash_table", [pg_id], [hashtable[pg_id]])
131132
update_backup_monitor("cluster_topology", [osd[0] for osd in hashtable[pg_id]], \
132-
[cluster_topology[osd] for osd in hashtable])
133+
[cluster_topology[osd[0]] for osd in hashtable[pg_id]])
133134

134135
hashtable_file = open('hashtable', 'wb')
135136
cluster_topology_file = open('cluster_topology', 'wb')
@@ -161,15 +162,25 @@ def recv_write_acks():
161162
MDS_update_socket = socket.socket()
162163
print ("MDS write ack socket successfully created")
163164

164-
MDS_update_socket.connect(MDS_IP, MDS_PORT)
165+
MDS_update_socket.connect((MDS_IP, MDS_PORT))
165166

166167
msg = {"type": "WRITE_RESPONSE", "PG_ID": pg_id, \
167168
"status": "SUCCESS", "message": "write successful",\
168169
"client_id": client_id}
169170

170171
_send_msg(MDS_update_socket, msg)
171172

172-
MDS_ack = _wait_recv_msg(MDS_update_socket, msg)
173+
MDS_ack = _wait_recv_msg(MDS_update_socket, 1024)
174+
pg_id = MDS_ack["pg_id"]
175+
if MDS_ack["status"] == "SUCCESS":
176+
print("Write successful")
177+
MDS_flags[pg_id] = 0
178+
else:
179+
print("Write error from MDS")
180+
print(MDS_ack["msg"])
181+
MDS_flags[pg_id] = 1
182+
## resend pgs in MDS_flags with flag = 1
183+
## start that loop here
173184

174185
MDS_update_socket.close()
175186

@@ -256,18 +267,19 @@ def recv_client_reqs():
256267
if(req["type"] == "WRITE"):
257268
pg_id = req["pg_id"]
258269
size = req["size"]
270+
print(size)
259271
hashtable[pg_id] = []
260272

261273
i = 0
262274
for osd_id in cluster_topology:
263275
if cluster_topology[osd_id]["free_space"] > size:
264-
hashtable[pg_id].append((osd_id, 0))
276+
hashtable[pg_id].append([osd_id, 0])
265277
i = i+1
266278

267279
if(i>2):
268280
break
269281

270-
osd_ids = [hashtable[pg_id][i][0] for i in range(3)]
282+
osd_ids = [osd[0] for osd in hashtable[pg_id]]
271283

272284
addrs = {}
273285
for osd_id in osd_ids:
@@ -342,17 +354,23 @@ def main(argc, argv):
342354
# }
343355
# }
344356

345-
global hashtable, cluster_topology
357+
global hashtable, cluster_topology, MDS_flags
346358

347359
hashtable_file = open('hashtable', 'rb')
348360
hashtable_dump = hashtable_file.read()
349361
hashtable = pickle.loads(hashtable_dump)
350362

363+
364+
MDS_flags_file = open('MDS_flags', 'rb')
365+
MDS_flags_dump = MDS_flags_file.read()
366+
MDS_flags = pickle.loads(MDS_flags_dump)
367+
351368
cluster_topology_file = open('cluster_topology', 'rb')
352369
cluster_topology_dump = cluster_topology_file.read()
353370
cluster_topology = pickle.loads(cluster_topology_dump)
354371

355372
hashtable_file.close()
373+
MDS_flags_file.close()
356374
cluster_topology_file.close()
357375

358376
if isPrimary:

0 commit comments

Comments
 (0)