Skip to content

Commit

Permalink
[TH2-5177] implemented the Load gRPC method.
Browse files Browse the repository at this point in the history
* fixed the catching java Error such as OutOfMemoryError problem
  • Loading branch information
Nikita-Smirnov-Exactpro committed Mar 19, 2024
1 parent ff5fb71 commit a31912b
Show file tree
Hide file tree
Showing 12 changed files with 257 additions and 44 deletions.
7 changes: 6 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# th2-read-db 0.7.0
# th2-read-db 0.8.0

The read-db is a component for extracting data from databases using JDBC technology. If database has JDBC driver the read can work with the database

Expand Down Expand Up @@ -339,6 +339,11 @@ spec:

## Changes

### 0.8.0

+ implemented the `Load` gRPC method.
+ fixed the catching java Error such as OutOfMemoryError problem

### 0.7.0

#### Feature:
Expand Down
2 changes: 1 addition & 1 deletion app/gradle.properties
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
kotlin.code.style=official
release_version=0.7.0
release_version=0.8.0
description=read-db component for extracting data from databases using JDBC technology
2 changes: 1 addition & 1 deletion core/gradle.properties
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
kotlin.code.style=official
release_version=0.7.0
release_version=0.8.0
description=core part of read db to create an application with required JDBC drivers in the classpath
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2022-2023 Exactpro (Exactpro Systems Limited)
* Copyright 2022-2024 Exactpro (Exactpro Systems Limited)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -33,6 +33,7 @@ import com.exactpro.th2.read.db.core.impl.DataBaseServiceImpl
import com.exactpro.th2.read.db.core.impl.BaseDataSourceProvider
import com.exactpro.th2.read.db.core.impl.BaseHashServiceImpl
import com.exactpro.th2.read.db.core.impl.BaseQueryProvider
import com.exactpro.th2.read.db.core.util.runCatchingException
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.onCompletion
import kotlinx.coroutines.launch
Expand Down Expand Up @@ -158,7 +159,7 @@ class DataBaseReader(

private fun TableRow.transferTo(sourceId: DataSourceId, vararg listeners: RowListener) {
listeners.forEach { listener ->
runCatching { listener.onRow(sourceId, this) }.onFailure {
runCatchingException { listener.onRow(sourceId, this) }.onFailure {
LOGGER.error(it) { "error during row processing by listener ${listener::class}" }
}
}
Expand Down Expand Up @@ -188,7 +189,7 @@ class DataBaseReader(

private inline fun forEach(action: UpdateListener.() -> Unit) {
listeners.forEach { listener ->
listener.runCatching(action).onFailure {
listener.runCatchingException(action).onFailure {
LOGGER.error(it) { "cannot execute action for ${listener::class}" }
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2022 Exactpro (Exactpro Systems Limited)
* Copyright 2022-2024 Exactpro (Exactpro Systems Limited)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,6 +16,7 @@

package com.exactpro.th2.read.db.bootstrap

import com.exactpro.th2.read.db.core.util.runCatchingException
import mu.KotlinLogging
import java.time.Duration
import java.util.concurrent.BlockingQueue
Expand Down Expand Up @@ -85,7 +86,7 @@ class Saver<K, V>(
}
try {
measureNanoTime {
runCatching {
runCatchingException {
onData(data)
}.onSuccess {
LOGGER.trace { "Data stored" }
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2022-2023 Exactpro (Exactpro Systems Limited)
* Copyright 2022-2024 Exactpro (Exactpro Systems Limited)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -27,6 +27,7 @@ import com.exactpro.th2.read.db.core.TableRow
import com.exactpro.th2.read.db.core.exception.QueryExecutionException
import com.exactpro.th2.read.db.core.get
import com.exactpro.th2.read.db.core.util.getColumnValue
import com.exactpro.th2.read.db.core.util.runCatchingException
import com.exactpro.th2.read.db.core.util.set
import com.exactpro.th2.read.db.core.util.setCollection
import kotlinx.coroutines.flow.Flow
Expand Down Expand Up @@ -64,7 +65,7 @@ class DataBaseServiceImpl(
LOGGER.trace { "Final execution parameters: $finalParameters" }
val resultSet: ResultSet = try {
beforeQueryHolders.forEach { holder ->
runCatching {
runCatchingException {
execute(connection, holder, finalParameters)
}.getOrElse {
throw QueryExecutionException(
Expand Down Expand Up @@ -93,7 +94,7 @@ class DataBaseServiceImpl(
reason?.also { LOGGER.warn(it) { "query $queryId completed with exception for $dataSourceId source" } }

afterQueryHolders.forEach { holder ->
runCatching {
runCatchingException {
execute(connection, holder, finalParameters)
}.getOrElse {
throw QueryExecutionException(
Expand All @@ -104,7 +105,7 @@ class DataBaseServiceImpl(
LOGGER.info { "Query $queryId for $dataSourceId connection was executed" }
} finally {
LOGGER.trace { "Closing connection to $dataSourceId" }
runCatching { connection.close() }.onFailure { LOGGER.error(it) { "cannot close connection for $dataSourceId" } }
runCatchingException { connection.close() }.onFailure { LOGGER.error(it) { "cannot close connection for $dataSourceId" } }
}
}
}
Expand Down
34 changes: 34 additions & 0 deletions core/src/main/kotlin/com/exactpro/th2/read/db/core/util/Utils.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright 2024 Exactpro (Exactpro Systems Limited)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.exactpro.th2.read.db.core.util


inline fun <R> runCatchingException(block: () -> R): Result<R> {
return try {
Result.success(block())
} catch (e: Exception) {
Result.failure(e)
}
}

inline fun <T, R> T.runCatchingException(block: T.() -> R): Result<R> {
return try {
Result.success(block())
} catch (e: Exception) {
Result.failure(e)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import com.exactpro.th2.common.event.Event
import com.exactpro.th2.common.grpc.EventID
import com.exactpro.th2.common.message.toJavaDuration
import com.exactpro.th2.common.message.toJson
import com.exactpro.th2.common.utils.message.toTimestamp
import com.exactpro.th2.read.db.app.DataBaseReader
import com.exactpro.th2.read.db.app.ExecuteQueryRequest
import com.exactpro.th2.read.db.app.PullTableRequest
Expand All @@ -33,13 +34,15 @@ import com.exactpro.th2.read.db.core.TableRow
import com.exactpro.th2.read.db.core.UpdateListener
import com.exactpro.th2.read.db.grpc.DbPullRequest
import com.exactpro.th2.read.db.grpc.DbPullResponse
import com.exactpro.th2.read.db.grpc.QueryReport
import com.exactpro.th2.read.db.grpc.QueryRequest
import com.exactpro.th2.read.db.grpc.QueryResponse
import com.exactpro.th2.read.db.grpc.ReadDbGrpc
import com.exactpro.th2.read.db.grpc.StopPullingRequest
import com.exactpro.th2.read.db.impl.grpc.util.toGrpc
import com.exactpro.th2.read.db.impl.grpc.util.toModel
import com.google.protobuf.Empty
import com.google.protobuf.Message
import io.grpc.Status
import io.grpc.stub.StreamObserver
import mu.KotlinLogging
Expand All @@ -55,36 +58,21 @@ class DataBaseReaderGrpcServer(
private val onEvent: OnEvent,
) : ReadDbGrpc.ReadDbImplBase() {
override fun execute(request: QueryRequest, responseObserver: StreamObserver<QueryResponse>) {
val executionId = EXECUTION_COUNTER.incrementAndGet()
LOGGER.info { "Executing 'execute' grpc request ${request.toJson()}, execution id: $executionId" }
val event = Event.start()
.name("Execute '${request.queryId.id}' query ($executionId)")
.type("read-db.execute")
val parentEventId: EventID? = if (request.hasParentEventId()) request.parentEventId else null
try {
val associatedMessageType: String? = if (request.hasAssociatedMessageType()) request.associatedMessageType.name else null
val executeQueryRequest = request.run {
ExecuteQueryRequest(
sourceId.toModel(),
beforeQueryIdsList.map(ProtoQueryId::toModel),
queryId.toModel(),
afterQueryIdsList.map(ProtoQueryId::toModel),
parameters.toModel(),
)
execute("execute", request, responseObserver) { event, parentEventId, _ ->
GrpcExecuteListener(responseObserver, event) {
onEvent.accept(it, parentEventId)
}
event.bodyData(executeQueryRequest.toBody(executionId))
app.executeQuery(
executeQueryRequest,
GrpcResultListener(responseObserver, event) {
onEvent.accept(it, parentEventId)
}) { row ->
row.copy(associatedMessageType = associatedMessageType, executionId = executionId)
}
}

override fun load(request: QueryRequest, responseObserver: StreamObserver<QueryReport>) {
execute("load", request, responseObserver) { event, parentEventId, executionId ->
val report = QueryReport.newBuilder()
.setStart(event.startTimestamp.toTimestamp())
.setExecutionId(executionId)
GrpcLoadListener(responseObserver, report, event) {
onEvent.accept(it, parentEventId)
}
} catch (ex: Exception) {
LOGGER.error(ex) { "cannot execute request ${request.toJson()}" }
responseObserver.onError(Status.INTERNAL.withDescription(ex.message).asRuntimeException())
event.exception(ex, true)
.also { onEvent.accept(it, parentEventId) }
}
}

Expand Down Expand Up @@ -141,7 +129,45 @@ class DataBaseReaderGrpcServer(
}
}

private class GrpcResultListener(
private fun <T: Message> execute(
name: String,
request: QueryRequest,
responseObserver: StreamObserver<T>,
createListener: (Event, EventID?, Long) -> ResultListener
) {
val executionId = EXECUTION_COUNTER.incrementAndGet()
LOGGER.info { "Executing '$name' grpc request ${request.toJson()}, execution id: $executionId" }
val event = Event.start()
.name("Execute '${request.queryId.id}' query ($executionId)")
.type("read-db.execute")
val parentEventId: EventID? = if (request.hasParentEventId()) request.parentEventId else null
try {
val associatedMessageType: String? = if (request.hasAssociatedMessageType()) request.associatedMessageType.name else null
val executeQueryRequest = request.run {
ExecuteQueryRequest(
sourceId.toModel(),
beforeQueryIdsList.map(ProtoQueryId::toModel),
queryId.toModel(),
afterQueryIdsList.map(ProtoQueryId::toModel),
parameters.toModel(),
)
}
event.bodyData(executeQueryRequest.toBody(executionId))
app.executeQuery(
executeQueryRequest,
createListener(event, parentEventId, executionId),
) { row ->
row.copy(associatedMessageType = associatedMessageType, executionId = executionId)
}
} catch (ex: Exception) {
LOGGER.error(ex) { "cannot execute request ${request.toJson()}" }
responseObserver.onError(Status.INTERNAL.withDescription(ex.message).asRuntimeException())
event.exception(ex, true)
.also { onEvent.accept(it, parentEventId) }
}
}

private class GrpcExecuteListener(
private val observer: StreamObserver<QueryResponse>,
private val event: Event,
private val onEvent: (Event) -> Unit,
Expand Down Expand Up @@ -172,6 +198,38 @@ class DataBaseReaderGrpcServer(
}
}

private class GrpcLoadListener(
private val observer: StreamObserver<QueryReport>,
private val report: QueryReport.Builder,
private val event: Event,
private val onEvent: (Event) -> Unit,
) : ResultListener {
private val counter = AtomicLong()

override fun onRow(sourceId: DataSourceId, row: TableRow) {
check(report.executionId == row.executionId) {
"'Execution id' isn't equal to '${report.executionId}', row: $row"
}
counter.incrementAndGet()
}

override fun onError(error: Throwable) {
observer.onError(error)
event.endTimestamp()
.exception(error, true)
.also(onEvent)
}

override fun onComplete() {
event.endTimestamp()
report.setRowsReceived(counter.get())
.setEnd(event.endTimestamp.toTimestamp())
observer.onNext(report.build())
observer.onCompleted()
onEvent(event)
}
}

private fun ExecuteQueryRequest.toBody(executionId: Long) = ExecuteBodyData(
executionId,
getSourceCfg(sourceId),
Expand Down
Loading

0 comments on commit a31912b

Please sign in to comment.