"""
Processing module
"""
import numpy as np
import numpy.ma as ma
import pandas as pd
from pandas_flavor import register_dataframe_method, register_series_method
def _rolling_window(data, block):
"""
http://stackoverflow.com/questions/4936620/
Using strides for an efficient moving average filter.
"""
shape = data.shape[:-1] + (data.shape[-1] - block + 1, block)
strides = data.strides + (data.strides[-1],)
return np.lib.stride_tricks.as_strided(data, shape=shape, strides=strides)
[docs]
@register_series_method
@register_dataframe_method
def remove_above_water(df):
"""Remove all data above the water line."""
return remove_up_to(df, idx=0)
[docs]
@register_series_method
@register_dataframe_method
def remove_up_to(df, idx):
"""
Remove all the data above a certain index value where index can be pressure or depth.
"""
new_df = df.copy()
return new_df[new_df.index >= idx]
[docs]
@register_series_method
@register_dataframe_method
def split(df):
"""Returns a tuple with down/up-cast."""
idx = df.index.argmax() + 1
down = df.iloc[:idx]
# Reverse index to orient it as a CTD cast.
up = df.iloc[idx:][::-1]
return down, up
[docs]
@register_series_method
@register_dataframe_method
def lp_filter(df, sample_rate=24.0, time_constant=0.15):
"""
Filter a series with `time_constant` (use 0.15 s for pressure), and for
a signal of `sample_rate` in Hertz (24 Hz for 911+).
NOTE: 911+ systems do not require filter for temperature nor salinity.
Examples
--------
>>> from pathlib import Path
>>> import matplotlib.pyplot as plt
>>> import ctd
>>> data_path = Path(__file__).parents[1].joinpath("tests", "data")
>>> raw = ctd.from_cnv(data_path.joinpath("CTD-spiked-unfiltered.cnv.bz2"))
>>> prc = ctd.from_cnv(data_path.joinpath("CTD-spiked-filtered.cnv.bz2"))
>>> kw = {"sample_rate": 24.0, "time_constant": 0.15}
>>> original = prc.index.values
>>> unfiltered = raw.index.values
>>> filtered = raw.lp_filter(**kw).index.values
>>> fig, ax = plt.subplots()
>>> (l1,) = ax.plot(original, "k", label="original")
>>> (l2,) = ax.plot(unfiltered, "r", label="unfiltered")
>>> (l3,) = ax.plot(filtered, "g", label="filtered")
>>> leg = ax.legend()
Notes
-----
https://scipy-cookbook.readthedocs.io/items/FIRFilter.html
"""
from scipy import signal
# Butter is closer to what SBE is doing with their cosine filter.
Wn = (1.0 / time_constant) / (sample_rate * 2.0)
b, a = signal.butter(2, Wn, "low")
new_df = df.copy()
new_df.index = signal.filtfilt(b, a, df.index.values)
return new_df
[docs]
@register_series_method
@register_dataframe_method
def press_check(df):
"""
Remove pressure reversals from the index.
"""
new_df = df.copy()
press = new_df.copy().index.values
ref = press[0]
inversions = np.diff(np.r_[press, press[-1]]) < 0
mask = np.zeros_like(inversions)
for k, p in enumerate(inversions):
if p:
ref = press[k]
cut = press[k + 1 :] < ref
mask[k + 1 :][cut] = True
new_df[mask] = np.NaN
return new_df
def _bindata(series, delta, method):
"""Average the data into bins of the size `delta`."""
start = np.ceil(series.index[0])
stop = np.floor(series.index[-1])
new_index = np.arange(start, stop, delta)
binned = pd.cut(series.index, bins=new_index)
if method == "average":
new_series = series.groupby(binned).mean()
new_series.index = new_index[:-1] + delta / 2
elif method == "interpolate":
data = np.interp(new_index, series.index, series)
return pd.Series(data, index=new_index, name=series.name)
else:
raise ValueError(
f"Expected method `average` or `interpolate`, but got {method}.",
)
return new_series
[docs]
@register_series_method
@register_dataframe_method
def bindata(df, delta=1.0, method="average"):
"""
Bin average the index (usually pressure) to a given interval (default
delta = 1).
"""
if isinstance(df, pd.Series):
new_df = _bindata(df, delta=delta, method=method)
else:
new_df = df.apply(_bindata, delta=delta, method=method)
return new_df
def _despike(series, n1, n2, block, keep):
"""
Wild Edit Seabird-like function. Passes with Standard deviation
`n1` and `n2` with window size `block`.
"""
data = series.values.astype(float).copy()
roll = _rolling_window(data, block)
roll = ma.masked_invalid(roll)
std = n1 * roll.std(axis=1)
mean = roll.mean(axis=1)
# Use the last value to fill-up.
std = np.r_[std, np.tile(std[-1], block - 1)]
mean = np.r_[mean, np.tile(mean[-1], block - 1)]
mask = np.abs(data - mean.filled(fill_value=np.NaN)) > std.filled(fill_value=np.NaN)
data[mask] = np.NaN
# Pass two recompute the mean and std without the flagged values from pass
# one and removed the flagged data.
roll = _rolling_window(data, block)
roll = ma.masked_invalid(roll)
std = n2 * roll.std(axis=1)
mean = roll.mean(axis=1)
# Use the last value to fill-up.
std = np.r_[std, np.tile(std[-1], block - 1)]
mean = np.r_[mean, np.tile(mean[-1], block - 1)]
values = series.values.astype(float)
mask = np.abs(values - mean.filled(fill_value=np.NaN)) > std.filled(
fill_value=np.NaN,
)
clean = series.astype(float).copy()
clean[mask] = np.NaN
return clean
[docs]
@register_series_method
@register_dataframe_method
def despike(df, n1=2, n2=20, block=100, keep=0):
"""
Wild Edit Seabird-like function. Passes with Standard deviation
`n1` and `n2` with window size `block`.
"""
if isinstance(df, pd.Series):
new_df = _despike(df, n1=n1, n2=n2, block=block, keep=keep)
else:
new_df = df.apply(_despike, n1=n1, n2=n2, block=block, keep=keep)
return new_df
def _smooth(series, window_len, window):
"""Smooth the data using a window with requested size."""
windows = {
"flat": np.ones,
"hanning": np.hanning,
"hamming": np.hamming,
"bartlett": np.bartlett,
"blackman": np.blackman,
}
data = series.values.copy()
if window_len < 3:
return pd.Series(data, index=series.index, name=series.name)
if window not in list(windows.keys()):
raise ValueError(
"""window must be one of 'flat', 'hanning',
'hamming', 'bartlett', 'blackman'""",
)
s = np.r_[
2 * data[0] - data[window_len:1:-1],
data,
2 * data[-1] - data[-1:-window_len:-1],
]
w = windows[window](window_len)
data = np.convolve(w / w.sum(), s, mode="same")
data = data[window_len - 1 : -window_len + 1]
return pd.Series(data, index=series.index, name=series.name)
[docs]
@register_series_method
@register_dataframe_method
def smooth(df, window_len=11, window="hanning"):
"""Smooth the data using a window with requested size."""
if isinstance(df, pd.Series):
new_df = _smooth(df, window_len=window_len, window=window)
else:
new_df = df.apply(_smooth, window_len=window_len, window=window)
return new_df
def _movingaverage(series, window_size=48):
"""Moving average function on a pandas series."""
window = np.ones(int(window_size)) / float(window_size)
return pd.Series(np.convolve(series, window, "same"), index=series.index)
[docs]
@register_series_method
@register_dataframe_method
def movingaverage(df, window_size=48):
"""
Moving average on a data frame or series.
Inputs:
windows_size : integer
"""
if isinstance(df, pd.Series):
new_df = _movingaverage(df, window_size=window_size)
else:
new_df = df.apply(_movingaverage, window_size=window_size)
return new_df