Skip to content

Commit

Permalink
Plugins Parser: Adjustment according to master
Browse files Browse the repository at this point in the history
* Change parse return type from collection of results to a result of
  collection and make the needed adjustments in wit files and macros.
* Adjustments on the unit tests for parser macro accordingly.
  • Loading branch information
AmmarAbouZor committed Nov 7, 2024
1 parent 54c06a5 commit ddde66e
Show file tree
Hide file tree
Showing 7 changed files with 80 additions and 76 deletions.
30 changes: 17 additions & 13 deletions application/apps/indexer/plugins_api/src/parser/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ pub trait Parser {
&mut self,
data: &[u8],
timestamp: Option<u64>,
) -> impl IntoIterator<Item = Result<ParseReturn, ParseError>> + Send;
) -> Result<impl Iterator<Item = ParseReturn>, ParseError>;
}

#[macro_export]
Expand Down Expand Up @@ -102,8 +102,8 @@ pub trait Parser {
/// # &mut self,
/// # _data: &[u8],
/// # _timestamp: Option<u64>,
/// # ) -> impl IntoIterator<Item = Result<ParseReturn, ParseError>> + Send {
/// # Vec::new()
/// # ) -> Result<impl Iterator<Item = ParseReturn>, ParseError> {
/// # Ok(std::iter::empty())
/// # }
/// }
///
Expand Down Expand Up @@ -157,25 +157,29 @@ macro_rules! parser_export {
fn parse(
data: ::std::vec::Vec<u8>,
timestamp: ::std::option::Option<u64>,
) -> ::std::vec::Vec<
::std::result::Result<$crate::parser::ParseReturn, $crate::parser::ParseError>,
) -> ::std::result::Result<
::std::vec::Vec<$crate::parser::ParseReturn>,
$crate::parser::ParseError,
> {
// SAFETY: Parse method has mutable reference to self and can't be called more than
// once on the same time on host
let parser = unsafe { PARSER.as_mut().expect("parser already initialized") };
parser.parse(&data, timestamp).into_iter().collect()
parser.parse(&data, timestamp).map(|items| items.collect())
}

/// Parse the given bytes returning the results to the host one by one using the function `add` provided by the host.
fn parse_with_add(data: ::std::vec::Vec<u8>, timestamp: ::std::option::Option<u64>) {
fn parse_with_add(
data: ::std::vec::Vec<u8>,
timestamp: ::std::option::Option<u64>,
) -> ::std::result::Result<(), $crate::parser::ParseError> {
// SAFETY: Parse method has mutable reference to self and can't be called more than
// once on the same time on host
let parser = unsafe { PARSER.as_mut().expect("parser already initialized") };
for item in parser.parse(&data, timestamp) {
$crate::parser::__internal_bindings::chipmunk::plugin::host_add::add(
item.as_ref(),
);
for item in parser.parse(&data, timestamp)? {
$crate::parser::__internal_bindings::chipmunk::plugin::host_add::add(&item);
}

Ok(())
}
}

Expand Down Expand Up @@ -208,8 +212,8 @@ mod prototyping {
&mut self,
_data: &[u8],
_timestamp: Option<u64>,
) -> impl IntoIterator<Item = Result<ParseReturn, ParseError>> + Send {
Vec::new()
) -> Result<impl Iterator<Item = ParseReturn>, ParseError> {
Ok(std::iter::empty())
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,8 @@ impl crate::parser::Parser for Dummy {
&mut self,
_data: &[u8],
_timestamp: Option<u64>,
) -> impl IntoIterator<Item = Result<crate::parser::ParseReturn, crate::parser::ParseError>> + Send
{
Vec::new()
) -> Result<impl Iterator<Item = crate::parser::ParseReturn>, crate::parser::ParseError> {
Ok(std::iter::empty())
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ mod impl_mod {
&mut self,
_data: &[u8],
_timestamp: Option<u64>,
) -> impl IntoIterator<Item = Result<crate::parser::ParseReturn, crate::parser::ParseError>> + Send
) -> Result<impl Iterator<Item = crate::parser::ParseReturn>, crate::parser::ParseError>
{
Vec::new()
Ok(std::iter::empty())
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,8 @@ impl crate::parser::Parser for Dummy {
&mut self,
_data: &[u8],
_timestamp: Option<u64>,
) -> impl IntoIterator<Item = Result<crate::parser::ParseReturn, crate::parser::ParseError>> + Send
{
Vec::new()
) -> Result<impl Iterator<Item = crate::parser::ParseReturn>, crate::parser::ParseError> {
Ok(std::iter::empty())
}
}

Expand Down
12 changes: 7 additions & 5 deletions application/apps/indexer/plugins_api/wit/v_0.1.0/parser.wit
Original file line number Diff line number Diff line change
Expand Up @@ -50,19 +50,21 @@ interface parser {
/// Initialize the parser with the given configurations
init: func(general-configs: parser-config, plugin-configs: option<string>) -> result<_, init-error>;

/// Parse the given bytes returning a list of plugins results
parse: func(data: list<u8>, timestamp: option<u64>) -> list<result<parse-return, parse-error>>;
/// Parse the given bytes returning a list of parsed items,
/// or parse error if an error occurred and no item has been parsed.
parse: func(data: list<u8>, timestamp: option<u64>) -> result<list<parse-return>, parse-error>;

/// Parse the given bytes returning the results to the host one by one using the function `add` provided by the host.
parse-with-add: func(data: list<u8>, timestamp: option<u64>);
/// Otherwise it will return a parsing error only if
parse-with-add: func(data: list<u8>, timestamp: option<u64>) -> result<_, parse-error>;
}

/// Provides methods to add parse items directly by the host
interface host-add {
use parse-types.{parse-return, parse-error};

/// Add parse results one by one directly at the host memory
add: func(item: result<parse-return, parse-error>);
/// Add parsed item one by one directly at the host memory
add: func(item: parse-return);
}


Expand Down
94 changes: 47 additions & 47 deletions application/apps/indexer/plugins_host/src/v0_1_0/parser/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,8 @@ use wasmtime::{
use wasmtime_wasi::ResourceTable;

use crate::{
plugins_shared::get_wasi_ctx_builder, v0_1_0::parser::bindings::ParseError,
wasm_host::get_wasm_host, PluginGuestInitError, PluginHostInitError, PluginParseMessage,
PluginType, WasmPlugin,
plugins_shared::get_wasi_ctx_builder, wasm_host::get_wasm_host, PluginGuestInitError,
PluginHostInitError, PluginParseMessage, PluginType, WasmPlugin,
};

use self::{
Expand Down Expand Up @@ -81,8 +80,8 @@ impl PluginParser {
&mut self,
input: &[u8],
timestamp: Option<u64>,
) -> impl IntoIterator<Item = Result<(usize, Option<p::ParseYield<PluginParseMessage>>), p::Error>>
+ Send {
) -> Result<impl Iterator<Item = (usize, Option<p::ParseYield<PluginParseMessage>>)>, p::Error>
{
let call_res =
futures::executor::block_on(self.plugin_bindings.chipmunk_plugin_parser().call_parse(
&mut self.store,
Expand All @@ -91,15 +90,16 @@ impl PluginParser {
));

let parse_results = match call_res {
Ok(results) => results,
Ok(results) => results?,
Err(call_err) => {
vec![Err(ParseError::Unrecoverable(format!(
return Err(p::Error::Unrecoverable(format!(
"Call parse on the plugin failed. Error: {call_err}"
)))]
)))
}
};

parse_results.into_iter().map(guest_to_host_parse_results)
let res = parse_results.into_iter().map(guest_to_host_parse_results);
Ok(res)
}

#[inline]
Expand All @@ -110,48 +110,48 @@ impl PluginParser {
timestamp: Option<u64>,
) -> Result<impl Iterator<Item = (usize, Option<p::ParseYield<PluginParseMessage>>)>, p::Error>
{
//TODO AAZ: Temporary fix.
// In original implementation we were returning Vec<Result<>>
// Now we should return Result<Vec<>>.

// Old solution:
// debug_assert!(
// self.store.data_mut().results_queue.is_empty(),
// "Host results most be empty at the start of parse call"
// );
//
// let call_res = futures::executor::block_on(
// self.plugin_bindings
// .chipmunk_plugin_parser()
// .call_parse_with_add(&mut self.store, input, timestamp),
// );
//
// let parse_results = if let Err(call_err) = call_res {
// vec![Err(ParseError::Unrecoverable(format!(
// "Call parse on the plugin failed. Error: {call_err}"
// )))]
// } else {
// std::mem::take(&mut self.store.data_mut().results_queue)
// };
//
//
// parse_results.into_iter().map(guest_to_host_parse_results)

// Temporary:
Ok([].into_iter())
debug_assert!(
self.store.data_mut().results_queue.is_empty(),
"Host results most be empty at the start of parse call"
);

let parse_res = futures::executor::block_on(
self.plugin_bindings
.chipmunk_plugin_parser()
.call_parse_with_add(&mut self.store, input, timestamp),
)
.map_err(|call_err| {
p::Error::Unrecoverable(format!(
"Call parse on the plugin failed. Error: {call_err}"
))
})?;

if let Err(parse_err) = parse_res {
//TODO AAZ: Decide what to do if we have already parsed items.

if !self.store.data().results_queue.is_empty() {
self.store.data_mut().results_queue.clear();
return Err(p::Error::Unrecoverable(format!("Plugin return parse error and submitted parsed items on the same call. Plugin Error: {parse_err}")));
} else {
return Err(parse_err.into());
}
}

let parse_results = std::mem::take(&mut self.store.data_mut().results_queue);

let res = parse_results.into_iter().map(guest_to_host_parse_results);

Ok(res)
}
}

fn guest_to_host_parse_results(
guest_res: Result<ParseReturn, ParseError>,
) -> Result<(usize, Option<p::ParseYield<PluginParseMessage>>), p::Error> {
match guest_res {
Ok(parse_res) => Ok((
parse_res.consumed as usize,
parse_res.value.map(|v| v.into()),
)),
Err(parse_err) => Err(parse_err.into()),
}
parse_res: ParseReturn,
) -> (usize, Option<p::ParseYield<PluginParseMessage>>) {
(
parse_res.consumed as usize,
parse_res.value.map(|v| v.into()),
)
}

use parsers as p;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@ use super::bindings::{
logging::{self, Level},
parse_types, shared_types,
},
ParseError, ParseReturn,
ParseReturn,
};

pub struct ParserPluginState {
pub ctx: WasiCtx,
pub table: ResourceTable,
pub results_queue: Vec<Result<ParseReturn, ParseError>>,
pub results_queue: Vec<ParseReturn>,
}

impl ParserPluginState {
Expand All @@ -37,7 +37,7 @@ impl WasiView for ParserPluginState {

impl Host for ParserPluginState {
// Add parse results one by one directly at the host memory
fn add(&mut self, item: Result<ParseReturn, ParseError>) {
fn add(&mut self, item: ParseReturn) {
self.results_queue.push(item);
}
}
Expand Down

0 comments on commit ddde66e

Please sign in to comment.