Source code for oceans.filters

import numpy as np


[docs] def lanc(numwt, haf): """ Generates a numwt + 1 + numwt lanczos cosine low pass filter with -6dB (1/4 power, 1/2 amplitude) point at haf Parameters ---------- numwt : int number of points haf : float frequency (in 'cpi' of -6dB point, 'cpi' is cycles per interval. For hourly data cpi is cph, Examples -------- >>> from oceans.filters import lanc >>> import matplotlib.pyplot as plt >>> t = np.arange(500) # Time in hours. >>> h = 2.5 * np.sin(2 * np.pi * t / 12.42) >>> h += 1.5 * np.sin(2 * np.pi * t / 12.0) >>> h += 0.3 * np.random.randn(len(t)) >>> wt = lanc(96 + 1 + 96, 1.0 / 40) >>> low = np.convolve(wt, h, mode="same") >>> high = h - low >>> fig, (ax0, ax1) = plt.subplots(nrows=2) >>> _ = ax0.plot(high, label="high") >>> _ = ax1.plot(low, label="low") >>> _ = ax0.legend(numpoints=1) >>> _ = ax1.legend(numpoints=1) """ summ = 0 numwt += 1 wt = np.zeros(numwt) # Filter weights. ii = np.arange(numwt) wt = 0.5 * (1.0 + np.cos(np.pi * ii * 1.0 / numwt)) ii = np.arange(1, numwt) xx = np.pi * 2 * haf * ii wt[1 : numwt + 1] = wt[1 : numwt + 1] * np.sin(xx) / xx summ = wt[1 : numwt + 1].sum() xx = wt.sum() + summ wt /= xx return np.r_[wt[::-1], wt[1 : numwt + 1]]
[docs] def smoo1(datain, window_len=11, window="hanning"): """ Smooth the data using a window with requested size. Parameters ---------- datain : array_like input series window_len : int size of the smoothing window; should be an odd integer window : str window from 'flat', 'hanning', 'hamming', 'bartlett', 'blackman'. flat window will produce a moving average smoothing. Returns ------- data_out : array_like smoothed signal See Also -------- scipy.signal.lfilter Notes ----- original from: https://scipy-cookbook.readthedocs.io/items/SignalSmooth.html This method is based on the convolution of a scaled window with the signal. The signal is prepared by introducing reflected copies of the signal (with the window size) in both ends so that transient parts are minimized in the beginning and end part of the output signal. Examples -------- >>> from oceans.filters import smoo1 >>> import numpy as np >>> import matplotlib.pyplot as plt >>> time = np.linspace(-4, 4, 100) >>> series = np.sin(time) >>> noise_series = series + np.random.randn(len(time)) * 0.1 >>> data_out = smoo1(series) >>> ws = 31 >>> ax = plt.subplot(211) >>> _ = ax.plot(np.ones(ws)) >>> windows = ["flat", "hanning", "hamming", "bartlett", "blackman"] >>> for w in windows[1:]: ... _ = eval("plt.plot(np." + w + "(ws) )") >>> _ = ax.axis([0, 30, 0, 1.1]) >>> leg = ax.legend(windows) >>> _ = plt.title("The smoothing windows") >>> ax = plt.subplot(212) >>> (l1,) = ax.plot(series) >>> (l2,) = ax.plot(noise_series) >>> for w in windows: ... _ = plt.plot(smoo1(noise_series, 10, w)) ... >>> l = ["original signal", "signal with noise"] >>> l.extend(windows) >>> leg = ax.legend(l) >>> _ = plt.title("Smoothing a noisy signal") TODO: window parameter can be the window itself (i.e. an array) instead of a string. """ datain = np.asarray(datain) if datain.ndim != 1: raise ValueError("Smooth only accepts 1 dimension arrays.") if datain.size < window_len: raise ValueError("Input vector needs to be bigger than window size.") if window_len < 3: return datain if window not in ["flat", "hanning", "hamming", "bartlett", "blackman"]: msg = "Window must be is one of 'flat', 'hanning', 'hamming', 'bartlett', 'blackman'" # noqa raise ValueError(msg) s = np.r_[ 2 * datain[0] - datain[window_len:1:-1], datain, 2 * datain[-1] - datain[-1:-window_len:-1], ] if window == "flat": # Moving average. w = np.ones(window_len, "d") else: w = eval("np." + window + "(window_len)") data_out = np.convolve(w / w.sum(), s, mode="same") return data_out[window_len - 1 : -window_len + 1]
[docs] def smoo2(A, hei, wid, kind="hann", badflag=-9999, beta=14): """ Calculates the smoothed array 'As' from the original array 'A' using the specified window of type 'kind' and shape ('hei', 'wid'). Usage: As = smoo2(A, hei, wid, kind='hann', badflag=-9999, beta=14) Parameters ---------- A : 2D array Array to be smoothed. hei : integer Window height. Must be odd and greater than or equal to 3. wid : integer Window width. Must be odd and greater than or equal to 3. kind : string, optional Refer to Numpy for details about each window type. badflag : float, optional The bad data flag. Elements of the input array 'A' holding this value are ignored. beta : float, optional Shape parameter for the kaiser window. Returns ------- As : 2D array The smoothed array. André Palóczy Filho (paloczy@gmail.com) April 2012 """ # Checking window type and dimensions kinds = ["hann", "hamming", "blackman", "bartlett", "kaiser"] if kind not in kinds: raise ValueError(f"Invalid window type requested: {kind}") if (np.mod(hei, 2) == 0) or (np.mod(wid, 2) == 0): raise ValueError("Window dimensions must be odd") if (hei <= 1) or (wid <= 1): raise ValueError("Window shape must be (3,3) or greater") # Creating the 2D window. if kind == "kaiser": # If the window kind is kaiser (beta is required). wstr = "np.outer(np.kaiser(hei, beta), np.kaiser(wid, beta))" # If the window kind is hann, hamming, blackman or bartlett # (beta is not required). else: if kind == "hann": # Converting the correct window name (Hann) to the numpy function # name (numpy.hanning). kind = "hanning" # Computing outer product to make a 2D window out of the original 1d # windows. # TODO: Get rid of this evil eval. wstr = "np.outer(np." + kind + "(hei), np." + kind + "(wid))" wdw = eval(wstr) A = np.asanyarray(A) Fnan = np.isnan(A) imax, jmax = A.shape As = np.nan * np.ones((imax, jmax)) for i in range(imax): for j in range(jmax): # Default window parameters. wupp = 0 wlow = hei wlef = 0 wrig = wid lh = np.floor(hei / 2) lw = np.floor(wid / 2) # Default array ranges (functions of the i, j indices). upp = i - lh low = i + lh + 1 lef = j - lw rig = j + lw + 1 # Tiling window and input array at the edges. # Upper edge. if upp < 0: wupp = wupp - upp upp = 0 # Left edge. if lef < 0: wlef = wlef - lef lef = 0 # Bottom edge. if low > imax: ex = low - imax wlow = wlow - ex low = imax # Right edge. if rig > jmax: ex = rig - jmax wrig = wrig - ex rig = jmax # Computing smoothed value at point (i, j). Ac = A[upp:low, lef:rig] wdwc = wdw[wupp:wlow, wlef:wrig] fnan = np.isnan(Ac) Ac[fnan] = 0 wdwc[fnan] = 0 # Eliminating NaNs from mean computation. fbad = Ac == badflag wdwc[fbad] = 0 # Eliminating bad data from mean computation. a = Ac * wdwc As[i, j] = a.sum() / wdwc.sum() # Assigning NaN to the positions holding NaNs in the original array. As[Fnan] = np.nan return As
[docs] def weim(x, N, kind="hann", badflag=-9999, beta=14): """ Calculates the smoothed array 'xs' from the original array 'x' using the specified window of type 'kind' and size 'N'. 'N' must be an odd number. Usage: xs = weim(x, N, kind='hann', badflag=-9999, beta=14) Parameters ---------- x : 1D array Array to be smoothed. N : integer Window size. Must be odd. kind : string, optional Refer to Numpy for details about each window type. badflag : float, optional The bad data flag. Elements of the input array 'A' holding this value are ignored. beta : float, optional Shape parameter for the kaiser window. For windows other than the kaiser window, this parameter does nothing. Returns ------- xs : 1D array The smoothed array. --------------------------------------- André Palóczy Filho (paloczy@gmail.com) June 2012 """ # Checking window type and dimensions. kinds = ["hann", "hamming", "blackman", "bartlett", "kaiser"] if kind not in kinds: raise ValueError(f"Invalid window type requested: {kind}") if np.mod(N, 2) == 0: raise ValueError("Window size must be odd") # Creating the window. if kind == "kaiser": # If the window kind is kaiser (beta is required). wstr = "np.kaiser(N, beta)" # If the window kind is hann, hamming, blackman or bartlett (beta is not # required). else: if kind == "hann": # Converting the correct window name (Hann) to the numpy function # name (numpy.hanning). kind = "hanning" # Computing outer product to make a 2D window out of the original # 1D windows. wstr = "np." + kind + "(N)" # FIXME: Do not use `eval`. w = eval(wstr) x = np.asarray(x).flatten() Fnan = np.isnan(x).flatten() ln = (N - 1) / 2 lx = x.size lf = lx - ln xs = np.nan * np.ones(lx) # Eliminating bad data from mean computation. fbad = x == badflag x[fbad] = np.nan for i in range(lx): if i <= ln: xx = x[: ln + i + 1] ww = w[ln - i :] elif i >= lf: xx = x[i - ln :] ww = w[: lf - i - 1] else: xx = x[i - ln : i + ln + 1] ww = w.copy() # Counting only NON-NaNs, both in the input array and in the window # points. f = ~np.isnan(xx) xx = xx[f] ww = ww[f] # Thou shalt not divide by zero. if f.sum() == 0: xs[i] = x[i] else: xs[i] = np.sum(xx * ww) / np.sum(ww) # Assigning NaN to the positions holding NaNs in the input array. xs[Fnan] = np.nan return xs
[docs] def medfilt1(x, L=3): """ Median filter for 1d arrays. Performs a discrete one-dimensional median filter with window length `L` to input vector `x`. Produces a vector the same size as `x`. Boundaries are handled by shrinking `L` at edges; no data outside of `x` is used in producing the median filtered output. Parameters ---------- x : array_like Input 1D data L : integer Window length Returns ------- xout : array_like Numpy 1d array of median filtered result; same size as x Examples -------- >>> from oceans.filters import medfilt1 >>> import matplotlib.pyplot as plt >>> # 100 pseudo-random integers ranging from 1 to 100, plus three large >>> # outliers for illustration. >>> x = np.r_[ ... np.ceil(np.random.rand(25) * 100), ... [1000], ... np.ceil(np.random.rand(25) * 100), ... [2000], ... np.ceil(np.random.rand(25) * 100), ... [3000], ... np.ceil(np.random.rand(25) * 100), ... ] >>> L = 2 >>> xout = medfilt1(x=x, L=L) >>> ax = plt.subplot(211) >>> l1, l2 = ax.plot(x), ax.plot(xout) >>> ax.grid(True) >>> y1min, y1max = np.min(xout) * 0.5, np.max(xout) * 2.0 >>> leg1 = ax.legend(["x (pseudo-random)", "xout"]) >>> t1 = ax.set_title( ... '''Median filter with window length %s. ... Removes outliers, tracks remaining signal)''' ... % L ... ) >>> L = 103 >>> xout = medfilt1(x=x, L=L) >>> ax = plt.subplot(212) >>> ( ... l1, ... l2, ... ) = ax.plot( ... x ... ), ax.plot(xout) >>> ax.grid(True) >>> y2min, y2max = np.min(xout) * 0.5, np.max(xout) * 2.0 >>> leg2 = ax.legend(["Same x (pseudo-random)", "xout"]) >>> t2 = ax.set_title( ... '''Median filter with window length %s. ... Removes outliers and noise''' ... % L ... ) >>> ax = plt.subplot(211) >>> lims1 = ax.set_ylim([min(y1min, y2min), max(y1max, y2max)]) >>> ax = plt.subplot(212) >>> lims2 = ax.set_ylim([min(y1min, y2min), max(y1max, y2max)]) """ xin = np.atleast_1d(np.asanyarray(x)) N = len(x) L = int(L) # Ensure L is odd integer so median requires no interpolation. if L % 2 == 0: L += 1 if N < 2: raise ValueError("Input sequence must be >= 2.") return None if L < 2: raise ValueError("Input filter window length must be >=2.") return None if L > N: msg = "Input filter window length must be shorter than series: L = {:d}, len(x) = {:d}".format # noqa raise ValueError(msg(L, N)) return None if xin.ndim > 1: msg = "Input sequence has to be 1d: ndim = {}".format raise ValueError(msg(xin.ndim)) xout = np.zeros_like(xin) + np.nan Lwing = (L - 1) // 2 # NOTE: Use np.ndenumerate in case I expand to +1D case for i, _xi in enumerate(xin): if i < Lwing: # Left boundary. xout[i] = np.median(xin[0 : i + Lwing + 1]) # (0 to i + Lwing) elif i >= N - Lwing: # Right boundary. xout[i] = np.median(xin[i - Lwing : N]) # (i-Lwing to N-1) else: # Middle (N-2*Lwing input vector and filter window overlap). xout[i] = np.median(xin[i - Lwing : i + Lwing + 1]) # (i-Lwing to i+Lwing) return xout
[docs] def fft_lowpass(signal, low, high): """ Performs a low pass filer on the series. low and high specifies the boundary of the filter. >>> from oceans.filters import fft_lowpass >>> import matplotlib.pyplot as plt >>> t = np.arange(500) # Time in hours. >>> x = 2.5 * np.sin(2 * np.pi * t / 12.42) >>> x += 1.5 * np.sin(2 * np.pi * t / 12.0) >>> x += 0.3 * np.random.randn(len(t)) >>> filtered = fft_lowpass(x, low=1 / 30, high=1 / 40) >>> fig, ax = plt.subplots() >>> (l1,) = ax.plot(t, x, label="original") >>> (l2,) = ax.plot(t, filtered, label="filtered") >>> legend = ax.legend() """ if len(signal) % 2: result = np.fft.rfft(signal, len(signal)) else: result = np.fft.rfft(signal) freq = np.fft.fftfreq(len(signal))[: len(signal) // 2 + 1] factor = np.ones_like(freq) factor[freq > low] = 0.0 sl = np.logical_and(high < freq, freq < low) a = factor[sl] # Create float array of required length and reverse. a = np.arange(len(a) + 2).astype(float)[::-1] # Ramp from 1 to 0 exclusive. a = (a / a[0])[1:-1] # Insert ramp into factor. factor[sl] = a result = result * factor return np.fft.irfft(result, len(signal))
[docs] def md_trenberth(x): """ Returns the filtered series using the Trenberth filter as described on Monthly Weather Review, vol. 112, No. 2, Feb 1984. Input data: series x of dimension 1Xn (must be at least dimension 11) Output data: y = md_trenberth(x) where y has dimension 1X(n-10) Examples -------- >>> from oceans.filters import md_trenberth >>> import matplotlib.pyplot as plt >>> t = np.arange(500) # Time in hours. >>> x = 2.5 * np.sin(2 * np.pi * t / 12.42) >>> x += 1.5 * np.sin(2 * np.pi * t / 12.0) >>> x += 0.3 * np.random.randn(len(t)) >>> filtered = md_trenberth(x) >>> fig, ax = plt.subplots() >>> (l1,) = ax.plot(t, x, label="original") >>> pad = [np.nan] * 5 >>> (l2,) = ax.plot(t, np.r_[pad, filtered, pad], label="filtered") >>> legend = ax.legend() """ x = np.asanyarray(x) weight = np.array( [ 0.02700, 0.05856, 0.09030, 0.11742, 0.13567, 0.14210, 0.13567, 0.11742, 0.09030, 0.05856, 0.02700, ], ) sz = len(x) y = np.zeros(sz - 10) for i in range(5, sz - 5): y[i - 5] = 0 for j in range(11): y[i - 5] = y[i - 5] + x[i - 6 + j + 1] * weight[j] return y
[docs] def pl33tn(x, dt=1.0, T=33.0, mode="valid", t=None): """ Computes low-passed series from `x` using pl33 filter, with optional sample interval `dt` (hours) and filter half-amplitude period T (hours) as input for non-hourly series. The PL33 filter is described on p. 21, Rosenfeld (1983), WHOI Technical Report 85-35. Filter half amplitude period = 33 hrs., half power period = 38 hrs. The time series x is folded over and cosine tapered at each end to return a filtered time series xf of the same length. Assumes length of x greater than 67. Can input a DataArray and use dask-supported for lazy execution. In that use case, dt is ignored and calculated from the input DataArray. cf-xarray is also required. Examples -------- >>> from oceans.filters import pl33tn >>> import matplotlib.pyplot as plt >>> t = np.arange(500) # Time in hours. >>> x = 2.5 * np.sin(2 * np.pi * t / 12.42) >>> x += 1.5 * np.sin(2 * np.pi * t / 12.0) >>> x += 0.3 * np.random.randn(len(t)) >>> filtered_33 = pl33tn(x, dt=4.0) # 33 hr filter >>> filtered_33d3 = pl33tn(x, dt=4.0, T=72.0) # 3 day filter >>> fig, ax = plt.subplots() >>> (l1,) = ax.plot(t, x, label="original") >>> pad = [np.nan] * 8 >>> (l2,) = ax.plot(t, np.r_[pad, filtered_33, pad], label="33 hours") >>> pad = [np.nan] * 17 >>> (l3,) = ax.plot(t, np.r_[pad, filtered_33d3, pad], label="3 days") >>> legend = ax.legend() """ import cf_xarray # noqa: F401 import pandas as pd import xarray as xr if isinstance(x, xr.Dataset | pd.DataFrame): raise TypeError("Input a DataArray not a Dataset, or a Series not a DataFrame.") if isinstance(x, pd.Series) and not isinstance( x.index, pd.core.indexes.datetimes.DatetimeIndex, ): raise TypeError("Input Series needs to have parsed datetime indices.") # find dt in units of hours if isinstance(x, xr.DataArray): dt = (x.cf["T"][1] - x.cf["T"][0]) / np.timedelta64( 3_600_000_000_000, ) elif isinstance(x, pd.Series): dt = (x.index[1] - x.index[0]) / pd.Timedelta("1H") pl33 = np.array( [ -0.00027, -0.00114, -0.00211, -0.00317, -0.00427, -0.00537, -0.00641, -0.00735, -0.00811, -0.00864, -0.00887, -0.00872, -0.00816, -0.00714, -0.00560, -0.00355, -0.00097, +0.00213, +0.00574, +0.00980, +0.01425, +0.01902, +0.02400, +0.02911, +0.03423, +0.03923, +0.04399, +0.04842, +0.05237, +0.05576, +0.05850, +0.06051, +0.06174, +0.06215, +0.06174, +0.06051, +0.05850, +0.05576, +0.05237, +0.04842, +0.04399, +0.03923, +0.03423, +0.02911, +0.02400, +0.01902, +0.01425, +0.00980, +0.00574, +0.00213, -0.00097, -0.00355, -0.00560, -0.00714, -0.00816, -0.00872, -0.00887, -0.00864, -0.00811, -0.00735, -0.00641, -0.00537, -0.00427, -0.00317, -0.00211, -0.00114, -0.00027, ], ) _dt = np.linspace(-33, 33, 67) dt = float(dt) * (33.0 / T) filter_time = np.arange(0.0, 33.0, dt, dtype="d") Nt = len(filter_time) filter_time = np.hstack((-filter_time[-1:0:-1], filter_time)) pl33 = np.interp(filter_time, _dt, pl33) pl33 /= pl33.sum() if isinstance(x, xr.DataArray): x = x.interpolate_na(dim=x.cf["T"].name) weight = xr.DataArray(pl33, dims=["window"]) xf = ( x.rolling({x.cf["T"].name: len(pl33)}, center=True) .construct({x.cf["T"].name: "window"}) .dot(weight, dims="window") ) # update attrs attrs = { key: f"{value}, filtered" for key, value in x.attrs.items() if key != "units" } xf.attrs = attrs elif isinstance(x, pd.Series): xf = x.to_frame().apply(np.convolve, v=pl33, mode=mode) # nan out edges which are not good values anyway if mode == "same": xf[: Nt - 1] = np.nan xf[-Nt + 2 :] = np.nan else: # use numpy xf = np.convolve(x, pl33, mode=mode) # times to match xf if t is not None: # Nt = len(filter_time) tf = t[Nt - 1 : -Nt + 1] return xf, tf # nan out edges which are not good values anyway if mode == "same": xf[: Nt - 1] = np.nan xf[-Nt + 2 :] = np.nan return xf