Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add sqs source #2355

Merged
merged 14 commits into from
Feb 11, 2025
Merged

feat: add sqs source #2355

merged 14 commits into from
Feb 11, 2025

Conversation

cosmic-chichu
Copy link
Contributor

@cosmic-chichu cosmic-chichu commented Jan 23, 2025

resolves #2367

  • Add SQS source
  • Add unit tests for sqs source
  • Add end to end test for sqs source

vigith and others added 2 commits January 16, 2025 16:14
Signed-off-by: Vigith Maurice <[email protected]>
Signed-off-by: Shrivardhan Rao <[email protected]>
@vigith
Copy link
Member

vigith commented Jan 25, 2025

please resolve conflicts

Copy link

codecov bot commented Jan 27, 2025

Codecov Report

Attention: Patch coverage is 88.34586% with 93 lines in your changes missing coverage. Please review.

Project coverage is 69.95%. Comparing base (72a11e9) to head (8bcb129).
Report is 1 commits behind head on main.

Files with missing lines Patch % Lines
rust/extns/numaflow-sqs/src/source.rs 86.68% 63 Missing ⚠️
rust/numaflow-core/src/source/sqs.rs 88.54% 30 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #2355      +/-   ##
==========================================
+ Coverage   69.65%   69.95%   +0.30%     
==========================================
  Files         361      364       +3     
  Lines       49993    50919     +926     
==========================================
+ Hits        34822    35620     +798     
- Misses      14093    14217     +124     
- Partials     1078     1082       +4     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

rust/extns/numaflow-sqs/Cargo.toml Outdated Show resolved Hide resolved
Comment on lines +18 to +21
chrono = "0.4.38"
tonic = "0.12.3"
prost = "0.11.9"
thiserror = "1.0.69"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

workspace

rust/extns/numaflow-sqs/src/lib.rs Outdated Show resolved Hide resolved
rust/extns/numaflow-sqs/src/lib.rs Outdated Show resolved Hide resolved
@BulkBeing
Copy link
Contributor

@cosmic-chichu Run clippy for the crates in which you made changes and make the changes suggested by clippy:

cd rust/extns/numaflow-sqs

cargo clippy --tests --all-features --no-deps

Comment on lines 28 to 30
pub struct SQSSourceConfig {
pub region: String,
pub queue_name: String,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we might have to enhance it with more sqs source related configs like visibility timeout, poll timeout etc.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lets add support for all the configurations that is supported by SQS Client.

Comment on lines +25 to +26
/// Used to initialize the SQS client with region and queue settings.
/// Implements serde::Deserialize to support loading from configuration files.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we might need to support different ways to authenticate (example role-based)

rust/extns/numaflow-sqs/src/source.rs Outdated Show resolved Hide resolved
rust/extns/numaflow-sqs/src/source.rs Outdated Show resolved Hide resolved
rust/extns/numaflow-sqs/src/source.rs Outdated Show resolved Hide resolved
rust/extns/numaflow-sqs/src/source.rs Outdated Show resolved Hide resolved
rust/extns/numaflow-sqs/src/source.rs Outdated Show resolved Hide resolved
rust/extns/numaflow-sqs/src/source.rs Outdated Show resolved Hide resolved
rust/extns/numaflow-sqs/src/source.rs Outdated Show resolved Hide resolved
rust/extns/numaflow-sqs/src/source.rs Show resolved Hide resolved
@yhl25 yhl25 added this to the 1.5 milestone Jan 31, 2025
Signed-off-by: Shrivardhan Rao <[email protected]>
Signed-off-by: Shrivardhan Rao <[email protected]>
Signed-off-by: Shrivardhan Rao <[email protected]>
Signed-off-by: Shrivardhan Rao <[email protected]>
Copy link
Contributor

@yhl25 yhl25 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Partial review

@@ -18,7 +18,7 @@ use crate::{metrics, shared};
/// - Invokes the SourceTransformer concurrently
/// - Calls the Sinker to write the batch to the Sink
/// - Send Acknowledgement back to the Source
mod forwarder;
pub(crate) mod forwarder;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any specific reason for making it pub?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we need it for using in end to end test in sqs

rust/numaflow-core/src/source/sqs.rs Outdated Show resolved Hide resolved
rust/numaflow-core/src/source/sqs.rs Outdated Show resolved Hide resolved
.max_number_of_messages(count)
.message_attribute_names("All")
.message_system_attribute_names(MessageSystemAttributeName::All)
.wait_time_seconds(remaining_time.as_secs() as i32)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Support at milliseconds, users do set read timeout in millis

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

aws sqs sdk does not support milliseconds, is there any other way to support milliseconds?

@cosmic-chichu
Copy link
Contributor Author

Will handle these in an upcoming PR. Added todos in code.

  1. add support for all sqs configs
  2. support different ways to authenticate
  3. handle user provided timeout better to support milliseconds. Testing with operation_attempt_timeout did not yield expected results.

Copy link
Contributor

@yhl25 yhl25 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM!

@vigith vigith mentioned this pull request Feb 11, 2025
4 tasks
@vigith vigith merged commit bf0f9db into main Feb 11, 2025
28 checks passed
@vigith vigith deleted the sqs-source branch February 11, 2025 15:27
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

SQS source implementation in rust
4 participants