Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(BIDS-2634) reordered transactions, fix almost-empty table #2663

Merged
merged 9 commits into from
Feb 6, 2024
173 changes: 164 additions & 9 deletions db/bigtable_eth1.go
Original file line number Diff line number Diff line change
Expand Up @@ -1941,6 +1941,138 @@ func (bigtable *Bigtable) TransformWithdrawals(block *types.Eth1Block, cache *fr
return bulkData, bulkMetadataUpdates, nil
}

type IndexKeys struct {
indexes []string
keys []string
}

type SortByIndexes IndexKeys

func (sbi SortByIndexes) Len() int {
return len(sbi.indexes)
}

func (sbi SortByIndexes) Swap(i, j int) {
sbi.indexes[i], sbi.indexes[j] = sbi.indexes[j], sbi.indexes[i]
sbi.keys[i], sbi.keys[j] = sbi.keys[j], sbi.keys[i]
}

func (sbi SortByIndexes) Less(i, j int) bool {
i_splits := strings.Split(sbi.indexes[i], ":")
j_splits := strings.Split(sbi.indexes[j], ":")
if len(i_splits) != len(j_splits) || len(i_splits) < 7 {
utils.LogError(nil, "unexpected bigtable transaction indices", 0, map[string]interface{}{"index_i": sbi.indexes[i], "index_j": sbi.indexes[j]})
return false
}

// block
if i_splits[5] != j_splits[5] {
return i_splits[5] < j_splits[5]
}
// tx idx
if i_splits[6] != j_splits[6] {
if i_splits[6] == strconv.Itoa(TX_PER_BLOCK_LIMIT) || j_splits[6] == strconv.Itoa(TX_PER_BLOCK_LIMIT) {
return j_splits[6] == strconv.Itoa(TX_PER_BLOCK_LIMIT)
}
return i_splits[6] < j_splits[6]
}
// itx idx
if len(i_splits) > 7 && i_splits[7] != j_splits[7] {
if i_splits[7] == strconv.Itoa(ITX_PER_TX_LIMIT) || j_splits[7] == strconv.Itoa(ITX_PER_TX_LIMIT) {
return j_splits[7] == strconv.Itoa(ITX_PER_TX_LIMIT)
}
return i_splits[7] < j_splits[7]
}
// shouldn't happen, this means we've the same key twice
utils.LogError(nil, "unexpected bigtable transaction indices", 0, map[string]interface{}{"index_i": sbi.indexes[i], "index_j": sbi.indexes[j]})
return false
}

func (bigtable *Bigtable) rearrangeReversePaddedIndexZero(ctx context.Context, indexes, keys []string) ([]string, []string) {
if len(indexes) < 2 {
return indexes, keys
}

// first find out if we've a (sub)transaction with index 0 whose block/transaction has maybe not been completed by the request
// if we find one, make sure we do complete the request by querying the remainder from bigtable. So we won't miss that (i)tx next time (=> ignoring the query limit)
for i := 0; i < len(indexes); i++ {
splits := strings.Split(indexes[i], ":")
if len(splits) < 7 {
utils.LogError(nil, "unexpected bigtable transaction index", 0, map[string]interface{}{"index": indexes[i]})
continue
}

if splits[6] != strconv.Itoa(TX_PER_BLOCK_LIMIT) && len(splits) > 7 && splits[7] != strconv.Itoa(ITX_PER_TX_LIMIT) {
continue
}
// check if results list all following (i)txs already
for i++; i < len(indexes); i++ {
next_splits := strings.Split(indexes[i], ":")
if len(next_splits) < 7 {
utils.LogError(nil, "unexpected bigtable transaction index", 0, map[string]interface{}{"index": indexes[i]})
continue
}
if next_splits[5] != splits[5] || (len(splits) > 7 && next_splits[6] != splits[6]) {
// next block/tx
break
}
}
if i == len(indexes) {
// block/tx maybe isn't fully included in results, request all missing entries (ignoring the query limit)
i, err := strconv.Atoi(splits[5])
if err != nil {
utils.LogError(err, "error converting bigtable transaction index timestamp", 0, map[string]interface{}{"index": splits[5]})
continue
}
splits[5] = fmt.Sprintf("%d", i+1)
rowRange := gcp_bigtable.NewRange(indexes[len(indexes)-1]+"\x00", strings.Join(splits[:6], ":"))
err = bigtable.tableData.ReadRows(ctx, rowRange, func(row gcp_bigtable.Row) bool {
keys = append(keys, strings.TrimPrefix(row[DEFAULT_FAMILY][0].Column, "f:"))
indexes = append(indexes, row.Key())
return true
})
if err != nil {
utils.LogError(err, "error reading from bigtable", 0)
}
break
}
i--
}

// sort
ik := IndexKeys{indexes, keys}
sort.Sort(SortByIndexes(ik))

return ik.indexes, ik.keys
}

