Skip to content

Commit

Permalink
feat: add list tenants option (#109)
Browse files Browse the repository at this point in the history
* add an option that allows to specify a list of tenants to run a trigger for

* fix pattern match

* add test for tenant list

* add module doc

* generate docs
  • Loading branch information
barnabasJ authored Jan 13, 2025
1 parent 30e6004 commit 98d614a
Show file tree
Hide file tree
Showing 9 changed files with 147 additions and 22 deletions.
1 change: 1 addition & 0 deletions .formatter.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ spark_locals_without_parens = [
debug?: 1,
domain: 1,
extra_args: 1,
list_tenants: 1,
lock_for_update?: 1,
log_errors?: 1,
log_final_error?: 1,
Expand Down
1 change: 1 addition & 0 deletions config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ if Mix.env() == :test do
triggered_process: 10,
triggered_process_2: 10,
triggered_say_hello: 10,
triggered_tenant_aware: 10,
triggered_process_generic: 10
]

Expand Down
2 changes: 2 additions & 0 deletions documentation/dsls/DSL-AshOban.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ end
| Name | Type | Default | Docs |
|------|------|---------|------|
| [`domain`](#oban-domain){: #oban-domain } | `module` | | The Domain to use when calling actions on this resource. Defaults to the resource's domain. |
| [`list_tenants`](#oban-list_tenants){: #oban-list_tenants } | `list(any) \| (-> any) \| module` | `[nil]` | A list of tenants or a function behaviour that returns a list of tenants a trigger should be run for. Can be overwritten on the trigger level. |


## oban.triggers
Expand Down Expand Up @@ -96,6 +97,7 @@ end
| [`action`](#oban-triggers-trigger-action){: #oban-triggers-trigger-action .spark-required} | `atom` | | The action to be triggered. Defaults to the identifier of the resource plus the name of the trigger |
| [`action_input`](#oban-triggers-trigger-action_input){: #oban-triggers-trigger-action_input } | `map` | | Static inputs to supply to the update/destroy action when it is called. Any metadata produced by `read_metadata` will overwrite these values. |
| [`extra_args`](#oban-triggers-trigger-extra_args){: #oban-triggers-trigger-extra_args } | `map \| (any -> any)` | | Additional arguments to merge into the job's arguments map. Can either be a map or a function that takes the record and returns a map. |
| [`list_tenants`](#oban-triggers-trigger-list_tenants){: #oban-triggers-trigger-list_tenants } | `list(any) \| (-> any) \| module` | | A list of tenants or a function behaviour that returns a list of tenants a trigger should be run for. |
| [`scheduler_queue`](#oban-triggers-trigger-scheduler_queue){: #oban-triggers-trigger-scheduler_queue } | `atom` | | The queue to place the scheduler job in. The same queue as job is used by default (but with a priority of 1 so schedulers run first). |
| [`debug?`](#oban-triggers-trigger-debug?){: #oban-triggers-trigger-debug? } | `boolean` | `false` | If set to `true`, detailed debug logging will be enabled for this trigger. You can also set `config :ash_oban, debug_all_triggers?: true` to enable debug logging for all triggers. If the action has `transaction?: false` this is automatically false. |
| [`lock_for_update?`](#oban-triggers-trigger-lock_for_update?){: #oban-triggers-trigger-lock_for_update? } | `boolean` | `true` | If `true`, a transaction will be started before looking up the record, and it will be locked for update. Typically you should leave this on unless you have before/after/around transaction hooks. |
Expand Down
29 changes: 27 additions & 2 deletions lib/ash_oban.ex
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ defmodule AshOban do
:read_action,
:action_input,
:extra_args,
:list_tenants,
:worker_read_action,
:lock_for_update?,
:queue,
Expand Down Expand Up @@ -102,6 +103,17 @@ defmodule AshOban do
Additional arguments to merge into the job's arguments map. Can either be a map or a function that takes the record and returns a map.
"""
],
list_tenants: [
type:
{:or,
[
{:list, :any},
{:spark_function_behaviour, AshOban.ListTenants, {AshOban.ListTenants.Function, 0}}
]},
doc: """
A list of tenants or a function behaviour that returns a list of tenants a trigger should be run for.
"""
],
scheduler_queue: [
type: :atom,
doc:
Expand Down Expand Up @@ -360,6 +372,18 @@ defmodule AshOban do
type: {:behaviour, Ash.Domain},
doc:
"The Domain to use when calling actions on this resource. Defaults to the resource's domain."
],
list_tenants: [
type:
{:or,
[
{:list, :any},
{:spark_function_behaviour, AshOban.ListTenants, {AshOban.ListTenants.Function, 0}}
]},
default: [nil],
doc: """
A list of tenants or a function behaviour that returns a list of tenants a trigger should be run for. Can be overwritten on the trigger level.
"""
]
],
sections: [@triggers, @scheduled_actions]
Expand Down Expand Up @@ -487,7 +511,7 @@ defmodule AshOban do
All other options are passed through to `c:Oban.Worker.new/2`
"""
def build_trigger(%resource{} = record, trigger, opts \\ []) do
{opts, oban_job_opts} = Keyword.split(opts, [:actor, :args, :action_arguments])
{opts, oban_job_opts} = Keyword.split(opts, [:actor, :tenant, :args, :action_arguments])

trigger =
case trigger do
Expand Down Expand Up @@ -524,7 +548,8 @@ defmodule AshOban do
%{
primary_key: validate_primary_key(Map.take(record, primary_key), resource),
metadata: metadata,
action_arguments: opts[:action_arguments] || %{}
action_arguments: opts[:action_arguments] || %{},
tenant: opts[:tenant]
}
|> AshOban.store_actor(opts[:actor])
|> then(&Map.merge(extra_args, &1))
Expand Down
6 changes: 6 additions & 0 deletions lib/list_tenants.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
defmodule AshOban.ListTenants do
@moduledoc """
The behaviour for listing tenants.
"""
@callback list_tenants(opts :: Keyword.t()) :: [term()]
end
15 changes: 15 additions & 0 deletions lib/list_tenants/function.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
defmodule AshOban.ListTenants.Function do
@moduledoc false

@behaviour AshOban.ListTenants

@impl true
def list_tenants([{:fun, {m, f, a}}]) do
apply(m, f, a)
end

@impl true
def list_tenants([{:fun, fun}]) do
fun.()
end
end
63 changes: 47 additions & 16 deletions lib/transformers/define_schedulers.ex
Original file line number Diff line number Diff line change
Expand Up @@ -133,12 +133,13 @@ defmodule AshOban.Transformers.DefineSchedulers do
actor: actor,
domain: unquote(domain)
)
|> Ash.Query.set_tenant(tenant)
|> Ash.stream!(unquote(batch_opts))
end

stream =
quote location: :keep do
def stream(resource, actor) do
def stream(resource, actor, tenant) do
unquote(pipeline)
end
end
Expand Down Expand Up @@ -231,16 +232,29 @@ defmodule AshOban.Transformers.DefineSchedulers do
fn _ -> %{} end
end

case AshOban.lookup_actor(args["actor"]) do
{:ok, actor} ->
unquote(resource)
|> stream(actor)
|> Stream.map(&AshOban.build_trigger(&1, trigger, actor: actor))
|> insert()
(AshOban.Info.oban_trigger(unquote(resource), unquote(trigger.name)).list_tenants ||
AshOban.Info.oban_list_tenants!(unquote(resource)))
|> then(fn
{module, o} ->
module.list_tenants(o)

{:error, e} ->
raise Ash.Error.to_ash_error(e)
end
list_tenants ->
list_tenants
end)
|> Enum.each(fn tenant ->
case AshOban.lookup_actor(args["actor"]) do
{:ok, actor} ->
unquote(resource)
|> stream(actor, tenant)
|> Stream.map(&AshOban.build_trigger(&1, trigger, actor: actor))
|> insert()

{:error, e} ->
raise Ash.Error.to_ash_error(e)
end
end)

:ok
rescue
e ->
Logger.error(
Expand Down Expand Up @@ -307,6 +321,7 @@ defmodule AshOban.Transformers.DefineSchedulers do
Ash.Changeset.before_action(changeset, fn changeset ->
query()
|> Ash.Query.do_filter(primary_key)
|> Ash.Query.set_tenant(tenant)
|> Ash.Query.set_context(%{private: %{ash_oban?: true}})
|> Ash.Query.for_read(unquote(read_action), %{},
authorize?: authorize?,
Expand Down Expand Up @@ -335,6 +350,7 @@ defmodule AshOban.Transformers.DefineSchedulers do
Ash.Changeset.before_action(changeset, fn changeset ->
query()
|> Ash.Query.do_filter(primary_key)
|> Ash.Query.set_tenant(tenant)
|> Ash.Query.set_context(%{private: %{ash_oban?: true}})
|> Ash.Query.for_read(unquote(read_action), %{},
authorize?: authorize?,
Expand Down Expand Up @@ -363,26 +379,26 @@ defmodule AshOban.Transformers.DefineSchedulers do
prepare_error =
if on_error_transaction? do
quote location: :keep do
defp prepare_error(changeset, primary_key, authorize?, actor) do
defp prepare_error(changeset, primary_key, authorize?, actor, tenant) do
unquote(get_and_lock)
end
end
else
quote location: :keep do
defp prepare_error(changeset, _, _, _), do: changeset
defp prepare_error(changeset, _, _, _, _), do: changeset
end
end

prepare =
if work_transaction? do
quote location: :keep do
defp prepare(changeset, primary_key, authorize?, actor) do
defp prepare(changeset, primary_key, authorize?, actor, tenant) do
unquote(get_and_lock)
end
end
else
quote location: :keep do
defp prepare(changeset, _, _, _), do: changeset
defp prepare(changeset, _, _, _, _), do: changeset
end
end

Expand Down Expand Up @@ -480,6 +496,8 @@ defmodule AshOban.Transformers.DefineSchedulers do

case AshOban.lookup_actor(args["actor"]) do
{:ok, actor} ->
tenant = args["tenant"]

query()
|> Ash.Query.do_filter(primary_key)
|> Ash.Query.set_context(%{private: %{ash_oban?: true}})
Expand Down Expand Up @@ -515,10 +533,11 @@ defmodule AshOban.Transformers.DefineSchedulers do

record
|> Ash.Changeset.new()
|> prepare_error(primary_key, authorize?, actor)
|> prepare_error(primary_key, authorize?, actor, tenant)
|> case do
changeset ->
changeset
|> Ash.Changeset.set_tenant(tenant)
|> Ash.Changeset.set_context(%{private: %{ash_oban?: true}})
|> Ash.Changeset.for_action(unquote(trigger.on_error), %{error: error},
authorize?: authorize?,
Expand Down Expand Up @@ -613,6 +632,8 @@ defmodule AshOban.Transformers.DefineSchedulers do
{:ok, actor} ->
authorize? = AshOban.authorize?()

tenant = args["tenant"]

AshOban.debug(
"Trigger #{unquote(inspect(resource))}.#{unquote(trigger.name)} triggered for primary key #{inspect(primary_key)}",
unquote(trigger.debug?)
Expand All @@ -627,6 +648,7 @@ defmodule AshOban.Transformers.DefineSchedulers do

unquote(resource)
|> Ash.ActionInput.new()
|> Ash.ActionInput.set_tenant(tenant)
|> Ash.ActionInput.set_context(%{private: %{ash_oban?: true}})
|> Ash.ActionInput.for_action(
unquote(trigger.action),
Expand Down Expand Up @@ -690,6 +712,8 @@ defmodule AshOban.Transformers.DefineSchedulers do
{:ok, actor} ->
authorize? = AshOban.authorize?()

tenant = args["tenant"]

args =
if unquote(is_nil(trigger.read_metadata)) do
%{}
Expand All @@ -715,6 +739,7 @@ defmodule AshOban.Transformers.DefineSchedulers do
Map.merge(unquote(Macro.escape(trigger.action_input || %{})), args),
authorize?: authorize?,
actor: actor,
tenant: tenant,
domain: unquote(domain),
context: %{private: %{ash_oban?: true}},
skip_unknown_inputs: [:metadata],
Expand All @@ -729,6 +754,8 @@ defmodule AshOban.Transformers.DefineSchedulers do
Map.merge(unquote(Macro.escape(trigger.action_input || %{})), args),
authorize?: authorize?,
actor: actor,
tenant: tenant,
domain: unquote(domain),
domain: unquote(domain),
context: %{private: %{ash_oban?: true}},
skip_unknown_inputs: [:metadata],
Expand Down Expand Up @@ -778,8 +805,11 @@ defmodule AshOban.Transformers.DefineSchedulers do
{:ok, actor} ->
authorize? = AshOban.authorize?()

tenant = args["tenant"]

query()
|> Ash.Query.do_filter(primary_key)
|> Ash.Query.set_tenant(tenant)
|> Ash.Query.set_context(%{private: %{ash_oban?: true}})
|> Ash.Query.for_read(unquote(read_action), %{},
authorize?: authorize?,
Expand Down Expand Up @@ -807,7 +837,8 @@ defmodule AshOban.Transformers.DefineSchedulers do

record
|> Ash.Changeset.new()
|> prepare(primary_key, authorize?, actor)
|> prepare(primary_key, authorize?, actor, tenant)
|> Ash.Changeset.set_tenant(tenant)
|> Ash.Changeset.set_context(%{private: %{ash_oban?: true}})
|> Ash.Changeset.for_action(
unquote(trigger.action),
Expand Down
Loading

0 comments on commit 98d614a

Please sign in to comment.