-
Notifications
You must be signed in to change notification settings - Fork 17
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
filterx pubsub message #456
base: main
Are you sure you want to change the base?
Conversation
This Pull Request introduces config grammar changesaxoflow/7a663104e8c0914fc63cb0d3da350f988dda1f50 -> bshifter/fx-pubsub --- a/destination
+++ b/destination
google-pubsub-grpc(
+ proto-var(
+ <template-content>
+ <template-reference>
+ )
)
|
f7510c9
to
d15fbc7
Compare
Signed-off-by: shifter <[email protected]>
The implementation follows the existing patterns and designs of the OpenTelemetry (OTel) filterx modules for consistency. Signed-off-by: shifter <[email protected]>
The protovar option in the Pub/Sub destination allows users to directly access the Protobuf byte representation created by a FilterX function on the source side. The Protobuf serialized variable is used to set the Pub/Sub message's content. This enables users to fully control and manage the outgoing messages of the Pub/Sub destination from within FilterX. By leveraging this option, users can customize the message payload dynamically, based on their specific requirements. The data() and attributes() options cannot be used with protovar(). These options are mutually exclusive. Signed-off-by: shifter <[email protected]>
Signed-off-by: shifter <[email protected]>
* Copyright (c) 2024 Axoflow | ||
* Copyright (c) 2023 shifter |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please use the current year.
{ | ||
std::string data_cpp(filterx_string_get_value_ref(data_arg, NULL)); | ||
std::map<std::string, std::string> attributes_cpp; | ||
if (!filterx_dict_iter(attributes_arg, _build_map, static_cast<gpointer>(&attributes_cpp))) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We iterate through the dict once here, than through the map created here once again the the ctor. I believe there is opportunity for optimization.
FilterXObject *data_arg = filterx_ref_unwrap_ro(data); | ||
FilterXObject *attributes = args[1]; | ||
FilterXObject *attributes_arg = filterx_ref_unwrap_ro(attributes); | ||
if (filterx_object_is_type(data_arg, &FILTERX_TYPE_NAME(string)) && |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is an interesting question here: what types should we support here? The PubSub gRPC proto allows us to send any bytes data, not just string, so I believe we should support everything here. The question then arises: how should we format each type? I think for string, bytes and protobuf get_value()
can be used for performance optimization, and filterx_object_marshal()
for every other type would bring us far.
} | ||
catch (const std::runtime_error &e) | ||
{ | ||
msg_error("FilterX: Failed to create Pubsup Message object", evt_tag_str("error", e.what())); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
typo: Pubsub -> PubSub or Pub/Sub
if (!filterx_object_repr(key, key_str)) | ||
return FALSE; | ||
if (!filterx_object_repr(val, val_str)) | ||
return FALSE; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the majority of the time these will already be strings, and repr()
copies strings even. For strings we could call get_value()
, to optimize out the copy, for the rest repr()
is perfect.
* Copyright (c) 2024 Axoflow | ||
* Copyright (c) 2023 shifter |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please use recent dates.
FILTERX_DECLARE_TYPE(pubsub_message); | ||
|
||
static inline void | ||
pubsub_filterx_objects_global_init(void) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't this be called in google_pubsub_grpc_module_init()
?
buf_slice = this->format_template(attribute.value, msg, buf, NULL, this->super->super.seq_num); | ||
attributes->insert({attribute.name, buf_slice.str}); | ||
message_bytes += buf_slice.len; | ||
if (log_template_is_trivial(owner_->protovar)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IMO this check should go to the setter and we should refuse the value if it is not trivial.
std::cout << "Key: \"" << key << "\", Length: " << key.length() << "\n"; | ||
std::cout << "Value: \"" << value << "\", Length: " << value.length() << "\n"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should remove these debug messages.
|
||
auto attributes = message->mutable_attributes(); | ||
for (const auto &attribute : owner_->attributes) | ||
if (owner_->protovar) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[optional]
I think this is a good candidate for factoring out to some smaller functions.
Add pubsub_message() function to FilterX and extend Pub/Sub destination with protovar
This PR introduces new functionality for managing Google Pub/Sub messages in syslog-ng:
Together, these changes provide greater flexibility and control over Pub/Sub destination messages, supporting advanced customization directly from FilterX.
example: