Skip to content

Commit

Permalink
validation spawner only recieves one validation binary
Browse files Browse the repository at this point in the history
  • Loading branch information
tsahee committed Jul 18, 2024
1 parent 16434a2 commit 52df895
Show file tree
Hide file tree
Showing 13 changed files with 107 additions and 46 deletions.
12 changes: 6 additions & 6 deletions staker/block_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -468,7 +468,7 @@ func (v *BlockValidator) sendRecord(s *validationStatus) error {

//nolint:gosec
func (v *BlockValidator) writeToFile(validationEntry *validationEntry, moduleRoot common.Hash) error {
input, err := validationEntry.ToInput()
input, err := validationEntry.ToInput("wavm")
if err != nil {
return err
}
Expand Down Expand Up @@ -807,18 +807,18 @@ validationsLoop:
return nil, nil
}
if currentStatus == Prepared {
input, err := validationStatus.Entry.ToInput()
if err != nil && ctx.Err() == nil {
v.possiblyFatal(fmt.Errorf("%w: error preparing validation", err))
continue
}
replaced := validationStatus.replaceStatus(Prepared, SendingValidation)
if !replaced {
v.possiblyFatal(errors.New("failed to set SendingValidation status"))
}
validatorPendingValidationsGauge.Inc(1)
var runs []validator.ValidationRun
for _, moduleRoot := range wasmRoots {
input, err := validationStatus.Entry.ToInput(v.chosenValidator[moduleRoot].StylusArch())
if err != nil && ctx.Err() == nil {
v.possiblyFatal(fmt.Errorf("%w: error preparing validation", err))
continue
}
run := v.chosenValidator[moduleRoot].Launch(input, moduleRoot)
log.Trace("advanceValidations: launched", "pos", validationStatus.Entry.Pos, "moduleRoot", moduleRoot)
runs = append(runs, run)
Expand Down
2 changes: 1 addition & 1 deletion staker/challenge_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -467,7 +467,7 @@ func (m *ChallengeManager) createExecutionBackend(ctx context.Context, step uint
if err != nil {
return fmt.Errorf("error creating validation entry for challenge %v msg %v for execution challenge: %w", m.challengeIndex, initialCount, err)
}
input, err := entry.ToInput()
input, err := entry.ToInput("wavm")
if err != nil {
return fmt.Errorf("error getting validation entry input of challenge %v msg %v: %w", m.challengeIndex, initialCount, err)
}
Expand Down
32 changes: 24 additions & 8 deletions staker/stateless_block_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"errors"
"fmt"
"net/url"
"runtime"
"testing"

"github.com/offchainlabs/nitro/arbstate/daprovider"
Expand Down Expand Up @@ -134,21 +135,32 @@ type validationEntry struct {
DelayedMsg []byte
}

func (e *validationEntry) ToInput() (*validator.ValidationInput, error) {
func (e *validationEntry) ToInput(stylusArch string) (*validator.ValidationInput, error) {
if e.Stage != Ready {
return nil, errors.New("cannot create input from non-ready entry")
}
return &validator.ValidationInput{
res := validator.ValidationInput{
Id: uint64(e.Pos),
HasDelayedMsg: e.HasDelayedMsg,
DelayedMsgNr: e.DelayedMsgNr,
Preimages: e.Preimages,
UserWasms: e.UserWasms,
StylusArch: stylusArch,
UserWasms: make(map[common.Hash][]byte, len(e.UserWasms)),
BatchInfo: e.BatchInfo,
DelayedMsg: e.DelayedMsg,
StartState: e.Start,
DebugChain: e.ChainConfig.DebugMode(),
}, nil
}
for hash, info := range e.UserWasms {
if stylusArch == "wavm" {
res.UserWasms[hash] = info.Module
} else if stylusArch == runtime.GOARCH {
res.UserWasms[hash] = info.Asm
} else {
return nil, fmt.Errorf("stylusArch not supported by block validator: %v", stylusArch)
}
}
return &res, nil
}

func newValidationEntry(
Expand Down Expand Up @@ -373,21 +385,25 @@ func (v *StatelessBlockValidator) ValidateResult(
if err != nil {
return false, nil, err
}
input, err := entry.ToInput()
if err != nil {
return false, nil, err
}
var run validator.ValidationRun
if !useExec {
if v.redisValidator != nil {
if validator.SpawnerSupportsModule(v.redisValidator, moduleRoot) {
input, err := entry.ToInput(v.redisValidator.StylusArch())
if err != nil {
return false, nil, err
}
run = v.redisValidator.Launch(input, moduleRoot)
}
}
}
if run == nil {
for _, spawner := range v.execSpawners {
if validator.SpawnerSupportsModule(spawner, moduleRoot) {
input, err := entry.ToInput(spawner.StylusArch())
if err != nil {
return false, nil, err
}
run = spawner.Launch(input, moduleRoot)
break
}
Expand Down
4 changes: 4 additions & 0 deletions system_tests/validation_mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ func (s *mockSpawner) WasmModuleRoots() ([]common.Hash, error) {
return mockWasmModuleRoots, nil
}

func (s *mockSpawner) StylusArch() string {
return "mock"
}

func (s *mockSpawner) Launch(entry *validator.ValidationInput, moduleRoot common.Hash) validator.ValidationRun {
run := &mockValRun{
Promise: containers.NewPromise[validator.GoGlobalState](nil),
Expand Down
8 changes: 8 additions & 0 deletions validator/client/redis/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type ValidationClientConfig struct {
StreamPrefix string `koanf:"stream-prefix"`
Room int32 `koanf:"room"`
RedisURL string `koanf:"redis-url"`
StylusArch string `koanf:"stylus-arch"`
ProducerConfig pubsub.ProducerConfig `koanf:"producer-config"`
CreateStreams bool `koanf:"create-streams"`
}
Expand All @@ -35,6 +36,7 @@ var DefaultValidationClientConfig = ValidationClientConfig{
Name: "redis validation client",
Room: 2,
RedisURL: "",
StylusArch: "wavm",
ProducerConfig: pubsub.DefaultProducerConfig,
CreateStreams: true,
}
Expand All @@ -44,6 +46,7 @@ var TestValidationClientConfig = ValidationClientConfig{
Room: 2,
RedisURL: "",
StreamPrefix: "test-",
StylusArch: "wavm",
ProducerConfig: pubsub.TestProducerConfig,
CreateStreams: false,
}
Expand All @@ -53,6 +56,7 @@ func ValidationClientConfigAddOptions(prefix string, f *pflag.FlagSet) {
f.Int32(prefix+".room", DefaultValidationClientConfig.Room, "validation client room")
f.String(prefix+".redis-url", DefaultValidationClientConfig.RedisURL, "redis url")
f.String(prefix+".stream-prefix", DefaultValidationClientConfig.StreamPrefix, "prefix for stream name")
f.String(prefix+".stylus-arch", DefaultValidationClientConfig.StylusArch, "arch for stylus workers")
pubsub.ProducerAddConfigAddOptions(prefix+".producer-config", f)
f.Bool(prefix+".create-streams", DefaultValidationClientConfig.CreateStreams, "create redis streams if it does not exist")
}
Expand Down Expand Up @@ -148,6 +152,10 @@ func (c *ValidationClient) Name() string {
return c.config.Name
}

func (c *ValidationClient) StylusArch() string {
return c.config.StylusArch
}

func (c *ValidationClient) Room() int {
return int(c.room.Load())
}
25 changes: 22 additions & 3 deletions validator/client/validation_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"encoding/base64"
"errors"
"fmt"
"runtime"
"sync/atomic"
"time"

Expand All @@ -29,13 +30,16 @@ type ValidationClient struct {
stopwaiter.StopWaiter
client *rpcclient.RpcClient
name string
stylusArch string
room atomic.Int32
wasmModuleRoots []common.Hash
}

func NewValidationClient(config rpcclient.ClientConfigFetcher, stack *node.Node) *ValidationClient {
return &ValidationClient{
client: rpcclient.NewRpcClient(config, stack),
client: rpcclient.NewRpcClient(config, stack),
name: "not started",
stylusArch: "not started",
}
}

Expand Down Expand Up @@ -64,15 +68,22 @@ func (c *ValidationClient) Start(ctx_in context.Context) error {
if len(name) == 0 {
return errors.New("couldn't read name from server")
}
var stylusArch string
if err := c.client.CallContext(ctx, &stylusArch, server_api.Namespace+"_stylusArch"); err != nil {
return err
}
if stylusArch != "wavm" && stylusArch != runtime.GOARCH && stylusArch != "mock" {
return fmt.Errorf("unsupported stylus architecture: %v", stylusArch)
}
var moduleRoots []common.Hash
if err := c.client.CallContext(c.GetContext(), &moduleRoots, server_api.Namespace+"_wasmModuleRoots"); err != nil {
if err := c.client.CallContext(ctx, &moduleRoots, server_api.Namespace+"_wasmModuleRoots"); err != nil {
return err
}
if len(moduleRoots) == 0 {
return fmt.Errorf("server reported no wasmModuleRoots")
}
var room int
if err := c.client.CallContext(c.GetContext(), &room, server_api.Namespace+"_room"); err != nil {
if err := c.client.CallContext(ctx, &room, server_api.Namespace+"_room"); err != nil {
return err
}
if room < 2 {
Expand All @@ -84,6 +95,7 @@ func (c *ValidationClient) Start(ctx_in context.Context) error {
c.room.Store(int32(room))
c.wasmModuleRoots = moduleRoots
c.name = name
c.stylusArch = stylusArch
return nil
}

Expand All @@ -94,6 +106,13 @@ func (c *ValidationClient) WasmModuleRoots() ([]common.Hash, error) {
return nil, errors.New("not started")
}

func (c *ValidationClient) StylusArch() string {
if c.Started() {
return c.stylusArch
}
return "not started"
}

func (c *ValidationClient) Stop() {
c.StopWaiter.StopOnly()
if c.client != nil {
Expand Down
1 change: 1 addition & 0 deletions validator/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ type ValidationSpawner interface {
Start(context.Context) error
Stop()
Name() string
StylusArch() string
Room() int
}

Expand Down
38 changes: 16 additions & 22 deletions validator/server_api/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"os"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/state"
"github.com/offchainlabs/nitro/arbcompress"
"github.com/offchainlabs/nitro/arbutil"

"github.com/offchainlabs/nitro/util/jsonapi"
Expand Down Expand Up @@ -62,7 +62,8 @@ type InputJSON struct {
BatchInfo []BatchInfoJson
DelayedMsgB64 string
StartState validator.GoGlobalState
UserWasms map[common.Hash]UserWasmJson
StylusArch string
UserWasms map[common.Hash]string
DebugChain bool
}

Expand All @@ -77,11 +78,6 @@ func (i *InputJSON) WriteToFile() error {
return nil
}

type UserWasmJson struct {
Module string
Asm string
}

type BatchInfoJson struct {
Number uint64
DataB64 string
Expand All @@ -99,19 +95,20 @@ func ValidationInputToJson(entry *validator.ValidationInput) *InputJSON {
DelayedMsgB64: base64.StdEncoding.EncodeToString(entry.DelayedMsg),
StartState: entry.StartState,
PreimagesB64: jsonPreimagesMap,
UserWasms: make(map[common.Hash]UserWasmJson),
UserWasms: make(map[common.Hash]string),
StylusArch: entry.StylusArch,
DebugChain: entry.DebugChain,
}
for _, binfo := range entry.BatchInfo {
encData := base64.StdEncoding.EncodeToString(binfo.Data)
res.BatchInfo = append(res.BatchInfo, BatchInfoJson{Number: binfo.Number, DataB64: encData})
}
for moduleHash, info := range entry.UserWasms {
encWasm := UserWasmJson{
Asm: base64.StdEncoding.EncodeToString(info.Asm),
Module: base64.StdEncoding.EncodeToString(info.Module),
for moduleHash, data := range entry.UserWasms {
compressed, err := arbcompress.CompressWell(data)
if err != nil {
entry.StylusArch = "compressError:" + err.Error()
}
res.UserWasms[moduleHash] = encWasm
res.UserWasms[moduleHash] = base64.StdEncoding.EncodeToString(compressed)
}
return res
}
Expand All @@ -127,7 +124,8 @@ func ValidationInputFromJson(entry *InputJSON) (*validator.ValidationInput, erro
DelayedMsgNr: entry.DelayedMsgNr,
StartState: entry.StartState,
Preimages: preimages,
UserWasms: make(state.UserWasms),
StylusArch: entry.StylusArch,
UserWasms: make(map[common.Hash][]byte),
DebugChain: entry.DebugChain,
}
delayed, err := base64.StdEncoding.DecodeString(entry.DelayedMsgB64)
Expand All @@ -146,20 +144,16 @@ func ValidationInputFromJson(entry *InputJSON) (*validator.ValidationInput, erro
}
valInput.BatchInfo = append(valInput.BatchInfo, decInfo)
}
for moduleHash, info := range entry.UserWasms {
asm, err := base64.StdEncoding.DecodeString(info.Asm)
for moduleHash, encoded := range entry.UserWasms {
decoded, err := base64.StdEncoding.DecodeString(encoded)
if err != nil {
return nil, err
}
module, err := base64.StdEncoding.DecodeString(info.Module)
uncompressed, err := arbcompress.Decompress(decoded, 30000000)
if err != nil {
return nil, err
}
decInfo := state.ActivatedWasm{
Asm: asm,
Module: module,
}
valInput.UserWasms[moduleHash] = decInfo
valInput.UserWasms[moduleHash] = uncompressed
}
return valInput, nil
}
11 changes: 9 additions & 2 deletions validator/server_arb/validator_spawner.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,10 @@ func (s *ArbitratorSpawner) WasmModuleRoots() ([]common.Hash, error) {
return s.locator.ModuleRoots(), nil
}

func (s *ArbitratorSpawner) StylusArch() string {
return "wavm"
}

func (s *ArbitratorSpawner) Name() string {
return "arbitrator"
}
Expand Down Expand Up @@ -118,8 +122,11 @@ func (v *ArbitratorSpawner) loadEntryToMachine(ctx context.Context, entry *valid
return fmt.Errorf("error while trying to add sequencer msg for proving: %w", err)
}
}
for moduleHash, info := range entry.UserWasms {
err = mach.AddUserWasm(moduleHash, info.Module)
if entry.StylusArch != "wavm" {
return fmt.Errorf("bad stylus arch loaded to machine. Expected wavm. Got: %s", entry.StylusArch)
}
for moduleHash, module := range entry.UserWasms {
err = mach.AddUserWasm(moduleHash, module)
if err != nil {
log.Error(
"error adding user wasm for proving",
Expand Down
8 changes: 6 additions & 2 deletions validator/server_jit/jit_machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"net"
"os"
"os/exec"
"runtime"
"time"

"github.com/ethereum/go-ethereum/common"
Expand Down Expand Up @@ -212,15 +213,18 @@ func (machine *JitMachine) prove(
}

// send user wasms
if entry.StylusArch != runtime.GOARCH {
return state, fmt.Errorf("bad stylus arch for validation input. got: %v, expected: %v", entry.StylusArch, runtime.GOARCH)
}
userWasms := entry.UserWasms
if err := writeUint32(uint32(len(userWasms))); err != nil {
return state, err
}
for moduleHash, info := range userWasms {
for moduleHash, program := range userWasms {
if err := writeExact(moduleHash[:]); err != nil {
return state, err
}
if err := writeBytes(info.Asm); err != nil {
if err := writeBytes(program); err != nil {
return state, err
}
}
Expand Down
Loading

0 comments on commit 52df895

Please sign in to comment.