In this sample, we'll create two microservices, one with an input binding and another with an output binding. We'll bind to Kafka, but note that there are a myriad of components that Dapr can bind to (see Dapr components).
This sample includes two microservices:
- Node.js microservice that utilizes an input binding
- Python microservice that utilizes an output binding
The bindings connect to Kafka, allowing us to push messages into a Kafka instance (from our Python microservice) and receive messages from that instance (from our Node microservice) without having to know where the instance is hosted. Instead, we connect through our sidecars using the Dapr API. See the architecture diagram to see how our components interconnect locally:
Dapr allows us to deploy the same microservices from our local machines to Kubernetes. Correspondingly, this sample has instructions for deploying this project locally or in Kubernetes.
Clone this sample repository to your local machine:
git clone https://github.com/dapr/samples.git
To run the Kafka bindings sample locally, you will run the Kafka broker in a Docker container on your machine.
- Run
docker-compose -f ./docker-compose-single-kafka.yml up -d
to start the containers locally.
- Run
docker ps
to see the containers running locally:
342d3522ca14   kafka-docker_kafka       "start-kafka.sh"         14 hours ago   Up About a minute   0.0.0.0:9092->9092/tcp                               kafka-docker_kafka_1
0cd69dbe5e65   wurstmeister/zookeeper   "/bin/sh -c '/usr/sb…"   8 days ago     Up About a minute   22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp   kafka-docker_zookeeper_1
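Optionally, you can sanity-check that the broker is reachable before moving on. The sketch below is not part of the sample; it only assumes Kafka is listening on localhost:9092, the port published by the compose file above.
# Quick check (illustration only): verify that something is accepting
# connections on the Kafka port published by docker-compose.
import socket

with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
    s.settimeout(5)
    try:
        s.connect(("localhost", 9092))
        print("Kafka port 9092 is reachable")
    except OSError as e:
        print("Could not reach Kafka on localhost:9092:", e)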
Now that you have Kafka running locally on your machine, we'll need to run our microservices. We'll start by running the Node microservice that uses input bindings:
- Navigate to the Node subscriber directory in your CLI:
cd nodeapp
- Install dependencies:
npm install
- Run Node sample app with Dapr:
dapr run --app-id bindings-nodeapp --app-port 3000 node app.js
Next, we'll run the Python microservice that uses output bindings:
- Open a new CLI window and navigate to the Python app directory:
cd pythonapp
- Install dependencies:
pip install requests
- Run Python sample app with Dapr:
dapr run --app-id bindings-pythonapp python app.py
- Observe the Python logs, which demonstrate our successful output binding with Kafka:
== APP == {'data': {'orderId': 1}}
== APP == <Response [200]>
== APP == {'data': {'orderId': 2}}
== APP == <Response [200]>
== APP == {'data': {'orderId': 3}}
== APP == <Response [200]>
- Observe the Node logs, which demonstrate our successful input binding with Kafka:
== APP == { orderId: 1 }
== APP == Hello from Kafka!
== APP == { orderId: 2 }
== APP == Hello from Kafka!
== APP == { orderId: 3 }
== APP == Hello from Kafka!
Once you're done using the sample locally, you can spin down your local Kafka Docker container by running:
docker-compose -f ./docker-compose-single-kafka.yml down
- Make sure that you have installed Helm Tiller on your cluster.
- Install Kafka via the incubator/kafka Helm chart:
$ helm repo add incubator http://storage.googleapis.com/kubernetes-charts-incubator
$ helm repo update
$ kubectl create ns kafka
$ helm install dapr-kafka incubator/kafka --namespace kafka -f ./kafka-non-persistence.yaml
- Wait until the Kafka pods are running:
$ kubectl -n kafka get pods -w
NAME READY STATUS RESTARTS AGE
dapr-kafka-0 1/1 Running 0 2m7s
dapr-kafka-zookeeper-0 1/1 Running 0 2m57s
dapr-kafka-zookeeper-1 1/1 Running 0 2m13s
dapr-kafka-zookeeper-2 1/1 Running 0 109s
Now that we've set up the Kafka binding, we can deploy our assets.
- In your CLI window, navigate to the deploy directory.
- Run
kubectl apply -f .
to deploy the bindings-nodeapp and bindings-pythonapp microservices. This also applies the Kafka bindings component configuration we set up in the last step.
- Run
kubectl get pods -w
to see each pod being provisioned.
- Observe the Python app logs, which demonstrate our successful output binding with Kafka:
# Get Pod ID for bindings-pythonapp
$ kubectl get pods
NAME READY STATUS RESTARTS AGE
bindings-nodeapp-699489b8b6-mqhrj 2/2 Running 0 4s
bindings-pythonapp-644489969b-c8lg5 2/2 Running 0 4m9s
dapr-operator-86cddcfcb7-v2zjp 1/1 Running 0 6h6m
dapr-placement-5d6465f8d5-pz2qt 1/1 Running 0 6h6m
dapr-sidecar-injector-dc489d7bc-k2h4q 1/1 Running 0 6h6m
# Get the log from bindings-pythonapp
$ kubectl logs bindings-pythonapp-644489969b-c8lg5 python
...
{'data': {'orderId': 240}}
<Response [200]>
{'data': {'orderId': 241}}
<Response [200]>
...
- Observe the Node app logs, which demonstrate our successful input binding with Kafka:
# Get Pod ID for bindings-nodeapp
$ kubectl get pods
NAME READY STATUS RESTARTS AGE
bindings-nodeapp-699489b8b6-mqhrj 2/2 Running 0 4s
bindings-pythonapp-644489969b-c8lg5 2/2 Running 0 4m9s
dapr-operator-86cddcfcb7-v2zjp 1/1 Running 0 6h6m
dapr-placement-5d6465f8d5-pz2qt 1/1 Running 0 6h6m
dapr-sidecar-injector-dc489d7bc-k2h4q 1/1 Running 0 6h6m
# Get the log from bindings-nodeapp pod
$ kubectl logs bindings-nodeapp-699489b8b6-mqhrj node
Node App listening on port 3000!
...
Hello from Kafka!
{ orderId: 240 }
Hello from Kafka!
{ orderId: 241 }
...
Once you're done using the sample, you can spin down your Kubernetes resources by navigating to the ./deploy directory and running:
cd ./deploy
kubectl delete -f .
This will spin down each resource defined by the .yaml files in the deploy directory, including the Kafka component.
Once you have deleted all the sample apps, delete Kafka from the cluster.
# move to sample root
kubectl delete -f ./k8s_kafka_testclient.yaml
# clean up kafka cluster
helm del --purge dapr-kafka
Now that you've run the sample locally and/or in Kubernetes, let's unpack how this all works. Our app is broken up into an input binding app (Node) and an output binding app (Python):
Before looking at the application code, let's see the Kafka bindings component YAMLs (nodeapp, pythonapp, and Kubernetes), which specify brokers for the Kafka connection, topics and consumerGroup for the consumer, and publishTopic for the publisher topic. See the how-tos in the references for the details on input and output bindings.
This configuration YAML creates a sample-topic component to set up Kafka input and output bindings through the Kafka sample topic.
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: sample-topic
spec:
  type: bindings.kafka
  metadata:
    # Kafka broker connection setting
    - name: brokers
      value: [kafka broker address]
    # consumer configuration: topic and consumer group
    - name: topics
      value: sample
    - name: consumerGroup
      value: group1
    # publisher configuration: topic
    - name: publishTopic
      value: sample
    - name: authRequired
      value: "false"
Navigate to the nodeapp directory and open app.js, the code for our Node.js input bindings sample app. Here we're exposing one API endpoint using express. The endpoint name must be identical to the component name specified in the Kafka bindings component YAML. The Dapr runtime consumes events from the sample topic and then sends a POST request to the Node app with the event payload.
// Dapr delivers each message from the Kafka sample topic as a POST to this
// endpoint; the route name must match the binding component name (sample-topic).
app.post('/sample-topic', (req, res) => {
    console.log("Hello from Kafka!");
    console.log(req.body);
    res.status(200).send();
});
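To see what this handler receives, you can mimic the request the Dapr sidecar makes to it. This is an illustration only, not part of the sample; it assumes the Node app is running locally on port 3000 as above, and in the real flow it is the sidecar, not you, that issues this request with the Kafka message body as the payload.
# Illustration: POST a fake event to the input binding endpoint, the way the
# Dapr sidecar does when a message arrives on the sample topic.
import requests

response = requests.post("http://localhost:3000/sample-topic", json={"orderId": 1})
print(response.status_code)  # the handler replies with 200 and logs the body
The app's console should then show "Hello from Kafka!" followed by { orderId: 1 }, just as in the logs above.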
Navigate to the pythonapp directory and open app.py, the code for our output bindings sample app. This app sends a POST request to the Dapr HTTP endpoint http://localhost:3500/v1.0/bindings/<output_bindings_name> with the event payload every second, using the sample-topic bindings component name as <output_bindings_name>. The Dapr runtime then sends the event to the sample topic specified in the Kafka bindings component YAML above.
dapr_url = "http://localhost:{}/v1.0/bindings/sample-topic".format(dapr_port)
n = 0
while True:
n += 1
payload = { "data": {"orderId": n}}
print(payload, flush=True)
try:
response = requests.post(dapr_url, json=payload)
print(response.text, flush=True)
except Exception as e:
print(e)
time.sleep(1)