Skip to content

Commit

Permalink
Merge pull request #2222 from ripienaar/2221.2
Browse files Browse the repository at this point in the history
(#2221) Correctly handle replica count for exec stream
  • Loading branch information
ripienaar authored Feb 13, 2025
2 parents 096601a + 1424fda commit 2bcdacc
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 4 deletions.
13 changes: 9 additions & 4 deletions broker/network/network_jetstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func (s *Server) configureSystemStreams(ctx context.Context) error {
var err error

cfg := s.config.Choria
if cfg.NetworkEventStoreReplicas == -1 || cfg.NetworkMachineStoreReplicas == -1 || cfg.NetworkStreamAdvisoryReplicas == -1 || cfg.NetworkLeaderElectionReplicas == -1 {
if cfg.NetworkExecutorReplicas == -1 || cfg.NetworkEventStoreReplicas == -1 || cfg.NetworkMachineStoreReplicas == -1 || cfg.NetworkStreamAdvisoryReplicas == -1 || cfg.NetworkLeaderElectionReplicas == -1 {
delay := time.Duration(rand.N(60)+10) * time.Second
s.log.Infof("Configuring system streams after %v", delay)
err = backoff.Default.Sleep(ctx, delay)
Expand Down Expand Up @@ -92,6 +92,11 @@ func (s *Server) configureSystemStreams(ctx context.Context) error {
s.log.Infof("Setting Choria Streams Leader election Replicas to %d", count)
cfg.NetworkLeaderElectionReplicas = count
}

if cfg.NetworkExecutorReplicas == -1 {
s.log.Infof("Setting Choria Streams Executor Replicas to %d", count)
cfg.NetworkExecutorReplicas = count
}
}

err = backoff.TwentySec.For(ctx, func(try int) error {
Expand Down Expand Up @@ -136,7 +141,7 @@ func (s *Server) configureSystemStreams(ctx context.Context) error {

if cfg.NetworkExecutorStoreDuration > 0 {
execCfg, err := jsm.NewStreamConfiguration(jsm.DefaultStream,
jsm.Subjects("choria.submission.choria.execution.>"),
jsm.Subjects("choria.submission.in.choria.executor.>"),
jsm.StreamDescription("Choria Executor Events"),
jsm.Replicas(cfg.NetworkExecutorReplicas),
jsm.MaxAge(cfg.NetworkExecutorStoreDuration),
Expand All @@ -147,8 +152,8 @@ func (s *Server) configureSystemStreams(ctx context.Context) error {
return err
}
execCfg.SubjectTransform = &api.SubjectTransformConfig{ // TODO: next jsm has a option func for this
Source: "choria.submission.choria.execution.>",
Destination: "choria.execution.>",
Source: "choria.submission.in.choria.executor.>",
Destination: "choria.executor.>",
}

err = s.createOrUpdateStreamWithConfig("CHORIA_EXECUTOR", *execCfg, mgr)
Expand Down
5 changes: 5 additions & 0 deletions cmd/executor_watch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
// Copyright (c) 2025, R.I. Pienaar and the Choria Project contributors
//
// SPDX-License-Identifier: Apache-2.0

package cmd
5 changes: 5 additions & 0 deletions providers/execution/watch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
// Copyright (c) 2025, R.I. Pienaar and the Choria Project contributors
//
// SPDX-License-Identifier: Apache-2.0

package execution

0 comments on commit 2bcdacc

Please sign in to comment.