# Source code for elastic_glm_regression

"""
Warping Invariant GLM Regression using SRSF

moduleauthor:: Derek Tucker <jdtuck@sandia.gov>

"""

import numpy as np
import fdasrsf as fs
import fdasrsf.utility_functions as uf
from patsy import bs
from scipy.optimize import minimize
from numpy.random import rand
from joblib import Parallel, delayed


class elastic_glm_regression:
    """
    This class provides elastic glm regression for functional data using the
    SRVF framework accounting for warping

    Usage:  obj = elastic_glm_regression(f, y, time)

    :param f: (M,N) matrix defining N functions of M samples
    :param y: response vector of length N
    :param time: time vector of length M
    :param alpha: intercept
    :param b: coefficient vector
    :param B: basis matrix
    :param lambda: regularization parameter
    :param SSE: sum of squared errors

    Author :  J. D. Tucker (JDT) <jdtuck AT sandia.gov>
    Date   :  18-Mar-2018
    """

    def __init__(self, f, y, time):
        """
        Construct an instance of the elastic_glm_regression class

        :param f: numpy ndarray of shape (M,N) of N functions with M samples
        :param y: response vector
        :param time: vector of size M describing the sample points

        :raises Exception: if the number of rows of f does not match the
            length of time
        """
        a = time.shape[0]

        if f.shape[0] != a:
            # f is (M,N): its row count must equal the number of sample points
            raise Exception("Rows of f and length of time must be equal")

        self.f = f
        self.y = y
        self.time = time

    def calc_model(
        self,
        link="linear",
        B=None,
        lam=0,
        df=20,
        max_itr=20,
        smooth_data=False,
        sparam=25,
        parallel=False,
    ):
        """
        This function identifies a regression model with phase-variability
        using elastic pca

        :param link: string of link function ('linear', 'quadratic', 'cubic')
        :param B: optional matrix describing Basis elements
        :param lam: regularization parameter (default 0)
        :param df: number of degrees of freedom B-spline (default 20)
        :param max_itr: maximum number of iterations (default 20)
        :param smooth_data: smooth data using box filter (default = F)
        :param sparam: number of times to apply box filter (default = 25)
        :param parallel: run in parallel (default = F)

        :raises Exception: if link is not one of the supported strings
        """
        # polynomial degree of the link function, keyed by name;
        # validate up front so we fail before any expensive work
        link_orders = {"linear": 1, "quadratic": 2, "cubic": 3}
        if link not in link_orders:
            raise Exception("Invalid Link")

        if smooth_data:
            self.f = fs.smooth_data(self.f, sparam)

        print("Link: %s" % link)
        print("Lambda: %5.1f" % lam)

        self.lam = lam
        self.link = link

        # Create B-Spline Basis if none provided
        if B is None:
            B = bs(self.time, df=df, degree=4, include_intercept=True)
        Nb = B.shape[1]
        self.B = B

        n = self.f.shape[1]

        print("Initializing")
        # initial estimate: intercept + Nb basis coefficients
        b0 = rand(Nb + 1)
        out = minimize(
            MyLogLikelihoodFn,
            b0,
            args=(self.y, self.B, self.time, self.f, parallel),
            method="SLSQP",
        )
        a = out.x

        # alternate between link fitting and coefficient estimation
        h1, c_hat, cost = Amplitude_Index(
            self.f, self.time, self.B, self.y, max_itr, a,
            link_orders[link], parallel,
        )
        yhat1 = c_hat[0] + MapC_to_y(
            n, c_hat[1:], self.B, self.time, self.f, parallel
        )
        yhat = np.polyval(h1, yhat1)

        tmp = (self.y - yhat) ** 2
        self.SSE = tmp.sum()

        self.h = h1
        self.alpha = c_hat[0]
        self.b = c_hat[1:]

        return

    def predict(self, newdata=None, parallel=True):
        """
        This function performs prediction on regression model on new data if
        available or current stored data in object

        Usage:  obj.predict(newdata)

        :param newdata: dict containing new data for prediction (needs the
            keys below, if None predicts on training data)
        :type newdata: dict
        :param f: (M,N) matrix of functions
        :param time: vector of time points
        :param y: truth if available
        :param smooth: smooth data if needed
        :param sparam: number of times to run filter
        """
        if newdata is not None:
            f = newdata["f"]
            time = newdata["time"]
            # truth and smoothing options are optional in the dict
            y = newdata.get("y")
            if newdata.get("smooth", False):
                f = fs.smooth_data(f, newdata.get("sparam", 25))

            n = f.shape[1]
            yhat1 = self.alpha + MapC_to_y(n, self.b, self.B, time, f, parallel)
            yhat = np.polyval(self.h, yhat1)

            if y is None:
                # no ground truth supplied: SSE is undefined
                self.SSE = np.nan
            else:
                self.SSE = np.sum((y - yhat) ** 2)

            self.y_pred = yhat
        else:
            # predict on the training data stored in the object
            n = self.f.shape[1]
            yhat1 = self.alpha + MapC_to_y(
                n, self.b, self.B, self.time, self.f, parallel
            )
            yhat = np.polyval(self.h, yhat1)
            self.SSE = np.sum((self.y - yhat) ** 2)
            self.y_pred = yhat

        return
