Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
Rodrigo Gomez Palacio committed Aug 6, 2024
1 parent 43c7d61 commit df351c8
Show file tree
Hide file tree
Showing 11 changed files with 117 additions and 5 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package com.onesignal.common.kafka

/**
* Used for read your write consistency when fetching In-App Messages. Corresponds to conditions
* for fetching after upserting models via HTTP CREATE.
*/
class CreateOffsetsIamCondition: ICondition {
override fun isMet(offsets: Map<String, Long?>): Boolean {
// one or the other is available
return offsets["userCreateOffset"] != null || offsets["subscriptionCreateOffset"] != null
}

override fun getNewestOffset(offsets: Map<String, Long?>): Long? {
return listOfNotNull(offsets["userCreateOffset"], offsets["subscriptionCreateOffset"]).maxOrNull()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package com.onesignal.common.kafka

interface ICondition {
fun isMet(offsets: Map<String, Long?>): Boolean
fun getNewestOffset(offsets: Map<String, Long?>): Long?
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package com.onesignal.common

import com.onesignal.common.kafka.ICondition
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

/**
* Manages offsets that function as read-your-write tokens for more accurate segment membership
* calculation.
*/
class KafkaOffsetManager {

private val mutex = Mutex()
private val offsets: MutableMap<String, Long?> = mutableMapOf()
private var offsetDeferred = CompletableDeferred<Long?>()

private val conditions: MutableList<Pair<ICondition, CompletableDeferred<Long?>>> =
mutableListOf()

/**
* Set method to update the offset based on the key.
*/
suspend fun setOffset(key: String, value: Long?) {
mutex.withLock {
offsets[key] = value
checkConditionsAndComplete()
}
}

/**
* Register a condition with its corresponding deferred action. Returns a deferred condition.
*/
fun registerCondition(condition: ICondition): CompletableDeferred<Long?> {
val deferred = CompletableDeferred<Long?>()
conditions.add(condition to deferred)
checkConditionsAndComplete()
return deferred
}

private fun checkConditionsAndComplete() {
for ((condition, deferred) in conditions) {
if (condition.isMet(offsets)) {
val newestOffset = condition.getNewestOffset(offsets)
if (!deferred.isCompleted) {
deferred.complete(newestOffset)
}
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package com.onesignal.common.kafka

/**
* Used for read your write consistency when fetching In-App Messages. Corresponds to conditions
* for fetching after updating models via HTTP PATCH.
*/
class UpdateOffsetsIamCondition : ICondition {
override fun isMet(offsets: Map<String, Long?>): Boolean {
// both are available
return offsets["userUpdateOffset"] != null && offsets["subscriptionUpdateOffset"] != null
}

override fun getNewestOffset(offsets: Map<String, Long?>): Long? {
return listOfNotNull(offsets["userUpdateOffset"], offsets["subscriptionUpdateOffset"]).maxOrNull()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,9 @@ internal class OperationRepo(
private val retryWaiter = WaiterWithValue<LoopWaiterMessage>()
private var paused = false
private var coroutineScope = CoroutineScope(newSingleThreadContext(name = "OpRepo"))
private val initialized = CompletableDeferred<Unit>()
private val initialized = CompletableDeferred<Unit>() // unit = void in Kt , sort of like await
// private val kafkaOffsetForUser: Long // move to offset manager class
// private val kafkaOffsetForSubscription: Long

override suspend fun awaitInitialized() {
initialized.await()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ internal class SubscriptionBackendService(
return subscriptionJSON.getString("id")
}

// add a return value with payload including kafka offset
override suspend fun updateSubscription(
appId: String,
subscriptionId: String,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ internal class SubscriptionOperationExecutor(
private val _configModelStore: ConfigModelStore,
private val _buildUserService: IRebuildUserService,
private val _newRecordState: NewRecordsState,
private val _kafkaOffsetManager: KafkaOffsetManager,
) : IOperationExecutor {
override val operations: List<String>
get() = listOf(CREATE_SUBSCRIPTION, UPDATE_SUBSCRIPTION, DELETE_SUBSCRIPTION, TRANSFER_SUBSCRIPTION)
Expand Down Expand Up @@ -101,13 +102,15 @@ internal class SubscriptionOperationExecutor(
AndroidUtils.getAppVersion(_applicationService.appContext),
)

// need to return some sort of payload here with the offset, create a new value
val backendSubscriptionId =
_subscriptionBackend.createSubscription(
createOperation.appId,
IdentityConstants.ONESIGNAL_ID,
createOperation.onesignalId,
subscription,
) ?: return ExecutionResponse(ExecutionResult.SUCCESS)
// set the kafka offset via the manager

// update the subscription model with the new ID, if it's still active.
val subscriptionModel = _subscriptionModelStore.get(createOperation.subscriptionId)
Expand Down Expand Up @@ -175,6 +178,7 @@ internal class SubscriptionOperationExecutor(
AndroidUtils.getAppVersion(_applicationService.appContext),
)

// add a return value here and do the same
_subscriptionBackend.updateSubscription(lastOperation.appId, lastOperation.subscriptionId, subscription)
} catch (ex: BackendException) {
val responseType = NetworkUtils.getResponseStatusType(ex.statusCode)
Expand Down Expand Up @@ -216,6 +220,7 @@ internal class SubscriptionOperationExecutor(
return ExecutionResponse(ExecutionResult.SUCCESS)
}

// TODO: whenever the end-user changes users, we need to add the kafka offset here, currently no code to handle the re-fetch IAMs
private suspend fun transferSubscription(startingOperation: TransferSubscriptionOperation): ExecutionResponse {
try {
_subscriptionBackend.transferSubscription(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ internal class UpdateUserOperationExecutor(
override val operations: List<String>
get() = listOf(SET_TAG, DELETE_TAG, SET_PROPERTY, TRACK_SESSION_START, TRACK_SESSION_END, TRACK_PURCHASE)

// by this point, operations have already been deemed executable since we don't check for that here
override suspend fun execute(ops: List<Operation>): ExecutionResponse {
Logging.log(LogLevel.DEBUG, "UpdateUserOperationExecutor(operation: $ops)")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import android.app.AlertDialog
import com.onesignal.common.AndroidUtils
import com.onesignal.common.IDManager
import com.onesignal.common.JSONUtils
import com.onesignal.common.KafkaOffsetManager
import com.onesignal.common.events.EventProducer
import com.onesignal.common.exceptions.BackendException
import com.onesignal.common.modeling.ISingletonModelStoreChangeHandler
Expand Down Expand Up @@ -66,6 +67,7 @@ internal class InAppMessagesManager(
private val _lifecycle: IInAppLifecycleService,
private val _languageContext: ILanguageContext,
private val _time: ITime,
private val _kafkaOffsetManager: KafkaOffsetManager
) : IInAppMessagesManager,
IStartableService,
ISubscriptionChangedHandler,
Expand Down Expand Up @@ -199,6 +201,7 @@ internal class InAppMessagesManager(

override fun onSubscriptionRemoved(subscription: ISubscription) { }


override fun onSubscriptionChanged(
subscription: ISubscription,
args: ModelChangedArgs,
Expand Down Expand Up @@ -227,7 +230,7 @@ internal class InAppMessagesManager(
override fun onSessionEnded(duration: Long) { }

// called when a new push subscription is added, or the app id is updated, or a new session starts
private suspend fun fetchMessages() {
private suspend fun fetchMessages(kafkaOffset: Long) {
// We only want to fetch IAMs if we know the app is in the
// foreground, as we don't want to do this for background
// events (such as push received), wasting resources for
Expand All @@ -252,7 +255,7 @@ internal class InAppMessagesManager(
lastTimeFetchedIAMs = now
}

val newMessages = _backend.listInAppMessages(appId, subscriptionId)
val newMessages = _backend.listInAppMessages(appId, subscriptionId, offset)

if (newMessages != null) {
this.messages = newMessages as MutableList<InAppMessage>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,14 @@ internal interface IInAppBackendService {
*
* @param appId The ID of the application that the IAM will be retrieved from.
* @param subscriptionId The specific subscription within the [appId] the IAM will be delivered to.
* @param kafkaOffset OPTIONAL - used for read your write consistency
*
* @return The list of IAMs associated to the subscription, or null if the IAMs could not be retrieved.
*/
suspend fun listInAppMessages(
appId: String,
subscriptionId: String,
kafkaOffset: Long? = null
): List<InAppMessage>?

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,18 @@ internal class InAppBackendService(
override suspend fun listInAppMessages(
appId: String,
subscriptionId: String,
kafkaOffset: Long?
): List<InAppMessage>? {
// Construct the URL with or without the Kafka offset parameter
val url = buildString {
append("apps/$appId/subscriptions/$subscriptionId/iams")
kafkaOffset?.let {
append("?offset=$it")
}
}

// Retrieve any in app messages that might exist
val response = _httpClient.get("apps/$appId/subscriptions/$subscriptionId/iams")
val response = _httpClient.get(url)

if (response.isSuccess) {
val jsonResponse = JSONObject(response.payload)
Expand All @@ -34,7 +43,7 @@ internal class InAppBackendService(
// TODO: Outstanding question on whether we still want to cache this. Only used when
// hard start of the app, but within 30 seconds of it being killed (i.e. same session startup).
// Cache copy for quick cold starts
// _prefs.savedIAMs = iamMessagesAsJSON.toString()
// _prefs.savedIAMs = iamMessagesAsJSON.toString()
return _hydrator.hydrateIAMMessages(iamMessagesAsJSON)
}
}
Expand Down

0 comments on commit df351c8

Please sign in to comment.