Skip to content

Commit

Permalink
bigquery parity for statement blocks (#526)
Browse files Browse the repository at this point in the history
* wip

* bq parity for data fetching requests

* move macros around

* tests for introspective queries

* fix exception signature

* provide schema to create table for bq

* more tweaks

* release bq connection after opening

* optionally log to term

* don't use unicode char in log func

* use empty list as default for execute
  • Loading branch information
drewbanin committed Sep 19, 2017
1 parent 5cc2e13 commit efe4ece
Show file tree
Hide file tree
Showing 12 changed files with 94 additions and 16 deletions.
24 changes: 19 additions & 5 deletions dbt/adapters/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ class BigQueryAdapter(PostgresAdapter):
"query_for_existing",
"execute_model",
"drop",
"execute",
"quote_schema_and_table"
]

SCOPE = ('https://www.googleapis.com/auth/bigquery',
Expand Down Expand Up @@ -223,6 +225,8 @@ def materialize_as_table(cls, profile, dataset, model_name, model_sql):
job.write_disposition = 'WRITE_TRUNCATE'
job.begin()

cls.release_connection(profile, model_name)

logger.debug("Model SQL ({}):\n{}".format(model_name, model_sql))

with cls.exception_handler(profile, model_sql, model_name, model_name):
Expand All @@ -232,10 +236,11 @@ def materialize_as_table(cls, profile, dataset, model_name, model_sql):

@classmethod
def execute_model(cls, profile, model, materialization, model_name=None):
connection = cls.get_connection(profile, model.get('name'))

if flags.STRICT_MODE:
connection = cls.get_connection(profile, model.get('name'))
validate_connection(connection)
cls.release_connection(profile, model.get('name'))

model_name = model.get('name')
model_schema = model.get('schema')
Expand Down Expand Up @@ -267,10 +272,10 @@ def fetch_query_results(cls, query):
if token is None:
break
rows, total_count, token = query.fetch_data(page_token=token)
return rows
return all_rows

@classmethod
def execute_and_fetch(cls, profile, sql, model_name=None, **kwargs):
def execute(cls, profile, sql, model_name=None, fetch=False, **kwargs):
conn = cls.get_connection(profile, model_name)
client = conn.get('handle')

Expand All @@ -283,7 +288,16 @@ def execute_and_fetch(cls, profile, sql, model_name=None, **kwargs):

query.run()

return cls.fetch_query_results(query)
res = []
if fetch:
res = cls.fetch_query_results(query)

status = 'ERROR' if query.errors else 'OK'
return status, res

@classmethod
def execute_and_fetch(cls, profile, sql, model_name, auto_begin=None):
return cls.execute(profile, sql, model_name, fetch=True)

@classmethod
def add_begin_query(cls, profile, name):
Expand Down Expand Up @@ -368,7 +382,7 @@ def quote(cls, identifier):
return '`{}`'.format(identifier)

@classmethod
def quote_schema_and_table(cls, profile, schema, table):
def quote_schema_and_table(cls, profile, schema, table, model_name=None):
connection = cls.get_connection(profile)
credentials = connection.get('credentials', {})
project = credentials.get('project')
Expand Down
15 changes: 14 additions & 1 deletion dbt/adapters/default.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ class DefaultAdapter(object):
"add_query",
"expand_target_column_types",
"quote_schema_and_table",
"execute"
]

raw_functions = [
Expand Down Expand Up @@ -539,7 +540,19 @@ def execute_and_fetch(cls, profile, sql, model_name=None,
auto_begin=False):
_, cursor = cls.execute_one(profile, sql, model_name, auto_begin)

return cursor.fetchall()
status = cls.get_status(cursor)
rows = cursor.fetchall()
return status, rows

@classmethod
def execute(cls, profile, sql, model_name=None, auto_begin=False,
fetch=False):
if fetch:
return cls.execute_and_fetch(profile, sql, model_name, auto_begin)
else:
_, cursor = cls.execute_one(profile, sql, model_name, auto_begin)
status = cls.get_status(cursor)
return status, []

@classmethod
def execute_all(cls, profile, sqls, model_name=None):
Expand Down
7 changes: 5 additions & 2 deletions dbt/context/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,8 +148,11 @@ def _add_sql_handlers(context):
})


def log(msg):
logger.debug(msg)
def log(msg, info=False):
if info:
logger.info(msg)
else:
logger.debug(msg)
return ''


