# forked from qandeelabbassi/python-svm-sgd
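"""Linear SVM trained from scratch with stochastic gradient descent.

Reads ./data/data.csv, maps the 'diagnosis' labels (M/B) to +1/-1, filters
correlated and statistically insignificant features, min-max scales the
remaining columns, appends an intercept column, then fits a linear SVM by
minimizing the regularized hinge loss with SGD and reports accuracy, recall
and precision on a held-out test set.
"""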
import numpy as np
import pandas as pd
import statsmodels.api as sm
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import train_test_split as tts
from sklearn.metrics import accuracy_score, recall_score, precision_score
from sklearn.utils import shuffle


# >> FEATURE SELECTION << #
def remove_correlated_features(X):
    corr_threshold = 0.9
    corr = X.corr()
    drop_columns = np.full(corr.shape[0], False, dtype=bool)
    for i in range(corr.shape[0]):
        for j in range(i + 1, corr.shape[0]):
            if corr.iloc[i, j] >= corr_threshold:
                drop_columns[j] = True
    columns_dropped = X.columns[drop_columns]
    X.drop(columns_dropped, axis=1, inplace=True)
    return columns_dropped


def remove_less_significant_features(X, Y):
    sl = 0.05
    regression_ols = None
    columns_dropped = np.array([])
    for itr in range(0, len(X.columns)):
        regression_ols = sm.OLS(Y, X).fit()
        max_col = regression_ols.pvalues.idxmax()
        max_val = regression_ols.pvalues.max()
        if max_val > sl:
            X.drop(max_col, axis='columns', inplace=True)
            columns_dropped = np.append(columns_dropped, [max_col])
        else:
            break
    regression_ols.summary()
    return columns_dropped
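# remove_less_significant_features() is a backward-elimination step: it refits
# an OLS model on the remaining columns and drops the feature with the largest
# p-value until every p-value is below the 0.05 significance level.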
##############################


# >> MODEL TRAINING << #
def compute_cost(W, X, Y):
    # calculate hinge loss
    N = X.shape[0]
    distances = 1 - Y * (np.dot(X, W))
    distances[distances < 0] = 0  # equivalent to max(0, distance)
    hinge_loss = regularization_strength * (np.sum(distances) / N)

    # calculate cost
    cost = 1 / 2 * np.dot(W, W) + hinge_loss
    return cost
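# Note: the value above is the soft-margin SVM objective
#   J(w) = 1/2 * ||w||^2 + C/N * sum_i max(0, 1 - y_i * (w . x_i))
# where C = regularization_strength and the bias term is absorbed into w
# through the intercept column appended in init().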


# I haven't tested it but this same function should work for
# vanilla and mini-batch gradient descent as well
def calculate_cost_gradient(W, X_batch, Y_batch):
    # if only one example is passed (eg. in case of SGD)
    if type(Y_batch) == np.float64:
        Y_batch = np.array([Y_batch])
        X_batch = np.array([X_batch])  # gives multidimensional array

    distance = 1 - (Y_batch * np.dot(X_batch, W))
    dw = np.zeros(len(W))

    for ind, d in enumerate(distance):
        if max(0, d) == 0:
            di = W
        else:
            di = W - (regularization_strength * Y_batch[ind] * X_batch[ind])
        dw += di

    dw = dw / len(Y_batch)  # average
    return dw
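# Per-example sub-gradient of the objective above:
#   dJ/dw = w                    if y_i * (w . x_i) >= 1  (correct side of margin)
#   dJ/dw = w - C * y_i * x_i    otherwise
# The loop accumulates this over the batch and returns the average.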


def sgd(features, outputs):
    max_epochs = 5000
    weights = np.zeros(features.shape[1])
    nth = 0
    prev_cost = float("inf")
    cost_threshold = 0.01  # stop when the relative improvement in cost falls below 1%

    # stochastic gradient descent
    for epoch in range(1, max_epochs):
        # shuffle to prevent repeating update cycles
        X, Y = shuffle(features, outputs)
        for ind, x in enumerate(X):
            ascent = calculate_cost_gradient(weights, x, Y[ind])
            weights = weights - (learning_rate * ascent)

        # convergence check on 2^nth epoch
        if epoch == 2 ** nth or epoch == max_epochs - 1:
            cost = compute_cost(weights, features, outputs)
            print("Epoch is: {} and Cost is: {}".format(epoch, cost))
            # stoppage criterion
            if abs(prev_cost - cost) < cost_threshold * prev_cost:
                return weights
            prev_cost = cost
            nth += 1
    return weights
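# Minimal usage sketch (hypothetical toy arrays; assumes the globals
# regularization_strength and learning_rate defined at the bottom of this file):
#   X_toy = np.array([[0.1, 0.2, 1.0], [0.9, 0.8, 1.0]])  # last column = intercept
#   y_toy = np.array([-1.0, 1.0])
#   w_toy = sgd(X_toy, y_toy)
#   preds = np.sign(X_toy.dot(w_toy))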
########################


def init():
    print("reading dataset...")
    # read data in pandas (pd) data frame
    data = pd.read_csv('./data/data.csv')

    # drop last column (extra column added by pd)
    # and unnecessary first column (id)
    data.drop(data.columns[[-1, 0]], axis=1, inplace=True)

    print("applying feature engineering...")
    # convert categorical labels to numbers
    diag_map = {'M': 1.0, 'B': -1.0}
    data['diagnosis'] = data['diagnosis'].map(diag_map)

    # put features & outputs in different data frames
    Y = data.loc[:, 'diagnosis']
    X = data.iloc[:, 1:]

    # filter features
    remove_correlated_features(X)
    remove_less_significant_features(X, Y)

    # normalize data for better convergence and to prevent overflow
    X_normalized = MinMaxScaler().fit_transform(X.values)
    X = pd.DataFrame(X_normalized)

    # insert 1 in every row for intercept b
    X.insert(loc=len(X.columns), column='intercept', value=1)

    # split data into train and test set
    print("splitting dataset into train and test sets...")
    X_train, X_test, y_train, y_test = tts(X, Y, test_size=0.2, random_state=42)

    # train the model
    print("training started...")
    W = sgd(X_train.to_numpy(), y_train.to_numpy())
    print("training finished.")
    print("weights are: {}".format(W))

    # testing the model
    print("testing the model...")
    y_train_predicted = np.array([])
    for i in range(X_train.shape[0]):
        yp = np.sign(np.dot(X_train.to_numpy()[i], W))
        y_train_predicted = np.append(y_train_predicted, yp)

    y_test_predicted = np.array([])
    for i in range(X_test.shape[0]):
        yp = np.sign(np.dot(X_test.to_numpy()[i], W))
        y_test_predicted = np.append(y_test_predicted, yp)

    print("accuracy on test dataset: {}".format(accuracy_score(y_test, y_test_predicted)))
    print("recall on test dataset: {}".format(recall_score(y_test, y_test_predicted)))
    print("precision on test dataset: {}".format(precision_score(y_test, y_test_predicted)))
# set hyper-parameters and call init
regularization_strength = 10000
learning_rate = 0.000001
init()