Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(stage_baches): rpc resequence stop stage on unwind #1297

Merged
merged 81 commits into from
Nov 7, 2024
Merged
Show file tree
Hide file tree
Changes from 74 commits
Commits
Show all changes
81 commits
Select commit Hold shift + click to select a range
cc0b65e
fix(stage_baches): rpc resequence stop stage on unwind
V-Staykov Oct 8, 2024
9ab827a
fix: tests
V-Staykov Oct 9, 2024
65a2c4e
fix: datastream channel write blocking
V-Staykov Oct 9, 2024
9410e73
Merge branch 'zkevm' into fix-stage-batches-resequence
V-Staykov Oct 9, 2024
b7a68ab
fix: datastream blocking test
V-Staykov Oct 9, 2024
6e9abcd
fix: add wait on the datastream connect loop
V-Staykov Oct 9, 2024
564377d
Merge branch 'zkevm' into fix-stage-batches-resequence
V-Staykov Oct 9, 2024
4a70b0e
fix: merge problems
V-Staykov Oct 9, 2024
e52fb2a
fix: blockhash comparison in stage batches processor
V-Staykov Oct 9, 2024
eb6be5f
fix: download entries till reaching the amount in header
V-Staykov Oct 10, 2024
df99d8a
fix: add go sum package
V-Staykov Oct 10, 2024
d779f99
feat: internal reconnect on each method in datastream client
V-Staykov Oct 11, 2024
bcf8a1a
fix: do not disconnect on stage batches end
V-Staykov Oct 11, 2024
c2f7f11
feat: add ctx close in datastream reconnections
V-Staykov Oct 11, 2024
1a0812e
Merge branch 'zkevm' into fix-stage-batches-resequence
V-Staykov Oct 11, 2024
561a4b7
fix: send stop command after normal stop of reading
V-Staykov Oct 11, 2024
eb97b3f
feat: retry a fixed number of times in stage batches
V-Staykov Oct 11, 2024
23dba85
fix: return error on ctx done
V-Staykov Oct 11, 2024
9f5ecaf
Merge branch 'zkevm' into fix-stage-batches-resequence
V-Staykov Oct 15, 2024
475ae6a
fix: reverse daastream server version
V-Staykov Oct 15, 2024
8f5a30f
feat: print ds found block
V-Staykov Oct 16, 2024
6d46ed1
feat: added more logs in stage batches
V-Staykov Oct 16, 2024
f80cf52
fix: check for sync limit in stage batches
V-Staykov Oct 16, 2024
850a4f0
fix: sync limit in stage batches
V-Staykov Oct 16, 2024
bc337a0
refactor: make unwind test erros a bit more readable
V-Staykov Oct 16, 2024
f32e465
refactor: make unwind tests erorrs more readable
V-Staykov Oct 16, 2024
8a3aae4
Merge branch 'zkevm' into fix-stage-batches-resequence
V-Staykov Oct 17, 2024
a461a81
refactor(ds_client): wrap connection read and write to set timeout
V-Staykov Oct 17, 2024
ffbe524
fix: add timeout to test clients
V-Staykov Oct 17, 2024
a613c62
Merge branch 'zkevm' into fix-stage-batches-resequence
V-Staykov Oct 17, 2024
adf3a35
fix: stage batches limit
V-Staykov Oct 17, 2024
2823ffc
feat: up datastream server version
V-Staykov Oct 17, 2024
8084779
fix: up datastream server version
V-Staykov Oct 17, 2024
9e52000
fix: go sum
V-Staykov Oct 17, 2024
e6be038
Merge branch 'zkevm' into fix-stage-batches-resequence
V-Staykov Oct 21, 2024
62a2a8d
Merge branch 'zkevm' into fix-stage-batches-resequence
hexoscott Oct 22, 2024
2c7694e
fix: add error handling for set timeouts in datastream client
V-Staykov Oct 28, 2024
cf71f18
fix: handle zero checkTImeout value
V-Staykov Oct 28, 2024
3542164
Merge branch 'zkevm' into fix-stage-batches-resequence
V-Staykov Oct 28, 2024
c663c32
fix: remove flag setting for datastream timeout
V-Staykov Oct 28, 2024
cb73cfb
fix: ci config
V-Staykov Oct 29, 2024
ddbbf7c
fix: resequence test timeout
V-Staykov Oct 29, 2024
cd181b3
fix: remove timeout from pre-london ci config
V-Staykov Oct 29, 2024
d7376b6
refactor: error handling
V-Staykov Oct 29, 2024
06061d1
fix: stop stage on unwind
V-Staykov Oct 29, 2024
fe6734b
fix: missing id in client
V-Staykov Oct 29, 2024
6f14463
fix: tests
V-Staykov Oct 29, 2024
bab79a0
fix: tests
V-Staykov Oct 29, 2024
4e17fbf
fix: finish processing blocks on last entry reached
V-Staykov Oct 29, 2024
f97342e
feat: send stop command at start of new cycle to not get timedout by …
V-Staykov Oct 29, 2024
d0fcdc5
fix: remove accidental commit folder
V-Staykov Oct 29, 2024
eee60e6
fix: remove unneeded commit
V-Staykov Oct 29, 2024
3eab9a7
fix: tests
V-Staykov Oct 29, 2024
4dcd3df
fix: remove unnneeded return
V-Staykov Oct 29, 2024
e0d338c
fix: get correct parent block hash
V-Staykov Oct 29, 2024
14d65a6
fix: read correct blockhash
V-Staykov Oct 30, 2024
48c769a
Merge branch 'zkevm' into fix-stage-batches-resequence
V-Staykov Oct 30, 2024
d5f3b6e
fix: unwind on ds block unwind
V-Staykov Oct 30, 2024
768a99d
Merge branch 'zkevm' into fix-stage-batches-resequence
V-Staykov Oct 30, 2024
53232ac
refactor: error handling in datastream and stage batches
V-Staykov Oct 31, 2024
8736296
Merge branch 'zkevm' into fix-stage-batches-resequence
V-Staykov Oct 31, 2024
abadf1d
fix: remove unneeded sleep
V-Staykov Oct 31, 2024
fb6eb38
fix: add a small sleep interval in the entry loop
V-Staykov Oct 31, 2024
d90a0fd
fix: stop streaming on querying new stuff from ds client
V-Staykov Oct 31, 2024
cfd68a3
Merge branch 'zkevm' into fix-stage-batches-resequence
V-Staykov Oct 31, 2024
c0ab546
fix: buffer clear before new reads
V-Staykov Oct 31, 2024
e1836dc
fix: sleep more in resequence test
V-Staykov Nov 4, 2024
3e0c451
Merge branch 'zkevm' into fix-stage-batches-resequence
V-Staykov Nov 5, 2024
7ceb1e1
fix: cast call
V-Staykov Nov 5, 2024
2a6fda0
fix: remove wrong flag on cast
V-Staykov Nov 5, 2024
f36b560
Merge branch 'zkevm' into fix-stage-batches-resequence
V-Staykov Nov 5, 2024
c1df594
Merge branch 'zkevm' into fix-stage-batches-resequence
V-Staykov Nov 5, 2024
1af05ee
fix: cast json flags in test
V-Staykov Nov 5, 2024
e3559f2
feat: added wait time for block to be available on sync node
V-Staykov Nov 5, 2024
190a63f
Merge branch 'zkevm' into fix-stage-batches-resequence
V-Staykov Nov 5, 2024
5a9d2f8
Merge branch 'fix-stage-batches-resequence' of https://github.com/0xP…
V-Staykov Nov 5, 2024
6f711ec
fix: resequence block check test
V-Staykov Nov 5, 2024
ddbc956
Merge branch 'zkevm' into fix-stage-batches-resequence
V-Staykov Nov 6, 2024
136bb86
Merge branch 'zkevm' into fix-stage-batches-resequence
V-Staykov Nov 6, 2024
1ff9c05
Fix 'client already started' error on finding common ancestor
cffls Nov 6, 2024
c40987f
Add timeout
cffls Nov 7, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/scripts/test_resequence.sh
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ get_latest_l2_batch() {
}

