Merge pull request #4 from leap-stc/carbonplan_cataloging
carbonplan cat util tests
norlandrhagen authored Jan 10, 2024
2 parents c8206aa + c3e337d commit b0726d6
Showing 3 changed files with 40 additions and 8 deletions.
1 change: 0 additions & 1 deletion .pre-commit-config.yaml
@@ -9,7 +9,6 @@ repos:
       - id: check-docstring-first
       - id: check-json
       - id: check-yaml
-      - id: double-quote-string-fixer
       - id: debug-statements
       - id: mixed-line-ending

2 changes: 1 addition & 1 deletion data_management_utils/__init__.py
@@ -1 +1 @@
-from .utils import LogToBigQuery, BQInterface
+from .utils import LogToBigQuery, BQInterface, RegisterDatasetToCatalog
45 changes: 39 additions & 6 deletions data_management_utils/utils.py
@@ -9,7 +9,6 @@
 import datetime
 from dataclasses import dataclass
 
-
 @dataclass
 class IIDEntry:
     """Single row/entry for an iid
@@ -60,16 +59,18 @@ class BQInterface:
     table_id: str
     client: Optional[bigquery.client.Client] = None
     result_limit: Optional[int] = 10
+    schema: Optional[list] = None
 
     def __post_init__(self):
         # TODO how do I handle the schema? This class could be used for any table, but for
         # TODO this specific case I want to prescribe the schema
         # for now just hardcode it
-        self.schema = [
-            bigquery.SchemaField("instance_id", "STRING", mode="REQUIRED"),
-            bigquery.SchemaField("store", "STRING", mode="REQUIRED"),
-            bigquery.SchemaField("timestamp", "TIMESTAMP", mode="REQUIRED"),
-        ]
+        if not self.schema:
+            self.schema = [
+                bigquery.SchemaField("instance_id", "STRING", mode="REQUIRED"),
+                bigquery.SchemaField("store", "STRING", mode="REQUIRED"),
+                bigquery.SchemaField("timestamp", "TIMESTAMP", mode="REQUIRED"),
+            ]
         if self.client is None:
             self.client = bigquery.Client()
 
@@ -86,6 +87,22 @@ def create_table(self) -> bigquery.table.Table:
         table = self.client.create_table(table)  # Make an API request.
         return table
 
+    def catalog_insert(self, dataset_id: str, dataset_url: str):
+        timestamp = datetime.datetime.now().isoformat()
+        table = self.client.get_table(self.table_id)
+
+        rows_to_insert = [
+            {
+                "dataset_id": dataset_id,
+                "dataset_url": dataset_url,
+                "timestamp": timestamp,
+            }
+        ]
+
+        errors = self.client.insert_rows_json(table, rows_to_insert)
+        if errors:
+            raise RuntimeError(f"Error inserting row: {errors}")
+
     def insert(self, IID_entry):
         """Insert a row into the table for a given IID_entry object"""
         # Generate a timestamp to add to a bigquery row
@@ -165,3 +182,19 @@ def _log_to_bigquery(self, store: zarr.storage.FSStore) -> zarr.storage.FSStore:
 
     def expand(self, pcoll: beam.PCollection) -> beam.PCollection:
         return pcoll | beam.Map(self._log_to_bigquery)
+
+
+@dataclass
+class RegisterDatasetToCatalog(beam.PTransform):
+    table_id: str
+    dataset_id: str
+
+    def _register_dataset_to_catalog(
+        self, store: zarr.storage.FSStore
+    ) -> zarr.storage.FSStore:
+        bq_interface = BQInterface(table_id=self.table_id)
+        bq_interface.catalog_insert(dataset_id=self.dataset_id, dataset_url=store.path)
+        return store
+
+    def expand(self, pcoll: beam.PCollection) -> beam.PCollection:
+        return pcoll | beam.Map(self._register_dataset_to_catalog)

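For context, the new schema override and catalog_insert can be exercised directly, outside of Beam. A minimal sketch follows; the table ID is a hypothetical placeholder, and the catalog schema is an assumption inferred from the row fields catalog_insert writes (dataset_id, dataset_url, timestamp), not something this commit prescribes:

    from google.cloud import bigquery
    from data_management_utils import BQInterface

    # Hypothetical catalog table ID ("project.dataset.table").
    catalog_table_id = "my-project.my_dataset.dataset_catalog"

    # Assumed schema matching the fields catalog_insert writes.
    catalog_schema = [
        bigquery.SchemaField("dataset_id", "STRING", mode="REQUIRED"),
        bigquery.SchemaField("dataset_url", "STRING", mode="REQUIRED"),
        bigquery.SchemaField("timestamp", "TIMESTAMP", mode="REQUIRED"),
    ]

    bq = BQInterface(table_id=catalog_table_id, schema=catalog_schema)
    bq.create_table()  # one-time table creation (API request)
    bq.catalog_insert(
        dataset_id="my-dataset",
        dataset_url="gs://my-bucket/my-dataset.zarr",
    )

Without the schema argument, __post_init__ falls back to the hardcoded instance_id/store/timestamp schema, which matches the IID-logging rows but not the catalog rows catalog_insert writes.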
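Within a pipeline, RegisterDatasetToCatalog maps over a PCollection of zarr stores and records each store's .path as the dataset_url. A usage sketch under assumed names (the bucket URL and catalog table ID are hypothetical; the commit itself only defines the transform):

    import apache_beam as beam
    import zarr
    from data_management_utils import RegisterDatasetToCatalog

    # Hypothetical store; in practice this would come from an earlier write stage.
    target_store = zarr.storage.FSStore("gs://my-bucket/my-dataset.zarr")

    with beam.Pipeline() as p:
        (
            p
            | beam.Create([target_store])
            | RegisterDatasetToCatalog(
                table_id="my-project.my_dataset.dataset_catalog",  # hypothetical catalog table
                dataset_id="my-dataset",
            )
        )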