Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix: Solve for Issue 189 - RPCs in rpc.proto and server.rs for #214

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 50 additions & 1 deletion src/network/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use crate::proto::{FidRequest, FidTimestampRequest};
use crate::proto::{GetInfoRequest, StorageLimitsResponse};
use crate::proto::{
LinkRequest, LinksByFidRequest, Message, MessagesResponse, ReactionRequest,
ReactionsByFidRequest, UserDataRequest, VerificationRequest,
ReactionsByFidRequest, UserDataRequest, VerificationRequest, CastsByParentRequest
};
use crate::storage::constants::OnChainEventPostfix;
use crate::storage::constants::RootPrefix;
Expand Down Expand Up @@ -571,6 +571,55 @@ impl HubService for MyHubService {
.as_response()
}

async fn get_casts_by_parent(
&self,
request: Request<CastsByParentRequest>,
) -> Result<Response<MessagesResponse>, Status> {
let request = request.into_inner();


let parent = match request.parent {
Some(parent) => match parent {
proto::casts_by_parent_request::Parent::ParentCastId(cast_id) => {
proto::cast_add_body::Parent::ParentCastId(cast_id)
}
proto::casts_by_parent_request::Parent::ParentUrl(url) => {
proto::cast_add_body::Parent::ParentUrl(url)
}
},
None => {
return Err(Status::invalid_argument(
"Parent cast identifier must be provided",
));
}
};

let stores = match &parent {
proto::cast_add_body::Parent::ParentCastId(cast_id) => {
self.get_stores_for(cast_id.fid)?
}
proto::cast_add_body::Parent::ParentUrl(_) => {
return Err(Status::invalid_argument(
"Stores cannot be fetched for parent URL",
));
}
};


let page_options = PageOptions {
page_size: request.page_size.map(|s| s as usize),
page_token: request.page_token.clone(),
reverse: request.reverse.unwrap_or(false),
};

CastStore::get_casts_by_parent(
&stores.cast_store,
&parent,
&page_options
).
as_response()
}

