Skip to content

Commit

Permalink
Do not mutate headers in transaction session
Browse files Browse the repository at this point in the history
Resolve: #542
  • Loading branch information
joffrey-bion committed Jul 28, 2024
1 parent 02b85b2 commit 137f35d
Show file tree
Hide file tree
Showing 3 changed files with 190 additions and 78 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package org.hildan.krossbow.stomp

import org.hildan.krossbow.stomp.frame.FrameBody
import org.hildan.krossbow.stomp.headers.StompSendHeaders
import org.hildan.krossbow.stomp.headers.*

/**
* A special [StompSession] that automatically fills the `transaction` header (if absent) for all SEND, ACK, and NACK
Expand All @@ -13,8 +13,12 @@ internal class TransactionStompSession(
) : StompSession by session {

override suspend fun send(headers: StompSendHeaders, body: FrameBody?): StompReceipt? {
headers.transaction = headers.transaction ?: transactionId
return session.send(headers, body)
val newHeaders = if (HeaderNames.TRANSACTION in headers) {
headers
} else {
headers.copy { this[HeaderNames.TRANSACTION] = transactionId }
}
return session.send(newHeaders, body)
}

override suspend fun ack(ackId: String, transactionId: String?) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
package org.hildan.krossbow.stomp

import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.launch
import kotlinx.coroutines.test.runTest
import kotlinx.coroutines.*
import kotlinx.coroutines.test.*
import org.hildan.krossbow.test.*
import kotlin.test.*

@OptIn(ExperimentalCoroutinesApi::class)
class StompSessionExtensionsTests {

@Test
Expand All @@ -33,80 +31,14 @@ class StompSessionExtensionsTests {
wsSession.awaitDisconnectFrameAndSimulateCompletion()
wsSession.expectClose()
}
assertFailsWith(NoSuchElementException::class) {
stompSession.use {
emptyList<Int>().first() // this fails
}
}
assertTrue(wsSession.closed, "The web socket session should be closed after the use block")
}

@Test
fun withTransaction_commitsIfSuccessful() = runTest {
val (wsSession, stompSession) = connectWithMocks()
class MyTestException : Exception()

launch {
stompSession.withTransaction { id ->
sendText("/dest", "Transaction: $id")
}
stompSession.disconnect()
}
val beginFrame = wsSession.awaitBeginFrameAndSimulateCompletion()
val transactionId = beginFrame.headers.transaction

val sendFrame = wsSession.awaitSendFrameAndSimulateCompletion()
assertEquals(transactionId, sendFrame.headers.transaction)

val commitFrame = wsSession.awaitCommitFrameAndSimulateCompletion()
assertEquals(transactionId, commitFrame.headers.transaction)

wsSession.awaitDisconnectFrameAndSimulateCompletion()
wsSession.expectClose()
}

@Test
fun withTransaction_abortsInCaseOfException() = runTest {
val (wsSession, stompSession) = connectWithMocks()

launch {
runCatching {
stompSession.withTransaction { id ->
sendText("/dest", "Transaction: $id")
emptyList<Int>().first() // this fails
}
}
stompSession.disconnect()
}
val beginFrame = wsSession.awaitBeginFrameAndSimulateCompletion()
val transactionId = beginFrame.headers.transaction

val sendFrame = wsSession.awaitSendFrameAndSimulateCompletion()
assertEquals(transactionId, sendFrame.headers.transaction)
assertEquals("Transaction: $transactionId", sendFrame.bodyAsText)

val abortFrame = wsSession.awaitAbortFrameAndSimulateCompletion()
assertEquals(transactionId, abortFrame.headers.transaction)

wsSession.awaitDisconnectFrameAndSimulateCompletion()
wsSession.expectClose()
}

@Test
fun withTransaction_abortsWithSuppressedException() = runTest {
class MyAbortException : Exception("exception during abort")

val stompSession = object : NoopStompSession() {
override suspend fun abort(transactionId: String) {
throw MyAbortException()
}
}

val ex = assertFailsWith(NoSuchElementException::class) {
stompSession.withTransaction {
emptyList<Int>().first() // this fails
assertFailsWith(MyTestException::class) {
stompSession.use {
throw MyTestException()
}
}
assertEquals(1, ex.suppressedExceptions.size)
assertIs<MyAbortException>(ex.suppressedExceptions.single())
assertTrue(wsSession.closed, "The web socket session should be closed after the use block")
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
package org.hildan.krossbow.stomp

import kotlinx.coroutines.*
import kotlinx.coroutines.test.*
import org.hildan.krossbow.stomp.frame.*
import org.hildan.krossbow.stomp.headers.*
import org.hildan.krossbow.test.*
import kotlin.test.*

class StompSessionTransactionsTests {

private class MyTestException : Exception("exception during transaction")

@Test
fun withTransaction_commitsIfSuccessful() = runTest {
val (wsSession, stompSession) = connectWithMocks()

launch {
stompSession.withTransaction { id ->
sendText("/dest", "Transaction: $id")
}
stompSession.disconnect()
}
val beginFrame = wsSession.awaitBeginFrameAndSimulateCompletion()
val transactionId = beginFrame.headers.transaction

val sendFrame = wsSession.awaitSendFrameAndSimulateCompletion()
assertEquals(transactionId, sendFrame.headers.transaction)

val commitFrame = wsSession.awaitCommitFrameAndSimulateCompletion()
assertEquals(transactionId, commitFrame.headers.transaction)

wsSession.awaitDisconnectFrameAndSimulateCompletion()
wsSession.expectClose()
}

@Test
fun withTransaction_abortsInCaseOfException() = runTest {
val (wsSession, stompSession) = connectWithMocks()

launch {
runCatching {
stompSession.withTransaction { id ->
sendText("/dest", "Transaction: $id")
throw MyTestException()
}
}
stompSession.disconnect()
}
val beginFrame = wsSession.awaitBeginFrameAndSimulateCompletion()
val transactionId = beginFrame.headers.transaction

val sendFrame = wsSession.awaitSendFrameAndSimulateCompletion()
assertEquals(transactionId, sendFrame.headers.transaction)
assertEquals("Transaction: $transactionId", sendFrame.bodyAsText)

val abortFrame = wsSession.awaitAbortFrameAndSimulateCompletion()
assertEquals(transactionId, abortFrame.headers.transaction)

wsSession.awaitDisconnectFrameAndSimulateCompletion()
wsSession.expectClose()
}

@Test
fun withTransaction_abortsWithSuppressedException() = runTest {
class MyAbortException : Exception("exception during abort")

val stompSession = object : NoopStompSession() {
override suspend fun abort(transactionId: String) {
throw MyAbortException()
}
}

val ex = assertFailsWith(MyTestException::class) {
stompSession.withTransaction {
throw MyTestException()
}
}
assertEquals(1, ex.suppressedExceptions.size)
assertIs<MyAbortException>(ex.suppressedExceptions.single())
}

@Test
fun withTransaction_nested() = runTest {
val (wsSession, stompSession) = connectWithMocks()

launch {
stompSession.withTransaction { id1 ->
sendText("/dest", "Transaction: $id1")
stompSession.withTransaction { id2 ->
sendText("/dest2", "Transaction: $id2")
}
}
stompSession.disconnect()
}
val beginFrameT1 = wsSession.awaitBeginFrameAndSimulateCompletion()
val transactionId1 = beginFrameT1.headers.transaction

val sendFrameT1 = wsSession.awaitSendFrameAndSimulateCompletion()
assertEquals(transactionId1, sendFrameT1.headers.transaction)
assertEquals("Transaction: $transactionId1", sendFrameT1.bodyAsText)

val beginFrameT2 = wsSession.awaitBeginFrameAndSimulateCompletion()
val transactionId2 = beginFrameT2.headers.transaction

val sendFrameT2 = wsSession.awaitSendFrameAndSimulateCompletion()
assertEquals(transactionId2, sendFrameT2.headers.transaction)
assertEquals("Transaction: $transactionId2", sendFrameT2.bodyAsText)

val commitFrameT2 = wsSession.awaitCommitFrameAndSimulateCompletion()
assertEquals(transactionId2, commitFrameT2.headers.transaction)

val commitFrameT1 = wsSession.awaitCommitFrameAndSimulateCompletion()
assertEquals(transactionId1, commitFrameT1.headers.transaction)

wsSession.awaitDisconnectFrameAndSimulateCompletion()
wsSession.expectClose()
}

@Test
fun withTransaction_respectExistingTransactionHeader() = runTest {
val (wsSession, stompSession) = connectWithMocks()

launch {
stompSession.withTransaction { id ->
val headers = StompSendHeaders("/dest", transaction = "override")
send(headers, FrameBody.Text("Transaction: override"))
sendText("/dest", "Transaction: $id")
}
stompSession.disconnect()
}
val beginFrame = wsSession.awaitBeginFrameAndSimulateCompletion()
val transactionId = beginFrame.headers.transaction

val sendFrame = wsSession.awaitSendFrameAndSimulateCompletion()
assertEquals("override", sendFrame.headers.transaction)
assertEquals("Transaction: override", sendFrame.bodyAsText)

val sendFrame2 = wsSession.awaitSendFrameAndSimulateCompletion()
assertEquals(transactionId, sendFrame2.headers.transaction)
assertEquals("Transaction: $transactionId", sendFrame2.bodyAsText)

val commitFrame = wsSession.awaitCommitFrameAndSimulateCompletion()
assertEquals(transactionId, commitFrame.headers.transaction)

wsSession.awaitDisconnectFrameAndSimulateCompletion()
wsSession.expectClose()
}

@Test
fun withTransaction_doesNotMutateHeaders() = runTest {
val (wsSession, stompSession) = connectWithMocks()

val initialHeaders = StompSendHeaders("/dest")
launch {
stompSession.withTransaction { id ->
send(initialHeaders, FrameBody.Text("Transaction: $id"))
}
stompSession.disconnect()
}
val beginFrame = wsSession.awaitBeginFrameAndSimulateCompletion()
val transactionId = beginFrame.headers.transaction

val sendFrame = wsSession.awaitSendFrameAndSimulateCompletion()
assertEquals(transactionId, sendFrame.headers.transaction)
assertEquals("Transaction: $transactionId", sendFrame.bodyAsText)

val commitFrame = wsSession.awaitCommitFrameAndSimulateCompletion()
assertEquals(transactionId, commitFrame.headers.transaction)

assertNull(initialHeaders.transaction, "'transaction' header should not be mutated in original headers.")

wsSession.awaitDisconnectFrameAndSimulateCompletion()
wsSession.expectClose()
}
}

0 comments on commit 137f35d

Please sign in to comment.