Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(#2209) Create a stream to hold execution events #2221

Merged
merged 1 commit into from
Feb 12, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 24 additions & 1 deletion broker/network/network_jetstream.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2020-2024, R.I. Pienaar and the Choria Project contributors
// Copyright (c) 2020-2025, R.I. Pienaar and the Choria Project contributors
//
// SPDX-License-Identifier: Apache-2.0

Expand Down Expand Up @@ -134,6 +134,29 @@ func (s *Server) configureSystemStreams(ctx context.Context) error {
return err
}

if cfg.NetworkExecutorStoreDuration > 0 {
execCfg, err := jsm.NewStreamConfiguration(jsm.DefaultStream,
jsm.Subjects("choria.submission.choria.execution.>"),
jsm.StreamDescription("Choria Executor Events"),
jsm.Replicas(cfg.NetworkExecutorReplicas),
jsm.MaxAge(cfg.NetworkExecutorStoreDuration),
jsm.FileStorage(),
jsm.AllowDirect(),
)
if err != nil {
return err
}
execCfg.SubjectTransform = &api.SubjectTransformConfig{ // TODO: next jsm has a option func for this
Source: "choria.submission.choria.execution.>",
Destination: "choria.execution.>",
}

err = s.createOrUpdateStreamWithConfig("CHORIA_EXECUTOR", *execCfg, mgr)
if err != nil {
return err
}
}

eCfg, err := jsm.NewStreamConfiguration(jsm.DefaultStream,
jsm.Replicas(cfg.NetworkLeaderElectionReplicas),
jsm.MaxAge(cfg.NetworkLeaderElectionTTL),
Expand Down
2 changes: 2 additions & 0 deletions config/choria.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ type ChoriaPluginConfig struct {
NetworkClientTLSForce bool `confkey:"plugin.choria.network.client_tls_force_required"` // Force requiring/not requiring TLS for all clients
NetworkClientTokenSigners []string `confkey:"plugin.choria.network.client_signer_cert" type:"comma_split"` // Fully qualified paths to the public certificates used by the AAA Service to sign client JWT tokens. This enables users with signed JWTs to use unverified TLS to connect. Can also be a list of ed25519 public keys.
NetworkDenyServers bool `confkey:"plugin.choria.network.deny_server_connections"` // Set ACLs denying server connections to this broker
NetworkExecutorStoreDuration time.Duration `confkey:"plugin.choria.network.stream.executor_retention" type:"duration" default:"24h"` // When not zero enables retaining Executor events in the Stream Store
NetworkExecutorReplicas int `confkey:"plugin.choria.network.stream.executor_replicas" default:"-1"` // When configuring Executor events ensure data is replicated in the cluster over this many servers, -1 means count of peers
NetworkEventStoreDuration time.Duration `confkey:"plugin.choria.network.stream.event_retention" type:"duration" default:"24h"` // When not zero enables retaining Lifecycle events in the Stream Store
NetworkEventStoreReplicas int `confkey:"plugin.choria.network.stream.event_replicas" default:"-1"` // When configuring LifeCycle events ensure data is replicated in the cluster over this many servers, -1 means count of peers
NetworkGatewayName string `confkey:"plugin.choria.network.gateway_name" default:"CHORIA"` // Name for the Super Cluster
Expand Down
Loading