Skip to content

Commit

Permalink
TriggerEngine: allow custom prefetch_count on trigger policies
Browse files Browse the repository at this point in the history
Until now, a trigger policy used a consumer prefetch set by the
`TRIGGER_ENGINE_AMQP_CONSUMER_PREFETCH_COUNT` env (as usual in Astarte).
If the policy provides a custom consumer prefetch count (e.g. for message
ordering reasons), use that prefetch value. This can be done if the related
Realm Management flag is enabled.
The default policy will still respect the default prefetch count, so as not
to break compatibility with previous Astarte versions.

Signed-off-by: Arnaldo Cesco <[email protected]>
  • Loading branch information
Annopaolo committed Mar 6, 2023
1 parent 7654ac1 commit 239d799
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ defmodule Astarte.TriggerEngine.AMQPConsumer.AMQPConsumerTracker do
alias Astarte.Core.Triggers.Policy.Handler
alias Astarte.Core.Triggers.Policy.ErrorKeyword
alias Astarte.Core.Triggers.PolicyProtobuf.Policy, as: PolicyProto
alias Astarte.TriggerEngine.Config

# 30 seconds
@update_timeout 30 * 1000
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ defmodule Astarte.TriggerEngine.AMQPConsumer.AMQPMessageConsumer do
queue_name = generate_queue_name(realm_name, policy.name)
routing_key = generate_routing_key(realm_name, policy.name)

with :ok <- @adapter.qos(channel, prefetch_count: Config.amqp_consumer_prefetch_count!()),
with :ok <- @adapter.qos(channel, prefetch_count: prefetch_count_or_default(policy)),
:ok <- @adapter.declare_exchange(channel, exchange_name, type: :direct, durable: true),
{:ok, _queue} <- @adapter.declare_queue(channel, queue_name, durable: true),
:ok <-
Expand Down Expand Up @@ -199,6 +199,13 @@ defmodule Astarte.TriggerEngine.AMQPConsumer.AMQPMessageConsumer do
end
end

# Protobuf3 encodes missing int field as 0
defp prefetch_count_or_default(%PolicyStruct{prefetch_count: 0}),
do: Config.amqp_consumer_prefetch_count!()

defp prefetch_count_or_default(%PolicyStruct{prefetch_count: prefetch_count}),
do: prefetch_count

defp generate_policy_x_args(%PolicyStruct{
maximum_capacity: maximum_capacity,
event_ttl: event_ttl
Expand Down
2 changes: 1 addition & 1 deletion apps/astarte_trigger_engine/mix.lock
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
%{
"amqp": {:hex, :amqp, "2.1.1", "ad8dec713ba885afffffcb81feb619fe7cfcbcabe9377ab65ab7a110bd4f43a0", [:mix], [{:amqp_client, "~> 3.8.0", [hex: :amqp_client, repo: "hexpm", optional: false]}], "hexpm", "b6d926770e4508e30e3e9e476c57b6c8aeda44f7715663bdc38935620ce5be6f"},
"amqp_client": {:hex, :amqp_client, "3.8.14", "7569517aefb47e0d1c41bca2f4768dc8a2d88487daf7819fecca0d78943f293c", [:make, :rebar3], [{:rabbit_common, "3.8.14", [hex: :rabbit_common, repo: "hexpm", optional: false]}], "hexpm", "e5ba3ac18abbe34a1d990a6bcac25633dc7061ab8f8d101c7dcff97f49f4c523"},
"astarte_core": {:git, "https://github.com/astarte-platform/astarte_core.git", "636f50d6d7dddd1ee6f205a677acf378d2bddeea", [branch: "release-1.1"]},
"astarte_core": {:git, "https://github.com/astarte-platform/astarte_core.git", "aa2714ff9da0a89ca66d9c8c042f28e9aef4050d", [branch: "release-1.1"]},
"astarte_data_access": {:git, "https://github.com/astarte-platform/astarte_data_access.git", "0c6895707cdf23e44d20da67bc645e801db08119", [branch: "release-1.1"]},
"bbmustache": {:hex, :bbmustache, "1.11.0", "a6dbfc5cee3e1d7d17aad5f5b8880b4508d974611da8d73e1f6c28bde65d4c47", [:rebar3], [], "hexpm", "7c9cdcf58dc043377ab792a8c7109d8902695fcae3b35c1078a8b38ddcf86e5f"},
"castore": {:hex, :castore, "0.1.18", "deb5b9ab02400561b6f5708f3e7660fc35ca2d51bfc6a940d2f513f89c2975fc", [:mix], [], "hexpm", "61bbaf6452b782ef80b33cdb45701afbcf0a918a45ebe7e73f1130d661e66a06"},
Expand Down

0 comments on commit 239d799

Please sign in to comment.