Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Alt serialize sessions #5113

Merged
merged 1 commit into from
Feb 26, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions config/runtime.exs
Original file line number Diff line number Diff line change
Expand Up @@ -977,6 +977,4 @@ unless s3_disabled? do
imports_bucket: s3_env_value.("S3_IMPORTS_BUCKET")
end

config :plausible, Plausible.Cache.Adapter, sessions: [partitions: 4]

config :phoenix_storybook, enabled: env !== "prod"
1 change: 1 addition & 0 deletions lib/plausible/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ defmodule Plausible.Application do

children =
[
Plausible.Session.BalancerSupervisor,
Plausible.Cache.Stats,
Plausible.PromEx,
{Plausible.Auth.TOTP.Vault, key: totp_vault_key()},
Expand Down
2 changes: 1 addition & 1 deletion lib/plausible/ingestion/event.ex
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,7 @@ defmodule Plausible.Ingestion.Event do
event.clickhouse_event,
event.clickhouse_session_attrs,
previous_user_id,
write_buffer_insert
buffer_insert: write_buffer_insert
)

case session_result do
Expand Down
39 changes: 39 additions & 0 deletions lib/plausible/session/balancer.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
defmodule Plausible.Session.Balancer do
  @moduledoc """
  Serializes session processing through a fixed pool of worker processes,
  so that explicit locks are not needed.

  Each worker is a `GenServer` registered under an integer id in
  `Plausible.Session.Balancer.Registry`. Work for a given `user_id` is always
  routed to the same worker (chosen by hashing the `user_id`), and a worker
  handles one request at a time.
  """
  use GenServer

  @registry Plausible.Session.Balancer.Registry

  @doc """
  Starts a worker registered under `id` in the balancer registry.
  """
  def start_link(id) do
    GenServer.start_link(__MODULE__, id, name: via(id))
  end

  @impl true
  def init(id), do: {:ok, %{id: id}}

  @doc """
  Runs `fun` on the worker responsible for `user_id` and returns its result.

  Options:

    * `:timeout` (required) - passed to `GenServer.call/3` as the call timeout.
    * `:local?` - when truthy, `fun` is invoked directly in the calling
      process and no worker is involved. Defaults to `false`.

  If `fun` raises inside a worker, `{:error, exception}` is returned instead
  of the function's result.
  """
  def dispatch(user_id, fun, opts) do
    # Fetched up front so a missing :timeout raises even on the local path.
    timeout = Keyword.fetch!(opts, :timeout)

    if Keyword.get(opts, :local?, false) do
      fun.()
    else
      user_id
      |> worker_pid()
      |> GenServer.call({:process, fun}, timeout)
    end
  end

  @impl true
  def handle_call({:process, fun}, _from, state) do
    {:reply, fun.(), state}
  rescue
    # Hand the exception back to the caller rather than crashing the worker.
    exception ->
      {:reply, {:error, exception}, state}
  end

  # Maps `user_id` onto a worker id in 1..pool_size and resolves its pid.
  defp worker_pid(user_id) do
    slot = :erlang.phash2(user_id, Plausible.Session.BalancerSupervisor.size()) + 1
    [{pid, _}] = Registry.lookup(@registry, slot)
    pid
  end

  defp via(id), do: {:via, Registry, {@registry, id}}
end
33 changes: 33 additions & 0 deletions lib/plausible/session/balancer_supervisor.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
defmodule Plausible.Session.BalancerSupervisor do
  @moduledoc """
  Supervises the pool of `Plausible.Session.Balancer` workers used to
  serialize session processing and avoid explicit locks.

  The supervisor starts the `Plausible.Session.Balancer.Registry` first,
  followed by `size/0` workers with ids `1..size`.
  """
  use Supervisor

  # The pool size is chosen at compile time: a smaller pool in the test
  # environments, a larger one everywhere else.
  if Mix.env() in [:test, :ce_test] do
    def size(),
      do: 10
  else
    def size(), do: 100
  end

  def start_link(_) do
    Supervisor.start_link(__MODULE__, size(), name: __MODULE__)
  end

  @impl true
  def init(size) do
    # The explicit `//1` step keeps the range empty when `size` is 0; a bare
    # `1..0` would count downwards and create workers with ids 1 and 0.
    children =
      for id <- 1..size//1 do
        %{
          id: id,
          start: {Plausible.Session.Balancer, :start_link, [id]},
          restart: :permanent
        }
      end

    # The registry is listed first so it is running before any worker tries
    # to register itself in it.
    Supervisor.init(
      [
        {Registry, [keys: :unique, name: Plausible.Session.Balancer.Registry]} | children
      ],
      strategy: :one_for_one
    )
  end
end
42 changes: 29 additions & 13 deletions lib/plausible/session/cache_store.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,27 +2,43 @@ defmodule Plausible.Session.CacheStore do
require Logger
alias Plausible.Session.WriteBuffer

@lock_timeout 500
@lock_timeout 1000

@lock_telemetry_event [:plausible, :sessions, :cache, :lock]

def lock_telemetry_event, do: @lock_telemetry_event

