Skip to content

Commit

Permalink
add example for Rust API
Browse files Browse the repository at this point in the history
  • Loading branch information
ajhunyady committed Jan 14, 2025
1 parent 5d633f2 commit 3098e86
Show file tree
Hide file tree
Showing 13 changed files with 674 additions and 137 deletions.
11 changes: 11 additions & 0 deletions docs/_embeds/apis/rust/_example-cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
[package]
edition = "2021"
name = "fluvio-rust-example"
publish = false
version = "0.0.0"

[dependencies]
async-std = {version = "1", features = ["attributes"]}
chrono = "0.4"
flate2 = "1.0.35"
fluvio = "0.24"
1 change: 1 addition & 0 deletions docs/_embeds/apis/rust/_fluvio-crate-version.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
fluvio = "0.24"
34 changes: 34 additions & 0 deletions docs/_embeds/apis/rust/consumer-auto-offsets.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
use async_std::stream::StreamExt;

use fluvio::{
consumer::{ConsumerConfigExtBuilder, OffsetManagementStrategy},
Fluvio, Offset,
};

const TOPIC_NAME: &str = "hello-rust";
const PARTITION_NUM: u32 = 0;
const CONSUMER_OFFSET: &str = "consumer-auto";

#[async_std::main]
async fn main() {
// Connect to Fluvio cluster
let fluvio = Fluvio::connect().await.unwrap();

// Consume last record from topic
let config = ConsumerConfigExtBuilder::default()
.topic(TOPIC_NAME)
.partition(PARTITION_NUM)
.offset_start(Offset::end())
.offset_consumer(CONSUMER_OFFSET.to_string())
.offset_strategy(OffsetManagementStrategy::Auto)
.build()
.unwrap();


// Create consumer & stream one record
let mut stream = fluvio.consumer_with_config(config).await.unwrap();
while let Some(Ok(record)) = stream.next().await {
let string = String::from_utf8_lossy(record.value());
println!("{}", string);
}
}
36 changes: 36 additions & 0 deletions docs/_embeds/apis/rust/consumer-manual-offsets.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
use async_std::stream::StreamExt;

use fluvio::{
consumer::{ConsumerConfigExtBuilder, ConsumerStream, OffsetManagementStrategy},
Fluvio, Offset,
};

const TOPIC_NAME: &str = "hello-rust";
const PARTITION_NUM: u32 = 0;
const CONSUMER_OFFSET: &str = "consumer-manual";

#[async_std::main]
async fn main() {
// Connect to Fluvio cluster
let fluvio = Fluvio::connect().await.unwrap();

// Consume last record from topic
let config = ConsumerConfigExtBuilder::default()
.topic(TOPIC_NAME)
.partition(PARTITION_NUM)
.offset_start(Offset::end())
.offset_consumer(CONSUMER_OFFSET.to_string())
.offset_strategy(OffsetManagementStrategy::Manual)
.build()
.unwrap();


// Create consumer & stream one record
let mut stream = fluvio.consumer_with_config(config).await.unwrap();
while let Some(Ok(record)) = stream.next().await {
let string = String::from_utf8_lossy(record.value());
println!("{}", string);
stream.offset_commit().expect("offset commit failed");
stream.offset_flush().await.expect("offset flush failed");
}
}
27 changes: 27 additions & 0 deletions docs/_embeds/apis/rust/consumer-simple.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
use async_std::stream::StreamExt;

use fluvio::{consumer::ConsumerConfigExtBuilder, Fluvio, Offset};

const TOPIC_NAME: &str = "hello-rust";
const PARTITION_NUM: u32 = 0;

