Skip to content

Commit

Permalink
moves BQ transforms outside of recipe.py and imports from pangeo-forge-big-query (#61)
Browse files Browse the repository at this point in the history

Co-authored-by: Julius Busecke <julius@ldeo.columbia.edu>
  • Loading branch information
norlandrhagen and jbusecke authored Nov 16, 2023
1 parent 690205b commit 28935e8
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 158 deletions.
159 changes: 2 additions & 157 deletions feedstock/recipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@

import apache_beam as beam
from dataclasses import dataclass
from datetime import datetime
from typing import List, Dict
from typing import List
from pangeo_forge_esgf import get_urls_from_esgf, setup_logging
from pangeo_forge_big_query.utils import BQInterface, LogToBigQuery
from pangeo_forge_esgf.parsing import parse_instance_ids
from pangeo_forge_recipes.patterns import pattern_from_file_sequence
from pangeo_forge_recipes.transforms import (
Expand All @@ -19,146 +19,6 @@
# setup_logging('DEBUG')
setup_logging('INFO')


# from bigquery_interface import BQInterface, IIDEntry
## copy and paste of bigquery_interface.py until I can import
# it properly (see https://github.com/pangeo-forge/pangeo-forge-runner/issues/92)

from google.cloud import bigquery
from typing import Optional
from google.api_core.exceptions import NotFound
import datetime

@dataclass
class IIDEntry:
    """Single row/entry for an iid
    :param iid: CMIP6 instance id
    :param store: URL to zarr store
    """
    iid: str
    store: str  # TODO: should this allow other objects?

    def __post_init__(self):
        """Validate that the iid carries the expected number of CMIP6 facets."""
        schema = 'mip_era.activity_id.institution_id.source_id.experiment_id.member_id.table_id.variable_id.grid_label.version'
        # Counting dots is equivalent to comparing the lengths of the split lists
        n_expected = schema.count('.') + 1
        n_actual = self.iid.count('.') + 1
        if n_actual != n_expected:
            raise ValueError(f'IID does not conform to CMIP6 {schema =}. Got {self.iid =}')
        # TODO: Check each facet with the controlled CMIP vocabulary
        # TODO: Check store validity?


@dataclass
class IIDResult:
    """Class to handle the results pertaining to a single IID.
    """
    results: bigquery.table.RowIterator
    iid: str

    def __post_init__(self):
        # Zero rows means the iid is absent; otherwise materialize the iterator
        # once and keep the newest row (queries order by timestamp DESC).
        self.exists = self.results.total_rows > 0
        if self.exists:
            self.rows = list(self.results)
            self.latest_row = self.rows[0]



@dataclass
class BQInterface:
    """Read/write iid records from a BigQuery table.

    :param table_id: fully qualified BigQuery table ID (``project.dataset.table``)
    :param client: BigQuery client object; a default client is created if omitted
    :param result_limit: maximum number of results to return from a per-iid query
    """
    table_id: str
    client: Optional[bigquery.client.Client] = None
    result_limit: Optional[int] = 10

    def __post_init__(self):
        # TODO how do I handle the schema? This class could be used for any table, but for
        # TODO this specific case I want to prescribe the schema
        # for now just hardcode it
        self.schema = [
            bigquery.SchemaField("instance_id", "STRING", mode="REQUIRED"),
            bigquery.SchemaField("store", "STRING", mode="REQUIRED"),
            bigquery.SchemaField("timestamp", "TIMESTAMP", mode="REQUIRED"),
        ]
        if self.client is None:
            self.client = bigquery.Client()

        # check if table exists, otherwise create it
        try:
            self.table = self.client.get_table(self.table_id)
        except NotFound:
            self.table = self.create_table()

    def create_table(self) -> bigquery.table.Table:
        """Create the table if it does not exist"""
        print(f'Creating {self.table_id =}')
        table = bigquery.Table(self.table_id, schema=self.schema)
        table = self.client.create_table(table)  # Make an API request.
        return table

    def insert(self, IID_entry):
        """Insert a row into the table for a given IID_entry object"""
        # Use an explicit UTC timestamp: a naive local-time isoformat would be
        # interpreted inconsistently across machines in different timezones.
        timestamp = datetime.datetime.now(datetime.timezone.utc).isoformat()
        json_row = {'instance_id': IID_entry.iid, 'store': IID_entry.store, 'timestamp': timestamp}
        errors = self.client.insert_rows_json(self.table_id, [json_row])
        if errors:
            raise RuntimeError(f'Error inserting row: {errors}')

    def _get_query_job(self, query: str, job_config: Optional[bigquery.QueryJobConfig] = None) -> bigquery.job.query.QueryJob:
        """Submit a query (optionally parameterized via job_config) and return the job"""
        return self.client.query(query, job_config=job_config)

    def _get_iid_results(self, iid: str) -> IIDResult:
        """Get the full result object for a given iid"""
        # Parameterized query: the iid value is never interpolated into the SQL
        # text, ruling out injection via a malformed instance id.
        # NOTE: LIMIT must be a literal in BigQuery, so result_limit (an int we
        # control) stays in the f-string.
        query = f"""
        SELECT *
        FROM `{self.table_id}`
        WHERE instance_id=@iid
        ORDER BY timestamp DESC
        LIMIT {self.result_limit}
        """
        job_config = bigquery.QueryJobConfig(
            query_parameters=[bigquery.ScalarQueryParameter("iid", "STRING", iid)]
        )
        results = self._get_query_job(query, job_config=job_config).result()  # TODO: `.result()` is waiting for the query. Should I do this here?
        return IIDResult(results, iid)

    def iid_exists(self, iid: str) -> bool:
        """Check if iid exists in the table"""
        return self._get_iid_results(iid).exists

    def iid_list_exists(self, iids: List[str]) -> List[str]:
        """More efficient way to check if a list of iids exists in the table
        Passes the entire list to a single SQL query.
        Returns a list of iids that exist in the table"""
        if not iids:
            # The previous string-built `IN ()` clause was a SQL syntax error
            # for an empty list; short-circuit instead of issuing a broken query.
            return []
        # source: https://stackoverflow.com/questions/26441928/how-do-i-check-if-multiple-values-exists-in-database
        # Array parameter + UNNEST avoids hand-quoting each iid into the SQL text.
        query = f"""
        SELECT instance_id, store
        FROM `{self.table_id}`
        WHERE instance_id IN UNNEST(@iids)
        """
        job_config = bigquery.QueryJobConfig(
            query_parameters=[bigquery.ArrayQueryParameter("iids", "STRING", iids)]
        )
        results = self._get_query_job(query, job_config=job_config).result()
        # this is a full row iterator, for now just return the iids
        return list({r['instance_id'] for r in results})

# wrapper functions (not sure if this works instead of the repeated copy and paste in the transform below)
def log_to_bq(iid: str, store: zarr.storage.FSStore, table_id: str):
    """Record that `iid` was written to `store` in the BigQuery table `table_id`."""
    entry = IIDEntry(iid=iid, store=store.path)
    BQInterface(table_id=table_id).insert(entry)

# Custom Beam Transforms

Expand Down Expand Up @@ -259,22 +119,7 @@ def expand(self, pcoll: beam.PCollection) -> beam.PCollection:
| "Testing - Time Dimension" >> beam.Map(self._test_time)
)

@dataclass
class LogToBigQuery(beam.PTransform):
    """
    Logging stage for data written to zarr store
    """
    iid: str
    table_id: str

    def _log_to_bigquery(self, store: zarr.storage.FSStore) -> zarr.storage.FSStore:
        # Record the store location, then hand the store through unchanged so
        # downstream stages keep working on it.
        log_to_bq(self.iid, store, self.table_id)
        return store

    def expand(self, pcoll: beam.PCollection) -> beam.PCollection:
        return pcoll | beam.Map(self._log_to_bigquery)

iids_raw = [
# to test the latest PR
Expand Down
3 changes: 2 additions & 1 deletion feedstock/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
git+https://github.com/carbonplan/pangeo-forge-big-query
pangeo-forge-esgf==0.1.1
pangeo-forge-recipes==0.10.4
dynamic-chunks==0.0.2
dynamic-chunks==0.0.2

0 comments on commit 28935e8

Please sign in to comment.