-
Another subscription question. I have a problem with my subscription here where each What's the proper way to handle a subscription like this, and clean up afterwards? async fn current_trivia_question(context: &Context) -> UnansweredTriviaQuestionStream {
let redis_url = context.config.redis_url.clone();
let stream = async_stream::stream! {
let mut con = data::connect_redis(&redis_url)?;
let mut pubsub = con.as_pubsub();
pubsub.subscribe("current_trivia_question")?;
loop {
let msg = pubsub.get_message()?;
let trivia_question: TriviaQuestion = msg.get_payload()?;
yield Ok(UnansweredTriviaQuestion::from(trivia_question));
}
};
Box::pin(stream)
} |
Beta Was this translation helpful? Give feedback.
Replies: 2 comments 5 replies
-
@Teajey as for me, the best option would be for the If you wonder how one could call impl Drop for MyStream {
fn drop(&mut self) {
tokio::spawn(async move { // or `actix::spawn`, depending on what you're using
// you async actions
});
}
} |
Beta Was this translation helpful? Give feedback.
-
Awesome! Thanks to @tyranron's help I seem to have got it working with an asynchronous solution using async fn new_trivia_question(
context: &Context,
) -> FieldResult<Pin<Box<dyn Stream<Item = FieldResult<UnansweredTriviaQuestion>> + Send>>>
{
let redis_url = context.config.redis_url.clone();
let mut pubsub = data::async_connect_redis(&redis_url).await?.into_pubsub();
pubsub.subscribe("new_trivia_question").await?;
let stream = pubsub
.into_on_message()
.map(|m| {
m.get_payload::<UnansweredTriviaQuestion>()
.map_err(|e| FieldError::from(e))
})
.boxed();
Ok(stream)
} |
Beta Was this translation helpful? Give feedback.
Awesome! Thanks to @tyranron's help I seem to have got it working with an asynchronous solution using
redis::aio
: