Source code for boxplots

"""
Elastic Functional Boxplots

moduleauthor:: J. Derek Tucker <jdtuck@sandia.gov>

"""
import numpy as np
from scipy.integrate import trapezoid
import fdasrsf.utility_functions as uf
import fdasrsf.geometry as geo
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
from matplotlib import cm
import collections


[docs] class ampbox: """ This class provides amplitude boxplot for functional data using the SRVF framework Usage: obj = ampbox(warp_data) :param warp_data: fdawarp class with alignment data :type warp_data: fdawarp :param Q1: First quartile :param Q3: Second quartile :param Q1a: First quantile based on alpha :param Q3a: Second quantile based on alpha :param minn: minimum extreme function :param maxx: maximum extreme function :param outlier_index: indexes of outlier functions :param f_median: median function :param q_median: median srvf :param plt: surface plot mesh Author : J. D. Tucker (JDT) <jdtuck AT sandia.gov> Date : 15-Mar-2018 """ def __init__(self, fdawarp): """ Construct an instance of the ampbox class :param fdawarp: fdawarp class """ if fdawarp.fn.size == 0: raise Exception("Please align fdawarp class using srsf_align!") if fdawarp.type == "mean": raise Exception( 'Please align fdawarp class using srsf_align with method="median"!' ) self.warp_data = fdawarp
[docs] def construct_boxplot(self, alpha=0.05, k_a=1): """ This function constructs the amplitude boxplot using the elastic square-root slope (srsf) framework. :param alpha: quantile value (e.g.,=.05, i.e., 95%) :param k_a: scalar for outlier cutoff (e.g.,=1) """ if self.warp_data.rsamps: ft = self.warp_data.fs f_median = self.warp_data.fmean qt = self.warp_data.qs q_median = self.warp_data.mqn time = self.warp_data.time else: ft = self.warp_data.fn f_median = self.warp_data.fmean qt = self.warp_data.qn q_median = self.warp_data.mqn time = self.warp_data.time N = ft.shape[1] lam = 0.5 # compute amplitude distances dy = np.zeros(N) for i in range(0, N): dy[i] = np.sqrt(trapezoid((q_median - qt[:, i]) ** 2, time)) dy_ordering = dy.argsort() CR_50 = dy_ordering[0 : np.ceil(N / 2).astype("int")] tmp = dy[CR_50] m = tmp.max() # identify amplitude quartiles angle = np.zeros((CR_50.shape[0], CR_50.shape[0])) energy = np.zeros((CR_50.shape[0], CR_50.shape[0])) for i in range(0, CR_50.shape[0] - 1): for j in range(i + 1, CR_50.shape[0]): q1 = qt[:, CR_50[i]] - q_median q3 = qt[:, CR_50[j]] - q_median q1 = q1 / np.sqrt(trapezoid(q1**2, time)) q3 = q3 / np.sqrt(trapezoid(q3**2, time)) angle[i, j] = trapezoid(q1 * q3, time) energy[i, j] = (1 - lam) * ( dy[CR_50[i]] / m + dy[CR_50[j]] / m ) - lam * (angle[i, j] + 1) maxloc = energy.argmax() maxloc_row, maxloc_col = np.unravel_index(maxloc, energy.shape) Q1_index = CR_50[maxloc_row] Q3_index = CR_50[maxloc_col] Q1_q = qt[:, Q1_index] Q3_q = qt[:, Q3_index] Q1 = ft[:, Q1_index] Q3 = ft[:, Q3_index] # identify amplitude quantiles dy_ordering = dy.argsort() CR_alpha = dy_ordering[0 : np.round(N * (1 - alpha)).astype("int")] tmp = dy[CR_alpha] m = tmp.max() angle = np.zeros((CR_alpha.shape[0], CR_alpha.shape[0])) energy = np.zeros((CR_alpha.shape[0], CR_alpha.shape[0])) for i in range(0, CR_alpha.shape[0] - 1): for j in range(i + 1, CR_alpha.shape[0]): q1 = qt[:, CR_alpha[i]] - q_median q3 = qt[:, CR_alpha[j]] - q_median q1 /= np.sqrt(trapezoid(q1**2, time)) q3 /= np.sqrt(trapezoid(q3**2, time)) angle[i, j] = trapezoid(q1 * q3, time) energy[i, j] = (1 - lam) * ( dy[CR_alpha[i]] / m + dy[CR_alpha[j]] / m ) - lam * (angle[i, j] + 1) maxloc = energy.argmax() maxloc_row, maxloc_col = np.unravel_index(maxloc, energy.shape) Q1a_index = CR_alpha[maxloc_row] Q3a_index = CR_alpha[maxloc_col] Q1a_q = qt[:, Q1a_index] Q3a_q = qt[:, Q3a_index] Q1a = ft[:, Q1a_index] Q3a = ft[:, Q3a_index] # compute amplitude whiskers IQR = dy[Q1_index] + dy[Q3_index] v1 = Q1_q - q_median v3 = Q3_q - q_median upper_q = Q3_q + k_a * IQR * v3 / np.sqrt(trapezoid(v3**2, time)) lower_q = Q1_q + k_a * IQR * v1 / np.sqrt(trapezoid(v1**2, time)) upper_dis = np.sqrt(trapezoid((upper_q - q_median) ** 2, time)) lower_dis = np.sqrt(trapezoid((lower_q - q_median) ** 2, time)) whisker_dis = max(upper_dis, lower_dis) # identify amplitude outliers outlier_index = np.array([]) for i in range(0, N): if dy[dy_ordering[N - i - 1]] > whisker_dis: outlier_index = np.append(outlier_index, dy[dy_ordering[N - i - 1]]) # identify amplitude extremes distance_to_upper = np.full(N, np.inf) distance_to_lower = np.full(N, np.inf) out_50_CR = np.setdiff1d(np.arange(0, N), outlier_index) for i in range(0, out_50_CR.shape[0]): j = out_50_CR[i] distance_to_upper[j] = np.sqrt(trapezoid((upper_q - qt[:, j]) ** 2, time)) distance_to_lower[j] = np.sqrt(trapezoid((lower_q - qt[:, j]) ** 2, time)) max_index = distance_to_upper.argmin() min_index = distance_to_lower.argmin() min_q = qt[:, min_index] max_q = qt[:, max_index] minn = ft[:, min_index] maxx = ft[:, max_index] s = np.linspace(0, 1, 100) Fs2 = np.zeros((time.shape[0], 595)) Fs2[:, 0] = (1 - s[0]) * minn + s[0] * Q1 for j in range(1, 100): Fs2[:, j] = (1 - s[j]) * minn + s[j] * Q1a Fs2[:, 98 + j] = (1 - s[j]) * Q1a + s[j] * Q1 Fs2[:, 197 + j] = (1 - s[j]) * Q1 + s[j] * f_median Fs2[:, 296 + j] = (1 - s[j]) * f_median + s[j] * Q3 Fs2[:, 395 + j] = (1 - s[j]) * Q3 + s[j] * Q3a Fs2[:, 494 + j] = (1 - s[j]) * Q3a + s[j] * maxx d1 = np.sqrt(trapezoid((q_median - Q1_q) ** 2, time)) d1a = np.sqrt(trapezoid((Q1_q - Q1a_q) ** 2, time)) dl = np.sqrt(trapezoid((Q1a_q - min_q) ** 2, time)) d3 = np.sqrt(trapezoid((q_median - Q3_q) ** 2, time)) d3a = np.sqrt(trapezoid((Q3_q - Q3a_q) ** 2, time)) du = np.sqrt(trapezoid((Q3a_q - max_q) ** 2, time)) part1 = np.linspace(-d1 - d1a - dl, -d1 - d1a, 100) part2 = np.linspace(-d1 - d1a, -d1, 100) part3 = np.linspace(-d1, 0, 100) part4 = np.linspace(0, d3, 100) part5 = np.linspace(d3, d3 + d3a, 100) part6 = np.linspace(d3 + d3a, d3 + d3a + du, 100) allparts = np.hstack( ( part1, part2[1:100], part3[1:100], part4[1:100], part5[1:100], part6[1:100], ) ) U, V = np.meshgrid(time, allparts) U = np.transpose(U) V = np.transpose(V) self.Q1 = Q1 self.Q3 = Q3 self.Q1a = Q1a self.Q3a = Q3a self.minn = minn self.maxx = maxx self.outlier_index = outlier_index self.f_median = f_median self.q_median = q_median plt = collections.namedtuple( "plt", [ "U", "V", "Fs2", "allparts", "d1", "d1a", "dl", "d3", "d3a", "du", "Q1q", "Q3q", ], ) self.plt = plt(U, V, Fs2, allparts, d1, d1a, dl, d3, d3a, du, Q1a_q, Q3a_q) return
[docs] def plot(self): """ plot box plot and surface plot Usage: obj.plot() """ M = self.warp_data.time.shape[0] fig1, ax1 = plt.subplots() ax1.plot(self.warp_data.time, self.f_median, "k") ax1.plot(self.warp_data.time, self.Q1, "b") ax1.plot(self.warp_data.time, self.Q3, "b") ax1.plot(self.warp_data.time, self.Q1a, "g") ax1.plot(self.warp_data.time, self.Q3a, "g") ax1.plot(self.warp_data.time, self.minn, "r") ax1.plot(self.warp_data.time, self.maxx, "r") fig2 = plt.figure() ax = fig2.gca(projection="3d") ax.plot_surface( self.plt.U, self.plt.V, self.plt.Fs2, cmap=cm.viridis, rcount=200, ccount=200, ) ax.plot(self.warp_data.time, np.zeros(M), self.f_median, "k") ax.plot(self.warp_data.time, np.repeat(-self.plt.d1, M), self.Q1, "b") ax.plot( self.warp_data.time, np.repeat(-self.plt.d1 - self.plt.d1a, M), self.Q1a, "g", ) ax.plot( self.warp_data.time, np.repeat(-self.plt.d1 - self.plt.d1a - self.plt.dl, M), self.minn, "r", ) ax.plot(self.warp_data.time, np.repeat(self.plt.d3, M), self.Q3, "b") ax.plot( self.warp_data.time, np.repeat(self.plt.d3 + self.plt.d3a, M), self.Q3a, "g" ) ax.plot( self.warp_data.time, np.repeat(self.plt.d3 + self.plt.d3a + self.plt.du, M), self.maxx, "r", ) plt.show()
[docs] class phbox: """ This class provides phase boxplot for functional data using the SRVF framework Usage: obj = phbox(warp_data) :param warp_data: fdawarp class with alignment data :type warp_data: fdawarp :param Q1: First quartile :param Q3: Second quartile :param Q1a: First quantile based on alpha :param Q3a: Second quantile based on alpha :param minn: minimum extreme function :param maxx: maximum extreme function :param outlier_index: indexes of outlier functions :param median_x: median warping function :param psi_median: median srvf of warping function :param plt: surface plot mesh Author : J. D. Tucker (JDT) <jdtuck AT sandia.gov> Date : 15-Mar-2018 """ def __init__(self, fdawarp): """ Construct an instance of the phbox class :param fdawarp: fdawarp class """ if fdawarp.fn.size == 0: raise Exception("Please align fdawarp class using srsf_align!") if fdawarp.type == "mean": raise Exception( 'Please align fdawarp class using srsf_align with method="median"!' ) self.warp_data = fdawarp
[docs] def construct_boxplot(self, alpha=0.05, k_a=1): """ This function constructs phase boxplot for functional data using the elastic square-root slope (srsf) framework. :param alpha: quantile value (e.g.,=.05, i.e., 95%) :param k_a: scalar for outlier cutoff (e.g.,=1) """ if self.warp_data.rsamps: gam = self.warp_data.gams else: gam = self.warp_data.gam M, N = gam.shape t = np.linspace(0, 1, M) time = t lam = 0.5 # compute phase median median_x, psi_median, psi, vec = uf.SqrtMedian(gam) # compute phase distances dx = np.zeros(N) v = np.zeros((M, N)) for k in range(0, N): v[:, k], d = geo.inv_exp_map(psi_median, psi[:, k]) dx[k] = np.sqrt(trapezoid(v[:, k] ** 2, t)) dx_ordering = dx.argsort() CR_50 = dx_ordering[0 : np.ceil(N / 2).astype("int")] tmp = dx[CR_50] m = tmp.max() # identify phase quartiles angle = np.zeros((CR_50.shape[0], CR_50.shape[0])) energy = np.zeros((CR_50.shape[0], CR_50.shape[0])) for i in range(0, CR_50.shape[0] - 1): for j in range(i + 1, CR_50.shape[0]): q1 = v[:, CR_50[i]] q3 = v[:, CR_50[j]] q1 /= np.sqrt(trapezoid(q1**2, time)) q3 /= np.sqrt(trapezoid(q3**2, time)) angle[i, j] = trapezoid(q1 * q3, time) energy[i, j] = (1 - lam) * ( dx[CR_50[i]] / m + dx[CR_50[j]] / m ) - lam * (angle[i, j] + 1) maxloc = energy.argmax() maxloc_row, maxloc_col = np.unravel_index(maxloc, energy.shape) Q1_index = CR_50[maxloc_row] Q3_index = CR_50[maxloc_col] Q1 = gam[:, Q1_index] Q3 = gam[:, Q3_index] Q1_psi = np.sqrt(np.gradient(Q1, 1 / (M - 1))) Q3_psi = np.sqrt(np.gradient(Q3, 1 / (M - 1))) # identify phase quantiles dx_ordering = dx.argsort() CR_alpha = dx_ordering[0 : np.round(N * (1 - alpha)).astype("int")] tmp = dx[CR_alpha] m = tmp.max() angle = np.zeros((CR_alpha.shape[0], CR_alpha.shape[0])) energy = np.zeros((CR_alpha.shape[0], CR_alpha.shape[0])) for i in range(0, CR_alpha.shape[0] - 1): for j in range(i + 1, CR_alpha.shape[0]): q1 = v[:, CR_alpha[i]] q3 = v[:, CR_alpha[j]] q1 /= np.sqrt(trapezoid(q1**2, time)) q3 /= np.sqrt(trapezoid(q3**2, time)) angle[i, j] = trapezoid(q1 * q3, time) energy[i, j] = (1 - lam) * ( dx[CR_alpha[i]] / m + dx[CR_alpha[j]] / m ) - lam * (angle[i, j] + 1) maxloc = energy.argmax() maxloc_row, maxloc_col = np.unravel_index(maxloc, energy.shape) Q1a_index = CR_alpha[maxloc_row] Q3a_index = CR_alpha[maxloc_col] Q1a = gam[:, Q1a_index] Q3a = gam[:, Q3a_index] Q1a_psi = np.sqrt(np.gradient(Q1a, 1 / (M - 1))) Q3a_psi = np.sqrt(np.gradient(Q3a, 1 / (M - 1))) # check quartile and quantile going in same direction tst = trapezoid(v[:, Q1a_index] * v[:, Q1_index]) if tst < 0: Q1a = gam[:, Q3a_index] Q3a = gam[:, Q1a_index] # compute phase whiskers IQR = dx[Q1_index] + dx[Q3_index] v1 = v[:, Q3a_index] v3 = v[:, Q3a_index] upper_v = v3 + k_a * IQR * v3 / np.sqrt(trapezoid(v3**2, time)) lower_v = v1 + k_a * IQR * v1 / np.sqrt(trapezoid(v1**2, time)) upper_dis = np.sqrt(trapezoid(v3**2, time)) lower_dis = np.sqrt(trapezoid(v1**2, time)) whisker_dis = max(upper_dis, lower_dis) # identify phase outliers outlier_index = np.array([]) for i in range(0, N): if dx[dx_ordering[N - 1 - i]] > whisker_dis: outlier_index = np.append(outlier_index, dx_ordering[N - i - 1]) # identify phase extremes distance_to_upper = np.full(N, np.inf) distance_to_lower = np.full(N, np.inf) out_50_CR = np.setdiff1d(np.arange(0, N), outlier_index) for i in range(0, out_50_CR.shape[0]): j = out_50_CR[i] distance_to_upper[j] = np.sqrt(trapezoid((upper_v - v[:, j]) ** 2, time)) distance_to_lower[j] = np.sqrt(trapezoid((lower_v - v[:, j]) ** 2, time)) max_index = distance_to_upper.argmin() min_index = distance_to_lower.argmin() minn = gam[:, min_index] maxx = gam[:, max_index] min_psi = psi[:, min_index] max_psi = psi[:, max_index] s = np.linspace(0, 1, 100) Fs2 = np.zeros((time.shape[0], 595)) Fs2[:, 0] = (1 - s[0]) * (minn - t) + s[0] * (Q1 - t) for j in range(1, 100): Fs2[:, j] = (1 - s[j]) * (minn - t) + s[j] * (Q1a - t) Fs2[:, 98 + j] = (1 - s[j]) * (Q1a - t) + s[j] * (Q1 - t) Fs2[:, 197 + j] = (1 - s[j]) * (Q1 - t) + s[j] * (median_x - t) Fs2[:, 296 + j] = (1 - s[j]) * (median_x - t) + s[j] * (Q3 - t) Fs2[:, 395 + j] = (1 - s[j]) * (Q3 - t) + s[j] * (Q3a - t) Fs2[:, 494 + j] = (1 - s[j]) * (Q3a - t) + s[j] * (maxx - t) d1 = np.sqrt(trapezoid(psi_median * Q1_psi, time)) d1a = np.sqrt(trapezoid(Q1_psi * Q1a_psi, time)) dl = np.sqrt(trapezoid(Q1a_psi * min_psi, time)) d3 = np.sqrt(trapezoid((psi_median * Q3_psi), time)) d3a = np.sqrt(trapezoid((Q3_psi * Q3a_psi), time)) du = np.sqrt(trapezoid((Q3a_psi * max_psi), time)) part1 = np.linspace(-d1 - d1a - dl, -d1 - d1a, 100) part2 = np.linspace(-d1 - d1a, -d1, 100) part3 = np.linspace(-d1, 0, 100) part4 = np.linspace(0, d3, 100) part5 = np.linspace(d3, d3 + d3a, 100) part6 = np.linspace(d3 + d3a, d3 + d3a + du, 100) allparts = np.hstack( ( part1, part2[1:100], part3[1:100], part4[1:100], part5[1:100], part6[1:100], ) ) U, V = np.meshgrid(time, allparts) U = np.transpose(U) V = np.transpose(V) self.Q1 = Q1 self.Q3 = Q3 self.Q1a = Q1a self.Q3a = Q3a self.minn = minn self.maxx = maxx self.outlier_index = outlier_index self.median_x = median_x self.psi_media = psi_median plt = collections.namedtuple( "plt", [ "U", "V", "Fs2", "allparts", "d1", "d1a", "dl", "d3", "d3a", "du", "Q1_psi", "Q3_psi", ], ) self.plt = plt(U, V, Fs2, allparts, d1, d1a, dl, d3, d3a, du, Q1a_psi, Q3a_psi) return
[docs] def plot(self): """ plot box plot and surface plot Usage: obj.plot() """ M = self.warp_data.time.shape[0] time = np.linspace(0, 1, M) fig1, ax1 = plt.subplots() ax1.plot(time, self.median_x, "k") ax1.plot(time, self.Q1, "b") ax1.plot(time, self.Q3, "b") ax1.plot(time, self.Q1a, "g") ax1.plot(time, self.Q3a, "g") ax1.plot(time, self.minn, "r") ax1.plot(time, self.maxx, "r") ax1.set_aspect("equal") fig2 = plt.figure() ax = fig2.gca(projection="3d") ax.plot_surface( self.plt.U, self.plt.V, self.plt.Fs2, cmap=cm.viridis, rcount=200, ccount=200, ) ax.plot(time, np.zeros(M), self.median_x - time, "k") ax.plot(time, np.repeat(-self.plt.d1, M), self.Q1 - time, "b") ax.plot(time, np.repeat(-self.plt.d1 - self.plt.d1a, M), self.Q1a - time, "g") ax.plot( time, np.repeat(-self.plt.d1 - self.plt.d1a - self.plt.dl, M), self.minn - time, "r", ) ax.plot(time, np.repeat(self.plt.d3, M), self.Q3 - time, "b") ax.plot(time, np.repeat(self.plt.d3 + self.plt.d3a, M), self.Q3a - time, "g") ax.plot( time, np.repeat(self.plt.d3 + self.plt.d3a + self.plt.du, M), self.maxx - time, "r", ) plt.show() return