import copy
import glob
import os
import pickle
import random as rnd
import subprocess
import tempfile
import warnings
from shutil import copyfile, which
from urllib.request import urlretrieve
import numpy as np
import scipy as sp
from matplotlib import lines as mlines
from matplotlib import pyplot as plt
from obspy import Stream, Trace, UTCDateTime, read
from obspy.core import AttribDict
from scipy.signal.windows import triang
# Base URL for Syngine queries
SYNGINE_BASE_URL = 'https://service.iris.edu/irisws/syngine/1/query?'
# [s] Sampling interval for Green's functions downloaded from Syngine
SYNGINE_DT = 0.25
# [s] Sampling interval for the triangular source time function given to Syngine
TRIANGLE_STF_DT = 0.5
# A nice constant start time for Syngine and CPS GFs
GF_STARTTIME = UTCDateTime(1900, 1, 1)
# Convert m/s to μm/s
UMS_PER_MS = 1e6
# Colors for force components
Z_COLOR = 'blue'
N_COLOR = 'red'
E_COLOR = 'green'
# Transparency level for jackknife lines and patches
JK_ALPHA = 0.2
# Constants controlling behavior of `plot_fits()`
AX_WIDTH = 2 # [in] Waveform subplot panel width
AX_HEIGHT = 0.5 # [in] Waveform subplot panel height
PAD = 0.1 # [in] Internal padding around waveform subplot panels
MARGINS = (0.75, PAD, 0.2, PAD) # [in] Outer [left, right, top, bottom] overall margins
COMPONENT_ORDER = ('Z', 'R', 'T') # Order from left to right of component columns
TEXT_PAD = PAD # [in] Padding for station info/metrics labels on left of waveforms
FONT_SIZE = 8 # Main plot font size
PRECISION = 0 # Number of decimal places for station metrics
class LSForce:
r"""Class for performing force inversions.
Attributes:
gf_dir (str): Directory containing Green's functions
gf_computed (bool): Whether or not Green's functions have been computed for this
object
filtered_gf_st (:class:`~obspy.core.stream.Stream`): Stream containing filtered
Green's functions
inversion_complete (bool): Whether or not the inversion has been run
filter (dict): Dictionary with keys ``'freqmin'``, ``'freqmax'``,
``'zerophase'``, ``'periodmin'``, ``'periodmax'``, and ``'order'``
specifying filter parameters
data_length (int): Length in samples of each data trace
force_sampling_rate (int or float): [Hz] The sampling rate of the force-time
function
W (2D array): Weight matrix
Wvec (1D array): Weight vector
jackknife (:class:`~obspy.core.util.attribdict.AttribDict`): Dictionary with
keys ``'Z'``, ``'N'``, ``'E'``, ``'VR_all'``, ``'alphas'``, ``'num_iter'``,
and ``'frac_delete'`` containing jackknife parameters and results
angle_magnitude (:class:`~obspy.core.util.attribdict.AttribDict`): Dictionary
with keys ``'magnitude'``, ``'magnitude_upper'``, ``'magnitude_lower'``,
``'vertical_angle'``, and ``'horizontal_angle'`` containing inversion angle
and magnitude information
G (2D array): Design matrix
d (1D array): Data vector
model: Model vector of concatenated components (n x 1) of solution
Z: [N] Vertical force time series extracted from model (positive up)
N: [N] North force time series extracted from model (positive north)
E: [N] East force time series extracted from model (positive east)
tvec: [s] Time vector for forces, referenced using `zero_time` (if specified)
        VR: [%] Variance reduction. Rule of thumb: this should be ~50–80%. If it is
            ~100%, the solution is fitting the data exactly and the results are
            suspect; if it is ~5%, the model or some other part of the setup may be
            wrong
        dtorig: Original data, reshaped to one row per trace
        dtnew: Modeled data (Gm), reshaped to one row per trace
alpha: Regularization parameter that was used
alphafit (dict): Dictionary with keys ``'alphas'``, ``'fit'``, and ``'size'``
specifying regularization parameters tested
"""
def __init__(self, data, data_sampling_rate, main_folder=None, method='full'):
r"""Create an LSForce object.
Args:
data (:class:`~lsforce.lsdata.LSData`): LSData object, corrected for station
response but not filtered
data_sampling_rate (int or float): [Hz] Samples per second to use in
inversion. All data will be resampled to this rate, and Green's
functions will be created with this rate
main_folder (str): If `None`, will use current folder
method (str): How to parameterize the force-time function. One of `'full'`
— full inversion using Tikhonov regularization (L2 norm minimization) or
`'triangle'` — inversion parameterized using overlapping triangles,
                a variation on the method of Ekström & Stark (2013)
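        Example:
            A minimal sketch (hypothetical values; assumes an existing
            :class:`~lsforce.lsdata.LSData` object named ``lsdata``)::

                force = LSForce(data=lsdata, data_sampling_rate=1, method='full')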
"""
self.data = data
self.data_sampling_rate = data_sampling_rate
self.gf_computed = False
self.inversion_complete = False
if not main_folder:
self.main_folder = os.getcwd()
else:
self.main_folder = main_folder
if method not in ['full', 'triangle']:
raise ValueError(f'Method {method} not yet implemented.')
self.method = method
def _get_greens(self):
r"""Get the Green's function Stream using either Syngine or CPS."""
# Name the directory where GFs will be stored based on method
if self.cps_model:
gf_dir_name = f'cps_{os.path.basename(self.cps_model).split(".")[-2]}'
else:
gf_dir_name = f'syngine_{self.syngine_model}'
self.gf_dir = os.path.join(self.main_folder, gf_dir_name)
# Label directories containing triangular GFs as such
if self.method == 'triangle':
self.gf_dir += f'_triangle_{self.triangle_half_width:g}s'
# Make GF directory if it doesn't exist
if not os.path.exists(self.gf_dir):
os.mkdir(self.gf_dir)
# Choose the correct delta
if self.cps_model:
gf_dt = 1 / self.data_sampling_rate
else:
gf_dt = SYNGINE_DT
# Get list of unique stations
unique_stations = np.unique([tr.stats.station for tr in self.data.st_proc])
# Make lists of stations with and without GFs calculated/downloaded
existing_stations = []
stations_to_calculate = []
for station in unique_stations:
distance = self.data.st_proc.select(station=station)[0].stats.distance
filename = os.path.join(self.gf_dir, f'{station}.pkl')
# Check if this EXACT GF exists already
if os.path.exists(filename):
stats = read(filename)[0].stats
if (
stats.syngine_model == self.syngine_model
and stats.cps_model == self.cps_model
and stats.triangle_half_width == self.triangle_half_width
and stats.sourcedepthinmeters == self.source_depth
and stats.distance == distance
and stats.T0 == self.T0
and stats.duration == self.gf_duration
and stats.delta == gf_dt
):
gf_exists = True
else:
gf_exists = False
else:
gf_exists = False
# Append to the correct vector
if gf_exists:
existing_stations.append(station)
else:
stations_to_calculate.append(station)
# Initialize empty Stream to hold all GFs
st_gf = Stream()
# CPS
if self.cps_model:
# If we have to calculate some stations, go through the process
if stations_to_calculate:
# Print status
print(
f'Calculating Green\'s functions for {len(stations_to_calculate)} '
'station(s):'
)
for station in stations_to_calculate:
print(f'\t{station}')
# Create a temporary directory to run CPS in, and change into it
cwd = os.getcwd()
temp_dir = tempfile.TemporaryDirectory()
os.chdir(temp_dir.name)
                # Copy the model file into the temporary directory to avoid model paths
                # that are too long for CPS
modelfilename = os.path.basename(
self.cps_model
) # get just model filename without path
copyfile(self.cps_model, os.path.join(temp_dir.name, modelfilename))
# Write the "dist" file
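                # Each row of the dist file specifies: distance [km], sampling
                # interval [s], number of samples, and first time point [s], followed
                # by a final field (reduction velocity per the CPS dist-file
                # convention) set to 0 here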
gf_length_samples = int(self.gf_duration * self.data_sampling_rate)
with open('dist', 'w') as f:
for sta in stations_to_calculate:
dist = self.data.st_proc.select(station=sta)[0].stats.distance
f.write(f'{dist} {gf_dt} {gf_length_samples} {self.T0} 0\n')
# # move copy of dist to main folder for debugging
# copyfile(os.path.join(temp_dir.name, 'dist'), os.path.join(self.main_folder, 'dist'))
# Run hprep96 and hspec96
cpscall = [
'hprep96',
'-HR',
'0.',
'-HS',
str(self.source_depth / 1000), # Converting to km for CPS
'-M',
modelfilename,
'-d',
'dist',
'-R',
'-EXF',
]
# print('Running: %s' % subprocess.list2cmdline(cpscall))
subprocess.call(cpscall)
with open('hspec96.out', 'w') as f:
subprocess.call('hspec96', stdout=f)
# Run hpulse96 (using the multiplier here to get a 1 N impulse), also
# keep track of pulse half-width so we can make the GFs acausal later
args = ['hpulse96', '-d', 'dist', '-m', f'{1e-10:.10f}', '-OD', '-V']
if self.triangle_half_width is not None:
args += ['-t', '-l', str(int(self.triangle_half_width / gf_dt))]
pulse_half_width = self.triangle_half_width # [s]
else:
args += ['-p']
pulse_half_width = 2 * gf_dt # [s]
with open('Green', 'w') as f:
subprocess.call(args, stdout=f)
# Convert to SAC files
subprocess.call(['f96tosac', '-B', 'Green'])
# Go through and read in files (same order as dist file)
for i, station in enumerate(stations_to_calculate):
for file in glob.glob(f'B{i + 1:03d}1???F.sac'):
# Grab stats of data trace
stats = self.data.st_proc.select(station=station)[0].stats
# Read GF in as an ObsPy Trace
gf_tr = read(file)[0]
# Add metadata
gf_tr.stats.network = stats.network
gf_tr.stats.station = station
gf_tr.stats.location = 'SE'
gf_tr.stats.distance = stats.distance
gf_tr.stats.syngine_model = self.syngine_model
gf_tr.stats.cps_model = self.cps_model
gf_tr.stats.sourcedepthinmeters = self.source_depth
gf_tr.stats.T0 = self.T0
gf_tr.stats.duration = self.gf_duration
gf_tr.stats.triangle_half_width = self.triangle_half_width
gf_tr.data *= 0.01 # Convert from cm to m
# Add Trace to overall GF Stream
st_gf += gf_tr
# Clean up
temp_dir.cleanup()
os.chdir(cwd)
# Trim to length (gf_duration - T0) seconds, and correct for the pulse
# half-width to make GFs acausal (since the -Z flag doesn't work!)
starttime = st_gf[0].stats.starttime
endtime = starttime + self.gf_duration - self.T0
st_gf.trim(starttime + pulse_half_width, endtime + pulse_half_width)
# Give a nice start time
for tr in st_gf:
tr.stats.starttime = GF_STARTTIME
# Save as individual files
for station in stations_to_calculate:
filename = os.path.join(self.gf_dir, f'{station}.pkl')
st_gf.select(station=station).write(filename, format='PICKLE')
# Now just load in the GFs which already exist
for i, station in enumerate(existing_stations):
filename = os.path.join(self.gf_dir, f'{station}.pkl')
st_gf += read(filename)
# Print status
progress = len(stations_to_calculate) + i + 1
print(f'Found {station} ({progress}/{len(unique_stations)})')
# Syngine
else:
# Go station-by-station
for i, station in enumerate(unique_stations):
filename = os.path.join(self.gf_dir, f'{station}.pkl')
# Either load this GF if it already exists, or download it
if station in existing_stations:
st_syn = read(filename)
else:
# Grab stats of data trace
stats = self.data.st_proc.select(station=station)[0].stats
# Get GFs for this station
st_syn = self._get_greens_for_station(
receiverlatitude=stats.latitude,
receiverlongitude=stats.longitude,
back_azimuth=stats.back_azimuth,
distance=stats.distance,
)
# Add metadata, as Syngine uses 'XX' and 'SYN' by default
for tr in st_syn:
tr.stats.network = stats.network
tr.stats.station = stats.station
# TODO: Understand why we have to flip the polarity of these!
for channel in 'RHF', 'RVF', 'THF':
for tr in st_syn.select(channel=channel):
tr.data *= -1
st_syn.write(filename, format='PICKLE')
# Add this station's GFs to overall Stream
st_gf += st_syn
# Print status
if station in existing_stations:
action_string = 'Found'
else:
action_string = 'Downloaded'
print(f'{action_string} {station} ({i + 1}/{len(unique_stations)})')
self.gf_computed = True
return st_gf
def _get_greens_for_station(
self, receiverlatitude, receiverlongitude, back_azimuth, distance
):
r"""Get the Green's functions for a single station (Syngine)."""
# Provide triangle STF params if we're using the triangle method
if self.method == 'triangle':
stf_offset = self.triangle_half_width # Ensure peak of triangle at t=0
stf_spacing = TRIANGLE_STF_DT
# The below construction ensures the triangle is centered on 1 and goes to 0
# at each end, e.g. [0, 0.5, 1, 0.5, 0] instead of [0.25, 0.75, 0.75, 0.25]
stf_data = np.hstack(
[
0,
triang((int(self.triangle_half_width / TRIANGLE_STF_DT) * 2) - 1),
0,
]
)
read_func = _read # Use the long-URL wrapper for ObsPy read
else:
stf_offset = None
stf_spacing = None
stf_data = None
read_func = read # Just directly read using ObsPy
# Convert to radians for NumPy
back_azimuth_radians = np.deg2rad(back_azimuth)
# Vertical force (downward)
st_vf = read_func(
self._build_syngine_url(
receiverlatitude=receiverlatitude,
receiverlongitude=receiverlongitude,
components='ZR',
forces=(-1, 0, 0),
stf_offset=stf_offset,
stf_spacing=stf_spacing,
stf_data=stf_data,
)
)
for tr in st_vf.select(component='Z'):
tr.stats.channel = 'ZVF'
for tr in st_vf.select(component='R'):
tr.stats.channel = 'RVF'
# Horizontal force (radial)
st_hf_r = read_func(
self._build_syngine_url(
receiverlatitude=receiverlatitude,
receiverlongitude=receiverlongitude,
components='ZR',
forces=(0, np.cos(back_azimuth_radians), -np.sin(back_azimuth_radians)),
stf_offset=stf_offset,
stf_spacing=stf_spacing,
stf_data=stf_data,
)
)
for tr in st_hf_r.select(component='Z'):
tr.stats.channel = 'ZHF'
for tr in st_hf_r.select(component='R'):
tr.stats.channel = 'RHF'
# Horizontal force (transverse)
st_hf_t = read_func(
self._build_syngine_url(
receiverlatitude=receiverlatitude,
receiverlongitude=receiverlongitude,
components='T',
forces=(
0,
-np.sin(back_azimuth_radians),
-np.cos(back_azimuth_radians),
),
stf_offset=stf_offset,
stf_spacing=stf_spacing,
stf_data=stf_data,
)
)
for tr in st_hf_t.select(component='T'):
tr.stats.channel = 'THF'
# Assemble big Stream
st_syn = st_vf + st_hf_r + st_hf_t
# Add metadata and sort
for tr in st_syn:
tr.stats.distance = distance
tr.stats.cps_model = self.cps_model
tr.stats.syngine_model = self.syngine_model
tr.stats.sourcedepthinmeters = self.source_depth
tr.stats.T0 = self.T0
tr.stats.duration = self.gf_duration
tr.stats.triangle_half_width = self.triangle_half_width
tr.stats.starttime = GF_STARTTIME # Give a nice start time
st_syn.sort(keys=['channel'])
return st_syn
def _build_syngine_url(
self,
receiverlatitude,
receiverlongitude,
components,
forces,
stf_offset=None,
stf_spacing=None,
stf_data=None,
):
r"""Build a URL to be fed to Syngine."""
parameters = [
'format=miniseed',
'components=' + components,
'units=displacement',
'model=' + self.syngine_model,
'dt=' + str(SYNGINE_DT),
'starttime=' + str(self.T0),
'endtime=' + str(self.gf_duration - self.T0),
'receiverlatitude=' + str(receiverlatitude),
'receiverlongitude=' + str(receiverlongitude),
'sourcelatitude=' + str(self.data.source_lat),
'sourcelongitude=' + str(self.data.source_lon),
'sourcedepthinmeters=' + str(self.source_depth),
'sourceforce=' + ','.join([str(force) for force in forces]),
'nodata=404',
]
if stf_offset is not None and stf_spacing is not None and stf_data is not None:
parameters += [
'cstf-relative-origin-time-in-sec=' + str(stf_offset),
'cstf-sample-spacing-in-sec=' + str(stf_spacing),
'cstf-data=' + ','.join([str(sample) for sample in stf_data]),
]
elif stf_offset is not None or stf_spacing is not None or stf_data is not None:
raise ValueError('All three CSTF parameters must be provided!')
url = SYNGINE_BASE_URL + '&'.join(parameters)
return url
def setup(
self,
period_range,
syngine_model=None,
cps_model=None,
triangle_half_width=None,
source_depth=0,
weights=None,
noise_window_dur=None,
filter_order=2,
zerophase=True,
skip_datafilter=False,
):
r"""Downloads/computes Green's functions (GFs) and creates all matrices.
Args:
period_range (list or tuple): [s] Bandpass filter corners
syngine_model (str): Name of Syngine model to use. If this is not None, then
we calculate GFs using Syngine (preferred)
cps_model (str): Filename of CPS model to use. If this is not None, then we
calculate GFs using CPS
triangle_half_width (int or float): [s] Half-width of triangles; only used
if the triangle method is being used
source_depth (int or float): [m] Source depth in meters
weights (list or tuple or str): If `None`, no weighting is applied. An array
of floats with length ``st_proc.count()`` (and in the order of the ``st_proc``
attribute of the :class:`~lsforce.lsdata.LSData` object) applies manual
weighting. If `'prenoise'`, uses standard deviation of a noise window
defined by `noise_window_dur` to weight. If `'distance'`, weights by 1 /
distance
noise_window_dur (int or float): [s] Length of noise window for `'prenoise'`
weighting scheme (if not `None`, `weights` is set to `'prenoise'`)
filter_order (int): Order of filter applied over period_range
zerophase (bool): If `True`, zero-phase filtering will be used
skip_datafilter (bool): If `True`, filtering will not be applied to
the input data and will only be applied to the Green's functions.
This should be chosen only if the data were pre-filtered
manually already with the same band as `period_range` so the
user doesn't want to filter them again
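        Example:
            A minimal sketch (hypothetical parameter values; any valid Syngine model
            name can be used)::

                force.setup(
                    period_range=(15, 80),
                    syngine_model='iasp91_2s',
                    weights='distance',
                )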
"""
self.syngine_model = syngine_model
self.source_depth = source_depth
# Check if input data were pre-filtered and suggest skip_datafilter if not set to True
if self.data._is_pre_filt and not skip_datafilter:
warnings.warn(
'Caution: pre-filtering appears to have been applied'
' to input data. Setting `skip_datafilter=True` is'
' recommended if you want to avoid double filtering'
)
# Explicitly ignore the triangle half-width parameter if it's not relevant
if self.method != 'triangle' and triangle_half_width is not None:
triangle_half_width = None
print(
'Ignoring `triangle_half_width` parameter since you\'re not using the '
'triangle method.'
)
# Make sure user specifies the triangle half-width if they want that method
if self.method == 'triangle' and triangle_half_width is None:
raise ValueError('triangle method is specified but no half-width given!')
self.triangle_half_width = triangle_half_width
# If user wants CPS to be run, make sure that 1) they have it installed; 2) it
# runs; and 3) they have provided a valid filepath
if cps_model:
# 1) Is CPS installed?
if not which('hprep96'):
raise OSError(
'CPS Green\'s function calculation requested, but CPS not found on '
'system. Install CPS and try again.'
)
# 2) Does CPS run?
if subprocess.call('hprep96', stderr=subprocess.DEVNULL) != 0:
raise OSError('Issue with CPS. Check install and try again.')
# 3) Is `cps_model` a file?
if not os.path.exists(cps_model):
raise OSError(f'Could not find CPS model file "{cps_model}"')
else:
cps_model = os.path.abspath(cps_model) # Get full path
self.cps_model = cps_model
# The user must specify ONE of `syngine_model` and `cps_model`
if (self.syngine_model and self.cps_model) or (
not self.syngine_model and not self.cps_model
):
raise ValueError('You must specify ONE of syngine_model or cps_model!')
# Automatically choose an appropriate T0 and GF duration based on data/method
if self.method == 'triangle':
self.T0 = -2 * self.triangle_half_width # [s] Double the half-width
else:
self.T0 = -10 # [s]
min_time = np.min([tr.stats.starttime for tr in self.data.st_proc])
max_time = np.max([tr.stats.endtime for tr in self.data.st_proc])
self.gf_duration = max_time - min_time # [s]
# Create filter dictionary to keep track of filter used without creating too
# many new attributes
self.filter = {
'freqmin': 1.0 / period_range[1],
'freqmax': 1.0 / period_range[0],
'zerophase': zerophase,
'periodmin': period_range[0],
'periodmax': period_range[1],
'order': filter_order,
}
# Clear weights
self.Wvec = None
self.W = None
if weights is None:
# Don't weight at all
weight_method = None
elif isinstance(weights, str):
# The user specified a weight method
weight_method = weights
else:
# The user specified a vector of station weights
weight_method = 'Manual'
self.weights = weights
if weight_method != 'Manual':
if weights == 'prenoise' and noise_window_dur is None:
raise ValueError(
'noise_window_dur must be defined if prenoise weighting is used.'
)
# Check if sampling rate specified is compatible with period_range
if 2.0 * self.filter['freqmax'] > self.data_sampling_rate:
raise ValueError(
'data_sampling_rate and period_range are not compatible (violates '
'Nyquist).'
)
# Always work on copy of data
st = self.data.st_proc.copy()
# Filter data to band specified
if not skip_datafilter:
st.filter(
'bandpass',
freqmin=self.filter['freqmin'],
freqmax=self.filter['freqmax'],
corners=self.filter['order'],
zerophase=self.filter['zerophase'],
)
# Resample st to data_sampling_rate
st.resample(self.data_sampling_rate, window='hann')
# Make sure st data are all the same length
lens = [len(trace.data) for trace in st]
if len(set(lens)) != 1:
print(
'Resampled records are of differing lengths. Interpolating all records '
'to same start time and sampling rate.'
)
stts = [tr.stats.starttime for tr in st]
lens = [tr.stats.npts for tr in st]
st.interpolate(
self.data_sampling_rate, starttime=np.max(stts), npts=np.min(lens) - 1
)
self.data_length = st[0].stats.npts
total_data_length = self.data_length * st.count()
# Load in GFs
print('Getting Green\'s functions...')
st_gf = self._get_greens()
# Process GFs in bulk
st_gf.detrend()
st_gf.taper(max_percentage=0.05)
st_gf.filter(
'bandpass',
freqmin=self.filter['freqmin'],
freqmax=self.filter['freqmax'],
corners=self.filter['order'],
zerophase=self.filter['zerophase'],
)
if self.syngine_model: # Only need to do this if Syngine
st_gf.interpolate(
sampling_rate=self.data_sampling_rate, method='lanczos', a=20
)
st_gf.sort(keys=['channel'])
# Store the filtered GFs
self.filtered_gf_st = st_gf
# Initialize weighting matrices
Wvec = np.ones(total_data_length)
indx = 0
weight = np.ones(self.data.st_proc.count())
# Store data length
n = self.data_length
if self.method == 'full':
# Set sampling rate
self.force_sampling_rate = self.data_sampling_rate
elif self.method == 'triangle':
self.force_sampling_rate = 1.0 / self.triangle_half_width
# Number of samples to shift each triangle by
fshiftby = int(self.triangle_half_width * self.data_sampling_rate)
# Number of shifts, corresponds to length of force time function
Flen = int(np.floor(self.data_length / fshiftby))
# Triangle GFs are multiplied by the triangle half-width so that they
# reflect the ground motion induced for a triangle with PEAK 1 N instead of
# AREA of 1 N*s
for tr in st_gf:
tr.data = tr.data * self.triangle_half_width
else:
raise ValueError(f'Method {self.method} not supported.')
for i, tr in enumerate(st):
# Find component and station of Trace
component = tr.stats.channel[-1]
station = tr.stats.station
if component == 'Z':
zvf = st_gf.select(station=station, channel='ZVF')[0]
zhf = st_gf.select(station=station, channel='ZHF')[0]
if self.method == 'full':
ZVF = _makeconvmat(zvf.data, size=(n, n))
ZHF = _makeconvmat(zhf.data, size=(n, n))
else:
ZVF = _makeshiftmat(zvf.data, shiftby=fshiftby, size1=(n, Flen))
ZHF = _makeshiftmat(zhf.data, shiftby=fshiftby, size1=(n, Flen))
az_radians = np.deg2rad(tr.stats.azimuth)
newline = np.hstack(
[ZVF, ZHF * np.cos(az_radians), ZHF * np.sin(az_radians)]
)
elif component == 'R':
rvf = st_gf.select(station=station, channel='RVF')[0]
rhf = st_gf.select(station=station, channel='RHF')[0]
if self.method == 'full':
RVF = _makeconvmat(rvf.data, size=(n, n))
RHF = _makeconvmat(rhf.data, size=(n, n))
else:
RVF = _makeshiftmat(rvf.data, shiftby=fshiftby, size1=(n, Flen))
RHF = _makeshiftmat(rhf.data, shiftby=fshiftby, size1=(n, Flen))
az_radians = np.deg2rad(tr.stats.azimuth)
newline = np.hstack(
[RVF, RHF * np.cos(az_radians), RHF * np.sin(az_radians)]
)
elif component == 'T':
thf = st_gf.select(station=station, channel='THF')[0]
if self.method == 'full':
THF = _makeconvmat(thf.data, size=(n, n))
else:
THF = _makeshiftmat(thf.data, shiftby=fshiftby, size1=(n, Flen))
TVF = 0.0 * THF.copy() # Just zeros for TVF
az_radians = np.deg2rad(tr.stats.azimuth)
newline = np.hstack(
[TVF, THF * np.sin(az_radians), -THF * np.cos(az_radians)]
)
else:
raise ValueError(f'Data not rotated to ZRT for {station}.')
# Deal with data
datline = tr.data
if i == 0: # If this is the first station, initialize G and d
G = newline.copy()
d = datline.copy()
else: # Otherwise build on G and d
G = np.vstack((G, newline.copy()))
d = np.hstack((d, datline.copy()))
if weights is not None:
if weight_method == 'Manual':
weight[i] = weights[i]
elif weights == 'prenoise':
weight[i] = 1.0 / np.std(
tr.data[0 : int(noise_window_dur * tr.stats.sampling_rate)]
)
elif weights == 'distance':
weight[i] = tr.stats.distance
Wvec[indx : indx + self.data_length] = (
Wvec[indx : indx + self.data_length] * weight[i]
)
indx += self.data_length
if self.method == 'full':
# Need to multiply G by sample interval [s] since convolution is an integral
self.G = G * 1.0 / self.data_sampling_rate
else:
# We don't need to scale the triangle method GFs by the sample rate since
# this method is not a convolution
self.G = G
# Normalize Wvec so largest weight is 1.
self.Wvec = Wvec / np.max(np.abs(Wvec))
self.weights = weight / np.max(np.abs(weight))
if np.shape(G)[0] != len(d):
raise ValueError(
'G and d sizes are not compatible, fix something somewhere.'
)
self.d = d
if weights is not None:
self.W = np.diag(self.Wvec)
else:
self.W = None
def invert(
self,
zero_time=None,
impose_zero_start=False,
add_to_zero=False,
duration=None,
jackknife=False,
num_iter=200,
frac_delete=0.5,
alpha=None,
zero_scaler=2.0,
zero_start_taper_length=0,
tikhonov_ratios=(1.0, 0.0, 0.0),
jk_refine_alpha=False,
save_matrices=False,
):
r"""Performs single-force inversion using Tikhonov regularization.
Args:
zero_time (int or float): [s] Optional estimated start time of real
(landslide-related) part of signal, in seconds from start time of
seismic data. Useful for making figures showing selected start time and
also for the `impose_zero_start` option
impose_zero_start (bool): Adds weighting matrix to suggest that forces tend
towards zero prior to `zero_time` (`zero_time` must be defined)
add_to_zero (bool): Adds weighting matrix to suggest that all components of
force integrate to zero
duration (int or float): Maximum duration allowed for the event, starting at
`zero_time` if defined, otherwise starting from the beginning of the
seismic data. Forces after this will tend towards zero. This helps tamp
down artifacts due to edge effects, etc.
jackknife (bool): If `True`, perform `num_iter` additional iterations of the
model while randomly discarding `frac_delete` of the data
num_iter (int): Number of jackknife iterations to perform
            frac_delete (int or float): Fraction (out of 1) of data to discard for each
                iteration. If `frac_delete=1`, a leave-one-out error analysis is
                performed instead
alpha (int or float): Set regularization parameter. If `None`, will search
for best alpha using the L-curve method
            zero_scaler (int or float): Relative strength of the zero constraint for
                the `impose_zero_start` and `duration` options; the lower the number,
                the weaker the constraint. Typical values range from 0 to 10. Values
                up to 30 are technically allowed but discouraged, because high
                `zero_scaler` values risk introducing high-frequency oscillations due
                to the sudden release of the constraint
zero_start_taper_length (int or float): [s] Length of taper for
`impose_zero_start` option
tikhonov_ratios (list or tuple): Proportion each regularization method
contributes to the overall regularization effect, where values
correspond to [0th order, 1st order, 2nd order]. Must sum to 1
jk_refine_alpha (bool): Refine the alpha parameter used for each jackknife
iteration by searching over order of magnitude around the best alpha
for the full solution. If `False`, each jackknife iteration will use the
same alpha as the main solution (note that this is much faster but can
result in some jackknife iterations having depressed amplitudes)
            save_matrices (bool): If `True`, will save the inversion matrices (`Ghat`,
                `dhat`, `I`, `L1`, `L2`) as part of the object in case the user wants
                to do additional alpha searching
"""
# Check inputs
if impose_zero_start and not zero_time:
raise ValueError('impose_zero_start set to True but no zero_time provided.')
if zero_scaler < 0.0 or zero_scaler > 30.0:
raise ValueError('zero_scaler cannot be less than 0 or more than 30')
if np.sum(tikhonov_ratios) != 1.0:
raise ValueError('Tikhonov ratios must add to 1.')
# Save input choices
self.add_to_zero = add_to_zero
self.zero_time = zero_time
self.impose_zero_start = impose_zero_start
self.duration = duration
# Initialize (also serves to clear any previous results if this is a rerun)
self.model = None
self.Z = None
self.N = None
self.E = None
self.tvec = None
self.VR = None
self.dtorig = None
self.dtnew = None
self.alpha = None
self.alphafit = {'alphas': None, 'fit': None, 'size': None}
self.angle_magnitude = None
if jackknife:
if frac_delete == 1.0:
                # If frac_delete is 1, do a leave-one-out analysis instead
num_iter = self.data.st_proc.count()
# Initialize
self.jackknife = AttribDict(
Z=AttribDict(lower=[], upper=[], all=[]),
N=AttribDict(lower=[], upper=[], all=[]),
E=AttribDict(lower=[], upper=[], all=[]),
VR_all=[],
num_iter=num_iter,
frac_delete=frac_delete,
)
else:
self.jackknife = None
if self.W is not None:
Ghat = self.W.dot(self.G)
dhat = self.W.dot(self.d)
else:
Ghat = self.G
dhat = self.d
if self.jackknife is not None: # Save version at this point for use later
Ghatori = Ghat.copy()
dhatori = dhat.copy()
m, n = np.shape(Ghat)
Ghatnorm = np.linalg.norm(Ghat)
dl = self.data_length
gl = int(n / 3) # Force vector length
        if self.add_to_zero:  # Constrain forces to add to zero
scaler = Ghatnorm
first1 = np.hstack((np.ones(gl), np.zeros(2 * gl)))
second1 = np.hstack((np.zeros(gl), np.ones(gl), np.zeros(gl)))
third1 = np.hstack((np.zeros(2 * gl), np.ones(gl)))
A1 = np.vstack((first1, second1, third1)) * scaler
Ghat = np.vstack((Ghat, A1))
dhat = np.hstack((dhat, np.zeros(3)))
else:
A1 = None
scaler = Ghatnorm * (zero_scaler / 30.0)
if self.impose_zero_start: # Tell model when there should be no forces
len2 = int(((self.zero_time + self.T0) * self.force_sampling_rate))
if self.method == 'triangle':
# No taper
vals2 = np.hstack((np.ones(len2), np.zeros(gl - len2)))
elif self.method == 'full':
# Taper
len3 = int(zero_start_taper_length * self.force_sampling_rate)
temp = np.hanning(2 * len3)
temp = temp[len3:]
vals2 = np.hstack((np.ones(len2 - len3), temp))
else:
raise NotImplementedError
A2 = None
for i, val in enumerate(vals2):
first1 = np.zeros(3 * gl)
second1 = first1.copy()
third1 = first1.copy()
if val > 0: # Only add a line to A2 if val is nonzero
first1[i] = val
second1[i + gl] = val
third1[i + 2 * gl] = val
if A2 is None: # initialize A2 on first iteration
A2 = np.vstack((first1, second1, third1))
else:
A2 = np.vstack((A2, first1, second1, third1))
A2 *= scaler
Ghat = np.vstack((Ghat, A2))
dhat = np.hstack((dhat, np.zeros(np.shape(A2)[0])))
else:
A2 = None
if self.duration is not None:
if self.zero_time is None:
zerotime = 0.0
else:
zerotime = self.zero_time
startind = int(
(zerotime + self.T0 + self.duration) * self.force_sampling_rate
)
A3 = None
if self.method == 'triangle':
vals3 = np.zeros(gl)
vals3[startind:] = 1.0 # No taper
for i, val in enumerate(vals3):
first1 = np.zeros(3 * gl)
second1 = first1.copy()
third1 = first1.copy()
if val > 0: # only add rows to A3 if will be nonzero
first1[i] = val
second1[i + gl] = val
third1[i + 2 * gl] = val
if A3 is None: # initialize A3
A3 = np.vstack((first1, second1, third1))
else:
A3 = np.vstack((A3, first1, second1, third1))
if self.method == 'full':
len2 = int(gl - startind)
len3 = int(
np.round(0.2 * len2)
) # 20% taper so zero imposition isn't sudden
temp = np.hanning(2 * len3)
temp = temp[:len3]
vals3 = np.hstack((temp, np.ones(len2 - len3)))
for i, val in enumerate(vals3):
place = i + startind
first1 = np.zeros(3 * gl)
second1 = first1.copy()
third1 = first1.copy()
if val > 0: # only add rows to A3 if will be nonzero
first1[place] = val
second1[place + gl] = val
third1[place + 2 * gl] = val
if A3 is None: # initialize A3
A3 = np.vstack((first1, second1, third1))
else:
A3 = np.vstack((A3, first1, second1, third1))
A3 *= scaler
Ghat = np.vstack((Ghat, A3))
dhat = np.hstack((dhat, np.zeros(np.shape(A3)[0])))
else:
A3 = None
dhat = dhat.T
# Build roughening matrix
I = np.eye(n, n)
if tikhonov_ratios[1] != 0.0:
# Build L1 (first order) roughening matrix
L1 = np.diag(-1 * np.ones(n)) + np.diag(np.ones(n - 1), k=1)
L1part = L1.T @ L1
else:
L1part = 0.0
L1 = 0.0
if tikhonov_ratios[2] != 0.0:
# Build L2 (second order) roughening matrix
L2 = (
np.diag(np.ones(n))
+ np.diag(-2 * np.ones(n - 1), k=1)
+ np.diag(np.ones(n - 2), k=2)
)
L2part = L2.T @ L2
else:
L2 = 0.0
L2part = 0.0
if alpha is None:
alpha, fit1, size1, alphas = find_alpha(
Ghat, dhat, I, L1, L2, tikhonov_ratios=tikhonov_ratios
)
print(f'best alpha is {alpha:6.1e}')
self.alphafit['alphas'] = alphas
self.alphafit['fit'] = fit1
self.alphafit['size'] = size1
if save_matrices:
self.matrices = dict(
Ghat=Ghat,
dhat=dhat,
I=I,
L1=L1,
L2=L2,
tikhonov_ratios=tikhonov_ratios,
)
else:
self.matrices = None
self.alpha = alpha
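        # Solve the regularized normal equations
        #     (Ghat.T @ Ghat + alpha**2 * (w0*I + w1*L1.T@L1 + w2*L2.T@L2)) m = Ghat.T @ dhat
        # where (w0, w1, w2) = tikhonov_ratios and the L terms are the roughening
        # matrices built above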
Apart = Ghat.conj().T @ Ghat
A = Apart + alpha**2 * (
tikhonov_ratios[0] * I
+ tikhonov_ratios[1] * L1part
+ tikhonov_ratios[2] * L2part
) # Combo of all regularization things (if any are zero they won't matter)
x = np.squeeze(Ghat.conj().T @ dhat)
model, residuals, rank, s = sp.linalg.lstsq(A, x)
self.model = model.copy()
div = int(len(model) / 3)
# Flip so up is positive
self.Z = -model[0:div]
self.N = model[div : 2 * div]
self.E = model[2 * div :]
dtnew = self.G.dot(model)
self.dtnew = np.reshape(dtnew, (self.data.st_proc.count(), dl))
self.dtorig = np.reshape(self.d, (self.data.st_proc.count(), dl))
# Compute variance reduction
self.VR = _varred(self.dtorig, self.dtnew)
print(f'Variance reduction = {self.VR:f} percent')
tvec = (
np.arange(
0,
len(self.Z) * 1 / self.force_sampling_rate,
1 / self.force_sampling_rate,
)
- self.T0
)
# Adjust time vector for zero time, if one was provided
if self.zero_time is not None:
tvec -= self.zero_time
self.tvec = tvec
self.dtvec = np.arange(
0, self.data_length / self.data_sampling_rate, 1 / self.data_sampling_rate
)
if self.zero_time is not None:
self.dtvec -= self.zero_time
# Use constant alpha parameter (found above, if not previously set) for
# jackknife iterations
stasets = []
alphajs = []
if self.jackknife is not None:
if self.jackknife.frac_delete == 1.0:
print('Starting leave-one-out analysis')
else:
print('Starting jackknife iterations')
for ii in range(self.jackknife.num_iter):
if self.jackknife.frac_delete == 1:
indxcut = [ii]
numkeep = self.jackknife.num_iter - 1
else:
numcut = int(
round(self.jackknife.frac_delete * self.data.st_proc.count())
)
numkeep = self.data.st_proc.count() - numcut
indxcut = rnd.sample(list(range(self.data.st_proc.count())), numcut)
stasets.append(indxcut)
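                # Build the flat indices (into the rows of Ghat/dhat) of all samples
                # belonging to the discarded traces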
obj = [
sum(ind)
for ind in zip(
np.tile(list(range(self.data_length)), len(indxcut)),
np.repeat(
[x1 * self.data_length for x1 in indxcut], self.data_length
),
)
]
dhat1 = np.delete(dhatori.copy(), obj)
Ghat1 = np.delete(Ghatori.copy(), obj, axis=0)
Gtemp = np.delete(self.G.copy(), obj, axis=0)
dtemp = np.delete(self.d.copy(), obj)
if A1 is not None: # Use A1, A2, A3 from full solution, if exist
Ghat1 = np.vstack((Ghat1, A1))
dhat1 = np.hstack((dhat1, np.zeros(3)))
if A2 is not None:
Ghat1 = np.vstack((Ghat1, A2))
dhat1 = np.hstack((dhat1, np.zeros(len(vals2[vals2 > 0]) * 3)))
if A3 is not None:
Ghat1 = np.vstack((Ghat1, A3))
dhat1 = np.hstack((dhat1, np.zeros(len(vals3[vals3 > 0]) * 3)))
dhat1 = dhat1.T
Apart = Ghat1.conj().T @ Ghat1
if jk_refine_alpha:
# Fine tune the alpha
rndalph = np.log10(self.alpha)
(
alphaj,
*_,
) = find_alpha(
Ghat1,
dhat1,
I,
L1,
L2,
tikhonov_ratios=tikhonov_ratios,
rough=True,
int_rough=0.1,
range_rough=(rndalph - 0.4, rndalph + 0.4),
plot_Lcurve=False,
)
else:
alphaj = self.alpha
alphajs.append(alphaj)
# Combo of all regularization things (if any are zero they won't matter)
Aj = Apart + alphaj**2 * (
tikhonov_ratios[0] * I
+ tikhonov_ratios[1] * L1part
+ tikhonov_ratios[2] * L2part
)
xj = np.squeeze(Ghat1.conj().T @ dhat1)
model, residuals, rank, s = sp.linalg.lstsq(Aj, xj)
div = int(len(model) / 3)
Zf = -model[0:div] # Flip so up is positive
Nf = model[div : 2 * div]
Ef = model[2 * div :]
dtnew = Gtemp.dot(model)
dt = np.reshape(dtemp, (numkeep, dl))
VR = _varred(dt, dtnew)
self.jackknife.Z.all.append(Zf.copy())
self.jackknife.N.all.append(Nf.copy())
self.jackknife.E.all.append(Ef.copy())
self.jackknife.VR_all.append(VR.copy())
# Calculate bounds of jackknife runs (these bounds do not necessarily
# represent a single run and in fact are more likely to be composites of
# several runs)
self.jackknife.Z.lower = np.min(self.jackknife.Z.all, axis=0)
self.jackknife.Z.upper = np.max(self.jackknife.Z.all, axis=0)
self.jackknife.E.lower = np.min(self.jackknife.E.all, axis=0)
self.jackknife.E.upper = np.max(self.jackknife.E.all, axis=0)
self.jackknife.N.lower = np.min(self.jackknife.N.all, axis=0)
self.jackknife.N.upper = np.max(self.jackknife.N.all, axis=0)
self.jackknife.alphas = alphajs
self.jackknife.VR_all = np.array(self.jackknife.VR_all)
print(
f'Jackknife VR stats: max {self.jackknife.VR_all.max():2.0f}, '
f'min {self.jackknife.VR_all.min():2.0f}, '
f'median {np.median(self.jackknife.VR_all):2.0f}'
)
# Calculate angles and magnitudes
Mag = np.linalg.norm(list(zip(self.Z, self.E, self.N)), axis=1)
if self.jackknife is not None:
# Calculate magnitude of each jackknife run
mag_all = [
np.linalg.norm(list(zip(z, e, n)), axis=1)
for z, e, n in zip(
self.jackknife.Z.all, self.jackknife.E.all, self.jackknife.N.all
)
]
# Calculate magnitude bounds
MagL = np.min(mag_all, axis=0)
MagU = np.max(mag_all, axis=0)
else:
MagU = None
MagL = None
Vang = (180 / np.pi) * np.arctan(self.Z / np.sqrt(self.N**2 + self.E**2))
# Get angle counterclockwise relative to N
tempang = (180 / np.pi) * np.arctan2(self.N, self.E) - 90
# For any negative values, add 360
for i, temp in enumerate(tempang):
if temp < 0:
tempang[i] = temp + 360
if self.jackknife is not None:
tempangU = (180 / np.pi) * np.arctan2(
self.jackknife.N.upper, self.jackknife.E.upper
) - 90
for i, temp in enumerate(tempangU):
if temp < 0:
tempangU[i] = temp + 360
tempangL = (180 / np.pi) * np.arctan2(
self.jackknife.N.lower, self.jackknife.E.lower
) - 90
for i, temp in enumerate(tempangL):
if temp < 0:
tempangL[i] = temp + 360
# Now flip to clockwise to get azimuth
Haz = 360 - tempang
# Store angles and magnitudes
self.angle_magnitude = AttribDict(
magnitude=Mag,
magnitude_upper=MagU,
magnitude_lower=MagL,
vertical_angle=Vang,
horizontal_angle=Haz,
)
self.inversion_complete = True
def forward(self, Z, N, E):
r"""Execute the forward problem :math:`\mathbf{d} = \mathbf{G}\mathbf{m}`
using a user-supplied force time series :math:`\mathbf{m}` composed of
components `Z`, `N`, and `E`.
Args:
Z: [N] Vertical force time series (positive up)
N: [N] North force time series (positive north)
E: [N] East force time series (positive east)
Returns:
:class:`~obspy.core.stream.Stream`: [m] Stream containing synthetic
data, :math:`\mathbf{d}`
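        Example:
            A sketch feeding the inverted forces back through the forward problem
            (valid for the `'full'` method, where the force series have the same
            length as the data)::

                st_syn = force.forward(force.Z, force.N, force.E)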
"""
# Check if G is defined
if not self.gf_computed:
raise RuntimeError('G is not defined. Did you run LSForce.setup()?')
# Convert and check sizes
Z = np.array(Z).squeeze()
N = np.array(N).squeeze()
E = np.array(E).squeeze()
for vec in Z, N, E:
assert (
vec.size == self.data_length
), f'Each force component must have length {self.data_length}!'
# Form model vector
model = np.hstack([-Z, N, E])
# KEY: Forward problem
d = np.reshape(self.G.dot(model), (-1, self.data_length))
# Form Stream
st_syn = self.data.st_proc.copy() # Quick way to pre-populate metadata
for data, tr in zip(d, st_syn):
tr.stats.sampling_rate = self.data_sampling_rate
tr.data = data
tr.stats.location = 'SE' # Synthetics Engine
tr.stats.channel = _get_band_code(tr.stats.delta) + 'X' + tr.stats.component
for key in (
'_fdsnws_dataselect_url',
'_format',
'mseed',
'processing',
'response',
):
try:
del tr.stats[key] # Remove N/A metadata
except KeyError:
pass # If the key doesn't exist, just proceed silently
return st_syn
def plot_fits(
self,
equal_scale=True,
xlim=None,
add_time_axis=True,
axis_frames=False,
add_title=True,
add_legend=True,
legacy=False,
):
r"""Create a plot showing the model-produced waveform fit to the data.
Args:
equal_scale (bool): If `True`, all plots will share the same y-axis scale
xlim (list or tuple): [s] Array (length two) of x-axis limits (time relative
to zero time)
add_time_axis (bool): If `True`, will add a time axis to the bottom of the
plot
axis_frames (bool): If `True`, will add frames around each waveform subplot
panel
add_title (bool): If `True`, will add a title to the plot with VR and
normalization information
add_legend (bool): If `True`, will add a legend to the plot
legacy (bool): If `True`, will plot using the legacy fit plotting function
(all waveforms in a single column in R, T, Z order, no azimuth info)
Returns:
:class:`~matplotlib.figure.Figure`: Output figure handle
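        Example:
            A minimal sketch (assumes a completed inversion; hypothetical limits)::

                fig = force.plot_fits(equal_scale=False, xlim=(-50, 250))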
"""
# Plot using the legacy plotting function if the user has requested it
if legacy:
return self._plot_fits_legacy(equal_scale=equal_scale, xlim=xlim)
# Set xlim if not provided
if xlim is None:
xlim = self.dtvec[0], self.dtvec[-1]
# Form Stream objects for data and model waveforms
st_data = Stream()
st_model = Stream()
keys_to_populate = ('network', 'station', 'component', 'distance', 'azimuth')
for i, tr in enumerate(self.data.st_proc):
header = {k: tr.stats[k] for k in keys_to_populate}
header['sampling_rate'] = self.data_sampling_rate
st_data += Trace(data=self.dtorig[i], header=header)
st_model += Trace(data=self.dtnew[i], header=header)
# Determine the unique stations, and sort by distance from source (can use
# st_data here since it has same meta as st_model)
unique_net_sta_pairs_unsorted = tuple(
set([(tr.stats.network, tr.stats.station) for tr in st_data])
)
distances = []
for net, sta in unique_net_sta_pairs_unsorted:
distances.append(st_data.select(network=net, station=sta)[0].stats.distance)
unique_net_sta_pairs = tuple(
[unique_net_sta_pairs_unsorted[i] for i in np.argsort(distances)]
)
# Determine which components we have — this controls how many columns of
# waveforms we plot (e.g., if we only have verticals, we only plot one column)
unique_components = set([tr.stats.component for tr in st_data])
component_order = [c for c in COMPONENT_ORDER if c in unique_components]
# Make figure
margins = list(MARGINS)
if add_time_axis:
margins[3] += 0.3 if axis_frames else 0.38 # Make room for time axis
if add_title:
margins[2] += 0.15 # Make room for title
n_stations = len(unique_net_sta_pairs)
n_components = len(component_order)
fig_width = (
n_components * AX_WIDTH + (n_components - 1) * PAD + sum(margins[:2])
)
fig_height = n_stations * AX_HEIGHT + (n_stations - 1) * PAD + sum(margins[2:])
fig, axs = plt.subplots(
nrows=n_stations,
ncols=n_components,
figsize=(fig_width, fig_height),
sharex=True,
sharey=True,
squeeze=False,
)
# Reposition subplots
for i in range(n_stations):
for j in range(n_components):
x0 = margins[0] + PAD * j + AX_WIDTH * j
y0 = fig_height - margins[2] - PAD * i - AX_HEIGHT * (i + 1)
width = AX_WIDTH
height = AX_HEIGHT
axs[i, j].set_position(
[
x0 / fig_width,
y0 / fig_height,
width / fig_width,
height / fig_height,
]
)
# Now do the plotting
for i in range(n_stations):
for j in range(n_components):
ax = axs[i, j]
net, sta = unique_net_sta_pairs[i]
selection_kw = dict(
network=net, station=sta, component=component_order[j]
)
_st_data = st_data.select(**selection_kw)
_st_model = st_model.select(**selection_kw)
if _st_data.count() == 0:
pass # This component is missing; we just don't plot anything
elif _st_data.count() > 1:
raise ValueError(
'More than one trace found for this net, sta, component!'
)
else: # _st_data.count() == 1
tr_data = _st_data[0]
tr_model = _st_model[0]
t = self.dtvec
mask = (t > xlim[0]) & (t < xlim[1])
d_data = tr_data.data[mask]
d_model = tr_model.data[mask]
if not equal_scale:
# Scale to whichever waveform has the largest absolute value
abs_max = max(max(np.abs(d_data)), max(np.abs(d_model)))
d_data /= abs_max
d_model /= abs_max
plot_kw = dict(lw=0.8, solid_capstyle='round')
ax.plot(t[mask], d_data, color='black', label='data', **plot_kw)
ax.plot(t[mask], d_model, color='tab:red', label='model', **plot_kw)
ax.autoscale(enable=True, axis='y', tight=True)
# Label the stations, just at left
for i in range(n_stations):
ax = axs[i, 0]
net, sta = unique_net_sta_pairs[i]
tr = st_data.select(network=net, station=sta)[0]
text_kw = dict(
x=0 - TEXT_PAD / AX_WIDTH,
ha='right',
transform=ax.transAxes,
size=FONT_SIZE,
)
# network.station code
ax.text(
y=0.65,
s=f'{tr.stats.network}.{tr.stats.station}',
va='bottom',
weight='bold',
family='monospace',
**text_kw,
)
# Distance to source
ax.text(
y=0.5, s=f'{tr.stats.distance:.{PRECISION}f} km', va='center', **text_kw
)
# Azimuth to source
ax.text(y=0.35, s=f'{tr.stats.azimuth:.{PRECISION}f}°', va='top', **text_kw)
# Label the component columns, just at the top
for j in range(n_components):
ax = axs[0, j]
ax.text(
0,
1,
component_order[j],
weight='bold',
ha='left',
va='bottom',
transform=ax.transAxes,
size=FONT_SIZE,
)
# Axis formatting
abs_max_ylim = np.max([np.abs(ax.get_ylim()) for ax in axs.flatten()])
pad = 1.05 # So that peaks/troughs don't get clipped
for ax in axs.flatten():
ax.set_xlim(xlim)
if not equal_scale:
ax.set_ylim(-1 * pad, 1 * pad)
else:
ax.set_ylim(-abs_max_ylim * pad, abs_max_ylim * pad)
ax.tick_params(
which='both',
left=False,
labelleft=False,
bottom=False,
labelbottom=False,
)
if not axis_frames:
for spine in ax.spines.values():
spine.set_visible(False)
if add_time_axis or add_title:
# Need to make an axis that spans [potentially] multiple columns
pos_top_left = axs[0, 0].get_position()
pos_bottom_left = axs[-1, 0].get_position()
pos_bottom_right = axs[-1, -1].get_position()
x0 = pos_bottom_left.x0
y0 = pos_bottom_left.y0
width = pos_bottom_right.x1 - x0
height = pos_top_left.y1 - y0
ax_label = fig.add_axes([x0, y0, width, height], frameon=False)
ax_label.tick_params(
labelcolor='none', bottom=False, left=False, labelsize=FONT_SIZE
)
# Add time axis to `ax_label`
if add_time_axis:
for ax in axs[-1, :]:
ax.spines['bottom'].set_visible(True)
if not axis_frames:
ax.spines['bottom'].set_position(('outward', 5))
ax.tick_params(
which='both', bottom=True, labelbottom=True, labelsize=FONT_SIZE
)
if not axis_frames:
position = axs[-1, 0].spines['bottom'].get_position()
ax_label.spines['bottom'].set_position(position)
ax_label.set_xlabel(
'Time (s) from inversion zero time', fontsize=FONT_SIZE
)
# Add title to `ax_label`
if add_title:
amp_string = 'equal scale' if equal_scale else 'normalized'
ax_label.set_title(
f'Variance reduction {self.VR:.0f}% ({amp_string})',
y=1,
pad=15,
size=FONT_SIZE,
)
# Make legend
if add_legend:
axs[0, 0].legend(
fontsize=FONT_SIZE - 3,
fancybox=False,
facecolor='0.85',
edgecolor='none',
bbox_to_anchor=(-TEXT_PAD / AX_WIDTH, 0.95),
loc='lower right',
borderaxespad=0,
borderpad=0.3,
labelspacing=0.05,
handlelength=1.5,
)
return fig
def _plot_fits_legacy(self, equal_scale=True, xlim=None):
r"""Create a plot showing the model-produced waveform fit to the data.
Args:
equal_scale (bool): If `True`, all plots will share the same y-axis scale
xlim (list or tuple): [s] Array (length two) of x-axis limits (time relative
to zero time)
Returns:
:class:`~matplotlib.figure.Figure`: Output figure handle
"""
data_color = 'black'
model_color = 'red'
lw = 1
if not equal_scale:
amp_string = 'normalized'
spacing = 2
else:
amp_string = 'equal scale'
spacing = np.abs(self.dtorig).max() * 2 # Spacing controlled by largest amp
fig, ax = plt.subplots(figsize=(8, 12))
offset = 0
yticks = []
yticklabels = []
for i, tr in enumerate(self.data.st_proc):
if not equal_scale:
                scaler = np.abs(self.dtorig[i]).max()  # Scale to original data
else:
scaler = 1
ax.plot(
self.dtvec,
self.dtorig[i] / scaler + offset,
color=data_color,
linewidth=lw,
)
ax.plot(
self.dtvec,
self.dtnew[i] / scaler + offset,
color=model_color,
linewidth=lw,
)
label = (
f'{tr.stats.network}.{tr.stats.station} ({tr.stats.channel[-1]}) '
f'– {tr.stats.distance:.1f} km'
)
yticklabels.append(label)
yticks.append(offset)
offset -= spacing
# Misc. tweaks
if xlim:
ax.set_xlim(xlim)
else:
ax.set_xlim(self.dtvec[0], self.dtvec[-1])
ax.set_ylim(yticks[-1] - spacing / 2, yticks[0] + spacing / 2)
ax.set_xlabel('Time (s)')
ax.set_yticks(yticks)
ax.set_yticklabels(yticklabels)
ax.tick_params(left=False, top=True, which='both')
ax.yaxis.set_tick_params(length=0, pad=4)
ax.set_title(f'Variance reduction {self.VR:.1f}% ({amp_string})', pad=10)
# Make legend
data_handle = mlines.Line2D(
[], [], color=data_color, label='Data', linewidth=lw
)
model_handle = mlines.Line2D(
[], [], color=model_color, label='Model', linewidth=lw
)
ax.legend(handles=[data_handle, model_handle], loc='upper right')
fig.tight_layout()
fig.show()
return fig
def plot_forces(
self,
subplots=False,
xlim=None,
ylim=None,
same_y=True,
highf_tr=None,
hfshift=0.0,
hfylabel=None,
infra_tr=None,
infra_shift=0,
jackshowall=False,
):
r"""Plot inversion result.
Args:
subplots (bool): If `True`, make subplots for components, otherwise plot all
on one plot
xlim (list or tuple): x-axis limits
ylim (list or tuple): y-axis limits
same_y (bool): If `True`, use same y-axis limits for all plots
highf_tr (:class:`~obspy.core.trace.Trace`): Seismic trace with start time
identical to start time of the data used in the inversion
hfshift (int or float): [s] Time shift for seismic trace
hfylabel (str): Label used for seismic trace. If not defined, will use
station name
infra_tr (:class:`~obspy.core.trace.Trace`): Infrasound trace with start
time identical to start time of the data used in the inversion
infra_shift (int or float): [s] Time shift for infrasound trace
jackshowall (bool): If `True` and jackknife was run, will show all
individual runs (changes `subplots` to `True`)
Returns:
:class:`~matplotlib.figure.Figure`: Output figure handle
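        Example:
            A minimal sketch (assumes a completed inversion; hypothetical limits)::

                fig = force.plot_forces(subplots=True, xlim=(-50, 250))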
"""
tvec = self.tvec
annot_kwargs = dict(xy=(0.99, 0.25), xycoords='axes fraction', ha='right')
# Find y limits
if self.jackknife is None:
if ylim is None:
ylim1 = (
np.amin([self.Z.min(), self.E.min(), self.N.min()]),
np.amax([self.Z.max(), self.E.max(), self.N.max()]),
)
# Add 10% on each side to make it look nicer
ylim = (ylim1[0] + 0.1 * ylim1[0], ylim1[1] + 0.1 * ylim1[1])
else:
Zupper = self.jackknife.Z.upper
Nupper = self.jackknife.N.upper
Eupper = self.jackknife.E.upper
Zlower = self.jackknife.Z.lower
Nlower = self.jackknife.N.lower
Elower = self.jackknife.E.lower
if ylim is None:
ylim1 = (
np.amin(
[
Zlower.min(),
Elower.min(),
Nlower.min(),
self.Z.min(),
self.N.min(),
self.E.min(),
]
),
np.amax(
[
Zupper.max(),
Eupper.max(),
Nupper.max(),
self.Z.max(),
self.N.max(),
self.E.max(),
]
),
)
# Add 10% on each side to make it look nicer
ylim = (ylim1[0] + 0.1 * ylim1[0], ylim1[1] + 0.1 * ylim1[1])
if jackshowall:
subplots = True
if subplots:
if highf_tr is None:
fig, (ax1, ax2, ax3) = plt.subplots(nrows=3, figsize=(10, 8))
else:
fig, (ax1, ax2, ax3, ax4) = plt.subplots(nrows=4, figsize=(10, 10))
if infra_tr is not None:
plt.close(fig)
fig, (ax1, ax2, ax3, ax4, ax5) = plt.subplots(
nrows=5, figsize=(10, 12)
)
ax1.plot(tvec, self.Z, Z_COLOR, linewidth=1)
ax1.set_ylabel('Up force (N)')
ax2.plot(tvec, self.N, N_COLOR, linewidth=1)
ax2.set_ylabel('North force (N)')
ax3.plot(tvec, self.E, E_COLOR, linewidth=1)
ax3.set_ylabel('East force (N)')
if self.jackknife is not None:
if jackshowall:
for Z, N, E in zip(
self.jackknife.Z.all,
self.jackknife.N.all,
self.jackknife.E.all,
):
ax1.plot(self.tvec, Z, Z_COLOR, alpha=JK_ALPHA, linewidth=1)
ax2.plot(self.tvec, N, N_COLOR, alpha=JK_ALPHA, linewidth=1)
ax3.plot(self.tvec, E, E_COLOR, alpha=JK_ALPHA, linewidth=1)
else:
ax1.fill_between(
tvec, Zlower, Zupper, facecolor=Z_COLOR, alpha=JK_ALPHA
)
ax2.fill_between(
tvec, Nlower, Nupper, facecolor=N_COLOR, alpha=JK_ALPHA
)
ax3.fill_between(
tvec, Elower, Eupper, facecolor=E_COLOR, alpha=JK_ALPHA
)
if highf_tr is not None:
                if not isinstance(highf_tr, Trace):
raise TypeError('highf_tr is not an ObsPy Trace.')
tvec2 = np.linspace(
0,
(len(highf_tr.data) - 1) * 1 / highf_tr.stats.sampling_rate,
num=len(highf_tr.data),
)
# Temporary fix, adjust for same zero time
if self.zero_time:
tvec2 -= self.zero_time
tvec2 -= hfshift
ax4.plot(tvec2, highf_tr.data * UMS_PER_MS, 'black')
ax4.set_ylabel('Velocity (μm/s)')
if infra_tr is not None:
                if not isinstance(infra_tr, Trace):
raise TypeError('infra_tr is not an ObsPy Trace.')
tvec2 = np.linspace(
0,
(len(infra_tr.data) - 1) * 1 / infra_tr.stats.sampling_rate,
num=len(infra_tr.data),
)
# Temporary fix, adjust for same zero time
if self.zero_time:
tvec2 -= self.zero_time
tvec2 -= infra_shift
ax5.plot(tvec2, infra_tr.data, 'black')
ax5.set_ylabel('Pressure (Pa)')
if infra_shift != 0:
ax5.annotate(
f'{infra_tr.id} (shifted –{infra_shift:1.0f} s)',
**annot_kwargs,
)
else:
ax5.annotate(infra_tr.id, **annot_kwargs)
# Remove x-axis labels for plots above this one
for axis in [ax1, ax2, ax3, ax4]:
plt.setp(axis.get_xticklabels(), visible=False)
if not xlim:
xlim = [self.tvec.min(), self.tvec.max()]
axes = fig.get_axes()
[axe.set_xlim(xlim) for axe in axes]
[axe.grid(True) for axe in axes]
if same_y or ylim is not None:
ax1.set_ylim(ylim)
ax2.set_ylim(ylim)
ax3.set_ylim(ylim)
if self.impose_zero_start:
[axe.axvline(0, color='gray', linestyle='solid', lw=3) for axe in axes]
if self.duration is not None:
[
axe.axvline(self.duration, color='gray', linestyle='solid', lw=3)
for axe in axes
]
if highf_tr is not None:
if hfylabel is not None:
ax4.set_ylabel(hfylabel)
if hfshift != 0:
ax4.annotate(
f'{highf_tr.id} (shifted –{hfshift:1.0f} s)', **annot_kwargs
)
else:
ax4.annotate(highf_tr.id, **annot_kwargs)
else:
if highf_tr is None:
fig, ax = plt.subplots(figsize=(10, 4))
else:
fig, (ax, ax4) = plt.subplots(nrows=2, figsize=(10, 6))
                if not isinstance(highf_tr, Trace):
raise TypeError('highf_tr is not an ObsPy Trace.')
tvec2 = np.linspace(
0,
(len(highf_tr.data) - 1) * 1 / highf_tr.stats.sampling_rate,
num=len(highf_tr.data),
)
                # Temporary fix, adjust for same zero time
if self.zero_time:
tvec2 -= self.zero_time
tvec2 -= hfshift
ax4.plot(tvec2, highf_tr.data * UMS_PER_MS, 'black')
ax4.set_ylabel('Velocity (μm/s)')
if hfshift != 0:
ax4.annotate(
                        f'{highf_tr.id} (shifted –{hfshift:1.0f} s)', **annot_kwargs
)
else:
ax4.annotate(highf_tr.id, **annot_kwargs)
ax.plot(tvec, self.Z, Z_COLOR, label='Up')
ax.plot(tvec, self.N, N_COLOR, label='North')
ax.plot(tvec, self.E, E_COLOR, label='East')
if self.jackknife is not None:
ax.fill_between(tvec, Zlower, Zupper, facecolor=Z_COLOR, alpha=JK_ALPHA)
ax.fill_between(tvec, Nlower, Nupper, facecolor=N_COLOR, alpha=JK_ALPHA)
ax.fill_between(tvec, Elower, Eupper, facecolor=E_COLOR, alpha=JK_ALPHA)
if xlim:
ax.set_xlim(xlim)
if highf_tr is not None:
ax4.set_xlim(ax.get_xlim())
ax4.grid(True)
if hfylabel is not None:
ax4.set_ylabel(hfylabel)
ax.legend(loc='upper right')
ax.grid(True)
ax.set_ylabel('Force (N)')
ax.set_ylim(ylim)
if self.impose_zero_start:
ax.axvline(0, color='gray', linestyle='solid', lw=3)
if self.duration is not None:
ax.axvline(self.duration, color='gray', linestyle='solid', lw=3)
t0 = self.data.st_proc[0].stats.starttime
if self.zero_time:
t0 += self.zero_time
plt.xlabel('Time (s) from {}'.format(t0.strftime('%Y-%m-%d %H:%M:%S')))
fig.tight_layout()
fig.show()
return fig
def plot_angle_magnitude(self, xlim=None, ylim=None):
r"""Plot angles and magnitudes of inversion result.
Args:
xlim (list or tuple): x-axis limits
ylim (list or tuple): y-axis limits
Returns:
:class:`~matplotlib.figure.Figure`: Output figure handle
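        Example:
            A minimal sketch (assumes a completed inversion)::

                fig = force.plot_angle_magnitude(xlim=(-50, 250))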
"""
tvec = self.tvec
if self.jackknife is None:
if ylim is None:
ylim1 = (
np.amin([self.Z.min(), self.E.min(), self.N.min()]),
np.amax([self.Z.max(), self.E.max(), self.N.max()]),
)
# Add 10% on each side to make it look nicer
ylim = (ylim1[0] + 0.1 * ylim1[0], ylim1[1] + 0.1 * ylim1[1])
else:
Zupper = self.jackknife.Z.upper
Nupper = self.jackknife.N.upper
Eupper = self.jackknife.E.upper
Zlower = self.jackknife.Z.lower
Nlower = self.jackknife.N.lower
Elower = self.jackknife.E.lower
if ylim is None:
ylim1 = (
np.amin([Zlower.min(), Elower.min(), Nlower.min()]),
np.amax([Zupper.max(), Eupper.max(), Nupper.max()]),
)
# Add 10% on each side to make it look nicer
ylim = (ylim1[0] + 0.1 * ylim1[0], ylim1[1] + 0.1 * ylim1[1])
fig = plt.figure(figsize=(10, 10))
# Plot the inversion result in the first one
ax = fig.add_subplot(411)
ax.plot(tvec, self.Z, Z_COLOR, label='Up')
ax.plot(tvec, self.N, N_COLOR, label='North')
ax.plot(tvec, self.E, E_COLOR, label='East')
if self.jackknife is not None:
ax.fill_between(tvec, Zlower, Zupper, facecolor=Z_COLOR, alpha=JK_ALPHA)
ax.fill_between(tvec, Nlower, Nupper, facecolor=N_COLOR, alpha=JK_ALPHA)
ax.fill_between(tvec, Elower, Eupper, facecolor=E_COLOR, alpha=JK_ALPHA)
if xlim:
ax.set_xlim(xlim)
ax.legend(loc='upper right')
ax.grid(True)
ax.set_ylabel('Force (N)')
ax.set_ylim(ylim)
# Plot the magnitudes in second one
ax1 = fig.add_subplot(412)
ax1.plot(tvec, self.angle_magnitude.magnitude, color='black', label='Best')
if self.jackknife is not None:
# Calculate magnitude of each jackknife run
mag_all = [
np.linalg.norm(list(zip(z, e, n)), axis=1)
for z, e, n in zip(
self.jackknife.Z.all, self.jackknife.E.all, self.jackknife.N.all
)
]
# Plot mean magnitude as dashed line
mag_mean = np.mean(mag_all, axis=0)
ax1.plot(tvec, mag_mean, color='black', linestyle='--', label='Mean')
# Plot magnitude bounds as grey patch (these bounds do not
# necessarily represent a single run's magnitude and in fact are more likely
# to be composites of several runs)
ax1.fill_between(
tvec,
self.angle_magnitude.magnitude_upper,
self.angle_magnitude.magnitude_lower,
facecolor='black',
alpha=JK_ALPHA,
)
# Add legend
ax1.legend()
ax1.set_ylabel('Force (N)')
ax1.set_ylim(bottom=0)
# Plot the horizontal azimuth
ax2 = fig.add_subplot(413)
ax2.plot(tvec, self.angle_magnitude.horizontal_angle)
ax2.set_ylabel('Azimuth (deg CW from N)')
ax2.set_ylim(0, 360)
# Plot the vertical angle
ax3 = fig.add_subplot(414)
ax3.plot(tvec, self.angle_magnitude.vertical_angle)
ax3.set_ylabel('Vertical angle (deg)')
axes = fig.get_axes()
if xlim:
    for axe in axes:
        axe.set_xlim(xlim)
for axe in axes:
    axe.grid(True)
if self.impose_zero_start:
    for axe in axes:
        axe.axvline(0, color='gray', linestyle='solid', lw=3)
if self.duration is not None:
    for axe in axes:
        axe.axvline(self.duration, color='gray', linestyle='solid', lw=3)
plt.xlabel('Time (s)')
fig.tight_layout()
fig.show()
return fig
def saverun(
self,
prefix,
filepath=None,
timestamp=False,
figs2save=None,
figs2save_names=None,
save_size='light',
filetype='png',
):
r"""Save a force inversion run for later use.
Warning:
Do not expect this to work if you have the ``autoreload``
`IPython extension`_ enabled!
Args:
prefix (str): Run name to prepend to all output files
filepath (str): Full path to directory where all files should be saved. If
`None`, will use `self.main_folder`
timestamp (bool): Name results with current time to avoid overwriting
previous results
figs2save (list or tuple): Figure handles to save
figs2save_names (list or tuple): Names of figures (appends to end)
save_size (str or None): If `light`, omits the seismic data from the saved
    object to reduce file size. If `ultralight`, additionally omits the
    Green's functions, weight matrices, design matrix, data vector, and
    model vector. If `None`, the full object is saved
filetype (str): Filetype given as extension, e.g. `'png'`
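Example (a minimal sketch; the run name and figure handle are hypothetical):
.. code-block:: python
    force.saverun(
        'myrun', figs2save=[fig_forces], figs2save_names=['forces'], timestamp=True
    )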
.. _IPython extension: https://ipython.readthedocs.io/en/stable/config/extensions/autoreload.html
"""
if filepath is None:
filepath = self.main_folder
obj = copy.copy(self)
if save_size == 'light':
obj.st = None
elif save_size == 'ultralight':
obj.st = None
obj.G = None
obj.W = None
obj.Wvec = None
obj.d = None
obj.model = None
obj.filtered_gf_st = None
elif save_size is None:
    pass
else:
    raise ValueError("save_size must be 'light', 'ultralight', or None")
# Get file name prefix
if self.jackknife is None:
jk = ''
else:
jk = 'JK'
if timestamp:
filename = '{}_{:1.0f}-{:1.0f}sec_{}{}'.format(
prefix,
self.filter['periodmin'],
self.filter['periodmax'],
jk,
UTCDateTime.now().strftime('%Y-%m-%dT%H%M'),
)
else:
filename = '{}_{:1.0f}-{:1.0f}sec_{}'.format(
prefix, self.filter['periodmin'], self.filter['periodmax'], jk
)
with open(os.path.join(filepath, f'{filename}.pickle'), 'wb') as f:
pickle.dump(obj, f, pickle.HIGHEST_PROTOCOL)
if figs2save is not None:
if figs2save_names is None:
figs2save_names = range(len(figs2save))
for i, fig in enumerate(figs2save):
fig.savefig(
os.path.join(
filepath, f'{filename}_{figs2save_names[i]}.{filetype}'
),
bbox_inches='tight',
)
def write_forces(
self,
prefix,
filepath=None,
timestamp=False,
):
r"""Save E, N, Z forces to a text file for non-*lsforce* users.
File can be read in using, e.g., NumPy as follows:
.. code-block:: python
import numpy as np
e, n, z = np.loadtxt('/path/to/file.txt', unpack=True)
Args:
prefix (str): Run name to prepend to file
filepath (str): Full path to directory where file should be saved.
If `None`, will use `self.main_folder`
timestamp (bool): Name file with current time to avoid overwriting
previous results
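Example call (a minimal sketch; the run name is hypothetical):
.. code-block:: python
    force.write_forces('myrun', timestamp=True)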
"""
if filepath is None:
filepath = self.main_folder
# Format filename
current_time = UTCDateTime.now()
filename = '{}_{:1.0f}-{:1.0f}sec{}{}.txt'.format(
prefix,
self.filter['periodmin'],
self.filter['periodmax'],
'' if self.jackknife is None else '_JK',
'_' + current_time.strftime('%Y-%m-%dT%H%M') if timestamp else '',
)
t0 = self.data.st_proc[0].stats.starttime
if self.zero_time:
t0 += self.zero_time
# Form informative header
header = (
'lsforce (https://code.usgs.gov/ghsc/lhp/lsforce) output forces\n'
'--------------------------------------------------------------\n'
f'FORCE SAMPLING RATE: {self.force_sampling_rate} Hz\n'
f'FORCE START TIME: {t0.strftime("%Y-%m-%d %H:%M:%S")} UTC\n'
'FORCE UNITS: newtons\n'
'--------------------------------------------------------------\n'
f'FILE CREATED: {current_time.strftime("%Y-%m-%d %H:%M")} UTC\n'
'FILE COLUMNS: east north up\n'
'--------------------------------------------------------------'
)
# Save
np.savetxt(
os.path.join(filepath, filename),
np.vstack([self.E, self.N, self.Z]).T,
header=header,
)
print(f'{filename} saved to {os.path.abspath(filepath)}')
def find_alpha(
Ghat,
dhat,
I,
L1=0,
L2=0,
tikhonov_ratios=(1.0, 0.0, 0.0),
rough=False,
range_rough=None,
int_rough=0.75,
plot_Lcurve=True,
):
r"""Finds best regularization (trade-off) parameter alpha.
Computes the model for many values of alpha, plots the L-curve, and finds the
point of maximum curvature where the slope is negative.
Args:
Ghat (array): (m x n) matrix
dhat (array): (m x 1) array of weighted data
I (array): Identity matrix
L1 (array): First order roughening matrix. If `0`, will use only
0th-order Tikhonov regularization
L2 (array): Second order roughening matrix. If `0`, will use only
0th-order Tikhonov regularization
tikhonov_ratios (list or tuple): Proportion each regularization method
contributes to the overall regularization effect, where values
correspond to [0th order, 1st order, 2nd order]. Must sum to 1
rough (bool): If `False`, will do two iterations to fine-tune the alpha
    parameter; the second iteration searches a narrower range (from one
    order of magnitude below to half an order above the best alpha found in
    the first round). If `True`, only one round of searching is done, which
    saves time
range_rough (tuple): Lower and upper bound of range to search over in
log units. If `None`, the program will choose a range based on the
norm of ``Ghat``
int_rough (float): Interval, in log units, to use for rough alpha search
plot_Lcurve (bool): Toggle showing the L-curve plot
Returns:
tuple: Tuple containing:
- **bestalpha** (float) – The optimal alpha
- **fit1** (1D array) – List of residuals
- **size1** (1D array) – List of model norms
- **alphas** (1D array) – List of alphas tried
"""
"""
# Roughly estimate largest singular value (should not use alpha larger than expected
# largest singular value)
templ1 = np.ceil(np.log10(np.linalg.norm(Ghat)))
if range_rough is None:
templ2 = np.arange(templ1 - 5, templ1 - 1, int_rough)
else:
templ2 = np.arange(range_rough[0], range_rough[1], int_rough)
alphas = 10.0**templ2
fit1 = []
size1 = []
Apart = Ghat.conj().T @ Ghat
if isinstance(L2, (int, float)):
    L2part = 0.0
else:
    L2part = L2.T @ L2
if isinstance(L1, (int, float)):
    L1part = 0.0
else:
    L1part = L1.T @ L1
x = np.squeeze(Ghat.conj().T @ dhat)
if rough:
maxloops = 1
else:
maxloops = 2
loop = 1
while loop <= maxloops:
for alpha in alphas:
A = Apart + alpha**2 * (
tikhonov_ratios[0] * I
+ tikhonov_ratios[1] * L1part
+ tikhonov_ratios[2] * L2part
) # Combo of all regularization things
model, residuals, rank, s = sp.linalg.lstsq(A, x)
temp1 = Ghat @ model.T - dhat
fit1.append(sp.linalg.norm(temp1))
size1.append(
sp.linalg.norm(tikhonov_ratios[0] * model)
+ sp.linalg.norm(tikhonov_ratios[1] * np.dot(L1part, model))
+ sp.linalg.norm(tikhonov_ratios[2] * np.dot(L2part, model))
)
fit1 = np.array(fit1)
size1 = np.array(size1)
curves = _curvature(np.log10(fit1), np.log10(size1))
# Zero out any points where function is concave to avoid picking points from
# drop-off at end
slp2 = np.gradient(np.gradient(np.log10(size1), np.log10(fit1)), np.log10(fit1))
alphas = np.array(alphas)
tempcurve = curves.copy()
tempcurve[slp2 < 0] = np.inf
idx = np.argmin(tempcurve)
bestalpha = alphas[idx]
loop += 1
if loop > maxloops:
break
else: # Loop again over smaller range
alphas = np.logspace(
np.round(np.log10(bestalpha)) - 1,
np.round(np.log10(bestalpha)) + 0.5,
10,
)
fit1 = []
size1 = []
if plot_Lcurve:
Lcurve(fit1, size1, alphas, bestalpha=bestalpha)
if isinstance(bestalpha, list):
    if len(bestalpha) > 1:
        raise ValueError('Returned more than one alpha value; check code.')
    bestalpha = bestalpha[0]
return bestalpha, fit1, size1, alphas
def Lcurve(fit1, size1, alphas, bestalpha=None):
r"""Plot an L-curve.
Args:
fit1 (1D array): List of residuals
size1 (1D array): List of model norms
alphas (1D array): List of alphas tried
bestalpha (float): The alpha value chosen
Returns:
:class:`~matplotlib.figure.Figure`: Output figure handle
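Example (synthetic norms for illustration only):
.. code-block:: python
    import numpy as np
    alphas = np.logspace(-2, 2, 9)
    fit1 = np.logspace(0, 2, 9)  # Hypothetical residual norms
    size1 = np.logspace(3, 1, 9)  # Hypothetical model norms
    fig = Lcurve(fit1, size1, alphas, bestalpha=alphas[4])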
"""
fig, ax = plt.subplots(figsize=(7, 6))
ax.loglog(fit1, size1, '.', markersize=10, color='black')
for i, alpha in enumerate(alphas):
ax.text(fit1[i], size1[i], f' {alpha:.1e}', va='center')
if bestalpha is not None:
idx = np.argmin(np.abs(alphas - bestalpha))
ax.plot(fit1[idx], size1[idx], 'or')
ax.set_xlabel(r'Residual norm $||{\bf G}{\bf m}-{\bf d}||$')
ax.set_ylabel(r'Solution norm $||{\bf m}||$')
fig.tight_layout()
plt.show()
return fig
def _varred(dt, dtnew):
r"""Compute variance reduction :math:`\mathrm{VR}`.
The formula is
.. math::
\mathrm{VR} = \left(1 - \frac{\|\mathbf{d}
- \mathbf{d}_\mathbf{obs}\|^2}{\|\mathbf{d}_\mathbf{obs}\|^2}\right)
\times 100\%\,,
where :math:`\mathbf{d}_\mathbf{obs}` are the observed data, `dt`, and
:math:`\mathbf{d}` are the synthetic data predicted by the forward model, `dtnew`.
Args:
dt: Array of original data
dtnew: Array of modeled data
Returns:
float: Variance reduction :math:`\mathrm{VR}`
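Example (a quick numerical check; the misfit power is 1 and the data power
is 14, so :math:`\mathrm{VR} \approx 92.9\%`):
.. code-block:: python
    import numpy as np
    dt = np.array([[1.0, 2.0, 3.0]])
    dtnew = np.array([[1.0, 2.0, 2.0]])
    vr = _varred(dt, dtnew)  # (1 - 1/14) * 100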
"""
shp = np.shape(dt)
shp = shp[0] * shp[1]
dt_temp = np.reshape(dt, shp)
dtnew_temp = np.reshape(dtnew, shp)
d_dnew2 = (dt_temp - dtnew_temp) ** 2
d2 = dt_temp**2
VR = (1 - (np.sum(d_dnew2) / np.sum(d2))) * 100
return VR
def _makeconvmat(c, size=None):
r"""Build matrix used for convolution as implemented by matrix multiplication.
Args:
c (1D array): Signal to make convolution matrix for
size (list or tuple): Optional input for desired size as ``(nrows, ncols)``;
this will just shift ``cflip`` until it reaches the right size
Returns:
:class:`~numpy.ndarray`: Convolution matrix
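Example (the product ``C @ x`` reproduces the first ``2 * len(c) - 1``
samples of :func:`numpy.convolve`):
.. code-block:: python
    import numpy as np
    c = np.array([1.0, 2.0, 3.0])
    C = _makeconvmat(c)  # 5 x 5 Toeplitz-like matrix
    x = np.array([1.0, 1.0, 1.0, 0.0, 0.0])
    np.allclose(C @ x, np.convolve(c, x)[:5])  # True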
"""
cflip = c[::-1] # Flip order
if size is None:
C = np.zeros((2 * len(c) - 1, 2 * len(c) - 1))
for i in range(2 * len(c) - 1):
if i > len(c) - 1:
zros = np.zeros(i + 1 - len(c))
p = np.concatenate((zros, cflip, np.zeros(2 * len(c))))
else:
p = np.concatenate(((cflip[-(i + 1) :]), np.zeros(2 * len(c))))
p = p[: 2 * len(c) - 1]
C[i, :] = p.copy()
else:
# Make it the correct size
C = np.zeros(size)
for i in range(size[0]):
if i > len(c) - 1:
zros = np.zeros(i + 1 - len(c))
p = np.concatenate((zros, cflip, np.zeros(size[1])))
else:
p = np.concatenate(((cflip[-(i + 1) :]), np.zeros(size[1])))
p = p[: size[1]] # Cut p to the correct size
C[i, :] = p.copy()
return C
def _makeshiftmat(c, shiftby, size1):
r"""Build matrix that can be used for shifting of overlapping triangles.
Used for triangle method. Signal goes across rows and each shift is a new column
(opposite orientation to :func:`_makeconvmat`)
Args:
c: Array of data (usually Green's function)
shiftby (int): Number of samples to shift Green's function in each row
size1 (list or tuple): Shape ``(nrows, ncols)`` of desired result. Will pad `c`
if ``nrows`` is greater than ``len(c)``. Will shift `c` forward `shiftby`
:math:`\times` ``ncols`` times
Returns:
:class:`~numpy.ndarray`: Matrix of shifted `c` of size `size1`
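Example (each column is the edge-padded signal delayed by ``shiftby`` more
samples than the previous column):
.. code-block:: python
    import numpy as np
    c = np.arange(5.0)
    C = _makeshiftmat(c, shiftby=2, size1=(5, 3))
    # Columns are [0, 1, 2, 3, 4], [0, 0, 0, 1, 2], and [0, 0, 0, 0, 0]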
"""
diff = len(c) - size1[0]
if diff < 0:
cpad = np.pad(c, (0, -diff), mode='edge')
elif diff > 0:
cpad = c[: size1[0]]
else:
cpad = c
C = np.zeros(size1)
for i in range(size1[1]): # Loop over shifts and apply
nshift = i * shiftby
temp = np.pad(cpad.copy(), (nshift, 0), mode='edge')
temp = temp[: size1[0]]
C[:, i] = temp.copy()
return C
def _curvature(x, y):
r"""Estimate radius of curvature for each point on line to find corner of L-curve.
Args:
x: Array of x data
y: Array of y data
Returns:
:class:`~numpy.ndarray`: Radius of curvature for each point (ends will be infinite)
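Example (points on a unit circle should give radii of about 1):
.. code-block:: python
    import numpy as np
    theta = np.linspace(0.1, 1.4, 5)
    R = _curvature(np.cos(theta), np.sin(theta))
    # Interior entries are ~1; the two endpoints remain infinite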
"""
# For each set of three points, find the radius of the circle that fits them (ignore
# ends - these should be infinity since it's a straight line that fits them)
R_2 = np.full(len(x), np.inf)
for i in range(1, len(R_2) - 1):
xsub = x[i - 1 : i + 2]
ysub = y[i - 1 : i + 2]
# Slope of bisector of first segment
m1 = -1 / ((ysub[0] - ysub[1]) / (xsub[0] - xsub[1]))
# Slope of bisector of second segment
m2 = -1 / ((ysub[1] - ysub[2]) / (xsub[1] - xsub[2]))
# Compute b for first bisector
b1 = ((ysub[0] + ysub[1]) / 2) - m1 * ((xsub[0] + xsub[1]) / 2)
# Compute b for second bisector
b2 = ((ysub[1] + ysub[2]) / 2) - m2 * ((xsub[1] + xsub[2]) / 2)
Xc = (b1 - b2) / (m2 - m1) # Find intercept point of bisectors
Yc = b2 + m2 * Xc
# Get distance from any point to intercept of bisectors to get radius
R_2[i] = np.sqrt((xsub[0] - Xc) ** 2 + (ysub[0] - Yc) ** 2)
return R_2
def _read(url):
r"""Wrapper for :func:`obspy.core.stream.read` for long URLs."""
with tempfile.NamedTemporaryFile() as f:
urlretrieve(url, f.name)
return read(f.name)
def _get_band_code(dt):
r"""Determine SEED band code for a given sampling interval.
SEED band code reference:
https://www.fdsn.org/pdf/SEEDManual_V2.4_Appendix-A.pdf (see page 2)
Code copied from Instaseis:
https://github.com/krischer/instaseis/blob/dc9d4f16e55837236712e3dde2fbe10902393940/instaseis/helpers.py#L45-L61
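Example (values chosen to illustrate two of the thresholds):
.. code-block:: python
    _get_band_code(0.25)  # 'M' (mid period)
    _get_band_code(0.01)  # 'H' (high broadband)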
"""
if dt <= 0.001:
band_code = 'F'
elif dt <= 0.004:
band_code = 'C'
elif dt <= 0.0125:
band_code = 'H'
elif dt <= 0.1:
band_code = 'B'
elif dt < 1:
band_code = 'M'
else:
band_code = 'L'
return band_code
def readrun(filename):
r"""Read in a saved LSForce object.
Warning:
Do not expect this to work if you have the ``autoreload``
`IPython extension`_ enabled!
Args:
filename (str): File path to LSForce object saved using
:meth:`~lsforce.lsforce.LSForce.saverun`
Returns:
:class:`~lsforce.lsforce.LSForce`: Saved LSForce object
.. _IPython extension: https://ipython.readthedocs.io/en/stable/config/extensions/autoreload.html
"""
"""
with open(filename, 'rb') as f:
result = pickle.load(f)
return result