# This file is part of the Open Data Cube, see https://opendatacube.org for more information
#
# Copyright (c) 2015-2020 ODC Contributors
# SPDX-License-Identifier: Apache-2.0
import warnings
import toolz # type: ignore[import]
import rasterio # type: ignore[import]
from rasterio.shutil import copy as rio_copy # type: ignore[import]
import numpy as np
import xarray as xr
import dask
from dask.delayed import Delayed
from pathlib import Path
from typing import Union, Optional, List, Any, Dict
from .io import check_write_path
from .geometry import GeoBox
from .geometry.tools import align_up
__all__ = ("write_cog", "to_cog")
def _adjust_blocksize(block, dim):
    """Clamp tile size to the image dimension, rounded up to a multiple of 16.

    GeoTiff internal tiles must be multiples of 16 pixels; a tile larger than
    the image itself is pointless, so the requested size is first capped at
    ``dim``.
    """
    effective = min(block, dim)
    return align_up(effective, 16)
def _write_cog(
    pix: np.ndarray,
    geobox: GeoBox,
    fname: Union[Path, str],
    nodata: Optional[float] = None,
    overwrite: bool = False,
    blocksize: Optional[int] = None,
    overview_resampling: Optional[str] = None,
    overview_levels: Optional[List[int]] = None,
    ovr_blocksize: Optional[int] = None,
    use_windowed_writes: bool = False,
    intermediate_compression: Union[bool, str, Dict[str, Any]] = False,
    **extra_rio_opts
) -> Union[Path, bytes]:
    """Write a geo-registered ndarray to a Cloud Optimized GeoTiff file or RAM.

    :param pix: ``numpy.ndarray`` of pixels, 2d ``(y, x)`` or 3d --
                ``(y, x, band)`` or ``(band, y, x)``
    :param geobox: ``GeoBox`` describing the geo-registration (shape, CRS,
                   transform); its shape must match the spatial dims of ``pix``
    :param fname: Output file or ":mem:"
    :param nodata: Set ``nodata`` flag to this value if supplied
    :param overwrite: True -- replace existing file, False -- abort with IOError exception
    :param blocksize: Size of internal tiff tiles (512x512 pixels)
    :param ovr_blocksize: Size of internal tiles in overview images (defaults to blocksize)
    :param overview_resampling: Use this resampling when computing overviews
    :param overview_levels: List of shrink factors to compute overviews for: [2,4,8,16,32]
                            to disable overviews supply empty list ``[]``
    :param use_windowed_writes: Write image block by block (might need this for large images)
    :param intermediate_compression: Configure compression settings for first pass write,
                                     default is no compression
    :param extra_rio_opts: Any other option is passed to ``rasterio.open``

    When fname=":mem:" write COG to memory rather than to a file and return it
    as bytes.

    NOTE: about memory requirements

    This function generates a temporary in memory tiff file without compression
    to speed things up. It then adds overviews to this file and only then
    copies it to the final destination with requested compression settings.
    This is necessary to produce a compliant COG, since the COG standard demands
    overviews to be placed before native resolution data and a double pass is the
    only way to achieve this currently.

    This means that this function will use about 1.5 to 2 times memory taken by `pix`.
    """
    # pylint: disable=too-many-locals
    if blocksize is None:
        blocksize = 512
    if ovr_blocksize is None:
        ovr_blocksize = blocksize
    if overview_resampling is None:
        overview_resampling = "nearest"

    # normalise intermediate_compression argument to a dict():
    # True -> cheap deflate, False -> no compression, str -> named codec
    if isinstance(intermediate_compression, bool):
        intermediate_compression = (
            {"compress": "deflate", "zlevel": 2} if intermediate_compression else {}
        )
    elif isinstance(intermediate_compression, str):
        intermediate_compression = {"compress": intermediate_compression}

    # Work out image geometry; accept (y, x), (y, x, band) or (band, y, x)
    if pix.ndim == 2:
        h, w = pix.shape
        nbands = 1
        band = 1  # type: Any
    elif pix.ndim == 3:
        if pix.shape[:2] == geobox.shape:
            # Y, X, Band --> Band, Y, X (rasterio writes band-major data)
            pix = pix.transpose([2, 0, 1])
        elif pix.shape[-2:] != geobox.shape:
            raise ValueError("GeoBox shape does not match image shape")
        nbands, h, w = pix.shape
        band = tuple(i for i in range(1, nbands + 1))
    else:
        raise ValueError("Need 2d or 3d ndarray on input")

    assert geobox.shape == (h, w)

    # Default overviews: skip for small images, x2..x32 otherwise
    if overview_levels is None:
        if min(w, h) < 512:
            overview_levels = []
        else:
            overview_levels = [2 ** i for i in range(1, 6)]

    if fname != ":mem:":
        path = check_write_path(
            fname, overwrite
        )  # aborts if overwrite=False and file exists already

    resampling = rasterio.enums.Resampling[overview_resampling]

    if (blocksize % 16) != 0:
        warnings.warn("Block size must be a multiple of 16, will be adjusted")

    # Creation options for the final (compressed) output image
    rio_opts = dict(
        width=w,
        height=h,
        count=nbands,
        dtype=pix.dtype.name,
        crs=str(geobox.crs),
        transform=geobox.transform,
        tiled=True,
        blockxsize=_adjust_blocksize(blocksize, w),
        blockysize=_adjust_blocksize(blocksize, h),
        zlevel=6,
        # predictor 3 (floating point) for float data, 2 (horizontal differencing) otherwise
        predictor=3 if pix.dtype.kind == "f" else 2,
        compress="DEFLATE",
    )
    if nodata is not None:
        rio_opts.update(nodata=nodata)
    rio_opts.update(extra_rio_opts)

    def _write(pix, band, dst):
        # Write in one call, or block-by-block to bound peak memory use
        if not use_windowed_writes:
            dst.write(pix, band)
            return
        for _, win in dst.block_windows():
            if pix.ndim == 2:
                block = pix[win.toslices()]
            else:
                block = pix[(slice(None),) + win.toslices()]
            dst.write(block, indexes=band, window=win)

    # Deal efficiently with "no overviews needed case": single-pass write,
    # no intermediate copy required
    if len(overview_levels) == 0:
        if fname == ":mem:":
            with rasterio.MemoryFile() as mem:
                with mem.open(driver="GTiff", **rio_opts) as dst:
                    _write(pix, band, dst)
                return bytes(mem.getbuffer())
        else:
            with rasterio.open(path, mode="w", driver="GTiff", **rio_opts) as dst:
                _write(pix, band, dst)
            return path

    # copy re-compresses anyway so skip compression for temp image
    tmp_opts = toolz.dicttoolz.dissoc(rio_opts, "compress", "predictor", "zlevel")
    tmp_opts.update(intermediate_compression)

    with rasterio.Env(GDAL_TIFF_OVR_BLOCKSIZE=ovr_blocksize):
        with rasterio.MemoryFile() as mem:
            with mem.open(driver="GTiff", **tmp_opts) as tmp:
                _write(pix, band, tmp)
                tmp.build_overviews(overview_levels, resampling)

                if fname == ":mem:":
                    with rasterio.MemoryFile() as mem2:
                        # Image-describing options are taken from the source
                        # dataset by rio_copy, hence dropped here
                        rio_copy(
                            tmp,
                            mem2.name,
                            driver="GTiff",
                            copy_src_overviews=True,
                            **toolz.dicttoolz.dissoc(
                                rio_opts,
                                "width",
                                "height",
                                "count",
                                "dtype",
                                "crs",
                                "transform",
                                "nodata",
                            )
                        )
                        return bytes(mem2.getbuffer())

                rio_copy(tmp, path, driver="GTiff", copy_src_overviews=True, **rio_opts)

    return path
