import os
import sys
import glob
import numpy as np
import h5py as h5
import pandas as pd
import pathlib
import warnings
import obspy as obs
from scipy.signal import find_peaks, peak_widths
from obspy.core import UTCDateTime as udt
from time import time as give_time
from .config import cfg
from . import dataset
# -------------------------------------------------
# Filtering routines
# -------------------------------------------------
def bandpass_filter(
X,
filter_order=4,
freqmin=cfg.MIN_FREQ_HZ,
freqmax=cfg.MAX_FREQ_HZ,
f_Nyq=cfg.SAMPLING_RATE_HZ / 2.0,
taper_alpha=0.01,
zerophase=True,
):
"""
Parameters
-----------
X: (n x m) numpy array,
Numpy array of n observations of m samples each.
Use X.reshape(1, -1) if you want to process a
single observation.
filter_order: integer scalar, default to 4,
Order/number of corners of the bandpass filter.
freqmin: scalar float, default to cfg.MIN_FREQ_HZ,
Low frequency cutoff.
freqmax: scalar float, default to cfg.MAX_FREQ_HZ,
High frequency cutoff.
f_Nyq: scalar float, default to cfg.SAMPLING_RATE_HZ/2,
Nyquist frequency of the data. By definition,
the Nyquist frequency is half the sampling rate.
taper_alpha: scalar float, default to 0.01,
Defines the sharpness of the edges of the taper
function. We use the tukey window function. The
default value of 0.01 produces sharp windows.
zerophase : bool, optional
If True, the filter is applied in the forward and
reverse direction to cancel out phase shifting.
Returns
--------
filtered_X: (n x m) numpy array,
The input array X after filtering.
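Examples
--------
Minimal illustrative sketch (arbitrary corner frequencies, assuming a
50 Hz sampling rate, i.e. a 25 Hz Nyquist frequency):
>>> import numpy as np
>>> X = np.random.randn(3, 1000)  # 3 observations of 1000 samples each
>>> filtered_X = bandpass_filter(X, freqmin=2.0, freqmax=12.0, f_Nyq=25.0)  # same shape as X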
"""
from scipy.signal import iirfilter, zpk2sos, sosfilt
# `tukey` lives in `scipy.signal.windows` in recent SciPy versions
from scipy.signal.windows import tukey
from scipy.signal import detrend
# detrend the data
X = detrend(X, type="constant", axis=-1)
X = detrend(X, type="linear", axis=-1)
# design the taper function
taper = np.repeat(tukey(X.shape[1], alpha=taper_alpha), X.shape[0])
taper = taper.reshape(X.shape[0], X.shape[1], order="F")
# design the filter
z, p, k = iirfilter(
filter_order,
[freqmin / f_Nyq, freqmax / f_Nyq],
btype="bandpass",
ftype="butter",
output="zpk",
)
sos = zpk2sos(z, p, k)
# apply the filter
filtered_X = sosfilt(sos, X * taper)
if zerophase:
filtered_X = sosfilt(sos, filtered_X[:, ::-1])[:, ::-1]
return filtered_X
def lowpass_chebyshev_I(
X, freqmax, sampling_rate, order=8, max_ripple=5, zerophase=False
):
"""Apply low-pass type I Chebyshev filter.
Parameters
----------
X : array-like
1D array-like with data to filter.
freqmax : float
High cut-off frequency, in Hz.
sampling_rate : float
Sampling rate, in Hz, of `X`.
order : int, optional
Order of the filter. Defaults to 8.
max_ripple : int, optional
Defaults to 5.
zerophase : bool, optional
If True, the filter is applied in the forward and
reverse direction to cancel out phase shifting.
Returns
-------
X : array-like
1D array-like with filtered data.
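Examples
--------
Illustrative sketch with arbitrary values (10 Hz cutoff on a 100 Hz trace):
>>> import numpy as np
>>> x = np.random.randn(5000)
>>> x_low = lowpass_chebyshev_I(x, freqmax=10.0, sampling_rate=100.0, zerophase=True)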
"""
from scipy.signal import cheby1, sosfilt
nyquist = sampling_rate / 2.0
sos = cheby1(
order,
max_ripple,
freqmax / nyquist,
analog=False,
btype="lowpass",
output="sos",
)
X = sosfilt(sos, X)
if zerophase:
X = sosfilt(sos, X[::-1])[::-1]
return X
def lowpass_chebyshev_II(
X, freqmax, sampling_rate, order=3, min_attenuation_dB=40.0, zerophase=False
):
"""Apply low-pass type II Chebyshev filter.
Parameters
----------
X : array-like
1D array-like with data to filter.
freqmax : float
High cut-off frequency, in Hz.
sampling_rate : float
Sampling rate, in Hz, of `X`.
order : int, optional
Order of the filter. Defaults to 3.
min_attenuation_dB : float, optional
The minimum attenuation required in the stop band, in decibels.
Defaults to 40.
zerophase : bool, optional
If True, the filter is applied in the forward and
reverse direction to cancel out phase shifting.
Returns
-------
X : array-like
1D array-like with filtered data.
"""
from scipy.signal import cheby2, sosfilt
nyquist = sampling_rate / 2.0
sos = cheby2(
order,
min_attenuation_dB,
# freqmax/nyquist,
freqmax,
analog=False,
fs=sampling_rate,
btype="lowpass",
output="sos",
)
X = sosfilt(sos, X)
if zerophase:
X = sosfilt(sos, X[::-1])[::-1]
return X
def preprocess_stream(
stream,
freqmin=None,
freqmax=None,
target_SR=None,
remove_response=False,
remove_sensitivity=False,
plot_resp=False,
target_duration=None,
target_starttime=None,
target_endtime=None,
minimum_length=0.75,
minimum_chunk_duration=600.0,
verbose=True,
SR_decimals=1,
decimation_method="simple",
allow_oversampling=False,
unit="VEL",
n_threads=1,
**kwargs,
):
"""
Preprocesses a stream of seismic data.
Parameters
----------
stream : obspy.Stream
A stream of seismic data.
freqmin : float, optional
The minimum frequency to bandpass filter the data.
freqmax : float, optional
The maximum frequency to bandpass filter the data.
target_SR : float, optional
The target sampling rate of the data.
remove_response : bool, optional
Whether to remove instrument response from the data.
remove_sensitivity : bool, optional
Whether to remove instrument sensitivity from the data.
plot_resp : bool, optional
Whether to plot the instrument response of the data.
target_duration : float, optional
The target duration of the data.
target_starttime : obspy.UTCDateTime, optional
The start time of the target data.
target_endtime : obspy.UTCDateTime, optional
The end time of the target data.
minimum_length : float, optional
The minimum length of the data as a fraction of the target duration.
minimum_chunk_duration : float, optional
The minimum duration of each data chunk.
verbose : bool, optional
Whether to print verbose output during processing.
SR_decimals : int, optional
The number of decimals to round sampling rate values to.
decimation_method : str, optional
The method used for decimation. Either 'simple' or 'fourier'.
allow_oversampling : bool, optional
If True, raw traces that have a sampling rate lower than the target
sampling rate will be oversampled instead of being discarded. Defaults
to False.
unit : str, optional
The unit of the data.
n_threads : int, optional
The number of threads over which preprocessing is parallelized.
**kwargs : dict
Other keyword arguments to pass to the function.
Returns
-------
obspy.Stream
A preprocessed stream of seismic data.
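Examples
--------
Minimal sketch on ObsPy's built-in example stream (arbitrary parameters;
the short example traces require relaxing the minimum-duration defaults):
>>> import obspy as obs
>>> st = obs.read()  # 3 traces, 30 s at 100 Hz
>>> st_p = preprocess_stream(
...     st, freqmin=2.0, freqmax=12.0, target_SR=50.0,
...     target_duration=20.0, minimum_length=0.5,
...     minimum_chunk_duration=10.0, verbose=False,
... )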
"""
from functools import partial
data_preprocessor = partial(
_preprocess_stream,
freqmin=freqmin,
freqmax=freqmax,
target_SR=target_SR,
remove_response=remove_response,
remove_sensitivity=remove_sensitivity,
plot_resp=plot_resp,
target_duration=target_duration,
target_starttime=target_starttime,
target_endtime=target_endtime,
minimum_length=minimum_length,
minimum_chunk_duration=minimum_chunk_duration,
verbose=verbose,
SR_decimals=SR_decimals,
decimation_method=decimation_method,
allow_oversampling=allow_oversampling,
unit=unit,
**kwargs,
)
if n_threads != 1:
from concurrent.futures import ProcessPoolExecutor
stream, _ = _premerge(stream, verbose=verbose)
with ProcessPoolExecutor(max_workers=n_threads) as executor:
# we need to group traces from same channels, therefore,
# we use merge to fill gaps with masked arrays
stream.merge()
preprocessed_stream = list(executor.map(data_preprocessor, stream))
try:
preprocessed_stream = [
tr[0] for tr in preprocessed_stream if len(tr) > 0
]
except Exception as e:
print(e)
print(preprocessed_stream)
for i, tr in enumerate(preprocessed_stream):
print(i, tr, len(tr))
sys.exit(0)
return obs.Stream(preprocessed_stream)
else:
return data_preprocessor(stream)
def _premerge(stream, verbose=False):
"""Clean-up stream before calling merge.
"""
# first, make a list of all stations in stream
stations = []
for tr in stream:
stations.append(tr.stats.station)
stations = list(set(stations))
# second, make a list of all channel types for each station
for station in stations:
st = stream.select(station=station)
channels = []
for tr in st:
channels.append(tr.stats.channel[:-1])
channels = list(set(channels))
# third, for each channel type, make a list of all sampling
# rates and detect anomalies if there are more than one single
# sampling rate
sampling_rates = []
for cha in channels:
st_cha = st.select(channel=f"{cha}*")
for tr in st_cha:
sampling_rates.append(tr.stats.sampling_rate)
unique_sampling_rates, sampling_rates_counts = np.unique(
sampling_rates, return_counts=True
)
# if more than one sampling rate, remove the traces with the least
# represented sampling rate
if len(unique_sampling_rates) > 1:
if sampling_rates_counts[unique_sampling_rates.argmax()] >= 3:
ref_sampling_rate = unique_sampling_rates.max()
else:
ref_sampling_rate = unique_sampling_rates[sampling_rates_counts.argmax()]
for tr in st:
if tr.stats.sampling_rate != ref_sampling_rate:
if verbose:
print(f"Removing {tr.id} because not desired sampling rate "
f"({tr.stats.sampling_rate} vs {ref_sampling_rate})" )
stream.remove(tr)
return stream, stations
def _preprocess_stream(
stream,
freqmin=None,
freqmax=None,
target_SR=None,
remove_response=False,
remove_sensitivity=False,
plot_resp=False,
target_duration=None,
target_starttime=None,
target_endtime=None,
minimum_length=0.75,
minimum_chunk_duration=600.0,
verbose=True,
SR_decimals=1,
decimation_method="simple",
unit="VEL",
pre_filt=None,
allow_oversampling=False,
**kwargs,
):
"""
See `preprocess_stream`.
"""
if isinstance(stream, obs.Trace):
# user gave a single trace instead of a stream
stream = obs.Stream(stream)
preprocessed_stream = obs.Stream()
if len(stream) == 0:
if verbose:
print("Input data is empty!")
return preprocessed_stream
if (target_duration is None) and (
(target_starttime is not None) and target_endtime is not None
):
target_duration = target_endtime - target_starttime
# find errors in sampling rate metadata
# first, round sampling rates that may be misrepresented in floating point numbers
# iterate over a copy because traces may be removed from `stream`
for tr in list(stream):
tr.stats.sampling_rate = np.round(tr.stats.sampling_rate, decimals=SR_decimals)
# and remove the short chunks now to gain time
# (only works if the traces do not come with masked arrays)
t1 = udt(tr.stats.starttime.timestamp)
t2 = udt(tr.stats.endtime.timestamp)
if t2 - t1 < minimum_chunk_duration:
# don't include this chunk
stream.remove(tr)
if len(stream) == 0:
if verbose:
print("Removed all traces because they were too short.")
return preprocessed_stream
stream, stations = _premerge(stream, verbose=verbose)
# start by cleaning the gaps if there are any
# start with a simple merge to unite data from same channels into unique
# trace but without losing information on gaps
stream.merge()
for tr in stream:
trace_id = tr.id
if np.isnan(tr.data.max()):
if verbose:
print(f"Problem with {tr.id} (detected NaNs)!")
continue
T1 = udt(tr.stats.starttime.timestamp)
T2 = udt(tr.stats.endtime.timestamp)
trace_duration = T2 - T1
if trace_duration < minimum_length * target_duration:
# don't include this trace
if verbose:
print(f"Too much gap duration on {trace_id}.")
print(f"Duration is: {trace_duration:.2f}s (min is {minimum_length * target_duration:.2f}s)")
continue
# split will lose information about start and end times
# if the start or the end is masked
tr = tr.split()
# iterate over a copy because chunks may be removed from `tr`
for chunk in list(tr):
t1 = udt(chunk.stats.starttime.timestamp)
t2 = udt(chunk.stats.endtime.timestamp)
if t2 - t1 < minimum_chunk_duration:
# don't include this chunk
tr.remove(chunk)
if len(tr) == 0:
# all chunks were too short
if verbose:
print(f"All chunks within {trace_id} were too short.")
continue
# measure gap duration
gap_duration = target_duration - trace_duration
for gap in tr.get_gaps():
gap_duration += gap[6]
if (target_duration is not None) and (
gap_duration > (1. - minimum_length) * target_duration
):
if verbose:
print(f"Too much gap duration on {trace_id}.")
print(f"Gap duration is: {trace_duration:.2f}s (max is {(1. - minimum_length) * target_duration:.2f}s)")
continue
tr.detrend("constant")
tr.detrend("linear")
tr.taper(0.05, type="cosine")
# it's now safe to fill the gaps with zeros
tr.merge(fill_value=0.0)
tr.trim(starttime=T1, endtime=T2, pad=True, fill_value=0.0)
preprocessed_stream += tr
# if the trace came as separated segments without masked
# elements, it is necessary to merge the stream
preprocessed_stream = preprocessed_stream.merge(fill_value=0.0)
# delete the original data to save memory
del stream
# resample if necessary:
# iterate over a copy because traces may be removed from `preprocessed_stream`
for tr in list(preprocessed_stream):
if target_SR is None:
continue
sr_ratio = tr.stats.sampling_rate / target_SR
if sr_ratio > 1:
tr.data = lowpass_chebyshev_II(
tr.data,
0.49 * target_SR,
tr.stats.sampling_rate,
order=kwargs.get("antialiasing_filter_order", 4),
min_attenuation_dB=40.0,
zerophase=True,
)
if (
np.round(sr_ratio, decimals=0) == sr_ratio
) and decimation_method == "simple":
# tr's sampling rate is an integer
# multiple of target_SR
# do not re-filter the data
tr.decimate(int(sr_ratio), no_filter=True)
elif decimation_method == "fourier":
tr.resample(target_SR, no_filter=True)
else:
tr.resample(target_SR, no_filter=True)
elif sr_ratio < 1 and allow_oversampling:
tr.resample(target_SR, no_filter=True)
elif sr_ratio < 1:
if verbose:
print(f"Sampling rate is too high on {tr.id}.")
print(tr)
preprocessed_stream.remove(tr)
continue
else:
pass
# remove response if requested
if remove_response:
for tr in preprocessed_stream:
if not hasattr(tr.stats, "response"):
print(f"Could not find the instrument response for {tr.id}.")
continue
if pre_filt is None:
T_max = tr.stats.npts * tr.stats.delta
T_min = tr.stats.delta
f_min = 1.0 / T_max
f_max = 1.0 / (2.0 * T_min)
pre_filt = [f_min, 3.0 * f_min, 0.90 * f_max, 0.97 * f_max]
tr.remove_response(pre_filt=pre_filt, output=unit, plot=plot_resp)
elif remove_sensitivity:
for tr in preprocessed_stream:
if not hasattr(tr.stats, "response"):
print(f"Could not find the instrument response for {tr.id}.")
continue
tr.remove_sensitivity()
# filter
preprocessed_stream.detrend("constant")
preprocessed_stream.detrend("linear")
preprocessed_stream.taper(0.02, type="cosine")
if freqmin is None and freqmax is None:
# no filtering
pass
elif freqmin is None:
# lowpass filtering
preprocessed_stream.filter("lowpass", freq=freqmax, zerophase=True)
elif freqmax is None:
# highpass filtering
preprocessed_stream.filter("highpass", freq=freqmin, zerophase=True)
else:
# bandpass filtering
preprocessed_stream.filter(
"bandpass", freqmin=freqmin, freqmax=freqmax, zerophase=True
)
# adjust duration
if target_starttime is not None:
preprocessed_stream.trim(starttime=target_starttime, pad=True, fill_value=0.0)
if target_endtime is not None:
preprocessed_stream.trim(endtime=target_endtime, pad=True, fill_value=0.0)
if target_duration is not None:
for i in range(len(preprocessed_stream)):
n_samples = sec_to_samp(
target_duration, sr=preprocessed_stream[i].stats.sampling_rate
)
preprocessed_stream[i].data = preprocessed_stream[i].data[:n_samples]
return preprocessed_stream
# -------------------------------------------------
# Loading travel-time data
# -------------------------------------------------
def get_moveout_array(tts, stations, phases):
"""Format the travel times into a numpy.ndarray.
Parameters
-----------
tts: pandas.DataFrame
Output of `load_travel_times`: travel times indexed by station (rows)
and phase (columns).
stations: list of strings
List of station names. Determines the order in which travel times are
written in the output array.
phases: list of strings
List of seismic phases. Determines the order in which travel times are
written in the output array.
Returns
---------
moveout_arr: (n_sources, n_stations, n_phases) numpy.ndarray
Numpy array of travel times, in the order specified by `stations` and
`phases`. At this stage, moveout_arr should still be in units of
seconds.
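Examples
--------
Illustrative sketch (assumes `tts` was returned by `load_travel_times`
for a hypothetical travel-time file):
>>> tts = load_travel_times("travel_times.h5", ["P", "S"])
>>> stations = list(tts.index)
>>> moveout_arr = get_moveout_array(tts, stations, ["P", "S"])  # (n_sources, n_stations, 2)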
"""
n_stations = len(stations)
n_phases = len(phases)
moveout_arr = np.array([tts[ph][sta] for sta in stations for ph in phases]).T
return moveout_arr.reshape(-1, n_stations, n_phases)
def load_travel_times(
path, phases, source_indexes=None, return_coords=False, stations=None
):
"""Load the travel times from `path`.
Parameters
------------
path: string
Path to the file with the travel times.
phases: list of strings
List of the seismic phases to load.
source_indexes: (n_sources,) int numpy.ndarray, default to None
If not None, this is used to select a subset of sources from the grid.
return_coords: boolean, default to False
If True, also return the source coordinates.
stations: list of strings, default to None
If not None, only read the travel times for stations in `stations`.
Returns
---------
tts: pandas.DataFrame
DataFrame indexed by station names with one column per phase;
`tts.loc[station, phase]` holds the flattened travel-time grid.
source_coords: dictionary, optional
Returned only if `return_coords` is True. This is a dictionary with
three (n_sources,) numpy.ndarray: `source_coords['latitude']`,
`source_coords['longitude']`, `source_coords['depth']`.
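Examples
--------
Minimal sketch assuming a hypothetical travel-time file `travel_times.h5`
with the expected `tt_P`/`tt_S` and `source_coordinates` groups:
>>> tts, source_coords = load_travel_times(
...     "travel_times.h5", ["P", "S"], return_coords=True
... )
>>> stations = list(tts.index)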
"""
tts = pd.DataFrame(columns=phases, index=stations)
with h5.File(path, mode="r") as f:
grid_shape = f["source_coordinates"]["depth"].shape
for ph in phases:
for sta in f[f"tt_{ph}"].keys():
if stations is not None and sta not in stations:
continue
# flatten the lon/lat/dep grid as we work with
# flat source indexes
if source_indexes is not None:
# select a subset of the source grid
source_indexes_unravelled = np.unravel_index(
source_indexes, grid_shape
)
selection = np.zeros(grid_shape, dtype=bool)
selection[source_indexes_unravelled] = True
tts.loc[sta, ph] = f[f"tt_{ph}"][sta][selection].flatten()
else:
tts.loc[sta, ph] = f[f"tt_{ph}"][sta][()].flatten()
if return_coords:
if source_indexes is not None:
source_indexes_unravelled = np.unravel_index(source_indexes, grid_shape)
selection = np.zeros(grid_shape, dtype=bool)
selection[source_indexes_unravelled] = True
source_coords = pd.DataFrame(
columns=["longitude", "latitude", "depth"], index=source_indexes
)
for coord in f["source_coordinates"].keys():
source_coords.loc[source_indexes, coord] = f["source_coordinates"][
coord
][selection].flatten()
else:
source_coords = pd.DataFrame(
columns=["longitude", "latitude", "depth"],
index=np.arange(np.prod(grid_shape)),
)
for coord in f["source_coordinates"].keys():
source_coords[coord] = f["source_coordinates"][coord][()].flatten()
if return_coords:
return tts, source_coords
else:
return tts
# -------------------------------------------------
# Stacking routines
# -------------------------------------------------
def SVDWF(
matrix,
expl_var=0.4,
max_singular_values=5,
freqmin=cfg.MIN_FREQ_HZ,
freqmax=cfg.MAX_FREQ_HZ,
sampling_rate=cfg.SAMPLING_RATE_HZ,
wiener_filter_colsize=None,
):
"""
Implementation of the Singular Value Decomposition Wiener Filter (SVDWF)
described in Moreau et al 2017.
Note: The SVD-WF-filtered matrix is filtered between `freqmin` and
`freqmax` before being returned.
Parameters
----------
matrix : (n x m) numpy.ndarray
n is the number of events, m is the number of time samples
per event.
expl_var : float, optional
Fraction of the total variance explained by the retained
singular vectors. Defaults to 0.4.
max_singular_values : int, optional
Number of singular values to retain in the
SVD decomposition of matrix. Defaults to 5.
freqmin : float, optional
Minimum target frequency, in Hz. Defaults to `BPMF.cfg.MIN_FREQ_HZ`.
freqmax : float, optional
Maximum target frequency, in Hz. Defaults to `BPMF.cfg.MAX_FREQ_HZ`.
sampling_rate : float, optional
Sampling rate of `matrix`. Defaults to `BPMF.cfg.SAMPLING_RATE_HZ`.
wiener_filter_colsize : int, optional
Size of the wiener filter in the outermost dimension, that is, the
event axis. Defaults to None, in which case the filter has length `n`.
Returns
--------
filtered_data: (n x m) numpy array
The matrix filtered through the SVD procedure.
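Examples
--------
Illustrative sketch on synthetic data (arbitrary shapes and frequencies,
assuming a 50 Hz sampling rate):
>>> import numpy as np
>>> matrix = np.random.randn(20, 4000)  # 20 aligned events, 4000 samples each
>>> filtered = SVDWF(matrix, expl_var=0.4, max_singular_values=5,
...                  freqmin=2.0, freqmax=12.0, sampling_rate=50.0)  # same shape as matrix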
"""
from scipy.linalg import svd
from scipy.signal import wiener
import matplotlib.pyplot as plt
try:
U, S, Vt = svd(matrix, full_matrices=False)
except Exception as e:
print(e)
print("Problem while computing the svd...!")
return np.random.normal(loc=0.0, scale=1.0, size=matrix.shape)
if wiener_filter_colsize is None:
wiener_filter_colsize = U.shape[0]
# wiener_filter = [wiener_filter_colsize, int(cfg.SAMPLING_RATE_HZ/max_freq)]
wiener_filter = [wiener_filter_colsize, 1]
filtered_data = np.zeros((U.shape[0], Vt.shape[1]), dtype=np.float32)
# select the number of singular values
# in order to explain 100xn_singular_values%
# of the variance of the matrix
var = np.cumsum(S**2)
if var[-1] == 0.0:
# only zeros in matrix
return filtered_data
var /= var[-1]
n_singular_values = np.min(np.where(var >= expl_var)[0]) + 1
n_singular_values = min(max_singular_values, n_singular_values)
for n in range(min(U.shape[0], n_singular_values)):
s_n = np.zeros(S.size, dtype=np.float32)
s_n[n] = S[n]
projection_n = np.dot(U, np.dot(np.diag(s_n), Vt))
if wiener_filter[0] == 1 and wiener_filter[1] == 1:
# no wiener filtering
filtered_projection = projection_n
else:
# note: applying a Wiener filter here is questionable because each
# projection in this loop lies in a one-dimensional vector space: all
# waveforms are colinear and only differ by an amplitude factor.
filtered_projection = wiener(
projection_n,
# mysize=[max(2, int(U.shape[0]/10)), int(cfg.SAMPLING_RATE_HZ/freqmax)]
mysize=wiener_filter,
)
# filtered_projection = projection_n
if np.isnan(filtered_projection.max()):
continue
filtered_data += filtered_projection
if wiener_filter[0] == 1 and wiener_filter[1] == 1:
# no wiener filtering
pass
else:
filtered_data = wiener(filtered_data, mysize=wiener_filter)
# remove nans or infs
filtered_data[np.isnan(filtered_data)] = 0.0
filtered_data[np.isinf(filtered_data)] = 0.0
# SVD adds noise in the low and the high frequencies
# refiltering the SVD-filtered data seems necessary
filtered_data = bandpass_filter(
filtered_data,
filter_order=4,
freqmin=freqmin,
freqmax=freqmax,
f_Nyq=sampling_rate / 2.0,
)
return filtered_data
def SVDWF_multiplets(
tid,
db_path=cfg.INPUT_PATH,
db_path_M="matched_filter_1",
db_path_T="template_db_1",
best=False,
norm_rms=True,
max_singular_values=5,
expl_var=0.4,
freqmin=cfg.MIN_FREQ_HZ,
freqmax=cfg.MAX_FREQ_HZ,
sampling_rate=cfg.SAMPLING_RATE_HZ,
wiener_filter_colsize=None,
attach_raw_data=False,
detection_waveforms=None,
):
"""
Parameters
-----------
tid: scalar integer,
Template id.
db_path: string, default to cfg.INPUT_PATH
Root path of the database.
db_path_M: string, default to 'matched_filter_1'
Name of the folder where matched-filtering results
are stored.
db_path_T: string, default to 'template_db_1'
Name of the folder where template files are stored.
best: boolean, default to False
If True, only use the detections with higher
correlation coefficients.
norm_rms: boolean, default to True
If True, individual events are RMS-normalized
before stacking.
max_singular_values: scalar integer, default to 5
Regardless of how many singular values are needed to reconstruct
100*expl_var% of the variance of the detection matrix, the number
of retained singular values will not exceed max_singular_values.
expl_var: scalar float, default to 0.4
Fraction of the total variance that the retained singular
values should explain.
freqmin: scalar float, default to cfg.MIN_FREQ_HZ
Minimum target frequency, in Hz, used to re-filter the SVDWF output.
freqmax: scalar float, default to cfg.MAX_FREQ_HZ
Maximum target frequency, in Hz, used to re-filter the SVDWF output.
wiener_filter_colsize: scalar integer, default to None,
Size of the wiener filter in the vertical direction
(i.e. in the observation axis). If set to None,
then it will be equal to the number of rows.
attach_raw_data: boolean, default to False.
If True, the data extracted during the matched-filter
search are returned.
Returns
--------
stack: dataset.Stack
The SVDWF-processed detection traces, returned as a `dataset.Stack`
instance. Stacked traces can be found as obspy Traces, and the
filtered (and raw, if `attach_raw_data` is True) data matrix is
attached as an attribute, along with other useful metadata.
"""
warnings.warn(
"Deprecated function! (Old, under construction or experimental)"
)
# -----------------------------------------------------------------------------------------------
T = dataset.Template("template{:d}".format(tid), db_path_T, db_path=db_path)
# -----------------------------------------------------------------------------------------------
files_all = glob.glob(os.path.join(db_path, db_path_M, "*multiplets_*meta.h5"))
files = []
# ------------------------------
stack = dataset.Stack(
T.network_stations, T.channels, sampling_rate=sampling_rate, tid=tid
)
n_stations = len(stack.stations)
n_components = len(stack.components)
stack.latitude = T.latitude
stack.longitude = T.longitude
stack.depth = T.depth
# ------------------------------
if detection_waveforms is None:
detection_waveforms, CCs = fetch_detection_waveforms(
tid, db_path_T, db_path_M, best_CC=best, norm_rms=norm_rms, db_path=db_path
)
else:
# provided by the user
pass
print("{:d} events.".format(detection_waveforms.shape[0]))
filtered_data = np.zeros_like(detection_waveforms)
for s in range(n_stations):
for c in range(n_components):
filtered_data[:, s, c, :] = SVDWF(
detection_waveforms[:, s, c, :],
max_singular_values=max_singular_values,
expl_var=expl_var,
freqmin=freqmin,
freqmax=freqmax,
sampling_rate=sampling_rate,
wiener_filter_colsize=wiener_filter_colsize,
)
if np.sum(filtered_data[:, s, c, :]) == 0:
print(
"Problem with station {} ({:d}), component {} ({:d})".format(
stack.stations[s], s, stack.components[c], c
)
)
stacked_waveforms = np.mean(filtered_data, axis=0)
norm = np.max(stacked_waveforms, axis=-1)[..., np.newaxis]
norm[norm == 0.0] = 1.0
stacked_waveforms /= norm
stack.add_data(stacked_waveforms)
stack.data = filtered_data
if attach_raw_data:
stack.raw_data = detection_waveforms
stack.n_detections = detection_waveforms.shape[0]
try:
stack.correlation_coefficients = CCs
except:
stack.correlation_coefficients = "N/A"
return stack
# ------------------------------------
# hierarchical clustering
# ------------------------------------
def find_template_clusters(
TpGroup,
method="single",
metric="correlation",
criterion="distance",
clustering_threshold=0.33,
color_singleton="dimgray",
ax_dendogram=None,
):
"""
Find non-overlapping groups of similar templates
with the hierarchical clustering package from scipy.
"""
from scipy.spatial.distance import squareform
from scipy.cluster import hierarchy
warnings.warn(
"Deprecated function! (Old, under construction or experimental)"
)
# first, transform the CC matrix into a condensed matrix
np.fill_diagonal(TpGroup.intertp_cc.values, 1.0)
corr_dist = squareform(1.0 - TpGroup.intertp_cc.values)
if corr_dist.min() < -1.0e-6:
print("Prob with FMF")
else:
# avoid tiny negative values because of numerical imprecision
corr_dist[corr_dist < 0.0] = 0.0
# link the events
Z = hierarchy.linkage(
corr_dist, method=method, metric=metric, optimal_ordering=True
)
# get cluster labels
labels = hierarchy.fcluster(Z, clustering_threshold, criterion=criterion)
cluster_ids, cluster_sizes = np.unique(labels, return_counts=True)
if ax_dendogram is not None:
# plot dendogram
dendogram = hierarchy.dendrogram(
Z,
count_sort=True,
above_threshold_color=color_singleton,
color_threshold=clustering_threshold,
ax=ax_dendogram,
)
# get cluster colors from the dendogram
cluster_colors = extract_colors_from_tree(dendogram, labels, color_singleton)
## count all singleton clusters as one, for plotting purposes
# n_clusters = np.sum(cluster_sizes > 1)
# if np.sum(cluster_sizes == 1) > 0:
# n_clusters += 1
# sort_by_size = np.argsort(cluster_sizes)
return labels, cluster_ids, cluster_sizes, dendogram, cluster_colors
else:
return labels, cluster_ids, cluster_sizes
# -------------------------------------------------
# Convert and round times
# -------------------------------------------------
def round_time(t, sr=cfg.SAMPLING_RATE_HZ):
"""
Parameters
-----------
t: scalar float,
Time, in seconds, to be rounded so that the number
of meaningful decimals is consistent with the precision
allowed by the sampling rate.
sr: scalar float, default to cfg.SAMPLING_RATE_HZ,
Sampling rate of the data. It is used to
round the time.
Returns
--------
t: scalar float,
Rounded time.
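Examples
--------
Quick sketch at a hypothetical 50 Hz sampling rate:
>>> t_rounded = round_time(2.3456, sr=50.0)  # 2.34, i.e. 117 samples at 50 Hz
>>> n_samples = sec_to_samp(t_rounded, sr=50.0)  # 117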
"""
# convert t to samples
t_samp = np.int64(t * sr)
# get it back to seconds
t = np.float64(t_samp) / sr
return t
def sec_to_samp(t, sr=cfg.SAMPLING_RATE_HZ, epsilon=0.2):
"""Convert seconds to samples taking into account rounding errors.
Parameters
-----------
t: scalar or array-like float
Time, in seconds, to convert to a number of samples.
sr: scalar float, default to cfg.SAMPLING_RATE_HZ
Sampling rate of the data, in Hz.
epsilon: scalar float, default to 0.2
Small fraction of a sample added before truncation to absorb
floating-point precision errors.
Returns
--------
t_samp_int: scalar or array-like int
Time converted to a number of samples.
"""
# we add epsilon so that we fall onto the right
# integer number even if there is a small precision
# error in the floating point number
sign = np.sign(t)
t_samp_float = abs(t * sr) + epsilon
# round and restore sign
t_samp_int = np.int64(sign * np.int64(t_samp_float))
return t_samp_int
def time_range(
start_time,
end_time,
dt_sec,
unit="ms",
unit_value={"ms": 1.0e3, "us": 1.0e6, "ns": 1.0e9},
):
"""Compute a range of datetime64.
Parameters
------------
start_time: string or datetime
Start of the time range.
end_time: string or datetime
End of the time range.
dt_sec: scalar float
Time step, in seconds, of the time range.
unit: string, default to 'ms'
Unit in which dt_sec is converted in order to reach an integer number.
unit_value: dictionary, optional
Dictionary with the value of 1 second in different units.
Returns
---------
time_range: (n_samples,) numpy.ndarray of numpy.datetime64
The time range computed from the input parameters.
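Examples
--------
Illustrative sketch: one time stamp per minute over one day:
>>> trange = time_range("2020-01-01", "2020-01-02", 60.0)
>>> # trange is a (1440,) numpy.ndarray of numpy.datetime64[ms]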
"""
start_time = np.datetime64(start_time)
end_time = np.datetime64(end_time)
dt = np.timedelta64(int(dt_sec * unit_value[unit]), unit)
return np.arange(start_time, end_time, dt)
# -------------------------------------------------
# Regression
# -------------------------------------------------
def linear_regression(x, y):
"""
cf. https://docs.scipy.org/doc/scipy/reference/generated/scipy.stats.linregress.html
Returns
-------
a: slope
b: intercept
r_val: correlation coefficient; the coefficient of
determination R**2 = r_val**2 is commonly used
to measure the quality of the fit
p_val: two-sided p-value for a hypothesis test whose null
hypothesis is that the slope is zero
std_err: standard error of the estimated slope
"""
from scipy.stats import linregress
a, b, r_val, p_val, std_err = linregress(x, y)
return a, b, r_val, p_val, std_err
def weighted_linear_regression(X, Y, W=None):
"""
Parameters
-----------
X: (n,) numpy array or list
Y: (n,) numpy array or list
W: (n,) numpy array or list, default to None
Weights of the observations. If None, uniform weights are used.
Returns
--------
best_slope: scalar float,
Best slope from the least square formula
best_intercept: scalar float,
Best intercept from the least square formula
std_err: scalar float,
Error on the slope
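Examples
--------
Minimal sketch with synthetic data and uniform weights:
>>> import numpy as np
>>> x = np.linspace(0.0, 10.0, 50)
>>> y = 2.0 * x + 1.0 + np.random.normal(scale=0.1, size=x.size)
>>> slope, intercept, slope_err = weighted_linear_regression(x, y)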
"""
X = np.asarray(X)
Y = np.asarray(Y)
if W is None:
W = np.ones(X.size)
W_sum = W.sum()
x_mean = np.sum(W * X) / W_sum
y_mean = np.sum(W * Y) / W_sum
x_var = np.sum(W * (X - x_mean) ** 2)
xy_cov = np.sum(W * (X - x_mean) * (Y - y_mean))
best_slope = xy_cov / x_var
best_intercept = y_mean - best_slope * x_mean
# errors in best_slope and best_intercept
estimate = best_intercept + best_slope * X
# sum of squared residuals (not the square of the summed residuals)
s2 = np.sum((estimate - Y) ** 2) / (Y.size - 2)
s2_intercept = s2 * (1.0 / X.size + x_mean**2 / ((X.size - 1) * x_var))
s2_slope = s2 * (1.0 / ((X.size - 1) * x_var))
return best_slope, best_intercept, np.sqrt(s2_slope)
# -------------------------------------------------
# Others
# -------------------------------------------------
def cov_mat_intersection(cov_mat, axis1=0, axis2=1):
"""Compute intersection between covariance matrix and plane.
Note that we assume the following coordinate system:
- X: westward
- Y: southward
- Z: upward
Parameters
----------
cov_mat : numpy.ndarray
The (3x3) covariance matrix returned by Event.relocate(method='NLLoc').
axis1 : integer, optional
Index of the first axis defining the intersecting plane.
axis2 : integer, optional
Index of the second axis defining the intersecting plane.
Returns
-------
max_unc : float
Maximum uncertainty, in km, of the intersected covariance matrix.
min_unc : float
Minimum uncertainty, in km, of the intersected covariance matrix.
az_max : float
"Azimuth", that is, angle from `axis2` of maximum uncertainty.
az_min : float
"Azimuth", that is, angle from `axis2` of minimum uncertainty.
"""
# X: west, Y: south, Z: upward
s_68_3df = 3.52
s_68_2df = 2.28
# eigendecomposition of restricted matrix
indexes = np.array([axis1, axis2])
w, v = np.linalg.eigh(cov_mat[indexes, :][:, indexes])
semi_axis_length = np.sqrt(s_68_2df * w)
max_unc = np.max(semi_axis_length)
min_unc = np.min(semi_axis_length)
max_dir = v[:, w.argmax()]
min_dir = v[:, w.argmin()]
# "azimuth" is angle between `axis2` (`max_dir[1]`) and ellipse's semi-axis
az_max = np.arctan2(max_dir[0], max_dir[1]) * 180.0 / np.pi
az_min = (az_max + 90.) % 360.
return max_unc, min_unc, az_max, az_min
def compute_distances(
source_longitudes,
source_latitudes,
source_depths,
receiver_longitudes,
receiver_latitudes,
receiver_depths,
return_epicentral_distances=False,
):
"""
Fast distance computation between all source points and all receivers.
This function uses `cartopy.geodesic.Geodesic` to compute pair-wise distances
between source points and receivers. It computes both hypocentral distances
and, if specified, epicentral distances.
Parameters
----------
source_longitudes : numpy.ndarray or list
Longitudes, in decimal degrees, of the source points.
source_latitudes : numpy.ndarray or list
Latitudes, in decimal degrees, of the source points.
source_depths : numpy.ndarray or list
Depths, in kilometers, of the source points.
receiver_longitudes : numpy.ndarray or list
Longitudes, in decimal degrees, of the receivers.
receiver_latitudes : numpy.ndarray or list
Latitudes, in decimal degrees, of the receivers.
receiver_depths : numpy.ndarray or list
Depths, in kilometers, of the receivers. Negative depths indicate
receivers located at the surface.
return_epicentral_distances : bool, optional
Flag indicating whether to return epicentral distances in addition to
hypocentral distances. Default is False.
Returns
-------
hypocentral_distances : numpy.ndarray
Array of hypocentral distances between source points and receivers.
The shape of the array is (n_sources, n_receivers).
epicentral_distances : numpy.ndarray, optional
Array of epicentral distances between source points and receivers.
This array is returned only if `return_epicentral_distances` is True.
The shape of the array is (n_sources, n_receivers).
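Examples
--------
Minimal sketch with one source and two surface receivers (arbitrary
coordinates; requires cartopy):
>>> hypo_dist, epi_dist = compute_distances(
...     [30.0], [40.0], [10.0],
...     [30.1, 30.2], [40.1, 40.0], [0.0, 0.0],
...     return_epicentral_distances=True,
... )
>>> # both arrays have shape (n_sources=1, n_receivers=2)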
"""
from cartopy.geodesic import Geodesic
# convert types if necessary
if isinstance(source_longitudes, list):
source_longitudes = np.asarray(source_longitudes)
if isinstance(source_latitudes, list):
source_latitudes = np.asarray(source_latitudes)
if isinstance(source_depths, list):
source_depths = np.asarray(source_depths)
# initialize distance array
hypocentral_distances = np.zeros(
(len(source_latitudes), len(receiver_latitudes)), dtype=np.float32
)
epicentral_distances = np.zeros(
(len(source_latitudes), len(receiver_latitudes)), dtype=np.float32
)
# initialize the Geodesic instance
G = Geodesic()
for s in range(len(receiver_latitudes)):
epi_distances = G.inverse(
np.array([[receiver_longitudes[s], receiver_latitudes[s]]]),
np.hstack(
(source_longitudes[:, np.newaxis], source_latitudes[:, np.newaxis])
),
)
epicentral_distances[:, s] = np.asarray(epi_distances)[:, 0].squeeze() / 1000.0
hypocentral_distances[:, s] = np.sqrt(
epicentral_distances[:, s] ** 2 + (source_depths - receiver_depths[s]) ** 2
)
if return_epicentral_distances:
return hypocentral_distances, epicentral_distances
else:
return hypocentral_distances
def event_count(
event_timings_str,
start_date,
end_date,
freq="1D",
offset=0.0,
trim_start=True,
trim_end=False,
mode="end",
):
"""
Parameters
----------
event_timings_str: list of array of str
Timings of the events given as strings of characters.
start_date: str
Starting date of the event count time series.
end_date: str
End date of the event count time series.
freq: str, default to '1D'
Desired frequency of the event count time series.
Default is one day.
offset: float, default to 0.
Fraction of the frequency used for defining
the beginning of each bin. For example, offset=0.5
with freq='1D' will return daily event counts
from noon to noon.
mode: str, default to 'end'
Can be 'end' or 'beginning'. This string defines whether
the seismicity counted between time 1 and time 2 is
indexed at time 2 ('end') or time 1 ('beginning').
Returns
-------
event_count: Pandas Series
Pandas Series with temporal indexes defined
by freq and base, and values given by the
event count.
To get a numpy array from this Pandas Series,
use: event_count.values
"""
import pandas as pd
start_date = pd.to_datetime(start_date.replace(",", "-"))
end_date = pd.to_datetime(end_date.replace(",", "-"))
offset_str = "{}{}".format(offset, freq[-1])
event_occurrence = pd.Series(
data=np.ones(len(event_timings_str), dtype=np.int32),
index=pd.to_datetime(np.asarray(event_timings_str).astype("U")).astype(
"datetime64[ns]"
),
)
# trick to force a good match between initial indexes and new indexes
event_occurrence[start_date] = 0
event_occurrence[end_date] = 0
if mode == "end":
# note: we use mode='end' so that the number of events
# counted at time t is the event count between t-dt and t
# this is consistent with the timing convention of pandas diff()
# namely: diff(t) = x(t)-x(t-dt)
event_count = event_occurrence.groupby(
pd.Grouper(freq=freq, offset=offset_str, label="right")
).agg("sum")
elif mode == "beginning":
event_count = event_occurrence.groupby(
pd.Grouper(freq=freq, offset=offset_str, label="left")
).agg("sum")
else:
print("mode should be end or beginning")
return
if event_count.index[0] > pd.Timestamp(start_date):
event_count[event_count.index[0] - pd.Timedelta(freq)] = 0
if event_count.index[-1] < pd.Timestamp(end_date):
event_count[event_count.index[-1] + pd.Timedelta(freq)] = 0
if trim_start or offset == 0.0:
event_count = event_count[event_count.index >= start_date]
if trim_end or offset == 0.0:
if offset > 0.0:
stop_date = pd.to_datetime(end_date) + pd.Timedelta(freq)
else:
stop_date = end_date
event_count = event_count[event_count.index <= stop_date]
# force the manually added items to be well
# located in time
event_count.sort_index(inplace=True)
return event_count
def get_np_array(
stream,
stations,
components=["N", "E", "Z"],
priority="HH",
n_samples=None,
component_aliases={"N": ["N", "1"], "E": ["E", "2"], "Z": ["Z"]},
verbose=True,
):
"""Fetch data from Obspy Stream and returns an ndarray.
Parameters
-----------
stream: Obspy Stream instance
The Obspy Stream instance with the waveform time series.
stations: list of strings
Names of the stations to include in the output array. Define the order
of the station axis.
components: list of strings, default to ['N','E','Z']
Names of the components to include in the output array. Define the order
of the component axis.
component_aliases: dictionary, optional
Sometimes, components might be named differently than N, E, Z. This
dictionary tells the function which alternative component names can be
associated with each "canonical" component. For example,
`component_aliases['N'] = ['N', '1']` means that the function will also
check the '1' component in case the 'N' component doesn't exist.
priority: string, default to 'HH'
When a station has multiple instruments, this string tells which
channel to use in priority.
n_samples: scalar int, default to None
Duration, in samples, of the output numpy.ndarray. Select the
`n_samples` first samples of each trace. If None, take `n_samples` as
the length of the first trace.
verbose: boolean, default to True
If True, print extra output in case the target data cannot be fetched.
Returns
---------
data: (n_stations, n_components, n_samples) numpy.ndarray
The waveform time series formatted as a numpy.ndarray.
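Examples
--------
Minimal sketch on ObsPy's built-in example stream (station RJOB with
EHZ/EHN/EHE channels):
>>> import obspy as obs
>>> st = obs.read()
>>> data = get_np_array(st, ["RJOB"], components=["N", "E", "Z"], priority="EH")
>>> # data has shape (1, 3, n_samples)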
"""
if len(stream) == 0:
print("The input data stream is empty!")
return
if n_samples is None:
n_samples = stream[0].data.size
data = np.zeros((len(stations), len(components), n_samples), dtype=np.float32)
for s, sta in enumerate(stations):
for c, cp in enumerate(components):
for cp_alias in component_aliases[cp]:
channel = stream.select(station=sta, component=cp_alias)
if len(channel) > 0:
# successfully retrieved data
break
if len(channel) > 0:
try:
# try selecting the preferred channel if it exists
cha = channel.select(channel=f"{priority}{cp_alias}")[0]
# data[s, c, :] = channel.select(
# channel=f'{priority}{cp_alias}')[0].data[:n_samples]
except IndexError:
cha = channel[0]
# data[s, c, :] = channel[0].data[:n_samples]
if len(cha.data) < n_samples:
length_diff = n_samples - len(cha.data)
data[s, c, :] = np.hstack(
(cha.data, np.zeros(length_diff, dtype=np.float32))
)
else:
data[s, c, :] = cha.data[:n_samples]
return data
def max_norm(X):
max_ = np.abs(X).max()
if max_ != 0.0:
return X / max_
else:
return X
def running_mad(time_series, window, n_mad=10.0, overlap=0.75):
"""Compute a running median + `n_mad` * MAD threshold of `time_series`.
`window` is the sliding window length, in samples, and `overlap` is the
fraction of overlap between consecutive windows. The running statistic
is linearly interpolated back onto every sample before being returned.
"""
from scipy.stats import median_abs_deviation as scimad
from scipy.interpolate import interp1d
# calculate n_windows given window
# and overlap
shift = int((1.0 - overlap) * window)
n_windows = int((len(time_series) - window) // shift) + 1
mad_ = np.zeros(n_windows + 2, dtype=np.float32)
med_ = np.zeros(n_windows + 2, dtype=np.float32)
time = np.zeros(n_windows + 2, dtype=np.float32)
for i in range(1, n_windows + 1):
i1 = i * shift
i2 = min(len(time_series), i1 + window)
sliding_window = time_series[i1:i2]
# non_zero = cnr_window != 0
# if sum(non_zero) < 3:
# # won't be possible to calculate median
# # and mad on that few samples
# continue
# med_[i] = np.median(cnr_window[non_zero])
# mad_[i] = scimad(cnr_window[non_zero])
med_[i] = np.median(sliding_window)
mad_[i] = scimad(sliding_window)
time[i] = (i1 + i2) / 2.0
# add boundary cases manually
time[0] = 0.0
mad_[0] = mad_[1]
med_[0] = med_[1]
time[-1] = len(time_series)
mad_[-1] = mad_[-2]
med_[-1] = med_[-2]
running_stat = med_ + n_mad * mad_
interpolator = interp1d(
time,
running_stat,
kind="slinear",
fill_value=(running_stat[0], running_stat[-1]),
bounds_error=False,
)
full_time = np.arange(0, len(time_series))
running_stat = interpolator(full_time)
return running_stat
def spectrogram(
x,
window_duration_sec,
overlap,
sampling_rate,
detrend=False,
window="hann",
nfft=None,
boundary=None,
padded=False,
scaling="spectrum"
):
"""
Parameters
----------
x : array-like
Time series to compute the spectrogram of.
window_duration_sec : float
Duration of the sliding window, in seconds, of the
short-time Fourier transform. This is later converted to
samples to define the `nperseg` argument for `scipy.signal.stft`.
overlap : float
Ratio of overlap, from 0 to 1, between subsequent windows. This is
later converted to samples to define the `noverlap` argument for
`scipy.signal.stft`.
sampling_rate : float
Sampling rate, in Hz, of `x`.
detrend : bool, optional
If False, no detrending is done. If it is a string or a function,
detrending is done (see doc of `scipy.signal.stft`).
Defaults to False.
window : str or tuple or array-like, optional
The string is the name of the taper window to apply to each segment.
See the doc of `scipy.signal.stft` for more details.
boundary : str or None, optional
Define how the time series are extended at the boundaries. See the
doc of `scipy.signal.stft` for more details. Defaults to None (only
valid segments are used).
padded : bool, optional
If True, padding occurs after boundary extension. Defaults to False.
scaling : str, optional
Either 'spectrum' or 'psd'.
Returns
-------
frequency : numpy.ndarray
Frequencies, in Hertz, at which the spectra are estimated.
time : numpy.ndarray
Times, in seconds, of the sliding windows.
spectrogram : numpy.ndarray
Magnitude of the short-time Fourier transform, with shape
(n_frequencies, n_times).
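Examples
--------
Illustrative sketch with synthetic data (60 s at a hypothetical 50 Hz
sampling rate, 2 s windows with 50% overlap):
>>> import numpy as np
>>> x = np.random.randn(50 * 60)
>>> frequency, time, spec = spectrogram(x, 2.0, 0.5, 50.0)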
"""
from scipy.signal import stft
stft_params = {}
stft_params["nperseg"] = int(window_duration_sec * sampling_rate)
stft_params["noverlap"] = int(overlap * stft_params["nperseg"])
stft_params["detrend"] = detrend
stft_params["window"] = window
stft_params["nfft"] = nfft
stft_params["boundary"] = boundary
stft_params["padded"] = padded
stft_params["scaling"] = scaling
frequency, time, spec = stft(
x, sampling_rate, **stft_params
)
return frequency, time, np.abs(spec)
def two_point_epicentral_distance(lon_1, lat_1, lon_2, lat_2):
"""Compute the distance between two points.
Parameters
-----------
lon_1: scalar, float
Longitude of Point 1.
lat_1: scalar, float
Latitude of Point 1.
lon_2: scalar, float
Longitude of Point 2.
lat_2: scalar, float
Latitude of Point 2.
Returns
---------
dist: scalar, float
Distance between Point 1 and Point 2 in kilometers.
"""
from obspy.geodetics.base import calc_vincenty_inverse
dist, az, baz = calc_vincenty_inverse(lat_1, lon_1, lat_2, lon_2)
dist /= 1000.0 # from m to km
return dist
def two_point_distance(lon_1, lat_1, depth_1, lon_2, lat_2, depth_2):
"""Compute the distance between two points.
Parameters
-----------
lon_1: scalar, float
Longitude of Point 1.
lat_1: scalar, float
Latitude of Point 1.
depth_1: scalar, float
Depth of Point 1 (in km).
lon_2: scalar, float
Longitude of Point 2.
lat_2: scalar, float
Latitude of Point 2.
depth_2: scalar, float
Depth of Point 2 (in km).
Returns
---------
dist: scalar, float
Distance between Point 1 and Point 2 in kilometers.
"""
from obspy.geodetics.base import calc_vincenty_inverse
dist, az, baz = calc_vincenty_inverse(lat_1, lon_1, lat_2, lon_2)
dist /= 1000.0 # from m to km
dist = np.sqrt(dist**2 + (depth_1 - depth_2) ** 2)
return dist
def donefun(french=False):
"""
Super useful function.
"""
if not french:
msg = "ALL DONE!"
else:
from random import random
r = random()
if r < 0.25:
msg = "HOP LÀ!"
elif r < 0.50:
msg = "VOILÀ!"
elif r < 0.75:
msg = "BIM!"
else:
msg = "STYLÉ!"
print(
f"""⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀
⠀⠀⠀⠀⠀⠀⢀⡤⣤⣀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⣀⡀⠀⠀⠀⠀⠀⠀
⠀⠀⠀⠀⠀⢀⡏⠀⠀⠈⠳⣄⠀⠀⠀⠀⠀⣀⠴⠋⠉⠉⡆⠀⠀⠀⠀⠀
⠀⠀⠀⠀⠀⢸⠀⠀⠀⠀⠀⠈⠉⠉⠙⠓⠚⠁⠀⠀⠀⠀⣿⠀⠀⠀⠀⠀
⠀⠀⠀⠀⢀⠞⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠹⣄⠀⠀⠀⠀
⠀⠀⠀⠀⡞⠀⠀⠀⠀⠀⠶⠀⠀⠀⠀⠀⠀⠦⠀⠀⠀⠀⠀⠸⡆⠀⠀⠀
⢠⣤⣶⣾⣧⣤⣤⣀⡀⠀⠀⠀⠀⠈⠀⠀⠀⢀⡤⠴⠶⠤⢤⡀⣧⣀⣀⠀
⠻⠶⣾⠁⠀⠀⠀⠀⠙⣆⠀⠀⠀⠀⠀⠀⣰⠋⠀⠀⠀⠀⠀⢹⣿⣭⣽⠇
⠀⠀⠙⠤⠴⢤⡤⠤⠤⠋⠉⠉⠉⠉⠉⠉⠉⠳⠖⠦⠤⠶⠦⠞⠁⠀⠀⠀
⠀{msg}⠀⠀⠀
"""
)
def write_lock_file(path, check=False, flush=False):
if check:
assert os.path.isfile(path) is False, f"Lock file {path} already exists!"
if not flush:
open(path, "w").close()
else:
f = open(path, "w")
f.flush()
os.fsync(f.fileno())
f.close()
def read_write_waiting_list(func, path, unit_wait_time=0.2):
"""Read/write queue to avoid conflicts between jobs."""
from time import sleep
path_no_ext, _ = os.path.splitext(path)
while True:
try:
path_lock = path_no_ext + "_lock"
path_wait = path_no_ext + "_waiting_list"
sleep(unit_wait_time * np.random.random())
waiting_list_position = len(glob.glob(path_wait + "*"))
waiting_list_ticket = f"{path_wait}{waiting_list_position:d}"
while True:
try:
# take place in waiting list by creating empty file
write_lock_file(waiting_list_ticket, check=True)
break
except AssertionError:
# several jobs trying to create the same ticket?
# randomness will solve the conflict
waiting_list_position += np.random.randint(5)
waiting_list_ticket = f"{path_wait}{waiting_list_position:d}"
next_place_ticket = f"{path_wait}{waiting_list_position-1:d}"
# print(f"1: Created {os.path.basename(waiting_list_ticket)}")
while True:
# sleep(unit_wait_time * np.random.random())
if waiting_list_position == 0:
# is first in the waiting list!
if not os.path.isfile(path_lock):
# first, create lock file
write_lock_file(path_lock, flush=True)
# print(f"2: Created {os.path.basename(path_lock)}")
# then, free the waiting list position #0
pathlib.Path(waiting_list_ticket).unlink()
# print(f"2: Deleted {os.path.basename(waiting_list_ticket)}")
# now the process can proceed with the reading or writing
break
else:
# wait a bit
sleep(unit_wait_time * np.random.random())
elif not os.path.isfile(next_place_ticket):
# front place in the waiting list was freed!
# first, create new ticket at the new position
write_lock_file(next_place_ticket)
# print(f"3: Created {os.path.basename(next_place_ticket)}")
# then, free previous place in the waiting list
pathlib.Path(waiting_list_ticket).unlink()
# print(f"3: Deleted {os.path.basename(waiting_list_ticket)}")
# update place in waiting list
waiting_list_position -= 1
# update ticket names
waiting_list_ticket = f"{path_wait}{waiting_list_position:d}"
next_place_ticket = f"{path_wait}{waiting_list_position-1:d}"
# and wait for its turn
sleep(unit_wait_time * np.random.random())
else:
# waiting list didn't change, just wait
sleep(unit_wait_time * np.random.random())
# start reading/writing
try:
func(path)
except Exception as e:
pathlib.Path(path_lock).unlink()
raise (e)
pathlib.Path(path_lock).unlink()
break
# done!
except FileNotFoundError:
print(
"4: Concurent error, reset queue "
f"(last ticket was {waiting_list_ticket})"
)
if waiting_list_position == 0:
print(os.path.isfile(path_lock))
pathlib.Path(path_lock).unlink()
continue
# =======================================================
# routines for automatic picking
# =======================================================
def normalize_batch(seismogram, normalization_window_sample=3000, overlap=0.50):
"""Apply Z-score normalization in running windows.
Following Zhu et al. (2019), this function applies Z-score
normalization in running windows of length `normalization_window_sample`.
Parameters
-----------
seismogram : numpy.ndarray
Three-component seismograms. `seismogram` has shape
(num_traces, num_channels=3, num_time_samples).
normalization_window_sample : integer, optional
The window length, in samples, over which normalization is applied.
Default is 3000 (like in Zhu et al. 2019).
overlap : float, optional
Fraction of overlap between consecutive normalization windows.
Default is 0.50.
Returns
--------
normalized_seismogram : numpy.ndarray
Normalized seismogram with same shape as `seismogram`.
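Examples
--------
Minimal sketch on synthetic three-component seismograms (arbitrary sizes):
>>> import numpy as np
>>> seismogram = np.random.randn(2, 3, 6000)  # 2 stations, 3 components
>>> normalized = normalize_batch(seismogram, normalization_window_sample=3000, overlap=0.50)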
"""
from scipy.interpolate import interp1d
#shift = normalization_window_sample // 2
shift = int((1. - overlap) * normalization_window_sample)
num_stations, num_channels, num_time_samples = seismogram.shape
# std in sliding windows
seismogram_pad = np.pad(
seismogram, ((0, 0), (0, 0), (shift, shift)), mode="reflect"
)
# time = np.arange(0, num_time_samples, shift, dtype=np.int32)
seismogram_view = np.lib.stride_tricks.sliding_window_view(
seismogram_pad, normalization_window_sample, axis=-1
)[:, :, ::shift, :]
sliding_std = np.std(seismogram_view, axis=-1)
sliding_mean = np.mean(seismogram_view, axis=-1)
# time at centers of sliding windows
num_sliding_windows = seismogram_view.shape[2]
time = np.linspace(shift, num_time_samples - shift, num_sliding_windows)
sliding_std[:, :, -1], sliding_mean[:, :, -1] = (
sliding_std[:, :, -2],
sliding_mean[:, :, -2],
)
sliding_std[:, :, 0], sliding_mean[:, :, 0] = (
sliding_std[:, :, 1],
sliding_mean[:, :, 1],
)
sliding_std[sliding_std == 0] = 1
# normalize data with sliding std and mean
t_interp = np.arange(num_time_samples)
std_interp = np.stack(
tuple(np.interp(
t_interp, time, sld_std, left=sld_std[0], right=sld_std[-1]
)
for sld_std in sliding_std.reshape(-1, sliding_std.shape[-1])
),
axis=0
).reshape(
sliding_std.shape[:-1] + (len(t_interp),)
)
mean_interp = np.stack(
tuple(np.interp(
t_interp, time, m_std, left=m_std[0], right=m_std[-1]
)
for m_std in sliding_mean.reshape(-1, sliding_mean.shape[-1])
),
axis=0
).reshape(
sliding_mean.shape[:-1] + (len(t_interp),)
)
seismogram = (seismogram - mean_interp) / std_interp
return seismogram
def find_picks(
phase_probability, threshold, **kwargs
):
"""Find phase picks from time series of phase probability.
Phase picks are given by the peaks exceeding `threshold` in
the time series `phase_probability`. The probability neighborhood
around each peak is used as the pdf of a given pick measurement.
Note: Additional key-word arguments are passed to
'scipy.signal.find_peaks'.
Parameters
----------
phase_probability : array-like
Probability time series of observing the arrival of a
given seismic phase.
threshold : float
Value above which peaks are taken to be candidate phase picks.
Returns
-------
peaks_value : `numpy.ndarray`
Probability value of the selected peaks.
peaks_mean : `numpy.ndarray`
Expected pick timing, in samples.
peaks_std : `numpy.ndarray`
Uncertainty on pick timing, in samples.
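Examples
--------
Illustrative sketch on a synthetic probability time series with a single
peak near sample 500:
>>> import numpy as np
>>> phase_proba = np.zeros(1000)
>>> phase_proba[480:520] = np.hanning(40)
>>> values, means, stds = find_picks(phase_proba, 0.5)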
"""
# set the width kwarg to 1 if not defined
# so that `properties` has peak width info
kwargs.setdefault("width", 1)
peak_indexes, peak_properties = find_peaks(
phase_probability, height=threshold, **kwargs
)
peaks_value, peaks_mean, peaks_std = [], [], []
for i in range(len(peak_indexes)):
idx1 = int(peak_properties["left_ips"][i])
idx2 = int(peak_properties["right_ips"][i])
samples = np.arange(idx1, idx2+1)
prob = phase_probability[samples]
mean = np.sum(samples * prob) / prob.sum()
# probability-weighted standard deviation of the pick time
std = np.sqrt(np.sum(prob * (samples - mean) ** 2) / prob.sum())
peaks_mean.append(mean)
peaks_std.append(std)
peaks_value.append(phase_probability[peak_indexes[i]])
return np.asarray(peaks_value), np.asarray(peaks_mean), np.asarray(peaks_std)
def get_picks(
picks,
buffer_length=int(2. * cfg.SAMPLING_RATE_HZ),
prior_knowledge=None,
search_win_samp=int(4. * cfg.SAMPLING_RATE_HZ)
):
"""Select a single P- and S-pick on each 3-comp seismogram.
Parameters
----------
picks : `pandas.DataFrame`
`pandas.DataFrame` as formatted in `BPMF.dataset.pick_PS_phases`.
buffer_length: scalar int, optional
Picks that are before this buffer length, in samples, are discarded.
prior_knowledge: pandas.DataFrame, optional
If given, picks that are closer to the a priori pick
(for example, given by a preliminary location) will be given
a larger weight and will be more likely to be selected. In practice,
pick probabilities are multiplied by gaussian weights and the highest
modified pick probability is selected.
search_win_samp: scalar int, optional
Standard deviation, in samples, used in the gaussian weights.
Returns
-------
picks : `pandas.DataFrame`
`pandas.DataFrame` with a single P- and S-wave pick per channel.
"""
columns = ["_picks", "_probas", "_unc"]
phases = ["P", "S"]
p_columns = ["P" + col for col in columns]
s_columns = ["S" + col for col in columns]
for sta in picks.index:
if prior_knowledge is not None and sta in prior_knowledge.index:
prior_P = prior_knowledge.loc[sta, "P"]
prior_S = prior_knowledge.loc[sta, "S"]
else:
prior_P, prior_S = None, None
#for n in range(len(picks["P_picks"][st])):
# ----------------
# remove picks from the buffer length
for ph in phases:
valid_picks = picks.loc[sta, f"{ph}_picks"] > int(buffer_length)
for col in columns:
picks.loc[sta, f"{ph}{col}"] = picks.loc[sta, f"{ph}{col}"][valid_picks]
search_S_pick = True
search_P_pick = True
if len(picks.loc[sta, "S_picks"]) == 0:
# if no valid S pick: fill in with nan
picks.loc[sta, s_columns] = np.nan
search_S_pick = False
if len(picks.loc[sta, "P_picks"]) == 0:
# if no valid P pick: fill in with nan
picks.loc[sta, p_columns] = np.nan
search_P_pick = False
if search_S_pick:
if prior_S is None:
# take only the highest probability trigger
best_S_trigger = picks.loc[sta, "S_probas"].argmax()
else:
# use a priori picks
tapered_S_probas = (
picks.loc[sta, "S_probas"]
*
np.exp(
-(picks.loc[sta, "S_picks"] - prior_S)**2/(2.*search_win_samp**2)
)
)
best_S_trigger = tapered_S_probas.argmax()
## don't keep if too far from a priori
#if abs(picks["S_picks"][st][best_S_trigger] - prior_S) > 4 * search_win_samp:
# best_S_trigger = np.nan
if np.isnan(best_S_trigger):
picks.loc[sta, s_columns] = np.nan
else:
for col in s_columns:
picks.loc[sta, col] = picks.loc[sta, col][best_S_trigger]
# update P picks: keep only those that are before the best S pick
if search_P_pick:
valid_P_picks = picks.loc[sta, "P_picks"] < picks.loc[sta, "S_picks"]
for col in p_columns:
picks.loc[sta, col] = picks.loc[sta, col][valid_P_picks]
if len(picks.loc[sta, "P_picks"]) == 0:
# if no valid P pick: fill in with nan
picks.loc[sta, p_columns] = np.nan
search_P_pick = False
if search_P_pick:
if prior_P is None:
# take only the highest probability trigger
best_P_trigger = picks.loc[sta, "P_probas"].argmax()
else:
# use a priori picks
tapered_P_probas = (
picks.loc[sta, "P_probas"]
*
np.exp(
-(picks.loc[sta, "P_picks"] - prior_P)**2/(2.*search_win_samp**2)
)
)
best_P_trigger = tapered_P_probas.argmax()
## don't keep if too far from a priori
#if abs(picks["P_picks"][st][best_P_trigger] - prior_P) > 4 * search_win_samp:
# best_P_trigger = np.nan
if np.isnan(best_P_trigger):
picks.loc[sta, p_columns] = np.nan
else:
for col in p_columns:
picks.loc[sta, col] = picks.loc[sta, col][best_P_trigger]
for col in picks:
picks[col] = np.float32(picks[col])
return picks
def _detect_peaks(
x,
mph=None,
mpd=1,
threshold=0,
edge="rising",
kpsh=False,
valley=False,
show=False,
ax=None,
):
"""Detect peaks in data based on their amplitude and other features.
Note: Will be removed in future versions because all these features are
now implemented in `scipy.signal.find_peaks`.
Parameters
----------
x : 1D array_like
data.
mph : {None, number}, optional (default = None)
detect peaks that are greater than minimum peak height.
mpd : positive integer, optional (default = 1)
detect peaks that are at least separated by minimum peak distance (in
number of data).
threshold : positive number, optional (default = 0)
detect peaks (valleys) that are greater (smaller) than `threshold`
in relation to their immediate neighbors.
edge : {None, 'rising', 'falling', 'both'}, optional (default = 'rising')
for a flat peak, keep only the rising edge ('rising'), only the
falling edge ('falling'), both edges ('both'), or don't detect a
flat peak (None).
kpsh : bool, optional (default = False)
keep peaks with same height even if they are closer than `mpd`.
valley : bool, optional (default = False)
if True (1), detect valleys (local minima) instead of peaks.
show : bool, optional (default = False)
if True (1), plot data in matplotlib figure.
ax : a matplotlib.axes.Axes instance, optional (default = None).
Returns
-------
ind : 1D array_like
indices of the peaks in `x`.
Notes
-----
The detection of valleys instead of peaks is performed internally by simply
negating the data: `ind_valleys = detect_peaks(-x)`
The function can handle NaN's
See this IPython Notebook [1]_.
References
----------
.. [1]:http://nbviewer.ipython.org/github/demotu/BMC/blob/master/
notebooks/DetectPeaks.ipynb
Examples
--------
>>> from detect_peaks import detect_peaks
>>> x = np.random.randn(100)
>>> x[60:81] = np.nan
>>> # detect all peaks and plot data
>>> ind = detect_peaks(x, show=True)
>>> print(ind)
>>> x = np.sin(2*np.pi*5*np.linspace(0, 1, 200)) + np.random.randn(200)/5
>>> # set minimum peak height = 0 and minimum peak distance = 20
>>> detect_peaks(x, mph=0, mpd=20, show=True)
>>> x = [0, 1, 0, 2, 0, 3, 0, 2, 0, 1, 0]
>>> # set minimum peak distance = 2
>>> detect_peaks(x, mpd=2, show=True)
>>> x = np.sin(2*np.pi*5*np.linspace(0, 1, 200)) + np.random.randn(200)/5
>>> # detection of valleys instead of peaks
>>> detect_peaks(x, mph=0, mpd=20, valley=True, show=True)
>>> x = [0, 1, 1, 0, 1, 1, 0]
>>> # detect both edges
>>> detect_peaks(x, edge='both', show=True)
>>> x = [-2, 1, -2, 2, 1, 1, 3, 0]
>>> # set threshold = 2
>>> detect_peaks(x, threshold = 2, show=True)
"""
x = np.atleast_1d(x).astype("float64")
if x.size < 3:
return np.array([], dtype=int)
if valley:
x = -x
# find indices of all peaks
dx = x[1:] - x[:-1]
# handle NaN's
indnan = np.where(np.isnan(x))[0]
if indnan.size:
x[indnan] = np.inf
dx[np.where(np.isnan(dx))[0]] = np.inf
ine, ire, ife = np.array([[], [], []], dtype=int)
if not edge:
ine = np.where((np.hstack((dx, 0)) < 0) & (np.hstack((0, dx)) > 0))[0]
else:
if edge.lower() in ["rising", "both"]:
ire = np.where((np.hstack((dx, 0)) <= 0) & (np.hstack((0, dx)) > 0))[0]
if edge.lower() in ["falling", "both"]:
ife = np.where((np.hstack((dx, 0)) < 0) & (np.hstack((0, dx)) >= 0))[0]
ind = np.unique(np.hstack((ine, ire, ife)))
# handle NaN's
if ind.size and indnan.size:
# NaN's and values close to NaN's cannot be peaks
ind = ind[
np.in1d(
ind, np.unique(np.hstack((indnan, indnan - 1, indnan + 1))), invert=True
)
]
# first and last values of x cannot be peaks
if ind.size and ind[0] == 0:
ind = ind[1:]
if ind.size and ind[-1] == x.size - 1:
ind = ind[:-1]
# remove peaks < minimum peak height
if ind.size and mph is not None:
ind = ind[x[ind] >= mph]
# remove peaks - neighbors < threshold
if ind.size and threshold > 0:
dx = np.min(np.vstack([x[ind] - x[ind - 1], x[ind] - x[ind + 1]]), axis=0)
ind = np.delete(ind, np.where(dx < threshold)[0])
# detect small peaks closer than minimum peak distance
if ind.size and mpd > 1:
ind = ind[np.argsort(x[ind])][::-1] # sort ind by peak height
idel = np.zeros(ind.size, dtype=bool)
for i in range(ind.size):
if not idel[i]:
# keep peaks with the same height if kpsh is True
idel = idel | (ind >= ind[i] - mpd) & (ind <= ind[i] + mpd) & (
x[ind[i]] > x[ind] if kpsh else True
)
idel[i] = 0 # Keep current peak
# remove the small peaks and sort back the indices by their occurrence
ind = np.sort(ind[~idel])
if show:
if indnan.size:
x[indnan] = np.nan
if valley:
x = -x
_plot_peaks(x, mph, mpd, threshold, edge, valley, ax, ind)
return ind