#[async_std::main]
async fn main() {
// Connect to Fluvio cluster
let fluvio = Fluvio::connect().await.unwrap();

// Consume last record from topic
let config = ConsumerConfigExtBuilder::default()
.topic(TOPIC_NAME)
.partition(PARTITION_NUM)
.offset_start(Offset::from_end(1))
.build()
.unwrap();

// Create consumer & stream one record
let mut stream = fluvio.consumer_with_config(config).await.unwrap();
if let Some(Ok(record)) = stream.next().await {
let string = String::from_utf8_lossy(record.value());
println!("{}", string);
}
}
69 changes: 69 additions & 0 deletions docs/_embeds/apis/rust/consumer-wasm-file.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
use std::io::Read;
use std::collections::BTreeMap;
use async_std::stream::StreamExt;
use flate2::{bufread::GzEncoder, Compression};

use fluvio::{Fluvio, Offset, SmartModuleExtraParams};
use fluvio::consumer::{
ConsumerConfigExtBuilder,
SmartModuleInvocation,
SmartModuleInvocationWasm,
SmartModuleKind,
};

const TOPIC_NAME: &str = "hello-rust";
const PARTITION_NUM: u32 = 0;

#[async_std::main]
async fn main() {
// Connect to Fluvio cluster
let fluvio = Fluvio::connect().await.unwrap();

// Build smartmodule invocation from wasm file
let sm_invocation = build_smartmodule_from_file(
SmartModuleKind::Map,
"regex_text.wasm",
r#"[{"replace": {"regex": "secret", "with": "****"}}]"#,
);

// Consume last record from topic
let config = ConsumerConfigExtBuilder::default()
.topic(TOPIC_NAME)
.partition(PARTITION_NUM)
.offset_start(Offset::end())
.smartmodule(vec![sm_invocation])
.build()
.unwrap();

// Create consumer & stream one record
let mut stream = fluvio.consumer_with_config(config).await.unwrap();
if let Some(Ok(record)) = stream.next().await {
let string = String::from_utf8_lossy(record.value());
println!("{}", string);
}
}

// Create a smartmodule invocation from a wasm file
fn build_smartmodule_from_file(
kind: SmartModuleKind,
file_path: &str,
spec: &str
) -> SmartModuleInvocation {
// Read smartmodule wasm file
let raw_buffer = std::fs::read(file_path).expect("wasm file is missing");
let mut encoder = GzEncoder::new(raw_buffer.as_slice(), Compression::default());
let mut buffer = Vec::with_capacity(raw_buffer.len());
encoder.read_to_end(&mut buffer).expect("failed to read encoded wasm file");

// Create smartmodule invocation with params
let mut param_tree = BTreeMap::<String,String>::new();
param_tree.insert("spec".to_owned(), spec.to_owned());
let params = SmartModuleExtraParams::new(param_tree, None);

// Return smartmodule invocation
SmartModuleInvocation {
wasm: SmartModuleInvocationWasm::AdHoc(buffer),
kind: kind,
params: params,
}
}
61 changes: 61 additions & 0 deletions docs/_embeds/apis/rust/consumer-wasm-name.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
use std::collections::BTreeMap;
use async_std::stream::StreamExt;

use fluvio::{Fluvio, Offset, SmartModuleExtraParams};
use fluvio::consumer::{
ConsumerConfigExtBuilder,
SmartModuleInvocation,
SmartModuleInvocationWasm,
SmartModuleKind,
};

const TOPIC_NAME: &str = "hello-rust";
const PARTITION_NUM: u32 = 0;

#[async_std::main]
async fn main() {
// Connect to Fluvio cluster
let fluvio = Fluvio::connect().await.unwrap();

// Build smartmodule invocation from wasm file
let sm_invocation = build_smartmodule_from_name(
SmartModuleKind::Map,
"fluvio/[email protected]",
r#"[{"replace": {"regex": "secret", "with": "****"}}]"#,
);

// Consume last record from topic
let config = ConsumerConfigExtBuilder::default()
.topic(TOPIC_NAME)
.partition(PARTITION_NUM)
.offset_start(Offset::end())
.smartmodule(vec![sm_invocation])
.build()
.unwrap();

// Create consumer & stream one record
let mut stream = fluvio.consumer_with_config(config).await.unwrap();
if let Some(Ok(record)) = stream.next().await {
let string = String::from_utf8_lossy(record.value());
println!("{}", string);
}
}

