Skip to content

Commit

Permalink
[TH2-5149] corrected after review
Browse files Browse the repository at this point in the history
  • Loading branch information
Nikita-Smirnov-Exactpro committed Jan 15, 2024
1 parent ee16296 commit 88abaa3
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -184,11 +184,13 @@ internal fun setupApp(

val handler = DataBaseReaderGrpcServer(
reader,
rootEventId,
{ cfg.dataSources[it] ?: error("'$it' data source isn't found in custom config") },
{ cfg.queries[it] ?: error("'$it' query isn't found in custom config") },
eventBatcher::onEvent
)
) { event, parentEventId ->
eventBatcher.onEvent(
event.toProto(parentEventId ?: rootEventId)
)
}

val server = factory.grpcRouter.startServer(handler)
.start()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2022-2023 Exactpro (Exactpro Systems Limited)
* Copyright 2022-2024 Exactpro (Exactpro Systems Limited)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -46,12 +46,10 @@ import mu.KotlinLogging
import java.time.Instant
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicLong
import com.exactpro.th2.common.grpc.Event as ProtoEvent
import com.exactpro.th2.read.db.grpc.QueryId as ProtoQueryId

class DataBaseReaderGrpcServer(
private val app: DataBaseReader,
private val rootEventId: EventID,
private val getSourceCfg: (DataSourceId) -> DataSourceConfiguration,
private val getQueryCfg: (QueryId) -> QueryConfiguration,
private val onEvent: OnEvent,
Expand All @@ -62,7 +60,7 @@ class DataBaseReaderGrpcServer(
val event = Event.start()
.name("Execute '${request.queryId.id}' query ($executionId)")
.type("read-db.execute")
val parentEventId = if (request.hasParentEventId()) request.parentEventId else rootEventId
val parentEventId: EventID? = if (request.hasParentEventId()) request.parentEventId else null
try {
val associatedMessageType: String? = if (request.hasAssociatedMessageType()) request.associatedMessageType.name else null
val executeQueryRequest = request.run {
Expand All @@ -75,15 +73,18 @@ class DataBaseReaderGrpcServer(
)
}
event.bodyData(executeQueryRequest.toBody(executionId))
app.executeQuery(executeQueryRequest, GrpcResultListener(responseObserver, event, parentEventId)) { row ->
app.executeQuery(
executeQueryRequest,
GrpcResultListener(responseObserver, event) {
onEvent.accept(it, parentEventId)
}) { row ->
row.copy(associatedMessageType = associatedMessageType, executionId = executionId)
}
} catch (ex: Exception) {
LOGGER.error(ex) { "cannot execute request ${request.toJson()}" }
responseObserver.onError(Status.INTERNAL.withDescription(ex.message).asRuntimeException())
event.exception(ex, true)
.toProto(parentEventId)
.also(onEvent::accept)
.also { onEvent.accept(it, parentEventId) }
}
}

Expand Down Expand Up @@ -140,10 +141,10 @@ class DataBaseReaderGrpcServer(
}
}

inner class GrpcResultListener(
private class GrpcResultListener(
private val observer: StreamObserver<QueryResponse>,
private val event: Event,
private val parentEventId: EventID,
private val onEvent: (Event) -> Unit,
) : ResultListener {
override fun onRow(sourceId: DataSourceId, row: TableRow) {
requireNotNull(row.executionId) {
Expand All @@ -161,15 +162,13 @@ class DataBaseReaderGrpcServer(
observer.onError(error)
event.endTimestamp()
.exception(error, true)
.toProto(parentEventId)
.also(onEvent::accept)
.also(onEvent)
}

override fun onComplete() {
observer.onCompleted()
event.endTimestamp()
.toProto(parentEventId)
.also(onEvent::accept)
.also(onEvent)
}
}

Expand Down Expand Up @@ -200,5 +199,5 @@ class DataBaseReaderGrpcServer(
}

fun interface OnEvent {
fun accept(event: ProtoEvent)
fun accept(event: Event, parentEventID: EventID?)
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2023 Exactpro (Exactpro Systems Limited)
* Copyright 2023-2024 Exactpro (Exactpro Systems Limited)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,7 +16,7 @@

package com.exactpro.th2.read.db.impl.grpc

import com.exactpro.th2.common.grpc.Event
import com.exactpro.th2.common.event.Event
import com.exactpro.th2.common.grpc.EventID
import com.exactpro.th2.common.grpc.EventStatus
import com.exactpro.th2.common.message.toTimestamp
Expand Down Expand Up @@ -56,6 +56,7 @@ import org.junit.jupiter.api.Test
import org.junit.jupiter.api.TestInstance
import org.mockito.kotlin.any
import org.mockito.kotlin.argumentCaptor
import org.mockito.kotlin.isNull
import org.mockito.kotlin.mock
import org.mockito.kotlin.timeout
import org.mockito.kotlin.times
Expand Down Expand Up @@ -178,7 +179,6 @@ class DataBaseReaderGrpcServerIntegrationTest {

val service = DataBaseReaderGrpcServer(
reader,
rootEventId,
{ cfg.dataSources[it] ?: error("'$it' data source isn't found in custom config") },
{ cfg.queries[it] ?: error("'$it' query isn't found in custom config") },
onEvent,
Expand Down Expand Up @@ -245,15 +245,15 @@ class DataBaseReaderGrpcServerIntegrationTest {

private fun OnEvent.assertCaptured(cfg: DataBaseReaderConfiguration, request: QueryRequest, timeout: Long): Long {
val captor = argumentCaptor<Event> { }
verify(this, timeout(timeout)).accept(captor.capture())
verify(this, timeout(timeout)).accept(captor.capture(), isNull())

return captor.firstValue.let { event ->
return captor.firstValue.toProto(rootEventId).let { event ->
expectThat(event) {
get { name }.contains(Regex("Execute '${request.queryId.id}' query \\(.*\\)"))
get { type }.isEqualTo("read-db.execute")
get { status }.isEqualTo(EventStatus.SUCCESS)
get { parentId }.isSameInstanceAs(rootEventId)
get { body.parseSingle<ExecuteBodyData>() }.apply {
get { body.parseSingle<ExecuteBodyData>() }.and {
get { dataSource }.isEqualTo(cfg.dataSources[request.sourceId.toModel()]?.copy(password = null))
get { beforeQueries }.isEqualTo(request.beforeQueryIdsList.map { requireNotNull(cfg.queries[it.toModel()]) })
get { query }.isEqualTo(cfg.queries[request.queryId.toModel()])
Expand Down Expand Up @@ -321,27 +321,19 @@ class DataBaseReaderGrpcServerIntegrationTest {
private class GrpcTestHolder(
service: BindableService
) : AutoCloseable {
private val inProcessServer: Server

private val inProcessChannel: ManagedChannel

val stub: ReadDbGrpc.ReadDbBlockingStub

init {
inProcessServer = InProcessServerBuilder
.forName(SERVER_NAME)
.addService(service)
.directExecutor()
.build()
.also(Server::start)

inProcessChannel = InProcessChannelBuilder
.forName(SERVER_NAME)
.directExecutor()
.build()

stub = ReadDbGrpc.newBlockingStub(inProcessChannel)
}
private val inProcessServer: Server = InProcessServerBuilder
.forName(SERVER_NAME)
.addService(service)
.directExecutor()
.build()
.also(Server::start)

private val inProcessChannel: ManagedChannel = InProcessChannelBuilder
.forName(SERVER_NAME)
.directExecutor()
.build()

val stub: ReadDbGrpc.ReadDbBlockingStub = ReadDbGrpc.newBlockingStub(inProcessChannel)

operator fun component1(): ReadDbGrpc.ReadDbBlockingStub = stub

Expand Down

0 comments on commit 88abaa3

Please sign in to comment.