"""
Vertical and Horizontal Functional Principal Component Analysis using SRSF
moduleauthor:: J. Derek Tucker <jdtuck@sandia.gov>
"""
import numpy as np
import fdasrsf as fs
import fdasrsf.utility_functions as uf
import fdasrsf.geometry as geo
from scipy.linalg import norm, svd
from scipy.integrate import trapezoid, cumulative_trapezoid
from scipy.optimize import fminbound
from joblib import Parallel, delayed
import matplotlib.pyplot as plt
import fdasrsf.plot_style as plot
import collections
[docs]
class fdavpca:
    """
    This class provides vertical fPCA using the
    SRVF framework

    Usage:  obj = fdavpca(warp_data)

    :param warp_data: fdawarp class with alignment data
    :param q_pca: srvf principal directions
    :param f_pca: f principal directions
    :param latent: latent values
    :param coef: principal coefficients
    :param id: point used for f(0)
    :param mqn: mean srvf
    :param U: eigenvectors
    :param stds: geodesic directions
    :param new_coef: principal coefficients of new data

    Author :  J. D. Tucker (JDT) <jdtuck AT sandia.gov>
    Date   :  15-Mar-2018
    """

    def __init__(self, fdawarp):
        """
        Construct an instance of the fdavpca class

        :param fdawarp: fdawarp class
        """
        # alignment must have been run first; fn is empty otherwise
        if fdawarp.fn.size == 0:
            raise Exception("Please align fdawarp class using srsf_align!")

        self.warp_data = fdawarp
[docs]
def calc_fpca(self, no=3, var_exp=None, id=None, stds=np.arange(-1, 2)):
"""
This function calculates vertical functional principal component
analysis on aligned data
:param no: number of components to extract (default = 3)
:param var_exp: compute no based on value percent variance explained
(example: 0.95)
:param id: point to use for f(0) (default = midpoint)
:param stds: number of standard deviations along geodesic to compute
(default = -1,0,1)
:type no: int
:type id: int
:rtype: fdavpca object containing
:return q_pca: srsf principal directions
:return f_pca: functional principal directions
:return latent: latent values
:return coef: coefficients
:return U: eigenvectors
"""
fn = self.warp_data.fn
time = self.warp_data.time
qn = self.warp_data.qn
M = time.shape[0]
if var_exp is not None:
if var_exp > 1:
raise Exception("var_exp is greater than 1")
no = M
if id is None:
mididx = int(np.round(M / 2))
else:
mididx = id
Nstd = stds.shape[0]
# FPCA
mq_new = qn.mean(axis=1)
N = mq_new.shape[0]
m_new = np.sign(fn[mididx, :]) * np.sqrt(np.abs(fn[mididx, :]))
mqn = np.append(mq_new, m_new.mean())
self.mqn2 = mqn
qn2 = np.vstack((qn, m_new))
K = np.cov(qn2)
U, s, V = svd(K)
stdS = np.sqrt(s)
# compute the PCA in the q domain
q_pca = np.ndarray(shape=(N + 1, Nstd, no), dtype=float)
for k in range(0, no):
for l in range(0, Nstd):
q_pca[:, l, k] = mqn + stds[l] * stdS[k] * U[:, k]
# compute the correspondence in the f domain
f_pca = np.ndarray(shape=(N, Nstd, no), dtype=float)
for k in range(0, no):
for l in range(0, Nstd):
f_pca[:, l, k] = uf.cumtrapzmid(
time,
q_pca[0:N, l, k] * np.abs(q_pca[0:N, l, k]),
np.sign(q_pca[N, l, k]) * (q_pca[N, l, k] ** 2),
mididx,
)
fbar = fn.mean(axis=1)
fsbar = f_pca[:, :, k].mean(axis=1)
err = np.transpose(np.tile(fbar - fsbar, (Nstd, 1)))
f_pca[:, :, k] += err
N2 = qn.shape[1]
c = np.zeros((N2, no))
for k in range(0, no):
for l in range(0, N2):
c[l, k] = sum((np.append(qn[:, l], m_new[l]) - mqn) * U[:, k])
if var_exp is not None:
cumm_coef = np.cumsum(s) / sum(s)
no = int(np.argwhere(cumm_coef <= var_exp)[-1])
self.q_pca = q_pca
self.f_pca = f_pca
self.latent = s[0:no]
self.coef = c[:, 0:no]
self.U = U[:, 0:no]
self.id = mididx
self.mqn = mqn
self.time = time
self.stds = stds
self.no = no
return
[docs]
    def project(self, f):
        """
        project new data onto fPCA basis

        Usage: obj.project(f)

        :param f: numpy array (MxN) of N functions on M time
        """
        # convert to SRSFs and align each new function to the stored mean
        q1 = fs.f_to_srsf(f, self.time)
        M = self.time.shape[0]
        n = q1.shape[1]
        mq = self.warp_data.mqn
        fn = np.zeros((M, n))
        qn = np.zeros((M, n))
        gam = np.zeros((M, n))
        for ii in range(0, n):
            gam[:, ii] = fs.optimum_reparam(mq, self.time, q1[:, ii])
            fn[:, ii] = fs.warp_f_gamma(self.time, f[:, ii], gam[:, ii])
            qn[:, ii] = fs.f_to_srsf(fn[:, ii], self.time)

        U = self.U
        no = U.shape[1]

        # augment with f(id) exactly as calc_fpca did, then project onto
        # the stored eigenbasis about the augmented mean (mqn2)
        m_new = np.sign(fn[self.id, :]) * np.sqrt(np.abs(fn[self.id, :]))
        qn1 = np.vstack((qn, m_new))

        a = np.zeros((n, no))
        for i in range(0, n):
            for j in range(0, no):
                a[i, j] = sum((qn1[:, i] - self.mqn2) * U[:, j])

        self.new_coef = a

        return
