Source code for gmprocess.waveform_processing.sanity_checks
"""This module is for waveform procesisng simple sanity checks."""
import numpy as np
from gmprocess.waveform_processing.integrate import get_vel
from gmprocess.waveform_processing.integrate import get_disp
from gmprocess.utils.config import get_config
from gmprocess.waveform_processing.processing_step import processing_step
[docs]
@processing_step
def check_tail(st, duration=5.0, max_vel_ratio=0.3, max_dis_ratio=0.9, config=None):
"""Check for abnormally large values in the tail of the stream.
This QA check looks for the presence of abnormally large values in the tail
velocity and displacement traces. This can occur due to baseline shifts or
long period noise that has not been properly filtered out that manifest
as long-period drifts in the velocity and/or displacement traces.
Note that an additional problem that this check should eliminate is records
in which the time window has not captured the full duration of shaking.
Args:
st (StationStream):
StationStream object.
duration (float):
Duration of tail.
max_vel_ratio (float):
Trace is labeled as failed if the max absolute velocity in the tail
is greater than max_vel_ratio times the max absolute velocity of
the whole trace.
max_dis_ratio (float):
Trace is labeled as failed if the max absolute displacement in the
tail is greater than max_disp_ratio times the max absolute
displacement of the whole trace.
config (dict):
Configuration dictionary (or None). See get_config().
Returns:
StationStream: With QA checks applied.
"""
if not st.passed:
return st
start_time = st[0].stats.starttime
end_time = st[0].stats.endtime
new_start_time = end_time - duration
if new_start_time < start_time:
for tr in st:
if tr.passed:
tr.fail(
"Cannot apply tail check because tail duration exceeds record "
"duration."
)
return st
if config is None:
config = get_config()
vel = st.copy()
dis = st.copy()
for tr in vel:
tr = get_vel(tr, config=config)
for tr in dis:
tr = get_disp(tr, config=config)
vel_trim = vel.copy()
dis_trim = dis.copy()
for tr in vel_trim:
tr.trim(starttime=new_start_time)
for tr in dis_trim:
tr.trim(starttime=new_start_time)
for i, tr in enumerate(st):
abs_max_vel = np.max(np.abs(vel[i].data))
abs_max_vel_trim = np.max(np.abs(vel_trim[i].data))
abs_max_dis = np.max(np.abs(dis[i].data))
abs_max_dis_trim = np.max(np.abs(dis_trim[i].data))
vel_ratio = abs_max_vel_trim / abs_max_vel
dis_ratio = abs_max_dis_trim / abs_max_dis
if vel_ratio > max_vel_ratio:
tr.fail(f"Velocity ratio is greater than {max_vel_ratio}")
if dis_ratio > max_dis_ratio:
tr.fail(f"Displacement ratio is greater than {max_dis_ratio}")
tail_conf = {
"max_vel_ratio": max_vel_ratio,
"max_dis_ratio": max_dis_ratio,
"start_time": new_start_time,
}
tr.set_parameter("tail_conf", tail_conf)
return st