Skip to content
This repository has been archived by the owner on Oct 3, 2024. It is now read-only.

Commit

Permalink
Mark new beacons commands
Browse files Browse the repository at this point in the history
  • Loading branch information
automainint committed Aug 20, 2024
1 parent ea92c2a commit 5378298
Showing 1 changed file with 112 additions and 6 deletions.
118 changes: 112 additions & 6 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ CREATE OR REPLACE VIEW mr_t_mutual_score AS SELECT
//
// ================================================================


fn request_raw(payload : Vec<u8>, timeout_msec : Option<u64>) -> Result<Message, Box<dyn Error + 'static>> {
let client = Socket::new(Protocol::Req0)?;
match timeout_msec {
Expand Down Expand Up @@ -128,11 +127,32 @@ fn make_setof_edge(response : &Vec<(String, String, f64)>) -> Result<
let tuples : Vec<PgHeapTuple<'_, AllocatedByRust>> =
response
.iter()
.map(|(ego, target, score)| {
.map(|(ego, dst, score)| {
let mut edge = PgHeapTuple::new_composite_type("mr_t_edge").unwrap();
edge.set_by_name("src", ego.as_str()).unwrap();
edge.set_by_name("dst", target.as_str()).unwrap();
edge.set_by_name("score", *score).unwrap();
edge.set_by_name("dst", dst.as_str()).unwrap();
edge.set_by_name("score", *score) .unwrap();
return edge;
})
.collect();
return Ok(SetOfIterator::new(tuples));
}

fn make_setof_edge_for_src(
src : &str,
response : &Vec<(String, f64)>
) -> Result<
SetOfIterator<'static, pgrx::composite_type!('static, "mr_t_edge")>,
Box<dyn Error + 'static>,
> {
let tuples : Vec<PgHeapTuple<'_, AllocatedByRust>> =
response
.iter()
.map(|(dst, score)| {
let mut edge = PgHeapTuple::new_composite_type("mr_t_edge").unwrap();
edge.set_by_name("src", src) .unwrap();
edge.set_by_name("dst", dst.as_str()).unwrap();
edge.set_by_name("score", *score) .unwrap();
return edge;
})
.collect();
Expand Down Expand Up @@ -445,6 +465,32 @@ fn mr_mutual_scores(
return make_setof_mutual_score(ego, &response);
}

#[pg_extern]
fn mr_unmarked_beacons(
src : Option<&str>,
context : default!(Option<&str>, "''")
) -> Result<
SetOfIterator<'static, pgrx::composite_type!('static, "mr_t_edge")>,
Box<dyn Error + 'static>,
> {
let src = src.expect("src should not be null");
let context = context.unwrap_or("");

let args = rmp_serde::to_vec(&(
src
))?;

let payload = encode_request(&Command {
id : CMD_UNMARKED_BEACONS.to_string(),
context : context.to_string(),
blocking : true,
payload : args
})?;

let response = request(payload, Some(*RECV_TIMEOUT_MSEC))?;
return make_setof_edge_for_src(src, &response);
}

#[pg_extern(immutable)]
fn mr_sync(
timeout_msec : default!(Option<i32>, "6000000"),
Expand Down Expand Up @@ -572,7 +618,7 @@ fn mr_delete_node(
let ego = src.expect("src should not be null");

let args = rmp_serde::to_vec(&(
ego,
ego
))?;

let payload = encode_request(&Command {
Expand All @@ -586,6 +632,29 @@ fn mr_delete_node(
return Ok("Ok");
}

#[pg_extern]
fn mr_mark_beacons(
src : Option<&str>,
context : default!(Option<&str>, "''")
) -> Result<&'static str, Box<dyn Error + 'static>> {
let context = context.unwrap_or("");
let src = src.expect("src should not be null");

let args = rmp_serde::to_vec(&(
src
))?;

let payload = encode_request(&Command {
id : CMD_MARK_BEACONS.to_string(),
context : context.to_string(),
blocking : false,
payload : args
})?;

let _ : () = request(payload, Some(*RECV_TIMEOUT_MSEC))?;
return Ok("Ok");
}

#[pg_extern]
fn mr_reset() -> Result<
&'static str,
Expand Down Expand Up @@ -1026,7 +1095,6 @@ mod tests {

assert_eq!(res.len(), 3);


for x in res {
assert_eq!(x.0, "U1");

Expand Down Expand Up @@ -1159,6 +1227,44 @@ mod tests {
};
}
}

#[pg_test]
fn mark_beacons() {
let _ = crate::mr_reset().unwrap();

let _ = crate::mr_put_edge(Some("U1"), Some("U2"), Some(1.0), None).unwrap();

assert_eq!(
crate::mr_unmarked_beacons(Some("U1"), None).unwrap().count(),
0
);

let _ = crate::mr_put_edge(Some("U1"), Some("B3"), Some(2.0), None).unwrap();
let _ = crate::mr_put_edge(Some("U2"), Some("B4"), Some(3.0), None).unwrap();
let _ = crate::mr_sync(Some(1000)).unwrap();

let res = crate::mr_unmarked_beacons(Some("U1"), None).unwrap();

let beacons : Vec<(String, String, f64)> = res
.map(|x| (
x.get_by_name("src") .unwrap().unwrap(),
x.get_by_name("dst") .unwrap().unwrap(),
x.get_by_name("score").unwrap().unwrap(),
))
.collect();

assert_eq!(beacons.len(), 2);
assert_eq!(beacons[0].1, "B3");
assert_eq!(beacons[1].1, "B4");

let _ = crate::mr_mark_beacons(Some("U1"), None).unwrap();
let _ = crate::mr_sync(Some(1000)).unwrap();

assert_eq!(
crate::mr_unmarked_beacons(Some("U1"), None).unwrap().count(),
0
);
}
}

#[cfg(test)]
Expand Down

0 comments on commit 5378298

Please sign in to comment.