Skip to content

Commit

Permalink
fix: remove shared state (#4312)
Browse files Browse the repository at this point in the history
  • Loading branch information
stuartwdouglas authored Feb 5, 2025
1 parent f08305e commit abebfb1
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 7 deletions.
2 changes: 2 additions & 0 deletions deployment/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ timeline:
repository: "ftl:5000/ftl-timeline"
tag: "latest"
pullPolicy: Always

console:
image:
repository: "ftl:5000/ftl-console"
Expand All @@ -75,6 +76,7 @@ lease:
repository: "ftl:5000/ftl-lease"
tag: "latest"
pullPolicy: Always

istio:
enabled: true

Expand Down
14 changes: 7 additions & 7 deletions internal/raft/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
"github.com/lni/dragonboat/v4/statemachine"

raftpb "github.com/block/ftl/backend/protos/xyz/block/ftl/raft/v1"
raftpbconnect "github.com/block/ftl/backend/protos/xyz/block/ftl/raft/v1/raftpbconnect"
"github.com/block/ftl/backend/protos/xyz/block/ftl/raft/v1/raftpbconnect"
ftlv1 "github.com/block/ftl/backend/protos/xyz/block/ftl/v1"
"github.com/block/ftl/internal/channels"
"github.com/block/ftl/internal/iterops"
Expand Down Expand Up @@ -133,8 +133,6 @@ type ShardHandle[Q any, R any, E sm.Marshallable] struct {
cluster *Cluster
session *client.Session

lastKnownIndex atomic.Value[uint64]

mu sync.Mutex
}

Expand Down Expand Up @@ -223,7 +221,9 @@ func (s *ShardHandle[Q, R, E]) StateIter(ctx context.Context, query Q) (iter.Seq
if err != nil {
logger.Errorf(err, "Failed to get last index")
}
s.lastKnownIndex.Store(last)

lastKnownIndex := atomic.Value[uint64]{}
lastKnownIndex.Store(last)

previous, err := s.Query(ctx, query)
if err != nil {
Expand All @@ -248,10 +248,10 @@ func (s *ShardHandle[Q, R, E]) StateIter(ctx context.Context, query Q) (iter.Seq
last, err := s.getLastIndex()
if err != nil {
logger.Warnf("Failed to get last index: %s", err)
} else if last > s.lastKnownIndex.Load() {
logger.Debugf("Changes detected, index: %d -> %d on (%d, %d)", s.lastKnownIndex.Load(), last, s.shardID, s.cluster.config.ReplicaID)
} else if last > lastKnownIndex.Load() {
logger.Debugf("Changes detected, index: %d -> %d on (%d, %d)", lastKnownIndex.Load(), last, s.shardID, s.cluster.config.ReplicaID)

s.lastKnownIndex.Store(last)
lastKnownIndex.Store(last)

res, err := s.Query(ctx, query)

Expand Down

0 comments on commit abebfb1

Please sign in to comment.