Source code for gmprocess.waveform_processing.nn_quality_assurance

"""Module for the neural network QA method."""

import csv
import numpy as np
import copy
from scipy.integrate import cumulative_trapezoid
import os
import logging
import pathlib
from gmprocess.waveform_processing.processing_step import processing_step


[docs] @processing_step def nnet_qa(st, acceptance_threshold, model_name, config=None): """Apply the neural network QA algorithm by Bellagamba et al. (2019), Assess the quality of a stream by analyzing its two horizontal components as described in Bellagamba et al. (2019). Performs three steps: 1) Compute the quality metrics (see paper for more info) 2) Preprocess the quality metrics (deskew, standardize and decorrelate) 3) Evaluate the quality using a neural network-based model Two models are available: 'Cant' and 'CantWell'. To minimize the number of low quality ground motion included, the natural acceptance threshold 0.5 can be raised (up to an extreme value of 0.95). Recommended parameters are: - acceptance_threshold = 0.5 or 0.6 - model_name = 'CantWell' Args: st (StationStream): The ground motion record to analyze. Should contain at least 2 orthogonal horizontal traces. acceptance_threshold (float): Threshold from which GM records are considered acceptable. model_name (string): name of the used model ('Cant' or 'CantWell') config (dict): Configuration dictionary (or None). See get_config(). Returns: StationStream: With QA analysis applied. """ # This check only works if we have two horizontal components in the stream if st.num_horizontal != 2: for tr in st: tr.fail( "Stream does not contain two horiztonal components. " "NNet QA check will not be performed." ) return st # Also need to check that we don't have data arrays of all zeros, as this # will cause problems all_zeros = False for tr in st: if np.all(tr.data == 0): all_zeros = True if all_zeros: for tr in st: tr.fail( "The data contains all zeros, so the " "nnet_qa check is not able to be performed." ) return st # Check that we have the required trace parameters have_params = True for tr in st: if not {"signal_spectrum", "noise_spectrum", "snr"}.issubset( set(tr.get_cached_names()) ): have_params = False if not have_params: for tr in st: tr.fail( "One or more traces in the stream does have the required " "trace parameters to perform the nnet_qa check." ) return st # Create the path to the NN folder based on model name nn_path = pathlib.Path(__file__).parent / ".." / "data" / "nn_qa" / model_name # Compute the quality metrics qm = compute_quality_metrics(st) # Pre-process the qualtiy metrics qm = preprocess_quality_metrics(qm, model_name) # Instanciate the NN (based on model_name) NN = neuralNet() NN.load_nn(nn_path) # Use NN scores = NN.use_nn(qm)[0] # Accepted? flag_accept = False if scores[1] >= acceptance_threshold: flag_accept = True # Add parameters to Stream (acceptance threshold, model_name, score_lowQ, # score_highQ, highQualityFlag) nnet_dict = { "accept_thres": acceptance_threshold, "model_name": model_name, "score_LQ": scores[0], "score_HQ": scores[1], "pass_QA": flag_accept, } st.set_stream_param("nnet_qa", nnet_dict) if not flag_accept: for tr in st: tr.fail("Failed NNet QA check.") return st
def is_number(s): """ Check if given input is a number. Args: s (any type): Data to test Returns: bool: True if is a number, False if isn't """ try: float(s) return True except ValueError: return False def load_csv(data_path, row_ignore=0, col_ignore=0): """ Load csv files from a given path and returns a list of list. For all imported data, check if is a number. If so, returns a float. If not, returns a string. Args: data_path (string): path to the csv to load row_ignore (int): number of rows to ignore col_ignore (int) : number of columns to ignore Returns: list of list: containing the data from the csv """ M = [] with open(data_path, encoding="utf-8") as csvfile: readCSV = csv.reader(csvfile) # Skip header for i in range(row_ignore): next(csvfile) for row in readCSV: # Input vector single_line = [] for i in range(col_ignore, len(row)): if is_number(row[i]): single_line.append(float(row[i])) else: single_line.append(row[i]) M.append(single_line) return M def sigmoid(v_input): """ Performs a sigmoid operation on the input (1/(e(-x)+1)) Args: v_input (float): a number defined on R (real) Returns: float: sigmoid result (a number between 0 and 1) """ v_act = [] for x in v_input: v_act.append(1.0 / (1 + np.exp(-x))) return v_act def tanh(v_input): """ Performs a hyperbolic tangent operation on the input (2/(e(2x)+1)) Args: v_input (float): a number defined on R (real) Returns: float: tanh result (a number between -1 and 1) """ v_act = [] for x in v_input: v_act.append(np.tanh(x)) return v_act class neuralNet: """ Class allowing the instantiation and use of simple (1 or 2 layers) neural networks """ def __init__(self): """ Instantiate an empty neural network (no weights, functions, or biases loaded """ self.n_input = 0 self.n_neuron_H1 = 0 self.n_neuron_H2 = -1 self.n_output = 0 self.activation_H1 = "NA" self.activation_H2 = "NA" self.activation_output = "NA" self.w_H1 = [] self.w_H2 = [] self.b_H1 = [] self.b_H2 = [] self.w_output = [] self.b_output = [] # load_nn: load and build neural network model def load_nn(self, nn_path): """ Populate an instantated neural netowrk with data contained in a specific folder. Args: nn_path (string): path to the folder containing the required information (masterF.txt, weights.csv, biases.csv) """ data_path = os.path.join(nn_path, "masterF.txt") with open(data_path, encoding="utf-8") as masterF: readCSV = csv.reader(masterF) for row in readCSV: if len(row) == 7: self.n_input = int(row[0]) self.n_neuron_H1 = int(row[1]) self.n_neuron_H2 = int(row[3]) self.n_output = int(row[5]) self.activation_H1 = row[2] self.activation_H2 = row[4] self.activation_output = row[6] elif len(row) == 5: self.n_input = int(row[0]) self.n_neuron_H1 = int(row[1]) self.n_output = int(row[3]) self.activation_H1 = row[2] self.activation_output = row[4] masterF.close() # Load weights and biases # Weights first hidden layer data_path = os.path.join(nn_path, "weight_1.csv") self.w_H1 = np.asarray(load_csv(data_path)) # Biases first hidden layer data_path = os.path.join(nn_path, "bias_1.csv") self.b_H1 = np.asarray(load_csv(data_path)) # Weights output layer data_path = os.path.join(nn_path, "weight_output.csv") self.w_output = np.asarray(load_csv(data_path)) # Biases output layer data_path = os.path.join(nn_path, "bias_output.csv") self.b_output = np.asarray(load_csv(data_path)) # Second hidden layer if self.n_neuron_H2 != -1: # Weights second hidden layer data_path = os.path.join(nn_path, "weight_2.csv") self.w_H2 = np.asarray(load_csv(data_path)) # Biases second hidden layer data_path = os.path.join(nn_path, "bias_2.csv") self.b_H2 = np.asarray(load_csv(data_path)) def use_nn(self, v_input): """ Use a populated neural network (i.e. from the input, returns the classification score or the regression result). Args: v_input (list or np.array): list or numpy array of the inputs (must be all numerical). Size must be equal to the NN input layer size. Returns: v_inter (np.array): numpy array containing the results. """ v_inter = np.array([]) # Transform input if required if isinstance(v_input, list): v_input = np.asarray(v_input) # First layer if self.activation_H1 == "sigmoid": v_inter = sigmoid(np.dot(v_input.T, self.w_H1) + self.b_H1) elif self.activation_H1 == "tanh": v_inter = tanh(np.dot(v_input.T, self.w_H1) + self.b_H1) else: v_inter = np.dot(v_input.T, self.w_H1) + self.b_H1 # If second layer exist if self.n_neuron_H2 != -1: if self.activation_H2 == "sigmoid": v_inter = sigmoid(np.dot(v_inter, self.w_H2) + self.b_H2) elif self.activation_H2 == "tanh": v_inter = tanh(np.dot(v_inter, self.w_H2) + self.b_H2) else: v_inter = np.dot(v_inter, self.w_H2) + self.b_H2 # Final layer if self.activation_output == "sigmoid": v_inter = sigmoid(np.dot(v_inter, self.w_output) + self.b_output) elif self.activation_output == "tanh": v_inter = tanh(np.dot(v_inter, self.w_output) + self.b_output) else: v_inter = np.dot(v_inter, self.w_output) + self.b_output return v_inter def deskew_data(data, model_name): """ Performs the deskewing operations used in Bellagamba et al. (2019) on the quality metrics vector. Depending on the selected model. Args: data (list of floats): 20 quality metrics computed as described in the paper model_name (string): name of the selected model. Available 'Cant' and 'CantWell' as described in the paper Returns: list of float: processed (deskewed) data """ if model_name == "Cant": for i in range(len(data)): if i == 0 or i == 1 or i == 11 or i == 15 or i == 16: data[i] = np.log(data[i]) elif i == 17: data[i] = -1.0 / data[i] ** 1.2 elif i == 2: data[i] = data[i] ** (-0.2) elif i == 10: data[i] = data[i] ** (-0.06) elif i == 19: data[i] = data[i] ** 0.43 elif i == 7: data[i] = data[i] ** 0.25 elif i == 8: data[i] = data[i] ** 0.23 elif i == 9: data[i] = data[i] ** 0.05 elif i == 18: data[i] = data[i] ** 0.33 elif i == 3: data[i] = data[i] ** (0.12) elif i == 5: data[i] = data[i] ** (0.48) elif i == 6: data[i] = data[i] ** (0.37) elif i == 12: data[i] = data[i] ** 0.05 elif i == 13: data[i] = data[i] ** 0.08 elif i == 4: data[i] = data[i] ** (0.16) elif i == 14: data[i] = data[i] ** (0.1) return data elif model_name == "CantWell": for i in range(len(data)): if i == 0 or i == 1 or i == 11 or i == 15 or i == 16: data[i] = np.log(data[i]) elif i == 17: data[i] = -1.0 / data[i] ** 1.2 elif i == 2: data[i] = data[i] ** (-0.2) elif i == 10: data[i] = data[i] ** (-0.06) elif i == 19: data[i] = data[i] ** 0.43 elif i == 7: data[i] = data[i] ** 0.1 elif i == 8: data[i] = data[i] ** 0.23 elif i == 9: data[i] = data[i] ** 0.2 elif i == 18: data[i] = data[i] ** 0.33 elif i == 3: data[i] = data[i] ** (0.05) elif i == 5: data[i] = data[i] ** (0.3) elif i == 6: data[i] = data[i] ** (0.37) elif i == 12: data[i] = data[i] ** 0.05 elif i == 13: data[i] = data[i] ** 0.08 elif i == 4: data[i] = data[i] ** (0.05) elif i == 14: data[i] = data[i] ** (0.05) return data def standardize_data(data, mu, sigma): """ Performs a standardization operation on the given data ((X-mu)/sigma) Args: data (list of float): data to standardize (size represents the dimensionality of the data and not the number of point to standardize) mu (list of float): means sigma (list of float): standard deviation Returns: list o float: standardized data """ for i in range(len(data)): data[i] = (data[i] - mu[i]) / sigma[i] return data def decorrelate_data(data, M): """ Decorrelate the data based on a Mahalanobis tranform. The transformation matrix is given as an input. Args: data (np.array): numpy array containing the data to be decorrelated (size = N). M (np.array): decorrelation matrix (size NxN) Returns: list of float containing the decorrelated data """ M = np.array(M) data = M.dot(data) data = np.transpose(data) return data.tolist() def preprocess_quality_metrics(qm, model_name): """ Pre-process the quality metrics according to Bellagamba et al. (2019) (i.e. deskews, standardizes and decorrelates the quality metrics) Args: qm (list of float): quality metrics estimated according to the paper model_name (string): name of the used model for processing. Available: 'Cant' and 'CantWell'. Returns: list of float containing the pre-processed quality metrics. """ # Building dir path from model name data_path = pathlib.Path(__file__).parent / ".." / "data" / "nn_qa" / model_name # Get resource from the correct dir M = load_csv(os.path.join(data_path, "M.csv")) csv_dir = os.path.join(data_path, "mu_sigma.csv") [mu, sigma] = load_csv(csv_dir) # Deskew, standardize and decorrelate data qm = deskew_data(qm, model_name) qm = standardize_data(qm, mu, sigma) qm = decorrelate_data(qm, M) return qm def get_husid(acceleration, time_vector): """ Returns the Husid vector, defined as int{acceleration ** 2.} Args: acceleration (np.array): Vector of acceleration values time_vector (np.array): Time vector in seconds """ husid = np.hstack([0.0, cumulative_trapezoid(acceleration**2.0, time_vector)]) AI = husid / max(husid) return husid, AI def get_freq_index(ft_freq, lower, upper): """ Gets the indices of a frequency range in the frequency vector Args: ft_freq (list of float): list of ordred frequencies lower (float): lower boud of the frequency range upper (float): upper bound of the frequency range Returns: int, int: the indices bounding the range """ lower_indices = [i for i, x in enumerate(ft_freq) if x > lower] upper_indices = [i for i, x in enumerate(ft_freq) if x < upper] lower_index = min(lower_indices) upper_index = max(upper_indices) return lower_index, upper_index def get_husid_index(husid, threshold): """ Returns the index of the husid for a particular threshold Args: husid (list of float): husid vector threshold (float): threshold not to be exceeded Returns: int: the index of the latest value below the threshold """ husid_indices = [i for i, x in enumerate(husid) if x > threshold] husid_index = min(husid_indices) return husid_index def calculate_snr_min(ft_freq, snr): """ Calculate the SNR min between 0.1 and 20 Hz Args: ft_freq (list of float): vector of frequencies used in the Fourier spectrum snr (list of float): vector of the snr at the frequencies in ft_freq Returns: float: min snr between 0.1 and 20 Hz """ # Frequencies must be available between 0.1 and 20 Hz lower_index, upper_index = get_freq_index(ft_freq, 0.1, 20) snr_min = min(snr[lower_index:upper_index]) return snr_min def calculate_husid(acc, t): """ Calculate the husid and Arias of a signal. Args: acc (np.array): accelerogram vector t (np.array): time vector (constant time step) Returns: husid: vector of floats AI: vector of floats Arias: float, max value of AI husid index at 5, 75 and 95% (used for duration) """ husid, AI = get_husid(acc, t) Arias = max(husid) husid_index_5 = get_husid_index(AI, 0.05) husid_index_75 = get_husid_index(AI, 0.75) husid_index_95 = get_husid_index(AI, 0.95) return husid, AI, Arias, husid_index_5, husid_index_75, husid_index_95 def get_classification_metrics(tr, p_pick, delta_t): """ Compute the quality metrics as in Bellagamba et al. (2019). More details in the paper. WARNINGS: - Acceleration untis changed into g at the beginning! - Vertical component is not used! Args: tr (list of list of float): each list contains an horizontal trace p_pick (float): estimated P-wave arrival time (in seconds) from the start of the record delta_t (float): time step used in the record in seconds (decimal) Returns: List of float containing the quality metrics (size = 20) """ ######################################## # Acceleration units changed into g!!! # # Vertical component not used!!! # ######################################## # Extract data from dictionary # First horizontal comp acc_comp1 = np.asarray(tr["acc_comp1"]) / 981.0 smooth_ft1 = np.asarray(tr["smooth_ft1"]) / 981.0 smooth_ft1_freq = np.asarray(tr["smooth_ft1_freq"]) smooth_ft1_pe = np.asarray(tr["smooth_ft1_pe"]) / 981.0 snr1_freq = np.asarray(tr["snr1_freq"]) # Second horizontal comp acc_comp2 = np.asarray(tr["acc_comp2"]) / 981.0 smooth_ft2 = np.asarray(tr["smooth_ft2"]) / 981.0 smooth_ft2_pe = np.asarray(tr["smooth_ft2_pe"]) / 981.0 # Sample rate sample_rate = 1.0 / delta_t # Index of the P-wave arrival time index_p_arrival = int(np.floor(np.multiply(p_pick, sample_rate))) # recreate a time vector t = np.arange(len(acc_comp1)) * delta_t # set up a copy of accelerations for plotting later (they get changed # by window/taper in the ft step) acc1 = copy.deepcopy(acc_comp1) acc2 = copy.deepcopy(acc_comp2) # calculate husid and Arias intensities ( husid1, AI1, Arias1, husid_index1_5, husid_index1_75, husid_index1_95, ) = calculate_husid(acc1, t) ( husid2, AI2, Arias2, husid_index2_5, husid_index2_75, husid_index2_95, ) = calculate_husid(acc2, t) # calculate max amplitudes of acc time series, final is geomean PGA1 = np.max(np.abs(acc1)) PGA2 = np.max(np.abs(acc2)) amp1_pe = max(abs(acc1[0:index_p_arrival])) amp2_pe = max(abs(acc2[0:index_p_arrival])) PGA = np.sqrt(PGA1 * PGA2) PN = np.sqrt(amp1_pe * amp2_pe) PN_average = np.sqrt( np.average(abs(acc1[0:index_p_arrival])) * np.average(abs(acc2[0:index_p_arrival])) ) PNPGA = PN / PGA # calculate effective head and tail lengths tail_duration = min([5.0, 0.1 * t[-1]]) tail_length = int(tail_duration * sample_rate) tail_average1 = np.mean(abs(acc1[-tail_length:])) tail_average2 = np.mean(abs(acc2[-tail_length:])) if PGA1 != 0 and PGA2 != 0: tail_ratio1 = tail_average1 / PGA1 tail_ratio2 = tail_average2 / PGA2 tail_ratio = np.sqrt(tail_ratio1 * tail_ratio2) tailnoise_ratio = tail_ratio / PN_average else: logging.debug("PGA1 or PGA2 is 0") tail_ratio1 = 1.0 tail_ratio2 = 1.0 tail_ratio = 1.0 mtail_duration = min([2.0, 0.1 * t[-1]]) mtail_length = int(mtail_duration * sample_rate) mtail_max1 = np.max(abs(acc1[-mtail_length:])) mtail_max2 = np.max(abs(acc2[-mtail_length:])) if PGA1 != 0 and PGA2 != 0: mtail_ratio1 = mtail_max1 / PGA1 mtail_ratio2 = mtail_max2 / PGA2 mtail_ratio = np.sqrt(mtail_ratio1 * mtail_ratio2) mtailnoise_ratio = mtail_ratio / PN else: logging.debug("PGA1 or PGA2 is 0") mtail_ratio1 = 1.0 mtail_ratio2 = 1.0 mtail_ratio = 1.0 head_duration = 1.0 head_length = int(head_duration * sample_rate) head_average1 = np.max(abs(acc1[0:head_length])) head_average2 = np.max(abs(acc2[0:head_length])) if PGA1 != 0 and PGA2 != 0: head_ratio1 = head_average1 / PGA1 head_ratio2 = head_average2 / PGA2 head_ratio = np.sqrt(head_ratio1 * head_ratio2) else: logging.debug("PGA1 or PGA2 is 0") head_ratio1 = 1.0 head_ratio2 = 1.0 head_ratio = 1.0 # bracketed durations between 10%, 20%, 30%, 40% and 50% of PGA # first get all vector indices where abs max acc is greater than or # equal, and less than or equal to x*PGA hindex1_10 = [ i for (i, a) in enumerate(acc1) if np.abs(a) >= (0.10 * np.max(np.abs(acc1))) ] hindex2_10 = [ i for (i, a) in enumerate(acc2) if np.abs(a) >= (0.10 * np.max(np.abs(acc2))) ] hindex1_20 = [ i for (i, a) in enumerate(acc1) if np.abs(a) >= (0.20 * np.max(np.abs(acc1))) ] hindex2_20 = [ i for (i, a) in enumerate(acc2) if np.abs(a) >= (0.20 * np.max(np.abs(acc2))) ] # get bracketed duration (from last and first time the index is exceeded) if len(hindex1_10) != 0 and len(hindex2_10) != 0: bracketedPGA_10 = np.sqrt( ((max(hindex1_10) - min(hindex1_10)) * delta_t) * ((max(hindex2_10) - min(hindex2_10)) * delta_t) ) else: bracketedPGA_10 = 9999.0 if len(hindex1_20) != 0 and len(hindex2_20) != 0: bracketedPGA_20 = np.sqrt( ((max(hindex1_20) - min(hindex1_20)) * delta_t) * ((max(hindex2_20) - min(hindex2_20)) * delta_t) ) else: bracketedPGA_20 = 9999.0 bracketedPGA_10_20 = bracketedPGA_10 / bracketedPGA_20 # calculate Ds575 and Ds595 Ds575 = np.sqrt( ((husid_index1_75 - husid_index1_5) * delta_t) * ((husid_index2_75 - husid_index2_5) * delta_t) ) Ds595 = np.sqrt( ((husid_index1_95 - husid_index1_5) * delta_t) * ((husid_index2_95 - husid_index2_5) * delta_t) ) # geomean of fourier spectra smooth_ftgm = np.sqrt(np.multiply(abs(smooth_ft1), abs(smooth_ft2))) smooth_ftgm_pe = np.sqrt(np.multiply(abs(smooth_ft1_pe), abs(smooth_ft2_pe))) # snr metrics - min, max and averages lower_index, upper_index = get_freq_index(smooth_ft1_freq, 0.1, 20) with np.errstate(invalid="ignore"): snrgm = np.divide(smooth_ftgm, smooth_ftgm_pe) snr_min = min(snrgm[lower_index:upper_index]) snr_max = max(snrgm) lower_index_average, upper_index_average = get_freq_index(snr1_freq, 0.1, 10) snr_average = np.trapz( snrgm[lower_index_average:upper_index_average], snr1_freq[lower_index_average:upper_index_average], ) / (snr1_freq[upper_index_average] - snr1_freq[lower_index_average]) lower_index_average, upper_index_average = get_freq_index(snr1_freq, 0.1, 0.5) ft_a1 = np.trapz( smooth_ftgm[lower_index_average:upper_index_average], snr1_freq[lower_index_average:upper_index_average], ) / (snr1_freq[upper_index_average] - snr1_freq[lower_index_average]) snr_a1 = np.trapz( snrgm[lower_index_average:upper_index_average], snr1_freq[lower_index_average:upper_index_average], ) / (snr1_freq[upper_index_average] - snr1_freq[lower_index_average]) lower_index_average, upper_index_average = get_freq_index(snr1_freq, 0.5, 1.0) ft_a2 = np.trapz( smooth_ftgm[lower_index_average:upper_index_average], snr1_freq[lower_index_average:upper_index_average], ) / (snr1_freq[upper_index_average] - snr1_freq[lower_index_average]) snr_a2 = np.trapz( snrgm[lower_index_average:upper_index_average], snr1_freq[lower_index_average:upper_index_average], ) / (snr1_freq[upper_index_average] - snr1_freq[lower_index_average]) lower_index_average, upper_index_average = get_freq_index(snr1_freq, 1.0, 2.0) snr_a3 = np.trapz( snrgm[lower_index_average:upper_index_average], snr1_freq[lower_index_average:upper_index_average], ) / (snr1_freq[upper_index_average] - snr1_freq[lower_index_average]) lower_index_average, upper_index_average = get_freq_index(snr1_freq, 2.0, 5.0) snr_a4 = np.trapz( snrgm[lower_index_average:upper_index_average], snr1_freq[lower_index_average:upper_index_average], ) / (snr1_freq[upper_index_average] - snr1_freq[lower_index_average]) lower_index_average, upper_index_average = get_freq_index(snr1_freq, 5.0, 10.0) snr_a5 = np.trapz( snrgm[lower_index_average:upper_index_average], snr1_freq[lower_index_average:upper_index_average], ) / (snr1_freq[upper_index_average] - snr1_freq[lower_index_average]) ft_a1_a2 = ft_a1 / ft_a2 # calculate lf to max signal ratios signal1_max = np.max(smooth_ft1) lf1 = np.trapz(smooth_ft1[0:lower_index], smooth_ft1_freq[0:lower_index]) / ( smooth_ft1_freq[lower_index] - smooth_ft1_freq[0] ) lf1_pe = np.trapz(smooth_ft1_pe[0:lower_index], smooth_ft1_freq[0:lower_index]) / ( smooth_ft1_freq[lower_index] - smooth_ft1_freq[0] ) signal2_max = max(smooth_ft2) lf2 = np.trapz(smooth_ft2[0:lower_index], smooth_ft1_freq[0:lower_index]) / ( smooth_ft1_freq[lower_index] - smooth_ft1_freq[0] ) lf2_pe = np.trapz(smooth_ft2_pe[0:lower_index], smooth_ft1_freq[0:lower_index]) / ( smooth_ft1_freq[lower_index] - smooth_ft1_freq[0] ) signal_ratio_max = max([lf1 / signal1_max, lf2 / signal2_max]) signal_pe_ratio_max = max([lf1_pe / signal1_max, lf2_pe / signal2_max]) return [ signal_pe_ratio_max, signal_ratio_max, snr_min, snr_max, snr_average, tail_ratio, mtail_ratio, tailnoise_ratio, mtailnoise_ratio, head_ratio, snr_a1, snr_a2, snr_a3, snr_a4, snr_a5, ft_a1_a2, PNPGA, bracketedPGA_10_20, Ds575, Ds595, ] def compute_quality_metrics(st): """ Get the 2 horizontal components and format the P-wave arrival time before launching the computation of the qualtiy metrics as in Bellagamba et al. (2019) Args: st (list of trace): a list of trace as defined in gmprocess (USGS) Returns: List of float containing the 20 quality metrics """ # Initialize dictionary of variables necessary to the computation of the QM tr = {} # Determine if traces are horizontal or vertical ind = [] i = 1 for tr_i in st: if "Z" not in tr_i.stats["channel"].upper(): ind.append(str(i)) i = i + 1 else: ind.append("v") # Extract required info from each trace in the stream i = 0 for tr_i in st: if ind[i] != "v": # Raw accelerogram (debiased and detrended) str_i = "acc_comp" + ind[i] tr[str_i] = tr_i.data # Fourier spectrum str_i = "ft" + ind[i] tr[str_i] = tr_i.get_cached("signal_spectrum")["spec"] # Frequ of the Fourier spectrum str_i = "ft" + ind[i] + "_freq" tr[str_i] = tr_i.get_cached("signal_spectrum")["freq"] # Smoothed Fourier spectrum str_i = "smooth_ft" + ind[i] sig_spec = tr_i.get_cached("smooth_signal_spectrum")["spec"] sig_spec = np.where(np.isnan(sig_spec), 0.0, sig_spec) tr[str_i] = sig_spec # Freq of he smoothed Fourier spectrum str_i = "smooth_ft" + ind[i] + "_freq" tr[str_i] = tr_i.get_cached("smooth_signal_spectrum")["freq"] # Fourier spectrum of the pre-event trace str_i = "ft" + ind[i] + "_pe" tr[str_i] = tr_i.get_cached("noise_spectrum")["spec"] # Frequ of the Fourier spectrum (pre-event trace) str_i = "ft" + ind[i] + "_freq_pe" tr[str_i] = tr_i.get_cached("noise_spectrum")["freq"] # Smoothed Fourier spectrum of the pre-event trace str_i = "smooth_ft" + ind[i] + "_pe" noise_spec = tr_i.get_cached("smooth_noise_spectrum")["spec"] noise_spec = np.where(np.isnan(noise_spec), 0.0, noise_spec) tr[str_i] = noise_spec # SNR str_i = "snr" + ind[i] tr[str_i] = tr_i.get_cached("snr")["snr"] # SNR freq str_i = "snr" + ind[i] + "_freq" tr[str_i] = tr_i.get_cached("snr")["freq"] i = i + 1 # P-wave arrival time split_prov = st[0].get_parameter("signal_split") if isinstance(split_prov, list): split_prov = split_prov[0] split_time = split_prov["split_time"] start_time = st[0].stats.starttime p_pick = split_time - start_time # Get the delta t delta_t = st[0].stats.delta # Compute the QM qm = get_classification_metrics(tr, p_pick, delta_t) return qm