Skip to content

Commit

Permalink
TriggerEngine: allow custom prefetch_count on trigger policies
Browse files Browse the repository at this point in the history
Until now, a trigger policy used a consumer prefetch set by the
`DATA_UPDATER_PLANT_AMQP_CONSUMER_PREFETCH_COUNT` env (as usual in Astarte).
However, if consumer prefetch count > 1, ordered message delivery is not
guaranteed, because messages might not be dequeued (thus processed) sequentially.
However, this allows for higher throughput. Now, the `prefetch_count` policy field
allows the user to set its the prefetch count.
The default policy will still use the env var, so as not to break compatibility
with previous Astarte versions.

Signed-off-by: Arnaldo Cesco <[email protected]>
  • Loading branch information
Annopaolo committed Jan 11, 2023
1 parent b89afbb commit 0e969b7
Show file tree
Hide file tree
Showing 4 changed files with 9 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ defmodule Astarte.TriggerEngine.AMQPConsumer.AMQPConsumerTracker do
alias Astarte.Core.Triggers.Policy.Handler
alias Astarte.Core.Triggers.Policy.ErrorKeyword
alias Astarte.Core.Triggers.PolicyProtobuf.Policy, as: PolicyProto
alias Astarte.TriggerEngine.Config

# 30 seconds
@update_timeout 30 * 1000
Expand Down Expand Up @@ -138,7 +139,9 @@ defmodule Astarte.TriggerEngine.AMQPConsumer.AMQPConsumerTracker do
maximum_capacity: nil,
error_handlers: [
%Handler{on: %ErrorKeyword{keyword: "any_error"}, strategy: "discard"}
]
],
# Default prefetch_count to Config.amqp_consumer_prefetch_count!() so that we don't break Astarte < 1.1 behaviour
prefetch_count: Config.amqp_consumer_prefetch_count!()
}
|> Policy.to_policy_proto()
|> PolicyProto.encode()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ defmodule Astarte.TriggerEngine.AMQPConsumer.AMQPMessageConsumer do
queue_name = generate_queue_name(realm_name, policy.name)
routing_key = generate_routing_key(realm_name, policy.name)

with :ok <- @adapter.qos(channel, prefetch_count: Config.amqp_consumer_prefetch_count!()),
with :ok <- @adapter.qos(channel, prefetch_count: policy.prefetch_count),
:ok <- @adapter.declare_exchange(channel, exchange_name, type: :direct, durable: true),
{:ok, _queue} <- @adapter.declare_queue(channel, queue_name, durable: true),
:ok <-
Expand Down
4 changes: 2 additions & 2 deletions apps/astarte_trigger_engine/mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,9 @@ defmodule Astarte.TriggerEngine.Mixfile do

defp astarte_required_modules(_) do
[
{:astarte_core, github: "astarte-platform/astarte_core", branch: "release-1.1"},
{:astarte_core, github: "annopaolo/astarte_core", branch: "policy_prefetch_count"},
{:astarte_data_access,
github: "astarte-platform/astarte_data_access", branch: "release-1.1"}
github: "annopaolo/astarte_data_access", branch: "policy_prefetch_count"}
]
end

Expand Down
4 changes: 2 additions & 2 deletions apps/astarte_trigger_engine/mix.lock
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
%{
"amqp": {:hex, :amqp, "2.1.1", "ad8dec713ba885afffffcb81feb619fe7cfcbcabe9377ab65ab7a110bd4f43a0", [:mix], [{:amqp_client, "~> 3.8.0", [hex: :amqp_client, repo: "hexpm", optional: false]}], "hexpm", "b6d926770e4508e30e3e9e476c57b6c8aeda44f7715663bdc38935620ce5be6f"},
"amqp_client": {:hex, :amqp_client, "3.8.14", "7569517aefb47e0d1c41bca2f4768dc8a2d88487daf7819fecca0d78943f293c", [:make, :rebar3], [{:rabbit_common, "3.8.14", [hex: :rabbit_common, repo: "hexpm", optional: false]}], "hexpm", "e5ba3ac18abbe34a1d990a6bcac25633dc7061ab8f8d101c7dcff97f49f4c523"},
"astarte_core": {:git, "https://github.com/astarte-platform/astarte_core.git", "636f50d6d7dddd1ee6f205a677acf378d2bddeea", [branch: "release-1.1"]},
"astarte_data_access": {:git, "https://github.com/astarte-platform/astarte_data_access.git", "0c6895707cdf23e44d20da67bc645e801db08119", [branch: "release-1.1"]},
"astarte_core": {:git, "https://github.com/annopaolo/astarte_core.git", "c07b6ef62c866e74ced4901213ae3c831cae628d", [branch: "policy_prefetch_count"]},
"astarte_data_access": {:git, "https://github.com/annopaolo/astarte_data_access.git", "8cbe55a83deb473832b9666da1bc9ee44f56854f", [branch: "policy_prefetch_count"]},
"bbmustache": {:hex, :bbmustache, "1.11.0", "a6dbfc5cee3e1d7d17aad5f5b8880b4508d974611da8d73e1f6c28bde65d4c47", [:rebar3], [], "hexpm", "7c9cdcf58dc043377ab792a8c7109d8902695fcae3b35c1078a8b38ddcf86e5f"},
"castore": {:hex, :castore, "0.1.18", "deb5b9ab02400561b6f5708f3e7660fc35ca2d51bfc6a940d2f513f89c2975fc", [:mix], [], "hexpm", "61bbaf6452b782ef80b33cdb45701afbcf0a918a45ebe7e73f1130d661e66a06"},
"certifi": {:hex, :certifi, "2.9.0", "6f2a475689dd47f19fb74334859d460a2dc4e3252a3324bd2111b8f0429e7e21", [:rebar3], [], "hexpm", "266da46bdb06d6c6d35fde799bcb28d36d985d424ad7c08b5bb48f5b5cdd4641"},
Expand Down

0 comments on commit 0e969b7

Please sign in to comment.