JDBC sink allows you to persist incoming payload into an RDBMS database.
The jdbc.consumer.columns
property represents pairs of COLUMN_NAME[:EXPRESSION_FOR_VALUE]
where EXPRESSION_FOR_VALUE
(together with the colon) is optional.
In this case the value is evaluated via generated expression like payload.COLUMN_NAME
, so this way we have a direct mapping from object properties to the table column.
For example we have a JSON payload like:
{
"name": "My Name",
"address": {
"city": "Big City",
"street": "Narrow Alley"
}
}
So, we can insert it into the table with name
, city
and street
structure using the configuration:
--jdbc.consumer.columns=name,city:address.city,street:address.street
This sink supports batch inserts, as far as supported by the underlying JDBC driver.
Batch inserts are configured via the batch-size
and idle-timeout
properties:
Incoming messages are aggregated until batch-size
messages are present, then inserted as a batch.
If idle-timeout
milliseconds pass with no new messages, the aggregated batch is inserted even if it is smaller than batch-size
, capping maximum latency.
Note
|
The module also uses Spring Boot’s DataSource support for configuring the database connection, so properties like spring.datasource.url etc. apply.
|
java -jar jdbc-sink.jar --jdbc.consumer.tableName=names \
--jdbc.consumer.columns=name \
--spring.datasource.driver-class-name=org.mariadb.jdbc.Driver \
--spring.datasource.url='jdbc:mysql://localhost:3306/test
The jdbc sink has the following options:
Properties grouped by prefix:
- batch-size
-
Threshold in number of messages when data will be flushed to database table. (Integer, default:
1
) - columns
-
The comma separated colon-based pairs of column names and SpEL expressions for values to insert/update. Names are used at initialization time to issue the DDL. (String, default:
payload:payload.toString()
) - idle-timeout
-
Idle timeout in milliseconds when data is automatically flushed to database table. (Long, default:
-1
) - initialize
-
'true', 'false' or the location of a custom initialization script for the table. (String, default:
false
) - table-name
-
The name of the table to write into. (String, default:
messages
)
- driver-class-name
-
Fully qualified name of the JDBC driver. Auto-detected based on the URL by default. (String, default:
<none>
) - password
-
Login password of the database. (String, default:
<none>
) - url
-
JDBC URL of the database. (String, default:
<none>
) - username
-
Login username of the database. (String, default:
<none>
)