"""
Warping Invariant GML Regression using SRSF
moduleauthor:: Derek Tucker <jdtuck@sandia.gov>
"""
import numpy as np
import fdasrsf as fs
import fdasrsf.utility_functions as uf
from patsy import bs
from scipy.optimize import minimize
from numpy.random import rand
from joblib import Parallel, delayed
class elastic_glm_regression:
    """
    This class provides elastic glm regression for functional data using the
    SRVF framework accounting for warping

    Usage: obj = elastic_glm_regression(f, y, time)

    :param f: (M,N) matrix defining N functions of M samples
    :param y: response vector of length N
    :param time: time vector of length M
    :param alpha: intercept
    :param b: coefficient vector
    :param B: basis matrix
    :param lambda: regularization parameter
    :param SSE: sum of squared errors

    Author :  J. D. Tucker (JDT) <jdtuck AT sandia.gov>
    Date   :  18-Mar-2018
    """

    # polynomial degree of the outer link function for each supported name
    _LINK_DEGREE = {"linear": 1, "quadratic": 2, "cubic": 3}

    def __init__(self, f, y, time):
        """
        Construct an instance of the elastic_glm_regression class

        :param f: numpy ndarray of shape (M,N) of N functions with M samples
        :param y: response vector of length N
        :param time: vector of size M describing the sample points

        :raises Exception: if the number of rows of f does not match the
            length of time (f is sampled column-wise, one function per column)
        """
        if f.shape[0] != time.shape[0]:
            # f is (M,N): its M rows must align with the M sample points
            raise Exception("Rows of f and time must be equal")

        self.f = f
        self.y = y
        self.time = time

    def calc_model(
        self,
        link="linear",
        B=None,
        lam=0,
        df=20,
        max_itr=20,
        smooth_data=False,
        sparam=25,
        parallel=False,
    ):
        """
        This function identifies a regression model with phase-variability
        using elastic pca

        :param link: string of link function ('linear', 'quadratic', 'cubic')
        :param B: optional matrix describing Basis elements
        :param lam: regularization parameter (default 0)
        :param df: number of degrees of freedom B-spline (default 20)
        :param max_itr: maximum number of iterations (default 20)
        :param smooth_data: smooth data using box filter (default = F)
        :param sparam: number of times to apply box filter (default = 25)
        :param parallel: run in parallel (default = F)

        :raises Exception: if link is not one of the supported names
        """
        # Fail fast on an unsupported link before any expensive work
        if link not in self._LINK_DEGREE:
            raise Exception("Invalid Link")

        if smooth_data:
            self.f = fs.smooth_data(self.f, sparam)

        print("Link: %s" % link)
        print("Lambda: %5.1f" % lam)

        self.lam = lam
        self.link = link

        # Create B-Spline Basis if none provided
        if B is None:
            B = bs(self.time, df=df, degree=4, include_intercept=True)
        Nb = B.shape[1]
        self.B = B

        n = self.f.shape[1]

        # Initialize coefficients by minimizing the unlinked SSE objective
        print("Initializing")
        b0 = rand(Nb + 1)
        out = minimize(
            MyLogLikelihoodFn,
            b0,
            args=(self.y, self.B, self.time, self.f, parallel),
            method="SLSQP",
        )
        a = out.x

        # Alternate between the amplitude index and the polynomial link of
        # the requested degree; the three links share identical machinery.
        h1, c_hat, cost = Amplitude_Index(
            self.f,
            self.time,
            self.B,
            self.y,
            max_itr,
            a,
            self._LINK_DEGREE[link],
            parallel,
        )
        yhat1 = c_hat[0] + MapC_to_y(
            n, c_hat[1:], self.B, self.time, self.f, parallel
        )
        yhat = np.polyval(h1, yhat1)

        self.SSE = np.sum((self.y - yhat) ** 2)
        self.h = h1
        self.alpha = c_hat[0]
        self.b = c_hat[1:]

        return

    def predict(self, newdata=None, parallel=True):
        """
        This function performs prediction on regression model on new data if
        available or current stored data in object

        Usage: obj.predict(newdata)

        :param newdata: dict containing new data for prediction (needs the
            keys below, if None predicts on training data)
        :type newdata: dict
        :param f: (M,N) matrix of functions
        :param time: vector of time points
        :param y: truth if available
        :param smooth: smooth data if needed
        :param sparam: number of times to run filter (only read when
            smooth is requested)
        """
        if newdata is not None:
            f = newdata["f"]
            time = newdata["time"]
            y = newdata["y"]
            if newdata["smooth"]:
                # sparam is only needed (and only read) when smoothing
                f = fs.smooth_data(f, newdata["sparam"])

            n = f.shape[1]
            yhat1 = self.alpha + MapC_to_y(n, self.b, self.B, time, f, parallel)
            yhat = np.polyval(self.h, yhat1)

            if y is None:
                # no ground truth supplied: SSE is undefined
                self.SSE = np.nan
            else:
                self.SSE = np.sum((y - yhat) ** 2)
            self.y_pred = yhat
        else:
            # predict on the training data stored in the object
            n = self.f.shape[1]
            yhat1 = self.alpha + MapC_to_y(
                n, self.b, self.B, self.time, self.f, parallel
            )
            yhat = np.polyval(self.h, yhat1)
            self.SSE = np.sum((self.y - yhat) ** 2)
            self.y_pred = yhat

        return
