# Example of covariance using Dask¶

In this Notebook, the use of Dask is parallel mode is illustrated in goal to compute lead-lag covariances.

## Import of data¶

### SST anomalies¶

First, SST data are extracted.

import matplotlib.pyplot as plt
import xarray as xr
import scipy.signal as sig
import numpy as np
import pandas as pd
from dask.diagnostics import Profiler, ResourceProfiler, CacheProfiler, visualize

data = xr.open_dataset('data/surface_thetao.nc')
data = data.isel(olevel=0)
data = data['thetao']
data


Then, monthly anomalies are computed:

clim = data.groupby('time_counter.month').mean(dim='time_counter')

anom = data.groupby('time_counter.month') - clim
anom


## Oni index¶

Now, the ONI index is extracted from the CSV file.

nino = pd.read_csv('data/oni.data', skiprows=1, skipfooter=8, engine='python', header=None, index_col=0, delim_whitespace=True, na_values=-99.9)
nino


It needs to be converted into 1D array. This is done by manipulating the years and columns

years = nino.index.values
years

months = nino.columns.values
months

mm, yy = np.meshgrid(months, years)
yy = np.ravel(yy)
mm = np.ravel(mm)
date = yy * 100 + mm
date

nino = np.ravel(nino.values)


Now, we extract the values that correspond to the length of the SST time-series (1958-2018)

iok = np.nonzero((date >= 195801) & (date <= 201812))
date[iok]


Finally, the time-series is converted into a data arraty.

tmean = xr.DataArray(
data = nino[iok],
name = 'oni',
coords={'time_counter' : data['time_counter']}
)
l = tmean.plot()
tmean


## First test on covariance analysis¶

Here, the covariance is computed using numpy arrays.

nt, ny, nx = data.shape
nt, ny, nx


Now, the correlation lags are extracted.

lags = sig.correlation_lags(nt, nt)
lags


The index of the $0$-lag covariance is extracted.

izero = np.nonzero(lags == 0)
izero


Now, the covariance is computed by using a for loop using Numpy arrays.

%%time
covariance = np.zeros((ny, nx, len(lags)))
dataval = anom.values # t, y, x
tmeanval = tmean.values
for s in range(ny):
for i in range(nx):
temp = dataval[:, s, i]
covariance[s, i, :] = sig.correlate(temp, tmeanval) / nt
covariance.shape

cs = plt.pcolormesh(covariance[:, :, izero])
cs.set_clim(-1, 1)
plt.colorbar(cs)


## Using user-defined functions in parallel.¶

To compute covariance in parallel mode, a function that works on Numpy arrays must be created. It basically does the same thing as in the above. Except that in this case, time becomes the rightmost dimension.

import scipy.signal as sig
import numpy as np

def gufunc_cov(x, y):
print("@@@@@@@@@@@@@@@@@@@@@@@@@ ", x.shape, y.shape)
nx = x.shape
ny = x.shape
ntime = x.shape[-1]
lags = sig.correlation_lags(ntime, ntime)
nlags = len(lags)
output = np.zeros((nx, ny, nlags))
for s in range(nx):
for i in range(ny):
temp = x[s, i]
output[s, i, :] = sig.correlate(temp, y) / ntime
return output


Now that it is done, create a new method that returns a xr.apply_ufunc object. The first argument is the above function, the second argument is the SST DataArray, the third argument is the Nino index. The input_core_dims provides the names of the dimensions that will not be broadcasted (here, time). Since the correlate function returns an array of dimensions (y, x, lags), we need to specify the new lag dimension using the output_core_dims anf the dask_gufunc_kwargs arguments:

def xarray_cov(x, y, dim):
return xr.apply_ufunc(
gufunc_cov,
x,
y,
input_core_dims=[[dim], [dim]],
output_core_dims=[['lags']],
output_dtypes=[np.float32],
dask_gufunc_kwargs = {'output_sizes' : {'lags': 1463}}
)


Now, we read our data based on a specific chunk layout. **Note that the time dimension must remain unchunked.

anom = anom.chunk({'y':50, 'x': 50})
anom

%%time
with Profiler() as prof, ResourceProfiler(dt=0.25) as rprof, CacheProfiler() as cprof:
calc = xarray_cov(anom, tmean, dim='time_counter').compute()


We see that the calculation time is less than the original one. We can now visualize the resource usage:

visualize([prof, rprof, cprof], show=False)


Finally, we can verify that both calculations (Numpy vs. Dask) returns the same results. First, NaN values are replaced by 0 in both results.

calc = calc.fillna(0)
covariance[np.isnan(covariance)] = 0

np.all(covariance == calc.values)