# glmtree.py
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LinearRegression, Lasso, LassoCV, LogisticRegression, LogisticRegressionCV
from sklearn.base import RegressorMixin, ClassifierMixin

from .mobtree import MoBTreeRegressor, MoBTreeClassifier

from warnings import simplefilter
from sklearn.exceptions import ConvergenceWarning

# The cross-validated leaf models can be noisy when they do not fully converge.
simplefilter("ignore", category=ConvergenceWarning)

__all__ = ["GLMTreeRegressor", "GLMTreeClassifier"]


class GLMTreeRegressor(MoBTreeRegressor, RegressorMixin):

    def __init__(self, max_depth=3, min_samples_leaf=50, min_impurity_decrease=0, feature_names=None,
                 split_features=None, n_screen_grid=1, n_feature_search=10, n_split_grid=20, reg_lambda=0, random_state=0):

        super(GLMTreeRegressor, self).__init__(max_depth=max_depth,
                                               min_samples_leaf=min_samples_leaf,
                                               min_impurity_decrease=min_impurity_decrease,
                                               feature_names=feature_names,
                                               split_features=split_features,
                                               n_screen_grid=n_screen_grid,
                                               n_feature_search=n_feature_search,
                                               n_split_grid=n_split_grid,
                                               random_state=random_state)

        # reg_lambda may be a scalar or a list of candidate penalties; build_leaf
        # indexes it as a sequence, so normalize a scalar to a one-element list.
        self.reg_lambda = reg_lambda if isinstance(reg_lambda, (list, tuple, np.ndarray)) else [reg_lambda]
        self.base_estimator = LinearRegression()

    def build_root(self):

        # Fit one global linear model; its loss serves as the impurity of the root node.
        self.base_estimator.fit(self.x, self.y)
        root_impurity = self.evaluate_estimator(self.base_estimator, self.x, self.y.ravel())
        return root_impurity

    def build_leaf(self, sample_indice):

        # Standardize the leaf sample so that the lasso penalty treats all features alike.
        mx = self.x[sample_indice].mean(0)
        sx = self.x[sample_indice].std(0) + self.EPSILON
        nx = (self.x[sample_indice] - mx) / sx
        if len(self.reg_lambda) > 1:
            # Several candidate penalties: choose one by 5-fold cross-validation.
            best_estimator = LassoCV(alphas=self.reg_lambda, cv=5, precompute=False, random_state=self.random_state)
            best_estimator.fit(nx, self.y[sample_indice])
        else:
            if self.reg_lambda[0] > 0:
                best_estimator = Lasso(alpha=self.reg_lambda[0], precompute=False, random_state=self.random_state)
            else:
                best_estimator = LinearRegression()
            best_estimator.fit(nx, self.y[sample_indice])
        # Map the fitted coefficients back to the original (unstandardized) feature scale.
        best_estimator.coef_ = best_estimator.coef_ / sx
        best_estimator.intercept_ = best_estimator.intercept_ - np.dot(mx, best_estimator.coef_.T)
        # Clip predictions to the response range seen in this leaf to avoid extrapolation.
        xmin = np.min(np.dot(self.x[sample_indice], best_estimator.coef_) + best_estimator.intercept_)
        xmax = np.max(np.dot(self.x[sample_indice], best_estimator.coef_) + best_estimator.intercept_)
        predict_func = lambda x: np.clip(best_estimator.predict(x), xmin, xmax)
        best_impurity = self.get_loss(self.y[sample_indice], best_estimator.predict(self.x[sample_indice]))
        return predict_func, best_estimator, best_impurity
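
# A minimal usage sketch for GLMTreeRegressor, shown as an illustration only.
# It assumes the MoBTreeRegressor base class provides the usual scikit-learn
# style fit/predict interface; the data and hyperparameter values are made up.
#
#   >>> import numpy as np
#   >>> x = np.random.uniform(-1, 1, size=(1000, 5))
#   >>> y = np.where(x[:, 0] > 0, x[:, 1], -x[:, 1]) + 0.1 * np.random.randn(1000)
#   >>> reg = GLMTreeRegressor(max_depth=2, min_samples_leaf=50, reg_lambda=[0.01, 0.1])
#   >>> reg.fit(x, y)
#   >>> y_hat = reg.predict(x)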


class GLMTreeClassifier(MoBTreeClassifier, ClassifierMixin):

    def __init__(self, max_depth=3, min_samples_leaf=50, min_impurity_decrease=0, feature_names=None,
                 split_features=None, n_screen_grid=1, n_feature_search=10, n_split_grid=20, reg_lambda=0, random_state=0):

        super(GLMTreeClassifier, self).__init__(max_depth=max_depth,
                                                min_samples_leaf=min_samples_leaf,
                                                min_impurity_decrease=min_impurity_decrease,
                                                feature_names=feature_names,
                                                split_features=split_features,
                                                n_screen_grid=n_screen_grid,
                                                n_feature_search=n_feature_search,
                                                n_split_grid=n_split_grid,
                                                random_state=random_state)

        # As in the regressor, accept either a scalar or a list of penalty values.
        self.reg_lambda = reg_lambda if isinstance(reg_lambda, (list, tuple, np.ndarray)) else [reg_lambda]
        self.base_estimator = LogisticRegression(penalty='none', random_state=self.random_state)

    def build_root(self):

        # Fit one global logistic regression; its loss serves as the root impurity.
        self.base_estimator.fit(self.x, self.y)
        root_impurity = self.evaluate_estimator(self.base_estimator, self.x, self.y.ravel())
        return root_impurity

    def build_leaf(self, sample_indice):

        # Degenerate leaf: only one class present, or fewer than 5 samples of either class.
        # Fall back to predicting the empirical class frequency.
        if (self.y[sample_indice].std() == 0) | (self.y[sample_indice].sum() < 5) | ((1 - self.y[sample_indice]).sum() < 5):
            best_estimator = None
            predict_func = lambda x: np.ones(x.shape[0]) * self.y[sample_indice].mean()
            best_impurity = self.get_loss(self.y[sample_indice], predict_func(self.x[sample_indice]))
        else:
            if len(self.reg_lambda) > 1:
                # reg_lambda is passed to Cs, scikit-learn's *inverse* regularization strength.
                best_estimator = LogisticRegressionCV(Cs=self.reg_lambda, penalty="l1", solver="liblinear", scoring="roc_auc",
                                                      cv=5, random_state=self.random_state)
            elif self.reg_lambda[0] > 0:
                best_estimator = LogisticRegression(C=self.reg_lambda[0], penalty="l1", solver="liblinear", random_state=self.random_state)
            else:
                # Mirror the regressor: a non-positive penalty means an unpenalized fit,
                # since C must be strictly positive.
                best_estimator = LogisticRegression(penalty='none', random_state=self.random_state)
            # Standardize within the leaf, fit, then map coefficients back to the original scale.
            mx = self.x[sample_indice].mean(0)
            sx = self.x[sample_indice].std(0) + self.EPSILON
            nx = (self.x[sample_indice] - mx) / sx
            best_estimator.fit(nx, self.y[sample_indice])
            best_estimator.coef_ = best_estimator.coef_ / sx
            best_estimator.intercept_ = best_estimator.intercept_ - np.dot(mx, best_estimator.coef_.T)
            # Clip the linear predictor to the range seen in this leaf before adding the intercept.
            xmin = np.min(np.dot(self.x[sample_indice], best_estimator.coef_.ravel()))
            xmax = np.max(np.dot(self.x[sample_indice], best_estimator.coef_.ravel()))
            predict_func = lambda x: np.clip(np.dot(x, best_estimator.coef_.ravel()), xmin, xmax) + best_estimator.intercept_
            best_impurity = self.get_loss(self.y[sample_indice], best_estimator.predict_proba(self.x[sample_indice])[:, 1])
        return predict_func, best_estimator, best_impurity
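

# A minimal, self-contained sketch of fitting GLMTreeClassifier on synthetic data.
# Illustration only: it assumes MoBTreeClassifier exposes scikit-learn style
# fit/predict methods, and because of the relative import above the file has to be
# run as a module (e.g. `python -m <package>.glmtree`) rather than as a script.
if __name__ == "__main__":

    rng = np.random.RandomState(0)
    x = rng.uniform(-1, 1, size=(2000, 5))
    # Piecewise-linear logit: the sign of x[:, 0] switches the active linear model,
    # which is the kind of structure a GLM tree is meant to recover.
    logit = np.where(x[:, 0] > 0, 2 * x[:, 1] - x[:, 2], -2 * x[:, 1] + x[:, 2])
    y = (rng.uniform(size=2000) < 1.0 / (1.0 + np.exp(-logit))).astype(int)

    clf = GLMTreeClassifier(max_depth=2, min_samples_leaf=100, reg_lambda=[0.1, 1.0])
    clf.fit(x, y)
    print("training accuracy:", np.mean(clf.predict(x) == y))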