"""
Warping Invariant PCR Regression using SRSF
moduleauthor:: J. Derek Tucker <jdtuck@sandia.gov>
"""
import numpy as np
import fdasrsf as fs
import fdasrsf.utility_functions as uf
import fdasrsf.fPCA as fpca
import fdasrsf.regression as rg
import fdasrsf.geometry as geo
from scipy import dot
from scipy.linalg import inv, norm
from scipy.integrate import trapz, cumtrapz
from scipy.optimize import fmin_l_bfgs_b
import collections
[docs]class elastic_pcr_regression:
"""
This class provides elastic pcr regression for functional data using the
SRVF framework accounting for warping
Usage: obj = elastic_pcr_regression(f,y,time)
:param f: (M,N) % matrix defining N functions of M samples
:param y: response vector of length N
:param warp_data: fdawarp object of alignment
:param pca: class dependent on fPCA method used object of fPCA
:param alpha: intercept
:param b: coefficient vector
:param SSE: sum of squared errors
Author : J. D. Tucker (JDT) <jdtuck AT sandia.gov>
Date : 18-Mar-2018
"""
def __init__(self, f, y, time):
"""
Construct an instance of the elastic_pcr_regression class
:param f: numpy ndarray of shape (M,N) of N functions with M samples
:param y: response vector
:param time: vector of size M describing the sample points
"""
a = time.shape[0]
if f.shape[0] != a:
raise Exception('Columns of f and time must be equal')
self.f = f
self.y = y
self.time = time
[docs] def calc_model(self, pca_method="combined", no=5,
smooth_data=False, sparam=25, parallel=False,
C=None):
"""
This function identifies a regression model with phase-variability
using elastic pca
:param pca_method: string specifing pca method (options = "combined",
"vert", or "horiz", default = "combined")
:param no: scalar specify number of principal components (default=5)
:param smooth_data: smooth data using box filter (default = F)
:param sparam: number of times to apply box filter (default = 25)
:param parallel: run in parallel (default = F)
:param C: scale balance parameter for combined method (default = None)
"""
if smooth_data:
self.f = fs.smooth_data(self.f,sparam)
N1 = self.f.shape[1]
# Align Data
self.warp_data = fs.fdawarp(self.f,self.time)
self.warp_data.srsf_align(parallel=parallel)
# Calculate PCA
if pca_method=='combined':
self.pca = fpca.fdajpca(self.warp_data)
elif pca_method=='vert':
self.pca = fpca.fdavpca(self.warp_data)
elif pca_method=='horiz':
self.pca = fpca.fdahpca(self.warp_data)
else:
raise Exception('Invalid fPCA Method')
self.pca.calc_fpca(no)
# OLS using PCA basis
lam = 0
R = 0
Phi = np.ones((N1, no+1))
Phi[:,1:(no+1)] = self.pca.coef
xx = dot(Phi.T, Phi)
inv_xx = inv(xx + lam * R)
xy = dot(Phi.T, self.y)
b = dot(inv_xx, xy)
alpha = b[0]
b = b[1:no+1]
# compute the SSE
int_X = np.zeros(N1)
for ii in range(0,N1):
int_X[ii] = np.sum(self.pca.coef[ii,:]*b)
SSE = np.sum((self.y-alpha-int_X)**2)
self.alpha = alpha
self.b = b
self.SSE = SSE
self.pca_method = pca_method
return
[docs] def predict(self, newdata=None):
"""
This function performs prediction on regression model on new data if available or current stored data in object
Usage: obj.predict()
obj.predict(newdata)
:param newdata: dict containing new data for prediction (needs the keys below, if None predicts on training data)
:type newdata: dict
:param f: (M,N) matrix of functions
:param time: vector of time points
:param y: truth if available
:param smooth: smooth data if needed
:param sparam: number of times to run filter
"""
omethod = self.warp_data.method
lam = self.warp_data.lam
M = self.time.shape[0]
if newdata != None:
f = newdata['f']
time = newdata['time']
y = newdata['y']
if newdata['smooth']:
sparam = newdata['sparam']
f = fs.smooth_data(f,sparam)
q1 = fs.f_to_srsf(f,time)
n = q1.shape[1]
self.y_pred = np.zeros(n)
mq = self.warp_data.mqn
fn = np.zeros((M,n))
qn = np.zeros((M,n))
gam = np.zeros((M,n))
for ii in range(0,n):
gam[:,ii] = uf.optimum_reparam(mq,time,q1[:,ii],omethod,lam)
fn[:,ii] = uf.warp_f_gamma(time,f[:,ii],gam[:,ii])
qn[:,ii] = uf.f_to_srsf(fn[:,ii],time)
U = self.pca.U
no = U.shape[1]
if self.pca.__class__.__name__ == 'fdajpca':
m_new = np.sign(fn[self.pca.id,:])*np.sqrt(np.abs(fn[self.pca.id,:]))
qn1 = np.vstack((qn, m_new))
C = self.pca.C
TT = self.time.shape[0]
mu_g = self.pca.mu_g
mu_psi = self.pca.mu_psi
vec = np.zeros((M,n))
psi = np.zeros((TT,n))
binsize = np.mean(np.diff(self.time))
for i in range(0,n):
psi[:,i] = np.sqrt(np.gradient(gam[:,i],binsize))
out, theta = geo.inv_exp_map(mu_psi, psi[:,i])
vec[:,i] = out
g = np.vstack((qn1, C*vec))
a = np.zeros((n,no))
for i in range(0,n):
for j in range(0,no):
tmp = (g[:,i]-mu_g)
a[i,j] = dot(tmp.T, U[:,j])
elif self.pca.__class__.__name__ == 'fdavpca':
m_new = np.sign(fn[self.pca.id,:])*np.sqrt(np.abs(fn[self.pca.id,:]))
qn1 = np.vstack((qn, m_new))
a = np.zeros((n,no))
for i in range(0,n):
for j in range(0,no):
tmp = (qn1[:,i]-self.pca.mqn)
a[i,j] = dot(tmp.T, U[:,j])
elif self.pca.__class__.__name__ == 'fdahpca':
a = np.zeros((n,no))
mu_psi = self.pca.psi_mu
vec = np.zeros((M,n))
TT = self.time.shape[0]
psi = np.zeros((TT,n))
binsize = np.mean(np.diff(self.time))
for i in range(0,n):
psi[:,i] = np.sqrt(np.gradient(gam[:,i],binsize))
out, theta = geo.inv_exp_map(mu_psi, psi[:,i])
vec[:,i] = out
vm = self.pca.vec.mean(axis=1)
for i in range(0,n):
for j in range(0,no):
a[i,j] = np.sum(dot(vec[:,i]-vm,U[:,j]))
else:
raise Exception('Invalid fPCA Method')
for ii in range(0,n):
self.y_pred[ii] = self.alpha + np.dot(a[ii,:],self.b)
if y is None:
self.SSE = np.nan
else:
self.SSE = np.sum((y-self.y_pred)**2)
else:
n = self.pca.coef.shape[0]
self.y_pred = np.zeros(n)
for ii in range(0,n):
self.y_pred[ii] = self.alpha + np.dot(self.pca.coef[ii,:],self.b)
self.SSE = np.sum((self.y-self.y_pred)**2)
return
[docs]class elastic_lpcr_regression:
"""
This class provides elastic logistic pcr regression for functional
data using the SRVF framework accounting for warping
Usage: obj = elastic_lpcr_regression(f,y,time)
:param f: (M,N) % matrix defining N functions of M samples
:param y: response vector of length N (-1/1)
:param warp_data: fdawarp object of alignment
:param pca: class dependent on fPCA method used object of fPCA
:param information
:param alpha: intercept
:param b: coefficient vector
:param Loss: logistic loss
:param PC: probability of classification
:param ylabels: predicted labels
Author : J. D. Tucker (JDT) <jdtuck AT sandia.gov>
Date : 18-Mar-2018
"""
def __init__(self, f, y, time):
"""
Construct an instance of the elastic_lpcr_regression class
:param f: numpy ndarray of shape (M,N) of N functions with M samples
:param y: response vector
:param time: vector of size M describing the sample points
"""
a = time.shape[0]
if f.shape[0] != a:
raise Exception('Columns of f and time must be equal')
self.f = f
self.y = y
self.time = time
[docs] def calc_model(self, pca_method="combined", no=5,
smooth_data=False, sparam=25, parallel=False):
"""
This function identifies a logistic regression model with phase-variability
using elastic pca
:param pca_method: string specifing pca method (options = "combined",
"vert", or "horiz", default = "combined")
:param no: scalar specify number of principal components (default=5)
:param smooth_data: smooth data using box filter (default = F)
:param sparam: number of times to apply box filter (default = 25)
:param parallel: calculate in parallel (default = F)
:type f: np.ndarray
:type time: np.ndarray
"""
if smooth_data:
self.f = fs.smooth_data(self.f,sparam)
N1 = self.f.shape[1]
# Align Data
self.warp_data = fs.fdawarp(self.f,self.time)
self.warp_data.srsf_align(parallel=parallel)
# Calculate PCA
if pca_method=='combined':
self.pca = fpca.fdajpca(self.warp_data)
elif pca_method=='vert':
self.pca = fpca.fdavpca(self.warp_data)
elif pca_method=='horiz':
self.pca = fpca.fdahpca(self.warp_data)
else:
raise Exception('Invalid fPCA Method')
self.pca.calc_fpca(no)
# OLS using PCA basis
lam = 0
R = 0
Phi = np.ones((N1, no+1))
Phi[:,1:(no+1)] = self.pca.coef
# Find alpha and beta using l_bfgs
b0 = np.zeros(no+1)
out = fmin_l_bfgs_b(rg.logit_loss, b0, fprime=rg.logit_gradient,
args=(Phi, self.y), pgtol=1e-10, maxiter=200,
maxfun=250, factr=1e-30)
b = out[0]
alpha = b[0]
# compute the Loss
LL = rg.logit_loss(b,Phi,self.y)
b = b[1:no+1]
self.alpha = alpha
self.b = b
self.LL = LL
self.pca_method = pca_method
return
[docs] def predict(self, newdata=None):
"""
This function performs prediction on regression model on new data if available or current stored data in object
Usage: obj.predict()
obj.predict(newdata)
:param newdata: dict containing new data for prediction (needs the keys below, if None predicts on training data)
:type newdata: dict
:param f: (M,N) matrix of functions
:param time: vector of time points
:param y: truth if available
:param smooth: smooth data if needed
:param sparam: number of times to run filter
"""
omethod = self.warp_data.method
lam = self.warp_data.lam
M = self.time.shape[0]
if newdata != None:
f = newdata['f']
time = newdata['time']
y = newdata['y']
if newdata['smooth']:
sparam = newdata['sparam']
f = fs.smooth_data(f,sparam)
q1 = fs.f_to_srsf(f,time)
n = q1.shape[1]
self.y_pred = np.zeros(n)
mq = self.warp_data.mqn
fn = np.zeros((M,n))
qn = np.zeros((M,n))
gam = np.zeros((M,n))
for ii in range(0,n):
gam[:,ii] = uf.optimum_reparam(mq,time,q1[:,ii],omethod)
fn[:,ii] = uf.warp_f_gamma(time,f[:,ii],gam[:,ii])
qn[:,ii] = uf.f_to_srsf(fn[:,ii],time)
U = self.pca.U
no = U.shape[1]
if self.pca.__class__.__name__ == 'fdajpca':
m_new = np.sign(fn[self.pca.id,:])*np.sqrt(np.abs(fn[self.pca.id,:]))
qn1 = np.vstack((qn, m_new))
C = self.pca.C
TT = self.time.shape[0]
mu_g = self.pca.mu_g
mu_psi = self.pca.mu_psi
vec = np.zeros((M,n))
psi = np.zeros((TT,n))
binsize = np.mean(np.diff(self.time))
for i in range(0,n):
psi[:,i] = np.sqrt(np.gradient(gam[:,i],binsize))
out, theta = geo.inv_exp_map(mu_psi, psi[:,i])
vec[:,i] = out
g = np.vstack((qn1, C*vec))
a = np.zeros((n,no))
for i in range(0,n):
for j in range(0,no):
tmp = (g[:,i]-mu_g)
a[i,j] = dot(tmp.T, U[:,j])
elif self.pca.__class__.__name__ == 'fdavpca':
m_new = np.sign(fn[self.pca.id,:])*np.sqrt(np.abs(fn[self.pca.id,:]))
qn1 = np.vstack((qn, m_new))
a = np.zeros((n,no))
for i in range(0,n):
for j in range(0,no):
tmp = (qn1[:,i]-self.pca.mqn)
a[i,j] = dot(tmp.T, U[:,j])
elif self.pca.__class__.__name__ == 'fdahpca':
a = np.zeros((n,no))
mu_psi = self.pca.psi_mu
vec = np.zeros((M,n))
TT = self.time.shape[0]
psi = np.zeros((TT,n))
binsize = np.mean(np.diff(self.time))
for i in range(0,n):
psi[:,i] = np.sqrt(np.gradient(gam[:,i],binsize))
out, theta = geo.inv_exp_map(mu_psi, psi[:,i])
vec[:,i] = out
vm = self.pca.vec.mean(axis=1)
for i in range(0,n):
for j in range(0,no):
a[i,j] = np.sum(dot(vec[:,i]-vm,U[:,j]))
else:
raise Exception('Invalid fPCA Method')
for ii in range(0,n):
self.y_pred[ii] = self.alpha + np.sum(a[ii,:]*self.b)
if y is None:
self.y_pred = rg.phi(self.y_pred)
self.y_labels = np.ones(n)
self.y_labels[self.y_pred < 0.5] = -1
self.PC = np.nan
else:
self.y_pred = rg.phi(self.y_pred)
self.y_labels = np.ones(n)
self.y_labels[self.y_pred < 0.5] = -1
TP = np.sum(y[self.y_labels == 1] == 1)
FP = np.sum(y[self.y_labels == -1] == 1)
TN = np.sum(y[self.y_labels == -1] == -1)
FN = np.sum(y[self.y_labels == 1] == -1)
self.PC = (TP+TN)/(TP+FP+FN+TN)
else:
n = self.pca.coef.shape[0]
self.y_pred = np.zeros(n)
for ii in range(0,n):
self.y_pred[ii] = self.alpha + np.dot(self.pca.coef[ii,:],self.b)
self.y_pred = rg.phi(self.y_pred)
self.y_labels = np.ones(n)
self.y_labels[self.y_pred < 0.5] = -1
TP = np.sum(self.y[self.y_labels == 1] == 1)
FP = np.sum(self.y[self.y_labels == -1] == 1)
TN = np.sum(self.y[self.y_labels == -1] == -1)
FN = np.sum(self.y[self.y_labels == 1] == -1)
self.PC = (TP+TN)/(TP+FP+FN+TN)
return
[docs]class elastic_mlpcr_regression:
"""
This class provides elastic multinomial logistic pcr regression for functional
data using the SRVF framework accounting for warping
Usage: obj = elastic_mlpcr_regression(f,y,time)
:param f: (M,N) % matrix defining N functions of M samples
:param y: response vector of length N
:param Y: coded label matrix
:param warp_data: fdawarp object of alignment
:param pca: class dependent on fPCA method used object of fPCA
:param information
:param alpha: intercept
:param b: coefficient vector
:param Loss: logistic loss
:param PC: probability of classification
:param ylabels: predicted labels
:param
Author : J. D. Tucker (JDT) <jdtuck AT sandia.gov>
Date : 18-Mar-2018
"""
def __init__(self, f, y, time):
"""
Construct an instance of the elastic_mlpcr_regression class
:param f: numpy ndarray of shape (M,N) of N functions with M samples
:param y: response vector
:param time: vector of size M describing the sample points
"""
a = time.shape[0]
if f.shape[0] != a:
raise Exception('Columns of f and time must be equal')
self.f = f
self.y = y
self.time = time
N1 = f.shape[1]
# Code labels
m = y.max()
self.n_classes = m
self.Y = np.zeros((N1, m), dtype=int)
for ii in range(0, N1):
self.Y[ii, y[ii]-1] = 1
[docs] def calc_model(self, pca_method="combined", no=5,
smooth_data=False, sparam=25, parallel=False):
"""
This function identifies a logistic regression model with phase-variability
using elastic pca
:param f: numpy ndarray of shape (M,N) of N functions with M samples
:param y: numpy array of N responses
:param time: vector of size M describing the sample points
:param pca_method: string specifing pca method (options = "combined",
"vert", or "horiz", default = "combined")
:param no: scalar specify number of principal components (default=5)
:param smooth_data: smooth data using box filter (default = F)
:param sparam: number of times to apply box filter (default = 25)
:param parallel: run model in parallel (default = F)
:type f: np.ndarray
:type time: np.ndarray
"""
if smooth_data:
self.f = fs.smooth_data(self.f,sparam)
N1 = self.f.shape[1]
# Align Data
self.warp_data = fs.fdawarp(self.f,self.time)
self.warp_data.srsf_align(parallel=parallel)
# Calculate PCA
if pca_method=='combined':
self.pca = fpca.fdajpca(self.warp_data)
elif pca_method=='vert':
self.pca = fpca.fdavpca(self.warp_data)
elif pca_method=='horiz':
self.pca = fpca.fdahpca(self.warp_data)
else:
raise Exception('Invalid fPCA Method')
self.pca.calc_fpca(no)
# OLS using PCA basis
lam = 0
R = 0
Phi = np.ones((N1, no+1))
Phi[:,1:(no+1)] = self.pca.coef
# Find alpha and beta using l_bfgs
b0 = np.zeros(self.n_classes*(no+1))
out = fmin_l_bfgs_b(rg.mlogit_loss, b0, fprime=rg.mlogit_gradient,
args=(Phi, self.Y), pgtol=1e-10, maxiter=200,
maxfun=250, factr=1e-30)
b = out[0]
B0 = b.reshape(no+1, self.n_classes)
alpha = B0[0, :]
# compute the Loss
LL = rg.mlogit_loss(b,Phi,self.y)
b = B0[1:no+1,:]
self.alpha = alpha
self.b = b
self.LL = LL
self.pca_method = pca_method
return
[docs] def predict(self, newdata=None):
"""
This function performs prediction on regression model on new data if available or current stored data in object
Usage: obj.predict()
obj.predict(newdata)
:param newdata: dict containing new data for prediction (needs the keys below, if None predicts on training data)
:type newdata: dict
:param f: (M,N) matrix of functions
:param time: vector of time points
:param y: truth if available
:param smooth: smooth data if needed
:param sparam: number of times to run filter
"""
omethod = self.warp_data.method
lam = self.warp_data.lam
m = self.n_classes
M = self.time.shape[0]
if newdata != None:
f = newdata['f']
time = newdata['time']
y = newdata['y']
if newdata['smooth']:
sparam = newdata['sparam']
f = fs.smooth_data(f,sparam)
q1 = fs.f_to_srsf(f,time)
n = q1.shape[1]
self.y_pred = np.zeros((n,m))
mq = self.warp_data.mqn
fn = np.zeros((M,n))
qn = np.zeros((M,n))
gam = np.zeros((M,n))
for ii in range(0,n):
gam[:,ii] = uf.optimum_reparam(mq,time,q1[:,ii],omethod)
fn[:,ii] = uf.warp_f_gamma(time,f[:,ii],gam[:,ii])
qn[:,ii] = uf.f_to_srsf(fn[:,ii],time)
U = self.pca.U
no = U.shape[1]
if self.pca.__class__.__name__ == 'fdajpca':
m_new = np.sign(fn[self.pca.id,:])*np.sqrt(np.abs(fn[self.pca.id,:]))
qn1 = np.vstack((qn, m_new))
C = self.pca.C
TT = self.time.shape[0]
mu_g = self.pca.mu_g
mu_psi = self.pca.mu_psi
vec = np.zeros((M,n))
psi = np.zeros((TT,n))
binsize = np.mean(np.diff(self.time))
for i in range(0,n):
psi[:,i] = np.sqrt(np.gradient(gam[:,i],binsize))
out, theta = geo.inv_exp_map(mu_psi, psi[:,i])
vec[:,i] = out
g = np.vstack((qn1, C*vec))
a = np.zeros((n,no))
for i in range(0,n):
for j in range(0,no):
tmp = (g[:,i]-mu_g)
a[i,j] = dot(tmp.T, U[:,j])
elif self.pca.__class__.__name__ == 'fdavpca':
m_new = np.sign(fn[self.pca.id,:])*np.sqrt(np.abs(fn[self.pca.id,:]))
qn1 = np.vstack((qn, m_new))
a = np.zeros((n,no))
for i in range(0,n):
for j in range(0,no):
tmp = (qn1[:,i]-self.pca.mqn)
a[i,j] = dot(tmp.T, U[:,j])
elif self.pca.__class__.__name__ == 'fdahpca':
a = np.zeros((n,no))
mu_psi = self.pca.psi_mu
vec = np.zeros((M,n))
TT = self.time.shape[0]
psi = np.zeros((TT,n))
binsize = np.mean(np.diff(self.time))
for i in range(0,n):
psi[:,i] = np.sqrt(np.gradient(gam[:,i],binsize))
out, theta = geo.inv_exp_map(mu_psi, psi[:,i])
vec[:,i] = out
vm = self.pca.vec.mean(axis=1)
for i in range(0,n):
for j in range(0,no):
a[i,j] = np.sum(dot(vec[:,i]-vm,U[:,j]))
else:
raise Exception('Invalid fPCA Method')
for ii in range(0,n):
for jj in range(0,m):
self.y_pred[ii,jj] = self.alpha[jj] + np.sum(a[ii,:]*self.b[:,jj])
if y is None:
self.y_pred = rg.phi(self.y_pred.reshape((1,n*m)))
self.y_pred = self.y_pred.reshape((n,m))
self.y_labels = np.argmax(self.y_pred,axis=1)
self.PC = np.nan
else:
self.y_pred = rg.phi(self.y_pred.reshape((1,n*m)))
self.y_pred = self.y_pred.reshape((n,m))
self.y_labels = np.argmax(self.y_pred,axis=1)
self.PC = np.zeros(m)
cls_set = np.arange(0,m)
for ii in range(0,m):
cls_sub = np.setdiff1d(cls_set,ii)
TP = np.sum(y[self.y_labels == ii] == ii)
FP = np.sum(y[np.in1d(self.y_labels,cls_sub)] == ii)
TN = np.sum(y[np.in1d(self.y_labels,cls_sub)] == self.y_labels[np.in1d(self.y_labels,cls_sub)])
FN = np.sum(np.in1d(y[self.y_labels==ii],cls_sub))
self.PC[ii] = (TP+TN)/(TP+FP+FN+TN)
self.PCo = np.sum(y == self.y_labels)/self.y_labels.shape[0]
else:
n = self.pca.coef.shape[0]
self.y_pred = np.zeros((n,m))
for ii in range(0,n):
for jj in range(0,m):
self.y_pred[ii,jj] = self.alpha[jj] + np.sum(self.pca.coef[ii,:]*self.b[:,jj])
self.y_pred = rg.phi(self.y_pred.reshape((1,n*m)))
self.y_pred = self.y_pred.reshape((n,m))
self.y_labels = np.argmax(self.y_pred,axis=1)
self.PC = np.zeros(m)
cls_set = np.arange(0,m)
for ii in range(0,m):
cls_sub = np.setdiff1d(cls_set,ii)
TP = np.sum(self.y[self.y_labels == ii] == ii)
FP = np.sum(self.y[np.in1d(self.y_labels,cls_sub)] == ii)
TN = np.sum(self.y[np.in1d(self.y_labels,cls_sub)] == self.y_labels[np.in1d(self.y_labels,cls_sub)])
FN = np.sum(np.in1d(y[self.y_labels==ii],cls_sub))
self.PC[ii] = (TP+TN)/(TP+FP+FN+TN)
self.PCo = np.sum(y == self.y_labels)/self.y_labels.shape[0]
return