-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathproducer.py
32 lines (26 loc) · 1009 Bytes
/
producer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
from aiokafka import AIOKafkaProducer
import asyncio
import json
import os
from random import randint
# env variables
KAFKA_TOPIC = os.getenv('KAFKA_TOPIC', 'mytopic')
KAFKA_BOOTSTRAP_SERVERS = os.getenv('KAFKA_BOOTSTRAP_SERVERS', 'localhost:9092')
# global variables
loop = asyncio.get_event_loop()
async def send_one():
producer = AIOKafkaProducer(loop=loop, bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS)
# get cluster layout and initial topic/partition leadership information
await producer.start()
try:
# produce message
msg_id = f'{randint(1, 10000)}'
value = {'message_id': msg_id, 'text': 'some text', 'state': randint(1, 100)}
print(f'Sending message with value: {value}')
value_json = json.dumps(value).encode('utf-8')
await producer.send_and_wait(KAFKA_TOPIC, value_json)
finally:
# wait for all pending messages to be delivered or expire.
await producer.stop()
# send message
loop.run_until_complete(send_one())