Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: logging on projections runner termination #8

Merged
merged 2 commits into from
Jan 29, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions lib/event_store/append_to_stream.ex
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
defmodule Essig.EventStore.AppendToStream do
use Essig.Repo
require Logger

def run(stream_uuid, stream_type, expected_seq, events) do
# To ensure sequential inserts only, we use locking.
Expand Down Expand Up @@ -40,6 +41,7 @@ defmodule Essig.EventStore.AppendToStream do
end)
|> Ecto.Multi.run(:signal_new_events, fn _repo, %{insert_events: insert_events} ->
last_event = Enum.at(insert_events, -1)
Logger.debug("AppendToStream: [signal_new_events] - last_event: #{inspect(last_event)}")

if last_event do
max_id = last_event.id
Expand Down
25 changes: 25 additions & 0 deletions lib/projections/runner.ex
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,31 @@ defmodule Essig.Projections.Runner do
{:keep_state, %Data{data | store_max_id: max_id}, actions}
end

### ALSO notify on termination!
@impl GenStateMachine
def terminate(reason, state, data) do
require Logger

Logger.warning("""
Projection Runner terminated!
Projection: #{inspect(data.name)}
Reason: #{inspect(reason)}
State: #{inspect(state)}
Last known max_id: #{inspect(data.row.max_id)}
""")

# Optionally update the projection status to indicate abnormal shutdown
if reason not in [:normal, :shutdown] do
Essig.Projections.Runner.Common.update_external_state(
data,
data.row,
%{status: :blocked}
)
end

:ok
end

########### HELPERS

defp fetch_last_record(name) do
Expand Down
2 changes: 1 addition & 1 deletion lib/projections/runner/read_from_event_store.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ defmodule Essig.Projections.Runner.ReadFromEventStore do
alias Essig.Projections.Runner.Common
require Logger

def run(data = %Data{row: row, pause_ms: pause_ms, store_max_id: store_max_id} = data) do
def run(%Data{row: row, pause_ms: pause_ms, store_max_id: store_max_id} = data) do
scope_uuid = Essig.Context.current_scope()
events = Common.fetch_events(scope_uuid, row.max_id, Essig.Config.events_per_batch())
multi_tuple = {Ecto.Multi.new(), data}
Expand Down