Skip to content

Commit

Permalink
rust: intercept service takes responsibility of the receiver end
Browse files Browse the repository at this point in the history
  • Loading branch information
rizsotto committed Nov 16, 2024
1 parent 1be5284 commit 7c363ba
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 46 deletions.
31 changes: 19 additions & 12 deletions rust/bear/src/modes/intercept.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,18 @@ use std::process::{Command, ExitCode};
use std::sync::Arc;
use std::{env, thread};

pub(crate) struct InterceptService {
pub(super) struct InterceptService {
collector: Arc<EventCollectorOnTcp>,
receiver: Receiver<Envelope>,
collector_thread: Option<thread::JoinHandle<()>>,
network_thread: Option<thread::JoinHandle<()>>,
output_thread: Option<thread::JoinHandle<()>>,
}

impl InterceptService {
pub fn new() -> anyhow::Result<Self> {
pub fn new<F>(consumer: F) -> anyhow::Result<Self>
where
F: FnOnce(Receiver<Envelope>) -> anyhow::Result<()>,
F: Send + 'static,
{
let collector = EventCollectorOnTcp::new()?;
let collector_arc = Arc::new(collector);
let (sender, receiver) = bounded(32);
Expand All @@ -25,18 +29,18 @@ impl InterceptService {
let collector_thread = thread::spawn(move || {
collector_in_thread.collect(sender).unwrap();
});
let receiver_in_thread = receiver.clone();
let output_thread = thread::spawn(move || {
consumer(receiver_in_thread).unwrap();
});

Ok(InterceptService {
collector: collector_arc,
receiver,
collector_thread: Some(collector_thread),
network_thread: Some(collector_thread),
output_thread: Some(output_thread),
})
}

pub fn receiver(&self) -> Receiver<Envelope> {
self.receiver.clone()
}

pub fn address(&self) -> String {
self.collector.address()
}
Expand All @@ -45,13 +49,16 @@ impl InterceptService {
impl Drop for InterceptService {
fn drop(&mut self) {
self.collector.stop().expect("Failed to stop the collector");
if let Some(thread) = self.collector_thread.take() {
if let Some(thread) = self.network_thread.take() {
thread.join().expect("Failed to join the collector thread");
}
if let Some(thread) = self.output_thread.take() {
thread.join().expect("Failed to join the output thread");
}
}
}

pub(crate) enum InterceptEnvironment {
pub(super) enum InterceptEnvironment {
Wrapper {
bin_dir: tempfile::TempDir,
address: String,
Expand Down
64 changes: 30 additions & 34 deletions rust/bear/src/modes/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,15 @@ pub mod recognition;
pub mod transformation;

use crate::input::EventFileReader;
use crate::intercept::Envelope;
use crate::output::OutputWriter;
use crate::{args, config};
use anyhow::Context;
use crossbeam_channel::Receiver;
use intercept::{InterceptEnvironment, InterceptService};
use recognition::Recognition;
use std::io::BufWriter;
use std::process::ExitCode;
use std::thread;
use transformation::Transformation;

/// The mode trait is used to run the application in different modes.
Expand Down Expand Up @@ -56,43 +57,38 @@ impl Intercept {
config,
}
}

fn write_to_file(
output_file_name: String,
envelopes: Receiver<Envelope>,
) -> anyhow::Result<()> {
let mut writer = std::fs::File::create(&output_file_name)
.map(BufWriter::new)
.with_context(|| format!("Failed to create output file: {:?}", &output_file_name))?;
for envelope in envelopes.iter() {
envelope
.write_into(&mut writer)
.with_context(|| "Failed to write the envelope")?;
}
Ok(())
}
}

impl Mode for Intercept {
fn run(self) -> anyhow::Result<ExitCode> {
match &self.config {
config::Intercept::Wrapper { .. } => {
let service = InterceptService::new()
.with_context(|| "Failed to create the intercept service")?;
let environment = InterceptEnvironment::new(&self.config, service.address())
.with_context(|| "Failed to create the intercept environment")?;

// start writer thread
let writer_thread = thread::spawn(move || {
let file = std::fs::File::create(&self.output.file_name).expect(
format!("Failed to create output file: {:?}", self.output.file_name)
.as_str(),
);
let mut writer = BufWriter::new(file);
for envelope in service.receiver().iter() {
envelope
.write_into(&mut writer)
.expect("Failed to write the envelope");
}
});

let status = environment.execute_build_command(self.command);

writer_thread
.join()
.expect("Failed to join the writer thread");

status
}
config::Intercept::Preload { .. } => {
todo!()
}
}
let output_file_name = self.output.file_name.clone();
let service = InterceptService::new(move |envelopes| {
Self::write_to_file(output_file_name, envelopes)
})
.with_context(|| "Failed to create the intercept service")?;
let environment = InterceptEnvironment::new(&self.config, service.address())
.with_context(|| "Failed to create the intercept environment")?;

let status = environment
.execute_build_command(self.command)
.with_context(|| "Failed to execute the build command")?;

Ok(status)
}
}

Expand Down

0 comments on commit 7c363ba

Please sign in to comment.