Source code for orbital_radar.suborbital

"""
This module contains the OrbitalRadar class that runs the simulator for
suborbital radar data. It is a subclass of the Simulator class.

Difference between ground-based and airborne radar geometry:
- along-track coordinate from mean wind for ground based and mean flight vel.
from airborne radar
- no ground echo added to airborne radar
- range grid of airborne radar assumed to be height above mean sea level and
height above ground for groundbased
- no attenuation correction for airborne radar
- lat/lon coordinates included as input to airborne radar
"""

import os

import numpy as np
import pandas as pd
import xarray as xr

from orbital_radar.helpers import db2li, li2db
from orbital_radar.radarspec import RadarBeam
from orbital_radar.readers.cloudnet import read_cloudnet
from orbital_radar.readers.config import read_config
from orbital_radar.readers.radar import Radar
from orbital_radar.simulator import Simulator
from orbital_radar.version import __version__
from orbital_radar.writers.spaceview import write_spaceview


[docs]class Suborbital(Simulator): """ Run the simulator for suborbital radar data. """ # list of all suborbital radar locations names = { "groundbased": [ "bco", "jue", "nor", "mag", "min", "nya", "arm", "pamtra", ], "airborne": [ "mp5", "rasta", ], } def __init__( self, geometry, name, config_file, suborbital_radar, input_radar_format ): """ Initialize the simulator for suborbital radar data. Parameters ---------- geometry : str Observation geometry of radar (groundbased or airborne). name : str Name of the suborbital radar (abbreviated). config_file : str Path to the configuration file that contains the site-dependent parameters and directory paths. suborbital_radar : str Name of the suborbital radar (abbreviated). input_radar_format : str Format of the input radar data (e.g. cloudnet). """ # make sure that geometry is valid if geometry not in self.names.keys(): raise ValueError( f"Geometry {geometry} not implemented. Choose from " f"{list(self.names.keys())}" ) # check if site is in list of implemented sites if name not in self.names[geometry]: raise ValueError( f"Site {name} not implemented. Choose from " f"{self.names[geometry]}" ) # check if config file is provided and exists if config_file is None: raise ValueError("No configuration file provided") if not os.path.isfile( os.path.join(os.environ["ORBITAL_RADAR_CONFIG_PATH"], config_file) ): raise FileNotFoundError( f"Configuration file {config_file} not found" ) # set class attributes self.geometry = geometry self.name = name self.suborbital_radar = suborbital_radar self.input_radar_format = input_radar_format # attributes that will be derived self.is_sea_level = False # read configuration file self.config = read_config(config_file) # check if output path exists assert os.path.exists( self.config["paths"][self.name]["output"] ), f"Output path {self.config['paths'][self.name]['output']} does not exist" self.path_out = self.config["paths"][self.name]["output"] self.frequency = self.config["suborbital_radar"][ self.suborbital_radar ]["frequency"] # preparation of input radar data self.prepare = self.config["prepare"]["general"] self.prepare.update(self.config["prepare"][self.geometry]) # overview of simulation settings self.summary # initialize simulator class with spaceborne radar settings super().__init__( sat_name=self.config["spaceborne_radar"]["sat_name"], file_earthcare=self.config["spaceborne_radar"]["file_earthcare"], nyquist_from_prf=self.config["spaceborne_radar"]["nyquist_from_prf"], ms_threshold=self.config["spaceborne_radar"]["ms_threshold"], ms_threshold_integral=self.config["spaceborne_radar"][ "ms_threshold_integral" ], **self.config["spaceborne_radar"]["radar_specs"], ) @property def summary(self): """ Prints short summary of simulator settings. """ print(f"Site: {self.name}") print("\n") print("Directory paths:") print(f"Input: {self.config['paths'][self.name]['radar']}") print(f"Output: {self.config['paths'][self.name]['output']}") print("\n") if self.geometry == "groundbased": print("Groundbased data is prepared with:") print(f"Mean wind: {self.prepare['mean_wind']} m/s") if self.geometry == "airborne": print("Airborne data is prepared with:") print( f"Mean flight velocity: " f"{self.prepare['mean_flight_velocity']} m/s" ) print(f"Height min: {self.prepare['height_min']} m") print(f"Height max: {self.prepare['height_max']} m") print(f"Height res: {self.prepare['height_res']} m") print("\n")
[docs] @staticmethod def prepare_dates(start_date, end_date): """ Creates a date array from start to end date. Parameters ---------- start_date : np.datetime64 Start date. end_date : np.datetime64 End date. Returns ------- dates: pd.DatetimeIndex Date range from start to end date. """ # check if start date is before end date if start_date > end_date: raise ValueError("Start date must be before end date") dates = pd.date_range(start_date, end_date) return dates
[docs] def convert_frequency(self, ds): """ Convert frequency from 35 to 94 GHz. The conversion is based on Kollias et al. (2019) (doi: https://doi.org/10.5194/amt-12-4949-2019) Parameters ---------- ds : xarray.Dataset Data with "ze" variable in mm6/mm3. Ze was measured at 35 GHz. Returns ------- ds : xarray.Dataset Data with converted "ze" variable. Ze is now transformed to 94 GHz. """ # keep only reflectivities below 30 dBZ ds["ze"] = ds["ze"].where(ds["ze"] < db2li(30)) a = -16.8251 b = 8.4923 ds["ze"] = db2li( li2db(ds["ze"]) - 10**a * (li2db(ds["ze"]) + 100) ** b ) # set negative ze to zero ds["ze"] = ds["ze"].where(ds["ze"] > 0.0, 0.0) return ds
[docs] def correct_dielectric_constant(self, ds): r""" Apply correction for dielectric constant assumed in Ze calculation of suborbital radar to match the dielectric constant of the spaceborne radar. Correction equation with :math:`K_g` and :math:`K_s` as dielectric constants of the suborbital and spaceborne radar, respectively: .. math:: Z_e = 10 \log_{10} \left( \frac{K_s}{K_g} \right) + Z_e Parameters ---------- ds : xarray.Dataset Data with "ze" variable. """ correction = ( self.config["spaceborne_radar"]["k2"] / self.config["suborbital_radar"][self.suborbital_radar]["k2"] ) ds["ze"] = db2li(li2db(ds["ze"]) + 10 * np.log10(correction)) return ds
[docs] def add_vmze_attrs(self, ds): """ Adds attributes to Doppler velocity and radar reflectivity variables. Parameters ---------- ds : xarray.Dataset Data with "ze" and "vm" variables. Returns ------- ds : xarray.Dataset Data with added attributes. """ ds["ze"].attrs = dict( units="mm6 m-3", standard_name="radar_reflectivity", long_name="Radar reflectivity", description="Radar reflectivity", ) ds["vm"].attrs = dict( units="m s-1", standard_name="Doppler_velocity", long_name="Doppler velocity", description="Doppler velocity", ) return ds
[docs] def check_is_sea_level(self, ds): """ Check if input radar range/height grid is defined with respect to ground level or sea level. The input to the simulator should be wrt. sea level. Parameters ---------- ds : xr.Dataset Input radar data """ if "height" in list(ds.coords.keys()): self.is_sea_level = True else: self.is_sea_level = False
[docs] def range_to_height(self, ds): """ Convert range coordinate to height coordinate by adding the station height above mean sea level to the range coordinate. The altitude is pre-defined for each station in the configuration file. Parameters ---------- ds : xarray.Dataset Data with "range" coordinate. """ ds["height"] = ds["range"] + ds.alt.item() # swap range with height ds = ds.swap_dims({"range": "height"}) # drop range coordinate ds = ds.reset_coords(drop=True) return ds
[docs] def create_along_track(self, ds): """ Creates along-track coordinates from time coordinates. Parameters ---------- ds : xarray.Dataset Data with "time" and "height" coordinates. Returns ------- ds : xarray.Dataset Data with "along_track" coordinate. """ if self.geometry == "groundbased": print("Using mean wind for along-track coordinates") v = self.prepare["mean_wind"] elif self.geometry == "airborne" and "ac_speed" in list(ds): print("Using flight velocity for along-track coordinates") v = ds.ac_speed.values elif self.geometry == "airborne" and "ac_speed" not in list(ds): print("Using mean flight velocity for along-track coordinates") v = self.prepare["mean_flight_velocity"] else: raise ValueError( f"Geometry {self.geometry} not implemented. Choose from " f"{list(self.names.keys())}" ) # calculate the along-track distance dt = ds.time.diff("time") / np.timedelta64(1, "s") dt = xr.align(ds.time, dt, join="outer")[1].fillna(0) # start is dt=0 arr_along_track = np.cumsum(v * dt) da_along_track = xr.DataArray( arr_along_track, dims="time", coords=[ds.time], name="along_track" ) da_along_track.attrs = dict( standard_name="along_track_distance", long_name="Along track distance", units="m", description="Distance along track of the suborbital radar", ) # swap from time to along track ds = ds.assign_coords(along_track=da_along_track) ds = ds.swap_dims({"time": "along_track"}) # add time as variable ds = ds.reset_coords() return ds
[docs] def create_regular_height(self): """ Creates regular height coordinate for suborbital radar. Returns ------- xarray.DataArray Regular height coordinate for suborbital radar. """ height_regular = np.arange( self.prepare["height_min"], self.prepare["height_max"], self.prepare["height_res"], ) da_height_regular = xr.DataArray( height_regular, dims="height", coords=[height_regular] ) da_height_regular.attrs = dict( units="m", standard_name="height", long_name="Height of radar bin above sea level", description="Height of radar bin above sea level", ) return da_height_regular
[docs] def create_regular_along_track(self, ds): """ Creates regular along-track coordinate for suborbital radar. Parameters ---------- ds : xarray.Dataset Data with "along_track" coordinate. Returns ------- xarray.DataArray Regular along-track coordinate for suborbital radar. """ along_track_res = np.round( ds.along_track.diff("along_track").median().item() ) along_track_max = ds.along_track.max().item() along_track_regular = np.arange( 0, along_track_max, along_track_res, ) da_along_track_regular = xr.DataArray( along_track_regular, dims="along_track", coords=[along_track_regular], ) da_along_track_regular.attrs = ds.along_track.attrs return da_along_track_regular
[docs] def interpolate_to_regular_grid(self, ds): """ Interpolates radar data to regular grid in along-track and height. Parameters ---------- ds : xarray.Dataset Data with "time" and "height" coordinates. Returns ------- ds : xarray.Dataset Data with interpolated "along_track" and "height" coordinates. """ da_height_regular = self.create_regular_height() da_along_track_regular = self.create_regular_along_track(ds=ds) # interpolation along-track and height # workaround for time: convert to seconds since start, then # interpolate, then convert back to datetime da_time = ds["time"] ds = ds.interp( along_track=da_along_track_regular, height=da_height_regular, method="nearest", ) # get nearest time for each regular along track grid point ds.coords["time"] = ( ("along_track"), da_time.sel(along_track=ds.along_track, method="nearest").values, ) # add attributes ds = self.add_vmze_attrs(ds) return ds
[docs] def add_ground_echo(self, ds): """ Calculates artificial ground echo inside ground-based radar range grid. The values are chosen such that the final ground echo after the along-range convolution is equal to the ground echo of the satellite. The pulse length used here is not the same as the pulse length of the satellite. Parameters ---------- ds : xarray.Dataset Data with "ze" and "vm" variables. Returns ------- ds : xarray.Dataset Data with added ground echo. """ assert len(np.unique(np.diff(ds.height))) == 1, ( "Height grid is not equidistant. " "Range weighting function cannot be calculated." ) # grid with size of two pulse lengths centered around zero height_bins = np.arange( -self.prepare["ground_echo_pulse_length"], self.prepare["ground_echo_pulse_length"] + self.prepare["height_res"], self.prepare["height_res"], ) # calculate range weighting function weights = RadarBeam.normalized_range_weighting_function_default( pulse_length=self.prepare["ground_echo_pulse_length"], range_bins=height_bins, ) ground_echo = weights * db2li(self.prepare["ground_echo_ze_max"]) # add ground echo to dataset shifted by one height bin to have maximum # below zero # get closest height bin to ground idx = (np.abs(ds.height - ds.alt.item())).argmin() base = ds.height[idx].item() # insert half of the calculated ground echo and shift maximum below the # surface ground_echo = ground_echo[int(len(ground_echo) / 2) :] height_bins = ( base + height_bins[int(len(height_bins) / 2) :] - self.prepare["height_res"] ) # add ground echo to dataset (first fill nan values with zero in this # height interval) ds["ze"].loc[{"height": height_bins}] = ( ds["ze"].loc[{"height": height_bins}].fillna(0) ) ds["ze"].loc[{"height": height_bins}] += ground_echo return ds
[docs] def add_groundbased_variables(self): """ Add variables specific to groundbased simulator to the dataset, i.e., the mean horizontal wind. """ self.ds["mean_wind"] = xr.DataArray( self.prepare["mean_wind"], attrs=dict( standard_name="v_hor", long_name="Mean horizontal wind", units="m s-1", description="Mean horizontal wind", ), )
[docs] def add_airborne_variables(self): """ Add variables specific to airborne simulator to the dataset, i.e., the mean flight velocity. """ self.ds["mean_flight_velocity"] = xr.DataArray( self.prepare["mean_flight_velocity"], attrs=dict( standard_name="v_hor", long_name="Mean flight velocity", units="m s-1", description="Mean flight velocity", ), )
[docs] def to_netcdf(self, date): """ Writes dataset to netcdf file. Note that not all variables are stored. Parameters ---------- date : np.datetime64 Date of the simulation. Used to create the filename. """ output_variables = [ "sat_ifov", "sat_range_resolution", "sat_along_track_resolution", "ze", "vm", "ze_sat", "vm_sat", "vm_sat_vel", "ze_sat_noise", "vm_sat_noise", "vm_sat_folded", "nubf_flag", "ms_flag", "folding_flag", ] if self.geometry == "groundbased": output_variables += ["mean_wind"] if self.geometry == "airborne": output_variables += ["mean_flight_velocity"] # name of output nc file filename = ( "_".join( [ "ora", __version__, self.config["spaceborne_radar"]["sat_name"], "l1", self.geometry, self.name, self.suborbital_radar, pd.Timestamp(date).strftime("%Y%m%d") + "T000000", pd.Timestamp(date).strftime("%Y%m%d") + "T235959", ] ) + ".nc" ) filename = os.path.join( self.path_out, filename, ) write_spaceview(ds=self.ds[output_variables], filename=filename)
[docs] def add_attenuation(self, ds, da_gas_atten): """ Add attenuation to dataset. Parameters ---------- ds : xarray.Dataset Data with "ze" variable. Unit: mm6 m-3 da_gas_atten : xarray.DataArray Interpolated gas attenuation data on the same grid as ds. Unit: dBZ. Returns ------- ds : xarray.Dataset Data with added attenuation. """ # add attenuation in dB and convert back to ds["ze"] = db2li(li2db(ds["ze"]) + da_gas_atten) return ds
[docs] def attenuation_correction(self, ds, ds_cloudnet): """ Gas attenuation correction based on Cloudnet. Cloudnet contains gas attenuation (gas_atten) as a function of 137 ERA5 levels. The height of each level varies with time. Therefore, the height is first interpolated onto the height grid of the radar data. Then, the gas attenuation is interpolated onto the time and height grid of the radar data. Finally, the gas attenuation is added to the radar reflectivity. There are major differences between the cloudnet_ecmwf and cloudnet_categorize attenuation products: - ecmwf height is time-dependent, categorize height is not - ecmwf height is wrt ground, categorize height is wrt mean sea level - ecmwf variable is named "radar_gas_atten", categorize variable is named "gas_atten" - ecmwf is calculated for both frequencies, categorize only for 94 GHz Parameters ---------- ds : xarray.Dataset Data with "ze" variable. Unit: mm6 m-3 ds_cloudnet : xarray.Dataset Cloudnet data with "gas_atten" variable. Unit: dBZ Returns ------- ds : xarray.Dataset Data with added attenuation. """ # select 94 GHz frequency if "frequency" in list(ds_cloudnet.dims): ds_cloudnet = ds_cloudnet.sel(frequency=94, method="nearest") if self.prepare["attenuation_correction_input"] == "cloudnet_ecmwf": lst_da_gas_atten = [] for time in ds_cloudnet.time: # interpolate gas attenuation to radar height grid ds_cloudnet_t = ds_cloudnet.sel(time=time).swap_dims( {"level": "height"} ) da_cloudnet_gas_atten_t = ds_cloudnet_t.gas_atten.interp( height=ds.height, method="linear" ) lst_da_gas_atten.append(da_cloudnet_gas_atten_t) # merge all time steps da_gas_atten = xr.concat(lst_da_gas_atten, dim="time") # drop levels da_gas_atten = da_gas_atten.reset_coords(drop=True) elif ( self.prepare["attenuation_correction_input"] == "cloudnet_categorize" ): # rename to match ecmwf variable ds_cloudnet = ds_cloudnet.rename({"radar_gas_atten": "gas_atten"}) # interpolate to radar height grid da_gas_atten = ds_cloudnet.gas_atten.interp( height=ds.height, method="linear" ) else: raise ValueError( f"Attenuation correction input " f"{self.prepare['attenuation_correction_input']} not " f"implemented" ) # interpolate to radar time grid and extrapolate if needed da_gas_atten = da_gas_atten.interp( time=ds.time, method="linear", kwargs={"fill_value": "extrapolate"} ) # ensures every time step contains attenuation in range column has_atten = ~da_gas_atten.isnull().all("height") assert ( has_atten.all() ), f"Attenuation missing for times: {da_gas_atten.time[~has_atten]}" # fill nan values with zero da_gas_atten = da_gas_atten.fillna(0) # apply attenuation correction ds = self.add_attenuation(ds=ds, da_gas_atten=da_gas_atten) return ds
[docs] def run_date(self, date, write_output=True): """ Runs simulation for a single day. Parameters ---------- date : np.datetime64 Date to simulate. write_output : bool If True, write output to netcdf file. """ # read radar data if self.input_radar_format == "cloudnet": radar_path = self.config["paths"][self.name]["cloudnet"] else: radar_path = self.config["paths"][self.name]["radar"] rad = Radar( date=date, site_name=self.name, path=radar_path, input_radar_format=self.input_radar_format, ) # skip if radar data does not exist if rad.ds_rad is None: print(f"{date}: No radar data found") return # read cloudnet data if self.geometry == "groundbased": ds_cloudnet = read_cloudnet( attenuation_correction_input=self.prepare[ "attenuation_correction_input" ], date=date, site_name=self.name, path=self.config["paths"][self.name]["cloudnet"], ) if ds_cloudnet is None: self.prepare["attenuation_correction"] = False # frequency conversion if self.frequency == 35: rad.ds_rad = self.convert_frequency(rad.ds_rad) # correct dielectric constant if self.frequency == 94: rad.ds_rad = self.correct_dielectric_constant(rad.ds_rad) # range to height self.check_is_sea_level(rad.ds_rad) if (self.geometry == "groundbased") and not self.is_sea_level: print("Converting radar range grid to height above mean sea level") rad.ds_rad = self.range_to_height(rad.ds_rad) else: print( "Assume that input radar grid is defined as height above mean " "sea level" ) # create along track dimension rad.ds_rad = self.create_along_track(ds=rad.ds_rad) # interpolate to regular grid ds = self.interpolate_to_regular_grid(rad.ds_rad) if self.geometry == "groundbased": # attenuation correction # no attenuation correction with 35 GHz radar reflectivity if self.frequency == 35: self.prepare["attenuation_correction"] = False if self.prepare["attenuation_correction"]: ds = self.attenuation_correction(ds, ds_cloudnet) # add ground echo ds = self.add_ground_echo(ds) # run simulator self.transform(ds) # add horizontal wind variable if self.geometry == "groundbased": self.add_groundbased_variables() if self.geometry == "airborne": self.add_airborne_variables() # write output to file if write_output: self.to_netcdf(date=date)
[docs] def run(self, start_date, end_date, write_output=True): """ Runs simulation for all days in the time frame. Parameters ---------- start_date : np.datetime64 Start date. end_date : np.datetime64 End date (inclusive). """ dates = self.prepare_dates(start_date, end_date) for i, date in enumerate(dates): print(f"Processing {date} ({i+1}/{len(dates)})") self.run_date(date, write_output=write_output)