From 2fa23d89c4572e174645611941ca2dc935774dc6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Ber=C4=8Di=C4=8D?= Date: Tue, 17 Oct 2023 17:31:10 +0200 Subject: [PATCH] tmp --- go/worker/storage/api/api.go | 1 + go/worker/storage/committee/node.go | 38 ++++++++++++++++++++++++----- go/worker/storage/worker.go | 16 ++++++++---- 3 files changed, 44 insertions(+), 11 deletions(-) diff --git a/go/worker/storage/api/api.go b/go/worker/storage/api/api.go index a01791b9efc..f14519a94da 100644 --- a/go/worker/storage/api/api.go +++ b/go/worker/storage/api/api.go @@ -15,6 +15,7 @@ const ModuleName = "worker/storage" type StorageWorkerStatus string const ( + StatusDBLoading StorageWorkerStatus = "loading database" StatusInitializing StorageWorkerStatus = "initializing" StatusStarting StorageWorkerStatus = "starting" StatusStopping StorageWorkerStatus = "stopping" diff --git a/go/worker/storage/committee/node.go b/go/worker/storage/committee/node.go index f1de5131a5c..c7c18bfb3c8 100644 --- a/go/worker/storage/committee/node.go +++ b/go/worker/storage/committee/node.go @@ -157,7 +157,8 @@ func NewNode( roleProvider registration.RoleProvider, rpcRoleProvider registration.RoleProvider, workerCommonCfg workerCommon.Config, - localStorage storageApi.LocalBackend, + //localStorage storageApi.LocalBackend, + storageCtor func(context.Context) (storageApi.LocalBackend, error), checkpointerCfg *checkpoint.CheckpointerConfig, checkpointSyncCfg *CheckpointSyncConfig, ) (*Node, error) { @@ -173,13 +174,14 @@ func NewNode( workerCommonCfg: workerCommonCfg, - localStorage: localStorage, + //localStorage: localStorage, fetchPool: fetchPool, checkpointSyncCfg: checkpointSyncCfg, - status: api.StatusInitializing, + //status: api.StatusInitializing, + status: api.StatusDBLoading, blockCh: channels.NewInfiniteChannel(), diffCh: make(chan *fetchedDiff), @@ -200,6 +202,31 @@ func NewNode( n.ctx, n.ctxCancel = context.WithCancel(context.Background()) + go func() { + localStorage, err := storageCtor(n.ctx) + if err != nil { + n.logger.Error("error creating storage worker backend", "err", err) + return + } + n.newBottomHalf(commonNode, rpcRoleProvider, localStorage, checkpointerCfg) + }() + + return n, nil +} + +func (n *Node) newBottomHalf( + commonNode *committee.Node, + rpcRoleProvider registration.RoleProvider, + localStorage storageApi.LocalBackend, + checkpointerCfg *checkpoint.CheckpointerConfig, +) { + func() { + n.statusLock.Lock() + defer n.statusLock.Unlock() + + n.status = api.StatusInitializing + }() + // Create a new checkpointer if enabled. if checkpointerCfg != nil { checkpointerCfg = &checkpoint.CheckpointerConfig{ @@ -245,7 +272,8 @@ func NewNode( *checkpointerCfg, ) if err != nil { - return nil, fmt.Errorf("failed to create checkpointer: %w", err) + n.logger.Error("failed to create checkpointer", "err", err) + return } } @@ -263,8 +291,6 @@ func NewNode( if rpcRoleProvider != nil { commonNode.P2P.RegisterProtocolServer(storagePub.NewServer(commonNode.ChainContext, commonNode.Runtime.ID(), localStorage)) } - - return n, nil } // Service interface. diff --git a/go/worker/storage/worker.go b/go/worker/storage/worker.go index cdd13d0f9d8..3627aada0d0 100644 --- a/go/worker/storage/worker.go +++ b/go/worker/storage/worker.go @@ -1,6 +1,7 @@ package storage import ( + "context" "fmt" "github.com/oasisprotocol/oasis-core/go/common" @@ -9,6 +10,7 @@ import ( "github.com/oasisprotocol/oasis-core/go/common/node" "github.com/oasisprotocol/oasis-core/go/common/workerpool" "github.com/oasisprotocol/oasis-core/go/config" + storageApi "github.com/oasisprotocol/oasis-core/go/storage/api" "github.com/oasisprotocol/oasis-core/go/storage/mkvs/checkpoint" workerCommon "github.com/oasisprotocol/oasis-core/go/worker/common" committeeCommon "github.com/oasisprotocol/oasis-core/go/worker/common/committee" @@ -95,9 +97,13 @@ func (w *Worker) registerRuntime(commonNode *committeeCommon.Node, checkpointerC } } - localStorage, err := NewLocalBackend(commonNode.Runtime.DataDir(), id) - if err != nil { - return fmt.Errorf("can't create local storage backend: %w", err) + storageCtor := func(ctx context.Context) (storageApi.LocalBackend, error) { + localStorage, err := NewLocalBackend(commonNode.Runtime.DataDir(), id) + if err != nil { + return nil, fmt.Errorf("can't create local storage backend: %w", err) + } + commonNode.Runtime.RegisterStorage(localStorage) + return localStorage, nil } node, err := committee.NewNode( @@ -106,7 +112,7 @@ func (w *Worker) registerRuntime(commonNode *committeeCommon.Node, checkpointerC rp, rpRPC, w.commonWorker.GetConfig(), - localStorage, + storageCtor, checkpointerCfg, &committee.CheckpointSyncConfig{ Disabled: config.GlobalConfig.Storage.CheckpointSyncDisabled, @@ -116,7 +122,7 @@ func (w *Worker) registerRuntime(commonNode *committeeCommon.Node, checkpointerC if err != nil { return err } - commonNode.Runtime.RegisterStorage(localStorage) + //commonNode.Runtime.RegisterStorage(localStorage) commonNode.AddHooks(node) w.runtimes[id] = node