Skip to content

Commit

Permalink
Custom metrics and dht sync lag scenario
Browse files Browse the repository at this point in the history
  • Loading branch information
ThetaSinner committed Mar 8, 2024
1 parent 7448012 commit dfd9c25
Show file tree
Hide file tree
Showing 17 changed files with 383 additions and 17 deletions.
7 changes: 6 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,13 @@ members = [

"scenarios/zome_call_single_value",
"scenarios/single_write_many_read",
"scenarios/dht_sync_lag",

"zomes/return_single_value/coordinator",
"zomes/crud/coordinator",
"zomes/crud/integrity",
"zomes/timed/coordinator",
"zomes/timed/integrity",
]

# By default, don't build the scenarios or zomes.
Expand Down Expand Up @@ -50,14 +53,15 @@ indicatif = "0.17.8"
# TODO waiting for 0.7.3+ relase to use the new reqwest-client-native-tls-vendored feature
# TODO waiting for Holochain 0.3 to add the feature `serde` back here, conflicts at 0.2
influxdb = { version = "0.7.2", git = "https://github.com/influxdb-rs/influxdb-rust", default-features = false, features = ["reqwest-client-native-tls-vendored"] }
influxive-core = "0.0.2-alpha.1"

# Deps for Holochain 0.2
holochain_client = { version = "0.4.8" }
holochain_zome_types = { version = "0.2.6" }
holo_hash = { version = "0.2.6" }
holochain_types = { version = "0.2.6" }
holochain_conductor_api = { version = "0.2.6" }
hdk = "0.2.6"
hdk = { version = "0.2.6", features = [] }
hdi = "0.3.6"
mr_bundle = "0.2.6"

