From 0c7f7288d8cded5dc73d49d1e0be397e748d4f10 Mon Sep 17 00:00:00 2001 From: Yongjie Zhao Date: Mon, 17 Jan 2022 18:42:59 +0800 Subject: [PATCH] feat(advanced analytics): support groupby in resample (#18045) --- .../src/operators/resampleOperator.ts | 14 +++- .../utils/operators/resampleOperator.test.ts | 79 ++++++++++++++++++- .../src/query/types/PostProcessing.ts | 3 + superset/utils/pandas_postprocessing.py | 26 ++++-- .../pandas_postprocessing_tests.py | 67 ++++++++++++++++ 5 files changed, 180 insertions(+), 9 deletions(-) diff --git a/superset-frontend/packages/superset-ui-chart-controls/src/operators/resampleOperator.ts b/superset-frontend/packages/superset-ui-chart-controls/src/operators/resampleOperator.ts index a43838d710b8c..1c50e932d5673 100644 --- a/superset-frontend/packages/superset-ui-chart-controls/src/operators/resampleOperator.ts +++ b/superset-frontend/packages/superset-ui-chart-controls/src/operators/resampleOperator.ts @@ -17,7 +17,11 @@ * specific language governing permissions and limitationsxw * under the License. */ -import { PostProcessingResample } from '@superset-ui/core'; +import { + ensureIsArray, + isPhysicalColumn, + PostProcessingResample, +} from '@superset-ui/core'; import { PostProcessingFactory } from './types'; import { TIME_COLUMN } from './utils'; @@ -28,6 +32,13 @@ export const resampleOperator: PostProcessingFactory< const resampleMethod = resampleZeroFill ? 'asfreq' : formData.resample_method; const resampleRule = formData.resample_rule; if (resampleMethod && resampleRule) { + const groupby_columns = ensureIsArray(queryObject.columns).map(column => { + if (isPhysicalColumn(column)) { + return column; + } + return column.label; + }); + return { operation: 'resample', options: { @@ -35,6 +46,7 @@ export const resampleOperator: PostProcessingFactory< rule: resampleRule, fill_value: resampleZeroFill ? 0 : null, time_column: TIME_COLUMN, + groupby_columns, }, }; } diff --git a/superset-frontend/packages/superset-ui-chart-controls/test/utils/operators/resampleOperator.test.ts b/superset-frontend/packages/superset-ui-chart-controls/test/utils/operators/resampleOperator.test.ts index bce5a84abc556..18116a8310a7f 100644 --- a/superset-frontend/packages/superset-ui-chart-controls/test/utils/operators/resampleOperator.test.ts +++ b/superset-frontend/packages/superset-ui-chart-controls/test/utils/operators/resampleOperator.test.ts @@ -16,8 +16,8 @@ * specific language governing permissions and limitations * under the License. */ -import { QueryObject, SqlaFormData } from '@superset-ui/core'; -import { resampleOperator } from '../../../src'; +import { AdhocColumn, QueryObject, SqlaFormData } from '@superset-ui/core'; +import { resampleOperator } from '@superset-ui/chart-controls'; const formData: SqlaFormData = { metrics: [ @@ -75,6 +75,7 @@ test('should do resample', () => { rule: '1D', fill_value: null, time_column: '__timestamp', + groupby_columns: [], }, }); }); @@ -92,6 +93,80 @@ test('should do zerofill resample', () => { rule: '1D', fill_value: 0, time_column: '__timestamp', + groupby_columns: [], + }, + }); +}); + +test('should append physical column to resample', () => { + expect( + resampleOperator( + { ...formData, resample_method: 'zerofill', resample_rule: '1D' }, + { ...queryObject, columns: ['column1', 'column2'] }, + ), + ).toEqual({ + operation: 'resample', + options: { + method: 'asfreq', + rule: '1D', + fill_value: 0, + time_column: '__timestamp', + groupby_columns: ['column1', 'column2'], + }, + }); +}); + +test('should append label of adhoc column and physical column to resample', () => { + expect( + resampleOperator( + { ...formData, resample_method: 'zerofill', resample_rule: '1D' }, + { + ...queryObject, + columns: [ + { + hasCustomLabel: true, + label: 'concat_a_b', + expressionType: 'SQL', + sqlExpression: "'a' + 'b'", + } as AdhocColumn, + 'column2', + ], + }, + ), + ).toEqual({ + operation: 'resample', + options: { + method: 'asfreq', + rule: '1D', + fill_value: 0, + time_column: '__timestamp', + groupby_columns: ['concat_a_b', 'column2'], + }, + }); +}); + +test('should append `undefined` if adhoc non-existing label', () => { + expect( + resampleOperator( + { ...formData, resample_method: 'zerofill', resample_rule: '1D' }, + { + ...queryObject, + columns: [ + { + sqlExpression: "'a' + 'b'", + } as AdhocColumn, + 'column2', + ], + }, + ), + ).toEqual({ + operation: 'resample', + options: { + method: 'asfreq', + rule: '1D', + fill_value: 0, + time_column: '__timestamp', + groupby_columns: [undefined, 'column2'], }, }); }); diff --git a/superset-frontend/packages/superset-ui-core/src/query/types/PostProcessing.ts b/superset-frontend/packages/superset-ui-core/src/query/types/PostProcessing.ts index f38ed411f1fa5..266551bebb188 100644 --- a/superset-frontend/packages/superset-ui-core/src/query/types/PostProcessing.ts +++ b/superset-frontend/packages/superset-ui-core/src/query/types/PostProcessing.ts @@ -167,6 +167,9 @@ export interface PostProcessingResample { rule: string; fill_value?: number | null; time_column: string; + // If AdhocColumn doesn't have a label, it will be undefined. + // todo: we have to give an explicit label for AdhocColumn. + groupby_columns?: Array; }; } diff --git a/superset/utils/pandas_postprocessing.py b/superset/utils/pandas_postprocessing.py index ea4e40986bf81..c247a49a4ba49 100644 --- a/superset/utils/pandas_postprocessing.py +++ b/superset/utils/pandas_postprocessing.py @@ -958,27 +958,41 @@ def outliers(series: Series) -> Set[float]: return aggregate(df, groupby=groupby, aggregates=aggregates) -def resample( +@validate_column_args("groupby_columns") +def resample( # pylint: disable=too-many-arguments df: DataFrame, rule: str, method: str, time_column: str, + groupby_columns: Optional[Tuple[Optional[str], ...]] = None, fill_value: Optional[Union[float, int]] = None, ) -> DataFrame: """ - resample a timeseries dataframe. + support upsampling in resample :param df: DataFrame to resample. :param rule: The offset string representing target conversion. :param method: How to fill the NaN value after resample. :param time_column: existing columns in DataFrame. + :param groupby_columns: columns except time_column in dataframe :param fill_value: What values do fill missing. :return: DataFrame after resample :raises QueryObjectValidationError: If the request in incorrect """ - df = df.set_index(time_column) - if method == "asfreq" and fill_value is not None: - df = df.resample(rule).asfreq(fill_value=fill_value) + + def _upsampling(_df: DataFrame) -> DataFrame: + _df = _df.set_index(time_column) + if method == "asfreq" and fill_value is not None: + return _df.resample(rule).asfreq(fill_value=fill_value) + return getattr(_df.resample(rule), method)() + + if groupby_columns: + df = ( + df.set_index(keys=list(groupby_columns)) + .groupby(by=list(groupby_columns)) + .apply(_upsampling) + ) + df = df.reset_index().set_index(time_column).sort_index() else: - df = getattr(df.resample(rule), method)() + df = _upsampling(df) return df.reset_index() diff --git a/tests/integration_tests/pandas_postprocessing_tests.py b/tests/integration_tests/pandas_postprocessing_tests.py index feabef6e2fbaf..50612e1da3055 100644 --- a/tests/integration_tests/pandas_postprocessing_tests.py +++ b/tests/integration_tests/pandas_postprocessing_tests.py @@ -1029,3 +1029,70 @@ def test_resample(self): ) self.assertListEqual(post_df["label"].tolist(), ["x", "y", 0, 0, "z", 0, "q"]) self.assertListEqual(post_df["y"].tolist(), [1.0, 2.0, 0, 0, 3.0, 0, 4.0]) + + def test_resample_with_groupby(self): + """ +The Dataframe contains a timestamp column, a string column and a numeric column. + __timestamp city val +0 2022-01-13 Chicago 6.0 +1 2022-01-13 LA 5.0 +2 2022-01-13 NY 4.0 +3 2022-01-11 Chicago 3.0 +4 2022-01-11 LA 2.0 +5 2022-01-11 NY 1.0 + """ + df = DataFrame( + { + "__timestamp": to_datetime( + [ + "2022-01-13", + "2022-01-13", + "2022-01-13", + "2022-01-11", + "2022-01-11", + "2022-01-11", + ] + ), + "city": ["Chicago", "LA", "NY", "Chicago", "LA", "NY"], + "val": [6.0, 5.0, 4.0, 3.0, 2.0, 1.0], + } + ) + post_df = proc.resample( + df=df, + rule="1D", + method="asfreq", + fill_value=0, + time_column="__timestamp", + groupby_columns=("city",), + ) + assert list(post_df.columns) == [ + "__timestamp", + "city", + "val", + ] + assert [str(dt.date()) for dt in post_df["__timestamp"]] == ( + ["2022-01-11"] * 3 + ["2022-01-12"] * 3 + ["2022-01-13"] * 3 + ) + assert list(post_df["val"]) == [3.0, 2.0, 1.0, 0, 0, 0, 6.0, 5.0, 4.0] + + # should raise error when get a non-existent column + with pytest.raises(QueryObjectValidationError): + proc.resample( + df=df, + rule="1D", + method="asfreq", + fill_value=0, + time_column="__timestamp", + groupby_columns=("city", "unkonw_column",), + ) + + # should raise error when get a None value in groupby list + with pytest.raises(QueryObjectValidationError): + proc.resample( + df=df, + rule="1D", + method="asfreq", + fill_value=0, + time_column="__timestamp", + groupby_columns=("city", None,), + )