diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 00000000..4b39ef0c --- /dev/null +++ b/.dockerignore @@ -0,0 +1,10 @@ +target +deployment +configurations +eth_node +fuel_node +helm +compose.yml +.git +*.rs.bk +.dockerignore diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS new file mode 100644 index 00000000..ccbfa9d7 --- /dev/null +++ b/.github/CODEOWNERS @@ -0,0 +1 @@ +* @digorithm @hal3e @MujkicA @segfault-magnet @Salka1988 @Br1ght0ne diff --git a/.github/scripts/verify_chart_version.sh b/.github/scripts/verify_chart_version.sh index dcc16b68..c229527c 100755 --- a/.github/scripts/verify_chart_version.sh +++ b/.github/scripts/verify_chart_version.sh @@ -2,12 +2,12 @@ set -e err() { - echo -e "\e[31m\e[1merror:\e[0m $@" 1>&2; + echo -e "\e[31m\e[1merror:\e[0m $@" 1>&2 } status() { - WIDTH=12 - printf "\e[32m\e[1m%${WIDTH}s\e[0m %s\n" "$1" "$2" + WIDTH=12 + printf "\e[32m\e[1m%${WIDTH}s\e[0m %s\n" "$1" "$2" } # install dasel curl -sSLf "https://github.com/TomWright/dasel/releases/download/v1.24.3/dasel_linux_amd64" -L -o dasel @@ -15,10 +15,10 @@ chmod +x dasel mv ./dasel /usr/local/bin/dasel # check appVersion with crate package metadata HELM_APP_VERSION=$(cat deployment/charts/Chart.yaml | dasel -r yaml 'appVersion') -CRATE_VERSION=$(cat Cargo.toml | dasel -r toml 'package.version') +CRATE_VERSION=$(cat Cargo.toml | dasel -r toml 'workspace.package.version') if [ "$HELM_APP_VERSION" != "$CRATE_VERSION" ]; then - err "crate version $CRATE_VERSION, doesn't match helm app version $HELM_APP_VERSION" - exit 1 + err "crate version $CRATE_VERSION, doesn't match helm app version $HELM_APP_VERSION" + exit 1 else - status "crate version matches helm chart app version $HELM_APP_VERSION" + status "crate version matches helm chart app version $HELM_APP_VERSION" fi diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 99b72565..df8bf36b 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -14,7 +14,7 @@ concurrency: env: DASEL_VERSION: https://github.com/TomWright/dasel/releases/download/v1.24.3/dasel_linux_amd64 - RUST_VERSION: 1.77.0 + RUST_VERSION: 1.77 FUEL_CORE_VERSION: 0.26.0 IMAGE_NAME: ${{ github.repository }} @@ -27,7 +27,7 @@ jobs: - run: | curl -sSLf "$DASEL_VERSION" -L -o dasel && chmod +x dasel mv ./dasel /usr/local/bin/dasel - MIN_VERSION=$(cat Cargo.toml | dasel -r toml 'package.rust-version') + MIN_VERSION=$(cat Cargo.toml | dasel -r toml 'workspace.package.rust-version') RUST_VERSION="${{ env.RUST_VERSION }}" echo "Comparing minimum supported toolchain ($MIN_VERSION) with ci toolchain (RUST_VERSION)" test "$MIN_VERSION" == "$RUST_VERSION" diff --git a/.sqlx/query-6389313221aa32ab73e62573b11d26eb631abbd9e7e114d6ef34207dea8152d3.json b/.sqlx/query-2207b448e46117ad64084feefc49e3f45511e91468b32f5ef0024f92730588a6.json similarity index 77% rename from .sqlx/query-6389313221aa32ab73e62573b11d26eb631abbd9e7e114d6ef34207dea8152d3.json rename to .sqlx/query-2207b448e46117ad64084feefc49e3f45511e91468b32f5ef0024f92730588a6.json index 9a105428..8b32735b 100644 --- a/.sqlx/query-6389313221aa32ab73e62573b11d26eb631abbd9e7e114d6ef34207dea8152d3.json +++ b/.sqlx/query-2207b448e46117ad64084feefc49e3f45511e91468b32f5ef0024f92730588a6.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "UPDATE eth_fuel_block_submission SET completed = true WHERE fuel_block_hash = $1 RETURNING *", + "query": "UPDATE l1_fuel_block_submission SET completed = true WHERE fuel_block_hash = $1 RETURNING *", "describe": { "columns": [ { @@ -36,5 +36,5 @@ false ] }, - "hash": "6389313221aa32ab73e62573b11d26eb631abbd9e7e114d6ef34207dea8152d3" + "hash": "2207b448e46117ad64084feefc49e3f45511e91468b32f5ef0024f92730588a6" } diff --git a/.sqlx/query-9774b1f03a332c68b0c646ba48723205201c7a3faf38bb0362959e8ae1ebf989.json b/.sqlx/query-6f7e6ba876d49bef1bf870514ed38be642af65ed848f53a191ef58c2e02f227c.json similarity index 77% rename from .sqlx/query-9774b1f03a332c68b0c646ba48723205201c7a3faf38bb0362959e8ae1ebf989.json rename to .sqlx/query-6f7e6ba876d49bef1bf870514ed38be642af65ed848f53a191ef58c2e02f227c.json index 3d2fe072..5c81b625 100644 --- a/.sqlx/query-9774b1f03a332c68b0c646ba48723205201c7a3faf38bb0362959e8ae1ebf989.json +++ b/.sqlx/query-6f7e6ba876d49bef1bf870514ed38be642af65ed848f53a191ef58c2e02f227c.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "SELECT * FROM eth_fuel_block_submission ORDER BY fuel_block_height DESC LIMIT 1", + "query": "SELECT * FROM l1_fuel_block_submission ORDER BY fuel_block_height DESC LIMIT 1", "describe": { "columns": [ { @@ -34,5 +34,5 @@ false ] }, - "hash": "9774b1f03a332c68b0c646ba48723205201c7a3faf38bb0362959e8ae1ebf989" + "hash": "6f7e6ba876d49bef1bf870514ed38be642af65ed848f53a191ef58c2e02f227c" } diff --git a/.sqlx/query-a9bb121820d80a7aeadd87ce55ad241dd9a24d99d28ff14db70a93200ff869c6.json b/.sqlx/query-a9bb121820d80a7aeadd87ce55ad241dd9a24d99d28ff14db70a93200ff869c6.json new file mode 100644 index 00000000..27479ad2 --- /dev/null +++ b/.sqlx/query-a9bb121820d80a7aeadd87ce55ad241dd9a24d99d28ff14db70a93200ff869c6.json @@ -0,0 +1,17 @@ +{ + "db_name": "PostgreSQL", + "query": "INSERT INTO l1_fuel_block_submission (fuel_block_hash, fuel_block_height, completed, submittal_height) VALUES ($1, $2, $3, $4)", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Bytea", + "Int8", + "Bool", + "Int8" + ] + }, + "nullable": [] + }, + "hash": "a9bb121820d80a7aeadd87ce55ad241dd9a24d99d28ff14db70a93200ff869c6" +} diff --git a/.sqlx/query-e6619efca0fbd3ae785f2cd13aac5956c3187e2e72e4b873eac1621ff1b69e3e.json b/.sqlx/query-e6619efca0fbd3ae785f2cd13aac5956c3187e2e72e4b873eac1621ff1b69e3e.json deleted file mode 100644 index 6e2691bf..00000000 --- a/.sqlx/query-e6619efca0fbd3ae785f2cd13aac5956c3187e2e72e4b873eac1621ff1b69e3e.json +++ /dev/null @@ -1,17 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "INSERT INTO eth_fuel_block_submission (fuel_block_hash, fuel_block_height, completed, submittal_height) VALUES ($1, $2, $3, $4)", - "describe": { - "columns": [], - "parameters": { - "Left": [ - "Bytea", - "Int8", - "Bool", - "Int8" - ] - }, - "nullable": [] - }, - "hash": "e6619efca0fbd3ae785f2cd13aac5956c3187e2e72e4b873eac1621ff1b69e3e" -} diff --git a/Cargo.lock b/Cargo.lock index 2b0e3702..b262574e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -375,9 +375,9 @@ dependencies = [ [[package]] name = "autocfg" -version = "1.2.0" +version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f1fdabc7756949593fe60f30ec81974b613357de856987752631dea1e3394c80" +checksum = "0c4b4d0bd25bd0b74681c0ad21497610ce1b7c91b1022cd21c80c6fbdd9476b0" [[package]] name = "backtrace" @@ -1183,6 +1183,17 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "56ce8c6da7551ec6c462cbaf3bfbc75131ebbfa1c944aeaa9dab51ca1c5f0c3b" +[[package]] +name = "e2e" +version = "0.4.0" +dependencies = [ + "anyhow", + "eth", + "fuel", + "ports", + "tokio", +] + [[package]] name = "ecdsa" version = "0.16.9" @@ -1306,6 +1317,23 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "eth" +version = "0.4.0" +dependencies = [ + "async-trait", + "ethers", + "futures", + "metrics", + "mockall", + "ports", + "serde_json", + "thiserror", + "tokio", + "tracing", + "url", +] + [[package]] name = "eth-keystore" version = "0.5.0" @@ -1666,6 +1694,18 @@ version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6c2141d6d6c8512188a7891b4b01590a45f6dac67afb4f255c4124dbb86d4eaa" +[[package]] +name = "fuel" +version = "0.4.0" +dependencies = [ + "async-trait", + "fuel-core-client", + "metrics", + "ports", + "tokio", + "url", +] + [[package]] name = "fuel-asm" version = "0.49.0" @@ -1684,21 +1724,16 @@ version = "0.4.0" dependencies = [ "actix-web", "anyhow", - "async-trait", "clap", "config", - "ethers", - "fuel-core-client", - "futures", - "hex", - "impl-tools", - "mockall", - "prometheus", - "rand", + "eth", + "fuel", + "metrics", + "ports", "serde", "serde_json", - "sqlx", - "testcontainers", + "services", + "storage", "thiserror", "tokio", "tokio-util", @@ -2732,6 +2767,13 @@ version = "2.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6c8640c5d730cb13ebd907d8d04b52f55ac9a2eec55b440c8892f40d56c76c1d" +[[package]] +name = "metrics" +version = "0.4.0" +dependencies = [ + "prometheus", +] + [[package]] name = "mime" version = "0.3.17" @@ -2802,16 +2844,6 @@ dependencies = [ "minimal-lexical", ] -[[package]] -name = "nu-ansi-term" -version = "0.46.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "77a8165726e8236064dbb45459242600304b42a5ea24ee2948e18e023bf7ba84" -dependencies = [ - "overload", - "winapi", -] - [[package]] name = "num-bigint" version = "0.4.4" @@ -2857,9 +2889,9 @@ dependencies = [ [[package]] name = "num-iter" -version = "0.1.44" +version = "0.1.45" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d869c01cc0c455284163fd0092f1f93835385ccab5a98a0dcc497b2f8bf055a9" +checksum = "1429034a0490724d0075ebb2bc9e875d6503c3cf69e235a8941aa757d83ef5bf" dependencies = [ "autocfg", "num-integer", @@ -2868,9 +2900,9 @@ dependencies = [ [[package]] name = "num-traits" -version = "0.2.18" +version = "0.2.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "da0df0e5185db44f69b44f26786fe401b6c293d1907744beaa7fa62b2e5a517a" +checksum = "071dfc062690e90b734c0b2273ce72ad0ffa95f0c74596bc250dcfd960262841" dependencies = [ "autocfg", "libm", @@ -2959,12 +2991,6 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "04744f49eae99ab78e0d5c0b603ab218f515ea8cfe5a456d7629ad883a3b6e7d" -[[package]] -name = "overload" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" - [[package]] name = "p256" version = "0.13.2" @@ -3181,6 +3207,20 @@ version = "3.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "db23d408679286588f4d4644f965003d056e3dd5abcaaa938116871d7ce2fee7" +[[package]] +name = "ports" +version = "0.4.0" +dependencies = [ + "async-trait", + "ethers-core", + "futures", + "impl-tools", + "mockall", + "rand", + "serde", + "thiserror", +] + [[package]] name = "postcard" version = "1.0.8" @@ -3326,9 +3366,9 @@ dependencies = [ [[package]] name = "prometheus" -version = "0.13.3" +version = "0.13.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "449811d15fbdf5ceb5c1144416066429cf82316e2ec8ce0c1f6f8a02e7bbcf8c" +checksum = "3d33c28a30771f7f96db69893f78b857f7450d7e0237e9c8fc6427a81bae7ed1" dependencies = [ "cfg-if", "fnv", @@ -3877,11 +3917,11 @@ dependencies = [ [[package]] name = "security-framework" -version = "2.10.0" +version = "2.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "770452e37cad93e0a50d5abc3990d2bc351c36d0328f86cefec2f2fb206eaef6" +checksum = "c627723fd09706bacdb5cf41499e95098555af3c3c29d014dc3c458ef6be11c0" dependencies = [ - "bitflags 1.3.2", + "bitflags 2.5.0", "core-foundation", "core-foundation-sys", "libc", @@ -3890,9 +3930,9 @@ dependencies = [ [[package]] name = "security-framework-sys" -version = "2.10.0" +version = "2.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "41f3cc463c0ef97e11c3461a9d3787412d30e8e7eb907c79180c4a57bf7c04ef" +checksum = "317936bbbd05227752583946b9e66d7ce3b489f84e11a94a510b4437fef407d7" dependencies = [ "core-foundation-sys", "libc", @@ -4012,6 +4052,24 @@ dependencies = [ "syn 2.0.60", ] +[[package]] +name = "services" +version = "0.4.0" +dependencies = [ + "async-trait", + "futures", + "metrics", + "mockall", + "ports", + "rand", + "serde", + "storage", + "thiserror", + "tokio", + "tokio-util", + "tracing", +] + [[package]] name = "sha1" version = "0.10.6" @@ -4347,6 +4405,22 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" +[[package]] +name = "storage" +version = "0.4.0" +dependencies = [ + "async-trait", + "hex", + "ports", + "rand", + "serde", + "sqlx", + "storage", + "testcontainers", + "thiserror", + "tokio", +] + [[package]] name = "stringprep" version = "0.1.4" @@ -4720,16 +4794,15 @@ dependencies = [ [[package]] name = "tokio-util" -version = "0.7.10" +version = "0.7.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5419f34732d9eb6ee4c3578b7989078579b7f039cbbb9ca2c4da015749371e15" +checksum = "9cf6b47b3771c49ac75ad09a6162f53ad4b8088b76ac60e8ec1455b31a189fe1" dependencies = [ "bytes", "futures-core", "futures-sink", "pin-project-lite", "tokio", - "tracing", ] [[package]] @@ -4796,7 +4869,7 @@ dependencies = [ "serde", "serde_spanned", "toml_datetime", - "winnow 0.6.7", + "winnow 0.6.8", ] [[package]] @@ -4870,17 +4943,6 @@ dependencies = [ "tracing", ] -[[package]] -name = "tracing-log" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ee855f1f400bd0e5c02d150ae5de3840039a3f54b025156404e34c23c03f47c3" -dependencies = [ - "log", - "once_cell", - "tracing-core", -] - [[package]] name = "tracing-serde" version = "0.1.3" @@ -4897,14 +4959,11 @@ version = "0.3.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ad0f048c97dbd9faa9b7df56362b8ebcaa52adb06b498c050d2f4e32f90a7a8b" dependencies = [ - "nu-ansi-term", "serde", "serde_json", "sharded-slab", - "smallvec", "thread_local", "tracing-core", - "tracing-log", "tracing-serde", ] @@ -5394,9 +5453,9 @@ dependencies = [ [[package]] name = "winnow" -version = "0.6.7" +version = "0.6.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "14b9415ee827af173ebb3f15f9083df5a122eb93572ec28741fb153356ea2578" +checksum = "c3c52e9c97a68071b23e836c9380edae937f17b9c4667bd021973efc689f618d" dependencies = [ "memchr", ] @@ -5441,18 +5500,18 @@ dependencies = [ [[package]] name = "zerocopy" -version = "0.7.32" +version = "0.7.33" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "74d4d3961e53fa4c9a25a8637fc2bfaf2595b3d3ae34875568a5cf64787716be" +checksum = "087eca3c1eaf8c47b94d02790dd086cd594b912d2043d4de4bfdd466b3befb7c" dependencies = [ "zerocopy-derive", ] [[package]] name = "zerocopy-derive" -version = "0.7.32" +version = "0.7.33" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9ce1b18ccd8e73a9321186f97e46f9f04b778851177567b1975109d26a08d2a6" +checksum = "6f4b6c273f496d8fd4eaf18853e6b448760225dc030ff2c485a786859aea6393" dependencies = [ "proc-macro2", "quote", diff --git a/Cargo.toml b/Cargo.toml index 0cc9a967..68477a11 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,55 +1,55 @@ -[package] +[workspace] +resolver = "2" +members = [ + "committer", + "e2e", + "packages/eth", + "packages/fuel", + "packages/metrics", + "packages/ports", + "packages/services", + "packages/storage", +] + +[workspace.package] +version = "0.4.0" authors = ["Fuel Labs "] edition = "2021" homepage = "https://fuel.network/" license = "Apache-2.0" repository = "https://github.com/FuelLabs/fuel-block-committer" -rust-version = "1.77.0" -version = "0.4.0" -name = "fuel-block-committer" +rust-version = "1.77" +publish = false -[[test]] -harness = true -name = "e2e" -path = "tests/harness.rs" +[workspace.dependencies] +eth = { path = "./packages/eth", default-features = false } +fuel = { path = "./packages/fuel", default-features = false } +metrics = { path = "./packages/metrics", default-features = false } +ports = { path = "./packages/ports", default-features = false } +storage = { path = "./packages/storage", default-features = false } +services = { path = "./packages/services", default-features = false } -[dependencies] -actix-web = { version = "4", default-features = false, features = ["macros"] } +actix-web = { version = "4", default-features = false } +anyhow = { version = "1.0", default-features = false } async-trait = { version = "0.1", default-features = false } -clap = { version = "4.5", features = ["derive"] } -config = { version = "0.14", default-features = false, features = [ - "toml", - "async", -] } -ethers = { version = "2.0", features = ["ws"], default-features = false } +clap = { version = "4.5" } +config = { version = "0.14", default-features = false } +ethers = { version = "2.0", default-features = false } +ethers-core = { version = "2.0", default-features = false } fuel-core-client = { version = "0.26", default-features = false } futures = { version = "0.3", default-features = false } -hex = "0.4.3" -impl-tools = "0.10.0" +hex = { version = "0.4", default-features = false } +impl-tools = { version = "0.10.0", default-features = false } +mockall = { version = "0.12", default-features = false } prometheus = { version = "0.13", default-features = false } -rand = { version = "0.8", default-features = false, features = ["std"] } -serde = { version = "1.0", features = ["derive"], default-features = false } +rand = { version = "0.8", default-features = false } +serde = { version = "1.0", default-features = false } serde_json = { version = "1.0", default-features = false } -sqlx = { version = "0.7.4", features = [ - "postgres", - "runtime-tokio", - "migrate", - "macros", -], default-features = false } +sqlx = { version = "0.7.4", default-features = false } +testcontainers = { version = "0.16", default-features = false } thiserror = { version = "1.0", default-features = false } -tokio = { version = "1.28", features = [ - "rt-multi-thread", - "rt", - "macros", -], default-features = false } +tokio = { version = "1.37", default-features = false } tokio-util = { version = "0.7", default-features = false } tracing = { version = "0.1", default-features = false } -tracing-subscriber = { version = "0.3", features = ["json"] } +tracing-subscriber = { version = "0.3", default-features = false } url = { version = "2.3", default-features = false } - -[dev-dependencies] -anyhow = { version = "1.0", default-features = false } -mockall = { version = "0.12", default-features = false } -testcontainers = { version = "0.16.6", default-features = false } -#TODO: Once fuels-rs catches up tests will be reenabled -#fuels-test-helpers = "0.57" diff --git a/committer/Cargo.toml b/committer/Cargo.toml new file mode 100644 index 00000000..a3a9befa --- /dev/null +++ b/committer/Cargo.toml @@ -0,0 +1,34 @@ +[package] +name = "fuel-block-committer" +authors = { workspace = true } +edition = { workspace = true } +homepage = { workspace = true } +license = { workspace = true } +repository = { workspace = true } +version = { workspace = true } +publish = { workspace = true } +rust-version = { workspace = true } + +[dependencies] +actix-web = { workspace = true, features = ["macros"] } +clap = { workspace = true, features = ["derive"] } +config = { workspace = true, features = ["toml", "async"] } +eth = { workspace = true } +fuel = { workspace = true } +metrics = { workspace = true } +ports = { workspace = true } +serde = { workspace = true } +serde_json = { workspace = true } +services = { workspace = true } +storage = { workspace = true } +thiserror = { workspace = true } +tokio = { workspace = true, features = ["rt-multi-thread", "macros"] } +tokio-util = { workspace = true } +tracing = { workspace = true } +tracing-subscriber = { workspace = true, features = ["fmt", "json"] } +url = { workspace = true } + +[dev-dependencies] +anyhow = { workspace = true } +ports = { workspace = true, features = ["test-helpers"] } +storage = { workspace = true, features = ["test-helpers"] } diff --git a/src/api.rs b/committer/src/api.rs similarity index 89% rename from src/api.rs rename to committer/src/api.rs index ee2ad0e1..9deb3c47 100644 --- a/src/api.rs +++ b/committer/src/api.rs @@ -1,16 +1,19 @@ use std::sync::Arc; +use ::metrics::{ + prometheus::{self, Encoder, Registry, TextEncoder}, + HealthChecker, +}; use actix_web::{ error::InternalError, get, http::StatusCode, web, App, HttpResponse, HttpServer, Responder, }; -use prometheus::{Encoder, Registry, TextEncoder}; +use ports::storage::Storage; +use services::{HealthReporter, StatusReporter}; use crate::{ - adapters::storage::Storage, config::Config, errors::{Error, Result}, - services::{HealthReporter, StatusReporter}, - telemetry::HealthChecker, + Database, }; pub async fn launch_api_server( @@ -53,7 +56,7 @@ async fn health(data: web::Data>) -> impl Responder { } #[get("/status")] -async fn status(data: web::Data>) -> impl Responder { +async fn status(data: web::Data>>) -> impl Responder { let report = data.current_status().await?; Result::Ok(web::Json(report)) diff --git a/src/config.rs b/committer/src/config.rs similarity index 97% rename from src/config.rs rename to committer/src/config.rs index d37a11a6..3d582309 100644 --- a/src/config.rs +++ b/committer/src/config.rs @@ -1,12 +1,11 @@ use std::{net::Ipv4Addr, num::NonZeroU32, path::PathBuf, str::FromStr, time::Duration}; use clap::{command, Parser}; -use ethers::types::{Address, Chain}; +use eth::{Address, Chain}; use serde::Deserialize; +use storage::DbConfig; use url::Url; -use crate::adapters::storage::postgresql::DbConfig; - #[derive(Debug, Clone, Deserialize)] pub struct Config { pub eth: EthConfig, diff --git a/committer/src/errors.rs b/committer/src/errors.rs new file mode 100644 index 00000000..da6716b4 --- /dev/null +++ b/committer/src/errors.rs @@ -0,0 +1,73 @@ +use actix_web::ResponseError; +use tokio::task::JoinError; + +#[derive(thiserror::Error, Debug)] +pub enum Error { + #[error("{0}")] + Other(String), + #[error("Network Error: {0}")] + Network(String), + #[error("Storage error: {0}")] + Storage(String), +} + +impl From for Error { + fn from(value: serde_json::Error) -> Self { + Self::Other(value.to_string()) + } +} + +impl From for Error { + fn from(error: std::io::Error) -> Self { + Self::Other(error.to_string()) + } +} + +impl From for Error { + fn from(error: JoinError) -> Self { + Self::Other(error.to_string()) + } +} + +impl From for Error { + fn from(error: ports::storage::Error) -> Self { + Self::Storage(error.to_string()) + } +} + +impl From for Error { + fn from(value: ports::l1::Error) -> Self { + match value { + ports::l1::Error::Network(e) => Self::Network(e), + _ => Self::Other(value.to_string()), + } + } +} + +impl From for Error { + fn from(value: ports::fuel::Error) -> Self { + match value { + ports::fuel::Error::Network(e) => Self::Network(e), + } + } +} + +impl From for Error { + fn from(error: services::Error) -> Self { + match error { + services::Error::Network(e) => Self::Network(e), + services::Error::Storage(e) => Self::Storage(e), + services::Error::Other(e) => Self::Other(e), + } + } +} + +impl From for Error { + fn from(error: config::ConfigError) -> Self { + Self::Other(error.to_string()) + } +} + +impl ResponseError for Error {} + +pub type Result = std::result::Result; diff --git a/src/main.rs b/committer/src/main.rs similarity index 76% rename from src/main.rs rename to committer/src/main.rs index a7fbe6ce..a2ae945d 100644 --- a/src/main.rs +++ b/committer/src/main.rs @@ -1,22 +1,25 @@ #![deny(unused_crate_dependencies)] -mod adapters; mod api; mod config; mod errors; -mod services; mod setup; -mod telemetry; use api::launch_api_server; use config::InternalConfig; use errors::Result; -use prometheus::Registry; -use setup::helpers::{ - create_eth_adapter, setup_logger, setup_storage, shut_down, spawn_block_watcher, - spawn_eth_committer_and_listener, spawn_wallet_balance_tracker, +use metrics::prometheus::Registry; +use setup::{ + create_l1_adapter, setup_logger, setup_storage, spawn_block_watcher, + spawn_l1_committer_and_listener, spawn_wallet_balance_tracker, }; use tokio_util::sync::CancellationToken; +use crate::setup::shut_down; + +pub type L1 = eth::WebsocketClient; +pub type Database = storage::Postgres; +pub type FuelApi = fuel::HttpClient; + #[tokio::main] async fn main() -> Result<()> { setup_logger(); @@ -24,7 +27,6 @@ async fn main() -> Result<()> { let config = config::parse()?; let storage = setup_storage(&config).await?; - storage.migrate().await?; let internal_config = InternalConfig::default(); let cancel_token = CancellationToken::new(); @@ -40,17 +42,16 @@ async fn main() -> Result<()> { ); let (ethereum_rpc, eth_health_check) = - create_eth_adapter(&config, &internal_config, &metrics_registry).await?; + create_l1_adapter(&config, &internal_config, &metrics_registry).await?; let wallet_balance_tracker_handle = spawn_wallet_balance_tracker( - &config, &internal_config, &metrics_registry, ethereum_rpc.clone(), cancel_token.clone(), ); - let (committer_handle, listener_handle) = spawn_eth_committer_and_listener( + let (committer_handle, listener_handle) = spawn_l1_committer_and_listener( &internal_config, rx_fuel_block, ethereum_rpc, diff --git a/src/setup/helpers.rs b/committer/src/setup.rs similarity index 72% rename from src/setup/helpers.rs rename to committer/src/setup.rs index 38aeae74..a7d40e46 100644 --- a/src/setup/helpers.rs +++ b/committer/src/setup.rs @@ -1,27 +1,22 @@ use std::time::Duration; -use prometheus::Registry; +use metrics::{prometheus::Registry, HealthChecker, RegistersMetrics}; +use ports::{storage::Storage, types::FuelBlock}; +use services::{BlockCommitter, BlockWatcher, CommitListener, Runner, WalletBalanceTracker}; use tokio::{sync::mpsc::Receiver, task::JoinHandle}; use tokio_util::sync::CancellationToken; use tracing::{error, info}; use crate::{ - adapters::{ - ethereum_adapter::{EthereumWs, MonitoredEthAdapter}, - fuel_adapter::{FuelBlock, FuelClient}, - runner::Runner, - storage::{postgresql::Postgres, Storage}, - }, config::{Config, InternalConfig}, errors::Result, - services::{BlockCommitter, BlockWatcher, CommitListener, WalletBalanceTracker}, - telemetry::{HealthChecker, RegistersMetrics}, + Database, FuelApi, L1, }; pub fn spawn_block_watcher( config: &Config, internal_config: &InternalConfig, - storage: Postgres, + storage: Database, registry: &Registry, cancel_token: CancellationToken, ) -> ( @@ -45,13 +40,12 @@ pub fn spawn_block_watcher( } pub fn spawn_wallet_balance_tracker( - config: &Config, internal_config: &InternalConfig, registry: &Registry, - ethereum_rpc: MonitoredEthAdapter, + l1: L1, cancel_token: CancellationToken, ) -> tokio::task::JoinHandle<()> { - let wallet_balance_tracker = WalletBalanceTracker::new(ethereum_rpc, &config.eth.wallet_key); + let wallet_balance_tracker = WalletBalanceTracker::new(l1); wallet_balance_tracker.register_metrics(registry); @@ -63,18 +57,17 @@ pub fn spawn_wallet_balance_tracker( ) } -pub fn spawn_eth_committer_and_listener( +pub fn spawn_l1_committer_and_listener( internal_config: &InternalConfig, rx_fuel_block: Receiver, - ethereum_rpc: MonitoredEthAdapter, - storage: Postgres, + l1: L1, + storage: Database, registry: &Registry, cancel_token: CancellationToken, ) -> (tokio::task::JoinHandle<()>, tokio::task::JoinHandle<()>) { - let committer_handler = - create_block_committer(rx_fuel_block, ethereum_rpc.clone(), storage.clone()); + let committer_handler = create_block_committer(rx_fuel_block, l1.clone(), storage.clone()); - let commit_listener = CommitListener::new(ethereum_rpc, storage, cancel_token.clone()); + let commit_listener = CommitListener::new(l1, storage, cancel_token.clone()); commit_listener.register_metrics(registry); let listener_handle = schedule_polling( @@ -89,10 +82,10 @@ pub fn spawn_eth_committer_and_listener( fn create_block_committer( rx_fuel_block: Receiver, - ethereum_rpc: MonitoredEthAdapter, + l1: L1, storage: impl Storage + 'static, ) -> tokio::task::JoinHandle<()> { - let mut block_committer = BlockCommitter::new(rx_fuel_block, ethereum_rpc, storage); + let mut block_committer = BlockCommitter::new(rx_fuel_block, l1, storage); tokio::spawn(async move { block_committer .run() @@ -101,27 +94,26 @@ fn create_block_committer( }) } -pub async fn create_eth_adapter( +pub async fn create_l1_adapter( config: &Config, internal_config: &InternalConfig, registry: &Registry, -) -> Result<(MonitoredEthAdapter, HealthChecker)> { - let ethereum_rpc = EthereumWs::connect( +) -> Result<(L1, HealthChecker)> { + let l1 = L1::connect( &config.eth.rpc, config.eth.chain_id, config.eth.state_contract_address, &config.eth.wallet_key, config.eth.commit_interval, + internal_config.eth_errors_before_unhealthy, ) .await?; - let monitored = - MonitoredEthAdapter::new(ethereum_rpc, internal_config.eth_errors_before_unhealthy); - monitored.register_metrics(registry); + l1.register_metrics(registry); - let health_check = monitored.connection_health_checker(); + let health_check = l1.connection_health_checker(); - Ok((monitored, health_check)) + Ok((l1, health_check)) } fn schedule_polling( @@ -151,8 +143,8 @@ fn create_fuel_adapter( config: &Config, internal_config: &InternalConfig, registry: &Registry, -) -> (FuelClient, HealthChecker) { - let fuel_adapter = FuelClient::new( +) -> (FuelApi, HealthChecker) { + let fuel_adapter = FuelApi::new( &config.fuel.graphql_endpoint, internal_config.fuel_errors_before_unhealthy, ); @@ -166,9 +158,9 @@ fn create_fuel_adapter( fn create_block_watcher( config: &Config, registry: &Registry, - fuel_adapter: FuelClient, - storage: Postgres, -) -> (BlockWatcher, Receiver) { + fuel_adapter: FuelApi, + storage: Database, +) -> (BlockWatcher, Receiver) { let (tx_fuel_block, rx_fuel_block) = tokio::sync::mpsc::channel(100); let block_watcher = BlockWatcher::new( config.eth.commit_interval, @@ -184,15 +176,17 @@ fn create_block_watcher( pub fn setup_logger() { tracing_subscriber::fmt() .with_writer(std::io::stderr) - .with_ansi(false) .with_level(true) .with_line_number(true) .json() .init(); } -pub async fn setup_storage(config: &Config) -> Result { - Ok(Postgres::connect(&config.app.db).await?) +pub async fn setup_storage(config: &Config) -> Result { + let postgres = Database::connect(&config.app.db).await?; + postgres.migrate().await?; + + Ok(postgres) } pub async fn shut_down( @@ -201,7 +195,7 @@ pub async fn shut_down( wallet_balance_tracker_handle: JoinHandle<()>, committer_handle: JoinHandle<()>, listener_handle: JoinHandle<()>, - storage: Postgres, + storage: Database, ) -> Result<()> { cancel_token.cancel(); diff --git a/deployment/Dockerfile b/deployment/Dockerfile index 76f5866b..00258dbe 100644 --- a/deployment/Dockerfile +++ b/deployment/Dockerfile @@ -6,16 +6,16 @@ WORKDIR /build/ FROM chef as planner ENV CARGO_NET_GIT_FETCH_WITH_CLI=true COPY . . -RUN cargo chef prepare --recipe-path recipe.json +RUN cargo chef prepare --recipe-path recipe.json --bin fuel-block-committer FROM chef as builder COPY --from=planner /build/recipe.json recipe.json # Build our project dependecies, not our application! -RUN cargo chef cook --release --recipe-path recipe.json +RUN cargo chef cook --release --recipe-path recipe.json --bin fuel-block-committer # Up to this point, if our dependency tree stays the same, # all layers should be cached. COPY . . -RUN cargo build --release +RUN cargo build --release --bin fuel-block-committer # Stage 2: Run FROM ubuntu:22.04 as run diff --git a/e2e/Cargo.toml b/e2e/Cargo.toml new file mode 100644 index 00000000..fff6e70d --- /dev/null +++ b/e2e/Cargo.toml @@ -0,0 +1,17 @@ +[package] +name = "e2e" +authors = { workspace = true } +edition = { workspace = true } +homepage = { workspace = true } +license = { workspace = true } +repository = { workspace = true } +version = { workspace = true } +publish = { workspace = true } +rust-version = { workspace = true } + +[dev-dependencies] +anyhow = { workspace = true } +eth = { workspace = true, features = ["test-helpers"] } +fuel = { workspace = true, features = ["test-helpers"] } +ports = { workspace = true, features = ["fuel", "l1"] } +tokio = { workspace = true, features = ["macros", "rt-multi-thread"] } diff --git a/e2e/src/lib.rs b/e2e/src/lib.rs new file mode 100644 index 00000000..e6087d94 --- /dev/null +++ b/e2e/src/lib.rs @@ -0,0 +1,38 @@ +#[cfg(test)] +mod tests { + use std::time::Duration; + + use anyhow::Result; + use eth::{Chain, WebsocketClient}; + use fuel::HttpClient; + use ports::fuel::Api; + + const FUEL_NODE_PORT: u16 = 4000; + + #[tokio::test(flavor = "multi_thread")] + async fn submitted_correct_block_and_was_finalized() -> Result<()> { + let fuel_node_address = format!("http://localhost:{FUEL_NODE_PORT}"); + let provider = HttpClient::new(&fuel_node_address.parse()?, 10); + + let fuel_contract = WebsocketClient::connect( + &"ws://localhost:8089".parse()?, + Chain::AnvilHardhat, + "0xdAad669b06d79Cb48C8cfef789972436dBe6F24d".parse()?, + "0x9e56ccf010fa4073274b8177ccaad46fbaf286645310d03ac9bb6afa922a7c36", + 3.try_into()?, + 10, + ) + .await?; + + provider.produce_blocks(3).await?; + + // time enough to fwd the block to ethereum and for the TIME_TO_FINALIZE (1s) to elapse + tokio::time::sleep(Duration::from_secs(5)).await; + + let latest_block = provider.latest_block().await?; + + assert!(fuel_contract.finalized(latest_block).await?); + + Ok(()) + } +} diff --git a/migrations/0001_initial.down.sql b/migrations/0001_initial.down.sql deleted file mode 100644 index 6a564cc0..00000000 --- a/migrations/0001_initial.down.sql +++ /dev/null @@ -1 +0,0 @@ -DROP TABLE IF EXISTS eth_fuel_block_submission; diff --git a/packages/eth/Cargo.toml b/packages/eth/Cargo.toml new file mode 100644 index 00000000..e5360619 --- /dev/null +++ b/packages/eth/Cargo.toml @@ -0,0 +1,29 @@ +[package] +name = "eth" +authors = { workspace = true } +edition = { workspace = true } +homepage = { workspace = true } +license = { workspace = true } +repository = { workspace = true } +version = { workspace = true } +publish = { workspace = true } +rust-version = { workspace = true } + +[dependencies] +async-trait = { workspace = true } +ethers = { workspace = true, features = ["ws"] } +futures = { workspace = true } +metrics = { workspace = true } +ports = { workspace = true, features = ["l1"] } +serde_json = { workspace = true } +thiserror = { workspace = true } +tracing = { workspace = true } +url = { workspace = true } + +[dev-dependencies] +mockall = { workspace = true } +ports = { workspace = true, features = ["l1"] } +tokio = { workspace = true, features = ["macros"] } + +[features] +test-helpers = [] diff --git a/packages/eth/src/error.rs b/packages/eth/src/error.rs new file mode 100644 index 00000000..272e700e --- /dev/null +++ b/packages/eth/src/error.rs @@ -0,0 +1,46 @@ +use ethers::{ + prelude::{ContractError, SignerMiddleware}, + providers::{Provider, Ws}, + signers::LocalWallet, +}; + +#[derive(Debug, thiserror::Error)] +pub enum Error { + #[error("wallet error: {0}")] + Wallet(#[from] ethers::signers::WalletError), + #[error("network error: {0}")] + Network(String), + #[error("other error: {0}")] + Other(String), +} + +impl From for Error { + fn from(err: ethers::providers::ProviderError) -> Self { + Self::Network(err.to_string()) + } +} + +pub type ContractErrorType = + ethers::contract::ContractError, LocalWallet>>; + +impl From for Error { + fn from(value: ContractErrorType) -> Self { + match value { + ContractError::MiddlewareError { e } => Self::Other(e.to_string()), + ContractError::ProviderError { e } => Self::Network(e.to_string()), + _ => Self::Other(value.to_string()), + } + } +} + +pub type Result = std::result::Result; + +impl From for ports::l1::Error { + fn from(err: Error) -> Self { + match err { + Error::Network(err) => Self::Network(err), + Error::Other(err) => Self::Other(err), + Error::Wallet(err) => Self::Other(err.to_string()), + } + } +} diff --git a/packages/eth/src/lib.rs b/packages/eth/src/lib.rs new file mode 100644 index 00000000..528ab67c --- /dev/null +++ b/packages/eth/src/lib.rs @@ -0,0 +1,52 @@ +#![deny(unused_crate_dependencies)] +use std::pin::Pin; + +use async_trait::async_trait; +use ethers::types::U256; +use futures::{stream::TryStreamExt, Stream}; +use ports::types::{FuelBlockCommittedOnL1, L1Height}; +use websocket::EthEventStreamer; + +mod error; +mod metrics; +mod websocket; + +pub use ethers::types::{Address, Chain}; +pub use websocket::WebsocketClient; + +#[async_trait] +impl ports::l1::Contract for WebsocketClient { + async fn submit(&self, block: ports::types::FuelBlock) -> ports::l1::Result<()> { + self.submit(block).await + } + + fn event_streamer(&self, height: L1Height) -> Box { + let stream = self.event_streamer(height.into()); + Box::new(stream) + } +} + +#[async_trait] +impl ports::l1::Api for WebsocketClient { + async fn balance(&self) -> ports::l1::Result { + Ok(self.balance().await?) + } + + async fn get_block_number(&self) -> ports::l1::Result { + let block_num = self.get_block_number().await?; + let height = L1Height::try_from(block_num)?; + Ok(height) + } +} + +#[async_trait::async_trait] +impl ports::l1::EventStreamer for EthEventStreamer { + async fn establish_stream( + &self, + ) -> ports::l1::Result< + Pin> + '_ + Send>>, + > { + let stream = self.establish_stream().await?.map_err(Into::into); + Ok(Box::pin(stream)) + } +} diff --git a/packages/eth/src/metrics.rs b/packages/eth/src/metrics.rs new file mode 100644 index 00000000..bc38957a --- /dev/null +++ b/packages/eth/src/metrics.rs @@ -0,0 +1,27 @@ +use ::metrics::{ + prometheus::{core::Collector, IntCounter, Opts}, + RegistersMetrics, +}; + +#[derive(Clone)] +pub struct Metrics { + pub(crate) eth_network_errors: IntCounter, +} + +impl RegistersMetrics for Metrics { + fn metrics(&self) -> Vec> { + vec![Box::new(self.eth_network_errors.clone())] + } +} + +impl Default for Metrics { + fn default() -> Self { + let eth_network_errors = IntCounter::with_opts(Opts::new( + "eth_network_errors", + "Number of network errors encountered while running Ethereum RPCs.", + )) + .expect("eth_network_errors metric to be correctly configured"); + + Self { eth_network_errors } + } +} diff --git a/packages/eth/src/websocket.rs b/packages/eth/src/websocket.rs new file mode 100644 index 00000000..08b29249 --- /dev/null +++ b/packages/eth/src/websocket.rs @@ -0,0 +1,84 @@ +use std::num::NonZeroU32; + +use ::metrics::{prometheus::core::Collector, HealthChecker, RegistersMetrics}; +use ethers::types::{Address, Chain}; +use ports::{ + l1::Result, + types::{FuelBlock, U256}, +}; +use url::Url; + +pub use self::event_streamer::EthEventStreamer; +use self::{ + connection::WsConnection, + health_tracking_middleware::{EthApi, HealthTrackingMiddleware}, +}; + +mod connection; +mod event_streamer; +mod health_tracking_middleware; + +#[derive(Clone)] +pub struct WebsocketClient { + inner: HealthTrackingMiddleware, +} + +impl WebsocketClient { + pub async fn connect( + url: &Url, + chain_id: Chain, + contract_address: Address, + wallet_key: &str, + commit_interval: NonZeroU32, + unhealthy_after_n_errors: usize, + ) -> ports::l1::Result { + let provider = + WsConnection::connect(url, chain_id, contract_address, wallet_key, commit_interval) + .await?; + + Ok(Self { + inner: HealthTrackingMiddleware::new(provider, unhealthy_after_n_errors), + }) + } + + #[must_use] + pub fn connection_health_checker(&self) -> HealthChecker { + self.inner.connection_health_checker() + } + + pub(crate) fn event_streamer(&self, eth_block_height: u64) -> EthEventStreamer { + self.inner.event_streamer(eth_block_height) + } + + pub(crate) async fn submit(&self, block: FuelBlock) -> Result<()> { + Ok(self.inner.submit(block).await?) + } + + pub(crate) async fn get_block_number(&self) -> Result { + Ok(self.inner.get_block_number().await?) + } + + pub(crate) async fn balance(&self) -> Result { + Ok(self.inner.balance().await?) + } + + #[cfg(feature = "test-helpers")] + pub async fn finalized(&self, block: FuelBlock) -> Result { + Ok(self.inner.finalized(block).await?) + } + + #[cfg(feature = "test-helpers")] + pub async fn block_hash_at_commit_height(&self, commit_height: u32) -> Result<[u8; 32]> { + Ok(self + .inner + .block_hash_at_commit_height(commit_height) + .await?) + } +} + +// User responsible for registering any metrics T might have +impl RegistersMetrics for WebsocketClient { + fn metrics(&self) -> Vec> { + self.inner.metrics() + } +} diff --git a/packages/eth/src/websocket/connection.rs b/packages/eth/src/websocket/connection.rs new file mode 100644 index 00000000..611f8ca4 --- /dev/null +++ b/packages/eth/src/websocket/connection.rs @@ -0,0 +1,137 @@ +use std::{num::NonZeroU32, str::FromStr, sync::Arc}; + +use ethers::{ + prelude::{abigen, SignerMiddleware}, + providers::{Middleware, Provider, Ws}, + signers::{LocalWallet, Signer}, + types::{Address, Chain, H160, U256, U64}, +}; +use ports::types::FuelBlock; +use serde_json::Value; +use url::Url; + +use super::{event_streamer::EthEventStreamer, health_tracking_middleware::EthApi}; +use crate::error::Result; + +abigen!( + FUEL_STATE_CONTRACT, + r#"[ + function commit(bytes32 blockHash, uint256 commitHeight) external whenNotPaused + event CommitSubmitted(uint256 indexed commitHeight, bytes32 blockHash) + function finalized(bytes32 blockHash, uint256 blockHeight) external view whenNotPaused returns (bool) + function blockHashAtCommit(uint256 commitHeight) external view returns (bytes32) + ]"#, +); + +#[derive(Clone)] +pub struct WsConnection { + provider: Provider, + contract: FUEL_STATE_CONTRACT, LocalWallet>>, + commit_interval: NonZeroU32, + address: H160, +} + +#[async_trait::async_trait] +impl EthApi for WsConnection { + async fn submit(&self, block: FuelBlock) -> Result<()> { + let commit_height = Self::calculate_commit_height(block.height, self.commit_interval); + let contract_call = self.contract.commit(block.hash, commit_height); + let tx = contract_call.send().await?; + + tracing::info!("tx: {} submitted", tx.tx_hash()); + + Ok(()) + } + + async fn get_block_number(&self) -> Result { + // if provider.get_block_number is used the outgoing JSON RPC request would have the + // 'params' field set as `params: null`. This is accepted by Anvil but rejected by hardhat. + // By passing a preconstructed serde_json Value::Array it will cause params to be defined + // as `params: []` which is acceptable by both Anvil and Hardhat. + let response = self + .provider + .request::("eth_blockNumber", Value::Array(vec![])) + .await?; + Ok(response.as_u64()) + } + + async fn balance(&self) -> Result { + let address = self.address; + Ok(self.provider.get_balance(address, None).await?) + } + + fn event_streamer(&self, eth_block_height: u64) -> EthEventStreamer { + let events = self + .contract + .event::() + .from_block(eth_block_height); + + EthEventStreamer::new(events) + } + + #[cfg(feature = "test-helpers")] + async fn finalized(&self, block: FuelBlock) -> Result { + Ok(self + .contract + .finalized(block.hash, block.height.into()) + .call() + .await?) + } + + #[cfg(feature = "test-helpers")] + async fn block_hash_at_commit_height(&self, commit_height: u32) -> Result<[u8; 32]> { + Ok(self + .contract + .block_hash_at_commit(commit_height.into()) + .call() + .await?) + } +} + +impl WsConnection { + pub async fn connect( + url: &Url, + chain_id: Chain, + contract_address: Address, + wallet_key: &str, + commit_interval: NonZeroU32, + ) -> Result { + let provider = Provider::::connect(url.to_string()).await?; + + let wallet = LocalWallet::from_str(wallet_key)?.with_chain_id(chain_id); + let address = wallet.address(); + + let signer = SignerMiddleware::new(provider.clone(), wallet); + + let contract_address = Address::from_slice(contract_address.as_ref()); + let contract = FUEL_STATE_CONTRACT::new(contract_address, Arc::new(signer)); + + Ok(Self { + provider, + contract, + commit_interval, + address, + }) + } + + pub(crate) fn calculate_commit_height(block_height: u32, commit_interval: NonZeroU32) -> U256 { + (block_height / commit_interval).into() + } + + async fn _balance(&self, address: H160) -> Result { + Ok(self.provider.get_balance(address, None).await?) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn calculates_correctly_the_commit_height() { + assert_eq!( + WsConnection::calculate_commit_height(10, 3.try_into().unwrap()), + 3.into() + ); + } +} diff --git a/packages/eth/src/websocket/event_streamer.rs b/packages/eth/src/websocket/event_streamer.rs new file mode 100644 index 00000000..a3629b4d --- /dev/null +++ b/packages/eth/src/websocket/event_streamer.rs @@ -0,0 +1,45 @@ +use std::sync::Arc; + +use ethers::{ + prelude::{k256::ecdsa::SigningKey, Event, SignerMiddleware}, + providers::{Provider, Ws}, + signers::Wallet, +}; +use futures::{Stream, TryStreamExt}; +use ports::types::FuelBlockCommittedOnL1; + +use super::connection::CommitSubmittedFilter; +use crate::error::Result; + +type EthStreamInitializer = Event< + Arc, Wallet>>, + SignerMiddleware, Wallet>, + CommitSubmittedFilter, +>; + +pub struct EthEventStreamer { + events: EthStreamInitializer, +} + +impl EthEventStreamer { + pub fn new(events: EthStreamInitializer) -> Self { + Self { events } + } + + pub(crate) async fn establish_stream( + &self, + ) -> Result> + Send + '_> { + let events = self.events.subscribe().await?; + let stream = events + .map_ok(|event| { + let fuel_block_hash = event.block_hash; + let commit_height = event.commit_height; + FuelBlockCommittedOnL1 { + fuel_block_hash, + commit_height, + } + }) + .map_err(Into::into); + Ok(stream) + } +} diff --git a/src/adapters/ethereum_adapter/monitored_adapter.rs b/packages/eth/src/websocket/health_tracking_middleware.rs similarity index 66% rename from src/adapters/ethereum_adapter/monitored_adapter.rs rename to packages/eth/src/websocket/health_tracking_middleware.rs index 7ece8825..b6ea61e2 100644 --- a/src/adapters/ethereum_adapter/monitored_adapter.rs +++ b/packages/eth/src/websocket/health_tracking_middleware.rs @@ -1,24 +1,35 @@ -use ethers::types::{H160, U256}; -use prometheus::{IntCounter, Opts}; +use ::metrics::{ + prometheus::core::Collector, ConnectionHealthTracker, HealthChecker, RegistersMetrics, +}; +use ports::types::{FuelBlock, U256}; -use super::EthHeight; use crate::{ - adapters::{ - ethereum_adapter::{EthereumAdapter, EventStreamer}, - fuel_adapter::FuelBlock, - }, - errors::{Error, Result}, - telemetry::{ConnectionHealthTracker, HealthChecker, RegistersMetrics}, + error::{Error, Result}, + metrics::Metrics, + websocket::event_streamer::EthEventStreamer, }; +#[cfg_attr(test, mockall::automock)] +#[async_trait::async_trait] +pub trait EthApi { + async fn submit(&self, block: FuelBlock) -> Result<()>; + async fn get_block_number(&self) -> Result; + async fn balance(&self) -> Result; + fn event_streamer(&self, eth_block_height: u64) -> EthEventStreamer; + #[cfg(feature = "test-helpers")] + async fn finalized(&self, block: FuelBlock) -> Result; + #[cfg(feature = "test-helpers")] + async fn block_hash_at_commit_height(&self, commit_height: u32) -> Result<[u8; 32]>; +} + #[derive(Clone)] -pub struct MonitoredEthAdapter { +pub struct HealthTrackingMiddleware { adapter: T, metrics: Metrics, health_tracker: ConnectionHealthTracker, } -impl MonitoredEthAdapter { +impl HealthTrackingMiddleware { pub fn new(adapter: T, unhealthy_after_n_errors: usize) -> Self { Self { adapter, @@ -32,7 +43,7 @@ impl MonitoredEthAdapter { } fn note_network_status(&self, response: &Result) { - match &response { + match response { Ok(_val) => { self.health_tracker.note_success(); } @@ -46,71 +57,62 @@ impl MonitoredEthAdapter { } // User responsible for registering any metrics T might have -impl RegistersMetrics for MonitoredEthAdapter { - fn metrics(&self) -> Vec> { +impl RegistersMetrics for HealthTrackingMiddleware { + fn metrics(&self) -> Vec> { self.metrics.metrics() } } #[async_trait::async_trait] -impl EthereumAdapter for MonitoredEthAdapter { +impl EthApi for HealthTrackingMiddleware +where + T: EthApi + Send + Sync, +{ async fn submit(&self, block: FuelBlock) -> Result<()> { let response = self.adapter.submit(block).await; self.note_network_status(&response); response } - async fn get_block_number(&self) -> Result { + async fn get_block_number(&self) -> Result { let response = self.adapter.get_block_number().await; self.note_network_status(&response); response } - fn event_streamer(&self, eth_block_height: u64) -> Box { + fn event_streamer(&self, eth_block_height: u64) -> EthEventStreamer { self.adapter.event_streamer(eth_block_height) } - async fn balance(&self, address: H160) -> Result { - let response = self.adapter.balance(address).await; + async fn balance(&self) -> Result { + let response = self.adapter.balance().await; self.note_network_status(&response); response } -} -#[derive(Clone)] -struct Metrics { - eth_network_errors: IntCounter, -} - -impl RegistersMetrics for Metrics { - fn metrics(&self) -> Vec> { - vec![Box::new(self.eth_network_errors.clone())] + #[cfg(feature = "test-helpers")] + async fn finalized(&self, block: FuelBlock) -> Result { + self.adapter.finalized(block).await } -} - -impl Default for Metrics { - fn default() -> Self { - let eth_network_errors = IntCounter::with_opts(Opts::new( - "eth_network_errors", - "Number of network errors encountered while running Ethereum RPCs.", - )) - .expect("eth_network_errors metric to be correctly configured"); - Self { eth_network_errors } + #[cfg(feature = "test-helpers")] + async fn block_hash_at_commit_height(&self, commit_height: u32) -> Result<[u8; 32]> { + self.adapter + .block_hash_at_commit_height(commit_height) + .await } } #[cfg(test)] mod tests { - use prometheus::{proto::Metric, Registry}; + use ::metrics::prometheus::{proto::Metric, Registry}; use super::*; - use crate::adapters::ethereum_adapter::MockEthereumAdapter; #[tokio::test] async fn recovers_after_successful_network_request() { // given - let mut eth_adapter = MockEthereumAdapter::new(); + let mut eth_adapter = MockEthApi::new(); eth_adapter .expect_submit() .returning(|_| Err(Error::Network("An error".into()))); @@ -119,7 +121,7 @@ mod tests { .expect_get_block_number() .returning(|| Ok(10u32.into())); - let adapter = MonitoredEthAdapter::new(eth_adapter, 1); + let adapter = HealthTrackingMiddleware::new(eth_adapter, 1); let health_check = adapter.connection_health_checker(); let _ = adapter.submit(given_a_block(42)).await; @@ -134,7 +136,7 @@ mod tests { #[tokio::test] async fn other_errors_dont_impact_health_status() { // given - let mut eth_adapter = MockEthereumAdapter::new(); + let mut eth_adapter = MockEthApi::new(); eth_adapter .expect_submit() .returning(|_| Err(Error::Other("An error".into()))); @@ -143,7 +145,7 @@ mod tests { .expect_get_block_number() .returning(|| Err(Error::Other("An error".into()))); - let adapter = MonitoredEthAdapter::new(eth_adapter, 2); + let adapter = HealthTrackingMiddleware::new(eth_adapter, 2); let health_check = adapter.connection_health_checker(); let _ = adapter.submit(given_a_block(42)).await; @@ -157,7 +159,7 @@ mod tests { #[tokio::test] async fn network_errors_impact_health_status() { - let mut eth_adapter = MockEthereumAdapter::new(); + let mut eth_adapter = MockEthApi::new(); eth_adapter .expect_submit() .returning(|_| Err(Error::Network("An error".into()))); @@ -166,7 +168,7 @@ mod tests { .expect_get_block_number() .returning(|| Err(Error::Network("An error".into()))); - let adapter = MonitoredEthAdapter::new(eth_adapter, 3); + let adapter = HealthTrackingMiddleware::new(eth_adapter, 3); let health_check = adapter.connection_health_checker(); assert!(health_check.healthy()); @@ -182,7 +184,7 @@ mod tests { #[tokio::test] async fn network_errors_seen_in_metrics() { - let mut eth_adapter = MockEthereumAdapter::new(); + let mut eth_adapter = MockEthApi::new(); eth_adapter .expect_submit() .returning(|_| Err(Error::Network("An error".into()))); @@ -192,7 +194,7 @@ mod tests { .returning(|| Err(Error::Network("An error".into()))); let registry = Registry::new(); - let adapter = MonitoredEthAdapter::new(eth_adapter, 3); + let adapter = HealthTrackingMiddleware::new(eth_adapter, 3); adapter.register_metrics(®istry); let _ = adapter.submit(given_a_block(42)).await; diff --git a/packages/fuel/Cargo.toml b/packages/fuel/Cargo.toml new file mode 100644 index 00000000..13836028 --- /dev/null +++ b/packages/fuel/Cargo.toml @@ -0,0 +1,23 @@ +[package] +name = "fuel" +authors = { workspace = true } +edition = { workspace = true } +homepage = { workspace = true } +license = { workspace = true } +repository = { workspace = true } +version = { workspace = true } +publish = { workspace = true } +rust-version = { workspace = true } + +[dependencies] +async-trait = { workspace = true } +fuel-core-client = { workspace = true } +metrics = { workspace = true } +ports = { workspace = true, features = ["fuel"] } +url = { workspace = true } + +[dev-dependencies] +tokio = { workspace = true, features = ["macros"] } + +[features] +test-helpers = [] diff --git a/packages/fuel/src/client.rs b/packages/fuel/src/client.rs new file mode 100644 index 00000000..262f5481 --- /dev/null +++ b/packages/fuel/src/client.rs @@ -0,0 +1,78 @@ +use fuel_core_client::client::{types::Block, FuelClient as GqlClient}; +use metrics::{ + prometheus::core::Collector, ConnectionHealthTracker, HealthChecker, RegistersMetrics, +}; +use url::Url; + +use crate::{metrics::Metrics, Error, Result}; + +pub struct HttpClient { + client: GqlClient, + metrics: Metrics, + health_tracker: ConnectionHealthTracker, +} + +impl HttpClient { + #[must_use] + pub fn new(url: &Url, unhealthy_after_n_errors: usize) -> Self { + let client = GqlClient::new(url).expect("Url to be well formed"); + Self { + client, + metrics: Metrics::default(), + health_tracker: ConnectionHealthTracker::new(unhealthy_after_n_errors), + } + } + + #[cfg(feature = "test-helpers")] + pub async fn produce_blocks(&self, num: u32) -> Result<()> { + self.client + .produce_blocks(num, None) + .await + .map_err(|e| Error::Network(e.to_string()))?; + + Ok(()) + } + + pub(crate) async fn _block_at_height(&self, height: u32) -> Result> { + let maybe_block = self + .client + .block_by_height(height.into()) + .await + .map_err(|e| Error::Network(e.to_string()))?; + + Ok(maybe_block.map(Into::into)) + } + + pub(crate) async fn _latest_block(&self) -> Result { + match self.client.chain_info().await { + Ok(chain_info) => { + self.handle_network_success(); + Ok(chain_info.latest_block) + } + Err(err) => { + self.handle_network_error(); + Err(Error::Network(err.to_string())) + } + } + } + + #[must_use] + pub fn connection_health_checker(&self) -> HealthChecker { + self.health_tracker.tracker() + } + + fn handle_network_error(&self) { + self.health_tracker.note_failure(); + self.metrics.fuel_network_errors.inc(); + } + + fn handle_network_success(&self) { + self.health_tracker.note_success(); + } +} + +impl RegistersMetrics for HttpClient { + fn metrics(&self) -> Vec> { + self.metrics.metrics() + } +} diff --git a/src/adapters/fuel_adapter/fuel_client.rs b/packages/fuel/src/lib.rs similarity index 58% rename from src/adapters/fuel_adapter/fuel_client.rs rename to packages/fuel/src/lib.rs index 885022fd..267256bd 100644 --- a/src/adapters/fuel_adapter/fuel_client.rs +++ b/packages/fuel/src/lib.rs @@ -1,88 +1,44 @@ -use fuel_core_client::client::{types::Block as FuelGqlBlock, FuelClient as FuelGqlClient}; -use url::Url; - -use crate::{ - adapters::fuel_adapter::{fuel_metrics::FuelMetrics, FuelAdapter, FuelBlock}, - errors::{Error, Result}, - telemetry::{ConnectionHealthTracker, HealthChecker, RegistersMetrics}, -}; - -impl RegistersMetrics for FuelClient { - fn metrics(&self) -> Vec> { - self.metrics.metrics() - } -} - -pub struct FuelClient { - client: FuelGqlClient, - metrics: FuelMetrics, - health_tracker: ConnectionHealthTracker, -} +#![deny(unused_crate_dependencies)] +use fuel_core_client::client::types::Block; +use ports::types::FuelBlock; +mod client; +mod metrics; -impl FuelClient { - pub fn new(url: &Url, unhealthy_after_n_errors: usize) -> Self { - let client = FuelGqlClient::new(url).expect("Url to be well formed"); - Self { - client, - metrics: FuelMetrics::default(), - health_tracker: ConnectionHealthTracker::new(unhealthy_after_n_errors), - } - } +pub use client::*; - pub fn connection_health_checker(&self) -> HealthChecker { - self.health_tracker.tracker() - } - - fn handle_network_error(&self) { - self.health_tracker.note_failure(); - self.metrics.fuel_network_errors.inc(); - } - - fn handle_network_success(&self) { - self.health_tracker.note_success(); - } -} +type Error = ports::fuel::Error; +type Result = ports::fuel::Result; -impl From for FuelBlock { - fn from(value: FuelGqlBlock) -> Self { - Self { - hash: *value.id, - height: value.header.height, - } +fn convert_block(block: Block) -> FuelBlock { + FuelBlock { + hash: *block.id, + height: block.header.height, } } #[async_trait::async_trait] -impl FuelAdapter for FuelClient { - async fn block_at_height(&self, height: u32) -> Result> { - let maybe_block = self - .client - .block_by_height(height.into()) - .await - .map_err(|e| Error::Network(e.to_string()))?; - - Ok(maybe_block.map(Into::into)) +impl ports::fuel::Api for client::HttpClient { + async fn block_at_height(&self, height: u32) -> ports::fuel::Result> { + Ok(self._block_at_height(height).await?.map(convert_block)) } - async fn latest_block(&self) -> Result { - match self.client.chain_info().await { - Ok(chain_info) => { - self.handle_network_success(); - Ok(chain_info.latest_block.into()) - } - Err(err) => { - self.handle_network_error(); - Err(Error::Network(err.to_string())) - } - } + async fn latest_block(&self) -> ports::fuel::Result { + let block = self._latest_block().await?; + Ok(convert_block(block)) } } #[cfg(test)] mod tests { - use prometheus::{proto::Metric, Registry}; + use ::metrics::{ + prometheus::{proto::Metric, Registry}, + RegistersMetrics, + }; + use ports::fuel::Api; + use url::Url; use super::*; + use crate::client::HttpClient; // TODO: once a sdk release is made these can be adapted // #[tokio::test] @@ -142,7 +98,7 @@ mod tests { // killing the node once the SDK supports it. let url = Url::parse("localhost:12344").unwrap(); - let fuel_adapter = FuelClient::new(&url, 1); + let fuel_adapter = HttpClient::new(&url, 1); let registry = Registry::default(); fuel_adapter.register_metrics(®istry); @@ -169,7 +125,7 @@ mod tests { // killing the node once the SDK supports it. let url = Url::parse("http://localhost:12344").unwrap(); - let fuel_adapter = FuelClient::new(&url, 3); + let fuel_adapter = client::HttpClient::new(&url, 3); let health_check = fuel_adapter.connection_health_checker(); assert!(health_check.healthy()); diff --git a/src/adapters/fuel_adapter/metrics.rs b/packages/fuel/src/metrics.rs similarity index 79% rename from src/adapters/fuel_adapter/metrics.rs rename to packages/fuel/src/metrics.rs index b48b99fa..3984648a 100644 --- a/src/adapters/fuel_adapter/metrics.rs +++ b/packages/fuel/src/metrics.rs @@ -1,13 +1,14 @@ -use prometheus::{IntCounter, Opts}; - -use crate::telemetry::RegistersMetrics; +use metrics::{ + prometheus::{core::Collector, IntCounter, Opts}, + RegistersMetrics, +}; pub struct Metrics { pub fuel_network_errors: IntCounter, } impl RegistersMetrics for Metrics { - fn metrics(&self) -> Vec> { + fn metrics(&self) -> Vec> { vec![Box::new(self.fuel_network_errors.clone())] } } diff --git a/packages/metrics/Cargo.toml b/packages/metrics/Cargo.toml new file mode 100644 index 00000000..658ac3ed --- /dev/null +++ b/packages/metrics/Cargo.toml @@ -0,0 +1,13 @@ +[package] +name = "metrics" +authors = { workspace = true } +edition = { workspace = true } +homepage = { workspace = true } +license = { workspace = true } +repository = { workspace = true } +version = { workspace = true } +publish = { workspace = true } +rust-version = { workspace = true } + +[dependencies] +prometheus = { workspace = true } diff --git a/packages/metrics/src/connection_health_tracker.rs b/packages/metrics/src/connection_health_tracker.rs new file mode 100644 index 00000000..07402ce9 --- /dev/null +++ b/packages/metrics/src/connection_health_tracker.rs @@ -0,0 +1,43 @@ +use std::sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, +}; + +use super::{HealthCheck, HealthChecker}; + +#[derive(Debug, Clone)] +pub struct ConnectionHealthTracker { + // how many failures are needed before the connection is deemed unhealhty + max_consecutive_failures: usize, + // how many consecutive failures there currently are + consecutive_failures: Arc, +} + +impl ConnectionHealthTracker { + #[must_use] + pub fn new(max_consecutive_failures: usize) -> Self { + Self { + max_consecutive_failures, + consecutive_failures: Arc::new(AtomicUsize::new(0)), + } + } + + pub fn note_failure(&self) { + self.consecutive_failures.fetch_add(1, Ordering::SeqCst); + } + + pub fn note_success(&self) { + self.consecutive_failures.store(0, Ordering::SeqCst); + } + + #[must_use] + pub fn tracker(&self) -> HealthChecker { + Box::new(self.clone()) + } +} + +impl HealthCheck for ConnectionHealthTracker { + fn healthy(&self) -> bool { + self.consecutive_failures.load(Ordering::Relaxed) < self.max_consecutive_failures + } +} diff --git a/packages/metrics/src/lib.rs b/packages/metrics/src/lib.rs new file mode 100644 index 00000000..f5eb758b --- /dev/null +++ b/packages/metrics/src/lib.rs @@ -0,0 +1,22 @@ +#![deny(unused_crate_dependencies)] +mod connection_health_tracker; +pub use connection_health_tracker::*; + +pub type HealthChecker = Box; +pub trait HealthCheck: Send + Sync { + fn healthy(&self) -> bool; +} + +pub use prometheus; + +pub trait RegistersMetrics { + fn register_metrics(&self, registry: &crate::prometheus::Registry) { + self.metrics().into_iter().for_each(|metric| { + registry + .register(metric) + .expect("app to have correctly named metrics"); + }); + } + + fn metrics(&self) -> Vec>; +} diff --git a/packages/ports/Cargo.toml b/packages/ports/Cargo.toml new file mode 100644 index 00000000..dc738cf4 --- /dev/null +++ b/packages/ports/Cargo.toml @@ -0,0 +1,27 @@ +[package] +name = "ports" +authors = { workspace = true } +edition = { workspace = true } +homepage = { workspace = true } +license = { workspace = true } +repository = { workspace = true } +version = { workspace = true } +publish = { workspace = true } +rust-version = { workspace = true } + +[dependencies] +async-trait = { workspace = true, optional = true } +ethers-core = { workspace = true, optional = true } +futures = { workspace = true, optional = true } +impl-tools = { workspace = true, optional = true } +mockall = { workspace = true, optional = true } +rand = { workspace = true, optional = true } +serde = { workspace = true, features = ["derive"] } +thiserror = { workspace = true, optional = true } + +[features] +test-helpers = ["dep:mockall", "dep:rand"] +l1 = ["dep:ethers-core", "dep:futures", "dep:thiserror", "dep:async-trait"] +fuel = ["dep:thiserror", "dep:async-trait"] +storage = ["dep:impl-tools", "dep:thiserror", "dep:async-trait"] +full = ["l1", "fuel", "storage"] diff --git a/packages/ports/src/lib.rs b/packages/ports/src/lib.rs new file mode 100644 index 00000000..205d7efa --- /dev/null +++ b/packages/ports/src/lib.rs @@ -0,0 +1,14 @@ +mod ports { + #[cfg(feature = "l1")] + pub mod l1; + + #[cfg(feature = "fuel")] + pub mod fuel; + + #[cfg(feature = "storage")] + pub mod storage; +} + +#[cfg(any(feature = "l1", feature = "fuel", feature = "storage"))] +pub use ports::*; +pub mod types; diff --git a/packages/ports/src/ports/fuel.rs b/packages/ports/src/ports/fuel.rs new file mode 100644 index 00000000..bbda896f --- /dev/null +++ b/packages/ports/src/ports/fuel.rs @@ -0,0 +1,16 @@ +use crate::types::FuelBlock; + +#[derive(Debug, thiserror::Error)] +pub enum Error { + #[error("{0}")] + Network(String), +} + +pub type Result = std::result::Result; + +#[cfg_attr(feature = "test-helpers", mockall::automock)] +#[async_trait::async_trait] +pub trait Api: Send + Sync { + async fn block_at_height(&self, height: u32) -> Result>; + async fn latest_block(&self) -> Result; +} diff --git a/packages/ports/src/ports/l1.rs b/packages/ports/src/ports/l1.rs new file mode 100644 index 00000000..4f149b15 --- /dev/null +++ b/packages/ports/src/ports/l1.rs @@ -0,0 +1,41 @@ +use std::pin::Pin; + +use crate::types::{FuelBlock, FuelBlockCommittedOnL1, InvalidL1Height, L1Height, Stream, U256}; + +#[derive(Debug, thiserror::Error)] +pub enum Error { + #[error("network error: {0}")] + Network(String), + #[error("{0}")] + Other(String), +} + +pub type Result = std::result::Result; + +impl From for Error { + fn from(err: InvalidL1Height) -> Self { + Self::Other(err.to_string()) + } +} + +#[cfg_attr(feature = "test-helpers", mockall::automock)] +#[async_trait::async_trait] +pub trait Contract: Send + Sync { + async fn submit(&self, block: FuelBlock) -> Result<()>; + fn event_streamer(&self, height: L1Height) -> Box; +} + +#[cfg_attr(feature = "test-helpers", mockall::automock)] +#[async_trait::async_trait] +pub trait Api { + async fn get_block_number(&self) -> Result; + async fn balance(&self) -> Result; +} + +#[cfg_attr(feature = "test-helpers", mockall::automock)] +#[async_trait::async_trait] +pub trait EventStreamer { + async fn establish_stream<'a>( + &'a self, + ) -> Result> + 'a + Send>>>; +} diff --git a/packages/ports/src/ports/storage.rs b/packages/ports/src/ports/storage.rs new file mode 100644 index 00000000..d816b6aa --- /dev/null +++ b/packages/ports/src/ports/storage.rs @@ -0,0 +1,22 @@ +use std::sync::Arc; + +use crate::types::BlockSubmission; + +#[derive(Debug, thiserror::Error)] +pub enum Error { + #[error("db response: {0}")] + Database(String), + #[error("data conversion app<->db failed: {0}")] + Conversion(String), +} + +pub type Result = std::result::Result; + +#[async_trait::async_trait] +#[impl_tools::autoimpl(for &T, &mut T, Arc, Box)] +#[cfg_attr(feature = "test-helpers", mockall::automock)] +pub trait Storage: Send + Sync { + async fn insert(&self, submission: BlockSubmission) -> Result<()>; + async fn submission_w_latest_block(&self) -> Result>; + async fn set_submission_completed(&self, fuel_block_hash: [u8; 32]) -> Result; +} diff --git a/packages/ports/src/types.rs b/packages/ports/src/types.rs new file mode 100644 index 00000000..298c0df6 --- /dev/null +++ b/packages/ports/src/types.rs @@ -0,0 +1,16 @@ +#[cfg(feature = "l1")] +pub use ethers_core::types::{H160, U256}; +#[cfg(feature = "l1")] +pub use futures::Stream; + +mod block_submission; +mod fuel_block; +#[cfg(feature = "l1")] +mod fuel_block_committed_on_l1; +mod l1_height; + +pub use block_submission::*; +pub use fuel_block::*; +#[cfg(feature = "l1")] +pub use fuel_block_committed_on_l1::*; +pub use l1_height::*; diff --git a/packages/ports/src/types/block_submission.rs b/packages/ports/src/types/block_submission.rs new file mode 100644 index 00000000..8eb7c736 --- /dev/null +++ b/packages/ports/src/types/block_submission.rs @@ -0,0 +1,21 @@ +use crate::types::{FuelBlock, L1Height}; + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct BlockSubmission { + pub block: FuelBlock, + pub completed: bool, + // L1 block height moments before submitting the fuel block. Used to filter stale events in + // the commit listener. + pub submittal_height: L1Height, +} + +#[cfg(feature = "test-helpers")] +impl rand::distributions::Distribution for rand::distributions::Standard { + fn sample(&self, rng: &mut R) -> BlockSubmission { + BlockSubmission { + block: rng.gen(), + completed: rng.gen(), + submittal_height: rng.gen(), + } + } +} diff --git a/src/adapters/fuel_adapter.rs b/packages/ports/src/types/fuel_block.rs similarity index 61% rename from src/adapters/fuel_adapter.rs rename to packages/ports/src/types/fuel_block.rs index 2da49e1d..619d4646 100644 --- a/src/adapters/fuel_adapter.rs +++ b/packages/ports/src/types/fuel_block.rs @@ -1,19 +1,13 @@ -mod fuel_client; -mod fuel_metrics; - -pub use fuel_client::FuelClient; -use rand::distributions::{Distribution, Standard}; use serde::{Deserialize, Serialize}; -use crate::errors::Result; - #[derive(Serialize, Deserialize, Clone, Copy, PartialEq, Eq)] pub struct FuelBlock { pub hash: [u8; 32], pub height: u32, } -impl Distribution for Standard { +#[cfg(feature = "test-helpers")] +impl rand::distributions::Distribution for rand::distributions::Standard { fn sample(&self, rng: &mut R) -> FuelBlock { FuelBlock { hash: rng.gen(), @@ -21,6 +15,7 @@ impl Distribution for Standard { } } } + impl std::fmt::Debug for FuelBlock { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { let hash = self.hash.map(|byte| format!("{byte:02x?}")).join(""); @@ -30,10 +25,3 @@ impl std::fmt::Debug for FuelBlock { .finish() } } - -#[cfg_attr(test, mockall::automock)] -#[async_trait::async_trait] -pub trait FuelAdapter: Send + Sync { - async fn block_at_height(&self, height: u32) -> Result>; - async fn latest_block(&self) -> Result; -} diff --git a/packages/ports/src/types/fuel_block_committed_on_l1.rs b/packages/ports/src/types/fuel_block_committed_on_l1.rs new file mode 100644 index 00000000..fe04df52 --- /dev/null +++ b/packages/ports/src/types/fuel_block_committed_on_l1.rs @@ -0,0 +1,20 @@ +use crate::types::U256; + +#[derive(Clone, Copy)] +pub struct FuelBlockCommittedOnL1 { + pub fuel_block_hash: [u8; 32], + pub commit_height: U256, +} + +impl std::fmt::Debug for FuelBlockCommittedOnL1 { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let hash = self + .fuel_block_hash + .map(|byte| format!("{byte:02x?}")) + .join(""); + f.debug_struct("FuelBlockCommittedOnL1") + .field("hash", &hash) + .field("commit_height", &self.commit_height) + .finish() + } +} diff --git a/packages/ports/src/types/l1_height.rs b/packages/ports/src/types/l1_height.rs new file mode 100644 index 00000000..4a54238f --- /dev/null +++ b/packages/ports/src/types/l1_height.rs @@ -0,0 +1,69 @@ +#[derive(Debug, Clone, Copy, PartialEq, Eq, Ord, PartialOrd)] +pub struct L1Height { + height: i64, +} + +#[derive(Debug, Clone)] +pub struct InvalidL1Height(String); +impl std::fmt::Display for InvalidL1Height { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + write!(f, "Invalid l1 height: {}", self.0) + } +} +impl std::error::Error for InvalidL1Height {} + +#[cfg(feature = "test-helpers")] +impl rand::distributions::Distribution for rand::distributions::Standard { + fn sample(&self, rng: &mut R) -> L1Height { + let height: i64 = rng.gen_range(0..=i64::MAX); + height.try_into().expect("Must be valid EthHeight") + } +} + +impl TryFrom for L1Height { + type Error = InvalidL1Height; + + fn try_from(height: i64) -> Result { + if height < 0 { + return Err(InvalidL1Height(format!( + "must be non-negative, got {height}", + ))); + } + Ok(Self { height }) + } +} + +impl TryFrom for L1Height { + type Error = InvalidL1Height; + fn try_from(height: u64) -> Result { + if height >= i64::MAX as u64 { + return Err(InvalidL1Height(format!( + "{height} too large. DB can handle at most {}", + i64::MAX + ))); + } + Ok(Self { + height: height as i64, + }) + } +} + +impl From for L1Height { + fn from(height: u32) -> Self { + Self { + height: i64::from(height), + } + } +} + +impl From for i64 { + fn from(height: L1Height) -> Self { + height.height + } +} + +impl From for u64 { + fn from(height: L1Height) -> Self { + height.height as Self + } +} diff --git a/packages/services/Cargo.toml b/packages/services/Cargo.toml new file mode 100644 index 00000000..db8c1d25 --- /dev/null +++ b/packages/services/Cargo.toml @@ -0,0 +1,26 @@ +[package] +name = "services" +authors = { workspace = true } +edition = { workspace = true } +homepage = { workspace = true } +license = { workspace = true } +repository = { workspace = true } +version = { workspace = true } +publish = { workspace = true } +rust-version = { workspace = true } + +[dependencies] +async-trait = { workspace = true } +futures = { workspace = true } +metrics = { workspace = true } +ports = { workspace = true, features = ["full"] } +serde = { workspace = true } +thiserror = { workspace = true } +tokio = { workspace = true, features = ["sync"] } +tokio-util = { workspace = true } +tracing = { workspace = true } + +[dev-dependencies] +mockall = { workspace = true } +rand = { workspace = true } +storage = { workspace = true, features = ["test-helpers"] } diff --git a/src/services/block_committer.rs b/packages/services/src/block_committer.rs similarity index 57% rename from src/services/block_committer.rs rename to packages/services/src/block_committer.rs index 1bbcbf83..2d3e1015 100644 --- a/src/services/block_committer.rs +++ b/packages/services/src/block_committer.rs @@ -1,28 +1,25 @@ use async_trait::async_trait; +use ports::{ + storage::Storage, + types::{BlockSubmission, FuelBlock}, +}; use tokio::sync::mpsc::Receiver; use tracing::{error, info}; -use crate::{ - adapters::{ - ethereum_adapter::{EthereumAdapter, EthereumWs}, - fuel_adapter::FuelBlock, - runner::Runner, - storage::{postgresql::Postgres, BlockSubmission, Storage}, - }, - errors::Result, -}; +use super::Runner; +use crate::Result; -pub struct BlockCommitter { +pub struct BlockCommitter { rx_block: Receiver, - ethereum_rpc: A, + l1: C, storage: Db, } -impl BlockCommitter { - pub fn new(rx_block: Receiver, ethereum_rpc: A, storage: Db) -> Self { +impl BlockCommitter { + pub fn new(rx_block: Receiver, l1: L1, storage: Db) -> Self { Self { rx_block, - ethereum_rpc, + l1, storage, } } @@ -34,11 +31,11 @@ impl BlockCommitter { impl BlockCommitter where - A: EthereumAdapter, + A: ports::l1::Contract + ports::l1::Api, Db: Storage, { async fn submit_block(&self, fuel_block: FuelBlock) -> Result<()> { - let submittal_height = self.ethereum_rpc.get_block_number().await?; + let submittal_height = self.l1.get_block_number().await?; let submission = BlockSubmission { block: fuel_block, @@ -49,7 +46,7 @@ where self.storage.insert(submission).await?; // if we have a network failure the DB entry will be left at completed:false. - self.ethereum_rpc.submit(fuel_block).await?; + self.l1.submit(fuel_block).await?; Ok(()) } @@ -58,7 +55,7 @@ where #[async_trait] impl Runner for BlockCommitter where - A: EthereumAdapter, + A: ports::l1::Contract + ports::l1::Api, Db: Storage, { async fn run(&mut self) -> Result<()> { @@ -81,12 +78,40 @@ mod tests { use std::time::Duration; use mockall::predicate; + use ports::{ + l1::{Contract, EventStreamer, MockApi, MockContract}, + types::{L1Height, U256}, + }; use rand::Rng; + use storage::PostgresProcess; use super::*; - use crate::adapters::{ - ethereum_adapter::MockEthereumAdapter, storage::postgresql::PostgresProcess, - }; + + struct MockL1 { + api: MockApi, + contract: MockContract, + } + + #[async_trait::async_trait] + impl Contract for MockL1 { + async fn submit(&self, block: FuelBlock) -> ports::l1::Result<()> { + self.contract.submit(block).await + } + fn event_streamer(&self, height: L1Height) -> Box { + self.contract.event_streamer(height) + } + } + + #[cfg_attr(feature = "test-helpers", mockall::automock)] + #[async_trait::async_trait] + impl ports::l1::Api for MockL1 { + async fn get_block_number(&self) -> ports::l1::Result { + self.api.get_block_number().await + } + async fn balance(&self) -> ports::l1::Result { + self.api.balance().await + } + } #[tokio::test] async fn block_committer_will_submit_and_write_block() { @@ -97,38 +122,41 @@ mod tests { let process = PostgresProcess::shared().await.unwrap(); let db = process.create_random_db().await.unwrap(); - let eth_rpc_mock = given_eth_rpc_that_expects(block); + let mock_l1 = given_l1_that_expects_submission(block); tx.try_send(block).unwrap(); // when - spawn_committer_and_run_until_timeout(rx, eth_rpc_mock, db.clone()).await; + spawn_committer_and_run_until_timeout(rx, mock_l1, db.clone()).await; // then let last_submission = db.submission_w_latest_block().await.unwrap().unwrap(); assert_eq!(expeted_height, last_submission.block.height); } - fn given_eth_rpc_that_expects(block: FuelBlock) -> MockEthereumAdapter { - let mut eth_rpc_mock = MockEthereumAdapter::new(); - eth_rpc_mock + fn given_l1_that_expects_submission(block: FuelBlock) -> MockL1 { + let mut l1 = MockL1 { + api: MockApi::new(), + contract: MockContract::new(), + }; + l1.contract .expect_submit() .with(predicate::eq(block)) .return_once(move |_| Ok(())); - eth_rpc_mock + l1.api .expect_get_block_number() .return_once(move || Ok(0u32.into())); - eth_rpc_mock + l1 } async fn spawn_committer_and_run_until_timeout( rx: Receiver, - eth_rpc_mock: MockEthereumAdapter, + mock_l1: MockL1, storage: Db, ) { let _ = tokio::time::timeout(Duration::from_millis(250), async move { - let mut block_committer = BlockCommitter::new(rx, eth_rpc_mock, storage); + let mut block_committer = BlockCommitter::new(rx, mock_l1, storage); block_committer .run() .await diff --git a/src/services/block_watcher.rs b/packages/services/src/block_watcher.rs similarity index 92% rename from src/services/block_watcher.rs rename to packages/services/src/block_watcher.rs index 7a392841..a7d85bf0 100644 --- a/src/services/block_watcher.rs +++ b/packages/services/src/block_watcher.rs @@ -1,18 +1,15 @@ use std::{num::NonZeroU32, vec}; use async_trait::async_trait; -use prometheus::{core::Collector, IntGauge, Opts}; +use metrics::{ + prometheus::{core::Collector, IntGauge, Opts}, + RegistersMetrics, +}; +use ports::{storage::Storage, types::FuelBlock}; use tokio::sync::mpsc::Sender; -use crate::{ - adapters::{ - fuel_adapter::{FuelAdapter, FuelBlock, FuelClient}, - runner::Runner, - storage::{postgresql::Postgres, Storage}, - }, - errors::{Error, Result}, - telemetry::RegistersMetrics, -}; +use super::Runner; +use crate::{Error, Result}; struct Metrics { latest_fuel_block: IntGauge, @@ -36,7 +33,7 @@ impl Default for Metrics { } } -pub struct BlockWatcher { +pub struct BlockWatcher { fuel_adapter: A, tx_fuel_block: Sender, storage: Db, @@ -62,7 +59,7 @@ impl BlockWatcher { } impl BlockWatcher where - A: FuelAdapter, + A: ports::fuel::Api, Db: Storage, { async fn fetch_latest_block(&self) -> Result { @@ -110,7 +107,7 @@ where #[async_trait] impl Runner for BlockWatcher where - A: FuelAdapter, + A: ports::fuel::Api, Db: Storage, { async fn run(&mut self) -> Result<()> { @@ -140,15 +137,13 @@ where mod tests { use std::{sync::Arc, vec}; + use metrics::prometheus::{proto::Metric, Registry}; use mockall::predicate::eq; - use prometheus::{proto::Metric, Registry}; + use ports::{fuel::MockApi, types::BlockSubmission}; use rand::Rng; + use storage::{Postgres, PostgresProcess}; use super::*; - use crate::adapters::{ - fuel_adapter::MockFuelAdapter, - storage::{postgresql::PostgresProcess, BlockSubmission}, - }; #[tokio::test] async fn will_fetch_and_propagate_missed_block() { @@ -281,8 +276,8 @@ mod tests { db } - fn given_fetcher(available_blocks: Vec) -> MockFuelAdapter { - let mut fetcher = MockFuelAdapter::new(); + fn given_fetcher(available_blocks: Vec) -> MockApi { + let mut fetcher = MockApi::new(); for block in available_blocks.clone() { fetcher .expect_block_at_height() diff --git a/src/services/commit_listener.rs b/packages/services/src/commit_listener.rs similarity index 64% rename from src/services/commit_listener.rs rename to packages/services/src/commit_listener.rs index 7abfbd57..2e3d5314 100644 --- a/src/services/commit_listener.rs +++ b/packages/services/src/commit_listener.rs @@ -1,30 +1,29 @@ use async_trait::async_trait; use futures::{StreamExt, TryStreamExt}; -use prometheus::{IntGauge, Opts}; +use metrics::{ + prometheus::{core::Collector, IntGauge, Opts}, + RegistersMetrics, +}; +use ports::{ + storage::Storage, + types::{FuelBlockCommittedOnL1, L1Height}, +}; use tokio_util::sync::CancellationToken; use tracing::{error, info}; -use crate::{ - adapters::{ - ethereum_adapter::{EthHeight, EthereumAdapter, EthereumWs, FuelBlockCommittedOnEth}, - runner::Runner, - storage::{postgresql::Postgres, Storage}, - }, - errors::Result, - telemetry::RegistersMetrics, -}; +use super::Runner; -pub struct CommitListener { - ethereum_rpc: E, +pub struct CommitListener { + contract: C, storage: Db, metrics: Metrics, cancel_token: CancellationToken, } -impl CommitListener { - pub fn new(ethereum_rpc: E, storage: Db, cancel_token: CancellationToken) -> Self { +impl CommitListener { + pub fn new(contract: C, storage: Db, cancel_token: CancellationToken) -> Self { Self { - ethereum_rpc, + contract, storage, metrics: Metrics::default(), cancel_token, @@ -32,12 +31,12 @@ impl CommitListener { } } -impl CommitListener +impl CommitListener where - E: EthereumAdapter, + C: ports::l1::Contract, Db: Storage, { - async fn determine_starting_eth_block(&mut self) -> Result { + async fn determine_starting_l1_height(&mut self) -> crate::Result { Ok(self .storage .submission_w_latest_block() @@ -47,13 +46,13 @@ where async fn handle_block_committed( &self, - committed_on_eth: FuelBlockCommittedOnEth, - ) -> Result<()> { - info!("block comitted on eth {committed_on_eth:?}"); + committed_on_l1: FuelBlockCommittedOnL1, + ) -> crate::Result<()> { + info!("block comitted on l1 {committed_on_l1:?}"); let submission = self .storage - .set_submission_completed(committed_on_eth.fuel_block_hash) + .set_submission_completed(committed_on_l1.fuel_block_hash) .await?; self.metrics @@ -63,7 +62,7 @@ where Ok(()) } - fn log_if_error(result: Result<()>) { + fn log_if_error(result: crate::Result<()>) { if let Err(error) = result { error!("Received an error from block commit event stream: {error}"); } @@ -71,18 +70,19 @@ where } #[async_trait] -impl Runner for CommitListener +impl Runner for CommitListener where - E: EthereumAdapter, + C: ports::l1::Contract, Db: Storage, { - async fn run(&mut self) -> Result<()> { - let eth_block = self.determine_starting_eth_block().await?; + async fn run(&mut self) -> crate::Result<()> { + let height = self.determine_starting_l1_height().await?; - self.ethereum_rpc - .event_streamer(eth_block.into()) + self.contract + .event_streamer(height) .establish_stream() .await? + .map_err(Into::into) .and_then(|event| self.handle_block_committed(event)) .take_until(self.cancel_token.cancelled()) .for_each(|response| async { Self::log_if_error(response) }) @@ -98,7 +98,7 @@ struct Metrics { } impl RegistersMetrics for CommitListener { - fn metrics(&self) -> Vec> { + fn metrics(&self) -> Vec> { vec![Box::new(self.metrics.latest_committed_block.clone())] } } @@ -119,32 +119,26 @@ impl Default for Metrics { #[cfg(test)] mod tests { - - use ethers::types::U256; use futures::stream; + use metrics::{ + prometheus::{proto::Metric, Registry}, + RegistersMetrics, + }; use mockall::predicate; - use prometheus::{proto::Metric, Registry}; + use ports::{ + l1::{MockContract, MockEventStreamer}, + storage::Storage, + types::{BlockSubmission, FuelBlockCommittedOnL1, L1Height, U256}, + }; use rand::Rng; + use storage::{Postgres, PostgresProcess}; use tokio_util::sync::CancellationToken; - use crate::{ - adapters::{ - ethereum_adapter::{ - EthHeight, FuelBlockCommittedOnEth, MockEthereumAdapter, MockEventStreamer, - }, - runner::Runner, - storage::{ - postgresql::{Postgres, PostgresProcess}, - BlockSubmission, Storage, - }, - }, - errors::Result, - services::CommitListener, - telemetry::RegistersMetrics, - }; + use crate::{CommitListener, Runner}; #[tokio::test] async fn listener_will_update_storage_if_event_is_emitted() { + use ports::storage::Storage; // given let mut rng = rand::thread_rng(); let submission = BlockSubmission { @@ -153,14 +147,13 @@ mod tests { }; let block_hash = submission.block.hash; - let eth_rpc_mock = - given_eth_rpc_that_will_stream(vec![Ok(block_hash)], submission.submittal_height); + let contract = given_contract_with_events(vec![block_hash], submission.submittal_height); let process = PostgresProcess::shared().await.unwrap(); let db = db_with_submission(&process, submission).await; let mut commit_listener = - CommitListener::new(eth_rpc_mock, db.clone(), CancellationToken::default()); + CommitListener::new(contract, db.clone(), CancellationToken::default()); // when commit_listener.run().await.unwrap(); @@ -182,14 +175,12 @@ mod tests { let block_hash = submission.block.hash; let fuel_block_height = submission.block.height; - let eth_rpc_mock = - given_eth_rpc_that_will_stream(vec![Ok(block_hash)], submission.submittal_height); + let contract = given_contract_with_events(vec![block_hash], submission.submittal_height); let process = PostgresProcess::shared().await.unwrap(); let db = db_with_submission(&process, submission).await; - let mut commit_listener = - CommitListener::new(eth_rpc_mock, db, CancellationToken::default()); + let mut commit_listener = CommitListener::new(contract, db, CancellationToken::default()); let registry = Registry::new(); commit_listener.register_metrics(®istry); @@ -222,8 +213,8 @@ mod tests { let missing_hash = block_missing_from_db.block.hash; let incoming_hash = incoming_block.block.hash; - let eth_rpc_mock = given_eth_rpc_that_will_stream( - vec![Ok(missing_hash), Ok(incoming_hash)], + let contract = given_contract_with_events( + vec![missing_hash, incoming_hash], incoming_block.submittal_height, ); @@ -231,7 +222,7 @@ mod tests { let db = db_with_submission(&process, incoming_block.clone()).await; let mut commit_listener = - CommitListener::new(eth_rpc_mock, db.clone(), CancellationToken::default()); + CommitListener::new(contract, db.clone(), CancellationToken::default()); // when commit_listener.run().await.unwrap(); @@ -258,31 +249,30 @@ mod tests { db } - fn given_eth_rpc_that_will_stream( - events: Vec>, - starting_from_height: EthHeight, - ) -> MockEthereumAdapter { - let mut eth_rpc = MockEthereumAdapter::new(); + fn given_contract_with_events( + events: Vec<[u8; 32]>, + starting_from_height: L1Height, + ) -> MockContract { + let mut contract = MockContract::new(); let event_streamer = Box::new(given_event_streamer_w_events(events)); - eth_rpc + contract .expect_event_streamer() - .with(predicate::eq(u64::from(starting_from_height))) + .with(predicate::eq(starting_from_height)) .return_once(move |_| event_streamer); - eth_rpc + contract } - fn given_event_streamer_w_events(events: Vec>) -> MockEventStreamer { + fn given_event_streamer_w_events(events: Vec<[u8; 32]>) -> MockEventStreamer { let mut streamer = MockEventStreamer::new(); let events = events .into_iter() - .map(|e| { - e.map(|fuel_block_hash| FuelBlockCommittedOnEth { - fuel_block_hash, - commit_height: U256::default(), - }) + .map(|block_hash| FuelBlockCommittedOnL1 { + fuel_block_hash: block_hash, + commit_height: U256::default(), }) + .map(Ok) .collect::>(); streamer diff --git a/src/services/health_reporter.rs b/packages/services/src/health_reporter.rs similarity index 93% rename from src/services/health_reporter.rs rename to packages/services/src/health_reporter.rs index aac9d213..6aa8b330 100644 --- a/src/services/health_reporter.rs +++ b/packages/services/src/health_reporter.rs @@ -1,7 +1,6 @@ +use metrics::HealthChecker; use serde::Serialize; -use crate::telemetry::HealthChecker; - #[derive(Debug, Serialize)] pub struct HealthReport { fuel_connection_up: bool, @@ -20,6 +19,7 @@ pub struct HealthReporter { } impl HealthReporter { + #[must_use] pub fn new(fuel_health_check: HealthChecker, eth_health_check: HealthChecker) -> Self { Self { fuel_connection: fuel_health_check, @@ -27,6 +27,7 @@ impl HealthReporter { } } + #[must_use] pub fn report(&self) -> HealthReport { HealthReport { fuel_connection_up: self.fuel_connection.healthy(), diff --git a/packages/services/src/lib.rs b/packages/services/src/lib.rs new file mode 100644 index 00000000..6fc3ae01 --- /dev/null +++ b/packages/services/src/lib.rs @@ -0,0 +1,54 @@ +#![deny(unused_crate_dependencies)] +mod block_committer; +mod block_watcher; +mod commit_listener; +mod health_reporter; +mod status_reporter; +mod wallet_balance_tracker; + +pub use block_committer::BlockCommitter; +pub use block_watcher::BlockWatcher; +pub use commit_listener::CommitListener; +pub use health_reporter::HealthReporter; +pub use status_reporter::StatusReporter; +pub use wallet_balance_tracker::WalletBalanceTracker; + +#[derive(thiserror::Error, Debug)] +pub enum Error { + #[error("{0}")] + Other(String), + #[error("Network Error: {0}")] + Network(String), + #[error("Storage error: {0}")] + Storage(String), +} + +impl From for Error { + fn from(value: ports::l1::Error) -> Self { + match value { + ports::l1::Error::Network(e) => Self::Network(e), + _ => Self::Other(value.to_string()), + } + } +} + +impl From for Error { + fn from(value: ports::fuel::Error) -> Self { + match value { + ports::fuel::Error::Network(e) => Self::Network(e), + } + } +} + +impl From for Error { + fn from(error: ports::storage::Error) -> Self { + Self::Storage(error.to_string()) + } +} + +pub type Result = std::result::Result; + +#[async_trait::async_trait] +pub trait Runner: Send + Sync { + async fn run(&mut self) -> Result<()>; +} diff --git a/src/services/status_reporter.rs b/packages/services/src/status_reporter.rs similarity index 88% rename from src/services/status_reporter.rs rename to packages/services/src/status_reporter.rs index 75db1418..b6eaf97d 100644 --- a/src/services/status_reporter.rs +++ b/packages/services/src/status_reporter.rs @@ -1,9 +1,7 @@ +use ports::storage::Storage; use serde::Serialize; -use crate::{ - adapters::storage::{postgresql::Postgres, Storage}, - errors::Result, -}; +use crate::Result; #[derive(Debug, Serialize, Default, PartialEq, Eq)] pub struct StatusReport { @@ -17,7 +15,7 @@ pub enum Status { Committing, } -pub struct StatusReporter { +pub struct StatusReporter { storage: Db, } @@ -37,7 +35,7 @@ where .await? .map(|submission| submission.completed); - let status = if let Some(false) = last_submission_completed { + let status = if last_submission_completed == Some(false) { Status::Committing } else { Status::Idle @@ -51,10 +49,11 @@ where mod tests { use std::sync::Arc; + use ports::types::BlockSubmission; use rand::Rng; + use storage::PostgresProcess; use super::*; - use crate::adapters::storage::{postgresql::PostgresProcess, BlockSubmission}; #[tokio::test] async fn status_depends_on_last_submission() { diff --git a/src/services/wallet_balance_tracker.rs b/packages/services/src/wallet_balance_tracker.rs similarity index 51% rename from src/services/wallet_balance_tracker.rs rename to packages/services/src/wallet_balance_tracker.rs index ea65068d..ce75e5e3 100644 --- a/src/services/wallet_balance_tracker.rs +++ b/packages/services/src/wallet_balance_tracker.rs @@ -1,37 +1,30 @@ -use std::str::FromStr; - -use ethers::{ - signers::{LocalWallet, Signer}, - types::{H160, U256}, +use metrics::{ + prometheus::{core::Collector, IntGauge, Opts}, + RegistersMetrics, }; -use prometheus::{IntGauge, Opts}; +use ports::types::U256; -use crate::{ - adapters::{ethereum_adapter::EthereumAdapter, runner::Runner}, - errors::Result, - telemetry::RegistersMetrics, -}; +use super::Runner; +use crate::Result; -pub struct WalletBalanceTracker { - eth_adapter: Box, +pub struct WalletBalanceTracker { + api: Api, metrics: Metrics, - address: H160, } -impl WalletBalanceTracker { - pub fn new(adapter: impl EthereumAdapter + 'static, ethereum_wallet_key: &str) -> Self { - let address = LocalWallet::from_str(ethereum_wallet_key) - .expect("Valid eth key") - .address(); +impl WalletBalanceTracker +where + Api: ports::l1::Api, +{ + pub fn new(api: Api) -> Self { Self { - eth_adapter: Box::new(adapter), + api, metrics: Metrics::default(), - address, } } pub async fn update_balance(&self) -> Result<()> { - let balance = self.eth_adapter.balance(self.address).await?; + let balance = self.api.balance().await?; let balance_gwei = balance / U256::from(1_000_000_000); self.metrics @@ -42,8 +35,8 @@ impl WalletBalanceTracker { } } -impl RegistersMetrics for WalletBalanceTracker { - fn metrics(&self) -> Vec> { +impl RegistersMetrics for WalletBalanceTracker { + fn metrics(&self) -> Vec> { self.metrics.metrics() } } @@ -54,7 +47,7 @@ struct Metrics { } impl RegistersMetrics for Metrics { - fn metrics(&self) -> Vec> { + fn metrics(&self) -> Vec> { vec![Box::new(self.eth_wallet_balance.clone())] } } @@ -72,7 +65,10 @@ impl Default for Metrics { } #[async_trait::async_trait] -impl Runner for WalletBalanceTracker { +impl Runner for WalletBalanceTracker +where + Api: Send + Sync + ports::l1::Api, +{ async fn run(&mut self) -> Result<()> { self.update_balance().await } @@ -80,23 +76,19 @@ impl Runner for WalletBalanceTracker { #[cfg(test)] mod tests { - use mockall::predicate::eq; - use prometheus::{proto::Metric, Registry}; + + use metrics::prometheus::{proto::Metric, Registry}; + use ports::l1; use super::*; - use crate::adapters::ethereum_adapter::MockEthereumAdapter; #[tokio::test] async fn updates_metrics() { // given - let eth_private_key = "0000000000000000000000000000000000000000000000000000000000000001"; - let eth_adapter = given_eth_adapter( - "500000000000000000000", - "7E5F4552091A69125d5DfCb7b8C2659029395Bdf", - ); + let eth_adapter = given_l1_api("500000000000000000000"); let registry = Registry::new(); - let sut = WalletBalanceTracker::new(eth_adapter, eth_private_key); + let sut = WalletBalanceTracker::new(eth_adapter); sut.register_metrics(®istry); // when @@ -114,15 +106,13 @@ mod tests { assert_eq!(eth_balance_metric.get_value(), 500_000_000_000_f64); } - fn given_eth_adapter(wei_balance: &str, expected_addr: &str) -> MockEthereumAdapter { - let addr = H160::from_str(expected_addr).unwrap(); + fn given_l1_api(wei_balance: &str) -> l1::MockApi { let balance = U256::from_dec_str(wei_balance).unwrap(); - let mut eth_adapter = MockEthereumAdapter::new(); + let mut eth_adapter = l1::MockApi::new(); eth_adapter .expect_balance() - .with(eq(addr)) - .return_once(move |_| Ok(balance)); + .return_once(move || Ok(balance)); eth_adapter } diff --git a/packages/storage/Cargo.toml b/packages/storage/Cargo.toml new file mode 100644 index 00000000..91a61f99 --- /dev/null +++ b/packages/storage/Cargo.toml @@ -0,0 +1,40 @@ +[package] +name = "storage" +authors = { workspace = true } +edition = { workspace = true } +homepage = { workspace = true } +license = { workspace = true } +repository = { workspace = true } +version = { workspace = true } +publish = { workspace = true } +rust-version = { workspace = true } + +[dependencies] +async-trait = { workspace = true } +hex = { workspace = true } +ports = { workspace = true, features = ["storage"] } +rand = { workspace = true, optional = true } +serde = { workspace = true } +sqlx = { workspace = true, features = [ + "postgres", + "runtime-tokio", + "migrate", + "macros", +] } +testcontainers = { workspace = true, optional = true } +thiserror = { workspace = true } +tokio = { workspace = true, optional = true } + +[dev-dependencies] +ports = { workspace = true, features = ["storage"] } +rand = { workspace = true } +storage = { workspace = true, features = ["test-helpers"] } +tokio = { workspace = true } + +[features] +test-helpers = [ + "dep:testcontainers", + "tokio/sync", + "dep:rand", + "ports/test-helpers", +] diff --git a/packages/storage/migrations/0001_initial.down.sql b/packages/storage/migrations/0001_initial.down.sql new file mode 100644 index 00000000..ffec93bf --- /dev/null +++ b/packages/storage/migrations/0001_initial.down.sql @@ -0,0 +1 @@ +DROP TABLE IF EXISTS l1_fuel_block_submission; diff --git a/migrations/0001_initial.up.sql b/packages/storage/migrations/0001_initial.up.sql similarity index 84% rename from migrations/0001_initial.up.sql rename to packages/storage/migrations/0001_initial.up.sql index 6f021689..8a6876c2 100644 --- a/migrations/0001_initial.up.sql +++ b/packages/storage/migrations/0001_initial.up.sql @@ -1,4 +1,4 @@ -CREATE TABLE IF NOT EXISTS eth_fuel_block_submission ( +CREATE TABLE IF NOT EXISTS l1_fuel_block_submission ( fuel_block_hash BYTEA PRIMARY KEY NOT NULL, fuel_block_height BIGINT NOT NULL UNIQUE CHECK (fuel_block_height >= 0), completed BOOLEAN NOT NULL, diff --git a/packages/storage/src/error.rs b/packages/storage/src/error.rs new file mode 100644 index 00000000..b7e12b0a --- /dev/null +++ b/packages/storage/src/error.rs @@ -0,0 +1,30 @@ +pub type Result = std::result::Result; + +#[derive(Debug, thiserror::Error)] +pub enum Error { + #[error("Database Error {0}")] + Database(String), + #[error("Could not convert to/from domain/db type {0}")] + Conversion(String), +} + +impl From for ports::storage::Error { + fn from(value: Error) -> Self { + match value { + Error::Database(e) => Self::Database(e), + Error::Conversion(e) => Self::Conversion(e), + } + } +} + +impl From for Error { + fn from(e: sqlx::Error) -> Self { + Self::Database(e.to_string()) + } +} + +impl From for Error { + fn from(e: sqlx::migrate::MigrateError) -> Self { + Self::Database(e.to_string()) + } +} diff --git a/packages/storage/src/lib.rs b/packages/storage/src/lib.rs new file mode 100644 index 00000000..880e04fc --- /dev/null +++ b/packages/storage/src/lib.rs @@ -0,0 +1,113 @@ +#![deny(unused_crate_dependencies)] +mod tables; +#[cfg(feature = "test-helpers")] +mod test_instance; +#[cfg(feature = "test-helpers")] +pub use test_instance::*; + +mod error; +mod postgres; +use ports::types::BlockSubmission; +pub use postgres::*; + +#[async_trait::async_trait] +impl ports::storage::Storage for postgres::Postgres { + async fn insert(&self, submission: BlockSubmission) -> ports::storage::Result<()> { + Ok(self._insert(submission).await?) + } + + async fn submission_w_latest_block(&self) -> ports::storage::Result> { + Ok(self._submission_w_latest_block().await?) + } + + async fn set_submission_completed( + &self, + fuel_block_hash: [u8; 32], + ) -> ports::storage::Result { + Ok(self._set_submission_completed(fuel_block_hash).await?) + } +} + +#[cfg(test)] +mod tests { + use ports::{ + storage::{Error, Storage}, + types::BlockSubmission, + }; + use rand::{thread_rng, Rng}; + use storage as _; + + use crate::PostgresProcess; + + fn random_non_zero_height() -> u32 { + let mut rng = thread_rng(); + rng.gen_range(1..u32::MAX) + } + + #[tokio::test] + async fn can_insert_and_find_latest_block() { + // given + let process = PostgresProcess::shared().await.unwrap(); + let db = process.create_random_db().await.unwrap(); + let latest_height = random_non_zero_height(); + + let latest_submission = given_incomplete_submission(latest_height); + db.insert(latest_submission.clone()).await.unwrap(); + + let older_submission = given_incomplete_submission(latest_height - 1); + db.insert(older_submission).await.unwrap(); + + // when + let actual = db.submission_w_latest_block().await.unwrap().unwrap(); + + // then + assert_eq!(actual, latest_submission); + } + + #[tokio::test] + async fn can_update_completion_status() { + // given + let process = PostgresProcess::shared().await.unwrap(); + let db = process.create_random_db().await.unwrap(); + + let height = random_non_zero_height(); + let submission = given_incomplete_submission(height); + let block_hash = submission.block.hash; + db.insert(submission).await.unwrap(); + + // when + let submission = db.set_submission_completed(block_hash).await.unwrap(); + + // then + assert!(submission.completed); + } + + #[tokio::test] + async fn updating_a_missing_submission_causes_an_error() { + // given + let process = PostgresProcess::shared().await.unwrap(); + let db = process.create_random_db().await.unwrap(); + + let height = random_non_zero_height(); + let submission = given_incomplete_submission(height); + let block_hash = submission.block.hash; + + // when + let result = db.set_submission_completed(block_hash).await; + + // then + let Err(Error::Database(msg)) = result else { + panic!("should be storage error"); + }; + + let block_hash = hex::encode(block_hash); + assert_eq!(msg, format!("Cannot set submission to completed! Submission of block: `{block_hash}` not found in DB.")); + } + + fn given_incomplete_submission(fuel_block_height: u32) -> BlockSubmission { + let mut submission = rand::thread_rng().gen::(); + submission.block.height = fuel_block_height; + + submission + } +} diff --git a/packages/storage/src/postgres.rs b/packages/storage/src/postgres.rs new file mode 100644 index 00000000..3bd2505a --- /dev/null +++ b/packages/storage/src/postgres.rs @@ -0,0 +1,108 @@ +use ports::types::BlockSubmission; +use sqlx::postgres::{PgConnectOptions, PgPoolOptions}; + +use super::error::{Error, Result}; +use crate::tables; + +#[derive(Clone)] +pub struct Postgres { + connection_pool: sqlx::Pool, +} + +#[derive(Debug, Clone, serde::Deserialize)] +pub struct DbConfig { + /// The hostname or IP address of the `PostgreSQL` server. + pub host: String, + /// The port number on which the `PostgreSQL` server is listening. + pub port: u16, + /// The username used to authenticate with the `PostgreSQL` server. + pub username: String, + /// The password used to authenticate with the `PostgreSQL` server. + pub password: String, + /// The name of the database to connect to on the `PostgreSQL` server. + pub database: String, + /// The maximum number of connections allowed in the connection pool. + pub max_connections: u32, +} + +impl Postgres { + pub async fn connect(opt: &DbConfig) -> ports::storage::Result { + let options = PgConnectOptions::new() + .username(&opt.username) + .password(&opt.password) + .database(&opt.database) + .host(&opt.host) + .port(opt.port); + + let connection_pool = PgPoolOptions::new() + .max_connections(opt.max_connections) + .connect_with(options) + .await + .map_err(crate::error::Error::from)?; + + Ok(Self { connection_pool }) + } + + /// Close only when shutting down the application. Will close the connection pool even if it is + /// shared. + pub async fn close(self) { + self.connection_pool.close().await; + } + + pub async fn migrate(&self) -> ports::storage::Result<()> { + sqlx::migrate!() + .run(&self.connection_pool) + .await + .map_err(crate::error::Error::from)?; + Ok(()) + } + + #[cfg(feature = "test-helpers")] + pub(crate) async fn execute(&self, query: &str) -> Result<()> { + sqlx::query(query).execute(&self.connection_pool).await?; + Ok(()) + } + + pub(crate) async fn _insert(&self, submission: BlockSubmission) -> crate::error::Result<()> { + let row = tables::L1FuelBlockSubmission::from(submission); + sqlx::query!( + "INSERT INTO l1_fuel_block_submission (fuel_block_hash, fuel_block_height, completed, submittal_height) VALUES ($1, $2, $3, $4)", + row.fuel_block_hash, + row.fuel_block_height, + row.completed, + row.submittal_height + ).execute(&self.connection_pool).await?; + Ok(()) + } + + pub(crate) async fn _submission_w_latest_block( + &self, + ) -> crate::error::Result> { + sqlx::query_as!( + tables::L1FuelBlockSubmission, + "SELECT * FROM l1_fuel_block_submission ORDER BY fuel_block_height DESC LIMIT 1" + ) + .fetch_optional(&self.connection_pool) + .await? + .map(BlockSubmission::try_from) + .transpose() + } + + pub(crate) async fn _set_submission_completed( + &self, + fuel_block_hash: [u8; 32], + ) -> Result { + let updated_row = sqlx::query_as!( + tables::L1FuelBlockSubmission, + "UPDATE l1_fuel_block_submission SET completed = true WHERE fuel_block_hash = $1 RETURNING *", + fuel_block_hash.as_slice(), + ).fetch_optional(&self.connection_pool).await?; + + if let Some(row) = updated_row { + Ok(row.try_into()?) + } else { + let hash = hex::encode(fuel_block_hash); + Err(Error::Database(format!("Cannot set submission to completed! Submission of block: `{hash}` not found in DB."))) + } + } +} diff --git a/src/adapters/storage/postgresql/tables.rs b/packages/storage/src/tables.rs similarity index 77% rename from src/adapters/storage/postgresql/tables.rs rename to packages/storage/src/tables.rs index ac8dc2a1..db405d23 100644 --- a/src/adapters/storage/postgresql/tables.rs +++ b/packages/storage/src/tables.rs @@ -1,24 +1,21 @@ -use crate::adapters::{ - fuel_adapter::FuelBlock, - storage::{BlockSubmission, Error}, -}; +use ports::types::{BlockSubmission, FuelBlock}; #[derive(sqlx::FromRow)] -pub struct EthFuelBlockSubmission { +pub struct L1FuelBlockSubmission { pub fuel_block_hash: Vec, pub fuel_block_height: i64, pub completed: bool, pub submittal_height: i64, } -impl TryFrom for BlockSubmission { - type Error = Error; +impl TryFrom for BlockSubmission { + type Error = crate::error::Error; - fn try_from(value: EthFuelBlockSubmission) -> Result { + fn try_from(value: L1FuelBlockSubmission) -> Result { let block_hash = value.fuel_block_hash.as_slice(); macro_rules! bail { ($msg: literal, $($args: expr),*) => { - return Err(Error::Conversion(format!($msg, $($args),*))); + return Err(Self::Error::Conversion(format!($msg, $($args),*))); }; } let Ok(hash) = block_hash.try_into() else { @@ -45,7 +42,7 @@ impl TryFrom for BlockSubmission { } } -impl From for EthFuelBlockSubmission { +impl From for L1FuelBlockSubmission { fn from(value: BlockSubmission) -> Self { Self { fuel_block_hash: value.block.hash.to_vec(), diff --git a/src/adapters/storage/postgresql/test_instance.rs b/packages/storage/src/test_instance.rs similarity index 91% rename from src/adapters/storage/postgresql/test_instance.rs rename to packages/storage/src/test_instance.rs index 26b44f45..3be1cd7f 100644 --- a/src/adapters/storage/postgresql/test_instance.rs +++ b/packages/storage/src/test_instance.rs @@ -2,8 +2,7 @@ use std::sync::{Arc, Weak}; use testcontainers::{core::WaitFor, runners::AsyncRunner, Image, RunnableImage}; -use super::{DbConfig, Postgres}; -use crate::adapters::storage::Result; +use super::postgres::{DbConfig, Postgres}; struct PostgresImage; @@ -37,7 +36,7 @@ pub struct PostgresProcess { } impl PostgresProcess { - pub async fn shared() -> Result> { + pub async fn shared() -> ports::storage::Result> { // If at some point no tests are running, the shared instance will be dropped. If // requested again, it will be recreated. // This is a workaround for the lack of a global setup/teardown for tests. @@ -57,7 +56,7 @@ impl PostgresProcess { Ok(process) } - pub async fn start() -> Result { + pub async fn start() -> ports::storage::Result { let username = "username".to_string(); let password = "password".to_string(); let initial_db = "test".to_string(); @@ -70,14 +69,14 @@ impl PostgresProcess { .await; Ok(Self { - container, username, password, initial_db, + container, }) } - pub async fn create_random_db(&self) -> Result { + pub async fn create_random_db(&self) -> ports::storage::Result { let mut config = DbConfig { host: "localhost".to_string(), port: self.container.get_host_port_ipv4(5432).await, diff --git a/run_tests.sh b/run_tests.sh index f2986445..8d652b36 100755 --- a/run_tests.sh +++ b/run_tests.sh @@ -1,25 +1,28 @@ #!/usr/bin/env bash while true; do - case "$1" in - --logs) - show_logs="true" - shift 1;; - --| "") - break;; - *) - printf "Unknown option %s\n" "$1" - exit 1;; - esac + case "$1" in + --logs) + show_logs="true" + shift 1 + ;; + -- | "") + break + ;; + *) + printf "Unknown option %s\n" "$1" + exit 1 + ;; + esac done -cargo test --bin fuel-block-committer +cargo test --workspace --exclude e2e docker compose up -d trap 'docker compose down > /dev/null 2>&1' EXIT -cargo test --test e2e -- --nocapture +cargo test --package e2e -- --nocapture if [[ $show_logs = "true" ]]; then - docker compose logs -f block_committer & + docker compose logs -f block_committer & fi diff --git a/src/adapters.rs b/src/adapters.rs deleted file mode 100644 index 90983a67..00000000 --- a/src/adapters.rs +++ /dev/null @@ -1,4 +0,0 @@ -pub mod ethereum_adapter; -pub mod fuel_adapter; -pub mod runner; -pub mod storage; diff --git a/src/adapters/ethereum_adapter.rs b/src/adapters/ethereum_adapter.rs deleted file mode 100644 index e39f946e..00000000 --- a/src/adapters/ethereum_adapter.rs +++ /dev/null @@ -1,112 +0,0 @@ -mod monitored_adapter; -mod websocket; - -use std::pin::Pin; - -use async_trait::async_trait; -use ethers::types::{H160, U256}; -use futures::Stream; -pub use monitored_adapter::MonitoredEthAdapter; -use rand::distributions::{Distribution, Standard}; -pub use websocket::EthereumWs; - -use crate::{ - adapters::fuel_adapter::FuelBlock, - errors::{Error, Result}, -}; - -#[derive(Clone, Copy)] -pub struct FuelBlockCommittedOnEth { - pub fuel_block_hash: [u8; 32], - pub commit_height: U256, -} - -impl std::fmt::Debug for FuelBlockCommittedOnEth { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - let hash = self - .fuel_block_hash - .map(|byte| format!("{byte:02x?}")) - .join(""); - f.debug_struct("FuelBlockCommittedOnEth") - .field("hash", &hash) - .field("commit_height", &self.commit_height) - .finish() - } -} - -#[cfg_attr(test, mockall::automock)] -#[async_trait] -pub trait EventStreamer { - async fn establish_stream<'a>( - &'a self, - ) -> Result> + 'a + Send>>>; -} - -#[cfg_attr(test, mockall::automock)] -#[async_trait] -pub trait EthereumAdapter: Send + Sync { - async fn submit(&self, block: FuelBlock) -> Result<()>; - async fn get_block_number(&self) -> Result; - async fn balance(&self, address: H160) -> Result; - fn event_streamer(&self, eth_block_height: u64) -> Box; -} - -#[derive(Debug, Clone, Copy, PartialEq, Eq, Ord, PartialOrd)] -pub struct EthHeight { - height: i64, -} - -impl Distribution for Standard { - fn sample(&self, rng: &mut R) -> EthHeight { - let height: i64 = rng.gen_range(0..=i64::MAX); - height.try_into().expect("Must be valid EthHeight") - } -} - -impl TryFrom for EthHeight { - type Error = Error; - - fn try_from(height: i64) -> Result { - if height < 0 { - return Err(Error::Other(format!( - "Height({height}) must be non-negative" - ))); - } - Ok(EthHeight { height }) - } -} - -impl TryFrom for EthHeight { - type Error = Error; - fn try_from(height: u64) -> Result { - if height >= i64::MAX as u64 { - return Err(Error::Other(format!( - "Height({height}) too large. DB can handle at most {}", - i64::MAX - ))); - } - Ok(Self { - height: height as i64, - }) - } -} - -impl From for EthHeight { - fn from(height: u32) -> Self { - Self { - height: i64::from(height), - } - } -} - -impl From for i64 { - fn from(height: EthHeight) -> Self { - height.height - } -} - -impl From for u64 { - fn from(height: EthHeight) -> Self { - height.height as u64 - } -} diff --git a/src/adapters/ethereum_adapter/websocket.rs b/src/adapters/ethereum_adapter/websocket.rs deleted file mode 100644 index 7e83b0d2..00000000 --- a/src/adapters/ethereum_adapter/websocket.rs +++ /dev/null @@ -1,4 +0,0 @@ -mod adapter; -mod event_streamer; - -pub use adapter::EthereumWs; diff --git a/src/adapters/ethereum_adapter/websocket/adapter.rs b/src/adapters/ethereum_adapter/websocket/adapter.rs deleted file mode 100644 index c8e02681..00000000 --- a/src/adapters/ethereum_adapter/websocket/adapter.rs +++ /dev/null @@ -1,129 +0,0 @@ -use std::{num::NonZeroU32, str::FromStr, sync::Arc}; - -use async_trait::async_trait; -use ethers::{ - prelude::{abigen, ContractError, SignerMiddleware}, - providers::{Middleware, Provider, Ws}, - signers::{LocalWallet, Signer}, - types::{Address, Chain, H160, U256, U64}, -}; -use serde_json::Value; -use tracing::info; -use url::Url; - -use crate::{ - adapters::{ - ethereum_adapter::{ - websocket::event_streamer::EthEventStreamer, EthHeight, EthereumAdapter, EventStreamer, - }, - fuel_adapter::FuelBlock, - }, - errors::{Error, Result}, -}; - -abigen!( - FUEL_STATE_CONTRACT, - r#"[ - function commit(bytes32 blockHash, uint256 commitHeight) external whenNotPaused - event CommitSubmitted(uint256 indexed commitHeight, bytes32 blockHash) - ]"#, -); - -#[derive(Clone)] -pub struct EthereumWs { - provider: Provider, - contract: FUEL_STATE_CONTRACT, LocalWallet>>, - commit_interval: NonZeroU32, -} - -impl EthereumWs { - pub async fn connect( - ethereum_rpc: &Url, - chain_id: Chain, - contract_address: Address, - ethereum_wallet_key: &str, - commit_interval: NonZeroU32, - ) -> Result { - let provider = Provider::::connect(ethereum_rpc.to_string()) - .await - .map_err(|e| Error::Network(e.to_string()))?; - - let wallet = LocalWallet::from_str(ethereum_wallet_key)?.with_chain_id(chain_id); - - let signer = SignerMiddleware::new(provider.clone(), wallet); - - let contract_address = Address::from_slice(contract_address.as_ref()); - let contract = FUEL_STATE_CONTRACT::new(contract_address, Arc::new(signer)); - - Ok(Self { - provider, - contract, - commit_interval, - }) - } - - fn calculate_commit_height(block_height: u32, commit_interval: NonZeroU32) -> U256 { - (block_height / commit_interval).into() - } -} - -#[async_trait] -impl EthereumAdapter for EthereumWs { - async fn submit(&self, block: FuelBlock) -> Result<()> { - let commit_height = Self::calculate_commit_height(block.height, self.commit_interval); - let contract_call = self.contract.commit(block.hash, commit_height); - let tx = contract_call - .send() - .await - .map_err(|contract_err| match contract_err { - ContractError::ProviderError { e } => Error::Network(e.to_string()), - ContractError::MiddlewareError { e } => Error::Network(e.to_string()), - _ => Error::Other(contract_err.to_string()), - })?; - - info!("tx: {} submitted", tx.tx_hash()); - - Ok(()) - } - - async fn get_block_number(&self) -> Result { - // if provider.get_block_number is used the outgoing JSON RPC request would have the - // 'params' field set as `params: null`. This is accepted by Anvil but rejected by hardhat. - // By passing a preconstructed serde_json Value::Array it will cause params to be defined - // as `params: []` which is acceptable by both Anvil and Hardhat. - self.provider - .request("eth_blockNumber", Value::Array(vec![])) - .await - .map_err(|err| Error::Network(err.to_string())) - .and_then(|height: U64| height.as_u64().try_into()) - } - - fn event_streamer(&self, eth_block_height: u64) -> Box { - let events = self - .contract - .event::() - .from_block(eth_block_height); - - Box::new(EthEventStreamer::new(events)) - } - - async fn balance(&self, address: H160) -> Result { - self.provider - .get_balance(address, None) - .await - .map_err(|err| Error::Network(err.to_string())) - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn calculates_correctly_the_commit_height() { - assert_eq!( - EthereumWs::calculate_commit_height(10, 3.try_into().unwrap()), - 3.into() - ); - } -} diff --git a/src/adapters/ethereum_adapter/websocket/event_streamer.rs b/src/adapters/ethereum_adapter/websocket/event_streamer.rs deleted file mode 100644 index dd259155..00000000 --- a/src/adapters/ethereum_adapter/websocket/event_streamer.rs +++ /dev/null @@ -1,54 +0,0 @@ -use std::{pin::Pin, sync::Arc}; - -use ethers::{ - prelude::{k256::ecdsa::SigningKey, Event, SignerMiddleware}, - providers::{Provider, Ws}, - signers::Wallet, -}; -use futures::{stream::TryStreamExt, Stream}; - -use crate::{ - adapters::ethereum_adapter::{ - websocket::adapter::CommitSubmittedFilter, EventStreamer, FuelBlockCommittedOnEth, - }, - errors::{Error, Result}, -}; - -type EthStreamInitializer = Event< - Arc, Wallet>>, - SignerMiddleware, Wallet>, - CommitSubmittedFilter, ->; - -pub struct EthEventStreamer { - events: EthStreamInitializer, -} - -#[async_trait::async_trait] -impl EventStreamer for EthEventStreamer { - async fn establish_stream( - &self, - ) -> Result> + '_ + Send>>> { - Ok(Box::pin( - self.events - .subscribe() - .await - .map_err(|e| Error::Network(e.to_string()))? - .map_ok(|event| { - let fuel_block_hash = event.block_hash; - let commit_height = event.commit_height; - FuelBlockCommittedOnEth { - fuel_block_hash, - commit_height, - } - }) - .map_err(|e| Error::Other(e.to_string())), - )) - } -} - -impl EthEventStreamer { - pub fn new(events: EthStreamInitializer) -> Self { - Self { events } - } -} diff --git a/src/adapters/fuel_adapter/fuel_metrics.rs b/src/adapters/fuel_adapter/fuel_metrics.rs deleted file mode 100644 index adc556ab..00000000 --- a/src/adapters/fuel_adapter/fuel_metrics.rs +++ /dev/null @@ -1,26 +0,0 @@ -use prometheus::{IntCounter, Opts}; - -use crate::telemetry::RegistersMetrics; - -pub struct FuelMetrics { - pub fuel_network_errors: IntCounter, -} - -impl RegistersMetrics for FuelMetrics { - fn metrics(&self) -> Vec> { - vec![Box::new(self.fuel_network_errors.clone())] - } -} - -impl Default for FuelMetrics { - fn default() -> Self { - let fuel_network_errors = IntCounter::with_opts(Opts::new( - "fuel_network_errors", - "Number of network errors encountered while polling for a new Fuel block.", - )) - .expect("fuel_network_errors metric to be correctly configured"); - Self { - fuel_network_errors, - } - } -} diff --git a/src/adapters/fuel_adapter/health_tracker.rs b/src/adapters/fuel_adapter/health_tracker.rs deleted file mode 100644 index d3cf3d0b..00000000 --- a/src/adapters/fuel_adapter/health_tracker.rs +++ /dev/null @@ -1,47 +0,0 @@ -use std::{ - borrow::BorrowMut, - sync::{Arc, Mutex}, -}; - -use crate::telemetry::{HealthCheck, HealthChecker}; - -#[derive(Debug, Clone)] -pub struct FuelHealthTracker { - // how many failures are needed before the connection is deemed unhealhty - max_consecutive_failures: usize, - // how many consecutive failures there currently are - consecutive_failures: Arc>, -} - -impl FuelHealthTracker { - pub fn new(max_consecutive_failures: usize) -> Self { - Self { - max_consecutive_failures, - consecutive_failures: Arc::new(Mutex::new(0)), - } - } - - pub fn note_failure(&self) { - **self.acquire_consecutive_failures().borrow_mut() += 1; - } - - pub fn note_success(&self) { - **self.acquire_consecutive_failures().borrow_mut() = 0; - } - - fn acquire_consecutive_failures(&self) -> std::sync::MutexGuard { - self.consecutive_failures - .lock() - .expect("no need to handle poisoning since lock duration is short and no panics occurr") - } - - pub fn tracker(&self) -> HealthChecker { - Box::new(self.clone()) - } -} - -impl HealthCheck for FuelHealthTracker { - fn healthy(&self) -> bool { - *self.acquire_consecutive_failures() < self.max_consecutive_failures - } -} diff --git a/src/adapters/runner.rs b/src/adapters/runner.rs deleted file mode 100644 index 7876de14..00000000 --- a/src/adapters/runner.rs +++ /dev/null @@ -1,6 +0,0 @@ -use crate::errors::Result; - -#[async_trait::async_trait] -pub trait Runner: Send + Sync { - async fn run(&mut self) -> Result<()>; -} diff --git a/src/adapters/storage.rs b/src/adapters/storage.rs deleted file mode 100644 index 2c75ce75..00000000 --- a/src/adapters/storage.rs +++ /dev/null @@ -1,59 +0,0 @@ -pub mod postgresql; - -use std::sync::Arc; - -use async_trait::async_trait; -use rand::distributions::{Distribution, Standard}; - -use crate::adapters::{ethereum_adapter::EthHeight, fuel_adapter::FuelBlock}; - -#[derive(Debug, Clone, PartialEq, Eq)] -pub struct BlockSubmission { - pub block: FuelBlock, - pub completed: bool, - // Eth block height moments before submitting the fuel block. Used to filter stale events in - // the commit listener. - pub submittal_height: EthHeight, -} - -impl Distribution for Standard { - fn sample(&self, rng: &mut R) -> BlockSubmission { - BlockSubmission { - block: rng.gen(), - completed: rng.gen(), - submittal_height: rng.gen(), - } - } -} - -#[async_trait] -#[impl_tools::autoimpl(for &T, &mut T, Arc, Box)] -pub trait Storage: Send + Sync { - async fn insert(&self, submission: BlockSubmission) -> Result<()>; - async fn submission_w_latest_block(&self) -> Result>; - async fn set_submission_completed(&self, fuel_block_hash: [u8; 32]) -> Result; -} - -pub type Result = std::result::Result; - -#[derive(Debug, thiserror::Error)] -pub enum Error { - #[error("Database Error {0}")] - Database(String), - #[error("Could not convert to/from domain/db type {0}")] - Conversion(String), - #[error("{0}")] - Other(String), -} - -impl From for Error { - fn from(e: sqlx::Error) -> Self { - Self::Database(e.to_string()) - } -} - -impl From for Error { - fn from(e: sqlx::migrate::MigrateError) -> Self { - Self::Database(e.to_string()) - } -} diff --git a/src/adapters/storage/postgresql.rs b/src/adapters/storage/postgresql.rs deleted file mode 100644 index fe0e2ed5..00000000 --- a/src/adapters/storage/postgresql.rs +++ /dev/null @@ -1,189 +0,0 @@ -use super::{BlockSubmission, Error, Storage}; -use crate::adapters::storage::Result; - -mod tables; -#[cfg(test)] -mod test_instance; - -use serde::Deserialize; -#[cfg(test)] -pub use test_instance::*; - -#[derive(Clone)] -pub struct Postgres { - connection_pool: sqlx::Pool, -} - -#[derive(Debug, Clone, Deserialize)] -pub struct DbConfig { - /// The hostname or IP address of the PostgreSQL server. - pub host: String, - /// The port number on which the PostgreSQL server is listening. - pub port: u16, - /// The username used to authenticate with the PostgreSQL server. - pub username: String, - /// The password used to authenticate with the PostgreSQL server. - pub password: String, - /// The name of the database to connect to on the PostgreSQL server. - pub database: String, - /// The maximum number of connections allowed in the connection pool. - pub max_connections: u32, -} - -impl Postgres { - pub async fn connect(opt: &DbConfig) -> Result { - let options = PgConnectOptions::new() - .username(&opt.username) - .password(&opt.password) - .database(&opt.database) - .host(&opt.host) - .port(opt.port); - - let connection_pool = PgPoolOptions::new() - .max_connections(opt.max_connections) - .connect_with(options) - .await?; - - Ok(Self { connection_pool }) - } - - /// Close only when shutting down the application. Will close the connection pool even if it is - /// shared. - pub async fn close(self) { - self.connection_pool.close().await; - } - - pub async fn migrate(&self) -> Result<()> { - sqlx::migrate!().run(&self.connection_pool).await?; - Ok(()) - } - - #[cfg(test)] - pub async fn execute(&self, query: &str) -> Result<()> { - sqlx::query(query).execute(&self.connection_pool).await?; - Ok(()) - } -} - -use sqlx::postgres::{PgConnectOptions, PgPoolOptions}; - -#[async_trait::async_trait] -impl Storage for Postgres { - async fn insert(&self, submission: BlockSubmission) -> Result<()> { - let row = tables::EthFuelBlockSubmission::from(submission); - sqlx::query!( - "INSERT INTO eth_fuel_block_submission (fuel_block_hash, fuel_block_height, completed, submittal_height) VALUES ($1, $2, $3, $4)", - row.fuel_block_hash, - row.fuel_block_height, - row.completed, - row.submittal_height - ).execute(&self.connection_pool).await?; - Ok(()) - } - - async fn submission_w_latest_block(&self) -> Result> { - sqlx::query_as!( - tables::EthFuelBlockSubmission, - "SELECT * FROM eth_fuel_block_submission ORDER BY fuel_block_height DESC LIMIT 1" - ) - .fetch_optional(&self.connection_pool) - .await? - .map(BlockSubmission::try_from) - .transpose() - } - - async fn set_submission_completed(&self, fuel_block_hash: [u8; 32]) -> Result { - let updated_row = sqlx::query_as!( - tables::EthFuelBlockSubmission, - "UPDATE eth_fuel_block_submission SET completed = true WHERE fuel_block_hash = $1 RETURNING *", - fuel_block_hash.as_slice(), - ).fetch_optional(&self.connection_pool).await?; - - if let Some(row) = updated_row { - Ok(row.try_into()?) - } else { - let hash = hex::encode(fuel_block_hash); - Err(Error::Database(format!("Cannot set submission to completed! Submission of block: `{hash}` not found in DB."))) - } - } -} - -#[cfg(test)] -mod tests { - use rand::{thread_rng, Rng}; - - use super::*; - use crate::adapters::storage::{BlockSubmission, Error}; - - fn random_non_zero_height() -> u32 { - let mut rng = thread_rng(); - rng.gen_range(1..u32::MAX) - } - - #[tokio::test] - async fn can_insert_and_find_latest_block() { - // given - let process = PostgresProcess::shared().await.unwrap(); - let db = process.create_random_db().await.unwrap(); - let latest_height = random_non_zero_height(); - - let latest_submission = given_incomplete_submission(latest_height); - db.insert(latest_submission.clone()).await.unwrap(); - - let older_submission = given_incomplete_submission(latest_height - 1); - db.insert(older_submission).await.unwrap(); - - // when - let actual = db.submission_w_latest_block().await.unwrap().unwrap(); - - // then - assert_eq!(actual, latest_submission); - } - - #[tokio::test] - async fn can_update_completion_status() { - // given - let process = PostgresProcess::shared().await.unwrap(); - let db = process.create_random_db().await.unwrap(); - - let height = random_non_zero_height(); - let submission = given_incomplete_submission(height); - let block_hash = submission.block.hash; - db.insert(submission).await.unwrap(); - - // when - let submission = db.set_submission_completed(block_hash).await.unwrap(); - - // then - assert!(submission.completed); - } - - #[tokio::test] - async fn updating_a_missing_submission_causes_an_error() { - // given - let process = PostgresProcess::shared().await.unwrap(); - let db = process.create_random_db().await.unwrap(); - - let height = random_non_zero_height(); - let submission = given_incomplete_submission(height); - let block_hash = submission.block.hash; - - // when - let result = db.set_submission_completed(block_hash).await; - - // then - let Err(Error::Database(msg)) = result else { - panic!("should be storage error"); - }; - - let block_hash = hex::encode(block_hash); - assert_eq!(msg, format!("Cannot set submission to completed! Submission of block: `{block_hash}` not found in DB.")); - } - - fn given_incomplete_submission(fuel_block_height: u32) -> BlockSubmission { - let mut submission = rand::thread_rng().gen::(); - submission.block.height = fuel_block_height; - - submission - } -} diff --git a/src/errors.rs b/src/errors.rs deleted file mode 100644 index 78c226eb..00000000 --- a/src/errors.rs +++ /dev/null @@ -1,60 +0,0 @@ -use actix_web::ResponseError; -use ethers::signers::WalletError; -use tokio::task::JoinError; -use url::ParseError; - -#[derive(thiserror::Error, Debug)] -pub enum Error { - #[error("{0}")] - Other(String), - #[error("Network Error: {0}")] - Network(String), - #[error("Storage Error: {0}")] - Storage(String), -} - -impl From for Error { - fn from(value: serde_json::Error) -> Self { - Self::Storage(value.to_string()) - } -} - -impl From for Error { - fn from(error: WalletError) -> Self { - Self::Other(error.to_string()) - } -} - -impl From for Error { - fn from(error: ParseError) -> Self { - Self::Other(error.to_string()) - } -} - -impl From for Error { - fn from(error: std::io::Error) -> Self { - Self::Other(error.to_string()) - } -} - -impl From for Error { - fn from(error: JoinError) -> Self { - Self::Other(error.to_string()) - } -} - -impl From for Error { - fn from(error: crate::adapters::storage::Error) -> Self { - Self::Storage(error.to_string()) - } -} - -impl From for Error { - fn from(error: config::ConfigError) -> Self { - Self::Other(error.to_string()) - } -} - -impl ResponseError for Error {} - -pub type Result = std::result::Result; diff --git a/src/services.rs b/src/services.rs deleted file mode 100644 index dcc3f967..00000000 --- a/src/services.rs +++ /dev/null @@ -1,13 +0,0 @@ -mod block_committer; -mod block_watcher; -mod commit_listener; -mod health_reporter; -mod status_reporter; -mod wallet_balance_tracker; - -pub use block_committer::BlockCommitter; -pub use block_watcher::BlockWatcher; -pub use commit_listener::CommitListener; -pub use health_reporter::HealthReporter; -pub use status_reporter::StatusReporter; -pub use wallet_balance_tracker::WalletBalanceTracker; diff --git a/src/setup.rs b/src/setup.rs deleted file mode 100644 index 1630fabc..00000000 --- a/src/setup.rs +++ /dev/null @@ -1 +0,0 @@ -pub mod helpers; diff --git a/src/telemetry.rs b/src/telemetry.rs deleted file mode 100644 index ab771a7a..00000000 --- a/src/telemetry.rs +++ /dev/null @@ -1,7 +0,0 @@ -mod health_check; -mod health_tracker; -mod metrics; - -pub use health_check::*; -pub use health_tracker::*; -pub use metrics::*; diff --git a/src/telemetry/health_check.rs b/src/telemetry/health_check.rs deleted file mode 100644 index fb11c625..00000000 --- a/src/telemetry/health_check.rs +++ /dev/null @@ -1,4 +0,0 @@ -pub type HealthChecker = Box; -pub trait HealthCheck: Send + Sync { - fn healthy(&self) -> bool; -} diff --git a/src/telemetry/health_tracker.rs b/src/telemetry/health_tracker.rs deleted file mode 100644 index 16a760a6..00000000 --- a/src/telemetry/health_tracker.rs +++ /dev/null @@ -1,47 +0,0 @@ -use std::{ - borrow::BorrowMut, - sync::{Arc, Mutex}, -}; - -use crate::telemetry::{HealthCheck, HealthChecker}; - -#[derive(Debug, Clone)] -pub struct ConnectionHealthTracker { - // how many failures are needed before the connection is deemed unhealhty - max_consecutive_failures: usize, - // how many consecutive failures there currently are - consecutive_failures: Arc>, -} - -impl ConnectionHealthTracker { - pub fn new(max_consecutive_failures: usize) -> Self { - Self { - max_consecutive_failures, - consecutive_failures: Arc::new(Mutex::new(0)), - } - } - - pub fn note_failure(&self) { - **self.acquire_consecutive_failures().borrow_mut() += 1; - } - - pub fn note_success(&self) { - **self.acquire_consecutive_failures().borrow_mut() = 0; - } - - fn acquire_consecutive_failures(&self) -> std::sync::MutexGuard { - self.consecutive_failures - .lock() - .expect("no need to handle poisoning since lock duration is short and no panics occur") - } - - pub fn tracker(&self) -> HealthChecker { - Box::new(self.clone()) - } -} - -impl HealthCheck for ConnectionHealthTracker { - fn healthy(&self) -> bool { - *self.acquire_consecutive_failures() < self.max_consecutive_failures - } -} diff --git a/src/telemetry/metrics.rs b/src/telemetry/metrics.rs deleted file mode 100644 index 4567cd85..00000000 --- a/src/telemetry/metrics.rs +++ /dev/null @@ -1,13 +0,0 @@ -use prometheus::{core::Collector, Registry}; - -pub trait RegistersMetrics { - fn register_metrics(&self, registry: &Registry) { - self.metrics().into_iter().for_each(|metric| { - registry - .register(metric) - .expect("app to have correctly named metrics"); - }); - } - - fn metrics(&self) -> Vec>; -} diff --git a/tests/eth_test_adapter.rs b/tests/eth_test_adapter.rs deleted file mode 100644 index ca0143f9..00000000 --- a/tests/eth_test_adapter.rs +++ /dev/null @@ -1,56 +0,0 @@ -use std::{str::FromStr, sync::Arc}; - -use anyhow::Result; -use ethers::{ - prelude::{abigen, SignerMiddleware}, - providers::{Provider, Ws}, - signers::{LocalWallet, Signer}, - types::{Chain, H160}, -}; - -pub struct FuelStateContract { - contract: FUEL_STATE_CONTRACT, LocalWallet>>, -} - -abigen!( - FUEL_STATE_CONTRACT, - r#"[ - function finalized(bytes32 blockHash, uint256 blockHeight) external view whenNotPaused returns (bool) - function blockHashAtCommit(uint256 commitHeight) external view returns (bytes32) - ]"#, -); - -impl FuelStateContract { - pub async fn connect(eth_node_port: u16) -> Result { - let contract_address = "0xdAad669b06d79Cb48C8cfef789972436dBe6F24d"; - let provider = Provider::::connect(format!("ws://127.0.0.1:{eth_node_port}")).await?; - - let wallet = LocalWallet::from_str( - "0x9e56ccf010fa4073274b8177ccaad46fbaf286645310d03ac9bb6afa922a7c36", - )? - .with_chain_id(Chain::AnvilHardhat); - - let signer = SignerMiddleware::new(provider, wallet); - - let contract_address: H160 = contract_address.parse()?; - let contract = FUEL_STATE_CONTRACT::new(contract_address, Arc::new(signer)); - - Ok(Self { contract }) - } - - pub async fn finalized(&self, block_hash: [u8; 32], block_height: u32) -> Result { - Ok(self - .contract - .finalized(block_hash, block_height.into()) - .call() - .await?) - } - - pub async fn _block_hash_at_commit_height(&self, commit_height: u32) -> Result<[u8; 32]> { - Ok(self - .contract - .block_hash_at_commit(commit_height.into()) - .call() - .await?) - } -} diff --git a/tests/harness.rs b/tests/harness.rs deleted file mode 100644 index 4f88aa2f..00000000 --- a/tests/harness.rs +++ /dev/null @@ -1,33 +0,0 @@ -mod eth_test_adapter; - -use std::time::Duration; - -use anyhow::Result; -use fuel_core_client::client::FuelClient; - -use crate::eth_test_adapter::FuelStateContract; - -const FUEL_NODE_PORT: u16 = 4000; -const ETH_NODE_PORT: u16 = 8089; - -#[tokio::test(flavor = "multi_thread")] -async fn submitted_correct_block_and_was_finalized() -> Result<()> { - let fuel_node_address = format!("http://localhost:{FUEL_NODE_PORT}"); - let provider = FuelClient::new(&fuel_node_address).unwrap(); - - let fuel_contract = FuelStateContract::connect(ETH_NODE_PORT).await?; - - provider.produce_blocks(3, None).await?; - - // time enough to fwd the block to ethereum and for the TIME_TO_FINALIZE (1s) to elapse - tokio::time::sleep(Duration::from_secs(5)).await; - - let latest_block = provider.chain_info().await?.latest_block; - let height = latest_block.header.height; - let hash = *latest_block.id; - - assert_eq!(height, 3); - assert!(fuel_contract.finalized(hash, height).await?); - - Ok(()) -}