Update kafka connector docs from kafka-connect-mesh v0.2.4-rc-2 (#477)
Co-authored-by: streamnativebot <[email protected]>
streamnativebot authored Jan 3, 2025
1 parent e3ab52e commit 961757c
Showing 10 changed files with 750 additions and 0 deletions.
32 changes: 32 additions & 0 deletions connectors/kafka-connect-bigquery/v2.5.7/kafka-connect-bigquery.md
@@ -45,6 +45,38 @@ The following prerequisites are required before setting up the BigQuery connector
bigquery.tables.updateData
```
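
If you prefer to manage these permissions through a dedicated custom role, they can be bundled with the `gcloud` CLI. The sketch below is only an illustration: the role ID and project are placeholders, and the `--permissions` value must be the complete permission list shown above.

```shell
# Hypothetical role ID and project; replace both, and pass the full
# permission list from the prerequisites above.
gcloud iam roles create bigquerySinkConnector \
  --project=my-gcp-project \
  --title="BigQuery Sink Connector" \
  --permissions=bigquery.datasets.get,bigquery.tables.create,bigquery.tables.get,bigquery.tables.getData,bigquery.tables.list,bigquery.tables.update,bigquery.tables.updateData
```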

### Quick Start

1. Set up the kcctl client: [doc](https://docs.streamnative.io/docs/kafka-connect-setup)
2. Create a Google service account (GSA) in Google Cloud and obtain its private key (a hedged `gcloud` sketch follows this list)
3. Create a secret in the StreamNative Console that stores the content of the GSA private key (see [doc](https://docs.streamnative.io/docs/kafka-connect-create#create-kafka-connect-with-secret)). This guide assumes the secret is named `gcp` and the key is `auth`
4. Create a BigQuery dataset in Google Cloud
5. Create a JSON file like the following:

```json
{
  "name": "test-bq",
  "config": {
    "connector.class": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
    "topics": "${INPUT_TOPIC}",
    "project": "${GCP_PROJECT_NAME}",
    "defaultDataset": "${DATA_SET_NAME}",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "autoCreateTables": "false",
    "keySource": "JSON",
    "keyfile": "${snsecret:gcp:auth}",
    "value.converter.schemas.enable": "false"
  }
}
```

6. Run the following command to create the connector:

```shell
kcctl apply -f <your-json-file>
```
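
For step 2, the service account and its private key can be created with the `gcloud` CLI, and once the connector is created you can check its status with kcctl. A minimal sketch, where the account name `bq-sink` and project `my-gcp-project` are hypothetical:

```shell
# Create the service account and download its JSON key (hypothetical names).
gcloud iam service-accounts create bq-sink --project=my-gcp-project
gcloud iam service-accounts keys create key.json \
  --iam-account=bq-sink@my-gcp-project.iam.gserviceaccount.com

# After applying the connector, verify that it is running.
kcctl get connectors
kcctl describe connector test-bq
```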

### Configuration

The `kafka-connect-bigquery` connector is configured using the following properties:
25 changes: 25 additions & 0 deletions connectors/kafka-connect-datagen/v0.6.5/kafka-connect-datagen.md
@@ -25,6 +25,31 @@ powered_by: "Kafka Connect"

`kafka-connect-datagen` is a Kafka Connect connector for generating mock data for testing and is not suitable for production scenarios. It is available in the StreamNative Cloud.

### Quick Start

1. Set up the kcctl client: [doc](https://docs.streamnative.io/docs/kafka-connect-setup)
2. Create a JSON file like the following:
```json
{
  "name": "datagen",
  "config": {
    "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
    "kafka.topic": "users",
    "quickstart": "users",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "max.interval": 1000,
    "iterations": 10000000,
    "tasks.max": "1"
  }
}
```
3. Run the following command to create the connector:
```shell
kcctl create -f <filename>.json
```
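
Once the connector is running, generated records should appear on the `users` topic. A quick way to verify, assuming the Apache Kafka CLI tools and a placeholder bootstrap address:

```shell
# Check the connector state, then sample a few generated records.
kcctl describe connector datagen
kafka-console-consumer.sh --bootstrap-server <bootstrap-address> \
  --topic users --from-beginning --max-messages 5
```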

### Configuration

The `kafka-connect-datagen` connector is configured using the following properties:
@@ -29,6 +29,35 @@ The ElasticSearch Kafka Connect Sink connector is a Kafka Connect connector that

- A running ElasticSearch cluster (for local testing, see the sketch below)
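
For local testing, a single-node cluster can be started with Docker. This is a minimal sketch, not a production setup: the image tag is an assumption, and security is disabled for simplicity.

```bash
# Single-node ElasticSearch with security disabled (local testing only).
docker run -d --name elastic -p 9200:9200 \
  -e "discovery.type=single-node" \
  -e "xpack.security.enabled=false" \
  docker.elastic.co/elasticsearch/elasticsearch:8.11.1
```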

### Quick Start

1. Set up the kcctl client: [doc](https://docs.streamnative.io/docs/kafka-connect-setup)
2. Set up an ElasticSearch cluster
3. Create a JSON file like the following:

```json
{
  "name": "elasticsearch-sink",
  "config": {
    "connector.class": "io.aiven.connect.elasticsearch.ElasticsearchSinkConnector",
    "tasks.max": "1",
    "topics": "kafka-elastic-input",
    "connection.url": "http://elastic:9200",
    "type.name": "kafka-connect",
    "key.ignore": "true",
    "schema.ignore": "true",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false"
  }
}
```
4. Run the following command to create the connector:

```bash
kcctl create -f <filename>.json
```
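
After some records have been produced to the `kafka-elastic-input` topic, you can confirm that documents were indexed. The connector derives the index name from the topic by default; this assumes the `elastic` host from the configuration above is reachable:

```bash
# Search the target index; the index name comes from the topic.
curl "http://elastic:9200/kafka-elastic-input/_search?pretty"
```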
### Configuration

The ElasticSearch Kafka Connect Sink connector is configured using the following properties:
273 changes: 273 additions & 0 deletions connectors/kafka-connect-iceberg/v0.6.19/kafka-connect-iceberg.md
@@ -30,6 +30,279 @@ The Apache Iceberg Kafka Connect Sink connector is a Kafka Connect connector that
- Set up the [Iceberg Catalog](https://iceberg.apache.org/concepts/catalog/)
- Create the Iceberg connector control topic; it must not be shared with any other connector (see the sketch below)
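
The control topic can be created ahead of time with the standard Kafka CLI. A minimal sketch, where the bootstrap address is a placeholder and `control-iceberg` is assumed to match the `iceberg.control.topic` you configure:

```bash
kafka-topics.sh --create --topic control-iceberg \
  --bootstrap-server <bootstrap-address> --partitions 1
```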

### Quick Start

1. Set up the kcctl client: [doc](https://docs.streamnative.io/docs/kafka-connect-setup)
2. Set up the Iceberg Catalog. You can use the following YAML manifest to create a local Iceberg catalog (REST catalog, Spark, and MinIO) in Kubernetes:
```yaml
apiVersion: v1
data:
  spark-defaults.conf: |
    #
    # Licensed to the Apache Software Foundation (ASF) under one or more
    # contributor license agreements. See the NOTICE file distributed with
    # this work for additional information regarding copyright ownership.
    # The ASF licenses this file to You under the Apache License, Version 2.0
    # (the "License"); you may not use this file except in compliance with
    # the License. You may obtain a copy of the License at
    #
    #    http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    #
    # Default system properties included when running spark-submit.
    # This is useful for setting default environmental settings.
    # Example:
    spark.sql.extensions                org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
    spark.sql.catalog.demo              org.apache.iceberg.spark.SparkCatalog
    spark.sql.catalog.demo.type         rest
    spark.sql.catalog.demo.uri          http://iceberg-rest.default.svc.cluster.local:8181
    spark.sql.catalog.demo.io-impl      org.apache.iceberg.aws.s3.S3FileIO
    spark.sql.catalog.demo.warehouse    s3://warehouse/
    spark.sql.catalog.demo.s3.endpoint  http://minio.default.svc.cluster.local:9000
    spark.sql.defaultCatalog            demo
    spark.eventLog.enabled              true
    spark.eventLog.dir                  /home/iceberg/spark-events
    spark.history.fs.logDirectory       /home/iceberg/spark-events
    spark.sql.catalogImplementation     in-memory
kind: ConfigMap
metadata:
  name: spark-config
  namespace: default
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: spark-iceberg
  namespace: default
spec:
  replicas: 1
  selector:
    matchLabels:
      app: spark-iceberg
  template:
    metadata:
      labels:
        app: spark-iceberg
    spec:
      containers:
        - name: spark-iceberg
          image: tabulario/spark-iceberg
          ports:
            - name: one
              containerPort: 8888
            - name: two
              containerPort: 8080
            - name: three
              containerPort: 10000
            - name: four
              containerPort: 10001
          env:
            - name: AWS_ACCESS_KEY_ID
              value: admin
            - name: AWS_SECRET_ACCESS_KEY
              value: password
            - name: AWS_REGION
              value: us-east-1
          volumeMounts:
            - name: spark-config
              mountPath: /opt/spark/conf/spark-defaults.conf
              subPath: spark-defaults.conf
      volumes:
        - name: spark-config
          configMap:
            name: spark-config
---
apiVersion: v1
kind: Service
metadata:
  name: spark-iceberg
  namespace: default
spec:
  selector:
    app: spark-iceberg
  ports:
    - protocol: TCP
      name: one
      port: 8888
      targetPort: 8888
    - protocol: TCP
      name: two
      port: 8080
      targetPort: 8080
    - protocol: TCP
      name: three
      port: 10000
      targetPort: 10000
    - protocol: TCP
      name: four
      port: 10001
      targetPort: 10001
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: iceberg-rest
  namespace: default
spec:
  replicas: 1
  selector:
    matchLabels:
      app: iceberg-rest
  template:
    metadata:
      labels:
        app: iceberg-rest
    spec:
      containers:
        - name: iceberg-rest
          image: tabulario/iceberg-rest
          ports:
            - name: one
              containerPort: 8181
          env:
            - name: AWS_ACCESS_KEY_ID
              value: admin
            - name: AWS_SECRET_ACCESS_KEY
              value: password
            - name: AWS_REGION
              value: us-east-1
            - name: CATALOG_WAREHOUSE
              value: s3://warehouse/
            - name: CATALOG_IO__IMPL
              value: org.apache.iceberg.aws.s3.S3FileIO
            - name: CATALOG_S3_ENDPOINT
              value: http://minio.default.svc.cluster.local:9000
---
apiVersion: v1
kind: Service
metadata:
  name: iceberg-rest
  namespace: default
spec:
  selector:
    app: iceberg-rest
  ports:
    - protocol: TCP
      name: one
      port: 8181
      targetPort: 8181
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: minio
  namespace: default
spec:
  replicas: 1
  selector:
    matchLabels:
      app: minio
  template:
    metadata:
      labels:
        app: minio
    spec:
      hostname: warehouse
      subdomain: minio
      containers:
        - name: minio
          image: minio/minio
          args:
            - server
            - /data
            - --console-address
            - ":9001"
          ports:
            - name: one
              containerPort: 9000
            - name: two
              containerPort: 9001
          env:
            - name: MINIO_ROOT_USER
              value: admin
            - name: MINIO_ROOT_PASSWORD
              value: password
            - name: MINIO_DOMAIN
              value: minio.default.svc.cluster.local
---
apiVersion: v1
kind: Service
metadata:
  name: minio
  namespace: default
spec:
  selector:
    app: minio
  ports:
    - protocol: TCP
      name: one
      port: 9000
      targetPort: 9000
    - protocol: TCP
      name: two
      port: 9001
      targetPort: 9001
```
3. Apply the manifests and initialize the MinIO bucket:
```bash
kubectl apply -f iceberg-spark.yaml
kubectl wait -l app=spark-iceberg --for=condition=Ready pod --timeout=5m
kubectl wait -l app=iceberg-rest --for=condition=Ready pod --timeout=5m
kubectl wait -l app=minio --for=condition=Ready pod --timeout=5m

sleep 30

# initialize the bucket
minio_pod_name=$(kubectl get pods -l app=minio -o=jsonpath='{.items[0].metadata.name}')
kubectl exec $minio_pod_name -- /usr/bin/mc config host add minio http://minio.default.svc.cluster.local:9000 admin password
kubectl exec $minio_pod_name -- /usr/bin/mc rm -r --force minio/warehouse || true
kubectl exec $minio_pod_name -- /usr/bin/mc mb minio/warehouse
kubectl exec $minio_pod_name -- /usr/bin/mc policy set public minio/warehouse
```

4. Create a JSON file like the following:

```json
{
  "name": "iceberg-sink",
  "config": {
    "connector.class": "io.tabular.iceberg.connect.IcebergSinkConnector",
    "topics": "kafka-iceberg-input",
    "iceberg.tables": "sink.kafka",
    "iceberg.catalog": "demo",
    "iceberg.catalog.type": "rest",
    "iceberg.catalog.uri": "http://iceberg-rest.default.svc.cluster.local:8181",
    "iceberg.catalog.client.region": "us-east-1",
    "iceberg.catalog.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
    "iceberg.catalog.warehouse": "s3://warehouse",
    "iceberg.catalog.s3.endpoint": "http://minio.default.svc.cluster.local:9000",
    "iceberg.catalog.s3.path-style-access": "true",
    "iceberg.catalog.s3.access-key-id": "admin",
    "iceberg.catalog.s3.secret-access-key": "password",
    "iceberg.tables.auto-create-enabled": "true",
    "iceberg.tables.evolve-schema-enabled": "true",
    "iceberg.control.commit.interval-ms": "1000",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "tasks.max": "1"
  }
}
```
5. Run the following command to create the connector:

```bash
kcctl create -f <filename>.json
```
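
After records have been produced to the `kafka-iceberg-input` topic and a commit interval has elapsed, the sink table can be queried from the `spark-iceberg` pod deployed above. A minimal sketch using `spark-sql`:

```bash
# Query the sink table through the demo catalog configured above.
spark_pod_name=$(kubectl get pods -l app=spark-iceberg -o=jsonpath='{.items[0].metadata.name}')
kubectl exec -it $spark_pod_name -- spark-sql -e "SELECT COUNT(*) FROM demo.sink.kafka;"
```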

### Limitations

- Each Iceberg sink connector must have its own control topic.
