Source code for datacube.utils.masking

# This file is part of the Open Data Cube, see https://opendatacube.org for more information
#
# Copyright (c) 2015-2025 ODC Contributors
# SPDX-License-Identifier: Apache-2.0
"""
Tools for masking data based on a bit-mask variable with attached definition.

The main functions are `make_mask(variable, **flags)` and `describe_variable_flags(variable)`.
"""

import collections

import pandas
import xarray
from xarray import DataArray, Dataset

from datacube.utils.math import valid_mask

FLAGS_ATTR_NAME = 'flags_definition'


def list_flag_names(variable):
    """
    List the names of the masking flags defined for the variable, sorted

    :param variable: Masking xarray.Dataset or xarray.DataArray
    :return: sorted list of flag names
    """
    names = list(get_flags_def(variable).keys())
    names.sort()
    return names


def describe_variable_flags(variable, with_pandas: bool = True):
    """
    Describe the flags available on a masking variable

    Reads the `flags_definition` attached to the provided variable and
    returns it either as a Pandas Dataframe built with one row per flag
    (with_pandas=True - default), or as a text table (with_pandas=False)
    with the columns ``Flag name``, ``Description``, ``Bit. No``, ``Value``
    and ``Meaning``.

    :param variable: Masking xarray.Dataset or xarray.DataArray
    :param with_pandas: return a Dataframe when True, a string otherwise
    :return: Pandas Dataframe or str
    """
    definition = get_flags_def(variable)

    if with_pandas:
        return pandas.DataFrame.from_dict(definition, orient='index')

    return describe_flags_def(definition)


def describe_flags_def(flags_def) -> str:
    """
    Render a flags definition as a plain-text table

    :param flags_def: mapping of flag name -> definition
    :return: the table as a single newline-joined string
    """
    table_rows = list(_table_contents(flags_def))
    return '\n'.join(generate_table(table_rows))


def _table_contents(flags_def):
    """
    Yield the header row followed by one row per (flag, value) pair

    Flags are ordered by their lowest bit. The flag name and description are
    only written on the first row of each flag; later rows of the same flag
    carry empty strings in those two columns.
    """
    yield 'Flag name', 'Description', 'Bit. No', 'Value', 'Meaning'

    ordered_flags = sorted(flags_def.items(), key=_order_bitdefs_by_bits)
    for flag_name, defn in ordered_flags:
        description = defn['description']
        for position, (value, meaning) in enumerate(defn['values'].items()):
            if position == 0:
                label, text = flag_name, description
            else:
                label, text = '', ''
            yield label, text, str(defn['bits']), str(value), str(meaning)


def _order_bitdefs_by_bits(bitdef):
    """
    Sort key for ``(name, definition)`` pairs: the lowest bit of the flag

    ``bits`` may be a collection of bit numbers or a single number; a single
    number cannot be passed to ``min`` and is returned as is.
    """
    _, defn = bitdef
    try:
        lowest = min(defn['bits'])
    except TypeError:
        return defn['bits']
    return lowest


def make_mask(variable, **flags):
    """
    Build a boolean mask from the requested flag values

    Several flags may be given at once; a pixel is selected only when it
    matches all of them (logical AND).

    For example:

    >>> make_mask(pqa, cloud_acca=False, cloud_fmask=False, land_obs=True)  # doctest: +SKIP

    OR

    >>> make_mask(pqa, **GOOD_PIXEL_FLAGS)  # doctest: +SKIP

    where `GOOD_PIXEL_FLAGS` is a dict of flag_name to True/False

    :param variable:
    :type variable: xarray.Dataset or xarray.DataArray
    :param flags: flag_name=value keyword arguments
    :return: boolean xarray.DataArray or xarray.Dataset
    """
    bit_selector, expected = create_mask_value(get_flags_def(variable), **flags)

    # Keep only the bits belonging to the requested flags, then compare
    selected_bits = variable & bit_selector
    return selected_bits == expected


