|
| 1 | +%{ |
| 2 | + title: "Things You Can Do With Ecto", |
| 3 | + description: "Neat tricks I've learned while working with Ecto", |
| 4 | + author: "Andy LeClair", |
| 5 | + tags: ["elixir", "ecto"], |
| 6 | + related_listening: "https://www.youtube.com/watch?v=SJnfdEX0QWk", |
| 7 | +} |
| 8 | +--- |
| 9 | + |
| 10 | +I've been hard at work at my day job porting our analytics stack from Snowflake to Clickhouse. We've needed to port over our two existing query builders (one in Javascript, one Elixir) |
| 11 | +to one system. We're heavy users of Ecto, so naturally we would prefer to use the excellent [ecto_ch](https://github.com/plausible/ecto_ch) library to interact with Clickhouse. This mostly works, but we've run into a few |
| 12 | +limitations that have required us to get creative. Here are a few things I've learned along the way. |
| 13 | + |
| 14 | +## Fragment |
| 15 | + |
| 16 | +`fragment` is an elegant escape hatch when you need to do something specific to your database. Clickhouse has a _ton_ of specific functions, so we use `fragment` a lot. |
| 17 | +We have a module that looks like this that we import, like `Ecto.Query`. For example, if you wanted to use Clickhouse's `argMax` aggregate to get the _latest_ value for a column, given some timestamp column (a common use-case), you could do this: |
| 18 | + |
| 19 | +```elixir |
| 20 | +defmodule Clickhouse.Helpers do |
| 21 | + import Ecto.Query |
| 22 | + |
| 23 | + defmacro arg_max(column, timestamp) do |
| 24 | + quote do |
| 25 | + fragment("argMax(?, ?)", unquote(column), unquote(timestamp)) |
| 26 | + end |
| 27 | + end |
| 28 | +end |
| 29 | +``` |
| 30 | + |
| 31 | +And this works! Let's imagine you're storing user profiles. The profiles are essentially a map with the keys and values broken out into separate columns. |
| 32 | + |
| 33 | +With a table schema like this: |
| 34 | + |
| 35 | +```elixir |
| 36 | +defmodule MyApp.UserProfile do |
| 37 | + use Ecto.Schema |
| 38 | + |
| 39 | + schema "user_profiles" do |
| 40 | + field :user_id, :string |
| 41 | + field :key, :string |
| 42 | + field :value, :string |
| 43 | + field :timestamp, Ch, type: "DateTime64(3)" |
| 44 | + end |
| 45 | +end |
| 46 | +``` |
| 47 | + |
| 48 | +We will not update values, instead we will just write in new values whenever some attribute changes. You can easily get the latest values for each user with a query like this: |
| 49 | + |
| 50 | +```elixir |
| 51 | +defmodule MyApp.Profiles do |
| 52 | + import Ecto.Query |
| 53 | + import Clickhouse.Helpers |
| 54 | + |
| 55 | + alias MyApp.UserProfile |
| 56 | + |
| 57 | + def latest_values do |
| 58 | + from p in UserProfile, |
| 59 | + select: %{ |
| 60 | + user_id: p.user_id, |
| 61 | + key: p.key, |
| 62 | + value: arg_max(p.value, p.timestamp) |
| 63 | + }, |
| 64 | + group_by: [p.user_id, p.key] |
| 65 | + end |
| 66 | + |
| 67 | +end |
| 68 | +``` |
| 69 | + |
| 70 | +The cool thing about `fragment` is that you can nest it! |
| 71 | + |
| 72 | +```elixir |
| 73 | +defmacro map(key, value) do |
| 74 | + quote do |
| 75 | + fragment("map(?, ?)", unquote(key), unquote(value)) |
| 76 | + end |
| 77 | +end |
| 78 | + |
| 79 | +defmacro max_map(col) do |
| 80 | + quote do |
| 81 | + fragment("maxMap(?)", unquote(col)) |
| 82 | + end |
| 83 | +end |
| 84 | + |
| 85 | +def latest_profile do |
| 86 | + from q in subquery( |
| 87 | + from t in UserProfile, |
| 88 | + select: %{ |
| 89 | + user_id: t.user_id, |
| 90 | + attrs: map(t.key, arg_max(t.value, t.timestamp)) |
| 91 | + }, |
| 92 | + group_by: [t.user_id] |
| 93 | + ), |
| 94 | + select: %{ |
| 95 | + user_id: q.user_id, |
| 96 | + profile: max_map(q.attrs) |
| 97 | + } |
| 98 | +end |
| 99 | +``` |
| 100 | + |
| 101 | +Now you've got Clickhouse rolling up the profiles and you'll get a map back. Neat! I am going to spend _zero_ time talking about why the function |
| 102 | +in Clickhouse to merge nested maps is `maxMap` because _I do not know_. It simply is. But, you can see that composing these macros makes this super easy and the resulting Ecto query looks very close to what we'd get writing the query by hand. This is an especially useful property when you have a target query you're working toward and you want to work backward from that into Ecto. |
| 103 | + |
| 104 | +That's all well and good, but let's say that you need to do some filtering. Let's say we want to find the users with a specific profile value for a given key. If you want to filter on the _latest_ values, you could do it with `having` (because `argMax` is an aggregate we need to use `having` not `where`): |
| 105 | + |
| 106 | +```elixir |
| 107 | +@spec matching_users(String.t(), String.t()) :: Ecto.Query.t() |
| 108 | +def matching_users(key, value) do |
| 109 | + from p in UserProfile, |
| 110 | + select: %{ |
| 111 | + user_id: p.user_id, |
| 112 | + key: p.key, |
| 113 | + value: arg_max(p.value, p.timestamp) |> selected_as(:value) |
| 114 | + }, |
| 115 | + where: p.key == ^key, |
| 116 | + group_by: [p.user_id], |
| 117 | + having: selected_as(:value) == ^value |
| 118 | +end |
| 119 | +``` |
| 120 | + |
| 121 | +However, Clickhouse also provides aggregate combinators that you might want to use, such as `argMaxIfOrNull`. This combinator is like `argMax` but it has a third argument, a condition. |
| 122 | + |
| 123 | +This leads to my next thing I learned about `fragment`: you can pass in an _expression_. |
| 124 | + |
| 125 | +```elixir |
| 126 | +defmacro arg_max_if_or_null(column, timestamp, condition) do |
| 127 | + quote do |
| 128 | + fragment("argMaxIfOrNull(?, ?, ?)", unquote(column), unquote(timestamp), unquote(condition)) |
| 129 | + end |
| 130 | +end |
| 131 | + |
| 132 | +@spec users_with_latest_values([String.t()]) :: Ecto.Query.t() |
| 133 | +def users_with_latest_values(attributes) do |
| 134 | + query = |
| 135 | + from p in UserProfile, |
| 136 | + select: %{user_id: p.user_id}, |
| 137 | + group_by: [p.user_id] |
| 138 | + |
| 139 | + Enum.reduce(query, attributes, fn attr, query -> |
| 140 | + from q in query, |
| 141 | + select_merge: %{ |
| 142 | + ^attr => arg_max_if_or_null(q.value, q.timestamp, q.key == ^attr) |
| 143 | + } |
| 144 | + end) |
| 145 | +end |
| 146 | +``` |
| 147 | + |
| 148 | +As you can see, the third "argument" to `arg_max_if_or_null` is an expression, but it gets interpolated into `?` inside `fragment`. Incredible! This _just works!_ When I figured this out, it really blew my mind, and I don't think the docs mention it at all. |
| 149 | + |
| 150 | +## CTEs |
| 151 | + |
| 152 | +If you work with analytical queries at all, you will probably encounter a CTE. These are also known as `with` statements. I'm not going to give a full primer on CTEs, but they're a way to define a subquery that you can reference later in your query. Unfortunately for us, Clickhouse doesn't materialize CTE results, so if you refer to the binding multiple times, the query gets run multiple times. I hear they're working on it! That said, CTEs definitely still have a use and we use them. It was not immediately obvious to me from the docs how I could use a CTE, so here are some things I've learned. |
| 153 | + |
| 154 | +### You can pass `with_cte` a string! |
| 155 | + |
| 156 | +The docs for [with_cte](https://hexdocs.pm/ecto/Ecto.Query.html#with_cte) don't say you can do this, but you can just name your CTE parts directly, with a string. This is a version |
| 157 | +of the query from above, but using a CTE. Let's make it a little more interesting of a query, and say we also need to go get the latest event for each of those users in order to sort the data and return the latest timestamp. |
| 158 | + |
| 159 | +```elixir |
| 160 | +defmodule NeatEcto.Schemas.Events do |
| 161 | + use Ecto.Schema |
| 162 | + |
| 163 | + schema "events" do |
| 164 | + field :user_id, :string |
| 165 | + field :event_type, :string |
| 166 | + field :timestamp, Ch, type: "DateTime64(3)" |
| 167 | + end |
| 168 | +end |
| 169 | + |
| 170 | + |
| 171 | +def latest_profile_cte_version do |
| 172 | + attrs_query = |
| 173 | + from t in UserProfile, |
| 174 | + select: %{ |
| 175 | + user_id: t.user_id, |
| 176 | + attrs: map(t.key, arg_max(t.value, t.timestamp)) |
| 177 | + }, |
| 178 | + group_by: [t.user_id] |
| 179 | + |
| 180 | + latest_events_query = |
| 181 | + from e in Events, |
| 182 | + select: %{ |
| 183 | + user_id: e.user_id, |
| 184 | + latest_timestamp: max(e.timestamp) |
| 185 | + }, |
| 186 | + group_by: [e.user_id] |
| 187 | + |
| 188 | + "attr_pairs" |
| 189 | + |> with_cte("attr_pairs", as: ^attrs_query) |
| 190 | + |> with_cte("latest_events", as: ^latest_events_query) |
| 191 | + |> join(:inner, [a], e in "latest_events", on: a.user_id == e.user_id) |
| 192 | + |> select([a, e], %{ |
| 193 | + user_id: a.user_id, |
| 194 | + latest_event: e.latest_timestamp, |
| 195 | + profile: max_map(a.attrs) |
| 196 | + }) |
| 197 | + |> order_by([a, e], desc: e.timestamp) |
| 198 | + |> group_by([a, e], [a.user_id]) |
| 199 | +end |
| 200 | +``` |
| 201 | + |
| 202 | +Of course, you could join on a subquery here, but I'm just trying to illustrate the specifics of using a CTE. It's totally possible to dynamically build CTEs and that can be a super useful tool in building complex queries. |
| 203 | + |
| 204 | +## Cheating |
| 205 | + |
| 206 | +Ok this last one is a little dirty. Maybe even _yucky_. Ecto expects that the developer is in charge of the naming of things and that those names are mostly available at compile time. There are few places where you can essentially allow for using |
| 207 | +user input to name things (CTEs being one!), mostly you need to be using atoms, and every BEAM developer knows you shouldn't dynamically create atoms because they're never GC'd. That leads us to the following trick: |
| 208 | + |
| 209 | +```elixir |
| 210 | +# Everything in life needs a limit |
| 211 | +@substitutions for i <- 0..100, into: %{}, do: {i, :"subs_#{i}"} |
| 212 | +def substitutions, do: @substitutions |
| 213 | + |
| 214 | +@spec latest_profiles([String.t()]) :: Ecto.Query.t() |
| 215 | +def latest_profiles(keys) do |
| 216 | + substitutions = substitutions() |
| 217 | + base_query = from u in UserProfile, select: %{user_id: u.user_id} |
| 218 | + |
| 219 | + indexed_keys = Enum.with_index(keys) |
| 220 | + |
| 221 | + placeholders = for {name, i} <- indexed_keys, into: %{}, do: {name, substitutions[i]} |
| 222 | + |
| 223 | + query = |
| 224 | + Enum.reduce(indexed_keys, base_query, fn {prop, i}, query -> |
| 225 | + prop_placeholder = placeholders[i] |
| 226 | + |
| 227 | + select_merge( |
| 228 | + query, |
| 229 | + [p], |
| 230 | + %{ |
| 231 | + ^prop_placeholder => |
| 232 | + arg_max_if_or_null(p.attribute_value, p.timestamp, p.attribute_name == ^prop) |
| 233 | + } |
| 234 | + ) |
| 235 | + end) |
| 236 | + |
| 237 | + {query, placeholders} |
| 238 | +end |
| 239 | +``` |
| 240 | + |
| 241 | +What you _can_ do is map your user provided list of strings to a list of atoms and then just hand back the mapping along with the query. Then, some enterprising developer could use the above `latest_profiles` query inside |
| 242 | +a subquery. This is really useful if you need to make sure that you're able to compare on the results of the subquery externally and to make sure that the subquery only gets executed once. This technique can also be useful |
| 243 | +if you need to return data in a specific column aliasing and it would be better to make the database do some calculation. You can execute the query and then use the placeholders to map back to the original key. |
| 244 | + |
| 245 | +It is an _extremely_ niche use-case I know, however, it's something I did run into in my day job. It's useful to know how flexible Ecto can be! I try to coach my engineers to use Ecto as much as we can. There are _very_ |
| 246 | +few places where Ecto outright doesn't work (perhaps fodder for another blog post) but you can get an absolutely insane amount of mileage out of it. It's the best query builder I've ever used! Ecto forever. |
| 247 | + |
| 248 | +I hope this was useful to you. If you want to reach out, hit me up on Bluesky at [andyleclair.dev](https://bsky.app/profile/andyleclair.dev) 🦋 |
0 commit comments