# Source code for elastic_changepoint

"""
Elastic functional change point detection

moduleauthor:: J. Derek Tucker <jdtuck@sandia.gov>

"""

import numpy as np
import matplotlib.pyplot as plt
import fdasrsf as fs
import fdasrsf.fPCA as fpca
import fdasrsf.utility_functions as uf
from scipy.linalg import norm, svd
from fdasrsf.geometry import L2norm


# Function Definitions
def BBridge(x=0, y=0, t0=0, T=1, N=100):
    """
    Simulate one sample path of a Brownian bridge.

    The path starts at ``x`` at time ``t0`` and is pinned to ``y`` at time
    ``T``, observed on ``N + 1`` equally spaced time points.

    :param x: starting value at time ``t0`` (default: 0)
    :param y: ending value at time ``T`` (default: 0)
    :param t0: start time (default: 0)
    :param T: end time (default: 1)
    :param N: number of increments; the returned path has ``N + 1`` points
    :return: numpy array of length ``N + 1`` with ``BB[0] == x`` and
        ``BB[N] == y`` (up to floating-point rounding)
    :raises ValueError: if ``T <= t0``
    """
    if T <= t0:
        # ValueError is more specific than a bare Exception and is still
        # caught by any caller handling Exception.
        raise ValueError("Endpoint is earlier than beginning")

    dt = (T - t0) / N
    t = np.linspace(t0, T, N + 1)
    rng = np.random.default_rng()
    samp = rng.standard_normal(N)
    # Scaled cumulative sum of Gaussian increments = Brownian motion,
    # prepended with its starting value 0.
    X = np.insert(np.cumsum(samp) * np.sqrt(dt), 0, 0)
    # Tie the motion down: subtract the linear drift so the path hits y at T.
    BB = x + X - (t - t0) / (T - t0) * (X[N] - y + x)
    return BB


def LongRunCovMatrixPrecentered(mdobj, h=0, kern_type="bartlett"):
    """
    Estimate the long-run covariance matrix of pre-centered functional data.

    :param mdobj: (D, N) matrix of N pre-centered observations of dimension D
    :param h: bandwidth; number of lags included (``h = 0`` gives the plain
        sample covariance)
    :param kern_type: kernel weighting the lagged terms; one of "flat",
        "simple", "bartlett", "flat_top", "parzen"
    :return: (D, D) symmetric long-run covariance estimate
    :raises ValueError: if ``kern_type`` is not recognized (only detected
        when ``h > 0``, since the kernel is never evaluated otherwise)
    """
    D = mdobj.shape[0]
    N = mdobj.shape[1]

    def Kernel(i, h):
        # Lag-window weight for relative lag x = i / h.
        x = i / h
        if kern_type == "flat":
            return 1
        if kern_type == "simple":
            return 0
        if kern_type == "bartlett":
            return 1 - x
        if kern_type == "flat_top":
            # BUG FIX: the original used `x >= 0.1 & x < 1.1`, which parses
            # as a chained comparison through `0.1 & x` and raises TypeError
            # on floats. Inside this branch x >= 0.1 already holds, so only
            # the upper cutoff needs testing.
            if x < 0.1:
                return 1
            elif x < 1.1:
                return 1.1 - x
            else:
                return 0
        if kern_type == "parzen":
            if x < 0.5:
                return 1 - 6 * pow(x, 2) + 6 * pow(abs(x), 3)
            else:
                return 2 * pow((1 - abs(x)), 3)
        raise ValueError("Unknown kern_type: %s" % kern_type)

    D_mat = np.zeros((D, D))
    cdata = mdobj

    for k in range(0, D):
        # Only the upper triangle is computed; symmetry fills the rest.
        for r in range(k, D):
            s = cdata[k, :] @ cdata[r, :]
            if h > 0:
                # NOTE(review): lag i is weighted by Kernel(i + 1, h), and
                # the i == 0 iteration re-adds the lag-0 cross-products —
                # this mirrors the upstream implementation; confirm intended.
                for i in range(h):
                    a = cdata[k, range(N - i)] @ cdata[r, range(i, N)]
                    a = a + cdata[r, range(N - i)] @ cdata[k, range(i, N)]
                    s = s + Kernel(i + 1, h) * a
            D_mat[k, r] = s
            D_mat[r, k] = D_mat[k, r]
    return D_mat / N


