Source code for BPMF.plotting_utils

import os

from .config import cfg
from . import dataset
from . import utils

import numpy as np
import matplotlib.pyplot as plt

from matplotlib.colors import Normalize
from matplotlib.cm import ScalarMappable
import matplotlib.colors as mcolors
from mpl_toolkits.axes_grid1 import make_axes_locatable

from obspy.core import UTCDateTime as udt

# -------------------------------------------------------
#       Many functions will disappear soon, replaced
#       by plotting methods of new classes introduced in dataset.py
# -------------------------------------------------------


[docs] def plot_template( idx, db_path_T="template_db_2/", db_path=cfg.INPUT_PATH, n_stations=10, stations=None, mv_view=True, show=True, ): # --------------------------- font = {"family": "sans-serif", "weight": "normal", "size": 14} plt.rc("font", **font) # --------------------------- template = dataset.Template( "template{:d}".format(idx), db_path_T, db_path=db_path, attach_waveforms=True ) if hasattr(template, "loc_uncertainty"): uncertainty_label = r"($\Delta r$ = {:.2f} km)".format( template.location_uncertainty ) elif hasattr(template, "cov_mat"): uncertainty_label = ( r"($\Delta X$={:.2f}km, $\Delta Y$={:.2f}km, $\Delta Z$={:.2f}km)".format( np.sqrt(template.cov_mat[0, 0]), np.sqrt(template.cov_mat[1, 1]), np.sqrt(template.cov_mat[2, 2]), ) ) else: uncertainty_label = "" if stations is not None: template.subnetwork(stations) n_stations = len(stations) else: # select the n_stations closest stations template.n_closest_stations(n_stations) sta = list(template.stations) n_stations = min(n_stations, len(sta)) # sta.sort() n_components = len(template.channels) plt.figure( "template_{:d}_from_{}".format(idx, db_path + db_path_T), figsize=(18, 9) ) if mv_view: MVs = np.column_stack( [template.s_moveouts, template.s_moveouts, template.p_moveouts] ) MVs -= MVs.min() time = ( np.arange(template.traces[0].data.size + MVs.max()) / template.sampling_rate ) else: time = np.arange(template.traces[0].data.size) / template.sampling_rate for s in range(n_stations): for c in range(n_components): ax = plt.subplot(n_stations, n_components, s * n_components + c + 1) lab = "{}.{}".format(sta[s], template.channels[c]) if mv_view: id1 = MVs[s, c] id2 = id1 + template.traces[0].data.size plt.plot( time[id1:id2], template.traces.select(station=sta[s])[c].data, label=lab, ) if c < 2: plt.axvline(time[int((id1 + id2) / 2)], lw=2, ls="--", color="k") else: plt.axvline(time[id1] + 1.0, lw=2, ls="--", color="k") else: plt.plot( time, template.traces.select(station=sta[s])[c].data, label=lab ) plt.xlim((time[0], time[-1])) plt.yticks([]) if c < 2: plt.legend( loc="upper left", frameon=False, handlelength=0.1, borderpad=0.0 ) else: plt.legend( loc="upper right", frameon=False, handlelength=0.1, borderpad=0.0 ) if s == n_stations - 1: plt.xlabel("Time (s)") else: plt.xticks([]) plt.subplots_adjust(bottom=0.07, top=0.94, hspace=0.04, wspace=0.12) plt.suptitle( "Template {:d}, location: {:.2f}$^{{\mathrm{{o}}}}$E," "{:.2f}$^{{\mathrm{{o}}}}$N,{:.2f}km {}".format( template.template_idx, template.longitude, template.latitude, template.depth, uncertainty_label, ), fontsize=16, ) if show: plt.show()
[docs] def plot_detection_matrix( X, datetimes=None, stack=None, title=None, ax=None, show=True, **kwargs ): kwargs["time_min"] = kwargs.get("time_min", None) kwargs["time_max"] = kwargs.get("time_max", None) kwargs["text_offset"] = kwargs.get("text_offset", 0.1) kwargs["text_size"] = kwargs.get("text_size", plt.rcParams["font.size"]) kwargs["datetime_format"] = kwargs.get("datetime_format", "%Y,%m,%d--%H:%M:%S") if datetimes is not None: # reorder X new_order = np.argsort(datetimes) X = X[new_order, :] datetimes = datetimes[new_order] n_detections = X.shape[0] if ax is None: fig = plt.figure("detection_matrix", figsize=(18, 9)) ax = fig.add_subplot(111) else: fig = ax.get_figure() ax.set_title(title) time = np.linspace(0.0, X.shape[-1] / cfg.SAMPLING_RATE_HZ, X.shape[-1]) time_min = kwargs["time_min"] if kwargs["time_min"] is not None else time.min() time_max = kwargs["time_max"] if kwargs["time_max"] is not None else time.max() time -= time_min if stack is not None: offset = 2.0 ax.plot(time, stack, color="C3", label="SVDWF Stack") else: offset = 0.0 for i in range(n_detections): label = "Individual Events" if i == 0 else "" ax.plot(time, utils.max_norm(X[i, :]) + offset, lw=0.75, color="k", label=label) if datetimes is not None: plt.text( 0.50, offset + kwargs["text_offset"], udt(datetimes[i]).strftime(kwargs["datetime_format"]), bbox={"facecolor": "white", "alpha": 0.75}, fontsize=kwargs["text_size"], ) offset += 2.0 ax.set_ylabel("Offset normalized amplitude") ax.set_xlabel("Time (s)") ax.set_xlim(0.0, time_max - time_min) ax.legend(loc="upper right") plt.subplots_adjust(top=0.96, bottom=0.06) if show: plt.show() return fig
[docs] def plot_catalog( tids=None, db_path_T=None, db_path_M=None, catalog=None, ax=None, remove_multiples=True, scat_kwargs={}, cmap=None, db_path=cfg.INPUT_PATH, ): if cmap is None: try: import colorcet as cc cmap = cc.cm.bjy except Exception as e: print(e) cmap = "viridis" # ------------------------------------------------ # Scattering plot kwargs scat_kwargs["edgecolor"] = scat_kwargs.get("edgecolor", "k") scat_kwargs["linewidths"] = scat_kwargs.get("linewidths", 0.5) scat_kwargs["s"] = scat_kwargs.get("s", 10) scat_kwargs["zorder"] = scat_kwargs.get("zorder", 0) # ------------------------------------------------ if catalog is None: # if catalog is None, tids, db_path_T and db_path_M # should be given # ------------------ # compile detections from these templates in a single # earthquake catalog catalog_filenames = [f"multiplets{tid}catalog.h5" for tid in tids] AggCat = dataset.AggregatedCatalogs( filenames=catalog_filenames, db_path_M=db_path_M, db_path=db_path ) AggCat.read_data(items_in=["origin_times", "location", "unique_events"]) catalog = AggCat.flatten_catalog( attributes=["origin_times", "latitude", "longitude", "depth"], unique_events=True, ) cNorm = Normalize(vmin=catalog["latitude"].min(), vmax=catalog["latitude"].max()) scalar_map = ScalarMappable(norm=cNorm, cmap=cmap) scalar_map.set_array([]) # plot catalog if ax is None: fig = plt.figure("earthquake_catalog", figsize=(18, 9)) ax = fig.add_subplot(111) else: # use the user-provided axis fig = ax.get_figure() ax.set_title("{:d} events".format(len(catalog["origin_times"]))) ax.set_xlabel("Calendar Time") ax.set_ylabel("Longitude") times = np.array( [str(udt(time)) for time in catalog["origin_times"]], dtype="datetime64" ) ax.scatter( times, catalog["longitude"], color=scalar_map.to_rgba(catalog["latitude"]), rasterized=True, **scat_kwargs, ) ax.set_xlim(times.min(), times.max()) ax_divider = make_axes_locatable(ax) cax = ax_divider.append_axes("right", size="2%", pad=0.08) plt.colorbar(scalar_map, cax, orientation="vertical", label="Latitude") return fig
# --------------------------------------------------------------- # Utils for maps # ---------------------------------------------------------------
[docs] def initialize_map( map_longitudes, map_latitudes, map_axis=None, seismic_stations=None, text_size=14, markersize=10, topography_file=None, path_topo="", faults=None, right_labels=False, left_labels=True, bottom_labels=True, top_labels=False, **kwargs, ): """Initialize map instance with Cartopy.""" import cartopy as ctp kwargs["topo_alpha"] = kwargs.get("topo_alpha", 0.30) kwargs["downsample_faults"] = kwargs.get("downsample_faults", True) kwargs["shaded_topo"] = kwargs.get("shaded_topo", True) kwargs["topo_cmap"] = kwargs.get("topo_cmap", "gray") kwargs["topo_cnorm"] = kwargs.get("topo_cnorm", None) kwargs["fault_zorder"] = kwargs.get("fault_zorder", 1.01) figsize = kwargs.get("figsize", (15, 15)) map_corners = [ map_longitudes[0], map_latitudes[0], map_longitudes[1], map_latitudes[1], ] data_coords = ctp.crs.PlateCarree() if map_axis is None: # projection = ctp.crs.PlateCarree() projection = ctp.crs.Mercator( central_longitude=sum(map_longitudes) / 2.0, min_latitude=map_latitudes[0], max_latitude=map_latitudes[1], ) fig = plt.figure(kwargs.get("figname", "map"), figsize=figsize) map_axis = fig.add_subplot(111, projection=projection) map_axis.set_rasterization_zorder(1) map_axis.set_extent( [map_longitudes[0], map_longitudes[1], map_latitudes[0], map_latitudes[1]], crs=data_coords, ) RES = "10m" if topography_file is not None: # ----------------------- # get topography import netCDF4 with netCDF4.Dataset(os.path.join(path_topo, topography_file), "r") as f: if "z" in f.variables: topo = f.variables["z"][:].data elif "Band1" in f.variables: topo = f.variables["Band1"][:].data if "lon" in f.variables: lon_topo = f.variables["lon"][:].data elif "x" in f.variables: lon_topo = f.variables["x"][:].data if "lat" in f.variables: lat_topo = f.variables["lat"][:].data elif "y" in f.variables: lat_topo = f.variables["y"][:].data # select relevant area selected_lon = np.where( (lon_topo >= map_longitudes[0]) & (lon_topo <= map_longitudes[1]) )[0] selected_lat = np.where( (lat_topo >= map_latitudes[0]) & (lat_topo <= map_latitudes[1]) )[0] lon_topo = lon_topo[selected_lon] lat_topo = lat_topo[selected_lat] topo = topo[selected_lat, :] topo = topo[:, selected_lon] # make sure to take these arrays in ascending lons and lats ascending_lon = np.argsort(lon_topo) ascending_lat = np.argsort(lat_topo) lon_topo, lat_topo = lon_topo[ascending_lon], lat_topo[ascending_lat] topo = topo[ascending_lat, :] topo = topo[:, ascending_lon] if kwargs["shaded_topo"]: # get topography gradient grad_x, grad_y = np.gradient(topo) slope = np.pi / 2.0 - np.arctan(np.sqrt(grad_x**2 + grad_y**2)) aspect = np.arctan2(grad_x, grad_y) altitude = np.pi / 4.0 # sun angle azimuth = np.pi / 2.0 # sun direction topo = np.sin(altitude) * np.sin(slope) + np.cos(altitude) * np.cos( slope ) * np.cos((azimuth - np.pi / 2.0) - aspect) # levels_topo = np.linspace(-500., topo.max(), 20) # print('Plot the topography.') # transform data # fast version lon_g, lat_g = np.meshgrid(lon_topo, lat_topo, indexing="xy") trans_data = map_axis.projection.transform_points( data_coords, lon_g, lat_g, topo ) X = trans_data[..., 0] Y = trans_data[..., 1] Z = trans_data[..., 2] map_axis.imshow( Z, extent=[X.min(), X.max(), Y.min(), Y.max()], cmap=kwargs["topo_cmap"], norm=kwargs["topo_cnorm"], origin="lower", alpha=kwargs["topo_alpha"], interpolation="bilinear", zorder=-1, ) # ------------ DRAW MERIDIANS AND PARALLELS -------------- LON0 = (int(map_longitudes[0] / 0.5) + 1.0) * 0.5 LON1 = (int(map_longitudes[1] / 0.5) + 1.0) * 0.5 # lon_ticks = np.arange(map_longitudes[0], map_longitudes[1]+0.5, 0.5) lon_ticks = np.arange(LON0 - 0.5, LON1 + 0.5, 0.5) LAT0 = (int(map_latitudes[0] / 0.5) + 1.0) * 0.5 LAT1 = (int(map_latitudes[1] / 0.5) + 1.0) * 0.5 # lat_ticks = np.arange(map_latitudes[0], map_latitudes[1]+0.5, 0.5) lat_ticks = np.arange(LAT0 - 0.5, LAT1 + 0.5, 0.5) # -------------------------------------------------------- # ----------- DRAW BORDERS ------------- brds = ctp.feature.BORDERS # -------------------------------------- gl = map_axis.gridlines( draw_labels=True, linewidth=1, alpha=0.5, color="k", linestyle="--" ) gl.right_labels = right_labels gl.left_labels = left_labels gl.top_labels = top_labels gl.bottom_labels = bottom_labels if kwargs.get("coastlines", True): # print('Add coast lines.') map_axis.add_feature( ctp.feature.GSHHSFeature( scale="full", levels=kwargs.get("coastline_levels", [1, 2]), zorder=0.49 ) ) # , rasterized=True)) # comment this to save time on plotting high-resolution oceans if kwargs.get("oceans", False): oceans = ctp.feature.OCEAN map_axis.add_feature( ctp.feature.NaturalEarthFeature( category=oceans.category, name=oceans.name, scale="10m", facecolor="#b0bfe8ff", ) ) if faults is not None: map_axis.add_geometries( faults["geometry"], crs=data_coords, facecolor="none", edgecolor="k" ) # plot optional elements props = dict( boxstyle="round", facecolor="white", edgecolor=None, alpha=0.6, pad=0.2 ) # plot seismic stations if seismic_stations is not None: # print('Add seismic stations.') for s in range(len(seismic_stations["stations"])): if ( (seismic_stations["longitude"][s] > map_longitudes[1]) or (seismic_stations["longitude"][s] < map_longitudes[0]) or (seismic_stations["latitude"][s] > map_latitudes[1]) or (seismic_stations["latitude"][s] < map_latitudes[0]) ): continue map_axis.plot( seismic_stations["longitude"][s], seismic_stations["latitude"][s], marker="v", color="k", markersize=markersize, transform=data_coords, zorder=1, ) if seismic_stations["stations"][s] != "": map_axis.text( seismic_stations["longitude"][s] + 0.02, seismic_stations["latitude"][s], seismic_stations["stations"][s], fontsize=text_size, transform=data_coords, zorder=2, bbox=props, ) return map_axis
[docs] def add_scale_bar( ax, x_start, y_start, distance, source_crs, orientation="longitudinal", **kwargs ): """ Parameters ----------- ax: GeoAxes instance The axis on which we want to add a scale bar. x_start: float The x coordinate of the left end of the scale bar, given in the axis coordinate system, i.e. from 0 to 1. y_start: float The y coordinate of the left end of the scale bar, given in the axis coordinate system, i.e. from 0 to 1. distance: float The distance covered by the scale bar, in km. source_crs: cartopy.crs The coordinate system in which the data are written. orientation: string, default to 'longitudinal' Either 'longitudinal' or 'latitudinal'. Determine the orientation of the scale bar. """ from cartopy.geodesic import Geodesic from cartopy.crs import PlateCarree G = Geodesic() # default values kwargs["lw"] = kwargs.get("lw", 2) kwargs["color"] = kwargs.get("color", "k") data_coords = PlateCarree() # transform the axis coordinates into display coordinates display = ax.transAxes.transform([x_start, y_start]) # take display coordinates into data coordinates data = ax.transData.inverted().transform(display) # take data coordinates into lon/lat lon_start, lat_start = data_coords.transform_point(data[0], data[1], source_crs) # get the coordinates of the end of the scale bar if orientation == "latitudinal": lon_end, lat_end, _ = np.asarray( G.direct([lon_start, lat_start], 0.0, 1000.0 * distance) )[0] elif orientation == "longitudinal": # first, compute distance a function of longitude at # the given latitude lat_start dist_per_lon = 0.0 longitudes = np.linspace(lon_start, lon_start + 1.0, 100) for i in range(1, len(longitudes)): dist_per_lon += utils.two_point_distance( (longitudes[i - 1] + 180.0) % 360.0 - 180.0, lat_start, 0.0, (longitudes[i] + 180.0) % 360.0 - 180.0, lat_start, 0.0, ) lon_end = ((lon_start + distance / dist_per_lon) + 180.0) % 360.0 - 180.0 lat_end = lat_start else: print("`orientation` should be 'longitudinal' or 'latitudinal'.") return ax.plot([lon_start, lon_end], [lat_start, lat_end], transform=data_coords, **kwargs) ax.text( (lon_start + lon_end) / 2.0, (lat_start + lat_end) / 2.0 - 0.001, "{:.0f}km".format(distance), transform=data_coords, ha="center", va="top", ) return
[docs] def uncertainty_ellipse( hmax_uncertainty_km, hmin_uncertainty_km, hmax_azimuth_deg, longitude_center, latitude_center, num_points=100 ): """ Compute a set of longitude and latitude points that describe the uncertainty ellipse in the horizontal plane. Parameters ---------- hmax_uncertainty_km : float The length of the major axis of the uncertainty ellipse in kilometers. hmin_uncertainty_km : float The length of the minor axis of the uncertainty ellipse in kilometers. hmax_azimuth_deg : float The azimuth angle of the major axis of the uncertainty ellipse in degrees. longitude_center : float The longitude coordinate of the center point of the ellipse. latitude_center : float The latitude coordinate of the center point of the ellipse. num_points : int, optional The number of points used to describe the ellipse, default is 100. Returns ------- longitude_ellipse : numpy.ndarray The longitude coordinates of the points that make up the uncertainty ellipse. latitude_ellipse : numpy.ndarray The latitude coordinates of the points that make up the uncertainty ellipse. """ from cartopy.geodesic import Geodesic azimuths = np.linspace(0., 360., num_points) theta = np.deg2rad(-(azimuths - hmax_azimuth_deg)) # ellipse formula eccentricity_pow2 = 1. - (hmin_uncertainty_km / hmax_uncertainty_km)**2 ellipse_km = hmin_uncertainty_km / np.sqrt(1. - eccentricity_pow2 * np.cos(theta)**2) # ray shooting with Geodesic G = Geodesic() longitude_ellipse, latitude_ellipse, _ = np.asarray( G.direct([longitude_center, latitude_center], azimuths, 1000. * ellipse_km) ).T return longitude_ellipse, latitude_ellipse
[docs] def vertical_uncertainty_ellipse( cov_mat, longitude_center, latitude_center, depth_center, horizontal_direction="longitude", num_points=100 ): """ Compute a set of longitude and latitude points that describe the uncertainty ellipse in the vertical plane. Parameters ----------- cov_mat : numpy.ndarray The (3x3) covariance matrix returned by Event.relocate(method='NLLoc'). longitude_center : float The longitude coordinate of the center point of the ellipse. latitude_center : float The latitude coordinate of the center point of the ellipse. depth_center : float The depth, in km, of the center point of the ellipse. horizontal_direction : str, optional Either 'longitude' or 'latitude'. Define the x-coordinate of the plane. num_points : int, optional The number of points used to describe the ellipse, default is 100. Returns ------- longitude_ellipse : numpy.ndarray The longitude coordinates of the points that make up the uncertainty ellipse. latitude_ellipse : numpy.ndarray The latitude coordinates of the points that make up the uncertainty ellipse. depth_ellipse : numpy.ndarray The depth, in km, of the points that make up the uncertainty ellipse. """ from cartopy.geodesic import Geodesic # ray shooting with Geodesic G = Geodesic() phis = np.linspace(0., 360., num_points) if horizontal_direction == "longitude": # az_max_edg is angle between vertical axis and max_unc_km, min_unc_km, phi_max_deg, phi_min_deg = utils.cov_mat_intersection( cov_mat, axis1=0, axis2=2 ) theta = np.deg2rad((phis - phi_max_deg)) # ellipse formula eccentricity_pow2 = 1. - (min_unc_km / max_unc_km)**2 ellipse_km = min_unc_km / np.sqrt(1. - eccentricity_pow2 * np.cos(theta)**2) # this is the ellipse depth depth_ellipse = depth_center - ellipse_km * np.cos(np.deg2rad(phis)) # this is the ray length ellipse_horizontal_km = ellipse_km * np.sin(np.deg2rad(phis)) # this is the ray azimuth ray_azimuth = 270. elif horizontal_direction == "latitude": max_unc_km, min_unc_km, phi_max_deg, phi_min_deg = utils.cov_mat_intersection( cov_mat, axis1=1, axis2=2 ) theta = np.deg2rad((phis - phi_max_deg)) # ellipse formula eccentricity_pow2 = 1. - (min_unc_km / max_unc_km)**2 ellipse_km = min_unc_km / np.sqrt(1. - eccentricity_pow2 * np.cos(theta)**2) # this is the ellipse depth depth_ellipse = depth_center - ellipse_km * np.cos(np.deg2rad(phis)) # this is the ray length ellipse_horizontal_km = ellipse_km * np.sin(np.deg2rad(phis)) # this is the ray azimuth ray_azimuth = 180. else: print( "horizontal_direction should be either of 'longitude' or 'latitude'." ) longitude_ellipse, latitude_ellipse, _ = np.asarray( G.direct( [longitude_center, latitude_center], ray_azimuth, 1000. * ellipse_horizontal_km ) ).T return longitude_ellipse, latitude_ellipse, depth_ellipse