From abebfb18fccd774923e3bc6ba9cbef433e81c73e Mon Sep 17 00:00:00 2001 From: Stuart Douglas Date: Wed, 5 Feb 2025 15:19:26 +1100 Subject: [PATCH] fix: remove shared state (#4312) --- deployment/values.yaml | 2 ++ internal/raft/cluster.go | 14 +++++++------- 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/deployment/values.yaml b/deployment/values.yaml index 3d962b9c83..d0674ce4fd 100644 --- a/deployment/values.yaml +++ b/deployment/values.yaml @@ -58,6 +58,7 @@ timeline: repository: "ftl:5000/ftl-timeline" tag: "latest" pullPolicy: Always + console: image: repository: "ftl:5000/ftl-console" @@ -75,6 +76,7 @@ lease: repository: "ftl:5000/ftl-lease" tag: "latest" pullPolicy: Always + istio: enabled: true diff --git a/internal/raft/cluster.go b/internal/raft/cluster.go index 8ed58c5041..1043b6e408 100644 --- a/internal/raft/cluster.go +++ b/internal/raft/cluster.go @@ -19,7 +19,7 @@ import ( "github.com/lni/dragonboat/v4/statemachine" raftpb "github.com/block/ftl/backend/protos/xyz/block/ftl/raft/v1" - raftpbconnect "github.com/block/ftl/backend/protos/xyz/block/ftl/raft/v1/raftpbconnect" + "github.com/block/ftl/backend/protos/xyz/block/ftl/raft/v1/raftpbconnect" ftlv1 "github.com/block/ftl/backend/protos/xyz/block/ftl/v1" "github.com/block/ftl/internal/channels" "github.com/block/ftl/internal/iterops" @@ -133,8 +133,6 @@ type ShardHandle[Q any, R any, E sm.Marshallable] struct { cluster *Cluster session *client.Session - lastKnownIndex atomic.Value[uint64] - mu sync.Mutex } @@ -223,7 +221,9 @@ func (s *ShardHandle[Q, R, E]) StateIter(ctx context.Context, query Q) (iter.Seq if err != nil { logger.Errorf(err, "Failed to get last index") } - s.lastKnownIndex.Store(last) + + lastKnownIndex := atomic.Value[uint64]{} + lastKnownIndex.Store(last) previous, err := s.Query(ctx, query) if err != nil { @@ -248,10 +248,10 @@ func (s *ShardHandle[Q, R, E]) StateIter(ctx context.Context, query Q) (iter.Seq last, err := s.getLastIndex() if err != nil { logger.Warnf("Failed to get last index: %s", err) - } else if last > s.lastKnownIndex.Load() { - logger.Debugf("Changes detected, index: %d -> %d on (%d, %d)", s.lastKnownIndex.Load(), last, s.shardID, s.cluster.config.ReplicaID) + } else if last > lastKnownIndex.Load() { + logger.Debugf("Changes detected, index: %d -> %d on (%d, %d)", lastKnownIndex.Load(), last, s.shardID, s.cluster.config.ReplicaID) - s.lastKnownIndex.Store(last) + lastKnownIndex.Store(last) res, err := s.Query(ctx, query)