[docs]
def plot(self):
"""
plot plot elastic vertical fPCA result
Usage: obj.plot()
"""
no = self.no
Nstd = self.stds.shape[0]
N = self.time.shape[0]
num_plot = int(np.ceil(no / 3))
CBcdict = {
"Bl": (0, 0, 0),
"Or": (0.9, 0.6, 0),
"SB": (0.35, 0.7, 0.9),
"bG": (0, 0.6, 0.5),
"Ye": (0.95, 0.9, 0.25),
"Bu": (0, 0.45, 0.7),
"Ve": (0.8, 0.4, 0),
"rP": (0.8, 0.6, 0.7),
}
cl = sorted(CBcdict.keys())
k = 0
for ii in range(0, num_plot):
if k > (no - 1):
break
fig, ax = plt.subplots(2, 3)
for k1 in range(0, 3):
k = k1 + (ii) * 3
axt = ax[0, k1]
if k > (no - 1):
break
for l in range(0, Nstd):
axt.plot(self.time, self.q_pca[0:N, l, k], color=CBcdict[cl[l]])
axt.set_title("q domain: PD %d" % (k + 1))
axt = ax[1, k1]
for l in range(0, Nstd):
axt.plot(self.time, self.f_pca[:, l, k], color=CBcdict[cl[l]])
axt.set_title("f domain: PD %d" % (k + 1))
fig.set_tight_layout(True)
cumm_coef = 100 * np.cumsum(self.latent) / sum(self.latent)
N = self.latent.shape[0]
idx = np.arange(0, N) + 1
plot.f_plot(idx, cumm_coef, "Coefficient Cumulative Percentage")
plt.ylabel("Percentage")
plt.xlabel("Index")
plt.show()
return
[docs]
class fdahpca:
    """
    This class provides horizontal fPCA using the
    SRVF framework

    Usage:  obj = fdahpca(warp_data)

    :param warp_data: fdawarp class with alignment data
    :param gam_pca: warping functions principal directions
    :param psi_pca: srvf principal directions
    :param latent: latent values
    :param U: eigenvectors
    :param coef: coefficients
    :param vec: shooting vectors
    :param mu: Karcher Mean
    :param tau: principal directions

    Author :  J. D. Tucker (JDT) <jdtuck AT sandia.gov>
    Date   :  15-Mar-2018
    """

    def __init__(self, fdawarp):
        """
        Construct an instance of the fdahpca class

        :param fdawarp: fdawarp class
        """
        # alignment must have been run first; fn is empty otherwise
        if fdawarp.fn.size == 0:
            raise Exception("Please align fdawarp class using srsf_align!")

        self.warp_data = fdawarp
