Source code for elastic_glm_regression

"""
Warping Invariant GML Regression using SRSF

moduleauthor:: Derek Tucker <jdtuck@sandia.gov>

"""

import numpy as np
import fdasrsf as fs
import fdasrsf.utility_functions as uf
from patsy import bs
from scipy.optimize import minimize
from numpy.random import rand
from joblib import Parallel, delayed

[docs]class elastic_glm_regression: """ This class provides elastic glm regression for functional data using the SRVF framework accounting for warping Usage: obj = elastic_glm_regression(f,y,time) :param f: (M,N) % matrix defining N functions of M samples :param y: response vector of length N :param time: time vector of length M :param information :param alpha: intercept :param b: coefficient vector :param B: basis matrix :param lambda: regularization parameter :param SSE: sum of squared errors Author : J. D. Tucker (JDT) <jdtuck AT sandia.gov> Date : 18-Mar-2018 """ def __init__(self, f, y, time): """ Construct an instance of the elastic_glm_regression class :param f: numpy ndarray of shape (M,N) of N functions with M samples :param y: response vector :param time: vector of size M describing the sample points """ a = time.shape[0] if f.shape[0] != a: raise Exception('Columns of f and time must be equal') self.f = f self.y = y self.time = time
[docs] def calc_model(self, link='linear', B=None, lam=0, df=20, max_itr=20, smooth_data=False, sparam=25, parallel=False): """ This function identifies a regression model with phase-variability using elastic pca :param link: string of link function ('linear', 'quadratic', 'cubic') :param B: optional matrix describing Basis elements :param lam: regularization parameter (default 0) :param df: number of degrees of freedom B-spline (default 20) :param max_itr: maximum number of iterations (default 20) :param smooth_data: smooth data using box filter (default = F) :param sparam: number of times to apply box filter (default = 25) :param parallel: run in parallel (default = F) """ if smooth_data: self.f = fs.smooth_data(self.f,sparam) print("Link: %s" % link) print("Lambda: %5.1f" % lam) self.lam = lam self.link = link # Create B-Spline Basis if none provided if B is None: B = bs(self.time, df=df, degree=4, include_intercept=True) Nb = B.shape[1] self.B = B n = self.f.shape[1] print("Initializing") b0 = rand(Nb+1) out = minimize(MyLogLikelihoodFn, b0, args=(self.y,self.B,self.time,self.f,parallel), method="SLSQP") a = out.x if self.link == 'linear': h1, c_hat, cost = Amplitude_Index(self.f, self.time, self.B, self.y, max_itr, a, 1, parallel) yhat1 = c_hat[0] + MapC_to_y(n,c_hat[1:],self.B,self.time,self.f,parallel) yhat = np.polyval(h1,yhat1) elif self.link == 'quadratic': h1, c_hat, cost = Amplitude_Index(self.f, self.time, self.B, self.y, max_itr, a, 2, parallel) yhat1 = c_hat[0] + MapC_to_y(n,c_hat[1:],self.B,self.time,self.f,parallel) yhat = np.polyval(h1,yhat1) elif self.link == 'cubic': h1, c_hat, cost = Amplitude_Index(self.f, self.time, self.B, self.y, max_itr, a, 3, parallel) yhat1 = c_hat[0] + MapC_to_y(n,c_hat[1:],self.B,self.time,self.f,parallel) yhat = np.polyval(h1,yhat1) else: raise Exception('Invalid Link') tmp = (self.y-yhat)**2 self.SSE = tmp.sum() self.h = h1 self.alpha = c_hat[0] self.b = c_hat[1:] return
[docs] def predict(self, newdata=None, parallel=True): """ This function performs prediction on regression model on new data if available or current stored data in object Usage: obj.predict() obj.predict(newdata) :param newdata: dict containing new data for prediction (needs the keys below, if None predicts on training data) :type newdata: dict :param f: (M,N) matrix of functions :param time: vector of time points :param y: truth if available :param smooth: smooth data if needed :param sparam: number of times to run filter """ if newdata != None: f = newdata['f'] time = newdata['time'] y = newdata['y'] sparam = newdata['sparam'] if newdata['smooth']: f = fs.smooth_data(f,sparam) n = f.shape[1] yhat1 = self.alpha + MapC_to_y(n,self.b,self.B,time,f,parallel) yhat = np.polyval(self.h,yhat1) if y is None: self.SSE = np.nan else: self.SSE = np.sum((y-yhat)**2) self.y_pred = yhat else: n = self.f.shape[1] yhat1 = self.alpha + MapC_to_y(n,self.b,self.B,self.time,self.f,parallel) yhat = np.polyval(self.h,yhat1) self.SSE = np.sum((self.y-yhat)**2) self.y_pred = yhat return
def Amplitude_Index(f, t, B, y0, MaxIter, b, link, parallel): J = B.shape[1] n = f.shape[1] c_hat = b cost = 10000 itr = 0 while itr < MaxIter: itr += 1 print("updating step: iter=%d" % (itr)) y = c_hat[0] + MapC_to_y(n,c_hat[1:],B,t,f,parallel) h = np.polyfit(y, y0, link) b0 = rand(J+1) out = minimize(MyLogLikelihoodFn2, b0, args=(y0,B,t,f,h,parallel), method="SLSQP") if cost > out.fun: cost = out.fun c_hat = out.x else: c_hat = out.x break return(h,c_hat,cost) def MyLogLikelihoodFn2(c, y0, B, t, f, h, parallel): N = f.shape[1] J = c.shape[0] y = c[0] + MapC_to_y(N,c[1:],B,t,f,parallel) tmp = np.polyval(h,y) x = (y0-tmp)*(y0-tmp) return(x.sum()) def MyLogLikelihoodFn(c, y0, B, t, f, parallel): N = f.shape[1] J = c.shape[0] y = c[0] + MapC_to_y(N,c[1:],B,t,f,parallel) x = (y0-y)*(y0-y) return(x.sum()) def MapC_to_y(n,c,B,t,f,parallel): dt = np.diff(t) dt = dt.mean() y = np.zeros(n) if parallel: with Parallel(n_jobs=-1) as parallel: for k in range(0,n): bet = np.dot(B,c) q1 = uf.f_to_srsf(bet, t) q2 = uf.f_to_srsf(f[:,k], t) gam = uf.optimum_reparam(q1,t,q2) fn = uf.warp_f_gamma(t, f[:,k], gam) tmp = bet*fn y[k] = tmp.sum()*dt else: for k in range(0,n): bet = np.dot(B,c) q1 = uf.f_to_srsf(bet, t) q2 = uf.f_to_srsf(f[:,k], t) gam = uf.optimum_reparam(q1,t,q2) fn = uf.warp_f_gamma(t, f[:,k], gam) tmp = bet*fn y[k] = tmp.sum()*dt return(y)