func skipBlockIfLastTxIndex(key string) string {
splits := strings.Split(key, ":")
if len(splits) < 7 {
utils.LogError(nil, "unexpected bigtable transaction index", 0, map[string]interface{}{"index": key})
return key
}
if len(splits) == 8 && splits[7] == strconv.Itoa(ITX_PER_TX_LIMIT) && splits[6] != strconv.Itoa(TX_PER_BLOCK_LIMIT) {
i, err := strconv.Atoi(splits[6])
if err != nil {
utils.LogError(err, "error converting bigtable transaction index", 0, map[string]interface{}{"index": splits[6]})
} else {
splits[6] = fmt.Sprintf("%d", i+1)
}
splits = splits[:7]
}
if splits[6] == strconv.Itoa(TX_PER_BLOCK_LIMIT) {
i, err := strconv.Atoi(splits[5])
if err != nil {
utils.LogError(err, "error converting bigtable transaction index timestamp", 0, map[string]interface{}{"index": splits[5]})
} else {
splits[5] = fmt.Sprintf("%d", i+1)
return strings.Join(splits[:6], ":") + ":"
}
}
return key
}

func (bigtable *Bigtable) GetEth1TxsForAddress(prefix string, limit int64) ([]*types.Eth1TransactionIndexed, []string, error) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

minor: This is just a change from a merge but the method name changed here but not in the error message here

logger.WithError(err).WithField("prefix", prefix).WithField("limit", limit).Errorf("error reading rows in bigtable_eth1 / GetEth1TxForAddress")