[docs]
def calc_fpca(self, no=3, var_exp=None, stds=np.arange(-1, 2)):
"""
This function calculates horizontal functional principal component
analysis on aligned data
:param no: number of components to extract (default = 3)
:param var_exp: compute no based on value percent variance explained
(example: 0.95)
:param stds: number of standard deviations along geodesic to compute
(default = -1,0,1)
:type no: int
:rtype: fdahpca object of numpy ndarray
:return q_pca: srsf principal directions
:return f_pca: functional principal directions
:return latent: latent values
:return coef: coefficients
:return U: eigenvectors
"""
# Calculate Shooting Vectors
gam = self.warp_data.gam
mu, gam_mu, psi, vec = uf.SqrtMean(gam)
TT = self.warp_data.time.shape[0]
if var_exp is not None:
if var_exp > 1:
raise Exception("var_exp is greater than 1")
no = TT
# TFPCA
K = np.cov(vec)
U, s, V = svd(K)
vm = vec.mean(axis=1)
self.vm = vm
self.mu_psi = mu
gam_pca = np.ndarray(shape=(stds.shape[0], mu.shape[0], no), dtype=float)
psi_pca = np.ndarray(shape=(stds.shape[0], mu.shape[0], no), dtype=float)
for j in range(0, no):
cnt = 0
for k in stds:
v = k * np.sqrt(s[j]) * U[:, j]
vn = norm(v) / np.sqrt(TT)
if vn < 0.0001:
psi_pca[cnt, :, j] = mu
else:
psi_pca[cnt, :, j] = np.cos(vn) * mu + np.sin(vn) * v / vn
tmp = cumulative_trapezoid(
psi_pca[cnt, :, j] * psi_pca[cnt, :, j],
np.linspace(0, 1, TT),
initial=0,
)
gam_pca[cnt, :, j] = (tmp - tmp[0]) / (tmp[-1] - tmp[0])
cnt += 1
N2 = gam.shape[1]
c = np.zeros((N2, no))
for k in range(0, no):
for i in range(0, N2):
c[i, k] = np.dot(vec[:, i] - vm, U[:, k])
if var_exp is not None:
cumm_coef = np.cumsum(s) / sum(s)
no = int(np.argwhere(cumm_coef <= var_exp)[-1])
self.gam_pca = gam_pca
self.psi_pca = psi_pca
self.U = U[:, 0:no]
self.coef = c[:, 0:no]
self.latent = s[0:no]
self.gam_mu = gam_mu
self.psi_mu = mu
self.vec = vec
self.no = no
self.stds = stds
return
[docs]
def project(self, f):
"""
project new data onto fPCA basis
Usage: obj.project(f)
:param f: numpy array (MxN) of N functions on M time
"""
q1 = fs.f_to_srsf(f, self.time)
M = self.time.shape[0]
n = q1.shape[1]
mq = self.warp_data.mqn
gam = np.zeros((M, n))
for ii in range(0, n):
gam[:, ii] = fs.optimum_reparam(mq, self.time, q1[:, ii])
U = self.U
no = U.shape[1]
mu_psi = self.mu_psi
vec = np.zeros((M, n))
psi = np.zeros((M, n))
time = np.linspace(0, 1, M)
binsize = np.mean(np.diff(time))
for i in range(0, n):
psi[:, i] = np.sqrt(np.gradient(gam[:, i], binsize))
out, theta = fs.inv_exp_map(mu_psi, psi[:, i])
vec[:, i] = out
a = np.zeros((n, no))
for i in range(0, n):
for j in range(0, no):
a[i, j] = np.dot(vec[:, i] - self.vm, U[:, j])
self.new_coef = a
return
[docs]
def plot(self):
"""
plot plot elastic horizontal fPCA results
Usage: obj.plot()
"""
no = self.no
TT = self.warp_data.time.shape[0]
num_plot = int(np.ceil(no / 3))
CBcdict = {
"Bl": (0, 0, 0),
"Or": (0.9, 0.6, 0),
"SB": (0.35, 0.7, 0.9),
"bG": (0, 0.6, 0.5),
"Ye": (0.95, 0.9, 0.25),
"Bu": (0, 0.45, 0.7),
"Ve": (0.8, 0.4, 0),
"rP": (0.8, 0.6, 0.7),
}
k = 0
for ii in range(0, num_plot):
if k > (no - 1):
break
fig, ax = plt.subplots(1, 3)
for k1 in range(0, 3):
k = k1 + (ii) * 3
axt = ax[k1]
if k > (no - 1):
break
tmp = self.gam_pca[:, :, k]
axt.plot(np.linspace(0, 1, TT), tmp.transpose())
axt.set_title("PD %d" % (k + 1))
axt.set_aspect("equal")
fig.set_tight_layout(True)
cumm_coef = 100 * np.cumsum(self.latent[0:no]) / sum(self.latent[0:no])
idx = np.arange(0, no) + 1
plot.f_plot(idx, cumm_coef, "Coefficient Cumulative Percentage")
plt.ylabel("Percentage")
plt.xlabel("Index")
plt.show()
return
[docs]
class fdajpca:
    """
    This class provides joint fPCA using the
    SRVF framework

    Usage:  obj = fdajpca(warp_data)

    :param warp_data: fdawarp class with alignment data
    :param q_pca: srvf principal directions
    :param f_pca: f principal directions
    :param latent: latent values
    :param coef: principal coefficients
    :param id: point used for f(0)
    :param mqn: mean srvf
    :param U: eigenvectors
    :param mu_psi: mean psi
    :param mu_g: mean g
    :param C: scaling value
    :param stds: geodesic directions

    Author :  J. D. Tucker (JDT) <jdtuck AT sandia.gov>
    Date   :  18-Mar-2018
    """

    def __init__(self, fdawarp):
        """
        Construct an instance of the fdajpca class

        :param fdawarp: fdawarp class
        """
        # alignment must have been run first; fn is empty otherwise
        if fdawarp.fn.size == 0:
            raise Exception("Please align fdawarp class using srsf_align!")

        self.warp_data = fdawarp
        # number of sample points on the time grid
        self.M = fdawarp.time.shape[0]
