Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Nested search #2171

Open
wants to merge 22 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 60 additions & 0 deletions application/apps/indexer/processor/src/map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,66 @@ impl SearchMap {
Ok(&self.matches[*range.start() as usize..=*range.end() as usize])
}

/// Returns information about all matches in the search results occurring after the specified position.
///
/// # Parameters
///
/// * `from` - The starting position from which to retrieve matches.
///
/// # Returns
///
/// * `Ok(&[stypes::FilterMatch])` - A slice of matches starting from the specified position.
/// * `Err(MapError::OutOfRange)` - If the `from` position exceeds the available matches.
pub fn indexes_from(&self, from: u64) -> Result<&[stypes::FilterMatch], MapError> {
if from >= self.len() as u64 {
return Err(MapError::OutOfRange(format!(
"Search has: {} matches. Requested from: {from}",
self.len(),
)));
}
Ok(&self.matches[from as usize..])
}

/// Returns information about all matches in the search results occurring before the specified position.
///
/// # Parameters
///
/// * `to` - The ending position up to which to retrieve matches.
///
/// # Returns
///
/// * `Ok(&[stypes::FilterMatch])` - A slice of matches up to the specified position.
/// * `Err(MapError::OutOfRange)` - If the `to` position exceeds the available matches.
pub fn indexes_to_rev(&self, to: u64) -> Result<&[stypes::FilterMatch], MapError> {
if to >= self.len() as u64 {
return Err(MapError::OutOfRange(format!(
"Search has: {} matches. Requested from: {to}",
self.len(),
)));
}
Ok(&self.matches[..to as usize])
}

/// Returns the search result line index corresponding to a line index in the session file.
///
/// # Parameters
///
/// * `pos` - The line index in the session file.
///
/// # Returns
///
/// * `Some(u64)` - The index of the matching line in the search results.
/// * `None` - If no match is found for the specified position.
pub fn get_match_index(&self, pos: u64) -> Option<u64> {
self.matches.iter().enumerate().find_map(|(index, m)| {
if m.index == pos {
Some(index as u64)
} else {
None
}
})
}

/// Takes position of row in main stream/file and try to find
/// relevant nearest position in search results.
/// For example, search results are (indexes or rows):
Expand Down
46 changes: 46 additions & 0 deletions application/apps/indexer/processor/src/search/searchers/linear.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
use crate::search::{error::SearchError, filter, filter::SearchFilter};
use regex::Regex;
use std::str::FromStr;

/// Represents a utility for searching matches in a string.
/// Primarily used for nested searches, such as filtering results from a primary search.
#[derive(Debug)]
pub struct LineSearcher {
/// A compiled regular expression used for matching lines.
re: Regex,
}

