-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathkernel_regression.py
282 lines (237 loc) · 8.77 KB
/
kernel_regression.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
import numpy as np
import pandas as pd
import statsmodels.api as sm
try:
from scikits.bootstrap import ci
except ModuleNotFoundError:
print('Could not load scikits-bootstrap: bootstrap CIs not available.')
__all__ = ['kreg_perm',
'argrank',
'dist2kernel',
'kernel2dist',
'computePWDist',
'computeKregStat']
"""Tested one kernel, logistic regression.
Multiple kernels and continuous outcome should be tested."""
def kreg_perm(y, Ks, X=None, binary=True, nperms=9999, seed=110820, returnPerms=False):
"""Kernel regression of Y adjusting for covariates in X
Implemented the permutation method from:
Zhao N, Chen J, Carroll IM, Ringel-Kulka T, Epstein MP, Zhou H, et al.
Testing in microbiome-profiling studies with MiRKAT, the microbiome
regression-based kernel association test. Am J Hum Genet. The
American Society of Human Genetics; 2015;96(5):797-807.
Parameters
----------
y : pd.Series or np.ndarray shape (n,1)
Endogenous/outcome variable.
Can be continuous or binary (use binary=True)
Ks : pd.DataFrame or np.ndaray shape (n,n) OR list
Kernel or list of kernels
(square, symetric and positive semi-definite)
X : pd.DataFrame OR np.ndarray shape (n,i)
Matrix of covariates for adjustment.
binary : bool
Use True for logistic regression.
nperms : int
Number of permutations for the test.
returnPerms : bool
If True and Ks is a single kernel,
return the observed and permuted Q-statistics.
Returns
-------
pvalue : float
Permutation P-value or global/omnibus P-value,
if there are multiple kernels
pIndivid : ndarray shape (k)
Optionally returns vector of p-values, one
for each kernel provided."""
def computeQ(K, resid, s2):
return np.linalg.multi_dot((resid/s2, K, resid))
n = len(y)
if binary:
family = sm.families.Binomial()
else:
family = sm.families.Gaussian()
if X is None:
X = np.ones(y.shape, dtype=float)
p = 1
else:
if len(X.shape) == 1:
p = 2
else:
p = X.shape[1] + 1
model = sm.GLM(endog=y.astype(float), exog=sm.add_constant(X.astype(float)), family=family)
result = model.fit()
resid = result.resid_response
"""Squared standard error of the parameters (i.e. Beta_se)"""
s2 = (1. / (n-p)) * np.sum(resid**2.)
np.random.seed(seed)
if isinstance(Ks, list):
"""If there are multiple kernels then use a min-P omnibus test"""
Ks = [K.values if isinstance(K, pd.DataFrame) else K for K in Ks]
obsQ = np.array([computeQ(K, resid, s2) for K in Ks])[None,:]
randQ = np.nan * np.zeros((nperms, len(Ks)))
for permi in range(nperms):
rind = np.random.permutation(n)
randQ[permi,:] = [computeQ(K[:, rind][rind,:], resid, s2) for K in Ks]
Qall = np.concatenate((obsQ, randQ), axis=0)
pall = np.zeros(Qall.shape)
for qi in range(len(Ks)):
pall[:, qi] = 1 - argrank(Qall[:, qi]) / (nperms + 1.)
pIndivid = pall[0,:]
minPall= pall.min(axis=1)
pGlobal = argrank(minPall)[0] / (nperms + 1.)
return pGlobal, pIndivid
else:
if isinstance(Ks, pd.DataFrame):
Ks = Ks.values
obsQ = computeQ(Ks, resid, s2)
randQ = np.nan * np.zeros(nperms)
for permi in range(nperms):
rind = np.random.permutation(n)
randQ[permi] = computeQ(Ks[:, rind][rind,:], resid, s2)
pvalue = (np.sum(randQ > obsQ) + 1.) / (nperms + 1.)
if returnPerms:
return pvalue, obsQ, randQ
else:
return pvalue
def computeKregStat(y, K, X=None, binary=True, alpha=None, nstraps=1000):
"""Compute the statistic that is subjected to permutation testing
in kernel regression of Y adjusting for covariates in X.
Parameters
----------
y : pd.Series or np.ndarray shape (n,1)
Endogenous/outcome variable.
Can be continuous or binary (use binary=True)
K : pd.DataFrame or np.ndaray shape (n,n)
Kernel (square, symetric and positive semi-definite)
X : pd.DataFrame OR np.ndarray shape (n,i)
Matrix of covariates for adjustment.
binary : bool
Use True for logistic regression.
alpha : float or None
If not None then compute the 1 - alpha % confidence interval
using a bootstrap.
nstraps : int
Number of random samples for the 1 - alpha CIs
Returns
-------
Q : float
Regression coefficient for K
lb, ub : float (optional)
Upper and lower 1 - alpha confidence bounds based on
a percentile bootstrap with nstraps iterations."""
n = len(y)
if binary:
family = sm.families.Binomial()
else:
family = sm.families.Gaussian()
if X is None:
X = np.ones(y.shape, dtype=float)
if isinstance(K, pd.DataFrame):
K = K.values
def _computeQ(y, K, X):
model = sm.GLM(endog=y.astype(float), exog=sm.add_constant(X.astype(float)), family=family)
result = model.fit()
resid = result.resid_response
"""Squared standard error of the parameters (i.e. Beta_se)"""
s2 = float(result.bse**2)
Q = np.linalg.multi_dot((resid/s2, K, resid))
return Q
Q = _computeQ(y, K, X)
if alpha is None:
return Q
samps = np.zeros(nstraps)
for i in range(nstraps):
rind = np.random.randint(n, size=n)
samps[i] = _computeQ(y[rind], K[:, rind][rind,:], X[rind])
lb, ub = np.percentile(samps, [100*alpha/2, 100 * (1 - alpha/2)])
return Q, lb, ub
def argrank(vec):
"""Return the ascending rank (0-based) of the elements in vec
Parameters
----------
vec : np.ndarray shape (n,) or (n,1)
Returns
-------
ranks : np.ndarray shape (n,)
Ranks of each element in vec (dtype int)"""
sorti = np.argsort(vec)
ranks = np.empty(len(vec), int)
try:
ranks[sorti] = np.arange(len(vec))
except IndexError:
ranks[sorti.values] = np.arange(len(vec))
return ranks
def dist2kernel(dmat):
"""Convert a distance matrix into a similarity kernel
for KernelPCA or kernel regression methods.
Implementation of D2K in MiRKAT, Zhao et al., 2015
Parameters
----------
dmat : ndarray shape (n,n)
Pairwise-distance matrix.
Returns
-------
kernel : ndarray shape (n,n)"""
n = dmat.shape[0]
I = np.identity(n)
"""m = I - dot(1,1')/n"""
m = I - np.ones((n, n))/float(n)
kern = -0.5 * np.linalg.multi_dot((m, dmat**2, m))
if isinstance(dmat, pd.DataFrame):
return pd.DataFrame(kern, index=dmat.index, columns=dmat.columns)
else:
return kern
def kernel2dist(kern):
"""Recover a distance matrix from the kernel.
Implementation of K2D in MiRKAT, Zhao et al., 2015
Also reccommended by:
E. Halperin, J. Buhler, R. Karp, R. Krauthgamer, B. Westover.
Detecting protein sequence conservation via metric embeddings.
Bioinformatics 19, 122-129 (2003).
d_ij^2 = K_ii + K_jj - 2K_ij
Parameters
----------
kernel : ndarray shape (n,n)
Returns
-------
dmat : ndarray shape (n,n)"""
dmat = np.sqrt(np.diag(kern)[None,:] + np.diag(kern)[:, None] - 2 * kern)
return dmat
def posSDCorrection(kernel):
"""Positive semi-definite correction.
Eigenvalue decomposition is used to ensure that the kernel
matrix is positive semi-definite."""
u, s, v = np.linalg.svd(kernel)
return np.linalg.multi_dot((u, np.diag(np.abs(s)), v))
def computePWDist(series1, series2, dfunc, symetric=True):
"""Compute and assemble a pairwise distance matrix
given two pd.Series and a function to compare them.
Parameters
----------
series1, series2 : pd.Series
Items for comparison. Will compute all pairs of distances
between items in series1 and series2.
dfunc : function
Function takes 2 items and returns a float
symetric : bool
If True, only compute half the distance matrix and duplicate.
Returns
-------
dmat : pd.DataFrame
Distance matrix with series1 along rows and
series 2 along the columns."""
nrows = series1.shape[0]
ncols = series2.shape[0]
dmat = np.zeros((nrows, ncols))
for i in range(nrows):
for j in range(ncols):
if symetric:
if i <= j:
d = dfunc(series1.iloc[i], series2.iloc[j])
dmat[i, j] = d
dmat[j, i] = d
else:
dmat[i, j] = dfunc(series1.iloc[i], series2.iloc[j])
return pd.DataFrame(dmat, columns=series2.index, index=series1.index)