diff --git a/lib/projections/projection.ex b/lib/projections/projection.ex index c065bb3..c183c33 100644 --- a/lib/projections/projection.ex +++ b/lib/projections/projection.ex @@ -1,5 +1,6 @@ defmodule Essig.Projections.Projection do - @callback init_storage(Essig.Projections.Data.t()) :: :ok | {:error, any()} + @callback handle_init_storage(Essig.Projections.Data.t()) :: :ok | {:error, any()} + @callback handle_reset(Essig.Projections.Data.t()) :: :ok | {:error, any()} @callback handle_event(Ecto.Multi.t(), {map(), number()}) :: Ecto.Multi.t() defmacro __using__(_opts) do @@ -8,9 +9,11 @@ defmodule Essig.Projections.Projection do alias Essig.Projections.Data - def init_storage(_), do: :ok + def handle_init_storage(_), do: :ok + def handle_reset(_), do: :ok - defoverridable init_storage: 1 + defoverridable handle_init_storage: 1 + defoverridable handle_reset: 1 end end end diff --git a/lib/projections/runner.ex b/lib/projections/runner.ex index 4ace9d6..c36c79a 100644 --- a/lib/projections/runner.ex +++ b/lib/projections/runner.ex @@ -35,6 +35,7 @@ defmodule Essig.Projections.Runner do require Logger alias Essig.Projections.Data + alias Projections.Runner.Common # Client API @@ -62,6 +63,10 @@ defmodule Essig.Projections.Runner do GenStateMachine.call(via_tuple(name), {:set_pause_ms, pause_ms}) end + def reset(name) do + GenStateMachine.call(via_tuple(name), :reset) + end + # Callbacks @impl true @@ -92,21 +97,19 @@ defmodule Essig.Projections.Runner do @impl true def handle_event({:call, from}, :get_state_data, state, data) do - IO.puts("Projections.Runner-> get_state_data") actions = [{:reply, from, {state, data}}] {:keep_state_and_data, actions} end - def handle_event({:call, from}, {:set_pause_ms, pause_ms}, _state, data) do - IO.puts("Projections.Runner-> set_pause_ms") - info(data, "set pause_ms to #{pause_ms}") + def handle_event({:call, from}, {:set_pause_ms, pause_ms}, state, data) do + info(data, "set_pause_ms - #{state} - #{pause_ms}") actions = [{:reply, from, :ok}, {:state_timeout, pause_ms, :paused}] {:keep_state, %Data{data | pause_ms: pause_ms}, actions} end - def handle_event({:call, from}, :pause, _state, _data) do - IO.puts("Projections.Runner-> pause") + def handle_event({:call, from}, :pause, state, data) do + info(data, "pause - #{state}") {:keep_state_and_data, [ @@ -117,15 +120,33 @@ defmodule Essig.Projections.Runner do ]} end - def handle_event({:call, from}, :resume, _state, _data) do - IO.puts("Projections.Runner-> resume") + def handle_event({:call, from}, :resume, state, data) do + info(data, "resume - #{state}") {:keep_state_and_data, [{:reply, from, :ok}, {:next_event, :internal, :resume}]} end + def handle_event({:call, from}, :reset, state, data) do + info(data, "reset - #{state}") + data.module.handle_reset(data) + + row = + Common.update_external_state(data, data.row, %{ + max_id: 0, + status: :idle + }) + + actions = [ + {:reply, from, :ok}, + {:next_event, :internal, :init_storage}, + {:next_event, :internal, :read_from_eventstore} + ] + + {:next_state, :bootstrap, %Data{data | row: row}, actions} + end + def handle_event(:internal, :init_storage, :bootstrap, data = %Data{}) do - IO.puts("Projections.Runner-> init_storage: bootstrap") - info(data, "INIT STORAGE") - data.module.init_storage(data) + info(data, "init_storage - #{:bootstrap}") + data.module.handle_init_storage(data) :keep_state_and_data end @@ -135,18 +156,18 @@ defmodule Essig.Projections.Runner do state, data = %Data{} ) do - IO.puts("Projections.Runner-> read_from_eventstore - #{state}") + info(data, "read_from_eventstore - #{state}") Projections.Runner.ReadFromEventStore.run(data) end # resume reading, pause timeout triggered - def handle_event(:state_timeout, :paused, :bootstrap, _) do + def handle_event(:state_timeout, :paused, _, _) do {:keep_state_and_data, [{:next_event, :internal, :read_from_eventstore}]} end # resume reading, pause timeout triggered def handle_event(:state_timeout, :paused, :idle, _) do - {:keep_state_and_data, []} + {:keep_state_and_data, [{:next_event, :internal, :read_from_eventstore}]} end # internal pause event, nothing, timeout will trigger resume @@ -165,7 +186,8 @@ defmodule Essig.Projections.Runner do {:keep_state_and_data, [{:next_event, :internal, :read_from_eventstore}]} end - def handle_event(:info, {:new_events, notification}, _status, data) do + def handle_event(:info, {:new_events, notification}, state, data) + when state in [:bootstrap, :idle] do IO.puts("HANDLE NEW EVENTS") ## we get a notification from the pubsub, that there are new events %{max_id: max_id} = notification @@ -193,6 +215,6 @@ defmodule Essig.Projections.Runner do end def info(data, msg) do - Logger.info("Projection #{inspect(data.name)}: #{msg}") + Logger.info("Projections.Runner-> #{inspect(data.name)}: #{msg}") end end diff --git a/lib/projections/runner/common.ex b/lib/projections/runner/common.ex index 4fb9275..ce6209b 100644 --- a/lib/projections/runner/common.ex +++ b/lib/projections/runner/common.ex @@ -10,18 +10,6 @@ defmodule Projections.Runner.Common do ) end - ## we cache the call to the latest event ID for 1 second - def max_events_id() do - max_events_id(Essig.Context.current_scope()) - end - - def max_events_id(scope_uuid) do - Essig.Cache.request( - {Essig.EventStoreReads, :last_id, [scope_uuid]}, - ttl: :timer.seconds(1) - ) - end - @doc """ Update the the projections TABLE row + MetaTable entry """ diff --git a/lib/projections/runner/read_from_event_store.ex b/lib/projections/runner/read_from_event_store.ex index 668a0e0..30defd7 100644 --- a/lib/projections/runner/read_from_event_store.ex +++ b/lib/projections/runner/read_from_event_store.ex @@ -17,7 +17,7 @@ defmodule Projections.Runner.ReadFromEventStore do if length(events) > 0 do last_event = List.last(events) - info(data, "at #{last_event.id}") + info(data, "CURRENT MAX ID #{last_event.id}") # not sure, what to do with response. BUT: projections MUST NEVER fail. {:ok, _multi_results} = Essig.Repo.transaction(multi) |> IO.inspect() @@ -48,6 +48,7 @@ defmodule Projections.Runner.ReadFromEventStore do {:next_state, :idle, %Data{data | row: row}} end else + info(data, "EMPTY EVENTS") row = Common.update_external_state(data, row, %{status: :idle}) {:next_state, :idle, %Data{data | row: row}} end diff --git a/lib/sample/projections/proj1.ex b/lib/sample/projections/proj1.ex index b95559c..287be60 100644 --- a/lib/sample/projections/proj1.ex +++ b/lib/sample/projections/proj1.ex @@ -15,9 +15,16 @@ defmodule Sample.Projections.Proj1 do end @impl Essig.Projections.Projection - def init_storage(data = %Data{}) do + def handle_init_storage(data = %Data{}) do Logger.info("RUNNING INIT STORAGE for #{__MODULE__} with name #{inspect(data.name)}") Repo.query("create table if not exists projection_proj1 (id integer, data text)") :ok end + + @impl Essig.Projections.Projection + def handle_reset(data = %Data{}) do + Logger.info("RUNNING RESET for #{__MODULE__} with name #{inspect(data.name)}") + Repo.query("drop table if exists projection_proj1") + :ok + end end diff --git a/lib/sample/projections/proj2.ex b/lib/sample/projections/proj2.ex index 364309a..8b8e34e 100644 --- a/lib/sample/projections/proj2.ex +++ b/lib/sample/projections/proj2.ex @@ -13,9 +13,16 @@ defmodule Sample.Projections.Proj2 do end @impl Essig.Projections.Projection - def init_storage(data = %Data{}) do + def handle_init_storage(data = %Data{}) do Logger.info("RUNNING INIT STORAGE for #{__MODULE__} with name #{inspect(data.name)}") Repo.query("create table if not exists projection_proj2 (id integer, data text)") :ok end + + @impl Essig.Projections.Projection + def handle_reset(data = %Data{}) do + Logger.info("RUNNING RESET for #{__MODULE__} with name #{inspect(data.name)}") + Repo.query("drop table if exists projection_proj2") + :ok + end end