# Delayed (lazy) versions of _write_cog for Dask-array inputs.
# In-memory variant is marked pure=True: the same pixels always compress to
# the same bytes, so Dask is free to cache/share the result.
_delayed_write_cog_to_mem = dask.delayed(  # pylint: disable=invalid-name
    _write_cog, name="compress-cog", pure=True, nout=1
)

# File-writing variant is pure=False: writing to disk is a side effect and
# must not be de-duplicated by Dask's common-subexpression handling.
_delayed_write_cog_to_file = dask.delayed(  # pylint: disable=invalid-name
    _write_cog, name="save-cog", pure=False, nout=1
)
def write_cog(
    geo_im: xr.DataArray,
    fname: Union[str, Path],
    overwrite: bool = False,
    blocksize: Optional[int] = None,
    ovr_blocksize: Optional[int] = None,
    overview_resampling: Optional[str] = None,
    overview_levels: Optional[List[int]] = None,
    use_windowed_writes: bool = False,
    intermediate_compression: Union[bool, str, Dict[str, Any]] = False,
    **extra_rio_opts
) -> Union[Path, bytes, Delayed]:
    """
    Save ``xarray.DataArray`` to a file in Cloud Optimized GeoTiff format.

    This function is "Dask aware". If ``geo_im`` is a Dask array, then the
    output of this function is also a Dask Delayed object. This allows us to
    save multiple images concurrently across a Dask cluster. If you are not
    familiar with Dask this can be confusing, as no operation is performed until the
    ``.compute()`` method is called, so if you call this function with Dask
    array it will return immediately without writing anything to disk.

    If you are using Dask to speed up data loading, follow the example below:

    .. code-block:: python

       # Example: save red band from first time slice to file "red.tif"
       xx = dc.load(.., dask_chunks=dict(x=1024, y=1024))
       write_cog(xx.isel(time=0).red, "red.tif").compute()
       # or compute input first instead
       write_cog(xx.isel(time=0).red.compute(), "red.tif")

    :param geo_im: ``xarray.DataArray`` with crs
    :param fname: Output path or ``":mem:"`` in which case compress to RAM and return bytes
    :param overwrite: True -- replace existing file, False -- abort with IOError exception
    :param blocksize: Size of internal tiff tiles (512x512 pixels)
    :param ovr_blocksize: Size of internal tiles in overview images (defaults to blocksize)
    :param overview_resampling: Use this resampling when computing overviews
    :param overview_levels: List of shrink factors to compute overviews for: [2,4,8,16,32],
                            to disable overviews supply empty list ``[]``
    :param nodata: Set ``nodata`` flag to this value if supplied, by default ``nodata`` is
                   read from the attributes of the input array (``geo_im.attrs['nodata']``).
    :param use_windowed_writes: Write image block by block (might need this for large images)
    :param intermediate_compression: Configure compression settings for first pass write,
                                     default is no compression
    :param extra_rio_opts: Any other option is passed to ``rasterio.open``

    :returns: Path to which output was written
    :returns: Bytes if ``fname=":mem:"``
    :returns: ``dask.Delayed`` object if input is a Dask array

    .. note ::

       **memory requirements**

       This function generates a temporary in memory tiff file without
       compression to speed things up. It then adds overviews to this file and
       only then copies it to the final destination with requested compression
       settings. This is necessary to produce a compliant COG, since the COG standard
       demands overviews to be placed before native resolution data and double
       pass is the only way to achieve this currently.

       This means that this function will use about 1.5 to 2 times memory taken by ``geo_im``.
    """
    pix = geo_im.data
    geobox = getattr(geo_im, "geobox", None)

    # Explicit nodata= kwarg wins over the array's own metadata
    nodata = extra_rio_opts.pop("nodata", None)
    if nodata is None:
        nodata = geo_im.attrs.get("nodata", None)

    if geobox is None:
        raise ValueError("Need geo-registered array on input")

    if dask.is_dask_collection(pix):
        # Defer the write: pick the delayed variant matching the destination
        # (in-memory compression is pure, file output has side effects)
        real_op = (
            _delayed_write_cog_to_mem
            if fname == ":mem:"
            else _delayed_write_cog_to_file
        )
    else:
        real_op = _write_cog

    return real_op(
        pix,
        geobox,
        fname,
        nodata=nodata,
        overwrite=overwrite,
        blocksize=blocksize,
        ovr_blocksize=ovr_blocksize,
        overview_resampling=overview_resampling,
        overview_levels=overview_levels,
        use_windowed_writes=use_windowed_writes,
        intermediate_compression=intermediate_compression,
        **extra_rio_opts
    )
