The Kafka Connect TDengine Source connector is used to move data in a TDengine database to Apache Kafka® topics in real time. Data is loaded by periodically executing a query, and the connector loads only new records. By default, all super tables in a database are copied, each to its own output topic.
This example assumes you are running Confluent version 7.1.1 locally on the default ports. It also assumes you have TDengine installed and running.
- Start the Confluent Platform using the Confluent CLI command below.
confluent local services start
- Create a configuration file for the connector. This configuration is typically used with standalone workers. The file is included with the connector at config/source-quickstart.properties and contains the following settings:
name=tdengine-source
connector.class=com.taosdata.kafka.connect.source.TDengineSourceConnector
tasks.max=1
connection.url=jdbc:TAOS://127.0.0.1:6030
connection.username=root
connection.password=taosdata
connection.database=source
connection.attempts=3
connection.backoff.ms=5000
poll.interval.ms=1000
topic.prefix=tdengine-
fetch.max.rows=100
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
The connection.url and connection.database settings specify the connection URL and database name of the TDengine server. By default, connection.username is root and connection.password is taosdata.
- Run the connector with this configuration:
confluent local services connect connector load sourceConnector --config source-quickstart.properties
- Open the TDengine CLI (the taos shell):
taos
- Create a database in TDengine, then create a super table and seed it with some data:
create database source precision 'ns';
use source;
create table st (ts timestamp, value int) tags (tg nchar(30));
insert into t1 using st tags('Los Angeles') values(now, 100);
insert into t2 using st tags('chicago') values(now, 200);
- To check that the connector has copied the data that was present when you started Kafka Connect, start a console consumer, reading from the beginning of the topic:
$ kafka-console-consumer --bootstrap-server localhost:9092 --from-beginning --topic tdengine-source
## Your output should resemble:
st,tg=L"Los Angeles" value=100i32 1656591975567764000
st,tg=L"chicago" value=200i32 1656592200810039000
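Because the connector loads only new records, rows inserted while it is running should also arrive in the topic. As a quick check, you could insert one more row from the taos shell (the value 300 is only an illustrative example):
insert into t1 using st tags('Los Angeles') values(now, 300);
A corresponding line should appear in the console consumer output within roughly poll.interval.ms milliseconds.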
This configuration is typically used with distributed workers. Write the following JSON to tdengine-source-connector.json, configure all of the required values, and use the command below to post the configuration to one of the distributed Connect workers.
{
"name": "TDengineSourceConnector",
"config": {
"connector.class": "com.taosdata.kafka.connect.source.TDengineSourceConnector",
"tasks.max": 1,
"connection.url": "jdbc:TAOS://127.0.0.1:6030",
"connection.username": "root",
"connection.password": "taosdata",
"connection.database": "source",
"connection.attempts": 3,
"connection.backoff.ms": 1000,
"topic.prefix": "tdengine-",
"poll.interval.ms": 1000,
"fetch.max.rows": 100,
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.storage.StringConverter"
}
}
Run the connector with this configuration:
curl -X POST -d @tdengine-source-connector.json http://localhost:8083/connectors -H "Content-Type:application/json"
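Once the connector is created, you can optionally verify that it is running through the standard Kafka Connect REST API (this assumes the worker listens on port 8083, as in the curl command above):
curl http://localhost:8083/connectors/TDengineSourceConnector/status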
To use this connector, specify the name of the connector class in the connector.class
configuration property.
connector.class=com.taosdata.kafka.connect.source.TDengineSourceConnector
- Type: string
- Importance: high
- Default: null
The connector can be configured to use as few as one task (tasks.max=1) or scale to as many tasks as required to capture all table changes.
- Type: int
- Importance: high
- Default: 1
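For example, to allow the connector to run up to three tasks (an arbitrary illustrative value), you could set:
tasks.max=3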
The connection URL of the TDengine database to read from.
- Type: string
- Importance: high
- Default: null
The username used to connect to TDengine. The default value is root.
- Type: string
- Importance: high
- Default: null
The password used to connect to TDengine. The default value is taosdata.
- Type: string
- Importance: high
- Default: null
The TDengine database name from which records are read and published to the configured Apache Kafka® topic.
- Type: string
- Importance: high
- Default: null
The maximum number of times to retry on errors before failing the connection.
- Type: int
- Importance: high
- Default: 3
Backoff time, in milliseconds, to wait before retrying a connection.
- Type: int
- Importance: high
- Default: 5000
Prefix that is prepended to the database name to generate the name of the Apache Kafka® topic to publish to. In the quickstart above, topic.prefix=tdengine- combined with the database source yields the topic tdengine-source.
- Type: string
- Importance: high
- Default: ""
The timestamp used for initial queries. If not specified, all data will be retrieved. The format is yyyy-MM-dd HH:mm:ss.
- Type: string
- Importance: low
- Default: "1970-01-01 00:00:00"
Frequency, in milliseconds, to poll for new or removed tables, which may result in updated task configurations that start polling for data in added tables.
- Type: int
- Importance: medium
- Default: 1000
Maximum number of rows to include in a single batch when polling for new data. This setting can be used to limit the amount of data buffered internally in the connector.
- Type: int
- Importance: low
- Default: 100
Converter class used to convert between the Kafka Connect format and the serialized form that is written to Kafka. String and JSON formats are currently supported. If value.converter is set to org.apache.kafka.connect.json.JsonConverter, the best practice is to also set the value.converter.schemas.enable property to false.
- Type: string
- Importance: medium
- Default: "org.apache.kafka.connect.storage.StringConverter"