Source code for orbital_radar.readers.radar

"""
This script contains all reader functions for the different radar formats. 
These functions are wrapped by the main function, which picks the correct
reader depending on the radar site.

The final output is always an xarray.Dataset with the two variables radar 
reflectivity "ze" in [mm6 m-3] and "vm" in [m s-1] as a function of range and
time.

The input Doppler velocity should be negative for downward motion and positive
for upward motion. This is changed to negative upward and positive downward to
match spaceborne convention.
"""

import os
import os.path
from glob import glob
from scipy.interpolate import interp1d

import numpy as np
import pandas as pd
import xarray as xr

from orbital_radar.readers.cloudnet import read_cloudnet


[docs]class Radar: """ This class selects the reading function for the provided site and performs quality-checks of the imported data. The output contains "ze" and "vm" variables. Implemented readers ------------------- - bco: Barbados Cloud Observatory, Barbados - jue: JOYCE, Juelich, Germany - mag: Magurele, Romania - min: Mindelo, Cape Verde - nor: Norunda, Sweden - nya: Ny-Alesund, Svalbard - mirac_p5: Polar 5, Mirac radar - rasta: Falcon, RASTA radar - arm: ARM sites - pamtra: PAMTRA simulations - cloudnet: cloudnet format """ def __init__(self, date, site_name, path, input_radar_format) -> None: """ Reads hourly radar data for a specific site and date with these standardized output variables: - Radar reflectivity (ze) in mm6 m-3 - Mean Doppler velocity (vm) in m s-1 - Range as height above NN (range) in m - Time (time) Parameters ---------- date : pd.Timestamp Radar data will be read for this day site_name : str Name of the radar site (e.g. Mindelo) path : str Directory of radar data. The NetCDF files are expected inside a sub-folder structure starting from path: "path/yyyy/mm/dd/*.nc". Other options are not implemented yet. input_radar_format : str Format of input NetCDF radar data (uoc_v0, uoc_v1, uoc_v2, uoc_geoms, bco, mirac_p5). Default is cloudnet. Otherwise, these formats might be correct: uoc_v0 for nya, uoc_v1 for jue, uoc_v2 for nor and mag, geoms for min, bco for bco, mirac_p5 for mp5. """ self.date = pd.Timestamp(date) self.site_name = site_name self.path = path self.make_date_path() self.ds_rad = xr.Dataset() # defines the reader for each site readers = { "uoc_v0": self.read_uoc_v0, "uoc_v1": self.read_uoc_v1, "uoc_v2": self.read_uoc_v2, "geoms": self.read_geoms, "bco": self.read_bco, "mirac_p5": self.read_mirac_p5, "cloudnet": self.read_cloudnet, "pamtra": self.read_pamtra, "rasta": self.read_rasta, "arm": self.read_arm, } reader = readers.get(input_radar_format) if reader is None: raise NotImplementedError( f"No reader that handles input format {input_radar_format}. " f"Please choose one of {list(readers.keys())}." ) reader() if self.ds_rad == xr.Dataset(): print("No radar data found.") self.ds_rad = None else: print("Vm sign convention: negative=upward, " "positive=downward") self.ds_rad["vm"] = -self.ds_rad["vm"] print(f"Quality checks for {self.site_name} radar data.") # ensure ze and vm variables exist assert "ze" in list(self.ds_rad) assert "vm" in list(self.ds_rad) # ensure same dimension order if "height" in list(self.ds_rad.dims): dim_order = ["time", "height"] else: dim_order = ["time", "range"] self.ds_rad["ze"] = self.ds_rad.ze.transpose(*dim_order) self.ds_rad["vm"] = self.ds_rad.vm.transpose(*dim_order) # ensure reasonable value ranges assert ( self.ds_rad.ze.isnull().all() or self.ds_rad.ze.min() >= 0 ), "Ze out of range." assert self.ds_rad.ze.isnull().all() or ( 10 * np.log10(self.ds_rad.ze.max()) < 100 ), "Ze out of range." assert ( self.ds_rad.vm.isnull().all() or self.ds_rad.vm.min() > -80 ), "Vm values out of range." assert ( self.ds_rad.vm.isnull().all() or self.ds_rad.vm.max() < 80 ), "Vm values out of range." # make sure that alt is in the data assert "alt" in list(self.ds_rad), "Altitude not found."
[docs] def make_date_path(self): """ Creates path with date structure if it exists. Otherwise, uses regular path without date extension. """ date_path = os.path.join( self.path, self.date.strftime(r"%Y"), self.date.strftime(r"%m"), self.date.strftime(r"%d"), ) if os.path.exists(date_path): self.path = date_path elif os.path.exists(self.path): pass # use regular path without date extension else: raise FileNotFoundError( f"The radar data path {self.path} does not exist" )
[docs] def get_all_files(self, pattern): """ Lists all radar files in the directory. Parameters ---------- pattern : str Specific file pattern depending on the product. Returns ------- list list of all files inside the directory """ pattern_path = os.path.join(self.path, pattern) files = sorted(glob(pattern_path)) if len(files) == 0: Warning(f"No files found with pattern: {pattern_path}") return files
[docs] @staticmethod def status_message(i, file, files): """ This message will be printed while reading the radar data. """ print(f"Reading radar file {i+1}/{len(files)}: {file}")
[docs] @staticmethod def remove_duplicate_times(ds): """ Removes duplicate times. Parameters ---------- ds : xr.Dataset Any data with a "time" coordinate. """ _, index = np.unique(ds["time"], return_index=True) ds = ds.isel(time=index) return ds
[docs] def convert_and_sort_time(self, base_time): """ Convert time in seconds since base to np.datetime64 format and sort time. Parameters ---------- base_time : str Base time as string (e.g. "1970-01-01") """ self.ds_rad["time"] = self.ds_rad["time"].astype( "timedelta64[s]" ) + np.datetime64(base_time) self.ds_rad = self.ds_rad.sel(time=np.sort(self.ds_rad.time)) # ensure that time of files matches provided date assert np.abs( self.date.to_datetime64() - self.ds_rad.time ).max() < np.timedelta64(2, "D")
[docs] def read_uoc_v2(self): """ This function reads the radar netCDF files of the RPG w-band radar The data are precessed with the Matlab code of the Uuniversity of Cologne, UoC. They correspond to the level 2 version of the data processing Units of file: ze unit: mm6 m-3 vm unit: m s-1 Note: fill value -999 in attributes not recognized by xarray. """ files = self.get_all_files("*compact_v2.nc") if len(files) == 0: return None for i, file in enumerate(files): self.status_message(i, file, files) with xr.open_dataset(file, decode_times=False) as ds: ds.load() ds = self.remove_duplicate_times(ds) # override keeps attributes from the last file opened self.ds_rad = xr.merge( [ds[["ze", "vm"]], self.ds_rad], combine_attrs="override" ) # extract instrument location and altitude self.ds_rad["lon"] = ds["lon"] self.ds_rad["lat"] = ds["lat"] self.ds_rad["alt"] = ds["zsl"] self.convert_and_sort_time(base_time="2001-01-01") # replace fill_value by nan self.ds_rad = self.ds_rad.where(self.ds_rad != -999)
[docs] def read_uoc_v0(self): """ This function reads the radar netCDF files of the RPG w-band radar The data are precessed with the Matlab code of the Uuniversity of Cologne, UoC. They correspond to the level 2 version of the data processing Units of file: ze unit: dBZ vm unit: m s-1 Note: Only one file per day """ files = self.get_all_files("*joyrad94_nya_lv1b_*") if len(files) == 0: return None for i, file in enumerate(files): self.status_message(i, file, files) with xr.open_dataset(file, decode_times=False) as ds: ds.load() ds = ds.rename({"height": "range"}) # round times to full seconds ds["time"] = np.around(ds["time"]).astype("int") ds = self.remove_duplicate_times(ds) # override keeps attributes from the last file opened self.ds_rad = xr.merge( [ds[["ze", "vm"]], self.ds_rad], combine_attrs="override" ) # extract instrument location and altitude self.ds_rad["lon"] = ds["lon"] self.ds_rad["lat"] = ds["lat"] self.ds_rad["alt"] = ds["instrument_altitude"] self.convert_and_sort_time(base_time="2001-01-01") # convert from dB to linear units self.ds_rad["ze"] = 10 ** (0.1 * self.ds_rad["ze"])
[docs] def read_uoc_v1(self): """ This function reads the radar netCDF files of the RPG w-band radar The data are precessed with the Matlab code of the Uuniversity of Cologne, UoC. They correspond to the level 2 version of the data processing Units of file: ze unit: mm6 m-3 vm unit: m s-1 Note: Doppler spectra are not read to improve performance. """ files = self.get_all_files("*ZEN_v2.nc") if len(files) == 0: return None for i, file in enumerate(files): self.status_message(i, file, files) xr_kwds = dict(drop_variables=["sze"], decode_times=False) with xr.open_dataset(file, **xr_kwds) as ds: ds.load() ds = self.remove_duplicate_times(ds) # override keeps attributes from the last file opened self.ds_rad = xr.merge( [ds[["ze", "vm"]], self.ds_rad], combine_attrs="override" ) # extract instrument location and altitude self.ds_rad["lon"] = ds["lon"] self.ds_rad["lat"] = ds["lat"] self.ds_rad["alt"] = ds["zsl"] self.convert_and_sort_time(base_time="2001-01-01")
[docs] def read_geoms(self): """ This function reads the radar netCDF files of the RPG w-band radar The data are precessed with the Matlab code of the Uuniversity of Cologne, UoC. They correspond to the level 2 version of the data processing Units of file: ze unit: mm6 m-3 vm unit: m s-1 Note: fill value -999 in attributes not recognized by xarray. """ files = self.get_all_files("*groundbased_radar_profiler*") if len(files) == 0: return None for i, file in enumerate(files): self.status_message(i, file, files) with xr.open_dataset(file, decode_times=False) as ds: ds.load() ds = ds.rename( { "RANGE": "range", "DATETIME": "time", "RADAR.REFLECTIVITY.FACTOR": "ze", "DOPPLER.VELOCITY_MEAN": "vm", } ) ds = self.remove_duplicate_times(ds) # add range and time as coordinates ds.coords["range"] = ds["range"] ds.coords["time"] = ds["time"] # override keeps attributes from the last file opened self.ds_rad = xr.merge( [ds[["ze", "vm"]], self.ds_rad], combine_attrs="override" ) # extract instrument location and altitude self.ds_rad["lon"] = ds["LONGITUDE.INSTRUMENT"] self.ds_rad["lat"] = ds["LATITUDE.INSTRUMENT"] self.ds_rad["alt"] = ds["ALTITUDE.INSTRUMENT"] self.convert_and_sort_time(base_time="2001-01-01") # replace fill_value by nan self.ds_rad = self.ds_rad.where(self.ds_rad != -999)
[docs] def read_bco(self): """ This function reads the radar netCDF files of the RPG w-band radar The data are precessed with the Matlab code of the Uuniversity of Cologne, UoC. They correspond to the level 2 version of the data processing Units of file: ze unit: dBZ vm unit: m s-1 Note: Only one file per day """ files = self.get_all_files(f'*{self.date.strftime(r"%Y%m%d")}.nc') if len(files) == 0: return None for i, file in enumerate(files): self.status_message(i, file, files) with xr.open_dataset(file, decode_times=False) as ds: ds.load() ds = ds.rename({"Ze": "ze", "VEL": "vm"}) ds = self.remove_duplicate_times(ds) # override keeps attributes from the last file opened self.ds_rad = xr.merge( [ds[["ze", "vm"]], self.ds_rad], combine_attrs="override" ) # extract instrument location and altitude self.ds_rad["lon"] = ds["lon"] self.ds_rad["lat"] = ds["lat"] self.ds_rad["alt"] = np.nan self.convert_and_sort_time(base_time="1970-01-01") # convert from dB to linear units self.ds_rad["ze"] = 10 ** (0.1 * self.ds_rad["ze"]) # apply dBz threshold dbz_threshold = 10 ** (95.5 / 10.0) dbz_noise_level = self.ds_rad["range"] ** 2 / dbz_threshold dbz_noise_level = xr.DataArray( dbz_noise_level, dims="range", coords={"range": self.ds_rad["range"]}, ) self.ds_rad["ze"] = self.ds_rad["ze"].where( self.ds_rad["ze"] > dbz_noise_level ) self.ds_rad["vm"] = self.ds_rad["vm"].where( self.ds_rad["ze"] > dbz_noise_level )
[docs] def read_mirac_p5(self): """ This function reads the radar netCDF files of the RPG W-band radar onboard the Polar 5 aircraft. Units of file: ze unit: dBZ Note: no mean Doppler velocity available. """ files = self.get_all_files(f'*{self.date.strftime(r"%Y%m%d")}*.nc') if len(files) == 0: return None for i, file in enumerate(files): self.status_message(i, file, files) with xr.open_dataset(file, decode_times=False) as ds: ds.load() ds = ds.rename({"Ze": "ze"}) ds = self.remove_duplicate_times(ds) # override keeps attributes from the last file opened self.ds_rad = xr.merge( [ds[["ze"]], self.ds_rad], combine_attrs="override" ) # extract instrument location and altitude self.ds_rad["lon"] = ds["lon"] self.ds_rad["lat"] = ds["lat"] self.ds_rad["alt"] = ds["alt"] self.convert_and_sort_time(base_time="2017-01-01") # replace fill_value by nan self.ds_rad = self.ds_rad.where(self.ds_rad != -999) # add dummy variable for Doppler velocity self.ds_rad["vm"] = xr.DataArray( np.zeros(self.ds_rad["ze"].shape), dims=["time", "height"], coords={ "time": self.ds_rad["time"], "height": self.ds_rad["height"], }, )
[docs] def read_cloudnet(self): """ Reads radar reflectivity and Doppler velocity from Cloudnet categorize files. Note: Cloudnet height is already in height above mean sea level. """ ds = read_cloudnet( attenuation_correction_input="cloudnet_categorize", date=self.date, site_name=self.site_name, path=self.path, add_date=False, ) ds = ds.rename({"Z": "ze", "v": "vm"}) ds = self.remove_duplicate_times(ds) self.ds_rad = ds[["ze", "vm"]] # extract instrument location and altitude self.ds_rad["lon"] = ds["longitude"] self.ds_rad["lat"] = ds["latitude"] self.ds_rad["alt"] = ds["altitude"] # convert from dB to linear units self.ds_rad["ze"] = 10 ** (0.1 * self.ds_rad["ze"]) # set inf ze to nan self.ds_rad["ze"] = self.ds_rad["ze"].where( self.ds_rad["ze"] != np.inf ) # set very low vm to nan self.ds_rad["vm"] = self.ds_rad["vm"].where(self.ds_rad["vm"] > -500) self.ds_rad["vm"] = self.ds_rad["vm"].where(self.ds_rad["vm"] < 500)
[docs] def read_pamtra(self): """ Reads PAMTRA simulation for a point location as a function of time. Attenuation: The output radar reflectivity contains attenuation for bottom-up view (groundbased radar), and additionally one radar reflectivity without attenuation. Convention of output: ze: radar reflectivity without attenuation ze_top_down: radar reflectivity with attenuation for top-down view ze_bottom_up: radar reflectivity with attenuation for bottom-up view Units of file: ze unit: dBZ vm unit: m s-1 """ files = self.get_all_files(f'*{self.date.strftime(r"%Y%m%d")}*v1.nc') if len(files) == 0: files = self.get_all_files(f'*{self.date.strftime(r"%Y%m%d")}*v0.nc') if len(files) == 0: return None for i, file in enumerate(files): self.status_message(i, file, files) with xr.open_dataset(file) as ds: ds.load() # currently, only 94 GHz simulations are supported assert ds.grid_y.size == 1 ds = ds.isel(grid_y=0) # change heightbins to actual height values assert (ds.height.diff("grid_x") == 0).all() ds["height"] = ds["height"].isel(grid_x=0) ds = ds.swap_dims({"heightbins": "height"}) # rename variables ds = ds.rename( { "Ze": "ze", "Radar_MeanDopplerVel": "vm", "datatime": "time", "longitude": "lon", "latitude": "lat", } ) assert ds.frequency == 94 assert ds.radar_polarisation.size == 1 assert ds.radar_peak_number.size == 1 ds["ze"] = ds["ze"].isel( frequency=0, radar_polarisation=0, radar_peak_number=0 ) ds["vm"] = ds["vm"].isel( frequency=0, radar_polarisation=0, radar_peak_number=0 ) ds = ds.swap_dims({"grid_x": "time"}) ds = ds.reset_coords() # check if attenuation correction was performed by pamtra props = { p.split(": ")[0]: p.split(": ")[1] for p in ds.attrs["properties"] .replace("'", "")[1:-1] .split(", ") } # two-way attenuation handling da_att = 2 * ( ds.Attenuation_Hydrometeors.isel(frequency=0) + ds.Attenuation_Atmosphere.isel(frequency=0) ) da_att_bu = da_att.cumsum("height") da_att_td = ( da_att.sel(height=np.flip(ds.height)) .cumsum("height") .sel(height=ds.height) ) # radar reflectivity contains no attenuation if props["radar_attenuation"] == "disabled": pass # radar reflectivity contains attenuation for top-down view elif props["radar_attenuation"] == "top-down": ds["ze"] = ds["ze"] + da_att_td # radar reflectivity contains attenuation for bottom-up view elif props["radar_attenuation"] == "bottom-up": ds["ze"] = ds["ze"] + da_att_bu else: raise ValueError( f"Attenuation correction {props['radar_attenuation']} not " "supported." ) ds["ze_bottom_up"] = ds["ze"] - da_att_bu ds["ze_top_down"] = ds["ze"] - da_att_td # override keeps attributes from the last file opened self.ds_rad = xr.merge( [ds[["ze", "vm", "ze_bottom_up", "ze_top_down"]], self.ds_rad], combine_attrs="override", ) # add longitude and latitude assert (ds["lon"].diff("time") == 0).all() assert (ds["lat"].diff("time") == 0).all() self.ds_rad["lon"] = ds["lon"].isel(time=0).reset_coords(drop=True) self.ds_rad["lat"] = ds["lat"].isel(time=0).reset_coords(drop=True) if "alt" not in list(self.ds_rad): print("No altitude found in PAMTRA file. Setting alt to 0 m.") self.ds_rad["alt"] = 0 # convert from dB to linear units self.ds_rad["ze"] = 10 ** (0.1 * self.ds_rad["ze"]) self.ds_rad["ze_bottom_up"] = 10 ** (0.1 * self.ds_rad["ze_bottom_up"]) self.ds_rad["ze_top_down"] = 10 ** (0.1 * self.ds_rad["ze_top_down"])
[docs] def read_rasta(self): """ This function reads the radar netCDF files of the airborne RASTA radar. The time-dependent height is interpolated onto a regular height grid. No correction for gas attenuation is applied (gaseous_twowayatt). The horizontal aircraft speed is also read. The following two flags are applied to ze and vm: First flag array: flag (1 and 2 are removed) -4: nadir not available -3: Last gates not valid -2: First gates not valid -1: beyond maximum range 0: no cloud 1: cloud or precipitation 2: possible cloud 3: ground echo 4: ghost ground echo (downward domain) 5: ghost ground echo (upward domain) 6: underground signal 7: underground noise 8: Z can be interpolated Second flag array: flag_Z_interpolated (2 and 3 are removed) 0: no interpolation 1: interpolated but data available 2: interpolated and no data available 3: ghost ground echo (upward) is interpolated Units of file: ze unit: dBZ vm unit: m s-1 ac_speed unit: m s-1 Note: only the vertical-pointing antennas are used. """ files = self.get_all_files(f'*{self.date.strftime(r"%Y%m%d")}*.nc') if len(files) == 0: return None for i, file in enumerate(files): self.status_message(i, file, files) with xr.open_dataset(file) as ds: ds.load() ds = self.remove_duplicate_times(ds) # filter data keep_data = ( (ds.flag != 1) & (ds.flag != 2) & (ds.flag_Z_interpolated != 2) & (ds.flag_Z_interpolated != 3) ) ds["Z_interpolated"] = ds["Z_interpolated"].where(keep_data) ds["Vz"] = ds["Vz"].where(keep_data) # change doppler velocity from positive upward to negative upward ds["Vz"] = -ds["Vz"] # rename variables ds = ds.rename( { "longitude": "lon", "latitude": "lat", "height": "height_index", # this is not actual height "aircraft_vh": "ac_speed", "altitude": "alt", } ) # interpolate onto regular height grid ds.coords["height"] = np.arange(-2000, 15000, 60) ze_height = np.zeros((len(ds["time"]), len(ds["height"]))) vm_height = np.zeros((len(ds["time"]), len(ds["height"]))) for i in range(len(ds.time)): # interpolate ze in linear space f_ze = interp1d( ds["height_2D"].isel(time=i).values * 1e3, 10 ** (0.1 * ds["Z_interpolated"].isel(time=i).values), kind="linear", fill_value=np.nan, bounds_error=False, ) ze_height[i, :] = f_ze(ds["height"].values) # interpolate vm f_vm = interp1d( ds["height_2D"].isel(time=i).values * 1e3, ds["Vz"].isel(time=i).values, kind="linear", fill_value=np.nan, bounds_error=False, ) vm_height[i, :] = f_vm(ds["height"].values) # add variables to dataset and name them ze and vm ds["ze"] = xr.DataArray( ze_height, dims=["time", "height"], coords={"time": ds["time"], "height": ds["height"]}, ) ds["vm"] = xr.DataArray( vm_height, dims=["time", "height"], coords={"time": ds["time"], "height": ds["height"]}, ) # override keeps attributes from the last file opened self.ds_rad = xr.merge( [ ds[["ze", "vm", "lon", "lat", "alt", "ac_speed"]], self.ds_rad, ], combine_attrs="override", ) self.convert_and_sort_time(base_time=self.date)
[docs] def read_arm(self): """ Reader for ARM radar data. The reader uses the best estimate of the radar reflectivity. Units of file: ze unit: dBZ vm unit: m s-1 """ files = self.get_all_files(f'*{self.date.strftime(r"%Y%m%d")}*.cdf') if len(files) == 0: return None for i, file in enumerate(files): self.status_message(i, file, files) with xr.open_dataset(file) as ds: ds.load() ds = self.remove_duplicate_times(ds) ds = ds.rename( { "mean_doppler_velocity": "vm", "reflectivity_best_estimate": "ze", } ) # override keeps attributes from the last file opened self.ds_rad = xr.merge( [ds[["ze", "vm"]], self.ds_rad], combine_attrs="override" ) # add longitude and latitude self.ds_rad["lon"] = ds["lon"] self.ds_rad["lat"] = ds["lat"] self.ds_rad["alt"] = ds["alt"] self.ds_rad["ze"] = 10 ** (0.1 * self.ds_rad["ze"])