# Logistic Regression Binary Classifier
# I use bag-of-words as my basic features, plus several extra features.
# Change the features as you need.
'''
Extra features on top of the bag-of-words counts:
a. Document statistics: the log of the total number of words in the document.
b. Binary feature: 1 if the punctuation "!" appears in the document, 0 otherwise.
c. Binary feature: 1 if the punctuation "?" appears in the document, 0 otherwise.
I also collect the top 100 most frequently used words for each class.
d. Binary feature: 1 if any of the top 100 most frequent words of the positive class appears in the document, 0 otherwise.
e. Binary feature: 1 if any of the top 100 most frequent words of the negative class appears in the document, 0 otherwise.
Features b, c, d, and e add little on their own; they are mainly here as examples.
'''
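
# A quick sketch of the resulting feature vector layout, as implemented in
# featurize() below (V = vocabulary size collected by make_dicts):
#   vector[0 .. V-1]        bag-of-words counts
#   vector[-6], vector[-5]  top-100 word flags, one slot per class
#   vector[-4]              "?" present
#   vector[-3]              "!" present
#   vector[-2]              log(document length)
#   vector[-1]              bias term (always 1)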

import os
import string
from collections import Counter, defaultdict
from math import ceil, log
from random import Random
from typing import Sequence, DefaultDict, Dict

import numpy as np
from scipy.special import expit  # logistic (sigmoid) function
from nltk.corpus import stopwords


class LogisticRegression:

    def __init__(self):
        self.class_dict = {}     # class label -> class index
        self.feature_dict = {}   # word -> index in the bag-of-words block
        self.n_features = None   # number of features (set by make_dicts)
        self.theta = None        # weights (and bias)
        self.senti = {}          # class label -> top 100 most frequent non-stopword words


    def make_dicts(self, train_set_path: str) -> None:
        '''
        Given a training set, fills in self.class_dict and self.feature_dict.
        Also sets the number of features self.n_features and initializes the
        parameter vector self.theta.
        '''
        # iterate over training documents
        n_label = 0
        vocab = 0
        sentidic = defaultdict(Counter)
        stop_words = set(stopwords.words('english'))
        for root, dirs, files in os.walk(train_set_path):
            for name in files:

                # this check is necessary on macOS
                if name == ".DS_Store":
                    continue

                with open(os.path.join(root, name), encoding="utf8", errors="ignore") as f:
                    # the directory name is the class label
                    # (suggested mapping: positive -> 1, negative -> 0)
                    label = os.path.basename(root)
                    if label not in self.class_dict:
                        self.class_dict[label] = n_label
                        n_label += 1

                    # build the bag-of-words vocabulary (word count features)
                    content = f.read().split()
                    for word in content:
                        if word not in string.punctuation:
                            if word not in self.feature_dict:
                                self.feature_dict[word] = vocab
                                vocab += 1

                        # count non-stopword frequencies per class for the
                        # top-100 most frequent word features
                        if word not in stop_words:
                            sentidic[label][word] += 1

        # top 100 most frequently used non-stopword words for each class
        for l in sentidic:
            self.senti[l] = [word for word, cnt in sentidic[l].most_common(100)]

        # 5 extra features: "!", "?", two top-100 flags, and log(doc length)
        self.n_features = len(self.feature_dict) + 5
        self.theta = np.zeros(self.n_features + 1)  # + 1 for the bias term


    def load_data(self, data_set_path: str):
        '''
        Loads a dataset. Returns a list of filenames, and dictionaries
        of classes and documents such that:
        classes[filename] = class of the document
        documents[filename] = feature vector for the document (via self.featurize)
        '''
        filenames = []
        classes = dict()
        documents = dict()
        # iterate over documents
        for root, dirs, files in os.walk(data_set_path):
            for name in files:

                # this check is necessary on macOS
                if name == ".DS_Store":
                    continue

                with open(os.path.join(root, name), encoding="utf8", errors="ignore") as f:
                    filenames.append(name)
                    label = os.path.basename(root)
                    classes[name] = self.class_dict[label]
                    content = f.read().split()
                    docs = []
                    for word in content:
                        if word not in string.punctuation:
                            docs.append(word)
                        # keep "!" and "?" so featurize can set the
                        # extra punctuation features
                        elif word in ("!", "?"):
                            docs.append(word)

                    documents[name] = self.featurize(docs)
        return filenames, classes, documents


    def featurize(self, document: Sequence[str]) -> np.ndarray:
        '''
        Given a document (as a list of words), returns a feature vector.
        '''
        vector = np.zeros(self.n_features + 1)  # + 1 for bias
        # word count (bag-of-words) features
        word_counts = defaultdict(int)
        for word in document:
            word_counts[word] += 1

            # extra punctuation features
            if word == "!":
                vector[-3] = 1
            elif word == "?":
                vector[-4] = 1
            # top-100 most frequent non-stopword word features, one slot per class
            n = -6
            for s in self.senti:
                if word in self.senti[s]:
                    vector[n] = 1
                n += 1

        for word, count in word_counts.items():
            if word in self.feature_dict:
                vector[self.feature_dict[word]] = count
        # log of the document's word count (0 for an empty document)
        vector[-2] = log(len(document)) if document else 0
        vector[-1] = 1  # bias
        return vector


    def train(self, train_set_path: str, batch_size=1, n_epochs=1, eta=0.01) -> None:
        '''
        Trains a logistic regression classifier on a training set
        with minibatch gradient descent.
        '''
        filenames, classes, documents = self.load_data(train_set_path)
        filenames = sorted(filenames)
        n_minibatches = ceil(len(filenames) / batch_size)
        for epoch in range(n_epochs):
            print("Epoch {:} out of {:}".format(epoch + 1, n_epochs))
            loss = 0
            for i in range(n_minibatches):
                # list of filenames in this minibatch
                minibatch = filenames[i * batch_size: (i + 1) * batch_size]
                size = len(minibatch)
                # create and fill in matrix x and vector y
                x = np.zeros((size, self.n_features + 1))
                y = np.zeros(size)
                for k in range(size):
                    file = minibatch[k]
                    y[k] = classes[file]
                    x[k] = documents[file]
                # compute y_hat
                y_hat = expit(np.dot(x, self.theta))
                # update loss (clip probabilities to avoid log(0))
                y_hat_clipped = np.clip(y_hat, 1e-10, 1 - 1e-10)
                loss += -np.sum(y * np.log(y_hat_clipped) + (1 - y) * np.log(1 - y_hat_clipped))
                # compute gradient of the cross-entropy loss
                gradient = (1 / size) * np.dot(np.transpose(x), (y_hat - y))
                # update weights (and bias)
                self.theta -= eta * gradient
            loss /= len(filenames)
            print("Average Train Loss: {}".format(loss))
            # randomize document order for the next epoch
            Random(epoch).shuffle(filenames)

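    # Note on the update used in train() above: for a minibatch with design
    # matrix x of shape (size, n_features + 1) and label vector y, the code
    # minimizes the binary cross-entropy loss
    #     L = -(1/size) * sum_i [ y_i * log(y_hat_i) + (1 - y_i) * log(1 - y_hat_i) ]
    # where y_hat = sigmoid(x @ theta). Its gradient with respect to theta is
    #     (1/size) * x.T @ (y_hat - y),
    # which is exactly the expression applied in the weight update.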

    def test(self, dev_set_path: str) -> DefaultDict[str, Dict[str, int]]:
        '''
        Tests the classifier on a development or test set.
        Returns a dictionary of filenames mapped to their correct and predicted classes.
        '''
        results = defaultdict(dict)
        filenames, classes, documents = self.load_data(dev_set_path)
        for name in filenames:
            # get the most likely class (recall that P(y=1|x) = y_hat)
            y_hat = expit(np.dot(documents[name], self.theta))
            results[name]['correct'] = classes[name]
            results[name]['predicted'] = 1 if y_hat > 0.5 else 0
        return results


    def evaluate(self, results: DefaultDict[str, Dict[str, int]]) -> None:
        '''
        Given results, calculates the following:
        Precision, Recall, F1 for each class
        Accuracy overall
        Also prints the evaluation metrics in a readable format.
        '''
        # rows are predicted classes, columns are correct classes
        confusion_matrix = np.zeros((len(self.class_dict), len(self.class_dict)))
        for filename, result_dict in results.items():
            cindex = result_dict['correct']
            pindex = result_dict['predicted']
            confusion_matrix[pindex, cindex] += 1
        pred_totals = np.sum(confusion_matrix, axis=1)  # total predicted as each class
        true_totals = np.sum(confusion_matrix, axis=0)  # total truly in each class
        correct = 0
        for label in self.class_dict:
            label_index = self.class_dict[label]
            tp = confusion_matrix[label_index, label_index]
            correct += tp
            precision = 0 if pred_totals[label_index] == 0 else tp / pred_totals[label_index]
            recall = 0 if true_totals[label_index] == 0 else tp / true_totals[label_index]
            f1 = 0 if precision + recall == 0 else 2 * precision * recall / (precision + recall)
            print(f'{label}:')
            print(f'  precision: {precision}')
            print(f'  recall: {recall}')
            print(f'  f1: {f1}')
        accuracy = correct / np.sum(confusion_matrix)
        print(f'Overall Accuracy: {accuracy}')

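    # For reference, with the confusion matrix orientation used above
    # (rows = predicted, columns = correct), the per-class metrics are:
    #     precision(c) = cm[c, c] / row_sum(c)     # of documents predicted c, how many are truly c
    #     recall(c)    = cm[c, c] / column_sum(c)  # of documents truly c, how many were predicted c
    #     f1(c)        = 2 * precision * recall / (precision + recall)
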
if __name__ == '__main__':
    lr = LogisticRegression()
    # make sure these point to the right directories
    lr.make_dicts('path/train')
    # change the hyperparameters as you need
    lr.train('path/train', batch_size=10, n_epochs=10, eta=0.005)
    results = lr.test('path/dev')
    lr.evaluate(results)
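
    # A minimal sketch of scoring one extra document by hand, assuming
    # `tokens` is a list of word tokens from a single review (this helper
    # variable is hypothetical, not part of the training data above):
    #   vec = lr.featurize(tokens)
    #   p = expit(np.dot(vec, lr.theta))  # probability of the class with index 1
    #   predicted = 1 if p > 0.5 else 0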