def Amplitude_Index(f, t, B, y0, MaxIter, b, link, parallel):
    """
    Alternate between refitting the polynomial link and re-optimizing the
    basis coefficients of the warping-invariant regression index.

    :param f: (M,N) matrix of N functions with M samples
    :param t: time vector of length M
    :param B: (M,J) basis matrix
    :param y0: observed response vector of length N
    :param MaxIter: maximum number of alternating iterations
    :param b: initial coefficient vector of length J+1 (intercept first)
    :param link: polynomial degree of the link function (1, 2 or 3)
    :param parallel: passed through to MapC_to_y
    :return: tuple (h, c_hat, cost) of link polynomial coefficients,
        final coefficient vector, and the best objective value seen

    NOTE(review): if MaxIter < 1 the loop body never runs and ``h`` is
    returned unbound (UnboundLocalError) — confirm callers always pass >= 1.
    """
    J = B.shape[1]
    n = f.shape[1]
    c_hat = b
    cost = 10000  # large sentinel so the first iteration always improves
    itr = 0
    while itr < MaxIter:
        itr += 1
        print("updating step: iter=%d" % (itr))
        # current index values under the present coefficients
        y = c_hat[0] + MapC_to_y(n, c_hat[1:], B, t, f, parallel)
        # refit the degree-`link` polynomial mapping index -> response
        h = np.polyfit(y, y0, link)
        # re-optimize coefficients from a fresh random start for this link
        b0 = rand(J + 1)
        out = minimize(
            MyLogLikelihoodFn2, b0, args=(y0, B, t, f, h, parallel), method="SLSQP"
        )
        if cost > out.fun:
            # improvement: accept and keep alternating
            cost = out.fun
            c_hat = out.x
        else:
            # NOTE(review): the non-improving out.x still overwrites c_hat
            # before the break — confirm this is intended and not a bug.
            c_hat = out.x
            break
    return (h, c_hat, cost)
def MyLogLikelihoodFn2(c, y0, B, t, f, h, parallel):
    """
    Sum-of-squared-errors objective for the polynomially-linked model.

    :param c: coefficient vector; c[0] is the intercept, c[1:] are the
        basis coefficients
    :param y0: observed response vector of length N
    :param B: basis matrix
    :param t: time vector
    :param f: (M,N) matrix of N functions
    :param h: polynomial link coefficients (highest degree first, as
        consumed by np.polyval)
    :param parallel: passed through to MapC_to_y
    :return: scalar sum of squared residuals between y0 and the linked index
    """
    N = f.shape[1]
    y = c[0] + MapC_to_y(N, c[1:], B, t, f, parallel)
    # apply the polynomial link before computing residuals
    residual = y0 - np.polyval(h, y)
    return np.sum(residual**2)
def MyLogLikelihoodFn(c, y0, B, t, f, parallel):
    """
    Sum-of-squared-errors objective for the identity-link (initialization)
    model.

    :param c: coefficient vector; c[0] is the intercept, c[1:] are the
        basis coefficients
    :param y0: observed response vector of length N
    :param B: basis matrix
    :param t: time vector
    :param f: (M,N) matrix of N functions
    :param parallel: passed through to MapC_to_y
    :return: scalar sum of squared residuals between y0 and the index
    """
    N = f.shape[1]
    y = c[0] + MapC_to_y(N, c[1:], B, t, f, parallel)
    return np.sum((y0 - y) ** 2)
def MapC_to_y(n, c, B, t, f, parallel):
    """
    Map basis coefficients to the regression index value for each function.

    The template curve beta = B @ c is elastically aligned to each column of
    f, and the index is the (trapezoid-free) inner product of the template
    with the warped function, scaled by the mean grid spacing.

    :param n: number of functions (columns of f)
    :param c: basis coefficient vector of length J
    :param B: (M,J) basis matrix
    :param t: time vector of length M
    :param f: (M,N) matrix of functions
    :param parallel: if True, align functions in parallel with joblib
    :return: ndarray of length n of index values
    """
    dt = np.mean(np.diff(t))

    # template and its SRSF are loop-invariant: compute them once up front
    bet = np.dot(B, c)
    q1 = uf.f_to_srsf(bet, t)

    if parallel:
        out = Parallel(n_jobs=-1)(
            delayed(map_driver)(q1, f[:, k], bet, t, dt) for k in range(n)
        )
        # normalize the joblib list to an ndarray, matching the serial branch
        y = np.asarray(out)
    else:
        y = np.zeros(n)
        for k in range(n):
            q2 = uf.f_to_srsf(f[:, k], t)
            gam = uf.optimum_reparam(q1, t, q2)
            fn = uf.warp_f_gamma(t, f[:, k], gam)
            y[k] = np.sum(bet * fn) * dt
    return y
def map_driver(q1, f, bet, t, dt):
    """
    Worker for the parallel branch of MapC_to_y: elastically align one
    function to the template and return its inner product with the template.

    :param q1: SRSF of the template curve
    :param f: single function sampled on t
    :param bet: template curve (beta = B @ c)
    :param t: time vector
    :param dt: mean grid spacing used to scale the inner product
    :return: scalar index value for this function
    """
    srsf = uf.f_to_srsf(f, t)
    gamma = uf.optimum_reparam(q1, t, srsf)
    warped = uf.warp_f_gamma(t, f, gamma)
    return (bet * warped).sum() * dt