filterx pubsub message #456

Open
wants to merge 4 commits into
base: main

bshifter
Member

Add pubsub_message() function to FilterX and extend Pub/Sub destination with protovar

This PR introduces new functionality for managing Google Pub/Sub messages in syslog-ng:

  1. pubsub_message() Function:
  • Adds a FilterX representation of the Google Pub/Sub message Protobuf descriptor.
  • Enables users to define and manipulate Pub/Sub message content directly within the FilterX scripting module (source-side).
  2. protovar Option in Pub/Sub Destination:
  • Extends the Pub/Sub destination module with the protovar option.
  • Allows referencing the FilterX-defined Pub/Sub Protobuf variable for setting the outgoing message payload.

Together, these changes provide greater flexibility and control over Pub/Sub destination messages, supporting advanced customization directly from FilterX.

Example:

filterx {
    # Adding data and attributes 
    $x = pubsub_message($MESSAGE, {"foo":{"bar":"baz"}});
};

log {
    destination {
        google-pubsub-grpc(
            ...
            proto_var($x)
        );
    };
};


Contributor

github-actions bot commented Jan 15, 2025

This Pull Request introduces config grammar changes

axoflow/7a663104e8c0914fc63cb0d3da350f988dda1f50 -> bshifter/fx-pubsub

--- a/destination
+++ b/destination

 google-pubsub-grpc(
+    proto-var(
+        <template-content>
+        <template-reference>
+    )
 )

@bshifter bshifter force-pushed the fx-pubsub branch 3 times, most recently from f7510c9 to d15fbc7 Compare January 16, 2025 10:27
@alltilla alltilla self-requested a review January 16, 2025 12:45
The implementation follows the existing patterns and designs of the OpenTelemetry (OTel) filterx modules for consistency.

Signed-off-by: shifter <[email protected]>
The protovar option in the Pub/Sub destination allows users to directly access the Protobuf byte representation created by a FilterX function on the source side.

The Protobuf serialized variable is used to set the Pub/Sub message's content.
This enables users to fully control and manage the outgoing messages of the Pub/Sub destination from within FilterX.
By leveraging this option, users can customize the message payload dynamically, based on their specific requirements.
The data() and attributes() options cannot be used with protovar(). These options are mutually exclusive.

Signed-off-by: shifter <[email protected]>
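The commit message above says that data() and attributes() cannot be combined with protovar(). A minimal sketch of how such a rule could be checked once, at configuration time, is shown below. `PubSubOptions` and `validate_options` are hypothetical names used only for illustration; they are not part of the driver in this PR.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical stand-in for the destination's option state (not the real driver class).
struct PubSubOptions
{
  bool has_data = false;               // data() was set in the config
  std::vector<std::string> attributes; // attribute names collected from attributes()
  bool has_protovar = false;           // protovar() was set in the config
};

// Returns true when the option combination is acceptable.
// On failure, `error` receives a human-readable reason.
inline bool
validate_options(const PubSubOptions &opts, std::string &error)
{
  if (opts.has_protovar && (opts.has_data || !opts.attributes.empty()))
    {
      error = "protovar() cannot be combined with data() or attributes()";
      return false;
    }

  error.clear();
  return true;
}
```

Checking the combination once at initialization keeps the per-message path free of configuration checks.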
Comment on lines +2 to +3
* Copyright (c) 2024 Axoflow
* Copyright (c) 2023 shifter
Member

Please use the current year.

{
std::string data_cpp(filterx_string_get_value_ref(data_arg, NULL));
std::map<std::string, std::string> attributes_cpp;
if (!filterx_dict_iter(attributes_arg, _build_map, static_cast<gpointer>(&attributes_cpp)))
Member

We iterate through the dict once here, then through the map created here once again in the ctor. I believe there is an opportunity for optimization.
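One way to read this suggestion is to let the iteration callback write straight into the destination map instead of filling an intermediate `std::map` that the constructor walks again. The sketch below only illustrates that idea with stand-in types: `FakeDict`, `FakeMessage`, `fake_dict_iter` and `insert_attribute` are hypothetical, and the callback shape is merely modelled on the `(key, value, user_data)` style seen in the snippet under review.

```cpp
#include <cassert>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-ins: a "dict" to iterate and a message owning an attribute map.
using FakeDict = std::vector<std::pair<std::string, std::string>>;

struct FakeMessage
{
  std::string data;
  std::map<std::string, std::string> attributes;
};

// C-style iteration callback with an opaque user_data pointer.
using IterFunc = bool (*)(const std::string &key, const std::string &value, void *user_data);

inline bool
fake_dict_iter(const FakeDict &dict, IterFunc func, void *user_data)
{
  for (const auto &kv : dict)
    if (!func(kv.first, kv.second, user_data))
      return false;
  return true;
}

// The callback inserts directly into the final map: one pass, no intermediate container.
inline bool
insert_attribute(const std::string &key, const std::string &value, void *user_data)
{
  assert(user_data != nullptr);
  auto *attributes = static_cast<std::map<std::string, std::string> *>(user_data);
  (*attributes)[key] = value;
  return true;
}

inline bool
build_message(const std::string &data, const FakeDict &attrs, FakeMessage &out)
{
  out.data = data;
  return fake_dict_iter(attrs, insert_attribute, &out.attributes);
}
```

In the real code the target would presumably be the map returned by `mutable_attributes()` on the Protobuf message, so the constructor would no longer need its own loop.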

FilterXObject *data_arg = filterx_ref_unwrap_ro(data);
FilterXObject *attributes = args[1];
FilterXObject *attributes_arg = filterx_ref_unwrap_ro(attributes);
if (filterx_object_is_type(data_arg, &FILTERX_TYPE_NAME(string)) &&
Member

There is an interesting question here: which types should we support? The PubSub gRPC proto allows us to send arbitrary bytes, not just strings, so I believe we should support everything. The question then arises: how should we format each type? I think get_value() can be used for string, bytes and protobuf as a performance optimization, and filterx_object_marshal() for every other type would get us far.
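The dispatch proposed here can be sketched with a tagged union: string-like types are passed through as they are, and everything else goes through a generic formatter. This is a C++17 illustration only. `PayloadValue`, `Bytes` and `to_payload` are hypothetical names, and `std::to_string` merely stands in for whatever `filterx_object_marshal()` would produce.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <type_traits>
#include <variant>
#include <vector>

using Bytes = std::vector<std::uint8_t>;

// Hypothetical value type covering a few of the cases discussed in the review.
using PayloadValue = std::variant<std::string, Bytes, std::int64_t, bool>;

// Converts a value into the byte string used as the message payload.
inline std::string
to_payload(const PayloadValue &value)
{
  return std::visit([](const auto &v) -> std::string
  {
    using T = std::decay_t<decltype(v)>;

    if constexpr (std::is_same_v<T, std::string>)
      return v;                                // string: use the value as is
    else if constexpr (std::is_same_v<T, Bytes>)
      return std::string(v.begin(), v.end()); // bytes: raw passthrough
    else if constexpr (std::is_same_v<T, bool>)
      return v ? "true" : "false";             // fallback formatting (placeholder)
    else
      return std::to_string(v);                // fallback formatting (placeholder)
  }, value);
}
```

For example, `to_payload(PayloadValue(std::string("hello")))` returns the string unchanged, while an integer value goes through the fallback branch.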

}
catch (const std::runtime_error &e)
{
msg_error("FilterX: Failed to create Pubsup Message object", evt_tag_str("error", e.what()));
Member

typo: Pubsup -> PubSub or Pub/Sub

Comment on lines +180 to +183
if (!filterx_object_repr(key, key_str))
return FALSE;
if (!filterx_object_repr(val, val_str))
return FALSE;
Member

Most of the time these will already be strings, and repr() copies even strings. For strings we could call get_value() to avoid the copy; for the rest, repr() is perfect.
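A sketch of the fast path described here: borrow the existing buffer when the value is already a string, and only format into a scratch buffer for other types. This is a C++17 illustration. `AttrValue` and `text_of` are hypothetical, and `std::to_string` stands in for repr().

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <string_view>
#include <variant>

// Hypothetical value type: either already a string, or something that needs formatting.
using AttrValue = std::variant<std::string, std::int64_t>;

// Returns a view of the value's text.
// Strings are borrowed without copying; other types are formatted into `scratch`.
// The returned view is valid only while both `value` and `scratch` stay alive and unmodified.
inline std::string_view
text_of(const AttrValue &value, std::string &scratch)
{
  if (const auto *s = std::get_if<std::string>(&value))
    return *s; // fast path: no copy, no formatting

  scratch = std::to_string(std::get<std::int64_t>(value));
  return scratch;
}
```

The trade-off is lifetime management: the caller has to consume the view before the source value or the scratch buffer changes.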

Comment on lines +2 to +3
* Copyright (c) 2024 Axoflow
* Copyright (c) 2023 shifter
Member

Please use recent dates.

FILTERX_DECLARE_TYPE(pubsub_message);

static inline void
pubsub_filterx_objects_global_init(void)
Member

Shouldn't this be called in google_pubsub_grpc_module_init()?

buf_slice = this->format_template(attribute.value, msg, buf, NULL, this->super->super.seq_num);
attributes->insert({attribute.name, buf_slice.str});
message_bytes += buf_slice.len;
if (log_template_is_trivial(owner_->protovar))
Member

IMO this check should go to the setter and we should refuse the value if it is not trivial.
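Moving the check into the setter could look roughly like the sketch below: the setter validates the value and refuses it, so the per-message path never sees a non-trivial template. Everything here is hypothetical. `FakeDestination` is not the real driver class, and `is_single_reference` is a deliberately simplified stand-in for `log_template_is_trivial()`, assuming that "trivial" means the template is a single `$name` or `${name}` reference.

```cpp
#include <cassert>
#include <cctype>
#include <string>

// Simplified, assumed notion of "trivial": the whole template is exactly one
// "$name" or "${name}" reference with no surrounding text.
inline bool
is_single_reference(const std::string &tmpl)
{
  if (tmpl.size() < 2 || tmpl[0] != '$')
    return false;

  std::string name = tmpl.substr(1);
  if (name.front() == '{')
    {
      if (name.size() < 3 || name.back() != '}')
        return false;
      name = name.substr(1, name.size() - 2);
    }

  for (unsigned char c : name)
    if (!std::isalnum(c) && c != '_' && c != '.')
      return false;

  return true;
}

// Hypothetical destination holding the protovar option.
class FakeDestination
{
public:
  // Rejects unsuitable values up front; the stored value is left untouched on failure.
  bool set_protovar(const std::string &tmpl)
  {
    if (!is_single_reference(tmpl))
      return false;

    protovar_ = tmpl;
    return true;
  }

  const std::string &protovar() const
  {
    return protovar_;
  }

private:
  std::string protovar_;
};
```

With this shape, a configuration such as `set_protovar("prefix $x")` fails when the option is set, rather than being checked for every message.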

Comment on lines +138 to +139
std::cout << "Key: \"" << key << "\", Length: " << key.length() << "\n";
std::cout << "Value: \"" << value << "\", Length: " << value.length() << "\n";
Member

We should remove these debug messages.


auto attributes = message->mutable_attributes();
for (const auto &attribute : owner_->attributes)
if (owner_->protovar)
Member

[optional]

I think this is a good candidate for factoring out to some smaller functions.

2 participants