Source code for BPMF.spectrum

import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import scipy.signal as scisig
from scipy.interpolate import interp1d
from obspy import Stream


[docs] class Spectrum: """ Class for handling spectral data and calculations.$a """ def __init__(self, event=None, frequency_bands=None): """ Parameters ---------- event : str or None, optional The event associated with the spectrum. Default is None. frequency_bands : list or None, optional The frequency bands for the spectrum. Default is None. Attributes ---------- event : str or None The event associated with the spectrum. frequency_bands : list or None The frequency bands for the spectrum. """ self.event = event if frequency_bands is not None: self.set_frequency_bands(frequency_bands)
[docs] def set_Q_model(self, Q, frequencies): """ Set the attenuation Q model for P and S phases. Parameters ---------- Q : array-like Array of attenuation Q values. frequencies : array-like Array of corresponding frequencies. These are used to later keep track of which frequencies were used for the Q model. Returns ------- None The Q model is stored in the `Q0` attribute. The frequencies are stored in the `Q0_frequencies` attribute. These are later used to computed the Q-model and attenuation factor at arbitrary frequencies. """ self.Q0 = np.asarray(Q) self.Q0_frequencies = np.asarray(frequencies) interpolator = interp1d( frequencies, Q, kind="linear", fill_value=(Q[0], Q[-1]), bounds_error=False ) self.Q = np.asarray(interpolator(self.frequencies))
[docs] def update_Q_model(self): """ Interpolate the Q-model at the current `self.frequencies`. """ interpolator = interp1d( self.Q0_frequencies, self.Q0, kind="linear", fill_value=(self.Q0[0], self.Q0[-1]), bounds_error=False, ) self.Q = interpolator(self.frequencies)
[docs] def update_attenuation_factor(self): """ Compute attenuation factor at the current `self.frequencies`. """ self.update_Q_model() for sta in self.event.source_receiver_dist.index: r_m = 1000.0 * self.event.source_receiver_dist.loc[sta] tt_s = self.event.arrival_times.loc[sta, "S_tt_sec"] tt_p = self.event.arrival_times.loc[sta, "P_tt_sec"] self.attenuation_factor.loc[sta, f"attenuation_S"] = np.exp( np.pi * tt_s * np.asarray(self.frequencies) / self.Q ) self.attenuation_factor.loc[sta, f"attenuation_P"] = np.exp( np.pi * tt_s * np.asarray(self.frequencies) / self.Q )
[docs] def compute_correction_factor( self, rho_source, rho_receiver, vp_source, vp_receiver, vs_source, vs_receiver, radiation_S=np.sqrt(2.0 / 5.0), radiation_P=np.sqrt(4.0 / 15.0), ): """ Compute the correction factor and attenuation factor for a seismic event. Parameters ---------- rho_source : float Density of the source medium, in kg/m3. rho_receiver : float Density of the receiver medium, in kg/m3. vp_source : float P-wave velocity of the source medium, in m/s. vp_receiver : float P-wave velocity of the receiver medium, in m/s. vs_source : float S-wave velocity of the source medium, in m/s. vs_receiver : float S-wave velocity of the receiver medium, in m/s. radiation_S : float, optional Radiation coefficient for S-wave. Default is sqrt(2/5). radiation_P : float, optional Radiation coefficient for P-wave. Default is sqrt(4/15). Returns ------- None The correction factor and attenuation factor are stored in the object's attributes `correction_factor` and `attenuation_factor`. Notes ----- This method requires the object to have an attached `BPMF.dataset.Event` instance and for the instance to have called the `set_source_receiver_dist(network)` method. """ from scipy.interpolate import interp1d if not hasattr(self, "event"): print("Attach the BPMF.dataset.Event instance first.") return if not hasattr(self.event, "_source_receiver_dist"): print("Call event.set_source_receiver_dist(network) first.") return stations = np.sort(self.event.source_receiver_dist.index) correction_factor = pd.DataFrame(index=stations) attenuation_factor = pd.DataFrame( index=stations, columns=["attenuation_P", "attenuation_S"], dtype=object ) if hasattr(self, "Q"): self.update_Q_model() for sta in stations: r_m = 1000.0 * self.event.source_receiver_dist.loc[sta] tt_s = self.event.arrival_times.loc[sta, "S_tt_sec"] corr_s = ( 4.0 * np.pi * np.sqrt(rho_receiver) * np.sqrt(rho_source) * np.sqrt(vs_receiver) * vs_source ** (5.0 / 2.0) * r_m / radiation_S ) correction_factor.loc[sta, f"correction_S"] = corr_s if hasattr(self, "Q"): attenuation_factor.loc[sta, f"attenuation_S"] = np.exp( np.pi * tt_s * np.asarray(self.frequencies) / self.Q ) else: attenuation_factor.loc[sta, f"attenuation_S"] = None tt_p = self.event.arrival_times.loc[sta, "P_tt_sec"] corr_p = ( 4.0 * np.pi * np.sqrt(rho_receiver) * np.sqrt(rho_source) * np.sqrt(vp_receiver) * vp_source**(5./2.) * r_m / radiation_P ) correction_factor.loc[sta, f"correction_P"] = corr_p if hasattr(self, "Q"): attenuation_factor.loc[sta, f"attenuation_P"] = np.exp( np.pi * tt_p * np.asarray(self.frequencies) / self.Q ) else: attenuation_factor.loc[sta, f"attenuation_P"] = None self.correction_factor = correction_factor self.attenuation_factor = attenuation_factor
[docs] def compute_network_average_spectrum( self, phase, snr_threshold, correct_propagation=True, average_log=True, min_num_valid_channels_per_freq_bin=0, max_relative_distance_err_pct=25.0, reduce="mean", verbose=0, ): """ Compute the network average spectrum for a given phase. Parameters ---------- phase : str Phase of the seismic event. Should be either 'p' or 's'. snr_threshold : float Signal-to-noise ratio threshold for valid channels. correct_propagation : bool, optional Flag indicating whether to correct for propagation effects. Default is True. average_log : bool, optional Flag indicating whether to average the logarithm of the spectra. Default is True. min_num_valid_channels_per_freq_bin : int, optional Minimum number of valid channels required per frequency bin. Default is 0. max_relative_distance_err_pct : float, optional Maximum relative distance error percentage for a valid channel. Default is 25.0. verbose : int, optional Verbosity level. Set to 0 for no output. Default is 0. Returns ------- None The average spectrum and related information are stored in the object's attributes. Notes ----- This method requires the object to have already computed the spectrum for the specified phase and to have set the target frequencies using the `set_target_frequencies` method. If `correct_propagation` is set to True, the method also requires the object to have computed the correction factor using the `compute_correction_factor` method. """ phase = phase.lower() assert phase in ["p", "s"], "phase should be 'p' or 's'" assert phase in self.phases, f"You need to compute the {phase} spectrum first" assert reduce in ["median", "mean"], "reduce should be 'mean' or 'median'" assert hasattr( self, "frequencies" ), "You need to use set_target_frequencies first" if correct_propagation and not hasattr(self, "correction_factor"): print("You requested correcting for propagation effects. ") print("You need to use compute_correction_factor first.") return average_spectrum = np.ma.zeros(len(self.frequencies), dtype=np.float32) masked_spectra = [] signal_spectrum = getattr(self, f"{phase}_spectrum") snr_spectrum = getattr(self, f"snr_{phase}_spectrum") if hasattr(self, "Q"): self.update_Q_model() self.update_attenuation_factor() for trid in signal_spectrum: if ( signal_spectrum[trid]["relative_distance_err_pct"] > max_relative_distance_err_pct ): if verbose > 0: print( f"Source-receiver distance relative error is too high: " f"{signal_spectrum[trid]['relative_distance_err_pct']:.2f}" ) # the location uncertainty implies too much error # on this station, skip it continue mask = snr_spectrum[trid]["snr"] < snr_threshold amplitude_spectrum = signal_spectrum[trid]["spectrum"].copy() if correct_propagation: sta = trid.split(".")[1] amplitude_spectrum *= self.correction_factor.loc[ sta, f"correction_{phase.upper()}" ] if ( self.attenuation_factor.loc[sta, f"attenuation_{phase.upper()}"] is not None ): # att = self.attenuation_factor.loc[ # sta, f"attenuation_{phase.upper()}" # ](signal_spectrum[trid]["freq"]) amplitude_spectrum *= self.attenuation_factor.loc[ sta, f"attenuation_{phase.upper()}" ] masked_spectra.append( np.ma.masked_array(data=amplitude_spectrum, mask=mask) ) if len(masked_spectra) == 0: # there seems to be cases when to spectra were in signal_spectrum?? if verbose > 0: print(f"No spectra found in {phase}_spectrum") self.average_spectra = [] return # it looks like we need this explicit definition of the mask # otherwise the mask can be converted to a single boolean when # all elements are False masked_spectra = np.ma.masked_array( data=np.stack([arr.data for arr in masked_spectra], axis=0), mask=np.stack([arr.mask for arr in masked_spectra], axis=0), ) # count the number of channels that satisfied the SNR criterion num_valid_channels = np.sum(~masked_spectra.mask, axis=0) # discard the frequency bins for which the minimum number # of valid channels was not achieved discarded_freq_bins = num_valid_channels < min_num_valid_channels_per_freq_bin masked_spectra.mask[:, discarded_freq_bins] = True # compute average spectrum without masked elements mask = masked_spectra.mask.copy() if average_log: log10_masked_spectra = np.ma.log10(masked_spectra) # another mysterious feature of numpy.... # need to use exp to propagate mask correctly if reduce == "mean": average_spectrum = np.exp( np.ma.mean(log10_masked_spectra, axis=0) * np.log(10.0) ) elif reduce == "median": average_spectrum = np.exp( np.ma.median(log10_masked_spectra, axis=0) * np.log(10.0) ) std_spectrum = np.ma.std(log10_masked_spectra, axis=0) else: if reduce == "mean": average_spectrum = np.ma.mean(masked_spectra, axis=0) elif reduce == "median": average_spectrum = np.ma.median(masked_spectra, axis=0) std_spectrum = np.ma.std(masked_spectra, axis=0) setattr( self, f"average_{phase}_spectrum", { "spectrum": average_spectrum, "std": std_spectrum, "num_valid_channels": num_valid_channels, "spectra": masked_spectra, "freq": self.frequencies, "snr_threshold": snr_threshold, }, ) if not hasattr(self, "average_spectra"): self.average_spectra = [phase] else: self.average_spectra.append(phase) self.average_spectra = list(set(self.average_spectra))
[docs] def compute_multi_band_spectrum(self, traces, phase, buffer_seconds, **kwargs): """ Compute spectrum from the maximum amplitude in multiple 1-octave frequency bands. Parameters ---------- traces : list List of seismic traces. phase : str Phase of the seismic event. Should be 'noise', 'p', or 's'. buffer_seconds : float Buffer duration in seconds to remove from the beginning and end of the trace. **kwargs Additional keyword arguments for filtering. Returns ------- None The computed spectrum is stored in the object's attribute `{phase}_spectrum`. Notes ----- - The attribute `frequency_bands` is required for this method. - The spectrum is computed by finding the maximum amplitude in each 1-octave frequency band. - The resulting spectrum and frequency values are stored in the `spectrum` dictionary. - The relative distance error percentage is calculated and stored for each trace. - The attribute `{phase.lower()}_spectrum` is updated with the computed spectrum. - If the attribute `phases` exists, the phase is appended to the list. Otherwise, a new list is created. """ assert phase.lower() in ( "noise", "p", "s", ), "phase should be 'noise', 'p' or 's'." assert hasattr( self, "frequency_bands" ), "Attribute `frequency_bands` is required for this method." num_freqs = len(self.frequency_bands) spectrum = {} dev_mode = kwargs.get("dev_mode", False) for tr in traces: nyq = tr.stats.sampling_rate / 2. buffer_samples = int(buffer_seconds * tr.stats.sampling_rate) spectrum[tr.id] = {} spectrum[tr.id]["spectrum"] = np.zeros(num_freqs, dtype=np.float32) spectrum[tr.id]["freq"] = np.zeros(num_freqs, dtype=np.float32) if dev_mode: spectrum[tr.id]["filtered_traces"] = {} for i, band in enumerate(self.frequency_bands): bandwidth = ( self.frequency_bands[band][1] - self.frequency_bands[band][0] ) center_freq = 0.5 * ( self.frequency_bands[band][0] + self.frequency_bands[band][1] ) spectrum[tr.id]["freq"][i] = center_freq if self.frequency_bands[band][1] >= nyq: # cannot use this frequency band on this channel continue tr_band = tr.copy() # preprocess before filtering tr_band.detrend("constant") tr_band.detrend("linear") tr_band.taper(0.25, max_length=buffer_seconds, type="cosine") # filter tr_band.filter( "bandpass", freqmin=self.frequency_bands[band][0], freqmax=self.frequency_bands[band][1], corners=kwargs.get("corners", 4), zerophase=True, ) trimmed_tr = tr_band.data[buffer_samples:-buffer_samples] if dev_mode: tr_band.trim( starttime=tr_band.stats.starttime + buffer_seconds, endtime=tr_band.stats.endtime - buffer_seconds ) spectrum[tr.id]["filtered_traces"][str(band)] = tr_band if len(trimmed_tr) == 0: # gap in data? continue max_amp = np.max(np.abs(trimmed_tr)) / bandwidth spectrum[tr.id]["spectrum"][i] = max_amp #print(spectrum[tr.id]["spectrum"]) max_err = np.sqrt(self.event.hmax_unc**2 + self.event.vmax_unc**2) spectrum[tr.id]["relative_distance_err_pct"] = 100.0 * ( max_err / self.event.source_receiver_dist.loc[tr.stats.station] ) setattr(self, f"{phase.lower()}_spectrum", spectrum) if hasattr(self, "phases"): self.phases.append(phase) else: self.phases = [phase]
[docs] def compute_spectrum(self, traces, phase, taper=None, **taper_kwargs): """ Compute spectrum using the Fast Fourier Transform (FFT) on the input traces. Parameters ---------- traces : list List of seismic traces. phase : str Phase of the seismic event. Should be 'noise', 'p', or 's'. taper : callable or None, optional Tapering function to apply to the traces before computing the spectrum. Default is None. **taper_kwargs Additional keyword arguments for the tapering function. Returns ------- None The computed spectrum is stored in the object's attribute `{phase}_spectrum`. Notes ----- - The spectrum is computed using the FFT on the input traces. - The computed spectrum and frequency values are stored in the `spectrum` dictionary. - The relative distance error percentage is calculated and stored for each trace. - The attribute `{phase.lower()}_spectrum` is updated with the computed spectrum. - If the attribute `phases` exists, the phase is appended to the list. Otherwise, a new list is created. """ assert phase.lower() in ( "noise", "p", "s", ), "phase should be 'noise', 'p' or 's'." if taper is None: taper = scisig.tukey taper_kwargs.setdefault("alpha", 0.05) spectrum = {} for tr in traces: spectrum[tr.id] = {} spectrum[tr.id]["spectrum"] = ( np.fft.rfft(tr.data * taper(tr.stats.npts, **taper_kwargs)) * tr.stats.delta ) spectrum[tr.id]["freq"] = np.fft.rfftfreq(tr.stats.npts, d=tr.stats.delta) max_err = np.sqrt(self.event.hmax_unc**2 + self.event.vmax_unc**2) spectrum[tr.id]["relative_distance_err_pct"] = 100.0 * ( max_err / self.event.source_receiver_dist.loc[tr.stats.station] ) setattr(self, f"{phase.lower()}_spectrum", spectrum) if hasattr(self, "phases"): self.phases.append(phase) else: self.phases = [phase]
[docs] def compute_signal_to_noise_ratio(self, phase): """ Compute the signal-to-noise ratio (SNR) for the specified phase. Parameters ---------- phase : str Phase of the seismic event. Should be 'p' or 's'. Returns ------- None The computed SNR values are stored in the object's attribute `snr_{phase}_spectrum`. Raises ------ AssertionError - If `phase` is not 'p' or 's'. - If the {phase} spectrum has not been computed. - If the noise spectrum has not been computed. Notes ----- - The SNR is calculated as the modulus of the signal spectrum divided by the modulus of the noise spectrum. - The SNR values and corresponding frequencies are stored in the `snr` dictionary. - The attribute `snr_{phase}_spectrum` is updated with the computed SNR values. """ phase = phase.lower() assert phase in ["p", "s"], "phase should be 'p' or 's'" assert phase in self.phases, f"You need to compute the {phase} spectrum first" assert "noise" in self.phases, f"You need to compute the noise spectrum first" signal_spectrum = getattr(self, f"{phase}_spectrum") noise_spectrum = getattr(self, f"noise_spectrum") snr = {} for trid in signal_spectrum: snr[trid] = {} snr_ = np.zeros( len(signal_spectrum[trid]["spectrum"]), dtype=np.float32 ) if trid in noise_spectrum: signal = np.abs(signal_spectrum[trid]["spectrum"]) noise = np.abs(noise_spectrum[trid]["spectrum"]) zero_by_zero = (signal == 0.) & (noise == 0.) snr_[~zero_by_zero] = signal[~zero_by_zero] / noise[~zero_by_zero] else: # no noise spectrum, probably because of gap pass snr[trid]["snr"] = snr_ snr[trid]["freq"] = signal_spectrum[trid]["freq"] setattr(self, f"snr_{phase}_spectrum", snr)
[docs] def integrate(self, phase, average=True): """ Integrate the spectrum for a specific phase. Parameters ---------- phase : str Phase name. Should be 'p' or 's'. average : bool, optional Specifies whether to integrate the average spectrum (if True) or individual spectra (if False). Default is True. Returns ------- None The spectrum is integrated in-place. Raises ------ AssertionError If the specified phase spectrum has not been computed. """ phase = phase.lower() if average: assert ( phase in self.average_spectra ), f"You need to compute the average {phase} spectrum first." getattr(self, f"average_{phase}_spectrum")["spectrum"] = ( getattr(self, f"average_{phase}_spectrum")["spectrum"] / getattr(self, f"average_{phase}_spectrum")["freq"] ) else: assert ( phase in self.phases ), f"You need to compute the {phase} spectrum first." spectrum = getattr(self, f"{phase}_spectrum") for trid in spectrum: spectrum[trid]["spectrum"] /= spectrum[trid]["freq"]
[docs] def differentiate(self, phase, average=True): """ Apply differentiation to the spectrum of the specified phase. Parameters ---------- phase : str Phase of the seismic event. Should be 'p' or 's'. average : bool, optional Flag indicating whether to differentiate the average spectrum (True) or individual spectra (False). Defaults to True. Returns ------- None The spectrum is modified in-place by multiplying it with the corresponding frequency values. Raises ------ AssertionError - If `average` is True and the average {phase} spectrum has not been computed. - If `average` is False and the {phase} spectrum has not been computed. """ phase = phase.lower() if average: assert ( phase in self.average_spectra ), f"You need to compute the average {phase} spectrum first." getattr(self, f"average_{phase}_spectrum")["spectrum"] = ( getattr(self, f"average_{phase}_spectrum")["spectrum"] * getattr(self, f"average_{phase}_spectrum")["freq"] ) else: assert ( phase in self.phases ), f"You need to compute the {phase} spectrum first." spectrum = getattr(self, f"{phase}_spectrum") for trid in spectrum: spectrum[trid]["spectrum"] *= spectrum[trid]["freq"]
[docs] def fit_average_spectrum( self, phase, model="brune", log=True, min_fraction_valid_points_below_fc=0.10, min_fraction_valid_points=0.50, weighted=False, **kwargs, ): """ Fit the average displacement spectrum with a specified model. Parameters ---------- phase : str Phase of the seismic event. Should be 'p' or 's'. model : str, optional Model to use for fitting the spectrum. Default is 'brune'. log : bool, optional Flag indicating whether to fit the logarithm of the spectrum. Default is True. min_fraction_valid_points_below_fc : float, optional Minimum fraction of valid points required below the corner frequency. Default is 0.10. min_fraction_valid_points : float, optional Minimum fraction of valid points required overall. Default is 0.50. weighted : bool, optional Flag indicating whether to apply weighted fitting using sigmoid weights. Default is False. **kwargs Additional keyword arguments to be passed to the curve fitting function. Returns ------- None The fitting results are stored as attributes of the object. Raises ------ AssertionError If the average {phase} spectrum has not been computed. Notes ----- - If the spectrum is below the SNR threshold everywhere, the fitting cannot be performed. - If there are not enough valid points or valid points below the corner frequency, the fitting is considered unsuccessful. """ from scipy.optimize import curve_fit from functools import partial phase = phase.lower() assert ( phase in self.average_spectra ), f"You need to compute the average {phase} spectrum first." spectrum = getattr(self, f"average_{phase}_spectrum") if np.sum(~spectrum["spectrum"].mask) == 0: print("Spectrum is below SNR threshold everywhere, cannot fit it.") self.inversion_success = False return valid_fraction = np.sum(~spectrum["spectrum"].mask) / float( len(spectrum["spectrum"]) ) if valid_fraction < min_fraction_valid_points: print(f"Not enough valid points! (Only {100.*valid_fraction:.2f}%)") self.inversion_success = False return omega0_first_guess = spectrum["spectrum"].data[~spectrum["spectrum"].mask][0] fc_first_guess = fc_circular_crack(moment_to_magnitude(omega0_first_guess)) standardized_num_valid_channels = ( spectrum["num_valid_channels"] - spectrum["num_valid_channels"].mean() ) / spectrum["num_valid_channels"].mean() sigmoid_weights = 1.0 / (1.0 + np.exp(-standardized_num_valid_channels)) if model == "brune": mod = brune elif model == "boatwright": mod = boatwright if log: obs = np.log10(spectrum["spectrum"].data) else: obs = spectrum["spectrum"].data # misfit = ( # lambda f, omega0, fc: # weights * (obs - mod(f, omega0, fc, log=log)) # ) mod = partial(mod, log=log) y = obs[~spectrum["spectrum"].mask] x = spectrum["freq"][~spectrum["spectrum"].mask] if weighted: inverse_weights = 1.0 / sigmoid_weights[~spectrum["spectrum"].mask] else: inverse_weights = None p0 = np.array([omega0_first_guess, fc_first_guess]) bounds = (np.array([0.0, 0.0]), np.array([np.inf, np.inf])) try: popt, pcov = curve_fit( mod, x, y, p0=p0, bounds=bounds, sigma=inverse_weights, **kwargs ) self.inversion_success = True except (RuntimeError, ValueError): print("Inversion (scipy.optimize.cuve_fit) failed.") self.inversion_success = False return # check whether the low-frequency plateau was well constrained npts_valid_below_fc = np.sum(x < popt[1]) fraction_valid_points_below_fc = float(npts_valid_below_fc) / float( len(spectrum["freq"]) ) # print(f"Fraction of valid points below fc is: {fraction_valid_points_below_fc:.2f}") if fraction_valid_points_below_fc < min_fraction_valid_points_below_fc: self.inversion_success = False print( "Not enough valid points below corner frequency " f"(only {100.*fraction_valid_points_below_fc:.1f}%)" ) return perr = np.sqrt(np.diag(pcov)) self.M0 = popt[0] self.fc = popt[1] self.Mw = moment_to_magnitude(self.M0) self.M0_err = perr[0] self.fc_err = perr[1] self.model = model
[docs] def resample(self, new_frequencies, phase): """ Resample the spectrum to new frequencies. Parameters ---------- new_frequencies : array-like New frequency values to resample the spectrum to. phase : str or list Phase(s) of the seismic event to resample. Can be a single phase or a list of phases. Returns ------- None The spectrum is resampled and updated in-place. """ if isinstance(phase, str): phase = [phase] for ph in phase: ph = ph.lower() if not hasattr(self, f"{ph}_spectrum"): print(f"Attribute {ph}_spectrum does not exist.") continue spectrum = getattr(self, f"{ph}_spectrum") for trid in spectrum: spectrum[trid]["spectrum"] = np.interp( new_frequencies, spectrum[trid]["freq"], np.abs(spectrum[trid]["spectrum"]), ) spectrum[trid]["freq"] = new_frequencies
[docs] def set_frequency_bands(self, frequency_bands): """ Set the frequency bands for spectrum analysis. Parameters ---------- frequency_bands : dict Dictionary specifying the frequency bands. The keys are the names of the bands, and the values are tuples of the form (freqmin, freqmax). freqmin and freqmax represent the minimum and maximum frequencies of each band, respectively. Returns ------- None The frequency bands are set and stored as an attribute in the object. The center frequencies of each band are also calculated and stored in the `frequencies` attribute. """ self.frequency_bands = frequency_bands self.frequencies = np.zeros(len(self.frequency_bands), dtype=np.float32) for i, band in enumerate(self.frequency_bands): self.frequencies[i] = 0.5 * ( self.frequency_bands[band][0] + self.frequency_bands[band][1] )
[docs] def set_target_frequencies(self, freq_min, freq_max, num_points): """ Set the target frequencies for spectrum analysis. Parameters ---------- freq_min : float Minimum frequency. freq_max : float Maximum frequency. num_points : int Number of target frequencies to generate. Returns ------- None The target frequencies are set and stored as an attribute in the object. """ self.frequencies = np.logspace( np.log10(freq_min), np.log10(freq_max), num_points )
[docs] def plot_average_spectrum( self, phase, figname="spectrum", figtitle="", figsize=(10, 10), colors={"noise": "dimgrey", "s": "black", "p": "C3"}, linestyle={"noise": "--", "s": "-", "p": "-"}, plot_fit=False, plot_std=False, plot_num_valid_channels=False, ): """ Plot the average spectrum for a given phase. Parameters ---------- phase : str or list of str The phase(s) for which to plot the average spectrum. figname : str, optional The name of the figure. Default is "spectrum". figtitle : str, optional The title of the figure. Default is an empty string. figsize : tuple, optional The size of the figure in inches. Default is (10, 10). colors : dict, optional A dictionary specifying the colors for different phases. Default is {"noise": "dimgrey", "s": "black", "p": "C3"}. linestyle : dict, optional A dictionary specifying the line styles for different phases. Default is {"noise": "--", "s": "-", "p": "-"}. plot_fit : bool, optional Whether to plot the fitted model spectrum. Default is False. plot_std : bool, optional Whether to plot the standard deviation range of the average spectrum. Default is False. plot_num_valid_channels : bool, optional Whether to plot the number of channels above the signal-to-noise ratio (SNR) threshold. Default is False. Returns ------- matplotlib.figure.Figure The generated figure. """ import fnmatch from matplotlib.ticker import MaxNLocator if isinstance(phase, str): phase = [phase] fig, ax = plt.subplots(num=figname, figsize=figsize) ax.set_title(figtitle) for ph in phase: ph = ph.lower() if not hasattr(self, f"average_{ph}_spectrum"): print(f"Attribute average_{ph}_spectrum does not exist.") continue spectrum = getattr(self, f"average_{ph}_spectrum") fft = spectrum["spectrum"] freq = spectrum["freq"] amplitude_spec = np.abs(fft) ax.plot( freq, amplitude_spec, color=colors[ph], ls=linestyle[ph], label=f"Average {ph} spectrum", ) if plot_std: lower_amp = 10.0 ** (np.log10(amplitude_spec) - spectrum["std"]) upper_amp = 10.0 ** (np.log10(amplitude_spec) + spectrum["std"]) ax.fill_between( freq, lower_amp, upper_amp, color=colors[ph], alpha=0.33 ) if plot_num_valid_channels: axb = ax.twinx() axb.plot( freq, spectrum["num_valid_channels"], marker="o", ls="", color=colors[ph], label="Number of channels above SNR threshold", ) axb.set_ylabel("Number of channels above SNR threshold") ylim = axb.get_ylim() axb.set_ylim(max(0, ylim[0] - 1), ylim[1] + 1) axb.yaxis.set_major_locator(MaxNLocator(integer=True)) ax.set_zorder(axb.get_zorder() + 1) ax.patch.set_visible(False) axb.legend(loc="upper right") if plot_fit and hasattr(self, "M0"): if self.model == "brune": model = brune elif self.model == "boatwright": model = boatwright fit = model(freq, self.M0, self.fc) label = ( f"{self.model.capitalize()} model:\n" r"$M_w=$" f"{self.Mw:.1f}, " r"$f_c$=" f"{self.fc:.2f}Hz" ) ax.plot(freq, fit, color=colors[ph], ls="--", label=label) ax.legend(loc="lower left") ax.set_xlabel("Frequency (Hz)") ax.set_ylabel("Amplitude spectrum ([input units/Hz])") ax.loglog() return fig
[docs] def plot_spectrum( self, phase, station=None, component=None, figname="spectrum", figsize=(10, 10), correct_propagation=False, plot_snr=False, colors={"noise": "dimgrey", "s": "black", "p": "C3"}, linestyle={"noise": "--", "s": "-", "p": "-"}, ): """ Plot the spectrum for the specified phase(s) and trace(s). Parameters ---------- phase : str or list of str The phase(s) for which to plot the spectrum. station : str or None, optional The station code. If None, all stations will be plotted. Default is None. component : str or None, optional The component code. If None, all components will be plotted. Default is None. figname : str, optional The name of the figure. Default is "spectrum". figsize : tuple, optional The size of the figure in inches. Default is (10, 10). correct_propagation : bool, optional Whether to correct the spectrum for propagation effects. Default is False. plot_snr : bool, optional Whether to plot the signal-to-noise ratio (SNR) spectrum if available. Default is False. colors : dict, optional A dictionary specifying the colors for different phases. Default is {"noise": "dimgrey", "s": "black", "p": "C3"}. linestyle : dict, optional A dictionary specifying the line styles for different phases. Default is {"noise": "--", "s": "-", "p": "-"}. Returns ------- matplotlib.figure.Figure The generated figure. """ import fnmatch if isinstance(phase, str): phase = [phase] fig, ax = plt.subplots(num=figname, figsize=figsize) for ph in phase: ph = ph.lower() if not hasattr(self, f"{ph}_spectrum"): print(f"Attribute {ph}_spectrum does not exist.") continue spectrum = getattr(self, f"{ph}_spectrum") trace_ids = list(spectrum.keys()) if station is None: station = "*" if component is None: component = "*" target_tr_id = f"*.{station}.*.*{component}" selected_ids = fnmatch.filter(trace_ids, target_tr_id) for trid in selected_ids: fft = spectrum[trid]["spectrum"] freq = spectrum[trid]["freq"] amplitude_spec = np.abs(fft) if correct_propagation and ph in ["p", "s"]: # sta = trid.split(".")[1] amplitude_spec *= self.correction_factor.loc[ sta, f"correction_{ph.upper()}" ] ax.plot( freq, amplitude_spec, color=colors[ph], ls=linestyle[ph], label=f"{ph} spectrum: {trid}", ) if plot_snr: if not hasattr(self, f"snr_{ph}_spectrum"): # print(f"Attribute snr_{ph}_spectrum does not exist.") continue snr_spectrum = getattr(self, f"snr_{ph}_spectrum") ax.plot( snr_spectrum[trid]["freq"], snr_spectrum[trid]["snr"], color=colors[ph], ls=linestyle["noise"], label=f"{ph} snr: {trid}", ) plt.subplots_adjust(right=0.85, bottom=0.20) ax.legend(bbox_to_anchor=(1.01, 1.00), loc="upper left", handlelength=0.9) ax.set_xlabel("Frequency (Hz)") ax.set_ylabel("Amplitude spectrum ([input units/Hz])") ax.loglog() return fig
# classic models of earthquake displacement far-field spectrum
[docs] def brune(freqs, omega0, fc, log=False): """Brune model.""" if log: return np.log10(omega0) - np.log10(1.0 + (freqs / fc) ** 2) else: return omega0 / (1.0 + (freqs / fc) ** 2)
[docs] def boatwright(freqs, omega0, fc, log=False): """Boatwright model.""" if log: return np.log10(omega0) - 0.5 * np.log10(1.0 + (freqs / fc) ** 4) else: return omega0 / np.sqrt(1.0 + (freqs / fc) ** 4)
[docs] def magnitude_to_moment(Mw): """Convert moment magnitude to seismic moment [N.m].""" return 10.0 ** (3.0 / 2.0 * Mw + 9.1)
[docs] def moment_to_magnitude(M0): """Convert seismic moment [N.m] to moment magnitude.""" return 2.0 / 3.0 * (np.log10(M0) - 9.1)
[docs] def fc_circular_crack( Mw, stress_drop_Pa=1.0e6, phase="p", vs_m_per_s=3500.0, vr_vs_ratio=0.9 ): """ Compute the corner frequency assuming a circular crack model (Eshelby). Parameters ---------- Mw : float Moment magnitude of the earthquake. stress_drop_Pa : float, optional Stress drop in Pascals. Default is 1.0e6. phase : str, optional Seismic phase. Valid values are 'p' for P-wave and 's' for S-wave. Default is 'p'. vs_m_per_s : float, optional Shear wave velocity in meters per second. Default is 3500.0. vr_vs_ratio : float, optional Ratio of rupture velocity to shear wave velocity. Default is 0.9. Returns ------- corner_frequency : float Corner frequency in Hertz. Raises ------ AssertionError If phase is not 'p' or 's'. """ phase = phase.lower() assert phase in ["p", "s"], "phase should 'p' or 's'." M0 = magnitude_to_moment(Mw) crack_radius = np.power((7.0 / 16.0) * (M0 / stress_drop_Pa), 1.0 / 3.0) if phase == "p": constant = 2.23 elif phase == "s": constant = 1.47 vr = vr_vs_ratio * vs_m_per_s corner_frequency = (constant * vr) / (2.0 * np.pi * crack_radius) return corner_frequency
[docs] def stress_drop_circular_crack(Mw, fc, phase="p", vs_m_per_s=3500.0, vr_vs_ratio=0.9): """ Compute the stress drop assuming a circular crack model (Eshelby). Parameters ---------- Mw : float Moment magnitude of the earthquake. fc : float Corner frequency in Hertz. phase : str, optional Seismic phase. Valid values are 'p' for P-wave and 's' for S-wave. Default is 'p'. vs_m_per_s : float, optional Shear wave velocity in meters per second. Default is 3500.0. vr_vs_ratio : float, optional Ratio of rupture velocity to shear wave velocity. Default is 0.9. Returns ------- stress_drop : float Stress drop in Pascals. Raises ------ AssertionError If phase is not 'p' or 's'. """ phase = phase.lower() assert phase in ["p", "s"], "phase should 'p' or 's'." M0 = magnitude_to_moment(Mw) if phase == "p": constant = 2.23 elif phase == "s": constant = 1.47 vr = vr_vs_ratio * vs_m_per_s crack_radius = constant * vr / (2.0 * np.pi * fc) stress_drop = 7.0 / 16.0 * M0 / crack_radius**3 return stress_drop
# workflow function
[docs] def compute_moment_magnitude( event, mag_params, plot_above_Mw=100.0, plot_above_random=1.0, path_figures="", phases=["p", "s"], plot=False, figsize=(8, 8), ): """ """ event.set_moveouts_to_theoretical_times() event.set_moveouts_to_empirical_times() spectrum = Spectrum(event=event) # --------------------------------------------------------------- # extract waveforms # first, read short extract before signal as an estimate of noise event.read_waveforms( mag_params["DURATION_SEC"], time_shifted=False, data_folder=mag_params["DATA_FOLDER"], offset_ot=mag_params["OFFSET_OT_SEC_NOISE"], attach_response=mag_params["ATTACH_RESPONSE"], ) noise = event.traces.copy() noise.remove_sensitivity() spectrum.compute_spectrum(noise, "noise") # then, read signal if "p" in phases: event.read_waveforms( mag_params["DURATION_SEC"], phase_on_comp=mag_params["PHASE_ON_COMP_P"], offset_phase=mag_params["OFFSET_PHASE"], time_shifted=mag_params["TIME_SHIFTED"], data_folder=mag_params["DATA_FOLDER"], attach_response=mag_params["ATTACH_RESPONSE"], ) event.traces.remove_sensitivity() event.zero_out_clipped_waveforms(kurtosis_threshold=-1) p_wave = event.traces.copy() spectrum.compute_spectrum(p_wave, "p") if "s" in phases: event.read_waveforms( mag_params["DURATION_SEC"], phase_on_comp=mag_params["PHASE_ON_COMP_S"], offset_phase=mag_params["OFFSET_PHASE"], time_shifted=mag_params["TIME_SHIFTED"], data_folder=mag_params["DATA_FOLDER"], attach_response=mag_params["ATTACH_RESPONSE"], ) event.traces.remove_sensitivity() event.zero_out_clipped_waveforms(kurtosis_threshold=-1) s_wave = event.traces.copy() spectrum.compute_spectrum(s_wave, "s") # ----------------------------------------------------------- spectrum.set_target_frequencies( mag_params["FREQ_MIN_HZ"], mag_params["FREQ_MAX_HZ"], mag_params["NUM_FREQS"] ) spectrum.resample(spectrum.frequencies, spectrum.phases) for ph in phases: spectrum.compute_signal_to_noise_ratio(ph) # from Ford et al 2008, BSSA Q = mag_params["Q_1HZ"] * np.power( spectrum.frequencies, mag_params["ATTENUATION_N"] ) spectrum.attenuation_Q_model(Q, spectrum.frequencies) spectrum.compute_correction_factor( mag_params["RHO_SOURCE_KGM3"], mag_params["RHO_RECEIVER_KGM3"], mag_params["VP_SOURCE_MS"], mag_params["VP_RECEIVER_MS"], mag_params["VS_SOURCE_MS"], mag_params["VS_RECEIVER_MS"], ) source_parameters = {} for phase_for_mag in phases: spectrum.compute_network_average_spectrum( phase_for_mag, mag_params["SNR_THRESHOLD"], min_num_valid_channels_per_freq_bin=mag_params[ "MIN_NUM_VALID_CHANNELS_PER_FREQ_BIN" ], max_relative_distance_err_pct=mag_params["MAX_RELATIVE_DISTANCE_ERR_PCT"], ) if not phase_for_mag in spectrum.average_spectra: continue spectrum.integrate(phase_for_mag, average=True) spectrum.fit_average_spectrum( phase_for_mag, model=mag_params["SPECTRAL_MODEL"], min_fraction_valid_points_below_fc=mag_params[ "MIN_FRACTION_VALID_POINTS_BELOW_FC" ], weighted=mag_params["NUM_CHANNEL_WEIGHTED_FIT"], ) if spectrum.inversion_success: rel_M0_err = 100.0 * spectrum.M0_err / spectrum.M0 rel_fc_err = 100.0 * spectrum.fc_err / spectrum.fc if ( rel_M0_err > mag_params["MAX_REL_M0_ERR_PCT"] or spectrum.fc < 0.0 or spectrum.fc > mag_params["MAX_REL_FC_ERR_PCT"] ): continue print(f"Relative error on M0: {rel_M0_err:.2f}%") print(f"Relative error on fc: {rel_fc_err:.2f}%") # event.set_aux_data({f"Mw_{phase_for_mag}": spectrum.Mw}) figtitle = ( f"{event.origin_time.strftime('%Y-%m-%dT%H:%M:%S')}: " f"{event.latitude:.3f}" "\u00b0" f"N, {event.longitude:.3f}" "\u00b0" f"E, {event.depth:.1f}km, " r"$\Delta M_0 / M_0=$" f"{rel_M0_err:.1f}%, " r"$\Delta f_c / f_c=$" f"{rel_fc_err:.1f}%" ) source_parameters[f"M0_{phase_for_mag}"] = spectrum.M0 source_parameters[f"Mw_{phase_for_mag}"] = spectrum.Mw source_parameters[f"fc_{phase_for_mag}"] = spectrum.fc source_parameters[f"M0_err_{phase_for_mag}"] = spectrum.M0_err source_parameters[f"fc_err_{phase_for_mag}"] = spectrum.fc_err if ( plot and (spectrum.Mw > plot_above_Mw) or np.random.random() > plot_above_random ): fig = spectrum.plot_average_spectrum( phase_for_mag, plot_fit=True, figname=f"{phase_for_mag}_spectrum_{event.id}", figsize=figsize, figtitle=figtitle, plot_std=True, plot_num_valid_channels=True, ) fig.savefig( os.path.join(path_figures, fig._label + ".png"), format="png", bbox_inches="tight", ) plt.close(fig) Mw_exists = False norm = 0.0 Mw = 0.0 Mw_err = 0.0 for ph in ["p", "s"]: if f"Mw_{ph}" in source_parameters: Mw += source_parameters[f"Mw_{ph}"] Mw_err += ( 2.0 / 3.0 * source_parameters[f"M0_err_{ph}"] / source_parameters[f"M0_{ph}"] ) norm += 1 Mw_exists = True if Mw_exists: Mw /= norm Mw_err /= norm source_parameters["Mw"] = Mw source_parameters["Mw_err"] = Mw_err else: Mw = np.nan Mw_err = np.nan if Mw_exists: print(f"The P-S averaged moment magnitude is {Mw:.2f} +/- {Mw_err:.2f}") source_parameters["Mw"] = Mw source_parameters["Mw_err"] = Mw_err event.set_aux_data(source_parameters) return event, spectrum