Skip to content

Commit

Permalink
WIP: Add Autometrics query types to Prometheus provider
Browse files Browse the repository at this point in the history
  • Loading branch information
arendjr committed Nov 29, 2023
1 parent 6cf69cb commit 83fbd16
Show file tree
Hide file tree
Showing 11 changed files with 905 additions and 642 deletions.
1,234 changes: 657 additions & 577 deletions Cargo.lock

Large diffs are not rendered by default.

8 changes: 4 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,11 @@ url = { version = "2.2.2", features = ["serde"] }
vergen = { version = "8.2.5", features = ["build", "git", "git2"] }

[patch.crates-io]
fiberplane-models = { git = "ssh://[email protected]/fiberplane/fiberplane.git", branch = "main" }
fiberplane-provider-bindings = { git = "ssh://[email protected]/fiberplane/fiberplane.git", branch = "main" }
#fiberplane-models = { git = "ssh://[email protected]/fiberplane/fiberplane.git", branch = "main" }
#fiberplane-provider-bindings = { git = "ssh://[email protected]/fiberplane/fiberplane.git", branch = "main" }
#fp-bindgen = { git = "ssh://[email protected]/fiberplane/fp-bindgen.git", branch = "release-3.0.0" }
#fp-bindgen-support = { git = "ssh://[email protected]/fiberplane/fp-bindgen.git", branch = "release-3.0.0" }

#[patch.'ssh://[email protected]/fiberplane/fiberplane.git']
#fiberplane-models = { path = "../fiberplane/fiberplane-models" }
#fiberplane-provider-bindings = { path = "../fiberplane/fiberplane-provider-protocol/fiberplane-provider-bindings" }
fiberplane-models = { path = "../fiberplane/fiberplane-models" }
fiberplane-provider-bindings = { path = "../fiberplane/fiberplane-provider-protocol/fiberplane-provider-bindings" }
28 changes: 24 additions & 4 deletions fiberplane-pdk/src/types/provider_data_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,38 @@ use crate::providers::{
};
use serde::{Deserialize, Serialize};

#[derive(Clone, Debug, Deserialize, ProviderData, Serialize)]
#[derive(Clone, Debug, Deserialize, PartialEq, ProviderData, Serialize)]
#[cfg_attr(
feature = "fp-bindgen",
derive(Serializable),
fp(rust_module = "fiberplane_pdk::prelude")
)]
#[pdk(mime_type = CELLS_MIME_TYPE)]
pub struct Cells(pub Vec<Cell>);

#[derive(Clone, Debug, Deserialize, ProviderData, Serialize)]
#[derive(Clone, Debug, Deserialize, PartialEq, ProviderData, Serialize)]
#[cfg_attr(
feature = "fp-bindgen",
derive(Serializable),
fp(rust_module = "fiberplane_pdk::prelude")
)]
#[pdk(mime_type = EVENTS_MIME_TYPE)]
pub struct Events(pub Vec<ProviderEvent>);

#[derive(Clone, Debug, Deserialize, ProviderData, Serialize)]
#[derive(Clone, Debug, Deserialize, PartialEq, ProviderData, Serialize)]
#[cfg_attr(
feature = "fp-bindgen",
derive(Serializable),
fp(rust_module = "fiberplane_pdk::prelude")
)]
#[pdk(mime_type = SUGGESTIONS_MIME_TYPE)]
pub struct Suggestions(pub Vec<Suggestion>);

#[derive(Clone, Debug, Deserialize, ProviderData, Serialize)]
#[derive(Clone, Debug, Deserialize, PartialEq, ProviderData, Serialize)]
#[cfg_attr(
feature = "fp-bindgen",
derive(Serializable),
fp(rust_module = "fiberplane_pdk::prelude")
)]
#[pdk(mime_type = TIMESERIES_MIME_TYPE)]
pub struct TimeseriesVector(pub Vec<Timeseries>);
1 change: 1 addition & 0 deletions providers/prometheus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ const_format = "0.2"
form_urlencoded = "1"
fiberplane-models = { workspace = true }
fiberplane-pdk = { workspace = true }
fp-bindgen = { workspace = true }
grafana-common = { path = "../grafana-common" }
rmp-serde = "1"
rmpv = { version = "1.0.0", features = ["with-serde"] }
Expand Down
38 changes: 38 additions & 0 deletions providers/prometheus/src/autometrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
use fiberplane_models::autometrics::{AutometricsFunction, PrometheusResponse};
use fiberplane_pdk::prelude::*;
use grafana_common::{query_direct_and_proxied, Config};
use serde::{Deserialize, Serialize};
use std::collections::BTreeSet;

pub const ALL_FUNCTIONS_QUERY: &str = "x-autometrics-functions";
pub const CHILD_FUNCTIONS_QUERY: &str = "x-autometrics-child-functions";

pub const AUTOMETRICS_FUNCTIONS_MIME_TYPE: &str = "application/vnd.autometrics.functions";

#[derive(Deserialize, QuerySchema)]
pub(crate) struct FunctionsQuery;

