chore: update README (#5)
add instruction for setting up and running command
sehz committed Feb 1, 2023
1 parent 17e4fa3 commit 2d81cc4
Showing 7 changed files with 199 additions and 60 deletions.
1 change: 1 addition & 0 deletions .github/workflows/ci.yml
@@ -16,6 +16,7 @@ jobs:

build:
name: Build ${{ matrix.check }} on (${{ matrix.os }})
if: false
runs-on: ${{ matrix.os }}
strategy:
matrix:
165 changes: 122 additions & 43 deletions README.md
@@ -1,23 +1,31 @@
#Loading
# Introduction

fluvio-duck is a DuckDB extension for working with the Fluvio streaming platform. It lets you query Fluvio topics and partitions, and consume Fluvio topics, using SQL.

# Building extension

For debugging and testing
## Debug

Execute the following command to build the extension in debug mode:
```
make debug
```

Loading extension:

```
./build/debug/duckdb --unsigned
$ ./build/debug/duckdb --unsigned
load './build/debug/extension/fluvio-duck/fluvioduck.duckdb_extension';
>> getting version
>> init
D load './build/debug/extension/fluvio-duck/fluvioduck.duckdb_extension';
```

Note that Debug mode should not be used for large data sets. It is very slow.

## Release

Execute the following command to build the extension in release mode. Note that building release will take longer than debug mode.

```
make release
```
@@ -29,16 +37,18 @@ $ ./build/release/duckdb --unsigned
D load './build/release/extension/fluvio-duck/fluvioduck.duckdb_extension';
```

# Running cloud demo

Start cats-facts and helinki in the cloud demo repo.
Download Jolt and Regex SmartModule in the UI console.
# Using fluvio-duck extension

Load either debug or release version of the extension as in the previous section.


# Getting topics and partitions
## Getting topics and partitions

You can use the following commands to get a list of topics and partitions.

To get a list of topics:
```
D select * from fluvio_topic();
D select * from fluvio_topics();
┌───────────┬────────────┐
│ name │ partitions │
│ varchar │ int32 │
@@ -48,81 +58,150 @@ D select * from fluvio_topic();
└───────────┴────────────┘
```

Partitions
To get a list of partitions:
```
select * from fluvio_partition();
select * from fluvio_partitions();
┌───────────┬───────────┬─────────┐
│ topic │ partition │ LEO │
│ varchar │ varchar │ int32 │
├───────────┼───────────┼─────────┤
│ cat-facts │ 0 │ 30920 │
│ helsinki │ 0 │ 8098386 │
└───────────┴───────────┴─────────┘
```

Computing total number of offets
With SQL, you can sum up all the partitions to get total number of offsets.

```
select sum(leo) from fluvio_partition();
D select sum(leo) from fluvio_partitions();
┌──────────┐
│ sum(leo) │
│ int128 │
├──────────┤
│ 8129308 │
│ 1859060 │
└──────────┘
```

# MQTT example
## Querying Fluvio topics

## Simple select
With SQL, you can query Fluvio topics and materialize the results as a SQL table.

get last 1000 events
```
select * from fluvio_consume('helsinki --tail 1000');
The command follows this format:

```sql
D select <param> from fluvio_consume('<topic_name> <options>');
```
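
The format above puts the topic name and all consumer options inside a single SQL string literal. As a minimal Rust sketch of assembling such a query from its parts: `consume_query` is a hypothetical helper for illustration and is not part of fluvio-duck; the only rule it relies on is standard SQL escaping, where a single quote inside a string literal is written as two single quotes.

```rust
/// Build a `fluvio_consume` query string.
/// Hypothetical helper for illustration; not part of the fluvio-duck extension.
fn consume_query(columns: &str, topic: &str, options: &[&str]) -> String {
    // Join the topic name and options with single spaces, as in the README format.
    let mut args = String::from(topic);
    for opt in options {
        args.push(' ');
        args.push_str(opt);
    }
    // Standard SQL escaping: double any single quote inside the string literal.
    let escaped = args.replace('\'', "''");
    format!("select {columns} from fluvio_consume('{escaped}');")
}

fn main() {
    // Topic and options taken from the README example.
    println!("{}", consume_query("*", "helsinki", &["--tail", "5"]));
}
```

The resulting string can be pasted at the DuckDB prompt once the extension is loaded.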

## Do transformation with jolt
The options are the same as in the Fluvio CLI, except for those related to output format.

For example, to get the last 5 events from topic `helsinki`:

```sql
D select * from fluvio_consume('helsinki --tail 5');
┌─────────┬──────────────────────┬────────────────────────────────────────────────────────────────────┐
│ offset │ timestamp │ value │
│ int32 │ timestamp_ms │ varchar │
├─────────┼──────────────────────┼────────────────────────────────────────────────────────────────────┤
│ 1859053 │ 2023-01-28 23:54:2… │ {"mqtt_topic":"/hfp/v2/journey/ongoing/vp/bus/0018/00258/1065/1/… │
│ 1859054 │ 2023-01-28 23:54:2… │ {"mqtt_topic":"/hfp/v2/journey/ongoing/vp/train/0090/06065/3001T… │
│ 1859055 │ 2023-01-28 23:54:2… │ {"mqtt_topic":"/hfp/v2/journey/ongoing/vp/bus/0022/00971/2118N/2… │
│ 1859056 │ 2023-01-29 00:12:5… │ {"mqtt_topic":"/hfp/v2/journey/ongoing/vp/train/0090/06326/3001T… │
│ 1859057 │ 2023-01-29 00:12:5… │ {"mqtt_topic":"/hfp/v2/journey/ongoing/vp/bus/0022/01360/1085N/2… │
└─────────┴──────────────────────┴────────────────────────────────────────────────────────────────────┘
```
D select * from fluvio_consume('helsinki --tail 1000 --transforms-file=jolt.yaml');
You can ask for help by using the `--help` option:
```sql
select * from fluvio_consume('--help');
.... help command output
```
view with first 10
## SmartModule Transformations
You can use SmartModule transformations to transform the data. The transformations are defined in a YAML file. The file is passed to the `--transforms-file` option.
For example, to get the last 1000 events from topic `helsinki` and transform the data using `jolt.yaml` file:
```
D create view transit as select * from fluvio_consume('helsinki --tail 10 --transforms-file=jolt.yaml
-c lat:d="lat" -c long:d="long" -c vehicle:i="vehicle" -c route="route" -c speed:d="speed"
-c time:t="tst" -c acc:d="acc" -c line:i="line" -c stop:i="stop" -c desi="desi" -c operator:i="oper"
-c dl:i="dl" -c odo:i="odo" -c drst:i="drst" -c occu:i="occu" -c hdg:i="hdg" -c dir="dir" -c tsi:i="tsi"
-c jrn:i="jrn" -c start="start"');
D select * from fluvio_consume('helsinki --tail 5 --transforms-file=examples/short.yaml');
select * from fluvio_consume('helsinki --tail 5 --transforms-file=examples/short.yaml');
┌─────────┬──────────────────────┬────────────────────────────────────────────────────────────────────┐
│ offset │ timestamp │ value │
│ int32 │ timestamp_ms │ varchar │
├─────────┼──────────────────────┼────────────────────────────────────────────────────────────────────┤
│ 1859053 │ 1969-12-31 23:59:5… │ {"acc":0.0,"desi":"65","dir":"1","dl":-19,"drst":0,"hdg":109,"jr… │
│ 1859054 │ 1969-12-31 23:59:5… │ {"acc":0.15,"desi":"T","dir":"1","dl":-180,"drst":null,"hdg":357… │
│ 1859055 │ 1969-12-31 23:59:5… │ {"acc":-0.56,"desi":"118N","dir":"2","dl":-305,"drst":0,"hdg":17… │
│ 1859056 │ 1969-12-31 23:59:5… │ {"acc":-0.48,"desi":"T","dir":"1","dl":3419,"drst":null,"hdg":18… │
│ 1859057 │ 1969-12-31 23:59:5… │ {"acc":0.0,"desi":"85N","dir":"2","dl":719,"drst":0,"hdg":null,"… │
└─────────┴──────────────────────┴────────────────────────────────────────────────────────────────────┘
```
All data
This assumes you have downloaded the Jolt SmartModule from the hub. Please see the Fluvio SmartModule documentation for more information.
## Mapping JSON columns to SQL columns
In the previous example, the JSON data is returned as a single column. You can map JSON fields to SQL columns using the `-c` option. The `-c` option takes a column name, an optional type suffix, and a JSON path. The JSON path is a dot-separated path to the JSON field. For example, `-c lat:d="lat"` maps the JSON field `lat` to a SQL column `lat` of type double (`d`).
The following example shows how to create a materialized view with mapped columns:
```
D create view transit as select * from fluvio_consume('helsinki -B --rows=1859058 --transforms-file=jolt.yaml
D create view transit as select * from fluvio_consume('helsinki --tail 5 --transforms-file=examples/short.yaml
-c lat:d="lat" -c long:d="long" -c vehicle:i="vehicle" -c route="route" -c speed:d="speed"
-c time:t="tst" -c acc:d="acc" -c line:i="line" -c stop:i="stop" -c desi="desi" -c operator:i="oper"
-c dl:i="dl" -c odo:i="odo" -c drst:i="drst" -c occu:i="occu" -c hdg:i="hdg" -c dir="dir" -c tsi:i="tsi"
-c jrn:i="jrn" -c start="start"');
```
-c time:t="tst"');
D select * from transit;
┌───────────┬───────────┬─────────┬─────────┬────────┬─────────────────────────┐
│ lat │ long │ vehicle │ route │ speed │ time │
│ double │ double │ int32 │ varchar │ double │ timestamp_ms │
├───────────┼───────────┼─────────┼─────────┼────────┼─────────────────────────┤
│ 60.170393 │ 24.944114 │ 258 │ 1065 │ 5.56 │ 2023-01-28 23:54:23.399 │
│ 60.174296 │ 24.941409 │ 6065 │ 3001T │ 8.75 │ 2023-01-28 23:54:22.629 │
│ 60.172573 │ 24.77937 │ 971 │ 2118N │ 7.36 │ 2023-01-28 23:54:23.39 │
│ 60.171846 │ 24.941544 │ 6326 │ 3001T │ 0.77 │ 2023-01-28 23:54:23.44 │
│ 60.170552 │ 25.079789 │ 1360 │ 1085N │ 0.0 │ 2023-01-28 23:54:23.405 │
└───────────┴───────────┴─────────┴─────────┴────────┴─────────────────────────┘
```
D select avg(speed) from events where route == '3001R';
With mapped columns, you can use SQL to perform analysis on the data. For example, to get the average speed of the vehicles by route:
```sql
D select route, avg(speed) from transit group by route;
┌─────────┬────────────┐
│ route │ avg(speed) │
│ varchar │ double │
├─────────┼────────────┤
│ 1065 │ 5.56 │
│ 3001T │ 4.76 │
│ 2118N │ 7.36 │
│ 1085N │ 0.0 │
└─────────┴────────────┘
```
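
The `-c` mappings used in the README follow a `name[:type]="json_key"` shape. Judging from the column types in the `transit` output (`lat` double, `vehicle` int32, `route` varchar, `time` timestamp_ms), the suffixes `d`, `i`, and `t` appear to select double, integer, and timestamp, and no suffix appears to mean varchar. The following is a minimal Rust sketch of parsing one mapping under those assumptions. `ColumnType`, `ColumnMapping`, and `parse_mapping` are hypothetical names, and this is not the extension's actual parser.

```rust
/// SQL column type selected by the optional suffix in a `-c` mapping.
/// The suffix meanings are inferred from the README output, not from the extension source.
#[derive(Debug, PartialEq)]
enum ColumnType {
    Double,    // `:d`
    Int,       // `:i`
    Timestamp, // `:t`
    Varchar,   // no suffix
}

/// One parsed mapping: SQL column name, SQL type, and JSON key.
#[derive(Debug, PartialEq)]
struct ColumnMapping {
    name: String,
    ty: ColumnType,
    json_key: String,
}

/// Parse a single mapping such as `lat:d="lat"` or `route="route"`.
/// Returns `None` if the text does not have the `name[:type]="key"` shape.
fn parse_mapping(spec: &str) -> Option<ColumnMapping> {
    // Split into the column side and the quoted JSON key.
    let (left, right) = spec.split_once('=')?;
    let json_key = right.strip_prefix('"')?.strip_suffix('"')?;
    // The column side is either `name` or `name:type`.
    let (name, ty) = match left.split_once(':') {
        Some((name, "d")) => (name, ColumnType::Double),
        Some((name, "i")) => (name, ColumnType::Int),
        Some((name, "t")) => (name, ColumnType::Timestamp),
        Some(_) => return None, // unknown type suffix
        None => (left, ColumnType::Varchar),
    };
    if name.is_empty() || json_key.is_empty() {
        return None;
    }
    Some(ColumnMapping {
        name: name.to_string(),
        ty,
        json_key: json_key.to_string(),
    })
}

fn main() {
    // Mappings taken from the README example.
    for spec in ["lat:d=\"lat\"", "vehicle:i=\"vehicle\"", "route=\"route\"", "time:t=\"tst\""] {
        println!("{spec} -> {:?}", parse_mapping(spec));
    }
}
```

Returning `None` for an unknown suffix keeps a mistyped mapping from silently falling back to varchar.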
## Convert to Parquet
## Converting fluvio topic data to Parquet
The previous examples show how to consume data from a Fluvio topic and analyze it with SQL. You can also convert the data to Parquet format with the `COPY` command and analyze it using Parquet tools.
First install Parquet extensions into DuckDB:
```
D INSTALL parquet; Load 'parquet';
D COPY (SELECT * FROM transit) TO 'helsinki.parquet' (FORMAT 'parquet');
D create view pevents as SELECT * FROM read_parquet('helsinki.parquet');
```
Performance
Then run the following command to convert the data to Parquet format:
```
EXPLAIN analyze select avg(speed) from pevents;
D COPY (SELECT * FROM <fluvio_topic>) TO '<parquet_file>' (FORMAT 'parquet');
```
For example, to convert the data from `transit` materialized view to `helsinki.parquet` file, you can run the following command:
```
D COPY (SELECT * FROM transit) TO 'helsinki.parquet' (FORMAT 'parquet');
```
Note that the current version of the fluvio-duck extension is not optimized for performance. It is recommended to use the `COPY` command for small data sets.
45 changes: 45 additions & 0 deletions examples/README.md
@@ -0,0 +1,45 @@


# Helsinki Demo

This demo requires a development release of the CDK and the MQTT connector.

## Connect to Fluvio

Either set up a local Fluvio cluster (https://www.fluvio.io) or connect to Infinyon cloud at https://infinyon.cloud/signup.

Test by running this command:

```
$ fluvio topic list
< show topic list>
```

## Download Connector Developer Kit (CDK)

Download CDK by running this command:

```
$ fluvio install --hub cdk
```


# full transformation


```
D create view transit as select * from fluvio_consume('helsinki --tail 10 --transforms-file=jolt.yaml
-c lat:d="lat" -c long:d="long" -c vehicle:i="vehicle" -c route="route" -c speed:d="speed"
-c time:t="tst" -c acc:d="acc" -c line:i="line" -c stop:i="stop" -c desi="desi" -c operator:i="oper"
-c dl:i="dl" -c odo:i="odo" -c drst:i="drst" -c occu:i="occu" -c hdg:i="hdg" -c dir="dir" -c tsi:i="tsi"
-c jrn:i="jrn" -c start="start"');
```

All data
```
D create view transit as select * from fluvio_consume('helsinki -B --rows=1859058 --transforms-file=jolt.yaml
-c lat:d="lat" -c long:d="long" -c vehicle:i="vehicle" -c route="route" -c speed:d="speed"
-c time:t="tst" -c acc:d="acc" -c line:i="line" -c stop:i="stop" -c desi="desi" -c operator:i="oper"
-c dl:i="dl" -c odo:i="odo" -c drst:i="drst" -c occu:i="occu" -c hdg:i="hdg" -c dir="dir" -c tsi:i="tsi"
-c jrn:i="jrn" -c start="start"');
```
16 changes: 1 addition & 15 deletions examples/jolt.yaml
@@ -11,18 +11,4 @@ transforms:
veh: "vehicle"
route: "route"
spd: "speed"
tst: "tst"
acc: "acc"
line: "line"
stop: "stop"
desi: "desi"
oper: "oper"
dl: "dl"
odo: "odo"
drst: "drst"
occu: "occu"
hdg: "hdg"
dir: "dir"
tsi: "tsi"
jrn: "jrn"
start: "start"
tst: "tst"
28 changes: 28 additions & 0 deletions examples/short.yaml
@@ -0,0 +1,28 @@
transforms:
- uses: infinyon/[email protected]
with:
spec:
- operation: shift
spec:
payload:
VP:
lat: "lat"
long: "long"
veh: "vehicle"
route: "route"
spd: "speed"
tst: "tst"
acc: "acc"
line: "line"
stop: "stop"
desi: "desi"
oper: "oper"
dl: "dl"
odo: "odo"
drst: "drst"
occu: "occu"
hdg: "hdg"
dir: "dir"
tsi: "tsi"
jrn: "jrn"
start: "start"
2 changes: 1 addition & 1 deletion src/partition.rs
@@ -20,7 +20,7 @@ use crate::bind::{

pub fn fluvio_admin_partition_function_def() -> TableFunction {
let table_function = TableFunction::new();
table_function.set_name("fluvio_partition");
table_function.set_name("fluvio_partitions");

table_function.set_function(Some(partition_read));
table_function.set_init(Some(partition_init));
2 changes: 1 addition & 1 deletion src/topic.rs
@@ -19,7 +19,7 @@ use crate::bind::{

pub fn fluvio_admin_topic_function_def() -> TableFunction {
let table_function = TableFunction::new();
table_function.set_name("fluvio_topic");
table_function.set_name("fluvio_topics");

table_function.set_function(Some(topic_read));
table_function.set_init(Some(topic_init));