async fn get_reaction(
&self,
request: Request<ReactionRequest>,
Expand Down
161 changes: 161 additions & 0 deletions src/network/server_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -602,4 +602,165 @@ mod tests {
.collect::<Vec<_>>()[0];
assert_eq!(links_limit.used, 1);
}

#[tokio::test]
async fn test_get_casts_by_parent() {
// Set up a temporary database and stores
let (_, _, [mut engine1, mut engine2], service) = make_server().await;
let engine1 = &mut engine1;
let engine2 = &mut engine2;

// Register users for different shards
test_helper::register_user(
SHARD1_FID,
test_helper::default_signer(),
test_helper::default_custody_address(),
engine1,
)
.await;
test_helper::register_user(
SHARD2_FID,
test_helper::default_signer(),
test_helper::default_custody_address(),
engine2,
)
.await;

// Create a parent CastId and multiple CastAdd messages
let parent_cast_id = CastId {
fid: SHARD1_FID,
hash: "parent_hash".to_string(),
};

let cast_add = messages_factory::casts::create_cast_add(SHARD1_FID, "test", Some(parent_cast_id.clone()), None);
let cast_add2 = messages_factory::casts::create_cast_add(SHARD1_FID, "test2", Some(parent_cast_id.clone()), None);

// Create another cast for a different shard
let another_shard_cast = messages_factory::casts::create_cast_add(SHARD2_FID, "another fid", None, None);

// Commit CastAdd messages to the engine
test_helper::commit_message(engine1, &cast_add).await;
test_helper::commit_message(engine1, &cast_add2).await;
test_helper::commit_message(engine2, &another_shard_cast).await;

// Define page options for fetching casts by parent
let page_options = PageOptions {
page_size: Some(1),
page_token: None,
reverse: false,
};

// Test fetching Casts by parent ID for SHARD1_FID
let result = service
.get_casts_by_parent(Request::new(proto::ParentRequest {
fid: SHARD1_FID,
parent_hash: cast_add.hash.clone(),
page_size: Some(1),
page_token: None,
reverse: Some(false),
}))
.await
.unwrap();

test_helper::assert_contains_all_messages(&result, &[&cast_add]);

// Test with reverse ordering
let reverse_result = service
.get_casts_by_parent(Request::new(proto::ParentRequest {
fid: SHARD1_FID,
parent_hash: cast_add.hash.clone(),
page_size: Some(1),
page_token: None,
reverse: Some(true),
}))
.await
.unwrap();

test_helper::assert_contains_all_messages(&reverse_result, &[&cast_add2]);

// Fetching an invalid parent cast should return an error
let invalid_result = service
.get_casts_by_parent(Request::new(proto::ParentRequest {
fid: SHARD1_FID,
parent_hash: "invalid_parent_hash".to_string(),
page_size: Some(1),
page_token: None,
reverse: Some(false),
}))
.await
.unwrap_err();
assert_eq!(invalid_result.code(), tonic::Code::NotFound);

// Test fetching casts by parent from a different shard (should fail)
let cross_shard_result = service
.get_casts_by_parent(Request::new(proto::ParentRequest {
fid: SHARD2_FID,
parent_hash: cast_add.hash.clone(),
page_size: Some(1),
page_token: None,
reverse: Some(false),
}))
.await
.unwrap_err();
assert_eq!(cross_shard_result.code(), tonic::Code::NotFound);

// Returns all casts for SHARD1_FID
let all_casts_request = proto::FidRequest {
fid: SHARD1_FID,
page_size: Some(2),
page_token: None,
reverse: None,
};
let all_casts_response = service
.get_casts_by_fid(Request::new(all_casts_request))
.await
.unwrap();

// Test that the correct messages are returned for the parent cast
test_helper::assert_contains_all_messages(&all_casts_response, &[&cast_add, &cast_add2]);

// Pagination works: Fetch second page
let second_page_request = proto::FidRequest {
fid: SHARD1_FID,
page_size: Some(1),
page_token: all_casts_response.get_ref().next_page_token.clone(),
reverse: None,
};
let second_page_response = service
.get_casts_by_fid(Request::new(second_page_request))
.await
.unwrap();

test_helper::assert_contains_all_messages(&second_page_response, &[&cast_add2]);


let reverse_request = proto::FidRequest {
fid: SHARD1_FID,
page_size: Some(1),
page_token: None,
reverse: Some(true),
};
let reverse_response = service
.get_casts_by_fid(Request::new(reverse_request))
.await
.unwrap();

test_helper::assert_contains_all_messages(&reverse_response, &[&cast_add]);

// Returns all casts including removed ones (optional, based on business logic)
let bulk_casts_request = proto::FidTimestampRequest {
fid: SHARD1_FID,
page_size: None,
page_token: None,
reverse: None,
start_timestamp: None,
stop_timestamp: None,
};
let bulk_casts_response = service
.get_all_cast_messages_by_fid(Request::new(bulk_casts_request))
.await
.unwrap();

test_helper::assert_contains_all_messages(&bulk_casts_response, &[&cast_add, &cast_add2]);
}
}
4 changes: 2 additions & 2 deletions src/proto/rpc.proto
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ service HubService {
// Casts
rpc GetCast(CastId) returns (Message);
rpc GetCastsByFid(FidRequest) returns (MessagesResponse);
// rpc GetCastsByParent(CastsByParentRequest) returns (MessagesResponse);
// rpc GetCastsByMention(FidRequest) returns (MessagesResponse);
rpc GetCastsByParent(CastsByParentRequest) returns (MessagesResponse);
rpc GetCastsByMention(FidRequest) returns (MessagesResponse);

// // Reactions
rpc GetReaction(ReactionRequest) returns (Message);
Expand Down
20 changes: 10 additions & 10 deletions src/storage/store/account/cast_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -410,12 +410,12 @@ impl CastStore {
store.get_removes_by_fid::<fn(&Message) -> bool>(fid, page_options, None)
}

pub fn get_casts_by_parent(
pub fn get_casts_by_mention(
store: &Store<CastStoreDef>,
parent: &Parent,
mention: u64,
page_options: &PageOptions,
) -> Result<MessagesPage, HubError> {
let prefix = CastStoreDef::make_cast_by_parent_key(parent, 0, None);
let prefix = CastStoreDef::make_cast_by_mention_key(mention, 0, None);

let mut message_keys = vec![];
let mut last_key = vec![];
Expand Down Expand Up @@ -443,25 +443,25 @@ impl CastStore {
},
)?;

let messages = get_many_messages(store.db().borrow(), message_keys)?;
let messages_bytes = get_many_messages(store.db().borrow(), message_keys)?;
let next_page_token = if last_key.len() > 0 {
Some(last_key[prefix.len()..].to_vec())
} else {
None
};

Ok(MessagesPage {
messages,
messages: messages_bytes,
next_page_token,
})
}

pub fn get_casts_by_mention(
pub fn get_casts_by_parent(
store: &Store<CastStoreDef>,
mention: u64,
parent: &Parent,
page_options: &PageOptions,
) -> Result<MessagesPage, HubError> {
let prefix = CastStoreDef::make_cast_by_mention_key(mention, 0, None);
let prefix = CastStoreDef::make_cast_by_parent_key(parent, 0, None);

let mut message_keys = vec![];
let mut last_key = vec![];
Expand Down Expand Up @@ -489,15 +489,15 @@ impl CastStore {
},
)?;

let messages_bytes = get_many_messages(store.db().borrow(), message_keys)?;
let messages = get_many_messages(store.db().borrow(), message_keys)?;
let next_page_token = if last_key.len() > 0 {
Some(last_key[prefix.len()..].to_vec())
} else {
None
};

Ok(MessagesPage {
messages: messages_bytes,
messages,
next_page_token,
})
}
Expand Down