#[derive(Clone, Debug, Deserialize, PartialEq, ProviderData, Serialize)]
#[pdk(mime_type = AUTOMETRICS_FUNCTIONS_MIME_TYPE)]
pub struct FunctionsVector(pub Vec<AutometricsFunction>);

pub(crate) async fn query_all_functions(_query: FunctionsQuery, config: Config) -> Result<Blob> {
let body = Blob::from({
let mut form_data = form_urlencoded::Serializer::new(String::new());
form_data.append_pair(
"match[]",
r#"{__name__=~"function_calls(_count)?(_total)?", function!="", module!=""}"#,
);
form_data
});

let response: PrometheusResponse<Vec<AutometricsFunction>> =
query_direct_and_proxied(&config, "prometheus", "/api/v1/series", Some(body)).await?;

FunctionsVector(filter_unique_functions(response.data)).to_blob()
}

fn filter_unique_functions(functions: Vec<AutometricsFunction>) -> Vec<AutometricsFunction> {
let unique_functions: BTreeSet<_> = functions.into_iter().collect();
unique_functions.into_iter().collect()
}
31 changes: 31 additions & 0 deletions providers/prometheus/src/config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
use fiberplane_models::autometrics::PrometheusResponse;
use fiberplane_models::MaybeSerializable;
use fiberplane_pdk::prelude::*;
use fp_bindgen::prelude::Serializable;
use grafana_common::{query_direct_and_proxied, Config};
use serde::{Deserialize, Serialize};

pub const CONFIG_QUERY: &str = "x-prometheus-config";

pub const YAML_MIME_TYPE: &str = "text/yaml";

#[derive(Deserialize, QuerySchema)]
pub(crate) struct ConfigQuery;

#[derive(Clone, Deserialize, PartialEq, Serialize, Serializable)]
pub struct ConfigYaml {
yaml: String,
}

impl MaybeSerializable for ConfigYaml {}

#[derive(Clone, Debug, Deserialize, PartialEq, ProviderData, Serialize)]
#[pdk(mime_type = YAML_MIME_TYPE)]
pub struct Yaml(pub String);

pub(crate) async fn query_config(_query: ConfigQuery, config: Config) -> Result<Blob> {
let response: PrometheusResponse<ConfigYaml> =
query_direct_and_proxied(&config, "prometheus", "/api/v1/status/config", None).await?;

Yaml(response.data.yaml).to_blob()
}
91 changes: 91 additions & 0 deletions providers/prometheus/src/functions.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
use super::instants::Instant;
use fiberplane_pdk::prelude::Timestamp;
use fiberplane_pdk::providers::*;
use serde::Deserialize;
use std::{
collections::BTreeMap,
num::ParseFloatError,
time::{Duration, SystemTime},
};

#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct PrometheusResponse {
pub data: PrometheusData,
}

#[derive(Deserialize)]
#[serde(tag = "resultType", content = "result", rename_all = "snake_case")]
pub enum PrometheusData {
Vector(Vec<InstantVector>),
Matrix(Vec<RangeVector>),
}

#[derive(Deserialize)]
pub struct InstantVector {
pub metric: BTreeMap<String, String>,
pub value: PrometheusPoint,
}

impl InstantVector {
pub fn into_instant(self) -> Result<Instant, Error> {
let mut labels = self.metric;
let name = labels.remove("__name__").unwrap_or_else(|| "".to_owned());
let metric = self.value.to_metric()?;
Ok(Instant {
name,
labels,
metric,
})
}
}

#[derive(Deserialize)]
pub struct Metadata {
pub help: Option<String>,
}

#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct PrometheusMetadataResponse {
pub data: BTreeMap<String, Vec<Metadata>>,
}

#[derive(Deserialize)]
pub struct PrometheusPoint(f64, String);

impl PrometheusPoint {
pub fn to_metric(&self) -> Result<Metric, ParseFloatError> {
let time = SystemTime::UNIX_EPOCH + Duration::from_millis((self.0 * 1000.0) as u64);
Ok(Metric::builder()
.time(Timestamp::from(time))
.value(self.1.parse()?)
.otel(OtelMetadata::default())
.build())
}
}

#[derive(Deserialize)]
pub struct RangeVector {
pub metric: BTreeMap<String, String>,
pub values: Vec<PrometheusPoint>,
}

impl RangeVector {
pub fn into_series(self) -> Result<Timeseries, Error> {
let mut labels = self.metric;
let name = labels.remove("__name__").unwrap_or_else(|| "".to_owned());
let metrics = self
.values
.into_iter()
.map(|value| value.to_metric())
.collect::<Result<_, _>>()?;
Ok(Timeseries::builder()
.name(name)
.labels(labels)
.metrics(metrics)
.otel(OtelMetadata::default())
.visible(true)
.build())
}
}
17 changes: 7 additions & 10 deletions providers/prometheus/src/instants.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use super::{constants::*, prometheus::*};
use fiberplane_models::autometrics::PrometheusResponse;
use fiberplane_models::blobs::Blob;
use fiberplane_models::providers::Metric;
use fiberplane_pdk::prelude::*;
Expand All @@ -20,23 +21,19 @@ pub struct Instant {
}

