-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathCLI.py
245 lines (181 loc) · 6.44 KB
/
CLI.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
#!/bin/python3
import sys
import Query
import Connection
import socket
from time import sleep
import threading
thread_stop = False
class CLI(object):
def __init__(self, configFile):
self.configFile = configFile # File name of config file for CLI
self.mapper1Port = None # Port number to other processes in same node
self.mapper2Port = None
self.reducerPort = None
self.paxosPort = None
self.sockToMapper1 = None # Sockets to other processes in same node
self.sockToMapper2 = None
self.sockToReducer = None
self.sockToPaxos = None
self.incomingStream = [] # Stream to receive incoming messages
def takeCommand(self):
global thread_stop
sleep(2) # For initial start up messages from other processes
print("")
print("List of data processing commands:")
print("map\t\tfilename")
print("reduce\t\tfilename1 filename2 ...")
print("replicate\tfilename")
print("stop")
print("resume")
print("status")
print("")
print("List of data query commands:")
print("total\t\tpos1 pos2 ...")
print("print")
print("merge\t\tpos1 pos2 ...")
print("")
while True:
consoleInput = input("Command (enter 'exit' to quit):")
consoleInput = consoleInput.split()
command = consoleInput[0]
args = consoleInput[1:]
### Ends the program. Send exit to all processes. ###
if command == "exit":
self.sockToMapper1.sendall(("close%").encode())
self.sockToMapper2.sendall(("close%").encode())
self.sockToReducer.sendall(("close%").encode())
self.sockToPaxos.sendall(("close%").encode())
thread_stop = True
break
### Initiate map command to both Mapper processes ###
elif command == "map":
try:
print("Mapping File")
filenameToMapper = args[0]
offset1, size1, offset2, size2 = getMappingProperties(filenameToMapper)
msgToMapper1 = filenameToMapper + " " + str(offset1) + " " + str(size1) + "%"
msgToMapper2 = filenameToMapper + " " + str(offset2) + " " + str(size2) + "%"
self.sockToMapper1.sendall((msgToMapper1).encode())
self.sockToMapper2.sendall((msgToMapper2).encode())
print("Mapping message sent")
except FileNotFoundError:
print(" --- File", filenameToMapper, "Not Found --- ")
### Initiate reduce command to Reducer process ###
elif command == "reduce":
msgToReducer = ""
for fileName in args:
msgToReducer = msgToReducer + fileName + " "
msgToReducer = msgToReducer[:-1]
msgToReducer += "%"
print(msgToReducer) # DO WE WANT THIS PRINT STATEMENT?
self.sockToReducer.sendall((msgToReducer).encode())
### Initiate Paxos to replicate a log ###
elif command == "replicate": #part1
self.sockToPaxos.sendall(("x replicate " + str(args[0]) + "%" ).encode())
### Stops the Paxos, imitates an offline node. ###
elif command == "stop": #part1
print("Sending Stop")
self.sockToPaxos.sendall(("x stop%").encode())
### Resumes the Paxos, imitates a node coming online. ###
elif command == "resume": #part1
print("Sending Resume")
self.sockToPaxos.sendall(("x resume%").encode())
### Look for received messages ###
elif command == "status":
print("Checking messages/status")
continue
### Prints total amount of words ###
elif command == "total": #part1
indexes = ""
for i in args:
indexes += (" " + str(i))
self.sockToPaxos.sendall(("x total" + indexes + "%").encode())
### Prints all file names ###
elif command == "print": #part1
self.sockToPaxos.sendall(("x print%").encode())
### Merges two log entries and prints ###
elif command == "merge": #part1
self.sockToPaxos.sendall(("x merge " + str(args[0]) + " " + str(args[1]) + "%").encode())
else:
print("Not a recognizable command", flush=True)
print("Program exitted.", flush=True)
def config(self):
f = open(self.configFile, 'r')
lines = f.readlines()
self.mapper1Port = lines[0]
self.mapper2Port = lines[1]
self.reducerPort = lines[2]
self.paxosPort = lines[3]
def makeConnections(self):
incomingSock = Connection.createAcceptSocket("127.0.0.1", 5001)
sleep(5)
self.sockToMapper1 = Connection.createConnectSocket("127.0.0.1", self.mapper1Port)
self.sockToMapper2 = Connection.createConnectSocket("127.0.0.1", self.mapper2Port)
self.sockToReducer = Connection.createConnectSocket("127.0.0.1", self.reducerPort)
self.sockToPaxos = Connection.createConnectSocket("127.0.0.1", self.paxosPort)
sleep(5)
for i in range(4):
self.incomingStream.append(Connection.openConnection(incomingSock))
def closeConnections(self):
Connection.closeSocket(self.sockToPaxos)
Connection.closeSocket(self.sockToMapper1)
Connection.closeSocket(self.sockToMapper2)
Connection.closeSocket(self.sockToReducer)
def receiveMessages(cliUnit):
global thread_stop
while not thread_stop:
for i in range(len(cliUnit.incomingStream)):
stream = cliUnit.incomingStream[i]
stream.settimeout(1)
try:
data = stream.recv(1024).decode()
if len(data) > 0:
if data[-1] == "%":
data = data[:-1]
data = data.split("%")
print("\nStream", i, "Message Recevied:", data, flush=True)
print("\nCommand (enter 'exit' to quit):", flush=True)
except socket.timeout:
continue
############################ END CLI CLASS ##############################
def getMappingProperties(filenameToMapper):
offset1 = 0
offset2 = 0
size1 = 0
size2 = 0
with open(filenameToMapper, "r") as f:
lines = f.readlines()
totalFile = ""
numChars = 0
for line in lines:
numChars += len(line)
numChars += 1
totalFile = totalFile + line + " "
size1 = int(numChars / 2)
while True:
if totalFile[size1] != " ":
size1 += 1
else:
break
size2 = numChars - size1
offset2 = size1
print("numChars is:", numChars)
return offset1, size1, offset2, size2
def main():
if(len(sys.argv) != 2):
print("--- ERROR: Please pass in config file ---")
exit(1)
client = CLI(sys.argv[1])
client.config()
client.makeConnections()
# Listening Thread
listen_thread = threading.Thread(target=receiveMessages, args=(client,))
listen_thread.daemon = True
listen_thread.start()
client.takeCommand()
# Join Thread
listen_thread.join()
client.closeConnections()
if __name__ == "__main__":
main()