Skip to content

Commit

Permalink
feature: see Presto row and array data types (#7413)
Browse files Browse the repository at this point in the history
* Merge latest from master into lyft-release-sp8 (#7405)

* filter out all nan series (#7313)

* improve not rich tooltip (#7345)

* Create issue_label_bot.yaml (#7341)

* fix: do not save colors without a color scheme (#7347)

* [wtforms] Strip leading/trailing whitespace (#7084)

* [schema] Updating the datasources schema (#5451)

* limit tables/views returned if schema is not provided (#7358)

* limit tables/views returned if schema is not provided

* fix typo

* improve code performance

* handle the case when a table name or view name does not include a schema

* Add type anno (#7342)

* Updated local dev instructions to include missing step

* First pass at type annotations

* [schema] Updating the base column schema (#5452)

* Update 937d04c16b64_update_datasources.py (#7361)

* Feature flag for client cache (#7348)

* Feature flag for client cache

* Fix integration test

* Revert "Fix integration test"

This reverts commit 58434ab.

* Feature flag for client cache

* Fix integration tests

* Add feature flag to config.py

* Add another feature check

* Fix more integration tests

* Fix raw HTML in SliceAdder (#7338)

* remove backendSync.json (#7331)

* [bubbles] issue when using duplicated metrics (#7087)

* SUPERSET-7: Docker compose config version breaks on Ubuntu 16.04 (#7359)

* SUPERSET-8: Update text in docs copyright footer (#7360)

* SUPERSET-7: Docker compose config version breaks on Ubuntu 16.04

* SUPERSET-8: Extra text in docs copyright footer

* [schema] Adding commits and removing unnecessary foreign-key definitions (#7371)

*  Store last selected dashboard in sessionStorage (#7181)

* Store last selected dashboard in sessionStorage

* Fix tests

* [schema] Updating the base metric schema (#5453)

* Fix NoneType bug & fill the test recipients with original recipients if empty (#7365)

* feat: see Presto row and array data types (#7391)

* feat: see Presto row and array data types

* fix: address PR comments

* fix: lint and build issues

* fix: add types

* add stronger type hints where possible

* fix: lint issues and add select_star func in Hive

* add missing pkg init

* fix: build issues

* fix: pylint issues

* fix: use logging instead of print
  • Loading branch information
DiggidyDave authored and xtinec committed May 1, 2019
1 parent 46579b1 commit a6aabf8
Show file tree
Hide file tree
Showing 5 changed files with 384 additions and 6 deletions.
206 changes: 201 additions & 5 deletions superset/db_engine_specs.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,17 @@
import re
import textwrap
import time
from typing import List, Optional, Tuple

from flask import g
from flask_babel import lazy_gettext as _
import pandas
import sqlalchemy as sqla
from sqlalchemy import Column, select
from sqlalchemy import Column, select, types
from sqlalchemy.engine import create_engine
from sqlalchemy.engine.base import Engine
from sqlalchemy.engine.reflection import Inspector
from sqlalchemy.engine.result import RowProxy
from sqlalchemy.engine.url import make_url
from sqlalchemy.sql import quoted_name, text
from sqlalchemy.sql.expression import TextAsFrom
Expand All @@ -52,6 +56,7 @@

from superset import app, conf, db, sql_parse
from superset.exceptions import SupersetTemplateException
from superset.models.sql_types.presto_sql_types import type_map as presto_type_map
from superset.utils import core as utils

QueryStatus = utils.QueryStatus
Expand Down Expand Up @@ -105,16 +110,16 @@ class BaseEngineSpec(object):
"""Abstract class for database engine specific configurations"""

engine = 'base' # str as defined in sqlalchemy.engine.engine
time_grain_functions = {}
time_grain_functions: dict = {}
time_groupby_inline = False
limit_method = LimitMethod.FORCE_LIMIT
time_secondary_columns = False
inner_joins = True
allows_subquery = True
supports_column_aliases = True
force_column_alias_quotes = False
arraysize = None
max_column_name_length = None
arraysize = 0
max_column_name_length = 0

@classmethod
def get_time_expr(cls, expr, pdf, time_grain, grain):
Expand Down Expand Up @@ -351,6 +356,10 @@ def get_table_names(cls, inspector, schema):
def get_view_names(cls, inspector, schema):
    """Return the names of all views visible in ``schema``, sorted alphabetically."""
    view_names = inspector.get_view_names(schema)
    return sorted(view_names)

@classmethod
def get_columns(cls, inspector: Inspector, table_name: str, schema: str) -> list:
    """Return column metadata for ``table_name`` in ``schema``.

    Default implementation: delegate straight to SQLAlchemy's inspector.
    Engine specs with non-standard column handling override this.
    """
    columns = inspector.get_columns(table_name, schema)
    return columns

@classmethod
def where_latest_partition(
cls, table_name, schema, database, qry, columns=None):
Expand Down Expand Up @@ -735,7 +744,7 @@ class MySQLEngineSpec(BaseEngineSpec):
'INTERVAL DAYOFWEEK(DATE_SUB({col}, INTERVAL 1 DAY)) - 1 DAY))',
}

type_code_map = {} # loaded from get_datatype only if needed
type_code_map: dict = {} # loaded from get_datatype only if needed

@classmethod
def convert_dttm(cls, target_type, dttm):
Expand Down Expand Up @@ -814,6 +823,180 @@ def get_view_names(cls, inspector, schema):
"""
return []

@classmethod
def _create_column_info(cls, column: RowProxy, name: str, data_type: str) -> dict:
    """
    Build the column-info dict used throughout this spec
    :param column: raw row returned by SHOW COLUMNS
    :param name: (possibly dotted) column name to report
    :param data_type: resolved column data type
    :return: column info object
    """
    # Older Presto exposes a ``Null`` attribute on each row; newer Presto
    # no longer includes that column, so assume nullable when it is absent.
    nullable = getattr(column, 'Null', True)
    return dict(name=name, type=data_type, nullable=nullable, default=None)

@classmethod
def _get_full_name(cls, names: List[Tuple[str, str]]) -> str:
    """
    Join the name components on the parsing stack into a dotted path
    :param names: list of (name, type) pairs; empty names are skipped
    :return: full dotted column name
    """
    parts = []
    for part_name, _part_type in names:
        if part_name:
            parts.append(part_name)
    return '.'.join(parts)

@classmethod
def _has_nested_data_types(cls, component_type: str) -> bool:
    """
    Check if string contains a data type. We determine if there is a data type by
    whitespace or multiple data types by commas
    :param component_type: data type
    :return: boolean
    """
    # Lookahead ensures the comma/whitespace is followed by an even number
    # of double quotes, i.e. the separator itself is not inside quotes.
    unquoted = r'(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)'
    has_comma = re.search(',' + unquoted, component_type) is not None
    has_whitespace = re.search(r'\s' + unquoted, component_type) is not None
    return has_comma or has_whitespace

@classmethod
def _split_data_type(cls, data_type: str, delimiter: str) -> List[str]:
    """
    Split data type based on given delimiter. Do not split the string if the
    delimiter is enclosed in quotes
    :param data_type: data type
    :param delimiter: regex for the separator (e.g. open/closed parenthesis,
           comma, whitespace)
    :return: list of strings after breaking it by the delimiter
    """
    # The lookahead requires an even number of double quotes after the
    # delimiter, so quoted delimiters never act as split points.
    pattern = r'{}(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)'.format(delimiter)
    return re.split(pattern, data_type)

@classmethod
def _parse_structural_column(cls, column: RowProxy, result: List[dict]) -> None:
    """
    Parse a row or array column into one flattened entry per leaf field.

    Walks the textual type declaration (e.g. ``row(a varchar, b array(int))``)
    with a stack of the enclosing structural types, appending a column-info
    dict (dotted name + mapped type) to ``result`` for each field found.

    :param column: raw SHOW COLUMNS row; ``column.Column`` is the name and
        ``column.Type`` the textual Presto type
    :param result: list that accumulates the parsed column-info dicts
        (mutated in place; nothing is returned)
    """
    full_data_type = '{} {}'.format(column.Column, column.Type)
    # split on open parenthesis ( to get the structural
    # data type and its component types
    data_types = cls._split_data_type(full_data_type, r'\(')
    # stack of (name, structural type) pairs for the row/array types we are
    # currently inside of; a dummy '' name marks an unnamed nesting level
    stack: List[Tuple[str, str]] = []
    for data_type in data_types:
        # split on closed parenthesis ) to track which component
        # types belong to what structural data type
        inner_types = cls._split_data_type(data_type, r'\)')
        for inner_type in inner_types:
            # An empty chunk means a closing parenthesis ended the current
            # structural data type: unwind one nesting level.
            if not inner_type and len(stack) > 0:
                stack.pop()
            elif cls._has_nested_data_types(inner_type):
                # split on comma , to get individual "name type" fields
                single_fields = cls._split_data_type(inner_type, ', ')
                for single_field in single_fields:
                    # If component type starts with a comma, the first single field
                    # will be an empty string. Disregard this empty string.
                    if not single_field:
                        continue
                    # split on whitespace to get field name and data type
                    # NOTE(review): assumes every field splits into exactly
                    # (name, type); a malformed declaration would raise
                    # IndexError on field_info[1] — TODO confirm inputs
                    field_info = cls._split_data_type(single_field, r'\s')
                    # check if there is a structural data type within
                    # overall structural data type
                    if field_info[1] == 'array' or field_info[1] == 'row':
                        stack.append((field_info[0], field_info[1]))
                        full_parent_path = cls._get_full_name(stack)
                        result.append(cls._create_column_info(
                            column, full_parent_path,
                            presto_type_map[field_info[1]]()))
                    else:  # otherwise this field is a basic data type
                        full_parent_path = cls._get_full_name(stack)
                        column_name = '{}.{}'.format(full_parent_path, field_info[0])
                        result.append(cls._create_column_info(
                            column, column_name, presto_type_map[field_info[1]]()))
                # If the component type ends with a structural data type, do not pop
                # the stack. We have run across a structural data type within the
                # overall structural data type. Otherwise, we have completely parsed
                # through the entire structural data type and can move on.
                # NOTE(review): this pop is unguarded; if the stack were empty
                # here it would raise IndexError — appears unreachable for
                # well-formed types, but verify against malformed input
                if not (inner_type.endswith('array') or inner_type.endswith('row')):
                    stack.pop()
            # We have an array of row objects (i.e. array(row(...)))
            elif 'array' == inner_type or 'row' == inner_type:
                # Push a dummy entry to represent the unnamed structural type
                stack.append(('', inner_type))
            # We have an array of a basic data type (i.e. array(varchar)).
            elif len(stack) > 0:
                # Because it is an array of a basic data type, we have finished
                # parsing the structural data type and can move on.
                stack.pop()

@classmethod
def _show_columns(
        cls, inspector: Inspector, table_name: str, schema: str) -> List[RowProxy]:
    """
    Show presto column names
    :param inspector: object that performs database schema inspection
    :param table_name: table name
    :param schema: schema name
    :return: list of column objects
    """
    # Quote identifiers through the dialect so reserved words / special
    # characters survive the raw SHOW COLUMNS statement.
    quote = inspector.engine.dialect.identifier_preparer.quote_identifier
    if schema:
        full_table = '{}.{}'.format(quote(schema), quote(table_name))
    else:
        full_table = quote(table_name)
    return inspector.bind.execute('SHOW COLUMNS FROM {}'.format(full_table))

@classmethod
def get_columns(
        cls, inspector: Inspector, table_name: str, schema: str) -> List[dict]:
    """
    Get columns from a Presto data source. This includes handling row and
    array data types
    :param inspector: object that performs database schema inspection
    :param table_name: table name
    :param schema: schema name
    :return: a list of results that contain column info
            (i.e. column name and data type)
    """
    columns = cls._show_columns(inspector, table_name, schema)
    result: List[dict] = []
    for column in columns:
        try:
            # parse column if it is a row or array
            if 'array' in column.Type or 'row' in column.Type:
                cls._parse_structural_column(column, result)
                continue
            else:  # otherwise column is a basic data type
                column_type = presto_type_map[column.Type]()
        except KeyError:
            # Lazy %-style args avoid formatting when INFO is disabled.
            logging.info(
                'Did not recognize type %s of column %s',
                column.Type, column.Column)
            # Fix: instantiate NullType. The original assigned the class
            # object itself, inconsistent with the presto_type_map[...]()
            # branch above, which stores instances.
            column_type = types.NullType()
        result.append(cls._create_column_info(column, column.Column, column_type))
    return result

@classmethod
def select_star(cls, my_db, table_name: str, engine: Engine, schema: str = None,
                limit: int = 100, show_cols: bool = False, indent: bool = True,
                latest_partition: bool = True,
                cols: Optional[List[dict]] = None) -> str:
    """
    Temporary method until we have a function that can handle row and array columns

    :param cols: pre-fetched column-info dicts; expanded row/array
        sub-columns (names containing an unquoted dot) are filtered out
        before delegating to the base implementation
    :return: the rendered SELECT * statement
    """
    # Fix: avoid a shared mutable default argument (was ``cols=[]``);
    # normalize None to a fresh list instead.
    presto_cols = cols if cols is not None else []
    if show_cols:
        # Names containing a dot outside of double quotes are flattened
        # components of a row/array column; exclude them from SELECT.
        dot_regex = r'\.(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)'
        presto_cols = [
            col for col in presto_cols if re.search(dot_regex, col['name']) is None]
    return BaseEngineSpec.select_star(
        my_db, table_name, engine, schema, limit,
        show_cols, indent, latest_partition, presto_cols,
    )

@classmethod
def adjust_database_uri(cls, uri, selected_schema=None):
database = uri.database
Expand Down Expand Up @@ -1323,6 +1506,11 @@ def handle_cursor(cls, cursor, query, session):
time.sleep(hive_poll_interval)
polled = cursor.poll()

@classmethod
def get_columns(
        cls, inspector: Inspector, table_name: str, schema: str) -> List[dict]:
    """Return column metadata via SQLAlchemy's generic inspector,
    bypassing any Presto-style SHOW COLUMNS handling."""
    columns = inspector.get_columns(table_name, schema)
    return columns

@classmethod
def where_latest_partition(
cls, table_name, schema, database, qry, columns=None):
Expand Down Expand Up @@ -1354,6 +1542,14 @@ def _partition_query(
cls, table_name, limit=0, order_by=None, filters=None):
return f'SHOW PARTITIONS {table_name}'

@classmethod
def select_star(cls, my_db, table_name: str, engine: Engine, schema: str = None,
                limit: int = 100, show_cols: bool = False, indent: bool = True,
                latest_partition: bool = True,
                cols: Optional[List[dict]] = None) -> str:
    """
    Delegate SELECT * rendering to the base engine spec unchanged.

    Fix: avoid a shared mutable default argument (was ``cols=[]``);
    normalize None to a fresh list before delegating.
    """
    return BaseEngineSpec.select_star(
        my_db, table_name, engine, schema, limit,
        show_cols, indent, latest_partition,
        cols if cols is not None else [])

@classmethod
def modify_url_for_impersonation(cls, url, impersonate_user, username):
"""
Expand Down
2 changes: 1 addition & 1 deletion superset/models/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -1075,7 +1075,7 @@ def get_table(self, table_name, schema=None):
autoload_with=self.get_sqla_engine())

def get_columns(self, table_name, schema=None):
    """Return column metadata for ``table_name``, delegating to the
    database's engine spec so engine-specific handling (e.g. Presto
    row/array expansion) applies.

    Fix: the scraped text retained both the old ``self.inspector`` call and
    the new delegation as two consecutive returns, leaving the second
    unreachable; only the new delegation line is kept.
    """
    return self.db_engine_spec.get_columns(self.inspector, table_name, schema)

def get_indexes(self, table_name, schema=None):
return self.inspector.get_indexes(table_name, schema)
Expand Down
16 changes: 16 additions & 0 deletions superset/models/sql_types/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
Loading

0 comments on commit a6aabf8

Please sign in to comment.