
Feature/cost effective bq incremental followup #2140

Merged

Conversation

@drewbanin (Contributor) commented Feb 15, 2020

Reopening from #1971, fixes #1034

Description

  1. Support dictionary-style specification of partitioning fields
  2. Deprecate string-style specification of partitioning fields
  3. Implement BigQuery scripting for incremental models
    • use "real" temporary tables
    • provide partition predicate in merge statement join to limit data scanned in destination table
    • add _dbt_max_partition field for use in model code

Example incremental model:

{{ config(
  materialized='incremental',
  partition_by={"field": "collector_tstamp", "data_type": "date},
  unique_key="event_id"
) }}

select * from snowplow.event
{% if is_incremental() %}
where collector_tstamp > _dbt_max_partition
{% endif %}
  • if snowplow.event is partitioned by collector_tstamp, then only the partitions containing new data in snowplow.event will be scanned
  • the resulting merge statement will only scan the partitions in the destination table that will be updated by the merge (see the sketch below)
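
For illustration, here is a rough, hand-written sketch of the kind of BigQuery script this produces for the model above. The project/dataset/table names are placeholders and the exact SQL dbt generates differs in the details; it is meant only to show where _dbt_max_partition and the partition predicate fit:

declare _dbt_max_partition date default (
  select max(collector_tstamp) from `my-project`.`analytics`.`my_incremental_model`
);

-- "real" temp table: exists only for the duration of this script
create temp table dbt_tmp as (
  select * from `my-project`.`snowplow`.`event`
  where collector_tstamp > _dbt_max_partition
);

merge into `my-project`.`analytics`.`my_incremental_model` as DBT_INTERNAL_DEST
using dbt_tmp as DBT_INTERNAL_SOURCE
on DBT_INTERNAL_SOURCE.event_id = DBT_INTERNAL_DEST.event_id
  -- partition predicate: prune destination partitions before matching rows,
  -- so the merge does not scan the whole destination table
  and DBT_INTERNAL_DEST.collector_tstamp >= _dbt_max_partition
when matched then update set collector_tstamp = DBT_INTERNAL_SOURCE.collector_tstamp  -- column list abbreviated
when not matched then insert row;

-- dbt also drops the temp table explicitly at the end of the script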

Before this PR, an incremental model that copies a 100 MB source table would scan 200 MB of data, since every run required a full table scan of both the source and destination tables. After this PR, the same incremental model processes less than 2 MB (lookups on the partitioning fields) for an incremental build with no data to merge.

cc @jtcohen6

Remaining todo:

  • add more tests for changing partitioning types
  • add tests for _dbt_max_partition scripting variable
  • confirm the logic used for range partitioning (see the sketch below)... I don't think that part is right...
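
For reference on that last item: BigQuery's integer range partitioning DDL looks roughly like the statement below, and the dict-style config for it would presumably need a start/end/interval spec along these lines. This is a hypothetical sketch (names and config keys are illustrative), not the config shape this PR settles on:

-- hypothetical dict-style config for a range-partitioned model:
-- partition_by={"field": "user_id", "data_type": "int64",
--               "range": {"start": 0, "end": 1000, "interval": 10}}

create or replace table `my-project`.`analytics`.`events_by_user`
partition by range_bucket(user_id, generate_array(0, 1000, 10))
as (
  select 1 as user_id, current_timestamp() as loaded_at
);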

Checklist

  • I have run this code in development and it appears to resolve the stated issue
  • This PR includes tests, or tests are not required/relevant for this PR
  • I have updated the CHANGELOG.md and added information about my change to the "dbt next" section.

@cla-bot cla-bot bot added the cla:yes label Feb 15, 2020
@drewbanin drewbanin force-pushed the feature/cost-effective-bq-incremental-followup branch from 27d21f9 to 15cac5a Compare February 15, 2020 23:26
@jtcohen6 (Contributor) commented

@drewbanin I really like the addition of _dbt_max_partition. This accomplishes something that I thought we would have to enable via a declare config argument. BQ partitioned incrementals present a pretty compelling user experience, now that all these pieces have come together.

Based on your latest changes, it looks like we're going to use "fake" temp tables across the board for the time being, in order to avoid breaking changes to snapshots. There's very little downside to using "fake" temp tables in the incremental materialization; worst case, dbt fails to drop the "temp" table, and someone is charged for 12 hours of storage of that table, or about $0.33/TB (based on monthly storage cost of $20/TB).

Do you think the switch to "true" temp tables is something we'll push to a different PR / future version of dbt?

@drewbanin (Contributor, Author) commented

hey @jtcohen6 - I would love to use proper temp tables on BigQuery in the future. If you're curious, check out the problematic snapshot code here:

https://github.com/fishtown-analytics/dbt/blob/0115e469c13237aed0ea53cc93655a089b843ab6/core/dbt/include/global_project/macros/materializations/snapshot/snapshot.sql#L165-L174

We need to use the temp table in the same script where the table is created, but that's not how the snapshot materialization works today.

I think I'd like to update the create_table_as macro to accept a config argument instead of plucking the config out from the model context. This would let us use "fake" temp tables in snapshots, and "real" temp tables in incremental models. As it stands though, I didn't want to overcomplicate this PR.

I did add a line to drop the temp table at the end of the incremental materialization, so the table hopefully won't hang around for any longer than it needs to regardless!
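
For anyone following along, a rough sketch of the two approaches being weighed here, with placeholder names (the 12-hour expiration comes from the storage-cost discussion above; the actual macros look different):

-- "fake" temp table: an ordinary table with a short expiration, so it costs
-- at most a few hours of storage if dbt ever fails to drop it
create or replace table `my-project`.`analytics`.`my_model__dbt_tmp`
options (expiration_timestamp = timestamp_add(current_timestamp(), interval 12 hour))
as (
  select * from `my-project`.`snowplow`.`event`
);

-- "real" temp table: only exists inside a single BigQuery script and is
-- cleaned up automatically, so the create and the merge that reads it have
-- to run in the same script
create temp table dbt_tmp as (
  select * from `my-project`.`snowplow`.`event`
);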

@@ -83,12 +80,12 @@ def exception_handler(self, sql):
         yield

     except google.cloud.exceptions.BadRequest as e:
-        message = "Bad request while running:\n{sql}"
-        self.handle_error(e, message, sql)
+        message = "Bad request while running query"
@drewbanin (Contributor, Author) commented

The existing BQ debug logs were incredibly verbose, printing the same SQL and error messages three times over. These changes should only print out the SQL once (when it is executed) and should suppress the re-printing of the query in the BQ exception message.



@dataclass
class PartitionConfig(JsonSchemaMixin):
@drewbanin (Contributor, Author) commented

I am LARPing as a person who knows how to use dataclasses. I'm sure there's some subtlety here.... is this the right way to specify these types?

Contributor reply:

Yup!
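
Pulling the pieces of this thread together, a minimal standalone sketch of the pattern being confirmed (assuming hologram's JsonSchemaMixin, which supplies from_dict/to_dict and schema validation; the real code raises a dbt compiler error rather than a ValueError):

from dataclasses import dataclass
from typing import Any, Dict, Optional

from hologram import JsonSchemaMixin, ValidationError


@dataclass
class PartitionConfig(JsonSchemaMixin):
    # the typed fields double as the JSON schema: `field` is required,
    # `data_type` falls back to 'date' when omitted
    field: str
    data_type: str = 'date'

    @classmethod
    def parse(cls, raw_partition_by: Optional[Dict[str, Any]]) -> Optional['PartitionConfig']:
        if raw_partition_by is None:
            return None
        try:
            # from_dict() validates the incoming dict against the generated schema
            return cls.from_dict(raw_partition_by)
        except ValidationError:
            # dbt would call dbt.exceptions.raise_compiler_error here instead
            raise ValueError(
                'Config `partition_by` is missing required item `field`'
            )

Callers that need a plain dict (for example a SQL-rendering macro) can then call partition_by.to_dict(), which is what the review below suggests in place of inheriting from Dict.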

@drewbanin (Contributor, Author) commented

@beckjake much has changed here since you last took a look. Can you review this again please?

@beckjake (Contributor) left a comment

Some PR feedback, mostly on dataclasses. I'd either go all the way on the partition config being a dataclass/hologram type (my preference) or make it a dict all the way through.

Other than those things, looks great!



@dataclass
class PartitionConfig(Dict[str, Any], JsonSchemaMixin):
Reviewer comment:

If I understand what's going on here correctly, I think that instead of inheriting this from Dict, you should pass partition_by.to_dict() in the places where you're currently using this as a dict.

  {{ sql_header if sql_header is not none }}

  create or replace table {{ relation }}
-   {{ partition_by(raw_partition_by) }}
+   {{ partition_by(partition_by_dict) }}
Reviewer comment:

partition_by.to_dict()

{%- set raw_partition_by = config.get('partition_by', none) -%}
{%- set partition_by = adapter.parse_partition_by(raw_partition_by) -%}
{%- set cluster_by = config.get('cluster_by', none) -%}
{% if not adapter.is_replaceable(old_relation, partition_by, cluster_by) %}
Reviewer comment:

partition_by.to_dict(), or change is_replaceable to take a regular PartitionConfig.

@dataclass
class PartitionConfig(Dict[str, Any], JsonSchemaMixin):
    field: str
    data_type: str
Reviewer comment:

Suggested change:
-    data_type: str
+    data_type: str = 'date'

Comment on lines 62 to 70
if raw_partition_by.get('field'):
    if raw_partition_by.get('data_type'):
        return cls(**raw_partition_by)
    else:  # assume date type as default
        return cls(**raw_partition_by, data_type='date')
else:
    dbt.exceptions.raise_compiler_error(
        'Config `partition_by` is missing required item `field`'
    )
Reviewer comment:

This looks like it could just be:

try:
    return cls.from_dict(raw_partition_by)
except hologram.ValidationError:
    dbt.exceptions.raise_compiler_error(
        'Config `partition_by` is missing required item `field`'
    )

Comment on lines 94 to 97
inferred_partition_by = {
    'field': partition_by,
    'data_type': data_type
}
Reviewer comment:

How about:

inferred_partition_by = cls(field=partition_by, data_type=data_type)
dbt.deprecations.warn(
    'bq-partition-by-string',
    raw_partition_by=raw_partition_by,
    inferred_partition_by=inferred_partition_by
)
return inferred_partition_by

Setting up a kwargs dictionary just to expand it seems unnecessary!

@beckjake beckjake force-pushed the feature/cost-effective-bq-incremental-followup branch from 7d43037 to 6fb8c96 Compare February 21, 2020 20:35
Also, make tests run
@beckjake beckjake force-pushed the feature/cost-effective-bq-incremental-followup branch from 6fb8c96 to 4068c66 Compare February 21, 2020 20:37
@drewbanin (Contributor, Author) left a comment

I have one more tiny comment here, and then we should ship the heck out of this PR.

This does not only Look Good To Me, this Looks Great To Me. Word on the street is that this PR is going to cut into GCP's profit margin... so glad that these incremental models are finally going to be really efficient!!

Co-Authored-By: Drew Banin <drew@fishtownanalytics.com>
@beckjake beckjake merged commit bcea7cc into dev/barbara-gittings Feb 24, 2020
@beckjake beckjake deleted the feature/cost-effective-bq-incremental-followup branch February 24, 2020 17:32
Successfully merging this pull request may close these issues.

BQ incremental merge statements respect dest date partitioning
3 participants