Skip to content

Commit

Permalink
feat: add sort option
Browse files Browse the repository at this point in the history
  • Loading branch information
barnabasJ authored Jan 6, 2025
2 parents cafa4e6 + 8905cef commit dd22a12
Show file tree
Hide file tree
Showing 8 changed files with 89 additions and 31 deletions.
1 change: 1 addition & 0 deletions .formatter.exs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ spark_locals_without_parens = [
scheduler_cron: 1,
scheduler_priority: 1,
scheduler_queue: 1,
sort: 1,
state: 1,
stream_batch_size: 1,
trigger: 1,
Expand Down
1 change: 1 addition & 0 deletions documentation/dsls/DSL-AshOban.md
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ end
| [`read_action`](#oban-triggers-trigger-read_action){: #oban-triggers-trigger-read_action } | `atom` | | The read action to use when querying records. Defaults to the primary read. This action *must* support keyset pagination. |
| [`worker_read_action`](#oban-triggers-trigger-worker_read_action){: #oban-triggers-trigger-worker_read_action } | `atom` | | The read action to use when fetching the individual records for the trigger. Defaults to `read_action`. If you customize this, ensure your action handles scenarios where the trigger is no longer relevant. |
| [`where`](#oban-triggers-trigger-where){: #oban-triggers-trigger-where } | `any` | | The filter expression to determine if something should be triggered |
| [`sort`](#oban-triggers-trigger-sort){: #oban-triggers-trigger-sort } | `any` | | The sort applied to the query that determines if something should be triggered |
| [`on_error`](#oban-triggers-trigger-on_error){: #oban-triggers-trigger-on_error } | `atom` | | An update action to call after the last attempt has failed. See the getting started guide for more. |


Expand Down
8 changes: 8 additions & 0 deletions lib/ash_oban.ex
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ defmodule AshOban do
:max_scheduler_attempts,
:record_limit,
:where,
:sort,
:state,
:scheduler,
:worker,
Expand Down Expand Up @@ -200,6 +201,10 @@ defmodule AshOban do
type: :any,
doc: "The filter expression to determine if something should be triggered"
],
sort: [
type: :any,
doc: "The sort applied to the query that determines if something should be triggered"
],
on_error: [
type: :atom,
doc:
Expand Down Expand Up @@ -409,6 +414,9 @@ defmodule AshOban do
|> store_actor(opts[:actor])
|> scheduler.new()
|> Oban.insert!()

_ ->
raise ArgumentError, "Invalid trigger or scheduled action: #{inspect(trigger)}"
end
end

Expand Down
76 changes: 48 additions & 28 deletions lib/transformers/define_schedulers.ex
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,15 @@ defmodule AshOban.Transformers.DefineSchedulers do
end
end

sort =
if not is_nil(trigger.sort) do
quote location: :keep do
def sort(query) do
Ash.Query.sort(query, unquote(trigger.sort))
end
end
end

limit_stream =
if trigger.record_limit do
quote do
Expand All @@ -90,37 +99,47 @@ defmodule AshOban.Transformers.DefineSchedulers do
end
end

stream =
pipeline =
quote do
resource
|> Ash.Query.set_context(%{private: %{ash_oban?: true}})
|> Ash.Query.select(unquote(primary_key))
|> limit_stream()
end

pipeline =
if is_nil(trigger.where) do
quote location: :keep do
def stream(resource, actor) do
resource
|> Ash.Query.set_context(%{private: %{ash_oban?: true}})
|> Ash.Query.select(unquote(primary_key))
|> limit_stream()
|> Ash.Query.for_read(unquote(trigger.read_action), %{},
authorize?: AshOban.authorize?(),
actor: actor,
domain: unquote(domain)
)
|> Ash.stream!(unquote(batch_opts))
end
pipeline
else
quote do
unquote(pipeline) |> filter()
end
end

pipeline =
if is_nil(trigger.sort) do
pipeline
else
quote location: :keep do
def stream(resource, actor) do
resource
|> Ash.Query.set_context(%{private: %{ash_oban?: true}})
|> Ash.Query.select(unquote(primary_key))
|> limit_stream()
|> filter()
|> Ash.Query.for_read(unquote(trigger.read_action), %{},
authorize?: AshOban.authorize?(),
actor: actor,
domain: unquote(domain)
)
|> Ash.stream!()
end
quote do
unquote(pipeline) |> sort()
end
end

pipeline =
quote do
unquote(pipeline)
|> Ash.Query.for_read(unquote(trigger.read_action), %{},
authorize?: AshOban.authorize?(),
actor: actor,
domain: unquote(domain)
)
|> Ash.stream!(unquote(batch_opts))
end

stream =
quote location: :keep do
def stream(resource, actor) do
unquote(pipeline)
end
end

Expand Down Expand Up @@ -239,6 +258,7 @@ defmodule AshOban.Transformers.DefineSchedulers do
unquote(limit_stream)
unquote(stream)
unquote(filter)
unquote(sort)
unquote(insert)
end

Expand Down
1 change: 1 addition & 0 deletions mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ defmodule AshOban.MixProject do
{:oban, "~> 2.15"},
{:postgrex, "~> 0.18"},
# dev/test dependencies
{:igniter, "~> 0.5.0", only: [:dev, :test]},
{:simple_sat, "~> 0.1", only: [:dev, :test]},
{:ex_doc, "~> 0.22", only: [:dev, :test], runtime: false},
{:ex_check, "~> 0.12", only: [:dev, :test]},
Expand Down
6 changes: 3 additions & 3 deletions mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
"git_cli": {:hex, :git_cli, "0.3.0", "a5422f9b95c99483385b976f5d43f7e8233283a47cda13533d7c16131cb14df5", [:mix], [], "hexpm", "78cb952f4c86a41f4d3511f1d3ecb28edb268e3a7df278de2faa1bd4672eaf9b"},
"git_ops": {:hex, :git_ops, "2.6.3", "38c6e381b8281b86e2911fa39bea4eab2d171c86d7428786566891efb73b68c3", [:mix], [{:git_cli, "~> 0.2", [hex: :git_cli, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.0", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "a81cb6c6a2a026a4d48cb9a2e1dfca203f9283a3a70aa0c7bc171970c44f23f8"},
"glob_ex": {:hex, :glob_ex, "0.1.11", "cb50d3f1ef53f6ca04d6252c7fde09fd7a1cf63387714fe96f340a1349e62c93", [:mix], [], "hexpm", "342729363056e3145e61766b416769984c329e4378f1d558b63e341020525de4"},
"igniter": {:hex, :igniter, "0.4.8", "6d1bf4934952ac3eb20f6cbac0d5cd6d8012e42e3de20ad794703556c14cfa08", [:mix], [{:glob_ex, "~> 0.1.7", [hex: :glob_ex, repo: "hexpm", optional: false]}, {:jason, "~> 1.4", [hex: :jason, repo: "hexpm", optional: false]}, {:rewrite, ">= 1.1.1 and < 2.0.0-0", [hex: :rewrite, repo: "hexpm", optional: false]}, {:sourceror, "~> 1.4", [hex: :sourceror, repo: "hexpm", optional: false]}, {:spitfire, ">= 0.1.3 and < 1.0.0-0", [hex: :spitfire, repo: "hexpm", optional: false]}], "hexpm", "f9dd06f971fa053c6b0d9f8263b625f619a0fd3645d6a8cd6170935055a8f0df"},
"igniter": {:hex, :igniter, "0.5.8", "d91e90fecb99beadfa9d0d8434fbd4f0fe06ea1a1d29cae4dfd0cb058cb3a5c7", [:mix], [{:glob_ex, "~> 0.1.7", [hex: :glob_ex, repo: "hexpm", optional: false]}, {:jason, "~> 1.4", [hex: :jason, repo: "hexpm", optional: false]}, {:rewrite, ">= 1.1.1 and < 2.0.0-0", [hex: :rewrite, repo: "hexpm", optional: false]}, {:sourceror, "~> 1.4", [hex: :sourceror, repo: "hexpm", optional: false]}, {:spitfire, ">= 0.1.3 and < 1.0.0-0", [hex: :spitfire, repo: "hexpm", optional: false]}], "hexpm", "fef198324925405ea5c3b16166002be03b2d7497c038cfc9708aa557d27ba5a2"},
"iterex": {:hex, :iterex, "0.1.2", "58f9b9b9a22a55cbfc7b5234a9c9c63eaac26d276b3db80936c0e1c60355a5a6", [:mix], [], "hexpm", "2e103b8bcc81757a9af121f6dc0df312c9a17220f302b1193ef720460d03029d"},
"jason": {:hex, :jason, "1.4.4", "b9226785a9aa77b6857ca22832cffa5d5011a667207eb2a0ad56adb5db443b8a", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "c5eb0cab91f094599f94d55bc63409236a8ec69a21a67814529e8d5f6cc90b3b"},
"libgraph": {:hex, :libgraph, "0.16.0", "3936f3eca6ef826e08880230f806bfea13193e49bf153f93edcf0239d4fd1d07", [:mix], [], "hexpm", "41ca92240e8a4138c30a7e06466acc709b0cbb795c643e9e17174a178982d6bf"},
Expand All @@ -30,12 +30,12 @@
"owl": {:hex, :owl, "0.12.0", "0c4b48f90797a7f5f09ebd67ba7ebdc20761c3ec9c7928dfcafcb6d3c2d25c99", [:mix], [{:ucwidth, "~> 0.2", [hex: :ucwidth, repo: "hexpm", optional: true]}], "hexpm", "241d85ae62824dd72f9b2e4a5ba4e69ebb9960089a3c68ce6c1ddf2073db3c15"},
"postgrex": {:hex, :postgrex, "0.19.3", "a0bda6e3bc75ec07fca5b0a89bffd242ca209a4822a9533e7d3e84ee80707e19", [:mix], [{:db_connection, "~> 2.1", [hex: :db_connection, repo: "hexpm", optional: false]}, {:decimal, "~> 1.5 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:table, "~> 0.1.0", [hex: :table, repo: "hexpm", optional: true]}], "hexpm", "d31c28053655b78f47f948c85bb1cf86a9c1f8ead346ba1aa0d0df017fa05b61"},
"reactor": {:hex, :reactor, "0.10.3", "41a8c34251148e36dd7c75aa8433f2c2f283f29c097f9eb84a630ab28dd75651", [:mix], [{:igniter, "~> 0.4", [hex: :igniter, repo: "hexpm", optional: true]}, {:iterex, "~> 0.1", [hex: :iterex, repo: "hexpm", optional: false]}, {:libgraph, "~> 0.16", [hex: :libgraph, repo: "hexpm", optional: false]}, {:spark, "~> 2.0", [hex: :spark, repo: "hexpm", optional: false]}, {:splode, "~> 0.2", [hex: :splode, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.2", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "2b34380e22b69a35943a7bcceffd5a8b766870f1fc9052162a7ff74ef9cdb3b2"},
"rewrite": {:hex, :rewrite, "1.1.1", "0e6674eb5f8cb11aabe5ad6207151b4156bf173aa9b43133a68f8cc882364570", [:mix], [{:glob_ex, "~> 0.1", [hex: :glob_ex, repo: "hexpm", optional: false]}, {:sourceror, "~> 1.0", [hex: :sourceror, repo: "hexpm", optional: false]}, {:text_diff, "~> 0.1", [hex: :text_diff, repo: "hexpm", optional: false]}], "hexpm", "fcd688b3ca543c3a1f1f4615ccc054ec37cfcde91133a27a683ec09b35ae1496"},
"rewrite": {:hex, :rewrite, "1.1.2", "f5a5d10f5fed1491a6ff48e078d4585882695962ccc9e6c779bae025d1f92eda", [:mix], [{:glob_ex, "~> 0.1", [hex: :glob_ex, repo: "hexpm", optional: false]}, {:sourceror, "~> 1.0", [hex: :sourceror, repo: "hexpm", optional: false]}, {:text_diff, "~> 0.1", [hex: :text_diff, repo: "hexpm", optional: false]}], "hexpm", "7f8b94b1e3528d0a47b3e8b7bfeca559d2948a65fa7418a9ad7d7712703d39d4"},
"simple_sat": {:hex, :simple_sat, "0.1.3", "f650fc3c184a5fe741868b5ac56dc77fdbb428468f6dbf1978e14d0334497578", [:mix], [], "hexpm", "a54305066a356b7194dc81db2a89232bacdc0b3edaef68ed9aba28dcbc34887b"},
"sobelow": {:hex, :sobelow, "0.13.0", "218afe9075904793f5c64b8837cc356e493d88fddde126a463839351870b8d1e", [:mix], [{:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "cd6e9026b85fc35d7529da14f95e85a078d9dd1907a9097b3ba6ac7ebbe34a0d"},
"sourceror": {:hex, :sourceror, "1.7.1", "599d78f4cc2be7d55c9c4fd0a8d772fd0478e3a50e726697c20d13d02aa056d4", [:mix], [], "hexpm", "cd6f268fe29fa00afbc535e215158680a0662b357dc784646d7dff28ac65a0fc"},
"spark": {:hex, :spark, "2.2.36", "07c921e5efb27f184267c3431d2f82099e24cac90748a47383dd75cbfb558268", [:mix], [{:igniter, ">= 0.3.64 and < 1.0.0-0", [hex: :igniter, repo: "hexpm", optional: true]}, {:jason, "~> 1.4", [hex: :jason, repo: "hexpm", optional: false]}, {:sourceror, "~> 1.2", [hex: :sourceror, repo: "hexpm", optional: false]}], "hexpm", "e5ac56b75e5ad43da6d8302b6713277488f8e9a3abdba9aae8f0d0f9cff04538"},
"spitfire": {:hex, :spitfire, "0.1.3", "7ea0f544005dfbe48e615ed90250c9a271bfe126914012023fd5e4b6b82b7ec7", [:mix], [], "hexpm", "d53b5107bcff526a05c5bb54c95e77b36834550affd5830c9f58760e8c543657"},
"spitfire": {:hex, :spitfire, "0.1.4", "8fe0df66e735323e4f2a56e719603391b160dd68efd922cadfbb85a2cf6c68af", [:mix], [], "hexpm", "d40d850f4ede5235084876246756b90c7bcd12994111d57c55e2e1e23ac3fe61"},
"splode": {:hex, :splode, "0.2.7", "ed042fa9bd8fe7b66dd0a0faabdb97352058420d90cd1c7c1537f609deb7ef6d", [:mix], [], "hexpm", "267f1f51d5a5ac988cda0649498294844988c5086916fed5a8aff297d69a2059"},
"stream_data": {:hex, :stream_data, "1.1.2", "05499eaec0443349ff877aaabc6e194e82bda6799b9ce6aaa1aadac15a9fdb4d", [:mix], [], "hexpm", "129558d2c77cbc1eb2f4747acbbea79e181a5da51108457000020a906813a1a9"},
"telemetry": {:hex, :telemetry, "1.3.0", "fedebbae410d715cf8e7062c96a1ef32ec22e764197f70cda73d82778d61e7a2", [:rebar3], [], "hexpm", "7015fc8919dbe63764f4b4b87a95b7c0996bd539e0d499be6ec9d7f3875b79e6"},
Expand Down
25 changes: 25 additions & 0 deletions test/ash_oban_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,31 @@ defmodule AshObanTest do
)
end

test "sort is applied when scheduling" do
triggered1 =
Triggered
|> Ash.Changeset.for_create(:create, %{})
|> Ash.create!()

triggered2 =
Triggered
|> Ash.Changeset.for_create(:create, %{})
|> Ash.create!()

assert %{success: 3} =
AshOban.Test.schedule_and_run_triggers({Triggered, :process},
actor: %AshOban.Test.ActorPersister.FakeActor{id: 1}
)

triggered1 =
Ash.reload!(triggered1)

triggered2 =
Ash.reload!(triggered2)

assert DateTime.before?(triggered1.updated_at, triggered2.updated_at)
end

test "a record can be processed manually with additional arguments" do
record =
Triggered
Expand Down
2 changes: 2 additions & 0 deletions test/support/triggered.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ defmodule AshOban.Test.Triggered do
trigger :process do
action :process
where expr(processed != true)
sort inserted_at: :asc
max_attempts 2
worker_read_action(:read)
end
Expand Down Expand Up @@ -94,5 +95,6 @@ defmodule AshOban.Test.Triggered do
attributes do
uuid_primary_key :id
attribute :processed, :boolean, default: false, allow_nil?: false
timestamps()
end
end

0 comments on commit dd22a12

Please sign in to comment.