diff --git a/src/lavinmq/vhost.cr b/src/lavinmq/vhost.cr index 4a283ba6e0..00cb7c32b0 100644 --- a/src/lavinmq/vhost.cr +++ b/src/lavinmq/vhost.cr @@ -530,6 +530,43 @@ module LavinMQ @exchanges["amq.match"] = HeadersExchange.new(self, "amq.match", true, false, false) end + private def load_definitions_from_etcd + @etcd.get_prefix(join_path("lavinmq", @name, "queues")).each do |key, value| + queue_name = "" + key.split('/') { |s| queue_name = URI.decode_www_form(s) } # get last split value without allocation + json = JSON.parse(value) + @queues[queue_name] = QueueFactory.make(self, json) + end + + @etcd.get_prefix(join_path("lavinmq", @name, "exchanges")).each do |key, value| + exchange_name = "" + key.split('/') { |s| exchange_name = URI.decode_www_form(s) } # get last split value without allocation + json = JSON.parse(value) + @exchanges[exchange_name] = + make_exchange(self, exchange_name, json["type"].as_s, true, json["auto_delete"].as_bool, json["internal"].as_bool, json["arguments"].as_h) + end + + @etcd.get_prefix(join_path("lavinmq", @name, "queue-bindings")).each do |key, value| + _, _, _, queue_name, exchange_name, routing_key, _ = split_etcd_path(key) + json = JSON.parse(value) + x = @exchanges[exchange_name] + q = @queues[queue_name] + x.bind(q, routing_key, json["arguments"].to_h) + end + + @etcd.get_prefix(join_path("lavinmq", @name, "exchange-bindings")).each do |key, value| + _, _, _, destination, source, routing_key, _ = split_etcd_path(key) + json = JSON.parse(value) + src = @exchanges[source] + dst = @queues[destination] + src.bind(dst, routing_key, json["arguments"].to_h) + end + end + + private def split_etcd_path(path) : Array(String) + path.split('/').map! { |p| URI.decode_www_form p } + end + private def compact! @definitions_lock.synchronize do @log.info { "Compacting definitions" } @@ -595,21 +632,21 @@ module LavinMQ }.to_json) when AMQP::Frame::Exchange::Delete @etcd.del(join_path("lavinmq", @name, "exchanges", f.exchange_name)) + @etcd.del_prefix(join_path("lavinmq", @name, "exchange-bindings", f.exchange_name)) when AMQP::Frame::Exchange::Bind args_hash = f.arguments.hash(Crystal::Hasher.new(0, 0)).result - @etcd.put(join_path("lavinmq", @name, "exchange", f.destination, "bindings", f.source, f.routing_key, args_hash), - { - arguments: f.arguments, - }.to_json) + @etcd.put(join_path("lavinmq", @name, "exchanges", f.destination, "bindings", f.source, f.routing_key, args_hash), + {arguments: f.arguments}.to_json) when AMQP::Frame::Exchange::Unbind args_hash = f.arguments.hash(Crystal::Hasher.new(0, 0)).result - @etcd.del(join_path("lavinmq", @name, "exchange", f.destination, "bindings", f.source, f.routing_key, args_hash)) + @etcd.del(join_path("lavinmq", @name, "exchanges", f.destination, "bindings", f.source, f.routing_key, args_hash)) when AMQP::Frame::Queue::Declare @etcd.put(join_path("lavinmq", @name, "queues", f.queue_name), { arguments: f.arguments, }.to_json) when AMQP::Frame::Queue::Delete @etcd.del(join_path("lavinmq", @name, "queues", f.queue_name)) + @etcd.del_prefix(join_path("lavinmq", @name, "queue-bindings", f.queue_name)) when AMQP::Frame::Queue::Bind args_hash = f.arguments.hash(Crystal::Hasher.new(0, 0)).result @etcd.put(join_path("lavinmq", @name, "queues", f.queue_name, "bindings", f.exchange_name, f.routing_key, args_hash),