diff --git a/feedstock/recipe.py b/feedstock/recipe.py index c79f77b5..abc90d04 100644 --- a/feedstock/recipe.py +++ b/feedstock/recipe.py @@ -3,9 +3,9 @@ import apache_beam as beam from dataclasses import dataclass -from datetime import datetime -from typing import List, Dict +from typing import List from pangeo_forge_esgf import get_urls_from_esgf, setup_logging +from pangeo_forge_big_query.utils import BQInterface, LogToBigQuery from pangeo_forge_esgf.parsing import parse_instance_ids from pangeo_forge_recipes.patterns import pattern_from_file_sequence from pangeo_forge_recipes.transforms import ( @@ -19,146 +19,6 @@ # setup_logging('DEBUG') setup_logging('INFO') - -# from bigquery_interface import BQInterface, IIDEntry -## copy and paste of bigquery_interface.py until I can import -# it properly (see https://github.com/pangeo-forge/pangeo-forge-runner/issues/92) - -from google.cloud import bigquery -from typing import Optional -from google.api_core.exceptions import NotFound -import datetime - -@dataclass -class IIDEntry: - """Single row/entry for an iid - :param iid: CMIP6 instance id - :param store: URL to zarr store - """ - iid: str - store: str #TODO: should this allow other objects? - - # Check if the iid conforms to a schema - def __post_init__(self): - - schema = 'mip_era.activity_id.institution_id.source_id.experiment_id.member_id.table_id.variable_id.grid_label.version' - facets = self.iid.split('.') - if len(facets) != len(schema.split('.')): - raise ValueError(f'IID does not conform to CMIP6 {schema =}. Got {self.iid =}') - #TODO: Check each facet with the controlled CMIP vocabulary - - #TODO Check store validity? - - -@dataclass -class IIDResult: - """Class to handle the results pertaining to a single IID. - """ - results: bigquery.table.RowIterator - iid: str - - def __post_init__(self): - if self.results.total_rows > 0: - self.exists=True - self.rows = [r for r in self.results] - self.latest_row = self.rows[0] - else: - self.exists=False - - - -@dataclass -class BQInterface: - """Class to read/write information from BigQuery table - :param table_id: BigQuery table ID - :param client: BigQuery client object - :param result_limit: Maximum number of results to return from query - """ - table_id: str - client: Optional[bigquery.client.Client] = None - result_limit: Optional[int] = 10 - - - def __post_init__(self): - # TODO how do I handle the schema? This class could be used for any table, but for - # TODO this specific case I want to prescribe the schema - # for now just hardcode it - self.schema = [ - bigquery.SchemaField("instance_id", "STRING", mode="REQUIRED"), - bigquery.SchemaField("store", "STRING", mode="REQUIRED"), - bigquery.SchemaField("timestamp", "TIMESTAMP", mode="REQUIRED"), - ] - if self.client is None: - self.client = bigquery.Client() - - # check if table exists, otherwise create it - try: - self.table = self.client.get_table(self.table_id) - except NotFound: - self.table = self.create_table() - - def create_table(self) -> bigquery.table.Table: - """Create the table if it does not exist""" - print(f'Creating {self.table_id =}') - table = bigquery.Table(self.table_id, schema=self.schema) - table = self.client.create_table(table) # Make an API request. - return table - - def insert(self, IID_entry): - """Insert a row into the table for a given IID_entry object""" - # Generate a timestamp to add to a bigquery row - timestamp = datetime.datetime.now().isoformat() - json_row = {'instance_id': IID_entry.iid, 'store': IID_entry.store, 'timestamp': timestamp} - errors = self.client.insert_rows_json(self.table_id, [json_row]) - if errors: - raise RuntimeError(f'Error inserting row: {errors}') - - def _get_query_job(self, query:str) -> bigquery.job.query.QueryJob: - """Get result object corresponding to a given iid""" - # keep this in case I ever need the row index again... - # query = f""" - # WITH table_with_index AS (SELECT *, ROW_NUMBER() OVER ()-1 as row_index FROM `{self.table_id}`) - # SELECT * - # FROM `table_with_index` - # WHERE instance_id='{iid}' - # """ - return self.client.query(query) - - def _get_iid_results(self, iid: str) -> IIDResult: - """Get the full result object for a given iid""" - query = f""" - SELECT * - FROM `{self.table_id}` - WHERE instance_id='{iid}' - ORDER BY timestamp DESC - LIMIT {self.result_limit} - """ - results = self._get_query_job(query).result() # TODO: `.result()` is waiting for the query. Should I do this here? - return IIDResult(results, iid) - - def iid_exists(self, iid:str) -> bool: - """Check if iid exists in the table""" - return self._get_iid_results(iid).exists - - def iid_list_exists(self, iids: List[str]) -> List[str]: - """More efficient way to check if a list of iids exists in the table - Passes the entire list to a single SQL query. - Returns a list of iids that exist in the table""" - # source: https://stackoverflow.com/questions/26441928/how-do-i-check-if-multiple-values-exists-in-database - query = f""" - SELECT instance_id, store - FROM {self.table_id} - WHERE instance_id IN ({",".join([f"'{iid}'" for iid in iids])}) - """ - results = self._get_query_job(query).result() - # this is a full row iterator, for now just return the iids - return list(set([r['instance_id'] for r in results])) - -# wrapper functions (not sure if this works instead of the repeated copy and paste in the transform below) -def log_to_bq(iid: str, store: zarr.storage.FSStore, table_id: str): - bq_interface = BQInterface(table_id=table_id) - iid_entry = IIDEntry(iid=iid, store=store.path) - bq_interface.insert(iid_entry) # Custom Beam Transforms @@ -259,22 +119,7 @@ def expand(self, pcoll: beam.PCollection) -> beam.PCollection: | "Testing - Time Dimension" >> beam.Map(self._test_time) ) -@dataclass -class LogToBigQuery(beam.PTransform): - """ - Logging stage for data written to zarr store - """ - iid: str - table_id: str - def _log_to_bigquery(self, store: zarr.storage.FSStore) -> zarr.storage.FSStore: - log_to_bq(self.iid, store, self.table_id) - return store - - def expand(self, pcoll: beam.PCollection) -> beam.PCollection: - return (pcoll - | beam.Map(self._log_to_bigquery) - ) iids_raw = [ # to test the latest PR diff --git a/feedstock/requirements.txt b/feedstock/requirements.txt index 1123b407..466f042e 100644 --- a/feedstock/requirements.txt +++ b/feedstock/requirements.txt @@ -1,3 +1,4 @@ +git+https://github.com/carbonplan/pangeo-forge-big-query pangeo-forge-esgf==0.1.1 pangeo-forge-recipes==0.10.4 -dynamic-chunks==0.0.2 +dynamic-chunks==0.0.2 \ No newline at end of file