This repository has been archived by the owner on Nov 23, 2023. It is now read-only.

feat: update dataset catalog with version #741

Merged
18 commits, merged on May 31, 2021
Changes from all commits
2 changes: 1 addition & 1 deletion .github/workflows/codeql-analysis.yml
@@ -25,7 +25,7 @@ jobs:
python -m pip install --upgrade pip
python -m pip install poetry
python -m poetry install \
- --extras='cdk check_files_checksums check_stac_metadata content_iterator datasets dataset_versions import_dataset import_status populate_catalog validation_summary'
+ --extras='cdk check_files_checksums check_stac_metadata content_iterator datasets dataset_versions import_dataset import_status populate_catalog update_dataset_catalog validation_summary'
echo "CODEQL_PYTHON=$(python -m poetry run which python)" >> $GITHUB_ENV

# Initializes the CodeQL tools for scanning.
2 changes: 1 addition & 1 deletion .github/workflows/deploy.yml
@@ -59,7 +59,7 @@ jobs:
python -m pip install --upgrade pip
python -m pip install poetry
python -m poetry install \
- --extras='cdk check_files_checksums check_stac_metadata content_iterator datasets dataset_versions import_dataset import_status populate_catalog validation_summary'
+ --extras='cdk check_files_checksums check_stac_metadata content_iterator datasets dataset_versions import_dataset import_status populate_catalog update_dataset_catalog validation_summary'

- name: Print CDK version
run: poetry run cdk --version
4 changes: 2 additions & 2 deletions .github/workflows/prod-upgrade-deploy-test.yml
@@ -80,7 +80,7 @@ jobs:
python -m pip install --upgrade pip
python -m pip install poetry
python -m poetry install \
- --extras='cdk check_files_checksums check_stac_metadata content_iterator datasets dataset_versions import_dataset import_status populate_catalog validation_summary'
+ --extras='cdk check_files_checksums check_stac_metadata content_iterator datasets dataset_versions import_dataset import_status populate_catalog update_dataset_catalog validation_summary'

- name: Print CDK version
run: poetry run cdk --version
Expand Down Expand Up @@ -121,7 +121,7 @@ jobs:
python -m pip install --upgrade pip
python -m pip install poetry
python -m poetry install \
- --extras='cdk check_files_checksums check_stac_metadata content_iterator datasets dataset_versions import_dataset import_status populate_catalog validation_summary' \
+ --extras='cdk check_files_checksums check_stac_metadata content_iterator datasets dataset_versions import_dataset import_status populate_catalog update_dataset_catalog validation_summary' \
--remove-untracked

# deployment
4 changes: 2 additions & 2 deletions .github/workflows/test.yml
@@ -37,7 +37,7 @@ jobs:
python -m pip install --upgrade pip
python -m pip install poetry
python -m poetry install \
- --extras='cdk check_files_checksums check_stac_metadata content_iterator datasets dataset_versions import_dataset import_status populate_catalog validation_summary'
+ --extras='cdk check_files_checksums check_stac_metadata content_iterator datasets dataset_versions import_dataset import_status populate_catalog update_dataset_catalog validation_summary'

- name: Check all commit messages in Pull Request
run: >
@@ -101,7 +101,7 @@ jobs:
python -m pip install --upgrade pip
python -m pip install poetry
python -m poetry install \
- --extras='cdk check_files_checksums check_stac_metadata content_iterator datasets dataset_versions import_dataset import_status populate_catalog validation_summary'
+ --extras='cdk check_files_checksums check_stac_metadata content_iterator datasets dataset_versions import_dataset import_status populate_catalog update_dataset_catalog validation_summary'

- name: Print CDK version
run: poetry run cdk --version
2 changes: 2 additions & 0 deletions backend/api_keys.py
@@ -1,3 +1,5 @@
MESSAGE_KEY = "message"
STATUS_KEY = "status"
SUCCESS_KEY = "success"

+EVENT_KEY = "event"
3 changes: 2 additions & 1 deletion backend/check_stac_metadata/task.py
@@ -4,6 +4,7 @@
from botocore.response import StreamingBody # type: ignore[import]
from jsonschema import ValidationError, validate # type: ignore[import]

+from ..api_keys import EVENT_KEY
from ..error_response_keys import ERROR_KEY, ERROR_MESSAGE_KEY
from ..log import set_up_logging
from ..models import DATASET_ID_PREFIX, DB_KEY_SEPARATOR, VERSION_ID_PREFIX
@@ -26,7 +27,7 @@ def s3_url_reader(url: str) -> StreamingBody:

def lambda_handler(event: JsonObject, _context: bytes) -> JsonObject:

-    LOGGER.debug(dumps({"event": event}))
+    LOGGER.debug(dumps({EVENT_KEY: event}))

    # validate input
    try:
12 changes: 6 additions & 6 deletions backend/check_stac_metadata/utils.py
@@ -15,12 +15,12 @@
from ..s3 import S3_URL_PREFIX
from ..stac_format import (
    STAC_ASSETS_KEY,
-    STAC_CATALOG_TYPE,
-    STAC_COLLECTION_TYPE,
    STAC_FILE_CHECKSUM_KEY,
    STAC_HREF_KEY,
-    STAC_ITEM_TYPE,
    STAC_LINKS_KEY,
+    STAC_TYPE_CATALOG,
+    STAC_TYPE_COLLECTION,
+    STAC_TYPE_ITEM,
    STAC_TYPE_KEY,
)
from ..types import JsonObject
@@ -41,9 +41,9 @@
        Type[STACItemSchemaValidator],
    ],
] = {
-    STAC_COLLECTION_TYPE: STACCollectionSchemaValidator,
-    STAC_CATALOG_TYPE: STACCatalogSchemaValidator,
-    STAC_ITEM_TYPE: STACItemSchemaValidator,
+    STAC_TYPE_COLLECTION: STACCollectionSchemaValidator,
+    STAC_TYPE_CATALOG: STACCatalogSchemaValidator,
+    STAC_TYPE_ITEM: STACItemSchemaValidator,
}

PROCESSING_ASSET_ASSET_KEY = "asset"
22 changes: 19 additions & 3 deletions backend/datasets/create.py
@@ -11,6 +11,14 @@
from ..parameter_store import ParameterName, get_param
from ..pystac_io_methods import write_method
from ..resources import ResourceName
+from ..s3 import S3_URL_PREFIX
+from ..sqs_message_attributes import (
+    DATA_TYPE_KEY,
+    DATA_TYPE_STRING,
+    MESSAGE_ATTRIBUTE_TYPE_KEY,
+    MESSAGE_ATTRIBUTE_TYPE_ROOT,
+    STRING_VALUE_KEY,
+)
from ..stac_format import STAC_DESCRIPTION_KEY, STAC_ID_KEY, STAC_TITLE_KEY
from ..types import JsonObject

@@ -60,14 +68,22 @@ def create_dataset(body: JsonObject) -> JsonObject:
        catalog_type=CatalogType.SELF_CONTAINED,
    )
    dataset_catalog.normalize_hrefs(
-        f"s3://{ResourceName.STORAGE_BUCKET_NAME.value}/{dataset.dataset_prefix}"
+        f"{S3_URL_PREFIX}{ResourceName.STORAGE_BUCKET_NAME.value}/{dataset.dataset_prefix}"
    )
    dataset_catalog.save()

    # add reference to root catalog
    SQS_RESOURCE.get_queue_by_name(
-        QueueName=get_param(ParameterName.ROOT_CATALOG_MESSAGE_QUEUE_NAME)
-    ).send_message(MessageBody=dataset.dataset_prefix)
+        QueueName=get_param(ParameterName.UPDATE_CATALOG_MESSAGE_QUEUE_NAME)
+    ).send_message(
+        MessageBody=dataset.dataset_prefix,
+        MessageAttributes={
+            MESSAGE_ATTRIBUTE_TYPE_KEY: {
+                STRING_VALUE_KEY: MESSAGE_ATTRIBUTE_TYPE_ROOT,
+                DATA_TYPE_KEY: DATA_TYPE_STRING,
+            }
+        },
+    )

    # return response
    resp_body = dataset.as_dict()
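
For orientation (an editor's sketch, not part of the diff): the message published above presumably reaches the populate_catalog Lambda below through an SQS event source mapping, which delivers message attributes with a lower-cased first letter ("stringValue" rather than "StringValue"). Assuming the constants introduced in this PR, a delivered record looks roughly like this:

# Hypothetical shape of a single record in the Lambda's SQS event; the body carries
# the dataset prefix and the "type" attribute selects the handler.
record = {
    "body": "example-dataset-prefix",  # example value, not from the PR
    "messageAttributes": {
        "type": {
            "stringValue": "root",  # MESSAGE_ATTRIBUTE_TYPE_ROOT
            "dataType": "String",
        }
    },
}
message_type = record["messageAttributes"]["type"]["stringValue"]
assert message_type in ("root", "dataset")
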
3 changes: 1 addition & 2 deletions backend/import_dataset/task.py
@@ -9,6 +9,7 @@
from jsonschema import ValidationError, validate # type: ignore[import]
from smart_open import open as smart_open # type: ignore[import]

+from ..api_keys import EVENT_KEY
from ..error_response_keys import ERROR_KEY, ERROR_MESSAGE_KEY
from ..import_dataset_keys import NEW_KEY_KEY, ORIGINAL_KEY_KEY, TARGET_BUCKET_NAME_KEY
from ..import_file_batch_job_id_keys import ASSET_JOB_ID_KEY, METADATA_JOB_ID_KEY
@@ -70,8 +71,6 @@

S3_BATCH_COPY_ROLE_ARN = get_param(ParameterName.PROCESSING_IMPORT_DATASET_ROLE_ARN)

-EVENT_KEY = "event"

JOB_MANIFEST_FORMAT: JobManifestFormatType = "S3BatchOperations_CSV_20180820"
JOB_MANIFEST_FIELD_NAMES: List[JobManifestFieldNameType] = ["Bucket", "Key"]
JOB_REPORT_FORMAT: JobReportFormatType = "Report_CSV_20180820"
2 changes: 1 addition & 1 deletion backend/parameter_store.py
@@ -26,7 +26,7 @@ def _generate_next_value_( # type: ignore[misc,override] # pylint:disable=no-se
PROCESSING_IMPORT_ASSET_FILE_FUNCTION_TASK_ARN = auto()
PROCESSING_IMPORT_DATASET_ROLE_ARN = auto()
PROCESSING_IMPORT_METADATA_FILE_FUNCTION_TASK_ARN = auto()
-ROOT_CATALOG_MESSAGE_QUEUE_NAME = auto()
+UPDATE_CATALOG_MESSAGE_QUEUE_NAME = auto()
STORAGE_DATASETS_TABLE_NAME = auto()
STORAGE_VALIDATION_RESULTS_TABLE_NAME = auto()

101 changes: 92 additions & 9 deletions backend/populate_catalog/task.py
@@ -1,10 +1,23 @@
+from json import dumps
+from os.path import join
+from urllib.parse import urlparse

import boto3
-from pystac import STAC_IO, Catalog, CatalogType  # type: ignore[import]
+from pystac import STAC_IO, Catalog, CatalogType, Collection, Item  # type: ignore[import]
+from pystac.layout import HrefLayoutStrategy  # type: ignore[import]

+from ..api_keys import EVENT_KEY
from ..api_responses import BODY_KEY
+from ..log import set_up_logging
+from ..pystac_io_methods import read_method, write_method
from ..resources import ResourceName
from ..s3 import S3_URL_PREFIX
+from ..sqs_message_attributes import (
+    MESSAGE_ATTRIBUTE_TYPE_DATASET,
+    MESSAGE_ATTRIBUTE_TYPE_KEY,
+    MESSAGE_ATTRIBUTE_TYPE_ROOT,
+    STRING_VALUE_KEY_LOWER,
+)
from ..types import JsonObject

STAC_IO.write_text_method = write_method
@@ -18,11 +31,78 @@
CATALOG_KEY = "catalog.json"
CONTENTS_KEY = "Contents"
RECORDS_KEY = "Records"
+MESSAGE_ATTRIBUTES_KEY = "messageAttributes"

LOGGER = set_up_logging(__name__)


def lambda_handler(event: JsonObject, _context: bytes) -> JsonObject:
    """Main Lambda entry point."""

    LOGGER.debug(dumps({EVENT_KEY: event}))

    for message in event[RECORDS_KEY]:
        if (
            message[MESSAGE_ATTRIBUTES_KEY][MESSAGE_ATTRIBUTE_TYPE_KEY][STRING_VALUE_KEY_LOWER]
            == MESSAGE_ATTRIBUTE_TYPE_ROOT
        ):
            handle_root(message[BODY_KEY])
        elif (
            message[MESSAGE_ATTRIBUTES_KEY][MESSAGE_ATTRIBUTE_TYPE_KEY][STRING_VALUE_KEY_LOWER]
            == MESSAGE_ATTRIBUTE_TYPE_DATASET
        ):
            handle_dataset(message[BODY_KEY])
        else:
            raise UnhandledSQSMessageException("Unhandled SQS message type")

    return {}


class UnhandledSQSMessageException(Exception):
    pass


class GeostoreSTACLayoutStrategy(HrefLayoutStrategy):
    def get_catalog_href(self, cat: Catalog, parent_dir: str, is_root: bool) -> str:
        original_path = urlparse(cat.get_self_href()).path.rsplit("/", maxsplit=2)
        if is_root:
            cat_root = parent_dir
        else:
            cat_root = join(parent_dir, original_path[-2])

        return join(cat_root, original_path[-1])

    def get_collection_href(self, col: Collection, parent_dir: str, is_root: bool) -> str:
        original_path = urlparse(col.get_self_href()).path.rsplit("/", maxsplit=2)
        assert not is_root
        return join(parent_dir, *original_path[-2:])

    def get_item_href(self, item: Item, parent_dir: str) -> str:
        original_path = item.get_self_href().split("/")
        return join(parent_dir, original_path[-1])


def handle_dataset(version_metadata_key: str) -> None:
    """Handle writing a new dataset version to the dataset catalog"""
    storage_bucket_path = f"{S3_URL_PREFIX}{ResourceName.STORAGE_BUCKET_NAME.value}"
    dataset_prefix = version_metadata_key.split("/", maxsplit=1)[0]

    dataset_catalog = Catalog.from_file(f"{storage_bucket_path}/{dataset_prefix}/{CATALOG_KEY}")

    dataset_version_metadata = STAC_IO.read_stac_object(
        f"{storage_bucket_path}/{version_metadata_key}"
    )

    dataset_catalog.add_child(dataset_version_metadata, strategy=GeostoreSTACLayoutStrategy())

    dataset_catalog.normalize_hrefs(
        f"{storage_bucket_path}/{dataset_prefix}", strategy=GeostoreSTACLayoutStrategy()
    )
    dataset_catalog.save(catalog_type=CatalogType.SELF_CONTAINED)


def handle_root(dataset_prefix: str) -> None:
    """Handle writing a new dataset to the root catalog"""
    results = S3_CLIENT.list_objects(
        Bucket=ResourceName.STORAGE_BUCKET_NAME.value, Prefix=CATALOG_KEY
    )
Expand All @@ -40,14 +120,17 @@ def lambda_handler(event: JsonObject, _context: bytes) -> JsonObject:
description=ROOT_CATALOG_DESCRIPTION,
catalog_type=CatalogType.SELF_CONTAINED,
)
root_catalog.set_self_href(
f"{S3_URL_PREFIX}{ResourceName.STORAGE_BUCKET_NAME.value}/{CATALOG_KEY}"
)

for record in event[RECORDS_KEY]:
dataset_path = f"{S3_URL_PREFIX}{ResourceName.STORAGE_BUCKET_NAME.value}/{record[BODY_KEY]}"
dataset_catalog = Catalog.from_file(f"{dataset_path}/{CATALOG_KEY}")

root_catalog.add_child(dataset_catalog)
root_catalog.normalize_hrefs(f"{S3_URL_PREFIX}{ResourceName.STORAGE_BUCKET_NAME.value}")
dataset_path = f"{S3_URL_PREFIX}{ResourceName.STORAGE_BUCKET_NAME.value}/{dataset_prefix}"
dataset_catalog = Catalog.from_file(f"{dataset_path}/{CATALOG_KEY}")

root_catalog.save(catalog_type=CatalogType.SELF_CONTAINED)
root_catalog.add_child(dataset_catalog, strategy=GeostoreSTACLayoutStrategy())
root_catalog.normalize_hrefs(
f"{S3_URL_PREFIX}{ResourceName.STORAGE_BUCKET_NAME.value}",
strategy=GeostoreSTACLayoutStrategy(),
)

return {}
root_catalog.save(catalog_type=CatalogType.SELF_CONTAINED)
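
To make the new layout strategy concrete (an illustrative sketch using made-up S3 paths, not code from this PR), this is the path arithmetic its methods perform: a non-root catalog keeps its last directory plus file name under the new parent, while an item keeps only its file name.

from os.path import join
from urllib.parse import urlparse

# get_catalog_href for a non-root catalog: keep the last directory and file name.
catalog_href = "s3://example-storage-bucket/dataset-prefix/version-id/catalog.json"
parent_dir = "s3://example-storage-bucket/dataset-prefix"
parts = urlparse(catalog_href).path.rsplit("/", maxsplit=2)
print(join(parent_dir, parts[-2], parts[-1]))
# s3://example-storage-bucket/dataset-prefix/version-id/catalog.json

# get_item_href: keep only the file name under the parent directory.
item_href = "s3://example-storage-bucket/dataset-prefix/version-id/item.json"
print(join(parent_dir, item_href.split("/")[-1]))
# s3://example-storage-bucket/dataset-prefix/item.json
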
16 changes: 16 additions & 0 deletions backend/sqs_message_attributes.py
@@ -0,0 +1,16 @@
def decapitalize(key: str) -> str:
    """
    Lower-case the first character of SQS message attribute keys received by
    Lambda, to resolve the inconsistency outlined here:
    https://github.com/boto/boto3/issues/2582
    """
    return f"{key[:1].lower()}{key[1:]}"


MESSAGE_ATTRIBUTE_TYPE_KEY = "type"
MESSAGE_ATTRIBUTE_TYPE_ROOT = "root"
MESSAGE_ATTRIBUTE_TYPE_DATASET = "dataset"
DATA_TYPE_KEY = "DataType"
DATA_TYPE_STRING = "String"
STRING_VALUE_KEY = "StringValue"
STRING_VALUE_KEY_LOWER = decapitalize(STRING_VALUE_KEY)
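
A quick sanity check of the helper above (an editor's example, assuming the backend package is importable as shown):

from backend.sqs_message_attributes import STRING_VALUE_KEY_LOWER, decapitalize

assert decapitalize("StringValue") == "stringValue"
assert decapitalize("DataType") == "dataType"
# The derived constant matches the key casing used in the Lambda's SQS event payload.
assert STRING_VALUE_KEY_LOWER == "stringValue"
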
20 changes: 12 additions & 8 deletions backend/stac_format.py
@@ -10,16 +10,20 @@
STAC_HREF_KEY = "href"
STAC_ID_KEY = "id"
STAC_LICENSE_KEY = "license"
+STAC_LINKS_KEY = "links"
+STAC_MEDIA_TYPE_GEOJSON = "application/geo+json"
+STAC_MEDIA_TYPE_JSON = "application/json"
STAC_PROPERTIES_DATETIME_KEY = "datetime"
STAC_PROPERTIES_KEY = "properties"
+STAC_REL_CHILD = "child"
+STAC_REL_ITEM = "item"
+STAC_REL_KEY = "rel"
+STAC_REL_PARENT = "parent"
+STAC_REL_ROOT = "root"
+STAC_REL_SELF = "self"
STAC_TITLE_KEY = "title"
+STAC_TYPE_CATALOG = "Catalog"
+STAC_TYPE_COLLECTION = "Collection"
+STAC_TYPE_ITEM = "Feature"
STAC_TYPE_KEY = "type"
STAC_VERSION_KEY = "stac_version"
-STAC_LINKS_KEY = "links"
-STAC_REL_KEY = "rel"
-STAC_REL_ROOT = "root"
-STAC_REL_CHILD = "child"
-
-STAC_COLLECTION_TYPE = "Collection"
-STAC_ITEM_TYPE = "Feature"
-STAC_CATALOG_TYPE = "Catalog"