# Importing packages for data transformations and multi-threading
import re
import pandas as pd
import threading
import queue


# Defining the data cleaning function
def cleanData(file):
    tokens = []
    # Keep only lowercase alphabetic words; punctuation and other
    # string content not meant for mapping is dropped by the regex
    wform = re.compile("[a-z]+")
    for line in file:
        if len(line) > 0:
            tokens.append(wform.findall(line.lower()))
    # Drop lines that produced no tokens
    tokens = [x for x in tokens if x != []]
    return tokens
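
# For example (illustrative input, not from the source data), a file
# with the lines "Hello, world!" and "hello" yields
# [['hello', 'world'], ['hello']]: one lowercase-word list per line
# that produced at least one token.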


# Defining the data split function: the cleaned token lists are divided
# into two chunks so that each chunk can be handled by its own mapper thread
def split_function(lines):
    part = list(lines)
    list1 = part[0:5000]
    list2 = part[5000:]
    return [list1, list2]
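
# For example, with fewer than 5000 cleaned lines the entire input
# lands in the first chunk and the second mapper receives an empty
# list, e.g. split_function([['a'], ['b']]) -> [[['a'], ['b']], []].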


# Defining the mapper function; two mapper threads run it in parallel,
# one per chunk. Each mapper flattens its chunk of token lists and
# emits one "word\t1" pair per word
def mapper(chunk, map_q):
    words = [word for line in chunk for word in line]
    mapping = []
    for word in words:
        mapping.append("%s\t%d" % (word, 1))
    # Queuing up the mapper's list of pairs
    map_q.put(mapping)
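
# For example, mapper([['hello', 'world'], ['hello']], q) puts
# ['hello\t1', 'world\t1', 'hello\t1'] on the queue q.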


# Defining the sort function: merge the two mappers' outputs and sort
# them so that identical words sit next to each other for the reduce step
def Sort_function(mapping1, mapping2):
    mapping = mapping1 + mapping2
    mapping.sort()
    return mapping
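
# For example, Sort_function(['world\t1', 'hello\t1'], ['hello\t1'])
# returns ['hello\t1', 'hello\t1', 'world\t1'], grouping duplicates.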


# Defining the partition function: pairs whose word starts with a-m go
# to the first reducer, everything else to the second
def Partition(tosort):
    a = []
    b = []
    for i in tosort:
        if re.search("^[a-m]", i):
            a.append(i)
        else:
            b.append(i)
    return [a, b]
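
# For example, Partition(['apple\t1', 'mango\t1', 'zebra\t1']) returns
# [['apple\t1', 'mango\t1'], ['zebra\t1']]: a-m words for the first
# reducer, the rest for the second.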


# Defining the reducer function; two reducer threads run it in parallel,
# one per partition. Each reducer splits its "word\t1" pairs and
# accumulates a per-word total in a dictionary
def Reducer(pairs, red_q):
    f_count = {}
    for pair in pairs:
        word, count = pair.split('\t', 1)
        f_count[word] = f_count.get(word, 0) + int(count)
    # Queuing up the reduced word counts
    red_q.put(f_count)
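
# For example, Reducer(['hello\t1', 'hello\t1', 'world\t1'], q) puts
# {'hello': 2, 'world': 1} on the queue q.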


# Performing the big data MapReduce using multithreading
def MapReduce(Path):
    map_q1 = queue.Queue()
    map_q2 = queue.Queue()
    red_q1 = queue.Queue()
    red_q2 = queue.Queue()
    with open(Path, encoding="utf8") as file:
        cleaned = cleanData(file)
    chunks = split_function(cleaned)
    # Map phase: one thread per chunk
    map_thread1 = threading.Thread(target=mapper, args=(chunks[0], map_q1))
    map_thread2 = threading.Thread(target=mapper, args=(chunks[1], map_q2))
    map_thread1.start()
    map_thread2.start()
    map_thread1.join()
    map_thread2.join()
    mapping1 = map_q1.get()
    mapping2 = map_q2.get()
    # Shuffle phase: sort the combined pairs and partition them
    tosort = Sort_function(mapping1, mapping2)
    Part = Partition(tosort)
    # Reduce phase: one thread per partition
    red_thread1 = threading.Thread(target=Reducer, args=(Part[0], red_q1))
    red_thread2 = threading.Thread(target=Reducer, args=(Part[1], red_q2))
    red_thread1.start()
    red_thread2.start()
    red_thread1.join()
    red_thread2.join()
    # Merge the two partial count dictionaries and write the result
    counts = red_q1.get()
    counts.update(red_q2.get())
    Output_df = pd.DataFrame(counts.items(), columns=['Word', 'Count'])
    Output_df.to_csv(r'C:\Users\skota\Desktop\backup\big_data\bigdata_output.csv')
    return Output_df
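
# Illustrative result (hypothetical input file containing the single
# line "hello world hello"): MapReduce would return a DataFrame with
# the rows ('hello', 2) and ('world', 1) under the 'Word' and 'Count'
# columns, and write the same table to the hardcoded CSV path above.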


# Mention the path of the input text file here
MapReduce(Path=' ')