enhancement(socket sink): support unix datagram mode #21762

Open
wants to merge 6 commits into base: master

Conversation

jpovixwm

Summary

Adds a unix_mode option to the socket sink.
The implementation is somewhat based on how the statsd sink handles this, but not to the full extent, as that felt like too big of a refactor for me to deal with.

Change Type

  • Bug fix
  • New feature
  • Non-functional (chore, refactoring, docs)
  • Performance

Is this a breaking change?

  • Yes
  • No

How did you test this PR?

make test on x86_64 Ubuntu 22.04 running within WSL, and then

cargo test --lib --no-default-features --features=sinks-socket --target=x86_64-pc-windows-gnu

to ensure I didn't forget any of the #[cfg(unix)] attributes.
Other than this, I also performed some basic tests with a trivial UDS listener implemented in Python.

Does this PR include user facing changes?

  • Yes. Please add a changelog fragment based on our guidelines.
  • No. A maintainer will apply the "no-changelog" label to this PR.

Checklist

  • Please read our Vector contributor resources.
  • If this PR introduces changes to Vector dependencies (modifies Cargo.lock), please
    run dd-rust-license-tool write to regenerate the license inventory and commit the changes (if any). More details here.

References

@jpovixwm jpovixwm requested review from a team as code owners November 11, 2024 18:18
@github-actions github-actions bot added the domain: sinks (Anything related to the Vector's sinks) and domain: external docs (Anything related to Vector's external, public documentation) labels Nov 11, 2024
Contributor

@pront pront left a comment

/ci-run-component-features

Contributor

@pront pront left a comment

Thank you @jpovixwm, I did a quick pass. Left some suggestions.

src/sinks/util/unix.rs (outdated, resolved)
Comment on lines 259 to 293
    while let Some(mut event) = input.next().await {
        let byte_size = event.estimated_json_encoded_size_of();

        self.transformer.transform(&mut event);

        let finalizers = event.take_finalizers();
        let mut bytes = BytesMut::new();

        // Errors are handled by `Encoder`.
        if encoder.encode(event, &mut bytes).is_err() {
            continue;
        }

        match udp_send(&mut socket, &bytes).await {
            Ok(()) => {
                emit!(SocketEventsSent {
                    mode: SocketMode::Unix,
                    count: 1,
                    byte_size,
                });

                bytes_sent.emit(ByteSize(bytes.len()));
                finalizers.update_status(EventStatus::Delivered);
            }
            Err(error) => {
                emit!(UnixSocketSendError {
                    path: &self.connector.path,
                    error: &error
                });
                finalizers.update_status(EventStatus::Errored);
                break;
            }
        }
    }
}
Contributor

I wonder if we can reuse this code instead of duplicating it from udp.rs.

Author

It would certainly be nice. Something like a plain function taking a bunch of parameters (bytes_sent, input, encoder, socket, transformer and the socket's path in the case of UDS), or could this somehow be done with a trait(?) given how encoder and transformer are accessed via self in both UDP and UDS?
To be honest, I don't really work with Rust so I'm not entirely familiar with how things get done there.

Contributor

@pront pront Nov 12, 2024

I think just a free function with some parameters would do it. BTW, I didn't look into it in detail, maybe there are other parts that can be shared e.g. the udp_send() function.
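The free-function idea discussed above could be sketched roughly as follows. This is a standard-library-only illustration, not the code from the PR: the `DatagramSocket` trait and the `send_datagrams` function are hypothetical names, the sockets are blocking rather than Tokio-based, and the encoder, transformer, finalizers, and internal events from the real sink are left out.

```rust
use std::io;
use std::net::UdpSocket;
#[cfg(unix)]
use std::os::unix::net::UnixDatagram;

/// Hypothetical abstraction over a connected datagram socket, so that one
/// send loop can serve both the UDP and the Unix datagram sink.
pub trait DatagramSocket {
    fn send_datagram(&self, buf: &[u8]) -> io::Result<usize>;
}

impl DatagramSocket for UdpSocket {
    fn send_datagram(&self, buf: &[u8]) -> io::Result<usize> {
        // Calls the inherent `UdpSocket::send` on the connected socket.
        self.send(buf)
    }
}

#[cfg(unix)]
impl DatagramSocket for UnixDatagram {
    fn send_datagram(&self, buf: &[u8]) -> io::Result<usize> {
        // Calls the inherent `UnixDatagram::send` on the connected socket.
        self.send(buf)
    }
}

/// Hypothetical shared send loop: sends each payload as one datagram and
/// stops at the first error, mirroring the `break` in the loop quoted above.
/// Returns the number of delivered payloads and the error, if any.
pub fn send_datagrams<S, I>(socket: &S, payloads: I) -> (usize, Option<io::Error>)
where
    S: DatagramSocket,
    I: IntoIterator<Item = Vec<u8>>,
{
    let mut delivered = 0;
    for payload in payloads {
        match socket.send_datagram(&payload) {
            Ok(_) => delivered += 1,
            Err(error) => return (delivered, Some(error)),
        }
    }
    (delivered, None)
}
```

A caller would pass either socket type by reference, for example `send_datagrams(&socket, encoded_events)`. Anything specific to one transport, such as the socket path reported in a UDS send error, would have to be passed in as an extra parameter or handled by the caller.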

Author

Done in 503cfc9. Let me know what you think about it.

src/sinks/util/unix.rs (outdated, resolved)
src/sinks/util/unix.rs (outdated, resolved)
Contributor

@pront pront left a comment

Thanks @jpovixwm!

@jpovixwm
Author

@pront I'm not sure how to best resolve the build errors.
The easy part is to un-gate UnixSocketSendError in src/internal_events/unix.rs, similar to how UnixSendIncompleteError is already not gated, as the impact of this should be minimal.

But then we can either:

  1. Trivially relax the gate
    #[cfg(all(any(feature = "sinks-socket", feature = "sinks-statsd"), unix))]
    pub mod unix;
    so that it becomes just #[cfg(unix)], which I guess will make the binary a bit bigger than necessary for builds without either of those sinks enabled.
  2. Move the UnixEither enum back where it came from, and make it public, so that it can be used in src/sinks/util/unix.rs
  3. Or "Find & replace" all occurrences of #[cfg(unix)] (12 hits) with #[cfg(all(unix, any(feature = "sinks-socket", feature = "sinks-statsd")))] in src/sinks/util/service/net/mod.rs, which is kind of ugly, but does appear to work.
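To make the trade-off in option 1 concrete, here is a minimal standalone sketch of a module gated only on the target platform. The module name `unix_support` and the function `supported_modes` are hypothetical and exist only for this illustration; the feature-gated variant appears as a comment because a standalone file has no Cargo features defined.

```rust
// Option 1: gate on the platform only. The module is compiled on every Unix
// build, even if no sink that uses it is enabled.
#[cfg(unix)]
pub mod unix_support {
    pub fn mode_name() -> &'static str {
        "unix_datagram"
    }
}

// The stricter gate from the discussion would instead read:
// #[cfg(all(unix, any(feature = "sinks-socket", feature = "sinks-statsd")))]
// and every use site would need the same condition (option 3).

/// Lists the socket modes available in this build (hypothetical helper).
pub fn supported_modes() -> Vec<&'static str> {
    let mut modes = vec!["tcp", "udp"];
    // Every use site must carry a gate that is at least as strict as the
    // gate on the module itself, otherwise non-Unix builds fail to compile.
    #[cfg(unix)]
    {
        modes.push(unix_support::mode_name());
    }
    modes
}
```

With the relaxed gate, each use site only needs `#[cfg(unix)]`, at the cost of compiling the module into Unix builds that never use it.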

@pront
Contributor

pront commented Nov 14, 2024

@jpovixwm I am OK with (1)

@jpovixwm
Author

Verified locally with

cargo test --lib --no-default-features --features=sinks-amqp
cargo test
cargo test --lib --no-default-features --features=sinks-socket --target=x86_64-pc-windows-gnu

Hope it works this time🤞

Contributor

@pront pront left a comment

/ci-run-component-features

@pront
Contributor

pront commented Nov 15, 2024

/ci-run-component-features

Turns out that the above workflow doesn't run on fork branches.

I manually created a run: https://github.com/vectordotdev/vector/actions/runs/11860591369
If it passes, we can go ahead and merge this PR.

@pront pront enabled auto-merge November 15, 2024 21:48
@pront pront added this pull request to the merge queue Nov 15, 2024
@github-merge-queue github-merge-queue bot removed this pull request from the merge queue due to failed status checks Nov 15, 2024
@jpovixwm
Author

I'm unable to reproduce the unit test failure on my machine, which makes it more difficult for me to fix it.
Here's a potential modification that still passes locally for me:

let receiver = std::os::unix::net::UnixDatagram::bind(out_path.clone()).unwrap();

let handle = tokio::task::spawn_blocking(move || {
    let mut output_lines = Vec::<String>::with_capacity(num_lines);

    for _ in 0..num_lines {
        let mut buf = [0; 100];
        let (size, _) = receiver
            .recv_from(&mut buf)
            .expect("Did not receive message");
        let line = String::from_utf8_lossy(&buf[..size]).to_string();
        output_lines.push(line);
    }

    output_lines
});

Tokio's UnixDatagram is replaced with the stdlib one so that we can do a blocking receive (spawn_blocking takes a synchronous closure, not an async function). The hope is that the receiver will be able to keep up with the sender if it gets to run in its own thread.
Should I commit this?
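For readers who want to try the blocking-receiver idea outside the Vector test suite, here is a self-contained sketch that uses only the standard library. It swaps `tokio::task::spawn_blocking` for a plain `std::thread::spawn`, and the function name `spawn_receiver` and the 1024-byte buffer are assumptions made for this example, not part of the PR.

```rust
use std::io;
use std::os::unix::net::UnixDatagram;
use std::path::Path;
use std::thread::{self, JoinHandle};

/// Binds a Unix datagram socket at `path` and collects `num_lines` messages
/// on a dedicated thread (hypothetical helper for illustration).
pub fn spawn_receiver(path: &Path, num_lines: usize) -> io::Result<JoinHandle<Vec<String>>> {
    // Bind before spawning, so datagrams sent right after this function
    // returns are queued by the kernel instead of failing to connect.
    let receiver = UnixDatagram::bind(path)?;

    Ok(thread::spawn(move || {
        let mut output_lines = Vec::with_capacity(num_lines);
        let mut buf = [0u8; 1024];

        for _ in 0..num_lines {
            // Blocking receive: fine here because this thread does nothing else.
            let size = receiver.recv(&mut buf).expect("Did not receive message");
            output_lines.push(String::from_utf8_lossy(&buf[..size]).into_owned());
        }

        output_lines
    }))
}
```

A test would call `spawn_receiver` first, send its datagrams with `UnixDatagram::unbound()` and `send_to`, and then `join()` the handle to compare the received lines with the ones sent.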

Labels
domain: external docs (Anything related to Vector's external, public documentation)
domain: sinks (Anything related to the Vector's sinks)
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Add support for UNIX datagram mode to socket sink
3 participants