From 8152b7319fa06fe9384833ba50a3fc73bd9728d8 Mon Sep 17 00:00:00 2001 From: "R.I.Pienaar" Date: Wed, 12 Feb 2025 19:35:48 +0100 Subject: [PATCH] (#2209) Create a stream to hold execution events Signed-off-by: R.I.Pienaar --- broker/network/network_jetstream.go | 25 ++++++++++++++++++++++++- config/choria.go | 2 ++ 2 files changed, 26 insertions(+), 1 deletion(-) diff --git a/broker/network/network_jetstream.go b/broker/network/network_jetstream.go index 5060d5f4..f00a5151 100644 --- a/broker/network/network_jetstream.go +++ b/broker/network/network_jetstream.go @@ -1,4 +1,4 @@ -// Copyright (c) 2020-2024, R.I. Pienaar and the Choria Project contributors +// Copyright (c) 2020-2025, R.I. Pienaar and the Choria Project contributors // // SPDX-License-Identifier: Apache-2.0 @@ -134,6 +134,29 @@ func (s *Server) configureSystemStreams(ctx context.Context) error { return err } + if cfg.NetworkExecutorStoreDuration > 0 { + execCfg, err := jsm.NewStreamConfiguration(jsm.DefaultStream, + jsm.Subjects("choria.submission.choria.execution.>"), + jsm.StreamDescription("Choria Executor Events"), + jsm.Replicas(cfg.NetworkExecutorReplicas), + jsm.MaxAge(cfg.NetworkExecutorStoreDuration), + jsm.FileStorage(), + jsm.AllowDirect(), + ) + if err != nil { + return err + } + execCfg.SubjectTransform = &api.SubjectTransformConfig{ // TODO: next jsm has a option func for this + Source: "choria.submission.choria.execution.>", + Destination: "choria.execution.>", + } + + err = s.createOrUpdateStreamWithConfig("CHORIA_EXECUTOR", *execCfg, mgr) + if err != nil { + return err + } + } + eCfg, err := jsm.NewStreamConfiguration(jsm.DefaultStream, jsm.Replicas(cfg.NetworkLeaderElectionReplicas), jsm.MaxAge(cfg.NetworkLeaderElectionTTL), diff --git a/config/choria.go b/config/choria.go index 867532ce..2821f933 100644 --- a/config/choria.go +++ b/config/choria.go @@ -50,6 +50,8 @@ type ChoriaPluginConfig struct { NetworkClientTLSForce bool `confkey:"plugin.choria.network.client_tls_force_required"` // Force requiring/not requiring TLS for all clients NetworkClientTokenSigners []string `confkey:"plugin.choria.network.client_signer_cert" type:"comma_split"` // Fully qualified paths to the public certificates used by the AAA Service to sign client JWT tokens. This enables users with signed JWTs to use unverified TLS to connect. Can also be a list of ed25519 public keys. NetworkDenyServers bool `confkey:"plugin.choria.network.deny_server_connections"` // Set ACLs denying server connections to this broker + NetworkExecutorStoreDuration time.Duration `confkey:"plugin.choria.network.stream.executor_retention" type:"duration" default:"24h"` // When not zero enables retaining Executor events in the Stream Store + NetworkExecutorReplicas int `confkey:"plugin.choria.network.stream.executor_replicas" default:"-1"` // When configuring Executor events ensure data is replicated in the cluster over this many servers, -1 means count of peers NetworkEventStoreDuration time.Duration `confkey:"plugin.choria.network.stream.event_retention" type:"duration" default:"24h"` // When not zero enables retaining Lifecycle events in the Stream Store NetworkEventStoreReplicas int `confkey:"plugin.choria.network.stream.event_replicas" default:"-1"` // When configuring LifeCycle events ensure data is replicated in the cluster over this many servers, -1 means count of peers NetworkGatewayName string `confkey:"plugin.choria.network.gateway_name" default:"CHORIA"` // Name for the Super Cluster