diff --git a/Cargo.lock b/Cargo.lock index 0aaa6beab8b..3ccadfbb6e9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2990,6 +2990,7 @@ dependencies = [ "observability_deps", "parquet_file", "pyo3", + "reqwest 0.11.27", "tempfile", "thiserror 1.0.69", "tokio", diff --git a/influxdb3/tests/server/cli.rs b/influxdb3/tests/server/cli.rs index f021a2c341b..6ea41e0eaee 100644 --- a/influxdb3/tests/server/cli.rs +++ b/influxdb3/tests/server/cli.rs @@ -1164,3 +1164,64 @@ def process_writes(influxdb3_local, table_batches, args=None): ); } } + +#[cfg(feature = "system-py")] +#[test_log::test(tokio::test)] +async fn test_load_wal_plugin_from_gh() { + use crate::ConfigProvider; + use influxdb3_client::Precision; + + let plugin_dir = TempDir::new().unwrap(); + + let server = TestServer::configure() + .with_plugin_dir(plugin_dir.path().to_str().unwrap()) + .spawn() + .await; + let server_addr = server.client_addr(); + + server + .write_lp_to_db( + "foo", + "cpu,host=s1,region=us-east usage=0.9\n\ + cpu,host=s2,region=us-east usage=0.89\n\ + cpu,host=s1,region=us-east usage=0.85", + Precision::Nanosecond, + ) + .await + .unwrap(); + + let db_name = "foo"; + + // this will pull from https://github.com/influxdata/influxdb3_plugins/blob/main/examples/wal_plugin/wal_plugin.py + let plugin_name = "gh:examples/wal_plugin"; + + // Run the test to make sure it'll load from GH + let result = run_with_confirmation(&[ + "test", + "wal_plugin", + "--database", + db_name, + "--host", + &server_addr, + "--lp", + "test_input,tag1=tag1_value,tag2=tag2_value field1=1i 500", + plugin_name, + ]); + debug!(result = ?result, "test wal plugin"); + + let res = serde_json::from_str::(&result).unwrap(); + + let expected_result = r#"{ + "log_lines": [ + "INFO: wal_plugin.py done" + ], + "database_writes": { + "foo": [ + "write_reports,table_name=test_input row_count=1i" + ] + }, + "errors": [] +}"#; + let expected_result = serde_json::from_str::(expected_result).unwrap(); + assert_eq!(res, expected_result); +} diff --git a/influxdb3_processing_engine/Cargo.toml b/influxdb3_processing_engine/Cargo.toml index 2b952c89ecc..9b2b9f51e53 100644 --- a/influxdb3_processing_engine/Cargo.toml +++ b/influxdb3_processing_engine/Cargo.toml @@ -21,6 +21,7 @@ influxdb3_py_api = { path = "../influxdb3_py_api" } influxdb3_wal = { path = "../influxdb3_wal" } influxdb3_write = { path = "../influxdb3_write" } observability_deps.workspace = true +reqwest.workspace = true thiserror.workspace = true tokio.workspace = true diff --git a/influxdb3_processing_engine/src/lib.rs b/influxdb3_processing_engine/src/lib.rs index 1578d279a59..19a0178d053 100644 --- a/influxdb3_processing_engine/src/lib.rs +++ b/influxdb3_processing_engine/src/lib.rs @@ -117,7 +117,30 @@ impl ProcessingEngineManagerImpl { } } - pub fn read_plugin_code(&self, name: &str) -> Result { + pub async fn read_plugin_code(&self, name: &str) -> Result { + // if the name starts with gh: then we need to get it from the public github repo at https://github.com/influxdata/influxdb3_plugins/tree/main + if name.starts_with("gh:") { + let plugin_path = name.strip_prefix("gh:").unwrap(); + // the filename should be the last part of the name after the last / + let plugin_name = plugin_path + .split('/') + .last() + .context("plugin name for github plugins must be /")?; + let url = format!( + "https://raw.githubusercontent.com/influxdata/influxdb3_plugins/main/{}/{}.py", + plugin_path, plugin_name + ); + let resp = reqwest::get(&url) + .await + .context("error getting plugin from github repo")?; + let resp_body = resp + .text() + .await + .context("error reading plugin from github repo")?; + return Ok(resp_body); + } + + // otherwise we assume it is a local file let plugin_dir = self.plugin_dir.clone().context("plugin dir not set")?; let path = plugin_dir.join(name); Ok(std::fs::read_to_string(path)?) @@ -314,7 +337,7 @@ impl ProcessingEngineManager for ProcessingEngineManagerImpl { write_buffer, query_executor, }; - let plugin_code = self.read_plugin_code(&trigger.plugin_file_name)?; + let plugin_code = self.read_plugin_code(&trigger.plugin_file_name).await?; plugins::run_plugin(db_name.to_string(), plugin_code, trigger, plugin_context); } @@ -434,7 +457,7 @@ impl ProcessingEngineManager for ProcessingEngineManagerImpl { let catalog = Arc::new(Catalog::from_inner(self.catalog.clone_inner())); let now = self.time_provider.now(); - let code = self.read_plugin_code(&request.filename)?; + let code = self.read_plugin_code(&request.filename).await?; let res = plugins::run_test_wal_plugin(now, catalog, query_executor, code, request) .unwrap_or_else(|e| WalPluginTestResponse {