Source code for datacube.api.query

# This file is part of the Open Data Cube, see https://opendatacube.org for more information
#
# Copyright (c) 2015-2025 ODC Contributors
# SPDX-License-Identifier: Apache-2.0
"""
Storage Query and Access API module
"""


import collections
import datetime
import logging
import math
import warnings

import numpy as np
import pandas
from odc.geo import Geometry
from odc.geo.geom import lonlat_bounds, mid_longitude
from pandas import to_datetime as pandas_to_datetime
from typing_extensions import override

from ..index import extract_geom_from_query, strip_all_spatial_fields_from_query
from ..model import Dataset, QueryField, Range
from ..utils.dates import normalise_dt, tz_aware

_LOG: logging.Logger = logging.getLogger(__name__)


class GroupBy:
    def __init__(self, group_by_func, dimension, units, sort_key=None, group_key=None) -> None:
        """
        GroupBy Object

        :param group_by_func: Dataset -> group identifier
        :param dimension: dimension of the group key
        :param units: units of the group key
        :param sort_key: how to sort datasets in a group internally
        :param group_key: the coordinate value for a group
                          list[Dataset] -> coord value
        """
        self.group_by_func = group_by_func
        self.dimension = dimension
        self.units = units
        if sort_key is None:
            sort_key = group_by_func
        self.sort_key = sort_key
        if group_key is None:
            group_key = lambda datasets: group_by_func(datasets[0])  # noqa: E731
        self.group_key = group_key
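
# Illustrative sketch (not part of this module): a custom GroupBy that buckets
# datasets by the calendar month of their centre time. The `_month_of` helper
# is hypothetical; any Dataset -> hashable-key function works as group_by_func.
#
#     def _month_of(ds):
#         return normalise_dt(ds.center_time).replace(day=1, hour=0, minute=0,
#                                                     second=0, microsecond=0)
#
#     month_grouper = GroupBy(group_by_func=_month_of,
#                             dimension='time',
#                             units='seconds since 1970-01-01 00:00:00')
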
OTHER_KEYS = ('measurements', 'group_by', 'output_crs', 'resolution', 'set_nan', 'product', 'geopolygon', 'like', 'source_filter')
class Query:
    def __init__(self, index=None, product=None, geopolygon=None, like=None, **search_terms) -> None:
        """Parses search terms in preparation for querying the Data Cube Index.

        Create a :class:`Query` object by passing it a set of search terms as keyword arguments.

        >>> query = Query(product='ls5_nbar_albers', time=('2001-01-01', '2002-01-01'))

        Use by accessing :attr:`search_terms`:

        >>> query.search_terms['time']  # doctest: +NORMALIZE_WHITESPACE
        Range(begin=datetime.datetime(2001, 1, 1, 0, 0, tzinfo=tzutc()), \
        end=datetime.datetime(2002, 1, 1, 23, 59, 59, 999999, tzinfo=tzutc()))

        By passing in an ``index``, the search parameters will be validated as existing on the ``product``,
        and a spatial search appropriate for the index driver can be extracted.

        Used by :meth:`datacube.Datacube.find_datasets` and :meth:`datacube.Datacube.load`.

        :param datacube.index.Index index: An optional `index` object, if checking of field names is desired.
        :param str product: name of product
        :param geopolygon: the spatial boundaries of the search, can be:

               * odc.geo.geom.Geometry: A Geometry object
               * Any string or JsonLike object that can be converted to a Geometry object.
               * An iterable of either of the above; or
               * None: no geopolygon defined (may be derived from like or lat/lon/x/y/crs search terms)
        :param xarray.Dataset like: spatio-temporal bounds of `like` are used for the search
        :param search_terms:
         * `measurements` - list of measurements to retrieve
         * `latitude`, `lat`, `y`, `longitude`, `lon`, `long`, `x` - tuples (min, max)
           bounding spatial dimensions
         * 'extra_dimension_name' (e.g. `z`) - tuples (min, max) bounding extra
           dimensions specified by name for 3D datasets. E.g. z=(10, 30).
         * `crs` - spatial coordinate reference system to interpret the spatial bounds
         * `group_by` - observation grouping method. One of `time`, `solar_day`. Default is `time`
        """
        self.index = index
        self.product = product
        self.geopolygon = extract_geom_from_query(geopolygon=geopolygon, **search_terms)
        if 'source_filter' in search_terms and search_terms['source_filter'] is not None:
            self.source_filter: Query | None = Query(**search_terms['source_filter'])
        else:
            self.source_filter = None
        search_terms = strip_all_spatial_fields_from_query(search_terms)
        remaining_keys = set(search_terms.keys()) - set(OTHER_KEYS)
        if self.index:
            # Retrieve known keys for extra dimensions
            known_dim_keys = set()
            if product is not None:
                datacube_products = index.products.search(product=product)
            else:
                datacube_products = index.products.get_all()

            for datacube_product in datacube_products:
                known_dim_keys.update(datacube_product.extra_dimensions.dims.keys())
            remaining_keys -= known_dim_keys

            unknown_keys = remaining_keys - set(index.products.get_field_names())
            # TODO: What about keys in source filters, and what if the keys don't match up with this product...
            if unknown_keys:
                raise LookupError('Unknown arguments: ', unknown_keys)

        self.search = {}
        for key in remaining_keys:
            self.search.update(_values_to_search(**{key: search_terms[key]}))

        if like:
            assert self.geopolygon is None, "'like' with other spatial bounding parameters is not supported"
            self.geopolygon = getattr(like, 'extent', self.geopolygon)

            if 'time' not in self.search:
                time_coord = like.coords.get('time')
                if time_coord is not None:
                    self.search['time'] = _time_to_search_dims(
                        # convert from np.datetime64 to datetime.datetime
                        (pandas_to_datetime(time_coord.values[0]).to_pydatetime(),
                         pandas_to_datetime(time_coord.values[-1]).to_pydatetime())
                    )

    @property
    def search_terms(self) -> dict:
        """
        Access the search terms as a dictionary.

        :type: dict
        """
        kwargs = {}
        kwargs.update(self.search)
        if self.geopolygon:
            if self.index and self.index.supports_spatial_indexes:
                kwargs['geopolygon'] = self.geopolygon
            else:
                geo_bb = lonlat_bounds(self.geopolygon, resolution="auto")
                if math.isclose(geo_bb.bottom, geo_bb.top, abs_tol=1e-5):
                    kwargs['lat'] = geo_bb.bottom
                else:
                    kwargs['lat'] = Range(geo_bb.bottom, geo_bb.top)
                if math.isclose(geo_bb.left, geo_bb.right, abs_tol=1e-5):
                    kwargs['lon'] = geo_bb.left
                else:
                    kwargs['lon'] = Range(geo_bb.left, geo_bb.right)
        if self.product:
            kwargs['product'] = self.product
        if self.source_filter:
            kwargs['source_filter'] = self.source_filter.search_terms
        return kwargs

    @override
    def __repr__(self) -> str:
        return self.__str__()

    @override
    def __str__(self) -> str:
        return f"""Datacube Query:
        type = {self.product}
        search = {self.search}
        geopolygon = {self.geopolygon}
        """
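
# Usage sketch (assumes a reachable index with an 'ls5_nbar_albers' product,
# per the doctest above): build a Query from keyword search terms, then hand
# its search_terms dict on to a dataset search.
#
#     q = Query(product='ls5_nbar_albers',
#               time=('2001-01-01', '2002-01-01'),
#               lat=(-35.3, -35.2), lon=(149.0, 149.2))
#     q.search_terms   # dict of Range/str values suitable for an index search
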
def _extract_time_from_ds(ds: Dataset) -> datetime.datetime:
    return normalise_dt(ds.center_time)
def query_group_by(group_by: str | GroupBy | None = 'time', **kwargs: QueryField) -> GroupBy:
    """
    Group by function for loading datasets

    :param group_by: group_by name, supported strings are

                     - `time` (default)
                     - `solar_day`, see :func:`datacube.api.query.solar_day`

                     or pass a :class:`datacube.api.query.GroupBy` object.

    :return: :class:`datacube.api.query.GroupBy`
    :raises LookupError: when group_by string is not a valid dictionary key.
    """
    if isinstance(group_by, GroupBy):
        return group_by

    if not isinstance(group_by, str):
        group_by = None

    time_grouper = GroupBy(group_by_func=_extract_time_from_ds,
                           dimension='time',
                           units='seconds since 1970-01-01 00:00:00')

    solar_day_grouper = GroupBy(group_by_func=solar_day,
                                dimension='time',
                                units='seconds since 1970-01-01 00:00:00',
                                sort_key=_extract_time_from_ds,
                                group_key=lambda datasets: _extract_time_from_ds(datasets[0]))

    group_by_map: dict[str | None, GroupBy] = {
        None: time_grouper,
        'time': time_grouper,
        'solar_day': solar_day_grouper
    }

    try:
        return group_by_map[group_by]
    except KeyError:
        raise LookupError(
            f'No group by function for {group_by}, valid options are: {group_by_map.keys()}',
        ) from None
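
# Behaviour sketch for query_group_by, following the mapping above:
#
#     query_group_by()                  # -> the 'time' grouper
#     query_group_by('solar_day')       # -> grouper keyed by solar_day()
#     query_group_by(some_group_by_obj) # GroupBy instances pass through unchanged
#     query_group_by('fortnight')       # raises LookupError
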
def _value_to_range(value: str | int | float | list[str | int | float]) -> tuple[float, float]:
    if isinstance(value, str | float | int):
        value = float(value)
        return value, value
    else:
        return float(value[0]), float(value[-1])


def _values_to_search(**kwargs) -> dict:
    search = {}
    for key, value in kwargs.items():
        if key.lower() in ('time', 't'):
            search['time'] = _time_to_search_dims(value)
        elif key not in ['latitude', 'lat', 'y'] + ['longitude', 'lon', 'x']:
            # If it's not a string, but is a sequence of length 2, then it's a Range
            if (
                not isinstance(value, str)
                and isinstance(value, collections.abc.Sequence)
                and len(value) == 2
            ):
                search[key] = Range(*value)
            # All other cases are default
            else:
                search[key] = value
    return search


def _time_to_search_dims(time_range):
    with warnings.catch_warnings():
        warnings.simplefilter("ignore", UserWarning)

        tr_start, tr_end = time_range, time_range

        if hasattr(time_range, '__iter__') and not isinstance(time_range, str):
            tmp = list(time_range)
            if len(tmp) > 2:
                raise ValueError("Please supply start and end date only for time query")

            tr_start, tr_end = tmp[0], tmp[-1]

        if isinstance(tr_start, int | float) or isinstance(tr_end, int | float):
            raise TypeError("Time dimension must be provided as a datetime or a string")

        if tr_start is None:
            start = datetime.datetime.fromtimestamp(0)
        elif not isinstance(tr_start, datetime.datetime):
            # convert to datetime.datetime
            if hasattr(tr_start, 'isoformat'):
                tr_start = tr_start.isoformat()
            start = pandas_to_datetime(tr_start).to_pydatetime()
        else:
            start = tr_start

        if tr_end is None:
            tr_end = datetime.datetime.now().strftime("%Y-%m-%d")
        # Attempt conversion to isoformat
        # allows pandas.Period to handle datetime objects
        if hasattr(tr_end, 'isoformat'):
            tr_end = tr_end.isoformat()
        # get end of period to ensure range is inclusive
        end = pandas.Period(tr_end).end_time.to_pydatetime()

        tr = Range(tz_aware(start), tz_aware(end))
        if start == end:
            return tr[0]

        return tr


def _convert_to_solar_time(utc, longitude: float) -> datetime.datetime:
    seconds_per_degree = 240
    offset_seconds = int(longitude * seconds_per_degree)
    offset = datetime.timedelta(seconds=offset_seconds)
    return utc + offset


def _ds_mid_longitude(dataset: Dataset) -> float | None:
    m = dataset.metadata
    if hasattr(m, 'lon'):
        lon = m.lon
        return (lon.begin + lon.end) * 0.5
    return None
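
# Behaviour sketch for _time_to_search_dims: string inputs are expanded to the
# full pandas.Period they name, so a bare year covers that whole year.
#
#     _time_to_search_dims('2001')
#     # -> Range covering 2001-01-01 00:00:00 .. 2001-12-31 23:59:59.999999 (tz-aware)
#     _time_to_search_dims(('2001-03', None))
#     # -> open end: 2001-03-01 .. end of today
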
def solar_day(dataset: Dataset, longitude: float | None = None) -> np.datetime64:
    """
    Adjust Dataset timestamp for "local time" given location and convert to numpy.

    :param dataset: Dataset object from which to read time and location
    :param longitude: If supplied, correct the timestamp for this longitude,
                      rather than the mid-point of the Dataset's footprint
    """
    utc = dataset.center_time.astimezone(datetime.timezone.utc)

    if longitude is None:
        _lon = _ds_mid_longitude(dataset)
        if _lon is None:
            raise ValueError('Cannot compute solar_day: dataset is missing spatial info')
        longitude = _lon

    solar_time = _convert_to_solar_time(utc, longitude)
    return np.datetime64(solar_time.date(), 'D')
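
# Worked sketch: a dataset with centre time 2020-01-01 22:30 UTC and
# mid-longitude 150.0 gets an offset of 150 * 240 s = +10 h, i.e.
# 2020-01-02 08:30 solar time, so solar_day() returns
# numpy.datetime64('2020-01-02').
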
def solar_offset(geom: Geometry | Dataset, precision: str = 'h') -> datetime.timedelta:
    """
    Given a geometry or a Dataset, compute the offset to add to a UTC timestamp
    to get the solar day right. This only works when the geometry is "local enough".

    :param geom: Geometry with defined CRS
    :param precision: one of ``'h'`` or ``'s'``, defaults to hour precision
    """
    if isinstance(geom, Geometry):
        lon = mid_longitude(geom)
    else:
        _lon = _ds_mid_longitude(geom)
        if _lon is None:
            raise ValueError('Cannot compute solar offset, dataset is missing spatial info')
        lon = _lon

    if precision == 'h':
        return datetime.timedelta(hours=round(lon * 24 / 360))

    # 240 == (24*60*60)/360 (seconds of a day per degree of longitude)
    return datetime.timedelta(seconds=int(lon * 240))
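
# Worked sketch: for a geometry centred at longitude 150.0,
# solar_offset(geom) gives timedelta(hours=round(150 * 24 / 360)), i.e. 10 hours;
# with precision='s' it gives timedelta(seconds=int(150 * 240)), also 10:00:00.
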