Skip to content

Commit

Permalink
Merge pull request #1944 from OneSignal/deadlock-between-model-and-su…
Browse files Browse the repository at this point in the history
…bscriber

Deadlock and concurrent modification related to Model.data
  • Loading branch information
jinliu9508 authored Jan 10, 2024
2 parents 52987ab + 4f0d3d1 commit 42d19e0
Show file tree
Hide file tree
Showing 3 changed files with 173 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ open class Model(
* specified, must also specify [_parentModel]
*/
private val _parentProperty: String? = null,
private val initializationLock: Any = Any(),
) : IEventNotifier<IModelChangedHandler> {
/**
* A unique identifier for this model.
Expand Down Expand Up @@ -80,33 +79,43 @@ open class Model(
* @param jsonObject The [JSONObject] to initialize this model from.
*/
fun initializeFromJson(jsonObject: JSONObject) {
data.clear()
for (property in jsonObject.keys()) {
val jsonValue = jsonObject.get(property)
if (jsonValue is JSONObject) {
val childModel = createModelForProperty(property, jsonValue)
if (childModel != null) {
data[property] = childModel
}
} else if (jsonValue is JSONArray) {
val listOfItems = createListForProperty(property, jsonValue)
if (listOfItems != null) {
data[property] = listOfItems
}
} else {
val method = this.javaClass.methods.firstOrNull { it.returnType != Void::class.java && it.name.contains(property, true) }

if (method == null) {
data[property] = jsonObject.get(property)
synchronized(data) {
data.clear()
for (property in jsonObject.keys()) {
val jsonValue = jsonObject.get(property)
if (jsonValue is JSONObject) {
val childModel = createModelForProperty(property, jsonValue)
if (childModel != null) {
data[property] = childModel
}
} else if (jsonValue is JSONArray) {
val listOfItems = createListForProperty(property, jsonValue)
if (listOfItems != null) {
data[property] = listOfItems
}
} else {
when (method.returnType) {
Double::class.java, java.lang.Double::class.java -> data[property] = jsonObject.getDouble(property)
Long::class.java, java.lang.Long::class.java -> data[property] = jsonObject.getLong(property)
Float::class.java, java.lang.Float::class.java -> data[property] = jsonObject.getDouble(property).toFloat()
Int::class.java, java.lang.Integer::class.java -> data[property] = jsonObject.getInt(property)
Boolean::class.java, java.lang.Boolean::class.java -> data[property] = jsonObject.getBoolean(property)
String::class.java, java.lang.String::class.java -> data[property] = jsonObject.getString(property)
else -> data[property] = jsonObject.get(property)
val method =
this.javaClass.methods.firstOrNull {
it.returnType !=
Void::class.java &&
it.name.contains(
property,
true,
)
}

if (method == null) {
data[property] = jsonObject.get(property)
} else {
when (method.returnType) {
Double::class.java, java.lang.Double::class.java -> data[property] = jsonObject.getDouble(property)
Long::class.java, java.lang.Long::class.java -> data[property] = jsonObject.getLong(property)
Float::class.java, java.lang.Float::class.java -> data[property] = jsonObject.getDouble(property).toFloat()
Int::class.java, java.lang.Integer::class.java -> data[property] = jsonObject.getInt(property)
Boolean::class.java, java.lang.Boolean::class.java -> data[property] = jsonObject.getBoolean(property)
String::class.java, java.lang.String::class.java -> data[property] = jsonObject.getString(property)
else -> data[property] = jsonObject.get(property)
}
}
}
}
Expand Down Expand Up @@ -140,7 +149,7 @@ open class Model(
newData[::id.name] = id
}

synchronized(initializationLock) {
synchronized(data) {
data.clear()
data.putAll(newData)
}
Expand Down Expand Up @@ -436,9 +445,8 @@ open class Model(
tag: String = ModelChangeTags.NORMAL,
forceChange: Boolean = false,
) {
val oldValue = data[name]
synchronized(data) {
val oldValue = data[name]

if (oldValue == value && !forceChange) {
return
}
Expand All @@ -448,9 +456,8 @@ open class Model(
} else if (data.containsKey(name)) {
data.remove(name)
}

notifyChanged(name, name, tag, oldValue, value)
}
notifyChanged(name, name, tag, oldValue, value)
}

/**
Expand Down Expand Up @@ -671,13 +678,14 @@ open class Model(
* @return The resulting [JSONObject].
*/
fun toJSON(): JSONObject {
synchronized(initializationLock) {
val jsonObject = JSONObject()
val jsonObject = JSONObject()
synchronized(data) {
for (kvp in data) {
when (val value = kvp.value) {
is Model -> {
jsonObject.put(kvp.key, value.toJSON())
}

is List<*> -> {
val jsonArray = JSONArray()
for (arrayItem in value) {
Expand All @@ -689,13 +697,14 @@ open class Model(
}
jsonObject.put(kvp.key, jsonArray)
}

else -> {
jsonObject.put(kvp.key, value)
}
}
}
return jsonObject
}
return jsonObject
}

override fun subscribe(handler: IModelChangedHandler) = changeNotifier.subscribe(handler)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,11 +87,8 @@ abstract class ModelStore<TModel>(
args: ModelChangedArgs,
tag: String,
) {
synchronized(models) {
persist()

changeSubscription.fire { it.onModelUpdated(args, tag) }
}
persist()
changeSubscription.fire { it.onModelUpdated(args, tag) }
}

override fun replaceAll(
Expand All @@ -108,17 +105,16 @@ abstract class ModelStore<TModel>(
}

override fun clear(tag: String) {
val localList = models.toList()
synchronized(models) {
val localList = models.toList()
models.clear()

persist()

for (item in localList) {
// no longer listen for changes to this model
item.unsubscribe(this)
changeSubscription.fire { it.onModelRemoved(item, tag) }
}
}
for (item in localList) {
// no longer listen for changes to this model
item.unsubscribe(this)
changeSubscription.fire { it.onModelRemoved(item, tag) }
}
}

Expand All @@ -138,9 +134,8 @@ abstract class ModelStore<TModel>(
model.subscribe(this)

persist()

changeSubscription.fire { it.onModelAdded(model, tag) }
}
changeSubscription.fire { it.onModelAdded(model, tag) }
}

private fun removeItem(
Expand All @@ -154,9 +149,8 @@ abstract class ModelStore<TModel>(
model.unsubscribe(this)

persist()

changeSubscription.fire { it.onModelRemoved(model, tag) }
}
changeSubscription.fire { it.onModelRemoved(model, tag) }
}

protected fun load() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
package com.onesignal.common

import com.onesignal.common.events.EventProducer
import com.onesignal.common.modeling.IModelChangedHandler
import com.onesignal.common.modeling.IModelStoreChangeHandler
import com.onesignal.common.modeling.ModelChangedArgs
import com.onesignal.mocks.MockHelper
import com.onesignal.mocks.MockPreferencesService
import com.onesignal.user.internal.subscriptions.SubscriptionModel
import com.onesignal.user.internal.subscriptions.SubscriptionModelStore
import io.kotest.core.spec.style.FunSpec
import io.kotest.runner.junit4.KotestTestRunner
import junit.framework.TestCase
import org.junit.runner.RunWith

@RunWith(KotestTestRunner::class)
class ModelingTests : FunSpec({

test("Deadlock related to Model.setOptAnyProperty") {
// Given
val modelStore = MockHelper.configModelStore()
val model = modelStore.model

val t1 =
Thread {
// acquire "model.data", then trigger the onChanged event
model.setOptAnyProperty("key1", "value1")
}

val t2 =
Thread {
// acquire "model.initializationLock", then wait for "model.data" to be released
model.initializeFromModel("", MockHelper.configModelStore().model)
}

model.subscribe(
object : IModelChangedHandler {
// will be executed in t1
override fun onChanged(
args: ModelChangedArgs,
tag: String,
) {
Thread.sleep(200)
// waiting for "model.initializationLock"
model.toJSON()
}
},
)

t1.start()
t2.start()

// Set 1s timeout for t2 to complete the task
t2.join(1000)

// verify if the thread has been successfully terminated
TestCase.assertEquals(Thread.State.TERMINATED, t2.state)
}

test("Deadlock related to ModelSstore add() or remove()") {
// Given
val modelStore = SubscriptionModelStore(MockPreferencesService())
val event = EventProducer<SubscriptionModel>()
val oldSubscriptionModel = SubscriptionModel()
val newSubscriptionModel = SubscriptionModel()
oldSubscriptionModel.id = "oldModel"
newSubscriptionModel.id = "newModel"
modelStore.add(oldSubscriptionModel)

val t1 =
Thread {
// acquire "ModelStore.models", then trigger the onChanged event
System.out.println("1")
modelStore.add(newSubscriptionModel)
}

val t2 =
Thread {
System.out.println("2")
// acquire "model.data", then wait for "ModelStore.models"
newSubscriptionModel.toJSON()
}

modelStore.subscribe(
object : IModelStoreChangeHandler<SubscriptionModel> {
override fun onModelAdded(
model: SubscriptionModel,
tag: String,
) {
// waiting for "model.data"
model.initializeFromModel("", MockHelper.configModelStore().model)
}

override fun onModelUpdated(
args: ModelChangedArgs,
tag: String,
) {
// left empty in purpose
}

override fun onModelRemoved(
model: SubscriptionModel,
tag: String,
) {
// left empty in purpose
}
},
)

t1.start()
t2.start()

// Set 1s timeout for t2 to complete the task
t2.join(1000)

// verify if the thread has been successfully terminated
TestCase.assertEquals(Thread.State.TERMINATED, t2.state)
}
})

0 comments on commit 42d19e0

Please sign in to comment.