Skip to content

Commit

Permalink
Put the ably-java client library behind an interface
Browse files Browse the repository at this point in the history
The DefaultAbly class already has quite a bit of logic (e.g. retries) and I
intend to add further logic to it. It would be good to be able to unit test
this class. It sounds like we haven’t done this so far because we didn’t want
to try and mock the AblyRealtime class. But I think we need to be able to unit
test DefaultAbly (which I’ve created issue #869 for), so I’m putting the
ably-java library behind a set of AblySDK* interfaces. This will allow us to
provide mock implementations of the Ably client library when testing
DefaultAbly.

(I’ve taken this approach from the asset-tracking-swift codebase.)

One thing worth noticing is the new use of the !! operator when extracting the
`reason` from a connection that’s in a failed state. This looks like it’s
introducing a new unsafe behaviour, but it’s actually just making explicit a
previous assumption, namely that when a connection is in a failed state its
`reason` will be non-null. The previous version of the code - in which the
connection’s `reason` was of platform type ErrorInfo! – would equally have
thrown a NullPointerException if this assumption were violated.
  • Loading branch information
lawrence-forooghian committed Dec 28, 2022
1 parent 63c5295 commit 36a33be
Show file tree
Hide file tree
Showing 5 changed files with 284 additions and 29 deletions.
56 changes: 29 additions & 27 deletions common/src/main/java/com/ably/tracking/common/Ably.kt
Original file line number Diff line number Diff line change
Expand Up @@ -249,11 +249,12 @@ class DefaultAbly
* @throws ConnectionException if something goes wrong during Ably SDK initialization.
*/
constructor(
realtimeFactory: AblySDKRealtimeFactory,
connectionConfiguration: ConnectionConfiguration,
private val logHandler: LogHandler?,
) : Ably {
private val gson = Gson()
private val ably: AblyRealtime
private val ably: AblySDKRealtime
private val scope = CoroutineScope(Dispatchers.IO + SupervisorJob())
private val TAG = createLoggingTag(this)

Expand All @@ -275,7 +276,7 @@ constructor(
)
}
}
ably = AblyRealtime(clientOptions)
ably = realtimeFactory.create(clientOptions)
} catch (exception: AblyException) {
throw exception.errorInfo.toTrackingException().also {
logHandler?.w("$TAG Failed to create an Ably instance", it)
Expand Down Expand Up @@ -314,7 +315,7 @@ constructor(
* a new auth token is requested and the operation is retried once more.
* @throws ConnectionException if something goes wrong or the retry fails
*/
private suspend fun enterChannelPresence(channel: Channel, presenceData: PresenceData) {
private suspend fun enterChannelPresence(channel: AblySDKRealtime.Channel, presenceData: PresenceData) {
try {
channel.enterPresenceSuspending(presenceData)
} catch (connectionException: ConnectionException) {
Expand Down Expand Up @@ -439,14 +440,14 @@ constructor(
}
}

private suspend fun tryDisconnectChannel(channelToRemove: Channel, presenceData: PresenceData) =
private suspend fun tryDisconnectChannel(channelToRemove: AblySDKRealtime.Channel, presenceData: PresenceData) =
try {
disconnectChannel(channelToRemove, presenceData)
} catch (exception: Exception) {
// no-op
}

private suspend fun disconnectChannel(channelToRemove: Channel, presenceData: PresenceData) {
private suspend fun disconnectChannel(channelToRemove: AblySDKRealtime.Channel, presenceData: PresenceData) {
retryChannelOperationIfConnectionResumeFails(channelToRemove) { channel ->
leavePresence(channel, presenceData)
channel.unsubscribe()
Expand All @@ -455,7 +456,7 @@ constructor(
}
}

private suspend fun failChannel(channel: Channel, presenceData: PresenceData, errorInfo: ErrorInfo) {
private suspend fun failChannel(channel: AblySDKRealtime.Channel, presenceData: PresenceData, errorInfo: ErrorInfo) {
leavePresence(channel, presenceData)
channel.unsubscribe()
channel.presence.unsubscribe()
Expand All @@ -464,7 +465,7 @@ constructor(
ably.channels.release(channel.name)
}

private suspend fun leavePresence(channel: Channel, presenceData: PresenceData) {
private suspend fun leavePresence(channel: AblySDKRealtime.Channel, presenceData: PresenceData) {
suspendCancellableCoroutine<Unit> { continuation ->
try {
channel.presence.leave(
Expand Down Expand Up @@ -549,7 +550,7 @@ constructor(
}
}

private fun sendMessage(channel: Channel, message: Message?, callback: (Result<Unit>) -> Unit) {
private fun sendMessage(channel: AblySDKRealtime.Channel, message: Message?, callback: (Result<Unit>) -> Unit) {
scope.launch {
try {
retryChannelOperationIfConnectionResumeFails(channel) {
Expand All @@ -563,7 +564,7 @@ constructor(
}
}

private suspend fun sendMessage(channel: Channel, message: Message?) {
private suspend fun sendMessage(channel: AblySDKRealtime.Channel, message: Message?) {
suspendCancellableCoroutine<Unit> { continuation ->
try {
channel.publish(
Expand Down Expand Up @@ -638,7 +639,7 @@ constructor(
}

private fun processReceivedLocationUpdateMessage(
channel: Channel,
channel: AblySDKRealtime.Channel,
presenceData: PresenceData,
message: Message,
listener: (LocationUpdate) -> Unit,
Expand Down Expand Up @@ -698,7 +699,7 @@ constructor(
}
}

private fun emitAllCurrentMessagesFromPresence(channel: Channel, listener: (PresenceMessage) -> Unit) {
private fun emitAllCurrentMessagesFromPresence(channel: AblySDKRealtime.Channel, listener: (PresenceMessage) -> Unit) {
getAllCurrentMessagesFromPresence(channel).forEach { presenceMessage ->
// Each message is launched in a fire-and-forget manner to not block this method on the listener() call
scope.launch { listener(presenceMessage) }
Expand All @@ -722,7 +723,7 @@ constructor(
/**
* Warning: This method might block the current thread due to the presence.get(true) call.
*/
private fun getAllCurrentMessagesFromPresence(channel: Channel): List<PresenceMessage> =
private fun getAllCurrentMessagesFromPresence(channel: AblySDKRealtime.Channel): List<PresenceMessage> =
channel.presence.get(true).mapNotNull { presenceMessage ->
presenceMessage.toTracking(gson).also {
if (it == null) {
Expand Down Expand Up @@ -751,7 +752,7 @@ constructor(
}
}

private suspend fun updatePresenceData(channel: Channel, presenceData: PresenceData) {
private suspend fun updatePresenceData(channel: AblySDKRealtime.Channel, presenceData: PresenceData) {
suspendCancellableCoroutine<Unit> { continuation ->
try {
channel.presence.update(
Expand All @@ -777,7 +778,7 @@ constructor(
}
}

private fun getChannelIfExists(trackableId: String): Channel? {
private fun getChannelIfExists(trackableId: String): AblySDKRealtime.Channel? {
val channelName = trackableId.toChannelName()
return if (ably.channels.containsKey(channelName)) {
ably.channels.get(channelName)
Expand All @@ -799,14 +800,14 @@ constructor(
}

/**
* Closes [AblyRealtime] and waits until it's either closed or failed.
* Closes [AblySDKRealtime] and waits until it's either closed or failed.
* If the connection is already closed it returns immediately.
* If the connection is already failed it returns immediately as closing a failed connection should be a no-op
* according to the Ably features spec (https://sdk.ably.com/builds/ably/specification/main/features/#state-conditions-and-operations).
*
* @throws ConnectionException if the [AblyRealtime] state changes to [ConnectionState.failed].
* @throws ConnectionException if the [AblySDKRealtime] state changes to [ConnectionState.failed].
*/
private suspend fun AblyRealtime.closeSuspending() {
private suspend fun AblySDKRealtime.closeSuspending() {
if (connection.state.isClosed() || connection.state.isFailed()) {
return
}
Expand All @@ -833,8 +834,8 @@ constructor(
* the second time the exception is rethrown no matter if it was the "connection resume" exception or not.
*/
private suspend fun retryChannelOperationIfConnectionResumeFails(
channel: Channel,
operation: suspend (Channel) -> Unit
channel: AblySDKRealtime.Channel,
operation: suspend (AblySDKRealtime.Channel) -> Unit
) {
try {
if (channel.state == ChannelState.suspended) {
Expand Down Expand Up @@ -869,7 +870,7 @@ constructor(
* If the [channel] state already is the [ChannelState.attached] state it does not wait and returns immediately.
* If this doesn't happen during the next [timeoutInMilliseconds] milliseconds, then an exception is thrown.
*/
private suspend fun waitForChannelReconnection(channel: Channel, timeoutInMilliseconds: Long = 10_000L) {
private suspend fun waitForChannelReconnection(channel: AblySDKRealtime.Channel, timeoutInMilliseconds: Long = 10_000L) {
if (channel.state.isConnected()) {
return
}
Expand Down Expand Up @@ -908,7 +909,7 @@ constructor(
* Enter the presence of the [Channel] and waits for this operation to complete.
* If something goes wrong then it throws a [ConnectionException].
*/
private suspend fun Channel.enterPresenceSuspending(presenceData: PresenceData) {
private suspend fun AblySDKRealtime.Channel.enterPresenceSuspending(presenceData: PresenceData) {
suspendCancellableCoroutine<Unit> { continuation ->
try {
presence.enter(
Expand All @@ -935,10 +936,10 @@ constructor(
}

/**
* Attaches the [Channel] and waits for this operation to complete.
* Attaches the [AblySDKRealtime.Channel] and waits for this operation to complete.
* If something goes wrong then it throws a [ConnectionException].
*/
private suspend fun Channel.attachSuspending() {
private suspend fun AblySDKRealtime.Channel.attachSuspending() {
suspendCancellableCoroutine<Unit> { continuation ->
try {
attach(object : CompletionListener {
Expand All @@ -961,7 +962,7 @@ constructor(
}
}

private fun Channel.isDetachedOrFailed(): Boolean =
private fun AblySDKRealtime.Channel.isDetachedOrFailed(): Boolean =
state == ChannelState.detached || state == ChannelState.failed

private fun createMalformedMessageErrorInfo(): ErrorInfo = ErrorInfo("Received a malformed message", 100_001, 400)
Expand All @@ -987,19 +988,20 @@ constructor(
}

/**
* A suspending version of the [AblyRealtime.connect] method. It will begin connecting and wait until it's connected.
* A suspending version of the [AblySDKRealtime.connect] method. It will begin connecting and wait until it's connected.
* If the connection enters the "failed" state it will throw a [ConnectionException].
* If the operation doesn't complete in [timeoutInMilliseconds] it will throw a [ConnectionException].
* If the instance is already connected it will finish immediately.
* If the connection is already failed it throws a [ConnectionException].
*
* @throws ConnectionException if something goes wrong.
*/
private suspend fun AblyRealtime.connectSuspending(timeoutInMilliseconds: Long = 10_000L) {
private suspend fun AblySDKRealtime.connectSuspending(timeoutInMilliseconds: Long = 10_000L) {
if (connection.state.isConnected()) {
return
} else if (connection.state.isFailed()) {
throw connection.reason.toTrackingException()
// We expect connection.reason to be non-null if the connection is in a failed state
throw connection.reason!!.toTrackingException()
}
try {
withTimeout(timeoutInMilliseconds) {
Expand Down
81 changes: 81 additions & 0 deletions common/src/main/java/com/ably/tracking/common/AblySDK.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package com.ably.tracking.common

import io.ably.lib.realtime.Channel.MessageListener
import io.ably.lib.realtime.ChannelState
import io.ably.lib.realtime.ChannelStateListener
import io.ably.lib.realtime.CompletionListener
import io.ably.lib.realtime.ConnectionState
import io.ably.lib.realtime.ConnectionStateListener
import io.ably.lib.realtime.Presence.PresenceListener
import io.ably.lib.rest.Auth.RenewAuthResult
import io.ably.lib.types.ChannelOptions
import io.ably.lib.types.ClientOptions
import io.ably.lib.types.ErrorInfo
import io.ably.lib.types.Message
import io.ably.lib.types.PresenceMessage

interface AblySDKRealtimeFactory {
fun create(clientOptions: ClientOptions): AblySDKRealtime
}

/**
* An set of interfaces that represent the parts of the Ably client library which are used by the Ably Asset Tracking SDKs.
*
* These exist so that we can mock out the Ably client library when testing the [DefaultAbly] class. The interfaces here are more or less a direct copy of the corresponding `ably-java` classes, and are expected to exhibit the same behaviour. They do remove some details that would be overkill here, such as class hierarchies and interface conformances, instead opting to add these inherited properties and methods directly on the interfaces.
*
* The aim here is _not_ to remove the usage of all `ably-java` types; we continue using types from that codebase if they are just interfaces or are easy to construct. We just intend to use interfaces to replace `ably-java` classes that would be hard to mock.
*
* `ably-java` doesn’t currently have nullability annotations (see [issue #639](https://github.com/ably/ably-java/issues/639) there), so when writing these interfaces we need to make our own judgements about nullability, based on our knowledge of the behaviour of the Ably client libraries.
*/
interface AblySDKRealtime {
val auth: Auth
val connection: Connection
val channels: Channels

fun connect()
fun close()

interface Auth {
fun renewAuth(result: RenewAuthResult)
}

interface Channel {
val name: String
val state: ChannelState
val presence: Presence

fun attach(listener: CompletionListener)
fun publish(message: Message?, listener: CompletionListener)
fun on(listener: ChannelStateListener)
fun off(listener: ChannelStateListener)
fun off()
fun subscribe(name: String, listener: MessageListener)
fun unsubscribe()
fun setConnectionFailed(reason: ErrorInfo)
}

interface Presence {
fun subscribe(listener: PresenceListener)
fun unsubscribe()
fun get(wait: Boolean): Array<PresenceMessage>
fun enter(data: Any, listener: CompletionListener)
fun update(data: Any, listener: CompletionListener)
fun leave(data: Any, listener: CompletionListener)
}

interface Channels {
fun get(channelName: String, channelOptions: ChannelOptions?): Channel
fun get(channelName: String): Channel
fun entrySet(): Iterable<Map.Entry<String, Channel>>
fun containsKey(key: Any): Boolean
fun release(channelName: String)
}

interface Connection {
val state: ConnectionState
val reason: ErrorInfo?

fun on(listener: ConnectionStateListener)
fun off(listener: ConnectionStateListener)
}
}
Loading

0 comments on commit 36a33be

Please sign in to comment.