Skip to content

Commit

Permalink
Merge pull request #3 from maxohq/feat/projection-reset
Browse files Browse the repository at this point in the history
Feat: projection reset
  • Loading branch information
mindreframer authored Dec 27, 2024
2 parents a68368b + 7c592ab commit a8d84ab
Show file tree
Hide file tree
Showing 6 changed files with 62 additions and 34 deletions.
9 changes: 6 additions & 3 deletions lib/projections/projection.ex
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
defmodule Essig.Projections.Projection do
@callback init_storage(Essig.Projections.Data.t()) :: :ok | {:error, any()}
@callback handle_init_storage(Essig.Projections.Data.t()) :: :ok | {:error, any()}
@callback handle_reset(Essig.Projections.Data.t()) :: :ok | {:error, any()}
@callback handle_event(Ecto.Multi.t(), {map(), number()}) :: Ecto.Multi.t()

defmacro __using__(_opts) do
Expand All @@ -8,9 +9,11 @@ defmodule Essig.Projections.Projection do

alias Essig.Projections.Data

def init_storage(_), do: :ok
def handle_init_storage(_), do: :ok
def handle_reset(_), do: :ok

defoverridable init_storage: 1
defoverridable handle_init_storage: 1
defoverridable handle_reset: 1
end
end
end
54 changes: 38 additions & 16 deletions lib/projections/runner.ex
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ defmodule Essig.Projections.Runner do
require Logger

alias Essig.Projections.Data
alias Projections.Runner.Common

# Client API

Expand Down Expand Up @@ -62,6 +63,10 @@ defmodule Essig.Projections.Runner do
GenStateMachine.call(via_tuple(name), {:set_pause_ms, pause_ms})
end

def reset(name) do
GenStateMachine.call(via_tuple(name), :reset)
end

# Callbacks

@impl true
Expand Down Expand Up @@ -92,21 +97,19 @@ defmodule Essig.Projections.Runner do

@impl true
def handle_event({:call, from}, :get_state_data, state, data) do
IO.puts("Projections.Runner-> get_state_data")
actions = [{:reply, from, {state, data}}]
{:keep_state_and_data, actions}
end

def handle_event({:call, from}, {:set_pause_ms, pause_ms}, _state, data) do
IO.puts("Projections.Runner-> set_pause_ms")
info(data, "set pause_ms to #{pause_ms}")
def handle_event({:call, from}, {:set_pause_ms, pause_ms}, state, data) do
info(data, "set_pause_ms - #{state} - #{pause_ms}")

actions = [{:reply, from, :ok}, {:state_timeout, pause_ms, :paused}]
{:keep_state, %Data{data | pause_ms: pause_ms}, actions}
end

def handle_event({:call, from}, :pause, _state, _data) do
IO.puts("Projections.Runner-> pause")
def handle_event({:call, from}, :pause, state, data) do
info(data, "pause - #{state}")

{:keep_state_and_data,
[
Expand All @@ -117,15 +120,33 @@ defmodule Essig.Projections.Runner do
]}
end

def handle_event({:call, from}, :resume, _state, _data) do
IO.puts("Projections.Runner-> resume")
def handle_event({:call, from}, :resume, state, data) do
info(data, "resume - #{state}")
{:keep_state_and_data, [{:reply, from, :ok}, {:next_event, :internal, :resume}]}
end

def handle_event({:call, from}, :reset, state, data) do
info(data, "reset - #{state}")
data.module.handle_reset(data)

row =
Common.update_external_state(data, data.row, %{
max_id: 0,
status: :idle
})

actions = [
{:reply, from, :ok},
{:next_event, :internal, :init_storage},
{:next_event, :internal, :read_from_eventstore}
]

{:next_state, :bootstrap, %Data{data | row: row}, actions}
end

def handle_event(:internal, :init_storage, :bootstrap, data = %Data{}) do
IO.puts("Projections.Runner-> init_storage: bootstrap")
info(data, "INIT STORAGE")
data.module.init_storage(data)
info(data, "init_storage - #{:bootstrap}")
data.module.handle_init_storage(data)
:keep_state_and_data
end

Expand All @@ -135,18 +156,18 @@ defmodule Essig.Projections.Runner do
state,
data = %Data{}
) do
IO.puts("Projections.Runner-> read_from_eventstore - #{state}")
info(data, "read_from_eventstore - #{state}")
Projections.Runner.ReadFromEventStore.run(data)
end

