Skip to content

Commit

Permalink
tmp
Browse files Browse the repository at this point in the history
  • Loading branch information
jberci committed Oct 17, 2023
1 parent 3a36557 commit 2fa23d8
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 11 deletions.
1 change: 1 addition & 0 deletions go/worker/storage/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ const ModuleName = "worker/storage"
type StorageWorkerStatus string

const (
StatusDBLoading StorageWorkerStatus = "loading database"
StatusInitializing StorageWorkerStatus = "initializing"
StatusStarting StorageWorkerStatus = "starting"
StatusStopping StorageWorkerStatus = "stopping"
Expand Down
38 changes: 32 additions & 6 deletions go/worker/storage/committee/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,8 @@ func NewNode(
roleProvider registration.RoleProvider,
rpcRoleProvider registration.RoleProvider,
workerCommonCfg workerCommon.Config,
localStorage storageApi.LocalBackend,
//localStorage storageApi.LocalBackend,
storageCtor func(context.Context) (storageApi.LocalBackend, error),
checkpointerCfg *checkpoint.CheckpointerConfig,
checkpointSyncCfg *CheckpointSyncConfig,
) (*Node, error) {
Expand All @@ -173,13 +174,14 @@ func NewNode(

workerCommonCfg: workerCommonCfg,

localStorage: localStorage,
//localStorage: localStorage,

fetchPool: fetchPool,

checkpointSyncCfg: checkpointSyncCfg,

status: api.StatusInitializing,
//status: api.StatusInitializing,
status: api.StatusDBLoading,

blockCh: channels.NewInfiniteChannel(),
diffCh: make(chan *fetchedDiff),
Expand All @@ -200,6 +202,31 @@ func NewNode(

n.ctx, n.ctxCancel = context.WithCancel(context.Background())

go func() {
localStorage, err := storageCtor(n.ctx)
if err != nil {
n.logger.Error("error creating storage worker backend", "err", err)
return
}
n.newBottomHalf(commonNode, rpcRoleProvider, localStorage, checkpointerCfg)
}()

return n, nil
}

func (n *Node) newBottomHalf(
commonNode *committee.Node,
rpcRoleProvider registration.RoleProvider,
localStorage storageApi.LocalBackend,
checkpointerCfg *checkpoint.CheckpointerConfig,
) {
func() {
n.statusLock.Lock()
defer n.statusLock.Unlock()

n.status = api.StatusInitializing
}()

// Create a new checkpointer if enabled.
if checkpointerCfg != nil {
checkpointerCfg = &checkpoint.CheckpointerConfig{
Expand Down Expand Up @@ -245,7 +272,8 @@ func NewNode(
*checkpointerCfg,
)
if err != nil {
return nil, fmt.Errorf("failed to create checkpointer: %w", err)
n.logger.Error("failed to create checkpointer", "err", err)
return
}
}

Expand All @@ -263,8 +291,6 @@ func NewNode(
if rpcRoleProvider != nil {
commonNode.P2P.RegisterProtocolServer(storagePub.NewServer(commonNode.ChainContext, commonNode.Runtime.ID(), localStorage))
}

return n, nil
}

// Service interface.
Expand Down
16 changes: 11 additions & 5 deletions go/worker/storage/worker.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package storage

import (
"context"
"fmt"

"github.com/oasisprotocol/oasis-core/go/common"
Expand All @@ -9,6 +10,7 @@ import (
"github.com/oasisprotocol/oasis-core/go/common/node"
"github.com/oasisprotocol/oasis-core/go/common/workerpool"
"github.com/oasisprotocol/oasis-core/go/config"
storageApi "github.com/oasisprotocol/oasis-core/go/storage/api"
"github.com/oasisprotocol/oasis-core/go/storage/mkvs/checkpoint"
workerCommon "github.com/oasisprotocol/oasis-core/go/worker/common"
committeeCommon "github.com/oasisprotocol/oasis-core/go/worker/common/committee"
Expand Down Expand Up @@ -95,9 +97,13 @@ func (w *Worker) registerRuntime(commonNode *committeeCommon.Node, checkpointerC
}
}

localStorage, err := NewLocalBackend(commonNode.Runtime.DataDir(), id)
if err != nil {
return fmt.Errorf("can't create local storage backend: %w", err)
storageCtor := func(ctx context.Context) (storageApi.LocalBackend, error) {
localStorage, err := NewLocalBackend(commonNode.Runtime.DataDir(), id)
if err != nil {
return nil, fmt.Errorf("can't create local storage backend: %w", err)
}
commonNode.Runtime.RegisterStorage(localStorage)
return localStorage, nil
}

node, err := committee.NewNode(
Expand All @@ -106,7 +112,7 @@ func (w *Worker) registerRuntime(commonNode *committeeCommon.Node, checkpointerC
rp,
rpRPC,
w.commonWorker.GetConfig(),
localStorage,
storageCtor,
checkpointerCfg,
&committee.CheckpointSyncConfig{
Disabled: config.GlobalConfig.Storage.CheckpointSyncDisabled,
Expand All @@ -116,7 +122,7 @@ func (w *Worker) registerRuntime(commonNode *committeeCommon.Node, checkpointerC
if err != nil {
return err
}
commonNode.Runtime.RegisterStorage(localStorage)
//commonNode.Runtime.RegisterStorage(localStorage)
commonNode.AddHooks(node)
w.runtimes[id] = node

Expand Down

0 comments on commit 2fa23d8

Please sign in to comment.