Skip to content

Commit

Permalink
New Paths Query (#5646)
Browse files Browse the repository at this point in the history
* initial implementation

* debug some tests and revert format to original

* some minor cleanup

* add screen handling

* start point handling

* improve indexing so multiple sessions by the same user are handled properly

* all tests passing

* fix type

* fix types

* change types

* more types

* rename base paths

* upgrade start point functionality

* assertEquals, remove inequality

* Revert "assertEquals, remove inequality"

This reverts commit 5eb229f.

Co-authored-by: Neil Kakkar <neilkakkar@gmail.com>
  • Loading branch information
EDsCODE and neilkakkar authored Aug 20, 2021
1 parent 3a2a5ec commit 06c5c6f
Show file tree
Hide file tree
Showing 13 changed files with 263 additions and 34 deletions.
5 changes: 3 additions & 2 deletions ee/clickhouse/queries/column_optimizer.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import List, Set, Tuple, cast
from typing import List, Set, Tuple, Union, cast

from ee.clickhouse.materialized_columns.columns import ColumnName, get_materialized_columns
from ee.clickhouse.models.action import get_action_tables_and_properties, uses_elements_chain
Expand All @@ -7,6 +7,7 @@
from posthog.models.entity import Entity
from posthog.models.filters import Filter
from posthog.models.filters.mixins.utils import cached_property
from posthog.models.filters.path_filter import PathFilter
from posthog.models.property import Property, PropertyName, PropertyType
from posthog.models.team import Team

Expand All @@ -18,7 +19,7 @@ class ColumnOptimizer:
This speeds up queries since clickhouse ends up selecting less data.
"""

def __init__(self, filter: Filter, team_id: int):
def __init__(self, filter: Union[Filter, PathFilter], team_id: int):
self.filter = filter
self.team_id = team_id

Expand Down
9 changes: 5 additions & 4 deletions ee/clickhouse/queries/event_query.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
from abc import ABCMeta, abstractmethod
from typing import Any, Dict, List, Tuple
from typing import Any, Dict, List, Tuple, Union

from ee.clickhouse.models.cohort import format_person_query, get_precalculated_query, is_precalculated_query
from ee.clickhouse.models.property import filter_element, prop_filter_json_extract
from ee.clickhouse.queries.util import parse_timestamps
from ee.clickhouse.sql.person import GET_TEAM_PERSON_DISTINCT_IDS
from posthog.models import Cohort, Filter, Property, Team
from posthog.models.filters.path_filter import PathFilter


class ClickhouseEventQuery(metaclass=ABCMeta):
Expand All @@ -14,15 +15,15 @@ class ClickhouseEventQuery(metaclass=ABCMeta):
EVENT_TABLE_ALIAS = "e"

_PERSON_PROPERTIES_ALIAS = "person_props"
_filter: Filter
_filter: Union[Filter, PathFilter]
_team_id: int
_should_join_distinct_ids = False
_should_join_persons = False
_should_round_interval = False

def __init__(
self,
filter: Filter,
filter: Union[Filter, PathFilter],
team_id: int,
round_interval=False,
should_join_distinct_ids=False,
Expand All @@ -31,7 +32,7 @@ def __init__(
) -> None:
self._filter = filter
self._team_id = team_id
self.params = {
self.params: Dict[str, Any] = {
"team_id": self._team_id,
}

Expand Down
Empty file.
55 changes: 55 additions & 0 deletions ee/clickhouse/queries/paths/path_event_query.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
from typing import Any, Dict, Tuple

from ee.clickhouse.models.property import get_property_string_expr
from ee.clickhouse.queries.event_query import ClickhouseEventQuery
from posthog.constants import AUTOCAPTURE_EVENT, PAGEVIEW_EVENT, SCREEN_EVENT


class PathEventQuery(ClickhouseEventQuery):
    """Event-level query that feeds the paths query.

    For one team it selects the timestamp, a derived ``path_item`` and the
    person_id of every pageview, screen and autocapture event, plus every event
    whose name does not start with ``$``. The date and property filters of the
    stored filter are applied, and rows are ordered by person and timestamp.
    """

    def get_query(self) -> Tuple[str, Dict[str, Any]]:
        """Build the events SQL and the parameters it needs.

        ``path_item`` is chosen per event type:

        * screen events      -> the ``$screen_name`` property expression
        * pageview events    -> the ``$current_url`` property expression
        * autocapture events -> ``'autocapture:'`` followed by the elements chain
        * anything else      -> the event name itself

        Returns:
            A ``(query, params)`` tuple. ``params`` is ``self.params``, which is
            updated in place with the date, property and event-name parameters.
        """
        event_alias = self.EVENT_TABLE_ALIAS

        # Every branch compares the aliased column, so the expression reads the
        # same way throughout (the screen branch used a bare ``event`` before).
        path_item = (
            f"if({event_alias}.event = %(screen)s, {self._get_screen_name_parsing()}, "
            f"if({event_alias}.event = %(pageview)s, {self._get_current_url_parsing()}, "
            f"if({event_alias}.event = %(autocapture)s, concat('autocapture:', {event_alias}.elements_chain), "
            f"{event_alias}.event)))"
        )
        _fields = f"{event_alias}.timestamp AS timestamp, {path_item} AS path_item"
        if self._should_join_distinct_ids:
            _fields += f", {self.DISTINCT_ID_TABLE_ALIAS}.person_id as person_id"

        date_query, date_params = self._get_date_filter()
        self.params.update(date_params)

        prop_filters = self._filter.properties
        prop_query, prop_params = self._get_props(prop_filters, allow_denormalized_props=True)
        self.params.update(prop_params)

        query = f"""
            SELECT {_fields} FROM events {event_alias}
            {self._get_disintct_id_query()}
            {self._get_person_query()}
            WHERE team_id = %(team_id)s
            AND (event = %(pageview)s OR event = %(screen)s OR event = %(autocapture)s OR NOT event LIKE %(custom_event_match)s)
            {date_query}
            {prop_query}
            ORDER BY {self.DISTINCT_ID_TABLE_ALIAS}.person_id, {event_alias}.timestamp
        """
        self.params.update(
            {
                # "$%" matches names starting with "$"; the query negates the match.
                "custom_event_match": "$%",
                "pageview": PAGEVIEW_EVENT,
                "screen": SCREEN_EVENT,
                "autocapture": AUTOCAPTURE_EVENT,
            }
        )
        return query, self.params

    def _determine_should_join_distinct_ids(self) -> None:
        """Always join distinct ids: the query selects and orders by person_id."""
        self._should_join_distinct_ids = True

    def _get_property_parsing(self, property_name: str) -> str:
        """Return the SQL expression that reads ``property_name`` from the events table."""
        # The second tuple element returned by the helper is not needed here.
        expression, _ = get_property_string_expr(
            "events", property_name, f"'{property_name}'", "properties", allow_denormalized_props=True
        )
        return expression

    def _get_current_url_parsing(self) -> str:
        """Return the SQL expression for the ``$current_url`` event property."""
        return self._get_property_parsing("$current_url")

    def _get_screen_name_parsing(self) -> str:
        """Return the SQL expression for the ``$screen_name`` event property."""
        return self._get_property_parsing("$screen_name")
67 changes: 67 additions & 0 deletions ee/clickhouse/queries/paths/paths.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
from abc import ABC
from typing import Dict, List, Tuple

from ee.clickhouse.client import sync_execute
from ee.clickhouse.queries.paths.path_event_query import PathEventQuery
from ee.clickhouse.sql.paths.path import PATH_ARRAY_QUERY
from posthog.constants import LIMIT
from posthog.models import Filter, Team
from posthog.models.filters.path_filter import PathFilter
from posthog.queries.paths import Paths

# Default for the "event_in_session_limit" query parameter.
EVENT_IN_SESSION_LIMIT_DEFAULT = 5
# Default for the "session_time_threshold" query parameter.
SESSION_TIME_THRESHOLD_DEFAULT = 1800000 # 30 minutes, expressed in milliseconds


class ClickhousePathsNew:
    """Runs the ClickHouse paths query for one team and filter.

    The result is a list of ``{"source", "target", "value"}`` dictionaries, one per
    row returned by ``PATH_ARRAY_QUERY``.
    """

    _filter: PathFilter
    _team: Team

    def __init__(self, filter: PathFilter, team: Team) -> None:
        """Keep the filter and team, and seed the default query parameters."""
        self._filter = filter
        self._team = team
        self.params = {
            "team_id": self._team.pk,
            "events": [],  # purely a speed optimization, don't need this for filtering
            "event_in_session_limit": EVENT_IN_SESSION_LIMIT_DEFAULT,
            "session_time_threshold": SESSION_TIME_THRESHOLD_DEFAULT,
            "autocapture_match": "%autocapture:%",
        }

    def run(self, *args, **kwargs):
        """Execute the query and return the formatted rows; extra arguments are ignored."""
        return self._format_results(self._exec_query())

    def _format_results(self, results):
        """Turn result rows into dictionaries built from their first three columns."""
        if not results:
            return []

        return [{"source": row[0], "target": row[1], "value": row[2]} for row in results]

    def _exec_query(self) -> List[Tuple]:
        """Build the SQL and run it with the accumulated parameters."""
        # get_query() hands back self.params, so this is the same dict that was
        # updated while the query was being built.
        sql, query_params = self.get_query()
        return sync_execute(sql, query_params)

    def get_query(self) -> Tuple[str, dict]:
        """Assemble the full paths SQL.

        Merges the event-query parameters and the start-point parameters into
        ``self.params`` and returns ``(sql, self.params)``.
        """
        event_sql, event_params = PathEventQuery(filter=self._filter, team_id=self._team.pk).get_query()
        self.params.update(event_params)

        start_filter_sql, start_filter_params = self.get_start_point_filter()
        self.params.update(start_filter_params)

        full_sql = PATH_ARRAY_QUERY.format(path_event_query=event_sql, boundary_event_filter=start_filter_sql)
        return full_sql, self.params

    def get_start_point_filter(self) -> Tuple[str, Dict]:
        """Return the WHERE clause (possibly empty) and parameters for the start point.

        The ``start_point`` parameter is always supplied, as ``None`` when no start
        point is set, because the query template references it unconditionally.
        """
        start_point = self._filter.start_point
        if start_point:
            return "WHERE arrayElement(limited_path, 1) = %(start_point)s", {"start_point": start_point}

        return "", {"start_point": None}
24 changes: 21 additions & 3 deletions ee/clickhouse/queries/test/test_paths.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
from uuid import uuid4

from freezegun import freeze_time

from ee.clickhouse.materialized_columns.columns import materialize
from ee.clickhouse.models.event import create_event
from ee.clickhouse.queries.clickhouse_paths import ClickhousePaths
from ee.clickhouse.queries.paths.paths import ClickhousePathsNew
from ee.clickhouse.util import ClickhouseTestMixin
from posthog.constants import PAGEVIEW_EVENT, SCREEN_EVENT
from posthog.models.filters.path_filter import PathFilter
Expand All @@ -15,15 +18,30 @@ def _create_event(**kwargs):
create_event(**kwargs)


class TestClickhousePaths(ClickhouseTestMixin, paths_test_factory(ClickhousePaths, _create_event, Person.objects.create)): # type: ignore
class TestClickhousePathsOld(ClickhouseTestMixin, paths_test_factory(ClickhousePaths, _create_event, Person.objects.create)): # type: ignore
    # remove when migrated to new Paths query
    def test_denormalized_properties(self):
        """Generated paths SQL must not contain "json" once the url/screen columns are materialized."""
        materialize("events", "$current_url")
        materialize("events", "$screen_name")

        # NOTE(review): this "Old" suite is parameterised with ClickhousePaths, yet the
        # SQL checked below is built by ClickhousePathsNew - confirm this is intended.
        query, _ = ClickhousePathsNew(team=self.team, filter=PathFilter(data={"path_type": PAGEVIEW_EVENT})).get_query()
        self.assertNotIn("json", query.lower())

        query, _ = ClickhousePathsNew(team=self.team, filter=PathFilter(data={"path_type": SCREEN_EVENT})).get_query()
        self.assertNotIn("json", query.lower())

        # Re-run the url paths test with the columns materialized (the test method is
        # presumably supplied by paths_test_factory - it is not defined in this class).
        self.test_current_url_paths_and_logic()


class TestClickhousePaths(ClickhouseTestMixin, paths_test_factory(ClickhousePathsNew, _create_event, Person.objects.create)): # type: ignore
def test_denormalized_properties(self):
materialize("events", "$current_url")
materialize("events", "$screen_name")

query, _ = ClickhousePaths().get_query(team=self.team, filter=PathFilter(data={"path_type": PAGEVIEW_EVENT}))
query, _ = ClickhousePathsNew(team=self.team, filter=PathFilter(data={"path_type": PAGEVIEW_EVENT})).get_query()
self.assertNotIn("json", query.lower())

query, _ = ClickhousePaths().get_query(team=self.team, filter=PathFilter(data={"path_type": SCREEN_EVENT}))
query, _ = ClickhousePathsNew(team=self.team, filter=PathFilter(data={"path_type": SCREEN_EVENT})).get_query()
self.assertNotIn("json", query.lower())

self.test_current_url_paths_and_logic()
1 change: 1 addition & 0 deletions ee/clickhouse/queries/trends/trend_event_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from ee.clickhouse.queries.util import date_from_clause, get_time_diff, get_trunc_func_ch, parse_timestamps
from posthog.constants import MONTHLY_ACTIVE, WEEKLY_ACTIVE
from posthog.models import Entity
from posthog.models.filters.filter import Filter


class TrendsEventQuery(ClickhouseEventQuery):
Expand Down
6 changes: 3 additions & 3 deletions ee/clickhouse/queries/trends/util.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from datetime import timedelta
from typing import Any, Dict, List, Optional, Tuple, cast
from typing import Any, Dict, List, Optional, Tuple, Union, cast

from rest_framework.exceptions import ValidationError

Expand All @@ -8,7 +8,7 @@
from ee.clickhouse.sql.events import EVENT_JOIN_PERSON_SQL
from posthog.constants import WEEKLY_ACTIVE
from posthog.models.entity import Entity
from posthog.models.filters import Filter
from posthog.models.filters import Filter, PathFilter

MATH_FUNCTIONS = {
"sum": "sum",
Expand Down Expand Up @@ -69,7 +69,7 @@ def parse_response(stats: Dict, filter: Filter, additional_values: Dict = {}) ->
}


def get_active_user_params(filter: Filter, entity: Entity, team_id: int) -> Dict[str, Any]:
def get_active_user_params(filter: Union[Filter, PathFilter], entity: Entity, team_id: int) -> Dict[str, Any]:
params = {}
params.update({"prev_interval": "7 DAY" if entity.math == WEEKLY_ACTIVE else "30 day"})
diff = timedelta(days=7) if entity.math == WEEKLY_ACTIVE else timedelta(days=30)
Expand Down
68 changes: 68 additions & 0 deletions ee/clickhouse/sql/paths/path.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,3 +134,71 @@
""".format(
paths_query=paths_query_step_3
)


# Paths query template.
#
# str.format placeholders:
#   path_event_query      - subquery yielding person_id, timestamp and path_item rows
#   boundary_event_filter - optional WHERE clause applied after the paths are limited
# Query parameters: autocapture_match, start_point, event_in_session_limit,
# session_time_threshold.
#
# Reading from the innermost subquery outwards:
#   1. Collect each person's timestamps (in milliseconds) and path items into arrays.
#   2. Split those arrays wherever the gap to the previous event reaches
#      session_time_threshold, and ARRAY JOIN so each session gets its own row.
#   3. Within a session, collapse runs of identical consecutive items, optionally
#      cut the path so it begins at start_point, keep at most
#      event_in_session_limit items and ARRAY JOIN them into one row per item.
#   4. Prefix each item with its position in the session (path_key) and pair it
#      with the preceding row's key (last_path_key) to form source -> target edges.
#   5. Count the edges and keep the 20 most frequent.
#   6. For autocapture items, replace the raw elements chain with "<tag> text".
#
# The source columns are gated on source_event and the target columns on
# target_event, so each side is only rewritten when that side is an autocapture item.
PATH_ARRAY_QUERY = """
SELECT if(source_event LIKE %(autocapture_match)s, concat(arrayElement(splitByString('autocapture:', assumeNotNull(source_event)), 1), final_source_element), source_event) final_source_event,
       if(target_event LIKE %(autocapture_match)s, concat(arrayElement(splitByString('autocapture:', assumeNotNull(target_event)), 1), final_target_element), target_event) final_target_event,
       event_count,
       if(source_event LIKE %(autocapture_match)s, arrayElement(splitByString('autocapture:', assumeNotNull(source_event)), 2), NULL) source_event_elements_chain,
       concat('<', extract(source_event_elements_chain, '^(.*?)[.|:]'), '> ', extract(source_event_elements_chain, 'text="(.*?)"')) final_source_element,
       if(target_event LIKE %(autocapture_match)s, arrayElement(splitByString('autocapture:', assumeNotNull(target_event)), 2), NULL) target_event_elements_chain,
       concat('<', extract(target_event_elements_chain, '^(.*?)[.|:]'), '> ', extract(target_event_elements_chain, 'text="(.*?)"')) final_target_element
FROM (
    SELECT last_path_key as source_event,
           path_key as target_event,
           COUNT(*) AS event_count
    FROM (
        SELECT person_id,
               path,
               event_in_session_index,
               concat(toString(event_in_session_index), '_', path) as path_key,
               if(event_in_session_index > 1, neighbor(path_key, -1), null) AS last_path_key
        FROM (
            SELECT person_id, joined_path_item as path
                , event_in_session_index
                , session_index
                , arrayPopFront(arrayPushBack(path_basic, '')) as path_basic_0
                , arrayMap((x,y) -> if(x=y, 0, 1), path_basic, path_basic_0) as mapping
                , arrayFilter((x,y) -> y, time, mapping) as timings
                , arrayFilter((x,y)->y, path_basic, mapping) as compact_path
                , indexOf(compact_path, %(start_point)s) as start_index
                , if(start_index > 0, arraySlice(compact_path, start_index), compact_path) as filtered_path
                , if(start_index > 0, arraySlice(timings, start_index), timings) as filtered_timings
                , arraySlice(filtered_path, 1, %(event_in_session_limit)s) as limited_path
                , arraySlice(filtered_timings, 1, %(event_in_session_limit)s) as limited_timings
            FROM (
                SELECT person_id
                    , path_time_tuple.1 as path_basic
                    , path_time_tuple.2 as time
                    , session_index
                    , arrayDifference(timing) as times
                    , arrayZip(paths, times) as paths_tuple
                    , arraySplit(x -> if(x.2 < %(session_time_threshold)s, 0, 1), paths_tuple) as session_paths
                FROM (
                    SELECT person_id,
                           groupArray(toUnixTimestamp64Milli(timestamp)) as timing,
                           groupArray(path_item) as paths
                    FROM ({path_event_query})
                    GROUP BY person_id
                    ORDER BY person_id
                )
                /* this array join splits paths for a single personID per session */
                ARRAY JOIN session_paths AS path_time_tuple, arrayEnumerate(session_paths) AS session_index
            )
            ARRAY JOIN limited_path AS joined_path_item, arrayEnumerate(limited_path) AS event_in_session_index
            {boundary_event_filter}
            ORDER BY person_id, session_index
        )
    )
    WHERE source_event IS NOT NULL
    GROUP BY source_event,
             target_event
    ORDER BY event_count DESC,
             source_event,
             target_event
    LIMIT 20
)
"""
1 change: 1 addition & 0 deletions posthog/models/filters/__init__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
from .filter import Filter
from .path_filter import PathFilter
from .retention_filter import RetentionFilter
14 changes: 13 additions & 1 deletion posthog/models/filters/path_filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,15 @@

from posthog.constants import INSIGHT_PATHS
from posthog.models.filters.base_filter import BaseFilter
from posthog.models.filters.mixins.common import DateMixin, FilterTestAccountsMixin, InsightMixin, IntervalMixin
from posthog.models.filters.mixins.common import (
BreakdownMixin,
BreakdownTypeMixin,
DateMixin,
EntitiesMixin,
FilterTestAccountsMixin,
InsightMixin,
IntervalMixin,
)
from posthog.models.filters.mixins.paths import (
ComparatorDerivedMixin,
PropTypeDerivedMixin,
Expand All @@ -24,6 +32,10 @@ class PathFilter(
InsightMixin,
FilterTestAccountsMixin,
DateMixin,
BreakdownMixin,
BreakdownTypeMixin,
EntitiesMixin,
# TODO: proper fix for EventQuery abstraction
BaseFilter,
):
def __init__(self, data: Optional[Dict[str, Any]] = None, request: Optional[HttpRequest] = None, **kwargs) -> None:
Expand Down
3 changes: 3 additions & 0 deletions posthog/queries/paths.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@


class Paths(BaseQuery):
def __init__(self, *args, **kwargs) -> None:
    """Accept any positional or keyword arguments and discard them.

    None of the arguments are forwarded: the parent initializer is called
    without arguments.
    """
    super().__init__()

def _event_subquery(self, event: str, key: str):
return Event.objects.filter(pk=OuterRef(event)).values(key)[:1]

Expand Down
Loading

0 comments on commit 06c5c6f

Please sign in to comment.