Skip to content

Commit

Permalink
MQTT-support
Browse files Browse the repository at this point in the history
  • Loading branch information
kickster97 committed Jan 28, 2025
1 parent 09a75d3 commit bd762b2
Show file tree
Hide file tree
Showing 63 changed files with 3,769 additions and 106 deletions.
4 changes: 0 additions & 4 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Added
- Support for message deduplication on exchanges and queues [854](https://github.com/cloudamqp/lavinmq/pull/854)
- Added filtering for streams [#893](https://github.com/cloudamqp/lavinmq/pull/893)

## [2.1.0] - 2025-01-16

### Changed
Expand Down
4 changes: 4 additions & 0 deletions shard.lock
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ shards:
git: https://github.com/84codes/lz4.cr.git
version: 1.0.0+git.commit.96d714f7593c66ca7425872fd26c7b1286806d3d

mqtt-protocol:
git: https://github.com/84codes/mqtt-protocol.cr.git
version: 0.2.0+git.commit.3f82ee85d029e6d0505cbe261b108e156df4e598

systemd:
git: https://github.com/84codes/systemd.cr.git
version: 2.0.0
Expand Down
2 changes: 2 additions & 0 deletions shard.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ dependencies:
github: 84codes/systemd.cr
lz4:
github: 84codes/lz4.cr
mqtt-protocol:
github: 84codes/mqtt-protocol.cr

development_dependencies:
ameba:
Expand Down
44 changes: 44 additions & 0 deletions spec/clustering_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ require "../src/lavinmq/launcher"
require "../src/lavinmq/clustering/client"
require "../src/lavinmq/clustering/controller"

alias IndexTree = LavinMQ::MQTT::TopicTree(String)

describe LavinMQ::Clustering::Client do
follower_data_dir = "/tmp/lavinmq-follower"

Expand Down Expand Up @@ -78,6 +80,48 @@ describe LavinMQ::Clustering::Client do
replicator.try &.close
end

it "replicates and streams retained messages to followers" do
replicator = LavinMQ::Clustering::Server.new(LavinMQ::Config.instance, LavinMQ::Etcd.new, 0)
tcp_server = TCPServer.new("localhost", 0)

spawn(replicator.listen(tcp_server), name: "repli server spec")
config = LavinMQ::Config.new.tap &.data_dir = follower_data_dir
repli = LavinMQ::Clustering::Client.new(config, 1, replicator.password, proxy: false)
done = Channel(Nil).new
spawn(name: "follow spec") do
repli.follow("localhost", tcp_server.local_address.port)
done.send nil
end
wait_for { replicator.followers.size == 1 }

retain_store = LavinMQ::MQTT::RetainStore.new("#{LavinMQ::Config.instance.data_dir}/retain_store", replicator)
wait_for { replicator.followers.first?.try &.lag_in_bytes == 0 }

props = LavinMQ::AMQP::Properties.new
msg1 = LavinMQ::Message.new(100, "test", "rk", props, 10, IO::Memory.new("body1"))
msg2 = LavinMQ::Message.new(100, "test", "rk", props, 10, IO::Memory.new("body2"))
retain_store.retain("topic1", msg1.body_io, msg1.bodysize)
retain_store.retain("topic2", msg2.body_io, msg2.bodysize)

wait_for { replicator.followers.first?.try &.lag_in_bytes == 0 }
repli.close
done.receive

follower_retain_store = LavinMQ::MQTT::RetainStore.new("#{follower_data_dir}/retain_store", LavinMQ::Clustering::NoopServer.new)
a = Array(String).new(2)
b = Array(String).new(2)
follower_retain_store.each("#") do |topic, bytes|
a << topic
b << String.new(bytes)
end

a.sort!.should eq(["topic1", "topic2"])
b.sort!.should eq(["body1", "body2"])
follower_retain_store.retained_messages.should eq(2)
ensure
replicator.try &.close
end

it "can stream full file" do
replicator = LavinMQ::Clustering::Server.new(LavinMQ::Config.instance, LavinMQ::Etcd.new("localhost:12379"), 0)
tcp_server = TCPServer.new("localhost", 0)
Expand Down
39 changes: 39 additions & 0 deletions spec/message_routing_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -421,3 +421,42 @@ describe LavinMQ::AMQP::Exchange do
end
end
end

describe LavinMQ::MQTT::Exchange do
it "should only allow Session to bind" do
with_amqp_server do |s|
vhost = s.vhosts.create("x")
q1 = LavinMQ::AMQP::Queue.new(vhost, "q1")
s1 = LavinMQ::MQTT::Session.new(vhost, "q1")
index = LavinMQ::MQTT::TopicTree(String).new
store = LavinMQ::MQTT::RetainStore.new("tmp/retain_store", LavinMQ::Clustering::NoopServer.new, index)
x = LavinMQ::MQTT::Exchange.new(vhost, "", store)
x.bind(s1, "s1", LavinMQ::AMQP::Table.new)
expect_raises(LavinMQ::Exchange::AccessRefused) do
x.bind(q1, "q1", LavinMQ::AMQP::Table.new)
end
end
end

it "publish messages to queues with it's own publish method" do
with_amqp_server do |s|
vhost = s.vhosts.create("x")
s1 = LavinMQ::MQTT::Session.new(vhost, "session 1")
index = LavinMQ::MQTT::TopicTree(String).new
store = LavinMQ::MQTT::RetainStore.new("tmp/retain_store", LavinMQ::Clustering::NoopServer.new, index)
x = LavinMQ::MQTT::Exchange.new(vhost, "mqtt.default", store)
x.bind(s1, "s1", LavinMQ::AMQP::Table.new)
pub_args = {
packet_id: 1u16,
payload: Bytes.new(0),
dup: false,
qos: 0u8,
retain: false,
topic: "s1",
}
msg = MQTT::Protocol::Publish.new(**pub_args)
x.publish(msg)
s1.message_count.should eq 1
end
end
end
Loading

0 comments on commit bd762b2

Please sign in to comment.