Skip to content

Commit

Permalink
Add ServerLocalSubscribe class
Browse files Browse the repository at this point in the history
  • Loading branch information
mlesniew committed May 23, 2024
1 parent 460c258 commit 7239687
Show file tree
Hide file tree
Showing 10 changed files with 199 additions and 22 deletions.
16 changes: 12 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -169,15 +169,23 @@ Notes:
* The topic and the payload are both buffers allocated on the stack. They will become invalid after the callback returns. If you need to store the payload for later, make sure to copy it to a separate buffer.
* By default, the maximum topic and payload sizes are is 128 and 1024 bytes respectively. This can be tuned by using `#define` directives to override values from [config.h](src/PicoMQTT/config.h). Consider using the advanced API described in the later sections to handle bigger messages.
* If a received message's topic matches more than one pattern, then only one of the callbacks will be fired.
* `PicoMQTT::Server` will not deliver published messages locally. This means that if you set up a `PicoMQTT::Server`
and use `subscribe`, your callback will only be called when messages from clients are received. Messages published on
the same device will not trigger the callback.
* Try to return from message handlers quickly. Don't call functions which may block (like reading from serial or network connections), don't use the `delay()` function.
* More examples available [here](examples/advanced_consume/advanced_consume.ino)

### Delivery of messages published on the broker

`PicoMQTT::Server` will not deliver published messages locally. This means that setting up a `PicoMQTT::Server` and using `subscribe`, will fire callbacks only when messages from clients are received. Messages published locally, on the same device will not trigger the callback.

`PicoMQTT::ServerLocalSubscribe` can be used as a drop-in replacement for `PicoMQTT::Server` to get around this limitation. This variant of the broker works just the same, but it fires subscription callbacks for messages published locally using the `publish` and `publish_P` methods. Note that publishing using `begin_publish` will work as in `PicoMQTT::Server` (so it will not fire local callbacks).

`PicoMQTT::ServerLocalSubscribe` has slightly worse performance and can be memory intensive, especially if large messages are published and subscribed to locally. Therefore, it should only be used when really needed. Moreover, it has one additional limitation: it's *subscription callbacks must not publish any messages* or it may cause a crash.

Example available [here](examples/server_local_subscribe/server_local_subscribe.ino).


## Last Will Testament messages

Clients can be configured with a will message (aka LWT). This can be configured by changing elements of the client's `will`.structure:
Clients can be configured with a will message (aka LWT). This can be configured by changing elements of the client's `will` structure:

