Example of covariance using Dask

In this Notebook, the use of Dask is parallel mode is illustrated in goal to compute lead-lag covariances.

Import of data

SST anomalies

First, SST data are extracted.

import matplotlib.pyplot as plt
import xarray as xr
import scipy.signal as sig
import numpy as np
import pandas as pd
from dask.diagnostics import Profiler, ResourceProfiler, CacheProfiler, visualize

data = xr.open_dataset('data/surface_thetao.nc')
data = data.isel(olevel=0)
data = data['thetao']

Then, monthly anomalies are computed:

clim = data.groupby('time_counter.month').mean(dim='time_counter')
anom = data.groupby('time_counter.month') - clim

Oni index

Now, the ONI index is extracted from the CSV file.

nino = pd.read_csv('data/oni.data', skiprows=1, skipfooter=8, engine='python', header=None, index_col=0, delim_whitespace=True, na_values=-99.9)

It needs to be converted into 1D array. This is done by manipulating the years and columns

years = nino.index.values
months = nino.columns.values
mm, yy = np.meshgrid(months, years)
yy = np.ravel(yy)
mm = np.ravel(mm)
date = yy * 100 + mm
nino = np.ravel(nino.values)

Now, we extract the values that correspond to the length of the SST time-series (1958-2018)

iok = np.nonzero((date >= 195801) & (date <= 201812))[0]

Finally, the time-series is converted into a data arraty.

tmean = xr.DataArray(
    data = nino[iok],
    name = 'oni',
    coords={'time_counter' : data['time_counter']}
l = tmean.plot()

First test on covariance analysis

Here, the covariance is computed using numpy arrays.

nt, ny, nx = data.shape
nt, ny, nx

Now, the correlation lags are extracted.

lags = sig.correlation_lags(nt, nt)

The index of the $0$-lag covariance is extracted.

izero = np.nonzero(lags == 0)[0][0]

Now, the covariance is computed by using a for loop using Numpy arrays.

covariance = np.zeros((ny, nx, len(lags)))
dataval = anom.values # t, y, x
tmeanval = tmean.values
for s in range(ny):
    for i in range(nx):
        temp = dataval[:, s, i]
        covariance[s, i, :] = sig.correlate(temp, tmeanval) / nt
cs = plt.pcolormesh(covariance[:, :, izero])
cs.set_clim(-1, 1)

Using user-defined functions in parallel.

To compute covariance in parallel mode, a function that works on Numpy arrays must be created. It basically does the same thing as in the above. Except that in this case, time becomes the rightmost dimension.

import scipy.signal as sig
import numpy as np

def gufunc_cov(x, y):
    print("@@@@@@@@@@@@@@@@@@@@@@@@@ ", x.shape, y.shape)
    nx = x.shape[0]
    ny = x.shape[1]
    ntime = x.shape[-1]
    lags = sig.correlation_lags(ntime, ntime)
    nlags = len(lags)
    output = np.zeros((nx, ny, nlags))
    for s in range(nx):
        for i in range(ny):
            temp = x[s, i]
            output[s, i, :] = sig.correlate(temp, y) / ntime
    return output

Now that it is done, create a new method that returns a xr.apply_ufunc object. The first argument is the above function, the second argument is the SST DataArray, the third argument is the Nino index. The input_core_dims provides the names of the dimensions that will not be broadcasted (here, time). Since the correlate function returns an array of dimensions (y, x, lags), we need to specify the new lag dimension using the output_core_dims anf the dask_gufunc_kwargs arguments:

def xarray_cov(x, y, dim):
    return xr.apply_ufunc(
        input_core_dims=[[dim], [dim]],
        dask_gufunc_kwargs = {'output_sizes' : {'lags': 1463}}

Now, we read our data based on a specific chunk layout. **Note that the time dimension must remain unchunked.

anom = anom.chunk({'y':50, 'x': 50})
with Profiler() as prof, ResourceProfiler(dt=0.25) as rprof, CacheProfiler() as cprof:
    calc = xarray_cov(anom, tmean, dim='time_counter').compute()

We see that the calculation time is less than the original one. We can now visualize the resource usage:

visualize([prof, rprof, cprof], show=False)

Finally, we can verify that both calculations (Numpy vs. Dask) returns the same results. First, NaN values are replaced by 0 in both results.

calc = calc.fillna(0)
covariance[np.isnan(covariance)] = 0
np.all(covariance == calc.values)