diff --git a/src/lavinmq/amqp/exchange/consistent_hash.cr b/src/lavinmq/amqp/exchange/consistent_hash.cr index 54b0dd07ef..9fa01180fc 100644 --- a/src/lavinmq/amqp/exchange/consistent_hash.cr +++ b/src/lavinmq/amqp/exchange/consistent_hash.cr @@ -42,12 +42,10 @@ module LavinMQ true end - protected def bindings(routing_key, headers) : Iterator(Destination) + def each_destination(routing_key : String, headers : AMQP::Table?, & : Destination ->) key = hash_key(routing_key, headers) if d = @hasher.get(key) - {d}.each - else - Iterator(Destination).empty + yield d end end diff --git a/src/lavinmq/amqp/exchange/default.cr b/src/lavinmq/amqp/exchange/default.cr index 00012dc706..e0a8b5d1c5 100644 --- a/src/lavinmq/amqp/exchange/default.cr +++ b/src/lavinmq/amqp/exchange/default.cr @@ -11,11 +11,9 @@ module LavinMQ Iterator(BindingDetails).empty end - protected def bindings(routing_key, headers) : Iterator(Destination) + protected def each_destination(routing_key : String, _headers : AMQP::Table?, & : Destination ->) if q = @vhost.queues[routing_key]? - Tuple(Destination).new(q).each - else - Iterator(Destination).empty + yield q end end diff --git a/src/lavinmq/amqp/exchange/direct.cr b/src/lavinmq/amqp/exchange/direct.cr index 86c93a12d4..d80cd27f19 100644 --- a/src/lavinmq/amqp/exchange/direct.cr +++ b/src/lavinmq/amqp/exchange/direct.cr @@ -41,8 +41,10 @@ module LavinMQ true end - protected def bindings(routing_key, headers) : Iterator(Destination) - @bindings[routing_key].each + protected def each_destination(routing_key : String, _headers : AMQP::Table?, & : Destination ->) + @bindings[routing_key].each do |destination| + yield destination + end end end end diff --git a/src/lavinmq/amqp/exchange/exchange.cr b/src/lavinmq/amqp/exchange/exchange.cr index 6cbe59f388..0b7ef25edc 100644 --- a/src/lavinmq/amqp/exchange/exchange.cr +++ b/src/lavinmq/amqp/exchange/exchange.cr @@ -156,6 +156,7 @@ module LavinMQ abstract def bind(destination : Destination, routing_key : String, headers : AMQP::Table?) abstract def unbind(destination : Destination, routing_key : String, headers : AMQP::Table?) abstract def bindings_details : Iterator(BindingDetails) + abstract def each_destination(routing_key : String, headers : AMQP::Table?, & : Destination ->) def publish(msg : Message, immediate : Bool, queues : Set(LavinMQ::Queue) = Set(LavinMQ::Queue).new, @@ -204,7 +205,7 @@ module LavinMQ queues : Set(LavinMQ::Queue) = Set(LavinMQ::Queue).new, exchanges : Set(LavinMQ::Exchange) = Set(LavinMQ::Exchange).new) : Nil return unless exchanges.add? self - bindings(routing_key, headers).each do |d| + each_destination(routing_key, headers) do |d| case d in LavinMQ::Queue queues.add(d) diff --git a/src/lavinmq/amqp/exchange/fanout.cr b/src/lavinmq/amqp/exchange/fanout.cr index 5ef9e9573f..57fd7d7288 100644 --- a/src/lavinmq/amqp/exchange/fanout.cr +++ b/src/lavinmq/amqp/exchange/fanout.cr @@ -33,8 +33,10 @@ module LavinMQ true end - protected def bindings(routing_key, headers) : Iterator(Destination) - @bindings.each + protected def each_destination(_routing_key : String, _headers : AMQP::Table?, & : Destination ->) + @bindings.each do |destination| + yield destination + end end end end diff --git a/src/lavinmq/amqp/exchange/headers.cr b/src/lavinmq/amqp/exchange/headers.cr index 22bbf22ee4..0ab825cbdb 100644 --- a/src/lavinmq/amqp/exchange/headers.cr +++ b/src/lavinmq/amqp/exchange/headers.cr @@ -51,10 +51,6 @@ module LavinMQ true end - protected def bindings(routing_key, headers) : Iterator(Destination) - matches(headers).each - end - private def validate!(headers) : Nil if h = headers if match = h["x-match"]? @@ -65,19 +61,26 @@ module LavinMQ end end - private def matches(headers) : Iterator(Destination) - @bindings.each.select do |args, _| + protected def each_destination(_routing_key : String, headers : AMQP::Table?, & : Destination ->) + @bindings.each do |args, destinations| if headers.nil? || headers.empty? - args.empty? + next unless args.empty? + destinations.each do |destination| + yield destination + end else - case args["x-match"]? - when "any" - args.any? { |k, v| !k.starts_with?("x-") && (headers.has_key?(k) && headers[k] == v) } - else - args.all? { |k, v| k.starts_with?("x-") || (headers.has_key?(k) && headers[k] == v) } + is_match = case args["x-match"]? + when "any" + args.any? { |k, v| !k.starts_with?("x-") && (headers.has_key?(k) && headers[k] == v) } + else + args.all? { |k, v| k.starts_with?("x-") || (headers.has_key?(k) && headers[k] == v) } + end + next unless is_match + destinations.each do |destination| + yield destination end end - end.flat_map { |_, v| v.each } + end end end end diff --git a/src/lavinmq/amqp/exchange/topic.cr b/src/lavinmq/amqp/exchange/topic.cr index 3c76828c43..33f2a750fa 100644 --- a/src/lavinmq/amqp/exchange/topic.cr +++ b/src/lavinmq/amqp/exchange/topic.cr @@ -42,28 +42,26 @@ module LavinMQ true end - protected def bindings(routing_key, headers) : Iterator(Destination) - select_matches(routing_key).each - end - # ameba:disable Metrics/CyclomaticComplexity - private def select_matches(routing_key) : Iterator(Destination) + protected def each_destination(routing_key : String, headers : AMQP::Table?, & : Destination ->) binding_keys = @bindings - return Iterator(Destination).empty if binding_keys.empty? + return if binding_keys.empty? # optimize the case where the only binding key is '#' if binding_keys.size == 1 bk, qs = binding_keys.first if bk.size == 1 if bk.first == "#" - return qs.each + qs.each do |q| + yield q + end end end end rk_parts = routing_key.split(".") - binding_keys.each.select do |bks, _| + binding_keys.each do |bks, destinations| ok = false prev_hash = false size = bks.size # binding keys can max be 256 chars long anyway @@ -120,8 +118,12 @@ module LavinMQ break unless ok i += 1 end - ok - end.flat_map { |_, v| v.each } + if ok + destinations.each do |destination| + yield destination + end + end + end end end end