Enhancement of query context and object. (#6962)
* Added more functionality to the query context and query object.

* Fixed cache logic.

* Added a default value for groupby.

* Updated comments and removed a print statement.
conglei authored and xtinec committed Mar 5, 2019
1 parent aded70a commit d5b9795
Showing 3 changed files with 328 additions and 25 deletions.
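
For orientation, the sketch below shows the kind of request payload the enhanced QueryContext is constructed from: a `datasource` reference plus a list of query definitions that become QueryObject instances. The datasource id, column, and metric names are made up for illustration, and the commented-out calls assume a configured Superset app and registered datasource, so this is an approximation of the intended usage rather than an exact recipe.

    # Illustrative only: the datasource id/type and the query fields below are invented.
    query_context_payload = {
        'datasource': {'id': 1, 'type': 'table'},
        'queries': [
            {
                'granularity': 'ds',
                'metrics': [{'label': 'count'}],  # a legacy metric dict is reduced to its label
                'groupby': ['gender'],
                'filters': [],
                'time_range': 'Last week',
                'row_limit': 5000,
                'timeseries_limit': 0,
                'order_desc': True,
                'extras': {},
            },
        ],
    }

    # Hypothetical usage inside Superset (not runnable standalone):
    # query_context = QueryContext(**query_context_payload)
    # payloads = query_context.get_payload()  # one payload dict per query object
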
223 changes: 220 additions & 3 deletions superset/common/query_context.py
@@ -14,30 +14,247 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-# pylint: disable=R
# pylint: disable=C,R,W
from datetime import datetime, timedelta
import logging
import pickle as pkl
import traceback
from typing import Dict, List

import numpy as np
import pandas as pd

from superset import app, cache
from superset import db
from superset.connectors.connector_registry import ConnectorRegistry
from superset.utils import core as utils
from superset.utils.core import DTTM_ALIAS
from .query_object import QueryObject

config = app.config
stats_logger = config.get('STATS_LOGGER')


class QueryContext:
"""
The query context contains the query object and additional fields necessary
to retrieve the data payload for a given viz.
"""

default_fillna = 0
cache_type = 'df'
enforce_numerical_metrics = True

# TODO: Type datasource and query_object dictionary with TypedDict when it becomes
# a vanilla python type https://github.com/python/mypy/issues/5288
def __init__(
self,
datasource: Dict,
queries: List[Dict],
force: bool = False,
custom_cache_timeout: int = None,
):
self.datasource = ConnectorRegistry.get_datasource(datasource.get('type'),
int(datasource.get('id')),
db.session)
self.queries = list(map(lambda query_obj: QueryObject(**query_obj), queries))

-    def get_data(self):
-        raise NotImplementedError()
self.force = force

self.custom_cache_timeout = custom_cache_timeout

self.enforce_numerical_metrics = True

def get_query_result(self, query_object):
"""Returns a pandas dataframe based on the query object"""

        # Here, we assume that all the queries will use the same datasource, which
        # is a valid assumption for the current setting. In the long term, we may
        # or may not support multiple queries from different data sources.

timestamp_format = None
if self.datasource.type == 'table':
dttm_col = self.datasource.get_col(query_object.granularity)
if dttm_col:
timestamp_format = dttm_col.python_date_format

# The datasource here can be different backend but the interface is common
result = self.datasource.query(query_object.to_dict())

df = result.df
        # Transform the timestamp we received from the database into a
        # pandas-supported datetime format. If no python_date_format is specified,
        # the pattern is assumed to be the default ISO date format. If the
        # datetime format is epoch-based (epoch_s/epoch_ms), the corresponding
        # parsing logic is used.
if df is not None and not df.empty:
if DTTM_ALIAS in df.columns:
if timestamp_format in ('epoch_s', 'epoch_ms'):
# Column has already been formatted as a timestamp.
df[DTTM_ALIAS] = df[DTTM_ALIAS].apply(pd.Timestamp)
else:
df[DTTM_ALIAS] = pd.to_datetime(
df[DTTM_ALIAS], utc=False, format=timestamp_format)
if self.datasource.offset:
df[DTTM_ALIAS] += timedelta(hours=self.datasource.offset)
df[DTTM_ALIAS] += query_object.time_shift

if self.enforce_numerical_metrics:
self.df_metrics_to_num(df, query_object)

            df = df.replace([np.inf, -np.inf], np.nan)
df = self.handle_nulls(df)
return {
'query': result.query,
'status': result.status,
'error_message': result.error_message,
'df': df,
}

def df_metrics_to_num(self, df, query_object):
"""Converting metrics to numeric when pandas.read_sql cannot"""
metrics = [metric for metric in query_object.metrics]
for col, dtype in df.dtypes.items():
if dtype.type == np.object_ and col in metrics:
df[col] = pd.to_numeric(df[col], errors='coerce')

def handle_nulls(self, df):
fillna = self.get_fillna_for_columns(df.columns)
return df.fillna(fillna)

def get_fillna_for_col(self, col):
"""Returns the value to use as filler for a specific Column.type"""
if col and col.is_string:
return ' NULL'
return self.default_fillna

def get_fillna_for_columns(self, columns=None):
"""Returns a dict or scalar that can be passed to DataFrame.fillna"""
if columns is None:
return self.default_fillna
columns_dict = {col.column_name: col for col in self.datasource.columns}
fillna = {
c: self.get_fillna_for_col(columns_dict.get(c))
for c in columns
}
return fillna

def get_data(self, df):
return df.to_dict(orient='records')

def get_single_payload(self, query_obj):
"""Returns a payload of metadata and data"""
payload = self.get_df_payload(query_obj)
df = payload.get('df')
status = payload.get('status')
if status != utils.QueryStatus.FAILED:
if df is not None and df.empty:
payload['error'] = 'No data'
else:
payload['data'] = self.get_data(df)
if 'df' in payload:
del payload['df']
return payload

def get_payload(self):
"""Get all the paylaods from the arrays"""
return [self.get_single_payload(query_ojbect) for query_ojbect in self.queries]

@property
def cache_timeout(self):
if self.custom_cache_timeout is not None:
return self.custom_cache_timeout
if self.datasource.cache_timeout is not None:
return self.datasource.cache_timeout
        if (
                hasattr(self.datasource, 'database') and
                self.datasource.database.cache_timeout is not None):
            return self.datasource.database.cache_timeout
return config.get('CACHE_DEFAULT_TIMEOUT')

def get_df_payload(self, query_obj, **kwargs):
"""Handles caching around the df paylod retrieval"""
cache_key = query_obj.cache_key(
datasource=self.datasource.uid, **kwargs) if query_obj else None
logging.info('Cache key: {}'.format(cache_key))
is_loaded = False
stacktrace = None
df = None
cached_dttm = datetime.utcnow().isoformat().split('.')[0]
cache_value = None
status = None
query = ''
error_message = None
if cache_key and cache and not self.force:
cache_value = cache.get(cache_key)
if cache_value:
stats_logger.incr('loaded_from_cache')
try:
cache_value = pkl.loads(cache_value)
df = cache_value['df']
query = cache_value['query']
status = utils.QueryStatus.SUCCESS
is_loaded = True
except Exception as e:
logging.exception(e)
logging.error('Error reading cache: ' +
utils.error_msg_from_exception(e))
logging.info('Serving from cache')

if query_obj and not is_loaded:
try:
query_result = self.get_query_result(query_obj)
status = query_result['status']
query = query_result['query']
error_message = query_result['error_message']
df = query_result['df']
if status != utils.QueryStatus.FAILED:
stats_logger.incr('loaded_from_source')
is_loaded = True
except Exception as e:
logging.exception(e)
if not error_message:
error_message = '{}'.format(e)
status = utils.QueryStatus.FAILED
stacktrace = traceback.format_exc()

if (
is_loaded and
cache_key and
cache and
status != utils.QueryStatus.FAILED):
try:
cache_value = dict(
dttm=cached_dttm,
df=df if df is not None else None,
query=query,
)
cache_value = pkl.dumps(
cache_value, protocol=pkl.HIGHEST_PROTOCOL)

logging.info('Caching {} chars at key {}'.format(
len(cache_value), cache_key))

stats_logger.incr('set_cache_key')
cache.set(
cache_key,
cache_value,
timeout=self.cache_timeout)
except Exception as e:
                # The cache.set call can fail if the backend is down, if the key
                # is too large, or for various other reasons
logging.warning('Could not cache key {}'.format(cache_key))
logging.exception(e)
cache.delete(cache_key)
return {
'cache_key': cache_key,
'cached_dttm': cache_value['dttm'] if cache_value is not None else None,
'cache_timeout': self.cache_timeout,
'df': df,
'error': error_message,
'is_cached': cache_key is not None,
'query': query,
'status': status,
'stacktrace': stacktrace,
'rowcount': len(df.index) if df is not None else 0,
}
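
As a standalone illustration of the dataframe post-processing that get_query_result performs (timestamp parsing, coercing object-typed metric columns to numbers, and replacing infinities and nulls), here is a small pandas sketch. The column names, the epoch-seconds assumption, and the fill value are invented for the example; the real code derives them from the datasource and the query object.

    import numpy as np
    import pandas as pd

    DTTM_ALIAS = '__timestamp'  # the temporal-column alias Superset uses

    # Example result as it might come back from the database: the timestamp column
    # is in epoch seconds and the metric column arrives as strings, as can happen
    # with pandas.read_sql.
    df = pd.DataFrame({
        DTTM_ALIAS: [1551744000, 1551830400, 1551916800],
        'sum_metric': ['10.5', 'not-a-number', '7'],
    })

    # Parse the timestamp column (the real code branches on python_date_format and
    # the epoch_s/epoch_ms formats; epoch seconds are assumed here).
    df[DTTM_ALIAS] = pd.to_datetime(df[DTTM_ALIAS], unit='s')

    # Coerce object-typed metric columns to numeric; unparseable values become NaN.
    metrics = ['sum_metric']
    for col, dtype in df.dtypes.items():
        if dtype.type == np.object_ and col in metrics:
            df[col] = pd.to_numeric(df[col], errors='coerce')

    # Replace infinities with NaN, then fill the remaining nulls per column.
    df = df.replace([np.inf, -np.inf], np.nan)
    df = df.fillna({'sum_metric': 0})

    print(df)
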
86 changes: 72 additions & 14 deletions superset/common/query_object.py
@@ -15,15 +15,17 @@
# specific language governing permissions and limitations
# under the License.
# pylint: disable=R
-from typing import Dict, List, Optional
import hashlib
from typing import Dict, List, Optional, Union

import simplejson as json

from superset import app
from superset.utils import core as utils


# TODO: Type Metrics dictionary with TypedDict when it becomes a vanilla python type
# https://github.com/python/mypy/issues/5288
Metric = Dict


class QueryObject:
"""
@@ -33,31 +35,87 @@ class QueryObject:
def __init__(
self,
granularity: str,
metrics: List[Union[Dict, str]],
groupby: List[str] = None,
-        metrics: List[Metric] = None,
filters: List[str] = None,
time_range: Optional[str] = None,
time_shift: Optional[str] = None,
is_timeseries: bool = False,
timeseries_limit: int = 0,
row_limit: int = app.config.get('ROW_LIMIT'),
limit: int = 0,
-        timeseries_limit_metric: Optional[Metric] = None,
timeseries_limit_metric: Optional[Dict] = None,
order_desc: bool = True,
extras: Optional[Dict] = None,
prequeries: Optional[Dict] = None,
is_prequery: bool = False,
columns: List[str] = None,
orderby: List[List] = None,
):
self.granularity = granularity
self.from_dttm, self.to_dttm = utils.get_since_until(time_range, time_shift)
self.is_timeseries = is_timeseries
-        self.groupby = groupby or []
-        self.metrics = metrics or []
-        self.filter = filters or []
self.time_range = time_range
self.time_shift = utils.parse_human_timedelta(time_shift)
self.groupby = groupby if groupby is not None else []

        # Temporary solution for a backward-compatibility issue caused by the new
        # format of non-ad-hoc metrics: string metrics and ad-hoc metric dicts are
        # kept as-is, while legacy metric dicts are reduced to their label.
        self.metrics = [metric if isinstance(metric, str) or 'expressionType' in metric
                        else metric['label']
                        for metric in metrics]
self.row_limit = row_limit
-        self.timeseries_limit = int(limit)
self.filter = filters if filters is not None else []
self.timeseries_limit = timeseries_limit
self.timeseries_limit_metric = timeseries_limit_metric
self.order_desc = order_desc
-        self.prequeries = []
-        self.is_prequery = False
-        self.extras = extras
self.prequeries = prequeries
self.is_prequery = is_prequery
self.extras = extras if extras is not None else {}
self.columns = columns if columns is not None else []
self.orderby = orderby if orderby is not None else []

def to_dict(self):
-        raise NotImplementedError()
query_object_dict = {
'granularity': self.granularity,
'from_dttm': self.from_dttm,
'to_dttm': self.to_dttm,
'is_timeseries': self.is_timeseries,
'groupby': self.groupby,
'metrics': self.metrics,
'row_limit': self.row_limit,
'filter': self.filter,
'timeseries_limit': self.timeseries_limit,
'timeseries_limit_metric': self.timeseries_limit_metric,
'order_desc': self.order_desc,
'prequeries': self.prequeries,
'is_prequery': self.is_prequery,
'extras': self.extras,
'columns': self.columns,
'orderby': self.orderby,
}
return query_object_dict

def cache_key(self, **extra):
"""
        The cache key is made out of the key/values in `query_obj`, plus any
        other key/values in `extra`.
        We remove datetime bounds that are hard values, and replace them with
        the user-provided inputs to bounds, which may be time-relative (as in
        "5 days ago" or "now").
"""
cache_dict = self.to_dict()
cache_dict.update(extra)

for k in ['from_dttm', 'to_dttm']:
del cache_dict[k]
if self.time_range:
cache_dict['time_range'] = self.time_range
json_data = self.json_dumps(cache_dict, sort_keys=True)
return hashlib.md5(json_data.encode('utf-8')).hexdigest()

def json_dumps(self, obj, sort_keys=False):
return json.dumps(
obj,
default=utils.json_int_dttm_ser,
ignore_nan=True,
sort_keys=sort_keys,
)
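
To make the caching strategy concrete, here is a self-contained sketch of how QueryObject.cache_key derives a stable key: hard from_dttm/to_dttm bounds are dropped in favour of the relative time_range, the dictionary is serialized with sorted keys, and the MD5 hex digest is used as the key. It uses the standard-library json module instead of simplejson and an invented query dictionary, so it approximates the behaviour rather than reproducing the exact Superset code path.

    import hashlib
    import json
    from datetime import datetime

    def cache_key(query_obj_dict, time_range=None, **extra):
        """Approximate the QueryObject.cache_key logic, for illustration only."""
        cache_dict = dict(query_obj_dict)
        cache_dict.update(extra)

        # Hard datetime bounds would defeat caching for relative ranges such as
        # "Last week", so they are removed and the raw time_range is kept instead.
        for k in ('from_dttm', 'to_dttm'):
            cache_dict.pop(k, None)
        if time_range:
            cache_dict['time_range'] = time_range

        json_data = json.dumps(cache_dict, default=str, sort_keys=True)
        return hashlib.md5(json_data.encode('utf-8')).hexdigest()

    example_query = {
        'granularity': 'ds',
        'metrics': ['count'],
        'groupby': ['gender'],
        'from_dttm': datetime(2019, 2, 26),
        'to_dttm': datetime(2019, 3, 5),
    }

    print(cache_key(example_query, time_range='Last week', datasource='1__table'))

Because the hard bounds are excluded, two requests for "Last week" issued on different days hash to the same key and can share a cache entry until it expires.
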