def on_event(event, session_attributes, prev_user_id, buffer_insert \\ &WriteBuffer.insert/1) do
def on_event(event, session_attributes, prev_user_id, opts \\ []) do
buffer_insert = Keyword.get(opts, :buffer_insert, &WriteBuffer.insert/1)
skip_balancer? = Keyword.get(opts, :skip_balancer?, false)
lock_requested_at = System.monotonic_time()

Plausible.Cache.Adapter.with_lock(
:sessions,
{event.site_id, event.user_id},
@lock_timeout,
fn ->
lock_duration = System.monotonic_time() - lock_requested_at
:telemetry.execute(@lock_telemetry_event, %{duration: lock_duration}, %{})
found_session = find_session(event, event.user_id) || find_session(event, prev_user_id)

handle_event(event, found_session, session_attributes, buffer_insert)
try do
response =
Plausible.Session.Balancer.dispatch(
event.user_id,
fn ->
lock_duration = System.monotonic_time() - lock_requested_at
:telemetry.execute(@lock_telemetry_event, %{duration: lock_duration}, %{})

found_session =
find_session(event, event.user_id) || find_session(event, prev_user_id)

handle_event(event, found_session, session_attributes, buffer_insert)
end,
timeout: @lock_timeout,
local?: skip_balancer?
)

case response do
{:error, e} -> raise e
_ -> {:ok, response}
end
)
catch
:exit, {:timeout, _} ->
Sentry.capture_message("Timeout while handling session event")
{:error, :timeout}
end
end

defp handle_event(%{name: "engagement"} = event, found_session, _, _) do
Expand Down
13 changes: 8 additions & 5 deletions test/plausible/ingestion/event_test.exs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
defmodule Plausible.Ingestion.EventTest do
use Plausible.DataCase, async: true
use Plausible.DataCase, async: false
use Plausible.Teams.Test

import Phoenix.ConnTest
Expand Down Expand Up @@ -282,10 +282,9 @@ defmodule Plausible.Ingestion.EventTest do

test = self()

very_slow_buffer = fn sessions ->
very_slow_buffer = fn _sessions ->
send(test, :slow_buffer_insert_started)
Process.sleep(1000)
Plausible.Session.WriteBuffer.insert(sessions)
Process.sleep(800)
end

first_conn =
Expand Down Expand Up @@ -315,7 +314,11 @@ defmodule Plausible.Ingestion.EventTest do

receive do
:slow_buffer_insert_started ->
assert {:ok, %{buffered: [], dropped: [dropped]}} = Event.build_and_buffer(second_request)
assert {:ok, %{buffered: [], dropped: [dropped]}} =
Event.build_and_buffer(second_request,
session_write_buffer_insert: very_slow_buffer
)

assert dropped.drop_reason == :lock_timeout
end
end
Expand Down
42 changes: 22 additions & 20 deletions test/plausible/session/cache_store_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,13 @@ defmodule Plausible.Session.CacheStoreTest do
} do
telemetry_event = CacheStore.lock_telemetry_event()

test_pid = self()

:telemetry.attach(
"#{test}-telemetry-handler",
telemetry_event,
fn ^telemetry_event, %{duration: d}, _, _ when is_integer(d) ->
send(self(), {:telemetry_handled, d})
send(test_pid, {:telemetry_handled, d})
end,
%{}
)
Expand All @@ -56,7 +58,7 @@ defmodule Plausible.Session.CacheStoreTest do
event2 = build(:event, name: "pageview", user_id: event1.user_id, site_id: event1.site_id)
event3 = build(:event, name: "pageview", user_id: event1.user_id, site_id: event1.site_id)

CacheStore.on_event(event1, @session_params, nil, buffer)
CacheStore.on_event(event1, @session_params, nil, buffer_insert: buffer)

assert_receive({:buffer, :insert, [[session1]]})
assert_receive({:telemetry_handled, duration})
Expand All @@ -65,7 +67,7 @@ defmodule Plausible.Session.CacheStoreTest do
[event2, event3]
|> Enum.map(fn e ->
Task.async(fn ->
CacheStore.on_event(e, @session_params, nil, slow_buffer)
CacheStore.on_event(e, @session_params, nil, buffer_insert: slow_buffer)
end)
end)
|> Task.await_many()
Expand Down Expand Up @@ -122,20 +124,20 @@ defmodule Plausible.Session.CacheStoreTest do

async1 =
Task.async(fn ->
CacheStore.on_event(event1, @session_params, nil, very_slow_buffer)
CacheStore.on_event(event1, @session_params, nil, buffer_insert: very_slow_buffer)
end)

# Ensure next events are executed after processing event1 starts
Process.sleep(100)

async2 =
Task.async(fn ->
CacheStore.on_event(event2, @session_params, nil, buffer)
CacheStore.on_event(event2, @session_params, nil, buffer_insert: buffer)
end)

async3 =
Task.async(fn ->
CacheStore.on_event(event3, @session_params, nil, buffer)
CacheStore.on_event(event3, @session_params, nil, buffer_insert: buffer)
end)

