Source code for lsforce.lstrajectory

import warnings

import numpy as np
import rioxarray
import xarray as xr
from matplotlib import cm
from matplotlib import pyplot as plt
from obspy.core import AttribDict
from pyproj import Transformer
from rasterio.enums import Resampling

KM_PER_M = 1 / 1000  # [km/m]

MASS_INC = int(1e7)  # [kg] Smaller increment is slower but more precise


[docs] class LSTrajectory: r"""Class for force inversion derived trajectories. Attributes: force (:class:`~lsforce.lsforce.LSForce`): Inversion results used to compute this trajectory mass_requested (int or float): [kg] Mass specified mass_actual (int): [kg] Mass used (same as `mass_requested` if `target_length` is not specified) target_length (int or float): [km] Center-of-mass runout length of event, `None` if not specified jackknife (:class:`~obspy.core.util.attribdict.AttribDict`): Jackknifed trajectory results acceleration (:class:`~obspy.core.util.attribdict.AttribDict`): [m^2/s] Computed acceleration with Z, E, N components as attributes velocity (:class:`~obspy.core.util.attribdict.AttribDict`): [m/s] Computed velocity with Z, E, N components as attributes displacement (:class:`~obspy.core.util.attribdict.AttribDict`): [m] Computed displacement with Z, E, N components as attributes horizontal_distance (:class:`~numpy.ndarray`): [m] Computed horizontal distance traj_tvec (:class:`~numpy.ndarray`): [s] Time array for all trajectory arrays """ def __init__( self, force, mass=None, target_length=None, start=None, duration=None, detrend_velocity=None, zeroacc=None, ): r"""Create an LSTrajectory object. Args: force (:class:`~lsforce.lsforce.LSForce`): Completed force inversion mass (int or float): [kg] Mass of event. If `None`, the mass is computed using `target_length`, which must be specified target_length (int or float): [km] Center-of-mass runout length of event. If `None`, `mass` must be specified start (int or float): [s] If not `None`, only use the force time series from start seconds in the trajectory calculation duration (int or float): [s] If not `None`, only use the force time series from 0–`duration` seconds in the trajectory calculation detrend_velocity: [s] If provided, require the velocity to linearly go to zero at this time; if `None`, don't detrend zeroacc: [s] If provided, require the acceleration to be zero after this time, usually when forces are indistinguishable from zero, to reduce noise at end of trajectory. This is best used with the detrend_velocity option to avoid allowing the landslide to slide forever. """ if not force.inversion_complete: raise ValueError('Cannot compute trajectory if inversion has not been run!') if detrend_velocity is None and zeroacc is not None: print( 'Warning: zeroacc should be used with detrend_velocity to ' 'avoid simulating zero friction behavior. Continuing.' ) self.force = force self.mass_requested = mass self.target_length = target_length self.jackknife = None self.start = start compute_kwargs = dict( mass=self.mass_requested, target_length=self.target_length, start=start, duration=duration, detrend_velocity=detrend_velocity, zeroacc=zeroacc, ) self._compute_trajectory(**compute_kwargs)
[docs] def plot_trajectory( self, elevation_profile=False, plot_jackknife=False, image=None, dem=None, reference_point=None, ): r"""Plot trajectory results with context. Args: elevation_profile (bool): If `True`, plot vertical displacement versus horizontal runout distance (:math:`H` vs. :math:`L`) instead of a map view plot_jackknife (bool): Toggle plotting jackknifed displacements as well (if available) image (:class:`~xarray.DataArray`): An image with coordinates defined in km with the origin (0, 0) being the start location of the trajectory dem (str): A DEM GeoTIFF to slice thru for elevation profile plot reference_point (int or float or list): If not `None`, plot a dot on trajectory, and line on colorbar, at this specified time(s) for reference Returns: :class:`~matplotlib.figure.Figure`: Output figure handle """ # Convert reference points to numpy array reference_points = np.atleast_1d(reference_point) fig, ax = plt.subplots() # Converting to km below if elevation_profile: x = self.horizontal_distance * KM_PER_M y = self.displacement.Z * KM_PER_M else: x = self.displacement.E * KM_PER_M y = self.displacement.N * KM_PER_M sc = ax.scatter(x, y, c=self.traj_tvec, cmap='rainbow', zorder=100) if elevation_profile: ax.set_xlabel('Horizontal distance (km)') ax.set_ylabel('Vertical distance (km)') else: ax.set_xlabel('East distance (km)') ax.set_ylabel('North distance (km)') t0 = self.force.data.st_proc[0].stats.starttime if self.start: t0 += self.start if self.force.zero_time: t0 += self.force.zero_time cbar = plt.colorbar( sc, label='Time (s) from {}'.format(t0.strftime('%Y-%m-%d %H:%M:%S')) ) # Plot reference points, if any if np.any(reference_points): cmap = cm.get_cmap('Greys_r', reference_points.size) for i, time in enumerate(reference_points): try: ref_pt_ind = np.where(self.traj_tvec == time)[0][0] except IndexError: raise # No point corresponding to requested reference time ax.scatter(x[ref_pt_ind], y[ref_pt_ind], color=cmap(i), zorder=150) cbar.ax.plot( [self.traj_tvec.min(), self.traj_tvec.max()], [time, time], color=cmap(i), linewidth=2, ) title = ( f'mass = {self.mass_actual:,} kg\n' f'runout length = {self.horizontal_distance[-1] * KM_PER_M:.2f} km' ) if self.target_length: title += f'\n(target length = {self.target_length:g} km)' ax.set_title(title) # Plot jackknife trajectories as well if desired if plot_jackknife: if self.jackknife: for i in range(self.force.jackknife.num_iter): if elevation_profile: x = self.jackknife.horizontal_distance[i] * KM_PER_M y = self.jackknife.displacement.Z[i] * KM_PER_M else: x = self.jackknife.displacement.E[i] * KM_PER_M y = self.jackknife.displacement.N[i] * KM_PER_M ax.scatter(x, y, c=self.traj_tvec, cmap='rainbow', alpha=0.02) else: warnings.warn('No jackknife iterations to plot.') ax.axis('equal') if (image is not None) and (not elevation_profile): xlim = ax.get_xlim() ylim = ax.get_ylim() image.plot.imshow( ax=ax, cmap='Greys_r', add_colorbar=False, add_labels=False, zorder=-10 ) ax.axis('equal') ax.set_xlim(xlim) ax.set_ylim(ylim) if (dem is not None) and elevation_profile: distance, drop = self._slice_dem(dem) ax.plot(distance * KM_PER_M, drop * KM_PER_M, color='black', zorder=100) fig.tight_layout() fig.show() return fig
def _compute_trajectory( self, mass=None, target_length=None, start=None, duration=None, detrend_velocity=None, zeroacc=None, ): r"""Integrate force time series to velocity and then to displacement. Either provide a mass or a target horizontal runout length. If a length is provided, the code will find the mass that achieves this length. Calls :meth:`~lsforce.lstrajectory.LSTrajectory._trajectory_automass`. See :class:`~lsforce.lstrajectory.LSTrajectory` for description of arguments. """ # For the full inversion (all channels) result ( self.acceleration, self.velocity, self.displacement, self.mass_actual, self.traj_tvec, ) = self._trajectory_automass( self.force.Z, self.force.E, self.force.N, mass=mass, target_length=target_length, start=start, duration=duration, detrend=detrend_velocity, zeroacc=zeroacc, ) self.horizontal_distance = _calculate_horizontal_distance( self.displacement.E, self.displacement.N ) # Compute jackknife trajectories as well if the inversion was jackknifed if self.force.jackknife: self.jackknife = AttribDict( num_iter=self.force.jackknife.num_iter, displacement=AttribDict(Z=[], E=[], N=[]), horizontal_distance=[], ) for i in range(self.jackknife.num_iter): *_, disp_i, _, _ = self._trajectory_automass( self.force.jackknife.Z.all[i], self.force.jackknife.E.all[i], self.force.jackknife.N.all[i], mass=mass, target_length=target_length, start=start, duration=duration, detrend=detrend_velocity, zeroacc=zeroacc, ) horiz_dist_i = _calculate_horizontal_distance(disp_i.E, disp_i.N) # Store jackknifed trajectories self.jackknife.displacement.Z.append(disp_i.Z) self.jackknife.displacement.E.append(disp_i.E) self.jackknife.displacement.N.append(disp_i.N) self.jackknife.horizontal_distance.append(horiz_dist_i) def _integrate_acceleration( self, z_force, e_force, n_force, mass, startidx, endidx, detrend=None, zeroaccidx=None, ): r"""Integrate forces (acceleration) to velocity and displacement.""" traj_tvec = self.force.tvec[startidx : endidx + 1] dx = 1.0 / self.force.force_sampling_rate acceleration = AttribDict( Z=-z_force.copy()[startidx : endidx + 1] / mass, E=-e_force.copy()[startidx : endidx + 1] / mass, N=-n_force.copy()[startidx : endidx + 1] / mass, ) if zeroaccidx is not None: aidx = zeroaccidx - startidx for comp in acceleration.values(): comp[aidx:] = 0.0 velocity = AttribDict( Z=np.cumsum(acceleration.Z) * dx, E=np.cumsum(acceleration.E) * dx, N=np.cumsum(acceleration.N) * dx, ) # Detrend is either None (no detrending) or a time where velocity should # be fully tapered to zero if detrend: # Index corresponding to time where velocity should be zero (closest entry) zeroidx = (np.abs(traj_tvec - detrend)).argmin() for comp in velocity.values(): trend = np.linspace(0, comp[zeroidx], len(traj_tvec[:zeroidx])) comp[:zeroidx] -= trend comp[zeroidx:] = np.zeros(len(comp[zeroidx:])) displacement = AttribDict( Z=np.cumsum(velocity.Z) * dx, E=np.cumsum(velocity.E) * dx, N=np.cumsum(velocity.N) * dx, ) return ( acceleration, velocity, displacement, traj_tvec, ) def _trajectory_automass( self, z_force, e_force, n_force, mass=None, target_length=None, start=None, duration=None, detrend=None, zeroacc=None, ): r"""Calls :meth:`~lsforce.lstrajectory.LSTrajectory._integrate_acceleration`.""" # Check args if mass and target_length: raise ValueError('Cannot specify both mass and target length!') if not mass and not target_length: raise ValueError('You must specify either mass or target length!') # Clip time series as clost to `start` [s] as possible, if desired if start: startidx = (np.abs(self.force.tvec - start)).argmin() else: # Start as close to t = 0 as possible startidx = (np.abs(self.force.tvec - 0)).argmin() # Clip time series as close to `duration` [s] as possible, if desired if duration: endidx = (np.abs(self.force.tvec - duration)).argmin() else: endidx = len(self.force.tvec) - 1 # Figure out the index for the time after which acceleration must be 0. if zeroacc is not None: zeroaccidx = (np.abs(self.force.tvec - zeroacc)).argmin() else: zeroaccidx = None # Either use the mass that was provided, or calculate one if target_length: # Initialize with end-members mass = 0 # [kg] current_length = np.inf # [km] while current_length > target_length: mass += MASS_INC # Increase the mass # Calculate the runout length [km] based on this mass *_, disp, _ = self._integrate_acceleration( z_force, e_force, n_force, mass, startidx, endidx, detrend, zeroaccidx=zeroaccidx, ) current_length = ( _calculate_horizontal_distance(disp.E, disp.N)[-1] * KM_PER_M ) # [km] else: mass = int(mass) # Calculate compute_trajectory based on mass assigned above ( acceleration, velocity, displacement, traj_tvec, ) = self._integrate_acceleration( z_force, e_force, n_force, mass, startidx, endidx, detrend, zeroaccidx=zeroaccidx, ) return ( acceleration, velocity, displacement, mass, traj_tvec, ) def _slice_dem(self, dem_file, interp_spacing=0.1): r"""Slice through an input DEM along the trajectory path. Args: dem_file (str): DEM GeoTIFF to slice interp_spacing (int or float): [m] Density of interpolation points Returns: tuple: Tuple containing: - **horizontal_distance** (:class:`~numpy.ndarray`) – [m] Distance along path - **elevation** (:class:`~numpy.ndarray`) – [m] Elevation along `horizontal_distance` """ dem = xr.open_dataarray(dem_file).squeeze() # Define interpolation points in UTM space utm_crs = dem.rio.estimate_utm_crs() dem = dem.rio.reproject(utm_crs, resampling=Resampling.cubic_spline) transformer = Transformer.from_crs(4326, utm_crs) loc_utm = transformer.transform( self.force.data.source_lat, self.force.data.source_lon ) points = [ [x + loc_utm[0], y + loc_utm[1]] for x, y in zip(self.displacement.E, self.displacement.N) ] # Densify the coarse points path_x, path_y = [], [] x_prev, y_prev = points[0] for pt in points[1:]: x, y = pt seg_length = np.linalg.norm([y - y_prev, x - x_prev]) # Choose a number of pts that gives approximately interp_spacing n = int(seg_length / interp_spacing) # Append densified path path_x = np.hstack([path_x, np.linspace(x_prev, x, n)]) path_y = np.hstack([path_y, np.linspace(y_prev, y, n)]) x_prev, y_prev = x, y # Actually interpolate! profile = dem.interp( x=xr.DataArray(path_x), y=xr.DataArray(path_y), method='linear' ) # Find horizontal distance vector (works for curvy paths!) horizontal_distance = np.hstack( [ 0, np.cumsum( np.linalg.norm([np.diff(profile.x), np.diff(profile.y)], axis=0) ), ] ) # Check that interp_spacing wasn't too coarse by matching path lengths if not np.isclose(horizontal_distance[-1], self.horizontal_distance[-1]): raise ValueError('interp_spacing was too coarse. Try decreasing.') warnings.warn('Assuming DEM vertical unit is meters!') elevation = profile.data - profile.data[0] # Start at 0 and go negative return horizontal_distance, elevation
def _calculate_horizontal_distance(east_displacement, north_displacement): r"""Calculate horizontal distance vector from east and north displacement vectors. This is the horizontal distance "along the avalanche path" as a function of time. `horizontal_distance[-1]` is :math:`L`, the horizontal runout distance (which is shorter than the 3-D runout distance). Args: east_displacement (:class:`~numpy.ndarray`): Eastward displacement vector as a function of time [m] north_displacement (:class:`~numpy.ndarray`): Northward displacement vector as a function of time [m] Returns: :class:`~numpy.ndarray`: Horizontal distance as a function of time [m] """ dx = np.diff(east_displacement) dy = np.diff(north_displacement) return np.hstack([0, np.cumsum(np.linalg.norm([dx, dy], axis=0))])