-
Notifications
You must be signed in to change notification settings - Fork 124
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: add sqs source #2355
feat: add sqs source #2355
Conversation
Signed-off-by: Vigith Maurice <[email protected]>
Signed-off-by: Shrivardhan Rao <[email protected]>
please resolve conflicts |
Signed-off-by: Shrivardhan Rao <[email protected]>
Signed-off-by: Shrivardhan Rao <[email protected]>
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #2355 +/- ##
==========================================
+ Coverage 69.65% 69.95% +0.30%
==========================================
Files 361 364 +3
Lines 49993 50919 +926
==========================================
+ Hits 34822 35620 +798
- Misses 14093 14217 +124
- Partials 1078 1082 +4 ☔ View full report in Codecov by Sentry. |
chrono = "0.4.38" | ||
tonic = "0.12.3" | ||
prost = "0.11.9" | ||
thiserror = "1.0.69" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
workspace
@cosmic-chichu Run cd rust/extns/numaflow-sqs
cargo clippy --tests --all-features --no-deps |
pub struct SQSSourceConfig { | ||
pub region: String, | ||
pub queue_name: String, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we might have to enhance it with more sqs
source related configs like visibility timeout, poll timeout etc.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Lets add support for all the configurations that is supported by SQS Client.
/// Used to initialize the SQS client with region and queue settings. | ||
/// Implements serde::Deserialize to support loading from configuration files. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we might need to support different ways to authenticate (example role-based)
Signed-off-by: Shrivardhan Rao <[email protected]>
Signed-off-by: Shrivardhan Rao <[email protected]>
Signed-off-by: Shrivardhan Rao <[email protected]>
Signed-off-by: Shrivardhan Rao <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Partial review
@@ -18,7 +18,7 @@ use crate::{metrics, shared}; | |||
/// - Invokes the SourceTransformer concurrently | |||
/// - Calls the Sinker to write the batch to the Sink | |||
/// - Send Acknowledgement back to the Source | |||
mod forwarder; | |||
pub(crate) mod forwarder; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Any specific reason for making it pub?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we need it for using in end to end test in sqs
.max_number_of_messages(count) | ||
.message_attribute_names("All") | ||
.message_system_attribute_names(MessageSystemAttributeName::All) | ||
.wait_time_seconds(remaining_time.as_secs() as i32) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Support at milliseconds, users do set read timeout in millis
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
aws sqs sdk does not support milliseconds, is there any other way to support milliseconds?
Signed-off-by: Shrivardhan Rao <[email protected]>
Signed-off-by: Shrivardhan Rao <[email protected]>
Will handle these in an upcoming PR. Added todos in code.
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM!
resolves #2367