Skip to content

Commit

Permalink
Implement Tumbling and Hopping windows (#257)
Browse files Browse the repository at this point in the history
* Implement Tumbling and Hopping windows with  basic aggregations (reduce, mean, max, min, etc.)
* Add the WindowedRocksDBStore to store window data
* Add bloom filter settings to default Rocksdb options
* Add default WAL size settings to default Rocksdb options
* add a timestamp extractor to parse timestamps from incoming messages



---------

Co-authored-by: Haris Botić <[email protected]>
  • Loading branch information
daniil-quix and harisbotic authored Jan 19, 2024
1 parent 510b131 commit 428d567
Show file tree
Hide file tree
Showing 42 changed files with 4,292 additions and 417 deletions.
33 changes: 17 additions & 16 deletions docs/api-reference/application.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
class Application()
```

[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/7bb0b4e47d690ffe285f34c87c3bbe7d39c6b397/quixstreams/app.py#L42)
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/5ea02f7558e6dd73d58eb673f71bad48aaf23282/quixstreams/app.py#L43)

The main Application class.

Expand Down Expand Up @@ -78,7 +78,7 @@ def __init__(broker_address: str,
loglevel: Optional[LogLevel] = "INFO")
```

[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/7bb0b4e47d690ffe285f34c87c3bbe7d39c6b397/quixstreams/app.py#L81)
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/5ea02f7558e6dd73d58eb673f71bad48aaf23282/quixstreams/app.py#L82)


<br>
Expand Down Expand Up @@ -155,7 +155,7 @@ def Quix(cls,
auto_create_topics: bool = True) -> Self
```

[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/7bb0b4e47d690ffe285f34c87c3bbe7d39c6b397/quixstreams/app.py#L187)
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/5ea02f7558e6dd73d58eb673f71bad48aaf23282/quixstreams/app.py#L188)

Initialize an Application to work with Quix platform,

Expand Down Expand Up @@ -262,10 +262,11 @@ def topic(name: str,
key_deserializer: DeserializerType = "bytes",
value_serializer: SerializerType = "json",
key_serializer: SerializerType = "bytes",
creation_configs: Optional[TopicCreationConfigs] = None) -> Topic
creation_configs: Optional[TopicCreationConfigs] = None,
timestamp_extractor: Optional[TimestampExtractor] = None) -> Topic
```

[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/7bb0b4e47d690ffe285f34c87c3bbe7d39c6b397/quixstreams/app.py#L333)
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/5ea02f7558e6dd73d58eb673f71bad48aaf23282/quixstreams/app.py#L334)

Create a topic definition.

Expand Down Expand Up @@ -324,7 +325,7 @@ Its name will be overridden by this method's 'name' param.
def dataframe(topic: Topic) -> StreamingDataFrame
```

[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/7bb0b4e47d690ffe285f34c87c3bbe7d39c6b397/quixstreams/app.py#L396)
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/5ea02f7558e6dd73d58eb673f71bad48aaf23282/quixstreams/app.py#L400)

A simple helper method that generates a `StreamingDataFrame`, which is used

Expand Down Expand Up @@ -374,7 +375,7 @@ to be used as an input topic.
def stop()
```

[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/7bb0b4e47d690ffe285f34c87c3bbe7d39c6b397/quixstreams/app.py#L432)
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/5ea02f7558e6dd73d58eb673f71bad48aaf23282/quixstreams/app.py#L436)

Stop the internal poll loop and the message processing.

Expand All @@ -394,7 +395,7 @@ To otherwise stop an application, either send a `SIGTERM` to the process
def get_producer() -> Producer
```

[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/7bb0b4e47d690ffe285f34c87c3bbe7d39c6b397/quixstreams/app.py#L444)
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/5ea02f7558e6dd73d58eb673f71bad48aaf23282/quixstreams/app.py#L448)

Create and return a pre-configured Producer instance.
The Producer is initialized with params passed to Application.
Expand Down Expand Up @@ -429,7 +430,7 @@ with app.get_producer() as producer:
def get_consumer() -> Consumer
```

[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/7bb0b4e47d690ffe285f34c87c3bbe7d39c6b397/quixstreams/app.py#L480)
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/5ea02f7558e6dd73d58eb673f71bad48aaf23282/quixstreams/app.py#L484)

Create and return a pre-configured Consumer instance.
The Consumer is initialized with params passed to Application.
Expand Down Expand Up @@ -474,7 +475,7 @@ with app.get_consumer() as consumer:
def clear_state()
```

[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/7bb0b4e47d690ffe285f34c87c3bbe7d39c6b397/quixstreams/app.py#L529)
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/5ea02f7558e6dd73d58eb673f71bad48aaf23282/quixstreams/app.py#L533)

Clear the state of the application.

Expand All @@ -488,7 +489,7 @@ Clear the state of the application.
def run(dataframe: StreamingDataFrame)
```

[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/7bb0b4e47d690ffe285f34c87c3bbe7d39c6b397/quixstreams/app.py#L558)
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/5ea02f7558e6dd73d58eb673f71bad48aaf23282/quixstreams/app.py#L562)

Start processing data from Kafka using provided `StreamingDataFrame`

Expand Down Expand Up @@ -532,7 +533,7 @@ app.run(dataframe=df)
class State(Protocol)
```

[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/7bb0b4e47d690ffe285f34c87c3bbe7d39c6b397/quixstreams/state/types.py#L95)
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/5ea02f7558e6dd73d58eb673f71bad48aaf23282/quixstreams/state/types.py#L102)

Primary interface for working with key-value state data from `StreamingDataFrame`

Expand All @@ -546,7 +547,7 @@ Primary interface for working with key-value state data from `StreamingDataFrame
def get(key: Any, default: Any = None) -> Optional[Any]
```

[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/7bb0b4e47d690ffe285f34c87c3bbe7d39c6b397/quixstreams/state/types.py#L100)
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/5ea02f7558e6dd73d58eb673f71bad48aaf23282/quixstreams/state/types.py#L107)

Get the value for key if key is present in the state, else default

Expand All @@ -573,7 +574,7 @@ value or None if the key is not found and `default` is not provided
def set(key: Any, value: Any)
```

[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/7bb0b4e47d690ffe285f34c87c3bbe7d39c6b397/quixstreams/state/types.py#L109)
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/5ea02f7558e6dd73d58eb673f71bad48aaf23282/quixstreams/state/types.py#L116)

Set value for the key.

Expand All @@ -594,7 +595,7 @@ Set value for the key.
def delete(key: Any)
```

[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/7bb0b4e47d690ffe285f34c87c3bbe7d39c6b397/quixstreams/state/types.py#L116)
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/5ea02f7558e6dd73d58eb673f71bad48aaf23282/quixstreams/state/types.py#L123)

Delete value for the key.

Expand All @@ -616,7 +617,7 @@ This function always returns `None`, even if value is not found.
def exists(key: Any) -> bool
```

[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/7bb0b4e47d690ffe285f34c87c3bbe7d39c6b397/quixstreams/state/types.py#L124)
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/5ea02f7558e6dd73d58eb673f71bad48aaf23282/quixstreams/state/types.py#L131)

Check if the key exists in state.

Expand Down
Loading

0 comments on commit 428d567

Please sign in to comment.