pub async fn query_instants(request: ProviderRequest) -> Result<Blob> {
let response: PrometheusResponse = query_direct_and_proxied(
let response: PrometheusResponse<Vec<InstantVector>> = query_direct_and_proxied(
&Config::parse(request.config)?,
"prometheus",
"api/v1/query",
Some(request.query_data),
)
.await?;

let PrometheusData::Vector(instants) = response.data else {
return Err(Error::Data {
message: "Expected a vector of instants".to_string(),
});
};

instants
let instants = response
.data
.into_iter()
.map(InstantVector::into_instant)
.collect::<Result<Vec<_>>>()
.and_then(|instants| Instants(instants).to_blob())
.collect::<Result<Vec<_>>>()?;

Instants(instants).to_blob()
}
17 changes: 15 additions & 2 deletions providers/prometheus/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
mod auto_suggest;
mod autometrics;
mod config;
mod constants;
mod instants;
mod prometheus;
mod timeseries;

use auto_suggest::query_suggestions;
use autometrics::*;
use config::*;
use constants::{INSTANTS_MIME_TYPE, INSTANTS_QUERY_TYPE};
use fiberplane_pdk::prelude::*;
use grafana_common::{query_direct_and_proxied, Config};
Expand All @@ -13,10 +17,19 @@ use serde_json::Value;
use std::env;
use timeseries::{create_graph_cell, query_series, TimeseriesQuery};

static COMMIT_HASH: &str = env!("VERGEN_GIT_SHA");
static BUILD_TIMESTAMP: &str = env!("VERGEN_BUILD_TIMESTAMP");
const COMMIT_HASH: &str = env!("VERGEN_GIT_SHA");
const BUILD_TIMESTAMP: &str = env!("VERGEN_BUILD_TIMESTAMP");

pdk_query_types! {
ALL_FUNCTIONS_QUERY => {
handler: query_all_functions(FunctionsQuery, Config).await,
supported_mime_types: [AUTOMETRICS_FUNCTIONS_MIME_TYPE]
},
CONFIG_QUERY => {
handler: query_config(ConfigQuery, Config).await,
label: "Prometheus config",
supported_mime_types: [YAML_MIME_TYPE]
},
INSTANTS_QUERY_TYPE => {
handler: query_instants(ProviderRequest).await,
supported_mime_types: [INSTANTS_MIME_TYPE]
Expand Down
32 changes: 15 additions & 17 deletions providers/prometheus/src/prometheus.rs
Original file line number Diff line number Diff line change
@@ -1,32 +1,24 @@
use super::instants::Instant;
use fiberplane_models::MaybeSerializable;
use fiberplane_pdk::prelude::Timestamp;
use fiberplane_pdk::providers::*;
use serde::Deserialize;
use fp_bindgen::prelude::Serializable;
use serde::{Deserialize, Serialize};
use std::{
collections::BTreeMap,
num::ParseFloatError,
time::{Duration, SystemTime},
};

#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct PrometheusResponse {
pub data: PrometheusData,
}

#[derive(Deserialize)]
#[serde(tag = "resultType", content = "result", rename_all = "snake_case")]
pub enum PrometheusData {
Vector(Vec<InstantVector>),
Matrix(Vec<RangeVector>),
}

#[derive(Deserialize)]
#[derive(Clone, Deserialize, PartialEq, Serialize, Serializable)]
#[fp(rust_module = "crate::prometheus")]
pub struct InstantVector {
pub metric: BTreeMap<String, String>,
pub value: PrometheusPoint,
}

impl MaybeSerializable for InstantVector {}

impl InstantVector {
pub fn into_instant(self) -> Result<Instant, Error> {
let mut labels = self.metric;
Expand All @@ -51,9 +43,12 @@ pub struct PrometheusMetadataResponse {
pub data: BTreeMap<String, Vec<Metadata>>,
}

#[derive(Deserialize)]
#[derive(Clone, Deserialize, PartialEq, Serialize, Serializable)]
#[fp(rust_module = "crate::prometheus")]
pub struct PrometheusPoint(f64, String);

impl MaybeSerializable for PrometheusPoint {}

impl PrometheusPoint {
pub fn to_metric(&self) -> Result<Metric, ParseFloatError> {
let time = SystemTime::UNIX_EPOCH + Duration::from_millis((self.0 * 1000.0) as u64);
Expand All @@ -65,12 +60,15 @@ impl PrometheusPoint {
}
}

#[derive(Deserialize)]
#[derive(Clone, Deserialize, PartialEq, Serialize, Serializable)]
#[fp(rust_module = "crate::prometheus")]
pub struct RangeVector {
pub metric: BTreeMap<String, String>,
pub values: Vec<PrometheusPoint>,
}

impl MaybeSerializable for RangeVector {}

impl RangeVector {
pub fn into_series(self) -> Result<Timeseries, Error> {
let mut labels = self.metric;
Expand Down
Loading

0 comments on commit 83fbd16

Please sign in to comment.