From 2dc9d2fe5c98a7b4fd6f60dde195b9bdff7fdb07 Mon Sep 17 00:00:00 2001 From: Milo Hyben Date: Wed, 15 Nov 2023 13:37:59 +1100 Subject: [PATCH] Changes as per code review. (#608) --- api/routes/__init__.py | 2 - api/routes/billing.py | 6 +- api/utils/dates.py | 26 +- db/python/layers/billing.py | 392 ++++++++++++-------- web/src/pages/billing/BillingCostByTime.tsx | 9 +- 5 files changed, 262 insertions(+), 173 deletions(-) diff --git a/api/routes/__init__.py b/api/routes/__init__.py index e661901e8..18edb5969 100644 --- a/api/routes/__init__.py +++ b/api/routes/__init__.py @@ -1,5 +1,3 @@ -import os - from api.routes.sample import router as sample_router from api.routes.imports import router as import_router from api.routes.analysis import router as analysis_router diff --git a/api/routes/billing.py b/api/routes/billing.py index 86273e69d..bee9efee7 100644 --- a/api/routes/billing.py +++ b/api/routes/billing.py @@ -113,7 +113,7 @@ async def get_datasets( @router.get( - '/sequencing_types', + '/sequencing-types', response_model=list[str], operation_id='getSequencingTypes', ) @@ -151,7 +151,7 @@ async def get_stages( @router.get( - '/sequencing_groups', + '/sequencing-groups', response_model=list[str], operation_id='getSequencingGroups', ) @@ -170,7 +170,7 @@ async def get_sequencing_groups( @router.get( - '/invoice_months', + '/invoice-months', response_model=list[str], operation_id='getInvoiceMonths', ) diff --git a/api/utils/dates.py b/api/utils/dates.py index 4704ae413..4e01527b9 100644 --- a/api/utils/dates.py +++ b/api/utils/dates.py @@ -15,7 +15,11 @@ def parse_date_only_string(d: str | None) -> date | None: def get_invoice_month_range(convert_month: date) -> tuple[date, date]: - """Get the start and end date of the invoice month for a given date""" + """ + Get the start and end date of the invoice month for a given date + Start and end date are used mostly for optimising BQ queries + All our BQ tables/views are partitioned by day + """ first_day = 
convert_month.replace(day=1) # Grab the first day of invoice month then subtract INVOICE_DAY_DIFF days @@ -34,3 +38,23 @@ def get_invoice_month_range(convert_month: date) -> tuple[date, date]: ) return start_day, last_day + + +def reformat_datetime( + in_date: str | None, in_format: str, out_format: str +) -> str | None: + """ + Reformat datetime as string to another string format + This function takes a string as input and returns a string as output + """ + if not in_date: + return None + + try: + result = datetime.strptime( + in_date, in_format + ) + return result.strftime(out_format) + + except Exception as excep: + raise ValueError(f'Date could not be converted: {in_date}') from excep diff --git a/db/python/layers/billing.py b/db/python/layers/billing.py index 49d7915d8..475786a8e 100644 --- a/db/python/layers/billing.py +++ b/db/python/layers/billing.py @@ -1,11 +1,9 @@ -# pylint: disable=ungrouped-imports,too-many-locals import re from typing import Any from datetime import datetime -from collections import Counter +from collections import Counter, defaultdict from google.cloud import bigquery -from api.utils.dates import get_invoice_month_range from models.models import ( BillingRowRecord, @@ -27,6 +25,15 @@ BQ_BUDGET_VIEW, BQ_GCP_BILLING_VIEW, ) +from api.utils.dates import ( + get_invoice_month_range, + reformat_datetime +) + + +def abbrev_cost_category(cost_category: str) -> str: + """Abbreviate cost category""" + return 'S' if cost_category == 'Cloud Storage' else 'C' class BillingLayer(BqBaseLayer): @@ -153,6 +160,9 @@ async def get_gcp_projects(self): """Get all GCP projects in database""" # cost of this BQ is 10MB on DEV is minimal, AU$ 0.000008 per query + # @days is defined by env variable BQ_DAYS_BACK_OPTIMAL + # this part_time > filter is to limit the amount of data scanned, + # saving cost for running BQ _query = f""" SELECT DISTINCT gcp_project FROM `{BQ_GCP_BILLING_VIEW}` @@ -184,6 +194,10 @@ async def get_topics(self): """Get all topics in
database""" # cost of this BQ is 10MB on DEV is minimal, AU$ 0.000008 per query + # @days is defined by env variable BQ_DAYS_BACK_OPTIMAL + # this day > filter is to limit the amount of data scanned, + # saving cost for running BQ + # aggregated views are partitioned by day _query = f""" SELECT DISTINCT topic FROM `{BQ_AGGREG_VIEW}` @@ -213,7 +227,6 @@ async def get_topics(self): async def get_invoice_months(self): """Get all invoice months in database""" - # cost of this BQ is 10MB on DEV is minimal, AU$ 0.000008 per query _query = f""" SELECT DISTINCT FORMAT_DATE("%Y%m", day) as invoice_month FROM `{BQ_AGGREG_VIEW}` @@ -234,6 +247,10 @@ async def get_cost_categories(self): """Get all service description in database""" # cost of this BQ is 10MB on DEV is minimal, AU$ 0.000008 per query + # @days is defined by env variable BQ_DAYS_BACK_OPTIMAL + # this day > filter is to limit the amount of data scanned, + # saving cost for running BQ + # aggregated views are partitioned by day _query = f""" SELECT DISTINCT cost_category FROM `{BQ_AGGREG_VIEW}` @@ -268,6 +285,10 @@ async def get_skus( """Get all SKUs in database""" # cost of this BQ is 10MB on DEV is minimal, AU$ 0.000008 per query + # @days is defined by env variable BQ_DAYS_BACK_OPTIMAL + # this day > filter is to limit the amount of data scanned, + # saving cost for running BQ + # aggregated views are partitioned by day _query = f""" SELECT DISTINCT sku FROM `{BQ_AGGREG_VIEW}` @@ -309,6 +330,10 @@ async def get_extended_values(self, field: str): """ # cost of this BQ is 10MB on DEV is minimal, AU$ 0.000008 per query + # @days is defined by env variable BQ_DAYS_BACK_OPTIMAL + # this day > filter is to limit the amount of data scanned, + # saving cost for running BQ + # aggregated views are partitioned by day _query = f""" SELECT DISTINCT {field} FROM `{BQ_AGGREG_EXT_VIEW}` @@ -454,7 +479,7 @@ async def get_total_cost( if query.source == 'gcp_billing': # BQ_GCP_BILLING_VIEW view is partitioned by different field - 
# BG has limitation, materialized view can only by partition by base table + # BQ has limitation, materialized view can only be partitioned by base table # partition or its subset, in our case _PARTITIONTIME # (part_time field in the view) # We are querying by day, @@ -526,10 +551,16 @@ async def get_total_cost( # return empty list if no record found return [] - async def get_budget(self) -> dict[str, float] | None: + async def get_budgets_by_gcp_project( + self, field: BillingColumn, is_current_month: bool + ) -> dict[str, float]: """ - Get budget for projects + Get budget for gcp-projects """ + if field != BillingColumn.PROJECT or not is_current_month: + # only projects have budget and only for current month + return {} + _query = f""" WITH t AS ( SELECT gcp_project, MAX(created_at) as last_created_at @@ -546,7 +577,7 @@ async def get_budget(self) -> dict[str, float] | None: if query_job_result: return {row.gcp_project: row.budget for row in query_job_result} - return None + return {} async def get_last_loaded_day(self): """Get the most recent fully loaded day in db @@ -578,28 +609,33 @@ async def get_last_loaded_day(self): return None - async def prepare_daily_cost_query( + async def prepare_daily_cost_subquery( self, field, view_to_use, source, query_params ): - """prepare_daily_cost_query""" + """prepare daily cost subquery""" if source == 'gcp_billing': # add extra filter to limit materialized view partition - daily_cost_gcp_billing_filter = """ + # Raw BQ billing table is partitioned by part_time (when data are loaded) + # and not by end of usage time (day) + # There is a delay up to 4-5 days between part_time and day + # 7 days is added to be sure to get all data + gcp_billing_optimise_filter = """ AND part_time >= TIMESTAMP(@last_loaded_day) AND part_time <= TIMESTAMP_ADD( TIMESTAMP(@last_loaded_day), INTERVAL 7 DAY ) """ else: - daily_cost_gcp_billing_filter = '' + gcp_billing_optimise_filter = '' # Find the last fully loaded day in the view last_loaded_day
= await self.get_last_loaded_day() + daily_cost_field = ', day.cost as daily_cost' daily_cost_join = f"""LEFT JOIN ( SELECT @@ -609,7 +645,7 @@ async def prepare_daily_cost_query( FROM `{view_to_use}` WHERE day = TIMESTAMP(@last_loaded_day) - {daily_cost_gcp_billing_filter} + {gcp_billing_optimise_filter} GROUP BY field, cost_category @@ -623,7 +659,7 @@ async def prepare_daily_cost_query( 'last_loaded_day', 'STRING', last_loaded_day ), ) - return (query_params, daily_cost_field, daily_cost_join) + return (last_loaded_day, query_params, daily_cost_field, daily_cost_join) async def execute_running_cost_query( self, field: BillingColumn, @@ -633,58 +669,73 @@ async def execute_running_cost_query( """ Run query to get running cost of selected field """ + # check if invoice month is valid first + if not invoice_month or not re.match(r'^\d{6}$', invoice_month): + raise ValueError('Invalid invoice month') + + invoice_month_date = datetime.strptime(invoice_month, '%Y%m') + if invoice_month != invoice_month_date.strftime('%Y%m'): + raise ValueError('Invalid invoice month') + + # get start day and last day for given invoice month + # This is to optimise the query, BQ view is partitioned by day + # and not by invoice month + start_day_date, last_day_date = get_invoice_month_range( + invoice_month_date + ) + start_day = start_day_date.strftime('%Y-%m-%d') + last_day = last_day_date.strftime('%Y-%m-%d') + # by default look at the normal view if field in BillingColumn.extended_cols(): + # if any of the extended fields are needed use the extended view view_to_use = BQ_AGGREG_EXT_VIEW elif source == 'gcp_billing': + # if source is gcp_billing, + # use the view on top of the raw billing table view_to_use = BQ_GCP_BILLING_VIEW else: + # otherwise use the normal view view_to_use = BQ_AGGREG_VIEW - is_current_month = True - invoice_month_filter = '' - last_loaded_day = None - query_params = [] - - if not invoice_month or not re.match(r'^\d{6}$', invoice_month): - # TODO for
production change to select current day, month - start_day = '2023-03-01' - current_day = '2023-03-10' - last_day = '2023-03-30' - else: - # get start day and current day for given invoice month - invoice_month_date = datetime.strptime(invoice_month, '%Y%m') - start_day_date, last_day_date = get_invoice_month_range( - invoice_month_date - ) - start_day = start_day_date.strftime('%Y-%m-%d') - last_day = last_day_date.strftime('%Y-%m-%d') - current_day = datetime.now().strftime('%Y-%m-%d') - - invoice_month_filter = ' AND invoice_month = @invoice_month' - query_params.append( - bigquery.ScalarQueryParameter('invoice_month', 'STRING', invoice_month) + if source == 'gcp_billing': + # add extra filter to limit materialized view partition + # Raw BQ billing table is partitioned by part_time (when data are loaded) + # and not by end of usage time (day) + # There is a delay up to 4-5 days between part_time and day + # 7 days is added to be sure to get all data + filter_to_optimise_query = """ + part_time >= TIMESTAMP(@start_day) + AND part_time <= TIMESTAMP_ADD( + TIMESTAMP(@last_day), INTERVAL 7 DAY ) + """ + else: + # add extra filter to limit materialized view partition + filter_to_optimise_query = """ + day >= TIMESTAMP(@start_day) + AND day <= TIMESTAMP(@last_day) + """ - if last_day < current_day: - # not current invoice month, do not show daily cost & budget - is_current_month = False - - query_params.append( - bigquery.ScalarQueryParameter('start_day', 'STRING', start_day) - ) - query_params.append( + # start_day and last_day are in to optimise the query + query_params = [ + bigquery.ScalarQueryParameter('start_day', 'STRING', start_day), bigquery.ScalarQueryParameter('last_day', 'STRING', last_day) - ) + ] + + current_day = datetime.now().strftime('%Y-%m-%d') + is_current_month = last_day >= current_day + last_loaded_day = None if is_current_month: # Only current month can have last 24 hours cost # Last 24H in UTC time ( + last_loaded_day, query_params, 
daily_cost_field, daily_cost_join - ) = await self.prepare_daily_cost_query( + ) = await self.prepare_daily_cost_subquery( field, view_to_use, source, @@ -695,17 +746,6 @@ async def execute_running_cost_query( daily_cost_field = ', NULL as daily_cost' daily_cost_join = '' - if source == 'gcp_billing': - # add extra filter to limit materialized view partition - monthly_cost_gcp_billing_filter = """ - AND part_time >= TIMESTAMP(@start_day) - AND part_time <= TIMESTAMP_ADD( - TIMESTAMP(@last_day), INTERVAL 7 DAY - ) - """ - else: - monthly_cost_gcp_billing_filter = '' - _query = f""" SELECT CASE WHEN month.field IS NULL THEN 'N/A' ELSE month.field END as field, @@ -715,30 +755,26 @@ async def execute_running_cost_query( FROM ( SELECT - * + {field.value} as field, + cost_category, + SUM(cost) as cost FROM - ( - SELECT - {field.value} as field, - cost_category, - SUM(cost) as cost - FROM - `{view_to_use}` - WHERE day >= TIMESTAMP(@start_day) AND - day <= TIMESTAMP(@last_day) - {invoice_month_filter} - {monthly_cost_gcp_billing_filter} - GROUP BY - field, - cost_category - ) - WHERE - cost > 0.1 + `{view_to_use}` + WHERE {filter_to_optimise_query} + AND invoice_month = @invoice_month + GROUP BY + field, + cost_category + HAVING cost > 0.1 ) month {daily_cost_join} ORDER BY field ASC, daily_cost DESC, monthly_cost DESC; """ + query_params.append( + bigquery.ScalarQueryParameter('invoice_month', 'STRING', invoice_month) + ) + return is_current_month, last_loaded_day, list( self._connection.connection.query( _query, @@ -746,98 +782,33 @@ async def execute_running_cost_query( ).result() ) - async def get_running_cost( - self, field: BillingColumn, - invoice_month: str | None = None, - source: str | None = None + async def append_total_running_cost( + self, + field: BillingColumn, + is_current_month: bool, + last_loaded_day: str | None, + total_monthly: dict, + total_daily: dict, + total_monthly_category: dict, + total_daily_category: dict, + results: 
list[BillingCostBudgetRecord] ) -> list[BillingCostBudgetRecord]: """ - Get currently running cost of selected field + Add total row: compute + storage to the results """ - - # accept only Topic, Dataset or Project at this stage - if field not in ( - BillingColumn.TOPIC, - BillingColumn.PROJECT, - BillingColumn.DATASET, - ): - raise ValueError('Invalid field') - - ( - is_current_month, - last_loaded_day, - query_job_result - ) = await self.execute_running_cost_query( - field, invoice_month, source - ) - if not query_job_result: - # return empty list - return [] - - # prepare data - results = [] - - # reformat last_loaded_day if present - if last_loaded_day: - last_loaded_day = datetime.strptime(last_loaded_day, '%Y-%m-%d %H:%M:%S+00:00') - last_loaded_day = last_loaded_day.strftime('%b %d') - - total_monthly: dict[str, Counter[str]] = { - 'C': Counter(), - 'S': Counter(), - } - total_daily: dict[str, Counter[str]] = { - 'C': Counter(), - 'S': Counter(), - } - field_details: dict[str, list[Any]] = {} # detail category cost for each field - total_monthly_category: Counter[str] = Counter() - total_daily_category: Counter[str] = Counter() - for row in query_job_result: - if row.field not in field_details: - field_details[row.field] = [] - - field_details[row.field].append( - { - 'cost_group': 'S' if row.cost_category == 'Cloud Storage' else 'C', - 'cost_category': row.cost_category, - 'daily_cost': row.daily_cost if is_current_month else None, - 'monthly_cost': row.monthly_cost, - } - ) - - total_monthly_category[row.cost_category] += row.monthly_cost - if row.daily_cost: - total_daily_category[row.cost_category] += row.daily_cost - - # cost groups S/C - if row.cost_category == 'Cloud Storage': - total_monthly['S']['ALL'] += row.monthly_cost - total_monthly['S'][row.field] += row.monthly_cost - if row.daily_cost and is_current_month: - total_daily['S']['ALL'] += row.daily_cost - total_daily['S'][row.field] += row.daily_cost - else: - total_monthly['C']['ALL'] += 
row.monthly_cost - total_monthly['C'][row.field] += row.monthly_cost - if row.daily_cost and is_current_month: - total_daily['C']['ALL'] += row.daily_cost - total_daily['C'][row.field] += row.daily_cost - - # add total # construct ALL fields details all_details = [] for cat, mth_cost in total_monthly_category.items(): all_details.append( { - 'cost_group': 'S' if cat == 'Cloud Storage' else 'C', + 'cost_group': abbrev_cost_category(cat), 'cost_category': cat, 'daily_cost': total_daily_category[cat] if is_current_month else None, 'monthly_cost': mth_cost, } ) - # add total row first + # add total row: compute + storage results.append( BillingCostBudgetRecord.from_json( { @@ -862,12 +833,25 @@ async def get_running_cost( ) ) + return results + + async def append_running_cost_records( + self, + field: BillingColumn, + is_current_month: bool, + last_loaded_day: str | None, + total_monthly: dict, + total_daily: dict, + field_details: dict, + results: list[BillingCostBudgetRecord] + ) -> list[BillingCostBudgetRecord]: + """ + Add all the selected field rows: compute + storage to the results + """ # get budget map per gcp project - if field == BillingColumn.PROJECT and is_current_month: - budget = await self.get_budget() - else: - # only projects have budget and only for current month - budget = None + budgets_per_gcp_project = await self.get_budgets_by_gcp_project( + field, is_current_month + ) # add rows by field for key, details in field_details.items(): @@ -880,7 +864,8 @@ async def get_running_cost( total_monthly['S'][key] if key in total_monthly['S'] else 0 ) monthly = compute_monthly + storage_monthly - budget_monthly = budget.get(key, 0) if budget else 0 + budget_monthly = budgets_per_gcp_project.get(key) + results.append( BillingCostBudgetRecord.from_json( { @@ -903,3 +888,90 @@ async def get_running_cost( ) return results + + async def get_running_cost( + self, field: BillingColumn, + invoice_month: str | None = None, + source: str | None = None + ) -> 
list[BillingCostBudgetRecord]: + """ + Get currently running cost of selected field + """ + + # accept only Topic, Dataset or Project at this stage + if field not in ( + BillingColumn.TOPIC, + BillingColumn.PROJECT, + BillingColumn.DATASET, + ): + raise ValueError('Invalid field only topic, dataset or project allowed') + + ( + is_current_month, + last_loaded_day, + query_job_result + ) = await self.execute_running_cost_query( + field, invoice_month, source + ) + if not query_job_result: + # return empty list + return [] + + # prepare data + results: list[BillingCostBudgetRecord] = [] + + # reformat last_loaded_day if present + last_loaded_day = reformat_datetime( + last_loaded_day, + '%Y-%m-%d %H:%M:%S+00:00', + '%b %d' + ) + + total_monthly: dict[str, Counter[str]] = defaultdict(Counter) + total_daily: dict[str, Counter[str]] = defaultdict(Counter) + field_details: dict[str, list[Any]] = defaultdict(list) + total_monthly_category: Counter[str] = Counter() + total_daily_category: Counter[str] = Counter() + + for row in query_job_result: + if row.field not in field_details: + field_details[row.field] = [] + + cost_group = abbrev_cost_category(row.cost_category) + + field_details[row.field].append( + { + 'cost_group': cost_group, + 'cost_category': row.cost_category, + 'daily_cost': row.daily_cost if is_current_month else None, + 'monthly_cost': row.monthly_cost, + } + ) + + total_monthly_category[row.cost_category] += row.monthly_cost + if row.daily_cost: + total_daily_category[row.cost_category] += row.daily_cost + + # cost groups totals + total_monthly[cost_group]['ALL'] += row.monthly_cost + total_monthly[cost_group][row.field] += row.monthly_cost + if row.daily_cost and is_current_month: + total_daily[cost_group]['ALL'] += row.daily_cost + total_daily[cost_group][row.field] += row.daily_cost + + # add total row: compute + storage + results = await self.append_total_running_cost( + field, is_current_month, last_loaded_day, + total_monthly, total_daily, 
total_monthly_category, + total_daily_category, + results + ) + + # add rest of the records: compute + storage + results = await self.append_running_cost_records( + field, is_current_month, last_loaded_day, + total_monthly, total_daily, field_details, + results + ) + + return results diff --git a/web/src/pages/billing/BillingCostByTime.tsx b/web/src/pages/billing/BillingCostByTime.tsx index 3e5e09cbd..629b3ee08 100644 --- a/web/src/pages/billing/BillingCostByTime.tsx +++ b/web/src/pages/billing/BillingCostByTime.tsx @@ -27,15 +27,11 @@ const BillingCostByTime: React.FunctionComponent = () => { : BillingColumn.GcpProject const inputSelectedData: string | undefined = searchParams.get('selectedData') ?? undefined - // TODO once we have more data change to the current month - // ( - // `${now.getFullYear()}-${now.getMonth() + 1}-${now.getDate()}` - // ) const [start, setStart] = React.useState( - searchParams.get('start') ?? `${now.getFullYear()}-03-01` + searchParams.get('start') ?? `${now.getFullYear()}-${now.getMonth() + 1}-01` ) const [end, setEnd] = React.useState( - searchParams.get('end') ?? `${now.getFullYear()}-03-05` + searchParams.get('end') ?? `${now.getFullYear()}-${now.getMonth() + 1}-${now.getDate()}` ) const [groupBy, setGroupBy] = React.useState( fixedGroupBy ?? BillingColumn.GcpProject @@ -177,7 +173,6 @@ const BillingCostByTime: React.FunctionComponent = () => { .catch((er) => setError(er.message)) } - // on first load React.useEffect(() => { if (selectedData !== undefined && selectedData !== '' && selectedData !== null) { let source = 'aggregate'