Source code for pcr_regression

"""
Warping Invariant PCR Regression using SRSF

moduleauthor:: J. Derek Tucker <jdtuck@sandia.gov>

"""

import numpy as np
import fdasrsf as fs
import fdasrsf.utility_functions as uf
import fdasrsf.fPCA as fpca
import fdasrsf.regression as rg
import fdasrsf.geometry as geo
from numpy import dot
from scipy.linalg import inv, norm
from scipy.integrate import trapz, cumtrapz
from scipy.optimize import fmin_l_bfgs_b
import collections

class elastic_pcr_regression:
    """
    This class provides elastic PCR regression for functional data using the
    SRVF framework, accounting for warping.

    Usage: obj = elastic_pcr_regression(f, y, time)

    :param f: (M,N) matrix defining N functions of M samples
    :param y: response vector of length N
    :param warp_data: fdawarp object of alignment
    :param pca: fPCA object; class depends on the fPCA method used
    :param alpha: intercept
    :param b: coefficient vector
    :param SSE: sum of squared errors

    Author :  J. D. Tucker (JDT) <jdtuck AT sandia.gov>
    Date   :  18-Mar-2018
    """

    def __init__(self, f, y, time):
        """
        Construct an instance of the elastic_pcr_regression class

        :param f: numpy ndarray of shape (M,N) of N functions with M samples
        :param y: response vector
        :param time: vector of size M describing the sample points
        """
        a = time.shape[0]

        if f.shape[0] != a:
            raise Exception('Number of rows of f must equal the length of time')

        self.f = f
        self.y = y
        self.time = time
    def calc_model(self, pca_method="combined", no=5, smooth_data=False,
                   sparam=25, parallel=False, C=None):
        """
        This function identifies a regression model with phase-variability
        using elastic pca

        :param pca_method: string specifying pca method (options = "combined",
                           "vert", or "horiz", default = "combined")
        :param no: scalar specifying the number of principal components (default = 5)
        :param smooth_data: smooth data using box filter (default = False)
        :param sparam: number of times to apply box filter (default = 25)
        :param parallel: run in parallel (default = False)
        :param C: scale balance parameter for combined method (default = None)
        """

        if smooth_data:
            self.f = fs.smooth_data(self.f, sparam)

        N1 = self.f.shape[1]

        # Align Data
        self.warp_data = fs.fdawarp(self.f, self.time)
        self.warp_data.srsf_align(parallel=parallel)

        # Calculate PCA
        if pca_method == 'combined':
            out_pca = fpca.fdajpca(self.warp_data)
        elif pca_method == 'vert':
            out_pca = fpca.fdavpca(self.warp_data)
        elif pca_method == 'horiz':
            out_pca = fpca.fdahpca(self.warp_data)
        else:
            raise Exception('Invalid fPCA Method')
        out_pca.calc_fpca(no)

        # OLS using PCA basis
        lam = 0
        R = 0
        Phi = np.ones((N1, no + 1))
        Phi[:, 1:(no + 1)] = out_pca.coef
        xx = dot(Phi.T, Phi)
        inv_xx = inv(xx + lam * R)
        xy = dot(Phi.T, self.y)
        b = dot(inv_xx, xy)
        alpha = b[0]
        b = b[1:no + 1]

        # compute the SSE
        int_X = np.zeros(N1)
        for ii in range(0, N1):
            int_X[ii] = np.sum(out_pca.coef[ii, :] * b)

        SSE = np.sum((self.y - alpha - int_X) ** 2)

        self.alpha = alpha
        self.b = b
        self.pca = out_pca
        self.SSE = SSE
        self.pca_method = pca_method

        return
    def predict(self, newdata=None):
        """
        This function performs prediction on a regression model on new data,
        if available, or on the data currently stored in the object

        Usage:  obj.predict()
                obj.predict(newdata)

        :param newdata: dict containing new data for prediction (needs the
                        keys below; if None, predicts on training data)
        :type newdata: dict
        :param f: (M,N) matrix of functions
        :param time: vector of time points
        :param y: truth if available
        :param smooth: smooth data if needed
        :param sparam: number of times to run filter
        """
        omethod = self.warp_data.method
        lam = self.warp_data.lam
        M = self.time.shape[0]

        if newdata is not None:
            f = newdata['f']
            time = newdata['time']
            y = newdata['y']
            sparam = newdata['sparam']
            if newdata['smooth']:
                f = fs.smooth_data(f, sparam)

            # warp the new functions to the Karcher mean and compute SRSFs
            q1 = fs.f_to_srsf(f, time)
            n = q1.shape[1]
            self.y_pred = np.zeros(n)
            mq = self.warp_data.mqn
            fn = np.zeros((M, n))
            qn = np.zeros((M, n))
            gam = np.zeros((M, n))
            for ii in range(0, n):
                gam[:, ii] = uf.optimum_reparam(mq, time, q1[:, ii], omethod)
                fn[:, ii] = uf.warp_f_gamma(time, f[:, ii], gam[:, ii])
                qn[:, ii] = uf.f_to_srsf(fn[:, ii], time)

            m_new = np.sign(fn[self.pca.id, :]) * np.sqrt(np.abs(fn[self.pca.id, :]))
            qn1 = np.vstack((qn, m_new))

            U = self.pca.U
            no = U.shape[1]

            # project the new data onto the stored principal directions
            if self.pca.__class__.__name__ == 'fdajpca':
                C = self.pca.C
                TT = self.time.shape[0]
                mu_g = self.pca.mu_g
                mu_psi = self.pca.mu_psi
                vec = np.zeros((M, n))
                psi = np.zeros((TT, n))
                binsize = np.mean(np.diff(self.time))
                for i in range(0, n):
                    psi[:, i] = np.sqrt(np.gradient(gam[:, i], binsize))
                    vec[:, i] = geo.inv_exp_map(mu_psi, psi[:, i])

                g = np.vstack((qn1, C * vec))
                a = np.zeros((n, no))
                for i in range(0, n):
                    for j in range(0, no):
                        tmp = (g[:, i] - mu_g)
                        a[i, j] = dot(tmp.T, U[:, j])

            elif self.pca.__class__.__name__ == 'fdavpca':
                a = np.zeros((n, no))
                for i in range(0, n):
                    for j in range(0, no):
                        tmp = (qn1[:, i] - self.pca.mqn)
                        a[i, j] = dot(tmp.T, U[:, j])

            elif self.pca.__class__.__name__ == 'fdahpca':
                a = np.zeros((n, no))
                mu_psi = self.pca.psi_mu
                vec = np.zeros((M, n))
                TT = self.time.shape[0]
                psi = np.zeros((TT, n))
                binsize = np.mean(np.diff(self.time))
                for i in range(0, n):
                    psi[:, i] = np.sqrt(np.gradient(gam[:, i], binsize))
                    vec[:, i] = geo.inv_exp_map(mu_psi, psi[:, i])

                vm = self.pca.vec.mean(axis=1)

                for i in range(0, n):
                    for j in range(0, no):
                        a[i, j] = np.sum(dot(vec[:, i] - vm, U[:, j]))
            else:
                raise Exception('Invalid fPCA Method')

            for ii in range(0, n):
                self.y_pred[ii] = self.alpha + np.sum(a[ii, :] * self.b)

            if y is None:
                self.SSE = np.nan
            else:
                self.SSE = np.sum((y - self.y_pred) ** 2)
        else:
            n = self.pca.coef.shape[0]
            self.y_pred = np.zeros(n)
            for ii in range(0, n):
                self.y_pred[ii] = self.alpha + np.dot(self.pca.coef[ii, :], self.b)

            self.SSE = np.sum((self.y - self.y_pred) ** 2)

        return
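# ---------------------------------------------------------------------------
# Illustrative usage sketch for elastic_pcr_regression (not part of the
# library): calc_model aligns the curves, computes fPCA scores, and fits the
# OLS model on those scores; predict then evaluates the fit. The synthetic
# data and names (time_ex, f_ex, y_ex) below are made up for demonstration.

time_ex = np.linspace(0, 1, 101)                      # M = 101 sample points
shifts = 0.1 * np.random.rand(20)                     # N = 20 phase-shifted curves
f_ex = np.column_stack([np.sin(2 * np.pi * (time_ex - d)) for d in shifts])
y_ex = shifts + 0.01 * np.random.randn(20)            # scalar response per curve

model = elastic_pcr_regression(f_ex, y_ex, time_ex)
model.calc_model(pca_method="combined", no=5)         # align + fPCA + OLS on scores
model.predict()                                       # in-sample prediction; sets y_pred and SSE
# ---------------------------------------------------------------------------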
class elastic_lpcr_regression:
    """
    This class provides elastic logistic PCR regression for functional data
    using the SRVF framework, accounting for warping.

    Usage: obj = elastic_lpcr_regression(f, y, time)

    :param f: (M,N) matrix defining N functions of M samples
    :param y: response vector of length N with labels (-1/1)
    :param warp_data: fdawarp object of alignment
    :param pca: fPCA object; class depends on the fPCA method used
    :param alpha: intercept
    :param b: coefficient vector
    :param Loss: logistic loss
    :param PC: probability of classification
    :param ylabels: predicted labels

    Author :  J. D. Tucker (JDT) <jdtuck AT sandia.gov>
    Date   :  18-Mar-2018
    """

    def __init__(self, f, y, time):
        """
        Construct an instance of the elastic_lpcr_regression class

        :param f: numpy ndarray of shape (M,N) of N functions with M samples
        :param y: response vector with labels (-1/1)
        :param time: vector of size M describing the sample points
        """
        a = time.shape[0]

        if f.shape[0] != a:
            raise Exception('Number of rows of f must equal the length of time')

        self.f = f
        self.y = y
        self.time = time
    def calc_model(self, pca_method="combined", no=5, smooth_data=False,
                   sparam=25, parallel=False):
        """
        This function identifies a logistic regression model with
        phase-variability using elastic pca

        :param pca_method: string specifying pca method (options = "combined",
                           "vert", or "horiz", default = "combined")
        :param no: scalar specifying the number of principal components (default = 5)
        :param smooth_data: smooth data using box filter (default = False)
        :param sparam: number of times to apply box filter (default = 25)
        :param parallel: calculate in parallel (default = False)
        """

        if smooth_data:
            self.f = fs.smooth_data(self.f, sparam)

        N1 = self.f.shape[1]

        # Align Data
        self.warp_data = fs.fdawarp(self.f, self.time)
        self.warp_data.srsf_align(parallel=parallel)

        # Calculate PCA
        if pca_method == 'combined':
            out_pca = fpca.fdajpca(self.warp_data)
        elif pca_method == 'vert':
            out_pca = fpca.fdavpca(self.warp_data)
        elif pca_method == 'horiz':
            out_pca = fpca.fdahpca(self.warp_data)
        else:
            raise Exception('Invalid fPCA Method')
        out_pca.calc_fpca(no)

        # Logistic regression using PCA basis
        Phi = np.ones((N1, no + 1))
        Phi[:, 1:(no + 1)] = out_pca.coef

        # Find alpha and beta using l_bfgs
        b0 = np.zeros(no + 1)
        out = fmin_l_bfgs_b(rg.logit_loss, b0, fprime=rg.logit_gradient,
                            args=(Phi, self.y), pgtol=1e-10, maxiter=200,
                            maxfun=250, factr=1e-30)
        b = out[0]
        alpha = b[0]

        # compute the Loss
        LL = rg.logit_loss(b, Phi, self.y)

        b = b[1:no + 1]

        self.alpha = alpha
        self.b = b
        self.pca = out_pca
        self.LL = LL
        self.pca_method = pca_method

        return
    def predict(self, newdata=None):
        """
        This function performs prediction on a regression model on new data,
        if available, or on the data currently stored in the object

        Usage:  obj.predict()
                obj.predict(newdata)

        :param newdata: dict containing new data for prediction (needs the
                        keys below; if None, predicts on training data)
        :type newdata: dict
        :param f: (M,N) matrix of functions
        :param time: vector of time points
        :param y: truth if available
        :param smooth: smooth data if needed
        :param sparam: number of times to run filter
        """
        omethod = self.warp_data.method
        lam = self.warp_data.lam
        M = self.time.shape[0]

        if newdata is not None:
            f = newdata['f']
            time = newdata['time']
            y = newdata['y']
            sparam = newdata['sparam']
            if newdata['smooth']:
                f = fs.smooth_data(f, sparam)

            # warp the new functions to the Karcher mean and compute SRSFs
            q1 = fs.f_to_srsf(f, time)
            n = q1.shape[1]
            self.y_pred = np.zeros(n)
            mq = self.warp_data.mqn
            fn = np.zeros((M, n))
            qn = np.zeros((M, n))
            gam = np.zeros((M, n))
            for ii in range(0, n):
                gam[:, ii] = uf.optimum_reparam(mq, time, q1[:, ii], omethod)
                fn[:, ii] = uf.warp_f_gamma(time, f[:, ii], gam[:, ii])
                qn[:, ii] = uf.f_to_srsf(fn[:, ii], time)

            m_new = np.sign(fn[self.pca.id, :]) * np.sqrt(np.abs(fn[self.pca.id, :]))
            qn1 = np.vstack((qn, m_new))

            U = self.pca.U
            no = U.shape[1]

            # project the new data onto the stored principal directions
            if self.pca.__class__.__name__ == 'fdajpca':
                C = self.pca.C
                TT = self.time.shape[0]
                mu_g = self.pca.mu_g
                mu_psi = self.pca.mu_psi
                vec = np.zeros((M, n))
                psi = np.zeros((TT, n))
                binsize = np.mean(np.diff(self.time))
                for i in range(0, n):
                    psi[:, i] = np.sqrt(np.gradient(gam[:, i], binsize))
                    vec[:, i] = geo.inv_exp_map(mu_psi, psi[:, i])

                g = np.vstack((qn1, C * vec))
                a = np.zeros((n, no))
                for i in range(0, n):
                    for j in range(0, no):
                        tmp = (g[:, i] - mu_g)
                        a[i, j] = dot(tmp.T, U[:, j])

            elif self.pca.__class__.__name__ == 'fdavpca':
                a = np.zeros((n, no))
                for i in range(0, n):
                    for j in range(0, no):
                        tmp = (qn1[:, i] - self.pca.mqn)
                        a[i, j] = dot(tmp.T, U[:, j])

            elif self.pca.__class__.__name__ == 'fdahpca':
                a = np.zeros((n, no))
                mu_psi = self.pca.psi_mu
                vec = np.zeros((M, n))
                TT = self.time.shape[0]
                psi = np.zeros((TT, n))
                binsize = np.mean(np.diff(self.time))
                for i in range(0, n):
                    psi[:, i] = np.sqrt(np.gradient(gam[:, i], binsize))
                    vec[:, i] = geo.inv_exp_map(mu_psi, psi[:, i])

                vm = self.pca.vec.mean(axis=1)

                for i in range(0, n):
                    for j in range(0, no):
                        a[i, j] = np.sum(dot(vec[:, i] - vm, U[:, j]))
            else:
                raise Exception('Invalid fPCA Method')

            for ii in range(0, n):
                self.y_pred[ii] = self.alpha + np.sum(a[ii, :] * self.b)

            if y is None:
                self.y_pred = rg.phi(self.y_pred)
                self.y_labels = np.ones(n)
                self.y_labels[self.y_pred < 0.5] = -1
                self.PC = np.nan
            else:
                self.y_pred = rg.phi(self.y_pred)
                self.y_labels = np.ones(n)
                self.y_labels[self.y_pred < 0.5] = -1
                TP = np.sum(y[self.y_labels == 1] == 1)
                FP = np.sum(y[self.y_labels == -1] == 1)
                TN = np.sum(y[self.y_labels == -1] == -1)
                FN = np.sum(y[self.y_labels == 1] == -1)
                self.PC = (TP + TN) / (TP + FP + FN + TN)
        else:
            n = self.pca.coef.shape[0]
            self.y_pred = np.zeros(n)
            for ii in range(0, n):
                self.y_pred[ii] = self.alpha + np.dot(self.pca.coef[ii, :], self.b)

            self.y_pred = rg.phi(self.y_pred)
            self.y_labels = np.ones(n)
            self.y_labels[self.y_pred < 0.5] = -1
            TP = np.sum(self.y[self.y_labels == 1] == 1)
            FP = np.sum(self.y[self.y_labels == -1] == 1)
            TN = np.sum(self.y[self.y_labels == -1] == -1)
            FN = np.sum(self.y[self.y_labels == 1] == -1)
            self.PC = (TP + TN) / (TP + FP + FN + TN)

        return
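# ---------------------------------------------------------------------------
# Illustrative usage sketch for elastic_lpcr_regression (not part of the
# library): labels must be coded as -1/1, and predict() accepts an optional
# dict with keys 'f', 'time', 'y', 'smooth', and 'sparam' for new data. The
# synthetic two-class data and names below are made up for demonstration.

time_ex = np.linspace(0, 1, 101)
labels = np.repeat([1, -1], 10)                       # N = 20 curves, two classes
f_ex = np.column_stack([np.sin(2 * np.pi * time_ex) + 0.5 * lab +
                        0.05 * np.random.randn(101) for lab in labels])

clf = elastic_lpcr_regression(f_ex, labels, time_ex)
clf.calc_model(pca_method="combined", no=4)           # joint fPCA + logistic fit
newdata = {'f': f_ex, 'time': time_ex, 'y': labels,
           'smooth': False, 'sparam': 25}
clf.predict(newdata)                                  # sets y_pred, y_labels, PC
# ---------------------------------------------------------------------------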
class elastic_mlpcr_regression:
    """
    This class provides elastic multinomial logistic PCR regression for
    functional data using the SRVF framework, accounting for warping.

    Usage: obj = elastic_mlpcr_regression(f, y, time)

    :param f: (M,N) matrix defining N functions of M samples
    :param y: response vector of length N with integer labels (1,...,m)
    :param Y: coded label matrix
    :param warp_data: fdawarp object of alignment
    :param pca: fPCA object; class depends on the fPCA method used
    :param alpha: intercept
    :param b: coefficient matrix
    :param Loss: multinomial logistic loss
    :param PC: probability of classification
    :param ylabels: predicted labels

    Author :  J. D. Tucker (JDT) <jdtuck AT sandia.gov>
    Date   :  18-Mar-2018
    """

    def __init__(self, f, y, time):
        """
        Construct an instance of the elastic_mlpcr_regression class

        :param f: numpy ndarray of shape (M,N) of N functions with M samples
        :param y: response vector of N integer class labels (1,...,m)
        :param time: vector of size M describing the sample points
        """
        a = time.shape[0]

        if f.shape[0] != a:
            raise Exception('Number of rows of f must equal the length of time')

        self.f = f
        self.y = y
        self.time = time
        N1 = f.shape[1]

        # Code labels as a one-hot matrix Y
        m = y.max()
        self.n_classes = m
        self.Y = np.zeros((N1, m), dtype=int)
        for ii in range(0, N1):
            self.Y[ii, y[ii] - 1] = 1
    def calc_model(self, pca_method="combined", no=5, smooth_data=False,
                   sparam=25, parallel=False):
        """
        This function identifies a multinomial logistic regression model with
        phase-variability using elastic pca

        :param pca_method: string specifying pca method (options = "combined",
                           "vert", or "horiz", default = "combined")
        :param no: scalar specifying the number of principal components (default = 5)
        :param smooth_data: smooth data using box filter (default = False)
        :param sparam: number of times to apply box filter (default = 25)
        :param parallel: run model in parallel (default = False)
        """

        if smooth_data:
            self.f = fs.smooth_data(self.f, sparam)

        N1 = self.f.shape[1]

        # Align Data
        self.warp_data = fs.fdawarp(self.f, self.time)
        self.warp_data.srsf_align(parallel=parallel)

        # Calculate PCA
        if pca_method == 'combined':
            out_pca = fpca.fdajpca(self.warp_data)
        elif pca_method == 'vert':
            out_pca = fpca.fdavpca(self.warp_data)
        elif pca_method == 'horiz':
            out_pca = fpca.fdahpca(self.warp_data)
        else:
            raise Exception('Invalid fPCA Method')
        out_pca.calc_fpca(no)

        # Multinomial logistic regression using PCA basis
        Phi = np.ones((N1, no + 1))
        Phi[:, 1:(no + 1)] = out_pca.coef

        # Find alpha and beta using l_bfgs
        b0 = np.zeros(self.n_classes * (no + 1))
        out = fmin_l_bfgs_b(rg.mlogit_loss, b0, fprime=rg.mlogit_gradient,
                            args=(Phi, self.Y), pgtol=1e-10, maxiter=200,
                            maxfun=250, factr=1e-30)
        b = out[0]
        B0 = b.reshape(no + 1, self.n_classes)
        alpha = B0[0, :]

        # compute the Loss
        LL = rg.mlogit_loss(b, Phi, self.Y)

        b = B0[1:no + 1, :]

        self.alpha = alpha
        self.b = b
        self.pca = out_pca
        self.LL = LL
        self.pca_method = pca_method

        return
    def predict(self, newdata=None):
        """
        This function performs prediction on a regression model on new data,
        if available, or on the data currently stored in the object

        Usage:  obj.predict()
                obj.predict(newdata)

        :param newdata: dict containing new data for prediction (needs the
                        keys below; if None, predicts on training data)
        :type newdata: dict
        :param f: (M,N) matrix of functions
        :param time: vector of time points
        :param y: truth if available
        :param smooth: smooth data if needed
        :param sparam: number of times to run filter
        """
        omethod = self.warp_data.method
        lam = self.warp_data.lam
        m = self.n_classes
        M = self.time.shape[0]

        if newdata is not None:
            f = newdata['f']
            time = newdata['time']
            y = newdata['y']
            sparam = newdata['sparam']
            if newdata['smooth']:
                f = fs.smooth_data(f, sparam)

            # warp the new functions to the Karcher mean and compute SRSFs
            q1 = fs.f_to_srsf(f, time)
            n = q1.shape[1]
            self.y_pred = np.zeros((n, m))
            mq = self.warp_data.mqn
            fn = np.zeros((M, n))
            qn = np.zeros((M, n))
            gam = np.zeros((M, n))
            for ii in range(0, n):
                gam[:, ii] = uf.optimum_reparam(mq, time, q1[:, ii], omethod)
                fn[:, ii] = uf.warp_f_gamma(time, f[:, ii], gam[:, ii])
                qn[:, ii] = uf.f_to_srsf(fn[:, ii], time)

            m_new = np.sign(fn[self.pca.id, :]) * np.sqrt(np.abs(fn[self.pca.id, :]))
            qn1 = np.vstack((qn, m_new))

            U = self.pca.U
            no = U.shape[1]

            # project the new data onto the stored principal directions
            if self.pca.__class__.__name__ == 'fdajpca':
                C = self.pca.C
                TT = self.time.shape[0]
                mu_g = self.pca.mu_g
                mu_psi = self.pca.mu_psi
                vec = np.zeros((M, n))
                psi = np.zeros((TT, n))
                binsize = np.mean(np.diff(self.time))
                for i in range(0, n):
                    psi[:, i] = np.sqrt(np.gradient(gam[:, i], binsize))
                    vec[:, i] = geo.inv_exp_map(mu_psi, psi[:, i])

                g = np.vstack((qn1, C * vec))
                a = np.zeros((n, no))
                for i in range(0, n):
                    for j in range(0, no):
                        tmp = (g[:, i] - mu_g)
                        a[i, j] = dot(tmp.T, U[:, j])

            elif self.pca.__class__.__name__ == 'fdavpca':
                a = np.zeros((n, no))
                for i in range(0, n):
                    for j in range(0, no):
                        tmp = (qn1[:, i] - self.pca.mqn)
                        a[i, j] = dot(tmp.T, U[:, j])

            elif self.pca.__class__.__name__ == 'fdahpca':
                a = np.zeros((n, no))
                mu_psi = self.pca.psi_mu
                vec = np.zeros((M, n))
                TT = self.time.shape[0]
                psi = np.zeros((TT, n))
                binsize = np.mean(np.diff(self.time))
                for i in range(0, n):
                    psi[:, i] = np.sqrt(np.gradient(gam[:, i], binsize))
                    vec[:, i] = geo.inv_exp_map(mu_psi, psi[:, i])

                vm = self.pca.vec.mean(axis=1)

                for i in range(0, n):
                    for j in range(0, no):
                        a[i, j] = np.sum(dot(vec[:, i] - vm, U[:, j]))
            else:
                raise Exception('Invalid fPCA Method')

            for ii in range(0, n):
                for jj in range(0, m):
                    self.y_pred[ii, jj] = self.alpha[jj] + np.sum(a[ii, :] * self.b[:, jj])

            if y is None:
                self.y_pred = rg.phi(self.y_pred.reshape((1, n * m)))
                self.y_pred = self.y_pred.reshape((n, m))
                self.y_labels = np.argmax(self.y_pred, axis=1)
                self.PC = np.nan
            else:
                self.y_pred = rg.phi(self.y_pred.reshape((1, n * m)))
                self.y_pred = self.y_pred.reshape((n, m))
                self.y_labels = np.argmax(self.y_pred, axis=1)
                self.PC = np.zeros(m)
                cls_set = np.arange(0, m)
                for ii in range(0, m):
                    cls_sub = np.setdiff1d(cls_set, ii)
                    TP = np.sum(y[self.y_labels == ii] == ii)
                    FP = np.sum(y[np.in1d(self.y_labels, cls_sub)] == ii)
                    TN = np.sum(y[np.in1d(self.y_labels, cls_sub)] ==
                                self.y_labels[np.in1d(self.y_labels, cls_sub)])
                    FN = np.sum(np.in1d(y[self.y_labels == ii], cls_sub))
                    self.PC[ii] = (TP + TN) / (TP + FP + FN + TN)

                self.PCo = np.sum(y == self.y_labels) / self.y_labels.shape[0]
        else:
            n = self.pca.coef.shape[0]
            self.y_pred = np.zeros((n, m))
            for ii in range(0, n):
                for jj in range(0, m):
                    self.y_pred[ii, jj] = self.alpha[jj] + np.sum(self.pca.coef[ii, :] * self.b[:, jj])

            self.y_pred = rg.phi(self.y_pred.reshape((1, n * m)))
            self.y_pred = self.y_pred.reshape((n, m))
            self.y_labels = np.argmax(self.y_pred, axis=1)
            self.PC = np.zeros(m)
            cls_set = np.arange(0, m)
            for ii in range(0, m):
                cls_sub = np.setdiff1d(cls_set, ii)
                TP = np.sum(self.y[self.y_labels == ii] == ii)
                FP = np.sum(self.y[np.in1d(self.y_labels, cls_sub)] == ii)
                TN = np.sum(self.y[np.in1d(self.y_labels, cls_sub)] ==
                            self.y_labels[np.in1d(self.y_labels, cls_sub)])
                FN = np.sum(np.in1d(self.y[self.y_labels == ii], cls_sub))
                self.PC[ii] = (TP + TN) / (TP + FP + FN + TN)

            self.PCo = np.sum(self.y == self.y_labels) / self.y_labels.shape[0]

        return
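# ---------------------------------------------------------------------------
# Illustrative usage sketch for elastic_mlpcr_regression (not part of the
# library): class labels are positive integers 1,...,m and are one-hot coded
# into Y by the constructor. The synthetic three-class data and names below
# are made up for demonstration.

time_ex = np.linspace(0, 1, 101)
labels = np.repeat([1, 2, 3], 7)                      # N = 21 curves, m = 3 classes
f_ex = np.column_stack([np.sin(2 * np.pi * lab * time_ex) +
                        0.05 * np.random.randn(101) for lab in labels])

mclf = elastic_mlpcr_regression(f_ex, labels, time_ex)
mclf.calc_model(pca_method="combined", no=5)          # joint fPCA + multinomial logistic fit
mclf.predict()                                        # in-sample prediction; sets y_pred, y_labels, PC, PCo
# ---------------------------------------------------------------------------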