impl LineSearcher {
/// Creates a new `LineSearcher` instance using the provided search filter.
///
/// # Arguments
///
/// * `filter` - A reference to a `SearchFilter` that specifies the search criteria.
///
/// # Returns
///
/// * `Ok(Self)` - If the regular expression is successfully created.
/// * `Err(SearchError)` - If the regular expression cannot be compiled.
pub fn new(filter: &SearchFilter) -> Result<Self, SearchError> {
let regex_as_str = filter::as_regex(filter);
Ok(Self {
re: Regex::from_str(&regex_as_str).map_err(|err| {
SearchError::Regex(format!("Failed to create regex for {regex_as_str}: {err}"))
})?,
})
}

/// Checks if the given line matches the internal regular expression.
///
/// # Arguments
///
/// * `ln` - A string slice representing the line to be checked.
///
/// # Returns
///
/// * `true` - If the line matches the regular expression.
/// * `false` - Otherwise.
pub fn is_match(&self, ln: &str) -> bool {
self.re.is_match(ln)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@ use std::{
use tokio_util::sync::CancellationToken;
use uuid::Uuid;

pub mod linear;
pub mod regular;
#[cfg(test)]
pub mod tests_regular;
#[cfg(test)]
pub mod tests_values;
pub mod values;

#[derive(Debug)]
pub struct BaseSearcher<State: SearchState> {
pub file_path: PathBuf,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,13 @@ use tokio::{
};

#[allow(clippy::type_complexity)]
pub async fn observe_file<'a>(
pub async fn observe_file(
operation_api: OperationAPI,
state: SessionStateAPI,
uuid: &str,
file_format: &stypes::FileFormat,
filename: &Path,
parser: &'a stypes::ParserType,
parser: &stypes::ParserType,
) -> OperationResult<()> {
let source_id = state.add_source(uuid).await?;
let (tx_tail, mut rx_tail): (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,12 @@ use sources::{
socket::{tcp::TcpSource, udp::UdpSource},
};

pub async fn observe_stream<'a>(
pub async fn observe_stream(
operation_api: OperationAPI,
state: SessionStateAPI,
uuid: &str,
transport: &stypes::Transport,
parser: &'a stypes::ParserType,
parser: &stypes::ParserType,
rx_sde: Option<SdeReceiver>,
) -> OperationResult<()> {
let source_id = state.add_source(uuid).await?;
Expand Down
35 changes: 35 additions & 0 deletions application/apps/indexer/session/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,41 @@ impl Session {
.map_err(stypes::ComputationError::NativeError)
}

/// Calls "nested" search functionality.
/// A "nested" search refers to filtering matches within the primary search results.
///
/// # Parameters
///
/// * `filter` - The search filter used to specify the criteria for the nested search.
/// * `from` - The starting position (within the primary search results) for the nested search.
/// * `rev` - Specifies the direction of the search:
/// * `true` - Perform the search in reverse.
/// * `false` - Perform the search in forward order.
///
/// # Returns
///
/// If a match is found:
/// * `Some((search_result_line_index, session_file_line_index))` - A tuple containing:
/// - The line index within the search results.
/// - The corresponding line index in the session file.
///
/// If no match is found:
/// * `None`
///
/// On error:
/// * `Err(stypes::ComputationError)` - Describes the error encountered during the process.
pub async fn search_nested_match(
&self,
filter: SearchFilter,
from: u64,
rev: bool,
) -> Result<Option<(u64, u64)>, stypes::ComputationError> {
self.state
.search_nested_match(filter, from, rev)
.await
.map_err(stypes::ComputationError::NativeError)
}

pub async fn grab_search(
&self,
range: LineRange,
Expand Down
26 changes: 25 additions & 1 deletion application/apps/indexer/session/src/state/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@ use parsers;
use processor::{
grabber::LineRange,
map::{FiltersStats, ScaledDistribution},
search::searchers::{regular::RegularSearchHolder, values::ValueSearchHolder},
search::{
filter::SearchFilter,
searchers::{regular::RegularSearchHolder, values::ValueSearchHolder},
},
};
use std::{collections::HashMap, fmt::Display, ops::RangeInclusive, path::PathBuf};
use stypes::GrabbedElement;
Expand Down Expand Up @@ -122,6 +125,15 @@ pub enum Api {
),
),
#[allow(clippy::type_complexity)]
SearchNestedMatch(
(
SearchFilter,
u64,
bool,
oneshot::Sender<Result<Option<(u64, u64)>, stypes::NativeError>>,
),
),
#[allow(clippy::type_complexity)]
GrabRanges(
(
Vec<RangeInclusive<u64>>,
Expand Down Expand Up @@ -215,6 +227,7 @@ impl Display for Api {
Self::SetSearchHolder(_) => "SetSearchHolder",
Self::DropSearch(_) => "DropSearch",
Self::GrabSearch(_) => "GrabSearch",
Self::SearchNestedMatch(_) => "SearchNestedMatch",
Self::GrabIndexed(_) => "GrabIndexed",
Self::SetIndexingMode(_) => "SetIndexingMode",
Self::GetIndexedMapLen(_) => "GetIndexedMapLen",
Expand Down Expand Up @@ -361,6 +374,17 @@ impl SessionStateAPI {
.await?
}

pub async fn search_nested_match(
&self,
filter: SearchFilter,
from: u64,
rev: bool,
) -> Result<Option<(u64, u64)>, stypes::NativeError> {
let (tx, rx) = oneshot::channel();
self.exec_operation(Api::SearchNestedMatch((filter, from, rev, tx)), rx)
.await?
}

pub async fn grab_ranges(
&self,
ranges: Vec<RangeInclusive<u64>>,
Expand Down
Loading
Loading