Skip to content

Commit

Permalink
feat: move kafka to extension and implement events (#47)
Browse files Browse the repository at this point in the history
* refactor: move kafka_ex to extension

* feat: implement JSON encoder for Delivery

* feat: add kafka events to Delivery

* feat: add kafka events to Telegram bot
  • Loading branch information
Rekkice committed Feb 21, 2025
1 parent ccb6c37 commit e55c94f
Show file tree
Hide file tree
Showing 18 changed files with 339 additions and 54 deletions.
7 changes: 5 additions & 2 deletions tololo/config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,11 @@ config :spark,
config :tololo,
ecto_repos: [Tololo.Repo],
generators: [timestamp_type: :utc_datetime],
extensions: [Tololo.Extensions.TelegramBot, Tololo.Extensions.Prometheus],
extensions: [
Tololo.Extensions.TelegramBot,
Tololo.Extensions.Prometheus,
Tololo.Extensions.Kafka
],
pubsub: Tololo.PubSub,
geocoding_endpoint: "https://nominatim.openstreetmap.org/search",
business_name: "Sushi",
Expand All @@ -58,7 +62,6 @@ config :tololo,
days_for_stale: 2,
min_done_distance_meters: 50


config :tololo,
# appends domains from extensions to the list
ash_domains: [
Expand Down
28 changes: 27 additions & 1 deletion tololo/core/lib/deliveries/delivery.ex
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ defmodule TololoCore.Deliveries.Delivery do
extensions: [AshGraphql.Resource, AshAdmin.Resource],
data_layer: AshPostgres.DataLayer,
authorizers: [Ash.Policy.Authorizer],
notifiers: [Ash.Notifier.PubSub]
notifiers: [Ash.Notifier.PubSub, TololoCore.Kafka.AshNotifier]

use Gettext, backend: TololoCore.Gettext

Expand Down Expand Up @@ -400,3 +400,29 @@ defmodule TololoCore.Deliveries.Delivery do
end
end
end

defimpl Jason.Encoder, for: TololoCore.Deliveries.Delivery do
def encode(value, opts) do
Jason.Encode.map(
Map.take(value, [
:id,
:display_id,
:delivery_person,
:delivery_order,
:from_name,
:to_name,
:from_latitude,
:from_longitude,
:current_latitude,
:current_longitude,
:to_latitude,
:to_longitude,
:to_address,
:to_phone,
:to_notes,
:state
]),
opts
)
end
end
41 changes: 41 additions & 0 deletions tololo/core/lib/kafka/ash_notifier.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
defmodule TololoCore.Kafka.AshNotifier do
use Ash.Notifier
alias Ash.Notifier.Notification

def notify(%Notification{
data: resource,
action: %{name: :initialize}
}) do
produce(
resource,
%{event: "delivery_initialized"}
)
end

def notify(%Notification{
data: resource,
changeset: %{data: %{state: old_state}},
action: %{name: :update_state}
}) do
produce(
resource,
%{event: "delivery_state_change", old_state: old_state}
)
end

def notify(%Notification{
data: resource,
changeset: %{data: %{state: old_state}},
action: %{name: :done_with_distance_check}
}) do
produce(
resource,
%{event: "delivery_state_change", old_state: old_state}
)
end

def notify(_notif), do: nil

defp produce(resource, data),
do: TololoCore.Kafka.produce("tololo-deliveries", data |> Map.put(:resource, resource))
end
20 changes: 20 additions & 0 deletions tololo/core/lib/kafka/kafka.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
defmodule TololoCore.Kafka do
@moduledoc """
A kafka interface to send messages.
"""

# env put by Kafka extension
defp driver, do: Application.get_env(:tololo, :kafka_driver, TololoCore.Kafka.Noop)

@doc """
Produces a message to Kafka. Should be implemented by drivers.
"""

@spec produce(String.t(), String.t() | map(), term()) :: :ok | {:error, term()}
def produce(topic, message, opts \\ [])

def produce(topic, message, opts) when is_map(message),
do: produce(topic, Jason.encode(message), opts)

def produce(topic, message, opts), do: driver().produce(topic, message, opts)
end
10 changes: 10 additions & 0 deletions tololo/core/lib/kafka/noop.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
defmodule TololoCore.Kafka.Noop do
@moduledoc """
Noop driver implementation for Kafka. Used when Kafka isn't configured, will always return :ok.
"""

@behaviour TololoCore.Kafka

@spec produce(String.t(), String.t(), term()) :: :ok | {:error, term()}
def produce(_topic, _message, _opts \\ []), do: :ok
end
12 changes: 0 additions & 12 deletions tololo/extensions/kafka/driver.ex

This file was deleted.

27 changes: 0 additions & 27 deletions tololo/extensions/kafka/kafka.ex

This file was deleted.

17 changes: 17 additions & 0 deletions tololo/extensions/kafka/lib/driver.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
defmodule Tololo.Extensions.Kafka.Driver do
@moduledoc """
Default driver implementation for Kafka.
"""

@behaviour TololoCore.Kafka

@spec produce(String.t(), String.t(), term()) :: :ok | {:error, term()}
def produce(topic, message, opts \\ []) do
KafkaEx.produce(%KafkaEx.Protocol.Produce.Request{
topic: topic,
partition: 0,
required_acks: 1,
messages: [%KafkaEx.Protocol.Produce.Message{value: message}]
})
end
end
45 changes: 45 additions & 0 deletions tololo/extensions/kafka/lib/extension.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
defmodule Tololo.Extensions.Kafka do
@moduledoc false
use Supervisor
@behaviour TololoCore.Extension

@impl true
def child_spec(init_arg) do
%{
id: __MODULE__,
start: {__MODULE__, :start_link, [init_arg]},
type: :supervisor,
restart: :permanent
}
end

def start_link(_init_arg) do
Application.put_env(:tololo, :kafka_driver, Tololo.Extensions.Kafka.Driver)
Supervisor.start_link(__MODULE__, nil, name: __MODULE__)
KafkaEx.create_worker(:kafka_ex)
end

@impl true
def init(_) do
children = [
{Tololo.Extensions.Kafka.Supervisor, max_restarts: 10, max_seconds: 60}
]

Supervisor.init(children, strategy: :one_for_all)
end

@impl true
def routes() do
quote do
end
end

@impl true
def endpoint() do
quote do
end
end

@impl true
def ash_domains(), do: []
end
17 changes: 17 additions & 0 deletions tololo/extensions/kafka/lib/supervisor.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
defmodule Tololo.Extensions.Kafka.Supervisor do
@moduledoc """
Wrapper needed to manually pass the KafkaEx supervisor to the main app's supervision tree.
"""
def start_link(opts) do
KafkaEx.Supervisor.start_link(opts[:max_restarts], opts[:max_seconds])
end

def child_spec(opts) do
%{
id: __MODULE__,
start: {__MODULE__, :start_link, [opts]},
type: :supervisor,
restart: :permanent,
}
end
end
29 changes: 29 additions & 0 deletions tololo/extensions/kafka/mix.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
defmodule Tololo.Extensions.Kafka.MixProject do
use Mix.Project

def project do
[
app: :tololo_extension_kafka,
version: "0.1.0",
elixir: "~> 1.18",
start_permanent: Mix.env() == :prod,
deps: deps()
]
end

def application do
[
extra_applications: [:logger]
]
end

defp deps do
[
{:kafka_ex, "~> 0.11", runtime: false},
{:tololo_core,
path:
Path.join(["..", "..", "core"])
|> Path.expand()}
]
end
end
Loading

0 comments on commit e55c94f

Please sign in to comment.