shovel: bugfix. reorg may halt progress until restart
The process for handling a reorg is as follows:

1. Shovel task gets latest remote block
2. Shovel task gets latest local block
3. Shovel decides which blocks to process
4. Shovel uses jrpc2 client (with cache) to download blocks
5. Shovel checks the downloaded chain against the latest local hash for a reorg
6. If a reorg is detected, delete the latest local block (and all indexed data for that block)
7. GOTO 1

The problem is that step 4 may keep a block that has since been removed
from the chain in memory, so subsequent requests are served the removed
block and the new blocks can never be connected to the local chain.
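
To make the failure concrete, here is a minimal sketch of the loop above. The names (Chain, Source, converge) are illustrative only, not the actual Shovel types; the point is that once the cache hands back a copy of a block that was removed from the chain, the step-5 check fails on every iteration and steps 6 and 7 spin without progress.

// Sketch of the loop above with illustrative types; the real Task and
// Source types live in the shovel and jrpc2 packages.
package sketch

import "bytes"

type Block struct {
	Num          uint64
	Hash, Parent []byte
}

type Source interface {
	Latest() (uint64, error)                  // step 1
	Get(start, limit uint64) ([]Block, error) // step 4: may be served from a cache
}

// Chain assumes at least one local block.
type Chain struct{ blocks []Block }

func (c *Chain) Latest() Block  { return c.blocks[len(c.blocks)-1] }
func (c *Chain) Delete()        { c.blocks = c.blocks[:len(c.blocks)-1] }
func (c *Chain) Add(b ...Block) { c.blocks = append(c.blocks, b...) }

func converge(local *Chain, src Source) error {
	for {
		remote, err := src.Latest() // step 1
		if err != nil {
			return err
		}
		head := local.Latest() // step 2
		if head.Num >= remote {
			return nil
		}
		start, limit := head.Num+1, remote-head.Num // step 3
		blocks, err := src.Get(start, limit)        // step 4
		if err != nil {
			return err
		}
		// step 5: a cached copy of a removed block fails this check forever
		if !bytes.Equal(blocks[0].Parent, head.Hash) {
			local.Delete() // step 6
			continue       // step 7
		}
		local.Add(blocks...) // connect the new blocks and keep going
	}
}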

This commit introduces the concept of maxreads for the block/header
cache. When a task hits maxreads for a particular block (segment),
the segment is removed from the cache. This approach keeps our ability
to reduce downloaded data for concurrent tasks while giving Shovel a
way to make progress in the event of temporarily corrupt data. Shovel's
task setup sets maxreads to the total number of configured integrations,
which bounds how long invalid data can sit in the cache before it is
pruned.
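
A minimal sketch of the read-count eviction idea follows. The real cache in jrpc2/client.go keys segments by start/limit, locks around concurrent readers, and stores []eth.Block; this version is stripped down to show only the maxreads behavior.

// Simplified read-capped cache; types here are placeholders.
package sketch

type segment struct {
	nreads int
	data   []string // stand-in for []eth.Block
}

type readCappedCache struct {
	maxreads int
	segments map[uint64]*segment
}

func (c *readCappedCache) get(num uint64, fetch func(uint64) []string) []string {
	if c.segments == nil {
		c.segments = make(map[uint64]*segment)
	}
	// Evict any segment that has been read maxreads times. A stale block
	// can therefore be served at most maxreads times before it is
	// re-downloaded.
	for k, s := range c.segments {
		if s.nreads >= c.maxreads {
			delete(c.segments, k)
		}
	}
	s, ok := c.segments[num]
	if !ok {
		s = &segment{data: fetch(num)}
		c.segments[num] = s
	}
	s.nreads++
	return s.data
}

With maxreads equal to the number of configured integrations, each integration can be served a given segment once from the cache before it is dropped and re-fetched.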

Another approach would have been for the task to reset the cache when a
reorg is detected, but that seemed too complicated; I don't like the idea
of leaking the cache abstraction outside of the jrpc2 package.

Another idea is to change the Get API to take a hash in addition to
num/limit, but that makes the API more complicated to use.
ryandotsmith committed May 1, 2024
1 parent 64c35e8 commit 4ee10c4
Showing 4 changed files with 112 additions and 17 deletions.
40 changes: 36 additions & 4 deletions jrpc2/client.go
@@ -47,6 +47,8 @@ func New(url string) *Client {
pollDuration: time.Second,
url: url,
lcache: NumHash{maxreads: 20},
bcache: cache{maxreads: 20},
hcache: cache{maxreads: 20},
}
}

@@ -66,6 +68,8 @@ type Client struct {

func (c *Client) WithMaxReads(n int) *Client {
c.lcache.maxreads = n
c.bcache.maxreads = n
c.hcache.maxreads = n
return c
}

@@ -368,6 +372,16 @@ func (c *Client) Get(
filter *glf.Filter,
start, limit uint64,
) ([]eth.Block, error) {
t0 := time.Now()
defer func() {
slog.DebugContext(ctx,
"jrpc2-get",
"start", start,
"limit", limit,
"filter", filter,
"elapsed", time.Since(t0),
)
}()
var (
blocks []eth.Block
err error
@@ -422,18 +436,28 @@ type blockResp struct {

type segment struct {
sync.Mutex
done bool
d []eth.Block
nreads int
done bool
d []eth.Block
}

type cache struct {
sync.Mutex
maxreads int
segments map[key]*segment
}

type getter func(ctx context.Context, start, limit uint64) ([]eth.Block, error)

func (c *cache) prune() {
func (c *cache) pruneMaxRead() {
for k, v := range c.segments {
if v.nreads >= c.maxreads {
delete(c.segments, k)
}
}
}

func (c *cache) pruneSegments() {
const size = 5
if len(c.segments) <= size {
return
@@ -458,16 +482,18 @@ func (c *cache) get(nocache bool, ctx context.Context, start, limit uint64, f ge
if c.segments == nil {
c.segments = make(map[key]*segment)
}
c.pruneMaxRead()
seg, ok := c.segments[key{start, limit}]
if !ok {
seg = &segment{}
c.segments[key{start, limit}] = seg
}
c.prune()
c.pruneSegments()
c.Unlock()

seg.Lock()
defer seg.Unlock()
seg.nreads++
if seg.done {
return seg.d, nil
}
@@ -484,6 +510,7 @@

func (c *Client) blocks(ctx context.Context, start, limit uint64) ([]eth.Block, error) {
var (
t0 = time.Now()
reqs = make([]request, limit)
resps = make([]blockResp, limit)
blocks = make([]eth.Block, limit)
@@ -507,6 +534,11 @@ func (c *Client) blocks(ctx context.Context, start, limit uint64) ([]eth.Block,
return nil, fmt.Errorf("rpc=%s %w", tag, resps[i].Error)
}
}
slog.Debug("http get blocks",
"start", start,
"limit", limit,
"latency", time.Since(t0),
)
return blocks, validate("blocks", start, limit, blocks)
}

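Taken together, a caller opts into the read cap with WithMaxReads. A small usage sketch follows; the RPC URL is a placeholder and the import paths are assumed from the repo layout (github.com/indexsupply/x).

package main

import (
	"context"
	"log"

	"github.com/indexsupply/x/jrpc2"
	"github.com/indexsupply/x/shovel/glf"
)

func main() {
	// The cap applies to the latest-block, block, and header caches.
	c := jrpc2.New("https://rpc.example.invalid").WithMaxReads(8)
	blocks, err := c.Get(context.Background(), &glf.Filter{UseHeaders: true}, 18_000_000, 1)
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("downloaded %d block(s)", len(blocks))
}
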
55 changes: 54 additions & 1 deletion jrpc2/client_test.go
@@ -18,6 +18,7 @@ import (

"github.com/indexsupply/x/eth"
"github.com/indexsupply/x/shovel/glf"
"github.com/indexsupply/x/tc"
"golang.org/x/sync/errgroup"
"kr.dev/diff"
)
@@ -44,7 +45,7 @@ func (tg *testGetter) get(ctx context.Context, start, limit uint64) ([]eth.Block
func TestCache_Prune(t *testing.T) {
ctx := context.Background()
tg := testGetter{}
c := cache{}
c := cache{maxreads: 2}
blocks, err := c.get(false, ctx, 1, 1, tg.get)
diff.Test(t, t.Fatalf, nil, err)
diff.Test(t, t.Errorf, 1, len(blocks))
@@ -76,6 +77,25 @@ func TestCache_Prune(t *testing.T) {
})
}

func TestCache_MaxReads(t *testing.T) {
var (
ctx = context.Background()
tg = testGetter{}
c = cache{maxreads: 2}
)
_, err := c.get(false, ctx, 1, 1, tg.get)
tc.NoErr(t, err)
tc.WantGot(t, 1, tg.callCount)

_, err = c.get(false, ctx, 1, 1, tg.get)
tc.NoErr(t, err)
tc.WantGot(t, 1, tg.callCount)

_, err = c.get(false, ctx, 1, 1, tg.get)
tc.NoErr(t, err)
tc.WantGot(t, 2, tg.callCount)
}

var (
//go:embed testdata/block-18000000.json
block18000000JSON string
@@ -356,6 +376,39 @@ func TestGet_Cached(t *testing.T) {
eg.Wait()
}

// Test that a block cache removes its segments after
// they've been read N times. Once N is reached, subsequent
// calls to Get should make new requests.
func TestGet_Cached_Pruned(t *testing.T) {
var n int32
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
body, err := io.ReadAll(r.Body)
diff.Test(t, t.Fatalf, nil, err)
switch {
case strings.Contains(string(body), "eth_getBlockByNumber"):
atomic.AddInt32(&n, 1)
_, err := w.Write([]byte(block18000000JSON))
diff.Test(t, t.Fatalf, nil, err)
}
}))
defer ts.Close()
var (
ctx = context.Background()
c = New(ts.URL).WithMaxReads(2)
)
_, err := c.Get(ctx, &glf.Filter{UseHeaders: true}, 18000000, 1)
diff.Test(t, t.Errorf, nil, err)
diff.Test(t, t.Errorf, n, int32(1))
_, err = c.Get(ctx, &glf.Filter{UseHeaders: true}, 18000000, 1)
diff.Test(t, t.Errorf, nil, err)
diff.Test(t, t.Errorf, n, int32(1))

// maxreads was reached by the previous two calls, so this Get re-fetches
_, err = c.Get(ctx, &glf.Filter{UseHeaders: true}, 18000000, 1)
diff.Test(t, t.Errorf, nil, err)
diff.Test(t, t.Errorf, n, int32(2))
}

func TestNoLogs(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
body, err := io.ReadAll(r.Body)
29 changes: 18 additions & 11 deletions shovel/glf/filter.go
@@ -1,7 +1,7 @@
// eth_getLogs filter
package glf

import "fmt"
import "strings"

type Filter struct {
needs []string
@@ -46,16 +46,23 @@ func (f *Filter) Addresses() []string { return f.addresses }
func (f *Filter) Topics() [][]string { return f.topics }

func (f *Filter) String() string {
return fmt.Sprintf(
"headers=%t blocks=%t receipts=%t logs=%t trace=%t addrs=%d topics=%d",
f.UseHeaders,
f.UseBlocks,
f.UseReceipts,
f.UseLogs,
f.UseTraces,
len(f.addresses),
len(f.topics),
)
var opts = make([]string, 0, 7)
if f.UseLogs {
opts = append(opts, "l")
}
if f.UseHeaders {
opts = append(opts, "h")
}
if f.UseBlocks {
opts = append(opts, "b")
}
if f.UseReceipts {
opts = append(opts, "r")
}
if f.UseTraces {
opts = append(opts, "t")
}
return strings.Join(opts, ",")
}

func any(a, b []string) bool {
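
For reference, the rewritten String renders the enabled data types as a short comma-separated list in the order l, h, b, r, t. A small example, assuming the exported Use* fields shown above:

package main

import (
	"fmt"

	"github.com/indexsupply/x/shovel/glf"
)

func main() {
	f := glf.Filter{UseLogs: true, UseHeaders: true}
	fmt.Println(f.String()) // prints: l,h
}
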
5 changes: 4 additions & 1 deletion shovel/task.go
@@ -749,7 +749,10 @@ func loadTasks(ctx context.Context, pgp *pgxpool.Pool, c config.Root) ([]*Task,
}
var sources = map[string]Source{}
for _, sc := range scByName {
sources[sc.Name] = jrpc2.New(sc.URL).WithWSURL(sc.WSURL).WithPollDuration(sc.PollDuration)
sources[sc.Name] = jrpc2.New(sc.URL).
WithWSURL(sc.WSURL).
WithPollDuration(sc.PollDuration).
WithMaxReads(len(allIntegrations))
}
var tasks []*Task
for _, ig := range allIntegrations {