```
#include <PicoMQTT.h>
Expand Down
2 changes: 1 addition & 1 deletion examples/advanced_publish/advanced_publish.ino
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ void loop() {
mqtt.publish_P("picomqtt/flash_string/global", flash_string);

// The topic can be an F-string too:
// mqtt.publish_P(F("picomqtt/flash_string/global"), flash_string);
mqtt.publish_P(F("picomqtt/flash_string/global"), flash_string);

// publish binary data
const char binary_payload[] = "This string could contain binary data including a zero byte";
Expand Down
57 changes: 57 additions & 0 deletions examples/server_local_subscribe/server_local_subscribe.ino
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
#include <PicoMQTT.h>

#if __has_include("config.h")
#include "config.h"
#endif

#ifndef WIFI_SSID
#define WIFI_SSID "WiFi SSID"
#endif

#ifndef WIFI_PASSWORD
#define WIFI_PASSWORD "password"
#endif

PicoMQTT::ServerLocalSubscribe mqtt;
unsigned long last_publish;

static const char flash_string[] PROGMEM = "Hello from the broker's flash.";

void setup() {
// Setup serial
Serial.begin(115200);

// Connect to WiFi
Serial.printf("Connecting to WiFi %s\n", WIFI_SSID);
WiFi.mode(WIFI_STA);
WiFi.begin(WIFI_SSID, WIFI_PASSWORD);
while (WiFi.status() != WL_CONNECTED) { delay(1000); }
Serial.printf("WiFi connected, IP: %s\n", WiFi.localIP().toString().c_str());

mqtt.begin();

mqtt.subscribe("picomqtt/foo", [](const char * payload) {
Serial.printf("Message received: %s\n", payload);
});
}


void loop() {
mqtt.loop();

if (millis() - last_publish >= 5000) {
// this will publish the message to subscribed clients and fire the local callback
mqtt.publish("picomqtt/foo", "hello from broker!");

// this will also work as usual and fire the callback
mqtt.publish_P("picomqtt/foo", flash_string);

// begin_publish will publish to clients, but it will NOT fire the local callback
auto publish = mqtt.begin_publish("picomqtt/foo", 33);
publish.write((const uint8_t *) "Message delivered to clients only", 33);
publish.send();

last_publish = millis();
}

}
7 changes: 4 additions & 3 deletions src/PicoMQTT/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -210,8 +210,8 @@ Client::Client(ClientSocketInterface * socket,
BasicClient(this->socket->get_client(), keep_alive_millis, socket_timeout_millis),
host(host), port(port), client_id(id), username(user), password(password),
will({"", "", 0, false}),
reconnect_interval_millis(reconnect_interval_millis),
last_reconnect_attempt(millis() - reconnect_interval_millis) {
reconnect_interval_millis(reconnect_interval_millis),
last_reconnect_attempt(millis() - reconnect_interval_millis) {
TRACE_FUNCTION
}

Expand Down Expand Up @@ -254,8 +254,9 @@ void Client::loop() {
last_reconnect_attempt = millis();

if (!connection_established) {
if (connection_failure_callback)
if (connection_failure_callback) {
connection_failure_callback();
}
return;
}

Expand Down
6 changes: 4 additions & 2 deletions src/PicoMQTT/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,15 +66,17 @@ class Client: public SocketOwner<std::unique_ptr<ClientSocketInterface>>, public
Client(const char * host = nullptr, uint16_t port = 1883, const char * id = nullptr, const char * user = nullptr,
const char * password = nullptr, unsigned long reconnect_interval_millis = 5 * 1000,
unsigned long keep_alive_millis = 60 * 1000, unsigned long socket_timeout_millis = 10 * 1000)
: Client(new ClientSocket<::WiFiClient>(), host, port, id, user, password, reconnect_interval_millis, keep_alive_millis, socket_timeout_millis) {
: Client(new ClientSocket<::WiFiClient>(), host, port, id, user, password, reconnect_interval_millis, keep_alive_millis,
socket_timeout_millis) {
}

template <typename ClientType>
Client(ClientType & client, const char * host = nullptr, uint16_t port = 1883, const char * id = nullptr,
const char * user = nullptr, const char * password = nullptr,
unsigned long reconnect_interval_millis = 5 * 1000,
unsigned long keep_alive_millis = 60 * 1000, unsigned long socket_timeout_millis = 10 * 1000)
: Client(new ClientSocketProxy(client), host, port, id, user, password, reconnect_interval_millis, keep_alive_millis, socket_timeout_millis) {
: Client(new ClientSocketProxy(client), host, port, id, user, password, reconnect_interval_millis, keep_alive_millis,
socket_timeout_millis) {
}

using SubscribedMessageListener::subscribe;
Expand Down
5 changes: 5 additions & 0 deletions src/PicoMQTT/incoming_packet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@ IncomingPacket::IncomingPacket(IncomingPacket && other)
other.pos = size;
}

IncomingPacket::IncomingPacket(const Type type, const uint8_t flags, const size_t size, Client & client)
: Packet(type, flags, size), client(client) {
TRACE_FUNCTION
}

IncomingPacket::~IncomingPacket() {
TRACE_FUNCTION
#ifdef PICOMQTT_DEBUG
Expand Down
1 change: 1 addition & 0 deletions src/PicoMQTT/incoming_packet.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ namespace PicoMQTT {
class IncomingPacket: public Packet, public Client {
public:
IncomingPacket(Client & client);
IncomingPacket(const Type type, const uint8_t flags, const size_t size, Client & client);
IncomingPacket(IncomingPacket &&);

IncomingPacket(const IncomingPacket &) = delete;
Expand Down
37 changes: 25 additions & 12 deletions src/PicoMQTT/publisher.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,38 +43,51 @@ class Publisher {

Publish begin_publish(const String & topic, const size_t payload_size,
uint8_t qos = 0, bool retain = false, uint16_t message_id = 0) {
TRACE_FUNCTION
return begin_publish(topic.c_str(), payload_size, qos, retain, message_id);
}

template <typename TopicStringType>
bool publish(TopicStringType topic, const void * payload, const size_t payload_size,
uint8_t qos = 0, bool retain = false, uint16_t message_id = 0) {
virtual bool publish(const char * topic, const void * payload, const size_t payload_size,
uint8_t qos = 0, bool retain = false, uint16_t message_id = 0) {
TRACE_FUNCTION
auto packet = begin_publish(get_c_str(topic), payload_size, qos, retain, message_id);
packet.write((const uint8_t *) payload, payload_size);
return packet.send();
}

template <typename TopicStringType>
bool publish_P(TopicStringType topic, PGM_P payload, const size_t payload_size,
uint8_t qos = 0, bool retain = false, uint16_t message_id = 0) {
TRACE_FUNCTION;
auto packet = begin_publish(get_c_str(topic), payload_size, qos, retain, message_id);
packet.write_P(payload, payload_size);
return packet.send();
bool publish(const String & topic, const void * payload, const size_t payload_size,
uint8_t qos = 0, bool retain = false, uint16_t message_id = 0) {
TRACE_FUNCTION
return publish(topic.c_str(), payload, payload_size, qos, retain, message_id);
}

template <typename TopicStringType, typename PayloadStringType>
bool publish(TopicStringType topic, PayloadStringType payload,
uint8_t qos = 0, bool retain = false, uint16_t message_id = 0) {
return publish(topic, (const void *) get_c_str(payload), get_c_str_len(payload),
TRACE_FUNCTION
return publish(get_c_str(topic), (const void *) get_c_str(payload), get_c_str_len(payload),
qos, retain, message_id);
}

virtual bool publish_P(const char * topic, PGM_P payload, const size_t payload_size,
uint8_t qos = 0, bool retain = false, uint16_t message_id = 0) {
TRACE_FUNCTION
auto packet = begin_publish(topic, payload_size, qos, retain, message_id);
packet.write_P(payload, payload_size);
return packet.send();
}

bool publish_P(const String & topic, PGM_P payload, const size_t payload_size,
uint8_t qos = 0, bool retain = false, uint16_t message_id = 0) {
TRACE_FUNCTION
return publish_P(topic.c_str(), payload, payload_size, qos, retain, message_id);
}

template <typename TopicStringType>
bool publish_P(TopicStringType topic, PGM_P payload,
uint8_t qos = 0, bool retain = false, uint16_t message_id = 0) {
return publish_P(topic, payload, strlen_P(payload),
TRACE_FUNCTION
return publish_P(get_c_str(topic), payload, strlen_P(payload),
qos, retain, message_id);
}

Expand Down
77 changes: 77 additions & 0 deletions src/PicoMQTT/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,63 @@
#include "debug.h"
#include "server.h"

namespace {

class BufferClient: public ::Client {
public:
BufferClient(const void * ptr): ptr((const char *) ptr) { TRACE_FUNCTION }

// these methods are nop dummies
virtual int connect(IPAddress ip, uint16_t port) override final { TRACE_FUNCTION return 0; }
virtual int connect(const char * host, uint16_t port) override final { TRACE_FUNCTION return 0; }
virtual size_t write(const uint8_t * buffer, size_t size) override final { TRACE_FUNCTION return 0; }
virtual size_t write(uint8_t value) override final { TRACE_FUNCTION return 0; }
virtual void flush() override final { TRACE_FUNCTION }
virtual void stop() override final { TRACE_FUNCTION }

// these methods are in jasager mode
virtual int available() override final { TRACE_FUNCTION return std::numeric_limits<int>::max(); }
virtual operator bool() override final { TRACE_FUNCTION return true; }
virtual uint8_t connected() override final { TRACE_FUNCTION return true; }

// actual reads implemented here
virtual int read(uint8_t * buf, size_t size) override {
memcpy(buf, ptr, size);
ptr += size;
return size;
}

virtual int read() override final {
TRACE_FUNCTION
uint8_t ret;
read(&ret, 1);
return ret;
}

virtual int peek() override final {
TRACE_FUNCTION
const int ret = read();
--ptr;
return ret;
}

protected:
const char * ptr;
};

class BufferClientP: public BufferClient {
public:
using BufferClient::BufferClient;

virtual int read(uint8_t * buf, size_t size) override {
memcpy_P(buf, ptr, size);
ptr += size;
return size;
}
};

}

namespace PicoMQTT {

Server::Client::Client(Server & server, ::Client * client)
Expand Down Expand Up @@ -347,4 +404,24 @@ void Server::on_message(const char * topic, IncomingPacket & packet) {
fire_message_callbacks(topic, packet);
}

bool ServerLocalSubscribe::publish(const char * topic, const void * payload, const size_t payload_size,
uint8_t qos, bool retain, uint16_t message_id) {
TRACE_FUNCTION
const bool ret = Server::publish(topic, payload, payload_size, qos, retain, message_id);
BufferClient buffer(payload);
IncomingPacket packet(IncomingPacket::PUBLISH, 0, payload_size, buffer);
fire_message_callbacks(topic, packet);
return ret;
}

bool ServerLocalSubscribe::publish_P(const char * topic, PGM_P payload, const size_t payload_size,
uint8_t qos, bool retain, uint16_t message_id) {
TRACE_FUNCTION
const bool ret = Server::publish_P(topic, payload, payload_size, qos, retain, message_id);
BufferClientP buffer((void *) payload);
IncomingPacket packet(IncomingPacket::PUBLISH, 0, payload_size, buffer);
fire_message_callbacks(topic, packet);
return ret;
}

}
13 changes: 13 additions & 0 deletions src/PicoMQTT/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -214,4 +214,17 @@ class Server: public PicoMQTTInterface, public Publisher, public SubscribedMessa
std::list<std::unique_ptr<Client>> clients;
};

class ServerLocalSubscribe: public Server {
public:
using Server::Server;
using Server::publish;
using Server::publish_P;

virtual bool publish(const char * topic, const void * payload, const size_t payload_size,
uint8_t qos = 0, bool retain = false, uint16_t message_id = 0) override;

virtual bool publish_P(const char * topic, PGM_P payload, const size_t payload_size,
uint8_t qos = 0, bool retain = false, uint16_t message_id = 0) override;
};

}

0 comments on commit 7239687

Please sign in to comment.