This repository has been archived by the owner on Mar 24, 2021. It is now read-only.
Based on the documentation (https://pykafka.readthedocs.io/en/latest/api/simpleconsumer.html), we should be able to pass a `datetime.datetime` to `consumer.reset_offsets` to seek to an offset. However, it doesn't behave as expected. See the following example:
import pykafka
print(pykafka.__version__)
output:
'2.8.0'
from datetime import datetime
import time

from pykafka import KafkaClient
from pykafka.common import OffsetType

client = KafkaClient(hosts="<ip:port>")
topic = client.topics['test1']
# produce 10 messages, 5 seconds apart, each carrying its UTC send time as the value
p = topic.get_producer()
for m_id in range(0, 10):
    msg = '{}'.format(datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S'))
    p.produce(msg.encode("utf-8"))
    time.sleep(5)
p.stop()
# create a consumer and print out the 10 messages
# (consumer_timeout_ms ends the loop after 1 s with no new messages, so the script can continue)
consumer = topic.get_simple_consumer(auto_offset_reset=OffsetType.EARLIEST,
                                     reset_offset_on_start=True,
                                     consumer_timeout_ms=1000)
for message in consumer:
    if message is not None:
        raw_string = message.value.decode("utf-8")
        print(message.offset, ':', raw_string)
I want to query historical data by resetting the offset based on a timestamp. For example, if I seek to `2019-11-05 00:56:32`, I would expect the offset to be reset to 4 (or something close, allowing for network delays).
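The lookup expected here can be illustrated in plain Python. This is a minimal sketch, assuming the semantics of Kafka's timestamp lookup as exposed by kafka-python's `offsets_for_times`: return the earliest offset whose timestamp is greater than or equal to the target. `offset_for_time` is a hypothetical helper, and the `(offset, timestamp)` pairs are made-up values mirroring the example's 5-second spacing, not real broker data.

```python
from bisect import bisect_left
from datetime import datetime, timedelta
from typing import List, Optional, Tuple


def offset_for_time(log: List[Tuple[int, datetime]], target: datetime) -> Optional[int]:
    """Return the earliest offset whose timestamp is >= target, or None.

    `log` is a hypothetical list of (offset, timestamp) pairs sorted by offset,
    with non-decreasing timestamps (as when one producer appends in order).
    """
    timestamps = [ts for _, ts in log]
    i = bisect_left(timestamps, target)  # first index with timestamp >= target
    return log[i][0] if i < len(log) else None


# Made-up log: 10 messages written 5 seconds apart from an arbitrary start time.
start = datetime(2019, 11, 5, 0, 56, 12)
log = [(offset, start + timedelta(seconds=5 * offset)) for offset in range(10)]

print(offset_for_time(log, datetime(2019, 11, 5, 0, 56, 32)))  # → 4
```

A target that falls between two messages resolves to the later one, which is why "something close" to 4 is a reasonable expectation when send times drift slightly.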
# seek by timestamp
partition_offset_pairs = [(partition, datetime.fromisoformat('2019-11-05 00:56:32'))
                          for partition in consumer.partitions.values()]
consumer.reset_offsets(partition_offsets=partition_offset_pairs)
print(consumer.held_offsets)
output:
{0: -2}
The output is not as expected: the held offset for partition 0 is -2 rather than something near 4.
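One detail worth ruling out: the message values are written with `datetime.utcnow()`, while `datetime.fromisoformat('2019-11-05 00:56:32')` returns a naive `datetime`. Kafka timestamps are milliseconds since the Unix epoch, and Python's `datetime.timestamp()` treats a naive value as local time. The sketch below (hypothetical helper `to_epoch_ms`) makes the UTC interpretation explicit; it does not show how pykafka itself converts the value and is not claimed to explain the `-2`.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_epoch_ms(dt: datetime) -> int:
    """Convert a datetime to integer milliseconds since the Unix epoch.

    Assumption: naive datetimes are UTC (matching values from datetime.utcnow()).
    Python's own dt.timestamp() would interpret them as local time instead.
    """
    if dt.tzinfo is None:
        dt = dt.replace(tzinfo=timezone.utc)
    # timedelta // timedelta gives an exact int, avoiding float rounding.
    return (dt - EPOCH) // timedelta(milliseconds=1)


print(to_epoch_ms(datetime.fromisoformat('2019-11-05 00:56:32')))  # → 1572915392000
```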
I have done a couple of experiments with this, and I can't get `reset_offsets` with `(pykafka.partition.Partition, datetime.datetime)` pairs to behave in any reasonable way.
Note: the same historical-data query (seeking by timestamp) works without issues in kafka-python, using `consumer.offsets_for_times`.
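For comparison, the kafka-python route can be sketched as a small helper. This assumes a kafka-python `KafkaConsumer` that already has the partitions assigned; `seek_to_time` is a hypothetical name. `offsets_for_times` takes a `{TopicPartition: timestamp_ms}` dict and returns, per partition, an object with an `.offset` attribute, or `None` when no message is at or after that time. Seeking to the end in the `None` case is this sketch's own design choice, not library behavior.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def seek_to_time(consumer, partitions, when):
    """Position each partition at the first message with timestamp >= `when`.

    Assumes `consumer` is a kafka-python KafkaConsumer with `partitions`
    (kafka.TopicPartition objects) already assigned. Naive `when` is treated
    as UTC. Returns {partition: offset, or None if nothing at/after `when`}.
    """
    if when.tzinfo is None:
        when = when.replace(tzinfo=timezone.utc)
    ts_ms = (when - EPOCH) // timedelta(milliseconds=1)
    found = consumer.offsets_for_times({tp: ts_ms for tp in partitions})
    result = {}
    for tp in partitions:
        hit = found.get(tp)
        if hit is None:
            # No message at or after `when`: park at the end of the partition.
            consumer.seek_to_end(tp)
            result[tp] = None
        else:
            consumer.seek(tp, hit.offset)
            result[tp] = hit.offset
    return result

# Usage sketch (needs kafka-python and a reachable broker):
# from kafka import KafkaConsumer, TopicPartition
# consumer = KafkaConsumer(bootstrap_servers="<ip:port>")
# tp = TopicPartition('test1', 0)
# consumer.assign([tp])
# seek_to_time(consumer, [tp], datetime.fromisoformat('2019-11-05 00:56:32'))
```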