Sync Kafka Connect Docs from kafka-connect-mesh v0.2.4-rc-2 #477

Merged
@@ -45,6 +45,38 @@ The following prerequisites are required before setting up the BigQuery connector
bigquery.tables.updateData
```

### Quick Start

1. Set up the kcctl client: [doc](https://docs.streamnative.io/docs/kafka-connect-setup)
2. Create a GSA (Google service account) in Google Cloud and obtain its private key.
3. Create a secret in StreamNative Console and store the GSA private key's content in it (see [doc](https://docs.streamnative.io/docs/kafka-connect-create#create-kafka-connect-with-secret)). In this example, the secret name is `gcp` and the key is `auth`.
4. Create a BigQuery dataset in Google Cloud.
5. Create a JSON file like the following:

```json
{
  "name": "test-bq",
  "config": {
    "connector.class": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
    "topics": "${INPUT_TOPIC}",
    "project": "${GCP_PROJECT_NAME}",
    "defaultDataset": "${DATA_SET_NAME}",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "autoCreateTables": "false",
    "keySource": "JSON",
    "keyfile": "${snsecret:gcp:auth}",
    "value.converter.schemas.enable": "false"
  }
}
```

6. Run the following command to create the connector:
```shell
kcctl apply -f <your-json-file>
```

> **Review comment (Member):** It's better to follow the Pulsar connector catalog: https://docs.streamnative.io/hub/connector-google-bigquery-sink-v4.0
>
> We need to tell the user:
>
> 1. The connector can also be created through the StreamNative Console.
> 2. How to use a Kafka client to send some messages.
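
Following up on the reviewer's second point, below is a minimal sketch of how you might produce a few test records and check the connector. It assumes a local Apache Kafka CLI installation, a `client.properties` file with your cluster credentials, and a `$BOOTSTRAP_SERVERS` variable; these names, and the `kcctl describe` status check, are illustrative assumptions rather than part of the synced docs.

```bash
# Produce two JSON records to the connector's input topic
# (replace ${INPUT_TOPIC} with the topic configured above).
kafka-console-producer.sh \
  --bootstrap-server "$BOOTSTRAP_SERVERS" \
  --producer.config client.properties \
  --topic "${INPUT_TOPIC}" <<'EOF'
{"id": 1, "name": "alice"}
{"id": 2, "name": "bob"}
EOF

# Check that the connector and its task are RUNNING.
kcctl describe connector test-bq
```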

### Configuration

The `kafka-connect-bigquery` connector is configured using the following properties:
@@ -25,6 +25,31 @@ powered_by: "Kafka Connect"

`kafka-connect-datagen` is a Kafka Connect connector for generating mock data for testing and is not suitable for production scenarios. It is available in the StreamNative Cloud.

### Quick Start

1. Set up the kcctl client: [doc](https://docs.streamnative.io/docs/kafka-connect-setup)
2. Create a JSON file like the following:
```json
{
  "name": "datagen",
  "config": {
    "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
    "kafka.topic": "users",
    "quickstart": "users",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "max.interval": 1000,
    "iterations": 10000000,
    "tasks.max": "1"
  }
}
```
3. Run the following command to create the connector:
```bash
kcctl create -f <filename>.json
```
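
To see the generated data, you can consume a few records from the `users` topic. A minimal sketch, assuming a local Apache Kafka CLI installation, a `client.properties` file with your cluster credentials, and a `$BOOTSTRAP_SERVERS` variable (illustrative names, not part of the synced docs):

```bash
# Read the first 5 mock user records produced by the datagen connector.
kafka-console-consumer.sh \
  --bootstrap-server "$BOOTSTRAP_SERVERS" \
  --consumer.config client.properties \
  --topic users \
  --from-beginning \
  --max-messages 5
```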

### Configuration

The `kafka-connect-datagen` connector is configured using the following properties:
@@ -29,6 +29,35 @@ The ElasticSearch Kafka Connect Sink connector is a Kafka Connect connector that

- A running ElasticSearch cluster

### Quick Start

1. Set up the kcctl client: [doc](https://docs.streamnative.io/docs/kafka-connect-setup)
2. Set up an ElasticSearch cluster
3. Create a JSON file like the following:

```json
{
  "name": "elasticsearch-sink",
  "config": {
    "connector.class": "io.aiven.connect.elasticsearch.ElasticsearchSinkConnector",
    "tasks.max": "1",
    "topics": "kafka-elastic-input",
    "connection.url": "http://elastic:9200",
    "type.name": "kafka-connect",
    "key.ignore": "true",
    "schema.ignore": "true",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false"
  }
}
```
4. Run the following command to create the connector:

```bash
kcctl create -f <filename>.json
```
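
To verify the pipeline end to end, you can produce a JSON record to `kafka-elastic-input` and then query ElasticSearch. A minimal sketch, assuming a Kafka CLI installation with a `client.properties` file and a `$BOOTSTRAP_SERVERS` variable, and assuming the sink writes to an index named after the topic; these details are assumptions, not part of the synced docs.

```bash
# Send one JSON document to the input topic.
echo '{"message": "hello from kafka", "ts": 1700000000}' | kafka-console-producer.sh \
  --bootstrap-server "$BOOTSTRAP_SERVERS" \
  --producer.config client.properties \
  --topic kafka-elastic-input

# After the connector flushes, search the corresponding index
# (run from a host that can reach the ElasticSearch endpoint).
curl -s "http://elastic:9200/kafka-elastic-input/_search?pretty"
```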


### Configuration

The ElasticSearch Kafka Connect Sink connector is configured using the following properties:
273 changes: 273 additions & 0 deletions connectors/kafka-connect-iceberg/v0.6.19/kafka-connect-iceberg.md
@@ -30,6 +30,279 @@ The Apache Iceberg Kafka Connect Sink connector is a Kafka Connect connector that
- Set up the [Iceberg Catalog](https://iceberg.apache.org/concepts/catalog/)
- Create the Iceberg connector control topic, which cannot be used by other connectors (see the example below).
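
For reference, the control topic can be created with the standard Kafka CLI. A minimal sketch, assuming the connector uses the default control topic name `control-iceberg` (an assumption; set `iceberg.control.topic` if you use a different name), plus a `client.properties` file and a `$BOOTSTRAP_SERVERS` variable:

```bash
# Create a dedicated control topic for the Iceberg sink connector.
kafka-topics.sh \
  --bootstrap-server "$BOOTSTRAP_SERVERS" \
  --command-config client.properties \
  --create \
  --topic control-iceberg \
  --partitions 1
```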

### Quick Start

1. Set up the kcctl client: [doc](https://docs.streamnative.io/docs/kafka-connect-setup)
2. Set up the Iceberg Catalog. You can use the following YAML file (saved as `iceberg-spark.yaml`, which step 3 applies) to create a local Iceberg catalog in Kubernetes:
```yaml
apiVersion: v1
data:
  spark-defaults.conf: |
    #
    # Licensed to the Apache Software Foundation (ASF) under one or more
    # contributor license agreements. See the NOTICE file distributed with
    # this work for additional information regarding copyright ownership.
    # The ASF licenses this file to You under the Apache License, Version 2.0
    # (the "License"); you may not use this file except in compliance with
    # the License. You may obtain a copy of the License at
    #
    # http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    #

    # Default system properties included when running spark-submit.
    # This is useful for setting default environmental settings.

    # Example:
    spark.sql.extensions org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
    spark.sql.catalog.demo org.apache.iceberg.spark.SparkCatalog
    spark.sql.catalog.demo.type rest
    spark.sql.catalog.demo.uri http://iceberg-rest.default.svc.cluster.local:8181
    spark.sql.catalog.demo.io-impl org.apache.iceberg.aws.s3.S3FileIO
    spark.sql.catalog.demo.warehouse s3://warehouse/
    spark.sql.catalog.demo.s3.endpoint http://minio.default.svc.cluster.local:9000
    spark.sql.defaultCatalog demo
    spark.eventLog.enabled true
    spark.eventLog.dir /home/iceberg/spark-events
    spark.history.fs.logDirectory /home/iceberg/spark-events
    spark.sql.catalogImplementation in-memory
kind: ConfigMap
metadata:
  name: spark-config
  namespace: default
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: spark-iceberg
  namespace: default
spec:
  replicas: 1
  selector:
    matchLabels:
      app: spark-iceberg
  template:
    metadata:
      labels:
        app: spark-iceberg
    spec:
      containers:
        - name: spark-iceberg
          image: tabulario/spark-iceberg
          ports:
            - name: one
              containerPort: 8888
            - name: two
              containerPort: 8080
            - name: three
              containerPort: 10000
            - name: four
              containerPort: 10001
          env:
            - name: AWS_ACCESS_KEY_ID
              value: admin
            - name: AWS_SECRET_ACCESS_KEY
              value: password
            - name: AWS_REGION
              value: us-east-1
          volumeMounts:
            - name: spark-config
              mountPath: /opt/spark/conf/spark-defaults.conf
              subPath: spark-defaults.conf
      volumes:
        - name: spark-config
          configMap:
            name: spark-config
---
apiVersion: v1
kind: Service
metadata:
  name: spark-iceberg
  namespace: default
spec:
  selector:
    app: spark-iceberg
  ports:
    - protocol: TCP
      name: one
      port: 8888
      targetPort: 8888
    - protocol: TCP
      name: two
      port: 8080
      targetPort: 8080
    - protocol: TCP
      name: three
      port: 10000
      targetPort: 10000
    - protocol: TCP
      port: 10001
      name: four
      targetPort: 10001
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: iceberg-rest
  namespace: default
spec:
  replicas: 1
  selector:
    matchLabels:
      app: iceberg-rest
  template:
    metadata:
      labels:
        app: iceberg-rest
    spec:
      containers:
        - name: iceberg-rest
          image: tabulario/iceberg-rest
          ports:
            - name: one
              containerPort: 8181
          env:
            - name: AWS_ACCESS_KEY_ID
              value: admin
            - name: AWS_SECRET_ACCESS_KEY
              value: password
            - name: AWS_REGION
              value: us-east-1
            - name: CATALOG_WAREHOUSE
              value: s3://warehouse/
            - name: CATALOG_IO__IMPL
              value: org.apache.iceberg.aws.s3.S3FileIO
            - name: CATALOG_S3_ENDPOINT
              value: http://minio.default.svc.cluster.local:9000
---
apiVersion: v1
kind: Service
metadata:
  name: iceberg-rest
  namespace: default
spec:
  selector:
    app: iceberg-rest
  ports:
    - protocol: TCP
      name: one
      port: 8181
      targetPort: 8181
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: minio
  namespace: default
spec:
  replicas: 1
  selector:
    matchLabels:
      app: minio
  template:
    metadata:
      labels:
        app: minio
    spec:
      hostname: warehouse
      subdomain: minio
      containers:
        - name: minio
          image: minio/minio
          args:
            - server
            - /data
            - --console-address
            - ":9001"
          ports:
            - name: one
              containerPort: 9000
            - name: two
              containerPort: 9001
          env:
            - name: MINIO_ROOT_USER
              value: admin
            - name: MINIO_ROOT_PASSWORD
              value: password
            - name: MINIO_DOMAIN
              value: minio.default.svc.cluster.local
---
apiVersion: v1
kind: Service
metadata:
  name: minio
  namespace: default
spec:
  selector:
    app: minio
  ports:
    - protocol: TCP
      name: one
      port: 9000
      targetPort: 9000
    - protocol: TCP
      name: two
      port: 9001
      targetPort: 9001
```
3. Deploy the catalog stack and initialize the MinIO bucket:

```bash
kubectl apply -f iceberg-spark.yaml
kubectl wait -l app=spark-iceberg --for=condition=Ready pod --timeout=5m
kubectl wait -l app=iceberg-rest --for=condition=Ready pod --timeout=5m
kubectl wait -l app=minio --for=condition=Ready pod --timeout=5m

sleep 30

# initialize the bucket
minio_pod_name=$(kubectl get pods -l app=minio -o=jsonpath='{.items[0].metadata.name}')
kubectl exec $minio_pod_name -- /usr/bin/mc config host add minio http://minio.default.svc.cluster.local:9000 admin password
kubectl exec $minio_pod_name -- /usr/bin/mc rm -r --force minio/warehouse || true
kubectl exec $minio_pod_name -- /usr/bin/mc mb minio/warehouse
kubectl exec $minio_pod_name -- /usr/bin/mc policy set public minio/warehouse
```

4. Create a JSON file like the following:

```json
{
  "name": "iceberg-sink",
  "config": {
    "connector.class": "io.tabular.iceberg.connect.IcebergSinkConnector",
    "topics": "kafka-iceberg-input",
    "iceberg.tables": "sink.kafka",
    "iceberg.catalog": "demo",
    "iceberg.catalog.type": "rest",
    "iceberg.catalog.uri": "http://iceberg-rest.default.svc.cluster.local:8181",
    "iceberg.catalog.client.region": "us-east-1",
    "iceberg.catalog.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
    "iceberg.catalog.warehouse": "s3://warehouse",
    "iceberg.catalog.s3.endpoint": "http://minio.default.svc.cluster.local:9000",
    "iceberg.catalog.s3.path-style-access": "true",
    "iceberg.catalog.s3.access-key-id": "admin",
    "iceberg.catalog.s3.secret-access-key": "password",
    "iceberg.tables.auto-create-enabled": "true",
    "iceberg.tables.evolve-schema-enabled": "true",
    "iceberg.control.commit.interval-ms": "1000",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "tasks.max": "1"
  }
}
```
5. Run the following command to create the connector:

```bash
kcctl create -f <filename>.json
```
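
Once the connector is running, you can produce a few records and then query the table from the Spark pod. A minimal sketch, assuming a Kafka CLI with a `client.properties` file and a `$BOOTSTRAP_SERVERS` variable for the `kafka-iceberg-input` topic, and assuming `spark-sql` is on the `PATH` of the `tabulario/spark-iceberg` image; these details are assumptions, not part of the synced docs.

```bash
# Produce a couple of JSON records for the sink to commit.
kafka-console-producer.sh \
  --bootstrap-server "$BOOTSTRAP_SERVERS" \
  --producer.config client.properties \
  --topic kafka-iceberg-input <<'EOF'
{"id": 1, "payload": "hello"}
{"id": 2, "payload": "world"}
EOF

# Query the auto-created table through the `demo` catalog configured above.
spark_pod=$(kubectl get pods -l app=spark-iceberg -o=jsonpath='{.items[0].metadata.name}')
kubectl exec "$spark_pod" -- spark-sql -e "SELECT * FROM demo.sink.kafka;"
```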

### Limitations

- Each Iceberg sink connector must have its own control topic.