Skip to content

Commit

Permalink
Feature/auto reconnect ws (#166)
Browse files Browse the repository at this point in the history
* - 3 states of connectivity implementation and usage for trusted node with reconnection on connectivity recovered

* - reconnect services on reconnection for a resilient experience
  • Loading branch information
rodvar authored Jan 17, 2025
1 parent 330de78 commit 04f3528
Show file tree
Hide file tree
Showing 9 changed files with 132 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class ClientApplicationBootstrapFacade(
// } else {
setProgress(0.5f)
setState("Connecting to Trusted Node..")
if (!trustedNodeService.isConnected()) {
if (!trustedNodeService.isConnected) {
try {
trustedNodeService.connect()
setState("bootstrap.connectedToTrustedNode".i18n())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ import io.ktor.websocket.close
import io.ktor.websocket.readText
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Mutex
Expand All @@ -36,40 +39,77 @@ class WebSocketClient(
val port: Int
) : Logging {

companion object {
const val DELAY_TO_RECONNECT = 3000L
}

private val webSocketUrl: String = "ws://$host:$port/websocket"
private var session: DefaultClientWebSocketSession? = null
var isConnected = false
private val webSocketEventObservers = ConcurrentMap<String, WebSocketEventObserver>()
private val requestResponseHandlers = mutableMapOf<String, RequestResponseHandler>()
private var connectionReady = CompletableDeferred<Boolean>()
private val requestResponseHandlersMutex = Mutex()

suspend fun connect() {
private val backgroundScope = CoroutineScope(BackgroundDispatcher)

enum class WebSockectClientStatus {
DISCONNECTED,
CONNECTING,
CONNECTED
}

val _connected = MutableStateFlow(WebSockectClientStatus.DISCONNECTED)
val connected: StateFlow<WebSockectClientStatus> = _connected

fun isConnected(): Boolean = connected.value == WebSockectClientStatus.CONNECTED

suspend fun connect(isTest: Boolean = false) {
log.i("Connecting to websocket at: $webSocketUrl")
if (!isConnected) {
if (connected.value != WebSockectClientStatus.CONNECTED) {
try {
_connected.value = WebSockectClientStatus.CONNECTING
session = httpClient.webSocketSession { url(webSocketUrl) }
if (session?.isActive == true) {
isConnected = true
_connected.value = WebSockectClientStatus.CONNECTED
CoroutineScope(BackgroundDispatcher).launch { startListening() }
connectionReady.complete(true)
if (!isTest) {
log.d { "Websocket connected" }
}
}
} catch (e: Exception) {
log.e("Connecting websocket failed", e)
throw e
_connected.value = WebSockectClientStatus.DISCONNECTED
if (isTest) {
throw e
} else {
reconnect()
}
}
}
}

suspend fun disconnect() {
suspend fun disconnect(isTest: Boolean = false) {
requestResponseHandlersMutex.withLock {
requestResponseHandlers.values.forEach { it.dispose() }
requestResponseHandlers.clear()
}

session?.close()
session = null
isConnected = false
_connected.value = WebSockectClientStatus.DISCONNECTED
if (!isTest) {
log.d { "WS disconnected" }
}
}

private fun reconnect() {
backgroundScope.launch {
log.d { "Launching reconnect" }
disconnect()
delay(DELAY_TO_RECONNECT) // Delay before reconnecting
connect() // Try reconnecting recursively
}
}

// Blocking request until we get the associated response
Expand Down Expand Up @@ -133,21 +173,29 @@ class WebSocketClient(

private suspend fun startListening() {
session?.let { session ->
for (frame in session.incoming) {
if (frame is Frame.Text) {
val message = frame.readText()
//todo add input validation
log.d { "Received raw text $message" }
val webSocketMessage: WebSocketMessage =
json.decodeFromString(WebSocketMessage.serializer(), message)
log.i { "Received webSocketMessage $webSocketMessage" }
if (webSocketMessage is WebSocketResponse) {
onWebSocketResponse(webSocketMessage)
} else if (webSocketMessage is WebSocketEvent) {
onWebSocketEvent(webSocketMessage)
try {
for (frame in session.incoming) {
if (frame is Frame.Text) {
val message = frame.readText()
//todo add input validation
log.d { "Received raw text $message" }
val webSocketMessage: WebSocketMessage =
json.decodeFromString(WebSocketMessage.serializer(), message)
log.i { "Received webSocketMessage $webSocketMessage" }
if (webSocketMessage is WebSocketResponse) {
onWebSocketResponse(webSocketMessage)
} else if (webSocketMessage is WebSocketEvent) {
onWebSocketEvent(webSocketMessage)
}
}
}
} catch (e: Exception) {
log.e(e) { "Exception ocurred whilst listening for WS messages - triggering reconnect" }
} finally {
log.d { "Not listining for WS messages anymore" }
reconnect()
}

}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ class WebSocketClientProvider(
}
// only update if there was actually a change
if (currentClient == null || currentClient!!.host != host || currentClient!!.port != port) {
if (currentClient?.isConnected == true) {
if (currentClient?.isConnected() == true) {
currentClient?.disconnect()
}
log.d { "Websocket client updated with url $host:$port" }
Expand All @@ -70,13 +70,13 @@ class WebSocketClientProvider(
val url = "ws://$host:$port"
return try {
// if connection is refused, catch will execute returning false
client.connect()
return client.isConnected
client.connect(true)
return client.isConnected()
} catch (e: Exception) {
log.e("Error testing connection to $url: ${e.message}")
false
} finally {
client.disconnect() // Ensure the client is closed to free resources
client.disconnect(true) // Ensure the client is closed to free resources
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package network.bisq.mobile.domain.service

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.launch
import network.bisq.mobile.client.websocket.WebSocketClientProvider
import network.bisq.mobile.domain.data.BackgroundDispatcher
import network.bisq.mobile.domain.utils.Logging
Expand All @@ -12,14 +13,16 @@ import network.bisq.mobile.domain.utils.Logging
class TrustedNodeService(private val webSocketClientProvider: WebSocketClientProvider) : Logging {
private val backgroundScope = CoroutineScope(BackgroundDispatcher)

// TODO websocketClient.isConnected should be observable so that we emit
// events when disconnected and UI can react
fun isConnected() = webSocketClientProvider.get().isConnected
var isConnected: Boolean = false
var observingConnectivity = false

/**
* Connects to the trusted node, throws an exception if connection fails
*/
suspend fun connect() {
if (!observingConnectivity) {
observeConnectivity()
}
runCatching {
webSocketClientProvider.get().connect()
}.onSuccess {
Expand All @@ -33,4 +36,14 @@ class TrustedNodeService(private val webSocketClientProvider: WebSocketClientPro
suspend fun disconnect() {
// TODO
}

private fun observeConnectivity() {
backgroundScope.launch {
webSocketClientProvider.get().connected.collect {
log.d { "connectivity status changed - connected = $it" }
isConnected = webSocketClientProvider.get().isConnected()
}
}
observingConnectivity = true
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package network.bisq.mobile.client

import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import network.bisq.mobile.client.websocket.WebSocketClientProvider
import network.bisq.mobile.domain.UrlLauncher
import network.bisq.mobile.domain.service.bootstrap.ApplicationBootstrapFacade
import network.bisq.mobile.domain.service.controller.NotificationServiceController
Expand All @@ -11,6 +14,7 @@ import network.bisq.mobile.presentation.MainPresenter

class ClientMainPresenter(
notificationServiceController: NotificationServiceController,
private val webSocketClientProvider: WebSocketClientProvider,
private val applicationBootstrapFacade: ApplicationBootstrapFacade,
private val offersServiceFacade: OffersServiceFacade,
private val marketPriceServiceFacade: MarketPriceServiceFacade,
Expand All @@ -21,7 +25,35 @@ class ClientMainPresenter(

override fun onViewAttached() {
super.onViewAttached()
activateServices()
listenForConnectivity()
}

override fun onViewUnattaching() {
// For Tor we might want to leave it running while in background to avoid delay of re-connect
// when going into foreground again.
// coroutineScope.launch { webSocketClient.disconnect() }
deactivateServices()
super.onViewUnattaching()
}

private fun listenForConnectivity() {
backgroundScope.launch {
webSocketClientProvider.get().connected.collect {
if (webSocketClientProvider.get().isConnected()) {
log.d { "connectivity status changed to $it - reconnecting services" }
reactiveServices()
}
}
}
}

private fun reactiveServices() {
deactivateServices()
activateServices()
}

private fun activateServices() {
runCatching {
applicationBootstrapFacade.activate()
offersServiceFacade.activate()
Expand All @@ -35,16 +67,11 @@ class ClientMainPresenter(
}
}

override fun onViewUnattaching() {
// For Tor we might want to leave it running while in background to avoid delay of re-connect
// when going into foreground again.
// coroutineScope.launch { webSocketClient.disconnect() }

private fun deactivateServices() {
applicationBootstrapFacade.deactivate()
offersServiceFacade.deactivate()
marketPriceServiceFacade.deactivate()
tradesServiceFacade.deactivate()
settingsServiceFacade.deactivate()
super.onViewUnattaching()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ import org.koin.dsl.bind
import org.koin.dsl.module

val presentationModule = module {
single<MainPresenter> { ClientMainPresenter(get(), get(), get(), get(), get(), get(), get()) } bind AppPresenter::class
single<MainPresenter> { ClientMainPresenter(get(), get(), get(), get(), get(), get(), get(), get()) } bind AppPresenter::class

single<TopBarPresenter> { TopBarPresenter(get(), get()) } bind ITopBarPresenter::class

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ class InterruptedTradePresenter(
var reportToMediatorButtonVisible: Boolean = false

override fun onViewAttached() {
super.onViewAttached()
require(tradesServiceFacade.selectedTrade.value != null)
val openTradeItemModel = tradesServiceFacade.selectedTrade.value!!
presenterScope.launch {
Expand All @@ -46,6 +47,7 @@ class InterruptedTradePresenter(

override fun onViewUnattaching() {
reset()
super.onViewUnattaching()
}

private fun tradeStateChanged(state: BisqEasyTradeStateEnum?) {
Expand Down Expand Up @@ -138,9 +140,10 @@ class InterruptedTradePresenter(

fun onCloseTrade() {
backgroundScope.launch {
require(selectedTrade.value != null)
tradesServiceFacade.closeTrade()
navigateToTab(Routes.TabOpenTradeList)
if (selectedTrade.value != null) {
tradesServiceFacade.closeTrade()
}
navigateBack()
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,9 +171,9 @@ class TradeDetailsHeaderPresenter(
}
}

fun closeWorkflow() {
// doing a shark navigateBack causes white broken UI screen
navigateToTab(Routes.TabOpenTradeList)
private fun closeWorkflow() {
// Do not navigate, close button on the same screen does it
// navigateBack()
}

private fun reset() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ class TradeFlowPresenter(
_tradePhaseState.value = TradePhaseState.INIT
isSeller = false
isMainChain = false
super.onViewUnattaching()
}

private fun tradeStateChanged(state: BisqEasyTradeStateEnum?) {
Expand Down

0 comments on commit 04f3528

Please sign in to comment.