Skip to content

Commit

Permalink
fix: stop streaming on querying new stuff from ds client
Browse files Browse the repository at this point in the history
  • Loading branch information
V-Staykov committed Oct 31, 2024
1 parent fb6eb38 commit d90a0fd
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 9 deletions.
14 changes: 10 additions & 4 deletions zk/datastream/client/stream_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ func (c *StreamClient) stopStreamingIfStarted() error {
c.sendStopCmd()
c.streaming.Store(false)

// empty the socket buffer
for {
c.conn.SetReadDeadline(time.Now().Add(100))
if _, err := c.readBuffer(100); err != nil {
Expand Down Expand Up @@ -266,9 +267,7 @@ func (c *StreamClient) getLatestL2Block() (l2Block *types.FullL2Block, err error
func (c *StreamClient) GetLastWrittenTimeAtomic() *atomic.Int64 {
return &c.lastWrittenTime
}
func (c *StreamClient) GetStreamingAtomic() *atomic.Bool {
return &c.streaming
}

func (c *StreamClient) GetProgressAtomic() *atomic.Uint64 {
return &c.progress
}
Expand Down Expand Up @@ -300,6 +299,10 @@ func (c *StreamClient) Stop() {
// Returns the current status of the header.
// If started, terminate the connection.
func (c *StreamClient) GetHeader() (*types.HeaderEntry, error) {
if err := c.stopStreamingIfStarted(); err != nil {
return nil, fmt.Errorf("stopStreamingIfStarted: %w", err)
}

if err := c.sendHeaderCmd(); err != nil {
return nil, fmt.Errorf("sendHeaderCmd: %w", err)
}
Expand Down Expand Up @@ -417,6 +420,10 @@ func (c *StreamClient) ReadAllEntriesToChannel() (err error) {
default:
}
if connected {
if err := c.stopStreamingIfStarted(); err != nil {
return fmt.Errorf("stopStreamingIfStarted: %w", err)
}

if err = c.readAllEntriesToChannel(); err == nil {
break
}
Expand All @@ -442,7 +449,6 @@ func (c *StreamClient) handleSocketError(socketErr error) bool {
return false
}

c.streaming.Store(false)
c.RenewEntryChannel()

return true
Expand Down
1 change: 0 additions & 1 deletion zk/stages/stage_batches.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ type DatastreamClient interface {
GetEntryChan() *chan interface{}
GetL2BlockByNumber(blockNum uint64) (*types.FullL2Block, error)
GetLatestL2Block() (*types.FullL2Block, error)
GetStreamingAtomic() *atomic.Bool
GetProgressAtomic() *atomic.Uint64
Start() error
Stop()
Expand Down
4 changes: 0 additions & 4 deletions zk/stages/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,6 @@ func (c *TestDatastreamClient) GetLastWrittenTimeAtomic() *atomic.Int64 {
return &c.lastWrittenTimeAtomic
}

func (c *TestDatastreamClient) GetStreamingAtomic() *atomic.Bool {
return &c.streamingAtomic
}

func (c *TestDatastreamClient) GetProgressAtomic() *atomic.Uint64 {
return &c.progress
}
Expand Down

0 comments on commit d90a0fd

Please sign in to comment.