---
title: "Parallel raster computations using Dask"
teaching: 40
exercises: 20
questions:
- "How can I parallelize computations on rasters with Dask?"
- "How can I determine if parallelization improves calculation speed?"
- "What are good practices in applying parallelization to my raster calculations?"
objectives:
- "Profile the timing of raster calculations."
- "Open raster data as a chunked array."
- "Recognize good practices in selecting proper chunk sizes."
- "Set up raster calculations that take advantage of parallelization."
keypoints:
- "The `%%time` Jupyter magic command can be used to profile calculations."
- "Data 'chunks' are the unit of parallelization in raster calculations."
- "(`rio`)`xarray` can open raster files as chunked arrays."
- "The chunk shape and size can significantly affect the calculation performance."
- "Cloud-optimized GeoTIFFs have an internal structure that enables performant parallel read."
---

# Introduction
Very often raster computations involve applying the same operation to different pieces of data. Think, for instance, of
the "pixel"-wise sum of two raster datasets, where the same sum operation is applied to all the matching grid cells of
the two rasters. This class of tasks can benefit from chunking the input raster(s) into smaller pieces: operations on
different pieces can be run in parallel using multiple computing units (e.g., multi-core CPUs), thus potentially
speeding up calculations. In addition, working on chunked data can lead to smaller memory footprints, since one
may bypass the need to store the full dataset in memory by processing it chunk by chunk.

In this episode, we will introduce the use of Dask in the context of raster calculations. Dask is a Python library for
parallel and distributed computing. It provides a framework to work with different data structures, including chunked
arrays (Dask Arrays). Dask is well integrated with (`rio`)`xarray` objects, which can use Dask arrays as underlying
data structures.

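To give a first idea of how this works in practice, here is a minimal standalone sketch (independent of the raster data
used in this episode) of an element-wise sum over chunked Dask arrays:

~~~
import dask.array as da

# Two 8000 x 8000 arrays, split into 2000 x 2000 chunks (16 chunks each)
a = da.random.random((8000, 8000), chunks=(2000, 2000))
b = da.random.random((8000, 8000), chunks=(2000, 2000))

# Each chunk-wise addition becomes a separate task, which Dask can run
# in parallel when the result is actually computed
c = (a + b).compute()
~~~
{: .language-python}
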
> ## Dask
>
> This episode shows how Dask can be used to parallelize operations on local CPUs. However, the same library can be
> configured to run tasks on large compute clusters.
>
> More resources on Dask:
> * [Dask](https://dask.org) and [Dask Array](https://docs.dask.org/en/stable/array.html).
> * [Xarray with Dask](https://xarray.pydata.org/en/stable/user-guide/dask.html).
{: .callout}

It is important to realize, however, that many details determine the extent to which using Dask's chunked arrays instead
of regular Numpy arrays leads to faster calculations (and lower memory requirements). The actual operations to carry
out, the size of the dataset, and parameters such as the chunks' shape and size all affect the performance of our
computations. Depending on the specifics of the calculations, serial calculations might actually turn out to be faster!
Being able to profile the computational time is thus essential, and we will see how to do that in a Jupyter environment
in the next section.

# Time profiling in Jupyter

Let's set up a raster calculation using assets from the search of satellite scenes that we have carried out in the
previous episode. The search result, which consisted of a collection of STAC items (an `ItemCollection`), has been saved
in GeoJSON format. We can load the collection using the `pystac` library:

~~~
import pystac
items = pystac.ItemCollection.from_file("mysearch.json")
~~~
{: .language-python}

We select the last scene and extract the URLs of two assets: the true-color image ("visual") and the scene
classification layer ("SCL"). The latter is a mask in which each grid cell is assigned a label that represents a specific
class, e.g. "4" for vegetation, "6" for water, etc. (all classes and labels are reported in the
[Sentinel-2 documentation](https://sentinels.copernicus.eu/web/sentinel/technical-guides/sentinel-2-msi/level-2a/algorithm),
see Figure 3):

~~~
assets = items[-1].assets  # last item's assets
visual_href = assets["visual"].href  # true color image
scl_href = assets["SCL"].href  # scene classification layer
~~~
{: .language-python}

Opening the two assets with `rioxarray` shows that the true-color image is available as a raster file with 10 m
resolution, while the scene classification layer has a lower resolution (20 m):

~~~
import rioxarray
scl = rioxarray.open_rasterio(scl_href)
visual = rioxarray.open_rasterio(visual_href)
print(scl.rio.resolution(), visual.rio.resolution())
~~~
{: .language-python}

~~~
(20.0, -20.0), (10.0, -10.0)
~~~
{: .output}

In order to match the image and the mask pixels, we take advantage of a feature of the cloud-optimized GeoTIFF (COG)
format, which is used to store these raster files. COGs typically include multiple lower-resolution versions of the
original image, called "overviews", in the same file. This makes it possible to avoid downloading high-resolution images
when only quick previews are required.

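Before picking an overview level, it can be useful to check which overviews are actually stored in the file. One way to
do so is rasterio's `overviews()` method (a quick sketch; band index `1` is assumed here):

~~~
import rasterio

# List the decimation factors of the overviews available for band 1
with rasterio.open(visual_href) as src:
    print(src.overviews(1))
~~~
{: .language-python}
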
Overviews are often computed using powers of 2 as down-sampling (or zoom) factors (e.g. 2, 4, 8, 16). For the true-color
image we thus open the first-level overview (zoom factor 2) and check that the resolution is now also 20 m:

~~~
visual = rioxarray.open_rasterio(visual_href, overview_level=0)
print(visual.rio.resolution())
~~~
{: .language-python}

~~~
(20.0, -20.0)
~~~
{: .output}

We can now time-profile the first step of our raster calculation: the (down)loading of the rasters' content. We do this
using the Jupyter magic `%%time`, which reports the time required to run the content of a cell:

~~~
%%time
scl = scl.load()
visual = visual.load()
~~~
{: .language-python}

~~~
CPU times: user 729 ms, sys: 852 ms, total: 1.58 s
Wall time: 40.5 s
~~~
{: .output}

Let's have a look at the data that we have just loaded:

~~~
visual.plot.imshow(figsize=(10,10))
scl.squeeze().plot.imshow(levels=range(13), figsize=(12,10))
~~~
{: .language-python}

<img src="../fig/20-Dask-arrays-s2-true-color-image.png" title="Scene true color image" alt="true color image scene" width="612" style="display: block; margin: auto;" />
<img src="../fig/20-Dask-arrays-s2-scene-classification.png" title="Scene classification" alt="scene classification" width="612" style="display: block; margin: auto;" />

After loading the raster files into memory, we run the following steps:
* We create a mask of the grid cells that are labeled as "cloud" in the scene classification layer (values "8" and "9",
  standing for medium- and high-cloud probability, respectively).
* We use this mask to set the corresponding grid cells in the true-color image to null values.
* We save the masked image to disk in COG format.

Again, we measure the cell execution time using `%%time`:

~~~
%%time
mask = scl.squeeze().isin([8, 9])
visual_masked = visual.where(~mask, other=visual.rio.nodata)
visual_masked.rio.to_raster("band_masked.tif")
~~~
{: .language-python}

~~~
CPU times: user 270 ms, sys: 366 ms, total: 636 ms
Wall time: 647 ms
~~~
{: .output}

We can inspect the masked image as follows:

~~~
visual_masked.plot.imshow(figsize=(10, 10))
~~~
{: .language-python}

<img src="../fig/20-Dask-arrays-s2-true-color-image_masked.png" title="True color image after masking out clouds" alt="masked true color image" width="612" style="display: block; margin: auto;" />

In the following section we will see how to parallelize these raster calculations, and we will compare timings with the
serial calculations that we have just run.

# Dask-powered rasters

## Chunked arrays

As we have mentioned, `rioxarray` supports the use of Dask's chunked arrays as the underlying data structure. When opening
a raster file with `open_rasterio` and providing the `chunks` argument, Dask arrays are employed instead of regular
Numpy arrays. `chunks` describes the shape of the blocks into which the data will be split. As an example, we
open the blue-band raster ("B02") using a chunk shape of `(1, 4000, 4000)` (block size of `1` in the first dimension and
of `4000` in the second and third dimensions):

~~~
blue_band_href = assets["B02"].href
blue_band = rioxarray.open_rasterio(blue_band_href, chunks=(1, 4000, 4000))
~~~
{: .language-python}

Xarray and Dask also provide a graphical representation of the raster data array and of its blocked structure.

<img src="../fig/20-Dask-arrays-s2-blue-band.png" title="Xarray representation of a Dask-backed DataArray" alt="DataArray with Dask" width="612" style="display: block; margin: auto;" />

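Outside of a notebook, or to double-check the chunking programmatically, we can inspect the underlying array directly
(a quick check; the chunk sizes reported reflect the values passed to `chunks`):

~~~
# The underlying data is now a Dask array rather than a Numpy array
print(type(blue_band.data))

# Chunk sizes along each dimension (band, y, x)
print(blue_band.chunks)
~~~
{: .language-python}
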
> ## Exercise: Chunk sizes matter
> We have already seen how COGs are regular GeoTIFF files with a special internal structure. Another feature of COGs is
> that data is organized in "blocks" that can be accessed remotely via independent HTTP requests, enabling partial file
> readings. This is useful if you want to access only a portion of your raster file, but it also allows for efficient
> parallel reading. You can check the block size employed in a COG file with the following code snippet:
>
> ~~~
> import rasterio
> with rasterio.open(cog_uri) as r:
>     if r.is_tiled:
>         print(f"Chunk size: {r.block_shapes}")
> ~~~
> {: .language-python}
>
> In order to access COGs optimally, it is best to align the file's block size with the chunks employed when loading
> the file. Open the blue-band asset ("B02") of a Sentinel-2 scene as a chunked `DataArray` object using a suitable
> chunk size. Which elements do you think should be considered when choosing the chunk size?
>
> > ## Solution
> > ~~~
> > import rasterio
> > with rasterio.open(blue_band_href) as r:
> >     if r.is_tiled:
> >         print(f"Chunk size: {r.block_shapes}")
> > ~~~
> > {: .language-python}
> >
> > ~~~
> > Chunk size: [(1024, 1024)]
> > ~~~
> > {: .output}
> >
> > Ideal chunk size values for this raster are thus multiples of 1024. An element to consider is the number of
> > resulting chunks and their size. Chunks should be neither too big nor too small (i.e. too many). As a rule of thumb,
> > chunk sizes of 100 MB typically work well with Dask (see, e.g., this
> > [blog post](https://blog.dask.org/2021/11/02/choosing-dask-chunk-sizes)). Also, the shape might be relevant,
> > depending on the application! Here, we might select a chunk shape of `(1, 6144, 6144)`:
> >
> > ~~~
> > band = rioxarray.open_rasterio(blue_band_href, chunks=(1, 6144, 6144))
> > ~~~
> > {: .language-python}
> >
> > which leads to chunks that are 72 MB large (1 x 6144 x 6144 x 2 bytes / 2^20 = 72 MB). Also, we can let `rioxarray`
> > and Dask figure out appropriate chunk shapes by setting `chunks="auto"`:
> >
> > ~~~
> > band = rioxarray.open_rasterio(blue_band_href, chunks="auto")
> > ~~~
> > {: .language-python}
> >
> > which leads to `(1, 8192, 8192)` chunks (128 MB).
> {: .solution}
{: .challenge}

## Parallel computations

Operations performed on a `DataArray` that has been opened as a chunked Dask array are executed using Dask. Dask
coordinates how the operations should be executed on the individual chunks of data, and runs these tasks in parallel as
much as possible.

Let's now repeat the raster calculations that we have carried out in the previous section, but running calculations in
parallel over a multi-core CPU. We first open the relevant rasters as chunked arrays:

~~~
scl = rioxarray.open_rasterio(scl_href, lock=False, chunks=(1, 2048, 2048))
visual = rioxarray.open_rasterio(visual_href, overview_level=0, lock=False, chunks=(3, 2048, 2048))
~~~
{: .language-python}

Setting `lock=False` tells `rioxarray` that the individual data chunks can be loaded simultaneously from the source by
the Dask workers.

As the next step, we trigger the download of the data using the `.persist()` method (see below). This makes sure that the
downloaded chunks are stored in the form of a chunked Dask array (calling `.load()` would instead merge the chunks into a
single Numpy array).

We explicitly tell Dask to parallelize the required workload over 4 threads. Don't forget to add the Jupyter magic to
record the timing!

~~~
%%time
scl = scl.persist(scheduler="threads", num_workers=4)
visual = visual.persist(scheduler="threads", num_workers=4)
~~~
{: .language-python}

~~~
CPU times: user 1.18 s, sys: 806 ms, total: 1.99 s
Wall time: 12.6 s
~~~
{: .output}

So downloading chunks of data using 4 workers gave a speed-up of more than 3 times (40.5 s vs 12.6 s)!

Let's now continue to the second step of the calculation. Note how the same syntax as in the serial version is employed
for creating and applying the cloud mask. Only the raster saving includes additional arguments:
* `tiled=True`: write the raster as a chunked GeoTIFF.
* `lock=threading.Lock()`: the threads which are splitting the workload must "synchronise" when writing to the same file
  (they might otherwise overwrite each other's output).
* `compute=False`: do not immediately run the calculation; more on this later.

~~~
from threading import Lock
~~~
{: .language-python}

~~~
%%time
mask = scl.squeeze().isin([8, 9])
visual_masked = visual.where(~mask, other=0)
visual_store = visual_masked.rio.to_raster("band_masked.tif", tiled=True, lock=Lock(), compute=False)
~~~
{: .language-python}

~~~
CPU times: user 13.3 ms, sys: 4.98 ms, total: 18.3 ms
Wall time: 17.8 ms
~~~
{: .output}

Did we just observe a 36x speed-up compared to the serial calculation (647 ms vs 17.8 ms)? Actually, no
calculation has run yet. This is because operations performed on Dask arrays are executed "lazily", i.e. they are not
immediately run.
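
We can convince ourselves of this by inspecting the objects we have just created (a quick sanity check; the exact types
reported may vary with the installed library versions):

~~~
# visual_masked still wraps a lazy (not yet computed) Dask array
print(type(visual_masked.data))

# visual_store is a handle to the pending store operation, not the stored data
print(type(visual_store))
~~~
{: .language-python}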
|
> ## Dask graph
>
> The sequence of operations to carry out is stored in a task graph, which can be visualized with:
>
> ~~~
> import dask
> dask.visualize(visual_store)
> ~~~
> {: .language-python}
>
> <img src="../fig/20-Dask-arrays-graph.png" title="Dask graph" alt="dask graph" width="612" style="display: block; margin: auto;" />
>
> The task graph gives Dask the complete "overview" of the calculation, thus enabling a better management of tasks and
> resources when dispatching calculations to be run in parallel.
{: .callout}

While most of `DataArray`'s methods run operations lazily when Dask arrays are employed, some methods trigger
immediate calculations by default, like `to_raster()` (we have changed this behaviour by specifying
`compute=False`). In order to trigger the calculations, we can use the `.compute()` method. Again, we explicitly tell
Dask to run tasks on 4 threads. Let's time the cell execution:

~~~
%%time
visual_store.compute(scheduler="threads", num_workers=4)
~~~
{: .language-python}

~~~
CPU times: user 532 ms, sys: 488 ms, total: 1.02 s
Wall time: 791 ms
~~~
{: .output}

The timing that we have recorded for this step is now closer to the one recorded for the serial calculation (the
parallel calculation actually took slightly longer). The explanation for this behaviour lies in the overhead that Dask
introduces to manage the tasks in the Dask graph. This overhead, which is typically of the order of milliseconds per
task, can be larger than the parallelization gain, and this is typically the case for calculations with small chunks
(note that here we have used chunks that are only 4 to 32 MB large).
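
As a final note, instead of passing `scheduler` and `num_workers` to every `persist()` or `compute()` call, the same
settings can also be applied through Dask's configuration system (a small convenience sketch; the values simply mirror
the ones used above):

~~~
import dask

# Use the threaded scheduler with 4 workers for all following computations...
dask.config.set(scheduler="threads", num_workers=4)

# ...or only for the computations within a context
with dask.config.set(scheduler="threads", num_workers=4):
    visual_store.compute()
~~~
{: .language-python}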