def to_cog(
    geo_im: xr.DataArray,
    blocksize: Optional[int] = None,
    ovr_blocksize: Optional[int] = None,
    overview_resampling: Optional[str] = None,
    overview_levels: Optional[List[int]] = None,
    use_windowed_writes: bool = False,
    intermediate_compression: Union[bool, str, Dict[str, Any]] = False,
    **extra_rio_opts
) -> Union[bytes, Delayed]:
    """
    Compress ``xarray.DataArray`` into Cloud Optimized GeoTiff bytes in memory.

    This function doesn't write to disk, it compresses in RAM, which is useful
    for saving data to S3 or other cloud object stores.

    This function is "Dask aware". If ``geo_im`` is a Dask array, then the
    output of this function is also a Dask Delayed object. This allows us to
    compress multiple images concurrently across a Dask cluster. If you are not
    familiar with Dask this can be confusing, as no operation is performed until the
    ``.compute()`` method is called, so if you call this function with Dask
    array it will return immediately without compressing any data.

    :param geo_im: ``xarray.DataArray`` with crs
    :param blocksize: Size of internal tiff tiles (512x512 pixels)
    :param ovr_blocksize: Size of internal tiles in overview images (defaults to blocksize)
    :param overview_resampling: Use this resampling when computing overviews
    :param overview_levels: List of shrink factors to compute overviews for: [2,4,8,16,32]
    :param nodata: Set ``nodata`` flag to this value if supplied, by default ``nodata`` is
                   read from the attributes of the input array (``geo_im.attrs['nodata']``).
    :param use_windowed_writes: Write image block by block (might need this for large images)
    :param intermediate_compression: Configure compression settings for first pass write,
                                     default is no compression
    :param extra_rio_opts: Any other option is passed to ``rasterio.open``

    :returns: In-memory GeoTiff file as bytes
    :returns: ``dask.Delayed`` object if input is a Dask array

    Also see :py:meth:`~datacube.utils.cog.write_cog`
    """
    # Delegate to write_cog with the ":mem:" destination, which compresses
    # to RAM and returns the bytes (or a Delayed wrapping them)
    bb = write_cog(
        geo_im,
        ":mem:",
        blocksize=blocksize,
        ovr_blocksize=ovr_blocksize,
        overview_resampling=overview_resampling,
        overview_levels=overview_levels,
        use_windowed_writes=use_windowed_writes,
        intermediate_compression=intermediate_compression,
        **extra_rio_opts
    )

    assert isinstance(
        bb, (bytes, Delayed)
    )  # for mypy sake for :mem: output it bytes or delayed bytes
    return bb