[docs]
def calc_fpca(
self,
no=3,
var_exp=None,
stds=np.arange(-1.0, 2.0),
id=None,
parallel=False,
cores=-1,
):
"""
This function calculates joint functional principal component analysis
on aligned data
:param no: number of components to extract (default = 3)
:param var_exp: compute no based on value percent variance explained
(example: 0.95)
:param id: point to use for f(0) (default = midpoint)
:param stds: number of standard deviations along gedoesic to compute
(default = -1,0,1)
:param parallel: run in parallel (default = F)
:param cores: number of cores for parallel (default = -1 (all))
:type no: int
:type id: int
:type parallel: bool
:type cores: int
:rtype: fdajpca object of numpy ndarray
:return q_pca: srsf principal directions
:return f_pca: functional principal directions
:return latent: latent values
:return coef: coefficients
:return U: eigenvectors
"""
fn = self.warp_data.fn
time = self.warp_data.time
qn = self.warp_data.qn
q0 = self.warp_data.q0
gam = self.warp_data.gam
M = time.shape[0]
if var_exp is not None:
if var_exp > 1:
raise Exception("var_exp is greater than 1")
no = M
if id is None:
mididx = int(np.round(M / 2))
else:
mididx = id
Nstd = stds.shape[0]
# set up for fPCA in q-space
mq_new = qn.mean(axis=1)
m_new = np.sign(fn[mididx, :]) * np.sqrt(np.abs(fn[mididx, :]))
mqn = np.append(mq_new, m_new.mean())
qn2 = np.vstack((qn, m_new))
# calculate vector space of warping functions
mu_psi, gam_mu, psi, vec = uf.SqrtMean(gam, parallel, cores)
# joint fPCA
C = fminbound(find_C, 0, 1e4, (qn2, vec, q0, no, mu_psi, parallel, cores))
qhat, gamhat, a, U, s, mu_g, g, cov = jointfPCAd(
qn2, vec, C, no, mu_psi, parallel, cores
)
# geodesic paths
q_pca = np.ndarray(shape=(M, Nstd, no), dtype=float)
f_pca = np.ndarray(shape=(M, Nstd, no), dtype=float)
for k in range(0, no):
for l in range(0, Nstd):
qhat = mqn + np.dot(U[0: (M + 1), k], stds[l] * np.sqrt(s[k]))
vechat = np.dot(U[(M + 1):, k], (stds[l] * np.sqrt(s[k])) / C)
psihat = geo.exp_map(mu_psi, vechat)
gamhat = cumulative_trapezoid(psihat * psihat,
np.linspace(0, 1, M),
initial=0)
gamhat = (gamhat - gamhat.min()) / (gamhat.max() - gamhat.min())
if sum(vechat) == 0:
gamhat = np.linspace(0, 1, M)
fhat = uf.cumtrapzmid(
time,
qhat[0:M] * np.fabs(qhat[0:M]),
np.sign(qhat[M]) * (qhat[M] * qhat[M]),
mididx,
)
f_pca[:, l, k] = uf.warp_f_gamma(np.linspace(0, 1, M),
fhat, gamhat)
q_pca[:, l, k] = uf.warp_q_gamma(
np.linspace(0, 1, M), qhat[0:M], gamhat
)
if var_exp is not None:
cumm_coef = np.cumsum(s) / s.sum()
no = int(np.argwhere(cumm_coef >= var_exp)[0][0])
self.q_pca = q_pca
self.f_pca = f_pca
self.latent = s[0:no]
self.coef = a[:, 0:no]
self.U = U[:, 0:no]
self.mu_psi = mu_psi
self.mu_g = mu_g
self.id = mididx
self.C = C
self.time = time
self.g = g
self.cov = cov
self.no = no
self.stds = stds
return
[docs]
    def project(self, f):
        """
        project new data onto fPCA basis

        Usage: obj.project(f)

        :param f: numpy array (MxN) of N functions on M time
        """
        # convert to SRSFs and align each new function to the stored mean
        q1 = fs.f_to_srsf(f, self.time)
        M = self.time.shape[0]
        n = q1.shape[1]
        mq = self.warp_data.mqn
        fn = np.zeros((M, n))
        qn = np.zeros((M, n))
        gam = np.zeros((M, n))
        for ii in range(0, n):
            gam[:, ii] = fs.optimum_reparam(mq, self.time, q1[:, ii])
            fn[:, ii] = fs.warp_f_gamma(self.time, f[:, ii], gam[:, ii])
            qn[:, ii] = fs.f_to_srsf(fn[:, ii], self.time)

        U = self.U
        no = U.shape[1]

        # augment with f(id) exactly as calc_fpca did
        m_new = np.sign(fn[self.id, :]) * np.sqrt(np.abs(fn[self.id, :]))
        qn1 = np.vstack((qn, m_new))
        C = self.C
        TT = self.time.shape[0]
        mu_g = self.mu_g
        mu_psi = self.mu_psi

        # shooting vectors of the estimated warpings via the inverse exp map
        vec = np.zeros((M, n))
        psi = np.zeros((TT, n))
        time = np.linspace(0, 1, TT)
        binsize = np.mean(np.diff(time))
        for i in range(0, n):
            psi[:, i] = np.sqrt(np.gradient(gam[:, i], binsize))
            out, theta = fs.inv_exp_map(mu_psi, psi[:, i])
            vec[:, i] = out

        # stack amplitude and C-scaled phase parts and project onto the
        # joint eigenbasis about the joint mean mu_g
        g = np.vstack((qn1, C * vec))
        a = np.zeros((n, no))
        for i in range(0, n):
            for j in range(0, no):
                tmp = g[:, i] - mu_g
                a[i, j] = np.dot(tmp.T, U[:, j])

        self.new_coef = a

        return
