Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: load plugins from our github plugin repo #25861

Merged
merged 1 commit into from
Jan 17, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

61 changes: 61 additions & 0 deletions influxdb3/tests/server/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1164,3 +1164,64 @@ def process_writes(influxdb3_local, table_batches, args=None):
);
}
}

#[cfg(feature = "system-py")]
#[test_log::test(tokio::test)]
async fn test_load_wal_plugin_from_gh() {
use crate::ConfigProvider;
use influxdb3_client::Precision;

let plugin_dir = TempDir::new().unwrap();

let server = TestServer::configure()
.with_plugin_dir(plugin_dir.path().to_str().unwrap())
.spawn()
.await;
let server_addr = server.client_addr();

server
.write_lp_to_db(
"foo",
"cpu,host=s1,region=us-east usage=0.9\n\
cpu,host=s2,region=us-east usage=0.89\n\
cpu,host=s1,region=us-east usage=0.85",
Precision::Nanosecond,
)
.await
.unwrap();

let db_name = "foo";

// this will pull from https://github.com/influxdata/influxdb3_plugins/blob/main/examples/wal_plugin/wal_plugin.py
let plugin_name = "gh:examples/wal_plugin";

// Run the test to make sure it'll load from GH
let result = run_with_confirmation(&[
"test",
"wal_plugin",
"--database",
db_name,
"--host",
&server_addr,
"--lp",
"test_input,tag1=tag1_value,tag2=tag2_value field1=1i 500",
plugin_name,
]);
debug!(result = ?result, "test wal plugin");

let res = serde_json::from_str::<serde_json::Value>(&result).unwrap();

let expected_result = r#"{
"log_lines": [
"INFO: wal_plugin.py done"
],
"database_writes": {
"foo": [
"write_reports,table_name=test_input row_count=1i"
]
},
"errors": []
}"#;
let expected_result = serde_json::from_str::<serde_json::Value>(expected_result).unwrap();
assert_eq!(res, expected_result);
}
1 change: 1 addition & 0 deletions influxdb3_processing_engine/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ influxdb3_py_api = { path = "../influxdb3_py_api" }
influxdb3_wal = { path = "../influxdb3_wal" }
influxdb3_write = { path = "../influxdb3_write" }
observability_deps.workspace = true
reqwest.workspace = true
thiserror.workspace = true
tokio.workspace = true

Expand Down
29 changes: 26 additions & 3 deletions influxdb3_processing_engine/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,30 @@ impl ProcessingEngineManagerImpl {
}
}

pub fn read_plugin_code(&self, name: &str) -> Result<String, plugins::Error> {
pub async fn read_plugin_code(&self, name: &str) -> Result<String, plugins::Error> {
// if the name starts with gh: then we need to get it from the public github repo at https://github.com/influxdata/influxdb3_plugins/tree/main
if name.starts_with("gh:") {
let plugin_path = name.strip_prefix("gh:").unwrap();
// the filename should be the last part of the name after the last /
let plugin_name = plugin_path
.split('/')
.last()
.context("plugin name for github plugins must be <dir>/<name>")?;
let url = format!(
"https://raw.githubusercontent.com/influxdata/influxdb3_plugins/main/{}/{}.py",
plugin_path, plugin_name
);
let resp = reqwest::get(&url)
.await
.context("error getting plugin from github repo")?;
let resp_body = resp
.text()
.await
.context("error reading plugin from github repo")?;
return Ok(resp_body);
}

// otherwise we assume it is a local file
let plugin_dir = self.plugin_dir.clone().context("plugin dir not set")?;
let path = plugin_dir.join(name);
Ok(std::fs::read_to_string(path)?)
Expand Down Expand Up @@ -314,7 +337,7 @@ impl ProcessingEngineManager for ProcessingEngineManagerImpl {
write_buffer,
query_executor,
};
let plugin_code = self.read_plugin_code(&trigger.plugin_file_name)?;
let plugin_code = self.read_plugin_code(&trigger.plugin_file_name).await?;
plugins::run_plugin(db_name.to_string(), plugin_code, trigger, plugin_context);
}

Expand Down Expand Up @@ -434,7 +457,7 @@ impl ProcessingEngineManager for ProcessingEngineManagerImpl {
let catalog = Arc::new(Catalog::from_inner(self.catalog.clone_inner()));
let now = self.time_provider.now();

let code = self.read_plugin_code(&request.filename)?;
let code = self.read_plugin_code(&request.filename).await?;

let res = plugins::run_test_wal_plugin(now, catalog, query_executor, code, request)
.unwrap_or_else(|e| WalPluginTestResponse {
Expand Down
Loading