From 0737b433ee0f17eece65aa431dac1b6274e10d71 Mon Sep 17 00:00:00 2001 From: Uwe Schindler <uwe@thetaphi.de> Date: Fri, 6 Dec 2024 20:14:58 +0100 Subject: [PATCH 1/7] Update the PubSubClient to thingsboard/pubsubclient version 2.10 --- BSB_LAN/src/PubSubClient/CHANGES.txt | 2 + BSB_LAN/src/PubSubClient/README.md | 8 + BSB_LAN/src/PubSubClient/library.json | 11 +- BSB_LAN/src/PubSubClient/library.properties | 8 +- BSB_LAN/src/PubSubClient/src/PubSubClient.cpp | 272 ++++++++++-------- BSB_LAN/src/PubSubClient/src/PubSubClient.h | 31 +- 6 files changed, 195 insertions(+), 137 deletions(-) diff --git a/BSB_LAN/src/PubSubClient/CHANGES.txt b/BSB_LAN/src/PubSubClient/CHANGES.txt index e23d5315f..2c27d9ca8 100644 --- a/BSB_LAN/src/PubSubClient/CHANGES.txt +++ b/BSB_LAN/src/PubSubClient/CHANGES.txt @@ -1,3 +1,5 @@ +2.9 + * Add ability to use function for callback not just for ESP boards 2.8 * Add setBufferSize() to override MQTT_MAX_PACKET_SIZE * Add setKeepAlive() to override MQTT_KEEPALIVE diff --git a/BSB_LAN/src/PubSubClient/README.md b/BSB_LAN/src/PubSubClient/README.md index 2e1317185..4aac81301 100644 --- a/BSB_LAN/src/PubSubClient/README.md +++ b/BSB_LAN/src/PubSubClient/README.md @@ -1,3 +1,11 @@ +## ThingsBoard note + +This is a fork of the main repository, which was last updated in 2020. +Since we have an SDK based on this client, we decided to continue with this repository. +We also believe that this library provides a lot of opportunities for people who want to build their own IoT devices. +As with our other open source repositories, we appreciate every contribution to this library. + + # Arduino Client for MQTT This library provides a client for doing simple publish/subscribe messaging with diff --git a/BSB_LAN/src/PubSubClient/library.json b/BSB_LAN/src/PubSubClient/library.json index c0d7bae2d..136d2ca2d 100644 --- a/BSB_LAN/src/PubSubClient/library.json +++ b/BSB_LAN/src/PubSubClient/library.json @@ -1,18 +1,19 @@ { - "name": "PubSubClient", - "keywords": "ethernet, mqtt, m2m, iot", + "name": "TBPubSubClient", + "keywords": "ethernet, mqtt, m2m, iot, thingsboard, messages", "description": "A client library for MQTT messaging. MQTT is a lightweight messaging protocol ideal for small devices. This library allows you to send and receive MQTT messages. It supports the latest MQTT 3.1.1 protocol and can be configured to use the older MQTT 3.1 if needed. It supports all Arduino Ethernet Client compatible hardware, including the Intel Galileo/Edison, ESP8266 and TI CC3000.", "repository": { "type": "git", - "url": "https://github.com/knolleary/pubsubclient.git" + "url": "https://github.com/thingsboard/pubsubclient.git" }, - "version": "2.8", + "version": "2.10.0", "exclude": "tests", "examples": "examples/*/*.ino", "frameworks": "arduino", "platforms": [ "atmelavr", "espressif8266", - "espressif32" + "espressif32", + "rp2040" ] } diff --git a/BSB_LAN/src/PubSubClient/library.properties b/BSB_LAN/src/PubSubClient/library.properties index e47ffe928..b9dcc2221 100644 --- a/BSB_LAN/src/PubSubClient/library.properties +++ b/BSB_LAN/src/PubSubClient/library.properties @@ -1,7 +1,7 @@ -name=PubSubClient -version=2.8 -author=Nick O'Leary <nick.oleary@gmail.com> -maintainer=Nick O'Leary <nick.oleary@gmail.com> +name=TBPubSubClient +version=2.10.0 +author=ThingsBoard <info@thingsboard.io> +maintainer=ThingsBoard Team sentence=A client library for MQTT messaging. paragraph=MQTT is a lightweight messaging protocol ideal for small devices. This library allows you to send and receive MQTT messages. It supports the latest MQTT 3.1.1 protocol and can be configured to use the older MQTT 3.1 if needed. It supports all Arduino Ethernet Client compatible hardware, including the Intel Galileo/Edison, ESP8266 and TI CC3000. category=Communication diff --git a/BSB_LAN/src/PubSubClient/src/PubSubClient.cpp b/BSB_LAN/src/PubSubClient/src/PubSubClient.cpp index 25e74257d..5b8538440 100644 --- a/BSB_LAN/src/PubSubClient/src/PubSubClient.cpp +++ b/BSB_LAN/src/PubSubClient/src/PubSubClient.cpp @@ -159,7 +159,8 @@ PubSubClient::PubSubClient(const char* domain, uint16_t port, MQTT_CALLBACK_SIGN } PubSubClient::~PubSubClient() { - free(this->buffer); + free(this->receive_buffer); + free(this->send_buffer); } boolean PubSubClient::connect(const char *id) { @@ -195,9 +196,9 @@ boolean PubSubClient::connect(const char *id, const char *user, const char *pass if (result == 1) { nextMsgId = 1; - // Leave room in the buffer for header and variable length field + // Leave room in the receive_buffer for header and variable length field uint16_t length = MQTT_MAX_HEADER_SIZE; - unsigned int j; + size_t j; #if MQTT_VERSION == MQTT_VERSION_3_1 uint8_t d[9] = {0x00,0x06,'M','Q','I','s','d','p', MQTT_VERSION}; @@ -207,7 +208,7 @@ boolean PubSubClient::connect(const char *id, const char *user, const char *pass #define MQTT_HEADER_VERSION_LENGTH 7 #endif for (j = 0;j<MQTT_HEADER_VERSION_LENGTH;j++) { - this->buffer[length++] = d[j]; + this->receive_buffer[length++] = d[j]; } uint8_t v; @@ -227,30 +228,30 @@ boolean PubSubClient::connect(const char *id, const char *user, const char *pass v = v|(0x80>>1); } } - this->buffer[length++] = v; + this->receive_buffer[length++] = v; - this->buffer[length++] = ((this->keepAlive) >> 8); - this->buffer[length++] = ((this->keepAlive) & 0xFF); + this->receive_buffer[length++] = ((this->keepAlive) >> 8); + this->receive_buffer[length++] = ((this->keepAlive) & 0xFF); CHECK_STRING_LENGTH(length,id) - length = writeString(id,this->buffer,length); + length = writeString(id,this->receive_buffer,length); if (willTopic) { CHECK_STRING_LENGTH(length,willTopic) - length = writeString(willTopic,this->buffer,length); + length = writeString(willTopic,this->receive_buffer,length); CHECK_STRING_LENGTH(length,willMessage) - length = writeString(willMessage,this->buffer,length); + length = writeString(willMessage,this->receive_buffer,length); } if(user != NULL) { CHECK_STRING_LENGTH(length,user) - length = writeString(user,this->buffer,length); + length = writeString(user,this->receive_buffer,length); if(pass != NULL) { CHECK_STRING_LENGTH(length,pass) - length = writeString(pass,this->buffer,length); + length = writeString(pass,this->send_buffer,length); } } - write(MQTTCONNECT,this->buffer,length-MQTT_MAX_HEADER_SIZE); + write(MQTTCONNECT,this->receive_buffer,length-MQTT_MAX_HEADER_SIZE); lastInActivity = lastOutActivity = millis(); @@ -266,13 +267,13 @@ boolean PubSubClient::connect(const char *id, const char *user, const char *pass uint32_t len = readPacket(&llen); if (len == 4) { - if (buffer[3] == 0) { + if (receive_buffer[3] == 0) { lastInActivity = millis(); pingOutstanding = false; _state = MQTT_CONNECTED; return true; } else { - _state = buffer[3]; + _state = receive_buffer[3]; } } _client->stop(); @@ -311,8 +312,8 @@ boolean PubSubClient::readByte(uint8_t * result, uint16_t * index){ uint32_t PubSubClient::readPacket(uint8_t* lengthLength) { uint16_t len = 0; - if(!readByte(this->buffer, &len)) return 0; - bool isPublish = (this->buffer[0]&0xF0) == MQTTPUBLISH; + if(!readByte(this->receive_buffer, &len)) return 0; + bool isPublish = (this->receive_buffer[0]&0xF0) == MQTTPUBLISH; uint32_t multiplier = 1; uint32_t length = 0; uint8_t digit = 0; @@ -327,7 +328,7 @@ uint32_t PubSubClient::readPacket(uint8_t* lengthLength) { return 0; } if(!readByte(&digit)) return 0; - this->buffer[len++] = digit; + this->receive_buffer[len++] = digit; length += (digit & 127) * multiplier; multiplier <<=7; //multiplier *= 128 } while ((digit & 128) != 0); @@ -335,11 +336,11 @@ uint32_t PubSubClient::readPacket(uint8_t* lengthLength) { if (isPublish) { // Read in topic length to calculate bytes to skip over for Stream writing - if(!readByte(this->buffer, &len)) return 0; - if(!readByte(this->buffer, &len)) return 0; - skip = (this->buffer[*lengthLength+1]<<8)+this->buffer[*lengthLength+2]; + if(!readByte(this->receive_buffer, &len)) return 0; + if(!readByte(this->receive_buffer, &len)) return 0; + skip = (this->receive_buffer[*lengthLength+1]<<8)+this->receive_buffer[*lengthLength+2]; start = 2; - if (this->buffer[0]&MQTTQOS1) { + if (this->receive_buffer[0]&MQTTQOS1) { // skip message id skip += 2; } @@ -355,7 +356,7 @@ uint32_t PubSubClient::readPacket(uint8_t* lengthLength) { } if (len < this->bufferSize) { - this->buffer[len] = digit; + this->receive_buffer[len] = digit; len++; } idx++; @@ -367,65 +368,97 @@ uint32_t PubSubClient::readPacket(uint8_t* lengthLength) { return len; } +boolean PubSubClient::loop_read() { + if (_client == NULL) { + return false; + } + if (!_client->available()) { + return false; + } + uint8_t llen; + uint16_t len = readPacket(&llen); + if (len == 0) { + return false; + } + unsigned long t = millis(); + lastInActivity = t; + uint8_t type = receive_buffer[0]&0xF0; + + switch(type) { + case MQTTPUBLISH: + { + if (callback) { + const boolean msgId_present = (receive_buffer[0]&0x06) == MQTTQOS1; + const uint16_t tl_offset = llen+1; + const uint16_t tl = (receive_buffer[tl_offset]<<8)+receive_buffer[tl_offset+1]; /* topic length in bytes */ + const uint16_t topic_offset = tl_offset+2; + const uint16_t msgId_offset = topic_offset+tl; + const uint16_t payload_offset = msgId_present ? msgId_offset+2 : msgId_offset; + if ((payload_offset) >= this->bufferSize) {return false;} + if (len < payload_offset) {return false;} + memmove(receive_buffer+topic_offset-1,receive_buffer+topic_offset,tl); /* move topic inside receive_buffer 1 byte to front */ + receive_buffer[topic_offset-1+tl] = 0; /* end the topic as a 'C' string with \x00 */ + char *topic = (char*) receive_buffer+topic_offset-1; + uint8_t *payload; + // msgId only present for QOS>0 + if (msgId_present) { + const uint16_t msgId = (receive_buffer[msgId_offset]<<8)+receive_buffer[msgId_offset+1]; + payload = receive_buffer+payload_offset; + callback(topic,payload,len-payload_offset); + if (_client->connected()) { + receive_buffer[0] = MQTTPUBACK; + receive_buffer[1] = 2; + receive_buffer[2] = (msgId >> 8); + receive_buffer[3] = (msgId & 0xFF); + if (_client->write(receive_buffer,4) != 0) { + lastOutActivity = t; + } + } + } else { + // No msgId + payload = receive_buffer+payload_offset; + callback(topic,payload,len-payload_offset); + } + } + break; + } + case MQTTPINGREQ: + { + if (_client->connected()) { + receive_buffer[0] = MQTTPINGRESP; + receive_buffer[1] = 0; + _client->write(receive_buffer,2); + } + break; + } + case MQTTPINGRESP: + { + pingOutstanding = false; + break; + } + default: + return false; + } + return true; +} + boolean PubSubClient::loop() { + loop_read(); if (connected()) { unsigned long t = millis(); - if ((t - lastInActivity > this->keepAlive*1000UL) || (t - lastOutActivity > this->keepAlive*1000UL)) { + if (((t - lastInActivity > this->keepAlive*1000UL) || (t - lastOutActivity > this->keepAlive*1000UL)) && keepAlive != 0) { if (pingOutstanding) { this->_state = MQTT_CONNECTION_TIMEOUT; _client->stop(); return false; } else { - this->buffer[0] = MQTTPINGREQ; - this->buffer[1] = 0; - _client->write(this->buffer,2); - lastOutActivity = t; - lastInActivity = t; - pingOutstanding = true; - } - } - if (_client->available()) { - uint8_t llen; - uint16_t len = readPacket(&llen); - uint16_t msgId = 0; - uint8_t *payload; - if (len > 0) { - lastInActivity = t; - uint8_t type = this->buffer[0]&0xF0; - if (type == MQTTPUBLISH) { - if (callback) { - uint16_t tl = (this->buffer[llen+1]<<8)+this->buffer[llen+2]; /* topic length in bytes */ - memmove(this->buffer+llen+2,this->buffer+llen+3,tl); /* move topic inside buffer 1 byte to front */ - this->buffer[llen+2+tl] = 0; /* end the topic as a 'C' string with \x00 */ - char *topic = (char*) this->buffer+llen+2; - // msgId only present for QOS>0 - if ((this->buffer[0]&0x06) == MQTTQOS1) { - msgId = (this->buffer[llen+3+tl]<<8)+this->buffer[llen+3+tl+1]; - payload = this->buffer+llen+3+tl+2; - callback(topic,payload,len-llen-3-tl-2); - - this->buffer[0] = MQTTPUBACK; - this->buffer[1] = 2; - this->buffer[2] = (msgId >> 8); - this->buffer[3] = (msgId & 0xFF); - _client->write(this->buffer,4); - lastOutActivity = t; - - } else { - payload = this->buffer+llen+3+tl; - callback(topic,payload,len-llen-3-tl); - } - } - } else if (type == MQTTPINGREQ) { - this->buffer[0] = MQTTPINGRESP; - this->buffer[1] = 0; - _client->write(this->buffer,2); - } else if (type == MQTTPINGRESP) { - pingOutstanding = false; + receive_buffer[0] = MQTTPINGREQ; + receive_buffer[1] = 0; + if (_client->write(receive_buffer,2) != 0) { + lastOutActivity = t; + lastInActivity = t; } - } else if (!connected()) { - // readPacket has closed the connection - return false; + pingOutstanding = true; } } return true; @@ -441,24 +474,24 @@ boolean PubSubClient::publish(const char* topic, const char* payload, boolean re return publish(topic,(const uint8_t*)payload, payload ? strnlen(payload, this->bufferSize) : 0,retained); } -boolean PubSubClient::publish(const char* topic, const uint8_t* payload, unsigned int plength) { +boolean PubSubClient::publish(const char* topic, const uint8_t* payload, size_t plength) { return publish(topic, payload, plength, false); } -boolean PubSubClient::publish(const char* topic, const uint8_t* payload, unsigned int plength, boolean retained) { +boolean PubSubClient::publish(const char* topic, const uint8_t* payload, size_t plength, boolean retained) { if (connected()) { - if (this->bufferSize < MQTT_MAX_HEADER_SIZE + 2+strnlen(topic, this->bufferSize) + plength) { + if (this->bufferSize < MQTT_MAX_HEADER_SIZE + 2 + strnlen(topic, this->bufferSize) + plength) { // Too long return false; } - // Leave room in the buffer for header and variable length field + // Leave room in the send_buffer for header and variable length field uint16_t length = MQTT_MAX_HEADER_SIZE; - length = writeString(topic,this->buffer,length); + length = writeString(topic,this->send_buffer,length); // Add payload uint16_t i; for (i=0;i<plength;i++) { - this->buffer[length++] = payload[i]; + this->send_buffer[length++] = payload[i]; } // Write the header @@ -466,25 +499,25 @@ boolean PubSubClient::publish(const char* topic, const uint8_t* payload, unsigne if (retained) { header |= 1; } - return write(header,this->buffer,length-MQTT_MAX_HEADER_SIZE); + return write(header,this->send_buffer,length-MQTT_MAX_HEADER_SIZE); } return false; } boolean PubSubClient::publish_P(const char* topic, const char* payload, boolean retained) { - return publish_P(topic, (const uint8_t*)payload, payload ? strnlen(payload, this->bufferSize) : 0, retained); + return publish_P(topic, (const uint8_t*)payload, payload ? strnlen_P(payload, this->bufferSize) : 0, retained); } -boolean PubSubClient::publish_P(const char* topic, const uint8_t* payload, unsigned int plength, boolean retained) { +boolean PubSubClient::publish_P(const char* topic, const uint8_t* payload, size_t plength, boolean retained) { uint8_t llen = 0; uint8_t digit; - unsigned int rc = 0; + size_t rc = 0; uint16_t tlen; - unsigned int pos = 0; - unsigned int i; + size_t pos = 0; + size_t i; uint8_t header; - unsigned int len; - unsigned int expectedLength; + size_t len; + size_t expectedLength; if (!connected()) { return false; @@ -496,7 +529,7 @@ boolean PubSubClient::publish_P(const char* topic, const uint8_t* payload, unsig if (retained) { header |= 1; } - this->buffer[pos++] = header; + this->send_buffer[pos++] = header; len = plength + 2 + tlen; do { digit = len & 127; //digit = len %128 @@ -504,13 +537,13 @@ boolean PubSubClient::publish_P(const char* topic, const uint8_t* payload, unsig if (len > 0) { digit |= 0x80; } - this->buffer[pos++] = digit; + this->send_buffer[pos++] = digit; llen++; } while(len>0); - pos = writeString(topic,this->buffer,pos); + pos = writeString(topic,this->send_buffer,pos); - rc += _client->write(this->buffer,pos); + rc += _client->write(this->send_buffer,pos); for (i=0;i<plength;i++) { rc += _client->write((char)pgm_read_byte_near(payload + i)); @@ -523,17 +556,17 @@ boolean PubSubClient::publish_P(const char* topic, const uint8_t* payload, unsig return (rc == expectedLength); } -boolean PubSubClient::beginPublish(const char* topic, unsigned int plength, boolean retained) { +boolean PubSubClient::beginPublish(const char* topic, size_t plength, boolean retained) { if (connected()) { // Send the header and variable length field uint16_t length = MQTT_MAX_HEADER_SIZE; - length = writeString(topic,this->buffer,length); + length = writeString(topic,this->send_buffer,length); uint8_t header = MQTTPUBLISH; if (retained) { header |= 1; } - size_t hlen = buildHeader(header, this->buffer, plength+length-MQTT_MAX_HEADER_SIZE); - uint16_t rc = _client->write(this->buffer+(MQTT_MAX_HEADER_SIZE-hlen),length-(MQTT_MAX_HEADER_SIZE-hlen)); + size_t hlen = buildHeader(header, this->send_buffer, plength+length-MQTT_MAX_HEADER_SIZE); + uint16_t rc = _client->write(this->send_buffer+(MQTT_MAX_HEADER_SIZE-hlen),length-(MQTT_MAX_HEADER_SIZE-hlen)); lastOutActivity = millis(); return (rc == (length-(MQTT_MAX_HEADER_SIZE-hlen))); } @@ -541,7 +574,7 @@ boolean PubSubClient::beginPublish(const char* topic, unsigned int plength, bool } int PubSubClient::endPublish() { - return 1; + return 1; } size_t PubSubClient::write(uint8_t data) { @@ -619,23 +652,23 @@ boolean PubSubClient::subscribe(const char* topic, uint8_t qos) { return false; } if (connected()) { - // Leave room in the buffer for header and variable length field + // Leave room in the send_buffer for header and variable length field uint16_t length = MQTT_MAX_HEADER_SIZE; nextMsgId++; if (nextMsgId == 0) { nextMsgId = 1; } - this->buffer[length++] = (nextMsgId >> 8); - this->buffer[length++] = (nextMsgId & 0xFF); - length = writeString((char*)topic, this->buffer,length); - this->buffer[length++] = qos; - return write(MQTTSUBSCRIBE|MQTTQOS1,this->buffer,length-MQTT_MAX_HEADER_SIZE); + this->send_buffer[length++] = (nextMsgId >> 8); + this->send_buffer[length++] = (nextMsgId & 0xFF); + length = writeString((char*)topic, this->send_buffer,length); + this->send_buffer[length++] = qos; + return write(MQTTSUBSCRIBE|MQTTQOS1,this->send_buffer,length-MQTT_MAX_HEADER_SIZE); } return false; } boolean PubSubClient::unsubscribe(const char* topic) { - size_t topicLength = strnlen(topic, this->bufferSize); + size_t topicLength = strnlen(topic, this->bufferSize); if (topic == 0) { return false; } @@ -649,18 +682,18 @@ boolean PubSubClient::unsubscribe(const char* topic) { if (nextMsgId == 0) { nextMsgId = 1; } - this->buffer[length++] = (nextMsgId >> 8); - this->buffer[length++] = (nextMsgId & 0xFF); - length = writeString(topic, this->buffer,length); - return write(MQTTUNSUBSCRIBE|MQTTQOS1,this->buffer,length-MQTT_MAX_HEADER_SIZE); + this->send_buffer[length++] = (nextMsgId >> 8); + this->send_buffer[length++] = (nextMsgId & 0xFF); + length = writeString(topic, this->send_buffer,length); + return write(MQTTUNSUBSCRIBE|MQTTQOS1,this->send_buffer,length-MQTT_MAX_HEADER_SIZE); } return false; } void PubSubClient::disconnect() { - this->buffer[0] = MQTTDISCONNECT; - this->buffer[1] = 0; - _client->write(this->buffer,2); + this->send_buffer[0] = MQTTDISCONNECT; + this->send_buffer[1] = 0; + _client->write(this->send_buffer,2); _state = MQTT_DISCONNECTED; _client->flush(); _client->stop(); @@ -683,7 +716,7 @@ uint16_t PubSubClient::writeString(const char* string, uint8_t* buf, uint16_t po boolean PubSubClient::connected() { boolean rc; - if (_client == NULL ) { + if (_client == NULL) { rc = false; } else { rc = (int)_client->connected(); @@ -743,26 +776,35 @@ boolean PubSubClient::setBufferSize(uint16_t size) { return false; } if (this->bufferSize == 0) { - this->buffer = (uint8_t*)malloc(size); + this->receive_buffer = (uint8_t*)malloc(size); + this->send_buffer = (uint8_t*)malloc(size); } else { - uint8_t* newBuffer = (uint8_t*)realloc(this->buffer, size); + uint8_t* newBuffer = (uint8_t*)realloc(this->receive_buffer, size); if (newBuffer != NULL) { - this->buffer = newBuffer; + this->receive_buffer = newBuffer; + } else { + return false; + } + newBuffer = (uint8_t*)realloc(this->send_buffer, size); + if (newBuffer != NULL) { + this->send_buffer = newBuffer; } else { return false; } } this->bufferSize = size; - return (this->buffer != NULL); + return (this->receive_buffer != NULL) && (this->send_buffer != NULL); } uint16_t PubSubClient::getBufferSize() { return this->bufferSize; } + PubSubClient& PubSubClient::setKeepAlive(uint16_t keepAlive) { this->keepAlive = keepAlive; return *this; } + PubSubClient& PubSubClient::setSocketTimeout(uint16_t timeout) { this->socketTimeout = timeout; return *this; diff --git a/BSB_LAN/src/PubSubClient/src/PubSubClient.h b/BSB_LAN/src/PubSubClient/src/PubSubClient.h index f6e5acd75..5acfc5b4b 100644 --- a/BSB_LAN/src/PubSubClient/src/PubSubClient.h +++ b/BSB_LAN/src/PubSubClient/src/PubSubClient.h @@ -76,19 +76,24 @@ // Maximum size of fixed header and variable length size header #define MQTT_MAX_HEADER_SIZE 5 -#if defined(ESP8266) || defined(ESP32) -#include <functional> -#define MQTT_CALLBACK_SIGNATURE std::function<void(char*, uint8_t*, unsigned int)> callback -#else -#define MQTT_CALLBACK_SIGNATURE void (*callback)(char*, uint8_t*, unsigned int) -#endif +# ifdef __has_include +# if __has_include(<functional>) +# include <functional> +# define MQTT_CALLBACK_SIGNATURE std::function<void(char*, uint8_t*, size_t)> callback +# else +# define MQTT_CALLBACK_SIGNATURE void (*callback)(char*, uint8_t*, size_t) +# endif +# else +# define MQTT_CALLBACK_SIGNATURE void (*callback)(char*, uint8_t*, size_t) +# endif #define CHECK_STRING_LENGTH(l,s) if (l+2+strnlen(s, this->bufferSize) > this->bufferSize) {_client->stop();return false;} class PubSubClient : public Print { private: Client* _client; - uint8_t* buffer; + uint8_t* receive_buffer; + uint8_t* send_buffer; uint16_t bufferSize; uint16_t keepAlive; uint16_t socketTimeout; @@ -128,7 +133,7 @@ class PubSubClient : public Print { PubSubClient(const char*, uint16_t, MQTT_CALLBACK_SIGNATURE,Client& client); PubSubClient(const char*, uint16_t, MQTT_CALLBACK_SIGNATURE,Client& client, Stream&); - virtual ~PubSubClient(); + ~PubSubClient(); PubSubClient& setServer(IPAddress ip, uint16_t port); PubSubClient& setServer(uint8_t * ip, uint16_t port); @@ -150,10 +155,10 @@ class PubSubClient : public Print { void disconnect(); boolean publish(const char* topic, const char* payload); boolean publish(const char* topic, const char* payload, boolean retained); - boolean publish(const char* topic, const uint8_t * payload, unsigned int plength); - boolean publish(const char* topic, const uint8_t * payload, unsigned int plength, boolean retained); + boolean publish(const char* topic, const uint8_t * payload, size_t plength); + boolean publish(const char* topic, const uint8_t * payload, size_t plength, boolean retained); boolean publish_P(const char* topic, const char* payload, boolean retained); - boolean publish_P(const char* topic, const uint8_t * payload, unsigned int plength, boolean retained); + boolean publish_P(const char* topic, const uint8_t * payload, size_t plength, boolean retained); // Start to publish a message. // This API: // beginPublish(...) @@ -162,7 +167,7 @@ class PubSubClient : public Print { // Allows for arbitrarily large payloads to be sent without them having to be copied into // a new buffer and held in memory at one time // Returns 1 if the message was started successfully, 0 if there was an error - boolean beginPublish(const char* topic, unsigned int plength, boolean retained); + boolean beginPublish(const char* topic, size_t plength, boolean retained); // Finish off this publish message (started with beginPublish) // Returns 1 if the packet was sent successfully, 0 if there was an error int endPublish(); @@ -174,11 +179,11 @@ class PubSubClient : public Print { boolean subscribe(const char* topic); boolean subscribe(const char* topic, uint8_t qos); boolean unsubscribe(const char* topic); + boolean loop_read(); boolean loop(); boolean connected(); int state(); }; - #endif From b781b867e4b57e38dd990422a9993ca08ed3e8a1 Mon Sep 17 00:00:00 2001 From: Uwe Schindler <uwe@thetaphi.de> Date: Sat, 7 Dec 2024 00:30:56 +0100 Subject: [PATCH 2/7] Fix a copypaste bug in release 2.10 making connection with password fail; improve keep-alive handling --- BSB_LAN/include/mqtt_handler.h | 6 +++--- BSB_LAN/src/PubSubClient/src/PubSubClient.cpp | 6 ++++-- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/BSB_LAN/include/mqtt_handler.h b/BSB_LAN/include/mqtt_handler.h index f17cd8c4b..56d647e1e 100644 --- a/BSB_LAN/include/mqtt_handler.h +++ b/BSB_LAN/include/mqtt_handler.h @@ -196,8 +196,7 @@ bool mqtt_connect() { mqtt_client= new ComClient(); MQTTPubSubClient = new PubSubClient(mqtt_client[0]); MQTTPubSubClient->setBufferSize(2048); - MQTTPubSubClient->setKeepAlive(30); - MQTTPubSubClient->setSocketTimeout(120); + MQTTPubSubClient->setKeepAlive(90); // raise to higher value so broker does not disconnect on latency mqtt_reconnect_timer = 0; first_connect = true; } @@ -225,6 +224,7 @@ bool mqtt_connect() { MQTTPubSubClient->setServer(mqtt_host, mqtt_port); printFmtToDebug("Client ID: %s\r\n", mqtt_get_client_id()); printFmtToDebug("Will topic: %s\r\n", mqtt_get_will_topic()); + MQTTPubSubClient->setSocketTimeout(MQTT_SOCKET_TIMEOUT); // reset to default MQTTPubSubClient->connect(mqtt_get_client_id(), MQTTUser, MQTTPass, mqtt_get_will_topic(), 1, true, "offline"); if (!MQTTPubSubClient->connected()) { printlnToDebug("Failed to connect to MQTT broker, retrying..."); @@ -237,7 +237,7 @@ bool mqtt_connect() { strcat(tempTopic, "/#"); MQTTPubSubClient->subscribe(tempTopic, 1); //Luposoft: set the topic listen to printFmtToDebug("Subscribed to topic '%s'\r\n", tempTopic); - MQTTPubSubClient->setKeepAlive(120); //Luposoft: just for savety + MQTTPubSubClient->setSocketTimeout(120); // uschindler: set socket timeout to higher value after connection MQTTPubSubClient->setCallback(mqtt_callback); //Luposoft: set to function is called when incoming message MQTTPubSubClient->publish(mqtt_get_will_topic(), "online", true); printFmtToDebug("Published status 'online' to topic '%s'\r\n", mqtt_get_will_topic()); diff --git a/BSB_LAN/src/PubSubClient/src/PubSubClient.cpp b/BSB_LAN/src/PubSubClient/src/PubSubClient.cpp index 5b8538440..fbaa0823e 100644 --- a/BSB_LAN/src/PubSubClient/src/PubSubClient.cpp +++ b/BSB_LAN/src/PubSubClient/src/PubSubClient.cpp @@ -247,13 +247,14 @@ boolean PubSubClient::connect(const char *id, const char *user, const char *pass length = writeString(user,this->receive_buffer,length); if(pass != NULL) { CHECK_STRING_LENGTH(length,pass) - length = writeString(pass,this->send_buffer,length); + length = writeString(pass,this->receive_buffer,length); // uschindler: patched } } write(MQTTCONNECT,this->receive_buffer,length-MQTT_MAX_HEADER_SIZE); lastInActivity = lastOutActivity = millis(); + pingOutstanding = false; // uschindler: patched while (!_client->available()) { unsigned long t = millis(); @@ -450,6 +451,7 @@ boolean PubSubClient::loop() { if (pingOutstanding) { this->_state = MQTT_CONNECTION_TIMEOUT; _client->stop(); + pingOutstanding = false; // uschindler: patched return false; } else { receive_buffer[0] = MQTTPINGREQ; @@ -457,8 +459,8 @@ boolean PubSubClient::loop() { if (_client->write(receive_buffer,2) != 0) { lastOutActivity = t; lastInActivity = t; + pingOutstanding = true; // uschindler: patched } - pingOutstanding = true; } } return true; From 76b156b2f622f35b7afab0ea889a02036e73ea70 Mon Sep 17 00:00:00 2001 From: Uwe Schindler <uwe@thetaphi.de> Date: Sat, 7 Dec 2024 10:09:37 +0100 Subject: [PATCH 3/7] Move malloc based host/port tokenizer to actual connection, so it is not called when connection already exists --- BSB_LAN/include/mqtt_handler.h | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/BSB_LAN/include/mqtt_handler.h b/BSB_LAN/include/mqtt_handler.h index 56d647e1e..a146e82a7 100644 --- a/BSB_LAN/include/mqtt_handler.h +++ b/BSB_LAN/include/mqtt_handler.h @@ -181,16 +181,6 @@ char* mqtt_get_will_topic() { * *************************************************************** */ bool mqtt_connect() { - char* tempstr = (char*)malloc(sizeof(mqtt_broker_addr)); // make a copy of mqtt_broker_addr for destructive strtok operation - strcpy(tempstr, mqtt_broker_addr); - uint16_t mqtt_port = 1883; - char* mqtt_host = strtok(tempstr,":"); // hostname is before an optional colon that separates the port - char* token = strtok(NULL, ":"); // remaining part is the port number - if (token != 0) { - mqtt_port = atoi(token); - } - free(tempstr); - bool first_connect = false; if(MQTTPubSubClient == NULL) { mqtt_client= new ComClient(); @@ -212,6 +202,16 @@ bool mqtt_connect() { return false; } + char* tempstr = (char*)malloc(sizeof(mqtt_broker_addr)); // make a copy of mqtt_broker_addr for destructive strtok operation + strcpy(tempstr, mqtt_broker_addr); + uint16_t mqtt_port = 1883; + char* mqtt_host = strtok(tempstr,":"); // hostname is before an optional colon that separates the port + char* token = strtok(NULL, ":"); // remaining part is the port number + if (token != 0) { + mqtt_port = atoi(token); + } + free(tempstr); + char* MQTTUser = NULL; if(MQTTUsername[0]) { MQTTUser = MQTTUsername; From 8fe8a2d3f9b4fc73f5a5aeeb459f49058eed9672 Mon Sep 17 00:00:00 2001 From: Uwe Schindler <uwe@thetaphi.de> Date: Sat, 7 Dec 2024 12:26:00 +0100 Subject: [PATCH 4/7] Improve debugging when connection fails or disconnects --- BSB_LAN/include/mqtt_handler.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/BSB_LAN/include/mqtt_handler.h b/BSB_LAN/include/mqtt_handler.h index a146e82a7..78bbfa6c6 100644 --- a/BSB_LAN/include/mqtt_handler.h +++ b/BSB_LAN/include/mqtt_handler.h @@ -194,7 +194,7 @@ bool mqtt_connect() { if (!first_connect && !mqtt_reconnect_timer) { // We just lost connection, don't try to reconnect immediately mqtt_reconnect_timer = millis(); - printlnToDebug("MQTT connection lost"); + printFmtToDebug("MQTT connection lost with status code %d\r\n", MQTTPubSubClient->state()); return false; } if (mqtt_reconnect_timer && millis() - mqtt_reconnect_timer < 10000) { @@ -227,7 +227,7 @@ bool mqtt_connect() { MQTTPubSubClient->setSocketTimeout(MQTT_SOCKET_TIMEOUT); // reset to default MQTTPubSubClient->connect(mqtt_get_client_id(), MQTTUser, MQTTPass, mqtt_get_will_topic(), 1, true, "offline"); if (!MQTTPubSubClient->connected()) { - printlnToDebug("Failed to connect to MQTT broker, retrying..."); + printFmtToDebug("Failed to connect to MQTT broker with status code %d, retrying...\r\n", MQTTPubSubClient->state()); mqtt_reconnect_timer = millis(); } else { printlnToDebug("Connected to MQTT broker, updating will topic"); From ac7c3f07a568467661d4bab7eee1b63bfddbe23f Mon Sep 17 00:00:00 2001 From: Uwe Schindler <uwe@thetaphi.de> Date: Sat, 7 Dec 2024 13:18:03 +0100 Subject: [PATCH 5/7] Some more keepalive problems in original code --- BSB_LAN/src/PubSubClient/src/PubSubClient.cpp | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/BSB_LAN/src/PubSubClient/src/PubSubClient.cpp b/BSB_LAN/src/PubSubClient/src/PubSubClient.cpp index fbaa0823e..527bbec70 100644 --- a/BSB_LAN/src/PubSubClient/src/PubSubClient.cpp +++ b/BSB_LAN/src/PubSubClient/src/PubSubClient.cpp @@ -270,7 +270,7 @@ boolean PubSubClient::connect(const char *id, const char *user, const char *pass if (len == 4) { if (receive_buffer[3] == 0) { lastInActivity = millis(); - pingOutstanding = false; + // uschindler: removed unnecessary code _state = MQTT_CONNECTED; return true; } else { @@ -428,7 +428,10 @@ boolean PubSubClient::loop_read() { if (_client->connected()) { receive_buffer[0] = MQTTPINGRESP; receive_buffer[1] = 0; - _client->write(receive_buffer,2); + // uschindler: fix keepalive + if (_client->write(receive_buffer,2) != 0) { + lastOutActivity = t; + } } break; } @@ -628,6 +631,10 @@ boolean PubSubClient::write(uint8_t header, uint8_t* buf, uint16_t length) { result = (rc == bytesToWrite); bytesRemaining -= rc; writeBuf += rc; + // uschindler: fix keepalive + if (rc != 0) { + lastOutActivity = millis(); + } } return result; #else @@ -700,6 +707,7 @@ void PubSubClient::disconnect() { _client->flush(); _client->stop(); lastInActivity = lastOutActivity = millis(); + pingOutstanding = false; // uschindler: fix keepalive } uint16_t PubSubClient::writeString(const char* string, uint8_t* buf, uint16_t pos) { From 4eda051899e58fb8a5713344ca05181dcb288650 Mon Sep 17 00:00:00 2001 From: Uwe Schindler <uwe@thetaphi.de> Date: Sun, 8 Dec 2024 12:39:21 +0100 Subject: [PATCH 6/7] Revert socket timeout changes, the correct keepalive settings are enough! --- BSB_LAN/include/mqtt_handler.h | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/BSB_LAN/include/mqtt_handler.h b/BSB_LAN/include/mqtt_handler.h index 78bbfa6c6..d9270f6db 100644 --- a/BSB_LAN/include/mqtt_handler.h +++ b/BSB_LAN/include/mqtt_handler.h @@ -186,7 +186,7 @@ bool mqtt_connect() { mqtt_client= new ComClient(); MQTTPubSubClient = new PubSubClient(mqtt_client[0]); MQTTPubSubClient->setBufferSize(2048); - MQTTPubSubClient->setKeepAlive(90); // raise to higher value so broker does not disconnect on latency + MQTTPubSubClient->setKeepAlive(120); // raise to higher value so broker does not disconnect on latency mqtt_reconnect_timer = 0; first_connect = true; } @@ -224,7 +224,6 @@ bool mqtt_connect() { MQTTPubSubClient->setServer(mqtt_host, mqtt_port); printFmtToDebug("Client ID: %s\r\n", mqtt_get_client_id()); printFmtToDebug("Will topic: %s\r\n", mqtt_get_will_topic()); - MQTTPubSubClient->setSocketTimeout(MQTT_SOCKET_TIMEOUT); // reset to default MQTTPubSubClient->connect(mqtt_get_client_id(), MQTTUser, MQTTPass, mqtt_get_will_topic(), 1, true, "offline"); if (!MQTTPubSubClient->connected()) { printFmtToDebug("Failed to connect to MQTT broker with status code %d, retrying...\r\n", MQTTPubSubClient->state()); @@ -237,7 +236,6 @@ bool mqtt_connect() { strcat(tempTopic, "/#"); MQTTPubSubClient->subscribe(tempTopic, 1); //Luposoft: set the topic listen to printFmtToDebug("Subscribed to topic '%s'\r\n", tempTopic); - MQTTPubSubClient->setSocketTimeout(120); // uschindler: set socket timeout to higher value after connection MQTTPubSubClient->setCallback(mqtt_callback); //Luposoft: set to function is called when incoming message MQTTPubSubClient->publish(mqtt_get_will_topic(), "online", true); printFmtToDebug("Published status 'online' to topic '%s'\r\n", mqtt_get_will_topic()); From a56ea52218f1c523b6147ff1dcc1c684f318ad63 Mon Sep 17 00:00:00 2001 From: Uwe Schindler <uwe@thetaphi.de> Date: Mon, 9 Dec 2024 14:00:57 +0100 Subject: [PATCH 7/7] Update pubsubclient to released version 2.11.0 --- BSB_LAN/include/mqtt_handler.h | 2 +- BSB_LAN/src/PubSubClient/src/PubSubClient.cpp | 173 ++++++++++-------- BSB_LAN/src/PubSubClient/src/PubSubClient.h | 11 +- 3 files changed, 104 insertions(+), 82 deletions(-) diff --git a/BSB_LAN/include/mqtt_handler.h b/BSB_LAN/include/mqtt_handler.h index d9270f6db..eb928b65c 100644 --- a/BSB_LAN/include/mqtt_handler.h +++ b/BSB_LAN/include/mqtt_handler.h @@ -185,7 +185,7 @@ bool mqtt_connect() { if(MQTTPubSubClient == NULL) { mqtt_client= new ComClient(); MQTTPubSubClient = new PubSubClient(mqtt_client[0]); - MQTTPubSubClient->setBufferSize(2048); + MQTTPubSubClient->setBufferSize(2048, 2048); MQTTPubSubClient->setKeepAlive(120); // raise to higher value so broker does not disconnect on latency mqtt_reconnect_timer = 0; first_connect = true; diff --git a/BSB_LAN/src/PubSubClient/src/PubSubClient.cpp b/BSB_LAN/src/PubSubClient/src/PubSubClient.cpp index 527bbec70..bdb0dedb2 100644 --- a/BSB_LAN/src/PubSubClient/src/PubSubClient.cpp +++ b/BSB_LAN/src/PubSubClient/src/PubSubClient.cpp @@ -13,8 +13,9 @@ PubSubClient::PubSubClient() { this->_client = NULL; this->stream = NULL; setCallback(NULL); - this->bufferSize = 0; - setBufferSize(MQTT_MAX_PACKET_SIZE); + this->sendBufferSize = 0; + this->receiveBufferSize = 0; + setBufferSize(MQTT_MAX_PACKET_SIZE, MQTT_MAX_PACKET_SIZE); setKeepAlive(MQTT_KEEPALIVE); setSocketTimeout(MQTT_SOCKET_TIMEOUT); } @@ -23,8 +24,9 @@ PubSubClient::PubSubClient(Client& client) { this->_state = MQTT_DISCONNECTED; setClient(client); this->stream = NULL; - this->bufferSize = 0; - setBufferSize(MQTT_MAX_PACKET_SIZE); + this->sendBufferSize = 0; + this->receiveBufferSize = 0; + setBufferSize(MQTT_MAX_PACKET_SIZE, MQTT_MAX_PACKET_SIZE); setKeepAlive(MQTT_KEEPALIVE); setSocketTimeout(MQTT_SOCKET_TIMEOUT); } @@ -34,8 +36,9 @@ PubSubClient::PubSubClient(IPAddress addr, uint16_t port, Client& client) { setServer(addr, port); setClient(client); this->stream = NULL; - this->bufferSize = 0; - setBufferSize(MQTT_MAX_PACKET_SIZE); + this->sendBufferSize = 0; + this->receiveBufferSize = 0; + setBufferSize(MQTT_MAX_PACKET_SIZE, MQTT_MAX_PACKET_SIZE); setKeepAlive(MQTT_KEEPALIVE); setSocketTimeout(MQTT_SOCKET_TIMEOUT); } @@ -44,8 +47,9 @@ PubSubClient::PubSubClient(IPAddress addr, uint16_t port, Client& client, Stream setServer(addr,port); setClient(client); setStream(stream); - this->bufferSize = 0; - setBufferSize(MQTT_MAX_PACKET_SIZE); + this->sendBufferSize = 0; + this->receiveBufferSize = 0; + setBufferSize(MQTT_MAX_PACKET_SIZE, MQTT_MAX_PACKET_SIZE); setKeepAlive(MQTT_KEEPALIVE); setSocketTimeout(MQTT_SOCKET_TIMEOUT); } @@ -55,8 +59,9 @@ PubSubClient::PubSubClient(IPAddress addr, uint16_t port, MQTT_CALLBACK_SIGNATUR setCallback(callback); setClient(client); this->stream = NULL; - this->bufferSize = 0; - setBufferSize(MQTT_MAX_PACKET_SIZE); + this->sendBufferSize = 0; + this->receiveBufferSize = 0; + setBufferSize(MQTT_MAX_PACKET_SIZE, MQTT_MAX_PACKET_SIZE); setKeepAlive(MQTT_KEEPALIVE); setSocketTimeout(MQTT_SOCKET_TIMEOUT); } @@ -66,8 +71,9 @@ PubSubClient::PubSubClient(IPAddress addr, uint16_t port, MQTT_CALLBACK_SIGNATUR setCallback(callback); setClient(client); setStream(stream); - this->bufferSize = 0; - setBufferSize(MQTT_MAX_PACKET_SIZE); + this->sendBufferSize = 0; + this->receiveBufferSize = 0; + setBufferSize(MQTT_MAX_PACKET_SIZE, MQTT_MAX_PACKET_SIZE); setKeepAlive(MQTT_KEEPALIVE); setSocketTimeout(MQTT_SOCKET_TIMEOUT); } @@ -77,8 +83,9 @@ PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, Client& client) { setServer(ip, port); setClient(client); this->stream = NULL; - this->bufferSize = 0; - setBufferSize(MQTT_MAX_PACKET_SIZE); + this->sendBufferSize = 0; + this->receiveBufferSize = 0; + setBufferSize(MQTT_MAX_PACKET_SIZE, MQTT_MAX_PACKET_SIZE); setKeepAlive(MQTT_KEEPALIVE); setSocketTimeout(MQTT_SOCKET_TIMEOUT); } @@ -87,8 +94,9 @@ PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, Client& client, Stream& s setServer(ip,port); setClient(client); setStream(stream); - this->bufferSize = 0; - setBufferSize(MQTT_MAX_PACKET_SIZE); + this->sendBufferSize = 0; + this->receiveBufferSize = 0; + setBufferSize(MQTT_MAX_PACKET_SIZE, MQTT_MAX_PACKET_SIZE); setKeepAlive(MQTT_KEEPALIVE); setSocketTimeout(MQTT_SOCKET_TIMEOUT); } @@ -98,8 +106,9 @@ PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, MQTT_CALLBACK_SIGNATURE, setCallback(callback); setClient(client); this->stream = NULL; - this->bufferSize = 0; - setBufferSize(MQTT_MAX_PACKET_SIZE); + this->sendBufferSize = 0; + this->receiveBufferSize = 0; + setBufferSize(MQTT_MAX_PACKET_SIZE, MQTT_MAX_PACKET_SIZE); setKeepAlive(MQTT_KEEPALIVE); setSocketTimeout(MQTT_SOCKET_TIMEOUT); } @@ -109,8 +118,9 @@ PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, MQTT_CALLBACK_SIGNATURE, setCallback(callback); setClient(client); setStream(stream); - this->bufferSize = 0; - setBufferSize(MQTT_MAX_PACKET_SIZE); + this->sendBufferSize = 0; + this->receiveBufferSize = 0; + setBufferSize(MQTT_MAX_PACKET_SIZE, MQTT_MAX_PACKET_SIZE); setKeepAlive(MQTT_KEEPALIVE); setSocketTimeout(MQTT_SOCKET_TIMEOUT); } @@ -120,8 +130,9 @@ PubSubClient::PubSubClient(const char* domain, uint16_t port, Client& client) { setServer(domain,port); setClient(client); this->stream = NULL; - this->bufferSize = 0; - setBufferSize(MQTT_MAX_PACKET_SIZE); + this->sendBufferSize = 0; + this->receiveBufferSize = 0; + setBufferSize(MQTT_MAX_PACKET_SIZE, MQTT_MAX_PACKET_SIZE); setKeepAlive(MQTT_KEEPALIVE); setSocketTimeout(MQTT_SOCKET_TIMEOUT); } @@ -130,8 +141,9 @@ PubSubClient::PubSubClient(const char* domain, uint16_t port, Client& client, St setServer(domain,port); setClient(client); setStream(stream); - this->bufferSize = 0; - setBufferSize(MQTT_MAX_PACKET_SIZE); + this->sendBufferSize = 0; + this->receiveBufferSize = 0; + setBufferSize(MQTT_MAX_PACKET_SIZE, MQTT_MAX_PACKET_SIZE); setKeepAlive(MQTT_KEEPALIVE); setSocketTimeout(MQTT_SOCKET_TIMEOUT); } @@ -141,8 +153,9 @@ PubSubClient::PubSubClient(const char* domain, uint16_t port, MQTT_CALLBACK_SIGN setCallback(callback); setClient(client); this->stream = NULL; - this->bufferSize = 0; - setBufferSize(MQTT_MAX_PACKET_SIZE); + this->sendBufferSize = 0; + this->receiveBufferSize = 0; + setBufferSize(MQTT_MAX_PACKET_SIZE, MQTT_MAX_PACKET_SIZE); setKeepAlive(MQTT_KEEPALIVE); setSocketTimeout(MQTT_SOCKET_TIMEOUT); } @@ -152,8 +165,9 @@ PubSubClient::PubSubClient(const char* domain, uint16_t port, MQTT_CALLBACK_SIGN setCallback(callback); setClient(client); setStream(stream); - this->bufferSize = 0; - setBufferSize(MQTT_MAX_PACKET_SIZE); + this->sendBufferSize = 0; + this->receiveBufferSize = 0; + setBufferSize(MQTT_MAX_PACKET_SIZE, MQTT_MAX_PACKET_SIZE); setKeepAlive(MQTT_KEEPALIVE); setSocketTimeout(MQTT_SOCKET_TIMEOUT); } @@ -208,7 +222,7 @@ boolean PubSubClient::connect(const char *id, const char *user, const char *pass #define MQTT_HEADER_VERSION_LENGTH 7 #endif for (j = 0;j<MQTT_HEADER_VERSION_LENGTH;j++) { - this->receive_buffer[length++] = d[j]; + this->send_buffer[length++] = d[j]; } uint8_t v; @@ -228,33 +242,33 @@ boolean PubSubClient::connect(const char *id, const char *user, const char *pass v = v|(0x80>>1); } } - this->receive_buffer[length++] = v; + this->send_buffer[length++] = v; - this->receive_buffer[length++] = ((this->keepAlive) >> 8); - this->receive_buffer[length++] = ((this->keepAlive) & 0xFF); + this->send_buffer[length++] = ((this->keepAlive) >> 8); + this->send_buffer[length++] = ((this->keepAlive) & 0xFF); - CHECK_STRING_LENGTH(length,id) - length = writeString(id,this->receive_buffer,length); + CHECK_SEND_STRING_LENGTH(length,id) + length = writeString(id,this->send_buffer,length); if (willTopic) { - CHECK_STRING_LENGTH(length,willTopic) - length = writeString(willTopic,this->receive_buffer,length); - CHECK_STRING_LENGTH(length,willMessage) - length = writeString(willMessage,this->receive_buffer,length); + CHECK_SEND_STRING_LENGTH(length,willTopic) + length = writeString(willTopic,this->send_buffer,length); + CHECK_SEND_STRING_LENGTH(length,willMessage) + length = writeString(willMessage,this->send_buffer,length); } if(user != NULL) { - CHECK_STRING_LENGTH(length,user) - length = writeString(user,this->receive_buffer,length); + CHECK_SEND_STRING_LENGTH(length,user) + length = writeString(user,this->send_buffer,length); if(pass != NULL) { - CHECK_STRING_LENGTH(length,pass) - length = writeString(pass,this->receive_buffer,length); // uschindler: patched + CHECK_SEND_STRING_LENGTH(length,pass) + length = writeString(pass,this->send_buffer,length); } } - write(MQTTCONNECT,this->receive_buffer,length-MQTT_MAX_HEADER_SIZE); + write(MQTTCONNECT,this->send_buffer,length-MQTT_MAX_HEADER_SIZE); lastInActivity = lastOutActivity = millis(); - pingOutstanding = false; // uschindler: patched + pingOutstanding = false; while (!_client->available()) { unsigned long t = millis(); @@ -270,7 +284,6 @@ boolean PubSubClient::connect(const char *id, const char *user, const char *pass if (len == 4) { if (receive_buffer[3] == 0) { lastInActivity = millis(); - // uschindler: removed unnecessary code _state = MQTT_CONNECTED; return true; } else { @@ -356,14 +369,14 @@ uint32_t PubSubClient::readPacket(uint8_t* lengthLength) { } } - if (len < this->bufferSize) { + if (len < this->receiveBufferSize) { this->receive_buffer[len] = digit; len++; } idx++; } - if (!this->stream && idx > this->bufferSize) { + if (!this->stream && idx > this->receiveBufferSize) { len = 0; // This will cause the packet to be ignored. } return len; @@ -386,7 +399,7 @@ boolean PubSubClient::loop_read() { uint8_t type = receive_buffer[0]&0xF0; switch(type) { - case MQTTPUBLISH: + case MQTTPUBLISH: { if (callback) { const boolean msgId_present = (receive_buffer[0]&0x06) == MQTTQOS1; @@ -395,7 +408,7 @@ boolean PubSubClient::loop_read() { const uint16_t topic_offset = tl_offset+2; const uint16_t msgId_offset = topic_offset+tl; const uint16_t payload_offset = msgId_present ? msgId_offset+2 : msgId_offset; - if ((payload_offset) >= this->bufferSize) {return false;} + if ((payload_offset) >= this->receiveBufferSize) {return false;} if (len < payload_offset) {return false;} memmove(receive_buffer+topic_offset-1,receive_buffer+topic_offset,tl); /* move topic inside receive_buffer 1 byte to front */ receive_buffer[topic_offset-1+tl] = 0; /* end the topic as a 'C' string with \x00 */ @@ -422,19 +435,18 @@ boolean PubSubClient::loop_read() { } } break; - } + } case MQTTPINGREQ: { if (_client->connected()) { receive_buffer[0] = MQTTPINGRESP; receive_buffer[1] = 0; - // uschindler: fix keepalive if (_client->write(receive_buffer,2) != 0) { lastOutActivity = t; } } break; - } + } case MQTTPINGRESP: { pingOutstanding = false; @@ -454,7 +466,7 @@ boolean PubSubClient::loop() { if (pingOutstanding) { this->_state = MQTT_CONNECTION_TIMEOUT; _client->stop(); - pingOutstanding = false; // uschindler: patched + pingOutstanding = false; return false; } else { receive_buffer[0] = MQTTPINGREQ; @@ -462,7 +474,7 @@ boolean PubSubClient::loop() { if (_client->write(receive_buffer,2) != 0) { lastOutActivity = t; lastInActivity = t; - pingOutstanding = true; // uschindler: patched + pingOutstanding = true; } } } @@ -472,11 +484,11 @@ boolean PubSubClient::loop() { } boolean PubSubClient::publish(const char* topic, const char* payload) { - return publish(topic,(const uint8_t*)payload, payload ? strnlen(payload, this->bufferSize) : 0,false); + return publish(topic,(const uint8_t*)payload, payload ? strnlen(payload, this->sendBufferSize) : 0,false); } boolean PubSubClient::publish(const char* topic, const char* payload, boolean retained) { - return publish(topic,(const uint8_t*)payload, payload ? strnlen(payload, this->bufferSize) : 0,retained); + return publish(topic,(const uint8_t*)payload, payload ? strnlen(payload, this->sendBufferSize) : 0,retained); } boolean PubSubClient::publish(const char* topic, const uint8_t* payload, size_t plength) { @@ -485,7 +497,7 @@ boolean PubSubClient::publish(const char* topic, const uint8_t* payload, size_t boolean PubSubClient::publish(const char* topic, const uint8_t* payload, size_t plength, boolean retained) { if (connected()) { - if (this->bufferSize < MQTT_MAX_HEADER_SIZE + 2 + strnlen(topic, this->bufferSize) + plength) { + if (this->sendBufferSize < MQTT_MAX_HEADER_SIZE + 2 + strnlen(topic, this->sendBufferSize) + plength) { // Too long return false; } @@ -510,7 +522,7 @@ boolean PubSubClient::publish(const char* topic, const uint8_t* payload, size_t } boolean PubSubClient::publish_P(const char* topic, const char* payload, boolean retained) { - return publish_P(topic, (const uint8_t*)payload, payload ? strnlen_P(payload, this->bufferSize) : 0, retained); + return publish_P(topic, (const uint8_t*)payload, payload ? strnlen_P(payload, this->sendBufferSize) : 0, retained); } boolean PubSubClient::publish_P(const char* topic, const uint8_t* payload, size_t plength, boolean retained) { @@ -528,7 +540,7 @@ boolean PubSubClient::publish_P(const char* topic, const uint8_t* payload, size_ return false; } - tlen = strnlen(topic, this->bufferSize); + tlen = strnlen(topic, this->sendBufferSize); header = MQTTPUBLISH; if (retained) { @@ -631,7 +643,6 @@ boolean PubSubClient::write(uint8_t header, uint8_t* buf, uint16_t length) { result = (rc == bytesToWrite); bytesRemaining -= rc; writeBuf += rc; - // uschindler: fix keepalive if (rc != 0) { lastOutActivity = millis(); } @@ -649,14 +660,14 @@ boolean PubSubClient::subscribe(const char* topic) { } boolean PubSubClient::subscribe(const char* topic, uint8_t qos) { - size_t topicLength = strnlen(topic, this->bufferSize); + size_t topicLength = strnlen(topic, this->sendBufferSize); if (topic == 0) { return false; } if (qos > 1) { return false; } - if (this->bufferSize < 9 + topicLength) { + if (this->sendBufferSize < 9 + topicLength) { // Too long return false; } @@ -677,11 +688,11 @@ boolean PubSubClient::subscribe(const char* topic, uint8_t qos) { } boolean PubSubClient::unsubscribe(const char* topic) { - size_t topicLength = strnlen(topic, this->bufferSize); + size_t topicLength = strnlen(topic, this->sendBufferSize); if (topic == 0) { return false; } - if (this->bufferSize < 9 + topicLength) { + if (this->sendBufferSize < 9 + topicLength) { // Too long return false; } @@ -707,7 +718,7 @@ void PubSubClient::disconnect() { _client->flush(); _client->stop(); lastInActivity = lastOutActivity = millis(); - pingOutstanding = false; // uschindler: fix keepalive + pingOutstanding = false; } uint16_t PubSubClient::writeString(const char* string, uint8_t* buf, uint16_t pos) { @@ -780,34 +791,42 @@ int PubSubClient::state() { return this->_state; } -boolean PubSubClient::setBufferSize(uint16_t size) { - if (size == 0) { +boolean PubSubClient::setBufferSize(uint16_t receive_size, uint16_t send_size ) { + if (receive_size == 0 || send_size == 0) { // Cannot set it back to 0 return false; } - if (this->bufferSize == 0) { - this->receive_buffer = (uint8_t*)malloc(size); - this->send_buffer = (uint8_t*)malloc(size); + if (this->sendBufferSize == 0) { + this->send_buffer = (uint8_t*)malloc(send_size); } else { - uint8_t* newBuffer = (uint8_t*)realloc(this->receive_buffer, size); + uint8_t* newBuffer = (uint8_t*)realloc(this->send_buffer, send_size); if (newBuffer != NULL) { - this->receive_buffer = newBuffer; + this->send_buffer = newBuffer; } else { return false; } - newBuffer = (uint8_t*)realloc(this->send_buffer, size); + } + this->sendBufferSize = send_size; + if (this->receiveBufferSize == 0) { + this->receive_buffer = (uint8_t*)malloc(receive_size); + } else { + uint8_t* newBuffer = (uint8_t*)realloc(this->receive_buffer, receive_size); if (newBuffer != NULL) { - this->send_buffer = newBuffer; + this->receive_buffer = newBuffer; } else { return false; } } - this->bufferSize = size; + this->receiveBufferSize = receive_size; return (this->receive_buffer != NULL) && (this->send_buffer != NULL); } -uint16_t PubSubClient::getBufferSize() { - return this->bufferSize; +uint16_t PubSubClient::getSendBufferSize() { + return this->sendBufferSize; +} + +uint16_t PubSubClient::getReceiveBufferSize() { + return this->receiveBufferSize; } PubSubClient& PubSubClient::setKeepAlive(uint16_t keepAlive) { diff --git a/BSB_LAN/src/PubSubClient/src/PubSubClient.h b/BSB_LAN/src/PubSubClient/src/PubSubClient.h index 5acfc5b4b..7bc05a79e 100644 --- a/BSB_LAN/src/PubSubClient/src/PubSubClient.h +++ b/BSB_LAN/src/PubSubClient/src/PubSubClient.h @@ -87,14 +87,16 @@ # define MQTT_CALLBACK_SIGNATURE void (*callback)(char*, uint8_t*, size_t) # endif -#define CHECK_STRING_LENGTH(l,s) if (l+2+strnlen(s, this->bufferSize) > this->bufferSize) {_client->stop();return false;} +#define CHECK_SEND_STRING_LENGTH(l,s) if (l+2+strnlen(s, this->sendBufferSize) > this->sendBufferSize) {_client->stop();return false;} +#define CHECK_RECEIVE_STRING_LENGTH(l,s) if (l+2+strnlen(s, this->receiveBufferSize) > this->receiveBufferSize) {_client->stop();return false;} class PubSubClient : public Print { private: Client* _client; uint8_t* receive_buffer; uint8_t* send_buffer; - uint16_t bufferSize; + uint16_t sendBufferSize; + uint16_t receiveBufferSize; uint16_t keepAlive; uint16_t socketTimeout; uint16_t nextMsgId; @@ -144,8 +146,9 @@ class PubSubClient : public Print { PubSubClient& setKeepAlive(uint16_t keepAlive); PubSubClient& setSocketTimeout(uint16_t timeout); - boolean setBufferSize(uint16_t size); - uint16_t getBufferSize(); + boolean setBufferSize(uint16_t receive_size, uint16_t send_size); + uint16_t getSendBufferSize(); + uint16_t getReceiveBufferSize(); boolean connect(const char* id); boolean connect(const char* id, const char* user, const char* pass);