[docs]
def plot(self):
"""
plot plot elastic joint fPCA result
Usage: obj.plot()
"""
no = self.no
M = self.time.shape[0]
Nstd = self.stds.shape[0]
num_plot = int(np.ceil(no / 3))
CBcdict = {
"Bl": (0, 0, 0),
"Or": (0.9, 0.6, 0),
"SB": (0.35, 0.7, 0.9),
"bG": (0, 0.6, 0.5),
"Ye": (0.95, 0.9, 0.25),
"Bu": (0, 0.45, 0.7),
"Ve": (0.8, 0.4, 0),
"rP": (0.8, 0.6, 0.7),
}
cl = sorted(CBcdict.keys())
k = 0
for ii in range(0, num_plot):
if k > (no - 1):
break
fig, ax = plt.subplots(2, 3)
for k1 in range(0, 3):
k = k1 + (ii) * 3
axt = ax[0, k1]
if k > (no - 1):
break
for l in range(0, Nstd):
axt.plot(self.time, self.q_pca[0:M, l, k], color=CBcdict[cl[l]])
axt.set_title("q domain: PD %d" % (k + 1))
axt = ax[1, k1]
for l in range(0, Nstd):
axt.plot(self.time, self.f_pca[:, l, k], color=CBcdict[cl[l]])
axt.set_title("f domain: PD %d" % (k + 1))
fig.set_tight_layout(True)
cumm_coef = 100 * np.cumsum(self.latent) / sum(self.latent)
idx = np.arange(0, self.latent.shape[0]) + 1
plot.f_plot(idx, cumm_coef, "Coefficient Cumulative Percentage")
plt.ylabel("Percentage")
plt.xlabel("Index")
plt.show()
return
[docs]
class fdajpcah:
    """
    This class provides joint fPCA using the
    SRVF framework using MFPCA

    Usage:  obj = fdajpcah(warp_data)

    :param warp_data: fdawarp class with alignment data
    :param q_pca: srvf principal directions
    :param f_pca: f principal directions
    :param latent: latent values
    :param coef: principal coefficients
    :param id: point used for f(0)
    :param mqn: mean srvf
    :param U_q: eigenvectors for q
    :param U_h: eigenvectors for gam
    :param C: scaling value
    :param stds: geodesic directions

    Author :  J. D. Tucker (JDT) <jdtuck AT sandia.gov>
    Date   :  06-April-2024
    """

    def __init__(self, fdawarp):
        """
        Construct an instance of the fdajpcah class

        :param fdawarp: fdawarp class
        """
        # alignment must have been run first; fn is empty otherwise
        if fdawarp.fn.size == 0:
            raise Exception("Please align fdawarp class using srsf_align!")

        self.warp_data = fdawarp
        # number of sample points on the time grid
        self.M = fdawarp.time.shape[0]
