Skip to content

Commit

Permalink
Merge branch 'master' into no-gzip
Browse files (browse the repository at this point in the history)
  • Loading branch information
tmcgroul committed Jan 30, 2024
2 parents 5d54e21 + a0036ca commit 4b45750
Show file tree
Hide file tree
Showing 49 changed files with 4,417 additions and 266 deletions.
15 changes: 13 additions & 2 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ FROM python:3.11-slim-bullseye AS base


FROM base AS builder
RUN apt update
RUN apt install -y gcc
RUN pip install pdm
WORKDIR /project
RUN pdm venv create --with venv
Expand Down Expand Up @@ -51,8 +53,17 @@ FROM base as p2p-worker
COPY --from=p2p-worker-builder /project/.venv /app/env/
COPY --from=p2p-worker-builder /project/sqa /app/sqa/
VOLUME /app/data
ENV DATA_DIR=/app/data/worker
ENV DATA_DIR=/app/data
ENV ENABLE_ALLOCATIONS=1
ENV PING_INTERVAL_SEC=20
RUN echo "#!/bin/bash \n exec /app/env/bin/python -m sqa.worker.p2p --data-dir \${DATA_DIR} --proxy \${PROXY_ADDR} --scheduler-id \${SCHEDULER_ID}" > ./entrypoint.sh
ENV LOGS_SEND_INTERVAL_SEC=600
ENV PROMETHEUS_PORT=9090
RUN echo "#!/bin/bash \n exec /app/env/bin/python -m sqa.worker.p2p \
--data-dir \${DATA_DIR} \
--proxy \${PROXY_ADDR} \
--prometheus-port \${PROMETHEUS_PORT} \
--scheduler-id \${SCHEDULER_ID} \
--rpc-url \${RPC_URL} \
--logs-collector-id \${LOGS_COLLECTOR_ID}" > ./entrypoint.sh
RUN chmod +x ./entrypoint.sh
ENTRYPOINT ["./entrypoint.sh"]
480 changes: 468 additions & 12 deletions pdm.lock

Large diffs are not rendered by default.

8 changes: 6 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,16 @@ requires-python = ">=3.11"
dependencies = [
"duckdb == 0.9.1",
"httpx == 0.*",
"prometheus-client==0.17.*",
"pyarrow == 13.*",
"s3fs == 2023.10.0",
"s3fs >= 2023.12.0",
"sentry-sdk == 1.28.*",
"uvloop>=0.19.0",
]

