From 3c0bfdf534225e360595e69bf9c41861e5652a56 Mon Sep 17 00:00:00 2001 From: Barnabas Jovanovics Date: Fri, 20 Dec 2024 15:55:47 +0100 Subject: [PATCH 1/5] feat: allow custom sort for scheduler read --- lib/ash_oban.ex | 5 ++ lib/transformers/define_schedulers.ex | 78 +++++++++++++++++---------- 2 files changed, 54 insertions(+), 29 deletions(-) diff --git a/lib/ash_oban.ex b/lib/ash_oban.ex index 3f88bb0..6830835 100644 --- a/lib/ash_oban.ex +++ b/lib/ash_oban.ex @@ -52,6 +52,7 @@ defmodule AshOban do :max_scheduler_attempts, :record_limit, :where, + :sort, :state, :scheduler, :worker, @@ -200,6 +201,10 @@ defmodule AshOban do type: :any, doc: "The filter expression to determine if something should be triggered" ], + sort: [ + type: :any, + doc: "The sort applied to the query that determines if something should be triggered" + ], on_error: [ type: :atom, doc: diff --git a/lib/transformers/define_schedulers.ex b/lib/transformers/define_schedulers.ex index a90b14b..7fc4345 100644 --- a/lib/transformers/define_schedulers.ex +++ b/lib/transformers/define_schedulers.ex @@ -64,6 +64,15 @@ defmodule AshOban.Transformers.DefineSchedulers do end end + sort = + if not is_nil(trigger.sort) do + quote location: :keep do + def sort(query) do + Ash.Query.sort(query, unquote(trigger.sort)) + end + end + end + limit_stream = if trigger.record_limit do quote do @@ -90,37 +99,47 @@ defmodule AshOban.Transformers.DefineSchedulers do end end - stream = - if is_nil(trigger.where) do - quote location: :keep do - def stream(resource, actor) do - resource - |> Ash.Query.set_context(%{private: %{ash_oban?: true}}) - |> Ash.Query.select(unquote(primary_key)) - |> limit_stream() - |> Ash.Query.for_read(unquote(trigger.read_action), %{}, - authorize?: AshOban.authorize?(), - actor: actor, - domain: unquote(domain) - ) - |> Ash.stream!(unquote(batch_opts)) - end + pipeline = + quote do + resource + |> Ash.Query.set_context(%{private: %{ash_oban?: true}}) + |> Ash.Query.select(unquote(primary_key)) + |> limit_stream() + end + + pipeline = + if not is_nil(trigger.where) do + quote do + unquote(pipeline) |> filter() end else - quote location: :keep do - def stream(resource, actor) do - resource - |> Ash.Query.set_context(%{private: %{ash_oban?: true}}) - |> Ash.Query.select(unquote(primary_key)) - |> limit_stream() - |> filter() - |> Ash.Query.for_read(unquote(trigger.read_action), %{}, - authorize?: AshOban.authorize?(), - actor: actor, - domain: unquote(domain) - ) - |> Ash.stream!() - end + pipeline + end + + pipeline = + if not is_nil(trigger.sort) do + quote do + unquote(pipeline) |> sort() + end + else + pipeline + end + + pipeline = + quote do + unquote(pipeline) + |> Ash.Query.for_read(unquote(trigger.read_action), %{}, + authorize?: AshOban.authorize?(), + actor: actor, + domain: unquote(domain) + ) + |> Ash.stream!(unquote(batch_opts)) + end + + stream = + quote location: :keep do + def stream(resource, actor) do + unquote(pipeline) end end @@ -239,6 +258,7 @@ defmodule AshOban.Transformers.DefineSchedulers do unquote(limit_stream) unquote(stream) unquote(filter) + unquote(sort) unquote(insert) end From e961e2ff020cdc64eb4254cacd3864fd4199bdd9 Mon Sep 17 00:00:00 2001 From: Barnabas Jovanovics Date: Fri, 20 Dec 2024 15:56:06 +0100 Subject: [PATCH 2/5] feat: better error message for scheduling a non existing trigger --- lib/ash_oban.ex | 3 +++ 1 file changed, 3 insertions(+) diff --git a/lib/ash_oban.ex b/lib/ash_oban.ex index 6830835..b435bca 100644 --- a/lib/ash_oban.ex +++ b/lib/ash_oban.ex @@ -414,6 +414,9 @@ defmodule AshOban do |> store_actor(opts[:actor]) |> scheduler.new() |> Oban.insert!() + + _ -> + raise ArgumentError, "Invalid trigger or scheduled action: #{inspect(trigger)}" end end From 9470a37460bfedb5b6df01ab377a179c52213f78 Mon Sep 17 00:00:00 2001 From: Barnabas Jovanovics Date: Mon, 6 Jan 2025 23:02:48 +0100 Subject: [PATCH 3/5] add a sort test --- test/ash_oban_test.exs | 25 +++++++++++++++++++++++++ test/support/triggered.ex | 2 ++ 2 files changed, 27 insertions(+) diff --git a/test/ash_oban_test.exs b/test/ash_oban_test.exs index 33c77e7..47bffc3 100644 --- a/test/ash_oban_test.exs +++ b/test/ash_oban_test.exs @@ -39,6 +39,31 @@ defmodule AshObanTest do ) end + test "sort is applied when scheduling" do + triggered1 = + Triggered + |> Ash.Changeset.for_create(:create, %{}) + |> Ash.create!() + + triggered2 = + Triggered + |> Ash.Changeset.for_create(:create, %{}) + |> Ash.create!() + + assert %{success: 3} = + AshOban.Test.schedule_and_run_triggers({Triggered, :process}, + actor: %AshOban.Test.ActorPersister.FakeActor{id: 1} + ) + + triggered1 = + Ash.reload!(triggered1) + + triggered2 = + Ash.reload!(triggered2) + + assert DateTime.before?(triggered1.updated_at, triggered2.updated_at) + end + test "a record can be processed manually with additional arguments" do record = Triggered diff --git a/test/support/triggered.ex b/test/support/triggered.ex index 9a63068..ed2c01a 100644 --- a/test/support/triggered.ex +++ b/test/support/triggered.ex @@ -11,6 +11,7 @@ defmodule AshOban.Test.Triggered do trigger :process do action :process where expr(processed != true) + sort inserted_at: :asc max_attempts 2 worker_read_action(:read) end @@ -94,5 +95,6 @@ defmodule AshOban.Test.Triggered do attributes do uuid_primary_key :id attribute :processed, :boolean, default: false, allow_nil?: false + timestamps() end end From 348abf38a854ddfe7cf347a5f7b17fe912be4993 Mon Sep 17 00:00:00 2001 From: Barnabas Jovanovics Date: Mon, 6 Jan 2025 23:07:13 +0100 Subject: [PATCH 4/5] generate docs --- .formatter.exs | 1 + documentation/dsls/DSL-AshOban.md | 1 + mix.exs | 1 + mix.lock | 6 +++--- 4 files changed, 6 insertions(+), 3 deletions(-) diff --git a/.formatter.exs b/.formatter.exs index 0a2c28d..85e2954 100644 --- a/.formatter.exs +++ b/.formatter.exs @@ -19,6 +19,7 @@ spark_locals_without_parens = [ scheduler_cron: 1, scheduler_priority: 1, scheduler_queue: 1, + sort: 1, state: 1, stream_batch_size: 1, trigger: 1, diff --git a/documentation/dsls/DSL-AshOban.md b/documentation/dsls/DSL-AshOban.md index e349ee6..cd29c4d 100644 --- a/documentation/dsls/DSL-AshOban.md +++ b/documentation/dsls/DSL-AshOban.md @@ -113,6 +113,7 @@ end | [`read_action`](#oban-triggers-trigger-read_action){: #oban-triggers-trigger-read_action } | `atom` | | The read action to use when querying records. Defaults to the primary read. This action *must* support keyset pagination. | | [`worker_read_action`](#oban-triggers-trigger-worker_read_action){: #oban-triggers-trigger-worker_read_action } | `atom` | | The read action to use when fetching the individual records for the trigger. Defaults to `read_action`. If you customize this, ensure your action handles scenarios where the trigger is no longer relevant. | | [`where`](#oban-triggers-trigger-where){: #oban-triggers-trigger-where } | `any` | | The filter expression to determine if something should be triggered | +| [`sort`](#oban-triggers-trigger-sort){: #oban-triggers-trigger-sort } | `any` | | The sort applied to the query that determines if something should be triggered | | [`on_error`](#oban-triggers-trigger-on_error){: #oban-triggers-trigger-on_error } | `atom` | | An update action to call after the last attempt has failed. See the getting started guide for more. | diff --git a/mix.exs b/mix.exs index 59721ae..db4666c 100644 --- a/mix.exs +++ b/mix.exs @@ -123,6 +123,7 @@ defmodule AshOban.MixProject do {:oban, "~> 2.15"}, {:postgrex, "~> 0.18"}, # dev/test dependencies + {:igniter, "~> 0.5.0", only: [:dev, :test]}, {:simple_sat, "~> 0.1", only: [:dev, :test]}, {:ex_doc, "~> 0.22", only: [:dev, :test], runtime: false}, {:ex_check, "~> 0.12", only: [:dev, :test]}, diff --git a/mix.lock b/mix.lock index 530b468..1db95af 100644 --- a/mix.lock +++ b/mix.lock @@ -17,7 +17,7 @@ "git_cli": {:hex, :git_cli, "0.3.0", "a5422f9b95c99483385b976f5d43f7e8233283a47cda13533d7c16131cb14df5", [:mix], [], "hexpm", "78cb952f4c86a41f4d3511f1d3ecb28edb268e3a7df278de2faa1bd4672eaf9b"}, "git_ops": {:hex, :git_ops, "2.6.3", "38c6e381b8281b86e2911fa39bea4eab2d171c86d7428786566891efb73b68c3", [:mix], [{:git_cli, "~> 0.2", [hex: :git_cli, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.0", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "a81cb6c6a2a026a4d48cb9a2e1dfca203f9283a3a70aa0c7bc171970c44f23f8"}, "glob_ex": {:hex, :glob_ex, "0.1.11", "cb50d3f1ef53f6ca04d6252c7fde09fd7a1cf63387714fe96f340a1349e62c93", [:mix], [], "hexpm", "342729363056e3145e61766b416769984c329e4378f1d558b63e341020525de4"}, - "igniter": {:hex, :igniter, "0.4.8", "6d1bf4934952ac3eb20f6cbac0d5cd6d8012e42e3de20ad794703556c14cfa08", [:mix], [{:glob_ex, "~> 0.1.7", [hex: :glob_ex, repo: "hexpm", optional: false]}, {:jason, "~> 1.4", [hex: :jason, repo: "hexpm", optional: false]}, {:rewrite, ">= 1.1.1 and < 2.0.0-0", [hex: :rewrite, repo: "hexpm", optional: false]}, {:sourceror, "~> 1.4", [hex: :sourceror, repo: "hexpm", optional: false]}, {:spitfire, ">= 0.1.3 and < 1.0.0-0", [hex: :spitfire, repo: "hexpm", optional: false]}], "hexpm", "f9dd06f971fa053c6b0d9f8263b625f619a0fd3645d6a8cd6170935055a8f0df"}, + "igniter": {:hex, :igniter, "0.5.8", "d91e90fecb99beadfa9d0d8434fbd4f0fe06ea1a1d29cae4dfd0cb058cb3a5c7", [:mix], [{:glob_ex, "~> 0.1.7", [hex: :glob_ex, repo: "hexpm", optional: false]}, {:jason, "~> 1.4", [hex: :jason, repo: "hexpm", optional: false]}, {:rewrite, ">= 1.1.1 and < 2.0.0-0", [hex: :rewrite, repo: "hexpm", optional: false]}, {:sourceror, "~> 1.4", [hex: :sourceror, repo: "hexpm", optional: false]}, {:spitfire, ">= 0.1.3 and < 1.0.0-0", [hex: :spitfire, repo: "hexpm", optional: false]}], "hexpm", "fef198324925405ea5c3b16166002be03b2d7497c038cfc9708aa557d27ba5a2"}, "iterex": {:hex, :iterex, "0.1.2", "58f9b9b9a22a55cbfc7b5234a9c9c63eaac26d276b3db80936c0e1c60355a5a6", [:mix], [], "hexpm", "2e103b8bcc81757a9af121f6dc0df312c9a17220f302b1193ef720460d03029d"}, "jason": {:hex, :jason, "1.4.4", "b9226785a9aa77b6857ca22832cffa5d5011a667207eb2a0ad56adb5db443b8a", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "c5eb0cab91f094599f94d55bc63409236a8ec69a21a67814529e8d5f6cc90b3b"}, "libgraph": {:hex, :libgraph, "0.16.0", "3936f3eca6ef826e08880230f806bfea13193e49bf153f93edcf0239d4fd1d07", [:mix], [], "hexpm", "41ca92240e8a4138c30a7e06466acc709b0cbb795c643e9e17174a178982d6bf"}, @@ -30,12 +30,12 @@ "owl": {:hex, :owl, "0.12.0", "0c4b48f90797a7f5f09ebd67ba7ebdc20761c3ec9c7928dfcafcb6d3c2d25c99", [:mix], [{:ucwidth, "~> 0.2", [hex: :ucwidth, repo: "hexpm", optional: true]}], "hexpm", "241d85ae62824dd72f9b2e4a5ba4e69ebb9960089a3c68ce6c1ddf2073db3c15"}, "postgrex": {:hex, :postgrex, "0.19.3", "a0bda6e3bc75ec07fca5b0a89bffd242ca209a4822a9533e7d3e84ee80707e19", [:mix], [{:db_connection, "~> 2.1", [hex: :db_connection, repo: "hexpm", optional: false]}, {:decimal, "~> 1.5 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:table, "~> 0.1.0", [hex: :table, repo: "hexpm", optional: true]}], "hexpm", "d31c28053655b78f47f948c85bb1cf86a9c1f8ead346ba1aa0d0df017fa05b61"}, "reactor": {:hex, :reactor, "0.10.3", "41a8c34251148e36dd7c75aa8433f2c2f283f29c097f9eb84a630ab28dd75651", [:mix], [{:igniter, "~> 0.4", [hex: :igniter, repo: "hexpm", optional: true]}, {:iterex, "~> 0.1", [hex: :iterex, repo: "hexpm", optional: false]}, {:libgraph, "~> 0.16", [hex: :libgraph, repo: "hexpm", optional: false]}, {:spark, "~> 2.0", [hex: :spark, repo: "hexpm", optional: false]}, {:splode, "~> 0.2", [hex: :splode, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.2", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "2b34380e22b69a35943a7bcceffd5a8b766870f1fc9052162a7ff74ef9cdb3b2"}, - "rewrite": {:hex, :rewrite, "1.1.1", "0e6674eb5f8cb11aabe5ad6207151b4156bf173aa9b43133a68f8cc882364570", [:mix], [{:glob_ex, "~> 0.1", [hex: :glob_ex, repo: "hexpm", optional: false]}, {:sourceror, "~> 1.0", [hex: :sourceror, repo: "hexpm", optional: false]}, {:text_diff, "~> 0.1", [hex: :text_diff, repo: "hexpm", optional: false]}], "hexpm", "fcd688b3ca543c3a1f1f4615ccc054ec37cfcde91133a27a683ec09b35ae1496"}, + "rewrite": {:hex, :rewrite, "1.1.2", "f5a5d10f5fed1491a6ff48e078d4585882695962ccc9e6c779bae025d1f92eda", [:mix], [{:glob_ex, "~> 0.1", [hex: :glob_ex, repo: "hexpm", optional: false]}, {:sourceror, "~> 1.0", [hex: :sourceror, repo: "hexpm", optional: false]}, {:text_diff, "~> 0.1", [hex: :text_diff, repo: "hexpm", optional: false]}], "hexpm", "7f8b94b1e3528d0a47b3e8b7bfeca559d2948a65fa7418a9ad7d7712703d39d4"}, "simple_sat": {:hex, :simple_sat, "0.1.3", "f650fc3c184a5fe741868b5ac56dc77fdbb428468f6dbf1978e14d0334497578", [:mix], [], "hexpm", "a54305066a356b7194dc81db2a89232bacdc0b3edaef68ed9aba28dcbc34887b"}, "sobelow": {:hex, :sobelow, "0.13.0", "218afe9075904793f5c64b8837cc356e493d88fddde126a463839351870b8d1e", [:mix], [{:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "cd6e9026b85fc35d7529da14f95e85a078d9dd1907a9097b3ba6ac7ebbe34a0d"}, "sourceror": {:hex, :sourceror, "1.7.1", "599d78f4cc2be7d55c9c4fd0a8d772fd0478e3a50e726697c20d13d02aa056d4", [:mix], [], "hexpm", "cd6f268fe29fa00afbc535e215158680a0662b357dc784646d7dff28ac65a0fc"}, "spark": {:hex, :spark, "2.2.36", "07c921e5efb27f184267c3431d2f82099e24cac90748a47383dd75cbfb558268", [:mix], [{:igniter, ">= 0.3.64 and < 1.0.0-0", [hex: :igniter, repo: "hexpm", optional: true]}, {:jason, "~> 1.4", [hex: :jason, repo: "hexpm", optional: false]}, {:sourceror, "~> 1.2", [hex: :sourceror, repo: "hexpm", optional: false]}], "hexpm", "e5ac56b75e5ad43da6d8302b6713277488f8e9a3abdba9aae8f0d0f9cff04538"}, - "spitfire": {:hex, :spitfire, "0.1.3", "7ea0f544005dfbe48e615ed90250c9a271bfe126914012023fd5e4b6b82b7ec7", [:mix], [], "hexpm", "d53b5107bcff526a05c5bb54c95e77b36834550affd5830c9f58760e8c543657"}, + "spitfire": {:hex, :spitfire, "0.1.4", "8fe0df66e735323e4f2a56e719603391b160dd68efd922cadfbb85a2cf6c68af", [:mix], [], "hexpm", "d40d850f4ede5235084876246756b90c7bcd12994111d57c55e2e1e23ac3fe61"}, "splode": {:hex, :splode, "0.2.7", "ed042fa9bd8fe7b66dd0a0faabdb97352058420d90cd1c7c1537f609deb7ef6d", [:mix], [], "hexpm", "267f1f51d5a5ac988cda0649498294844988c5086916fed5a8aff297d69a2059"}, "stream_data": {:hex, :stream_data, "1.1.2", "05499eaec0443349ff877aaabc6e194e82bda6799b9ce6aaa1aadac15a9fdb4d", [:mix], [], "hexpm", "129558d2c77cbc1eb2f4747acbbea79e181a5da51108457000020a906813a1a9"}, "telemetry": {:hex, :telemetry, "1.3.0", "fedebbae410d715cf8e7062c96a1ef32ec22e764197f70cda73d82778d61e7a2", [:rebar3], [], "hexpm", "7015fc8919dbe63764f4b4b87a95b7c0996bd539e0d499be6ec9d7f3875b79e6"}, From 8905cef8060432552b307a67f2b85c5876bc9ca1 Mon Sep 17 00:00:00 2001 From: Barnabas Jovanovics Date: Mon, 6 Jan 2025 23:24:16 +0100 Subject: [PATCH 5/5] fix credo warning --- lib/transformers/define_schedulers.ex | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/lib/transformers/define_schedulers.ex b/lib/transformers/define_schedulers.ex index 7fc4345..d341847 100644 --- a/lib/transformers/define_schedulers.ex +++ b/lib/transformers/define_schedulers.ex @@ -108,21 +108,21 @@ defmodule AshOban.Transformers.DefineSchedulers do end pipeline = - if not is_nil(trigger.where) do + if is_nil(trigger.where) do + pipeline + else quote do unquote(pipeline) |> filter() end - else - pipeline end pipeline = - if not is_nil(trigger.sort) do + if is_nil(trigger.sort) do + pipeline + else quote do unquote(pipeline) |> sort() end - else - pipeline end pipeline =