def valid_data_mask(data):
    """
    Returns bool arrays where the data is not `nodata`

    A Dataset is handled by applying this function to each of its variables.
    For a DataArray the `nodata` value is read from its attributes (``None``
    when the attribute is absent) and handed to `valid_mask`.

    :param Dataset or DataArray data:
    :return: Dataset or DataArray
    :raises TypeError: if `data` is neither a Dataset nor a DataArray
    """
    if isinstance(data, Dataset):
        return data.map(valid_data_mask)

    if isinstance(data, DataArray):
        nodata_value = data.attrs.get('nodata')
        return xarray.apply_ufunc(
            valid_mask,
            data,
            nodata_value,
            dask='parallelized',
            output_dtypes=[bool],
        )

    raise TypeError(f'valid_data_mask not supported for type {type(data)}')


def mask_invalid_data(data, keep_attrs: bool = True):
    """
    Sets all `nodata` values to ``nan``.

    This will convert numeric data to type `float`. A DataArray without a
    `nodata` attribute is returned unchanged.

    :param Dataset or DataArray data:
    :param bool keep_attrs: If the attributes of the data should be included in the returned data.
        The `nodata` attribute itself is left out of the copied attributes.
    :return: Dataset or DataArray
    :raises TypeError: if `data` is neither a Dataset nor a DataArray
    """
    if isinstance(data, Dataset):
        # Pass keep_attrs as a positional arg to the DataArray func
        return data.map(mask_invalid_data, keep_attrs=keep_attrs, args=(keep_attrs,))

    if not isinstance(data, DataArray):
        raise TypeError(f'mask_invalid_data not supported for type {type(data)}')

    if 'nodata' not in data.attrs:
        return data

    masked = data.where(data != data.nodata)
    if keep_attrs:
        remaining_attrs = dict(data.attrs)
        del remaining_attrs['nodata']
        masked.attrs = remaining_attrs
    return masked


def _is_multi_bit(bits) -> bool:
    """Tell whether a ``bits`` entry lists several bit positions rather than a single one."""
    # Imported locally: the module only does ``import collections``, which does
    # not by itself guarantee that the ``collections.abc`` submodule is loaded.
    from collections.abc import Iterable

    return isinstance(bits, Iterable)


def create_mask_value(bits_def, **flags) -> tuple[int, int]:
    """
    Build the ``(mask, value)`` integer pair matching the requested flag values

    ``mask`` has a 1 in every bit position used by the named flags and
    ``value`` holds the bit pattern those positions must have.

    :param bits_def: Mapping of flag_name -> definition, each definition
        having a ``'bits'`` entry (one bit number or an iterable of them) and
        a ``'values'`` mapping of bit value -> meaning
    :param flags: flag_name=meaning keyword arguments
    :return: tuple of (mask, value)
    :raises ValueError: if a flag name is not defined, or the requested
        meaning does not correspond to exactly one integer bit value
    """
    mask = 0
    value = 0

    for flag_name, flag_ref in flags.items():
        defn = bits_def.get(flag_name, None)
        if defn is None:
            raise ValueError(f'Unknown flag: "{flag_name}"')

        try:
            # Unpacking into a one-element list fails unless exactly one
            # bit value carries the requested meaning
            [flag_value] = (bit_val
                            for bit_val, val_ref in defn['values'].items()
                            if val_ref == flag_ref)
            flag_value = int(flag_value)  # Might be string if coming from DB
        except ValueError:
            raise ValueError(f'Unknown value {flag_ref} specified for flag {flag_name}') from None

        bits = defn['bits']
        if _is_multi_bit(bits):  # Multi-bit flag
            # Set mask
            for bit in bits:
                mask = set_value_at_index(mask, bit, True)

            # The flag value is stored relative to the lowest bit of the flag
            value |= flag_value << min(bits)
        else:
            mask = set_value_at_index(mask, bits, True)
            value = set_value_at_index(value, bits, flag_value)

    return mask, value