Expand Down
8 changes: 8 additions & 0 deletions dbt/include/global_project/macros/adapters/bigquery.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@

{% macro bigquery__create_table_as(temporary, identifier, sql) -%}
{{ adapter.execute_model({"name": identifier, "injected_sql": sql, "schema": schema}, 'table') }}
{% endmacro %}

{% macro bigquery__create_view_as(identifier, sql) -%}
{{ adapter.execute_model({"name": identifier, "injected_sql": sql, "schema": schema}, 'view') }}
{% endmacro %}
1 change: 0 additions & 1 deletion dbt/include/global_project/macros/adapters/common.sql
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
);
{% endmacro %}


{% macro create_view_as(identifier, sql) -%}
{{ adapter_macro('create_view_as', identifier, sql) }}
{%- endmacro %}
Expand Down
6 changes: 3 additions & 3 deletions dbt/include/global_project/macros/core.sql
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@
{{ write(sql) }}
{%- endif -%}

{%- set _, cursor = adapter.add_query(sql, auto_begin=auto_begin) -%}
{%- set status, res = adapter.execute(sql, auto_begin=auto_begin, fetch=fetch_result) -%}
{%- if name is not none -%}
{%- set result = [] if not fetch_result else adapter.get_result_from_cursor(cursor) -%}
{{ store_result(name, status=adapter.get_status(cursor), data=result) }}
{{ store_result(name, status=status, data=res) }}
{%- endif -%}

{%- endif -%}
{%- endmacro %}
4 changes: 2 additions & 2 deletions dbt/node_runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -415,7 +415,7 @@ def print_start_line(self):
self.num_nodes)

def execute_test(self, test):
rows = self.adapter.execute_and_fetch(
res, rows = self.adapter.execute_and_fetch(
self.profile,
test.get('wrapped_sql'),
test.get('name'),
Expand All @@ -426,7 +426,7 @@ def execute_test(self, test):
num_cols = len(rows[0])
raise RuntimeError(
"Bad test {name}: Returned {rows} rows and {cols} cols"
.format(name=test.name, rows=num_rows, cols=num_cols))
.format(name=test.get('name'), rows=num_rows, cols=num_cols))

return rows[0][0]

Expand Down
2 changes: 1 addition & 1 deletion dbt/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ def find_by_name(flat_graph, target_name, target_package, subgraph,
if len(node_parts) != 3:
node_type = model.get('resource_type', 'node')
msg = "{} names cannot contain '.' characters".format(node_type)
dbt.exceptions.raise_compiler_error(model, msg)
dbt.exceptions.raise_compiler_error(msg, model)

resource_type, package_name, node_name = node_parts

Expand Down
20 changes: 20 additions & 0 deletions test/integration/022_bigquery_test/macros/test_creation.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@


-- hack b/c bq model names are fully qualified, which doesn't work
-- with query_for_existing
{% macro test_was_materialized(model, name, type) %}

{#-- don't run this query in the parsing step #}
{%- if model -%}
{%- set existing_tables = adapter.query_for_existing(schema) -%}
{%- else -%}
{%- set existing_tables = {} -%}
{%- endif -%}

{% if name in existing_tables and existing_tables[name] == type %}
select 0 as success
{% else %}
select 1 as error
{% endif %}

{% endmacro %}
11 changes: 11 additions & 0 deletions test/integration/022_bigquery_test/models/schema.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,14 @@ view:
unique:
- id
- dupe # fails

was_materialized:
- {name: view, type: view}

table_model:
constraints:
not_null:
- id

was_materialized:
- {name: table_model, type: table}
4 changes: 4 additions & 0 deletions test/integration/022_bigquery_test/models/table_model.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@

{{ config(materialized = "table") }}

select * from {{ ref('view') }}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from dbt.project import read_project


class TestSimpleBigQueryView(DBTIntegrationTest):
class TestSimpleBigQueryRun(DBTIntegrationTest):

def setUp(self):
pass
Expand All @@ -18,6 +18,12 @@ def schema(self):
def models(self):
return "test/integration/022_bigquery_test/models"

@property
def project_config(self):
return {
'macro-paths': ['test/integration/022_bigquery_test/macros'],
}

def run_schema_validations(self):
project = read_project('dbt_project.yml')
args = FakeArgs()
Expand Down

0 comments on commit efe4ece

Please sign in to comment.