Commit 7e02a7c

add order_by to dataset search; new method to find product's most recently changed dataset

Ariana Barzinpour committed Sep 11, 2024
1 parent 9faceaa

Showing 9 changed files with 152 additions and 33 deletions.
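Taken together, the diffs below thread a new order_by argument from the index-level search methods down to the driver query builders, and add a most_recent_change method to the product resources, backed by a new driver-level find_most_recent_change. A rough usage sketch based on the signatures in this commit — this is an editor's sketch, not code from the repository, and the product name is illustrative:

    from datacube import Datacube

    dc = Datacube()  # assumes a configured datacube index

    # order_by entries may be raw SQL text, dataset search Fields, or
    # sqlalchemy functions (see the _ob_exprs helpers in the driver diffs)
    for row in dc.index.datasets.search_returning(
            field_names=("id", "time"),
            product="ga_ls8c_ard_3",  # illustrative product name
            order_by=["id"],          # raw SQL text; a Field would also work
            limit=10):
        print(row)

    # database-local time of the last dataset added/updated in this product
    print(dc.index.products.most_recent_change("ga_ls8c_ard_3"))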
43 changes: 36 additions & 7 deletions datacube/drivers/postgis/_api.py
@@ -21,7 +21,7 @@
from sqlalchemy import delete, update
from sqlalchemy.dialects.postgresql import insert
from sqlalchemy.sql.expression import Select
-from sqlalchemy import select, text, and_, or_, func
+from sqlalchemy import select, text, and_, or_, func, column
from sqlalchemy.dialects.postgresql import INTERVAL
from sqlalchemy.exc import IntegrityError
from sqlalchemy.engine import Row
@@ -607,13 +607,14 @@ def geospatial_query(self, geom):
def search_datasets_query(self,
expressions, source_exprs=None,
select_fields=None, with_source_ids=False, limit=None, geom=None,
-archived: bool | None = False):
+archived: bool | None = False, order_by=None):
"""
:type expressions: Tuple[Expression]
:type source_exprs: Tuple[Expression]
:type select_fields: Iterable[PgField]
:type with_source_ids: bool
:type limit: int
+:type order_by: Iterable[str | PgField | sqlalchemy.Function]
:type geom: Geometry
:rtype: sqlalchemy.Expression
"""
@@ -629,6 +630,18 @@ def search_datasets_query(self,
else:
select_columns = _dataset_select_fields()

+def _ob_exprs(o):
+    if isinstance(o, str):
+        # does this need to be more robust?
+        return text(o)
+    elif isinstance(o, PgField):
+        return o.alchemy_expression
+    # if a func, leave as-is
+    return o
+
+if order_by is not None:
+    order_by = [_ob_exprs(o) for o in order_by]
+else:
+    order_by = []  # normalise None so the *order_by unpacking below cannot fail

if geom:
SpatialIndex, spatialquery = self.geospatial_query(geom)
else:
@@ -652,21 +665,22 @@
if spatialquery is not None:
where_expr = and_(where_expr, spatialquery)
query = query.join(SpatialIndex)
-query = query.where(where_expr).limit(limit)
+query = query.where(where_expr).order_by(*order_by).limit(limit)
return query

def search_datasets(self, expressions,
source_exprs=None, select_fields=None,
with_source_ids=False, limit=None, geom=None,
-archived: bool | None = False):
+archived: bool | None = False, order_by=None):
"""
:type with_source_ids: bool
:type select_fields: tuple[datacube.drivers.postgis._fields.PgField]
:type expressions: tuple[datacube.drivers.postgis._fields.PgExpression]
"""
-select_query = self.search_datasets_query(expressions, source_exprs,
-                                          select_fields, with_source_ids,
-                                          limit, geom=geom, archived=archived)
+assert source_exprs is None
+assert not with_source_ids
+select_query = self.search_datasets_query(expressions, select_fields=select_fields, limit=limit,
+                                          geom=geom, archived=archived, order_by=order_by)
_LOG.debug("search_datasets SQL: %s", str(select_query))
return self._connection.execute(select_query)

@@ -1504,3 +1518,18 @@ def temporal_extent_full(self) -> Select:
return select(
func.min(time_min.alchemy_expression), func.max(time_max.alchemy_expression)
)

+def find_most_recent_change(self, product_id: int):
+    """
+    Find the database-local time of the last dataset that changed for this product.
+    """
+    return self._connection.execute(
+        select(
+            func.max(
+                func.greatest(
+                    Dataset.added,
+                    column("updated"),
+                )
+            )
+        ).where(Dataset.product_ref == product_id)
+    ).scalar()
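For reference, the query built above amounts to roughly: SELECT max(greatest(added, updated)) FROM dataset WHERE product_ref = :product_id. greatest picks the later of each dataset's added and updated timestamps, and max takes the latest of those across the whole product.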
10 changes: 5 additions & 5 deletions datacube/drivers/postgis/sql.py
@@ -128,20 +128,20 @@ def pg_exists(conn, name):
Does a postgres object exist?
:rtype bool
"""
return conn.execute("SELECT to_regclass(%s)", name).scalar() is not None
return conn.execute(text(f"SELECT to_regclass('{name}')")).scalar() is not None


def pg_column_exists(conn, table, column):
"""
Does a postgres object exist?
:rtype bool
"""
return conn.execute("""
return conn.execute(text(f"""
SELECT 1 FROM pg_attribute
WHERE attrelid = to_regclass(%s)
AND attname = %s
WHERE attrelid = to_regclass('{table}')
AND attname = '{column}'
AND NOT attisdropped
""", table, column).scalar() is not None
""")).scalar() is not None


def escape_pg_identifier(engine, name):
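A note on this change: SQLAlchemy 2.x no longer accepts plain SQL strings or driver-style %s parameters in Connection.execute(), which is presumably why these helpers now wrap their SQL in text(). Since the table and column names are now interpolated directly into the statement, these helpers should only be called with trusted, internally generated identifiers.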
37 changes: 34 additions & 3 deletions datacube/drivers/postgres/_api.py
@@ -517,7 +517,7 @@ def raw_expr(expression):
@staticmethod
def search_datasets_query(expressions, source_exprs=None,
select_fields=None, with_source_ids=False, limit=None,
-archived: bool | None = False):
+archived: bool | None = False, order_by=None):
"""
:type expressions: tuple[Expression]
:type source_exprs: tuple[Expression]
@@ -537,6 +537,18 @@ def search_datasets_query(expressions, source_exprs=None,
else:
select_columns = _DATASET_SELECT_FIELDS

+def _ob_exprs(o):
+    if isinstance(o, str):
+        # does this need to be more robust?
+        return text(o)
+    elif isinstance(o, PgField):
+        return o.alchemy_expression.asc()
+    # if a func, leave as-is
+    return o
+
+if order_by is not None:
+    order_by = [_ob_exprs(o) for o in order_by]
+else:
+    order_by = []  # normalise None so the *order_by unpacking below cannot fail

if with_source_ids:
# Include the IDs of source datasets
select_columns += (
@@ -571,6 +583,8 @@ def search_datasets_query(expressions, source_exprs=None,
from_expression
).where(
where_expr
+).order_by(
+    *order_by
).limit(
limit
)
@@ -626,6 +640,8 @@ def search_datasets_query(expressions, source_exprs=None,
recursive_query.join(DATASET, DATASET.c.id == recursive_query.c.source_dataset_ref)
).where(
where_expr
+).order_by(
+    *order_by
).limit(
limit
)
@@ -634,15 +650,15 @@
def search_datasets(self, expressions,
source_exprs=None, select_fields=None,
with_source_ids=False, limit=None,
-archived: bool | None = False):
+archived: bool | None = False, order_by=None):
"""
:type with_source_ids: bool
:type select_fields: tuple[datacube.drivers.postgres._fields.PgField]
:type expressions: tuple[datacube.drivers.postgres._fields.PgExpression]
"""
select_query = self.search_datasets_query(expressions, source_exprs,
select_fields, with_source_ids, limit,
-archived=archived)
+archived=archived, order_by=order_by)
return self._connection.execute(select_query)

def bulk_simple_dataset_search(self, products=None, batch_size=0):
Expand Down Expand Up @@ -1271,3 +1287,18 @@ def grant_role(self, role: str, users: Iterable[str]) -> None:
raise ValueError('Unknown user %r' % user)

_core.grant_role(self._connection, pg_role, users)

+def find_most_recent_change(self, product_id: int):
+    """
+    Find the database-local time of the last dataset that changed for this product.
+    """
+    return self._connection.execute(
+        select(
+            func.max(
+                func.greatest(
+                    DATASET.c.added,
+                    column("updated"),
+                )
+            )
+        ).where(DATASET.c.dataset_type_ref == product_id)
+    ).scalar()
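Note the naming difference between the two drivers' schemas: the legacy postgres driver keys datasets by dataset_type_ref where the postgis driver uses product_ref. Both read the updated column via column("updated") rather than a mapped attribute, which suggests it is maintained outside the declared table model (e.g. by a trigger).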
21 changes: 13 additions & 8 deletions datacube/index/postgis/_datasets.py
@@ -32,6 +32,7 @@
from datacube.utils.changes import get_doc_changes, Offset
from odc.geo import CRS, Geometry
from datacube.index import fields, extract_geom_from_query, strip_all_spatial_fields_from_query
+from sqlalchemy.sql.functions import Function

_LOG = logging.getLogger(__name__)

@@ -645,19 +646,21 @@ def search_by_metadata(self, metadata: JsonDict, archived: bool | None = False):
}
}
)
-def search(self, limit=None, archived: bool | None = False, **query):
+def search(self, limit=None, archived: bool | None = False, order_by=None, **query):
"""
Perform a search, returning results as Dataset objects.
:param Union[str,float,Range,list] query:
:param int limit: Limit number of datasets
+:param Iterable[str|Field|Function] order_by:
:rtype: __generator[Dataset]
"""
source_filter = query.pop('source_filter', None)
for product, datasets in self._do_search_by_product(query,
source_filter=source_filter,
limit=limit,
-archived=archived):
+archived=archived,
+order_by=order_by):
yield from self._make_many(datasets, product)

def search_by_product(self, archived: bool | None = False, **query):
@@ -675,7 +678,7 @@ def search_returning(self,
custom_offsets: Mapping[str, Offset] | None = None,
limit: int | None = None,
archived: bool | None = False,
-order_by: str | Field | None = None,
+order_by: Iterable[str | Field | Function] | None = None,
**query: QueryField):
"""
Perform a search, returning only the specified fields.
@@ -687,10 +690,10 @@
:param tuple[str] field_names:
:param Union[str,float,Range,list] query:
:param int limit: Limit number of datasets
+:param Iterable[str|Field|Function] order_by: sql text, dataset field, or sqlalchemy function
+    by which to order results
:returns __generator[tuple]: sequence of results, each result is a namedtuple of your requested fields
"""
-if order_by:
-    raise ValueError("order_by argument is not yet supported by the postgis index driver.")
field_name_d: dict[str, None] = {}
if field_names is None and custom_offsets is None:
for f in self._index.products.get_field_names():
@@ -715,7 +718,8 @@
select_field_names=list(field_name_d.keys()),
additional_fields=custom_fields,
limit=limit,
-archived=archived):
+archived=archived,
+order_by=order_by):
for columns in results:
coldict = columns._asdict()

@@ -785,7 +789,7 @@ def _do_search_by_product(self, query, return_fields=False,
additional_fields: Mapping[str, Field] | None = None,
select_field_names=None,
with_source_ids=False, source_filter=None, limit=None,
-archived: bool | None = False):
+archived: bool | None = False, order_by=None):
assert not with_source_ids
assert source_filter is None
product_queries = list(self._get_product_queries(query))
@@ -825,7 +829,8 @@
limit=limit,
with_source_ids=with_source_ids,
geom=geom,
-archived=archived
+archived=archived,
+order_by=order_by
))

def _do_count_by_product(self, query, archived: bool | None = False):
10 changes: 9 additions & 1 deletion datacube/index/postgis/_products.py
@@ -379,9 +379,17 @@ def temporal_extent(self, product: str | Product) -> tuple[datetime.datetime, da

return result

-def spatial_extent(self, product: str | Product, crs: CRS = CRS("EPSG:4326")) -> Geometry | None:
+def spatial_extent(self, product: int | str | Product, crs: CRS = CRS("EPSG:4326")) -> Geometry | None:
if isinstance(product, str):
product = self._index.products.get_by_name_unsafe(product)
ids = [ds.id for ds in self._index.datasets.search(product=product.name)]
with self._db_connection() as connection:
return connection.spatial_extent(ids, crs)

+def most_recent_change(self, product: str | Product) -> datetime.datetime:
+    if isinstance(product, str):
+        product = self._index.products.get_by_name_unsafe(product)
+    assert isinstance(product, Product)
+    assert product.id is not None
+    with self._db_connection() as connection:
+        return connection.find_most_recent_change(product.id)
21 changes: 13 additions & 8 deletions datacube/index/postgres/_datasets.py
@@ -28,6 +28,7 @@
from datacube.index import fields
from datacube.drivers.postgres._api import split_uri
from datacube.migration import ODC2DeprecationWarning
+from sqlalchemy.sql.functions import Function

_LOG = logging.getLogger(__name__)

@@ -614,7 +615,7 @@ def search_by_metadata(self, metadata, archived: bool | None = False):
}
}
)
-def search(self, limit=None, source_filter=None, archived: bool | None = False, **query):
+def search(self, limit=None, source_filter=None, archived: bool | None = False, order_by=None, **query):
"""
Perform a search, returning results as Dataset objects.
@@ -626,7 +627,8 @@ def search(self, limit=None, source_filter=None, archived: bool | None = False,
for product, datasets in self._do_search_by_product(query,
source_filter=source_filter,
limit=limit,
-archived=archived):
+archived=archived,
+order_by=order_by):
yield from self._make_many(datasets, product)

def search_by_product(self, archived: bool | None = False, **query):
Expand All @@ -644,7 +646,7 @@ def search_returning(self,
custom_offsets: Mapping[str, Offset] | None = None,
limit=None,
archived: bool | None = False,
-order_by: str | Field | None = None,
+order_by: Iterable[str | Field | Function] | None = None,
**query):
"""
Perform a search, returning only the specified fields.
@@ -656,10 +658,10 @@
:param tuple[str] field_names: defaults to all known search fields
:param Union[str,float,Range,list] query:
:param int limit: Limit number of datasets
+:param Iterable[str|Field|Function] order_by: sql text, dataset field, or sqlalchemy function
+    by which to order results
:returns __generator[tuple]: sequence of results, each result is a namedtuple of your requested fields
"""
-if order_by:
-    raise ValueError("order_by argument is not currently supported by the postgres index driver.")
field_name_d: dict[str, None] = {}
if field_names is None and custom_offsets is None:
for f in self._index.products.get_field_names():
@@ -686,7 +688,8 @@
select_field_names=list(field_name_d.keys()),
additional_fields=custom_fields,
limit=limit,
-archived=archived):
+archived=archived,
+order_by=order_by):
for columns in p_results:
coldict = columns._asdict()

@@ -769,7 +772,8 @@ def _do_search_by_product(self, query, return_fields=False,
select_field_names=None,
with_source_ids=False, source_filter=None,
limit=None,
-archived: bool | None = False):
+archived: bool | None = False,
+order_by=None):
if "geopolygon" in query:
raise NotImplementedError("Spatial search API not supported by this index.")
if source_filter:
@@ -818,7 +822,8 @@
select_fields=select_fields,
limit=limit,
with_source_ids=with_source_ids,
-archived=archived
+archived=archived,
+order_by=order_by
))

def _do_count_by_product(self, query, archived: bool | None = False):
8 changes: 8 additions & 0 deletions datacube/index/postgres/_products.py
@@ -381,3 +381,11 @@ def temporal_extent(self, product: str | Product) -> tuple[datetime.datetime, da
assert product.id is not None
with self._db_connection() as connection:
return connection.temporal_extent_by_product(product.id, min_offset, max_offset)

+def most_recent_change(self, product: str | Product) -> datetime.datetime:
+    if isinstance(product, str):
+        product = self._index.products.get_by_name_unsafe(product)
+    assert isinstance(product, Product)
+    assert product.id is not None
+    with self._db_connection() as connection:
+        return connection.find_most_recent_change(product.id)