AWS SQS FIFO messages being processed out of order or in duplicate #78

Open
nuno-barreiro opened this issue Sep 21, 2023 · 7 comments

@nuno-barreiro

I've been using broadway_sqs to consume AWS SQS FIFO queues and noticed some unexpected behaviour: messages are sometimes processed out of order or more than once.

Initially I didn't have the Broadway partition_by option configured. Once I set it, things seemed to improve, but I can still see some double processing and out-of-order processing. For example, in the logs below (grouped by process identifier for readability) we can see that:

  • PID 427 consumed the second message from group C without waiting for the first message of the same group to be acknowledged and thus removed from the queue. The visibility timeout is 10 seconds, and even that wasn't respected.
  • PID 419 processed the fifth message (A5) twice.
16:39:34.610 [info] [#PID<0.382.0>] Handling message: "B" / "B1"
16:39:37.270 [info] [#PID<0.382.0>] Message acknowledge: "B" / "B1"
16:39:37.300 [info] [#PID<0.382.0>] Handling message: "B" / "B2"
16:39:39.633 [info] [#PID<0.382.0>] Message acknowledge: "B" / "B2"

16:39:34.610 [info] [#PID<0.427.0>] Handling message: "C" / "C1"
16:39:39.830 [info] [#PID<0.427.0>] Message processing failed : "C" / "C1"
16:39:39.830 [info] [#PID<0.427.0>] timeout: "C" / "C1"
16:39:39.830 [info] [#PID<0.427.0>] Handling message: "C" / "C2"
16:39:42.194 [info] [#PID<0.427.0>] Message acknowledge: "C" / "C2"
16:39:45.814 [info] [#PID<0.427.0>] Handling message: "C" / "C1"
16:39:47.528 [info] [#PID<0.427.0>] Message acknowledge: "C" / "C1"

16:39:34.610 [info] [#PID<0.419.0>] Handling message: "A" / "A1"
16:39:37.504 [info] [#PID<0.419.0>] Message acknowledge: "A" / "A1"
16:39:37.521 [info] [#PID<0.419.0>] Handling message: "A" / "A2"
16:39:40.242 [info] [#PID<0.419.0>] Message acknowledge: "A" / "A2"
16:39:40.263 [info] [#PID<0.419.0>] Handling message: "A" / "A3"
16:39:42.404 [info] [#PID<0.419.0>] Message acknowledge: "A" / "A3"
16:39:42.422 [info] [#PID<0.419.0>] Handling message: "A" / "A4"
16:39:44.933 [info] [#PID<0.419.0>] Message acknowledge: "A" / "A4"
16:39:44.954 [info] [#PID<0.419.0>] Handling message: "A" / "A5"
16:39:47.037 [info] [#PID<0.419.0>] Message acknowledge: "A" / "A5"
16:39:47.056 [info] [#PID<0.419.0>] Handling message: "A" / "A5"
16:39:50.001 [info] [#PID<0.419.0>] Message acknowledge: "A" / "A5"

Before setting up partition_by, the behaviour was even stranger, with different consumers handling messages from the same message_group_id:

16:25:33.076 [info] [#PID<0.423.0>] Handling message: "A" / "A1"
16:25:36.304 [info] [#PID<0.423.0>] Message acknowledge: "A" / "A1"
16:25:36.326 [info] [#PID<0.423.0>] Handling message: "A" / "A2"
16:25:38.859 [info] [#PID<0.423.0>] Message acknowledge: "A" / "A2"
16:25:38.869 [info] [#PID<0.423.0>] Handling message: "A" / "A3"
16:25:41.709 [info] [#PID<0.423.0>] Message acknowledge: "A" / "A3"
16:25:41.728 [info] [#PID<0.423.0>] Handling message: "A" / "A4"
16:25:44.206 [info] [#PID<0.423.0>] Message acknowledge: "A" / "A4"
16:25:44.230 [info] [#PID<0.423.0>] Handling message: "A" / "A5"
16:25:46.592 [info] [#PID<0.423.0>] Message acknowledge: "A" / "A5"
16:25:46.613 [info] [#PID<0.423.0>] Handling message: "C" / "C1"
16:25:48.219 [info] [#PID<0.423.0>] Message acknowledge: "C" / "C1"
16:25:48.233 [info] [#PID<0.423.0>] Handling message: "C" / "C2"
16:25:50.134 [info] [#PID<0.423.0>] Message acknowledge: "C" / "C2"
16:25:50.152 [info] [#PID<0.423.0>] Handling message: "B" / "B1"
16:25:52.073 [info] [#PID<0.423.0>] Message acknowledge: "B" / "B1"
16:25:52.087 [info] [#PID<0.423.0>] Handling message: "B" / "B2"
16:25:53.323 [info] [#PID<0.423.0>] Message acknowledge: "B" / "B2"

16:25:44.286 [info] [#PID<0.424.0>] Handling message: "C" / "C1"
16:25:46.866 [info] [#PID<0.424.0>] Message acknowledge: "C" / "C1"
16:25:46.877 [info] [#PID<0.424.0>] Handling message: "C" / "C2"
16:25:48.653 [info] [#PID<0.424.0>] Message acknowledge: "C" / "C2"
16:25:48.661 [info] [#PID<0.424.0>] Handling message: "A" / "A5"
16:25:50.938 [info] [#PID<0.424.0>] Message acknowledge: "A" / "A5"
16:25:50.958 [info] [#PID<0.424.0>] Handling message: "B" / "B1"
16:25:52.517 [info] [#PID<0.424.0>] Message acknowledge: "B" / "B1"
16:25:52.536 [info] [#PID<0.424.0>] Handling message: "B" / "B2"
16:25:54.045 [info] [#PID<0.424.0>] Message acknowledge: "B" / "B2"

My understanding is that AWS SQS FIFO queues, using the message_group_id, should guarantee message order within the same message group identifier and that once a message has been received, during its visibility timeout, no other consumer can receive the same message.

I'll leave here the code for my SQS producer:

defmodule ElixirBroadwayPlayground.SQSProducer do
  use Broadway

  require Logger

  alias Broadway.Message

  def start_link(config) do
    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      producer: [
        module: {
          BroadwaySQS.Producer,
          queue_url: Keyword.get(config, :queue_url),
          receive_interval: 1000,
          on_success: :ack,
          on_failure: :noop,
          visibility_timeout: 10,
          attribute_names: [:message_group_id, :approximate_first_receive_timestamp]
        }
      ],
      processors: [
        default: [concurrency: Keyword.get(config, :num_workers, 1)]
      ],
      partition_by: &partition_by/1
    )
  end

  @impl true
  def handle_message(
        _,
        %Message{
          data: data,
          metadata: %{attributes: %{"message_group_id" => message_group_id}}
        } = message,
        _
      ) do
    log_event("Handling message", data, message_group_id)

    HTTPoison.get("https://swapi.dev/api/people/1", [{"Content-Type", "application/json"}],
      ssl: [verify: :verify_none]
    )
    |> handle_response(message)
  end

  defp handle_response(
         {:ok, _},
         %Message{
           data: data,
           metadata: %{attributes: %{"message_group_id" => message_group_id}}
         } = message
       ) do
    log_event("Message acknowledge", data, message_group_id)
    Message.ack_immediately(message)
  end

  defp handle_response(
         {:error, %HTTPoison.Error{reason: reason}},
         %Message{
           data: data,
           metadata: %{attributes: %{"message_group_id" => message_group_id}}
         } = message
       ) do
    log_event("Message processing failed ", data, message_group_id)
    log_event(reason, data, message_group_id)
    Message.failed(message, reason)
  end

  defp log_event(text, data, msg_group_id) do
    message_id = Jason.decode!(data)["id"]
    Logger.info("[#{inspect(self())}] #{text}: #{inspect(msg_group_id)} / #{inspect(message_id)}")
  end

  defp partition_by(%Message{
         metadata: %{attributes: %{"message_group_id" => message_group_id}}
       }) do
    :erlang.phash2(message_group_id)
  end
end

Am I misinterpreting the behaviour that should be expected? Has anyone experienced the same behaviour?

@nuno-barreiro
Author

Another test, this time with max_number_of_messages set to 1, suggests that all the double-processing issues disappear, but I'll continue testing.

17:13:24.052 [info] [#PID<0.413.0>] Handling message: "A" / "A1"
17:13:24.057 [info] [#PID<0.421.0>] Handling message: "C" / "C1"
17:13:24.063 [info] [#PID<0.376.0>] Handling message: "B" / "B1"
17:13:27.241 [info] [#PID<0.421.0>] Message acknowledge: "C" / "C1"
17:13:27.314 [info] [#PID<0.376.0>] Message acknowledge: "B" / "B1"
17:13:27.694 [info] [#PID<0.413.0>] Message acknowledge: "A" / "A1"
17:13:28.140 [info] [#PID<0.421.0>] Handling message: "C" / "C2"
17:13:28.151 [info] [#PID<0.376.0>] Handling message: "B" / "B2"
17:13:28.160 [info] [#PID<0.413.0>] Handling message: "A" / "A2"
17:13:30.683 [info] [#PID<0.376.0>] Message acknowledge: "B" / "B2"
17:13:33.144 [info] [#PID<0.421.0>] Message processing failed : "C" / "C2"
17:13:33.144 [info] [#PID<0.421.0>] timeout: "C" / "C2"
17:13:33.163 [info] [#PID<0.413.0>] Message processing failed : "A" / "A2"
17:13:33.163 [info] [#PID<0.413.0>] timeout: "A" / "A2"
17:13:38.376 [info] [#PID<0.421.0>] Handling message: "C" / "C2"
17:13:38.392 [info] [#PID<0.413.0>] Handling message: "A" / "A2"
17:13:41.484 [info] [#PID<0.421.0>] Message acknowledge: "C" / "C2"
17:13:41.582 [info] [#PID<0.413.0>] Message acknowledge: "A" / "A2"
17:13:42.495 [info] [#PID<0.413.0>] Handling message: "A" / "A3"
17:13:45.339 [info] [#PID<0.413.0>] Message acknowledge: "A" / "A3"
17:13:45.578 [info] [#PID<0.413.0>] Handling message: "A" / "A4"
17:13:50.583 [info] [#PID<0.413.0>] Message processing failed : "A" / "A4"
17:13:50.583 [info] [#PID<0.413.0>] timeout: "A" / "A4"
17:13:56.822 [info] [#PID<0.413.0>] Handling message: "A" / "A4"
17:13:59.910 [info] [#PID<0.413.0>] Message acknowledge: "A" / "A4"
17:14:00.925 [info] [#PID<0.413.0>] Handling message: "A" / "A5"
17:14:05.929 [info] [#PID<0.413.0>] Message processing failed : "A" / "A5"
17:14:05.929 [info] [#PID<0.413.0>] timeout: "A" / "A5"
17:14:12.179 [info] [#PID<0.413.0>] Handling message: "A" / "A5"
17:14:17.347 [info] [#PID<0.413.0>] Message processing failed : "A" / "A5"
17:14:17.347 [info] [#PID<0.413.0>] timeout: "A" / "A5"
17:14:22.456 [info] [#PID<0.413.0>] Handling message: "A" / "A5"
17:14:27.205 [info] [#PID<0.413.0>] Message acknowledge: "A" / "A5"
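
For reference, the change tested in this comment would look roughly like the fragment below. It reuses the producer options from the original post and adds max_number_of_messages, which is an existing BroadwaySQS.Producer option (the library default is 10, as far as I know). The queue URL is a placeholder and the producer_opts variable name is only for illustration.

```elixir
# Sketch only: producer options from the original post with one addition.
# The queue URL below is a placeholder, not a real queue.
producer_opts = [
  module: {
    BroadwaySQS.Producer,
    queue_url: "https://sqs.us-east-1.amazonaws.com/000000000000/example.fifo",
    receive_interval: 1000,
    on_success: :ack,
    on_failure: :noop,
    visibility_timeout: 10,
    # Fetch a single message per receive request instead of a batch.
    max_number_of_messages: 1,
    attribute_names: [:message_group_id, :approximate_first_receive_timestamp]
  }
]
```

The trade-off is the one raised later in the thread: one message per request means more API calls and lower throughput than processing an in-memory batch.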

@josevalim
Member

josevalim commented Sep 21, 2023

Everything seems to be expected given the current implementation:

  1. The out-of-order processing seems to happen only when a message fails. This library does not block processing of the next item if the previous one fails, since this is not generally a requirement for SQS (although it may be a useful feature for SQS FIFO; the Kafka bindings implement it).

  2. If you have multiple workers, then we don't route messages to any given worker/partition, which may allow parallel processing of a given group, unless you configure partition_by.
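
To illustrate point 2, here is a minimal sketch of how a partition_by function such as the one in the original post can pin a message group to one processor. It assumes the integer returned by partition_by is reduced modulo the number of processors; PartitionSketch and processor_index/2 are hypothetical names and not part of Broadway.

```elixir
defmodule PartitionSketch do
  # Hypothetical helper, not part of Broadway: maps a message group id to a
  # processor index the way a remainder-based router would.
  def processor_index(message_group_id, concurrency) when concurrency > 0 do
    rem(:erlang.phash2(message_group_id), concurrency)
  end
end

groups = ["A", "B", "C"]

# The same group always lands on the same index for a fixed concurrency.
for group <- groups do
  index = PartitionSketch.processor_index(group, 3)
  IO.puts("group #{group} -> processor #{index}")
end
```

Note that two different groups may hash to the same processor. That only means they share a worker; ordering within each group is still preserved by that worker.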

@josevalim
Member

Btw, awesome job on the bug report. Everything was clear!

@nuno-barreiro
Author

nuno-barreiro commented Sep 21, 2023

@josevalim thanks for the quick feedback. Glad you appreciated the somewhat extensive report 😄

> although it may be a useful feature for SQS FIFO

Indeed, I believe it would be. In my specific scenario it does make sense to guarantee processing order and, if a message fails for some reason, to retry it after the visibility timeout while blocking the processing of other messages with the same group id. My latest checks tell me that I can achieve that by retrieving a single message at a time, but that is definitely slower than working with an in-memory batch and significantly increases the number of API requests.
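
The blocking behaviour described above could be sketched in memory roughly as follows. This is not how broadway_sqs works today and not a proposed patch: GroupBlockingSketch, run/2 and the tuple shapes are hypothetical, and SQS redelivery is not modelled at all.

```elixir
defmodule GroupBlockingSketch do
  # Hypothetical helper, not part of broadway_sqs. `messages` is a list of
  # {group_id, payload} tuples in queue order; `process` returns :ok or
  # {:error, reason} for a single payload.
  def run(messages, process) do
    {results, _blocked} =
      Enum.map_reduce(messages, MapSet.new(), fn {group, payload}, blocked ->
        cond do
          # An earlier message of this group failed: skip without processing,
          # leaving it to be retried after the visibility timeout.
          MapSet.member?(blocked, group) ->
            {{group, payload, :skipped}, blocked}

          process.(payload) == :ok ->
            {{group, payload, :ok}, blocked}

          # Processing failed: block the rest of this group in the batch.
          true ->
            {{group, payload, :failed}, MapSet.put(blocked, group)}
        end
      end)

    results
  end
end

batch = [{"A", "A1"}, {"C", "C1"}, {"A", "A2"}, {"C", "C2"}]

# Pretend that C1 times out, as in the logs from the original post.
process = fn
  "C1" -> {:error, :timeout}
  _ -> :ok
end

results = GroupBlockingSketch.run(batch, process)
# results == [{"A", "A1", :ok}, {"C", "C1", :failed},
#             {"A", "A2", :ok}, {"C", "C2", :skipped}]
```

Group A is unaffected by the failure in group C, while C2 is held back until C1 succeeds on a later delivery.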

> unless you configure partition_by

As for this point, it does seem to work fine with partition_by derived from the same message_group_id that the messages were assigned in SQS.

@josevalim
Member

Perfect. I am afraid however that we don’t have anyone with this particular need from our side, so I don’t see us implementing this any time soon.

Also, keep in mind this choice can have a cascading effect on the system. SQS seems to be happy to still send messages for a message group even if previous ones were not yet acked. This means that one failure will make it so future messages continue to arrive, and all of them will now have to be failed and wait for the visibility timeout, potentially for several minutes. If SQS provides no control over this (such as a number of messages in flow for a given partition), perhaps going down this route is not recommended after all.

@nuno-barreiro
Author

> SQS seems to be happy to still send messages for a message group even if previous ones were not yet acked

My understanding of SQS FIFO is a bit different: I thought that multiple consumers can receive messages, but not messages with the same group id during the visibility timeout period. If a consumer receives more than one message for the same group, that process should ensure processing order in memory too. Once all the messages are acknowledged, another consumer can grab messages for that same group if any were enqueued in the meantime.

Thus I do think SQS has control over the message flow for a given partition (message group id), or at least those were my conclusions from the docs and the PoCs I did when I chose to go with SQS FIFO for my use case. I admit that I might have misunderstood something or made a mistake while testing.

> I am afraid however that we don’t have anyone with this particular need from our side, so I don’t see us implementing this any time soon.

Totally get that. I will see what I can do for my specific use case. Who knows? If it makes sense, I might find some time to propose some changes that could help others with similar use cases 😉

@rogerweb
Contributor

Just sharing an AWS article that helped me understand this topic in the past, in case it's useful:

https://aws.amazon.com/blogs/compute/solving-complex-ordering-challenges-with-amazon-sqs-fifo-queues/