[docs]
def calc_fpca(
self,
var_exp=0.99,
stds=np.arange(-1.0, 2.0),
id=None,
parallel=False,
cores=-1,
):
"""
This function calculates joint functional principal component analysis
on aligned data
:param var_exp: compute no based on value percent variance explained
(default: 0.99)
:param id: point to use for f(0) (default = midpoint)
:param stds: number of standard deviations along gedoesic to compute
(default = -1,0,1)
:param parallel: run in parallel (default = F)
:param cores: number of cores for parallel (default = -1 (all))
:type no: int
:type id: int
:type parallel: bool
:type cores: int
:rtype: fdajpcah object of numpy ndarray
:return q_pca: srsf principal directions
:return f_pca: functional principal directions
:return latent: latent values
:return coef: coefficients
:param U_q: eigenvectors for q
:param U_h: eigenvectors for gam
"""
fn = self.warp_data.fn
time = self.warp_data.time
qn = self.warp_data.qn
q0 = self.warp_data.q0
gam = self.warp_data.gam
M = time.shape[0]
if var_exp is not None:
if var_exp > 1:
raise Exception("var_exp is greater than 1")
if id is None:
mididx = int(np.round(M / 2))
else:
mididx = id
Nstd = stds.shape[0]
# set up for fPCA in q-space
mq_new = qn.mean(axis=1)
m_new = np.sign(fn[mididx, :]) * np.sqrt(np.abs(fn[mididx, :]))
mqn = np.append(mq_new, m_new.mean())
qn2 = np.vstack((qn, m_new))
# calculate vector space of warping functions
h = geo.gam_to_h(gam)
# joint fPCA
C = fminbound(find_C_h, 0, 200, (qn2, h, q0, 0.99, parallel, cores))
qhat, gamhat, cz, Psi_q, Psi_h, sz, U, Uh, Uz = jointfPCAhd(qn2, h, C, var_exp)
hc = C * h
mh = np.mean(hc, axis=1)
# geodesic paths
no = cz.shape[1]
q_pca = np.ndarray(shape=(M, Nstd, no), dtype=float)
f_pca = np.ndarray(shape=(M, Nstd, no), dtype=float)
for k in range(0, no):
for l in range(0, Nstd):
qhat = mqn + np.dot(Psi_q[:, k], stds[l] * np.sqrt(sz[k]))
hhat = np.dot(Psi_h[:, k], (stds[l] * np.sqrt(sz[k])) / C)
gamhat = fs.geometry.h_to_gam(hhat)
fhat = fs.utility_functions.cumtrapzmid(
time,
qhat[0:M] * np.fabs(qhat[0:M]),
np.sign(qhat[M]) * (qhat[M] * qhat[M]),
mididx,
)
f_pca[:, l, k] = fs.utility_functions.warp_f_gamma(
np.linspace(0, 1, M), fhat, gamhat
)
q_pca[:, l, k] = fs.utility_functions.warp_q_gamma(
np.linspace(0, 1, M), qhat[0:M], gamhat
)
self.q_pca = q_pca
self.f_pca = f_pca
self.latent = sz[0:no]
self.coef = cz[:, 0:no]
self.U_q = Psi_q
self.U_h = Psi_h
self.id = mididx
self.C = C
self.time = time
self.no = no
self.stds = stds
self.mqn = mqn
self.U = U
self.U1 = Uh
self.Uz = Uz
self.mh = mh
return
[docs]
    def project(self, f):
        """
        project new data onto fPCA basis

        Usage: obj.project(f)

        :param f: numpy array (MxN) of N functions on M time
        """
        # convert to SRSFs and align each new function to the stored mean
        q1 = fs.f_to_srsf(f, self.time)
        M = self.time.shape[0]
        n = q1.shape[1]
        mq = self.warp_data.mqn
        fn = np.zeros((M, n))
        qn = np.zeros((M, n))
        gam = np.zeros((M, n))
        for ii in range(0, n):
            gam[:, ii] = fs.optimum_reparam(mq, self.time, q1[:, ii])
            fn[:, ii] = fs.warp_f_gamma(self.time, f[:, ii], gam[:, ii])
            qn[:, ii] = fs.f_to_srsf(fn[:, ii], self.time)

        # augment with f(id) exactly as calc_fpca did
        m_new = np.sign(fn[self.id, :]) * np.sqrt(np.abs(fn[self.id, :]))
        qn1 = np.vstack((qn, m_new))
        C = self.C

        # map warpings to h space, compute the two univariate coefficient
        # sets, then combine through the multivariate eigenbasis Uz
        h = geo.gam_to_h(gam)
        c = (qn1 - self.mqn[:, np.newaxis]).T @ self.U
        ch = (C*h - self.mh[:, np.newaxis]).T @ self.U1
        Xi = np.hstack((c, ch))
        cz = Xi @ self.Uz

        self.new_coef = cz

        return