[project.optional-dependencies]
writer = [
"pycryptodome>=3.18.0",
"prometheus-client==0.17.*",
"trie==2.1.1",
"rlp>=3.0.0",
"setuptools>=68.2.2",
Expand All @@ -31,10 +32,13 @@ http-worker = [
]

p2p-worker = [
"base58>=2.1.1",
"grpcio == 1.*",
"marshmallow == 3.*",
"protobuf == 4.*",
"psutil>=5.9.5",
"setuptools>=68.2.2",
"web3>=6.11.3",
]

[build-system]
Expand Down
4 changes: 3 additions & 1 deletion query.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import gzip
import json
import sys

Expand All @@ -13,8 +14,9 @@ def main():
q: ArchiveQuery = json.load(f)

result = execute_query(dataset_dir, (0, sys.maxsize), q)
data = gzip.decompress(result.compressed_data)

json.dump(json.loads(result.result), sys.stdout, indent=2)
json.dump(json.loads(data), sys.stdout, indent=2)


if __name__ == '__main__':
Expand Down
17 changes: 13 additions & 4 deletions sqa/eth/ingest/ingest.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ def __init__(
use_trace_api: bool = False,
use_debug_api_for_statediffs: bool = False,
validate_tx_root: bool = False,
validate_tx_type: bool = False,
validate_logs_bloom: bool = False,
):
self._rpc = rpc
self._finality_confirmation = finality_confirmation
Expand All @@ -36,6 +38,8 @@ def __init__(
self._use_trace_api = use_trace_api
self._use_debug_api_for_statediffs = use_debug_api_for_statediffs
self._validate_tx_root = validate_tx_root
self._validate_tx_type = validate_tx_type
self._validate_logs_bloom = validate_logs_bloom
self._height = from_block - 1
self._genesis = genesis_block
self._end = to_block
Expand All @@ -52,7 +56,6 @@ def __init__(
self._is_polygon_testnet = False
self._is_optimism = False
self._is_astar = False
self._is_zksync = False
self._is_skale_nebula = False

async def loop(self) -> AsyncIterator[list[Block]]:
Expand Down Expand Up @@ -85,7 +88,6 @@ async def _detect_special_chains(self) -> None:
self._is_polygon_testnet = genesis_hash == '0x7b66506a9ebdbf30d32b43c5f15a3b1216269a1ec3a75aa3182b86176a2b1ca7'
self._is_optimism = genesis_hash == '0x7ca38a1916c42007829c55e69d3e9a73265554b586a499015373241b8a3fa48b'
self._is_astar = genesis_hash == '0x0d28a86ac0fe37871285bd1dac45d83a4b3833e01a37571a1ac4f0a44c64cdc2'
self._is_zksync = genesis_hash == '0xe8e77626586f73b955364c7b4bbf0bb7f7685ebd40e852b164633a4acbd3244c'
self._is_skale_nebula = genesis_hash == '0x28e07f346c28a837dfd2897ce70c8500de6e67ddbc33cb5b9cd720fff4aeb598'

def _schedule_strides(self):
Expand Down Expand Up @@ -240,7 +242,10 @@ async def _fetch_logs(self, blocks: list[Block]) -> None:

for block in blocks:
block_logs = logs_by_hash.get(block['hash'], [])
assert block['logsBloom'] == logs_bloom(block_logs)

if self._validate_logs_bloom:
assert block['logsBloom'] == logs_bloom(block_logs)

block['logs_'] = block_logs

async def _fetch_receipts(self, blocks: list[Block]) -> None:
Expand Down Expand Up @@ -269,6 +274,8 @@ async def _fetch_receipts(self, blocks: list[Block]) -> None:
continue
if self._is_skale_nebula:
r['type'] = '0x0'
elif self._validate_tx_type:
assert r.get('type') is not None

try:
tx = tx_by_index[r['blockHash']][r['transactionIndex']]
Expand Down Expand Up @@ -304,8 +311,10 @@ async def _fetch_receipts(self, blocks: list[Block]) -> None:
_fix_astar_995596(block)

block_logs = logs_by_hash.get(block['hash'], [])
if not self._is_zksync:

if self._validate_logs_bloom:
assert block['logsBloom'] == logs_bloom(block_logs)

for tx in block['transactions']:
if self._is_arbitrum_one and tx['hash'] == '0x1d76d3d13e9f8cc713d484b0de58edd279c4c62e46e963899aec28eb648b5800' and block['number'] == hex(4527955):
continue
Expand Down
14 changes: 14 additions & 0 deletions sqa/eth/ingest/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,18 @@ def parse_cli_arguments():
help='validate block transactions against transactions root'
)

program.add_argument(
'--validate-logs-bloom',
action='store_true',
help='validate block logs against logs bloom'
)

program.add_argument(
'--validate-tx-type',
action='store_true',
help='check if transaction type is not empty',
)

program.add_argument(
'--write-chunk-size',
metavar='MB',
Expand Down Expand Up @@ -288,6 +300,8 @@ async def rpc_ingest(args, rpc: RpcClient, first_block: int, last_block: int | N
use_trace_api=args.use_trace_api,
use_debug_api_for_statediffs=args.use_debug_api_for_statediffs,
validate_tx_root=args.validate_tx_root,
validate_tx_type=args.validate_tx_type,
validate_logs_bloom=args.validate_logs_bloom,
)

try:
Expand Down
5 changes: 3 additions & 2 deletions sqa/eth/ingest/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ class Block(TypedDict):
baseFeePerGas: NotRequired[Qty]
uncles: list[Hash32]
transactions: list['Transaction']
l1BlockNumber: NotRequired[Qty]
logs_: NotRequired[list['Log']]
unknownTraceReplays_: NotRequired[list]

Expand All @@ -47,7 +48,7 @@ class Block(TypedDict):
'to': Optional[Address20],
'input': Bytes,
'value': Qty,
'type': Qty,
'type': NotRequired[Qty],
'gas': Qty,
'gasPrice': Qty,
'maxFeePerGas': NotRequired[Qty],
Expand Down Expand Up @@ -86,7 +87,7 @@ class Receipt(TypedDict):
gasUsed: Qty
contractAddress: NotRequired[Address20]
logs: list[Log]
type: Qty
type: NotRequired[Qty]
status: NotRequired[Qty]


Expand Down
4 changes: 3 additions & 1 deletion sqa/eth/ingest/tables.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ def __init__(self):
self.gas_used = Column(qty())
self.timestamp = Column(pyarrow.timestamp('s'))
self.base_fee_per_gas = Column(qty())
self.l1_block_number = Column(pyarrow.int32())

def append(self, block: Block) -> None:
self.number.append(qty2int(block['number']))
Expand All @@ -55,6 +56,7 @@ def append(self, block: Block) -> None:
self.gas_limit.append(block['gasLimit'])
self.timestamp.append(qty2int(block['timestamp']))
self.base_fee_per_gas.append(block.get('baseFeePerGas'))
self.l1_block_number.append(block.get('l1BlockNumber') and qty2int(block['l1BlockNumber']))


class TxTableBuilder(TableBuilder):
Expand Down Expand Up @@ -114,7 +116,7 @@ def append(self, tx: Transaction):
self.gas_used.append(receipt['gasUsed'])
self.cumulative_gas_used.append(receipt['cumulativeGasUsed'])
self.effective_gas_price.append(receipt.get('effectiveGasPrice'))
self.type.append(qty2int(receipt['type']))
self.type.append(receipt.get('type') and qty2int(receipt['type']))
self.status.append(qty2int(receipt['status']))
self.contract_address.append(receipt.get('contractAddress'))
else:
Expand Down
74 changes: 74 additions & 0 deletions sqa/eth/ingest/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,80 @@ def transactions_root(transactions: list[Transaction]) -> str:
qty2int(tx['r']),
qty2int(tx['s'])
])
elif tx['type'] == '0x64':
# https://github.com/OffchainLabs/go-ethereum/blob/7503143fd13f73e46a966ea2c42a058af96f7fcf/core/types/arb_types.go#L338
trie[path] = b'\x64' + rlp.encode([
qty2int(tx['chainId']),
decode_hex(tx['requestId']),
decode_hex(tx['from']),
decode_hex(tx['to']),
qty2int(tx['value'])
])
elif tx['type'] == '0x65':
# https://github.com/OffchainLabs/go-ethereum/blob/7503143fd13f73e46a966ea2c42a058af96f7fcf/core/types/arb_types.go#L43
pass
# trie[path] = b'\x65' + rlp.encode([
# qty2int(tx['chainId']),
# decode_hex(tx['from']),
# qty2int(tx['nonce']),
# qty2int(tx['gasPrice']),
# qty2int(tx['gas']),
# decode_hex(tx['to']) if tx['to'] else b'',
# qty2int(tx['value']),
# decode_hex(tx['input'])
# ])
elif tx['type'] == '0x66':
# https://github.com/OffchainLabs/go-ethereum/blob/7503143fd13f73e46a966ea2c42a058af96f7fcf/core/types/arb_types.go#L104
pass
# trie[path] = b'\x66' + rlp.encode([
# qty2int(tx['chainId']),
# decode_hex(tx['requestId']),
# decode_hex(tx['from']),
# qty2int(tx['gasPrice']),
# qty2int(tx['gas']),
# decode_hex(tx['to']) if tx['to'] else b'',
# qty2int(tx['value']),
# decode_hex(tx['input'])
# ])
elif tx['type'] == '0x68':
# https://github.com/OffchainLabs/go-ethereum/blob/7503143fd13f73e46a966ea2c42a058af96f7fcf/core/types/arb_types.go#L161
trie[path] = b'\x68' + rlp.encode([
qty2int(tx['chainId']),
qty2int(tx['nonce']),
decode_hex(tx['from']),
qty2int(tx['gasPrice']),
qty2int(tx['gas']),
decode_hex(tx['to']) if tx['to'] else b'',
qty2int(tx['value']),
decode_hex(tx['input']),
decode_hex(tx['ticketId']),
decode_hex(tx['refundTo']),
qty2int(tx['maxRefund']),
qty2int(tx['submissionFeeRefund']),
])
elif tx['type'] == '0x69':
# https://github.com/OffchainLabs/go-ethereum/blob/7503143fd13f73e46a966ea2c42a058af96f7fcf/core/types/arb_types.go#L232
trie[path] = b'\x69' + rlp.encode([
qty2int(tx['chainId']),
decode_hex(tx['requestId']),
decode_hex(tx['from']),
qty2int(tx['l1BaseFee']),
qty2int(tx['depositValue']),
qty2int(tx['gasPrice']),
qty2int(tx['gas']),
decode_hex(tx['retryTo']) if 'retryTo' in tx else b'',
qty2int(tx['retryValue']),
decode_hex(tx['beneficiary']),
qty2int(tx['maxSubmissionFee']),
decode_hex(tx['refundTo']),
decode_hex(tx['retryData']),
])
elif tx['type'] == '0x6a':
# https://github.com/OffchainLabs/go-ethereum/blob/7503143fd13f73e46a966ea2c42a058af96f7fcf/core/types/arb_types.go#L387
trie[path] = b'\x6a' + rlp.encode([
qty2int(tx['chainId']),
decode_hex(tx['input']),
])
elif tx['type'] == '0x7e':
# https://github.com/ethereum-optimism/optimism/blob/9ff3ebb3983be52c3ca189423ae7b4aec94e0fde/specs/deposits.md#the-deposited-transaction-type
trie[path] = b'\x7e' + rlp.encode([
Expand Down
Loading

0 comments on commit 4b45750

Please sign in to comment.