Task.await_many([async1, async2, async3])
Expand All @@ -160,22 +162,22 @@ defmodule Plausible.Session.CacheStoreTest do

async1 =
Task.async(fn ->
CacheStore.on_event(event1, @session_params, nil, very_slow_buffer)
CacheStore.on_event(event1, @session_params, nil, buffer_insert: very_slow_buffer)
end)

# Ensure next events are executed after processing event1 starts
Process.sleep(100)

async2 =
Task.async(fn ->
CacheStore.on_event(event2, @session_params, nil, buffer)
CacheStore.on_event(event2, @session_params, nil, buffer_insert: buffer)
end)

Process.sleep(100)

async3 =
Task.async(fn ->
CacheStore.on_event(event3, @session_params, nil, buffer)
CacheStore.on_event(event3, @session_params, nil, buffer_insert: buffer)
end)

Task.await_many([async1, async2, async3])
Expand All @@ -198,7 +200,7 @@ defmodule Plausible.Session.CacheStoreTest do
event = build(:event, name: "pageview")

assert_raise RuntimeError, "boom", fn ->
CacheStore.on_event(event, @session_params, nil, crashing_buffer)
CacheStore.on_event(event, @session_params, nil, buffer_insert: crashing_buffer)
end
end

Expand All @@ -210,7 +212,7 @@ defmodule Plausible.Session.CacheStoreTest do
"meta.value": ["true", "false"]
)

CacheStore.on_event(event, @session_params, nil, buffer)
CacheStore.on_event(event, @session_params, nil, buffer_insert: buffer)

assert_receive({:buffer, :insert, [sessions]})
assert [session] = sessions
Expand Down Expand Up @@ -253,8 +255,8 @@ defmodule Plausible.Session.CacheStoreTest do
| timestamp: timestamp
}

CacheStore.on_event(event1, %{}, nil, buffer)
CacheStore.on_event(event2, %{}, nil, buffer)
CacheStore.on_event(event1, %{}, nil, buffer_insert: buffer)
CacheStore.on_event(event2, %{}, nil, buffer_insert: buffer)
assert_receive({:buffer, :insert, [[_negative_record, session]]})
assert session.is_bounce == false
assert session.duration == 10
Expand All @@ -267,8 +269,8 @@ defmodule Plausible.Session.CacheStoreTest do
pageview = build(:pageview, timestamp: NaiveDateTime.shift(now, second: -10))
engagement = %{pageview | name: "engagement", timestamp: now}

CacheStore.on_event(pageview, %{}, nil, buffer)
CacheStore.on_event(engagement, %{}, nil, buffer)
CacheStore.on_event(pageview, %{}, nil, buffer_insert: buffer)
CacheStore.on_event(engagement, %{}, nil, buffer_insert: buffer)
assert_receive({:buffer, :insert, [[session]]})

assert session.is_bounce == true
Expand All @@ -282,7 +284,7 @@ defmodule Plausible.Session.CacheStoreTest do

pageview1 = build(:event, name: "pageview", timestamp: start)

CacheStore.on_event(pageview1, %{}, nil, buffer)
CacheStore.on_event(pageview1, %{}, nil, buffer_insert: buffer)
assert_receive({:buffer, :insert, [[start_session]]})

for delta <- [20, 40, 60] do
Expand All @@ -292,11 +294,11 @@ defmodule Plausible.Session.CacheStoreTest do
timestamp: start |> NaiveDateTime.shift(minute: delta)
})

CacheStore.on_event(engagement, %{}, nil, buffer)
CacheStore.on_event(engagement, %{}, nil, buffer_insert: buffer)
end

pageview2 = Map.put(pageview1, :timestamp, start |> NaiveDateTime.shift(minute: 80))
CacheStore.on_event(pageview2, %{}, nil, buffer)
CacheStore.on_event(pageview2, %{}, nil, buffer_insert: buffer)
assert_receive({:buffer, :insert, [[_negative_record, updated_session]]})

assert updated_session.session_id == start_session.session_id
Expand Down Expand Up @@ -500,8 +502,8 @@ defmodule Plausible.Session.CacheStoreTest do

event2 = %{event1 | timestamp: timestamp}

CacheStore.on_event(event1, %{}, nil, buffer)
CacheStore.on_event(event2, %{}, nil, buffer)
CacheStore.on_event(event1, %{}, nil, buffer_insert: buffer)
CacheStore.on_event(event2, %{}, nil, buffer_insert: buffer)

assert_receive({:buffer, :insert, [[_negative_record, session]]})
assert session.duration == 10
Expand Down
5 changes: 4 additions & 1 deletion test/support/test_utils.ex
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,10 @@ defmodule Plausible.TestUtils do

defp populate_native_stats(events) do
for event_params <- events do
{:ok, session} = Plausible.Session.CacheStore.on_event(event_params, event_params, nil)
{:ok, session} =
Plausible.Session.CacheStore.on_event(event_params, event_params, nil,
skip_balancer?: true
)

event_params
|> Plausible.ClickhouseEventV2.merge_session(session)
Expand Down
Loading