[docs]
def plot(self):
"""
plot plot elastic joint fPCA result
Usage: obj.plot()
"""
no = self.no
M = self.time.shape[0]
Nstd = self.stds.shape[0]
num_plot = int(np.ceil(no / 3))
CBcdict = {
"Bl": (0, 0, 0),
"Or": (0.9, 0.6, 0),
"SB": (0.35, 0.7, 0.9),
"bG": (0, 0.6, 0.5),
"Ye": (0.95, 0.9, 0.25),
"Bu": (0, 0.45, 0.7),
"Ve": (0.8, 0.4, 0),
"rP": (0.8, 0.6, 0.7),
}
cl = sorted(CBcdict.keys())
k = 0
for ii in range(0, num_plot):
if k > (no - 1):
break
fig, ax = plt.subplots(2, 3)
for k1 in range(0, 3):
k = k1 + (ii) * 3
axt = ax[0, k1]
if k > (no - 1):
break
for l in range(0, Nstd):
axt.plot(self.time, self.q_pca[0:M, l, k], color=CBcdict[cl[l]])
axt.set_title("q domain: PD %d" % (k + 1))
axt = ax[1, k1]
for l in range(0, Nstd):
axt.plot(self.time, self.f_pca[:, l, k], color=CBcdict[cl[l]])
axt.set_title("f domain: PD %d" % (k + 1))
fig.set_tight_layout(True)
cumm_coef = 100 * np.cumsum(self.latent) / sum(self.latent)
idx = np.arange(0, self.latent.shape[0]) + 1
plot.f_plot(idx, cumm_coef, "Coefficient Cumulative Percentage")
plt.ylabel("Percentage")
plt.xlabel("Index")
plt.show()
return
def jointfPCAd(qn, vec, C, m, mu_psi, parallel, cores):
    """
    Core joint fPCA computation: PCA of the SRSFs stacked with the
    C-scaled shooting vectors of the warping functions.

    :param qn: (M x N) aligned SRSFs with the appended f(0) row
    :param vec: shooting vectors of the warping functions
    :param C: scaling between amplitude (q) and phase (vec) parts
    :param m: number of principal components to keep
    :param mu_psi: Karcher mean psi (base point of the exponential map)
    :param parallel: reconstruct warpings in parallel via joblib
    :param cores: number of joblib workers
    :return: (qhat, gamhat, a, U, s, mu_g, g, K) — reconstructed SRSFs,
             reconstructed warpings, coefficients, eigenvectors, eigenvalues,
             joint mean, joint data matrix, covariance
    """
    (M, N) = qn.shape

    # stack amplitude and C-scaled phase into one joint data matrix
    g = np.vstack((qn, C * vec))
    mu_q = qn.mean(axis=1)
    mu_g = g.mean(axis=1)

    K = np.cov(g)
    U, s, V = svd(K)

    # principal coefficients of each sample in the joint space
    a = np.zeros((N, m))
    for i in range(0, N):
        for j in range(0, m):
            tmp = g[:, i] - mu_g
            a[i, j] = np.dot(tmp.T, U[:, j])

    # reconstruct the SRSFs from the first m components
    qhat = np.tile(mu_q, (N, 1))
    qhat = qhat.T
    qhat = qhat + np.dot(U[0:M, 0:m], a.T)

    # reconstruct the warpings: undo the C scaling, map through the exp map,
    # integrate psi^2 and normalize to a proper gamma on [0, 1]
    # (vec has M - 1 rows since qn carries the extra appended f(0) row)
    vechat = np.dot(U[M:, 0:m], a.T / C)
    psihat = np.zeros((M - 1, N))
    gamhat = np.zeros((M - 1, N))
    if parallel:
        out = Parallel(n_jobs=cores)(
            delayed(jfpca_sub)(mu_psi, vechat[:, n]) for n in range(N)
        )
        gamhat = np.array(out)
        gamhat = gamhat.transpose()
    else:
        for ii in range(0, N):
            psihat[:, ii] = geo.exp_map(mu_psi, vechat[:, ii])
            gam_tmp = cumulative_trapezoid(
                psihat[:, ii] * psihat[:, ii], np.linspace(0, 1, M - 1),
                initial=0)
            gamhat[:, ii] = (gam_tmp - gam_tmp.min()) / (gam_tmp.max() - gam_tmp.min())

    U = U[:, 0:m]
    s = s[0:m]

    return qhat, gamhat, a, U, s, mu_g, g, K
