"""
statistic calculation for SRVF (curves) open and closed using Karcher
Mean and Variance
moduleauthor:: J. Derek Tucker <jdtuck@sandia.gov>
"""
from numpy import (
zeros,
sqrt,
fabs,
cos,
sin,
tile,
vstack,
empty,
cov,
inf,
mean,
arange,
prod,
)
from numpy import append, arccos, cumsum, sum, linspace
from numpy.linalg import svd
from numpy.random import randn
import fdasrsf.curve_functions as cf
import fdasrsf.utility_functions as uf
import fdasrsf.plot_style as plot
import matplotlib.pyplot as plt
from joblib import Parallel, delayed
import collections
[docs]
class fdacurve:
"""
This class provides alignment methods for open and closed curves using the SRVF framework
Usage: obj = fdacurve(beta, mode, N, scale)
:param beta: numpy ndarray of shape (n, M, N) describing N curves
in R^M
:param mode: Open ('O') or closed curve ('C') (default 'O')
:param N: resample curve to N points
:param scale: scale curve to length 1 (true/false)
:param q: (n,T,K) matrix defining n dimensional srvf on T samples with K srvfs
:param betan: aligned curves
:param qn: aligned srvfs
:param basis: calculated basis
:param beta_mean: karcher mean curve
:param q_mean: karcher mean srvf
:param gams: warping functions
:param v: shooting vectors
:param C: karcher covariance
:param s: pca singular values
:param U: pca singular vectors
:param coef: pca coefficients
:param pca principal directions
:param qun: cost function
:param lambda: lambda
:param samples: random samples
:param gamr: random warping functions
:param cent: center
:param scale: scale
:param len: length of curve
:param len_q: length of srvf
:param mean_scale mean length
:param mean_scale_q mean length srvf
:param E: energy
Author : J. D. Tucker (JDT) <jdtuck AT sandia.gov>
Date : 26-Aug-2020
"""
def __init__(self, beta, mode="O", N=200, scale=False):
"""
fdacurve Construct an instance of this class
:param beta: (n,T,K) matrix defining n dimensional curve on T samples with K curves
:param mode: Open ('O') or closed curve ('C') (default 'O')
:param N: resample curve to N points
:param scale: include scale (true/false)
"""
self.mode = mode
self.scale = scale
K = beta.shape[2]
n = beta.shape[0]
q = zeros((n, N, K))
beta1 = zeros((n, N, K))
cent1 = zeros((n, K))
len1 = zeros(K)
lenq1 = zeros(K)
for ii in range(0, K):
if beta.shape[1] != N:
beta1[:, :, ii] = cf.resamplecurve(beta[:, :, ii], N, mode=mode)
else:
beta1[:, :, ii] = beta[:, :, ii]
a = -cf.calculatecentroid(beta1[:, :, ii])
beta1[:, :, ii] += tile(a, (N, 1)).T
q[:, :, ii], len1[ii], lenq1[ii] = cf.curve_to_q(beta1[:, :, ii], mode)
cent1[:, ii] = -a
self.q = q
self.beta = beta1
self.cent = cent1
self.len = len1
self.len_q = lenq1
[docs]
def karcher_mean(
self, rotation=True, parallel=False, lam=0.0, cores=-1, method="DP"
):
"""
This calculates the mean of a set of curves
:param rotation: compute optimal rotation (default = T)
:param parallel: run in parallel (default = F)
:param lam: controls the elasticity (default = 0)
:param cores: number of cores for parallel (default = -1 (all))
:param method: method to apply optimization (default="DP") options are "DP" or "RBFGS"
"""
n, T, N = self.beta.shape
modes = ["O", "C"]
mode = [i for i, x in enumerate(modes) if x == self.mode]
if len(mode) == 0:
mode = 0
else:
mode = mode[0]
# Initialize mu as one of the shapes
mu = self.q[:, :, 0]
betamean = self.beta[:, :, 0]
itr = 0
gamma = zeros((T, N))
maxit = 20
sumd = zeros(maxit + 1)
v = zeros((n, T, N))
sumv = zeros((n, T))
normvbar = zeros(maxit + 1)
delta = 0.5
tolv = 1e-4
told = 5 * 1e-3
print(
"Computing Karcher Mean of %d curves in SRVF space with lam=%02f" % (N, lam)
)
while itr < maxit:
print("updating step: %d" % (itr + 1))
if iter == maxit:
print("maximal number of iterations reached")
mu = mu / sqrt(cf.innerprod_q2(mu, mu))
if mode == 1:
self.basis = cf.find_basis_normal(mu)
else:
self.basis = []
sumv = zeros((n, T))
sumd[0] = inf
sumd[itr + 1] = 0
out = Parallel(n_jobs=cores)(
delayed(karcher_calc)(
mu, self.q[:, :, n], self.basis, mode, lam, rotation, method
)
for n in range(N)
)
v = zeros((n, T, N))
gamma = zeros((T, N))
for i in range(0, N):
v[:, :, i] = out[i][0]
gamma[:, i] = out[i][1]
sumv += v[:, :, i]
sumd[itr + 1] = sumd[itr + 1] + out[i][2] ** 2
sumv = v.sum(axis=2)
# Compute average direction of tangent vectors v_i
vbar = sumv / float(N)
normvbar[itr] = sqrt(cf.innerprod_q2(vbar, vbar))
normv = normvbar[itr]
if (sumd[itr] - sumd[itr + 1]) < 0:
break
elif normv > tolv and fabs(sumd[itr + 1] - sumd[itr]) > told:
# Update mu in direction of vbar
mu = (
cos(delta * normvbar[itr]) * mu
+ sin(delta * normvbar[itr]) * vbar / normvbar[itr]
)
if mode == 1:
mu = cf.project_curve(mu)
x = cf.q_to_curve(mu)
a = -1 * cf.calculatecentroid(x)
betamean = x + tile(a, [T, 1]).T
else:
break
itr += 1
# compute average length
if self.scale:
self.mean_scale = (prod(self.len)) ** (1.0 / self.len.shape[0])
self.mean_scale_q = (prod(self.len_q)) ** (1.0 / self.len.shape[0])
betamean = self.mean_scale * betamean
self.q_mean = mu
self.beta_mean = betamean
self.v = v
self.qun = sumd[0 : (itr + 1)]
self.E = normvbar[0 : (itr + 1)]
return
[docs]
def srvf_align(self, rotation=True, lam=0.0, parallel=False, cores=-1, method="DP"):
"""
This aligns a set of curves to the mean and computes mean if not computed
:param rotation: compute optimal rotation (default = T)
:param lam: controls the elasticity (default = 0)
:param parallel: run in parallel (default = F)
:param cores: number of cores for parallel (default = -1 (all))
:param method: method to apply optimization (default="DP") options are "DP" or "RBFGS"
"""
n, T, N = self.beta.shape
modes = ["O", "C"]
mode = [i for i, x in enumerate(modes) if x == self.mode]
if len(mode) == 0:
mode = 0
else:
mode = mode[0]
# find mean
if not hasattr(self, "beta_mean"):
self.karcher_mean(
rotation=rotation,
lam=lam,
parallel=parallel,
cores=cores,
method=method,
)
self.qn = zeros((n, T, N))
self.betan = zeros((n, T, N))
self.gams = zeros((T, N))
centroid2 = cf.calculatecentroid(self.beta_mean)
self.beta_mean = self.beta_mean - tile(centroid2, [T, 1]).T
# align to mean
out = Parallel(n_jobs=-1)(
delayed(cf.find_rotation_and_seed_unique)(
self.q_mean, self.q[:, :, n], mode, lam, rotation, method
)
for n in range(N)
)
for ii in range(0, N):
self.gams[:, ii] = out[ii][2]
self.qn[:, :, ii] = out[ii][0]
btmp = out[ii][1].dot(self.beta[:, :, ii])
self.betan[:, :, ii] = cf.group_action_by_gamma_coord(btmp, out[ii][2])
return
[docs]
def karcher_cov(self):
"""
This calculates the mean of a set of curves
"""
if not hasattr(self, "beta_mean"):
self.karcher_mean()
M, N, K = self.v.shape
N1 = M * N
if self.scale:
N1 += 1
tmpv = zeros((N1, K))
for i in range(0, K):
tmp = self.v[:, :, i]
tmpv1 = tmp.flatten()
if self.scale:
tmpv1 = append(tmpv1, self.len[i])
tmpv[:, i] = tmpv1
if self.scale:
VM = mean(self.v, 2)
VM = VM.flatten()
VM = append(VM, self.mean_scale)
tmpv = tmpv - tile(VM, [K, 1]).T
self.C = cov(tmpv)
else:
self.C = cov(tmpv)
return
[docs]
def shape_pca(self, no=10):
"""
Computes principal direction of variation specified by no. N is
Number of shapes away from mean. Creates 2*N+1 shape sequence
:param no: number of direction (default 3)
"""
if not hasattr(self, "C"):
self.karcher_cov()
U1, s, V = svd(self.C)
self.U = U1[:, 0:no]
self.s = s[0:no]
# express shapes as coefficients
K = self.beta.shape[2]
VM = mean(self.v, 2)
VM = VM.flatten()
if self.scale:
VM = append(VM, self.mean_scale)
x = zeros((no, K))
for ii in range(0, K):
tmpv = self.v[:, :, ii]
tmpv1 = tmpv.flatten()
if self.scale:
tmpv1 = append(tmpv1, self.len[ii])
Utmp = self.U.T
x[:, ii] = Utmp.dot((tmpv1 - VM))
self.coef = x
modes = ["O", "C"]
mode = [i for i, x in enumerate(modes) if x == self.mode]
if len(mode) == 0:
mode = 0
else:
mode = mode[0]
n1, T, N1 = self.beta.shape
p = zeros((n1, T, no, 10))
for j in range(0, no):
for i in range(1, 11):
tmp = VM + 0.5 * (i - 5) * sqrt(self.s[j]) * self.U[:, j]
m, n = self.q_mean.shape
if self.scale:
tmp_scale = tmp[-1]
tmp = tmp[0:-1]
else:
tmp_scale = 1
v1 = tmp.reshape(m, n)
q2n = cf.elastic_shooting(self.q_mean, v1, mode)
p[:, :, j, i - 1] = cf.q_to_curve(q2n, tmp_scale)
self.pca = p
return
[docs]
def sample_shapes(self, no=3, numSamp=10):
"""
Computes sample shapes from mean and covariance
:param no: number of direction (default 3)
:param numSamp: number of samples (default 10)
"""
n, T = self.q_mean.shape
modes = ["O", "C"]
mode = [i for i, x in enumerate(modes) if x == self.mode]
if len(mode) == 0:
mode = 0
else:
mode = mode[0]
U, s, V = svd(self.C)
if mode == 0:
N = 2
else:
N = 10
epsilon = 1.0 / (N - 1)
samples = empty(numSamp, dtype=object)
for i in range(0, numSamp):
v = zeros((2, T))
for m in range(0, no):
v = v + randn() * sqrt(s[m]) * vstack((U[0:T, m], U[T : 2 * T, m]))
q1 = self.q_mean
for j in range(0, N - 1):
normv = sqrt(cf.innerprod_q2(v, v))
if normv < 1e-4:
q2 = self.q_mean
else:
q2 = cos(epsilon * normv) * q1 + sin(epsilon * normv) * v / normv
if mode == 1:
q2 = cf.project_curve(q2)
# Parallel translate tangent vector
basis2 = cf.find_basis_normal(q2)
v = cf.parallel_translate(v, q1, q2, basis2, mode)
q1 = q2
samples[i] = cf.q_to_curve(q2)
self.samples = samples
return
[docs]
def plot(self, multivariate=False):
"""
plot curve mean results
:param multivariate: plot as multivariate functions instead of curves (default=False)
"""
if multivariate:
n, T, K = self.beta.shape
for jj in range(n):
fig, ax = plt.subplots()
for ii in range(0, K):
ax.plot(linspace(0, 1, T), self.beta[jj, :, ii])
plt.title("Original Function: %d" % (jj + 1))
else:
fig, ax = plt.subplots()
n, T, K = self.beta.shape
for ii in range(0, K):
ax.plot(self.beta[0, :, ii], self.beta[1, :, ii])
plt.title("Curves")
ax.set_aspect("equal")
plt.axis("off")
plt.gca().invert_yaxis()
if hasattr(self, "gams"):
M = self.gams.shape[0]
fig, ax = plot.f_plot(
arange(0, M) / float(M - 1), self.gams, title="Warping Functions"
)
if hasattr(self, "beta_mean"):
if multivariate:
for jj in range(n):
fig, ax = plt.subplots()
ax.plot(linspace(0, 1, T), self.beta_mean[jj, :])
plt.title("Karcher Mean: %d" % (jj + 1))
else:
fig, ax = plt.subplots()
ax.plot(self.beta_mean[0, :], self.beta_mean[1, :])
plt.title("Karcher Mean")
ax.set_aspect("equal")
plt.axis("off")
plt.gca().invert_yaxis()
if multivariate:
if hasattr(self, "betan"):
n, T, K = self.beta.shape
for jj in range(n):
fig, ax = plt.subplots()
for ii in range(0, K):
ax.plot(linspace(0, 1, T), self.betan[jj, :, ii])
plt.title("Aligned Function: %d" % (jj + 1))
plt.show()
def plot_pca(self):
if not hasattr(self, "s"):
raise NameError("Calculate PCA")
fig, ax = plt.subplots()
ax.plot(cumsum(self.s) / sum(self.s) * 100)
plt.title("Variability Explained")
plt.xlabel("PC")
# plot principal modes of variability
VM = mean(self.v, 2)
VM = VM.flatten()
if self.scale:
VM = append(VM, self.mean_scale)
modes = ["O", "C"]
mode = [i for i, x in enumerate(modes) if x == self.mode]
if len(mode) == 0:
mode = 0
else:
mode = mode[0]
for j in range(0, 4):
fig, ax = plt.subplots()
for i in range(1, 11):
tmp = VM + 0.5 * (i - 5) * sqrt(self.s[j]) * self.U[:, j]
m, n = self.q_mean.shape
if self.scale:
tmp_scale = tmp[-1]
tmp = tmp[0:-1]
else:
tmp_scale = 1
v1 = tmp.reshape(m, n)
q2n = cf.elastic_shooting(self.q_mean, v1, mode)
p = cf.q_to_curve(q2n, tmp_scale)
mv = 0.2
if self.scale:
mv *= self.mean_scale
if i == 5:
ax.plot(mv * i + p[0, :], p[1, :], "k", linewidth=2)
else:
ax.plot(mv * i + p[0, :], p[1, :], linewidth=2)
ax.set_aspect("equal")
plt.axis("off")
plt.title("PD %d" % (j + 1))
plt.show()
def karcher_calc(mu, q, basis, closed, lam, rotation, method):
# Compute shooting vector from mu to q_i
qn_t, R, gamI = cf.find_rotation_and_seed_unique(
mu, q, closed, lam, rotation, method
)
qn_t = qn_t / sqrt(cf.innerprod_q2(qn_t, qn_t))
q1dotq2 = cf.innerprod_q2(mu, qn_t)
if q1dotq2 > 1:
q1dotq2 = 1
d = arccos(q1dotq2)
u = qn_t - q1dotq2 * mu
normu = sqrt(cf.innerprod_q2(u, u))
if normu > 1e-4:
w = u * arccos(q1dotq2) / normu
else:
w = zeros(qn_t.shape)
# Project to tangent space of manifold to obtain v_i
if closed == 0:
v = w
else:
v = cf.project_tangent(w, q, basis)
return (v, gamI, d)