-
Notifications
You must be signed in to change notification settings - Fork 1.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
enhancement(socket sink): support unix datagram mode #21762
base: master
Are you sure you want to change the base?
enhancement(socket sink): support unix datagram mode #21762
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
/ci-run-component-features
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you @jpovixwm, I did a quick pass. Left some suggestions.
src/sinks/util/unix.rs
Outdated
while let Some(mut event) = input.next().await { | ||
let byte_size = event.estimated_json_encoded_size_of(); | ||
|
||
self.transformer.transform(&mut event); | ||
|
||
let finalizers = event.take_finalizers(); | ||
let mut bytes = BytesMut::new(); | ||
|
||
// Errors are handled by `Encoder`. | ||
if encoder.encode(event, &mut bytes).is_err() { | ||
continue; | ||
} | ||
|
||
match udp_send(&mut socket, &bytes).await { | ||
Ok(()) => { | ||
emit!(SocketEventsSent { | ||
mode: SocketMode::Unix, | ||
count: 1, | ||
byte_size, | ||
}); | ||
|
||
bytes_sent.emit(ByteSize(bytes.len())); | ||
finalizers.update_status(EventStatus::Delivered); | ||
} | ||
Err(error) => { | ||
emit!(UnixSocketSendError { | ||
path: &self.connector.path, | ||
error: &error | ||
}); | ||
finalizers.update_status(EventStatus::Errored); | ||
break; | ||
} | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if we can reuse this code instead of duplicating it from udp.rs
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would certainly be nice. Something like a plain function taking a bunch of parameters (bytes_sent
, input
, encoder
, socket
, transformer
and the socket's path in the case of UDS), or could this somehow be done with a trait(?) given how encoder
and transformer
are accessed via self
in both UDP and UDS?
To be honest, I don't really work with Rust so I'm not entirely familiar with how things get done there.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think just a free function with some parameters would do it. BTW, I didn't look into it in detail, maybe there are other parts that can be shared e.g. the udp_send()
function.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done in 503cfc9. Let me know what you think about it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @jpovixwm!
@pront I'm not sure how to best resolve the build errors. But then we can either:
|
@jpovixwm I am OK with (1) |
Verified locally with
Hope it works this time🤞 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
/ci-run-component-features
…datagram-mode-in-socket-sink
Turns out that the above workflow doesn't run on fork branches. I manually created: https://github.com/vectordotdev/vector/actions/runs/11860591369 |
I'm unable to reproduce the unit test failure on my machine, which makes it more difficult for me to fix it. let receiver = std::os::unix::net::UnixDatagram::bind(out_path.clone()).unwrap();
let handle = tokio::task::spawn_blocking(move || {
let mut output_lines = Vec::<String>::with_capacity(num_lines);
for _ in 0..num_lines {
let mut buf = [0; 100];
let (size, _) = receiver
.recv_from(&mut buf)
.expect("Did not receive message");
let line = String::from_utf8_lossy(&buf[..size]).to_string();
output_lines.push(line);
}
output_lines
}); Tokio's |
Summary
Adds a
unix_mode
option to thesocket
sink.The implementation is somewhat based on how the
statsd
sink handles this, but not to the full extent, as that felt like too big of a refactor for me to deal with.Change Type
Is this a breaking change?
How did you test this PR?
make test
on x86_64 Ubuntu 22.04 running within WSL, and thento ensure I didn't forget any of the
#[cfg(unix)]
attributes.Other than this, I also performed some basic tests with a trivial UDS listener implemented in Python.
Does this PR include user facing changes?
Checklist
Cargo.lock
), pleaserun
dd-rust-license-tool write
to regenerate the license inventory and commit the changes (if any). More details here.References