diff --git a/datacube/drivers/postgis/_api.py b/datacube/drivers/postgis/_api.py
index a59755662..db5204951 100644
--- a/datacube/drivers/postgis/_api.py
+++ b/datacube/drivers/postgis/_api.py
@@ -21,7 +21,7 @@
 from sqlalchemy import delete, update
 from sqlalchemy.dialects.postgresql import insert
 from sqlalchemy.sql.expression import Select
-from sqlalchemy import select, text, and_, or_, func
+from sqlalchemy import select, text, and_, or_, func, column
 from sqlalchemy.dialects.postgresql import INTERVAL
 from sqlalchemy.exc import IntegrityError
 from sqlalchemy.engine import Row
@@ -607,13 +607,14 @@ def geospatial_query(self, geom):
     def search_datasets_query(self,
                               expressions, source_exprs=None,
                               select_fields=None, with_source_ids=False, limit=None, geom=None,
-                              archived: bool | None = False):
+                              archived: bool | None = False, order_by=None):
         """
         :type expressions: Tuple[Expression]
         :type source_exprs: Tuple[Expression]
         :type select_fields: Iterable[PgField]
         :type with_source_ids: bool
         :type limit: int
+        :type order_by: Iterable[str | PgField | sqlalchemy.Function]
         :type geom: Geometry
         :rtype: sqlalchemy.Expression
         """
@@ -629,6 +630,20 @@
         else:
             select_columns = _dataset_select_fields()

+        def _ob_exprs(o):
+            if isinstance(o, str):
+                # does this need to be more robust?
+                return text(o)
+            elif isinstance(o, PgField):
+                return o.alchemy_expression
+            # if a func, leave as-is
+            return o
+
+        if order_by is not None:
+            order_by = [_ob_exprs(o) for o in order_by]
+        else:
+            order_by = []
+
         if geom:
             SpatialIndex, spatialquery = self.geospatial_query(geom)
         else:
@@ -652,21 +667,22 @@
         if spatialquery is not None:
             where_expr = and_(where_expr, spatialquery)
             query = query.join(SpatialIndex)
-        query = query.where(where_expr).limit(limit)
+        query = query.where(where_expr).order_by(*order_by).limit(limit)

         return query

     def search_datasets(self, expressions,
                         source_exprs=None, select_fields=None,
                         with_source_ids=False, limit=None, geom=None,
-                        archived: bool | None = False):
+                        archived: bool | None = False, order_by=None):
         """
         :type with_source_ids: bool
         :type select_fields: tuple[datacube.drivers.postgis._fields.PgField]
         :type expressions: tuple[datacube.drivers.postgis._fields.PgExpression]
         """
-        select_query = self.search_datasets_query(expressions, source_exprs,
-                                                  select_fields, with_source_ids,
-                                                  limit, geom=geom, archived=archived)
+        assert source_exprs is None
+        assert not with_source_ids
+        select_query = self.search_datasets_query(expressions, select_fields=select_fields, limit=limit,
+                                                  geom=geom, archived=archived, order_by=order_by)
         _LOG.debug("search_datasets SQL: %s", str(select_query))
         return self._connection.execute(select_query)
@@ -1504,3 +1520,18 @@ def temporal_extent_full(self) -> Select:
         return select(
             func.min(time_min.alchemy_expression), func.max(time_max.alchemy_expression)
         )
+
+    def find_most_recent_change(self, product_id: int):
+        """
+        Find the database-local time of the last dataset that changed for this product.
+        """
+        return self._connection.execute(
+            select(
+                func.max(
+                    func.greatest(
+                        Dataset.added,
+                        column("updated"),  # trigger-maintained; not mapped on the Dataset model
+                    )
+                )
+            ).where(Dataset.product_ref == product_id)
+        ).scalar()
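Side note on the `_ob_exprs` helper above: the normalisation rule is small enough to state as a standalone, runnable sketch. `FakeField` is a hypothetical stand-in for `PgField` (anything exposing `.alchemy_expression`); nothing here changes the driver code itself.

    from sqlalchemy import column, func, text

    class FakeField:
        # hypothetical stand-in for PgField
        def __init__(self, name):
            self.alchemy_expression = column(name)

    def normalise_order_by(order_by):
        # str -> raw SQL text; field -> its column expression; anything else
        # (e.g. func.lower(...) or a UnaryExpression) passes through as-is
        for o in order_by or []:
            if isinstance(o, str):
                yield text(o)
            elif hasattr(o, "alchemy_expression"):
                yield o.alchemy_expression
            else:
                yield o

    print([str(x) for x in normalise_order_by(["id", FakeField("time"), func.lower(column("label"))])])
    # -> ['id', 'time', 'lower(label)']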
diff --git a/datacube/drivers/postgis/sql.py b/datacube/drivers/postgis/sql.py
index 396393a9f..65c7e3816 100644
--- a/datacube/drivers/postgis/sql.py
+++ b/datacube/drivers/postgis/sql.py
@@ -128,7 +128,7 @@ def pg_exists(conn, name):
     Does a postgres object exist?
     :rtype bool
     """
-    return conn.execute("SELECT to_regclass(%s)", name).scalar() is not None
+    return conn.execute(text("SELECT to_regclass(:name)"), {"name": name}).scalar() is not None


 def pg_column_exists(conn, table, column):
@@ -136,12 +136,12 @@
-    Does a postgres object exist?
+    Does a postgres column exist?
     :rtype bool
     """
-    return conn.execute("""
+    return conn.execute(text("""
         SELECT 1 FROM pg_attribute
-        WHERE attrelid = to_regclass(%s)
-        AND attname = %s
+        WHERE attrelid = to_regclass(:table)
+        AND attname = :column
         AND NOT attisdropped
-    """, table, column).scalar() is not None
+    """), {"table": table, "column": column}).scalar() is not None


 def escape_pg_identifier(engine, name):
diff --git a/datacube/drivers/postgres/_api.py b/datacube/drivers/postgres/_api.py
index 2e8b6917b..6ed3c414d 100644
--- a/datacube/drivers/postgres/_api.py
+++ b/datacube/drivers/postgres/_api.py
@@ -517,7 +517,7 @@ def raw_expr(expression):
     @staticmethod
     def search_datasets_query(expressions, source_exprs=None,
                               select_fields=None, with_source_ids=False, limit=None,
-                              archived: bool | None = False):
+                              archived: bool | None = False, order_by=None):
         """
         :type expressions: tuple[Expression]
         :type source_exprs: tuple[Expression]
@@ -537,6 +537,20 @@
         else:
             select_columns = _DATASET_SELECT_FIELDS

+        def _ob_exprs(o):
+            if isinstance(o, str):
+                # does this need to be more robust?
+                return text(o)
+            elif isinstance(o, PgField):
+                return o.alchemy_expression.asc()
+            # if a func, leave as-is
+            return o
+
+        if order_by is not None:
+            order_by = [_ob_exprs(o) for o in order_by]
+        else:
+            order_by = []
+
         if with_source_ids:
             # Include the IDs of source datasets
             select_columns += (
@@ -571,6 +585,8 @@
             from_expression
         ).where(
             where_expr
+        ).order_by(
+            *order_by
         ).limit(
             limit
         )
@@ -626,6 +642,8 @@
             recursive_query.join(DATASET, DATASET.c.id == recursive_query.c.source_dataset_ref)
         ).where(
             where_expr
+        ).order_by(
+            *order_by
         ).limit(
             limit
         )
@@ -634,7 +652,7 @@
     def search_datasets(self, expressions,
                         source_exprs=None, select_fields=None,
                         with_source_ids=False, limit=None,
-                        archived: bool | None = False):
+                        archived: bool | None = False, order_by=None):
         """
         :type with_source_ids: bool
         :type select_fields: tuple[datacube.drivers.postgres._fields.PgField]
@@ -642,7 +660,7 @@
         """
         select_query = self.search_datasets_query(expressions, source_exprs,
                                                   select_fields, with_source_ids, limit,
-                                                  archived=archived)
+                                                  archived=archived, order_by=order_by)
         return self._connection.execute(select_query)

     def bulk_simple_dataset_search(self, products=None, batch_size=0):
@@ -1271,3 +1289,18 @@ def grant_role(self, role: str, users: Iterable[str]) -> None:
                 raise ValueError('Unknown user %r' % user)

         _core.grant_role(self._connection, pg_role, users)
+
+    def find_most_recent_change(self, product_id: int):
+        """
+        Find the database-local time of the last dataset that changed for this product.
+        """
+        return self._connection.execute(
+            select(
+                func.max(
+                    func.greatest(
+                        DATASET.c.added,
+                        column("updated"),  # trigger-maintained; not in the table model
+                    )
+                )
+            ).where(DATASET.c.dataset_type_ref == product_id)
+        ).scalar()
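Both drivers build the same aggregate for `find_most_recent_change()`. A self-contained sketch of the query shape under SQLAlchemy 2.x follows; table and column names are illustrative. In the drivers, `updated` is referenced via `column()` precisely because it is trigger-maintained and not part of the mapped model.

    from sqlalchemy import column, func, select, table

    # lightweight stand-in for the drivers' dataset table
    dataset = table("dataset", column("added"), column("updated"), column("dataset_type_ref"))
    stmt = select(
        func.max(func.greatest(dataset.c.added, dataset.c.updated))
    ).where(dataset.c.dataset_type_ref == 42)  # 42 is a hypothetical product id
    print(stmt)
    # roughly:
    # SELECT max(greatest(dataset.added, dataset.updated)) AS max_1
    # FROM dataset
    # WHERE dataset.dataset_type_ref = :dataset_type_ref_1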
+ """ + return self._connection.execute( + select( + func.max( + func.greatest( + DATASET.c.added, + column("updated"), + ) + ) + ).where(DATASET.c.dataset_type_ref == product_id) + ).scalar() diff --git a/datacube/index/postgis/_datasets.py b/datacube/index/postgis/_datasets.py index 0439ab763..02b2818d8 100755 --- a/datacube/index/postgis/_datasets.py +++ b/datacube/index/postgis/_datasets.py @@ -32,6 +32,7 @@ from datacube.utils.changes import get_doc_changes, Offset from odc.geo import CRS, Geometry from datacube.index import fields, extract_geom_from_query, strip_all_spatial_fields_from_query +from sqlalchemy.sql.functions import Function _LOG = logging.getLogger(__name__) @@ -645,19 +646,21 @@ def search_by_metadata(self, metadata: JsonDict, archived: bool | None = False): } } ) - def search(self, limit=None, archived: bool | None = False, **query): + def search(self, limit=None, archived: bool | None = False, order_by=None, **query): """ Perform a search, returning results as Dataset objects. :param Union[str,float,Range,list] query: :param int limit: Limit number of datasets + :param Iterable[str|Field|Function] order_by: :rtype: __generator[Dataset] """ source_filter = query.pop('source_filter', None) for product, datasets in self._do_search_by_product(query, source_filter=source_filter, limit=limit, - archived=archived): + archived=archived, + order_by=order_by): yield from self._make_many(datasets, product) def search_by_product(self, archived: bool | None = False, **query): @@ -675,7 +678,7 @@ def search_returning(self, custom_offsets: Mapping[str, Offset] | None = None, limit: int | None = None, archived: bool | None = False, - order_by: str | Field | None = None, + order_by: Iterable[str | Field | Function] | None = None, **query: QueryField): """ Perform a search, returning only the specified fields. 
@@ -687,10 +690,10 @@ def search_returning(self,
         :param tuple[str] field_names:
         :param Union[str,float,Range,list] query:
         :param int limit: Limit number of datasets
+        :param Iterable[str|Field|Function] order_by: sql text, dataset field, or sqlalchemy function
+               by which to order results
         :returns __generator[tuple]: sequence of results, each result is a namedtuple of your requested fields
         """
-        if order_by:
-            raise ValueError("order_by argument is not yet supported by the postgis index driver.")
         field_name_d: dict[str, None] = {}
         if field_names is None and custom_offsets is None:
             for f in self._index.products.get_field_names():
@@ -715,7 +718,8 @@
                 select_field_names=list(field_name_d.keys()),
                 additional_fields=custom_fields,
                 limit=limit,
-                archived=archived):
+                archived=archived,
+                order_by=order_by):

             for columns in results:
                 coldict = columns._asdict()
@@ -785,7 +789,7 @@
     def _do_search_by_product(self, query, return_fields=False,
                               additional_fields: Mapping[str, Field] | None = None,
                               select_field_names=None, with_source_ids=False, source_filter=None, limit=None,
-                              archived: bool | None = False):
+                              archived: bool | None = False, order_by=None):
         assert not with_source_ids
         assert source_filter is None
         product_queries = list(self._get_product_queries(query))
@@ -825,7 +829,8 @@
                 limit=limit,
                 with_source_ids=with_source_ids,
                 geom=geom,
-                archived=archived
+                archived=archived,
+                order_by=order_by
             ))

     def _do_count_by_product(self, query, archived: bool | None = False):
diff --git a/datacube/index/postgis/_products.py b/datacube/index/postgis/_products.py
index 52bd5972b..36a3e2bab 100644
--- a/datacube/index/postgis/_products.py
+++ b/datacube/index/postgis/_products.py
@@ -379,9 +379,17 @@ def temporal_extent(self, product: str | Product) -> tuple[datetime.datetime, da

         return result

     def spatial_extent(self, product: str | Product, crs: CRS = CRS("EPSG:4326")) -> Geometry | None:
         if isinstance(product, str):
             product = self._index.products.get_by_name_unsafe(product)
         ids = [ds.id for ds in self._index.datasets.search(product=product.name)]
         with self._db_connection() as connection:
             return connection.spatial_extent(ids, crs)
+
+    def most_recent_change(self, product: str | Product) -> datetime.datetime:
+        if isinstance(product, str):
+            product = self._index.products.get_by_name_unsafe(product)
+        assert isinstance(product, Product)
+        assert product.id is not None
+        with self._db_connection() as connection:
+            return connection.find_most_recent_change(product.id)
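The intended call pattern for the new `most_recent_change()` method, sketched against an initialised index; the product name is illustrative. Like `temporal_extent()` above, it accepts either a product name or a `Product` object.

    import datacube

    dc = datacube.Datacube()
    # database-local time of the last add/archive/update for this product
    last_change = dc.index.products.most_recent_change("ga_ls8c_ard_3")
    print(last_change)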
diff --git a/datacube/index/postgres/_datasets.py b/datacube/index/postgres/_datasets.py
index adbd8e4fb..028599b0d 100755
--- a/datacube/index/postgres/_datasets.py
+++ b/datacube/index/postgres/_datasets.py
@@ -28,6 +28,7 @@
 from datacube.index import fields
 from datacube.drivers.postgres._api import split_uri
 from datacube.migration import ODC2DeprecationWarning
+from sqlalchemy.sql.functions import Function


 _LOG = logging.getLogger(__name__)
@@ -614,7 +615,7 @@ def search_by_metadata(self, metadata, archived: bool | None = False):
                 }
             }
         )
-    def search(self, limit=None, source_filter=None, archived: bool | None = False, **query):
+    def search(self, limit=None, source_filter=None, archived: bool | None = False, order_by=None, **query):
         """
         Perform a search, returning results as Dataset objects.

@@ -626,7 +627,8 @@ def search(self, limit=None, source_filter=None, archived: bool | None = False,
         for product, datasets in self._do_search_by_product(query,
                                                             source_filter=source_filter,
                                                             limit=limit,
-                                                            archived=archived):
+                                                            archived=archived,
+                                                            order_by=order_by):
             yield from self._make_many(datasets, product)

     def search_by_product(self, archived: bool | None = False, **query):
@@ -644,7 +646,7 @@ def search_returning(self,
                          custom_offsets: Mapping[str, Offset] | None = None,
                          limit=None,
                          archived: bool | None = False,
-                         order_by: str | Field | None = None,
+                         order_by: Iterable[str | Field | Function] | None = None,
                          **query):
         """
         Perform a search, returning only the specified fields.
@@ -656,10 +658,10 @@
         :param tuple[str] field_names: defaults to all known search fields
         :param Union[str,float,Range,list] query:
         :param int limit: Limit number of datasets
+        :param Iterable[str|Field|Function] order_by: sql text, dataset field, or sqlalchemy function
+               by which to order results
         :returns __generator[tuple]: sequence of results, each result is a namedtuple of your requested fields
         """
-        if order_by:
-            raise ValueError("order_by argument is not currently supported by the postgres index driver.")
         field_name_d: dict[str, None] = {}
         if field_names is None and custom_offsets is None:
             for f in self._index.products.get_field_names():
@@ -686,7 +688,8 @@
                 select_field_names=list(field_name_d.keys()),
                 additional_fields=custom_fields,
                 limit=limit,
-                archived=archived):
+                archived=archived,
+                order_by=order_by):

             for columns in p_results:
                 coldict = columns._asdict()
@@ -769,7 +772,8 @@
     def _do_search_by_product(self, query, return_fields=False,
                               select_field_names=None,
                               with_source_ids=False, source_filter=None, limit=None,
-                              archived: bool | None = False):
+                              archived: bool | None = False,
+                              order_by=None):
         if "geopolygon" in query:
             raise NotImplementedError("Spatial search API not supported by this index.")
         if source_filter:
@@ -818,7 +822,8 @@
                 select_fields=select_fields,
                 limit=limit,
                 with_source_ids=with_source_ids,
-                archived=archived
+                archived=archived,
+                order_by=order_by
             ))

     def _do_count_by_product(self, query, archived: bool | None = False):
diff --git a/datacube/index/postgres/_products.py b/datacube/index/postgres/_products.py
index 9faf7bf5d..24ca7e4b8 100644
--- a/datacube/index/postgres/_products.py
+++ b/datacube/index/postgres/_products.py
@@ -381,3 +381,11 @@ def temporal_extent(self, product: str | Product) -> tuple[datetime.datetime, da
         assert product.id is not None
         with self._db_connection() as connection:
             return connection.temporal_extent_by_product(product.id, min_offset, max_offset)
+
+    def most_recent_change(self, product: str | Product) -> datetime.datetime:
+        if isinstance(product, str):
+            product = self._index.products.get_by_name_unsafe(product)
+        assert isinstance(product, Product)
+        assert product.id is not None
+        with self._db_connection() as connection:
+            return connection.find_most_recent_change(product.id)
diff --git a/integration_tests/conftest.py b/integration_tests/conftest.py
index 2bddd3c9b..be08450c1 100644
--- a/integration_tests/conftest.py
+++ b/integration_tests/conftest.py
@@ -347,7 +347,7 @@ def ls8_eo3_dataset4(index, extended_eo3_metadata_type, ls8_eo3_product, eo3_ls8


 @pytest.fixture
-def wo_eo3_dataset(index, wo_eo3_product, eo3_wo_dataset_doc, ls8_eo3_dataset):
+def wo_eo3_dataset(index, wo_eo3_product, eo3_wo_dataset_doc):
     return doc_to_ds(index,
                      wo_eo3_product.name,
                      *eo3_wo_dataset_doc)
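For context before the tests: a sketch of the user-facing shape of the `order_by` argument they exercise, assuming an initialised index (the product name is illustrative). Strings, `Field` objects and raw SQLAlchemy expressions can be mixed in the one list.

    import datacube

    dc = datacube.Datacube()
    fields = dc.index.products.get_by_name("ga_ls8c_ard_3").metadata_type.dataset_fields

    # ascending by dataset id, given as raw SQL text
    ordered = dc.index.datasets.search(product="ga_ls8c_ard_3", order_by=["id"])

    # descending, via the field's SQLAlchemy expression
    newest_ids_first = dc.index.datasets.search(
        product="ga_ls8c_ard_3",
        order_by=[fields["id"].alchemy_expression.desc()],
    )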
diff --git a/integration_tests/index/test_search_eo3.py b/integration_tests/index/test_search_eo3.py
index 9b371cf24..1c2ad392b 100644
--- a/integration_tests/index/test_search_eo3.py
+++ b/integration_tests/index/test_search_eo3.py
@@ -37,6 +37,15 @@ def test_search_by_metadata(index: Index, ls8_eo3_product, wo_eo3_product):
     assert len(lds) == 1


+def test_find_most_recent_change(index: Index, ls8_eo3_dataset, ls8_eo3_dataset2, ls8_eo3_dataset3):
+    product = ls8_eo3_dataset.product
+    dt = index.products.most_recent_change(product)
+    assert dt == ls8_eo3_dataset3.indexed_time
+    index.datasets.archive([ls8_eo3_dataset.id, ls8_eo3_dataset2.id])
+    dt = index.products.most_recent_change(product.name)
+    assert dt == index.datasets.get(ls8_eo3_dataset2.id).archived_time
+
+
 def test_search_dataset_equals_eo3(index: Index, ls8_eo3_dataset: Dataset):
     datasets = list(index.datasets.search(
         platform='landsat-8'
@@ -353,6 +362,31 @@ def test_search_archived_eo3(index, ls8_eo3_dataset, ls8_eo3_dataset2, wo_eo3_da
     assert len(datasets) == 2


+def test_search_order_by_eo3(index, ls8_eo3_dataset, ls8_eo3_dataset2, ls8_eo3_dataset3):
+    # provided as a string
+    datasets = list(index.datasets.search(order_by=['id']))
+    assert len(datasets) == 3
+    assert str(datasets[0].id) < str(datasets[1].id)
+    assert str(datasets[1].id) < str(datasets[2].id)
+
+    # provided as a Field
+    prod = ls8_eo3_dataset.product
+    fields = prod.metadata_type.dataset_fields
+    index.datasets.archive([ls8_eo3_dataset3.id])
+    datasets = list(index.datasets.search(order_by=[fields['id']]))
+    assert len(datasets) == 2
+    assert str(datasets[0].id) < str(datasets[1].id)
+
+    # ensure limit doesn't interfere with ordering
+    datasets = list(index.datasets.search(order_by=['id'], limit=1))
+    assert datasets[0] == ls8_eo3_dataset2
+
+    # provided as a sqlalchemy expression
+    datasets = list(index.datasets.search(order_by=[fields['id'].alchemy_expression.desc()]))
+    assert len(datasets) == 2
+    assert str(datasets[0].id) > str(datasets[1].id)
+
+
 def test_search_or_expressions_eo3(index: Index,
                                    ls8_eo3_dataset: Dataset, ls8_eo3_dataset2: Dataset,