Expand All @@ -73,6 +77,7 @@ holochain_wind_tunnel_runner = { path = "./bindings/runner", version = "0.1.0-al

# Zomes for coorindator dependencies
crud_integrity = { path = "./zomes/crud/integrity" }
timed_integrity = { path = "./zomes/timed/integrity" }

[workspace.lints.rust]
unsafe_code = "forbid"
1 change: 1 addition & 0 deletions framework/instruments/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ tabled = { workspace = true }
influxdb = { workspace = true, default-features = false }
tokio = { workspace = true }
log = { workspace = true }
influxive-core = { workspace = true }

wind_tunnel_core = { workspace = true }

Expand Down
11 changes: 11 additions & 0 deletions framework/instruments/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@ use wind_tunnel_core::prelude::DelegatedShutdownListener;

mod report;

pub mod prelude {
pub use crate::report::{ReportCollector, ReportMetric};
pub use crate::{ReportConfig, Reporter, OperationRecord, report_operation};
}

#[derive(Default)]
pub struct ReportConfig {
pub enable_metrics: bool,
Expand Down Expand Up @@ -63,6 +68,12 @@ impl Reporter {
}
}

pub fn add_custom(&self, metric: report::ReportMetric) {
for collector in &self.inner {
collector.write().add_custom(metric.clone());
}
}

pub fn finalize(&self) {
for collector in &self.inner {
collector.write().finalize();
Expand Down
65 changes: 65 additions & 0 deletions framework/instruments/src/report.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,77 @@
mod metrics_report;
mod summary_report;

use std::ops::Deref;
use influxive_core::{Metric, StringType};
use crate::OperationRecord;

pub use metrics_report::MetricsReportCollector;
pub use summary_report::SummaryReportCollector;

/// A simple, opinionated, newtype for the influxive_core::Metric type.
///
/// The reported timestamp for the metric will be the current time when the metric is created.
/// The name you choose will be transformed into `ws.instruments.custom.<name>`.
pub struct ReportMetric(Metric);

impl ReportMetric {
pub fn new(name: &str) -> Self {
Self(Metric::new(
std::time::SystemTime::now(),
format!("wt.custom.{}", name),
))
}

pub fn with_field<N, V>(mut self, name: N, value: V) -> Self
where
N: Into<StringType>,
V: Into<influxive_core::DataType>,
{
self.0 = self.0.with_field(name, value);
self
}

pub fn with_tag<N, V>(mut self, name: N, value: V) -> Self
where
N: Into<StringType>,
V: Into<influxive_core::DataType>,
{
self.0 = self.0.with_tag(name, value);
self
}

pub(crate) fn into_inner(self) -> Metric {
self.0
}
}

// TODO temporary, prefer to do without multiple reporter implementations
impl Clone for ReportMetric {
fn clone(&self) -> Self {
let mut new_inner = Metric::new(self.timestamp, self.name.clone());
for (k, v) in &self.fields {
new_inner = new_inner.with_field(k.clone(), v.clone());
}
for (k, v) in &self.tags {
new_inner = new_inner.with_tag(k.clone(), v.clone());
}
Self(new_inner)
}
}

impl Deref for ReportMetric {
type Target = Metric;

fn deref(&self) -> &Self::Target {
&self.0
}
}

pub trait ReportCollector {
fn add_operation(&mut self, operation_record: &OperationRecord);

/// Record a custom metric that
fn add_custom(&mut self, metric: ReportMetric);

fn finalize(&self);
}
40 changes: 39 additions & 1 deletion framework/instruments/src/report/metrics_report.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,32 @@
use crate::report::ReportCollector;
use crate::report::{ReportCollector, ReportMetric};
use crate::OperationRecord;
use anyhow::Context;
use influxdb::{Client, InfluxDbWriteable, Timestamp, WriteQuery};
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use std::time::SystemTime;
use influxive_core::DataType;
use tokio::runtime::Runtime;
use tokio::select;
use tokio::sync::mpsc::UnboundedSender;
use wind_tunnel_core::prelude::DelegatedShutdownListener;

trait DataTypeExt {
fn into_type(self) -> influxdb::Type;
}

impl DataTypeExt for DataType {
fn into_type(self) -> influxdb::Type {
match self {
DataType::Bool(b) => influxdb::Type::Boolean(b),
DataType::F64(f) => influxdb::Type::Float(f),
DataType::I64(i) => influxdb::Type::SignedInteger(i),
DataType::U64(u) => influxdb::Type::UnsignedInteger(u),
DataType::String(s) => influxdb::Type::Text(s.into_string()),
}
}
}

pub struct MetricsReportCollector {
pub writer: UnboundedSender<WriteQuery>,
pub flush_complete: Arc<AtomicBool>,
Expand Down Expand Up @@ -70,6 +87,27 @@ impl ReportCollector for MetricsReportCollector {
self.writer.send(query).unwrap();
}

fn add_custom(&mut self, metric: ReportMetric) {
let metric = metric.into_inner();

let mut query = Timestamp::Nanoseconds(
metric.timestamp
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_nanos(),
).into_query(metric.name.into_string());

for (k, v) in metric.fields {
query = query.add_field(k.into_string(), v.into_type());
}

for (k, v) in metric.tags {
query = query.add_tag(k.into_string(), v.into_type());
}

self.writer.send(query).unwrap();
}

fn finalize(&self) {
let wait_started = std::time::Instant::now();
let mut notify_timer = std::time::Instant::now();
Expand Down
4 changes: 4 additions & 0 deletions framework/instruments/src/report/summary_report.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,10 @@ impl ReportCollector for SummaryReportCollector {
self.operation_records.push(operation_record.clone());
}

fn add_custom(&mut self, _metric: crate::report::ReportMetric) {
// No custom metrics for the summary report
}

fn finalize(&self) {
self.print_summary_of_operations();
}
Expand Down
5 changes: 2 additions & 3 deletions framework/runner/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,8 @@ impl<RV: UserValuesConstraint> RunnerContext<RV> {

/// A handle to the reporter for the runner.
///
/// This is used to record data in-memory and you
/// shouldn't need to access it directly. This should be passed to an instrumented client so
/// that it can report data to the runner.
/// This is used to record data in-memory. You shouldn't need to access it directly.
/// This should be passed to an instrumented client so that it can report data to the runner.
pub fn reporter(&self) -> Arc<Reporter> {
self.reporter.clone()
}
Expand Down
4 changes: 4 additions & 0 deletions framework/runner/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,8 @@ pub mod prelude {
pub use crate::init::init;
pub use crate::run::run;
pub use crate::types::WindTunnelResult;

// Re-export of the `wind_tunnel_instruments` prelude. This is for convenience so that you can
// access reporting tools from within scenarios.
pub use wind_tunnel_instruments::prelude::*;
}
1 change: 1 addition & 0 deletions influx/templates/dashboards/dht_sync_lag.json
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
[{"apiVersion":"influxdata.com/v2alpha1","kind":"Dashboard","metadata":{"name":"victorious-kapitsa-bf8001"},"spec":{"charts":[{"axes":[{"base":"10","name":"x","scale":"linear"},{"base":"10","name":"y","scale":"linear","suffix":"s"}],"colorizeRows":true,"colors":[{"id":"EeycwCWfyw0YSbHfK-_bg","name":"Color Blind Friendly - Light","type":"scale","hex":"#FFFFFF"},{"id":"reCGKSBa8F4FXQG3VYe_B","name":"Color Blind Friendly - Light","type":"scale","hex":"#E69F00"},{"id":"LwHjuuBTah-Q6Kv31XbMb","name":"Color Blind Friendly - Light","type":"scale","hex":"#56B4E9"},{"id":"UYJNhccStV1szRDzc5ZS8","name":"Color Blind Friendly - Light","type":"scale","hex":"#009E73"},{"id":"ZaGqV7BaVmHtlZZ7QiBR9","name":"Color Blind Friendly - Light","type":"scale","hex":"#F0E442"},{"id":"MLJ4MzW3SBtxs8E6jW8FB","name":"Color Blind Friendly - Light","type":"scale","hex":"#0072B2"},{"id":"XQFWzHtO8a6lHXL2D1mHH","name":"Color Blind Friendly - Light","type":"scale","hex":"#D55E00"},{"id":"mwVE20owCu-EGZVTQ7aca","name":"Color Blind Friendly - Light","type":"scale","hex":"#CC79A7"}],"geom":"line","height":4,"hoverDimension":"auto","kind":"Xy","legendColorizeRows":true,"legendOpacity":1,"legendOrientationThreshold":100000000,"name":"Sync Lag (mean)","opacity":1,"orientationThreshold":100000000,"position":"overlaid","queries":[{"query":"from(bucket: \"windtunnel\")\n |> range(start: v.timeRangeStart, stop: v.timeRangeStop)\n |> filter(fn: (r) => r[\"_measurement\"] == \"wt.custom.dht_sync_lag\")\n |> filter(fn: (r) => r[\"_field\"] == \"value\")\n |> map(fn: (r) => ({ r with _value: r._value / 1000.0 })) \n |> aggregateWindow(every: v.windowPeriod, fn: mean, createEmpty: false)\n |> yield(name: \"mean\")"}],"shade":true,"staticLegend":{"colorizeRows":true,"opacity":1,"orientationThreshold":100000000,"widthRatio":1},"width":6,"widthRatio":1,"xCol":"_time","yCol":"_value"},{"axes":[{"base":"10","name":"x","scale":"linear"},{"base":"10","name":"y","scale":"linear"}],"colorizeRows":true,"colors":[{"id":"2VIaJgN6zA7q-7pJxjW6R","name":"Color Blind Friendly - Light","type":"scale","hex":"#FFFFFF"},{"id":"54Pm1akf7W1i6Qu-haF2O","name":"Color Blind Friendly - Light","type":"scale","hex":"#E69F00"},{"id":"ka2GpKDmGeoX3CkjCsVB9","name":"Color Blind Friendly - Light","type":"scale","hex":"#56B4E9"},{"id":"p2jDtNTUfRNUXEDkHLo3_","name":"Color Blind Friendly - Light","type":"scale","hex":"#009E73"},{"id":"v1bMU4KDeOO9f-cKvLwcz","name":"Color Blind Friendly - Light","type":"scale","hex":"#F0E442"},{"id":"fKPuvSYrt7IdU29E-Lu4G","name":"Color Blind Friendly - Light","type":"scale","hex":"#0072B2"},{"id":"meYBhHM7mf6ZaNQiQy8vb","name":"Color Blind Friendly - Light","type":"scale","hex":"#D55E00"},{"id":"nu4Wev2n_a_Zu712_arTE","name":"Color Blind Friendly - Light","type":"scale","hex":"#CC79A7"}],"geom":"line","height":4,"hoverDimension":"auto","kind":"Xy","legendColorizeRows":true,"legendOpacity":1,"legendOrientationThreshold":100000000,"name":"Send count","opacity":1,"orientationThreshold":100000000,"position":"overlaid","queries":[{"query":"from(bucket: \"windtunnel\")\n |> range(start: v.timeRangeStart, stop: v.timeRangeStop)\n |> filter(fn: (r) => r[\"_measurement\"] == \"wt.custom.dht_sync_sent_count\")\n |> filter(fn: (r) => r[\"_field\"] == \"value\")\n |> increase()\n |> yield(name: \"increase\")"}],"shade":true,"staticLegend":{"colorizeRows":true,"opacity":1,"orientationThreshold":100000000,"widthRatio":1},"width":6,"widthRatio":1,"xCol":"_time","yCol":"_value","yPos":4},{"axes":[{"base":"10","name":"x","scale":"linear"},{"base":"10","name":"y","scale":"linear","suffix":"s"}],"colorizeRows":true,"colors":[{"id":"EeycwCWfyw0YSbHfK-_bg","name":"Color Blind Friendly - Light","type":"scale","hex":"#FFFFFF"},{"id":"reCGKSBa8F4FXQG3VYe_B","name":"Color Blind Friendly - Light","type":"scale","hex":"#E69F00"},{"id":"LwHjuuBTah-Q6Kv31XbMb","name":"Color Blind Friendly - Light","type":"scale","hex":"#56B4E9"},{"id":"UYJNhccStV1szRDzc5ZS8","name":"Color Blind Friendly - Light","type":"scale","hex":"#009E73"},{"id":"ZaGqV7BaVmHtlZZ7QiBR9","name":"Color Blind Friendly - Light","type":"scale","hex":"#F0E442"},{"id":"MLJ4MzW3SBtxs8E6jW8FB","name":"Color Blind Friendly - Light","type":"scale","hex":"#0072B2"},{"id":"XQFWzHtO8a6lHXL2D1mHH","name":"Color Blind Friendly - Light","type":"scale","hex":"#D55E00"},{"id":"mwVE20owCu-EGZVTQ7aca","name":"Color Blind Friendly - Light","type":"scale","hex":"#CC79A7"}],"geom":"line","height":4,"hoverDimension":"auto","kind":"Xy","legendColorizeRows":true,"legendOpacity":1,"legendOrientationThreshold":100000000,"name":"Sync Lag (max)","opacity":1,"orientationThreshold":100000000,"position":"overlaid","queries":[{"query":"from(bucket: \"windtunnel\")\n |> range(start: v.timeRangeStart, stop: v.timeRangeStop)\n |> filter(fn: (r) => r[\"_measurement\"] == \"wt.custom.dht_sync_lag\")\n |> filter(fn: (r) => r[\"_field\"] == \"value\")\n |> map(fn: (r) => ({ r with _value: r._value / 1000.0 })) \n |> aggregateWindow(every: v.windowPeriod, fn: max, createEmpty: false)\n |> yield(name: \"max\")"}],"shade":true,"staticLegend":{"colorizeRows":true,"opacity":1,"orientationThreshold":100000000,"widthRatio":1},"width":6,"widthRatio":1,"xCol":"_time","xPos":6,"yCol":"_value"},{"axes":[{"base":"10","name":"x","scale":"linear"},{"base":"10","name":"y","scale":"linear"}],"colorizeRows":true,"colors":[{"id":"2VIaJgN6zA7q-7pJxjW6R","name":"Color Blind Friendly - Light","type":"scale","hex":"#FFFFFF"},{"id":"54Pm1akf7W1i6Qu-haF2O","name":"Color Blind Friendly - Light","type":"scale","hex":"#E69F00"},{"id":"ka2GpKDmGeoX3CkjCsVB9","name":"Color Blind Friendly - Light","type":"scale","hex":"#56B4E9"},{"id":"p2jDtNTUfRNUXEDkHLo3_","name":"Color Blind Friendly - Light","type":"scale","hex":"#009E73"},{"id":"v1bMU4KDeOO9f-cKvLwcz","name":"Color Blind Friendly - Light","type":"scale","hex":"#F0E442"},{"id":"fKPuvSYrt7IdU29E-Lu4G","name":"Color Blind Friendly - Light","type":"scale","hex":"#0072B2"},{"id":"meYBhHM7mf6ZaNQiQy8vb","name":"Color Blind Friendly - Light","type":"scale","hex":"#D55E00"},{"id":"nu4Wev2n_a_Zu712_arTE","name":"Color Blind Friendly - Light","type":"scale","hex":"#CC79A7"}],"geom":"line","height":4,"hoverDimension":"auto","kind":"Xy","legendColorizeRows":true,"legendOpacity":1,"legendOrientationThreshold":100000000,"name":"Recieved count","opacity":1,"orientationThreshold":100000000,"position":"overlaid","queries":[{"query":"from(bucket: \"windtunnel\")\n |> range(start: v.timeRangeStart, stop: v.timeRangeStop)\n |> filter(fn: (r) => r[\"_measurement\"] == \"wt.custom.dht_sync_recv_count\")\n |> filter(fn: (r) => r[\"_field\"] == \"value\")\n |> increase()\n |> yield(name: \"increase\")"}],"shade":true,"staticLegend":{"colorizeRows":true,"opacity":1,"orientationThreshold":100000000,"widthRatio":1},"width":6,"widthRatio":1,"xCol":"_time","xPos":6,"yCol":"_value","yPos":4}],"name":"DHT Sync Lag"}}]
29 changes: 29 additions & 0 deletions scenarios/dht_sync_lag/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
[package]
name = "dht_sync_lag"
version = "0.1.0"
edition = "2021"
build = "../scenario_build.rs"

[dependencies]
anyhow = { workspace = true }
holochain_types = { workspace = true }
holochain_wind_tunnel_runner = { workspace = true }
timed_integrity = { workspace = true }

[build-dependencies]
toml = { workspace = true }
anyhow = { workspace = true }
holochain_types = { workspace = true }
serde_yaml = { workspace = true }
walkdir = { workspace = true }

[lints]
workspace = true

[package.metadata.required-dna]
name = "timed"
zomes = ["timed"]

[package.metadata.required-happ]
name = "timed"
dnas = ["timed"]
31 changes: 31 additions & 0 deletions scenarios/dht_sync_lag/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
## dht_sync_lag

### Description

This scenario has two roles:
- _write_: A simple job that just creates entries with a timestamp field. Those entries are linked to a known base hash.
- _record_lag_: A job that repeatedly queries for links from the known base hash. It keeps track of records that it has seen
and when a new record is found, it calculates the time difference between the timestamp of the new record and the current time.
That time difference is then recorded as a custom metric called `wt.custom.dht_sync_lag`.

### Suggested command

You can run the scenario locally with the following command:

```bash
RUST_LOG=info cargo run --package dht_sync_lag -- --connection-string ws://localhost:8888 --agents 2 --behaviour write:1 --behaviour record_lag:1 --duration 900
```

However, doing so is not that meaningful because data is all local so the lag should be minimal.

Running the scenario distributed is suggested to be done by partitioning your nodes. The first group run the command:

```bash
RUST_LOG=info cargo run --package dht_sync_lag -- --connection-string ws://localhost:8888 --behaviour write --duration 300
```

Then the second group run command:

```bash
RUST_LOG=info cargo run --package dht_sync_lag -- --connection-string ws://localhost:8888 --behaviour record_lag --duration 900
```
Loading

0 comments on commit dfd9c25

Please sign in to comment.