Parallel programming with Dask¶
Dask is a library that decomposes an array into small objects (chunks). Therefore, big datasets are not loaded into memory at once. Additionally, parallel computing is possible on these chunks. Let’s see an example.
First, the SST field is loaded using xarray:
import matplotlib.pyplot as plt
import xarray as xr
data = xr.open_dataset('data/surface_thetao.nc')
data = data.isel(olevel=0)
data = data['thetao']
data
Let’s compute the time-mean of the given dataset and load the output into memory.
%%time
datamean = data.mean(dim='time_counter')
datamean = datamean.compute() # does the calculation and stores the result in memory
datamean.min()
Now, let’s spatially divide our dataset into squares of 150x150 pixels. This is done using the chunk method (you can also provide a chunks argument to the open_dataset function).
data = xr.open_dataset('data/surface_thetao.nc')
data = data.isel(olevel=0)
data = data['thetao']
data = data.chunk({'x': 150, 'y': 150})
data
%%time
datamean2 = data.mean(dim='time_counter')
datamean2 = datamean2.compute()
datamean2.min()
The computation time is much better using the chunked data array, and the use of memory is reduced. To see how dask manages the computation, you can use the visualize method of the dask array. It first requires that the dask object is extracted from the xarray object.
datamean2 = data.mean(dim='time_counter')
dask_obj = datamean2.data
dask_obj.visualize()
In the above graph, it can be seen that each chunk has its own (y, x) map.
If the mean over all dimensions is now computed:
datamean3 = data.mean()
dask_obj = datamean3.data
dask_obj.visualize()
In this case, the mean maps are first computed for each chunk. The partial results of neighbouring chunks are then recombined, and the last 4 objects are finally aggregated to give the final output.
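To get a feeling for the size of this graph, you can inspect the number of chunks and the number of tasks it contains. This is a minimal sketch that reuses the chunked data array and the dask_obj from the cell above; the exact counts depend on the chunk layout:
print(data.data.numblocks)  # number of chunks along each dimension of the array
print(len(dask_obj.__dask_graph__()))  # total number of tasks in the reduction graph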
Many functions implemented in xarray will work with dask (cf. the list of available functions). However, if the function that you want to use is missing, a user-defined ufunc can be created.
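For instance, built-in reductions keep working lazily on the chunked array. The short sketch below (not part of the original example) computes a standard deviation that stays a dask computation until compute is called:
datastd = data.std(dim='time_counter')  # still a lazy, chunked dask computation
datastd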
Using user-defined functions in parallel¶
In order to use a function which is not implemented in the xarray list of universal functions, the xarray.apply_ufunc method should be used. For instance, let’s apply the scipy.signal.detrend function in a parallel manner.
First, create a function that takes a numpy.array as argument. Note that the dimension on which you will operate (for detrending, that would be time) will be the last one.
import scipy.signal as sig
import numpy as np
def gufunc_detrend(x):
    x[np.isnan(x)] = 0  # replace NaNs by 0, since detrend does not handle missing values
    return sig.detrend(x)  # detrend along the last axis (the core dimension)
Now that this is done, create a new function that returns a xr.apply_ufunc object. The first argument is the above function, the second argument is the DataArray. The input_core_dims argument provides the names of the core dimensions, i.e. the ones on which the operation will be performed; in this case, time. Since the detrend function returns an array of the same size as the input, the output_core_dims argument should be provided as well.
def xarray_detrend(x, dim):
    return xr.apply_ufunc(
        gufunc_detrend,
        x,
        input_core_dims=[[dim]],    # dimension along which the function operates
        output_core_dims=[[dim]],   # the output keeps the same core dimension
        dask="parallelized",        # let dask apply the function chunk by chunk
        output_dtypes=[np.float32],
    )
Now, we read our data based on a specific chunk layout. Note that the time
dimension must remain unchunked, hence the -1
.
data = xr.open_dataset('data/surface_thetao.nc',
chunks={'time_counter': -1, 'x': 150, 'y' : 150})
data = data['thetao']
data = data.isel(olevel=0)
data
Now we compute the monthly anomalies:
dataclim = data.groupby('time_counter.month').mean(dim='time_counter')
dataclim
data = data.groupby('time_counter.month') - dataclim
data = data.chunk({'time_counter': -1, 'x': 150, 'y': 150})
data
%%time
calc = xarray_detrend(data, dim='time_counter').compute()
Note that you can call the compute
method in association with a progress bar as follows:
from dask.diagnostics import ProgressBar
with ProgressBar():
    calc = xarray_detrend(data, dim='time_counter').compute()
calc
Now let’s check whether the trend seems OK. First, we extract the detrended time series at a specific location:
coords = dict(x=90, y=165)
calcts = calc.isel(**coords) # detrended time series
calcts
Then, we extract the raw anomalies at the same location:
datats = data.isel(**coords)
Now, we can plot the raw anomalies and the associated trend:
plt.plot(datats, label='anomalies')
plt.plot(datats - calcts, label='trend')
plt.legend()
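As a sanity check, the removed trend should be close to a linear fit of the raw anomalies. Here is a minimal sketch of such a comparison, assuming the selected grid point contains no missing values (the t and coef variables are introduced only for this illustration):
t = np.arange(datats.sizes['time_counter'])  # integer time axis used for the fit
coef = np.polyfit(t, datats.values, 1)  # linear fit of the raw anomalies
plt.plot(datats - calcts, label='removed trend')
plt.plot(np.polyval(coef, t), '--', label='linear fit')
plt.legend()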
Use on HPCs¶
It is theoretically possible to parallelize Dask operations on HPCs, such as Datarmor. This is achieved by using the dask-jobqueue package in association with the dask.distributed module. For instance, to run a computation on a PBS cluster such as Datarmor, the PBSCluster class should be used.
The first step is to create a jobqueue.yaml file in the ~/.config/dask directory. This file contains all the PBS settings for the cluster you are working on (cf. here for the Datarmor configuration). These are the settings for a single job.
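Note that the same settings can also be passed directly as keyword arguments to PBSCluster instead of relying on the configuration file. The sketch below only illustrates the idea; the queue name and resources are placeholders, not Datarmor's actual configuration:
from dask_jobqueue import PBSCluster
cluster = PBSCluster(queue='some_queue', cores=28, memory='100GB', walltime='01:00:00')  # placeholder resources for one PBS job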
There must also be a distributed.yaml configuration file, which contains the settings for the HPC server. An example is available here.
When done, create your Python script as shown below (taken from the dask example page).
from dask_jobqueue import PBSCluster
cluster = PBSCluster()
cluster.scale(jobs=10) # launch 10 jobs
To see the job script:
print(cluster.job_script())
Now initialize the Client object using the cluster defined above:
from dask.distributed import Client
client = Client(cluster) # Connect this local process to remote workers
When done, run your Dask operations:
# wait for jobs to arrive; depending on the queue, this may take some time
import dask.array as da
x = da.random.random((10000, 10000), chunks=(1000, 1000))  # a large chunked array
x.mean().compute()  # the computation is distributed over the PBS workers
This is not entirely clear even to me, so use with caution!
For more information:
https://docs.dask.org/en/latest/
lops-array for some examples.