class elastic_change:
    """
    This class provides elastic changepoint detection using elastic fPCA.

    Usage: obj = elastic_change(f, time)

    :param f: (M,N) matrix defining N functions of M samples
    :param time: time vector of length M
    :param BBridges: precomputed Brownian Bridges (default: None)
    :param use_BBridges: use precomputed Brownian Bridges (default: False)
    :param warp_data: aligned data (default: None)
    :param Sn: test statistic values
    :param Tn: mean of test statistic
    :param p: p-value
    :param k_star: change point
    :param values: values of computed Brownian Bridges
    :param dat_a: data before changepoint
    :param dat_b: data after changepoint
    :param warp_a: warping functions before changepoint
    :param warp_b: warping functions after changepoint
    :param mean_a: mean function before changepoint
    :param mean_b: mean function after changepoint
    :param warp_mean_a: mean warping function before changepoint
    :param warp_mean_b: mean warping function after changepoint

    Author : J. D. Tucker (JDT) <jdtuck AT sandia.gov>
    Date : 27-Apr-2022
    """

    def __init__(
        self,
        f,
        time,
        BBridges=None,
        use_BBridges=False,
        smooth_data=False,
        warp_data=None,
        use_warp_data=False,
        parallel=False,
        sparam=25,
    ):
        """
        Construct an instance of the elastic_change class.

        :param f: (M,N) matrix defining N functions of M samples
        :param time: time vector of length M
        :param BBridges: precomputed Brownian Bridges (default: None)
        :param use_BBridges: use precomputed Brownian Bridges (default: False)
        :param smooth_data: smooth function data (default: False)
        :param warp_data: precomputed aligned data (default: None)
        :param use_warp_data: use precomputed warping data (default: False)
        :param parallel: run alignment in parallel (default: False)
        :param sparam: number of smoothing runs of box filter (default: 25)
        """
        self.f = f
        self.time = time
        self.BBridges = BBridges
        self.use_BBridges = use_BBridges

        if smooth_data:
            self.f = fs.smooth_data(self.f, sparam)

        if use_warp_data:
            self.warp_data = warp_data
        else:
            # Align the data with SRSF-based elastic registration.
            self.warp_data = fs.fdawarp(self.f, self.time)
            self.warp_data.srsf_align(parallel=parallel)

    def _simulate_bridge_values(self, d, N1):
        # Monte Carlo draws of the limiting distribution: sum over retained
        # components of squared independent Brownian bridges, averaged over time.
        values = np.zeros(d)
        for i in range(d):
            B_tmp = np.zeros((self.no, N1))
            for j in range(self.no):
                B_tmp[j, :] = BBridge(N=N1 - 1) ** 2
            values[i] = B_tmp.sum(axis=0).mean()
        return values

    def compute(
        self,
        pca_method="vert",
        pc=0.95,
        d=1000,
        compute_epidemic=False,
        n_pcs=5,
        preset_pcs=False,
    ):
        """
        Compute elastic change detection.

        :param pca_method: string specifying pca method (options =
            "combined", "vert", or "horiz", default = "vert")
        :param pc: percentage of cumulative variance to use (default: 0.95)
        :param d: number of Monte Carlo draws for the null distribution
        :param compute_epidemic: compute epidemic changepoint model
            (default: False)
        :param n_pcs: scalar specifying number of principal components
            (default: 5)
        :param preset_pcs: use the preset number of PCs instead of the
            cumulative-variance rule (default: False)
        """
        N1 = self.f.shape[1]

        # Calculate PCA
        if pca_method == "combined":
            self.pca = fpca.fdajpca(self.warp_data)
        elif pca_method == "vert":
            self.pca = fpca.fdavpca(self.warp_data)
        elif pca_method == "horiz":
            self.pca = fpca.fdahpca(self.warp_data)
        else:
            raise Exception("Invalid fPCA Method")
        self.pca.calc_fpca(N1)

        cumm_coef = np.cumsum(self.pca.latent) / sum(self.pca.latent)
        if preset_pcs:
            self.no = n_pcs
        else:
            # NOTE(review): this picks the first index where the cumulative
            # variance reaches pc but then slices [0:self.no] (exclusive) —
            # looks like it may use one fewer component than intended; verify.
            self.no = np.argwhere(cumm_coef >= pc)[0][0]

        lam = 1 / self.pca.latent[0 : self.no]
        Sigma = np.diag(lam)
        eta = self.pca.coef[:, 0 : self.no]
        eta_bar = eta.sum(axis=0) / N1

        # Compute test statistic (CUSUM of fPCA scores).
        self.Sn = np.zeros(N1)
        for i in range(1, N1):
            tmp_eta = eta[0:i, :]
            tmp = tmp_eta.sum(axis=0) - tmp_eta.shape[0] * eta_bar
            self.Sn[i] = 1 / (N1) * tmp.T @ Sigma @ tmp

        # Tn is the time-average of Sn (integral-type statistic);
        # k_star is the argmax, i.e. the estimated changepoint.
        self.Tn = self.Sn.mean()
        self.k_star = self.Sn.argmax()

        # Compute the null distribution. Simulate fresh bridges when no
        # precomputed set is requested, or when too few were precomputed.
        # (Short-circuit keeps the original behavior when BBridges is None.)
        if (not self.use_BBridges) or (self.no > self.BBridges.shape[0]):
            values = self._simulate_bridge_values(d, N1)
        else:
            values = (
                (self.BBridges[0 : self.no, :, 0:d] ** 2).sum(axis=0).mean(axis=0)
            )
        z = self.Tn <= values
        # p-value = fraction of simulated values at least as large as Tn.
        self.p = z.mean()
        self.sim_values = values

        # Split data, warps, and means at the estimated changepoint.
        self.dat_a = self.f[:, 0 : self.k_star]
        self.dat_b = self.f[:, self.k_star :]
        self.warp_a = self.warp_data.gam[:, 0 : self.k_star]
        self.warp_b = self.warp_data.gam[:, self.k_star :]
        self.mean_a = self.warp_data.fn[:, 0 : self.k_star].mean(axis=1)
        self.mean_b = self.warp_data.fn[:, self.k_star :].mean(axis=1)
        if self.warp_a.size == 0:
            self.warp_mean_a = np.array([])
        else:
            mu, gam_mu, psi, vec = uf.SqrtMean(self.warp_a)
            self.warp_mean_a = gam_mu
        if self.warp_b.size == 0:
            self.warp_mean_b = np.array([])
        else:
            mu, gam_mu, psi, vec = uf.SqrtMean(self.warp_b)
            self.warp_mean_b = gam_mu
        self.delta = self.mean_b - self.mean_a
        if self.warp_mean_a.size > 0 and self.warp_mean_b.size > 0:
            self.delta_warp = self.warp_mean_b - self.warp_mean_a
        else:
            self.delta_warp = self.time

        if compute_epidemic:
            self.compute_epidemic = True
            # Two-parameter (tau1, tau2) scan for an epidemic-type change.
            snk_minus_vals = np.zeros((N1, N1, self.no))
            snk_minus_summary = np.zeros((N1, N1))
            for i in range(N1):
                for j in range(N1):
                    if i > j:
                        continue
                    tmp_eta = eta[0:i, :]
                    tmp = tmp_eta.sum(axis=0) - tmp_eta.shape[0] * eta_bar
                    tmp_eta_j = eta[0:j, :]
                    # NOTE(review): tmp_j is centered with tmp_eta.shape[0]
                    # (i terms), not tmp_eta_j.shape[0] (j terms) — preserved
                    # as-is from upstream; confirm this is intentional.
                    tmp_j = tmp_eta_j.sum(axis=0) - tmp_eta.shape[0] * eta_bar
                    snk_minus_vals[i, j, :] = tmp - tmp_j
                    snk_minus_summary[i, j] = sum(
                        pow(snk_minus_vals[i, j], 2) * lam
                    )
            self.tau1 = np.argmax(snk_minus_summary.max(axis=1))
            self.tau2 = np.argmax(snk_minus_summary[self.tau1, :])
            self.Sn_epidemic = snk_minus_summary
            self.Tn_epidemic = 1 / (pow(N1, 3)) * snk_minus_summary.sum()

            # Null distribution for the epidemic statistic: squared bridge
            # increments over all ordered pairs (i1 <= j1).
            if (not self.use_BBridges) or (self.no > self.BBridges.shape[0]):
                values = np.zeros(d)
                for i in range(d):
                    B_tmp = np.zeros((self.no, N1, N1))
                    for j in range(self.no):
                        BBridge_val = BBridge(N=N1 - 1)
                        for i1 in range(N1):
                            for j1 in range(N1):
                                if i1 > j1:
                                    continue
                                B_tmp[j, i1, j1] = (
                                    BBridge_val[i1] - BBridge_val[j1]
                                ) ** 2
                    values[i] = B_tmp.mean(axis=(1, 2)).sum()
            else:
                values = np.zeros(d)
                for i in range(d):
                    B_tmp = np.zeros((self.no, N1, N1))
                    for j in range(self.no):
                        for i1 in range(N1):
                            for j1 in range(N1):
                                if i1 > j1:
                                    continue
                                B_tmp[j, i1, j1] = (
                                    self.BBridges[j, i1, i]
                                    - self.BBridges[j, j1, i]
                                ) ** 2
                    values[i] = B_tmp.mean(axis=(1, 2)).sum()

            self.p_epidemic = np.mean(values >= self.Tn_epidemic)
            self.sim_values_epidemic = values

            # "a" = outside the epidemic interval, "b" = inside it.
            self.dat_a_epidemic = np.column_stack(
                (self.f[:, 0 : self.tau1], self.f[:, self.tau2 :])
            )
            self.dat_b_epidemic = self.f[:, self.tau1 : self.tau2]
            self.warp_a_epidemic = np.column_stack(
                (
                    self.warp_data.gam[:, 0 : self.tau1],
                    self.warp_data.gam[:, self.tau2 :],
                )
            )
            self.warp_b_epidemic = self.warp_data.gam[:, self.tau1 : self.tau2]
            self.mean_a_epidemic = np.column_stack(
                (
                    self.warp_data.fn[:, 0 : self.tau1],
                    self.warp_data.fn[:, self.tau2 :],
                )
            ).mean(axis=1)
            self.mean_b_epidemic = self.warp_data.fn[
                :, self.tau1 : self.tau2
            ].mean(axis=1)
            if self.warp_a_epidemic.size == 0:
                self.warp_mean_a_epidemic = np.array([])
            else:
                mu, gam_mu, psi, vec = uf.SqrtMean(self.warp_a_epidemic)
                self.warp_mean_a_epidemic = gam_mu
            if self.warp_b_epidemic.size == 0:
                self.warp_mean_b_epidemic = np.array([])
            else:
                mu, gam_mu, psi, vec = uf.SqrtMean(self.warp_b_epidemic)
                self.warp_mean_b_epidemic = gam_mu
            self.delta_epidemic = self.mean_b_epidemic - self.mean_a_epidemic
            if (
                self.warp_mean_a_epidemic.size > 0
                and self.warp_mean_b_epidemic.size > 0
            ):
                self.delta_warp_epidemic = (
                    self.warp_mean_b_epidemic - self.warp_mean_a_epidemic
                )
            else:
                self.delta_warp_epidemic = self.time
        else:
            self.compute_epidemic = False

        return

    def plot(self):
        """
        Plot elastic changepoint results.

        Usage: obj.plot()
        """
        fig, ax = plt.subplots()
        ax.plot(self.time, self.f, "0.8")
        ax.plot(self.time, self.dat_a, "pink")
        ax.plot(self.time, self.dat_b, "lightskyblue")
        ax.plot(self.time, self.mean_a, "red", label="Before")
        ax.plot(self.time, self.mean_b, "blue", label="After")
        plt.title("Functional Data")
        plt.legend()

        fig, ax = plt.subplots()
        ax.plot(self.time, self.delta, "black")
        plt.title("Estimated Amplitude Change Function")

        fig, ax = plt.subplots()
        ax.plot(self.time, self.warp_data.gam, "0.8")
        ax.plot(self.time, self.warp_a, "pink")
        ax.plot(self.time, self.warp_b, "lightskyblue")
        ax.plot(self.time, self.warp_mean_a, "red", label="Before")
        ax.plot(self.time, self.warp_mean_b, "blue", label="After")
        ax.set_aspect("equal")
        plt.title("Warping Functions")
        plt.legend()

        fig, ax = plt.subplots()
        ax.plot(self.time, self.delta_warp, "black")
        plt.title("Estimated Phase Change Function")
        plt.show()

        if self.compute_epidemic:
            fig, ax = plt.subplots()
            ax.plot(self.time, self.f, "0.8")
            ax.plot(self.time, self.dat_a_epidemic, "pink")
            ax.plot(self.time, self.dat_b_epidemic, "lightskyblue")
            ax.plot(self.time, self.mean_a_epidemic, "red", label="Before/After")
            ax.plot(self.time, self.mean_b_epidemic, "blue", label="During")
            plt.title("Functional Data, Epidemic")
            plt.legend()

            fig, ax = plt.subplots()
            ax.plot(self.time, self.delta_epidemic, "black")
            plt.title("Estimated Amplitude Change Function, Epidemic")

            fig, ax = plt.subplots()
            ax.plot(self.time, self.warp_data.gam, "0.8")
            ax.plot(self.time, self.warp_a_epidemic, "pink")
            ax.plot(self.time, self.warp_b_epidemic, "lightskyblue")
            ax.plot(self.time, self.warp_mean_a, "red", label="Before/After")
            ax.plot(self.time, self.warp_mean_b, "blue", label="During")
            ax.set_aspect("equal")
            plt.title("Warping Functions")
            plt.legend()

            fig, ax = plt.subplots()
            ax.plot(self.time, self.delta_warp_epidemic, "black")
            plt.title("Estimated Phase Change Function, Epidemic")
            plt.show()

            fig, ax = plt.subplots()
            ax.imshow(self.Sn_epidemic)
            plt.title("Test statistic, Epidemic")
            plt.show()

        return
