An opinionated framework to run and orchestrate RabbitMQ consumers.
The package can be installed by adding errol
to your list of dependencies in mix.exs
:
def deps do
[{:errol, "~> 0.2.0"}]
end
You should also update your application list to include :errol
:
def application do
[applications: [:errol]]
end
Documentation can be found at https://hexdocs.pm/errol.
To bind consumers to queue, you can use the Errol.Wiring
module:
defmodule Sample.Wiring do
use Errol.Wiring
connection("amqp://guest:guest@localhost")
@exchange "/users"
@exchange_type :topic
# You can pass a reference to a function with arity of 1
consume("account_created", "users.account.created", &UsersConsumer.account_created/1)
# or even an anonymous function
consume("account_updated", "users.account.updated", fn message -> ... end)
end
For more complex setups, you can add middleware and group different consumers for more granularity.
defmodule Sample.Wiring do
use Errol.Wiring
connection("amqp://guest:guest@localhost")
@exchange "/users"
@exchange_type :topic
# Use pipe_before/1, pipe_after/1 or pipe_error/1 to run middleware functions
# middlewares declared outside of a group will run for every consumer
pipe_after(&Sample.StatisticsMiddleware.track/2)
# Use the `group` macro to group consumers with specific middleware
group :account do
# This middlewares will run only for consumers in the :account group
pipe_before(&Errol.Middleware.Json.parse/2)
pipe_error(&Errol.Middleware.Retry.basic_retry/2)
consume("account_created", "users.account.created", &UsersConsumer.account_created/1)
consume("account_updated", "users.account.updated", fn message -> ... end)
end
group :photos do
pipe_before(&Sample.ImagesMiddleware.uncompress/2)
consume("profile_photo_uploaded", "users.profile.photo.uploaded", fn message -> ... end)
end
end
At this point, the only thing left is to run Sample.Wiring
as a supervisor in your application.ex
file.
This is important because if RabbitMQ goes down, all wiring supervisors will be killed, so if they are not in the supervision tree they will not be restarted.
defmodule Sample.Application do
use Application
def start(_type, _args) do
import Supervisor.Spec
children = [
supervisor(Sample.Wiring, []),
...
]
opts = [strategy: :one_for_one, name: Sample.Supervisor]
Supervisor.start_link(children, opts)
end
end
Voilà! This will spin up the following supervision tree:
--------------------
| Sample.Application |
--------------------
|
|
---------------
| Sample.Wiring |
---------------
|
_____________________________|_____________________________
| | |
| | |
--------------------------- --------------------------- -------------------------
| :account_created_consumer | | :account_updated_consumer | | :profile_photo_uploaded |
--------------------------- --------------------------- -------------------------
. . . . . . . . . . . . .
. . . . . . . . . . . . .
. . . . . . . . . . . . .
. . . . . . . . . . . . .
. . . . . . . . . . . . .
New monitored process per each message received
Compatibility with other AMQP implementations exists but is not guaranteed (at least for now 😁).
Clone the repository
git clone [email protected]:uesteibar/errol.git
Install dependencies
cd errol
mix deps.get
To run the tests (you will need docker installed)
./scripts/test_prepare.sh
mix test
- Allow to retry messages from
pipe_error
middleware. This would enable users to handle retries and requeuing to dead letter exchange. - Allow to reject messages from
pipe_before
middleware. - Handle RabbitMQ outages, following the great explanation in the amqp hex documentation.
- Allow to specify number of workers per consumer. Poolboy would come handy here.
- Publish messages.
Pull requests are always welcome =)
The project uses standard-changelog to update the Changelog with each commit message and upgrade the package version.
For that reason every contribution must have a title and body that follows the conventional commits standard conventions (e.g. feat(consumer): Consume it all
).
To make this process easier, you can do the following:
Install commitizen
and cz-conventional-changelog
globally
npm i -g commitizen cz-conventional-changelog
Save cz-conventional-changelog
as default
echo '{ "path": "cz-conventional-changelog" }' > ~/.czrc
Instead of git commit
, you can now run
git cz
and follow the instructions to generate the commit message.
🎉 Special thanks to @pma for the amazing work on the amqp hex.