def Amplitude_Index(f, t, B, y0, MaxIter, b, link, parallel):
    """
    Alternately fit a polynomial link and re-estimate basis coefficients
    until the SLSQP objective stops improving or MaxIter is reached.

    :param f: (M,N) matrix of N functions with M samples
    :param t: time vector of length M
    :param B: (M,J) basis matrix
    :param y0: response vector of length N
    :param MaxIter: maximum number of alternation steps
    :param b: initial coefficient vector of length J+1 (intercept first)
    :param link: polynomial degree of the link function (1, 2, or 3)
    :param parallel: run warping computations in parallel

    :return: tuple (h, c_hat, cost) of link polynomial coefficients,
             final coefficient vector, and final cost
    """
    J = B.shape[1]
    n = f.shape[1]
    c_hat = b
    cost = 10000
    itr = 0
    # safe default so h is always defined, even if MaxIter < 1
    h = np.zeros(link + 1)
    while itr < MaxIter:
        itr += 1
        print("updating step: iter=%d" % (itr))

        # map current coefficients to the scalar index, then refit the link
        y = c_hat[0] + MapC_to_y(n, c_hat[1:], B, t, f, parallel)
        h = np.polyfit(y, y0, link)

        # re-estimate coefficients under the current link from a random start
        b0 = rand(J + 1)
        out = minimize(
            MyLogLikelihoodFn2, b0, args=(y0, B, t, f, h, parallel),
            method="SLSQP",
        )

        if cost > out.fun:
            # still improving: accept and continue
            cost = out.fun
            c_hat = out.x
        else:
            # no improvement: accept the last estimate and stop
            c_hat = out.x
            break

    return (h, c_hat, cost)


def MyLogLikelihoodFn2(c, y0, B, t, f, h, parallel):
    """
    Sum of squared errors of the polynomial link h applied to the index
    defined by coefficients c (intercept first).
    """
    N = f.shape[1]
    y = c[0] + MapC_to_y(N, c[1:], B, t, f, parallel)
    tmp = np.polyval(h, y)
    x = (y0 - tmp) * (y0 - tmp)
    return x.sum()


def MyLogLikelihoodFn(c, y0, B, t, f, parallel):
    """
    Sum of squared errors of the raw index (identity link) defined by
    coefficients c (intercept first) against the response y0.
    """
    N = f.shape[1]
    y = c[0] + MapC_to_y(N, c[1:], B, t, f, parallel)
    x = (y0 - y) * (y0 - y)
    return x.sum()


def MapC_to_y(n, c, B, t, f, parallel):
    """
    Map basis coefficients c to a scalar index per function: warp each
    column of f to the coefficient curve B @ c (via SRSF alignment) and
    take its inner product with that curve.

    :return: ndarray of length n with one index value per function
    """
    dt = np.diff(t).mean()

    # the coefficient curve and its SRSF do not depend on k: compute once
    bet = np.dot(B, c)
    q1 = uf.f_to_srsf(bet, t)

    if parallel:
        out = Parallel(n_jobs=-1)(
            delayed(map_driver)(q1, f[:, k], bet, t, dt) for k in range(n)
        )
        # normalize to an ndarray so both branches return the same type
        y = np.asarray(out)
    else:
        y = np.zeros(n)
        for k in range(n):
            y[k] = map_driver(q1, f[:, k], bet, t, dt)

    return y


def map_driver(q1, f, bet, t, dt):
    """
    Warp a single function f to the curve bet (with SRSF q1) and return
    the trapezoid-free inner product of the warped function with bet.
    """
    q2 = uf.f_to_srsf(f, t)
    gam = uf.optimum_reparam(q1, t, q2)
    fn = uf.warp_f_gamma(t, f, gam)
    return (bet * fn).sum() * dt