Skip to content

Commit

Permalink
WIP: Romove fixtype thread inserting it in the on_message_callback an…
Browse files Browse the repository at this point in the history
…d some other code simplifications
  • Loading branch information
EdoZua95 committed May 17, 2024
1 parent b92a710 commit 1d77e16
Showing 1 changed file with 152 additions and 93 deletions.
245 changes: 152 additions & 93 deletions src/bin/sailtrack-aggregator.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use eskf::ESKF;
use rand::Rng;
use std::sync::{Arc, Mutex};
use std::thread;
Expand Down Expand Up @@ -75,8 +76,12 @@ struct Boat {

#[derive(Debug, Clone, Copy)]
struct Measure {
data: Gps,
gps_ref: Gps,
position: Point3<f32>,
velocity_xy: Vector2<f32>,
velocity_z: f32,
pos_variance: Matrix3<f32>,
vel_variance: Matrix2<f32>,
vel_z_variance: f32,
new_measure: bool,
}

Expand All @@ -87,60 +92,72 @@ struct Input {
new_input: bool,
}

#[derive(Debug, Clone, Copy)]
struct Debug {
mqtt_recieve_process_counter:u64,
mqtt_send_process_counter:u64,
filter_process_counter:u64,
}


// Function to continuously try locking the mutex until successful
fn acquire_lock<T>(mutex: &Arc<Mutex<T>>) -> std::sync::MutexGuard<T> {
let mut count = 0;
loop {
if let Ok(guard) = mutex.try_lock() {
return guard;
}
let mut rng = rand::thread_rng();
let sleep_time: u64 = rng.gen_range(5..10);
thread::sleep(Duration::from_millis(sleep_time));
count += 1;
/*
if count > 500{
println!("Acquire lock failed on mutex containig value {:?}", std::any::type_name::<T>());
}
*/
}
}

fn get_matrix_from_measure(
measure: &Measure,
) -> (
Point3<f32>,
Vector2<f32>,
f32,
Matrix3<f32>,
Matrix2<f32>,
f32,
) {
// Function to compute the measure for the Kalman filter from the raw GPS data
fn get_measure_forom_gps(
measure: &Gps, reference: &Gps,
) -> Measure {
let position = Point3::new(
(measure.data.lat - measure.gps_ref.lat) * EARTH_CIRCUMFERENCE_METERS / 360.0,
(measure.data.lon - measure.gps_ref.lon) * EARTH_CIRCUMFERENCE_METERS * LAT_FACTOR / 360.0,
measure.data.h_msl - measure.gps_ref.h_msl,
(measure.lat*f32::powf(10.0, -7.0) - reference.lat*f32::powf(10.0, -7.0)) * EARTH_CIRCUMFERENCE_METERS / 360.0,
(measure.lon*f32::powf(10.0, -7.0) - reference.lon*f32::powf(10.0, -7.0)) * EARTH_CIRCUMFERENCE_METERS * LAT_FACTOR / 360.0,
measure.h_msl*f32::powf(10.0, -3.0) - reference.h_msl*f32::powf(10.0, -3.0),
);
let velocity_xy = Vector2::new(measure.data.vel_n, measure.data.vel_e);
let velocity_z = -measure.data.vel_d;
let velocity_xy = Vector2::new(measure.vel_n*f32::powf(10.0, -3.0), measure.vel_e*f32::powf(10.0, -3.0));
let velocity_z = -measure.vel_d*f32::powf(10.0, -3.0);
let mut pos_variance = Matrix3::zeros();
pos_variance[(0, 0)] = 0.25 * measure.data.h_acc.powf(2.0);
pos_variance[(1, 1)] = 0.25 * measure.data.h_acc.powf(2.0);
pos_variance[(2, 2)] = 0.25 * measure.data.h_acc.powf(2.0);
let vel_variance = Matrix2::identity() * 0.25 * measure.data.s_acc.powf(2.0);
let vel_z_variance = 0.25 * measure.data.s_acc.powf(2.0);
(
pos_variance[(0, 0)] = 0.25 * (measure.h_acc*f32::powf(10.0, -3.0)).powf(2.0);
pos_variance[(1, 1)] = 0.25 * (measure.h_acc*f32::powf(10.0, -3.0)).powf(2.0);
pos_variance[(2, 2)] = 0.25 * (measure.v_acc*f32::powf(10.0, -3.0)).powf(2.0);
let vel_variance = Matrix2::identity() * 0.25 * (measure.s_acc*f32::powf(10.0, -3.0)).powf(2.0);
let vel_z_variance = 0.25 * (measure.s_acc*f32::powf(10.0, -3.0)).powf(2.0);
let measure = Measure {
position,
velocity_xy,
velocity_z,
pos_variance,
vel_variance,
vel_z_variance,
)
new_measure: true,
};
measure
}


// Function that keeps on controll if the GPS fix is obtained
fn wait_for_fix_tipe(measure_arc: &Arc<Mutex<Measure>>) {
fn wait_for_fix_tipe(gps_ref_arc: &Arc<Mutex<Gps>>) {
loop {
let measure_lock = acquire_lock(measure_arc);
if measure_lock.gps_ref.fix_type == 3 {
drop(measure_lock);
let gps_ref_lock = acquire_lock(gps_ref_arc);
if gps_ref_lock.fix_type == 3 {
drop(gps_ref_lock);
break;
}
drop(measure_lock);
drop(gps_ref_lock);
thread::sleep(Duration::from_millis(1000));
}
}
Expand All @@ -159,22 +176,42 @@ fn on_message_imu(message: Imu, input: &Arc<Mutex<Input>>) {
drop(input_lock);
}

fn on_message_gps(message: Gps, measure_mutex: &Arc<Mutex<Measure>>) {
let mut measure_lock = acquire_lock(measure_mutex);
measure_lock.data.fix_type = message.fix_type;
measure_lock.data.lat = message.lat * f32::powf(10.0, -7.0);
measure_lock.data.lon = message.lon * f32::powf(10.0, -7.0);
measure_lock.data.h_msl = message.h_msl * f32::powf(10.0, -3.0);
measure_lock.data.vel_n = message.vel_n * f32::powf(10.0, -3.0);
measure_lock.data.vel_e = message.vel_e * f32::powf(10.0, -3.0);
measure_lock.data.vel_d = message.vel_d * f32::powf(10.0, -3.0);
measure_lock.data.h_acc = message.h_acc * f32::powf(10.0, -3.0);
measure_lock.data.v_acc = message.v_acc * f32::powf(10.0, -3.0);
measure_lock.data.s_acc = message.s_acc * f32::powf(10.0, -3.0);
measure_lock.new_measure = true;
fn on_message_gps(message: Gps, gps_ref_arc: &Arc<Mutex<Gps>>, measure_arc: &Arc<Mutex<Measure>>) {

let mut gps_ref_lock = acquire_lock(gps_ref_arc);
let mut measure_lock = acquire_lock(measure_arc);
println!("Message recieved: {:?}", message);

if gps_ref_lock.fix_type != 3 {
*gps_ref_lock = message;
println!("GPS fix obtained: {:?}", gps_ref_lock);
}
let measure = get_measure_forom_gps(&message, &gps_ref_lock);
*measure_lock = measure;
println!("Measure obtained: {:?}", measure);
drop(measure_lock);
drop(gps_ref_lock);
println!("Mutex Dropped");
}

// Kalman predict function on new input
fn filter_predict(kalman: &mut ESKF, input: &mut Input, filter_ts: Duration) {
kalman.predict(input.acceleration, input.rotation, filter_ts);
input.new_input = false;
}

// Kalman update function on new measure
fn filter_update(kalman: &mut ESKF, measure: &mut Measure) {
kalman
.observe_position_velocity2d(measure.position, measure.pos_variance, measure.velocity_xy, measure.vel_variance)
.unwrap();
kalman
.observe_height(measure.velocity_z, measure.vel_z_variance)
.unwrap();
measure.new_measure = false;
}


fn angle_wrap_180(angle: f32) -> f32 {
(angle + 180.0) % 360.0 - 180.0
}
Expand All @@ -198,7 +235,7 @@ fn main() {
new_input: false,
};

let empty_gps_struct = Gps {
let gps_ref = Gps {
fix_type: 0,
epoch: 0,
lon: 0.0,
Expand All @@ -216,8 +253,12 @@ fn main() {
};

let measure = Measure {
data: empty_gps_struct,
gps_ref: empty_gps_struct,
position: Point3::new(0.0, 0.0, 0.0),
velocity_xy: Vector2::new(0.0, 0.0),
velocity_z: 0.0,
pos_variance: Matrix3::zeros(),
vel_variance: Matrix2::zeros(),
vel_z_variance: 0.0,
new_measure: false,
};

Expand All @@ -228,10 +269,21 @@ fn main() {
.build();

// Defining Mutex for thread share
let gps_ref_mutex = Arc::new(Mutex::new(gps_ref));
let measure_mutex = Arc::new(Mutex::new(measure));
let input_mutex = Arc::new(Mutex::new(input));
let filter_mutex = Arc::new(Mutex::new(filter));

// DEBUG
let debug = Debug {
mqtt_recieve_process_counter:0,
mqtt_send_process_counter:0,
filter_process_counter:0,
};

let debug_mutex = Arc::new(Mutex::new(debug));


// TODO: Add username and password authentication
let mqqt_opts = MqttOptions::new("sailtrack-aggregator", "localhost", 1883);

Expand All @@ -240,8 +292,10 @@ fn main() {
client.subscribe("sensor/imu0", QoS::AtMostOnce).unwrap();

// // MQTT Callbacks thread
let gps_ref_clone = Arc::clone(&gps_ref_mutex);
let measure_clone = Arc::clone(&measure_mutex);
let input_clone = Arc::clone(&input_mutex);
let debug_clone = Arc::clone(&debug_mutex);
thread::spawn(move || {
for notification in connection.iter().flatten() {
if let Event::Incoming(Incoming::Publish(packet)) = notification {
Expand All @@ -259,89 +313,72 @@ fn main() {
let payload = packet.payload.clone(); // Clone the payload for later use
on_message_gps(
serde_json::from_slice(payload.as_ref()).unwrap(),
&gps_ref_clone,
&measure_clone,
);
}
_ => (),
}
}
}
});

// // GPS fix check
let measure_clone = Arc::clone(&measure_mutex);
thread::spawn(move || {
println!("Waiting for GPS fix...");
loop {
let mut measure_lock = acquire_lock(&measure_clone);
if measure_lock.data.fix_type == 3 {
measure_lock.gps_ref = measure_lock.data;
println!("GPS fix obtained");
break; // Exit the loop when GPS fix is obtained
}
drop(measure_lock); // Ensure the lock is released
thread::sleep(Duration::from_millis(1000));
// Degug
let mut debug_lock = acquire_lock(&debug_clone);
debug_lock.mqtt_recieve_process_counter += 1;
drop(debug_lock);
}
});

// Kalman filter thread
let gps_ref_clone = Arc::clone(&gps_ref_mutex);
let measure_clone = Arc::clone(&measure_mutex);
let input_clone = Arc::clone(&input_mutex);
let filter_clone = Arc::clone(&filter_mutex);
let debug_clone = Arc::clone(&debug_mutex);
thread::spawn(move || loop {
// Check if the GPS fix has been obtained
wait_for_fix_tipe(&measure_clone);
// wait_for_fix_tipe(&gps_ref_clone);
let thread_start = Instant::now();
let mut measure_lock = acquire_lock(&measure_clone);
let mut input_lock = acquire_lock(&input_clone);
let mut filter_lock = acquire_lock(&filter_clone);

// Reliable GPS and IMU data: predict and update
if measure_lock.new_measure {
filter_lock.predict(input_lock.acceleration, input_lock.rotation, filter_ts);
let (position, velocity_xy, velocity_z, pos_variance, vel_variance, vel_z_variance) =
get_matrix_from_measure(&measure_lock);
filter_lock
.observe_position_velocity2d(position, pos_variance, velocity_xy, vel_variance)
.unwrap();
filter_lock
.observe_height(velocity_z, vel_z_variance)
.unwrap();
measure_lock.new_measure = false;
input_lock.new_input = false;
if measure_lock.new_measure && input_lock.new_input {
filter_predict(&mut filter_lock, &mut input_lock, filter_ts);
filter_update(&mut filter_lock, &mut measure_lock);
drop(input_lock);
drop(measure_lock);
drop(filter_lock);
}
// Only reliable GPS data: only update
else if !input_lock.new_input && measure_lock.new_measure {
let (position, velocity_xy, velocity_z, pos_variance, vel_variance, vel_z_variance) =
get_matrix_from_measure(&measure_lock);
filter_lock
.observe_position_velocity2d(position, pos_variance, velocity_xy, vel_variance)
.unwrap();
filter_lock
.observe_height(velocity_z, vel_z_variance)
.unwrap();
measure_lock.new_measure = false;
filter_update(&mut filter_lock, &mut measure_lock);
drop(measure_lock);
drop(filter_lock);
}
// Only reliable or old IMU data: only predict
else {
filter_lock.predict(input_lock.acceleration, input_lock.rotation, filter_ts);
input_lock.new_input = false;
filter_predict(&mut filter_lock, &mut input_lock, filter_ts);
drop(input_lock);
drop(filter_lock);
}
drop(input_lock);
drop(measure_lock);
drop(filter_lock);
let elapsed = thread_start.elapsed();
if elapsed.as_millis() < filter_ts.as_millis() {
thread::sleep(filter_ts - elapsed);
}
// Degug
let mut debug_lock = acquire_lock(&debug_clone);
debug_lock.filter_process_counter += 1;
drop(debug_lock);
});

//MQTT publish thread
let filter_clone = Arc::clone(&filter_mutex);
let gps_ref_clone = Arc::clone(&gps_ref_mutex);
let measure_clone = Arc::clone(&measure_mutex);
let filter_clone = Arc::clone(&filter_mutex);
let debug_clone = Arc::clone(&debug_mutex);
thread::spawn(move || loop {
// Check if the GPS fix has been obtained
wait_for_fix_tipe(&measure_clone);
wait_for_fix_tipe(&gps_ref_clone);
let filter_lock = acquire_lock(&filter_clone);
let position = filter_lock.position;
let velocity = filter_lock.velocity;
Expand Down Expand Up @@ -369,10 +406,11 @@ fn main() {
}
}
let measure_lock = acquire_lock(&measure_clone);
let gps_ref_lock = acquire_lock(&gps_ref_clone);
let lat =
position.x * 360.0 / EARTH_CIRCUMFERENCE_METERS / LAT_FACTOR + measure_lock.gps_ref.lat;
let lon: f32 = position.y * 360.0 / EARTH_CIRCUMFERENCE_METERS + measure_lock.gps_ref.lon;
let altitude = position.z + measure_lock.gps_ref.h_msl;
position.x * 360.0 / EARTH_CIRCUMFERENCE_METERS / LAT_FACTOR + gps_ref_lock.lat;
let lon: f32 = position.y * 360.0 / EARTH_CIRCUMFERENCE_METERS + gps_ref_lock.lon;
let altitude = position.z + gps_ref_lock.h_msl;
drop(measure_lock);
let message = Boat {
lon,
Expand All @@ -395,5 +433,26 @@ fn main() {
)
.unwrap();
thread::sleep(Duration::from_millis(1000 / MQTT_PUBLISH_FREQ_HZ));

// Degug
let mut debug_lock = acquire_lock(&debug_clone);
debug_lock.mqtt_send_process_counter += 1;
drop(debug_lock);
});

let debug_clone = Arc::clone(&debug_mutex);
let gps_ref_clone = Arc::clone(&gps_ref_mutex);
loop {
let debug_lock = acquire_lock(&debug_clone);
println!(
"MQTT send process counter: {}, MQTT receive process counter: {}, Filter process counter: {}",
debug_lock.mqtt_send_process_counter, debug_lock.mqtt_recieve_process_counter, debug_lock.filter_process_counter
);
drop(debug_lock);
thread::sleep(Duration::from_secs(1));
let gps_ref_lock = acquire_lock(&gps_ref_clone);
drop(gps_ref_lock);
println!("GPS ref access granted:");
};
}

0 comments on commit 1d77e16

Please sign in to comment.