Skip to content

Commit

Permalink
Plugin Manager: Loading ... (in progress)
Browse files Browse the repository at this point in the history
* Refactoring to remove code duplication in parser plugin & Renaming.
* Move async functions to the plugin trait API.
* Create plugins directories if they don't exist.
* Add host init error to plugins general errors.
  • Loading branch information
AmmarAbouZor committed Dec 5, 2024
1 parent 9ad67a7 commit 8fcc42b
Show file tree
Hide file tree
Showing 12 changed files with 131 additions and 153 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ fn plugin_parser_init(c: &mut Criterion) {
.to_async(tokio::runtime::Runtime::new().unwrap())
.iter(|| async {
// Creating parser will load compile and validate the provided plugin
let parser = plugins_host::PluginsParser::create(
let parser = plugins_host::PluginsParser::initialize(
&settings.plugin_path,
&settings.general_settings,
settings.plugin_configs.clone(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,17 +129,17 @@ impl WasmPlugin for PluginsByteSource {
PluginType::ByteSource
}

fn plugin_version(&mut self) -> Result<SemanticVersion, PluginError> {
async fn plugin_version(&mut self) -> Result<SemanticVersion, PluginError> {
match &mut self.source {
PlugVerByteSource::Ver010(source) => source.plugin_version(),
PlugVerByteSource::Ver010(source) => source.plugin_version().await,
}
}

fn get_config_schemas(
async fn get_config_schemas(
&mut self,
) -> Result<Vec<sources::plugins::ConfigSchemaItem>, PluginError> {
match &mut self.source {
PlugVerByteSource::Ver010(source) => source.get_config_schemas(),
PlugVerByteSource::Ver010(source) => source.get_config_schemas().await,
}
}
}
Expand Down
6 changes: 4 additions & 2 deletions application/apps/indexer/plugins_host/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ pub enum PluginType {
ByteSource,
}

// Trait is used with Chipmunk only.
#[allow(async_fn_in_trait)]
pub trait WasmPlugin {
/// Provides the Type of the plugin.
fn get_type() -> PluginType;
Expand All @@ -31,13 +33,13 @@ pub trait WasmPlugin {
///
/// # Note
/// This version is for the plugin only and is different from the plugin's API version.
fn plugin_version(&mut self) -> Result<SemanticVersion, PluginError>;
async fn plugin_version(&mut self) -> Result<SemanticVersion, PluginError>;

/// Provides the schemas for the configurations required by the plugin, which
/// will be specified by the users.
///
/// These schemas define the expected structure, types, and constraints
/// for plugin-specific configurations. The values of these configurations
/// will be passed to the initializing method of the plugin.
fn get_config_schemas(&mut self) -> Result<Vec<pl::ConfigSchemaItem>, PluginError>;
async fn get_config_schemas(&mut self) -> Result<Vec<pl::ConfigSchemaItem>, PluginError>;
}
95 changes: 37 additions & 58 deletions application/apps/indexer/plugins_host/src/parser_shared/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,57 +42,21 @@ enum PlugVerParser {
}

impl PluginsParser {
pub async fn get_info(plugin_path: PathBuf) -> Result<ValidPluginInfo, PluginHostInitError> {
let engine = get_wasm_host()
.map(|host| &host.engine)
.map_err(|err| err.to_owned())?;

if !plugin_path.exists() {
return Err(PluginHostInitError::IO("Plugin path doesn't exist".into()));
}

if !plugin_path.is_file() {
return Err(PluginHostInitError::IO("Plugin path is not a file".into()));
}

let component = Component::from_file(engine, &plugin_path)
.map_err(|err| PluginHostInitError::PluginInvalid(err.to_string()))?;

let component_types = component.component_type();

let export_info = component_types.exports(engine).next().ok_or_else(|| {
PluginHostInitError::PluginInvalid("Plugin doesn't have exports information".into())
})?;

let (interface_name, version) = export_info.0.split_once('@').ok_or_else(|| {
PluginHostInitError::PluginInvalid(
"Plugin package schema doesn't match `wit` file definitions".into(),
)
})?;

if interface_name != PARSER_INTERFACE_NAME {
return Err(PluginHostInitError::PluginInvalid(
"Plugin package name doesn't match `wit` file".into(),
));
}

let version: SemanticVersion = version.parse().map_err(|err| {
PluginHostInitError::PluginInvalid(format!("Plugin version parsing failed: {err}"))
})?;
/// Loads the plugin and extract the needed plugin info if valid.
pub async fn get_info(plugin_path: PathBuf) -> Result<ValidPluginInfo, PluginError> {
let (component, version) = Self::load(&plugin_path).await?;

let plug_info = match version {
SemanticVersion {
major: 0,
minor: 1,
patch: 0,
} => {
let info = v0_1_0::parser::PluginParser::get_info(component).await?;
info
}
} => v0_1_0::parser::PluginParser::get_info(component).await?,
invalid_version => {
return Err(PluginHostInitError::PluginInvalid(format!(
"Plugin version {invalid_version} is not supported"
)))
))
.into())
}
};

Expand All @@ -107,20 +71,20 @@ impl PluginsParser {
Ok(plugin_info)
}

pub async fn create(
/// Loads and validate a plugin returning the its [`Component`] and API [`SemanticVersion`]
async fn load(
plugin_path: impl AsRef<Path>,
general_config: &pl::PluginParserGeneralSetttings,
plugin_configs: Vec<pl::ConfigItem>,
) -> Result<Self, PluginHostInitError> {
) -> Result<(Component, SemanticVersion), PluginHostInitError> {
let engine = get_wasm_host()
.map(|host| &host.engine)
.map_err(|err| err.to_owned())?;
.map_err(|err| PluginHostInitError::from(err.to_owned()))?;
let plugin_path = plugin_path.as_ref();

if !plugin_path.as_ref().exists() {
if !plugin_path.exists() {
return Err(PluginHostInitError::IO("Plugin path doesn't exist".into()));
}

if !plugin_path.as_ref().is_file() {
if !plugin_path.is_file() {
return Err(PluginHostInitError::IO("Plugin path is not a file".into()));
}

Expand Down Expand Up @@ -149,15 +113,30 @@ impl PluginsParser {
PluginHostInitError::PluginInvalid(format!("Plugin version parsing failed: {err}"))
})?;

Ok((component, version))
}

/// Initialize parser instance with the needed configuration to be used within a parsing
/// session.
pub async fn initialize(
plugin_path: impl AsRef<Path>,
general_config: &pl::PluginParserGeneralSetttings,
plugin_configs: Vec<pl::ConfigItem>,
) -> Result<Self, PluginHostInitError> {
let (component, version) = Self::load(&plugin_path).await?;

match version {
SemanticVersion {
major: 0,
minor: 1,
patch: 0,
} => {
let parser =
v0_1_0::parser::PluginParser::create(component, general_config, plugin_configs)
.await?;
let parser = v0_1_0::parser::PluginParser::initialize(
component,
general_config,
plugin_configs,
)
.await?;
Ok(Self {
parser: PlugVerParser::Ver010(parser),
errors_counter: 0,
Expand All @@ -169,9 +148,9 @@ impl PluginsParser {
}
}

pub fn get_render_options(&mut self) -> Result<ParserRenderOptions, PluginError> {
pub async fn get_render_options(&mut self) -> Result<ParserRenderOptions, PluginError> {
match &mut self.parser {
PlugVerParser::Ver010(parser) => parser.get_render_options(),
PlugVerParser::Ver010(parser) => parser.get_render_options().await,
}
}
}
Expand All @@ -181,15 +160,15 @@ impl WasmPlugin for PluginsParser {
PluginType::Parser
}

fn plugin_version(&mut self) -> Result<SemanticVersion, PluginError> {
async fn plugin_version(&mut self) -> Result<SemanticVersion, PluginError> {
match &mut self.parser {
PlugVerParser::Ver010(parser) => parser.plugin_version(),
PlugVerParser::Ver010(parser) => parser.plugin_version().await,
}
}

fn get_config_schemas(&mut self) -> Result<Vec<pl::ConfigSchemaItem>, PluginError> {
async fn get_config_schemas(&mut self) -> Result<Vec<pl::ConfigSchemaItem>, PluginError> {
match &mut self.parser {
PlugVerParser::Ver010(parser) => parser.get_config_schemas(),
PlugVerParser::Ver010(parser) => parser.get_config_schemas().await,
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@ mod paths;
use std::{
fs::{self, read_to_string},
io,
path::PathBuf,
path::{Path, PathBuf},
};

use crate::{
plugins_manager::{InvalidPluginInfo, PluginState},
plugins_shared::plugin_errors::PluginError,
PluginHostInitError, PluginType, PluginsParser,
};

Expand All @@ -17,8 +18,8 @@ use super::{InitError, PluginEntity, PluginMetadata};
pub async fn load_plugins() -> Result<Vec<PluginEntity>, InitError> {
let plugins_dir = paths::plugins_dir()?;
if !plugins_dir.exists() {
log::trace!("Plugins directory doens't exist");
return Ok(Vec::new());
log::trace!("Plugins directory doens't exist. Creating it...");
fs::create_dir_all(plugins_dir)?;
}

let mut plugins = load_all_parsers().await?;
Expand All @@ -36,7 +37,8 @@ async fn load_all_parsers() -> Result<Vec<PluginEntity>, InitError> {

let parsers_dir = paths::parser_dir()?;
if !parsers_dir.exists() {
log::trace!("Parsers directory deosn't exist");
log::trace!("Parsers directory deosn't exist. Creating it ...");
fs::create_dir_all(&parsers_dir)?;

return Ok(parsers);
}
Expand All @@ -52,7 +54,6 @@ async fn load_all_parsers() -> Result<Vec<PluginEntity>, InitError> {
/// Retrieves all directory form the given directory path
fn get_dirs(dir_path: &PathBuf) -> Result<impl Iterator<Item = PathBuf>, io::Error> {
let dirs = fs::read_dir(dir_path)?
.into_iter()
.filter_map(|entry| entry.ok().map(|e| e.path()))
.filter(|path| path.is_dir());

Expand Down Expand Up @@ -121,7 +122,9 @@ async fn load_parser(dir: PathBuf) -> Result<PluginEntity, InitError> {
let plugin_info = match PluginsParser::get_info(wasm_file).await {
Ok(info) => info,
// Stop the whole loading on engine errors
Err(PluginHostInitError::EngineError(err)) => return Err(err.into()),
Err(PluginError::HostInitError(PluginHostInitError::EngineError(err))) => {
return Err(err.into())
}
Err(err) => {
let err_msg = format!("Loading plugin binray fail. Error: {err}");
let invalid = PluginEntity {
Expand Down Expand Up @@ -173,15 +176,16 @@ fn parse_metadata(file: &PathBuf) -> Result<PluginMetadata, String> {
}

async fn load_all_bytesources() -> Result<Vec<PluginEntity>, InitError> {
return Ok(Vec::new());
Ok(Vec::new())

// TODO AAZ: Activate when implementing byte source
//
// let mut bytesources = Vec::new();
//
// let bytesource_dir = paths::bytesource_dir()?;
// if !bytesource_dir.exists() {
// log::trace!("Bytesources directory doesn't exist");
// log::trace!("Bytesources directory doesn't exist. Creating it ...");
// fs::create_dir_all(&bytesource_dir)?;
// return Ok(bytesources);
// }
//
Expand All @@ -194,6 +198,6 @@ async fn load_all_bytesources() -> Result<Vec<PluginEntity>, InitError> {
}

#[allow(unused)]
fn load_bytesource(dir: &PathBuf) -> Result<PluginEntity, InitError> {
fn load_bytesource(dir: &Path) -> Result<PluginEntity, InitError> {
todo!()
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ pub enum PluginGuestInitError {
/// Represents general errors in communication with plugins
#[derive(Debug, Error)]
pub enum PluginError {
#[error("Error while initializing plugin host. {0}")]
HostInitError(#[from] PluginHostInitError),
#[error(transparent)]
WasmRunTimeError(#[from] anyhow::Error),
}
25 changes: 12 additions & 13 deletions application/apps/indexer/plugins_host/src/v0_1_0/bytesource/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use std::io;

use bindings::BytesourcePlugin;
use bytesource_plugin_state::ByteSourcePluginState;
use futures::executor::block_on;
use wasmtime::{
component::{Component, Linker},
Store,
Expand Down Expand Up @@ -67,22 +66,22 @@ impl PluginByteSource {
})
}

pub fn get_config_schemas(&mut self) -> Result<Vec<pl::ConfigSchemaItem>, PluginError> {
let schemas = block_on(
self.plugin_bindings
.chipmunk_plugin_byte_source()
.call_get_config_schemas(&mut self.store),
)?;
pub async fn get_config_schemas(&mut self) -> Result<Vec<pl::ConfigSchemaItem>, PluginError> {
let schemas = self
.plugin_bindings
.chipmunk_plugin_byte_source()
.call_get_config_schemas(&mut self.store)
.await?;

Ok(schemas.into_iter().map(|item| item.into()).collect())
}

pub fn plugin_version(&mut self) -> Result<SemanticVersion, PluginError> {
let version = block_on(
self.plugin_bindings
.chipmunk_plugin_byte_source()
.call_get_version(&mut self.store),
)?;
pub async fn plugin_version(&mut self) -> Result<SemanticVersion, PluginError> {
let version = self
.plugin_bindings
.chipmunk_plugin_byte_source()
.call_get_version(&mut self.store)
.await?;

Ok(version.into())
}
Expand Down
Loading

0 comments on commit 8fcc42b

Please sign in to comment.