Source code for beampower.beampower

# coding: utf-8

import ctypes as ct
import numpy as np

from .core import load_library
from os import cpu_count


[docs] def beamform( waveform_features, time_delays, weights_phases, weights_sources, device="cpu", reduce="max", mode="direct", out_of_bounds="strict", num_threads=None, ): """Compute the beamformed network response. This routine computes and returns the whole network response over the entire duration of `waveform_features`. Thus, if the input source grid is large, the output might be considerably memory-consuming. Therefore, this routine is more appropriate for small scale studies such as the rupture imaging of an earthquake. Parameters ---------- waveform_features: (n_stations, n_channels, n_samples) numpy.ndarray, float Any characterization function computed from the continuous seismograms. time_delays: (n_sources, n_stations, n_phases) numpy.ndarray, int Moveouts, in samples, from each of the `n_sources` theoretical sources to each of the `n_stations` seismic stations and for the `n_phases` back-projected seismic phases. weights_phases: (n_stations, n_channels, n_phases) numpy.ndarray, float Weight given to each station and channel for a given phase. For example, horizontal components might be given a small or zero weight for the P-wave stacking. weights_sources: (n_sources, n_stations) numpy.ndarray, float Source-receiver-specific weights. For example, based on the source-receiver distance. device: string, default to 'cpu' Either 'cpu' or 'gpu', depending on the available hardware and user's preferences. reduce: string, default to 'max' Reduction operation applied to the beamformed network response. If `reduce` is `'max'`, return the maximum network response of the grid at each time step, as well as the source indexes. If `reduce` is `'none'`, `None` or `'None'`, return the full beamformed network response. mode: string, default to 'direct' Either 'direct' (default) or 'differential'. If 'direct', the time delays are the (relative) source-receiver propagation times. If 'differential', the time delays are the inter-station differential propagation times. The latter requires `waveform_features` to be based on inter-station cross-correlations. out_of_bounds: string, default to 'strict' Either 'strict' (default) or 'flexible'. - 'strict': A beam is computed if and only if the moveouts point to a valid sample (that is, within the bounds of the data stream) for every channel used in the beam. - 'flexible': A beam is computed as long as the moveouts point to a valid sample for at least one channel. This option is particularly useful for real time applications where an event might have been recorded at the closest stations but not yet at the more distant ones. num_threads: int or None Number of threads for CPU parallelization. If None, uses one thread per available (visible) CPU. Returns -------- beam: (n_sources, n_samples) or (n_samples,) numpy.ndarray, float Full network response (n_sources, n_samples) or maximum network response (n_samples,). See `reduce`. beam_argmax: (n_samples,) numpy.ndarray, int, optional If `reduce` is `'max'`, return the maximum network response source indexes. """ if out_of_bounds not in ["strict", "flexible"]: print("out_of_bounds should be either of 'strict' or 'flexible'," f" not {out_of_bounds}") return elif out_of_bounds == "strict": out_of_bounds = 0 elif out_of_bounds == "flexible": out_of_bounds = 1 if num_threads is None: # set num_threads to -1 so that the C routine # understands to use all CPUs num_threads = cpu_count() # Load library lib = load_library(device) # Get shapes n_stations, _, n_samples = waveform_features.shape n_sources, _, n_phases = time_delays.shape # Prestack detection traces waveform_features = prestack_traces( waveform_features, weights_phases, num_threads=num_threads, device="cpu" ) # Get waveform features waveform_features = waveform_features.flatten().astype(np.float32) time_delays = time_delays.flatten().astype(np.int32) weights_sources = weights_sources.flatten().astype(np.float32) # Essential feature if np.random.random() < 1.0e-6: print("beampower to the people!") if mode in ["normal", "direct"]: # time delays are (relative) source-receiver propagation times # We keep four cases separate in case the signature differs if device.lower() == "cpu": if reduce in ["none", "None", None]: beam = np.zeros(n_sources * n_samples, dtype=np.float32) lib.beamform( waveform_features.ctypes.data_as(ct.POINTER(ct.c_float)), time_delays.ctypes.data_as(ct.POINTER(ct.c_int)), weights_sources.ctypes.data_as(ct.POINTER(ct.c_float)), n_samples, n_sources, n_stations, n_phases, int(out_of_bounds), int(num_threads), beam.ctypes.data_as(ct.POINTER(ct.c_float)), ) return beam.reshape(n_sources, n_samples) elif reduce == "max": beam_max = np.zeros(n_samples, dtype=np.float32) beam_argmax = np.zeros(n_samples, dtype=np.int32) lib.beamform_max( waveform_features.ctypes.data_as(ct.POINTER(ct.c_float)), time_delays.ctypes.data_as(ct.POINTER(ct.c_int)), weights_sources.ctypes.data_as(ct.POINTER(ct.c_float)), n_samples, n_sources, n_stations, n_phases, int(out_of_bounds), int(num_threads), beam_max.ctypes.data_as(ct.POINTER(ct.c_float)), beam_argmax.ctypes.data_as(ct.POINTER(ct.c_int)), ) return beam_max, beam_argmax elif device.lower() == "gpu": if reduce in ["none", "None", None]: beam = np.zeros(n_sources * n_samples, dtype=np.float32) lib.beamform( waveform_features.ctypes.data_as(ct.POINTER(ct.c_float)), time_delays.ctypes.data_as(ct.POINTER(ct.c_int)), weights_sources.ctypes.data_as(ct.POINTER(ct.c_float)), n_samples, n_sources, n_stations, n_phases, int(out_of_bounds), #int(num_threads), beam.ctypes.data_as(ct.POINTER(ct.c_float)), ) return beam.reshape(n_sources, n_samples) elif reduce == "max": beam_max = np.zeros(n_samples, dtype=np.float32) beam_argmax = np.zeros(n_samples, dtype=np.int32) lib.beamform_max( waveform_features.ctypes.data_as(ct.POINTER(ct.c_float)), time_delays.ctypes.data_as(ct.POINTER(ct.c_int)), weights_sources.ctypes.data_as(ct.POINTER(ct.c_float)), n_samples, n_sources, n_stations, n_phases, int(out_of_bounds), #int(num_threads), beam_max.ctypes.data_as(ct.POINTER(ct.c_float)), beam_argmax.ctypes.data_as(ct.POINTER(ct.c_int)), ) return beam_max, beam_argmax if mode == "differential": # time delays are (relative) source-receiver propagation times # We keep four cases separate in case the signature differs if device.lower() == "cpu": beam = np.zeros(n_sources, dtype=np.float32) lib.beamform_differential( waveform_features.ctypes.data_as(ct.POINTER(ct.c_float)), time_delays.ctypes.data_as(ct.POINTER(ct.c_int)), weights_sources.ctypes.data_as(ct.POINTER(ct.c_float)), n_samples, n_sources, n_stations, n_phases, num_threads, beam.ctypes.data_as(ct.POINTER(ct.c_float)), ) if reduce in ["none", "None", None]: return beam elif reduce == "max": beam_max = np.max(beam, axis=0) beam_argmax = np.argmax(beam, axis=0) return beam_max, beam_argmax return beam.reshape(n_sources, n_samples) elif device.lower() == "gpu": print("differential mode not yet implemented on GPU") return else: print(f"Mode should either be 'direct' or 'differential', not {mode}.") return 1
[docs] def prestack_traces(waveform_features, weights_phases, num_threads=None, device="cpu"): """Prestack the detection traces ahead of the beamforming. Channel-wise stacking for each target seismic phase can be done once and for all at the beginning of the computation. Parameters ----------- waveform_features: (n_stations, n_channels, n_stations) numpy.ndarray, float Any characterization function computed from the continuous seismograms. weights_phases: (n_stations, n_channels, n_phases) numpy.ndarray, float Weight given to each station and channel for a given phase. For example, horizontal components might be given a small or zero weight for the P-wave stacking. device: string, default to 'cpu' Either 'cpu' or 'gpu', depending on the available hardware and user's preferences. num_threads: int or None Number of threads for CPU parallelization. If None, uses one thread per available (visible) CPU. Returns ---------- prestacked_traces: (n_stations, n_samples, n_phases) numpy.ndarray, float Channel-wise stacked detection traces, optimally formatted for the C CUDA-C routines. """ # Load library lib = load_library(device) if num_threads is None: num_threads = cpu_count() # Get shapes n_stations, n_channels, n_samples = waveform_features.shape _, _, n_phases = weights_phases.shape prestacked_traces = np.zeros( (n_stations * n_samples * n_phases), dtype=np.float32 ) # Cast waveform_features = waveform_features.flatten().astype(np.float32) weights_phases = weights_phases.flatten().astype(np.float32) # Prestack if device.lower() == "cpu": lib.prestack_waveform_features( waveform_features.ctypes.data_as(ct.POINTER(ct.c_float)), weights_phases.ctypes.data_as(ct.POINTER(ct.c_float)), n_samples, n_stations, n_channels, n_phases, int(num_threads), prestacked_traces.ctypes.data_as(ct.POINTER(ct.c_float)), ) return prestacked_traces.reshape((n_stations, n_samples, n_phases))