class elastic_amp_change_ff:
    """
    This class provides elastic changepoint detection using elastic FDA.
    It is fully functional and an extension of the methodology of Aue et al.

    Usage: obj = elastic_amp_change_ff(f, time)

    :param f: (M,N) matrix defining N functions of M samples
    :param time: time vector of length M
    :param warp_data: aligned data (default: None)
    :param Sn: test statistic values
    :param Tn: max of test statistic
    :param p: p-value
    :param k_star: change point
    :param values: values of computed Brownian Bridges
    :param dat_a: data before changepoint
    :param dat_b: data after changepoint
    :param warp_a: warping functions before changepoint
    :param warp_b: warping functions after changepoint
    :param mean_a: mean function before changepoint
    :param mean_b: mean function after changepoint
    :param warp_mean_a: mean warping function before changepoint
    :param warp_mean_b: mean warping function after changepoint

    Author : J. Derek Tucker <jdtuck AT sandia.gov>
             and Drew Yarger <anyarge AT sandia.gov>
    Date : 24-Aug-2022
    """

    def __init__(
        self,
        f,
        time,
        smooth_data=False,
        sparam=25,
        use_warp_data=False,
        warp_data=None,
        parallel=False,
    ):
        """
        Construct an instance of the elastic_amp_change_ff class.

        :param f: (M,N) matrix defining N functions of M samples
        :param time: time vector of length M
        :param smooth_data: smooth function data (default: False)
        :param sparam: number of smoothing runs of box filter (default: 25)
        :param use_warp_data: use precomputed warping data (default: False)
        :param warp_data: precomputed aligned data (default: None)
        :param parallel: run alignment in parallel (default: False)
        """
        self.f = f
        self.time = time

        if smooth_data:
            self.f = fs.smooth_data(self.f, sparam)

        if use_warp_data:
            self.warp_data = warp_data
        else:
            # Elastically align the data before change detection.
            self.warp_data = fs.fdawarp(self.f, self.time)
            self.warp_data.srsf_align(parallel=parallel)

    def compute(self, d=1000, h=0, M_approx=365, compute_epidemic=False):
        """
        Compute elastic change detection.

        :param d: number of Monte Carlo iterations to compute p-value
        :param h: index of window type to compute long run covariance
        :param M_approx: number of time points to compute p-value
        :param compute_epidemic: compute epidemic changepoint model
            (default: False)
        """
        M, N1 = self.f.shape[0], self.f.shape[1]

        # Cumulative sums of the aligned SRSFs (mu) and aligned functions
        # (mu_f) over the first i+1 curves. NOTE(review): these are running
        # sums, not normalized means — preserved as upstream.
        self.mu = np.zeros((M, N1))
        self.mu_f = np.zeros((M, N1))
        self.mu[:, 0] = self.warp_data.qn[:, 0]
        self.mu_f[:, 0] = self.warp_data.fn[:, 0]
        for idx in range(1, N1):
            self.mu[:, idx] = self.warp_data.qn[:, 0 : (idx + 1)].sum(axis=1)
            self.mu_f[:, idx] = self.warp_data.fn[:, 0 : (idx + 1)].sum(axis=1)

        # CUSUM-type test statistic over candidate changepoints k.
        self.Sn = np.zeros(N1 + 1)
        for k in range(1, N1 + 1):
            deviation = self.mu_f[:, k - 1] - (k / N1) * self.mu_f[:, -1]
            self.Sn[k] = norm(deviation / np.sqrt(N1)) ** 2 / M

        self.k_star = np.argmax(self.Sn)
        self.Tn = np.max(self.Sn)

        # Split data, warps, and means at the estimated changepoint.
        self.dat_a = self.f[:, range(self.k_star)]
        self.dat_b = self.f[:, range(self.k_star, N1)]
        self.warp_a = self.warp_data.gam[:, 0 : self.k_star]
        self.warp_b = self.warp_data.gam[:, self.k_star :]
        self.mean_a = self.warp_data.fn[:, 0 : self.k_star].mean(axis=1)
        self.mean_b = self.warp_data.fn[:, self.k_star :].mean(axis=1)

        if self.warp_a.size == 0:
            self.warp_mean_a = np.array([])
        else:
            mu, gam_mu, psi, vec = uf.SqrtMean(self.warp_a)
            self.warp_mean_a = gam_mu
        if self.warp_b.size == 0:
            self.warp_mean_b = np.array([])
        else:
            mu, gam_mu, psi, vec = uf.SqrtMean(self.warp_b)
            self.warp_mean_b = gam_mu

        self.delta = self.mean_b - self.mean_a
        if self.warp_mean_a.size > 0 and self.warp_mean_b.size > 0:
            self.delta_warp = self.warp_mean_b - self.warp_mean_a
        else:
            self.delta_warp = self.time

        # Center each curve by the mean of its own segment.
        self.centered_data = np.zeros((M, N1))
        for idx in range(self.k_star):
            self.centered_data[:, idx] = self.warp_data.fn[:, idx] - self.mean_a
        for idx in range(self.k_star, N1):
            self.centered_data[:, idx] = self.warp_data.fn[:, idx] - self.mean_b

        # Eigenvalues of the long-run covariance operator drive the
        # limiting distribution of the statistic.
        D_mat = LongRunCovMatrixPrecentered(self.centered_data, h=h)
        self.lambda_vals, self.pca_coef = np.linalg.eig(D_mat)
        self.lambda_vals = self.lambda_vals.real / M
        self.pca_coef = self.pca_coef.real

        def draw_null(N):
            # One draw from the limiting law: max over time of a
            # lambda-weighted sum of squared Brownian bridges.
            acc = np.zeros((M, N))
            for j in range(M):
                acc[j, :] = self.lambda_vals[j] * (BBridge(0, 0, 0, 1, N - 1) ** 2)
            return max(acc.sum(axis=0))

        values = np.zeros(d)
        for sim in range(d):
            values[sim] = draw_null(N1)
        exceed = self.Tn <= values
        self.p = exceed.mean()
        self.values = values

        if compute_epidemic:
            self.compute_epidemic = True
            # Two-parameter scan for an epidemic (temporary) change.
            snk_minus_vals = np.zeros((N1, N1, M))
            snk_minus_summary = np.zeros((N1, N1))
            for i in range(N1):
                for j in range(N1):
                    if i > j:
                        continue
                    seg_sum = self.warp_data.fn[:, i : (j + 1)].sum(axis=1)
                    snk_minus_vals[i, j, :] = (
                        seg_sum - ((j - i + 1) / N1) * self.mu_f[:, -1]
                    ) / np.sqrt(N1)
                    snk_minus_summary[i, j] = (
                        norm(snk_minus_vals[i, j, :]) ** 2
                    ) / M
            self.tau1 = np.argmax(snk_minus_summary.max(axis=1))
            self.tau2 = np.argmax(snk_minus_summary[self.tau1, :])
            self.Sn_epidemic = snk_minus_summary
            self.Tn_epidemic = snk_minus_summary.max()

            # "a" = inside the epidemic interval, "b" = outside it.
            self.dat_a_epidemic = self.f[:, range(self.tau1, self.tau2 + 1)]
            self.dat_b_epidemic = np.column_stack(
                (self.f[:, range(0, self.tau1)], self.f[:, range(self.tau2 + 1, N1)])
            )
            self.warp_a_epidemic = self.warp_data.gam[
                :, range(self.tau1, self.tau2 + 1)
            ]
            self.warp_b_epidemic = np.column_stack(
                (
                    self.warp_data.gam[:, range(0, self.tau1)],
                    self.warp_data.gam[:, range(self.tau2 + 1, N1)],
                )
            )
            self.mean_a_epidemic = self.warp_data.fn[
                :, range(self.tau1, self.tau2 + 1)
            ].mean(axis=1)
            self.mean_b_epidemic = np.column_stack(
                (
                    self.warp_data.fn[:, range(0, self.tau1)],
                    self.warp_data.fn[:, range(self.tau2 + 1, N1)],
                )
            ).mean(axis=1)

            if self.warp_a_epidemic.size == 0:
                self.warp_mean_a_epidemic = np.array([])
            else:
                mu, gam_mu, psi, vec = uf.SqrtMean(self.warp_a_epidemic)
                self.warp_mean_a_epidemic = gam_mu
            if self.warp_b_epidemic.size == 0:
                self.warp_mean_b_epidemic = np.array([])
            else:
                mu, gam_mu, psi, vec = uf.SqrtMean(self.warp_b_epidemic)
                self.warp_mean_b_epidemic = gam_mu

            self.delta_epidemic = self.mean_b_epidemic - self.mean_a_epidemic
            if (
                self.warp_mean_a_epidemic.size > 0
                and self.warp_mean_b_epidemic.size > 0
            ):
                self.delta_warp_epidemic = (
                    self.warp_mean_b_epidemic - self.warp_mean_a_epidemic
                )
            else:
                self.delta_warp_epidemic = self.time

            # Center by the segment each curve belongs to (mean_b inside the
            # epidemic interval, mean_a outside).
            self.centered_data_epidemic = np.zeros((M, N1))
            for idx in range(N1):
                if idx in range(self.tau1, self.tau2 + 1):
                    self.centered_data_epidemic[:, idx] = (
                        self.warp_data.fn[:, idx] - self.mean_b
                    )
                else:
                    self.centered_data_epidemic[:, idx] = (
                        self.warp_data.fn[:, idx] - self.mean_a
                    )

            D_mat = LongRunCovMatrixPrecentered(self.centered_data_epidemic, h=h)
            self.lambda_vals_epidemic, self.pca_coef_epidemic = np.linalg.eig(D_mat)
            self.lambda_vals_epidemic = self.lambda_vals_epidemic.real / M
            self.pca_coef_epidemic = self.pca_coef_epidemic.real * M

            def draw_null_epidemic(N):
                # One draw: max over (i1, j1) of lambda-weighted squared
                # bridge increments. NOTE(review): indexes lambda_vals_epidemic
                # with j in range(M_approx) — assumes M_approx <= M; verify.
                acc = np.zeros((M_approx, N, N))
                for j in range(M_approx):
                    BB = BBridge(0, 0, 0, 1, N - 1)
                    BB_sub = (
                        np.broadcast_to(BB, (N, N)) - np.broadcast_to(BB, (N, N)).T
                    )
                    acc[j, :, :] = self.lambda_vals_epidemic[j] * (BB_sub**2)
                return acc.sum(axis=0).max()

            values = np.zeros(d)
            for sim in range(d):
                values[sim] = draw_null_epidemic(N1)
            exceed = self.Tn_epidemic <= values
            self.p_epidemic = exceed.mean()
            self.values_epidemic = values
        else:
            self.compute_epidemic = False

        return

    def plot(self):
        """
        Plot elastic changepoint results.

        Usage: obj.plot()
        """
        fig, ax = plt.subplots()
        ax.plot(self.time, self.f, "0.8")
        ax.plot(self.time, self.dat_a, "pink")
        ax.plot(self.time, self.dat_b, "lightskyblue")
        ax.plot(self.time, self.mean_a, "red", label="Before")
        ax.plot(self.time, self.mean_b, "blue", label="After")
        plt.title("Functional Data")
        plt.legend()

        fig, ax = plt.subplots()
        ax.plot(self.time, self.delta, "black")
        plt.title("Estimated Amplitude Change Function")

        fig, ax = plt.subplots()
        ax.plot(self.time, self.warp_data.gam, "0.8")
        ax.plot(self.time, self.warp_data.gam[:, range(self.k_star)], "pink")
        ax.plot(self.time, self.warp_data.gam[:, self.k_star :], "lightskyblue")
        ax.plot(self.time, self.warp_mean_a, "red", label="Before")
        ax.plot(self.time, self.warp_mean_b, "blue", label="After")
        ax.set_aspect("equal")
        plt.title("Warping Functions")
        plt.legend()

        fig, ax = plt.subplots()
        ax.plot(list(range(self.Sn.shape[0])), self.Sn)
        plt.title("Test statistic")
        plt.show()

        if self.compute_epidemic:
            fig, ax = plt.subplots()
            ax.plot(self.time, self.f, "0.8")
            ax.plot(self.time, self.dat_a_epidemic, "pink")
            ax.plot(self.time, self.dat_b_epidemic, "lightskyblue")
            ax.plot(self.time, self.mean_a_epidemic, "red", label="Before/After")
            ax.plot(self.time, self.mean_b_epidemic, "blue", label="During")
            plt.title("Functional Data, Epidemic")
            plt.legend()

            fig, ax = plt.subplots()
            ax.plot(self.time, self.delta_epidemic, "black")
            plt.title("Estimated Amplitude Change Function, Epidemic")

            fig, ax = plt.subplots()
            ax.plot(self.time, self.warp_data.gam, "0.8")
            ax.plot(self.time, self.warp_a_epidemic, "pink")
            ax.plot(self.time, self.warp_b_epidemic, "lightskyblue")
            ax.plot(self.time, self.warp_mean_a, "red", label="Before/After")
            ax.plot(self.time, self.warp_mean_b, "blue", label="During")
            ax.set_aspect("equal")
            plt.title("Warping Functions")
            plt.legend()

            fig, ax = plt.subplots()
            ax.plot(self.time, self.delta_warp_epidemic, "black")
            plt.title("Estimated Phase Change Function, Epidemic")
            plt.show()

            fig, ax = plt.subplots()
            ax.imshow(self.Sn_epidemic)
            plt.title("Test statistic, Epidemic")
            plt.show()

        return