get_latest_l1_verified_batch() {
current_batch=$(cast logs --rpc-url "$(kurtosis port print cdk-v1 el-1-geth-lighthouse rpc)" --address 0x1Fe038B54aeBf558638CA51C91bC8cCa06609e91 --from-block 0 -j | jq -r '.[] | select(.topics[0] == "0x9c72852172521097ba7e1482e6b44b351323df0155f97f4ea18fcec28e1f5966" or .topics[0] == "0xd1ec3a1216f08b6eff72e169ceb548b782db18a6614852618d86bb19f3f9b0d3") | .topics[1]' | tail -n 1 | sed 's/^0x//')
current_batch=$(cast logs --rpc-url "$(kurtosis port print cdk-v1 el-1-geth-lighthouse rpc)" --address 0x1Fe038B54aeBf558638CA51C91bC8cCa06609e91 --from-block 0 --json | jq -r '.[] | select(.topics[0] == "0x9c72852172521097ba7e1482e6b44b351323df0155f97f4ea18fcec28e1f5966" or .topics[0] == "0xd1ec3a1216f08b6eff72e169ceb548b782db18a6614852618d86bb19f3f9b0d3") | .topics[1]' | tail -n 1 | sed 's/^0x//')
current_batch_dec=$((16#$current_batch))
echo "$current_batch_dec"
}
Expand Down Expand Up @@ -46,7 +46,7 @@ wait_for_l1_batch() {
fi

if [ "$batch_type" = "virtual" ]; then
current_batch=$(cast logs --rpc-url "$(kurtosis port print cdk-v1 el-1-geth-lighthouse rpc)" --address 0x1Fe038B54aeBf558638CA51C91bC8cCa06609e91 --from-block 0 -j | jq -r '.[] | select(.topics[0] == "0x3e54d0825ed78523037d00a81759237eb436ce774bd546993ee67a1b67b6e766") | .topics[1]' | tail -n 1 | sed 's/^0x//')
current_batch=$(cast logs --rpc-url "$(kurtosis port print cdk-v1 el-1-geth-lighthouse rpc)" --address 0x1Fe038B54aeBf558638CA51C91bC8cCa06609e91 --from-block 0 -json | jq -r '.[] | select(.topics[0] == "0x3e54d0825ed78523037d00a81759237eb436ce774bd546993ee67a1b67b6e766") | .topics[1]' | tail -n 1 | sed 's/^0x//')
current_batch=$((16#$current_batch))
elif [ "$batch_type" = "verified" ]; then
current_batch=$(cast rpc zkevm_verifiedBatchNumber --rpc-url "$(kurtosis port print cdk-v1 cdk-erigon-node-001 rpc)" | sed 's/^"//;s/"$//')
Expand Down
8 changes: 8 additions & 0 deletions .github/workflows/ci_zkevm.yml
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,12 @@ jobs:
sed -i '/zkevm.sequencer-batch-seal-time:/d' templates/cdk-erigon/config.yml
sed -i '/zkevm.sequencer-non-empty-batch-seal-time:/d' templates/cdk-erigon/config.yml
sed -i '/sentry.drop-useless-peers:/d' templates/cdk-erigon/config.yml
sed -i '/zkevm.l2-datastreamer-timeout:/d' templates/cdk-erigon/config.yml
- name: Configure Kurtosis CDK
working-directory: ./kurtosis-cdk
run: |
/usr/local/bin/yq -i '.args.data_availability_mode = "${{ matrix.da-mode }}"' params.yml
/usr/local/bin/yq -i '.args.cdk_erigon_node_image = "cdk-erigon:local"' params.yml

- name: Deploy Kurtosis CDK package
working-directory: ./kurtosis-cdk
Expand Down Expand Up @@ -224,6 +230,8 @@ jobs:
sed -i '/sentry.drop-useless-peers:/d' templates/cdk-erigon/config.yml
sed -i '/zkevm\.pool-manager-url/d' ./templates/cdk-erigon/config.yml
sed -i '$a\zkevm.disable-virtual-counters: true' ./templates/cdk-erigon/config.yml
sed -i '/zkevm.l2-datastreamer-timeout:/d' templates/cdk-erigon/config.yml


- name: Configure Kurtosis CDK
working-directory: ./kurtosis-cdk
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/test-resequence.yml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ jobs:
sed -i '/zkevm.sequencer-non-empty-batch-seal-time:/d' templates/cdk-erigon/config.yml
sed -i '/sentry.drop-useless-peers:/d' templates/cdk-erigon/config.yml
sed -i '/zkevm.pool-manager-url/d' templates/cdk-erigon/config.yml
sed -i '/zkevm.l2-datastreamer-timeout:/d' templates/cdk-erigon/config.yml
- name: Configure Kurtosis CDK
working-directory: ./kurtosis-cdk
run: |
Expand Down
2 changes: 1 addition & 1 deletion cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,7 @@ var (
L2DataStreamerTimeout = cli.StringFlag{
Name: "zkevm.l2-datastreamer-timeout",
Usage: "The time to wait for data to arrive from the stream before reporting an error (0s doesn't check)",
Value: "0s",
Value: "3s",
}
L1SyncStartBlock = cli.Uint64Flag{
Name: "zkevm.l1-sync-start-block",
Expand Down
9 changes: 5 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ replace github.com/ledgerwatch/erigon-lib => ./erigon-lib

require (
gfx.cafe/util/go/generic v0.0.0-20230721185457-c559e86c829c
github.com/0xPolygonHermez/zkevm-data-streamer v0.2.5
github.com/0xPolygonHermez/zkevm-data-streamer v0.2.7
github.com/99designs/gqlgen v0.17.40
github.com/Giulio2002/bls v0.0.0-20240315151443-652e18a3d188
github.com/Masterminds/sprig/v3 v3.2.3
Expand Down Expand Up @@ -62,7 +62,7 @@ require (
github.com/grpc-ecosystem/go-grpc-middleware v1.4.0
github.com/hashicorp/golang-lru/arc/v2 v2.0.6
github.com/hashicorp/golang-lru/v2 v2.0.7
github.com/holiman/uint256 v1.2.4
github.com/holiman/uint256 v1.3.1
github.com/huandu/xstrings v1.4.0
github.com/huin/goupnp v1.2.0
github.com/iden3/go-iden3-crypto v0.0.15
Expand Down Expand Up @@ -110,11 +110,11 @@ require (
golang.org/x/exp v0.0.0-20231226003508-02704c960a9b
golang.org/x/net v0.24.0
golang.org/x/sync v0.7.0
golang.org/x/sys v0.19.0
golang.org/x/sys v0.20.0
golang.org/x/time v0.5.0
google.golang.org/grpc v1.63.2
google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.3.0
google.golang.org/protobuf v1.33.0
google.golang.org/protobuf v1.34.2
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c
gopkg.in/natefinch/lumberjack.v2 v2.2.1
gopkg.in/yaml.v2 v2.4.0
Expand Down Expand Up @@ -174,6 +174,7 @@ require (
github.com/francoispqt/gojay v1.2.13 // indirect
github.com/fsnotify/fsnotify v1.6.0 // indirect
github.com/garslo/gogen v0.0.0-20170307003452-d6ebae628c7c // indirect
github.com/go-delve/delve v1.21.2 // indirect
github.com/go-llsqlite/adapter v0.0.0-20230927005056-7f5ce7f0c916 // indirect
github.com/go-llsqlite/crawshaw v0.4.0 // indirect
github.com/go-logr/logr v1.2.4 // indirect
Expand Down
12 changes: 12 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ gfx.cafe/util/go/generic v0.0.0-20230721185457-c559e86c829c/go.mod h1:WvSX4JsCRB
git.apache.org/thrift.git v0.0.0-20180902110319-2566ecd5d999/go.mod h1:fPE2ZNJGynbRyZ4dJvy6G277gSllfV2HJqblrnkyeyg=
github.com/0xPolygonHermez/zkevm-data-streamer v0.2.5 h1:p0epAhai44c34G+nzX0CZ67q3vkJtOXlO07lbhAEe9g=
github.com/0xPolygonHermez/zkevm-data-streamer v0.2.5/go.mod h1:RC6ouyNsUtJrv5aGPcM6Dm5xhXN209tRSzcsJsaOtZI=
github.com/0xPolygonHermez/zkevm-data-streamer v0.2.6 h1:BSO1uu6dmLQ5kKb3uyDvsUxbnIoyumKvlwr0OtpTYMo=
github.com/0xPolygonHermez/zkevm-data-streamer v0.2.6/go.mod h1:RC6ouyNsUtJrv5aGPcM6Dm5xhXN209tRSzcsJsaOtZI=
github.com/0xPolygonHermez/zkevm-data-streamer v0.2.7 h1:73sYxRQ9cOmtYBEyHePgEwrVULR+YruSQxVXCt/SmzU=
github.com/0xPolygonHermez/zkevm-data-streamer v0.2.7/go.mod h1:7nM7Ihk+fTG1TQPwdZoGOYd3wprqqyIyjtS514uHzWE=
github.com/99designs/gqlgen v0.17.40 h1:/l8JcEVQ93wqIfmH9VS1jsAkwm6eAF1NwQn3N+SDqBY=
github.com/99designs/gqlgen v0.17.40/go.mod h1:b62q1USk82GYIVjC60h02YguAZLqYZtvWml8KkhJps4=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
Expand Down Expand Up @@ -327,6 +331,8 @@ github.com/go-chi/chi/v5 v5.0.12 h1:9euLV5sTrTNTRUU9POmDUvfxyj6LAABLUcEWO+JJb4s=
github.com/go-chi/chi/v5 v5.0.12/go.mod h1:DslCQbL2OYiznFReuXYUmQ2hGd1aDpCnlMNITLSKoi8=
github.com/go-chi/cors v1.2.1 h1:xEC8UT3Rlp2QuWNEr4Fs/c2EAGVKBwy/1vHx3bppil4=
github.com/go-chi/cors v1.2.1/go.mod h1:sSbTewc+6wYHBBCW7ytsFSn836hqM7JxpglAy2Vzc58=
github.com/go-delve/delve v1.21.2 h1:eaS+ziJo+660mi3D2q/VP8RxW5GcF4Y1zyKSi82alsU=
github.com/go-delve/delve v1.21.2/go.mod h1:FgTAiRUe43RS5EexL06RPyMtP8AMZVL/t9Qqgy3qUe4=
github.com/go-errors/errors v1.0.1/go.mod h1:f4zRHt4oKfwPJE5k8C9vpYG+aDHdBFUsgrm6/TyX73Q=
github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU=
github.com/go-gl/glfw/v3.3/glfw v0.0.0-20191125211704-12ad95a8df72/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8=
Expand Down Expand Up @@ -490,6 +496,8 @@ github.com/hexops/gotextdiff v1.0.3/go.mod h1:pSWU5MAI3yDq+fZBTazCSJysOMbxWL1BSo
github.com/holiman/uint256 v1.2.0/go.mod h1:y4ga/t+u+Xwd7CpDgZESaRcWy0I7XMlTMA25ApIH5Jw=
github.com/holiman/uint256 v1.2.4 h1:jUc4Nk8fm9jZabQuqr2JzednajVmBpC+oiTiXZJEApU=
github.com/holiman/uint256 v1.2.4/go.mod h1:EOMSn4q6Nyt9P6efbI3bueV4e1b3dGlUCXeiRV4ng7E=
github.com/holiman/uint256 v1.3.1 h1:JfTzmih28bittyHM8z360dCjIA9dbPIBlcTI6lmctQs=
github.com/holiman/uint256 v1.3.1/go.mod h1:EOMSn4q6Nyt9P6efbI3bueV4e1b3dGlUCXeiRV4ng7E=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/huandu/xstrings v1.0.0/go.mod h1:4qWG/gcEcfX4z/mBDHJ++3ReCw9ibxbsNJbcucJdbSo=
github.com/huandu/xstrings v1.2.0/go.mod h1:DvyZB1rfVYsBIigL8HwpZgxHwXozlTgGqn63UyNX5k4=
Expand Down Expand Up @@ -1333,6 +1341,8 @@ golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.19.0 h1:q5f1RH2jigJ1MoAWp2KTp3gm5zAGFUTarQZ5U386+4o=
golang.org/x/sys v0.19.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y=
golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
Expand Down Expand Up @@ -1547,6 +1557,8 @@ google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp0
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI=
google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg=
google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/cenkalti/backoff.v1 v1.1.0 h1:Arh75ttbsvlpVA7WtVpH4u9h6Zl46xuptxqLxPiSo4Y=
gopkg.in/cenkalti/backoff.v1 v1.1.0/go.mod h1:J6Vskwqd+OMVJl8C33mmtxTBs2gyzfv7UDAkHu8BrjI=
Expand Down
52 changes: 18 additions & 34 deletions zk/datastream/client/commands.go
Original file line number Diff line number Diff line change
@@ -1,26 +1,19 @@
package client

import "fmt"

const (
// Commands
CmdUnknown Command = 0
CmdStart Command = 1
CmdStop Command = 2
CmdHeader Command = 3
CmdStartBookmark Command = 4 // CmdStartBookmark for the start from bookmark TCP client command
CmdEntry Command = 5 // CmdEntry for the get entry TCP client command
CmdBookmark Command = 6 // CmdBookmark for the get bookmark TCP client command
CmdUnknown Command = iota
CmdStart
CmdStop
CmdHeader
CmdStartBookmark // CmdStartBookmark for the start from bookmark TCP client command
CmdEntry // CmdEntry for the get entry TCP client command
CmdBookmark // CmdBookmark for the get bookmark TCP client command
)

// sendHeaderCmd sends the header command to the server.
func (c *StreamClient) sendHeaderCmd() error {
err := c.sendCommand(CmdHeader)
if err != nil {
return fmt.Errorf("%s %v", c.id, err)
}

return nil
return c.sendCommand(CmdHeader)
}

// sendBookmarkCmd sends either CmdStartBookmark or CmdBookmark for the provided bookmark value.
Expand All @@ -38,24 +31,23 @@ func (c *StreamClient) sendBookmarkCmd(bookmark []byte, streaming bool) error {
}

// Send bookmark length
if err := writeFullUint32ToConn(c.conn, uint32(len(bookmark))); err != nil {
if err := c.writeToConn(uint32(len(bookmark))); err != nil {
return err
}

// Send the bookmark to retrieve
return writeBytesToConn(c.conn, bookmark)
return c.writeToConn(bookmark)
}

// sendStartCmd sends a start command to the server, indicating
// that the client wishes to start streaming from the given entry number.
func (c *StreamClient) sendStartCmd(from uint64) error {
err := c.sendCommand(CmdStart)
if err != nil {
if err := c.sendCommand(CmdStart); err != nil {
return err
}

// Send starting/from entry number
return writeFullUint64ToConn(c.conn, from)
return c.writeToConn(from)
}

// sendEntryCmd sends the get data stream entry by number command to a TCP connection
Expand All @@ -66,29 +58,21 @@ func (c *StreamClient) sendEntryCmd(entryNum uint64) error {
}

// Send entry number
return writeFullUint64ToConn(c.conn, entryNum)
return c.writeToConn(entryNum)
}

// sendHeaderCmd sends the header command to the server.
func (c *StreamClient) sendStopCmd() error {
err := c.sendCommand(CmdStop)
if err != nil {
return fmt.Errorf("%s %v", c.id, err)
}

return nil
return c.sendCommand(CmdStop)
}

func (c *StreamClient) sendCommand(cmd Command) error {

// Send command
if err := writeFullUint64ToConn(c.conn, uint64(cmd)); err != nil {
return fmt.Errorf("%s %v", c.id, err)
if err := c.writeToConn(uint64(cmd)); err != nil {
return err
}

// Send stream type
if err := writeFullUint64ToConn(c.conn, uint64(c.streamType)); err != nil {
return fmt.Errorf("%s %v", c.id, err)
}

return nil
return c.writeToConn(uint64(c.streamType))
}
Loading
Loading