def mask_to_dict(bits_def, mask_value):
    """
    Describes which flags are set for a mask value

    :param bits_def: Mapping of flag_name -> definition (see `create_mask_value`)
    :param mask_value: integer to decode
    :return: Mapping of flag_name -> set_value
    :rtype: dict
    """
    return_dict = {}

    for flag_name, flag_defn in bits_def.items():
        # Make bits a list, even if there is only one. Any iterable counts as
        # multi-bit, the same rule `create_mask_value` applies.
        bits = flag_defn['bits']
        flag_bits = list(bits) if _is_multi_bit(bits) else [bits]

        # The amount to shift flag_value to line up with mask_value
        flag_shift = min(flag_bits)

        # Mask our mask_value, we are only interested in the bits for this flag
        flag_mask = 0
        for i in flag_bits:
            flag_mask |= (1 << i)
        masked_mask_value = mask_value & flag_mask

        for flag_value, value in flag_defn['values'].items():
            shifted_value = int(flag_value) << flag_shift
            if shifted_value == masked_mask_value:
                # Two bit values of one flag must not match the same pattern
                assert flag_name not in return_dict
                return_dict[flag_name] = value

    return return_dict


def get_flags_def(variable):
    """
    Find the flags definition attached to a variable

    The definition is looked up as an attribute of `variable` itself; if it
    is not there and `variable` has `data_vars`, the first data variable
    carrying one supplies it.

    :param variable: Masking xarray.Dataset or xarray.DataArray
    :return: the flags definition
    :raises ValueError: if no flags definition can be found
    """
    flags = getattr(variable, FLAGS_ATTR_NAME, None)
    if flags is not None:
        return flags

    data_vars = getattr(variable, 'data_vars', None)
    if data_vars is not None:
        # Maybe we have a DataSet, not a DataArray
        for var in data_vars.values():
            flags = getattr(var, FLAGS_ATTR_NAME, None)
            if flags is not None:
                return flags

    raise ValueError('No masking variable found')


def set_value_at_index(bitmask, index, value):
    """
    Set a bit value onto an integer bitmask

    eg. set bits 2 and 4 to True

    >>> mask = 0
    >>> mask = set_value_at_index(mask, 2, True)
    >>> mask = set_value_at_index(mask, 4, True)
    >>> print(bin(mask))
    0b10100
    >>> mask = set_value_at_index(mask, 2, False)
    >>> print(bin(mask))
    0b10000

    :param bitmask: existing int bitmask to alter
    :type bitmask: int
    :param index: position of the bit to change, 0 being the lowest bit
    :type index: int
    :param value: truthy to set the bit, falsy to clear it
    :type value: bool
    :return: the altered bitmask
    """
    bit_val = 2 ** index
    if value:
        bitmask |= bit_val
    else:
        bitmask &= (~bit_val)
    return bitmask


def generate_table(rows):
    """
    Yield strings to print a table using the data in `rows`.

    A separator line is written before every row that starts a new group,
    i.e. a row with a non-empty first cell that follows the header or a row
    with an empty first cell.

    TODO: Maybe replace with Pandas

    :param rows: An iterable of sequences of strings with the 0th element
        being the table header. Nothing is yielded when it is empty.
    """
    # Materialise so iterators are accepted and the rows can be read twice
    rows = list(rows)
    if not rows:
        return

    # - figure out column widths
    widths = [max(len(cell) for cell in column) for column in zip(*rows)]

    # - print the header
    header, data = rows[0], rows[1:]
    yield ' | '.join(title.ljust(width) for width, title in zip(widths, header))

    # Print the separator
    first_col = ''

    # - print the data
    for row in data:
        if first_col == '' and row[0] != '':
            # - print the separator
            yield '-+-'.join('-' * width for width in widths)
        first_col = row[0]

        yield ' | '.join(cdata.ljust(width) for width, cdata in zip(widths, row))