From 4ee10c4789af31a054f96f968ed038db3aa3501f Mon Sep 17 00:00:00 2001
From: ryan smith
Date: Tue, 30 Apr 2024 18:15:50 -0700
Subject: [PATCH] shovel: bugfix. reorg may halt progress until restart

The process for handling a reorg is as follows:

1. Shovel task gets latest remote block
2. Shovel task gets latest local block
3. Shovel decides which blocks to process
4. Shovel uses jrpc2 client (with cache) to download blocks
5. Shovel checks downloaded chain with latest local hash for reorg
6. If reorg then delete local block (and all indexed data for that block)
7. GOTO 1

The problem is that step 4 may keep a block in memory after it has been
removed from the chain, so subsequent requests are served the removed
block and new blocks can never be connected to the local chain.

This commit introduces a maxreads limit for the block/header cache.
When a particular block (segment) has been read maxreads times, it is
removed from the cache. This approach keeps the cache's benefit of
reducing downloaded data for concurrent tasks while giving Shovel a way
to make progress when the cache temporarily holds corrupt data.

maxreads is set in Shovel's task setup to the total number of
configured integrations, which bounds how long a task has to wait for
the invalid data to be pruned.

Another approach would have been to have the task reset the cache when
a reorg is detected, but that seemed too complicated; I don't like
leaking the cache abstraction outside of the jrpc2 package. Another
idea is to change the Get API to take a hash in addition to num/limit,
but that makes the API more complicated to use.
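For illustration, here is a minimal, self-contained sketch of the
read-counted cache idea. The names (readCache, segment, fetch, block)
are simplified stand-ins and not the identifiers used in this patch;
the real change lives in jrpc2/client.go below.

    package sketch

    // block stands in for eth.Block.
    type block struct{ num uint64 }

    type segment struct {
    	nreads int     // how many times this segment has been served
    	blocks []block // cached data for the segment
    }

    type readCache struct {
    	maxreads int
    	segments map[uint64]*segment
    }

    // get serves a cached segment, first evicting any segment that has
    // already been read maxreads times so it must be re-downloaded.
    func (c *readCache) get(start uint64, fetch func(uint64) []block) []block {
    	for k, s := range c.segments {
    		if s.nreads >= c.maxreads {
    			delete(c.segments, k)
    		}
    	}
    	if c.segments == nil {
    		c.segments = make(map[uint64]*segment)
    	}
    	s, ok := c.segments[start]
    	if !ok {
    		s = &segment{blocks: fetch(start)}
    		c.segments[start] = s
    	}
    	s.nreads++
    	return s.blocks
    }

With maxreads equal to the number of configured integrations (see
shovel/task.go), each integration can read a segment once before the
segment becomes eligible for eviction.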
---
 jrpc2/client.go      | 40 ++++++++++++++++++++++++++++----
 jrpc2/client_test.go | 55 +++++++++++++++++++++++++++++++++++++++++++-
 shovel/glf/filter.go | 29 ++++++++++++++---------
 shovel/task.go       |  5 +++-
 4 files changed, 112 insertions(+), 17 deletions(-)

diff --git a/jrpc2/client.go b/jrpc2/client.go
index a3c0aaa0..c15a8e47 100644
--- a/jrpc2/client.go
+++ b/jrpc2/client.go
@@ -47,6 +47,8 @@ func New(url string) *Client {
 		pollDuration: time.Second,
 		url:          url,
 		lcache:       NumHash{maxreads: 20},
+		bcache:       cache{maxreads: 20},
+		hcache:       cache{maxreads: 20},
 	}
 }

@@ -66,6 +68,8 @@ type Client struct {

 func (c *Client) WithMaxReads(n int) *Client {
 	c.lcache.maxreads = n
+	c.bcache.maxreads = n
+	c.hcache.maxreads = n
 	return c
 }

@@ -368,6 +372,16 @@ func (c *Client) Get(
 	filter *glf.Filter,
 	start, limit uint64,
 ) ([]eth.Block, error) {
+	t0 := time.Now()
+	defer func() {
+		slog.DebugContext(ctx,
+			"jrpc2-get",
+			"start", start,
+			"limit", limit,
+			"filter", filter,
+			"elapsed", time.Since(t0),
+		)
+	}()
 	var (
 		blocks []eth.Block
 		err    error
@@ -422,18 +436,28 @@ type blockResp struct {

 type segment struct {
 	sync.Mutex
-	done bool
-	d    []eth.Block
+	nreads int
+	done   bool
+	d      []eth.Block
 }

 type cache struct {
 	sync.Mutex
+	maxreads int
 	segments map[key]*segment
 }

 type getter func(ctx context.Context, start, limit uint64) ([]eth.Block, error)

-func (c *cache) prune() {
+func (c *cache) pruneMaxRead() {
+	for k, v := range c.segments {
+		if v.nreads >= c.maxreads {
+			delete(c.segments, k)
+		}
+	}
+}
+
+func (c *cache) pruneSegments() {
 	const size = 5
 	if len(c.segments) <= size {
 		return
@@ -458,16 +482,18 @@ func (c *cache) get(nocache bool, ctx context.Context, start, limit uint64, f ge
 	if c.segments == nil {
 		c.segments = make(map[key]*segment)
 	}
+	c.pruneMaxRead()
 	seg, ok := c.segments[key{start, limit}]
 	if !ok {
 		seg = &segment{}
 		c.segments[key{start, limit}] = seg
 	}
-	c.prune()
+	c.pruneSegments()
 	c.Unlock()

 	seg.Lock()
 	defer seg.Unlock()
+	seg.nreads++
 	if seg.done {
 		return seg.d, nil
 	}
@@ -484,6 +510,7 @@ func (c *cache) get(nocache bool, ctx context.Context, start, limit uint64, f ge

 func (c *Client) blocks(ctx context.Context, start, limit uint64) ([]eth.Block, error) {
 	var (
+		t0     = time.Now()
 		reqs   = make([]request, limit)
 		resps  = make([]blockResp, limit)
 		blocks = make([]eth.Block, limit)
@@ -507,6 +534,11 @@ func (c *Client) blocks(ctx context.Context, start, limit uint64) ([]eth.Block,
 			return nil, fmt.Errorf("rpc=%s %w", tag, resps[i].Error)
 		}
 	}
+	slog.Debug("http get blocks",
+		"start", start,
+		"limit", limit,
+		"latency", time.Since(t0),
+	)
 	return blocks, validate("blocks", start, limit, blocks)
 }

diff --git a/jrpc2/client_test.go b/jrpc2/client_test.go
index 11e543f6..ee7dcb5b 100644
--- a/jrpc2/client_test.go
+++ b/jrpc2/client_test.go
@@ -18,6 +18,7 @@ import (

 	"github.com/indexsupply/x/eth"
 	"github.com/indexsupply/x/shovel/glf"
+	"github.com/indexsupply/x/tc"
 	"golang.org/x/sync/errgroup"
 	"kr.dev/diff"
 )
@@ -44,7 +45,7 @@ func (tg *testGetter) get(ctx context.Context, start, limit uint64) ([]eth.Block
 func TestCache_Prune(t *testing.T) {
 	ctx := context.Background()
 	tg := testGetter{}
-	c := cache{}
+	c := cache{maxreads: 2}
 	blocks, err := c.get(false, ctx, 1, 1, tg.get)
 	diff.Test(t, t.Fatalf, nil, err)
 	diff.Test(t, t.Errorf, 1, len(blocks))
@@ -76,6 +77,25 @@ func TestCache_Prune(t *testing.T) {
 	})
 }

+func TestCache_MaxReads(t *testing.T) {
+	var (
+		ctx = context.Background()
+		tg  = testGetter{}
+		c   = cache{maxreads: 2}
+	)
+	_, err := c.get(false, ctx, 1, 1, tg.get)
+	tc.NoErr(t, err)
+	tc.WantGot(t, 1, tg.callCount)
+
+	_, err = c.get(false, ctx, 1, 1, tg.get)
+	tc.NoErr(t, err)
+	tc.WantGot(t, 1, tg.callCount)
+
+	_, err = c.get(false, ctx, 1, 1, tg.get)
+	tc.NoErr(t, err)
+	tc.WantGot(t, 2, tg.callCount)
+}
+
 var (
 	//go:embed testdata/block-18000000.json
 	block18000000JSON string
@@ -356,6 +376,39 @@ func TestGet_Cached(t *testing.T) {
 	eg.Wait()
 }

+// Test that a block cache removes its segments after
+// they've been read N times. Once N is reached, subsequent
+// calls to Get should make new requests.
+func TestGet_Cached_Pruned(t *testing.T) {
+	var n int32
+	ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		body, err := io.ReadAll(r.Body)
+		diff.Test(t, t.Fatalf, nil, err)
+		switch {
+		case strings.Contains(string(body), "eth_getBlockByNumber"):
+			atomic.AddInt32(&n, 1)
+			_, err := w.Write([]byte(block18000000JSON))
+			diff.Test(t, t.Fatalf, nil, err)
+		}
+	}))
+	defer ts.Close()
+	var (
+		ctx = context.Background()
+		c   = New(ts.URL).WithMaxReads(2)
+	)
+	_, err := c.Get(ctx, &glf.Filter{UseHeaders: true}, 18000000, 1)
+	diff.Test(t, t.Errorf, nil, err)
+	diff.Test(t, t.Errorf, n, int32(1))
+	_, err = c.Get(ctx, &glf.Filter{UseHeaders: true}, 18000000, 1)
+	diff.Test(t, t.Errorf, nil, err)
+	diff.Test(t, t.Errorf, n, int32(1))
+
+	// maxreads should have been reached with last 2 calls
+	_, err = c.Get(ctx, &glf.Filter{UseHeaders: true}, 18000000, 1)
+	diff.Test(t, t.Errorf, nil, err)
+	diff.Test(t, t.Errorf, n, int32(2))
+}
+
 func TestNoLogs(t *testing.T) {
 	ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
 		body, err := io.ReadAll(r.Body)
diff --git a/shovel/glf/filter.go b/shovel/glf/filter.go
index 6f936233..c506cf44 100644
--- a/shovel/glf/filter.go
+++ b/shovel/glf/filter.go
@@ -1,7 +1,7 @@
 // eth_getLogs filter
 package glf

-import "fmt"
+import "strings"

 type Filter struct {
 	needs []string
@@ -46,16 +46,23 @@ func (f *Filter) Addresses() []string { return f.addresses }
 func (f *Filter) Topics() [][]string { return f.topics }

 func (f *Filter) String() string {
-	return fmt.Sprintf(
-		"headers=%t blocks=%t receipts=%t logs=%t trace=%t addrs=%d topics=%d",
-		f.UseHeaders,
-		f.UseBlocks,
-		f.UseReceipts,
-		f.UseLogs,
-		f.UseTraces,
-		len(f.addresses),
-		len(f.topics),
-	)
+	var opts = make([]string, 0, 7)
+	if f.UseLogs {
+		opts = append(opts, "l")
+	}
+	if f.UseHeaders {
+		opts = append(opts, "h")
+	}
+	if f.UseBlocks {
+		opts = append(opts, "b")
+	}
+	if f.UseReceipts {
+		opts = append(opts, "r")
+	}
+	if f.UseTraces {
+		opts = append(opts, "t")
+	}
+	return strings.Join(opts, ",")
 }

 func any(a, b []string) bool {
diff --git a/shovel/task.go b/shovel/task.go
index 5d46d369..6e125210 100644
--- a/shovel/task.go
+++ b/shovel/task.go
@@ -749,7 +749,10 @@ func loadTasks(ctx context.Context, pgp *pgxpool.Pool, c config.Root) ([]*Task,
 	}
 	var sources = map[string]Source{}
 	for _, sc := range scByName {
-		sources[sc.Name] = jrpc2.New(sc.URL).WithWSURL(sc.WSURL).WithPollDuration(sc.PollDuration)
+		sources[sc.Name] = jrpc2.New(sc.URL).
+			WithWSURL(sc.WSURL).
+			WithPollDuration(sc.PollDuration).
+			WithMaxReads(len(allIntegrations))
 	}
 	var tasks []*Task
 	for _, ig := range allIntegrations {