Skip to content

Commit

Permalink
updated documentation (quickstart) with python code
Browse files Browse the repository at this point in the history
  • Loading branch information
mdorier committed Jun 12, 2024
1 parent 1031048 commit 8148871
Show file tree
Hide file tree
Showing 4 changed files with 109 additions and 23 deletions.
31 changes: 31 additions & 0 deletions docs/_code/consumer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import sys
import json
import pymargo.core
from pymargo.core import Engine
from mochi.mofka.client import Client


def consume(engine: Engine, group_file: str, topic_name: str):
client = Client(engine)
service = client.connect(group_file)
topic = service.open_topic(topic_name)
consumer = topic.consumer(name="myconsumer")

for i in range(0, 100):
event = consumer.pull().wait()
print(event.metadata)
if (i+1) % 10:
event.acknowledge()


if __name__ == "__main__":

if len(sys.argv) != 4:
print(f"Usage: {sys.argv[0]} <protocol> <groupfile> <topic>")
protocol = sys.argv[1]
group_file = sys.argv[2]
topic_name = sys.argv[3]

with Engine(protocol, pymargo.core.server) as engine:
consume(engine, group_file, topic_name)
engine.finalize()
31 changes: 31 additions & 0 deletions docs/_code/producer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import sys
import pymargo.core
from pymargo.core import Engine
from mochi.mofka.client import Client


def produce(engine: Engine, group_file: str, topic_name: str):
client = Client(engine)
service = client.connect(group_file)
topic = service.open_topic(topic_name)
producer = topic.producer()

for i in range(0, 100):
producer.push(
metadata={"x": i*42, "name": "bob"},
data=bytes())

producer.flush()


if __name__ == "__main__":

if len(sys.argv) != 4:
print(f"Usage: {sys.argv[0]} <protocol> <groupfile> <topic>")
protocol = sys.argv[1]
group_file = sys.argv[2]
topic_name = sys.argv[3]

with Engine(protocol, pymargo.core.server) as engine:
produce(engine, group_file, topic_name)
engine.finalize()
69 changes: 46 additions & 23 deletions docs/usage/quickstart.rst
Original file line number Diff line number Diff line change
Expand Up @@ -81,44 +81,59 @@ Using the Mofka library
-----------------------

Mofka can be used in C++ or in Python (if built with Python support). The following
*CMakeLists.txt* file shows how to link an application against the Mofka library in CMake.
*CMakeLists.txt* file shows how to link a C++ application against the Mofka library in CMake.


.. literalinclude:: ../_code/CMakeLists.txt
:language: cmake
:end-before: CUSTOM TOPIC OBJECTS


In Python, most of Mofka's client interface is located in the :code:`mochi.mofka.client` module.
The sections hereafter show how to use both the C++ and Python interface to produce and consume events.


Simple producer application
---------------------------

The following code examplified a producer. We first need to initialize a
:code:`thallium::engine`, which is the runtime used by all the Mochi libraries.
Then, we also initialize SSG with :code:`ssg_init` and tell the engine to finalize
it when it is itself finalized.
The following code examplified a producer.

We first need to initialize a :code:`thallium::engine` in C++, or a :code:`pymargo.core.Engine` in
Python, which handles the Mochi runtime.

.. important::

The Thallium engine needs to be initialized in *server mode* for Mofka to work.
The engine needs to be initialized in *server mode* for Mofka to work.
This is because Mofka servers will send RPCs to the clients.

Next, we create a :code:`mofka::Client` object and use it to create a
:code:`mofka::ServiceHandle`. The latter is initialized using the file
created by our running Mofka server (*mofka.ssg*).
Next, we create a :code:`Client` object and use it to create a
:code:`ServiceHandle`. The latter is initialized using the file
created by our running Mofka server (*mofka.json*).

We then open the topic we have created, using :code:`service_handle.openTopic()`,
which gives us a :code:`TopicHandle` to interact with the topic.
We then open the topic we have created, using :code:`service_handle.openTopic()`
(:code:`server.open_topic()` in Python), which gives us a :code:`TopicHandle`
to interact with the topic.

We create a :code:`mofka::Producer` using :code:`topic.producer()`, and we use
We create a :code:`Producer` using :code:`topic.producer()`, and we use
it in a loop to create 100 events with their :code:`Metadata` and :code:`Data`
parts (we always send the same metadata here and we don't provide any data).
In Python, the metadata part can be a :code:`dict` convertible to JSON,
and the data part can be anything that satisfies the buffer protocol.

The :code:`push()` function is non-blocking. To ensure that the events
have all been sent, we call :code:`producer.flush()`.

.. literalinclude:: ../_code/producer.cpp
:language: cpp
.. tabs::

.. group-tab:: C++

.. literalinclude:: ../_code/producer.cpp
:language: cpp

.. group-tab:: Python

.. literalinclude:: ../_code/producer.py
:language: python

.. note::

Expand All @@ -129,25 +144,33 @@ have all been sent, we call :code:`producer.flush()`.
Simple consumer application
---------------------------

The following code shows how to create a consumer and use it to consume
the events. It starts the same as the producer application (except for the
topic creation), but we then create a :code:`mofka::Consumer` object.
The following code shows how to create a consumer and use it to consume the events.

The consumer object is created with a name. This is for Mofka to associate
the name with the last event that was acknowledged by the application.
In case of a crash of the application, it will be able to restart from the
last acknowledged event. This acknowledgement is done using the
:code:`mofka::Event::acknowledge()` function, which in the example bellow
:code:`Event`'s :code:`acknowledge()` function, which in the example bellow
is called every 10 events.

:code:`consumer.pull()` is a non-blocking function returning a :code`mofka::Future`.
Waiting for this future with :code:`.wait()` returns a :code:`mofka::Event` object
from which we can retrieve an event ID as well as the event's metadata.
:code:`consumer.pull()` is a non-blocking function returning a :code`Future`.
Waiting for this future with :code:`.wait()` returns an :code:`Event` object
from which we can retrieve an event ID as well as the event's metadata and data.

As it is, the data associated with an event will not be pulled automatically
by the consumer, contrary to the event's metadata. Further in this documentation
you will learn how to pull this data, or part of it.

.. literalinclude:: ../_code/consumer.cpp
:language: cpp
.. tabs::

.. group-tab:: C++

.. literalinclude:: ../_code/consumer.cpp
:language: cpp

.. group-tab:: Python

.. literalinclude:: ../_code/consumer.py
:language: python


1 change: 1 addition & 0 deletions python/mochi/mofka/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@


ClientException = pymofka_client.Exception
TopicHandle = pymofka_client.TopicHandle
Validator = pymofka_client.Validator
PartitionSelector = pymofka_client.PartitionSelector
Serializer = pymofka_client.Serializer
Expand Down

0 comments on commit 8148871

Please sign in to comment.