Skip to content

Commit

Permalink
feat: load plugins from our github plugin repo
Browse files Browse the repository at this point in the history
Adds the ability to load a plugin from our public github repo here: https://github.com/influxdata/influxdb3_pluginshttps://github.com/influxdata/influxdb3_plugins

If the plugin filename is specified as `gh:examples/wal_plugin` it will pull from the github repo at `<dir>/<name>/<name>.py`.

Closes #25836
  • Loading branch information
pauldix committed Jan 17, 2025
1 parent eb0b1eb commit 84f5702
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 3 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

61 changes: 61 additions & 0 deletions influxdb3/tests/server/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1164,3 +1164,64 @@ def process_writes(influxdb3_local, table_batches, args=None):
);
}
}

#[cfg(feature = "system-py")]
#[test_log::test(tokio::test)]
async fn test_load_wal_plugin_from_gh() {
use crate::ConfigProvider;
use influxdb3_client::Precision;

let plugin_dir = TempDir::new().unwrap();

let server = TestServer::configure()
.with_plugin_dir(plugin_dir.path().to_str().unwrap())
.spawn()
.await;
let server_addr = server.client_addr();

server
.write_lp_to_db(
"foo",
"cpu,host=s1,region=us-east usage=0.9\n\
cpu,host=s2,region=us-east usage=0.89\n\
cpu,host=s1,region=us-east usage=0.85",
Precision::Nanosecond,
)
.await
.unwrap();

let db_name = "foo";

// this will pull from https://github.com/influxdata/influxdb3_plugins/blob/main/examples/wal_plugin/wal_plugin.py
let plugin_name = "gh:examples/wal_plugin";

// Run the test to make sure it'll load from GH
let result = run_with_confirmation(&[
"test",
"wal_plugin",
"--database",
db_name,
"--host",
&server_addr,
"--lp",
"test_input,tag1=tag1_value,tag2=tag2_value field1=1i 500",
plugin_name,
]);
debug!(result = ?result, "test wal plugin");

let res = serde_json::from_str::<serde_json::Value>(&result).unwrap();

let expected_result = r#"{
"log_lines": [
"INFO: wal_plugin.py done"
],
"database_writes": {
"foo": [
"write_reports,table_name=test_input row_count=1i"
]
},
"errors": []
}"#;
let expected_result = serde_json::from_str::<serde_json::Value>(expected_result).unwrap();
assert_eq!(res, expected_result);
}
1 change: 1 addition & 0 deletions influxdb3_processing_engine/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ influxdb3_py_api = { path = "../influxdb3_py_api" }
influxdb3_wal = { path = "../influxdb3_wal" }
influxdb3_write = { path = "../influxdb3_write" }
observability_deps.workspace = true
reqwest.workspace = true
thiserror.workspace = true
tokio.workspace = true

Expand Down
29 changes: 26 additions & 3 deletions influxdb3_processing_engine/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,30 @@ impl ProcessingEngineManagerImpl {
}
}

pub fn read_plugin_code(&self, name: &str) -> Result<String, plugins::Error> {
pub async fn read_plugin_code(&self, name: &str) -> Result<String, plugins::Error> {
// if the name starts with gh: then we need to get it from the public github repo at https://github.com/influxdata/influxdb3_plugins/tree/main
if name.starts_with("gh:") {
let plugin_path = name.strip_prefix("gh:").unwrap();
// the filename should be the last part of the name after the last /
let plugin_name = plugin_path
.split('/')
.last()
.context("plugin name for github plugins must be <dir>/<name>")?;
let url = format!(
"https://raw.githubusercontent.com/influxdata/influxdb3_plugins/main/{}/{}.py",
plugin_path, plugin_name
);
let resp = reqwest::get(&url)
.await
.context("error getting plugin from github repo")?;
let resp_body = resp
.text()
.await
.context("error reading plugin from github repo")?;
return Ok(resp_body);
}

// otherwise we assume it is a local file
let plugin_dir = self.plugin_dir.clone().context("plugin dir not set")?;
let path = plugin_dir.join(name);
Ok(std::fs::read_to_string(path)?)
Expand Down Expand Up @@ -314,7 +337,7 @@ impl ProcessingEngineManager for ProcessingEngineManagerImpl {
write_buffer,
query_executor,
};
let plugin_code = self.read_plugin_code(&trigger.plugin_file_name)?;
let plugin_code = self.read_plugin_code(&trigger.plugin_file_name).await?;
plugins::run_plugin(db_name.to_string(), plugin_code, trigger, plugin_context);
}

Expand Down Expand Up @@ -434,7 +457,7 @@ impl ProcessingEngineManager for ProcessingEngineManagerImpl {
let catalog = Arc::new(Catalog::from_inner(self.catalog.clone_inner()));
let now = self.time_provider.now();

let code = self.read_plugin_code(&request.filename)?;
let code = self.read_plugin_code(&request.filename).await?;

let res = plugins::run_test_wal_plugin(now, catalog, query_executor, code, request)
.unwrap_or_else(|e| WalPluginTestResponse {
Expand Down

0 comments on commit 84f5702

Please sign in to comment.