def jointfPCAhd(qn, h, C, var_exp):
    """
    Joint fPCA via MFPCA: separate univariate fPCAs in q-space and h-space,
    then a multivariate PCA of the stacked coefficient vectors.

    :param qn: (M x N) aligned SRSFs with the appended f(0) row
    :param h: vector-space representation of the warping functions
    :param C: scaling between amplitude (q) and phase (h) parts
    :param var_exp: percent variance explained used to truncate each
                    univariate basis
    :return: (qhat, gamhat, cz, Psi_q, Psi_h, sz, U, Uh, Uz)
    """
    (M, N) = qn.shape

    # --- univariate fPCA, q space ---
    K = np.cov(qn)
    mqn = qn.mean(axis=1)
    U, s, V = svd(K)
    U, V = uf.svd_flip(U, V)
    cumm_coef = np.cumsum(s) / s.sum()
    # +1 converts the argwhere index into a component count; without it
    # (as previously written) no_q could be 0, dropping every q component,
    # and it was inconsistent with the no_h computation below
    no_q = int(np.argwhere(cumm_coef >= var_exp)[0][0]) + 1
    c = (qn - mqn[:, np.newaxis]).T @ U
    c = c[:, 0:no_q]
    U = U[:, 0:no_q]

    # --- univariate fPCA, h space (C-scaled) ---
    hc = C * h
    mh = np.mean(hc, axis=1)
    Kh = np.cov(hc)
    Uh, sh, Vh = svd(Kh)
    Uh, Vh = uf.svd_flip(Uh, Vh)
    cumm_coef = np.cumsum(sh) / sh.sum()
    no_h = int(np.argwhere(cumm_coef >= var_exp)[0][0]) + 1
    ch = (hc - mh[:, np.newaxis]).T @ Uh
    ch = ch[:, 0:no_h]
    Uh = Uh[:, 0:no_h]

    # --- multivariate PCA of the stacked coefficients ---
    Xi = np.hstack((c, ch))
    Z = 1 / (Xi.shape[0] - 1) * Xi.T @ Xi
    Uz, sz, Vz = svd(Z)
    Uz, Vz = uf.svd_flip(Uz, Vz)
    cz = Xi @ Uz

    # joint eigenfunctions in each original space
    Psi_q = U @ Uz[0:no_q, :]
    Psi_h = Uh @ Uz[no_q:, :]

    # reconstructions of warpings and SRSFs
    hhat = Psi_h @ (cz).T
    gamhat = fs.geometry.h_to_gam(hhat)
    qhat = Psi_q @ cz.T + mqn[:, np.newaxis]

    return qhat, gamhat, cz, Psi_q, Psi_h, sz, U, Uh, Uz
def find_C_h(C, qn, h, q0, var_exp, parallel, cores):
    """
    Objective for the C (amplitude/phase scaling) search in MFPCA-based
    joint fPCA: mean squared reconstruction error of the original SRSFs
    after undoing the estimated warpings.
    """
    qhat, gamhat, cz, Psi_q, Psi_h, sz, U, Uh, Uz = jointfPCAhd(qn, h, C, var_exp)
    M, N = qn.shape
    grid = np.linspace(0, 1, M - 1)

    if parallel:
        dists = Parallel(n_jobs=cores)(
            delayed(find_C_sub)(grid, qhat[0: (M - 1), jj], gamhat[:, jj], q0[:, jj])
            for jj in range(N)
        )
        d = np.array(dists)
    else:
        d = np.zeros(N)
        for jj in range(N):
            unwarped = uf.warp_q_gamma(grid, qhat[0: (M - 1), jj],
                                       uf.invertGamma(gamhat[:, jj]))
            resid = unwarped - q0[:, jj]
            d[jj] = trapezoid(resid * resid, grid)

    return d.mean()
def jfpca_sub(mu_psi, vechat):
    """
    Map one shooting vector back to a warping function: exponential map at
    mu_psi, integrate psi^2, and normalize onto [0, 1].
    """
    n_pts = mu_psi.shape[0]
    psi = geo.exp_map(mu_psi, vechat)
    grid = np.linspace(0, 1, n_pts)
    gam = cumulative_trapezoid(psi * psi, grid, initial=0)
    lo = gam.min()
    hi = gam.max()
    return (gam - lo) / (hi - lo)
def find_C(C, qn, vec, q0, m, mu_psi, parallel, cores):
    """
    Objective for the C (amplitude/phase scaling) search in joint fPCA:
    normalized sum of squared reconstruction errors of the original SRSFs
    after undoing the estimated warpings.
    """
    qhat, gamhat, a, U, s, mu_g, g, K = jointfPCAd(
        qn, vec, C, m, mu_psi, parallel, cores
    )
    M, N = qn.shape
    grid = np.linspace(0, 1, M - 1)

    if parallel:
        dists = Parallel(n_jobs=cores)(
            delayed(find_C_sub)(grid, qhat[0: (M - 1), jj], gamhat[:, jj],
                                q0[:, jj])
            for jj in range(N)
        )
        d = np.array(dists)
    else:
        d = np.zeros(N)
        for jj in range(N):
            unwarped = uf.warp_q_gamma(
                grid, qhat[0: (M - 1), jj], uf.invertGamma(gamhat[:, jj])
            )
            resid = unwarped - q0[:, jj]
            d[jj] = trapezoid(resid * resid, grid)

    return sum(d * d) / N
def find_C_sub(time, qhat, gamhat, q0):
    """
    Squared L2 distance between a reconstructed SRSF (de-warped by the
    inverse of gamhat) and the original SRSF q0.
    """
    unwarped = uf.warp_q_gamma(time, qhat, uf.invertGamma(gamhat))
    resid = unwarped - q0
    return trapezoid(resid * resid, time)