This repository has been archived by the owner on Nov 20, 2019. It is now read-only.

Beetle

Beetle is a somewhat higher-level Java API on top of the client libraries distributed with Apache Kafka. The goal of this library is not to replace those libraries, but to wrap them in an easier-to-use package.

System Requirements

  • JDK7 or later

Hacking

This project uses Gradle so building and testing should be as easy as executing:

% ./gradlew

Design/Notes/Thoughts

Note
Right now this section is very much just a brain-dump/work-in-progress

What is fundamentally missing from the upstream Kafka clients is an evented view of the world. Although Zookeeper's and Kafka's models are effectively event-driven, implementing a consumer on the lower-level SimpleConsumer requires busy-loops and rather disjointed logic for reconnects and error handling. The higher-level consumer API is also awkward to use, both for receiving messages (through an iterator) and for handling parallel operations (stuffing a thread pool somewhere for receiving).
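
To make the pull-versus-push contrast concrete, here is a minimal sketch that wraps an iterator-style source in a callback-style API using only the JDK. The `PushAdapter` class and its `drain` method are hypothetical names invented for illustration; they are not part of Beetle or the Kafka clients, and a plain in-memory list stands in for a real message stream. Like the prototypes in this section, it assumes Java 8 lambdas.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

/**
 * Hypothetical illustration only: neither this class nor drain() exists
 * in Beetle or in the Kafka client libraries.
 */
public class PushAdapter {

    /**
     * Pulls every element from the iterator and pushes it to onMessage.
     * A runtime failure is routed to onError instead of escaping the loop.
     * Returns the number of messages that were delivered successfully.
     */
    public static <T> int drain(Iterator<T> source,
                                Consumer<? super T> onMessage,
                                Consumer<? super Throwable> onError) {
        int delivered = 0;
        try {
            while (source.hasNext()) {
                onMessage.accept(source.next());
                delivered++;
            }
        } catch (RuntimeException e) {
            onError.accept(e);
        }
        return delivered;
    }

    public static void main(String[] args) {
        // An in-memory list stands in for a consumer's message iterator.
        Iterator<String> fakeStream = Arrays.asList("a", "b", "c").iterator();
        List<String> received = new ArrayList<>();

        int count = drain(fakeStream,
                          received::add,
                          err -> System.err.println("stream failed: " + err));

        System.out.println(count + " messages: " + received);
    }
}
```

The caller only supplies handlers; the loop, and the decision about what to do on failure, live in one place rather than being repeated in every consumer.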

A high-level Kafka consumer API maps rather nicely to the RxJava usage model of Observables and Subscribers, e.g.:

Consumer.java
/*
 * Prototype code showing how a typical end-user might use Beetle
 */

Brokers.fromZookeeper("localhost:2181")
    /* assuming a custom subscribe() operator exists in Beetle */
    .subscribe("some-topic")
    /* assuming a custom consume() operator exists in Beetle */
    .consume(message -> doSomethingWithMessage(message))
    .map(message -> message.commitOffset());
ZookeeperlessConsumer.java
/*
 * Assuming we already know which broker we want to talk to and
 * we don't care at all about leader changes or committing offsets
 */
long startOffset = 0;

Brokers.just("localhost:6667")
    .subscribe("some-topic", startOffset)
    .consume(m -> doSomethingWithMessage(m));
LowLevelBeetle.java
/*
 * The following is a prototype of some ideas for how the above examples
 * might be implemented internally
 */
CuratorFramework cf = CuratorFrameworkFactory.newClient("localhost:2181");

BrokersObserver.observe(cf)
    .subscribe(brokers -> TopicsObserver.observe(cf, brokers))
    .subscribe(partitions -> ConsumerObserver.observe(cf, partitions))
    .map(message ->
        doCustomBehaviorWith(message))
    .map(message -> message.commitOffsetTo(cf));
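
The first step of the prototype above, observing the set of brokers and reacting when it changes, can be sketched without Curator or RxJava. The `BrokerRegistry` class below is a hypothetical stand-in written only for illustration: it is not the `BrokersObserver` from the prototype, and `update()` simulates a membership change that a real implementation would presumably learn about from a Zookeeper watch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

/**
 * Hypothetical stand-in for a broker watcher. Not part of Beetle,
 * Curator, or Kafka; it only illustrates the evented shape.
 */
public class BrokerRegistry {
    private final List<Consumer<List<String>>> listeners = new CopyOnWriteArrayList<>();
    private List<String> current = new ArrayList<>();

    /** Registers a listener that is called with every new, distinct broker list. */
    public void subscribe(Consumer<List<String>> listener) {
        listeners.add(listener);
    }

    /** Simulates a membership change and notifies listeners if anything differs. */
    public synchronized void update(List<String> brokers) {
        if (brokers.equals(current)) {
            return; // nothing changed, so no event is emitted
        }
        current = new ArrayList<>(brokers);
        for (Consumer<List<String>> listener : listeners) {
            // Each listener gets its own copy so it cannot mutate shared state.
            listener.accept(new ArrayList<>(current));
        }
    }

    public static void main(String[] args) {
        BrokerRegistry registry = new BrokerRegistry();
        registry.subscribe(brokers -> System.out.println("brokers now: " + brokers));

        registry.update(Arrays.asList("localhost:6667"));
        registry.update(Arrays.asList("localhost:6667")); // duplicate, ignored
        registry.update(Arrays.asList("localhost:6667", "localhost:6668"));
    }
}
```

Downstream stages (topics, partitions, messages) could follow the same pattern, each subscribing to the stage before it; an Rx library would supply that chaining, along with error propagation and scheduling, instead of hand-written listener lists.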

Similar Projects

  1. kafka-rx: Scala-based client which provides a push alternative to Kafka’s pull-based stream.