Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Express lane timeboost auctioneer rpc forwarding and sequencer coordinator #2903

Merged
merged 7 commits into from
Jan 31, 2025
12 changes: 9 additions & 3 deletions execution/gethexec/forwarder.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,10 +141,12 @@ func (f *TxForwarder) PublishTransaction(inctx context.Context, tx *types.Transa
} else {
err = arbitrum.SendConditionalTransactionRPC(ctx, rpcClient, tx, options)
}
if err != nil {
log.Warn("error forwarding transaction to a backup target", "target", f.targets[pos], "err", err)
}
if err == nil || !f.tryNewForwarderErrors.MatchString(err.Error()) {
return err
}
log.Warn("error forwarding transaction to a backup target", "target", f.targets[pos], "err", err)
}
return errors.New("failed to publish transaction to any of the forwarding targets")
}
Expand All @@ -157,10 +159,12 @@ func (f *TxForwarder) PublishExpressLaneTransaction(inctx context.Context, msg *
defer cancelFunc()
for pos, rpcClient := range f.rpcClients {
err := sendExpressLaneTransactionRPC(ctx, rpcClient, msg)
if err != nil {
log.Warn("error forwarding express lane transaction to a backup target", "target", f.targets[pos], "err", err)
}
if err == nil || !f.tryNewForwarderErrors.MatchString(err.Error()) {
return err
}
log.Warn("error forwarding transaction to a backup target", "target", f.targets[pos], "err", err)
}
return errors.New("failed to publish transaction to any of the forwarding targets")
}
Expand All @@ -181,10 +185,12 @@ func (f *TxForwarder) PublishAuctionResolutionTransaction(inctx context.Context,
defer cancelFunc()
for pos, rpcClient := range f.rpcClients {
err := sendAuctionResolutionTransactionRPC(ctx, rpcClient, tx)
if err != nil {
log.Warn("error forwarding auction resolution transaction to a backup target", "target", f.targets[pos], "err", err)
}
if err == nil || !f.tryNewForwarderErrors.MatchString(err.Error()) {
return err
}
log.Warn("error forwarding transaction to a backup target", "target", f.targets[pos], "err", err)
}
return errors.New("failed to publish transaction to any of the forwarding targets")
}
Expand Down
2 changes: 1 addition & 1 deletion execution/gethexec/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ func CreateExecutionNode(
Version: "1.0",
Service: NewArbTimeboostAuctioneerAPI(txPublisher),
Public: false,
Authenticated: true, // Only exposed via JWT Auth to the auctioneer.
Authenticated: false,
})
apis = append(apis, rpc.API{
Namespace: "timeboost",
Expand Down
5 changes: 4 additions & 1 deletion execution/gethexec/sequencer.go
Original file line number Diff line number Diff line change
Expand Up @@ -520,7 +520,10 @@ func (s *Sequencer) PublishAuctionResolutionTransaction(ctx context.Context, tx
return err
}
if forwarder != nil {
return fmt.Errorf("sequencer is currently not the chosen one, cannot accept auction resolution tx")
err := forwarder.PublishAuctionResolutionTransaction(ctx, tx)
if !errors.Is(err, ErrNoSequencer) {
return err
}
}

arrivalTime := time.Now()
Expand Down
31 changes: 8 additions & 23 deletions system_tests/timeboost_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"math/big"
"net"
"os"
"path/filepath"
"strings"
"sync"
"testing"
Expand Down Expand Up @@ -72,9 +71,8 @@ func testTxsHandlingDuringSequencerSwap(t *testing.T, dueToCrash bool) {
t.Cleanup(func() {
require.NoError(t, os.RemoveAll(tmpDir))
})
jwtSecretPath := filepath.Join(tmpDir, "sequencer.jwt")

auctionContractAddr, aliceBidderClient, bobBidderClient, roundDuration, builderSeq, cleanupSeq, forwarder, cleanupForwarder := setupExpressLaneAuction(t, tmpDir, ctx, jwtSecretPath, withForwardingSeq)
auctionContractAddr, aliceBidderClient, bobBidderClient, roundDuration, builderSeq, cleanupSeq, forwarder, cleanupForwarder := setupExpressLaneAuction(t, tmpDir, ctx, withForwardingSeq)
seqB, seqClientB, seqInfo := builderSeq.L2.ConsensusNode, builderSeq.L2.Client, builderSeq.L2Info
seqA := forwarder.ConsensusNode
if !dueToCrash {
Expand Down Expand Up @@ -206,9 +204,8 @@ func TestForwardingExpressLaneTxs(t *testing.T) {
t.Cleanup(func() {
require.NoError(t, os.RemoveAll(tmpDir))
})
jwtSecretPath := filepath.Join(tmpDir, "sequencer.jwt")

auctionContractAddr, aliceBidderClient, bobBidderClient, roundDuration, builderSeq, cleanupSeq, forwarder, cleanupForwarder := setupExpressLaneAuction(t, tmpDir, ctx, jwtSecretPath, withForwardingSeq)
auctionContractAddr, aliceBidderClient, bobBidderClient, roundDuration, builderSeq, cleanupSeq, forwarder, cleanupForwarder := setupExpressLaneAuction(t, tmpDir, ctx, withForwardingSeq)
seqClient, seqInfo := builderSeq.L2.Client, builderSeq.L2Info
defer cleanupSeq()
defer cleanupForwarder()
Expand Down Expand Up @@ -252,9 +249,8 @@ func TestExpressLaneTransactionHandlingComplex(t *testing.T) {
t.Cleanup(func() {
require.NoError(t, os.RemoveAll(tmpDir))
})
jwtSecretPath := filepath.Join(tmpDir, "sequencer.jwt")

auctionContractAddr, aliceBidderClient, bobBidderClient, roundDuration, builderSeq, cleanupSeq, _, _ := setupExpressLaneAuction(t, tmpDir, ctx, jwtSecretPath, 0)
auctionContractAddr, aliceBidderClient, bobBidderClient, roundDuration, builderSeq, cleanupSeq, _, _ := setupExpressLaneAuction(t, tmpDir, ctx, 0)
seq, seqClient, seqInfo := builderSeq.L2.ConsensusNode, builderSeq.L2.Client, builderSeq.L2Info
defer cleanupSeq()

Expand Down Expand Up @@ -350,9 +346,8 @@ func TestExpressLaneTransactionHandling(t *testing.T) {
t.Cleanup(func() {
require.NoError(t, os.RemoveAll(tmpDir))
})
jwtSecretPath := filepath.Join(tmpDir, "sequencer.jwt")

auctionContractAddr, aliceBidderClient, bobBidderClient, roundDuration, builderSeq, cleanupSeq, _, _ := setupExpressLaneAuction(t, tmpDir, ctx, jwtSecretPath, 0)
auctionContractAddr, aliceBidderClient, bobBidderClient, roundDuration, builderSeq, cleanupSeq, _, _ := setupExpressLaneAuction(t, tmpDir, ctx, 0)
seq, seqClient, seqInfo := builderSeq.L2.ConsensusNode, builderSeq.L2.Client, builderSeq.L2Info
defer cleanupSeq()

Expand Down Expand Up @@ -955,9 +950,8 @@ func TestSequencerFeed_ExpressLaneAuction_ExpressLaneTxsHaveAdvantage(t *testing
t.Cleanup(func() {
require.NoError(t, os.RemoveAll(tmpDir))
})
jwtSecretPath := filepath.Join(tmpDir, "sequencer.jwt")

auctionContractAddr, aliceBidderClient, bobBidderClient, roundDuration, builderSeq, cleanupSeq, _, _ := setupExpressLaneAuction(t, tmpDir, ctx, jwtSecretPath, 0)
auctionContractAddr, aliceBidderClient, bobBidderClient, roundDuration, builderSeq, cleanupSeq, _, _ := setupExpressLaneAuction(t, tmpDir, ctx, 0)
seq, seqClient, seqInfo := builderSeq.L2.ConsensusNode, builderSeq.L2.Client, builderSeq.L2Info
defer cleanupSeq()

Expand Down Expand Up @@ -1002,8 +996,7 @@ func TestSequencerFeed_ExpressLaneAuction_InnerPayloadNoncesAreRespected_Timeboo
t.Cleanup(func() {
require.NoError(t, os.RemoveAll(tmpDir))
})
jwtSecretPath := filepath.Join(tmpDir, "sequencer.jwt")
auctionContractAddr, aliceBidderClient, bobBidderClient, roundDuration, builderSeq, cleanupSeq, feedListener, cleanupFeedListener := setupExpressLaneAuction(t, tmpDir, ctx, jwtSecretPath, withFeedListener)
auctionContractAddr, aliceBidderClient, bobBidderClient, roundDuration, builderSeq, cleanupSeq, feedListener, cleanupFeedListener := setupExpressLaneAuction(t, tmpDir, ctx, withFeedListener)
seq, seqClient, seqInfo := builderSeq.L2.ConsensusNode, builderSeq.L2.Client, builderSeq.L2Info
defer cleanupSeq()
defer cleanupFeedListener()
Expand Down Expand Up @@ -1279,11 +1272,9 @@ func setupExpressLaneAuction(
t *testing.T,
dbDirPath string,
ctx context.Context,
jwtSecretPath string,
extraNodeTy extraNodeType,
) (common.Address, *timeboost.BidderClient, *timeboost.BidderClient, time.Duration, *NodeBuilder, func(), *TestClient, func()) {
seqPort := getRandomPort(t)
seqAuthPort := getRandomPort(t)
forwarderPort := getRandomPort(t)

nodeNames := []string{fmt.Sprintf("http://127.0.0.1:%d", seqPort), fmt.Sprintf("http://127.0.0.1:%d", forwarderPort)}
Expand All @@ -1293,10 +1284,7 @@ func setupExpressLaneAuction(
builderSeq := NewNodeBuilder(ctx).DefaultConfig(t, true)
builderSeq.l2StackConfig.HTTPHost = "localhost"
builderSeq.l2StackConfig.HTTPPort = seqPort
builderSeq.l2StackConfig.HTTPModules = []string{"eth", "arb", "debug", "timeboost"}
builderSeq.l2StackConfig.AuthPort = seqAuthPort
builderSeq.l2StackConfig.AuthModules = []string{"eth", "arb", "debug", "timeboost", "auctioneer"}
builderSeq.l2StackConfig.JWTSecret = jwtSecretPath
builderSeq.l2StackConfig.HTTPModules = []string{"eth", "arb", "debug", "timeboost", "auctioneer"}
builderSeq.nodeConfig.Feed.Output = *newBroadcasterConfigTest()
builderSeq.nodeConfig.Dangerous.NoSequencerCoordinator = false
builderSeq.nodeConfig.SeqCoordinator.Enable = true
Expand Down Expand Up @@ -1325,8 +1313,6 @@ func setupExpressLaneAuction(
forwarderNodeCfg.SeqCoordinator.MyUrl = nodeNames[1]
forwarderNodeCfg.SeqCoordinator.DeleteFinalizedMsgs = false
builderSeq.l2StackConfig.HTTPPort = forwarderPort
builderSeq.l2StackConfig.AuthPort = getRandomPort(t)
builderSeq.l2StackConfig.JWTSecret = jwtSecretPath
extraNode, cleanupExtraNode = builderSeq.Build2ndNode(t, &SecondNodeParams{nodeConfig: forwarderNodeCfg})
case withFeedListener:
tcpAddr, ok := seqNode.BroadcastServer.ListenerAddr().(*net.TCPAddr)
Expand Down Expand Up @@ -1545,11 +1531,10 @@ func setupExpressLaneAuction(
bidValidator.Start(ctx)

auctioneerCfg := &timeboost.AuctioneerServerConfig{
SequencerEndpoint: fmt.Sprintf("http://localhost:%d", seqAuthPort),
SequencerEndpoint: fmt.Sprintf("http://localhost:%d", seqPort),
AuctionContractAddress: proxyAddr.Hex(),
RedisURL: redisURL,
ConsumerConfig: pubsub.TestConsumerConfig,
SequencerJWTPath: jwtSecretPath,
DbDirectory: dbDirPath,
Wallet: genericconf.WalletConfig{
PrivateKey: fmt.Sprintf("00%x", seqInfo.Accounts["AuctionContract"].PrivateKey.D.Bytes()),
Expand Down
74 changes: 36 additions & 38 deletions timeboost/auctioneer.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,23 +7,18 @@ import (
"context"
"fmt"
"math/big"
"net/http"
"os"
"time"

"github.com/golang-jwt/jwt/v4"
"github.com/pkg/errors"
"github.com/spf13/pflag"
"golang.org/x/crypto/sha3"

"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/rpc"

"github.com/offchainlabs/nitro/cmd/genericconf"
"github.com/offchainlabs/nitro/cmd/util"
Expand Down Expand Up @@ -66,6 +61,8 @@ type AuctioneerServerConfig struct {
Wallet genericconf.WalletConfig `koanf:"wallet"`
SequencerEndpoint string `koanf:"sequencer-endpoint"`
SequencerJWTPath string `koanf:"sequencer-jwt-path"`
UseRedisCoordinator bool `koanf:"use-redis-coordinator"`
RedisCoordinatorURL string `koanf:"redis-coordinator-url"`
AuctionContractAddress string `koanf:"auction-contract-address"`
DbDirectory string `koanf:"db-directory"`
AuctionResolutionWaitTime time.Duration `koanf:"auction-resolution-wait-time"`
Expand All @@ -91,12 +88,14 @@ var TestAuctioneerServerConfig = AuctioneerServerConfig{

func AuctioneerServerConfigAddOptions(prefix string, f *pflag.FlagSet) {
f.Bool(prefix+".enable", DefaultAuctioneerServerConfig.Enable, "enable auctioneer server")
f.String(prefix+".redis-url", DefaultAuctioneerServerConfig.RedisURL, "url of redis server")
f.String(prefix+".redis-url", DefaultAuctioneerServerConfig.RedisURL, "url of redis server to receive bids from bid validators")
pubsub.ConsumerConfigAddOptions(prefix+".consumer-config", f)
f.Duration(prefix+".stream-timeout", DefaultAuctioneerServerConfig.StreamTimeout, "Timeout on polling for existence of redis streams")
genericconf.WalletConfigAddOptions(prefix+".wallet", f, "wallet for auctioneer server")
f.String(prefix+".sequencer-endpoint", DefaultAuctioneerServerConfig.SequencerEndpoint, "sequencer RPC endpoint")
f.String(prefix+".sequencer-jwt-path", DefaultAuctioneerServerConfig.SequencerJWTPath, "sequencer jwt file path")
f.Bool(prefix+".use-redis-coordinator", DefaultAuctioneerServerConfig.UseRedisCoordinator, "use redis coordinator to find active sequencer")
f.String(prefix+".redis-coordinator-url", DefaultAuctioneerServerConfig.RedisCoordinatorURL, "redis coordinator url for finding active sequencer")
f.String(prefix+".auction-contract-address", DefaultAuctioneerServerConfig.AuctionContractAddress, "express lane auction contract address")
f.String(prefix+".db-directory", DefaultAuctioneerServerConfig.DbDirectory, "path to database directory for persisting validated bids in a sqlite file")
f.Duration(prefix+".auction-resolution-wait-time", DefaultAuctioneerServerConfig.AuctionResolutionWaitTime, "wait time after auction closing before resolving the auction")
Expand All @@ -110,8 +109,7 @@ type AuctioneerServer struct {
consumer *pubsub.Consumer[*JsonValidatedBid, error]
txOpts *bind.TransactOpts
chainId *big.Int
sequencerRpc *rpc.Client
client *ethclient.Client
endpointManager SequencerEndpointManager
auctionContract *express_lane_auctiongen.ExpressLaneAuction
auctionContractAddr common.Address
bidsReceiver chan *JsonValidatedBid
Expand All @@ -135,9 +133,6 @@ func NewAuctioneerServer(ctx context.Context, configFetcher AuctioneerServerConf
if cfg.DbDirectory == "" {
return nil, errors.New("database directory is empty")
}
if cfg.SequencerJWTPath == "" {
return nil, errors.New("no sequencer jwt path specified")
}
database, err := NewDatabase(cfg.DbDirectory)
if err != nil {
return nil, err
Expand All @@ -158,33 +153,24 @@ func NewAuctioneerServer(ctx context.Context, configFetcher AuctioneerServerConf
if err != nil {
return nil, fmt.Errorf("creating consumer for validation: %w", err)
}
sequencerJwtStr, err := os.ReadFile(cfg.SequencerJWTPath)
if err != nil {
return nil, err
}
sequencerJwt, err := hexutil.Decode(string(sequencerJwtStr))
if err != nil {
return nil, err
}
client, err := rpc.DialOptions(ctx, cfg.SequencerEndpoint, rpc.WithHTTPAuth(func(h http.Header) error {
claims := jwt.MapClaims{
// Required claim for Ethereum RPC API auth. "iat" stands for issued at
// and it must be a unix timestamp that is +/- 5 seconds from the current
// timestamp at the moment the server verifies this value.
"iat": time.Now().Unix(),
}
token := jwt.NewWithClaims(jwt.SigningMethodHS256, claims)
tokenString, err := token.SignedString(sequencerJwt)

var endpointManager SequencerEndpointManager
if cfg.UseRedisCoordinator {
redisCoordinator, err := redisutil.NewRedisCoordinator(cfg.RedisCoordinatorURL)
if err != nil {
return errors.Wrap(err, "could not produce signed JWT token")
return nil, err
}
h.Set("Authorization", fmt.Sprintf("Bearer %s", tokenString))
return nil
}))
endpointManager = NewRedisEndpointManager(redisCoordinator, cfg.SequencerJWTPath)
} else {
endpointManager = NewStaticEndpointManager(cfg.SequencerEndpoint, cfg.SequencerJWTPath)
}

rpcClient, _, err := endpointManager.GetSequencerRPC(ctx)
if err != nil {
return nil, err
}
sequencerClient := ethclient.NewClient(client)
sequencerClient := ethclient.NewClient(rpcClient)

chainId, err := sequencerClient.ChainID(ctx)
if err != nil {
return nil, err
Expand All @@ -210,9 +196,8 @@ func NewAuctioneerServer(ctx context.Context, configFetcher AuctioneerServerConf
}
return &AuctioneerServer{
txOpts: txOpts,
sequencerRpc: client,
endpointManager: endpointManager,
chainId: chainId,
client: sequencerClient,
database: database,
s3StorageService: s3StorageService,
consumer: c,
Expand Down Expand Up @@ -347,6 +332,19 @@ func (a *AuctioneerServer) resolveAuction(ctx context.Context) error {
var err error
opts := copyTxOpts(a.txOpts)
opts.NoSend = true

sequencerRpc, newRpc, err := a.endpointManager.GetSequencerRPC(ctx)
if err != nil {
return fmt.Errorf("failed to get sequencer RPC: %w", err)
}

if newRpc {
a.auctionContract, err = express_lane_auctiongen.NewExpressLaneAuction(a.auctionContractAddr, ethclient.NewClient(sequencerRpc))
if err != nil {
return fmt.Errorf("failed to recreate ExpressLaneAuction conctract bindings with new sequencer endpoint: %w", err)
}
}

switch {
case first != nil && second != nil: // Both bids are present
tx, err = a.auctionContract.ResolveMultiBidAuction(
Expand Down Expand Up @@ -391,13 +389,13 @@ func (a *AuctioneerServer) resolveAuction(ctx context.Context) error {
retryInterval := 1 * time.Second

if err := retryUntil(ctx, func() error {
if err := a.sequencerRpc.CallContext(ctx, nil, "auctioneer_submitAuctionResolutionTransaction", tx); err != nil {
log.Error("Error submitting auction resolution to privileged sequencer endpoint", "error", err)
if err := sequencerRpc.CallContext(ctx, nil, "auctioneer_submitAuctionResolutionTransaction", tx); err != nil {
log.Error("Error submitting auction resolution to sequencer endpoint", "error", err)
return err
}

// Wait for the transaction to be mined
receipt, err := bind.WaitMined(ctx, a.client, tx)
receipt, err := bind.WaitMined(ctx, ethclient.NewClient(sequencerRpc), tx)
if err != nil {
log.Error("Error waiting for transaction to be mined", "error", err)
return err
Expand Down
Loading
Loading