Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Experimental: allow custom prefetch_count on trigger policies #749

Open
wants to merge 4 commits into
base: release-1.1
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
trigger policies.
- [astarte_trigger_engine] Add a customizable HTTP trigger redelivery mechanism via
trigger policies (see [#554](https://github.com/astarte-platform/astarte/issues/554)).
- [astarte_realm_management] Allow to change the prefetch count of a trigger delivery policy.
This feature is experimental (disabled by default) and must be explicitly enabled using the
`REALM_MANAGEMENT_ALLOW_TRIGGER_POLICY_PREFETCH_COUNT` feature gate.

### Fixed
- [astarte_appengine_api] Return empty data instead of error when querying `properties` interfaces
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,14 @@ defmodule Astarte.RealmManagement.Config do
type: :integer,
default: 4000

@envdoc "Specify whether to allow setting a custom consumer prefetch count for trigger policy queues (experimental feature)."
app_env :allow_trigger_policy_prefetch_count,
:astarte_realm_management,
:allow_trigger_policy_prefetch_count,
os_env: "REALM_MANAGEMENT_ALLOW_TRIGGER_POLICY_PREFETCH_COUNT",
type: :boolean,
default: false

def cassandra_node!, do: Enum.random(cqex_nodes!())

@doc """
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -832,6 +832,7 @@ defmodule Astarte.RealmManagement.Engine do
{:ok, json_obj} <- decode_policy(policy_json),
policy_changeset = Policy.changeset(%Policy{}, json_obj),
{:ok, %Policy{name: policy_name} = policy} <- validate_trigger_policy(policy_changeset),
:ok <- verify_policy_prefetch_count_allowed(policy),
:ok <- verify_trigger_policy_not_exists(client, policy_name) do
_ =
Logger.info("Installing trigger policy",
Expand Down Expand Up @@ -932,6 +933,20 @@ defmodule Astarte.RealmManagement.Engine do
end
end

defp verify_policy_prefetch_count_allowed(policy) do
%Policy{name: name, prefetch_count: prefetch_count} = policy

if Config.allow_trigger_policy_prefetch_count!() or prefetch_count == nil do
:ok
else
Logger.warn("Trigger policy prefetch_count not allowed, but set in #{name}",
tag: "trigger_policy_prefetch_count_not_allowed"
)

{:error, :trigger_policy_prefetch_count_not_allowed}
end
end

defp verify_trigger_policy_not_exists(client, policy_name) do
with {:ok, exists?} <- Queries.check_trigger_policy_already_present(client, policy_name) do
if not exists? do
Expand Down
2 changes: 1 addition & 1 deletion apps/astarte_realm_management/mix.lock
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
%{
"amqp": {:hex, :amqp, "2.1.2", "eab047abb54f7e30022b81b9534b797e51c6e7756f1b112ec6dcee3c3ac20eac", [:mix], [{:amqp_client, "~> 3.8.0", [hex: :amqp_client, repo: "hexpm", optional: false]}], "hexpm", "535901c611a979221d045839e9e7a661bf33d04590b796c8fa30f487511fde04"},
"amqp_client": {:hex, :amqp_client, "3.8.35", "e81dbec62057155b5aff857ac9ee85a63af2baf6e0fd4e9d02a3aff46a3de836", [:make, :rebar3], [{:rabbit_common, "3.8.35", [hex: :rabbit_common, repo: "hexpm", optional: false]}], "hexpm", "ca8066e8d12530e31a9879789bc44bb2a4877dcd2d4b65e56b3d301b5e727688"},
"astarte_core": {:git, "https://github.com/astarte-platform/astarte_core.git", "636f50d6d7dddd1ee6f205a677acf378d2bddeea", [branch: "release-1.1"]},
"astarte_core": {:git, "https://github.com/astarte-platform/astarte_core.git", "aa2714ff9da0a89ca66d9c8c042f28e9aef4050d", [branch: "release-1.1"]},
"astarte_data_access": {:git, "https://github.com/astarte-platform/astarte_data_access.git", "0c6895707cdf23e44d20da67bc645e801db08119", [branch: "release-1.1"]},
"astarte_rpc": {:git, "https://github.com/astarte-platform/astarte_rpc.git", "25130b1df6cf4a04a14caef9723e34de704b42ac", [branch: "release-1.1"]},
"castore": {:hex, :castore, "0.1.18", "deb5b9ab02400561b6f5708f3e7660fc35ca2d51bfc6a940d2f513f89c2975fc", [:mix], [], "hexpm", "61bbaf6452b782ef80b33cdb45701afbcf0a918a45ebe7e73f1130d661e66a06"},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ defmodule Astarte.RealmManagement.EngineTest do
alias Astarte.RealmManagement.Engine
alias Astarte.Core.Triggers.SimpleTriggerConfig
alias Astarte.Core.Triggers.SimpleTriggersProtobuf.TaggedSimpleTrigger
alias Astarte.RealmManagement.Config

@test_interface_a_0 """
{
Expand Down Expand Up @@ -615,6 +616,22 @@ defmodule Astarte.RealmManagement.EngineTest do
}
"""

@test_trigger_policy_3 """
{
"name": "yetanothername",
"error_handlers": [
{
"on" : "any_error",
"strategy": "retry"
}
],
"maximum_capacity": 300,
"retry_times": 10,
"event_ttl": 10,
"prefetch_count": 1
}
"""

@test_realm_name "autotestrealm"

setup do
Expand Down Expand Up @@ -1392,6 +1409,20 @@ defmodule Astarte.RealmManagement.EngineTest do
assert sorted_policies == ["aname", "anothername"]
end

test "install trigger policy with set prefetch_count succeeds only when the feature is enabled" do
assert Config.allow_trigger_policy_prefetch_count!() == false

assert Engine.install_trigger_policy("autotestrealm", @test_trigger_policy_3) ==
{:error, :trigger_policy_prefetch_count_not_allowed}

Config.put_allow_trigger_policy_prefetch_count(true)
assert Config.allow_trigger_policy_prefetch_count!() == true
assert Engine.install_trigger_policy("autotestrealm", @test_trigger_policy_3) == :ok

# Let's go back to normal
Config.put_allow_trigger_policy_prefetch_count(false)
end

test "trigger and policy installation coherence" do
assert Engine.install_trigger_policy("autotestrealm", @test_trigger_policy_1) == :ok

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,13 @@ defmodule Astarte.RealmManagement.APIWeb.FallbackController do
|> render(:trigger_policy_already_present)
end

def call(conn, {:error, :trigger_policy_prefetch_count_not_allowed}) do
conn
|> put_status(:conflict)
|> put_view(Astarte.RealmManagement.APIWeb.ErrorView)
|> render(:trigger_policy_prefetch_count_not_allowed)
end

def call(conn, {:error, :cannot_delete_currently_used_trigger_policy}) do
conn
|> put_status(:conflict)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ defmodule Astarte.RealmManagement.APIWeb.ErrorView do
%{errors: %{detail: "Cannot delete policy as it is being currently used by triggers"}}
end

def render("trigger_policy_prefetch_count_not_allowed.json", _assigns) do
%{errors: %{detail: "Not allowed to specify prefetch_count in policy"}}
end

def render("overlapping_mappings.json", _assigns) do
%{errors: %{detail: "Overlapping endpoints in interface mappings"}}
end
Expand Down
2 changes: 1 addition & 1 deletion apps/astarte_realm_management_api/mix.lock
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
%{
"amqp": {:hex, :amqp, "2.1.1", "ad8dec713ba885afffffcb81feb619fe7cfcbcabe9377ab65ab7a110bd4f43a0", [:mix], [{:amqp_client, "~> 3.8.0", [hex: :amqp_client, repo: "hexpm", optional: false]}], "hexpm", "b6d926770e4508e30e3e9e476c57b6c8aeda44f7715663bdc38935620ce5be6f"},
"amqp_client": {:hex, :amqp_client, "3.8.14", "7569517aefb47e0d1c41bca2f4768dc8a2d88487daf7819fecca0d78943f293c", [:make, :rebar3], [{:rabbit_common, "3.8.14", [hex: :rabbit_common, repo: "hexpm", optional: false]}], "hexpm", "e5ba3ac18abbe34a1d990a6bcac25633dc7061ab8f8d101c7dcff97f49f4c523"},
"astarte_core": {:git, "https://github.com/astarte-platform/astarte_core.git", "636f50d6d7dddd1ee6f205a677acf378d2bddeea", [branch: "release-1.1"]},
"astarte_core": {:git, "https://github.com/astarte-platform/astarte_core.git", "aa2714ff9da0a89ca66d9c8c042f28e9aef4050d", [branch: "release-1.1"]},
"astarte_rpc": {:git, "https://github.com/astarte-platform/astarte_rpc.git", "25130b1df6cf4a04a14caef9723e34de704b42ac", [branch: "release-1.1"]},
"castore": {:hex, :castore, "0.1.16", "2675f717adc700475345c5512c381ef9273eb5df26bdd3f8c13e2636cf4cc175", [:mix], [], "hexpm", "28ed2c43d83b5c25d35c51bc0abf229ac51359c170cba76171a462ced2e4b651"},
"certifi": {:hex, :certifi, "2.9.0", "6f2a475689dd47f19fb74334859d460a2dc4e3252a3324bd2111b8f0429e7e21", [:rebar3], [], "hexpm", "266da46bdb06d6c6d35fde799bcb28d36d985d424ad7c08b5bb48f5b5cdd4641"},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ defmodule Astarte.TriggerEngine.AMQPConsumer.AMQPConsumerTracker do
alias Astarte.Core.Triggers.Policy.Handler
alias Astarte.Core.Triggers.Policy.ErrorKeyword
alias Astarte.Core.Triggers.PolicyProtobuf.Policy, as: PolicyProto
alias Astarte.TriggerEngine.Config

# 30 seconds
@update_timeout 30 * 1000
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ defmodule Astarte.TriggerEngine.AMQPConsumer.AMQPMessageConsumer do
queue_name = generate_queue_name(realm_name, policy.name)
routing_key = generate_routing_key(realm_name, policy.name)

with :ok <- @adapter.qos(channel, prefetch_count: Config.amqp_consumer_prefetch_count!()),
with :ok <- @adapter.qos(channel, prefetch_count: prefetch_count_or_default(policy)),
:ok <- @adapter.declare_exchange(channel, exchange_name, type: :direct, durable: true),
{:ok, _queue} <- @adapter.declare_queue(channel, queue_name, durable: true),
:ok <-
Expand Down Expand Up @@ -199,6 +199,13 @@ defmodule Astarte.TriggerEngine.AMQPConsumer.AMQPMessageConsumer do
end
end

# Protobuf3 encodes missing int field as 0
defp prefetch_count_or_default(%PolicyStruct{prefetch_count: 0}),
do: Config.amqp_consumer_prefetch_count!()

defp prefetch_count_or_default(%PolicyStruct{prefetch_count: prefetch_count}),
do: prefetch_count

defp generate_policy_x_args(%PolicyStruct{
maximum_capacity: maximum_capacity,
event_ttl: event_ttl
Expand Down
2 changes: 1 addition & 1 deletion apps/astarte_trigger_engine/mix.lock
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
%{
"amqp": {:hex, :amqp, "2.1.1", "ad8dec713ba885afffffcb81feb619fe7cfcbcabe9377ab65ab7a110bd4f43a0", [:mix], [{:amqp_client, "~> 3.8.0", [hex: :amqp_client, repo: "hexpm", optional: false]}], "hexpm", "b6d926770e4508e30e3e9e476c57b6c8aeda44f7715663bdc38935620ce5be6f"},
"amqp_client": {:hex, :amqp_client, "3.8.14", "7569517aefb47e0d1c41bca2f4768dc8a2d88487daf7819fecca0d78943f293c", [:make, :rebar3], [{:rabbit_common, "3.8.14", [hex: :rabbit_common, repo: "hexpm", optional: false]}], "hexpm", "e5ba3ac18abbe34a1d990a6bcac25633dc7061ab8f8d101c7dcff97f49f4c523"},
"astarte_core": {:git, "https://github.com/astarte-platform/astarte_core.git", "636f50d6d7dddd1ee6f205a677acf378d2bddeea", [branch: "release-1.1"]},
"astarte_core": {:git, "https://github.com/astarte-platform/astarte_core.git", "aa2714ff9da0a89ca66d9c8c042f28e9aef4050d", [branch: "release-1.1"]},
"astarte_data_access": {:git, "https://github.com/astarte-platform/astarte_data_access.git", "0c6895707cdf23e44d20da67bc645e801db08119", [branch: "release-1.1"]},
"bbmustache": {:hex, :bbmustache, "1.11.0", "a6dbfc5cee3e1d7d17aad5f5b8880b4508d974611da8d73e1f6c28bde65d4c47", [:rebar3], [], "hexpm", "7c9cdcf58dc043377ab792a8c7109d8902695fcae3b35c1078a8b38ddcf86e5f"},
"castore": {:hex, :castore, "0.1.18", "deb5b9ab02400561b6f5708f3e7660fc35ca2d51bfc6a940d2f513f89c2975fc", [:mix], [], "hexpm", "61bbaf6452b782ef80b33cdb45701afbcf0a918a45ebe7e73f1130d661e66a06"},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,10 @@ defmodule Astarte.TriggerEngine.AMQPConsumer.AMQPConsumerTrackerTest do
# make sure we update the consumer list without waiting for the update timeout
AMQPConsumerTracker.handle_info(:update_consumers, [])

# Take some more time, as Registry unsubscription is slower than other operations.
# See https://hexdocs.pm/elixir/1.13.4/Registry.html#module-registrations
Process.sleep(1000)

assert not Enum.member?(
Registry.select(Registry.AMQPConsumerRegistry, [{{:"$1", :_, :_}, [], [:"$1"]}]),
{@test_realm, policy_name}
Expand Down
16 changes: 9 additions & 7 deletions doc/pages/architecture/062-trigger_delivery_policies.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,16 @@ A Trigger Delivery Policy is composed of:
milliseconds an event is retained in the event queue. When an event expires, it is discarded from the event queue, even if it has not been
delivered. This is optional.


## Known issues

At the moment, Trigger Delivery Policies in general do not provide a guarantee of in-order delivery of events.
Note that, since previous Astarte versions (i.e. < 1.1) did not provide a retry mechanism for events, this change does
not impact the expected behaviour if Trigger Delivery Policies are not used.
If the `retry` strategy is specified, in-order delivery cannot be guaranteed because a > 1 consumer prefetch count is being used. This allows for higher throughput at the cost of consistency.
An experimental feature allows to set the maximum number of messages that can be dequeued concurrently from the event queue
using AMQP (RabbitMQ) [consumer prefetch count](https://www.rabbitmq.com/consumer-prefetch.html). When prefetch count is set to 1, events are processed in order. Higher values allow for higher throughput by relaxing the ordering guarantee.
This feature is disabled by default.

Moreover, Trigger Delivery Policies do not provide a guarantee of in-order delivery of events if the Astarte Trigger Engine component
is replicated (event when the policy prefetch count is set to 1), as data from event queues are delivered to consumers in a round-robin fashion.

- If the Astarte Trigger Engine service is replicated, events could be delivered out of order, as data from event queues are delivered to consumers in a round-robin fashion.
- If the `retry` strategy is specified, in-order delivery cannot be guaranteed because a > 1 [consumer prefetch count](https://www.rabbitmq.com/consumer-prefetch.html) is being used.
This allows for higher throughput at the cost of consistency. In the future, the user will be allowed to choose between having an higher number of
events handled, but out of order, or ordered event handling at a lower rate.
Note that, since previous Astarte versions (i.e. < 1.1) did not provide a retry mechanism for events, both issues do
not impact the expected behaviour if Trigger Delivery Policies are not used.