Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce reconnection handling for wireless mbus bridges. #89

Merged
merged 1 commit into from
Jul 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,13 @@
*/
package org.connectorio.addons.binding.wmbus.internal.handler;

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import org.connectorio.addons.binding.handler.GenericBridgeHandlerBase;
import org.connectorio.addons.binding.wmbus.dispatch.WMBusMessageDispatcher;
import org.connectorio.addons.binding.wmbus.internal.KeyStore;
Expand All @@ -30,10 +34,8 @@
import org.connectorio.addons.binding.wmbus.internal.security.MutableKeyStore;
import org.openhab.core.thing.Bridge;
import org.openhab.core.thing.ChannelUID;
import org.openhab.core.thing.Thing;
import org.openhab.core.thing.ThingStatus;
import org.openhab.core.thing.ThingStatusDetail;
import org.openhab.core.thing.binding.ThingHandler;
import org.openhab.core.thing.binding.ThingHandlerService;
import org.openhab.core.types.Command;
import org.openmuc.jmbus.wireless.WMBusConnection;
Expand All @@ -58,19 +60,45 @@ public WMBusBridgeBaseHandler(Bridge bridge, DiscoveryCoordinator discoveryCoord
@Override
public void initialize() {
DefaultWMBusMessageDispatcher messageDispatcher = new DefaultWMBusMessageDispatcher();
connection = initializeConnection(messageDispatcher);
connection.whenComplete((connection, error) -> {
BiConsumer<WMBusConnection, Throwable> callback = (connection, error) -> {
if (error != null) {
logger.error("Could not open serial connection", error);
updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR, "Could not open port");
return;
}
updateStatus(ThingStatus.ONLINE);
dispatcher.complete(messageDispatcher);
};

open(callback, messageDispatcher, new Consumer<>() {
@Override
public void accept(IOException e) {
updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR, "Error in receiver thread " + e.getMessage());
WMBusConnection connection = WMBusBridgeBaseHandler.this.connection.getNow(null);
if (connection != null) {
try {
connection.close();
} catch (IOException ex) {
logger.warn("Failed to close connection", e);
}
}

open(callback, messageDispatcher, this);
}
});
}

protected abstract CompletableFuture<WMBusConnection> initializeConnection(WMBusMessageDispatcher dispatcher);
private void open(BiConsumer<WMBusConnection, Throwable> callback, WMBusMessageDispatcher dispatcher, Consumer<IOException> reconnect) {
scheduler.schedule(new Runnable() {
@Override
public void run() {
connection = initializeConnection(dispatcher, reconnect);
connection.whenComplete(callback);
}
}, 5L, TimeUnit.SECONDS);
}

protected abstract CompletableFuture<WMBusConnection> initializeConnection(WMBusMessageDispatcher dispatcher, Consumer<IOException> reconnect);

@Override
public void dispose() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@
*/
package org.connectorio.addons.binding.wmbus.internal.handler;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import org.connectorio.addons.binding.wmbus.dispatch.WMBusMessageDispatcher;
import org.connectorio.addons.binding.wmbus.internal.config.SerialBridgeConfig;
import org.connectorio.addons.binding.wmbus.internal.discovery.DiscoveryCoordinator;
Expand All @@ -39,7 +41,8 @@ public WMBusSerialBridgeHandler(Bridge bridge, SerialPortManager serialPortManag
}

@Override
protected CompletableFuture<WMBusConnection> initializeConnection(WMBusMessageDispatcher dispatcher) {
protected CompletableFuture<WMBusConnection> initializeConnection(WMBusMessageDispatcher dispatcher,
Consumer<IOException> reconnect) {
CompletableFuture<WMBusConnection> connection = new CompletableFuture<>();
SerialBridgeConfig config = getConfigAs(SerialBridgeConfig.class);

Expand All @@ -60,9 +63,7 @@ protected CompletableFuture<WMBusConnection> initializeConnection(WMBusMessageDi
@Override
public void run() {
SerialBridgeConfig config = getConfigAs(SerialBridgeConfig.class);
WMBusMessageListenerAdapter listener = new WMBusMessageListenerAdapter(dispatcher, (message) -> {
updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR, message);
});
WMBusMessageListenerAdapter listener = new WMBusMessageListenerAdapter(dispatcher, reconnect);
SerialTransportBuilder builder = new SerialTransportBuilder(serialPortManager, config.manufacturer, listener, config.serialPort);
builder.setMode(config.mode);
builder.setSerialPortConfig(config);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,13 @@
*/
package org.connectorio.addons.binding.wmbus.internal.handler;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import org.connectorio.addons.binding.wmbus.dispatch.WMBusMessageDispatcher;
import org.connectorio.addons.binding.wmbus.internal.config.SerialBridgeConfig;
import org.connectorio.addons.binding.wmbus.internal.discovery.DiscoveryCoordinator;
import org.connectorio.addons.binding.wmbus.internal.transport.WMBusMessageListenerAdapter;
import org.openhab.core.io.transport.serial.SerialPortManager;
import org.openhab.core.thing.Bridge;
import org.openhab.core.thing.ThingStatus;
import org.openhab.core.thing.ThingStatusDetail;
Expand All @@ -38,7 +39,8 @@ public WMBusSerialJrxtxBridgeHandler(Bridge bridge, DiscoveryCoordinator discove
}

@Override
protected CompletableFuture<WMBusConnection> initializeConnection(WMBusMessageDispatcher dispatcher) {
protected CompletableFuture<WMBusConnection> initializeConnection(WMBusMessageDispatcher dispatcher,
Consumer<IOException> reconnect) {
CompletableFuture<WMBusConnection> connection = new CompletableFuture<>();
SerialBridgeConfig config = getConfigAs(SerialBridgeConfig.class);

Expand All @@ -59,9 +61,7 @@ protected CompletableFuture<WMBusConnection> initializeConnection(WMBusMessageDi
@Override
public void run() {
SerialBridgeConfig config = getConfigAs(SerialBridgeConfig.class);
WMBusMessageListenerAdapter listener = new WMBusMessageListenerAdapter(dispatcher, (message) -> {
updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR, message);
});
WMBusMessageListenerAdapter listener = new WMBusMessageListenerAdapter(dispatcher, reconnect);
WMBusSerialBuilder builder = new WMBusSerialBuilder(
config.manufacturer, listener, config.serialPort
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@
*/
package org.connectorio.addons.binding.wmbus.internal.handler;

import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.connectorio.addons.binding.wmbus.dispatch.WMBusMessageDispatcher;
import org.connectorio.addons.binding.wmbus.internal.config.TcpBridgeConfig;
import org.connectorio.addons.binding.wmbus.internal.discovery.DiscoveryCoordinator;
Expand All @@ -37,7 +39,7 @@ public WMBusTcpBridgeHandler(Bridge bridge, DiscoveryCoordinator discoveryCoordi
}

@Override
protected CompletableFuture<WMBusConnection> initializeConnection(WMBusMessageDispatcher dispatcher) {
protected CompletableFuture<WMBusConnection> initializeConnection(WMBusMessageDispatcher dispatcher, Consumer<IOException> reconnect) {
CompletableFuture<WMBusConnection> connection = new CompletableFuture<>();
scheduler.execute(new Runnable() {
@Override
Expand All @@ -56,9 +58,7 @@ public void run() {
return;
}

WMBusMessageListenerAdapter listener = new WMBusMessageListenerAdapter(dispatcher, (message) -> {
updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR, message);
});
WMBusMessageListenerAdapter listener = new WMBusMessageListenerAdapter(dispatcher, reconnect);
WMBusTcpBuilder builder = new WMBusTcpBuilder(
config.manufacturer, listener, config.hostAddress.trim(), config.port
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@ public class WMBusMessageListenerAdapter implements WMBusListener {

private final Logger logger = LoggerFactory.getLogger(WMBusMessageListenerAdapter.class);
private final WMBusMessageDispatcher dispatcher;
private final Consumer<String> offlineCallback;
private final Consumer<IOException> offlineCallback;

public WMBusMessageListenerAdapter(WMBusMessageDispatcher dispatcher, Consumer<String> offlineCallback) {
public WMBusMessageListenerAdapter(WMBusMessageDispatcher dispatcher, Consumer<IOException> offlineCallback) {
this.dispatcher = dispatcher;
this.offlineCallback = offlineCallback;
}
Expand All @@ -53,7 +53,7 @@ public void discardedBytes(byte[] bytes) {
public void stoppedListening(IOException cause) {
logger.error("Error while reading serial data stream", cause);

offlineCallback.accept("Error in receiver thread " + cause.getMessage());
offlineCallback.accept(cause);
}

}
Loading