"""
Group-wise function alignment using SRSF framework and Dynamic Programming
moduleauthor:: J. Derek Tucker <jdtuck@sandia.gov>
"""
import numpy as np
import matplotlib.pyplot as plt
import fdasrsf.utility_functions as uf
import fdasrsf.fPCA as fpca
import fdasrsf.geometry as geo
from scipy.integrate import trapz, cumtrapz
from scipy.interpolate import interp1d
from scipy.linalg import svd
from numpy.linalg import norm
from numpy.random import rand, normal
from joblib import Parallel, delayed
from fdasrsf.fPLS import pls_svd
from tqdm import tqdm
import fdasrsf.plot_style as plot
import fpls_warp as fpls
import cbayesian as bay
import collections
[docs]class fdawarp:
"""
This class provides alignment methods for functional data using the SRVF framework
Usage: obj = fdawarp(f,t)
:param f: (M,N): matrix defining N functions of M samples
:param time: time vector of length M
:param fn: aligned functions
:param qn: aligned srvfs
:param q0: initial srvfs
:param fmean: function mean
:param mqn: mean srvf
:param gam: warping functions
:param psi: srvf of warping functions
:param stats: alignment statistics
:param qun: cost function
:param lambda: lambda
:param method: optimization method
:param gamI: inverse warping function
:param rsamps: random samples
:param fs: random aligned functions
:param gams: random warping functions
:param ft: random warped functions
:param qs: random aligned srvfs
:param type: alignment type
:param mcmc: mcmc output if bayesian
Author : J. D. Tucker (JDT) <jdtuck AT sandia.gov>
Date : 15-Mar-2018
"""
def __init__(self, f, time):
"""
Construct an instance of the fdawarp class
:param f: numpy ndarray of shape (M,N) of N functions with M samples
:param time: vector of size M describing the sample points
"""
a = time.shape[0]
if f.shape[0] != a:
raise Exception('Columns of f and time must be equal')
self.f = f
self.time = time
self.rsamps = False
[docs] def srsf_align(self, method="mean", omethod="DP2", smoothdata=False, parallel=False, lam=0.0, cores=-1, grid_dim=7):
"""
This function aligns a collection of functions using the elastic
square-root slope (srsf) framework.
:param method: (string) warp calculate Karcher Mean or Median (options = "mean" or "median") (default="mean")
:param omethod: optimization method (DP, DP2, RBFGS) (default = DP2)
:param smoothdata: Smooth the data using a box filter (default = F)
:param parallel: run in parallel (default = F)
:param lam: controls the elasticity (default = 0)
:param cores: number of cores for parallel (default = -1 (all))
:param grid_dim: size of the grid, for the DP2 method only (default = 7)
:type lam: double
:type smoothdata: bool
Examples
>>> import tables
>>> fun=tables.open_file("../Data/simu_data.h5")
>>> f = fun.root.f[:]
>>> f = f.transpose()
>>> time = fun.root.time[:]
>>> obj = fs.fdawarp(f,time)
>>> obj.srsf_align()
"""
M = self.f.shape[0]
N = self.f.shape[1]
self.lam = lam
if M > 500:
parallel = True
elif N > 100:
parallel = True
eps = np.finfo(np.double).eps
f0 = self.f
self.method = omethod
methods = ["mean", "median"]
self.type = method
# 0 mean, 1-median
method = [i for i, x in enumerate(methods) if x == method]
if len(method) == 0:
method = 0
else:
method = method[0]
# Compute SRSF function from data
f, g, g2 = uf.gradient_spline(self.time, self.f, smoothdata)
q = g / np.sqrt(abs(g) + eps)
print("Initializing...")
mnq = q.mean(axis=1)
a = mnq.repeat(N)
d1 = a.reshape(M, N)
d = (q - d1) ** 2
dqq = np.sqrt(d.sum(axis=0))
min_ind = dqq.argmin()
mq = q[:, min_ind]
mf = f[:, min_ind]
if parallel:
out = Parallel(n_jobs=cores)(delayed(uf.optimum_reparam)(mq, self.time,
q[:, n], omethod, lam, grid_dim) for n in range(N))
gam = np.array(out)
gam = gam.transpose()
else:
gam = np.zeros((M,N))
for k in range(0,N):
gam[:,k] = uf.optimum_reparam(mq,self.time,q[:,k],omethod,lam,grid_dim)
gamI = uf.SqrtMeanInverse(gam)
mf = np.interp((self.time[-1] - self.time[0]) * gamI + self.time[0], self.time, mf)
mq = uf.f_to_srsf(mf, self.time)
# Compute Karcher Mean
if method == 0:
print("Compute Karcher Mean of %d function in SRSF space..." % N)
if method == 1:
print("Compute Karcher Median of %d function in SRSF space..." % N)
MaxItr = 20
ds = np.repeat(0.0, MaxItr + 2)
ds[0] = np.inf
qun = np.repeat(0.0, MaxItr + 1)
tmp = np.zeros((M, MaxItr + 2))
tmp[:, 0] = mq
mq = tmp
tmp = np.zeros((M, MaxItr+2))
tmp[:,0] = mf
mf = tmp
tmp = np.zeros((M, N, MaxItr + 2))
tmp[:, :, 0] = self.f
f = tmp
tmp = np.zeros((M, N, MaxItr + 2))
tmp[:, :, 0] = q
q = tmp
for r in range(0, MaxItr):
print("updating step: r=%d" % (r + 1))
if r == (MaxItr - 1):
print("maximal number of iterations is reached")
# Matching Step
if parallel:
out = Parallel(n_jobs=cores)(delayed(uf.optimum_reparam)(mq[:, r],
self.time, q[:, n, 0], omethod, lam, grid_dim) for n in range(N))
gam = np.array(out)
gam = gam.transpose()
else:
for k in range(0,N):
gam[:,k] = uf.optimum_reparam(mq[:, r], self.time, q[:, k, 0],
omethod, lam, grid_dim)
gam_dev = np.zeros((M, N))
vtil = np.zeros((M,N))
dtil = np.zeros(N)
for k in range(0, N):
f[:, k, r + 1] = np.interp((self.time[-1] - self.time[0]) * gam[:, k]
+ self.time[0], self.time, f[:, k, 0])
q[:, k, r + 1] = uf.f_to_srsf(f[:, k, r + 1], self.time)
gam_dev[:, k] = np.gradient(gam[:, k], 1 / float(M - 1))
v = q[:, k, r + 1] - mq[:,r]
d = np.sqrt(trapz(v*v, self.time))
vtil[:,k] = v/d
dtil[k] = 1.0/d
mqt = mq[:, r]
a = mqt.repeat(N)
d1 = a.reshape(M, N)
d = (q[:, :, r + 1] - d1) ** 2
if method == 0:
d1 = sum(trapz(d, self.time, axis=0))
d2 = sum(trapz((1 - np.sqrt(gam_dev)) ** 2, self.time, axis=0))
ds_tmp = d1 + lam * d2
ds[r + 1] = ds_tmp
# Minimization Step
# compute the mean of the matched function
qtemp = q[:, :, r + 1]
ftemp = f[:, :, r + 1]
mq[:, r + 1] = qtemp.mean(axis=1)
mf[:, r + 1] = ftemp.mean(axis=1)
qun[r] = norm(mq[:, r + 1] - mq[:, r]) / norm(mq[:, r])
if method == 1:
d1 = np.sqrt(sum(trapz(d, self.time, axis=0)))
d2 = sum(trapz((1 - np.sqrt(gam_dev)) ** 2, self.time, axis=0))
ds_tmp = d1 + lam * d2
ds[r + 1] = ds_tmp
# Minimization Step
# compute the mean of the matched function
stp = .3
vbar = vtil.sum(axis=1)*(1/dtil.sum())
qtemp = q[:, :, r + 1]
ftemp = f[:, :, r + 1]
mq[:, r + 1] = mq[:,r] + stp*vbar
tmp = np.zeros(M)
tmp[1:] = cumtrapz(mq[:, r + 1] * np.abs(mq[:, r + 1]), self.time)
mf[:, r + 1] = np.median(f0[1, :])+tmp
qun[r] = norm(mq[:, r + 1] - mq[:, r]) / norm(mq[:, r])
if qun[r] < 1e-2 or r >= MaxItr:
break
# Last Step with centering of gam
r += 1
if parallel:
out = Parallel(n_jobs=cores)(delayed(uf.optimum_reparam)(mq[:, r], self.time,
q[:, n, 0], omethod, lam, grid_dim) for n in range(N))
gam = np.array(out)
gam = gam.transpose()
else:
for k in range(0,N):
gam[:,k] = uf.optimum_reparam(mq[:, r], self.time, q[:, k, 0], omethod,
lam, grid_dim)
gam_dev = np.zeros((M, N))
for k in range(0, N):
gam_dev[:, k] = np.gradient(gam[:, k], 1 / float(M - 1))
gamI = uf.SqrtMeanInverse(gam)
gamI_dev = np.gradient(gamI, 1 / float(M - 1))
time0 = (self.time[-1] - self.time[0]) * gamI + self.time[0]
mq[:, r + 1] = np.interp(time0, self.time, mq[:, r]) * np.sqrt(gamI_dev)
for k in range(0, N):
q[:, k, r + 1] = np.interp(time0, self.time, q[:, k, r]) * np.sqrt(gamI_dev)
f[:, k, r + 1] = np.interp(time0, self.time, f[:, k, r])
gam[:, k] = np.interp(time0, self.time, gam[:, k])
# Aligned data & stats
self.fn = f[:, :, r + 1]
self.qn = q[:, :, r + 1]
self.q0 = q[:, :, 0]
mean_f0 = f0.mean(axis=1)
std_f0 = f0.std(axis=1)
mean_fn = self.fn.mean(axis=1)
std_fn = self.fn.std(axis=1)
self.gam = gam
self.mqn = mq[:, r + 1]
tmp = np.zeros(M)
tmp[1:] = cumtrapz(self.mqn * np.abs(self.mqn), self.time)
self.fmean = np.mean(f0[1, :]) + tmp
fgam = np.zeros((M, N))
for k in range(0, N):
time0 = (self.time[-1] - self.time[0]) * gam[:, k] + self.time[0]
fgam[:, k] = np.interp(time0, self.time, self.fmean)
var_fgam = fgam.var(axis=1)
self.orig_var = trapz(std_f0 ** 2, self.time)
self.amp_var = trapz(std_fn ** 2, self.time)
self.phase_var = trapz(var_fgam, self.time)
return
[docs] def plot(self):
"""
plot plot functional alignment results
Usage: obj.plot()
"""
M = self.f.shape[0]
plot.f_plot(self.time, self.f, title="f Original Data")
fig, ax = plot.f_plot(np.arange(0, M) / float(M - 1), self.gam,
title="Warping Functions")
ax.set_aspect('equal')
plot.f_plot(self.time, self.fn, title="Warped Data")
mean_f0 = self.f.mean(axis=1)
std_f0 = self.f.std(axis=1)
mean_fn = self.fn.mean(axis=1)
std_fn = self.fn.std(axis=1)
tmp = np.array([mean_f0, mean_f0 + std_f0, mean_f0 - std_f0])
tmp = tmp.transpose()
plot.f_plot(self.time, tmp, title=r"Original Data: Mean $\pm$ STD")
tmp = np.array([mean_fn, mean_fn + std_fn, mean_fn - std_fn])
tmp = tmp.transpose()
plot.f_plot(self.time, tmp, title=r"Warped Data: Mean $\pm$ STD")
plot.f_plot(self.time, self.fmean, title="$f_{mean}$")
plt.show()
return
[docs] def gauss_model(self, n=1, sort_samples=False):
"""
This function models the functional data using a Gaussian model
extracted from the principal components of the srvfs
:param n: number of random samples
:param sort_samples: sort samples (default = T)
:type n: integer
:type sort_samples: bool
"""
fn = self.fn
time = self.time
qn = self.qn
gam = self.gam
# Parameters
eps = np.finfo(np.double).eps
binsize = np.diff(time)
binsize = binsize.mean()
M = time.size
# compute mean and covariance in q-domain
mq_new = qn.mean(axis=1)
mididx = np.round(time.shape[0] / 2)
m_new = np.sign(fn[mididx, :]) * np.sqrt(np.abs(fn[mididx, :]))
mqn = np.append(mq_new, m_new.mean())
qn2 = np.vstack((qn, m_new))
C = np.cov(qn2)
q_s = np.random.multivariate_normal(mqn, C, n)
q_s = q_s.transpose()
# compute the correspondence to the original function domain
fs = np.zeros((M, n))
for k in range(0, n):
fs[:, k] = uf.cumtrapzmid(time, q_s[0:M, k] * np.abs(q_s[0:M, k]),
np.sign(q_s[M, k]) * (q_s[M, k] ** 2),
mididx)
fbar = fn.mean(axis=1)
fsbar = fs.mean(axis=1)
err = np.transpose(np.tile(fbar-fsbar, (n,1)))
fs += err
# random warping generation
rgam = uf.randomGamma(gam, n)
gams = np.zeros((M, n))
for k in range(0, n):
gams[:, k] = uf.invertGamma(rgam[:, k])
# sort functions and warping
if sort_samples:
mx = fs.max(axis=0)
seq1 = mx.argsort()
# compute the psi-function
fy = np.gradient(rgam, binsize)
psi = fy / np.sqrt(abs(fy) + eps)
ip = np.zeros(n)
len = np.zeros(n)
for i in range(0, n):
tmp = np.ones(M)
ip[i] = tmp.dot(psi[:, i] / M)
len[i] = np.arccos(tmp.dot(psi[:, i] / M))
seq2 = len.argsort()
# combine x-variability and y-variability
ft = np.zeros((M, n))
for k in range(0, n):
ft[:, k] = np.interp(gams[:, seq2[k]], np.arange(0, M) /
np.double(M - 1), fs[:, seq1[k]])
tmp = np.isnan(ft[:, k])
while tmp.any():
rgam2 = uf.randomGamma(gam, 1)
ft[:, k] = np.interp(gams[:, seq2[k]], np.arange(0, M) /
np.double(M - 1), uf.invertGamma(rgam2))
else:
# combine x-variability and y-variability
ft = np.zeros((M, n))
for k in range(0, n):
ft[:, k] = np.interp(gams[:, k], np.arange(0, M) /
np.double(M - 1), fs[:, k])
tmp = np.isnan(ft[:, k])
while tmp.any():
rgam2 = uf.randomGamma(gam, 1)
ft[:, k] = np.interp(gams[:, k], np.arange(0, M) /
np.double(M - 1), uf.invertGamma(rgam2))
self.rsamps = True
self.fs = fs
self.gams = rgam
self.ft = ft
self.qs = q_s[0:M,:]
return
[docs] def joint_gauss_model(self, n=1, no=3):
"""
This function models the functional data using a joint Gaussian model
extracted from the principal components of the srsfs
:param n: number of random samples
:param no: number of principal components (default = 3)
:type n: integer
:type no: integer
"""
# Parameters
fn = self.fn
time = self.time
qn = self.qn
gam = self.gam
M = time.size
# Perform PCA
jfpca = fpca.fdajpca(self)
jfpca.calc_fpca(no=no)
s = jfpca.latent
U = jfpca.U
C = jfpca.C
mu_psi = jfpca.mu_psi
# compute mean and covariance
mq_new = qn.mean(axis=1)
mididx = jfpca.id
m_new = np.sign(fn[mididx, :]) * np.sqrt(np.abs(fn[mididx, :]))
mqn = np.append(mq_new, m_new.mean())
# generate random samples
vals = np.random.multivariate_normal(np.zeros(s.shape), np.diag(s), n)
tmp = np.matmul(U, np.transpose(vals))
qhat = np.tile(mqn.T,(n,1)).T + tmp[0:M+1,:]
tmp = np.matmul(U, np.transpose(vals)/C)
vechat = tmp[(M+1):,:]
psihat = np.zeros((M,n))
gamhat = np.zeros((M,n))
for ii in range(n):
psihat[:,ii] = geo.exp_map(mu_psi,vechat[:,ii])
gam_tmp = cumtrapz(psihat[:,ii]**2,np.linspace(0,1,M),initial=0.0)
gamhat[:,ii] = (gam_tmp - gam_tmp.min())/(gam_tmp.max()-gam_tmp.min())
ft = np.zeros((M,n))
fhat = np.zeros((M,n))
for ii in range(n):
fhat[:,ii] = uf.cumtrapzmid(time, qhat[0:M,ii]*np.fabs(qhat[0:M,ii]), np.sign(qhat[M,ii])*(qhat[M,ii]*qhat[M,ii]), mididx)
ft[:,ii] = uf.warp_f_gamma(np.linspace(0,1,M),fhat[:,ii],gamhat[:,ii])
self.rsamps = True
self.fs = fhat
self.gams = gamhat
self.ft = ft
self.qs = qhat[0:M,:]
return
[docs] def multiple_align_functions(self, mu, omethod="DP2", smoothdata=False, parallel=False, lam=0.0, cores=-1, grid_dim=7):
"""
This function aligns a collection of functions using the elastic square-root
slope (srsf) framework.
Usage: obj.multiple_align_functions(mu)
obj.multiple_align_functions(lambda)
obj.multiple_align_functions(lambda, ...)
:param mu: vector of function to align to
:param omethod: optimization method (DP, DP2, RBFGS) (default = DP)
:param smoothdata: Smooth the data using a box filter (default = F)
:param parallel: run in parallel (default = F)
:param lam: controls the elasticity (default = 0)
:param cores: number of cores for parallel (default = -1 (all))
:param grid_dim: size of the grid, for the DP2 method only (default = 7)
:type lam: double
:type smoothdata: bool
"""
M = self.f.shape[0]
N = self.f.shape[1]
self.lam = lam
if M > 500:
parallel = True
elif N > 100:
parallel = True
eps = np.finfo(np.double).eps
self.method = omethod
self.type = "multiple"
# Compute SRSF function from data
f, g, g2 = uf.gradient_spline(self.time, self.f, smoothdata)
q = g / np.sqrt(abs(g) + eps)
mq = uf.f_to_srsf(mu, self.time)
if parallel:
out = Parallel(n_jobs=cores)(delayed(uf.optimum_reparam)(mq, self.time,
q[:, n], omethod, lam, grid_dim) for n in range(N))
gam = np.array(out)
gam = gam.transpose()
else:
gam = np.zeros((M,N))
for k in range(0,N):
gam[:,k] = uf.optimum_reparam(mq,self.time,q[:,k],omethod,lam,grid_dim)
self.gamI = uf.SqrtMeanInverse(gam)
fn = np.zeros((M,N))
qn = np.zeros((M,N))
for k in range(0, N):
fn[:, k] = np.interp((self.time[-1] - self.time[0]) * gam[:, k]
+ self.time[0], self.time, f[:, k])
qn[:, k] = uf.f_to_srsf(f[:, k], self.time)
# Aligned data & stats
self.fn = fn
self.qn = qn
self.q0 = q
mean_f0 = f.mean(axis=1)
std_f0 = f.std(axis=1)
mean_fn = self.fn.mean(axis=1)
std_fn = self.fn.std(axis=1)
self.gam = gam
self.mqn = mq
self.fmean = mu
fgam = np.zeros((M, N))
for k in range(0, N):
time0 = (self.time[-1] - self.time[0]) * gam[:, k] + self.time[0]
fgam[:, k] = np.interp(time0, self.time, self.fmean)
var_fgam = fgam.var(axis=1)
self.orig_var = trapz(std_f0 ** 2, self.time)
self.amp_var = trapz(std_fn ** 2, self.time)
self.phase_var = trapz(var_fgam, self.time)
return
[docs]def pairwise_align_bayes(f1i, f2i, time, mcmcopts=None):
"""
This function aligns two functions using Bayesian framework. It will align
f2 to f1. It is based on mapping warping functions to a hypersphere, and a
subsequent exponential mapping to a tangent space. In the tangent space,
the Z-mixture pCN algorithm is used to explore both local and global
structure in the posterior distribution.
The Z-mixture pCN algorithm uses a mixture distribution for the proposal
distribution, controlled by input parameter zpcn. The zpcn$betas must be
between 0 and 1, and are the coefficients of the mixture components, with
larger coefficients corresponding to larger shifts in parameter space. The
zpcn["probs"] give the probability of each shift size.
Usage: out = pairwise_align_bayes(f1i, f2i, time)
out = pairwise_align_bayes(f1i, f2i, time, mcmcopts)
:param f1i: vector defining M samples of function 1
:param f2i: vector defining M samples of function 2
:param time: time vector of length M
:param mcmopts: dict of mcmc parameters
:type mcmcopts: dict
default mcmc options:
tmp = {"betas":np.array([0.5,0.5,0.005,0.0001]),"probs":np.array([0.1,0.1,0.7,0.1])}
mcmcopts = {"iter":2*(10**4) ,"burnin":np.minimum(5*(10**3),2*(10**4)//2),
"alpha0":0.1, "beta0":0.1,"zpcn":tmp,"propvar":1,
"initcoef":np.repeat(0,20), "npoints":200, "extrainfo":True}
:rtype collection containing
:return f2_warped: aligned f2
:return gamma: warping function
:return g_coef: final g_coef
:return psi: final psi
:return sigma1: final sigma
if extrainfo
:return accept: accept of psi samples
:return betas_ind
:return logl: log likelihood
:return gamma_mat: posterior gammas
:return gamma_stats: posterior gamma stats
:return xdist: phase distance posterior
:return ydist: amplitude distance posterior)
"""
if mcmcopts is None:
tmp = {"betas":np.array([0.5,0.5,0.005,0.0001]),"probs":np.array([0.1,0.1,0.7,0.1])}
mcmcopts = {"iter":2*(10**4) ,"burnin":np.minimum(5*(10**3),2*(10**4)//2),"alpha0":0.1,
"beta0":0.1,"zpcn":tmp,"propvar":1,
"initcoef":np.repeat(0,20), "npoints":200, "extrainfo":True}
if f1i.shape[0] != f2i.shape[0]:
raise Exception('Length of f1 and f2 must be equal')
if f1i.shape[0] != time.shape[0]:
raise Exception('Length of f1 and time must be equal')
if mcmcopts["zpcn"]["betas"].shape[0] != mcmcopts["zpcn"]["probs"].shape[0]:
raise Exception('In zpcn, betas must equal length of probs')
if np.mod(mcmcopts["initcoef"].shape[0], 2) != 0:
raise Exception('Length of mcmcopts.initcoef must be even')
# Number of sig figs to report in gamma_mat
SIG_GAM = 13
iter = mcmcopts["iter"]
# parameter settings
pw_sim_global_burnin = mcmcopts["burnin"]
valid_index = np.arange(pw_sim_global_burnin-1,iter)
pw_sim_global_Mg = mcmcopts["initcoef"].shape[0]//2
g_coef_ini = mcmcopts["initcoef"]
numSimPoints = mcmcopts["npoints"]
pw_sim_global_domain_par = np.linspace(0,1,numSimPoints)
g_basis = uf.basis_fourier(pw_sim_global_domain_par, pw_sim_global_Mg, 1)
sigma1_ini = 1
zpcn = mcmcopts["zpcn"]
pw_sim_global_sigma_g = mcmcopts["propvar"]
def propose_g_coef(g_coef_curr):
pCN_beta = zpcn["betas"]
pCN_prob = zpcn["probs"]
probm = np.insert(np.cumsum(pCN_prob),0,0)
z = np.random.rand()
result = {"prop":g_coef_curr,"ind":1}
for i in range (0,pCN_beta.shape[0]):
if z <= probm[i+1] and z > probm[i]:
g_coef_new = normal(0, pw_sim_global_sigma_g / np.repeat(np.arange(1,pw_sim_global_Mg+1),2))
result["prop"] = np.sqrt(1-pCN_beta[i]**2) * g_coef_curr + pCN_beta[i] * g_coef_new
result["ind"] = i
return result
# normalize time to [0,1]
time = (time - time.min())/(time.max()-time.min())
timet = np.linspace(0,1,numSimPoints)
f1 = uf.f_predictfunction(f1i,timet,0)
f2 = uf.f_predictfunction(f2i,timet,0)
# srsf transformation
q1 = uf.f_to_srsf(f1,timet)
q1i = uf.f_to_srsf(f1i,time)
q2 = uf.f_to_srsf(f2,timet)
tmp = uf.f_exp1(uf.f_basistofunction(g_basis["x"],0,g_coef_ini,g_basis))
if tmp.min() < 0:
raise Exception("Invalid initial value of g")
# result vectors
g_coef = np.zeros((iter,g_coef_ini.shape[0]))
sigma1 = np.zeros(iter)
logl = np.zeros(iter)
SSE = np.zeros(iter)
accept = np.zeros(iter, dtype=bool)
accept_betas = np.zeros(iter)
# init
g_coef_curr = g_coef_ini
sigma1_curr = sigma1_ini
SSE_curr = f_SSEg_pw(uf.f_basistofunction(g_basis["x"],0,g_coef_ini,g_basis),q1,q2)
logl_curr = f_logl_pw(uf.f_basistofunction(g_basis["x"],0,g_coef_ini,g_basis),q1,q2,sigma1_ini**2,SSE_curr)
g_coef[0,:] = g_coef_ini
sigma1[0] = sigma1_ini
SSE[0] = SSE_curr
logl[0] = logl_curr
# update the chain for iter-1 times
for m in tqdm(range(1,iter)):
# update g
g_coef_curr, tmp, SSE_curr, accepti, zpcnInd = f_updateg_pw(g_coef_curr, g_basis, sigma1_curr**2, q1, q2, SSE_curr, propose_g_coef)
# update sigma1
newshape = q1.shape[0]/2 + mcmcopts["alpha0"]
newscale = 1/2 * SSE_curr + mcmcopts["beta0"]
sigma1_curr = np.sqrt(1/np.random.gamma(newshape,1/newscale))
logl_curr = f_logl_pw(uf.f_basistofunction(g_basis["x"],0,g_coef_curr,g_basis), q1, q2, sigma1_curr**2, SSE_curr)
# save updates to results
g_coef[m,:] = g_coef_curr
sigma1[m] = sigma1_curr
SSE[m] = SSE_curr
if mcmcopts["extrainfo"]:
logl[m] = logl_curr
accept[m] = accepti
accept_betas[m] = zpcnInd
# calculate posterior mean of psi
pw_sim_est_psi_matrix = np.zeros((numSimPoints,valid_index.shape[0]))
for k in range(0,valid_index.shape[0]):
g_temp = uf.f_basistofunction(g_basis["x"],0,g_coef[valid_index[k],:],g_basis)
psi_temp = uf.f_exp1(g_temp)
pw_sim_est_psi_matrix[:,k] = psi_temp
result_posterior_psi_simDomain = uf.f_psimean(pw_sim_global_domain_par, pw_sim_est_psi_matrix)
# resample to same number of points as the input f1 and f2
interp = interp1d(np.linspace(0,1,result_posterior_psi_simDomain.shape[0]), result_posterior_psi_simDomain, fill_value="extrapolate")
result_posterior_psi = interp(np.linspace(0,1,f1i.shape[0]))
# transform posterior mean of psi to gamma
result_posterior_gamma = uf.f_phiinv(result_posterior_psi)
result_posterior_gamma = uf.norm_gam(result_posterior_gamma)
# warped f2
f2_warped = uf.warp_f_gamma(time, f2i, result_posterior_gamma)
if mcmcopts["extrainfo"]:
M,N = pw_sim_est_psi_matrix.shape
gamma_mat = np.zeros((time.shape[0],N))
one_v = np.ones(M)
Dx = np.zeros(N)
Dy = Dx
for ii in range(0,N):
interp = interp1d(np.linspace(0,1,result_posterior_psi_simDomain.shape[0]), pw_sim_est_psi_matrix[:,ii], fill_value="extrapolate")
result_i = interp(time)
tmp = uf.f_phiinv(result_i)
gamma_mat[:,ii] = uf.norm_gam(tmp)
v, theta = geo.inv_exp_map(one_v,pw_sim_est_psi_matrix[:,ii])
Dx[ii] = np.sqrt(trapz(v**2,pw_sim_global_domain_par))
q2warp = uf.warp_q_gamma(pw_sim_global_domain_par,q2,gamma_mat[:,ii])
Dy[ii] = np.sqrt(trapz((q1i-q2warp)**2,time))
gamma_stats = uf.statsFun(gamma_mat)
results_o = collections.namedtuple('align_bayes', ['f2_warped', 'gamma','g_coef', 'psi', 'sigma1', 'accept', 'betas_ind', 'logl', 'gamma_mat', 'gamma_stats', 'xdist', 'ydist'])
out = results_o(f2_warped, result_posterior_gamma, g_coef, result_posterior_psi, sigma1, accept[1:], accept_betas[1:], logl, gamma_mat, gamma_stats, Dx, Dy)
return(out)
def f_SSEg_pw(g, q1, q2):
obs_domain = np.linspace(0,1,g.shape[0])
exp1g_temp = uf.f_predictfunction(uf.f_exp1(g), obs_domain, 0)
pt = np.insert(bay.bcuL2norm2(obs_domain, exp1g_temp),0,0)
tmp = uf.f_predictfunction(q2,pt,0)
vec = (q1 - tmp * exp1g_temp)**2
out = vec.sum()
return(out)
def f_logl_pw(g, q1, q2, var1, SSEg):
if SSEg == 0:
SSEg = f_SSEg_pw(g, q1, q2)
n = q1.shape[0]
out = n * np.log(1/np.sqrt(2*np.pi)) - n * np.log(np.sqrt(var1)) - SSEg / (2 * var1)
return(out)
def f_updateg_pw(g_coef_curr,g_basis,var1_curr,q1,q2,SSE_curr,propose_g_coef):
g_coef_prop = propose_g_coef(g_coef_curr)
tst = uf.f_exp1(uf.f_basistofunction(g_basis["x"],0,g_coef_prop["prop"],g_basis))
while tst.min() < 0:
g_coef_prop = propose_g_coef(g_coef_curr)
tst = uf.f_exp1(uf.f_basistofunction(g_basis["x"],0,g_coef_prop["prop"],g_basis))
if SSE_curr == 0:
SSE_curr = f_SSEg_pw(uf.f_basistofunction(g_basis["x"],0,g_coef_curr,g_basis), q1, q2)
SSE_prop = f_SSEg_pw(uf.f_basistofunction(g_basis["x"],0,g_coef_prop["prop"],g_basis), q1, q2)
logl_curr = f_logl_pw(uf.f_basistofunction(g_basis["x"],0,g_coef_curr,g_basis), q1, q2, var1_curr, SSE_curr)
logl_prop = f_logl_pw(uf.f_basistofunction(g_basis["x"],0,g_coef_prop["prop"],g_basis), q1, q2, var1_curr, SSE_prop)
ratio = np.minimum(1, np.exp(logl_prop-logl_curr))
u = np.random.rand()
if u <= ratio:
g_coef = g_coef_prop["prop"]
logl = logl_prop
SSE = SSE_prop
accept = True
zpcnInd = g_coef_prop["ind"]
if u > ratio:
g_coef = g_coef_curr
logl = logl_curr
SSE = SSE_curr
accept = False
zpcnInd = g_coef_prop["ind"]
return g_coef, logl, SSE, accept, zpcnInd
[docs]def align_fPCA(f, time, num_comp=3, showplot=True, smoothdata=False, cores=-1):
"""
aligns a collection of functions while extracting principal components.
The functions are aligned to the principal components
:param f: numpy ndarray of shape (M,N) of N functions with M samples
:param time: vector of size M describing the sample points
:param num_comp: number of fPCA components
:param showplot: Shows plots of results using matplotlib (default = T)
:param smooth_data: Smooth the data using a box filter (default = F)
:param cores: number of cores for parallel (default = -1 (all))
:type sparam: double
:type smooth_data: bool
:type f: np.ndarray
:type time: np.ndarray
:rtype: tuple of numpy array
:return fn: aligned functions - numpy ndarray of shape (M,N) of N
functions with M samples
:return qn: aligned srvfs - similar structure to fn
:return q0: original srvf - similar structure to fn
:return mqn: srvf mean or median - vector of length M
:return gam: warping functions - similar structure to fn
:return q_pca: srsf principal directions
:return f_pca: functional principal directions
:return latent: latent values
:return coef: coefficients
:return U: eigenvectors
:return orig_var: Original Variance of Functions
:return amp_var: Amplitude Variance
:return phase_var: Phase Variance
"""
lam = 0.0
MaxItr = 50
coef = np.arange(-2., 3.)
Nstd = coef.shape[0]
M = f.shape[0]
N = f.shape[1]
if M > 500:
parallel = True
elif N > 100:
parallel = True
else:
parallel = False
eps = np.finfo(np.double).eps
f0 = f
if showplot:
plot.f_plot(time, f, title="Original Data")
# Compute SRSF function from data
f, g, g2 = uf.gradient_spline(time, f, smoothdata)
q = g / np.sqrt(abs(g) + eps)
print ("Initializing...")
mnq = q.mean(axis=1)
a = mnq.repeat(N)
d1 = a.reshape(M, N)
d = (q - d1) ** 2
dqq = np.sqrt(d.sum(axis=0))
min_ind = dqq.argmin()
print("Aligning %d functions in SRVF space to %d fPCA components..."
% (N, num_comp))
itr = 0
mq = np.zeros((M, MaxItr + 1))
mq[:, itr] = q[:, min_ind]
fi = np.zeros((M, N, MaxItr + 1))
fi[:, :, 0] = f
qi = np.zeros((M, N, MaxItr + 1))
qi[:, :, 0] = q
gam = np.zeros((M, N, MaxItr + 1))
cost = np.zeros(MaxItr + 1)
while itr < MaxItr:
print("updating step: r=%d" % (itr + 1))
if itr == MaxItr:
print("maximal number of iterations is reached")
# PCA Step
a = mq[:, itr].repeat(N)
d1 = a.reshape(M, N)
qhat_cent = qi[:, :, itr] - d1
K = np.cov(qi[:, :, itr])
U, s, V = svd(K)
alpha_i = np.zeros((num_comp, N))
for ii in range(0, num_comp):
for jj in range(0, N):
alpha_i[ii, jj] = trapz(qhat_cent[:, jj] * U[:, ii], time)
U1 = U[:, 0:num_comp]
tmp = U1.dot(alpha_i)
qhat = d1 + tmp
# Matching Step
if parallel:
out = Parallel(n_jobs=cores)(
delayed(uf.optimum_reparam)(qhat[:, n], time, qi[:, n, itr],
"DP", lam) for n in range(N))
gam_t = np.array(out)
gam[:, :, itr] = gam_t.transpose()
else:
gam[:, :, itr] = uf.optimum_reparam(qhat, time, qi[:, :, itr], "DP", lam)
for k in range(0, N):
time0 = (time[-1] - time[0]) * gam[:, k, itr] + time[0]
fi[:, k, itr + 1] = np.interp(time0, time, fi[:, k, itr])
qi[:, k, itr + 1] = uf.f_to_srsf(fi[:, k, itr + 1], time)
qtemp = qi[:, :, itr + 1]
mq[:, itr + 1] = qtemp.mean(axis=1)
cost_temp = np.zeros(N)
for ii in range(0, N):
cost_temp[ii] = norm(qtemp[:, ii] - qhat[:, ii]) ** 2
cost[itr + 1] = cost_temp.mean()
if abs(cost[itr + 1] - cost[itr]) < 1e-06:
break
itr += 1
if itr >= MaxItr:
itrf = MaxItr
else:
itrf = itr+1
cost = cost[1:(itrf+1)]
# Aligned data & stats
fn = fi[:, :, itrf]
qn = qi[:, :, itrf]
q0 = qi[:, :, 0]
mean_f0 = f0.mean(axis=1)
std_f0 = f0.std(axis=1)
mqn = mq[:, itrf]
gamf = gam[:, :, 0]
for k in range(1, itr):
gam_k = gam[:, :, k]
for l in range(0, N):
time0 = (time[-1] - time[0]) * gam_k[:, l] + time[0]
gamf[:, l] = np.interp(time0, time, gamf[:, l])
# Center Mean
gamI = uf.SqrtMeanInverse(gamf)
gamI_dev = np.gradient(gamI, 1 / float(M - 1))
time0 = (time[-1] - time[0]) * gamI + time[0]
mqn = np.interp(time0, time, mqn) * np.sqrt(gamI_dev)
for k in range(0, N):
qn[:, k] = np.interp(time0, time, qn[:, k]) * np.sqrt(gamI_dev)
fn[:, k] = np.interp(time0, time, fn[:, k])
gamf[:, k] = np.interp(time0, time, gamf[:, k])
mean_fn = fn.mean(axis=1)
std_fn = fn.std(axis=1)
# Get Final PCA
mididx = int(np.round(time.shape[0] / 2))
m_new = np.sign(fn[mididx, :]) * np.sqrt(np.abs(fn[mididx, :]))
mqn2 = np.append(mqn, m_new.mean())
qn2 = np.vstack((qn, m_new))
K = np.cov(qn2)
U, s, V = svd(K)
stdS = np.sqrt(s)
# compute the PCA in the q domain
q_pca = np.ndarray(shape=(M + 1, Nstd, num_comp), dtype=float)
for k in range(0, num_comp):
for l in range(0, Nstd):
q_pca[:, l, k] = mqn2 + coef[l] * stdS[k] * U[:, k]
# compute the correspondence in the f domain
f_pca = np.ndarray(shape=(M, Nstd, num_comp), dtype=float)
for k in range(0, num_comp):
for l in range(0, Nstd):
q_pca_tmp = q_pca[0:M, l, k] * np.abs(q_pca[0:M, l, k])
q_pca_tmp2 = np.sign(q_pca[M, l, k]) * (q_pca[M, l, k] ** 2)
f_pca[:, l, k] = uf.cumtrapzmid(time, q_pca_tmp, q_pca_tmp2, mididx)
N2 = qn.shape[1]
c = np.zeros((N2, num_comp))
for k in range(0, num_comp):
for l in range(0, N2):
c[l, k] = sum((np.append(qn[:, l], m_new[l]) - mqn2) * U[:, k])
if showplot:
CBcdict = {
'Bl': (0, 0, 0),
'Or': (.9, .6, 0),
'SB': (.35, .7, .9),
'bG': (0, .6, .5),
'Ye': (.95, .9, .25),
'Bu': (0, .45, .7),
'Ve': (.8, .4, 0),
'rP': (.8, .6, .7),
}
cl = sorted(CBcdict.keys())
# Align Plots
fig, ax = plot.f_plot(np.arange(0, M) / float(M - 1), gamf,
title="Warping Functions")
ax.set_aspect('equal')
plot.f_plot(time, fn, title="Warped Data")
tmp = np.array([mean_f0, mean_f0 + std_f0, mean_f0 - std_f0])
tmp = tmp.transpose()
plot.f_plot(time, tmp, title=r"Original Data: Mean $\pm$ STD")
tmp = np.array([mean_fn, mean_fn + std_fn, mean_fn - std_fn])
tmp = tmp.transpose()
plot.f_plot(time, tmp, title=r"Warped Data: Mean $\pm$ STD")
# PCA Plots
fig, ax = plt.subplots(2, num_comp)
for k in range(0, num_comp):
axt = ax[0, k]
for l in range(0, Nstd):
axt.plot(time, q_pca[0:M, l, k], color=CBcdict[cl[l]])
axt.hold(True)
axt.set_title('q domain: PD %d' % (k + 1))
plot.rstyle(axt)
axt = ax[1, k]
for l in range(0, Nstd):
axt.plot(time, f_pca[:, l, k], color=CBcdict[cl[l]])
axt.hold(True)
axt.set_title('f domain: PD %d' % (k + 1))
plot.rstyle(axt)
fig.set_tight_layout(True)
cumm_coef = 100 * np.cumsum(s) / sum(s)
idx = np.arange(0, M + 1) + 1
plot.f_plot(idx, cumm_coef, "Coefficient Cumulative Percentage")
plt.xlabel("Percentage")
plt.ylabel("Index")
plt.show()
mean_f0 = f0.mean(axis=1)
std_f0 = f0.std(axis=1)
mean_fn = fn.mean(axis=1)
std_fn = fn.std(axis=1)
tmp = np.zeros(M)
tmp[1:] = cumtrapz(mqn * np.abs(mqn), time)
fmean = np.mean(f0[1, :]) + tmp
fgam = np.zeros((M, N))
for k in range(0, N):
time0 = (time[-1] - time[0]) * gamf[:, k] + time[0]
fgam[:, k] = np.interp(time0, time, fmean)
var_fgam = fgam.var(axis=1)
orig_var = trapz(std_f0 ** 2, time)
amp_var = trapz(std_fn ** 2, time)
phase_var = trapz(var_fgam, time)
K = np.cov(fn)
U, s, V = svd(K)
align_fPCAresults = collections.namedtuple('align_fPCA', ['fn', 'qn',
'q0', 'mqn', 'gam', 'q_pca',
'f_pca', 'latent', 'coef',
'U', 'orig_var', 'amp_var',
'phase_var', 'cost'])
out = align_fPCAresults(fn, qn, q0, mqn, gamf, q_pca, f_pca, s, c,
U, orig_var, amp_var, phase_var, cost)
return out
[docs]def align_fPLS(f, g, time, comps=3, showplot=True, smoothdata=False,
delta=0.01, max_itr=100):
"""
This function aligns a collection of functions while performing
principal least squares
:param f: numpy ndarray of shape (M,N) of N functions with M samples
:param g: numpy ndarray of shape (M,N) of N functions with M samples
:param time: vector of size M describing the sample points
:param comps: number of fPLS components
:param showplot: Shows plots of results using matplotlib (default = T)
:param smooth_data: Smooth the data using a box filter (default = F)
:param delta: gradient step size
:param max_itr: maximum number of iterations
:type smooth_data: bool
:type f: np.ndarray
:type g: np.ndarray
:type time: np.ndarray
:rtype: tuple of numpy array
:return fn: aligned functions - numpy ndarray of shape (M,N) of N
functions with M samples
:return gn: aligned functions - numpy ndarray of shape (M,N) of N
functions with M samples
:return qfn: aligned srvfs - similar structure to fn
:return qgn: aligned srvfs - similar structure to fn
:return qf0: original srvf - similar structure to fn
:return qg0: original srvf - similar structure to fn
:return gam: warping functions - similar structure to fn
:return wqf: srsf principal weight functions
:return wqg: srsf principal weight functions
:return wf: srsf principal weight functions
:return wg: srsf principal weight functions
:return cost: cost function value
"""
print ("Initializing...")
binsize = np.diff(time)
binsize = binsize.mean()
eps = np.finfo(np.double).eps
M = f.shape[0]
N = f.shape[1]
f0 = f
g0 = g
if showplot:
plot.f_plot(time, f, title="f Original Data")
plot.f_plot(time, g, title="g Original Data")
# Compute q-function of f and g
f, g1, g2 = uf.gradient_spline(time, f, smoothdata)
qf = g1 / np.sqrt(abs(g1) + eps)
g, g1, g2 = uf.gradient_spline(time, g, smoothdata)
qg = g1 / np.sqrt(abs(g1) + eps)
print("Calculating fPLS weight functions for %d Warped Functions..." % N)
itr = 0
fi = np.zeros((M, N, max_itr + 1))
fi[:, :, itr] = f
gi = np.zeros((M, N, max_itr + 1))
gi[:, :, itr] = g
qfi = np.zeros((M, N, max_itr + 1))
qfi[:, :, itr] = qf
qgi = np.zeros((M, N, max_itr + 1))
qgi[:, :, itr] = qg
wqf1, wqg1, alpha, values, costmp = pls_svd(time, qfi[:, :, itr],
qgi[:, :, itr], 2, 0)
wqf = np.zeros((M, max_itr + 1))
wqf[:, itr] = wqf1[:, 0]
wqg = np.zeros((M, max_itr + 1))
wqg[:, itr] = wqg1[:, 0]
gam = np.zeros((M, N, max_itr + 1))
tmp = np.tile(np.linspace(0, 1, M), (N, 1))
gam[:, :, itr] = tmp.transpose()
wqf_diff = np.zeros(max_itr + 1)
cost = np.zeros(max_itr + 1)
cost_diff = 1
while itr <= max_itr:
# warping
gamtmp = np.ascontiguousarray(gam[:, :, 0])
qftmp = np.ascontiguousarray(qfi[:, :, 0])
qgtmp = np.ascontiguousarray(qgi[:, :, 0])
wqftmp = np.ascontiguousarray(wqf[:, itr])
wqgtmp = np.ascontiguousarray(wqg[:, itr])
gam[:, :, itr + 1] = fpls.fpls_warp(time, gamtmp, qftmp, qgtmp,
wqftmp, wqgtmp, display=0,
delta=delta, tol=1e-6,
max_iter=4000)
for k in range(0, N):
gam_k = gam[:, k, itr + 1]
time0 = (time[-1] - time[0]) * gam_k + time[0]
fi[:, k, itr + 1] = np.interp(time0, time, fi[:, k, 0])
gi[:, k, itr + 1] = np.interp(time0, time, gi[:, k, 0])
qfi[:, k, itr + 1] = uf.warp_q_gamma(time, qfi[:, k, 0], gam_k)
qgi[:, k, itr + 1] = uf.warp_q_gamma(time, qgi[:, k, 0], gam_k)
# PLS
wqfi, wqgi, alpha, values, costmp = pls_svd(time, qfi[:, :, itr + 1],
qgi[:, :, itr + 1], 2, 0)
wqf[:, itr + 1] = wqfi[:, 0]
wqg[:, itr + 1] = wqgi[:, 0]
wqf_diff[itr] = np.sqrt(sum(wqf[:, itr + 1] - wqf[:, itr]) ** 2)
rfi = np.zeros(N)
rgi = np.zeros(N)
for l in range(0, N):
rfi[l] = uf.innerprod_q(time, qfi[:, l, itr + 1], wqf[:, itr + 1])
rgi[l] = uf.innerprod_q(time, qgi[:, l, itr + 1], wqg[:, itr + 1])
cost[itr] = np.cov(rfi, rgi)[1, 0]
if itr > 1:
cost_diff = cost[itr] - cost[itr - 1]
print("Iteration: %d - Diff Value: %f - %f" % (itr + 1, wqf_diff[itr],
cost[itr]))
if wqf_diff[itr] < 1e-1 or abs(cost_diff) < 1e-3:
break
itr += 1
cost = cost[0:(itr + 1)]
# Aligned data & stats
fn = fi[:, :, itr + 1]
gn = gi[:, :, itr + 1]
qfn = qfi[:, :, itr + 1]
qf0 = qfi[:, :, 0]
qgn = qgi[:, :, itr + 1]
qg0 = qgi[:, :, 0]
wqfn, wqgn, alpha, values, costmp = pls_svd(time, qfn, qgn, comps, 0)
wf = np.zeros((M, comps))
wg = np.zeros((M, comps))
for ii in range(0, comps):
wf[:, ii] = cumtrapz(wqfn[:, ii] * np.abs(wqfn[:, ii]), time, initial=0)
wg[:, ii] = cumtrapz(wqgn[:, ii] * np.abs(wqgn[:, ii]), time, initial=0)
gam_f = gam[:, :, itr + 1]
if showplot:
# Align Plots
fig, ax = plot.f_plot(np.arange(0, M) / float(M - 1), gam_f,
title="Warping Functions")
ax.set_aspect('equal')
plot.f_plot(time, fn, title="fn Warped Data")
plot.f_plot(time, gn, title="gn Warped Data")
plot.f_plot(time, wf, title="wf")
plot.f_plot(time, wg, title="wg")
plt.show()
align_fPLSresults = collections.namedtuple('align_fPLS', ['wf', 'wg', 'fn',
'gn', 'qfn', 'qgn', 'qf0',
'qg0', 'wqf', 'wqg', 'gam',
'values', 'cost'])
out = align_fPLSresults(wf, wg, fn, gn, qfn, qgn, qf0, qg0, wqfn,
wqgn, gam_f, values, cost)
return out