-
Notifications
You must be signed in to change notification settings - Fork 2
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[TH2-3249] #266
base: message_pipeline_refactoring_cradle_2_20
Are you sure you want to change the base?
[TH2-3249] #266
Conversation
import com.exactpro.th2.rptdataprovider.services.rabbitmq.RabbitMqService | ||
import com.fasterxml.jackson.databind.DeserializationFeature | ||
import com.fasterxml.jackson.databind.ObjectMapper | ||
import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper | ||
import io.ktor.http.* | ||
|
||
@Suppress("MemberVisibilityCanBePrivate") | ||
class Context( | ||
class Context constructor( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why?
Executors.newFixedThreadPool(configuration.codecRequestThreadPool.value.toInt()).asCoroutineDispatcher() | ||
) | ||
|
||
private val callbackScope = CoroutineScope( | ||
Executors.newFixedThreadPool(configuration.codecCallbackThreadPool.value.toInt()).asCoroutineDispatcher() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we really need separate thread pools for these scopes? Maybe they can share a single one?
val firstSequence = | ||
decodedBatch.groupsList.firstOrNull()?.messagesList?.firstOrNull()?.sequence | ||
val lastSequence = | ||
decodedBatch.groupsList?.lastOrNull()?.messagesList?.lastOrNull()?.sequence | ||
val stream = | ||
"${decodedBatch.groupsList.firstOrNull()?.messagesList?.firstOrNull()?.message?.sessionAlias}:${decodedBatch.groupsList.firstOrNull()?.messagesList?.firstOrNull()?.message?.direction.toString()}" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
val firstSequence = | |
decodedBatch.groupsList.firstOrNull()?.messagesList?.firstOrNull()?.sequence | |
val lastSequence = | |
decodedBatch.groupsList?.lastOrNull()?.messagesList?.lastOrNull()?.sequence | |
val stream = | |
"${decodedBatch.groupsList.firstOrNull()?.messagesList?.firstOrNull()?.message?.sessionAlias}:${decodedBatch.groupsList.firstOrNull()?.messagesList?.firstOrNull()?.message?.direction.toString()}" | |
val firstMessage = decodedBatch.groupsList.firstOrNull()?.messagesList?.firstOrNull()?.message | |
val lastMessage = decodedBatch.groupsList.lastOrNull()?.messagesList?.lastOrNull()?.message | |
val firstSequence = firstMessage?.sequence | |
val lastSequence = lastMessage?.sequence | |
val stream = "${firstMessage?.sessionAlias}:${firstMessage?.direction.toString()}" |
val sessionAlias = | ||
request.protobufRawMessageBatch.groupsList | ||
.first().messagesList | ||
.first().rawMessage.metadata.id.connectionId.sessionAlias |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
val sessionAlias = | |
request.protobufRawMessageBatch.groupsList | |
.first().messagesList | |
.first().rawMessage.metadata.id.connectionId.sessionAlias | |
val sessionAlias = request.protobufRawMessageBatch.groupsList[0] | |
.messagesList[0] | |
.rawMessage | |
.sessionAlias |
val firstSequence = | ||
request.protobufRawMessageBatch.groupsList.first()?.messagesList?.first()?.rawMessage?.sequence | ||
val lastSequence = | ||
request.protobufRawMessageBatch.groupsList.last()?.messagesList?.last()?.rawMessage?.sequence | ||
val stream = | ||
"${request.protobufRawMessageBatch.groupsList.first()?.messagesList?.first()?.rawMessage?.sessionAlias}:${request.protobufRawMessageBatch.groupsList.first()?.messagesList?.first()?.rawMessage?.direction.toString()}" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same as above. Also why do we use first
/last
here, but firstOrNull
/lastOrNull
above?
val firstSequence = | ||
request.protobufRawMessageBatch.groupsList.first()?.messagesList?.first()?.rawMessage?.sequence | ||
val lastSequence = | ||
request.protobufRawMessageBatch.groupsList.last()?.messagesList?.last()?.rawMessage?.sequence | ||
val stream = | ||
"${request.protobufRawMessageBatch.groupsList.first()?.messagesList?.first()?.rawMessage?.sessionAlias}:${request.protobufRawMessageBatch.groupsList.first()?.messagesList?.first()?.rawMessage?.direction.toString()}" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same as above
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, but I think someone from maintainers should look at this too
|
||
suspend fun sendToCodec(request: CodecBatchRequest): CodecBatchResponse { | ||
return withContext(requestSenderScope.coroutineContext) { | ||
while (pendingRequests.keys.size > maximumPendingRequests) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
while (pendingRequests.keys.size > maximumPendingRequests) { | |
while (pendingRequests.size > maximumPendingRequests) { |
codec gRPC interface