Source code for regression

"""
Warping Invariant Regression using SRSF

moduleauthor:: J. Derek Tucker <jdtuck@sandia.gov>

"""

import numpy as np
import fdasrsf.utility_functions as uf
from scipy.optimize import fmin_l_bfgs_b
from scipy.integrate import trapezoid
from scipy.linalg import inv, norm
from patsy import bs
from joblib import Parallel, delayed
import mlogit_warp as mw


class elastic_regression:
    """
    This class provides elastic regression for functional data using the
    SRVF framework accounting for warping

    Usage:  obj = elastic_regression(f, y, time)

    :param f: numpy ndarray of shape (M,N) of N functions with M samples
    :param y: numpy array of N responses
    :param time: vector of size M describing the sample points
    :param B: optional matrix describing Basis elements
    :param alpha: alpha parameter of model
    :param beta: beta(t) of model
    :param fn: aligned functions - numpy ndarray of shape (M,N) of N
               functions with M samples
    :param qn: aligned srsfs - similar structure to fn
    :param gamma: calculated warping functions
    :param q: original training SRSFs
    :param b: basis coefficients
    :param SSE: sum of squared error

    Author :  J. D. Tucker (JDT) <jdtuck AT sandia.gov>
    Date   :  29-Oct-2021
    """

    def __init__(self, f, y, time):
        """
        Construct an instance of the elastic_regression class

        :param f: numpy ndarray of shape (M,N) of N functions with M samples
        :param y: response vector
        :param time: vector of size M describing the sample points
        """
        a = time.shape[0]

        if f.shape[0] != a:
            raise Exception("Rows of f and length of time must be equal")

        self.f = f
        self.y = y
        self.time = time

    def calc_model(self, B=None, lam=0, df=20, max_itr=20, cores=-1, smooth=False):
        """
        This function identifies a regression model for functional data with
        phase-variability using elastic methods

        :param B: optional matrix describing Basis elements
        :param lam: regularization parameter (default 0)
        :param df: number of degrees of freedom B-spline (default 20)
        :param max_itr: maximum number of iterations (default 20)
        :param cores: number of cores for parallel processing (default all)
        :param smooth: smooth data (default False)
        """
        M = self.f.shape[0]
        N = self.f.shape[1]

        parallel = M > 500 or N > 100

        binsize = np.diff(self.time)
        binsize = binsize.mean()

        # Create B-Spline Basis if none provided
        if B is None:
            B = bs(self.time, df=df, degree=4, include_intercept=True)
        Nb = B.shape[1]
        self.B = B

        # second derivative for regularization
        Bdiff = np.zeros((M, Nb))
        for ii in range(0, Nb):
            Bdiff[:, ii] = np.gradient(np.gradient(B[:, ii], binsize), binsize)
        self.Bdiff = Bdiff

        self.q = uf.f_to_srsf(self.f, self.time, smooth)

        gamma = np.tile(np.linspace(0, 1, M), (N, 1))
        gamma = gamma.transpose()

        itr = 1
        self.SSE = np.zeros(max_itr)
        while itr <= max_itr:
            print("Iteration: %d" % itr)
            # align data
            fn = np.zeros((M, N))
            qn = np.zeros((M, N))
            for ii in range(0, N):
                fn[:, ii] = np.interp(
                    (self.time[-1] - self.time[0]) * gamma[:, ii] + self.time[0],
                    self.time,
                    self.f[:, ii],
                )
                qn[:, ii] = uf.warp_q_gamma(self.time, self.q[:, ii], gamma[:, ii])

            # OLS using basis
            Phi = np.ones((N, Nb + 1))
            for ii in range(0, N):
                for jj in range(1, Nb + 1):
                    Phi[ii, jj] = trapezoid(qn[:, ii] * B[:, jj - 1], self.time)

            R = np.zeros((Nb + 1, Nb + 1))
            for ii in range(1, Nb + 1):
                for jj in range(1, Nb + 1):
                    R[ii, jj] = trapezoid(
                        Bdiff[:, ii - 1] * Bdiff[:, jj - 1], self.time
                    )

            xx = np.dot(Phi.T, Phi)
            inv_xx = inv(xx + lam * R)
            xy = np.dot(Phi.T, self.y)
            b = np.dot(inv_xx, xy)

            alpha = b[0]
            beta = B.dot(b[1 : Nb + 1])
            beta = beta.reshape(M)

            # compute the SSE
            int_X = np.zeros(N)
            for ii in range(0, N):
                int_X[ii] = trapezoid(qn[:, ii] * beta, self.time)

            self.SSE[itr - 1] = sum((self.y.reshape(N) - alpha - int_X) ** 2)

            # find gamma
            gamma_new = np.zeros((M, N))
            if parallel:
                out = Parallel(n_jobs=cores)(
                    delayed(regression_warp)(
                        beta, self.time, self.q[:, n], self.y[n], alpha
                    )
                    for n in range(N)
                )
                gamma_new = np.array(out)
                gamma_new = gamma_new.transpose()
            else:
                for ii in range(0, N):
                    gamma_new[:, ii] = regression_warp(
                        beta, self.time, self.q[:, ii], self.y[ii], alpha
                    )

            if norm(gamma - gamma_new) < 1e-5:
                break
            else:
                gamma = gamma_new

            itr += 1

        # Last Step with centering of gam
        gamI = uf.SqrtMeanInverse(gamma_new)
        gamI_dev = np.gradient(gamI, 1 / float(M - 1))
        beta = np.interp(
            (self.time[-1] - self.time[0]) * gamI + self.time[0], self.time, beta
        ) * np.sqrt(gamI_dev)

        for ii in range(0, N):
            qn[:, ii] = np.interp(
                (self.time[-1] - self.time[0]) * gamI + self.time[0],
                self.time,
                qn[:, ii],
            ) * np.sqrt(gamI_dev)
            fn[:, ii] = np.interp(
                (self.time[-1] - self.time[0]) * gamI + self.time[0],
                self.time,
                fn[:, ii],
            )
            gamma[:, ii] = np.interp(
                (self.time[-1] - self.time[0]) * gamI + self.time[0],
                self.time,
                gamma_new[:, ii],
            )

        self.qn = qn
        self.fn = fn
        self.gamma = gamma
        self.alpha = alpha
        self.beta = beta
        self.b = b[1 : Nb + 1]  # basis coefficients (intercept dropped)
        self.SSE = self.SSE[0:itr]

        return

    def predict(self, newdata=None):
        """
        This function performs prediction on a regression model using new
        data if available, or the current stored data in the object

        Usage:  obj.predict(newdata)

        :param newdata: dict containing new data for prediction (needs the
                        keys below; if None, predicts on training data)
        :type newdata: dict
        :param f: (M,N) matrix of functions
        :param time: vector of time points
        :param y: truth if available
        :param smooth: smooth data if needed
        :param sparam: number of times to run filter
        """
        if newdata is not None:
            f = newdata["f"]
            time = newdata["time"]
            y = newdata["y"]
            q = uf.f_to_srsf(f, time, newdata["smooth"])

            n = f.shape[1]
            yhat = np.zeros(n)
            for ii in range(0, n):
                diff = self.q - q[:, ii][:, np.newaxis]
                dist = np.sum(np.abs(diff) ** 2, axis=0) ** (1.0 / 2)
                q_tmp = uf.warp_q_gamma(time, q[:, ii], self.gamma[:, dist.argmin()])

                yhat[ii] = self.alpha + trapezoid(q_tmp * self.beta, time)

            if y is None:
                self.SSE = np.nan
            else:
                self.SSE = np.sum((y - yhat) ** 2)

            self.y_pred = yhat
        else:
            n = self.f.shape[1]
            yhat = np.zeros(n)
            for ii in range(0, n):
                diff = self.q - self.q[:, ii][:, np.newaxis]
                dist = np.sum(np.abs(diff) ** 2, axis=0) ** (1.0 / 2)
                q_tmp = uf.warp_q_gamma(
                    self.time, self.q[:, ii], self.gamma[:, dist.argmin()]
                )

                yhat[ii] = self.alpha + trapezoid(q_tmp * self.beta, self.time)

            self.SSE = np.sum((self.y - yhat) ** 2)
            self.y_pred = yhat

        return
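

# Illustrative usage sketch (not part of the library): fit an elastic
# regression model on synthetic bump functions whose scalar response depends
# on the bump location. All names below except the class itself are made up
# for demonstration; shapes follow the constructor contract above.
def _example_elastic_regression():
    M, N = 101, 30
    time = np.linspace(0, 1, M)
    rng = np.random.default_rng(0)
    centers = rng.uniform(0.3, 0.7, N)
    # (M,N) array of Gaussian bumps, one function per column
    f = np.exp(-((time[:, None] - centers[None, :]) ** 2) / 0.01)
    y = 5 * centers + rng.normal(0, 0.1, N)

    model = elastic_regression(f, y, time)
    model.calc_model(max_itr=5)  # estimates alpha, beta(t), and warpings
    model.predict()              # in-sample prediction; sets model.SSE
    return model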


class elastic_logistic:
    """
    This class provides elastic logistic regression for functional data
    using the SRVF framework accounting for warping

    Usage:  obj = elastic_logistic(f, y, time)

    :param f: numpy ndarray of shape (M,N) of N functions with M samples
    :param y: numpy array of N responses
    :param time: vector of size M describing the sample points
    :param B: optional matrix describing Basis elements
    :param alpha: alpha parameter of model
    :param beta: beta(t) of model
    :param fn: aligned functions - numpy ndarray of shape (M,N) of N
               functions with M samples
    :param qn: aligned srsfs - similar structure to fn
    :param gamma: calculated warping functions
    :param q: original training SRSFs
    :param b: basis coefficients
    :param Loss: logistic loss

    :type f: np.ndarray
    :type time: np.ndarray

    Author :  J. D. Tucker (JDT) <jdtuck AT sandia.gov>
    Date   :  29-Oct-2021
    """

    def __init__(self, f, y, time):
        """
        Construct an instance of the elastic_logistic class

        :param f: numpy ndarray of shape (M,N) of N functions with M samples
        :param y: response vector
        :param time: vector of size M describing the sample points
        """
        a = time.shape[0]

        if f.shape[0] != a:
            raise Exception("Rows of f and length of time must be equal")

        self.f = f
        self.y = y
        self.time = time

    def calc_model(self, B=None, lam=0, df=20, max_itr=20, cores=-1, smooth=False):
        """
        This function identifies a regression model for functional data with
        phase-variability using elastic methods

        :param B: optional matrix describing Basis elements
        :param lam: regularization parameter (default 0)
        :param df: number of degrees of freedom B-spline (default 20)
        :param max_itr: maximum number of iterations (default 20)
        :param cores: number of cores for parallel processing (default all)
        :param smooth: smooth data (default False)
        """
        M = self.f.shape[0]
        N = self.f.shape[1]

        parallel = M > 500 or N > 100

        binsize = np.diff(self.time)
        binsize = binsize.mean()

        # Create B-Spline Basis if none provided
        if B is None:
            B = bs(self.time, df=df, degree=4, include_intercept=True)
        Nb = B.shape[1]
        self.B = B

        self.q = uf.f_to_srsf(self.f, self.time, smooth)

        gamma = np.tile(np.linspace(0, 1, M), (N, 1))
        gamma = gamma.transpose()

        itr = 1
        self.LL = np.zeros(max_itr)
        while itr <= max_itr:
            print("Iteration: %d" % itr)
            # align data
            fn = np.zeros((M, N))
            qn = np.zeros((M, N))
            for ii in range(0, N):
                fn[:, ii] = np.interp(
                    (self.time[-1] - self.time[0]) * gamma[:, ii] + self.time[0],
                    self.time,
                    self.f[:, ii],
                )
                qn[:, ii] = uf.warp_q_gamma(self.time, self.q[:, ii], gamma[:, ii])

            Phi = np.ones((N, Nb + 1))
            for ii in range(0, N):
                for jj in range(1, Nb + 1):
                    Phi[ii, jj] = trapezoid(qn[:, ii] * B[:, jj - 1], self.time)

            # Find alpha and beta using l_bfgs
            b0 = np.zeros(Nb + 1)
            out = fmin_l_bfgs_b(
                logit_loss,
                b0,
                fprime=logit_gradient,
                args=(Phi, self.y),
                pgtol=1e-10,
                maxiter=200,
                maxfun=250,
                factr=1e-30,
            )
            b = out[0]
            alpha = b[0]
            beta = B.dot(b[1 : Nb + 1])
            beta = beta.reshape(M)

            # compute the logistic loss
            self.LL[itr - 1] = logit_loss(b, Phi, self.y)

            # find gamma
            gamma_new = np.zeros((M, N))
            if parallel:
                out = Parallel(n_jobs=cores)(
                    delayed(logistic_warp)(beta, self.time, self.q[:, n], self.y[n])
                    for n in range(N)
                )
                gamma_new = np.array(out)
                gamma_new = gamma_new.transpose()
            else:
                for ii in range(0, N):
                    gamma_new[:, ii] = logistic_warp(
                        beta, self.time, self.q[:, ii], self.y[ii]
                    )

            if norm(gamma - gamma_new) < 1e-5:
                break
            else:
                gamma = gamma_new

            itr += 1

        self.qn = qn
        self.fn = fn
        self.gamma = gamma
        self.alpha = alpha
        self.beta = beta
        self.b = b[1 : Nb + 1]  # basis coefficients (intercept dropped)
        self.LL = self.LL[0:itr]

        return

    def predict(self, newdata=None):
        """
        This function performs prediction on a regression model using new
        data if available, or the current stored data in the object

        Usage:  obj.predict(newdata)

        :param newdata: dict containing new data for prediction (needs the
                        keys below; if None, predicts on training data)
        :type newdata: dict
        :param f: (M,N) matrix of functions
        :param time: vector of time points
        :param y: truth if available
        :param smooth: smooth data if needed
        :param sparam: number of times to run filter
        """
        if newdata is not None:
            f = newdata["f"]
            time = newdata["time"]
            y = newdata["y"]
            q = uf.f_to_srsf(f, time, newdata["smooth"])

            n = f.shape[1]
            yhat = np.zeros(n)
            for ii in range(0, n):
                diff = self.q - q[:, ii][:, np.newaxis]
                dist = np.sum(np.abs(diff) ** 2, axis=0) ** (1.0 / 2)
                q_tmp = uf.warp_q_gamma(time, q[:, ii], self.gamma[:, dist.argmin()])

                yhat[ii] = self.alpha + trapezoid(q_tmp * self.beta, time)

            if y is None:
                yhat = phi(yhat)
                y_labels = np.ones(n)
                y_labels[yhat < 0.5] = -1
                self.PC = None
            else:
                yhat = phi(yhat)
                y_labels = np.ones(n)
                y_labels[yhat < 0.5] = -1
                TP = sum(y[y_labels == 1] == 1)
                FP = sum(y[y_labels == -1] == 1)
                TN = sum(y[y_labels == -1] == -1)
                FN = sum(y[y_labels == 1] == -1)
                self.PC = (TP + TN) / float(TP + FP + FN + TN)

            self.y_pred = yhat
            self.y_labels = y_labels
        else:
            n = self.f.shape[1]
            yhat = np.zeros(n)
            for ii in range(0, n):
                diff = self.q - self.q[:, ii][:, np.newaxis]
                dist = np.sum(np.abs(diff) ** 2, axis=0) ** (1.0 / 2)
                q_tmp = uf.warp_q_gamma(
                    self.time, self.q[:, ii], self.gamma[:, dist.argmin()]
                )

                yhat[ii] = self.alpha + trapezoid(q_tmp * self.beta, self.time)

            yhat = phi(yhat)
            y_labels = np.ones(n)
            y_labels[yhat < 0.5] = -1
            TP = sum(self.y[y_labels == 1] == 1)
            FP = sum(self.y[y_labels == -1] == 1)
            TN = sum(self.y[y_labels == -1] == -1)
            FN = sum(self.y[y_labels == 1] == -1)
            self.PC = (TP + TN) / float(TP + FP + FN + TN)
            self.y_pred = yhat
            self.y_labels = y_labels

        return
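

# Illustrative usage sketch (not part of the library): two-class problem with
# responses coded as +1/-1, the coding assumed by logistic_warp and
# logit_loss below. Data are synthetic; shapes follow the constructor
# contract above.
def _example_elastic_logistic():
    M, N = 101, 40
    time = np.linspace(0, 1, M)
    rng = np.random.default_rng(1)
    y = np.where(rng.random(N) < 0.5, 1, -1)
    # class +1 gets an early bump, class -1 a late one
    centers = np.where(y == 1, 0.35, 0.65) + rng.normal(0, 0.02, N)
    f = np.exp(-((time[:, None] - centers[None, :]) ** 2) / 0.01)

    model = elastic_logistic(f, y, time)
    model.calc_model(max_itr=5)
    model.predict()  # sets model.PC (proportion classified correctly)
    return model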


class elastic_mlogistic:
    """
    This class provides elastic multinomial logistic regression for
    functional data using the SRVF framework accounting for warping

    Usage:  obj = elastic_mlogistic(f, y, time)

    :param f: numpy ndarray of shape (M,N) of N functions with M samples
    :param y: numpy array of N responses
    :param time: vector of size M describing the sample points
    :param B: optional matrix describing Basis elements
    :param alpha: alpha parameter of model
    :param beta: beta(t) of model
    :param fn: aligned functions - numpy ndarray of shape (M,N) of N
               functions with M samples
    :param qn: aligned srsfs - similar structure to fn
    :param gamma: calculated warping functions
    :param q: original training SRSFs
    :param b: basis coefficients
    :param Loss: logistic loss

    :type f: np.ndarray
    :type time: np.ndarray

    Author :  J. D. Tucker (JDT) <jdtuck AT sandia.gov>
    Date   :  29-Oct-2021
    """

    def __init__(self, f, y, time):
        """
        Construct an instance of the elastic_mlogistic class

        :param f: numpy ndarray of shape (M,N) of N functions with M samples
        :param y: response vector
        :param time: vector of size M describing the sample points
        """
        a = time.shape[0]
        N = f.shape[1]

        if f.shape[0] != a:
            raise Exception("Rows of f and length of time must be equal")

        self.f = f
        self.y = y

        # code labels as a one-hot indicator matrix
        m = y.max()
        Y = np.zeros((N, m), dtype=int)
        for ii in range(0, N):
            Y[ii, y[ii] - 1] = 1
        self.Y = Y

        self.time = time

    def calc_model(
        self, B=None, lam=0, df=20, max_itr=20, delta=0.01, cores=-1, smooth=False
    ):
        """
        This function identifies a regression model for functional data with
        phase-variability using elastic methods

        :param B: optional matrix describing Basis elements
        :param lam: regularization parameter (default 0)
        :param df: number of degrees of freedom B-spline (default 20)
        :param max_itr: maximum number of iterations (default 20)
        :param delta: gradient step size for warping (default 0.01)
        :param cores: number of cores for parallel processing (default all)
        :param smooth: smooth data (default False)
        """
        M = self.f.shape[0]
        N = self.f.shape[1]
        m = self.y.max()

        parallel = M > 500 or N > 100

        binsize = np.diff(self.time)
        binsize = binsize.mean()

        # Create B-Spline Basis if none provided
        if B is None:
            B = bs(self.time, df=df, degree=4, include_intercept=True)
        Nb = B.shape[1]
        self.B = B

        self.q = uf.f_to_srsf(self.f, self.time, smooth)

        gamma = np.tile(np.linspace(0, 1, M), (N, 1))
        gamma = gamma.transpose()

        itr = 1
        self.LL = np.zeros(max_itr)
        while itr <= max_itr:
            print("Iteration: %d" % itr)
            # align data
            fn = np.zeros((M, N))
            qn = np.zeros((M, N))
            for ii in range(0, N):
                fn[:, ii] = np.interp(
                    (self.time[-1] - self.time[0]) * gamma[:, ii] + self.time[0],
                    self.time,
                    self.f[:, ii],
                )
                qn[:, ii] = uf.warp_q_gamma(self.time, self.q[:, ii], gamma[:, ii])

            Phi = np.ones((N, Nb + 1))
            for ii in range(0, N):
                for jj in range(1, Nb + 1):
                    Phi[ii, jj] = trapezoid(qn[:, ii] * B[:, jj - 1], self.time)

            # Find alpha and beta using l_bfgs
            b0 = np.zeros(m * (Nb + 1))
            out = fmin_l_bfgs_b(
                mlogit_loss,
                b0,
                fprime=mlogit_gradient,
                args=(Phi, self.Y),
                pgtol=1e-10,
                maxiter=200,
                maxfun=250,
                factr=1e-30,
            )
            b = out[0]
            B0 = b.reshape(Nb + 1, m)
            alpha = B0[0, :]
            beta = np.zeros((M, m))
            for i in range(0, m):
                beta[:, i] = B.dot(B0[1 : Nb + 1, i])

            # compute the logistic loss
            self.LL[itr - 1] = mlogit_loss(b, Phi, self.Y)

            # find gamma
            gamma_new = np.zeros((M, N))
            if parallel:
                out = Parallel(n_jobs=cores)(
                    delayed(mlogit_warp_grad)(
                        alpha, beta, self.time, self.q[:, n], self.Y[n, :], delta=delta
                    )
                    for n in range(N)
                )
                gamma_new = np.array(out)
                gamma_new = gamma_new.transpose()
            else:
                for ii in range(0, N):
                    gamma_new[:, ii] = mlogit_warp_grad(
                        alpha,
                        beta,
                        self.time,
                        self.q[:, ii],
                        self.Y[ii, :],
                        delta=delta,
                    )

            if norm(gamma - gamma_new) < 1e-5:
                break
            else:
                gamma = gamma_new

            itr += 1

        self.qn = qn
        self.fn = fn
        self.gamma = gamma
        self.alpha = alpha
        self.beta = beta
        self.b = B0[1 : Nb + 1, :]  # basis coefficients per class (intercepts dropped)
        self.n_classes = m
        self.LL = self.LL[0:itr]

        return

    def predict(self, newdata=None):
        """
        This function performs prediction on a regression model using new
        data if available, or the current stored data in the object

        Usage:  obj.predict(newdata)

        :param newdata: dict containing new data for prediction (needs the
                        keys below; if None, predicts on training data)
        :type newdata: dict
        :param f: (M,N) matrix of functions
        :param time: vector of time points
        :param y: truth if available
        :param smooth: smooth data if needed
        :param sparam: number of times to run filter
        """
        if newdata is not None:
            f = newdata["f"]
            time = newdata["time"]
            y = newdata["y"]
            q = uf.f_to_srsf(f, time, newdata["smooth"])

            n = f.shape[1]
            m = self.n_classes
            yhat = np.zeros((n, m))
            for ii in range(0, n):
                diff = self.q - q[:, ii][:, np.newaxis]
                dist = np.sum(np.abs(diff) ** 2, axis=0) ** (1.0 / 2)
                q_tmp = uf.warp_q_gamma(time, q[:, ii], self.gamma[:, dist.argmin()])

                for jj in range(0, m):
                    yhat[ii, jj] = self.alpha[jj] + trapezoid(
                        q_tmp * self.beta[:, jj], time
                    )

            if y is None:
                yhat = phi(yhat.ravel())
                yhat = yhat.reshape(n, m)
                y_labels = yhat.argmax(axis=1) + 1
                self.PC = None
            else:
                yhat = phi(yhat.ravel())
                yhat = yhat.reshape(n, m)
                y_labels = yhat.argmax(axis=1) + 1
                PC = np.zeros(m)
                cls_set = np.arange(1, m + 1)
                for ii in range(0, m):
                    cls_sub = np.delete(cls_set, ii)
                    TP = sum(y[y_labels == (ii + 1)] == (ii + 1))
                    FP = sum(y[np.isin(y_labels, cls_sub)] == (ii + 1))
                    TN = sum(
                        y[np.isin(y_labels, cls_sub)]
                        == y_labels[np.isin(y_labels, cls_sub)]
                    )
                    FN = sum(np.isin(y[y_labels == (ii + 1)], cls_sub))
                    PC[ii] = (TP + TN) / float(TP + FP + FN + TN)

                self.PC = sum(y == y_labels) / float(y_labels.size)

            self.y_pred = yhat
            self.y_labels = y_labels
        else:
            n = self.f.shape[1]
            m = self.n_classes
            yhat = np.zeros((n, m))
            for ii in range(0, n):
                diff = self.q - self.q[:, ii][:, np.newaxis]
                dist = np.sum(np.abs(diff) ** 2, axis=0) ** (1.0 / 2)
                q_tmp = uf.warp_q_gamma(
                    self.time, self.q[:, ii], self.gamma[:, dist.argmin()]
                )

                for jj in range(0, m):
                    yhat[ii, jj] = self.alpha[jj] + trapezoid(
                        q_tmp * self.beta[:, jj], self.time
                    )

            yhat = phi(yhat.ravel())
            yhat = yhat.reshape(n, m)
            y_labels = yhat.argmax(axis=1) + 1
            PC = np.zeros(m)
            cls_set = np.arange(1, m + 1)
            for ii in range(0, m):
                cls_sub = np.delete(cls_set, ii)
                TP = sum(self.y[y_labels == (ii + 1)] == (ii + 1))
                FP = sum(self.y[np.isin(y_labels, cls_sub)] == (ii + 1))
                TN = sum(
                    self.y[np.isin(y_labels, cls_sub)]
                    == y_labels[np.isin(y_labels, cls_sub)]
                )
                FN = sum(np.isin(self.y[y_labels == (ii + 1)], cls_sub))
                PC[ii] = (TP + TN) / float(TP + FP + FN + TN)

            self.PC = sum(self.y == y_labels) / float(y_labels.size)
            self.y_pred = yhat
            self.y_labels = y_labels

        return
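

# Illustrative usage sketch (not part of the library): three-class problem
# with integer labels 1..m, which the constructor one-hot encodes into
# self.Y. Data are synthetic; note that the compiled mlogit_warp extension
# must be available for calc_model to run.
def _example_elastic_mlogistic():
    M, N = 101, 45
    time = np.linspace(0, 1, M)
    rng = np.random.default_rng(2)
    y = rng.integers(1, 4, N)  # labels in {1, 2, 3}
    centers = 0.2 + 0.15 * y + rng.normal(0, 0.02, N)
    f = np.exp(-((time[:, None] - centers[None, :]) ** 2) / 0.01)

    model = elastic_mlogistic(f, y, time)
    model.calc_model(max_itr=5)
    model.predict()  # sets model.PC (proportion classified correctly)
    return model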
# helper functions for linear regression
def regression_warp(beta, time, q, y, alpha):
    """
    calculates optimal warping for functional linear regression

    :param beta: numpy vector of size M, regression function beta(t)
    :param time: vector of size M describing the sample points
    :param q: numpy vector of size M, SRSF of a function
    :param y: scalar response
    :param alpha: scalar model intercept

    :rtype: numpy array
    :return gamma_new: warping function
    """
    gam_M = uf.optimum_reparam(beta, time, q)
    qM = uf.warp_q_gamma(time, q, gam_M)
    y_M = trapezoid(qM * beta, time)

    gam_m = uf.optimum_reparam(-1 * beta, time, q)
    qm = uf.warp_q_gamma(time, q, gam_m)
    y_m = trapezoid(qm * beta, time)

    if y > alpha + y_M:
        gamma_new = gam_M
    elif y < alpha + y_m:
        gamma_new = gam_m
    else:
        gamma_new = uf.zero_crossing(
            y - alpha, q, beta, time, y_M, y_m, gam_M, gam_m
        )

    return gamma_new
# helper functions for logistic regression
def logistic_warp(beta, time, q, y):
    """
    calculates optimal warping for functional logistic regression

    :param beta: numpy vector of size M, regression function beta(t)
    :param time: vector of size M describing the sample points
    :param q: numpy vector of size M, SRSF of a function
    :param y: scalar response (+1 or -1)

    :rtype: numpy array
    :return gamma: warping function
    """
    if y == 1:
        gamma = uf.optimum_reparam(beta, time, q)
    elif y == -1:
        gamma = uf.optimum_reparam(-1 * beta, time, q)

    return gamma


def phi(t):
    """
    calculates logistic function, returns 1 / (1 + exp(-t))

    :param t: numpy array

    :rtype: numpy array
    :return out: return value
    """
    # evaluate piecewise to avoid overflow in exp for large |t|
    idx = t > 0
    out = np.empty(t.size, dtype=np.float64)
    out[idx] = 1.0 / (1 + np.exp(-t[idx]))
    exp_t = np.exp(t[~idx])
    out[~idx] = exp_t / (1.0 + exp_t)
    return out
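

# Quick numerical check (illustrative): the piecewise evaluation in phi
# matches the textbook sigmoid where the latter is safe to evaluate, and
# stays finite for extreme inputs where the naive exp(-t) would overflow
# (NumPy warns and returns inf there).
def _check_phi():
    t = np.array([-800.0, -5.0, 0.0, 5.0, 800.0])
    out = phi(t)
    assert np.all(np.isfinite(out))
    mid = np.abs(t) < 30
    assert np.allclose(out[mid], 1.0 / (1.0 + np.exp(-t[mid])))
    return out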


def logit_loss(b, X, y):
    """
    logistic loss function, returns Sum{-log(phi(t))}

    :param b: numpy vector of size Nb+1, model coefficients
    :param X: numpy ndarray of shape (N,Nb+1), design matrix
    :param y: numpy vector of size N, responses (+1 or -1)

    :rtype: numpy array
    :return out: loss value
    """
    z = X.dot(b)
    yz = y * z
    # stable evaluation of log(1 + exp(-yz))
    idx = yz > 0
    out = np.zeros_like(yz)
    out[idx] = np.log(1 + np.exp(-yz[idx]))
    out[~idx] = -yz[~idx] + np.log(1 + np.exp(yz[~idx]))
    out = out.sum()
    return out


def logit_gradient(b, X, y):
    """
    calculates gradient of the logistic loss

    :param b: numpy vector of size Nb+1, model coefficients
    :param X: numpy ndarray of shape (N,Nb+1), design matrix
    :param y: numpy vector of size N, responses (+1 or -1)

    :rtype: numpy array
    :return grad: gradient of logistic loss
    """
    z = X.dot(b)
    z = phi(y * z)
    z0 = (z - 1) * y
    grad = X.T.dot(z0)
    return grad


def logit_hessian(s, b, X, y):
    """
    calculates the product of the hessian of the logistic loss with a
    direction vector s

    :param s: numpy vector of size Nb+1, direction vector
    :param b: numpy vector of size Nb+1, model coefficients
    :param X: numpy ndarray of shape (N,Nb+1), design matrix
    :param y: numpy vector of size N, responses (+1 or -1)

    :rtype: numpy array
    :return out: hessian-vector product
    """
    z = X.dot(b)
    z = phi(y * z)
    d = z * (1 - z)
    wa = d * X.dot(s)
    Hs = X.T.dot(wa)
    out = Hs
    return out
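

# Sanity check (illustrative): verify logit_gradient against central finite
# differences of logit_loss, and verify logit_hessian, which returns the
# Hessian-vector product H.dot(s), against a finite difference of the
# gradient along the direction s. All data are random and made up.
def _check_logit_derivatives(eps=1e-6):
    rng = np.random.default_rng(4)
    N, M = 15, 6  # samples, coefficients
    X = rng.normal(size=(N, M))
    y = np.where(rng.random(N) < 0.5, 1.0, -1.0)
    b = rng.normal(size=M)
    s = rng.normal(size=M)

    grad = logit_gradient(b, X, y)
    fd = np.array(
        [
            (logit_loss(b + eps * e, X, y) - logit_loss(b - eps * e, X, y))
            / (2 * eps)
            for e in np.eye(M)
        ]
    )
    assert np.allclose(grad, fd, atol=1e-5)

    Hs = logit_hessian(s, b, X, y)
    fd_Hs = (
        logit_gradient(b + eps * s, X, y) - logit_gradient(b - eps * s, X, y)
    ) / (2 * eps)
    assert np.allclose(Hs, fd_Hs, atol=1e-4)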
# helper functions for multinomial logistic regression
def mlogit_warp_grad(
    alpha, beta, time, q, y, max_itr=8000, tol=1e-10, delta=0.008, display=0
):
    """
    calculates optimal warping for functional multinomial logistic regression

    :param alpha: numpy vector of size m, model intercepts
    :param beta: numpy ndarray of shape (M,m) of m regression functions with
                 M samples
    :param time: vector of size M describing the sample points
    :param q: numpy vector of size M, SRSF of a function
    :param y: numpy vector of size m, one-hot coded response
    :param max_itr: maximum number of iterations (Default=8000)
    :param tol: stopping tolerance (Default=1e-10)
    :param delta: gradient step size (Default=0.008)
    :param display: display iterations (Default=0)

    :rtype: numpy array
    :return gam_old: warping function
    """
    gam_old = mw.mlogit_warp(
        np.ascontiguousarray(alpha),
        np.ascontiguousarray(beta),
        time,
        np.ascontiguousarray(q),
        np.ascontiguousarray(y, dtype=np.int32),
        max_itr,
        tol,
        delta,
        display,
    )

    return gam_old


def mlogit_loss(b, X, Y):
    """
    calculates multinomial logistic loss (negative log-likelihood)

    :param b: numpy vector of size m*(Nb+1), model coefficients
    :param X: numpy ndarray of shape (N,Nb+1), design matrix
    :param Y: numpy ndarray of shape (N,m), one-hot coded responses

    :rtype: numpy array
    :return nll: negative log-likelihood
    """
    N, m = Y.shape  # n_samples, n_classes
    M = X.shape[1]  # n_features
    B = b.reshape(M, m)
    Yhat = np.dot(X, B)
    Yhat -= Yhat.min(axis=1)[:, np.newaxis]
    Yhat = np.exp(-Yhat)
    # l1-normalize
    Yhat /= Yhat.sum(axis=1)[:, np.newaxis]

    Yhat = Yhat * Y
    nll = np.sum(np.log(Yhat.sum(axis=1)))
    nll /= -float(N)

    return nll


def mlogit_gradient(b, X, Y):
    """
    calculates gradient of the multinomial logistic loss

    :param b: numpy vector of size m*(Nb+1), model coefficients
    :param X: numpy ndarray of shape (N,Nb+1), design matrix
    :param Y: numpy ndarray of shape (N,m), one-hot coded responses

    :rtype: numpy array
    :return grad: gradient
    """
    N, m = Y.shape  # n_samples, n_classes
    M = X.shape[1]  # n_features
    B = b.reshape(M, m)
    Yhat = np.dot(X, B)
    Yhat -= Yhat.min(axis=1)[:, np.newaxis]
    Yhat = np.exp(-Yhat)
    # l1-normalize
    Yhat /= Yhat.sum(axis=1)[:, np.newaxis]

    _Yhat = Yhat * Y
    _Yhat /= _Yhat.sum(axis=1)[:, np.newaxis]
    Yhat -= _Yhat
    grad = np.dot(X.T, Yhat)
    grad /= -float(N)
    grad = grad.ravel()

    return grad
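

# Sanity check (illustrative): compare mlogit_gradient against a central
# finite-difference approximation of mlogit_loss on a small random problem.
# Because the row-wise min shift cancels inside the normalization, the loss
# is smooth in b and the two should agree closely.
def _check_mlogit_gradient(eps=1e-6):
    rng = np.random.default_rng(3)
    N, M, m = 12, 5, 3  # samples, features, classes
    X = rng.normal(size=(N, M))
    Y = np.eye(m, dtype=int)[rng.integers(0, m, N)]  # one-hot labels
    b = rng.normal(size=M * m)

    grad = mlogit_gradient(b, X, Y)
    fd = np.zeros_like(b)
    for i in range(b.size):
        e = np.zeros_like(b)
        e[i] = eps
        fd[i] = (mlogit_loss(b + e, X, Y) - mlogit_loss(b - e, X, Y)) / (2 * eps)
    assert np.allclose(grad, fd, atol=1e-5)
    return np.max(np.abs(grad - fd))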