class elastic_ph_change_ff:
    """
    This class provides elastic changepoint detection using elastic FDA on
    warping functions. It is fully functional and an extension of the
    methodology of Aue et al.

    Usage: obj = elastic_ph_change_ff(f, time)

    :param f: (M,N) matrix defining N functions of M samples
    :param time: time vector of length M
    :param warp_data: aligned data (default: None)
    :param Sn: test statistic values
    :param Tn: max of test statistic
    :param p: p-value
    :param k_star: change point
    :param values: values of computed Brownian Bridges
    :param dat_a: data before changepoint
    :param dat_b: data after changepoint
    :param warp_a: warping functions before changepoint
    :param warp_b: warping functions after changepoint
    :param mean_a: mean function before changepoint
    :param mean_b: mean function after changepoint
    :param warp_mean_a: mean warping function before changepoint
    :param warp_mean_b: mean warping function after changepoint

    Author : J. Derek Tucker <jdtuck AT sandia.gov>
    Date : 17-Nov-2022
    """

    def __init__(
        self,
        f,
        time,
        smooth_data=False,
        sparam=25,
        use_warp_data=False,
        warp_data=None,
        parallel=False,
    ):
        """
        Construct an instance of the elastic_ph_change_ff class.

        :param f: (M,N) matrix defining N functions of M samples
        :param time: time vector of length M
        :param smooth_data: smooth function data (default: False)
        :param sparam: number of smoothing runs of box filter (default: 25)
        :param use_warp_data: use precomputed warping data (default: False)
        :param warp_data: precomputed aligned data (default: None)
        :param parallel: run alignment in parallel (default: False)
        """
        self.f = f
        self.time = time

        if smooth_data:
            self.f = fs.smooth_data(self.f, sparam)

        if use_warp_data:
            self.warp_data = warp_data
        else:
            # Elastically align the data to obtain the warping functions.
            self.warp_data = fs.fdawarp(self.f, self.time)
            self.warp_data.srsf_align(parallel=parallel)

    def compute(self, d=1000, h=0, M_approx=365):
        """
        Compute elastic change detection.

        :param d: number of Monte Carlo iterations to compute p-value
        :param h: index of window type to compute long run covariance
        :param M_approx: number of time points to compute p-value
            (NOTE(review): currently unused in this method)
        """
        M, N1 = self.f.shape[0], self.f.shape[1]

        # Shooting vectors of the warping functions around the Karcher mean;
        # mu holds their running sums over the first i+1 curves.
        self.mu = np.zeros((M, N1))
        mu, gam_mu, psi, vec = uf.SqrtMean(self.warp_data.gam)
        self.vec = vec
        self.mu[:, 0] = vec[:, 0]
        for idx in range(1, N1):
            self.mu[:, idx] = vec[:, 0 : (idx + 1)].sum(axis=1)

        # CUSUM-type statistic; k indexes the candidate changepoint.
        self.Sn = np.zeros(N1 + 1)
        for k in range(1, N1 + 1):
            deviation = self.mu[:, k - 1] - (k / N1) * self.mu[:, -1]
            self.Sn[k] = norm(deviation / np.sqrt(N1)) ** 2 / M

        self.k_star = np.argmax(self.Sn)
        self.Tn = np.max(self.Sn)

        # Split data, warps, and means at the estimated changepoint.
        self.dat_a = self.f[:, range(self.k_star)]
        self.dat_b = self.f[:, range(self.k_star, N1)]
        self.warp_a = self.warp_data.gam[:, 0 : self.k_star]
        self.warp_b = self.warp_data.gam[:, self.k_star :]
        self.mean_a = self.warp_data.fn[:, 0 : self.k_star].mean(axis=1)
        self.mean_b = self.warp_data.fn[:, self.k_star :].mean(axis=1)

        if self.warp_a.size == 0:
            mu_a = np.zeros(M)
            self.warp_mean_a = np.array([])
        else:
            mu, gam_mu, psi, vec = uf.SqrtMean(self.warp_a)
            mu_a = vec.mean(axis=1)
            self.warp_mean_a = gam_mu
        if self.warp_b.size == 0:
            mu_b = np.zeros(M)
            self.warp_mean_b = np.array([])
        else:
            mu, gam_mu, psi, vec = uf.SqrtMean(self.warp_b)
            mu_b = vec.mean(axis=1)
            self.warp_mean_b = gam_mu

        self.delta = self.mean_b - self.mean_a
        if self.warp_mean_a.size > 0 and self.warp_mean_b.size > 0:
            self.delta_warp = self.warp_mean_b - self.warp_mean_a
        else:
            self.delta_warp = self.time

        # Center the shooting vectors by the mean of their own segment.
        self.centered_data = np.zeros((M, N1))
        for idx in range(self.k_star):
            self.centered_data[:, idx] = self.vec[:, idx] - mu_a
        for idx in range(self.k_star, N1):
            self.centered_data[:, idx] = self.vec[:, idx] - mu_b

        # Eigenvalues of the long-run covariance operator drive the
        # limiting distribution of the statistic.
        D_mat = LongRunCovMatrixPrecentered(self.centered_data, h=h)
        self.lambda_vals, self.pca_coef = np.linalg.eig(D_mat)
        self.lambda_vals = self.lambda_vals.real / M
        self.pca_coef = self.pca_coef.real * M

        def draw_null(N):
            # One draw from the limiting law: max over time of a
            # lambda-weighted sum of squared Brownian bridges.
            acc = np.zeros((M, N))
            for j in range(M):
                acc[j, :] = self.lambda_vals[j] * (BBridge(0, 0, 0, 1, N - 1) ** 2)
            return max(acc.sum(axis=0))

        values = np.zeros(d)
        for sim in range(d):
            values[sim] = draw_null(N1)
        exceed = self.Tn <= values
        self.p = exceed.mean()
        self.values = values
        return

    def plot(self):
        """
        Plot elastic changepoint results.

        Usage: obj.plot()
        """
        fig, ax = plt.subplots()
        ax.plot(self.time, self.f, "0.8")
        ax.plot(self.time, self.dat_a, "pink")
        ax.plot(self.time, self.dat_b, "lightskyblue")
        ax.plot(self.time, self.mean_a, "red", label="Before")
        ax.plot(self.time, self.mean_b, "blue", label="After")
        plt.title("Functional Data")
        plt.legend()

        fig, ax = plt.subplots()
        ax.plot(self.time, self.delta, "black")
        plt.title("Estimated Amplitude Change Function")

        fig, ax = plt.subplots()
        ax.plot(self.time, self.warp_data.gam, "0.8")
        ax.plot(self.time, self.warp_data.gam[:, range(self.k_star)], "pink")
        ax.plot(self.time, self.warp_data.gam[:, self.k_star :], "lightskyblue")
        ax.plot(self.time, self.warp_mean_a, "red", label="Before")
        ax.plot(self.time, self.warp_mean_b, "blue", label="After")
        ax.set_aspect("equal")
        plt.title("Warping Functions")
        plt.legend()

        fig, ax = plt.subplots()
        ax.plot(list(range(self.Sn.shape[0])), self.Sn)
        plt.title("Test statistic")
        plt.show()
        return