Source code for BPMF.similarity_search

import os

from .config import cfg
from . import utils
from . import dataset
from . import clib

import numpy as np
import fast_matched_filter as fmf
import matplotlib.pyplot as plt

from obspy.core import UTCDateTime as udt
from obspy import Stream

from functools import partial
from scipy.stats import kurtosis
import concurrent.futures

from time import time as give_time


class MatchedFilter(object):
    """Class for running a matched-filter search and detecting earthquakes."""

    def __init__(
        self,
        template_group,
        min_channels=6,
        min_stations=3,
        max_kurto=100.0,
        remove_edges=True,
        normalize=True,
        max_CC_threshold=0.80,
        n_network_chunks=1,
        threshold_type="rms",
        step=cfg.MATCHED_FILTER_STEP_SAMP,
        max_memory=None,
        max_workers=None,
        num_threads_threshold=None,
        anomalous_cdf_at_mean_plus_1sig=0.00,
        window_for_validation_Tmax=100.0,
        offset_win_peak_amp_sec=1.0,
        duration_win_peak_amp_sec=3.0,
        phase_on_comp_peak_amp={"N": "S", "E": "S", "Z": "P"},
    ):
        """Instantiate a MatchedFilter object.

        Parameters
        ----------
        template_group: `dataset.TemplateGroup` instance
            The `dataset.TemplateGroup` instance with all the templates to
            search for in the data.
        remove_edges: boolean, default to True
            If True, remove the detections occurring at the beginning and end
            of the data time series. The duration of the edges is set by
            `BPMF.cfg.DATA_BUFFER_SEC`.
        min_channels: scalar int, default to 6
            Minimum number of channels to consider the CCs valid.
        min_stations: scalar int, default to 3
            Minimum number of stations to consider the CCs valid.
        max_kurto: scalar float, default to 100
            Maximum kurtosis allowed on the CC distribution. Above this
            threshold, it is likely that something went wrong on that day.
            Warning! A higher number of stations will tend to improve the SNR
            in the CC time series, and thus to increase the kurtosis values of
            the CC distribution. If not sure, set this value to something very
            large (e.g. 10000).
        normalize: boolean, default to True
            If True, normalize the traces by their RMS when given as input to
            FMF. This is recommended to avoid numerical issues with small
            values.
        max_CC_threshold: scalar float, default to 0.80
            The detection threshold is saturated at `max_CC_threshold` times
            the sum of the channel weights.
        n_network_chunks: scalar int, default to 1
            Increase this number if your GPU(s) encounters memory issues.
        threshold_type: string, default to 'rms'
            Either 'rms' or 'mad'. Determines whether the detection threshold
            uses the rms or the mad of the correlation coefficient time
            series.
        step: scalar float, default to `cfg.MATCHED_FILTER_STEP_SAMP`
            Step, in samples, of the matched-filter search, that is, the time
            interval between two sliding windows.
        max_memory: scalar float, default to None
            If not None, `max_memory` is the maximum memory, in Gb, not to
            exceed during the matched-filter search. The MatchedFilter
            instance will compute the maximum number of templates to use in
            one run so as not to exceed this memory threshold.
        max_workers: scalar int or None, default to None
            Controls the maximum number of threads created when finding
            detections of new events in the CC time series. If None, use one
            CPU.
        num_threads_threshold: scalar int or None, default to None
            Number of threads used when computing the time-dependent detection
            threshold. If None, defaults to `os.cpu_count() // max_workers`.
        anomalous_cdf_at_mean_plus_1sig: scalar float, optional
            Anomalous cumulative distribution function (cdf), between 0 and 1,
            at {mean + 1xsigma}(CC), default to 0.00. The CC distribution is
            approximately gaussian and, therefore, the expected value of
            {mean + 1xsigma}(CC) is 0.78. If {mean + 1xsigma}(CC) is
            significantly lower than 0.78 in the vicinity of a CC(t) that
            exceeded `threshold`, that is, lower than
            `anomalous_cdf_at_mean_plus_1sig`, then we consider that
            {mean + 1xsigma}(CC) was not properly estimated at time t. Since
            {mean + 1xsigma}(CC) is computed from `threshold`, the detection
            threshold was probably not adequate and we discard CC(t).
            **Set this parameter to zero to deactivate validation of the
            detection threshold. Otherwise, a good default value seems to be
            0.50.**
        window_for_validation_Tmax: scalar float, optional
            Window duration, in units of Tmax, the longest period in the data
            (that is, the inverse of `cfg.MIN_FREQ_HZ`), for analyzing the
            statistics of CC(t) in the vicinity of each CC(t) > `threshold`.
            Default to 100.
        offset_win_peak_amp_sec: scalar, optional
            The offset, in seconds, between the phase pick and the beginning
            of the window used to extract the event peak amplitude at a given
            channel. Default is 1 second.
        duration_win_peak_amp_sec: scalar, optional
            The duration, in seconds, of the window used to extract the event
            peak amplitude at a given channel. Default is 3 seconds.
        phase_on_comp_peak_amp: dictionary, optional
            A dictionary defining which phase is used on each component when
            measuring the peak amplitudes.
        """
        self.template_group = template_group
        self.min_channels = min_channels
        self.min_stations = min_stations
        self.max_kurto = max_kurto
        self.remove_edges = remove_edges
        self.normalize = normalize
        self.max_CC_threshold = max_CC_threshold
        self.n_network_chunks = n_network_chunks
        self.threshold_type = threshold_type.lower()
        self.step = step
        self.max_memory = max_memory
        if max_workers is None:
            max_workers = 1
        self.max_workers = max_workers
        if num_threads_threshold is None:
            num_threads_threshold = max(1, os.cpu_count() // max_workers)
        self.num_threads_threshold = num_threads_threshold
        self.anomalous_cdf_at_mean_plus_1sig = anomalous_cdf_at_mean_plus_1sig
        self.window_for_validation_Tmax = window_for_validation_Tmax
        self.offset_win_peak_amp_sec = offset_win_peak_amp_sec
        self.duration_win_peak_amp_sec = duration_win_peak_amp_sec
        self.phase_on_comp_peak_amp = phase_on_comp_peak_amp

    # properties
    @property
    def components(self):
        return self.template_group.components

    @property
    def stations(self):
        return self.template_group.stations

    @property
    def memory_cc_time_series(self):
        if not hasattr(self, "data"):
            return 0.0
        else:
            # 1 float32 = 4 bytes
            nbytes = 4 * int(self.data.duration / (self.step / self.data.sr))
            nGbytes = nbytes / (1024.0**3)
            return nGbytes
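
    # Illustrative usage sketch (not part of the class): instantiating a
    # MatchedFilter, assuming `template_group` is a `dataset.TemplateGroup`
    # built beforehand. The parameter values below are only examples.
    #
    #   matched_filter = MatchedFilter(
    #       template_group,
    #       min_channels=6,
    #       min_stations=3,
    #       threshold_type="rms",
    #       max_workers=4,
    #   )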
    def set_data(self, data):
        """Attach a `dataset.Data` instance to `self`.

        Parameters
        ----------
        data: `dataset.Data` instance
            The `dataset.Data` instance to read the continuous data to scan.
        """
        self.data = data
        self.data_arr = self.data.get_np_array(
            self.stations, components=self.components
        )
        self.offset_win_peak_amp_samp = utils.sec_to_samp(
            self.offset_win_peak_amp_sec, sr=self.data.sr
        )
        self.duration_win_peak_amp_samp = utils.sec_to_samp(
            self.duration_win_peak_amp_sec, sr=self.data.sr
        )
        if self.normalize:
            norm = np.std(self.data_arr, axis=-1, keepdims=True)
            norm[norm == 0.0] = 1.0
            self.data_norm = norm
            self.data_arr /= norm
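
    # Illustrative usage sketch (assumption: `data` is a `dataset.Data`
    # instance covering the day to scan, loaded elsewhere). After attaching
    # the data, `memory_cc_time_series` returns the memory footprint, in Gb,
    # of a single CC time series.
    #
    #   matched_filter.set_data(data)
    #   print(f"One CC time series uses "
    #         f"{matched_filter.memory_cc_time_series:.3f} Gb")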
    def select_cc_indexes(
        self,
        cc_t,
        threshold,
        search_win,
        anomalous_cdf_at_mean_plus_1sig=0.50,
        window_for_validation_Tmax=100.0,
    ):
        """Select the peaks in the CC time series.

        Parameters
        ----------
        cc_t: (n_corr,) numpy.ndarray
            The CC time series for one template.
        threshold: (n_corr,) numpy.ndarray or scalar
            The detection threshold.
        search_win: scalar int
            The minimum inter-event time, in units of correlation step.
        anomalous_cdf_at_mean_plus_1sig: scalar float, optional
            Anomalous cumulative distribution function (cdf), between 0 and 1,
            at {mean + 1xsigma}(CC), default to 0.50. The CC distribution is
            approximately gaussian and, therefore, the expected value of
            {mean + 1xsigma}(CC) is 0.78. If {mean + 1xsigma}(CC) is
            significantly lower than 0.78 in the vicinity of a CC(t) that
            exceeded `threshold`, that is, lower than
            `anomalous_cdf_at_mean_plus_1sig`, then we consider that
            {mean + 1xsigma}(CC) was not properly estimated at time t. Since
            {mean + 1xsigma}(CC) is computed from `threshold`, the detection
            threshold was probably not adequate and we discard CC(t).
            **Set this parameter to zero to deactivate validation of the
            detection threshold.**
        window_for_validation_Tmax: scalar float, optional
            Window duration, in units of Tmax, the longest period in the data
            (that is, the inverse of `cfg.MIN_FREQ_HZ`), for analyzing the
            statistics of CC(t) in the vicinity of each CC(t) > `threshold`.
            Default to 100.

        Returns
        -------
        cc_idx: (n_detections,) numpy.ndarray
            The list of all selected CC indexes. They give the timings of the
            detected events.
        """
        sr = self.data.sr
        step = self.step
        cc_detections = cc_t > threshold
        cc_idx = np.where(cc_detections)[0]

        WINDOW_FOR_VALIDATION = int(1.0 / cfg.MIN_FREQ_HZ * window_for_validation_Tmax)
        cc_at_mean_plus_1sig = threshold / cfg.N_DEV_MF_THRESHOLD
        if self.threshold_type == "mad":
            # get estimate of STD from MAD
            cc_at_mean_plus_1sig *= 1.48

        cc_idx = list(cc_idx)
        n_rm = 0
        for i in range(1, len(cc_idx)):
            if (cc_idx[i - n_rm] - cc_idx[i - n_rm - 1]) < search_win:
                if cc_t[cc_idx[i - n_rm]] > cc_t[cc_idx[i - n_rm - 1]]:
                    # keep (i-n_rm)-th detection
                    cc_idx.remove(cc_idx[i - n_rm - 1])
                else:
                    # keep (i-n_rm-1)-th detection
                    cc_idx.remove(cc_idx[i - n_rm])
                n_rm += 1
        cc_idx = np.asarray(cc_idx)

        if anomalous_cdf_at_mean_plus_1sig > 0.0:
            # test the validity of the detection threshold
            valid_detections = np.ones(len(cc_idx), dtype=bool)
            for i in range(len(cc_idx)):
                idx0 = max(0, cc_idx[i] - WINDOW_FOR_VALIDATION // 2)
                idx1 = idx0 + WINDOW_FOR_VALIDATION
                if idx1 >= len(cc_t):
                    idx1 = len(cc_t) - 1
                    idx0 = idx1 - WINDOW_FOR_VALIDATION
                cc1 = cc_t[idx0:idx1][: WINDOW_FOR_VALIDATION // 2]
                cc2 = cc_t[idx0:idx1][WINDOW_FOR_VALIDATION // 2 :]
                frac = min(
                    np.sum(cc1 < cc_at_mean_plus_1sig[cc_idx[i]]) / float(len(cc1)),
                    np.sum(cc2 < cc_at_mean_plus_1sig[cc_idx[i]]) / float(len(cc2)),
                )
                if frac < anomalous_cdf_at_mean_plus_1sig:
                    # an anomalous number of CCs are above +1sig:
                    # the detection threshold most likely failed (gap in data?)
                    valid_detections[i] = False
            cc_idx = cc_idx[valid_detections]

        # go back to regular sampling space
        detection_indexes = cc_idx * self.step
        if self.remove_edges:
            # remove detections that fall within the data buffer
            limit = utils.sec_to_samp(cfg.DATA_BUFFER_SEC, sr=sr)
            idx = detection_indexes >= limit
            cc_idx = cc_idx[idx]
            detection_indexes = detection_indexes[idx]
            limit = utils.sec_to_samp(self.data.duration + cfg.DATA_BUFFER_SEC, sr=sr)
            idx = detection_indexes < limit
            cc_idx = cc_idx[idx]
        return cc_idx
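
    # Illustrative sketch of how the selected CC indexes map back to origin
    # times (assumption: `cc_t` and `threshold` were computed for one
    # template, and `search_win` is expressed in correlation steps, as above).
    #
    #   cc_idx = matched_filter.select_cc_indexes(cc_t, threshold, search_win)
    #   detection_indexes = cc_idx * matched_filter.step  # back to samples
    #   origin_times = [
    #       matched_filter.data.date + idx / matched_filter.data.sr
    #       for idx in detection_indexes
    #   ]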
    def compute_cc_time_series(self, weight_type="simple", device="cpu", tids=None):
        """Compute the CC time series (step 1 of the matched-filter search).

        Parameters
        ----------
        weight_type: string, default to 'simple'
            'simple' is the only option for now.
        device: string, default to 'cpu'
            Either 'cpu' or 'gpu'.
        tids: list of `tid`, default to None
            If None, run the matched-filter search with all templates in
            `self.template_group`. If not None, the matched-filter search is
            run with a limited number of templates (convenient to not overflow
            the device's memory).
        """
        if not hasattr(self, "data"):
            print("You should call `self.set_data(data)` first.")
            return
        if tids is None:
            select_tts = np.arange(self.template_group.n_templates, dtype=np.int32)
        else:
            select_tts = self.template_group.tindexes.loc[tids]
        # store in memory the list of tids corresponding to the subset
        # of templates selected for this run
        self.tids_subset = self.template_group.tids[select_tts].tolist()
        # ----------------------------------------------
        # parameters
        nt, ns, nc, Nsamp = self.template_group.waveforms_arr[select_tts, ...].shape
        n_stations, n_components, n_samples_data = self.data_arr.shape
        if weight_type == "simple":
            # equal weights on all channels
            weights_arr = np.float32(
                self.template_group.network_to_template_map[select_tts, ...]
            )
            if hasattr(self.data, "availability"):
                # set weights to zero on unavailable stations
                weights_arr[:, ~self.data.availability_per_cha.values] = 0.0
            norm = np.sum(weights_arr, axis=(1, 2), keepdims=True)
            norm[norm == 0.0] = 1.0
            weights_arr /= norm
            # insufficient data
            invalid = (
                np.sum((weights_arr != 0.0), axis=(1, 2)) < self.min_channels
            ) | (np.sum(np.sum(weights_arr, axis=2) > 0.0, axis=1) < self.min_stations)
            weights_arr[invalid] = 0.0
        self.weights_arr = weights_arr
        # ----------------------------------------------
        # are there templates with zero weights only?
        # if yes, skip them to save time
        invalid_weights = np.sum(self.weights_arr, axis=(1, 2)) == 0
        tindexes_to_skip = select_tts[invalid_weights]
        select_tts = select_tts[~invalid_weights]
        # ----------------------------------------------
        # compute the CC time series: run FMF
        if len(select_tts) > 0:
            CC_SUMS = []
            L = ns // self.n_network_chunks + 1
            for i in range(self.n_network_chunks):
                # to be memory friendly, we subdivide the network into
                # n_network_chunks and the resulting correlation coefficients
                # are then manually stacked in a separate loop
                id1 = i * L
                id2 = (i + 1) * L
                if id2 > ns:
                    id2 = ns
                cc_sums = fmf.matched_filter(
                    self.template_group.waveforms_arr[select_tts, id1:id2, :, :],
                    self.template_group.moveouts_arr[select_tts, id1:id2, :],
                    weights_arr[~invalid_weights, id1:id2, :],
                    self.data_arr[id1:id2, ...],
                    self.step,
                    arch=device,
                )
                CC_SUMS.append(cc_sums)
            cc_sums = CC_SUMS[0]
            for i in range(1, self.n_network_chunks):
                # stack the correlation coefficients
                cc_sums += CC_SUMS[i]

            cc_sums[np.isnan(cc_sums)] = 0.0

        self.cc = {}
        for t, tid in enumerate(self.template_group.tids[select_tts]):
            self.cc[tid] = cc_sums[t, ...]
        for tid in self.template_group.tids[tindexes_to_skip]:
            self.cc[tid] = np.array([0.0])
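
    # Illustrative usage sketch (assumption: `set_data` was called first).
    # Note that each call rebuilds `self.cc` from scratch, so detections
    # should be extracted with `find_detections` before computing CCs for
    # another subset of templates.
    #
    #   matched_filter.compute_cc_time_series(
    #       device="gpu", tids=matched_filter.template_group.tids[:50]
    #   )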
    def find_detections(
        self,
        minimum_interevent_time,
        threshold_window_dur=1800.0,
        overlap=0.25,
        sanity_check=True,
        verbose=0,
    ):
        """Analyze the correlation coefficient time series to find detections.

        Parameters
        ----------
        minimum_interevent_time: scalar, float
            The shortest duration, in seconds, allowed between two consecutive
            detections.
        threshold_window_dur: scalar float, default to 1800
            Duration, in seconds, of the sliding window used in the
            computation of the time-dependent detection threshold.
        overlap: scalar float, default to 0.25
            Overlap, between 0 and 1, of the consecutive sliding windows used
            in the computation of the time-dependent detection threshold.
        sanity_check: boolean, default to True
            If True, check that the kurtosis of the CC time series is not
            above `self.max_kurto`. Set to False to speed up the computation.
        verbose: scalar int, default to 0
            If > 0, print some messages.

        Returns
        -------
        detections: dictionary
            Dictionary where `detections[tid]` is a list of `dataset.Event`
            for all events detected with template `tid`.
        """
        self.minimum_interevent_time = minimum_interevent_time
        self.threshold_window_dur = threshold_window_dur
        self.overlap = overlap
        self.sanity_check = sanity_check
        self.white_noise = np.random.normal(size=self.data.n_samples).astype("float32")

        sr = self.data.sr
        detections = {}
        tids = list(self.cc.keys())
        if self.max_workers == 1:
            output = list(map(self._find_detections_t, tids))
        else:
            with concurrent.futures.ThreadPoolExecutor(
                max_workers=min(len(tids), self.max_workers)
            ) as executor:
                output = list(executor.map(self._find_detections_t, tids))
        detections.update({output[i][1]: output[i][0] for i in range(len(output))})
        if verbose > 0:
            for tid in tids:
                print(f"Template {tid} detected {len(detections[tid]):d} events.")
        return detections
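
    # Illustrative end-to-end sketch for one day of data (assumptions: `data`
    # and `template_group` are built elsewhere; a 5 s minimum inter-event
    # time is only an example value).
    #
    #   matched_filter.set_data(data)
    #   matched_filter.compute_cc_time_series(device="cpu")
    #   detections = matched_filter.find_detections(
    #       minimum_interevent_time=5.0, threshold_window_dur=1800.0, verbose=1
    #   )
    #   for tid, events in detections.items():
    #       print(tid, len(events))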
    def _find_detections_t(self, tid):
        """Find detections in the CC time series of template `tid`."""
        # t: index in this run's loop
        # tt: index in the self.template_group.templates list
        # tid: template id
        t = self.tids_subset.index(tid)
        tt = self.template_group.tindexes.loc[tid]
        minimum_interevent_time = utils.sec_to_samp(
            self.minimum_interevent_time, sr=self.data.sr
        )
        cc_t = self.cc[tid]
        weights_t = self.weights_arr[t, ...]
        valid = cc_t != 0.0
        if np.sum(valid) == 0:
            # return no detection
            return [], tid
        else:
            threshold = time_dependent_threshold(
                cc_t,
                utils.sec_to_samp(self.threshold_window_dur, sr=self.data.sr),
                threshold_type=self.threshold_type,
                # white_noise=self.white_noise,
                overlap=self.overlap,
                num_threads=self.num_threads_threshold,
            )
            # saturate threshold as requested by the user
            threshold = np.minimum(
                self.max_CC_threshold * np.sum(weights_t), threshold
            )
            # temporary:
            if threshold.max() == 0.0:
                print(f"Issue with detection threshold on template {tid}!")
        if self.sanity_check:
            # ------------------
            # sanity check: the CC time series should be approximately
            # normal, therefore, the kurtosis should be around 0.
            # strong deviations from 0 kurtosis arise when some of the
            # data are unavailable during the period of time being processed
            if kurtosis(cc_t) > self.max_kurto:
                # 20 is already a very conservative threshold
                print("Kurtosis too large! Setting the CCs to zero.")
                cc_t = np.zeros(cc_t.size, dtype=np.float32)
        # select the peaks in the CC time series
        # ----------------------------------------------------------
        # only keep the highest correlation coefficient for grouped detections
        # --- different phases correlating with one another can typically
        # --- produce high enough CCs to trigger a detection
        # --- therefore, we use the time difference between the earliest
        # --- phase and the latest phase, on each station, as a proxy for
        # --- the allowed inter-event time
        d_mv = np.max(self.template_group.moveouts_arr[tt, ...], axis=-1) - np.min(
            self.template_group.moveouts_arr[tt, ...], axis=-1
        )
        # take the median across stations
        d_mv = int(np.median(d_mv)) + 1
        search_win = min(
            10 * minimum_interevent_time, max(d_mv, minimum_interevent_time)
        )
        search_win /= self.step  # time in correlation step units
        cc_idx = self.select_cc_indexes(
            cc_t,
            threshold,
            search_win,
            anomalous_cdf_at_mean_plus_1sig=self.anomalous_cdf_at_mean_plus_1sig,
            window_for_validation_Tmax=self.window_for_validation_Tmax,
        )
        detection_indexes = cc_idx * self.step
        # ----------------------------------------------------------
        n_detections = len(detection_indexes)

        # extract waveforms
        detections_t = []
        data_path, data_filename = os.path.split(self.data.where)
        # give the template's attributes to each detection
        template = self.template_group.templates[tt]
        # make sure stations and mv are consistent
        stations = self.stations
        latitude = template.latitude
        longitude = template.longitude
        depth = template.depth
        mv = template.moveouts.loc[self.stations].values
        phases = template.phases
        for i in range(len(detection_indexes)):
            event = Stream()
            ot_i = self.data.date + detection_indexes[i] / self.data.sr
            event = dataset.Event(
                ot_i,
                mv,
                stations,
                phases,
                data_filename,
                data_path,
                latitude=latitude,
                longitude=longitude,
                depth=depth,
                sampling_rate=self.data.sr,
                data_reader=self.data.data_reader,
            )
            if self.extract_peak_amplitudes:
                peak_amplitudes = np.zeros(
                    (len(self.stations), len(self.components)), dtype=np.float32
                )
                for s, sta in enumerate(self.stations):
                    for c, cp in enumerate(self.components):
                        ph = self.phase_on_comp_peak_amp[cp].upper()
                        mv_sc = utils.sec_to_samp(
                            template.moveouts.loc[sta, f"moveouts_{ph}"],
                            sr=self.data.sr,
                        )
                        time_idx1 = (
                            detection_indexes[i]
                            + mv_sc
                            - self.offset_win_peak_amp_samp
                        )
                        time_idx2 = time_idx1 + self.duration_win_peak_amp_samp
                        win_peak_amp = self.data_arr[s, c, time_idx1:time_idx2]
                        if len(win_peak_amp) > 0:
                            peak_amplitudes[s, c] = (
                                win_peak_amp.max() * self.data_norm[s, c]
                            )
            aux_data = {}
            aux_data["cc"] = cc_t[cc_idx[i]]
            aux_data["n_threshold"] = cc_t[cc_idx[i]] / threshold[cc_idx[i]]
            aux_data["n_dev"] = aux_data["n_threshold"] * cfg.N_DEV_MF_THRESHOLD
            aux_data["tid"] = tid
            if self.extract_peak_amplitudes:
                aux_data["peak_amplitudes"] = peak_amplitudes
            event.set_aux_data(aux_data)
            detections_t.append(event)
        return detections_t, tid

    # -------------------------------------------
    #           Plotting methods
    # -------------------------------------------
    def plot_cc(self, tid, ax=None, detection=None, **kwargs):
        """Plot the time series of correlation coefficients.

        Parameters
        ----------
        tid: string or scalar int
            The id of the template that produced the CCs.
        ax: plt.Axes, default to None
            If not None, plot in this axis.
        detection: dataset.Event, default to None
            A dataset.Event instance of a given detection.
        figsize: tuple, default to (15, 7)
            Width and height of the figure, in inches.

        Returns
        -------
        fig: plt.Figure
            The Figure instance produced by this method.
        """
        import matplotlib.pyplot as plt
        import matplotlib.dates as mdates

        if ax is None:
            # plot the correlation coefficients
            fig = plt.figure(
                f"correlation_coefficients_tp{tid}",
                figsize=kwargs.get("figsize", (15, 7)),
            )
            ax = fig.add_subplot(111)
        else:
            fig = ax.get_figure()
        if tid not in self.cc.keys():
            print(
                f"{tid} is not in `self.cc.keys()`. Re-run"
                " `self.compute_cc_time_series` properly."
            )
            return
        sr = self.data.sr
        cc_t = self.cc[tid]
        weights_t = self.weights_arr[self.tids_subset.index(tid), ...]
        detection_threshold = time_dependent_threshold(
            cc_t,
            utils.sec_to_samp(self.threshold_window_dur, sr=sr),
            threshold_type=self.threshold_type,
            overlap=self.overlap,
        )
        # saturate threshold as requested by the user
        detection_threshold = np.minimum(
            self.max_CC_threshold * np.sum(weights_t), detection_threshold
        )
        cc_idx = self.select_cc_indexes(
            cc_t,
            detection_threshold,
            utils.sec_to_samp(self.minimum_interevent_time, sr=sr) // self.step,
            anomalous_cdf_at_mean_plus_1sig=self.anomalous_cdf_at_mean_plus_1sig,
            window_for_validation_Tmax=self.window_for_validation_Tmax,
        )
        time_indexes = np.arange(len(self.data.time), dtype=np.int32)[:: self.step][
            : len(cc_t)
        ]
        ax.plot(
            self.data.time[time_indexes],
            cc_t,
            rasterized=kwargs.get("rasterized", True),
        )
        ax.plot(
            self.data.time[time_indexes],
            detection_threshold,
            color="C3",
            ls="--",
            label="Detection Threshold",
            rasterized=kwargs.get("rasterized", True),
        )
        if len(cc_idx) > 0:
            ax.plot(
                self.data.time[cc_idx * self.step],
                cc_t[cc_idx * self.step],
                marker="o",
                ls="",
                color="C3",
            )
        ax.legend(loc="upper right")
        ax.set_xlabel("Time of the day")
        ax.set_ylabel("Network CC")
        ax.set_xlim(self.data.time.min(), self.data.time.max())
        ax.xaxis.set_major_formatter(mdates.DateFormatter("%H:%M"))
        if detection is not None:
            ot = np.datetime64(detection.origin_time)
            ax.annotate(
                "detection",
                (ot, detection.aux_data["cc"]),
                (
                    ot + np.timedelta64(15, "m"),
                    min(ax.get_ylim()[1], 2.0 * detection.aux_data["cc"]),
                ),
                arrowprops={"width": 2, "headwidth": 5, "color": "k"},
            )
        return fig
    def plot_detection(
        self,
        detection,
        figsize=(20, 20),
        component_aliases={"N": ["N", "1"], "E": ["E", "2"], "Z": ["Z"]},
        n_max_stations=None,
    ):
        """Plot a detection and the correlation coefficients.

        Parameters
        ----------
        detection: dataset.Event
            A dataset.Event instance of a given detection.
        figsize: tuple, default to (20, 20)
            Width and height of the figure, in inches.
        component_aliases: dictionary, optional
            Sometimes, components might be named differently than N, E, Z.
            This dictionary tells the function which alternative component
            names can be associated with each "canonical" component. For
            example, `component_aliases['N'] = ['N', '1']` means that the
            function will also check the '1' component in case the 'N'
            component doesn't exist.
        n_max_stations: scalar int or None, optional
            If not None, only plot the `n_max_stations` closest stations.

        Returns
        -------
        fig: plt.Figure
            The Figure instance produced by this method.
        """
        import matplotlib.pyplot as plt
        import matplotlib.dates as mdates

        tid = detection.aux_data["tid"]
        if n_max_stations is not None:
            from copy import deepcopy

            detection = deepcopy(detection)
            detection.n_closest_stations(n_max_stations)
        if tid not in self.cc.keys():
            print(
                f"{tid} is not in `self.cc.keys()`. Re-run"
                " `self.compute_cc_time_series` properly."
            )
            return
        tt = self.template_group.tindexes[tid]
        sr = self.data.sr
        fig = plt.figure(f"detection_{detection.origin_time}", figsize=figsize)
        grid = fig.add_gridspec(
            nrows=len(detection.stations) + 2,
            ncols=len(detection.components),
            hspace=0.35,
        )
        start_times, end_times = [], []
        wav_axes = []
        ax_cc = fig.add_subplot(grid[:2, :])
        self.plot_cc(tid, ax=ax_cc, detection=detection)
        ax_cc.set_ylim(-1.1, 1.1)
        for s, sta in enumerate(detection.stations):
            for c, cp in enumerate(detection.components):
                ax = fig.add_subplot(grid[2 + s, c])
                for cp_alias in component_aliases[cp]:
                    tr = detection.traces.select(station=sta, component=cp_alias)
                    if len(tr) > 0:
                        # successfully retrieved data
                        break
                if len(tr) == 0:
                    print(f"Did not find any continuous data on {sta}.{cp}")
                    continue
                else:
                    tr = tr[0]
                time = utils.time_range(
                    tr.stats.starttime, tr.stats.endtime, 1.0 / sr, unit="ms"
                )
                start_times.append(time[0])
                end_times.append(time[-1])
                ax.plot(
                    time[: detection.n_samples],
                    utils.max_norm(tr.data[: detection.n_samples]),
                    color="k",
                )
                # plot the template waveforms
                tr_tp = self.template_group.templates[tt].traces.select(
                    station=sta, component=cp_alias
                )
                if len(tr_tp) > 0:
                    tr_tp = tr_tp[0]
                    ax.plot(
                        time[: detection.n_samples],
                        utils.max_norm(tr_tp.data[: detection.n_samples]),
                        color="C3",
                        lw=0.50,
                    )
                # plot the theoretical pick
                phase = detection.aux_data[f"phase_on_comp{cp_alias}"].upper()
                offset_ph = detection.aux_data[f"offset_{phase}"]
                ax.axvline(
                    time[0] + np.timedelta64(int(1000.0 * offset_ph), "ms"),
                    color="C3",
                )
                ax.text(0.05, 0.05, f"{sta}.{cp_alias}", transform=ax.transAxes)
                wav_axes.append(ax)
        for ax in wav_axes:
            ax.set_xlim(min(start_times), max(end_times))
            ax.xaxis.set_major_formatter(
                mdates.ConciseDateFormatter(ax.xaxis.get_major_locator())
            )
        plt.subplots_adjust(top=0.95, bottom=0.06, right=0.98, left=0.06)
        return fig
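
# Illustrative plotting sketch (assumption: `detections` is the dictionary
# returned by `MatchedFilter.find_detections` and contains at least one event
# for template `tid`).
#
#   events = detections[tid]
#   if len(events) > 0:
#       fig = matched_filter.plot_detection(events[0], n_max_stations=10)
#       fig.savefig(f"detection_tp{tid}.png", dpi=150)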
def time_dependent_threshold(
    time_series,
    sliding_window,
    overlap=0.66,
    threshold_type="rms",
    white_noise=None,
    num_threads=None,
):
    """
    Time-dependent detection threshold.

    Parameters
    ----------
    time_series: (n_correlations,) array_like
        The array of correlation coefficients calculated by FMF (float 32).
    sliding_window: scalar integer
        The size of the sliding window, in samples, used to calculate the
        time-dependent central tendency and deviation of the time series.
    overlap: scalar float, default to 0.66
        Overlap, between 0 and 1, of the consecutive sliding windows.
    threshold_type: string, default to 'rms'
        Either 'rms' or 'mad', depending on which measure of deviation you
        want to use.
    white_noise: `numpy.ndarray` or None, default to None
        If not None, `white_noise` is a vector of random values sampled from
        the standard normal distribution. It is used to fill zeros in the CC
        time series. If None, a random vector is generated from scratch.
    num_threads: scalar int or None, default to None
        Number of threads used by the C implementation of the 'rms'
        threshold.

    Returns
    -------
    threshold: (n_correlations,) array_like
        Returns the time-dependent threshold, with the same size as the input
        time series.
    """
    threshold_type = threshold_type.lower()
    if threshold_type == "rms":
        threshold = clib.time_dependent_threshold(
            time_series,
            sliding_window,
            cfg.N_DEV_MF_THRESHOLD,
            overlap=overlap,
            white_noise=white_noise,
            num_threads=num_threads,
        )
        return threshold

    time_series = time_series.copy()
    n_samples = len(time_series)
    half_window = sliding_window // 2
    shift = int((1.0 - overlap) * sliding_window)
    zeros = time_series == 0.0
    n_zeros = np.sum(zeros)
    if white_noise is None:
        white_noise = np.random.normal(size=n_zeros).astype("float32")
    # method is "mad"
    default_center = np.median(time_series[~zeros])
    default_deviation = np.median(np.abs(time_series[~zeros] - default_center))
    time_series[zeros] = white_noise[:n_zeros] * default_deviation + default_center
    time_series_win = np.lib.stride_tricks.sliding_window_view(
        time_series, sliding_window
    )[::shift, :]
    center = np.median(time_series_win, axis=-1)
    deviation = np.median(np.abs(time_series_win - center[:, np.newaxis]), axis=-1)
    threshold = center + cfg.N_DEV_MF_THRESHOLD * deviation
    threshold[1:] = np.maximum(threshold[:-1], threshold[1:])
    threshold[:-1] = np.maximum(threshold[:-1], threshold[1:])
    time = np.arange(half_window, n_samples - (sliding_window - half_window))
    indexes_l = time // shift
    indexes_l[indexes_l >= len(threshold)] = len(threshold) - 1
    threshold = threshold[indexes_l]
    threshold = np.hstack(
        (
            threshold[0] * np.ones(half_window, dtype=np.float32),
            threshold,
            threshold[-1] * np.ones(sliding_window - half_window, dtype=np.float32),
        )
    )
    return threshold
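
# Illustrative sketch of `time_dependent_threshold` on synthetic CCs
# (assumptions: 1 h of correlations at a 100 Hz correlation sampling and a
# 10 min sliding window, i.e. 60000 samples; 'mad' exercises the pure-numpy
# branch above). The returned threshold has the same size as the input.
#
#   import numpy as np
#   cc_synth = np.random.normal(0.0, 0.05, size=3600 * 100).astype("float32")
#   thresh = time_dependent_threshold(
#       cc_synth, sliding_window=60000, overlap=0.66, threshold_type="mad"
#   )
#   assert thresh.shape == cc_synth.shape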