tmr := time.AfterFunc(REPORT_TIMEOUT, func() {
Expand Down Expand Up @@ -1974,6 +2106,8 @@ func (bigtable *Bigtable) GetEth1TxsForAddress(prefix string, limit int64) ([]*t
return data, nil, nil
}

indexes, keys = bigtable.rearrangeReversePaddedIndexZero(ctx, indexes, keys)

err = bigtable.tableData.ReadRows(ctx, gcp_bigtable.RowList(keys), func(row gcp_bigtable.Row) bool {
b := &types.Eth1TransactionIndexed{}
err := proto.Unmarshal(row[DEFAULT_FAMILY][0].Value, b)
Expand All @@ -1986,7 +2120,7 @@ func (bigtable *Bigtable) GetEth1TxsForAddress(prefix string, limit int64) ([]*t
return true
})
if err != nil {
logger.WithError(err).WithField("prefix", prefix).WithField("limit", limit).Errorf("error reading rows in bigtable_eth1 / GetEth1TxForAddress")
logger.WithError(err).WithField("prefix", prefix).WithField("limit", limit).Errorf("error reading rows in bigtable_eth1 / GetEth1TxsForAddress")
return nil, nil, err
}

Expand Down Expand Up @@ -2104,6 +2238,10 @@ func (bigtable *Bigtable) GetAddressTransactionsTableData(address []byte, pageTo
if err != nil {
return nil, fmt.Errorf("error parsing Eth1InternalTransactionIndexed tx index: %v", err)
}
if tx_idx == TX_PER_BLOCK_LIMIT {
// handle "reversePaddedIndex 0"-issue
tx_idx = 0
}
tx_idx = TX_PER_BLOCK_LIMIT - tx_idx
if tx_idx < 0 {
return nil, fmt.Errorf("invalid Eth1InternalTransactionIndexed tx index: %d", tx_idx)
Expand Down Expand Up @@ -2151,7 +2289,7 @@ func (bigtable *Bigtable) GetAddressTransactionsTableData(address []byte, pageTo

token := ""
if len(keys) > 0 {
token = keys[len(keys)-1]
token = skipBlockIfLastTxIndex(keys[len(keys)-1])
}

data := &types.DataTableResponse{
Expand Down Expand Up @@ -2386,6 +2524,8 @@ func (bigtable *Bigtable) GetEth1BtxForAddress(prefix string, limit int64) ([]*t
return data, "", nil
}

indexes, keys = bigtable.rearrangeReversePaddedIndexZero(ctx, indexes, keys)

err = bigtable.tableData.ReadRows(ctx, gcp_bigtable.RowList(keys), func(row gcp_bigtable.Row) bool {
b := &types.Eth1BlobTransactionIndexed{}
err := proto.Unmarshal(row[DEFAULT_FAMILY][0].Value, b)
Expand All @@ -2406,7 +2546,7 @@ func (bigtable *Bigtable) GetEth1BtxForAddress(prefix string, limit int64) ([]*t
}
}

return data, indexes[len(indexes)-1], nil
return data, skipBlockIfLastTxIndex(indexes[len(indexes)-1]), nil
}

func (bigtable *Bigtable) GetAddressBlobTableData(address []byte, pageToken string) (*types.DataTableResponse, error) {
Expand Down Expand Up @@ -2487,7 +2627,6 @@ func (bigtable *Bigtable) GetEth1ItxsForAddress(prefix string, limit int64) ([]*

keysMap := make(map[string]*types.Eth1InternalTransactionIndexed, limit)
err := bigtable.tableData.ReadRows(ctx, rowRange, func(row gcp_bigtable.Row) bool {

keys = append(keys, strings.TrimPrefix(row[DEFAULT_FAMILY][0].Column, "f:"))
indexes = append(indexes, row.Key())
return true
Expand All @@ -2499,6 +2638,8 @@ func (bigtable *Bigtable) GetEth1ItxsForAddress(prefix string, limit int64) ([]*
return data, nil, nil
}

indexes, keys = bigtable.rearrangeReversePaddedIndexZero(ctx, indexes, keys)

err = bigtable.tableData.ReadRows(ctx, gcp_bigtable.RowList(keys), func(row gcp_bigtable.Row) bool {
b := &types.Eth1InternalTransactionIndexed{}
err := proto.Unmarshal(row[DEFAULT_FAMILY][0].Value, b)
Expand All @@ -2525,6 +2666,7 @@ func (bigtable *Bigtable) GetEth1ItxsForAddress(prefix string, limit int64) ([]*
}
}

indexes[len(indexes)-1] = skipBlockIfLastTxIndex(indexes[len(indexes)-1])
return data, indexes, nil
}

Expand Down Expand Up @@ -2564,6 +2706,10 @@ func (bigtable *Bigtable) GetAddressInternalTableData(address []byte, pageToken
if err != nil {
return nil, fmt.Errorf("error parsing Eth1InternalTransactionIndexed tx index: %v", err)
}
if tx_idx == TX_PER_BLOCK_LIMIT {
// handle "reversePaddedIndex 0"-issue
tx_idx = 0
}
tx_idx = TX_PER_BLOCK_LIMIT - tx_idx
if tx_idx < 0 {
return nil, fmt.Errorf("invalid Eth1InternalTransactionIndexed tx index: %d", tx_idx)
Expand All @@ -2573,6 +2719,10 @@ func (bigtable *Bigtable) GetAddressInternalTableData(address []byte, pageToken
if err != nil {
return nil, fmt.Errorf("error parsing Eth1InternalTransactionIndexed trace index: %v", err)
}
if trace_idx == ITX_PER_TX_LIMIT {
// handle "reversePaddedIndex 0"-issue
trace_idx = 0
}
trace_idx = ITX_PER_TX_LIMIT - trace_idx
if tx_idx < 0 {
return nil, fmt.Errorf("invalid Eth1InternalTransactionIndexed trace index: %d", trace_idx)
Expand Down Expand Up @@ -2615,7 +2765,7 @@ func (bigtable *Bigtable) GetAddressInternalTableData(address []byte, pageToken

token := ""
if len(keys) > 0 {
token = keys[len(keys)-1]
token = skipBlockIfLastTxIndex(keys[len(keys)-1])
}

data := &types.DataTableResponse{
Expand Down Expand Up @@ -2841,6 +2991,8 @@ func (bigtable *Bigtable) GetEth1ERC20ForAddress(prefix string, limit int64) ([]
return data, "", nil
}

indexes, keys = bigtable.rearrangeReversePaddedIndexZero(ctx, indexes, keys)

err = bigtable.tableData.ReadRows(ctx, gcp_bigtable.RowList(keys), func(row gcp_bigtable.Row) bool {
b := &types.Eth1ERC20Indexed{}
err := proto.Unmarshal(row[DEFAULT_FAMILY][0].Value, b)
Expand All @@ -2862,7 +3014,7 @@ func (bigtable *Bigtable) GetEth1ERC20ForAddress(prefix string, limit int64) ([]
}
}

return data, indexes[len(indexes)-1], nil
return data, skipBlockIfLastTxIndex(indexes[len(indexes)-1]), nil
}

func (bigtable *Bigtable) GetAddressErc20TableData(address []byte, pageToken string) (*types.DataTableResponse, error) {
Expand Down Expand Up @@ -2968,6 +3120,8 @@ func (bigtable *Bigtable) GetEth1ERC721ForAddress(prefix string, limit int64) ([
return data, "", nil
}

indexes, keys = bigtable.rearrangeReversePaddedIndexZero(ctx, indexes, keys)

err = bigtable.tableData.ReadRows(ctx, gcp_bigtable.RowList(keys), func(row gcp_bigtable.Row) bool {
b := &types.Eth1ERC721Indexed{}
err := proto.Unmarshal(row[DEFAULT_FAMILY][0].Value, b)
Expand All @@ -2988,7 +3142,7 @@ func (bigtable *Bigtable) GetEth1ERC721ForAddress(prefix string, limit int64) ([
data = append(data, d)
}
}
return data, indexes[len(indexes)-1], nil
return data, skipBlockIfLastTxIndex(indexes[len(indexes)-1]), nil
}

func (bigtable *Bigtable) GetAddressErc721TableData(address []byte, pageToken string) (*types.DataTableResponse, error) {
Expand Down Expand Up @@ -3078,6 +3232,8 @@ func (bigtable *Bigtable) GetEth1ERC1155ForAddress(prefix string, limit int64) (
return data, "", nil
}

indexes, keys = bigtable.rearrangeReversePaddedIndexZero(ctx, indexes, keys)

err = bigtable.tableData.ReadRows(ctx, gcp_bigtable.RowList(keys), func(row gcp_bigtable.Row) bool {
b := &types.ETh1ERC1155Indexed{}
err := proto.Unmarshal(row[DEFAULT_FAMILY][0].Value, b)
Expand All @@ -3098,7 +3254,7 @@ func (bigtable *Bigtable) GetEth1ERC1155ForAddress(prefix string, limit int64) (
data = append(data, d)
}
}
return data, indexes[len(indexes)-1], nil
return data, skipBlockIfLastTxIndex(indexes[len(indexes)-1]), nil
}

func (bigtable *Bigtable) GetAddressErc1155TableData(address []byte, pageToken string) (*types.DataTableResponse, error) {
Expand Down Expand Up @@ -3789,7 +3945,6 @@ func (bigtable *Bigtable) GetAddressContractInteractionsAtITransactions(itransac
}

// convenience function to get contract interaction status per parity trace
// 2nd parameter specifies [tx_idx, trace_idx] for each internal tx
func (bigtable *Bigtable) GetAddressContractInteractionsAtParityTraces(traces []*rpc.ParityTraceResult) ([][2]types.ContractInteractionType, error) {
requests := make([]contractInteractionAtRequest, 0, len(traces)*2)
for i, itx := range traces {
Expand Down
5 changes: 3 additions & 2 deletions handlers/eth1Transactions.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func getTransactionDataStartingWithPageToken(pageToken string) *types.DataTableR
}
}
}
if pageTokenId == 0 {
if pageTokenId == 0 && pageToken != "0" {
pageTokenId = services.LatestEth1BlockNumber()
}

Expand Down Expand Up @@ -87,7 +87,8 @@ func getTransactionDataStartingWithPageToken(pageToken string) *types.DataTableR
}

var wg errgroup.Group
for i, v := range t {
for i := len(t) - 1; i >= 0; i-- {
v := t[i]
wg.Go(func() error {
method := "Transfer"
{
Expand Down
2 changes: 1 addition & 1 deletion templates/execution/address.html
Original file line number Diff line number Diff line change
Expand Up @@ -245,9 +245,9 @@
content.innerHTML = col
el.appendChild(content)
infLoading.insertAdjacentElement("beforebegin", el)
drawCallback()
}
}
drawCallback()
} else if (data.pagingToken === "") {
infLoading.remove()
} else if (data && data.data && data.data.length == 0) {
Expand Down
2 changes: 1 addition & 1 deletion templates/execution/token.html
Original file line number Diff line number Diff line change
Expand Up @@ -193,9 +193,9 @@
el.classList.add('text-truncate')
el.innerHTML = col
infLoading.insertAdjacentElement("beforebegin", el)
drawCallback()
}
}
drawCallback()
} else if (data && data.data) {
if (data.data.length < 25) {
infLoading.innerText = ''
Expand Down
Loading