Skip to content

Commit

Permalink
[TH2-5149] gRPC server implementation skips columns with null value a…
Browse files Browse the repository at this point in the history
…fter fix.
  • Loading branch information
Nikita-Smirnov-Exactpro committed Jan 15, 2024
1 parent 88abaa3 commit e8eeec9
Show file tree
Hide file tree
Showing 4 changed files with 102 additions and 16 deletions.
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ The list of queries that can be executed by read-db.
It might contain parameters in the following format: `${<name>[:<type>]}`.
The **type** part can be omitted if the type is `varchar`.
Examples: `${id:integer}`, `${registration_time:timestamp}`, `${first_name}`
[Types](https://docs.oracle.com/javase/8/docs/api/java/sql/JDBCType.html): bit, tinyint, smallint, integer, bigint, float, real, double, numeric, decimal, char, varchar, longvarchar, date, time, timestamp, binary, varbinary, longvarbinary, null, other, java_object, distinct, struct, array, blob, clob, ref, datalink, boolean, rowid, nchar, nvarchar, longnvarchar, nclob, sqlxml, ref_cursor, time_with_timezone, timestamp_with_timezone
+ defaultParameters - the default values for parameters. They will be used if the parameter was not specified in the request
+ messageType - the message type that should be associated with this query.
If it is set the read-db will set a property `th2.csv.override_message_type` with specified value
Expand Down Expand Up @@ -344,6 +345,10 @@ spec:

+ gRPC execute method generates unique id for each execution and puts it into related event and messages.

#### Fix:

+ gRPC Execute method doesn't respond rows with null values. gRPC server implementation skips columns with null value after fix.

### 0.6.0

#### Feature:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2023 Exactpro (Exactpro Systems Limited)
* Copyright 2023-2024 Exactpro (Exactpro Systems Limited)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -74,10 +74,6 @@ internal fun TableRow.toTransportMessage(dataSourceId: DataSourceId, properties:
return builder
}

internal fun TableRow.toMap(): Map<String, String?> {
return columns.mapValues { it.value?.toStringValue() }
}

internal fun TableRow.toCsvBody(): ByteArray {
return ByteArrayOutputStream().use {
CSVWriterBuilder(it.writer())
Expand Down Expand Up @@ -132,7 +128,7 @@ internal fun MessageSearchResponse.toTableRow(): TableRow {
}
}

private fun Any.toStringValue(): String = when (this) {
internal fun Any.toStringValue(): String = when (this) {
is BigDecimal -> stripTrailingZeros().toPlainString()
is Double -> toBigDecimal().toStringValue()
is Float -> toBigDecimal().toStringValue()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import com.exactpro.th2.common.message.toJson
import com.exactpro.th2.read.db.app.DataBaseReader
import com.exactpro.th2.read.db.app.ExecuteQueryRequest
import com.exactpro.th2.read.db.app.PullTableRequest
import com.exactpro.th2.read.db.bootstrap.toMap
import com.exactpro.th2.read.db.bootstrap.toStringValue
import com.exactpro.th2.read.db.core.DataSourceConfiguration
import com.exactpro.th2.read.db.core.DataSourceId
import com.exactpro.th2.read.db.core.QueryConfiguration
Expand Down Expand Up @@ -152,7 +152,7 @@ class DataBaseReaderGrpcServer(
}
observer.onNext(
QueryResponse.newBuilder()
.putAllRow(row.toMap())
.putRows(row)
.setExecutionId(row.executionId)
.build()
)
Expand Down Expand Up @@ -195,6 +195,15 @@ class DataBaseReaderGrpcServer(
private val EXECUTION_COUNTER = AtomicLong(
Instant.now().run { epochSecond * TimeUnit.SECONDS.toNanos(1) + nano }
)

private fun QueryResponse.Builder.putRows(tableRow: TableRow) = apply {
tableRow.columns.forEach { (key, value) ->
// null values should be skipped because they aren't supported by Protobuf
if (value != null) {
putRow(key, value.toStringValue())
}
}
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ class DataBaseReaderGrpcServerIntegrationTest {
Person("person$it", Instant.now().truncatedTo(ChronoUnit.DAYS), "test-data-$it".toByteArray())
}

private data class Person(val name: String, val birthday: Instant, val data: ByteArray) {
private data class Person(val name: String, val birthday: Instant, val data: ByteArray?) {
override fun equals(other: Any?): Boolean {
if (this === other) return true
if (javaClass != other?.javaClass) return false
Expand All @@ -102,15 +102,18 @@ class DataBaseReaderGrpcServerIntegrationTest {

if (name != other.name) return false
if (birthday != other.birthday) return false
if (!data.contentEquals(other.data)) return false
if (data != null) {
if (other.data == null) return false
if (!data.contentEquals(other.data)) return false
} else if (other.data != null) return false

return true
}

override fun hashCode(): Int {
var result = name.hashCode()
result = 31 * result + birthday.hashCode()
result = 31 * result + data.contentHashCode()
result = 31 * result + (data?.contentHashCode() ?: 0)
return result
}
}
Expand All @@ -134,7 +137,7 @@ class DataBaseReaderGrpcServerIntegrationTest {
}

@Test
fun `test execute gRPC request`() {
fun `execute gRPC request test`() {
val genericUpdateListener = mock<UpdateListener> { }
val genericRowListener = mock<RowListener> { }
val messageLoader = mock<MessageLoader> { }
Expand Down Expand Up @@ -203,6 +206,79 @@ class DataBaseReaderGrpcServerIntegrationTest {
}
}

@Test
fun `null value in response test`() {
val genericUpdateListener = mock<UpdateListener> { }
val genericRowListener = mock<RowListener> { }
val messageLoader = mock<MessageLoader> { }
val onEvent = mock<OnEvent> { }

val person = Person("null-test", Instant.now().truncatedTo(ChronoUnit.DAYS), null)
execute {
insertData(listOf(person))
}

val sourceId = "persons"
val queryId = "select"
val cfg = DataBaseReaderConfiguration(
dataSources = mapOf(
DataSourceId(sourceId) to DataSourceConfiguration(
mysql.jdbcUrl,
mysql.username,
mysql.password,
)
),
queries = mapOf(
QueryId(queryId) to QueryConfiguration(
"SELECT * FROM test_data.person WHERE name = \${name}",
mapOf(
"name" to listOf(person.name),
)
),
)
)


val request = QueryRequest.newBuilder().apply {
sourceIdBuilder.setId(sourceId)
queryIdBuilder.setId(queryId)
}.build()

runBlocking(Dispatchers.IO) {
val reader = DataBaseReader.createDataBaseReader(
cfg,
this,
genericUpdateListener,
genericRowListener,
messageLoader,
)

val service = DataBaseReaderGrpcServer(
reader,
{ cfg.dataSources[it] ?: error("'$it' data source isn't found in custom config") },
{ cfg.queries[it] ?: error("'$it' query isn't found in custom config") },
onEvent,
)

val responses: List<QueryResponse> = GrpcTestHolder(service).use { (stub) ->
withCancellation {
stub.execute(request).asSequence().toList()
}
}

val expectedData = listOf(person)

verifyNoInteractions(genericUpdateListener)
verifyNoInteractions(messageLoader)

val executionIds = hashSetOf<Long>()
genericRowListener.assertCaptured(expectedData).also(executionIds::add)
responses.assert(expectedData).also(executionIds::add)
onEvent.assertCaptured(cfg, request, 1_000).also(executionIds::add)
assertEquals(1, executionIds.size, "execution ids mismatch $executionIds")
}
}

private fun RowListener.assertCaptured(persons: List<Person>): Long {
val captor = argumentCaptor<TableRow>()
verify(this, times(persons.size)).onRow(any(), captor.capture())
Expand All @@ -212,7 +288,7 @@ class DataBaseReaderGrpcServerIntegrationTest {
Person(
checkNotNull(it.columns["name"]).toString(),
(checkNotNull(it.columns["birthday"]) as LocalDate).atStartOfDay().toInstant(ZoneOffset.UTC),
(checkNotNull(it.columns["data"]) as ByteArray),
it.columns["data"] as ByteArray?,
)
}.also {
expectThat(it).containsExactly(persons)
Expand All @@ -230,7 +306,7 @@ class DataBaseReaderGrpcServerIntegrationTest {
Person(
response.getRowOrThrow("name").toString(),
LocalDate.parse(response.getRowOrThrow("birthday")).atStartOfDay().toInstant(ZoneOffset.UTC),
decodeHexDump(response.getRowOrThrow("data")),
if (response.containsRow("data")) decodeHexDump(response.getRowOrThrow("data")) else null,
)
}.also {
expectThat(it).containsExactly(persons)
Expand Down Expand Up @@ -282,7 +358,7 @@ class DataBaseReaderGrpcServerIntegrationTest {
`id` INT NOT NULL AUTO_INCREMENT,
`name` VARCHAR(45) NOT NULL,
`birthday` DATE NOT NULL,
`data` BLOB NOT NULL,
`data` BLOB,
PRIMARY KEY (`id`));
""".trimIndent()
)
Expand Down Expand Up @@ -312,7 +388,7 @@ class DataBaseReaderGrpcServerIntegrationTest {
for (person in persons) {
prepareStatement.setString(1, person.name)
prepareStatement.setDate(2, Date(person.birthday.toEpochMilli()))
prepareStatement.setBlob(3, ByteArrayInputStream(person.data))
prepareStatement.setBlob(3, person.data?.let { ByteArrayInputStream(it) })
prepareStatement.addBatch()
}
prepareStatement.executeBatch()
Expand Down

0 comments on commit e8eeec9

Please sign in to comment.