# resume reading, pause timeout triggered
def handle_event(:state_timeout, :paused, :bootstrap, _) do
def handle_event(:state_timeout, :paused, _, _) do
{:keep_state_and_data, [{:next_event, :internal, :read_from_eventstore}]}
end

# resume reading, pause timeout triggered
def handle_event(:state_timeout, :paused, :idle, _) do
{:keep_state_and_data, []}
{:keep_state_and_data, [{:next_event, :internal, :read_from_eventstore}]}
end

# internal pause event, nothing, timeout will trigger resume
Expand All @@ -165,7 +186,8 @@ defmodule Essig.Projections.Runner do
{:keep_state_and_data, [{:next_event, :internal, :read_from_eventstore}]}
end

def handle_event(:info, {:new_events, notification}, _status, data) do
def handle_event(:info, {:new_events, notification}, state, data)
when state in [:bootstrap, :idle] do
IO.puts("HANDLE NEW EVENTS")
## we get a notification from the pubsub, that there are new events
%{max_id: max_id} = notification
Expand Down Expand Up @@ -193,6 +215,6 @@ defmodule Essig.Projections.Runner do
end

def info(data, msg) do
Logger.info("Projection #{inspect(data.name)}: #{msg}")
Logger.info("Projections.Runner-> #{inspect(data.name)}: #{msg}")
end
end
12 changes: 0 additions & 12 deletions lib/projections/runner/common.ex
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,6 @@ defmodule Projections.Runner.Common do
)
end

## we cache the call to the latest event ID for 1 second
def max_events_id() do
max_events_id(Essig.Context.current_scope())
end

def max_events_id(scope_uuid) do
Essig.Cache.request(
{Essig.EventStoreReads, :last_id, [scope_uuid]},
ttl: :timer.seconds(1)
)
end

@doc """
Update the the projections TABLE row + MetaTable entry
"""
Expand Down
3 changes: 2 additions & 1 deletion lib/projections/runner/read_from_event_store.ex
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ defmodule Projections.Runner.ReadFromEventStore do
if length(events) > 0 do
last_event = List.last(events)

info(data, "at #{last_event.id}")
info(data, "CURRENT MAX ID #{last_event.id}")
# not sure, what to do with response. BUT: projections MUST NEVER fail.
{:ok, _multi_results} = Essig.Repo.transaction(multi) |> IO.inspect()

Expand Down Expand Up @@ -48,6 +48,7 @@ defmodule Projections.Runner.ReadFromEventStore do
{:next_state, :idle, %Data{data | row: row}}
end
else
info(data, "EMPTY EVENTS")
row = Common.update_external_state(data, row, %{status: :idle})
{:next_state, :idle, %Data{data | row: row}}
end
Expand Down
9 changes: 8 additions & 1 deletion lib/sample/projections/proj1.ex
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,16 @@ defmodule Sample.Projections.Proj1 do
end

@impl Essig.Projections.Projection
def init_storage(data = %Data{}) do
def handle_init_storage(data = %Data{}) do
Logger.info("RUNNING INIT STORAGE for #{__MODULE__} with name #{inspect(data.name)}")
Repo.query("create table if not exists projection_proj1 (id integer, data text)")
:ok
end

@impl Essig.Projections.Projection
def handle_reset(data = %Data{}) do
Logger.info("RUNNING RESET for #{__MODULE__} with name #{inspect(data.name)}")
Repo.query("drop table if exists projection_proj1")
:ok
end
end
9 changes: 8 additions & 1 deletion lib/sample/projections/proj2.ex
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,16 @@ defmodule Sample.Projections.Proj2 do
end

@impl Essig.Projections.Projection
def init_storage(data = %Data{}) do
def handle_init_storage(data = %Data{}) do
Logger.info("RUNNING INIT STORAGE for #{__MODULE__} with name #{inspect(data.name)}")
Repo.query("create table if not exists projection_proj2 (id integer, data text)")
:ok
end

@impl Essig.Projections.Projection
def handle_reset(data = %Data{}) do
Logger.info("RUNNING RESET for #{__MODULE__} with name #{inspect(data.name)}")
Repo.query("drop table if exists projection_proj2")
:ok
end
end

0 comments on commit a8d84ab

Please sign in to comment.