// Create a smartmodule invocation using smartmodule name
fn build_smartmodule_from_name(
kind: SmartModuleKind,
smartmodule_name: &str,
spec: &str
) -> SmartModuleInvocation {
// Create smartmodule invocation with params
let mut param_tree = BTreeMap::<String,String>::new();
param_tree.insert("spec".to_owned(), spec.to_owned());
let params = SmartModuleExtraParams::new(param_tree, None);

// Return smartmodule invocation
SmartModuleInvocation {
wasm: SmartModuleInvocationWasm::Predefined(smartmodule_name.to_string()),
kind: kind,
params: params,
}
}
25 changes: 25 additions & 0 deletions docs/_embeds/apis/rust/create-list-topics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
use fluvio::metadata::topic::TopicSpec;
use fluvio::Fluvio;

const TOPIC_NAME: &str = "hello-rust";
const PARTITIONS: u32 = 1;
const REPLICAS: u32 = 1;

#[async_std::main]
async fn main() {
// Connect to Fluvio cluster
let fluvio = Fluvio::connect().await.unwrap();

// Create a topic
let admin = fluvio.admin().await;
let topic_spec = TopicSpec::new_computed(PARTITIONS, REPLICAS, None);
let _topic_create = admin
.create(TOPIC_NAME.to_string(), false, topic_spec)
.await;

// List topics
let topics = admin.all::<TopicSpec>().await.unwrap();
let topic_names = topics.iter().map(|topic| topic.name.clone()).collect::<Vec<String>>();

println!("Topics:\n - {}", topic_names.join("\n - "));
}
15 changes: 15 additions & 0 deletions docs/_embeds/apis/rust/producer-kv.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
const TOPIC_NAME: &str = "hello-rust";

#[async_std::main]
async fn main() {
// Create key and value
let key = "Hello";
let value = "Fluvio";

// create producer & send key/value
let producer = fluvio::producer(TOPIC_NAME).await.unwrap();
producer.send(key, value).await.unwrap();
producer.flush().await.unwrap();

println!("Sent [{}] {}", key, value);
}
29 changes: 29 additions & 0 deletions docs/_embeds/apis/rust/producer-performance.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
use std::time::Duration;

use fluvio::{Fluvio, TopicProducerConfigBuilder, Compression, RecordKey};

const TOPIC_NAME: &str = "hello-rust";

#[async_std::main]
async fn main() {
// Use config builder to create a topic producer config
let producer_config = TopicProducerConfigBuilder::default()
.batch_size(500)
.linger(Duration::from_millis(500))
.compression(Compression::Gzip)
.build().expect("Failed to create topic producer config");

// Connet to fluvio cluster & create a producer
let fluvio = Fluvio::connect().await.unwrap();
let producer = fluvio.topic_producer_with_config(TOPIC_NAME, producer_config)
.await.expect("Failed to create a producer");

// Send 10 records
for i in 1..=10 {
let record = format!("Record-{}", i);
producer.send(RecordKey::NULL, record.as_str()).await.unwrap();
}
producer.flush().await.unwrap();

println!("Sent 10 records successfully.");
}
21 changes: 21 additions & 0 deletions docs/_embeds/apis/rust/producer-simple.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
use chrono::Local;

use fluvio::RecordKey;

const TOPIC_NAME: &str = "hello-rust";

#[async_std::main]
async fn main() {
// Create a record
let record = format!("Hello World! - Time is {}", Local::now().to_rfc2822());

// Produce to a topic
let producer = fluvio::producer(TOPIC_NAME).await.unwrap();
producer.send(RecordKey::NULL, record.clone()).await.unwrap();

// Fluvio batches outgoing records by default,
// call flush to ensure the record is sent
producer.flush().await.unwrap();

println!("Sent record: {}", record);
}
Loading

0 comments on commit 3098e86

Please sign in to comment.