Source code for gridgeo.cfvariable

import numpy as np

from gridgeo.ugrid import ugrid


def _make_grid(coords):
    if coords.ndim != 3:
        raise ValueError(f"Expected 3 dimension array, got {coords.ndim}.")
    M, N, L = coords.shape
    polygons = np.concatenate(
        (
            coords[0:-1, 0:-1],
            coords[0:-1, 1:],
            coords[1:, 1:],
            coords[1:, 0:-1],
        ),
        axis=L,
    )

    polygons = polygons.reshape(((M - 1) * (N - 1), 4, L))
    return [p for p in polygons if not np.isnan(p).any()]


def _filled_masked(arr):
    if hasattr(arr, "filled"):
        return arr.filled(fill_value=np.NaN)
    else:
        return arr


[docs]class CFVariable(object): def __init__(self, nc, **kwargs): """ FIXME: this should be done when slicing a CFVariable in pocean-core. This class is only a temporary workaround until something better is created. """ self._nc = nc variables = self._nc.get_variables_by_attributes(**kwargs) if len(variables) > 1: raise ValueError( f"Found more than 1 variable with criteria {kwargs}" ) elif not variables: raise ValueError( f"Could not find any variables with criteria {kwargs}" ) else: self._variable = variables[0] self._coords = self._variable.coordinates.split() def _filter_coords(self, variables): valid_coords = [] for var in variables: if var.name in self._coords: valid_coords.append(var) if len(valid_coords) != 1: raise ValueError(f"Expected a single coord, got '{valid_coords}'.") return valid_coords[0]
[docs] def axis(self, name): return getattr(self, "{}_axis".format(name.lower()))()
[docs] def t_axis(self): tvars = list( set( self._nc.get_variables_by_attributes( axis=lambda x: x and str(x).lower() == "t" ) + self._nc.get_variables_by_attributes( standard_name=lambda x: str(x) in [ "time" ] # We don't want coords `forecast_reference_time`. ) + self._nc.get_variables_by_attributes( _CoordinateAxisType=lambda x: str(x).lower() == "time" ) ) ) # _CoordinateAxisType: Time return self._filter_coords(tvars)
[docs] def crs(self): crs = getattr(self._variable, "grid_mapping", None) if crs: crs = self._nc[crs] return crs
[docs] def x_axis(self): xnames = ["longitude", "grid_longitude", "projection_x_coordinate"] xunits = [ "degrees_east", "degree_east", "degree_E", "degrees_E", "degreeE", "degreesE", ] xvars = list( set( self._nc.get_variables_by_attributes( axis=lambda x: x and str(x).lower() == "x" ) + self._nc.get_variables_by_attributes( standard_name=lambda x: x and str(x).lower() in xnames ) + self._nc.get_variables_by_attributes( units=lambda x: x and str(x).lower() in xunits ) ) ) return self._filter_coords(xvars)
[docs] def y_axis(self): ynames = ["latitude", "grid_latitude", "projection_y_coordinate"] yunits = [ "degrees_north", "degree_north", "degree_N", "degrees_N", "degreeN", "degreesN", ] yvars = list( set( self._nc.get_variables_by_attributes( axis=lambda x: x and str(x).lower() == "y" ) + self._nc.get_variables_by_attributes( standard_name=lambda x: x and str(x).lower() in ynames ) + self._nc.get_variables_by_attributes( units=lambda x: x and str(x).lower() in yunits ) ) ) return self._filter_coords(yvars)
[docs] def z_axis(self): znames = [ "atmosphere_ln_pressure_coordinate", "atmosphere_sigma_coordinate", "atmosphere_hybrid_sigma_pressure_coordinate", "atmosphere_hybrid_height_coordinate", "atmosphere_sleve_coordinate", "ocean_sigma_coordinate", "ocean_s_coordinate", "ocean_s_coordinate_g1", "ocean_s_coordinate_g2", "ocean_sigma_z_coordinate", "ocean_double_sigma_coordinate", ] zvars = list( set( self._nc.get_variables_by_attributes( axis=lambda x: x and str(x).lower() == "z" ) + self._nc.get_variables_by_attributes( positive=lambda x: x and str(x).lower() in ["up", "down"] ) + self._nc.get_variables_by_attributes( standard_name=lambda x: x and str(x).lower() in znames ) ) ) return self._filter_coords(zvars)
[docs] def topology(self): vnames = ["grid_topology", "mesh_topology"] topologies = self._nc.get_variables_by_attributes( cf_role=lambda v: v in vnames ) if not topologies: if self.x_axis().ndim == 1 and self.y_axis().ndim == 1: return "unknown_1d" elif self.x_axis().ndim == 2 and self.y_axis().ndim == 2: return "unknown_2d" else: raise ValueError( f"Could not identify the topology for {self._nc}." ) if topologies and len(topologies) > 1: raise ValueError( f"Expected 1 topology variable, got {len(topologies)}." ) mesh = topologies[0] dims = getattr(mesh, "topology_dimension", None) cf_role = getattr(mesh, "cf_role", None) if cf_role == "mesh_topology" and dims in (1, 2): return "ugrid" if cf_role == "grid_topology" and dims == 2: return "sgrid"
[docs] def polygons(self): if self.topology() == "ugrid": grid = ugrid(self._nc) node_x = grid["nodes"]["x"] node_y = grid["nodes"]["y"] faces = grid["faces"] return [(list(zip(node_x[k], node_y[k]))) for k in faces] if self.topology() == "sgrid": x, y = self.x_axis()[:], self.y_axis()[:] coords = np.concatenate([x[..., None], y[..., None]], axis=2) return _make_grid(coords) if self.topology() == "unknown_1d": x, y = self.x_axis()[:], self.y_axis()[:] # Some non-compliant grids, like NYHOPS, may have missing_value/fill_value. x = _filled_masked(x) y = _filled_masked(y) if hasattr(y, "filled"): y = y.filled(fill_value=np.NaN) x, y = np.meshgrid(x, y) coords = np.stack([x, y], axis=2) return _make_grid(coords) if self.topology() == "unknown_2d": x, y = self.x_axis()[:], self.y_axis()[:] # Some non-compliant grids, like NYHOPS, may have missing_value/fill_value. x = _filled_masked(x) y = _filled_masked(y) coords = np.concatenate([x[..., None], y[..., None]], axis=2) return _make_grid(coords)
# Replication of the `netCDF4.Variable` object via composition. def __getitem__(self, key): return self._variable.__getitem__(key) def __repr__(self): return self._variable.__repr__() @property def units(self): return self._variable.units @property def standard_name(self): return self._variable.standard_name @property def long_name(self): return self._variable.long_name @property def coordinates(self): return self._variable.coordinates @property def ndim(self): return self._variable.ndim @property def size(self): return self._variable.size @property def shape(self): return self._variable.shape @property def scale(self): return self._variable.scale @property def datatype(self): return self._variable.datatype @property def dimensions(self): return self._variable.dimensions @property def dtype(self): return self._variable.dtype @property def field(self): return self._variable.field @property def name(self): return self._variable.name @property def mask(self): return self._variable.mask @property def _FillValue(self): return self._variable._FillValue @property def _ChunkSizes(self): return self._variable._ChunkSizes @property def chartostring(self): return self._variable.chartostring
[docs] def chunking(self): return self._variable.chunking()
[docs] def endian(self): return self._variable.endian()
[docs] def filters(self): return self._variable.filters()
[docs] def group(self): return self._variable.group()
[docs] def ncattrs(self): return self._variable.ncattrs()