Source code for curve_stats

statistic calculation for SRVF (curves) open and closed using Karcher
Mean and Variance

moduleauthor:: J. Derek Tucker <>

from numpy import zeros, sqrt, fabs, cos, sin, tile, vstack, empty, cov, inf, mean, arange, prod
from numpy import append, arccos, cumsum, sum
from numpy.linalg import svd
from numpy.random import randn
import fdasrsf.curve_functions as cf
import fdasrsf.utility_functions as uf
import fdasrsf.plot_style as plot
import matplotlib.pyplot as plt
from joblib import Parallel, delayed
import collections

[docs]class fdacurve: """ This class provides alignment methods for open and closed curves using the SRVF framework Usage: obj = fdacurve(beta, mode, N, scale) :param beta: numpy ndarray of shape (n, M, N) describing N curves in R^M :param mode: Open ('O') or closed curve ('C') (default 'O') :param N: resample curve to N points :param scale: scale curve to length 1 (true/false) :param q: (n,T,K) matrix defining n dimensional srvf on T samples with K srvfs :param betan: aligned curves :param qn: aligned srvfs :param basis: calculated basis :param beta_mean: karcher mean curve :param q_mean: karcher mean srvf :param gams: warping functions :param v: shooting vectors :param C: karcher covariance :param s: pca singular values :param U: pca singular vectors :param coef: pca coefficients :param qun: cost function :param samples: random samples :param gamr: random warping functions :param cent: center :param scale: scale :param len: length of curve :param len_q: length of srvf :param mean_scale mean length :param mean_scale_q mean length srvf :param E: energy Author : J. D. Tucker (JDT) <jdtuck AT> Date : 26-Aug-2020 """ def __init__(self, beta, mode='O', N=200, scale=False): """ fdacurve Construct an instance of this class :param beta: (n,T,K) matrix defining n dimensional curve on T samples with K curves :param mode: Open ('O') or closed curve ('C') (default 'O') :param N: resample curve to N points :param scale: include scale (true/false) """ self.mode = mode self.scale = scale K = beta.shape[2] n = beta.shape[0] q = zeros((n,N,K)) beta1 = zeros((n,N,K)) cent1 = zeros((n,K)) len1 = zeros(K) lenq1 = zeros(K) for ii in range(0,K): if (beta.shape[1] != N): beta1[:,:,ii] = cf.resamplecurve(beta[:,:,ii],N,mode) else: beta1[:,:,ii] = beta[:,:,ii] a = -cf.calculatecentroid(beta1[:,:,ii]) beta1[:,:,ii] += tile(a, (N,1)).T q[:,:,ii], len1[ii], lenq1[ii] = cf.curve_to_q(beta1[:,:,ii], mode) cent1[:,ii] = -a self.q = q self.beta = beta1 self.cent = cent1 self.len = len1 self.len_q = lenq1
[docs] def karcher_mean(self, parallel=False, cores=-1, method="DP"): """ This calculates the mean of a set of curves :param parallel: run in parallel (default = F) :param cores: number of cores for parallel (default = -1 (all)) :param method: method to apply optimization (default="DP") options are "DP" or "RBFGS" """ n, T, N = self.beta.shape modes = ['O', 'C'] mode = [i for i, x in enumerate(modes) if x == self.mode] if len(mode) == 0: mode = 0 else: mode = mode[0] # Initialize mu as one of the shapes mu = self.q[:, :, 0] betamean = self.beta[:,:,0] itr = 0 gamma = zeros((T,N)) maxit = 20 sumd = zeros(maxit+1) v = zeros((n, T, N)) sumv = zeros((n,T)) normvbar = zeros(maxit+1) delta = 0.5 tolv = 1e-4 told = 5*1e-3 print("Computing Karcher Mean of %d curves in SRVF space.." % N) while itr < maxit: print("updating step: %d" % (itr+1)) if iter == maxit: print("maximal number of iterations reached") mu = mu / sqrt(cf.innerprod_q2(mu, mu)) if mode == 1: self.basis = cf.find_basis_normal(mu) else: self.basis = [] sumv = zeros((n, T)) sumd[0] = inf sumd[itr+1] = 0 out = Parallel(n_jobs=cores)(delayed(karcher_calc)(mu, self.q[:, :, n], self.basis, mode, method) for n in range(N)) v = zeros((n, T, N)) gamma = zeros((T,N)) for i in range(0, N): v[:, :, i] = out[i][0] gamma[:, i] = out[i][1] sumv += v[:, :, i] sumd[itr+1] = sumd[itr+1] + out[i][2]**2 sumv = v.sum(axis=2) # Compute average direction of tangent vectors v_i vbar = sumv/float(N) normvbar[itr] = sqrt(cf.innerprod_q2(vbar, vbar)) normv = normvbar[itr] if ((sumd[itr]-sumd[itr+1]) < 0): break elif (normv > tolv and fabs(sumd[itr+1]-sumd[itr]) > told): # Update mu in direction of vbar mu = cos(delta*normvbar[itr])*mu + sin(delta*normvbar[itr]) * vbar/normvbar[itr] if mode == 1: mu = cf.project_curve(mu) x = cf.q_to_curve(mu) a = -1*cf.calculatecentroid(x) betamean = x + tile(a, [T, 1]).T else: break itr += 1 # compute average length if self.scale: self.mean_scale = (prod(self.len))**(1./self.len.shape[0]) self.mean_scale_q = (prod(self.len_q))**(1./self.len.shape[0]) betamean = self.mean_scale * betamean self.q_mean = mu self.beta_mean = betamean self.v = v self.qun = sumd[0:(itr+1)] self.E = normvbar[0:(itr+1)] return
[docs] def srvf_align(self, parallel=False, cores=-1, method="DP"): """ This aligns a set of curves to the mean and computes mean if not computed :param parallel: run in parallel (default = F) :param cores: number of cores for parallel (default = -1 (all)) :param method: method to apply optimization (default="DP") options are "DP" or "RBFGS" """ n, T, N = self.beta.shape modes = ['O', 'C'] mode = [i for i, x in enumerate(modes) if x == self.mode] if len(mode) == 0: mode = 0 else: mode = mode[0] # find mean if not hasattr(self, 'beta_mean'): self.karcher_mean() self.qn = zeros((n, T, N)) self.betan = zeros((n, T, N)) self.gams = zeros((T,N)) centroid2 = cf.calculatecentroid(self.beta_mean) self.beta_mean = self.beta_mean - tile(centroid2, [T, 1]).T # align to mean out = Parallel(n_jobs=-1)(delayed(cf.find_rotation_and_seed_unique)(self.q_mean, self.q[:, :, n], mode, method) for n in range(N)) for ii in range(0, N): self.gams[:,ii] = out[ii][2] self.qn[:, :, ii] = out[ii][0] btmp = out[ii][1].dot(self.beta[:,:,ii]) self.betan[:, :, ii] = cf.group_action_by_gamma_coord(btmp,out[ii][2]) return
[docs] def karcher_cov(self): """ This calculates the mean of a set of curves """ if not hasattr(self, 'beta_mean'): self.karcher_mean() M,N,K = self.v.shape N1 = M*N if (self.scale): N1 += 1 tmpv = zeros((N1,K)) for i in range(0,K): tmp = self.v[:,:,i] tmpv1 = tmp.flatten() if (self.scale): tmpv1 = append(tmpv1, self.len[i]) tmpv[:,i] = tmpv1 if (self.scale): VM = mean(self.v, 2) VM = VM.flatten() VM = append(VM, self.mean_scale) tmpv = tmpv - tile(VM, [K, 1]).T self.C = cov(tmpv) else: self.C = cov(tmpv) return
[docs] def shape_pca(self, no=10): """ Computes principal direction of variation specified by no. N is Number of shapes away from mean. Creates 2*N+1 shape sequence :param no: number of direction (default 3) """ if not hasattr(self, 'C'): self.karcher_cov() U1, s, V = svd(self.C) self.U = U1[:,0:no] self.s = s[0:no] # express shapes as coefficients K = self.beta.shape[2] VM = mean(self.v,2) VM = VM.flatten() if (self.scale): VM = append(VM, self.mean_scale) x = zeros((no,K)) for ii in range(0,K): tmpv = self.v[:,:,ii] tmpv1 = tmpv.flatten() if (self.scale): tmpv1 = append(tmpv1, self.len[ii]) Utmp = self.U.T x[:,ii] = VM)) self.coef = x return
[docs] def sample_shapes(self, no=3, numSamp=10): """ Computes sample shapes from mean and covariance :param no: number of direction (default 3) :param numSamp: number of samples (default 10) """ n, T = self.q_mean.shape modes = ['O', 'C'] mode = [i for i, x in enumerate(modes) if x == self.mode] if len(mode) == 0: mode = 0 else: mode = mode[0] U, s, V = svd(self.C) if mode == 0: N = 2 else: N = 10 epsilon = 1./(N-1) samples = empty(numSamp, dtype=object) for i in range(0, numSamp): v = zeros((2, T)) for m in range(0, no): v = v + randn()*sqrt(s[m])*vstack((U[0:T, m], U[T:2*T, m])) q1 = self.q_mean for j in range(0, N-1): normv = sqrt(cf.innerprod_q2(v, v)) if normv < 1e-4: q2 = self.q_mean else: q2 = cos(epsilon*normv)*q1+sin(epsilon*normv)*v/normv if mode == 1: q2 = cf.project_curve(q2) # Parallel translate tangent vector basis2 = cf.find_basis_normal(q2) v = cf.parallel_translate(v, q1, q2, basis2, mode) q1 = q2 samples[i] = cf.q_to_curve(q2) self.samples = samples return
[docs] def plot(self): """ plot curve mean results """ fig, ax = plt.subplots() n,T,K = self.beta.shape for ii in range(0,K): ax.plot(self.beta[0,:,ii],self.beta[1,:,ii]) plt.title('Curves') ax.set_aspect('equal') plt.axis('off') plt.gca().invert_yaxis() if hasattr(self,'gams'): M = self.gams.shape[0] fig, ax = plot.f_plot(arange(0, M) / float(M - 1), self.gams, title="Warping Functions") if hasattr(self,'beta_mean'): fig, ax = plt.subplots() ax.plot(self.beta_mean[0,:],self.beta_mean[1,:]) plt.title('Karcher Mean') ax.set_aspect('equal') plt.axis('off') plt.gca().invert_yaxis()
def plot_pca(self): if not hasattr(self,'s'): raise NameError('Calculate PCA') fig, ax = plt.subplots() ax.plot(cumsum(self.s)/sum(self.s)*100) plt.title('Variability Explained') plt.xlabel('PC') # plot principal modes of variability VM = mean(self.v,2) VM = VM.flatten() if (self.scale): VM = append(VM, self.mean_scale) for j in range(0,4): fig, ax = plt.subplots() for i in range(1,11): tmp = VM + 0.5*(i-5)*sqrt(self.s[j])*self.U[:,j] m,n = self.q_mean.shape if (self.scale): tmp_scale = tmp[-1] tmp = tmp[0:-1] else: tmp_scale = 1 v1 = tmp.reshape(m,n) q2n = cf.elastic_shooting(self.q_mean,v1) p = cf.q_to_curve(q2n, tmp_scale) mv = 0.2 if (self.scale): mv *= self.mean_scale if i == 5: ax.plot(mv*i + p[0,:], p[1,:], 'k', linewidth=2) else: ax.plot(mv*i + p[0,:], p[1,:], linewidth=2) ax.set_aspect('equal') plt.axis('off') plt.title('PD %d' % (j+1))
def karcher_calc(mu, q, basis, closed, method): # Compute shooting vector from mu to q_i qn_t, R, gamI = cf.find_rotation_and_seed_unique(mu, q, closed, method) qn_t = qn_t / sqrt(cf.innerprod_q2(qn_t,qn_t)) q1dotq2 = cf.innerprod_q2(mu,qn_t) if (q1dotq2 > 1): q1dotq2 = 1 d = arccos(q1dotq2) u = qn_t - q1dotq2*mu normu = sqrt(cf.innerprod_q2(u,u)) if (normu>1e-4): w = u*arccos(q1dotq2)/normu else: w = zeros(qn_t.shape) # Project to tangent space of manifold to obtain v_i if closed == 0: v = w else: v = cf.project_tangent(w, q, basis) return(v, gamI, d)