Example of covariance using Dask

In this notebook, the use of Dask in parallel mode is illustrated in order to compute lead-lag covariances.

Importing the data

SST anomalies

First, SST data are extracted.

import matplotlib.pyplot as plt
import xarray as xr
import scipy.signal as sig
import numpy as np
import pandas as pd
from dask.diagnostics import Profiler, ResourceProfiler, CacheProfiler, visualize

data = xr.open_dataset('data/surface_thetao.nc')
data = data.isel(olevel=0)
data = data['thetao']
data
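
As a side note (an optional variant, my addition), the dataset could also be opened lazily with Dask chunks from the start, mirroring the chunk layout used later in this notebook:

# optional: open the file lazily as a dask-backed dataset (illustrative chunk sizes)
data_lazy = xr.open_dataset('data/surface_thetao.nc', chunks={'y': 50, 'x': 50})
data_lazy['thetao']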

Then, monthly anomalies are computed:

clim = data.groupby('time_counter.month').mean(dim='time_counter')
anom = data.groupby('time_counter.month') - clim
anom

ONI index

Now, the ONI index is read from the whitespace-delimited text file.

nino = pd.read_csv('data/oni.data', skiprows=1, skipfooter=8, engine='python', header=None, index_col=0, delim_whitespace=True, na_values=-99.9)
nino

It needs to be converted into a 1D array. This is done by combining the years (rows) and months (columns):

years = nino.index.values
years
months = nino.columns.values
months
mm, yy = np.meshgrid(months, years)
yy = np.ravel(yy)
mm = np.ravel(mm)
date = yy * 100 + mm
date
nino = np.ravel(nino.values)
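
As an optional cross-check (my addition, not part of the original workflow), the YYYYMM integers can be converted to real timestamps, which makes the date selection below easier to verify:

# convert YYYYMM integers (e.g. 195001) into pandas timestamps
pd.to_datetime(date.astype(str), format='%Y%m')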

Now, we extract the values that correspond to the time span of the SST time series (1958-2018).

iok = np.nonzero((date >= 195801) & (date <= 201812))[0]
date[iok]
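
A quick sanity check (my addition) confirms that the selection has the same length as the SST record (61 years x 12 months = 732):

len(iok) == data.sizes['time_counter']  # should be True: the ONI selection must align with the SST time steps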

Finally, the time series is converted into a DataArray.

tmean = xr.DataArray(
    data = nino[iok],
    name = 'oni',
    coords={'time_counter' : data['time_counter']}
)
l = tmean.plot()
tmean

First test of the covariance analysis

Here, the covariance is computed using NumPy arrays.

nt, ny, nx = data.shape
nt, ny, nx

Now, the correlation lags are extracted.

lags = sig.correlation_lags(nt, nt)
lags
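
For intuition (my addition), correlation_lags(n, n) returns the 2n - 1 integer lags from -(n - 1) to n - 1; for example, with n = 4:

sig.correlation_lags(4, 4)  # array([-3, -2, -1,  0,  1,  2,  3])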

The index of the $0$-lag covariance is extracted.

izero = np.nonzero(lags == 0)[0][0]
izero
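
Since the lags run from -(nt - 1) to nt - 1, the zero lag sits at index nt - 1; this can be checked directly (my addition):

izero == nt - 1  # True: the zero lag is at index nt - 1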

Now, the covariance is computed with a for loop over NumPy arrays.

%%time
covariance = np.zeros((ny, nx, len(lags)))
dataval = anom.values # t, y, x
tmeanval = tmean.values
for s in range(ny):
    for i in range(nx):
        temp = dataval[:, s, i]
        covariance[s, i, :] = sig.correlate(temp, tmeanval) / nt
covariance.shape
cs = plt.pcolormesh(covariance[:, :, izero])
cs.set_clim(-1, 1)
plt.colorbar(cs)
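
As a design note (my addition, a sketch rather than part of the original notebook), the double loop can be replaced by a single FFT-based call, assuming scipy's fftconvolve broadcasts over the non-convolved axes (scipy >= 1.4):

# cross-correlation via convolution with the time-reversed index
cov_fft = sig.fftconvolve(dataval, tmeanval[::-1][:, None, None], axes=0) / nt
cov_fft = np.moveaxis(cov_fft, 0, -1)                 # (y, x, lags), same layout as `covariance`
np.allclose(cov_fft, covariance, equal_nan=True)      # should agree up to floating-point error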

Using user-defined functions in parallel

To compute the covariance in parallel mode, a function that works on NumPy arrays must be created. It does essentially the same thing as above, except that in this case time becomes the rightmost dimension, since xr.apply_ufunc moves core dimensions to the end.

import scipy.signal as sig
import numpy as np

def gufunc_cov(x, y):
    # x has dimensions (y, x, time) and y has dimension (time,):
    # apply_ufunc moves the core (time) dimension to the last position
    ny = x.shape[0]
    nx = x.shape[1]
    ntime = x.shape[-1]
    lags = sig.correlation_lags(ntime, ntime)
    nlags = len(lags)
    output = np.zeros((ny, nx, nlags))
    for s in range(ny):
        for i in range(nx):
            temp = x[s, i]
            output[s, i, :] = sig.correlate(temp, y) / ntime
    return output
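
Before wiring it into xarray, the function can be checked on small random arrays (a quick sanity test, my addition):

xtest = np.random.rand(3, 4, 10)   # (y, x, time): time is the rightmost dimension
ytest = np.random.rand(10)
gufunc_cov(xtest, ytest).shape     # (3, 4, 19), i.e. 2*10 - 1 lags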

Now that this is done, create a function that wraps xr.apply_ufunc. The first argument is the above function, the second argument is the SST DataArray, the third argument is the ONI index. The input_core_dims argument provides the names of the dimensions that will not be broadcast (here, time). Since the correlate function returns an array of dimensions (y, x, lags), we need to specify the new lags dimension using the output_core_dims and dask_gufunc_kwargs arguments:

def xarray_cov(x, y, dim):
    # the full cross-correlation has 2*nt - 1 lags (1463 for nt = 732 months)
    nlags = 2 * x.sizes[dim] - 1
    return xr.apply_ufunc(
        gufunc_cov,
        x,
        y,
        input_core_dims=[[dim], [dim]],
        output_core_dims=[['lags']],
        dask="parallelized",
        output_dtypes=[np.float32],
        dask_gufunc_kwargs={'output_sizes': {'lags': nlags}}
    )

Now, we rechunk our data using a specific chunk layout. **Note that the time dimension must remain unchunked, as it is a core dimension of the computation.**

anom = anom.chunk({'y':50, 'x': 50})
anom
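
Before launching the computation, the chunk layout can be inspected (a quick check, my addition):

anom.chunks  # time_counter must be a single chunk; y and x are split into 50-point tiles
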
%%time
with Profiler() as prof, ResourceProfiler(dt=0.25) as rprof, CacheProfiler() as cprof:
    calc = xarray_cov(anom, tmean, dim='time_counter').compute()

We see that the calculation time is shorter than with the original loop. We can now visualize the resource usage:

visualize([prof, rprof, cprof], show=False)

Finally, we can verify that both calculations (NumPy vs. Dask) return the same results. First, NaN values are replaced by 0 in both results; since the Dask result is stored as float32, the comparison is done with a tolerance.

calc = calc.fillna(0)
covariance[np.isnan(covariance)] = 0
# the Dask result is float32 while the NumPy one is float64, so compare with a tolerance
np.allclose(covariance, calc.values)