"""Source code for BPMF.template_search."""
import os
import sys
from .config import cfg
from . import clib, utils, dataset
import matplotlib.pyplot as plt
import numpy as np
import obspy as obs
import pandas as pd
import beampower as bp
try:
    from scipy.stats import median_abs_deviation as scimad
except ImportError:
    # fall back to the deprecated name on older scipy versions
    from scipy.stats import median_absolute_deviation as scimad
from scipy.ndimage import gaussian_filter1d
from scipy.interpolate import interp1d
from scipy.signal import hilbert
from time import time as give_time
from math import isnan
from obspy.core import UTCDateTime as udt
class TravelTimes(object):
"""Class for handling travel time tables.
"""
def __init__(
self,
tt_filename,
tt_folder_path=cfg.MOVEOUTS_PATH,
):
"""
Parameters
----------
tt_filename : str
Name of the hdf5 file with the travel time tables.
tt_folder_path : str, optional
            Path to the folder where `tt_filename` is located. Default
is `cfg.MOVEOUTS_PATH`.
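
        Examples
        --------
        A minimal usage sketch; the file name, phase labels and sampling
        rate below are hypothetical:

        >>> tt = TravelTimes("travel_times.h5")
        >>> tt.read(["P", "S"], read_coords=True)
        >>> tt.convert_to_samples(sampling_rate=25.0)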
"""
self.where = os.path.join(tt_folder_path, tt_filename)
@property
def n_sources(self):
if hasattr(self, "source_indexes"):
return len(self.source_indexes)
else:
print("Call self.read first.")
    @property
    def num_sources(self):
        # alias for `self.n_sources`
        return self.n_sources
@property
def phases(self):
if hasattr(self, "travel_times"):
return list(self.travel_times.columns)
elif hasattr(self, "travel_times_samp"):
return list(self.travel_times_samp.columns)
else:
return
# aliases
@property
def tts(self):
if hasattr(self, "travel_times"):
return self.travel_times
else:
print("Call self.read first.")
@property
def source_coords(self):
if hasattr(self, "source_coordinates"):
return self.source_coordinates
else:
print("Call self.read first.")
def read(self, phases, source_indexes=None, read_coords=False, stations=None):
"""
Parameters
----------
phases : list of str
List of the seismic phases to read from `self.where`.
source_indexes : array-like, optional
Array-like with the source indexes to read. Default is None
(read the whole travel time table).
read_coords : boolean, optional
If True, the source coordinates are read from `self.where`.
stations : list of str, optional
            If not None, a list of station names for which to read a subset
            of travel times from `self.where`. Default is None, in which
            case all stations are read.
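
        Examples
        --------
        For instance, assuming the table stores hypothetical "P" and "S"
        phases for stations "STA01" and "STA02":

        >>> tt.read(["P", "S"], stations=["STA01", "STA02"], read_coords=True)
        >>> tt.source_coordinates  # doctest: +SKIP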
"""
import h5py as h5
tts = {}
with h5.File(self.where, mode="r") as fin:
grid_shape = fin["source_coordinates"]["depth"].shape
if source_indexes is None:
self.source_indexes = np.arange(np.prod(grid_shape))
else:
self.source_indexes = source_indexes
for ph in phases:
tts[ph] = {}
for sta in fin[f"tt_{ph}"].keys():
if stations is not None and sta not in stations:
continue
# flatten the lon/lat/dep grid as we work with
# flat source indexes
if source_indexes is not None:
# select a subset of the source grid
source_indexes_unravelled = np.unravel_index(
source_indexes, grid_shape
)
selection = np.zeros(grid_shape, dtype=bool)
selection[source_indexes_unravelled] = True
tts[ph][sta] = fin[f"tt_{ph}"][sta][selection].flatten().astype(
"float32"
)
else:
tts[ph][sta] = fin[f"tt_{ph}"][sta][()].flatten().astype(
"float32"
)
self.travel_times = pd.DataFrame(tts)
if read_coords:
source_coords = {}
if source_indexes is not None:
source_indexes_unravelled = np.unravel_index(source_indexes, grid_shape)
selection = np.zeros(grid_shape, dtype=bool)
selection[source_indexes_unravelled] = True
for coord in fin["source_coordinates"].keys():
source_coords[coord] = fin[
"source_coordinates"
][coord][selection].flatten()
else:
for coord in fin["source_coordinates"].keys():
source_coords[coord] = fin["source_coordinates"][coord][()].flatten()
self.source_coordinates = pd.DataFrame(source_coords, index=self.source_indexes)
def convert_to_samples(self, sampling_rate, remove_tt_seconds=False):
"""
Creates a new `self.travel_times_samp` attribute.
Parameters
----------
sampling_rate : float
The sampling rate to use to convert times from seconds to samples.
remove_tt_seconds : boolean, optional
If True, delete `self.travel_times` after conversion. This may be
necessary to save memory.
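
        Examples
        --------
        For instance, for data sampled at a hypothetical 25 Hz:

        >>> tt.convert_to_samples(25.0)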
"""
travel_times_samp = {}
for ph in self.travel_times.columns:
travel_times_samp[ph] = {}
for sta in self.travel_times.index:
travel_times_samp[ph][sta] = utils.sec_to_samp(
self.travel_times.loc[sta, ph],
sr=sampling_rate,
)
self.travel_times_samp = pd.DataFrame(travel_times_samp)
if remove_tt_seconds:
del self.travel_times
def get_travel_times_array(
self, units="seconds", stations=None, phases=None, relative_to_first=False
):
"""
Parameters
----------
units : str, optional
Either of 'seconds' or 'samples'.
If 'seconds', build array from `self.travel_times`.
If 'samples', build array from `self.travel_times_samp`.
stations : list of str, optional
List of stations to include in the output array.
Default is None, which uses all stations.
phases : list of str, optional
List of phases to include in the output array.
Default is None, which uses all phases.
relative_to_first : boolean, optional
If True, the travel times are given as relative to the
earliest phase for each source.
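
        Returns
        -------
        tts : (n_sources, n_stations, n_phases) numpy.ndarray
            The travel times, given as moveouts relative to the earliest
            phase if `relative_to_first` is True.

        Examples
        --------
        A sketch of building integer-sample moveouts (phase names are
        hypothetical):

        >>> tt.convert_to_samples(sampling_rate=25.0)
        >>> mv = tt.get_travel_times_array(
        ...     units="samples", phases=["P", "S"], relative_to_first=True
        ... )
        >>> mv.shape  # doctest: +SKIP
        (n_sources, n_stations, 2)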
"""
        assert units in ("seconds", "samples"), (
            "units should be either of 'seconds' or 'samples'"
        )
if units == "seconds" and not hasattr(self, "travel_times"):
print("Call `self.read` first.")
return
elif units == "samples" and not hasattr(self, "travel_times_samp"):
print("Call `self.convert_to_samples` first.")
return
if units == "seconds":
attr = getattr(self, "travel_times")
elif units == "samples":
attr = getattr(self, "travel_times_samp")
if stations is None:
stations = attr.index
if phases is None:
phases = attr.columns
dtype = attr.loc[stations[0], phases[0]].dtype
tts = np.zeros(
(self.n_sources, len(stations), len(phases)),
dtype=dtype
)
for s, sta in enumerate(stations):
for p, ph in enumerate(phases):
tts[:, s, p] = attr.loc[sta, ph]
if relative_to_first:
tts = tts - np.min(tts, axis=(1, 2), keepdims=True)
return tts
class WaveformTransform(object):
"""Class for waveform transform.
"""
def __init__(
self, transform_arr, stations, components, starttime, sampling_rate_hz
):
"""Initializes an instance that represents the transformed wavefield.
Two important class attributes are:
- transform_arr : Same as the input argument with the same name.
        - transform : Transformed wavefield represented as an `obspy.Stream`.
Parameters
----------
transform_arr : numpy.ndarray
(num_stations, num_components, num_samples) `numpy.ndarray` with
some transform of the wavefield.
stations : array-like
Station names of the first axis of `transform_arr`.
components : array-like
Component names of the second axis of `transform_arr`.
starttime : `obspy.UTCDateTime`
Start time of `transform_arr`.
sampling_rate_hz : float
Sampling rate, in Hertz, of the time series in `transform_arr`.
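
        Examples
        --------
        A minimal sketch with synthetic data (all values hypothetical):

        >>> import numpy as np
        >>> import obspy as obs
        >>> arr = np.random.randn(2, 3, 1000).astype("float32")
        >>> wt = WaveformTransform(
        ...     arr, ["STA01", "STA02"], ["N", "E", "Z"], obs.UTCDateTime(0), 25.0
        ... )
        >>> wt.duration
        40.0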
"""
self.stations = stations
self.components = components
self.transform_arr = transform_arr
self.starttime = starttime
self.n_samples = transform_arr.shape[-1]
self.sampling_rate = sampling_rate_hz
self.transform = obs.Stream()
for s in range(len(self.stations)):
for c in range(len(self.components)):
tr = obs.Trace(data=transform_arr[s, c, :])
tr.stats.station = stations[s]
tr.stats.channel = components[c]
tr.stats.sampling_rate = sampling_rate_hz
tr.stats.starttime = starttime
self.transform += tr
@property
def sr(self):
return self.sampling_rate
@property
def delta(self):
return 1.0 / self.sampling_rate
@property
def duration(self):
return self.n_samples / self.sampling_rate
@property
def time(self):
if not hasattr(self, "sampling_rate"):
print("You need to define the instance's sampling rate first.")
return
if not hasattr(self, "_time"):
self._time = utils.time_range(
self.starttime, self.starttime + self.duration, 1.0 / self.sr, unit="ms"
)
return self._time
def data_frame_view(self):
"""Returns `self.transform_arr` as a `pandas.DataFrame`.
The data frame's columns are the component names and the
rows are the station names.
"""
df = pd.DataFrame(index=self.stations, columns=self.components)
for s, sta in enumerate(df.index):
for p, ph in enumerate(df.columns):
df.loc[sta, ph] = self.transform_arr[s, p, :]
return df
def get_np_array(
self,
stations,
components=None,
verbose=True,
):
"""Arguments go to `BPMF.utils.get_np_array`."""
if components is None:
components = self.components
component_aliases = {ph: ph for ph in components}
return utils.get_np_array(
self.transform,
stations,
components=components,
component_aliases=component_aliases,
n_samples=self.n_samples,
verbose=verbose,
)
def slice(
self,
starttime,
duration=None,
num_samples=None,
stations=None,
components=None,
):
"""Returns a new `WaveformTransform` instance at the requested time.
Note that either `duration` or `num_samples` must be different
from None.
Parameters
----------
starttime : `obspy.UTCDateTime`
Time at which slicing starts.
duration : float, optional
If not None, this is the duration, in seconds, of the slice.
Defaults to None, in which case `duration` is calculated from
`num_samples` and `self.sampling_rate`.
num_samples : int, optional
If not None, this is the duration, in samples, of the slice.
Defaults to None, in which case `num_samples` is calculated
from `duration` and `self.sampling_rate`.
stations : array-like, optional
Names of the stations to use in the slice. Defaults to None,
in which case `stations` is taken to be the same as `self.stations`.
components : array-like, optional
Names of the components to use in the slice. Defaults to None,
in which case `components` is taken to be the same as
`self.components`.
Returns
-------
new_instance : `BPMF.template_search.WaveformTransform`
The new `WaveformTransform` instance sliced from `self`.
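
        Examples
        --------
        For example, to extract a 10-second window starting 5 seconds into
        the transform (durations are arbitrary):

        >>> win = wt.slice(wt.starttime + 5.0, duration=10.0)
        >>> win.n_samples == int(10.0 * wt.sampling_rate)
        True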
"""
if duration is None and num_samples is None:
print("One of `duration` or `num_samples` must be specified.")
return
if duration is None:
duration = num_samples / self.sampling_rate
if num_samples is None:
num_samples = int(duration * self.sampling_rate)
# first, slice using obspy.Stream's method
slice_ = self.transform.slice(
starttime=starttime, endtime=starttime + duration + self.delta
)
# then, format the output
if stations is None:
stations = self.stations
if components is None:
components = self.components
component_aliases = {ph: ph for ph in components}
transform_arr = utils.get_np_array(
slice_,
stations,
components=components,
component_aliases=component_aliases,
n_samples=num_samples,
)
# return a new class instance
new_instance = WaveformTransform(
transform_arr, stations, components, starttime, self.sr
)
return new_instance
class Beamformer(object):
"""Class for backprojecting waveform features with beamforming."""
def __init__(
self,
data=None,
network=None,
phases=None,
travel_times=None,
moveouts_relative_to_first=True,
):
"""Initialize the essential attributes.
Once initialized, the `Beamformer` instance can be re-used for
different settings. For example, when processing multiple days in a row,
`Beamformer.set_data` and `Beamformer.set_network` can be called at the
beginning of each day to adapt to new data and potentially different
network configurations.
Parameters
-----------
data : `dataset.Data`, optional
`dataset.Data` class instance representing the seismic data
that `Beamformer` will backproject. Default is None, in which
case `self.set_data` must be called later.
network : `dataset.Network`, optional
`dataset.Network` class instance with the seismic network metadata.
Default is None, in which case `self.set_network` must be called later.
phases : list, optional
List of seismic phases used in the computation of the network
response. `phases` determines in which order `self.moveouts` is built.
Default is None, in which case `self.set_phases` must be called later.
travel_times : `TravelTimes`, optional
`TravelTimes` class instance with the travel time table that will
            be used for backprojecting the waveform features. Default is None,
in which case `self.set_travel_times` must be called later.
moveouts_relative_to_first : boolean, optional
If True, the moveouts used for backprojection are set relative to the
first seismic arrival for each source. Default is True.
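
        Examples
        --------
        A typical workflow, sketched with hypothetical `data`, `network`,
        `tt` (a `TravelTimes` instance), `weights_phases` and
        `waveform_features` objects:

        >>> bf = Beamformer(
        ...     data=data, network=network, phases=["P", "S"], travel_times=tt
        ... )
        >>> bf.set_weights(weights_phases=weights_phases)
        >>> bf.set_weights_sources(n_max_stations=20)
        >>> bf.backproject(waveform_features, reduce="max")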
"""
self.data = data
self.network = network
self.phases = phases
self.travel_times = travel_times
self.moveouts_relative_to_first = moveouts_relative_to_first
@property
def moveouts(self):
if hasattr(self, "travel_times"):
return self.travel_times.get_travel_times_array(
units="samples",
stations=self.stations,
phases=self.phases,
relative_to_first=self.moveouts_relative_to_first,
)
else:
print("Call `set_travel_times` first.")
@property
def n_stations(self):
        if not hasattr(self, "network") or self.network is None:
print("You need to call set_network first.")
return
return self.network.n_stations
@property
def n_phases(self):
        if not hasattr(self, "phases") or self.phases is None:
print("You need to call set_phases first.")
return
return len(self.phases)
@property
def n_sources(self):
if hasattr(self, "travel_times"):
return self.travel_times.n_sources
else:
print("Call `set_travel_times` first.")
@property
def num_sources(self):
if hasattr(self, "travel_times"):
return self.travel_times.num_sources
else:
print("Call `set_travel_times` first.")
@property
def stations(self):
        if not hasattr(self, "network") or self.network is None:
print("You need to call set_network first.")
return
return self.network.stations
@property
def source_coordinates(self):
if hasattr(self, "travel_times"):
return self.travel_times.source_coordinates
else:
print("Call `set_travel_times` first.")
@staticmethod
def _likelihood(beam_volume):
likelihood = (beam_volume - beam_volume.min()) / (
beam_volume.max() - beam_volume.min()
)
        # the likelihood is not meant to lie outside [0, 1], except for
        # numerical imprecision
likelihood = np.clip(likelihood, a_min=0., a_max=1.)
return likelihood
def backproject(
self,
waveform_features,
reduce="max",
device="cpu",
out_of_bounds="strict",
num_threads=None,
):
"""Backproject the waveform features.
Parameters
--------------
waveform_features : (n_stations, n_components, n_samples) numpy.ndarray
Features of the waveform time series used for the
backprojection onto the grid of theoretical sources.
device : string, defaults to 'cpu'
Either 'cpu' or 'gpu', depending on the available hardware and
user's preferences.
reduce : string, defaults to 'max'
Either 'max' or 'none'. If 'max', returns the maximum beam at every
time. If 'none', returns the full space-time beam.
out_of_bounds : string, defaults to 'strict'
Either 'strict' (default) or 'flexible'.
- 'strict': A beam is computed if and only if the moveouts point to a
valid sample (that is, within the bounds of the data stream) for every
channel used in the beam.
            - 'flexible': A beam is computed as long as the moveouts point to a
valid sample for at least one channel. This option is particularly
useful for real time applications where an event might have been
recorded at the closest stations but not yet at the more distant ones.
num_threads : int or None, defaults to None
Number of threads for CPU parallelization. If None (default), uses one thread
per available (visible) CPU.
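
        Examples
        --------
        For instance, with features computed by `saturated_envelopes`:

        >>> bf.backproject(waveform_features, reduce="max", device="cpu")
        >>> bf.maxbeam.shape  # doctest: +SKIP
        (n_samples,)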
"""
if not hasattr(self, "weights_phases"):
print("You need to set self.weights_phases first.")
return
if not hasattr(self, "weights_sources"):
print("You need to set self.weights_sources first.")
return
if reduce == "max":
self.maxbeam, self.maxbeam_sources = bp.beampower.beamform(
waveform_features,
self.moveouts,
self.weights_phases,
self.weights_sources,
device=device,
out_of_bounds=out_of_bounds,
num_threads=num_threads,
reduce=reduce,
)
elif reduce == "none":
self.beam = bp.beampower.beamform(
waveform_features,
self.moveouts,
self.weights_phases,
self.weights_sources,
device=device,
out_of_bounds=out_of_bounds,
num_threads=num_threads,
reduce=reduce,
)
else:
print(f"'reduce' should be 'max' or 'none' but {reduce} was given.")
self.beam = None
    def find_detections(
        self, detection_threshold, minimum_interevent_time, n_max_stations=None
    ):
        """Analyze the maximum beam to find detections.
Parameters
-----------
detection_threshold: scalar or (n_samples,) numpy.ndarray, float
The number of running MADs taken above the running median
to define detections.
minimum_interevent_time: scalar, float
The shortest duration, in seconds, allowed between two
consecutive detections.
        n_max_stations: integer, optional
            If not None and smaller than the total number of stations in the
            network, only use the `n_max_stations` closest stations for each
            theoretical source. Defaults to None.

        Returns
        -------
        detections: list of `dataset.Event`
            Data and metadata of each detected earthquake.
        peak_indexes: (n_detections,) numpy.ndarray, int
            Indexes, in samples, of the detected peaks in `self.maxbeam`.
        source_indexes: (n_detections,) numpy.ndarray, int
            Indexes of the theoretical sources associated with the detections.
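
        Examples
        --------
        A sketch using a time-dependent threshold (the window length, one
        hour of data at a hypothetical 25 Hz, is arbitrary):

        >>> threshold = time_dependent_threshold(bf.maxbeam, window=int(3600 * 25))
        >>> detections, peak_idx, src_idx = bf.find_detections(
        ...     threshold, minimum_interevent_time=10.0
        ... )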
"""
from obspy import Stream
self.detection_threshold = detection_threshold
self.minimum_interevent_time = minimum_interevent_time
sr = self.data.sr
minimum_interevent_time = utils.sec_to_samp(minimum_interevent_time, sr=sr)
# select peaks
peak_indexes = _detect_peaks(self.maxbeam, mpd=minimum_interevent_time)
# only keep peaks above detection threshold
peak_indexes = peak_indexes[
self.maxbeam[peak_indexes] > detection_threshold[peak_indexes]
]
# keep the largest peak for grouped detection
for i in range(len(peak_indexes)):
idx = np.int32(
np.arange(
max(0, peak_indexes[i] - minimum_interevent_time / 2),
min(
peak_indexes[i] + minimum_interevent_time / 2, len(self.maxbeam)
),
)
)
idx_to_update = np.where(peak_indexes == peak_indexes[i])[0]
peak_indexes[idx_to_update] = np.argmax(self.maxbeam[idx]) + idx[0]
peak_indexes = np.unique(peak_indexes)
peak_indexes = np.asarray(peak_indexes)
source_indexes = self.maxbeam_sources[peak_indexes]
# extract waveforms
detections = []
data_path, data_filename = os.path.split(self.data.where)
for i in range(len(peak_indexes)):
src_idx = self.source_coordinates.index[source_indexes[i]]
event = Stream()
ot_i = self.data.date + peak_indexes[i] / sr
mv = self.moveouts[source_indexes[i], ...] / sr
if n_max_stations is not None:
# use moveouts as a proxy for distance
# keep only the n_max_stations closest stations
mv_max = np.sort(mv[:, 0])[n_max_stations - 1]
else:
mv_max = np.finfo(np.float32).max
stations_in = np.asarray(self.network.stations)[mv[:, 0] < mv_max]
latitude = self.source_coordinates.loc[src_idx, "latitude"]
longitude = self.source_coordinates.loc[src_idx, "longitude"]
depth = self.source_coordinates.loc[src_idx, "depth"]
event = dataset.Event(
ot_i,
mv,
stations_in,
self.phases,
data_filename,
data_path,
latitude=latitude,
longitude=longitude,
depth=depth,
sampling_rate=sr,
data_reader=self.data.data_reader,
)
aux_data = {}
aux_data["maxbeam"] = self.maxbeam[peak_indexes[i]]
aux_data["source_index"] = src_idx
event.set_aux_data(aux_data)
detections.append(event)
print(f"Extracted {len(detections):d} events.")
self.peak_indexes = peak_indexes
self.source_indexes = source_indexes
return detections, peak_indexes, source_indexes
    def remove_baseline(self, window, attribute="maxbeam"):
        """Remove the baseline from the network response."""
        # convert window from seconds to samples
        window = int(window * self.data.sr)
        attr_baseline = self._baseline(getattr(self, attribute), window)
        setattr(self, attribute, getattr(self, attribute) - attr_baseline)
    def return_pd_series(self, attribute="maxbeam"):
        """Return the network response as a `pandas.Series`.
"""
import pandas as pd
time_series = getattr(self, attribute)
indexes = pd.date_range(
start=str(self.starttime),
freq="{}S".format(1.0 / self.data.sr),
periods=len(time_series),
)
pd_attr = pd.Series(data=time_series, index=indexes)
return pd_attr
    def smooth_maxbeam(self, window):
        """Smooth the maximum beam with a Gaussian kernel."""
        from scipy.ndimage import gaussian_filter1d
        # convert window from seconds to samples
        window = int(window * self.data.sr)
        self.smoothed = gaussian_filter1d(self.maxbeam, window)
    def set_data(self, data):
        """Attach `data` to the class instance.
Parameters
----------
data: `dataset.Data`
Instance of `dataset.Data`.
"""
self.data = data
self.starttime = self.data.date
    def set_network(self, network):
        """Attach `network` to the class instance.
`network` determines which stations are included in the computation
of the beams. All arrays with a station axis are ordered according to
`network.stations`.
Parameters
----------
network : dataset.Network
The Network instance with the station network information.
`network` can force the network response to be computed only on
a subset of the data stored in `data`.
"""
self.network = network
def set_phases(self, phases):
"""Attach `phases` to the class instance.
Parameters
----------
phases : list of str
List of strings representing the phase names to read from
the travel time table. `phases` determines the order in which
`self.moveouts` is built.
"""
self.phases = phases
def set_travel_times(self, travel_times):
"""Attaches `travel_times` to the class instance.
Parameters
-----------
travel_times : TravelTimes
TravelTimes instance.
"""
self.travel_times = travel_times
    def set_source_coordinates(self, source_coords):
        """Attach `_source_coordinates` to the class instance.
Parameters
------------
source_coords: dictionary
Dictionary with 3 fields: 'latitude', 'longitude' and 'depth'
"""
self._source_coordinates = source_coords
    def set_weights(self, weights_phases=None, weights_sources=None):
        """Set the weights required by `beampower`.

        Parameters
        ----------
        weights_phases: (n_stations, n_channels, n_phases) np.ndarray, optional
Weight given to each station and channel for a given phase. For
example, horizontal components might be given a small or zero
weight for the P-wave stacking.
weights_sources: (n_sources, n_stations) np.ndarray, optional
Source-receiver-specific weights. For example, based on the
source-receiver distance.
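
        Examples
        --------
        For example, to stack P-wave energy on the vertical component only
        and S-wave energy on the horizontals (the N, E, Z component order is
        an assumption):

        >>> weights_phases = np.zeros((bf.n_stations, 3, 2), dtype=np.float32)
        >>> weights_phases[:, 2, 0] = 1.0  # P on the Z component
        >>> weights_phases[:, :2, 1] = 1.0  # S on the N and E components
        >>> bf.set_weights(weights_phases=weights_phases)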
"""
if weights_phases is not None:
self.weights_phases = weights_phases
if weights_sources is not None:
self.weights_sources = weights_sources
def set_weights_sources(self, n_max_stations, n_min_stations=0):
"""Set network-geometry-based weights of each source-receiver pair.
Parameters
------------
        n_max_stations: scalar, int
            Maximum number of stations used at each theoretical source. Only
            the `n_max_stations` closest stations are given a weight > 0.
        n_min_stations: scalar, int, optional
            Minimum number of operational stations required for a source to
            keep nonzero weights; sources with fewer stations have all their
            weights set to 0. Defaults to 0.
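
        Examples
        --------
        For instance, to use at most the 20 closest operational stations per
        source (the value 20 is arbitrary):

        >>> bf.set_weights_sources(20)
        >>> bf.weights_sources.shape  # doctest: +SKIP
        (n_sources, n_stations)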
"""
weights_sources = np.ones((self.n_sources, self.n_stations), dtype=np.float32)
self.data.set_availability(self.stations)
operational_stations = self.data.availability_per_sta.loc[self.stations].values
mv = self.moveouts[:, operational_stations, 0]
n_max_stations = min(mv.shape[1], n_max_stations)
if (n_max_stations < self.n_stations) and (n_max_stations > 0):
cutoff_mv = np.max(
np.partition(mv, n_max_stations - 1)[:, :n_max_stations],
axis=1,
keepdims=True,
)
weights_sources[self.moveouts[:, :, 0] > cutoff_mv] = 0.0
weights_sources[:, ~operational_stations] = 0.
if n_min_stations > 0:
n_stations_per_source = np.sum(weights_sources > 0.0, axis=-1)
weights_sources[n_stations_per_source < n_min_stations, :] = 0.0
self.weights_sources = weights_sources
def weights_station_density(
self, cutoff_dist=None, lower_percentile=0.0, upper_percentile=100.0
):
"""Compute station weights to balance station density.
Areas of high station density produce stronger network responses than
areas with sparse coverage, and thus might prevent detecting earthquakes
where stations are scarcer.
Parameters
-----------
        cutoff_dist: scalar float, optional
            Each station pair (i, j) is given a Gaussian weight with length
            scale `cutoff_dist`, in km. The weight of station i is:
            `w_i = 1/(sum_j(exp(-D_ij**2/cutoff_dist**2)))`.
            If None (default), `cutoff_dist` is set to the median
            interstation distance.
        lower_percentile: scalar float, optional
            If `lower_percentile > 0`, the weights are clipped above the
            `lower_percentile`-th percentile. Defaults to 0.
        upper_percentile: scalar float, optional
            If `upper_percentile < 100`, the weights are clipped below the
            `upper_percentile`-th percentile. Defaults to 100.
Returns
-------
weights_sta_density: (n_stations,) `numpy.ndarray`
The station density weights.
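
        Examples
        --------
        A sketch of folding these weights into the phase weights (the
        clipping percentile is arbitrary):

        >>> w_density = bf.weights_station_density(lower_percentile=5.0)
        >>> weights_phases = weights_phases * w_density[:, None, None]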
"""
if cutoff_dist is None:
cutoff_dist = np.median(
self.network.interstation_distances.values[
self.network.interstation_distances.values != 0.0
]
)
weights_sta_density = np.zeros(self.network.n_stations, dtype=np.float32)
for s, sta in enumerate(self.network.stations):
dist_sj = self.network.interstation_distances.loc[sta]
weights_sta_density[s] = 1.0 / np.sum(
np.exp(-(dist_sj**2) / cutoff_dist**2)
)
if lower_percentile > 0.0:
weights_sta_density = np.clip(
weights_sta_density,
np.percentile(weights_sta_density, lower_percentile),
weights_sta_density.max(),
)
if upper_percentile < 100.0:
weights_sta_density = np.clip(
weights_sta_density,
weights_sta_density.min(),
np.percentile(weights_sta_density, upper_percentile),
)
return weights_sta_density
def _baseline(self, X, window):
"""Compute a baseline.
The baseline is a curve connecting the local minima. Removing the
baseline is equivalent to some kind of low-pass filtering.
"""
n_windows = np.int32(np.ceil(X.size / window))
minima = np.zeros(n_windows, dtype=X.dtype)
minima_args = np.zeros(n_windows, dtype=np.int32)
for i in range(n_windows):
minima_args[i] = i * window + X[i * window : (i + 1) * window].argmin()
minima[i] = X[minima_args[i]]
# ----------------------------------------
# --------- build interpolation ----------
interpolator = interp1d(
minima_args, minima, kind="linear", fill_value="extrapolate"
)
bline = interpolator(np.arange(X.size))
return bline
# -------------------------------------------
# Plotting methods
# -------------------------------------------
    def plot_maxbeam(self, ax=None, detection=None, **kwargs):
        """Plot the maximum beam.

        Parameters
        ----------
        ax: plt.Axes, optional
            If not None, plot in this axis. Defaults to None.
        detection: dataset.Event, optional
            A dataset.Event instance of a given detection. Defaults to None.
        figsize: tuple, optional
            Width and height of the figure, in inches. Defaults to (15, 10).
Returns
--------
fig: plt.Figure
The Figure instance produced by this method.
"""
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
if ax is None:
# plot the maximum beam
fig = plt.figure(
"maximum_beam", figsize=kwargs.get("figsize", (15, 10))
)
ax = fig.add_subplot(111)
else:
fig = ax.get_figure()
ax.plot(
self.data.time, self.maxbeam, rasterized=kwargs.get("rasterized", True)
)
if hasattr(self, "detection_threshold"):
ax.plot(
self.data.time,
self.detection_threshold,
color="C3",
ls="--",
label="Detection Threshold",
)
ax.plot(
self.data.time[self.peak_indexes],
self.maxbeam[self.peak_indexes],
marker="o",
ls="",
color="C3",
)
ax.legend(loc="upper right")
ax.set_xlabel("Time of the day")
ax.set_ylabel("Maximum Beam")
ax.set_xlim(self.data.time.min(), self.data.time.max())
ax.xaxis.set_major_formatter(mdates.DateFormatter("%H:%M"))
if detection is not None:
ot = np.datetime64(detection.origin_time)
ax.annotate(
"detection",
(ot, detection.aux_data["maxbeam"]),
(
ot + np.timedelta64(15, "m"),
min(ax.get_ylim()[1], 2.0 * detection.aux_data["maxbeam"]),
),
arrowprops={"width": 2, "headwidth": 5, "color": "k"},
)
return fig
def plot_detection(
self,
detection,
figsize=(20, 20),
component_aliases={"N": ["N", "1"], "E": ["E", "2"], "Z": ["Z"]},
n_stations=None,
):
"""Plot a detection and the maximum beam.
Parameters
-----------
detection: dataset.Event
A dataset.Event instance of a given detection.
        figsize: tuple, optional
            Width and height of the figure, in inches. Defaults to (20, 20).
component_aliases: Dictionary, optional
Sometimes, components might be named differently than N, E, Z. This
dictionary tells the function which alternative component names can be
associated with each "canonical" component. For example,
`component_aliases['N'] = ['N', '1']` means that the function will also
check the '1' component in case the 'N' component doesn't exist.
        n_stations: scalar int, optional
            If not None, the number of stations to plot. The closest stations
            are plotted. Defaults to None.
Returns
-------
fig: plt.Figure
The Figure instance produced by this method.
"""
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
if n_stations is None:
stations = self.network.stations
else:
stations = (
detection.moveouts[detection.moveouts.columns[0]]
.sort_values()[:n_stations]
.index
)
sr = self.data.sr
fig = plt.figure(f"detection_{detection.origin_time}", figsize=figsize)
grid = fig.add_gridspec(
nrows=len(stations) + 2, ncols=len(self.network.components), hspace=0.35
)
start_times, end_times = [], []
wav_axes = []
ax_maxbeam = fig.add_subplot(grid[:2, :])
self.plot_maxbeam(ax=ax_maxbeam, detection=detection)
ax_maxbeam.set_ylim(
max(-0.5, ax_maxbeam.get_ylim()[0]), 2.0 * detection.aux_data["maxbeam"]
)
beam = 0.0
for s, sta in enumerate(stations):
for c, cp in enumerate(self.network.components):
ax = fig.add_subplot(grid[2 + s, c])
for cp_alias in component_aliases[cp]:
tr = detection.traces.select(station=sta, component=cp_alias)
if len(tr) > 0:
                        # successfully retrieved data
break
if len(tr) == 0:
continue
else:
tr = tr[0]
time = utils.time_range(
tr.stats.starttime, tr.stats.endtime + 1.0 / sr, 1.0 / sr, unit="ms"
)
start_times.append(time[0])
end_times.append(time[-1])
ax.plot(
time[: detection.n_samples],
tr.data[: detection.n_samples],
color="k",
)
# plot the theoretical pick
phase = detection.aux_data[f"phase_on_comp{cp_alias}"].upper()
offset_ph = detection.aux_data[f"offset_{phase}"]
ax.axvline(
time[0] + np.timedelta64(int(1000.0 * offset_ph), "ms"), color="C3"
)
ax.text(0.05, 0.05, f"{sta}.{cp_alias}", transform=ax.transAxes)
wav_axes.append(ax)
for ax in wav_axes:
ax.set_xlim(min(start_times), max(end_times))
ax.xaxis.set_major_formatter(
mdates.ConciseDateFormatter(ax.xaxis.get_major_locator())
)
plt.subplots_adjust(top=0.95, bottom=0.06, right=0.98, left=0.06)
return fig
def plot_likelihood(self, likelihood=None, time_index=None, **kwargs):
"""Plot likelihood (beam) slices at a given time."""
from cartopy.crs import PlateCarree
from matplotlib.colors import Normalize
from matplotlib.cm import ScalarMappable
from mpl_toolkits.axes_grid1 import make_axes_locatable
from . import plotting_utils
        if time_index is None:
            src_idx, time_index = np.unravel_index(self.beam.argmax(), self.beam.shape)
        else:
            # take the most likely source at the requested time
            src_idx = self.beam[:, time_index].argmax()
        if likelihood is None:
            likelihood = self._likelihood(self.beam[:, time_index])
# define slices
longitude = self.source_coordinates["longitude"].iloc[src_idx]
latitude = self.source_coordinates["latitude"].iloc[src_idx]
depth = self.source_coordinates["depth"].iloc[src_idx]
hor_slice = np.where(self.source_coordinates["depth"] == depth)[0]
lon_slice = np.where(self.source_coordinates["latitude"] == latitude)[0]
lat_slice = np.where(self.source_coordinates["longitude"] == longitude)[0]
# initialize map
data_coords = PlateCarree()
lat_min = np.min(self.source_coordinates["latitude"].iloc[hor_slice])
lat_max = np.max(self.source_coordinates["latitude"].iloc[hor_slice])
lon_min = np.min(self.source_coordinates["longitude"].iloc[hor_slice])
lon_max = np.max(self.source_coordinates["longitude"].iloc[hor_slice])
ax = plotting_utils.initialize_map(
[lon_min, lon_max],
[lat_min, lat_max],
**kwargs,
)
fig = ax.get_figure()
mappable = ax.tricontourf(
self.source_coordinates["longitude"].iloc[hor_slice],
self.source_coordinates["latitude"].iloc[hor_slice],
likelihood[hor_slice],
levels=np.linspace(0., 1.0, 10),
cmap="inferno",
alpha=0.50,
transform=data_coords,
zorder=-1,
)
ax.scatter(
self.network.longitude,
self.network.latitude,
marker="v",
color="k",
s=50,
transform=data_coords,
zorder=-1.5,
)
# add slices
divider = make_axes_locatable(ax)
ax_lon = divider.append_axes("bottom", size="50%", pad=0.2, axes_class=plt.Axes)
ax_lat = divider.append_axes("right", size="50%", pad=0.2, axes_class=plt.Axes)
projected_coords = ax.projection.transform_points(
data_coords,
self.source_coordinates["longitude"].iloc[lon_slice],
self.source_coordinates["latitude"].iloc[lon_slice],
)
ax_lon.tricontourf(
projected_coords[..., 0],
self.source_coordinates["depth"].iloc[lon_slice],
likelihood[lon_slice],
levels=np.linspace(0.0, 1.0, 10),
cmap="inferno",
alpha=0.50,
zorder=-1,
)
plt.setp(ax_lon.get_xticklabels(), visible=False)
ax_lon.invert_yaxis()
ax_lon.set_ylabel("Depth (km)")
projected_coords = ax.projection.transform_points(
data_coords,
self.source_coordinates["longitude"].iloc[lat_slice],
self.source_coordinates["latitude"].iloc[lat_slice],
)
ax_lat.tricontourf(
self.source_coordinates["depth"].iloc[lat_slice],
projected_coords[..., 1],
likelihood[lat_slice],
levels=np.linspace(0.0, 1.0, 10),
cmap="inferno",
alpha=0.50,
zorder=-1,
)
plt.setp(ax_lat.get_yticklabels(), visible=False)
ax_lat.set_xlabel("Depth (km)")
cax = divider.append_axes("top", size="3%", pad=0.1, axes_class=plt.Axes)
plt.colorbar(
mappable, cax=cax, label="Location Likelihood", orientation="horizontal"
)
cax.xaxis.set_label_position("top")
cax.xaxis.tick_top()
return fig
def _rectangular_domain(self, lon0, lat0, side_km=100.0):
"""Return a boolean array indicating which points in a given grid are inside
a rectangular domain centered at the given longitude and latitude.
Parameters
----------
lon0 : float
Longitude of the center of the domain, in degrees.
lat0 : float
Latitude of the center of the domain, in degrees.
side_km : float, optional
Length of the sides of the rectangular domain, in kilometers.
Default is 100km.
Returns
-------
selection : ndarray
1-D boolean array indicating which grid points are inside the domain.
Notes
-----
        This function uses a small-angle, spherical-Earth approximation to
        convert the longitude and latitude offsets of each grid point into
        distances. It assumes a spherical Earth of radius 6371.0 km.
"""
        R_earth_km = 6371.0  # km
        colat0 = 90.0 - lat0
        # radius of the circle of latitude at lat0
        Rlat = R_earth_km * np.sin(np.deg2rad(colat0))
        # km per degree of longitude (shrinks with latitude) and of latitude
        dist_per_lon = 2.0 * np.pi * (1.0 / 360.0) * Rlat
        dist_per_lat = 2.0 * np.pi * (1.0 / 360.0) * R_earth_km
longitudes = self.source_coordinates["longitude"].values
latitudes = self.source_coordinates["latitude"].values
selection = (np.abs(longitudes - lon0) * dist_per_lon < side_km / 2.0) & (
np.abs(latitudes - lat0) * dist_per_lat < side_km / 2.0
)
return selection
def _compute_location_uncertainty(
self, event_longitude, event_latitude, event_depth, likelihood, domain
):
"""
Compute the horizontal and vertical uncertainties of an event location.
Parameters
----------
event_longitude : float
The longitude, in decimal system, of the event located with the
present Beamformer instance.
event_latitude : float
The latitude, in decimal system, of the event located with the
present Beamformer instance.
event_depth : float
The depth, in km, of the event located with the present
Beamformer instance.
likelihood : ndarray
A 1D numpy array containing the likelihood of each source location in
the beamformer domain.
domain : ndarray
A 1D numpy array containing the indices of the beamformer domain.
Returns
-------
tuple
A tuple of two floats representing the horizontal and vertical
uncertainties of the event location in km, respectively.
Notes
-----
The horizontal uncertainty is computed as the weighted average of the
distance from each source in the domain to the event location, with the
weight being the likelihood of each source. The vertical uncertainty is
computed as the weighted average of the absolute depth difference between
each source in the domain and the event depth, with the weight being the
likelihood of each source. The distance is calculated using the Geodesic
library from the Cartopy package.
"""
from cartopy.geodesic import Geodesic
# initialize the Geodesic instance
G = Geodesic()
epi_distances = G.inverse(
np.array([event_longitude, event_latitude]),
np.hstack(
(
self.source_coordinates["longitude"].values[domain, None],
self.source_coordinates["latitude"].values[domain, None],
)
),
)
pointwise_distances = np.asarray(epi_distances)[:, 0].squeeze() / 1000.0
# horizontal uncertainty
hunc = np.sum(likelihood * pointwise_distances) / np.sum(likelihood)
# vertical uncertainty
depth_diff = np.abs(event_depth - self.source_coordinates["depth"].values[domain])
vunc = np.sum(likelihood * depth_diff) / np.sum(likelihood)
return hunc, vunc
def _detect_peaks(
x,
mph=None,
mpd=1,
threshold=0,
edge="rising",
kpsh=False,
valley=False,
show=False,
ax=None,
):
"""see `utils._detect_peaks`.
"""
return utils._detect_peaks(
x, mph=mph, mpd=mpd, threshold=threshold, edge=edge,
kpsh=kpsh, valley=valley, show=show, ax=ax
)
def _plot_peaks(x, mph, mpd, threshold, edge, valley, ax, ind):
"""Plot results of the detect_peaks function, see its help."""
try:
import matplotlib.pyplot as plt
except ImportError:
print("matplotlib is not available.")
else:
if ax is None:
_, ax = plt.subplots(1, 1, figsize=(8, 4))
ax.plot(x, "b", lw=1)
if ind.size:
label = "valley" if valley else "peak"
label = label + "s" if ind.size > 1 else label
ax.plot(
ind,
x[ind],
"+",
mfc=None,
mec="r",
mew=2,
ms=8,
label="{:d} {}".format(ind.size, label),
)
ax.legend(loc="best", framealpha=0.5, numpoints=1)
ax.set_xlim(-0.02 * x.size, x.size * 1.02 - 1)
ymin, ymax = x[np.isfinite(x)].min(), x[np.isfinite(x)].max()
yrange = ymax - ymin if ymax > ymin else 1
ax.set_ylim(ymin - 0.1 * yrange, ymax + 0.1 * yrange)
ax.set_xlabel("Data #", fontsize=14)
ax.set_ylabel("Amplitude", fontsize=14)
mode = "Valley detection" if valley else "Peak detection"
ax.set_title(
"{} (mph={}, mpd={:d}, threshold={}, edge='{}')".format(
mode, str(mph), mpd, str(threshold), edge
)
)
plt.show()
def baseline(X, w):
    """Compute the baseline of `X`, i.e. the curve connecting the local
    minima found over non-overlapping windows of `w` samples."""
    n_windows = np.int32(np.ceil(X.size / w))
minima = np.zeros(n_windows, dtype=X.dtype)
minima_args = np.zeros(n_windows, dtype=np.int32)
for i in range(n_windows):
minima_args[i] = i * w + X[i * w : (i + 1) * w].argmin()
minima[i] = X[minima_args[i]]
# ----------------------------------------
# --------- build interpolation ----------
interpolator = interp1d(
minima_args, minima, kind="linear", fill_value="extrapolate"
)
bline = interpolator(np.arange(X.size))
return bline
def time_dependent_threshold(
network_response, window, overlap=0.75, CNR_threshold=cfg.N_DEV_BP_THRESHOLD
):
"""Compute a time-dependent detection threshold.
Parameters
-----------
network_response: (n_samples,) numpy.ndarray, float
Composite network response on which we calculate
the detection threshold.
window: scalar, integer
Length of the sliding window, in samples, over
which we calculate the running statistics used
in the detection threshold.
    overlap: scalar, float, optional
        Ratio of overlap between two contiguous windows. Defaults to 0.75.
    CNR_threshold: scalar, float, optional
        Number of running MADs above the running median that
        defines the detection threshold. Defaults to `cfg.N_DEV_BP_THRESHOLD`.
Returns
--------
detection_threshold: (n_samples,) numpy.ndarray
Detection threshold on the network response.
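
    Examples
    --------
    A sketch for a maximum beam sampled at a hypothetical 25 Hz, with a
    one-hour sliding window:

    >>> threshold = time_dependent_threshold(maxbeam, window=int(3600 * 25))
    >>> peaks = maxbeam > threshold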
"""
try:
from scipy.stats import median_abs_deviation as scimad
except ImportError:
from scipy.stats import median_absolute_deviation as scimad
from scipy.interpolate import interp1d
# calculate n_windows given window
# and overlap
shift = int((1.0 - overlap) * window)
n_windows = int((len(network_response) - window) // shift) + 1
mad_ = np.zeros(n_windows + 2, dtype=np.float32)
med_ = np.zeros(n_windows + 2, dtype=np.float32)
time = np.zeros(n_windows + 2, dtype=np.float32)
for i in range(1, n_windows + 1):
i1 = i * shift
i2 = min(network_response.size, i1 + window)
maxbeam_window = network_response[i1:i2]
med_[i] = np.median(maxbeam_window)
mad_[i] = scimad(maxbeam_window)
time[i] = (i1 + i2) / 2.0
# add boundary cases manually
time[0] = 0.0
mad_[0] = mad_[1]
med_[0] = med_[1]
time[-1] = len(network_response)
mad_[-1] = mad_[-2]
med_[-1] = med_[-2]
threshold = med_ + CNR_threshold * mad_
interpolator = interp1d(
time,
threshold,
kind="slinear",
fill_value=(threshold[0], threshold[-1]),
bounds_error=False,
)
full_time = np.arange(0, len(network_response))
threshold = interpolator(full_time)
return threshold
def time_dependent_threshold_pd(network_response, window):
"""
    Compute a time-dependent detection threshold
    using pandas' rolling-window statistics.
Parameters
-----------
network_response: numpy array,
Composite network response on which we calculate
the detection threshold.
window: scalar, integer
Length of the sliding window, in samples, over
which we calculate the running statistics used
in the detection threshold.
Returns
--------
    detection_threshold: (n_samples,) numpy.ndarray
        Detection threshold used to select
        well-backprojected events.
"""
network_response_pd = pd.Series(network_response)
r = network_response_pd.rolling(window=window)
# get running median and running mad
run_med = r.median().shift(1)
run_mad = r.apply(scimad).shift(1)
# combine these into a detection threshold
detection_threshold = run_med + cfg.N_DEV_BP_THRESHOLD * run_mad
return detection_threshold.values
# ---------------------------------------------------------------
# Detection traces
# ---------------------------------------------------------------
def saturated_envelopes(traces, anomaly_threshold=1.0e-11, max_dynamic_range=1.0e5):
"""Compute the saturated envelopes.
Parameters
------------
traces: (n_stations, n_components, n_samples) numpy.ndarray
Input waveform time series.
    anomaly_threshold: scalar, float, optional
        Threshold below which the MAD is considered suspicious. It should be
        a very small number if you are working with seismograms in physical
        units. Defaults to 1e-11.
    max_dynamic_range: scalar, float, optional
        Upper cutoff on the standardized envelopes. This mitigates the
        contamination of the network response by transient, undesired
        high-energy signals such as spikes. Defaults to 1e5.
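
    Returns
    -------
    waveform_features: (n_stations, n_components, n_samples) numpy.ndarray
        The envelopes, standardized channel-wise by their median and MAD
        and clipped at `max_dynamic_range`.
    data_availability: (n_stations,) numpy.ndarray, int
        Number of usable channels per station.

    Examples
    --------
    A sketch, assuming `traces` is a (n_stations, n_components, n_samples)
    numpy.ndarray of seismograms:

    >>> waveform_features, availability = saturated_envelopes(traces)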
"""
n_stations, n_components, n_samples = traces.shape
tstart = give_time()
waveform_features = envelope_parallel(
traces
) # take the upper envelope of the traces
tend = give_time()
    print(f"Computed the envelopes in {tend - tstart:.2f} sec.")
data_availability = np.zeros(n_stations, dtype=np.int32)
for s in range(n_stations):
for c in range(n_components):
missing_samples = waveform_features[s, c, :] == 0.0
if np.sum(missing_samples) > waveform_features.shape[-1] / 2:
# too many samples are missing, don't use this trace
# do not increment data_availability
waveform_features[s, c, :] = 0.0
continue
median = np.median(waveform_features[s, c, ~missing_samples])
mad = scimad(waveform_features[s, c, ~missing_samples])
if mad < anomaly_threshold:
waveform_features[s, c, :] = 0.0
continue
waveform_features[s, c, :] = (waveform_features[s, c, :] - median) / mad
waveform_features[s, c, missing_samples] = 0.0
# saturate traces
            waveform_features[s, c, :] = np.clip(
                waveform_features[s, c, :],
                None,
                max_dynamic_range,
            )
data_availability[s] += 1
return waveform_features, data_availability
def envelope_parallel(traces):
"""Compute the envelope of traces.
The envelope is defined as the modulus of the complex
analytical signal (a signal whose Fourier transform only has
energy in positive frequencies).
Parameters
-------------
traces: (n_stations, n_channels, n_samples) numpy.ndarray, float
The input time series.
Returns
-------------
envelopes: (n_stations, n_channels, n_samples) numpy.ndarray, float
The moduli of the analytical signal of the input traces.
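
    Examples
    --------
    A quick shape check (process-based parallelism may require a
    `__main__` guard on platforms that spawn processes):

    >>> import numpy as np
    >>> traces = np.random.randn(2, 3, 1000).astype("float32")
    >>> envelope_parallel(traces).shape  # doctest: +SKIP
    (2, 3, 1000)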
"""
import concurrent.futures
traces_reshaped = traces.reshape(-1, traces.shape[-1])
with concurrent.futures.ProcessPoolExecutor() as executor:
envelopes = np.float32(list(executor.map(envelope, traces_reshaped)))
return envelopes.reshape(traces.shape)
def envelope(trace):
"""Compute the envelope of trace.
The envelope is defined as the modulus of the complex
analytical signal (a signal whose Fourier transform only has
energy in positive frequencies).
Parameters
-------------
trace: (n_samples) numpy.ndarray, float
The input time series.
Returns
-------------
envelope: (n_samples) numpy.ndarray, float
The modulus of the analytical signal of the input traces.
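
    Examples
    --------
    A quick check on a pure tone, whose envelope is approximately constant:

    >>> import numpy as np
    >>> x = np.cos(2.0 * np.pi * np.arange(1000) / 20.0)
    >>> envelope(x).shape
    (1000,)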
"""
from scipy.signal import hilbert
return np.float32(np.abs(hilbert(trace)))