Source code for fPCA

Vertical and Horizontal Functional Principal Component Analysis using SRSF

moduleauthor:: J. Derek Tucker <>

import numpy as np
import fdasrsf.utility_functions as uf
import fdasrsf.geometry as geo
from scipy import dot
from scipy.linalg import norm, svd
from scipy.integrate import trapz, cumtrapz
from scipy.optimize import fminbound
from joblib import Parallel, delayed
import matplotlib.pyplot as plt
import fdasrsf.plot_style as plot
import collections

[docs]class fdavpca: """ This class provides vertical fPCA using the SRVF framework Usage: obj = fdavpca(warp_data) :param warp_data: fdawarp class with alignment data :param q_pca: srvf principal directions :param f_pca: f principal directions :param latent: latent values :param coef: principal coefficients :param id: point used for f(0) :param mqn: mean srvf :param U: eigenvectors :param stds: geodesic directions Author : J. D. Tucker (JDT) <jdtuck AT> Date : 15-Mar-2018 """ def __init__(self, fdawarp): """ Construct an instance of the fdavpca class :param fdawarp: fdawarp class """ if fdawarp.fn.size==0: raise Exception('Please align fdawarp class using srsf_align!') self.warp_data = fdawarp
[docs] def calc_fpca(self, no=3, id=None, stds = np.arange(-1, 2)): """ This function calculates vertical functional principal component analysis on aligned data :param no: number of components to extract (default = 3) :param id: point to use for f(0) (default = midpoint) :param stds: number of standard deviations along gedoesic to compute (default = -1,0,1) :type no: int :type id: int :rtype: fdavpca object containing :return q_pca: srsf principal directions :return f_pca: functional principal directions :return latent: latent values :return coef: coefficients :return U: eigenvectors """ fn = self.warp_data.fn time = self.warp_data.time qn = self.warp_data.qn M = time.shape[0] if id is None: mididx = int(np.round(M / 2)) else: mididx = id Nstd = stds.shape[0] # FPCA mq_new = qn.mean(axis=1) N = mq_new.shape[0] m_new = np.sign(fn[mididx, :]) * np.sqrt(np.abs(fn[mididx, :])) mqn = np.append(mq_new, m_new.mean()) qn2 = np.vstack((qn, m_new)) K = np.cov(qn2) U, s, V = svd(K) stdS = np.sqrt(s) # compute the PCA in the q domain q_pca = np.ndarray(shape=(N + 1, Nstd, no), dtype=float) for k in range(0, no): for l in range(0, Nstd): q_pca[:, l, k] = mqn + stds[l] * stdS[k] * U[:, k] # compute the correspondence in the f domain f_pca = np.ndarray(shape=(N, Nstd, no), dtype=float) for k in range(0, no): for l in range(0, Nstd): f_pca[:, l, k] = uf.cumtrapzmid(time, q_pca[0:N, l, k] * np.abs(q_pca[0:N, l, k]), np.sign(q_pca[N, l, k]) * (q_pca[N, l, k] ** 2), mididx) fbar = fn.mean(axis=1) fsbar = f_pca[:, :, k].mean(axis=1) err = np.transpose(np.tile(fbar-fsbar, (Nstd,1))) f_pca[:, :, k] += err N2 = qn.shape[1] c = np.zeros((N2, no)) for k in range(0, no): for l in range(0, N2): c[l, k] = sum((np.append(qn[:, l], m_new[l]) - mqn) * U[:, k]) self.q_pca = q_pca self.f_pca = f_pca self.latent = s[0:no] self.coef = c self.U = U[:,0:no] = mididx self.mqn = mqn self.time = time self.stds = stds = no return
[docs] def plot(self): """ plot plot elastic vertical fPCA result Usage: obj.plot() """ no = Nstd = self.stds.shape[0] N = self.time.shape[0] num_plot = int(np.ceil(no/3)) CBcdict = { 'Bl': (0, 0, 0), 'Or': (.9, .6, 0), 'SB': (.35, .7, .9), 'bG': (0, .6, .5), 'Ye': (.95, .9, .25), 'Bu': (0, .45, .7), 'Ve': (.8, .4, 0), 'rP': (.8, .6, .7), } cl = sorted(CBcdict.keys()) k = 0 for ii in range(0, num_plot): if k > (no-1): break fig, ax = plt.subplots(2, 3) for k1 in range(0,3): k = k1+(ii)*3 axt = ax[0, k1] if k > (no-1): break for l in range(0, Nstd): axt.plot(self.time, self.q_pca[0:N, l, k], color=CBcdict[cl[l]]) axt.set_title('q domain: PD %d' % (k + 1)) axt = ax[1, k1] for l in range(0, Nstd): axt.plot(self.time, self.f_pca[:, l, k], color=CBcdict[cl[l]]) axt.set_title('f domain: PD %d' % (k + 1)) fig.set_tight_layout(True) cumm_coef = 100 * np.cumsum(self.latent) / sum(self.latent) N = self.latent.shape[0] idx = np.arange(0, N) + 1 plot.f_plot(idx, cumm_coef, "Coefficient Cumulative Percentage") plt.ylabel("Percentage") plt.xlabel("Index") return
[docs]class fdahpca: """ This class provides horizontal fPCA using the SRVF framework Usage: obj = fdahpca(warp_data) :param warp_data: fdawarp class with alignment data :param gam_pca: warping functions principal directions :param psi_pca: srvf principal directions :param latent: latent values :param U: eigenvectors :param coef: coefficients :param vec: shooting vectors :param mu: Karcher Mean :param tau: principal directions Author : J. D. Tucker (JDT) <jdtuck AT> Date : 15-Mar-2018 """ def __init__(self, fdawarp): """ Construct an instance of the fdavpca class :param fdawarp: fdawarp class """ if fdawarp.fn.size==0: raise Exception('Please align fdawarp class using srsf_align!') self.warp_data = fdawarp
[docs] def calc_fpca(self, no=3, stds = np.arange(-1, 2)): """ This function calculates horizontal functional principal component analysis on aligned data :param no: number of components to extract (default = 3) :param stds: number of standard deviations along gedoesic to compute (default = -1,0,1) :type no: int :rtype: fdahpca object of numpy ndarray :return q_pca: srsf principal directions :return f_pca: functional principal directions :return latent: latent values :return coef: coefficients :return U: eigenvectors """ # Calculate Shooting Vectors gam = self.warp_data.gam mu, gam_mu, psi, vec = uf.SqrtMean(gam) TT = self.warp_data.time.shape[0] # TFPCA K = np.cov(vec) U, s, V = svd(K) vm = vec.mean(axis=1) gam_pca = np.ndarray(shape=(stds.shape[0], mu.shape[0], no), dtype=float) psi_pca = np.ndarray(shape=(stds.shape[0], mu.shape[0], no), dtype=float) for j in range(0, no): cnt = 0 for k in stds: v = k * np.sqrt(s[j]) * U[:, j] vn = norm(v) / np.sqrt(TT) if vn < 0.0001: psi_pca[cnt, :, j] = mu else: psi_pca[cnt, :, j] = np.cos(vn) * mu + np.sin(vn) * v / vn tmp = cumtrapz(psi_pca[cnt, :, j] * psi_pca[cnt, :, j], np.linspace(0,1,TT), initial=0) gam_pca[cnt, :, j] = (tmp - tmp[0]) / (tmp[-1] - tmp[0]) cnt += 1 N2 = gam.shape[1] c = np.zeros((N2,no)) for k in range(0,no): for i in range(0,N2): c[i,k] = np.sum(dot(vec[:,i]-vm,U[:,k])) self.gam_pca = gam_pca self.psi_pca = psi_pca self.U = U[:,0:no] self.coef = c self.latent = s[0:no] self.gam_mu = gam_mu self.psi_mu = mu self.vec = vec = no self.stds = stds return
[docs] def plot(self): """ plot plot elastic horizontal fPCA results Usage: obj.plot() """ no = TT = self.warp_data.time.shape[0] num_plot = int(np.ceil(no/3)) CBcdict = { 'Bl': (0, 0, 0), 'Or': (.9, .6, 0), 'SB': (.35, .7, .9), 'bG': (0, .6, .5), 'Ye': (.95, .9, .25), 'Bu': (0, .45, .7), 'Ve': (.8, .4, 0), 'rP': (.8, .6, .7), } k = 0 for ii in range(0, num_plot): if k > (no-1): break fig, ax = plt.subplots(1, 3) for k1 in range(0,3): k = k1+(ii)*3 axt = ax[k1] if k > (no-1): break tmp = self.gam_pca[:, :, k] axt.plot(np.linspace(0, 1, TT), tmp.transpose()) axt.set_title('PD %d' % (k + 1)) axt.set_aspect('equal') fig.set_tight_layout(True) cumm_coef = 100 * np.cumsum(self.latent[0:no]) / sum(self.latent[0:no]) idx = np.arange(0, no) + 1 plot.f_plot(idx, cumm_coef, "Coefficient Cumulative Percentage") plt.ylabel("Percentage") plt.xlabel("Index") return
[docs]class fdajpca: """ This class provides joint fPCA using the SRVF framework Usage: obj = fdajpca(warp_data) :param warp_data: fdawarp class with alignment data :param q_pca: srvf principal directions :param f_pca: f principal directions :param latent: latent values :param coef: principal coefficients :param id: point used for f(0) :param mqn: mean srvf :param U: eigenvectors :param mu_psi: mean psi :param mu_g: mean g :param C: scaling value :param stds: geodesic directions Author : J. D. Tucker (JDT) <jdtuck AT> Date : 18-Mar-2018 """ def __init__(self, fdawarp): """ Construct an instance of the fdavpca class :param fdawarp: fdawarp class """ if fdawarp.fn.size==0: raise Exception('Please align fdawarp class using srsf_align!') self.warp_data = fdawarp
[docs] def calc_fpca(self, no=3, stds = np.arange(-1., 2.), id=None, parallel=False, cores=-1): """ This function calculates joint functional principal component analysis on aligned data :param no: number of components to extract (default = 3) :param id: point to use for f(0) (default = midpoint) :param stds: number of standard deviations along gedoesic to compute (default = -1,0,1) :param parallel: run in parallel (default = F) :param cores: number of cores for parallel (default = -1 (all)) :type no: int :type id: int :type parallel: bool :type cores: int :rtype: fdajpca object of numpy ndarray :return q_pca: srsf principal directions :return f_pca: functional principal directions :return latent: latent values :return coef: coefficients :return U: eigenvectors """ fn = self.warp_data.fn time = self.warp_data.time qn = self.warp_data.qn q0 = self.warp_data.q0 gam = self.warp_data.gam M = time.shape[0] if id is None: mididx = int(np.round(M / 2)) else: mididx = id Nstd = stds.shape[0] # set up for fPCA in q-space mq_new = qn.mean(axis=1) m_new = np.sign(fn[mididx, :]) * np.sqrt(np.abs(fn[mididx, :])) mqn = np.append(mq_new, m_new.mean()) qn2 = np.vstack((qn, m_new)) # calculate vector space of warping functions mu_psi, gam_mu, psi, vec = uf.SqrtMean(gam, parallel, cores) # joint fPCA C = fminbound(find_C, 0, 1e4, (qn2, vec, q0,no, mu_psi, parallel, cores)) qhat, gamhat, a, U, s, mu_g, g, cov = jointfPCAd(qn2, vec, C, no, mu_psi, parallel, cores) # geodesic paths q_pca = np.ndarray(shape=(M, Nstd, no), dtype=float) f_pca = np.ndarray(shape=(M, Nstd, no), dtype=float) for k in range(0, no): for l in range(0, Nstd): qhat = mqn + dot(U[0:(M+1),k],stds[l]*np.sqrt(s[k])) vechat = dot(U[(M+1):,k],(stds[l]*np.sqrt(s[k]))/C) psihat = geo.exp_map(mu_psi,vechat) gamhat = cumtrapz(psihat*psihat,np.linspace(0,1,M),initial=0) gamhat = (gamhat - gamhat.min()) / (gamhat.max() - gamhat.min()) if (sum(vechat)==0): gamhat = np.linspace(0,1,M) fhat = uf.cumtrapzmid(time, qhat[0:M]*np.fabs(qhat[0:M]), np.sign(qhat[M])*(qhat[M]*qhat[M]), mididx) f_pca[:,l,k] = uf.warp_f_gamma(np.linspace(0,1,M), fhat, gamhat) q_pca[:,l,k] = uf.warp_q_gamma(np.linspace(0,1,M), qhat[0:M], gamhat) self.q_pca = q_pca self.f_pca = f_pca self.latent = s[0:no] self.coef = a self.U = U[:,0:no] self.mu_psi = mu_psi self.mu_g = mu_g = mididx self.C = C self.time = time self.g = g self.cov = cov = no self.stds = stds return
[docs] def plot(self): """ plot plot elastic vertical fPCA result Usage: obj.plot() """ no = M = self.time.shape[0] Nstd = self.stds.shape[0] num_plot = int(np.ceil(no/3)) CBcdict = { 'Bl': (0, 0, 0), 'Or': (.9, .6, 0), 'SB': (.35, .7, .9), 'bG': (0, .6, .5), 'Ye': (.95, .9, .25), 'Bu': (0, .45, .7), 'Ve': (.8, .4, 0), 'rP': (.8, .6, .7), } cl = sorted(CBcdict.keys()) k = 0 for ii in range(0, num_plot): if k > (no-1): break fig, ax = plt.subplots(2, 3) for k1 in range(0,3): k = k1+(ii)*3 axt = ax[0, k1] if k > (no-1): break for l in range(0, Nstd): axt.plot(self.time, self.q_pca[0:M, l, k], color=CBcdict[cl[l]]) axt.set_title('q domain: PD %d' % (k + 1)) axt = ax[1, k1] for l in range(0, Nstd): axt.plot(self.time, self.f_pca[:, l, k], color=CBcdict[cl[l]]) axt.set_title('f domain: PD %d' % (k + 1)) fig.set_tight_layout(True) cumm_coef = 100 * np.cumsum(self.latent) / sum(self.latent) idx = np.arange(0, self.latent.shape[0]) + 1 plot.f_plot(idx, cumm_coef, "Coefficient Cumulative Percentage") plt.ylabel("Percentage") plt.xlabel("Index") return
def jointfPCAd(qn, vec, C, m, mu_psi, parallel, cores): (M,N) = qn.shape g = np.vstack((qn, C*vec)) mu_q = qn.mean(axis=1) mu_g = g.mean(axis=1) K = np.cov(g) U, s, V = svd(K) a = np.zeros((N,m)) for i in range(0,N): for j in range(0,m): tmp = (g[:,i]-mu_g) a[i,j] = dot(tmp.T, U[:,j]) qhat = np.tile(mu_q, (N,1)) qhat = qhat.T qhat = qhat + dot(U[0:M,0:m],a.T) vechat = dot(U[M:,0:m], a.T/C) psihat = np.zeros((M-1,N)) gamhat = np.zeros((M-1,N)) if parallel: out = Parallel(n_jobs=cores)(delayed(jfpca_sub)(mu_psi, vechat[:,n]) for n in range(N)) gamhat = np.array(out) gamhat = gamhat.transpose() else: for ii in range(0,N): psihat[:,ii] = geo.exp_map(mu_psi,vechat[:,ii]) gam_tmp = cumtrapz(psihat[:,ii]*psihat[:,ii], np.linspace(0,1,M-1), initial=0) gamhat[:,ii] = (gam_tmp - gam_tmp.min()) / (gam_tmp.max() - gam_tmp.min()) U = U[:,0:m] s = s[0:m] return qhat, gamhat, a, U, s, mu_g, g, K def jfpca_sub(mu_psi, vechat): M = mu_psi.shape[0] psihat = geo.exp_map(mu_psi, vechat) gam_tmp = cumtrapz(psihat*psihat, np.linspace(0,1,M), initial=0) gamhat = (gam_tmp - gam_tmp.min()) / (gam_tmp.max() - gam_tmp.min()) return gamhat def find_C(C, qn, vec, q0, m, mu_psi, parallel, cores): qhat, gamhat, a, U, s, mu_g, g, K = jointfPCAd(qn, vec, C, m, mu_psi, parallel, cores) (M,N) = qn.shape time = np.linspace(0,1,M-1) d = np.zeros(N) if parallel: out = Parallel(n_jobs=cores)(delayed(find_C_sub)(time, qhat[0:(M-1),n], gamhat[:,n], q0[:,n]) for n in range(N)) d = np.array(out) else: for i in range(0,N): tmp = uf.warp_q_gamma(time, qhat[0:(M-1),i], uf.invertGamma(gamhat[:,i])) d[i] = trapz((tmp-q0[:,i])*(tmp-q0[:,i]), time) out = sum(d*d)/N return out def find_C_sub(time, qhat, gamhat, q0): tmp = uf.warp_q_gamma(time, qhat, uf.invertGamma(gamhat)) d = trapz((tmp-q0)*(tmp-q0), time) return d