diff --git a/superset/common/query_context.py b/superset/common/query_context.py
index 0964292d8fd1b..5053372412509 100644
--- a/superset/common/query_context.py
+++ b/superset/common/query_context.py
@@ -14,30 +14,247 @@
 # KIND, either express or implied. See the License for the
 # specific language governing permissions and limitations
 # under the License.
-# pylint: disable=R
+# pylint: disable=C,R,W
+from datetime import datetime, timedelta
+import logging
+import pickle as pkl
+import traceback
 from typing import Dict, List
 
+import numpy as np
+import pandas as pd
+
+from superset import app, cache
 from superset import db
 from superset.connectors.connector_registry import ConnectorRegistry
+from superset.utils import core as utils
+from superset.utils.core import DTTM_ALIAS
 from .query_object import QueryObject
 
+config = app.config
+stats_logger = config.get('STATS_LOGGER')
+
 
 class QueryContext:
     """
     The query context contains the query object and additional fields necessary
     to retrieve the data payload for a given viz.
     """
+
+    default_fillna = 0
+    cache_type = 'df'
+    enforce_numerical_metrics = True
+
     # TODO: Type datasource and query_object dictionary with TypedDict when it becomes
     # a vanilla python type https://github.com/python/mypy/issues/5288
     def __init__(
             self,
             datasource: Dict,
             queries: List[Dict],
+            force: bool = False,
+            custom_cache_timeout: int = None,
     ):
         self.datasource = ConnectorRegistry.get_datasource(datasource.get('type'),
                                                            int(datasource.get('id')),
                                                            db.session)
         self.queries = list(map(lambda query_obj: QueryObject(**query_obj), queries))
 
-    def get_data(self):
-        raise NotImplementedError()
+        self.force = force
+
+        self.custom_cache_timeout = custom_cache_timeout
+
+        self.enforce_numerical_metrics = True
+
+    def get_query_result(self, query_object):
+        """Returns a pandas dataframe based on the query object"""
+
+        # Here, we assume that all the queries will use the same datasource, which is
+        # a valid assumption for the current setting. In the long term, we may or may
+        # not support multiple queries from different data sources.
+
+        timestamp_format = None
+        if self.datasource.type == 'table':
+            dttm_col = self.datasource.get_col(query_object.granularity)
+            if dttm_col:
+                timestamp_format = dttm_col.python_date_format
+
+        # The datasource here can be a different backend, but the interface is common
+        result = self.datasource.query(query_object.to_dict())
+
+        df = result.df
+        # Transform the timestamp we received from the database to a pandas-supported
+        # datetime format. If no python_date_format is specified, the pattern is
+        # assumed to be the default ISO date format.
+        # If the datetime format is unix (epoch), the corresponding parsing logic
+        # is used.
+        if df is not None and not df.empty:
+            if DTTM_ALIAS in df.columns:
+                if timestamp_format in ('epoch_s', 'epoch_ms'):
+                    # Column has already been formatted as a timestamp.
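+                    # (each value is therefore wrapped directly in pd.Timestamp rather
+                    # than parsed with an explicit format string)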
+                    df[DTTM_ALIAS] = df[DTTM_ALIAS].apply(pd.Timestamp)
+                else:
+                    df[DTTM_ALIAS] = pd.to_datetime(
+                        df[DTTM_ALIAS], utc=False, format=timestamp_format)
+                if self.datasource.offset:
+                    df[DTTM_ALIAS] += timedelta(hours=self.datasource.offset)
+                df[DTTM_ALIAS] += query_object.time_shift
+
+            if self.enforce_numerical_metrics:
+                self.df_metrics_to_num(df, query_object)
+
+            df = df.replace([np.inf, -np.inf], np.nan)
+            df = self.handle_nulls(df)
+        return {
+            'query': result.query,
+            'status': result.status,
+            'error_message': result.error_message,
+            'df': df,
+        }
+
+    def df_metrics_to_num(self, df, query_object):
+        """Converts metrics to numeric when pandas.read_sql cannot"""
+        metrics = [metric for metric in query_object.metrics]
+        for col, dtype in df.dtypes.items():
+            if dtype.type == np.object_ and col in metrics:
+                df[col] = pd.to_numeric(df[col], errors='coerce')
+
+    def handle_nulls(self, df):
+        fillna = self.get_fillna_for_columns(df.columns)
+        return df.fillna(fillna)
+
+    def get_fillna_for_col(self, col):
+        """Returns the value to use as filler for a specific Column.type"""
+        if col and col.is_string:
+            return ' NULL'
+        return self.default_fillna
+
+    def get_fillna_for_columns(self, columns=None):
+        """Returns a dict or scalar that can be passed to DataFrame.fillna"""
+        if columns is None:
+            return self.default_fillna
+        columns_dict = {col.column_name: col for col in self.datasource.columns}
+        fillna = {
+            c: self.get_fillna_for_col(columns_dict.get(c))
+            for c in columns
+        }
+        return fillna
+
+    def get_data(self, df):
+        return df.to_dict(orient='records')
+
+    def get_single_payload(self, query_obj):
+        """Returns a payload of metadata and data"""
+        payload = self.get_df_payload(query_obj)
+        df = payload.get('df')
+        status = payload.get('status')
+        if status != utils.QueryStatus.FAILED:
+            if df is not None and df.empty:
+                payload['error'] = 'No data'
+            else:
+                payload['data'] = self.get_data(df)
+        if 'df' in payload:
+            del payload['df']
+        return payload
+
+    def get_payload(self):
+        """Returns the payloads for all query objects in the context"""
+        return [self.get_single_payload(query_object) for query_object in self.queries]
+
+    @property
+    def cache_timeout(self):
+        if self.custom_cache_timeout is not None:
+            return self.custom_cache_timeout
+        if self.datasource.cache_timeout is not None:
+            return self.datasource.cache_timeout
+        if (
+                hasattr(self.datasource, 'database') and
+                self.datasource.database.cache_timeout is not None):
+            return self.datasource.database.cache_timeout
+        return config.get('CACHE_DEFAULT_TIMEOUT')
+
+    def get_df_payload(self, query_obj, **kwargs):
+        """Handles caching around the df payload retrieval"""
+        cache_key = query_obj.cache_key(
+            datasource=self.datasource.uid, **kwargs) if query_obj else None
+        logging.info('Cache key: {}'.format(cache_key))
+        is_loaded = False
+        stacktrace = None
+        df = None
+        cached_dttm = datetime.utcnow().isoformat().split('.')[0]
+        cache_value = None
+        status = None
+        query = ''
+        error_message = None
+        if cache_key and cache and not self.force:
+            cache_value = cache.get(cache_key)
+            if cache_value:
+                stats_logger.incr('loaded_from_cache')
+                try:
+                    cache_value = pkl.loads(cache_value)
+                    df = cache_value['df']
+                    query = cache_value['query']
+                    status = utils.QueryStatus.SUCCESS
+                    is_loaded = True
+                except Exception as e:
+                    logging.exception(e)
+                    logging.error('Error reading cache: ' +
+                                  utils.error_msg_from_exception(e))
+                logging.info('Serving from cache')
+
+        if query_obj and not is_loaded:
+            try:
+                query_result = self.get_query_result(query_obj)
+                status = query_result['status']
+                query = query_result['query']
+                error_message = query_result['error_message']
+                df = query_result['df']
+                if status != utils.QueryStatus.FAILED:
+                    stats_logger.incr('loaded_from_source')
+                    is_loaded = True
+            except Exception as e:
+                logging.exception(e)
+                if not error_message:
+                    error_message = '{}'.format(e)
+                status = utils.QueryStatus.FAILED
+                stacktrace = traceback.format_exc()
+
+        if (
+                is_loaded and
+                cache_key and
+                cache and
+                status != utils.QueryStatus.FAILED):
+            try:
+                cache_value = dict(
+                    dttm=cached_dttm,
+                    df=df if df is not None else None,
+                    query=query,
+                )
+                cache_value = pkl.dumps(
+                    cache_value, protocol=pkl.HIGHEST_PROTOCOL)
+
+                logging.info('Caching {} chars at key {}'.format(
+                    len(cache_value), cache_key))
+
+                stats_logger.incr('set_cache_key')
+                cache.set(
+                    cache_key,
+                    cache_value,
+                    timeout=self.cache_timeout)
+            except Exception as e:
+                # cache.set call can fail if the backend is down or if
+                # the key is too large or for whatever other reason
+                logging.warning('Could not cache key {}'.format(cache_key))
+                logging.exception(e)
+                cache.delete(cache_key)
+        return {
+            'cache_key': cache_key,
+            'cached_dttm': cache_value['dttm'] if cache_value is not None else None,
+            'cache_timeout': self.cache_timeout,
+            'df': df,
+            'error': error_message,
+            'is_cached': cache_key is not None,
+            'query': query,
+            'status': status,
+            'stacktrace': stacktrace,
+            'rowcount': len(df.index) if df is not None else 0,
+        }
diff --git a/superset/common/query_object.py b/superset/common/query_object.py
index c851d47c16428..a2394042a09f8 100644
--- a/superset/common/query_object.py
+++ b/superset/common/query_object.py
@@ -15,15 +15,17 @@
 # specific language governing permissions and limitations
 # under the License.
 # pylint: disable=R
-from typing import Dict, List, Optional
+import hashlib
+from typing import Dict, List, Optional, Union
+
+import simplejson as json
 
 from superset import app
 from superset.utils import core as utils
 
+
 # TODO: Type Metrics dictionary with TypedDict when it becomes a vanilla python type
 # https://github.com/python/mypy/issues/5288
-Metric = Dict
-
 
 class QueryObject:
     """
@@ -33,31 +35,87 @@ class QueryObject:
     def __init__(
             self,
             granularity: str,
+            metrics: List[Union[Dict, str]],
             groupby: List[str] = None,
-            metrics: List[Metric] = None,
             filters: List[str] = None,
             time_range: Optional[str] = None,
             time_shift: Optional[str] = None,
             is_timeseries: bool = False,
+            timeseries_limit: int = 0,
             row_limit: int = app.config.get('ROW_LIMIT'),
-            limit: int = 0,
-            timeseries_limit_metric: Optional[Metric] = None,
+            timeseries_limit_metric: Optional[Dict] = None,
             order_desc: bool = True,
             extras: Optional[Dict] = None,
+            prequeries: Optional[Dict] = None,
+            is_prequery: bool = False,
+            columns: List[str] = None,
+            orderby: List[List] = None,
     ):
         self.granularity = granularity
         self.from_dttm, self.to_dttm = utils.get_since_until(time_range, time_shift)
         self.is_timeseries = is_timeseries
-        self.groupby = groupby or []
-        self.metrics = metrics or []
-        self.filter = filters or []
+        self.time_range = time_range
+        self.time_shift = utils.parse_human_timedelta(time_shift)
+        self.groupby = groupby if groupby is not None else []
+
+        # Temporary solution for the backward-compatibility issue caused by the new
+        # format of non-ad-hoc metrics.
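+        # Ad-hoc metrics (dicts carrying an 'expressionType' key) and plain string
+        # metric names are passed through unchanged; saved metrics in the new
+        # {'label': ...} dict format are reduced to their label.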
+        self.metrics = [metric if isinstance(metric, str) or 'expressionType' in metric
+                        else metric['label'] for metric in metrics]
         self.row_limit = row_limit
-        self.timeseries_limit = int(limit)
+        self.filter = filters if filters is not None else []
+        self.timeseries_limit = timeseries_limit
         self.timeseries_limit_metric = timeseries_limit_metric
         self.order_desc = order_desc
-        self.prequeries = []
-        self.is_prequery = False
-        self.extras = extras
+        self.prequeries = prequeries
+        self.is_prequery = is_prequery
+        self.extras = extras if extras is not None else {}
+        self.columns = columns if columns is not None else []
+        self.orderby = orderby if orderby is not None else []
 
     def to_dict(self):
-        raise NotImplementedError()
+        query_object_dict = {
+            'granularity': self.granularity,
+            'from_dttm': self.from_dttm,
+            'to_dttm': self.to_dttm,
+            'is_timeseries': self.is_timeseries,
+            'groupby': self.groupby,
+            'metrics': self.metrics,
+            'row_limit': self.row_limit,
+            'filter': self.filter,
+            'timeseries_limit': self.timeseries_limit,
+            'timeseries_limit_metric': self.timeseries_limit_metric,
+            'order_desc': self.order_desc,
+            'prequeries': self.prequeries,
+            'is_prequery': self.is_prequery,
+            'extras': self.extras,
+            'columns': self.columns,
+            'orderby': self.orderby,
+        }
+        return query_object_dict
+
+    def cache_key(self, **extra):
+        """
+        The cache key is made out of the key/values in this query object, plus any
+        other key/values in `extra`.
+        We remove datetime bounds that are hard values, and replace them with
+        the user-provided inputs to bounds, which may be time-relative (as in
+        "5 days ago" or "now").
+        """
+        cache_dict = self.to_dict()
+        cache_dict.update(extra)
+
+        for k in ['from_dttm', 'to_dttm']:
+            del cache_dict[k]
+        if self.time_range:
+            cache_dict['time_range'] = self.time_range
+        json_data = self.json_dumps(cache_dict, sort_keys=True)
+        return hashlib.md5(json_data.encode('utf-8')).hexdigest()
+
+    def json_dumps(self, obj, sort_keys=False):
+        return json.dumps(
+            obj,
+            default=utils.json_int_dttm_ser,
+            ignore_nan=True,
+            sort_keys=sort_keys,
+        )
diff --git a/superset/views/api.py b/superset/views/api.py
index 7b84217c55d24..aadee9cb46d9a 100644
--- a/superset/views/api.py
+++ b/superset/views/api.py
@@ -15,16 +15,18 @@
 # specific language governing permissions and limitations
 # under the License.
 # pylint: disable=R
-import json
-
-from flask import g, request
+from flask import request
 from flask_appbuilder import expose
 from flask_appbuilder.security.decorators import has_access_api
+import simplejson as json
 
-from superset import appbuilder, security_manager
+from superset import appbuilder, db, security_manager
 from superset.common.query_context import QueryContext
+from superset.legacy import update_time_range
+import superset.models.core as models
 from superset.models.core import Log
-from .base import api, BaseSupersetView, data_payload_response, handle_api_exception
+from superset.utils import core as utils
+from .base import api, BaseSupersetView, handle_api_exception
 
 
 class Api(BaseSupersetView):
@@ -37,11 +39,37 @@ def query(self):
         """
         Takes a query_obj constructed in the client and returns payload data response
         for the given query_obj.
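+        The `query_context` form field is a JSON blob whose keys mirror the
+        QueryContext constructor: a `datasource` reference and a list of `queries`.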
+        params: query_context: json_blob
         """
         query_context = QueryContext(**json.loads(request.form.get('query_context')))
-        security_manager.assert_datasource_permission(query_context.datasource, g.user)
-        payload_json = query_context.get_data()
-        return data_payload_response(payload_json)
+        security_manager.assert_datasource_permission(query_context.datasource)
+        payload_json = query_context.get_payload()
+        return json.dumps(
+            payload_json,
+            default=utils.json_int_dttm_ser,
+            ignore_nan=True,
+        )
+
+    @Log.log_this
+    @api
+    @handle_api_exception
+    @has_access_api
+    @expose('/v1/form_data/', methods=['GET'])
+    def query_form_data(self):
+        """
+        Get the form_data stored in the database for an existing slice.
+        params: slice_id: integer
+        """
+        form_data = {}
+        slice_id = request.args.get('slice_id')
+        if slice_id:
+            slc = db.session.query(models.Slice).filter_by(id=slice_id).one_or_none()
+            if slc:
+                form_data = slc.form_data.copy()
+
+        update_time_range(form_data)
+
+        return json.dumps(form_data)
 
 
 appbuilder.add_view_no_menu(Api)
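
Below is a minimal usage sketch of how a client might exercise the new endpoint; it is not part of the patch. The `/api` route prefix (derived from the view class name), the `localhost:8088` address, the datasource id, and the `ds`/`gender`/`count` names are illustrative assumptions, and `session` is assumed to be a `requests.Session` that has already authenticated against Superset.

import simplejson as json
import requests

session = requests.Session()  # assumed to be logged in to Superset already

# The JSON blob mirrors QueryContext(datasource=..., queries=[...]) above;
# each entry in 'queries' maps onto QueryObject's keyword arguments.
query_context = {
    'datasource': {'id': 1, 'type': 'table'},   # hypothetical table datasource
    'queries': [{
        'granularity': 'ds',                    # hypothetical time column
        'metrics': [{'label': 'count'}],        # non-ad-hoc metric in the new format
        'groupby': ['gender'],                  # hypothetical groupby column
        'time_range': 'Last week',
        'row_limit': 100,
    }],
}

resp = session.post(
    'http://localhost:8088/api/v1/query/',
    data={'query_context': json.dumps(query_context)},
)
payloads = json.loads(resp.text)  # one payload dict per query object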