Skip to content

Commit

Permalink
blockchain/indexers: Allow interrupts.
Browse files Browse the repository at this point in the history
This propagates the interrupt channel through to blockchain and the
indexers so that it is possible to interrupt long-running operations
such as catching up indexes.
  • Loading branch information
davecgh committed Sep 5, 2017
1 parent 30d4cae commit 8c883d1
Show file tree
Hide file tree
Showing 7 changed files with 87 additions and 25 deletions.
17 changes: 14 additions & 3 deletions blockchain/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -1421,8 +1421,11 @@ func (b *BlockChain) LocateHeaders(locator BlockLocator, hashStop *chainhash.Has
// purpose of supporting optional indexes.
type IndexManager interface {
// Init is invoked during chain initialize in order to allow the index
// manager to initialize itself and any indexes it is managing.
Init(*BlockChain) error
// manager to initialize itself and any indexes it is managing. The
// channel parameter specifies a channel the caller can close to signal
// that the process should be interrupted. It can be nil if that
// behavior is not desired.
Init(*BlockChain, <-chan struct{}) error

// ConnectBlock is invoked when a new block has been connected to the
// main chain.
Expand All @@ -1441,6 +1444,13 @@ type Config struct {
// This field is required.
DB database.DB

// Interrupt specifies a channel the caller can close to signal that
// long running operations, such as catching up indexes or performing
// database migrations, should be interrupted.
//
// This field can be nil if the caller does not desire the behavior.
Interrupt <-chan struct{}

// ChainParams identifies which chain parameters the chain is associated
// with.
//
Expand Down Expand Up @@ -1555,7 +1565,8 @@ func New(config *Config) (*BlockChain, error) {
// Initialize and catch up all of the currently active optional indexes
// as needed.
if config.IndexManager != nil {
if err := config.IndexManager.Init(&b); err != nil {
err := config.IndexManager.Init(&b, config.Interrupt)
if err != nil {
return nil, err
}
}
Expand Down
4 changes: 2 additions & 2 deletions blockchain/indexers/addrindex.go
Original file line number Diff line number Diff line change
Expand Up @@ -958,6 +958,6 @@ func NewAddrIndex(db database.DB, chainParams *chaincfg.Params) *AddrIndex {

// DropAddrIndex drops the address index from the provided database if it
// exists.
func DropAddrIndex(db database.DB) error {
return dropIndex(db, addrIndexKey, addrIndexName)
func DropAddrIndex(db database.DB, interrupt <-chan struct{}) error {
return dropIndex(db, addrIndexKey, addrIndexName, interrupt)
}
18 changes: 18 additions & 0 deletions blockchain/indexers/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ package indexers

import (
"encoding/binary"
"errors"

"github.com/btcsuite/btcd/blockchain"
"github.com/btcsuite/btcd/database"
Expand All @@ -19,6 +20,10 @@ var (
// byteOrder is the preferred byte order used for serializing numeric
// fields for storage in the database.
byteOrder = binary.LittleEndian

// errInterruptRequested indicates that an operation was cancelled due
// to a user-requested interrupt.
errInterruptRequested = errors.New("interrupt requested")
)

// NeedsInputser provides a generic interface for an indexer to specify the it
Expand Down Expand Up @@ -88,3 +93,16 @@ type internalBucket interface {
Put(key []byte, value []byte) error
Delete(key []byte) error
}

// interruptRequested returns true when the provided channel has been closed.
// This simplifies early shutdown slightly since the caller can just use an if
// statement instead of a select.
func interruptRequested(interrupted <-chan struct{}) bool {
select {
case <-interrupted:
return true
default:
}

return false
}
48 changes: 39 additions & 9 deletions blockchain/indexers/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ func indexDropKey(idxKey []byte) []byte {
// of being dropped and finishes dropping them when the are. This is necessary
// because dropping and index has to be done in several atomic steps rather than
// one big atomic step due to the massive number of entries.
func (m *Manager) maybeFinishDrops() error {
func (m *Manager) maybeFinishDrops(interrupt <-chan struct{}) error {
indexNeedsDrop := make([]bool, len(m.enabledIndexes))
err := m.db.View(func(dbTx database.Tx) error {
// None of the indexes needs to be dropped if the index tips
Expand All @@ -156,7 +156,7 @@ func (m *Manager) maybeFinishDrops() error {
return nil
}

// Make the indexer as requiring a drop if one is already in
// Mark the indexer as requiring a drop if one is already in
// progress.
for i, indexer := range m.enabledIndexes {
dropKey := indexDropKey(indexer.Key())
Expand All @@ -171,6 +171,10 @@ func (m *Manager) maybeFinishDrops() error {
return err
}

if interruptRequested(interrupt) {
return errInterruptRequested
}

// Finish dropping any of the enabled indexes that are already in the
// middle of being dropped.
for i, indexer := range m.enabledIndexes {
Expand All @@ -179,7 +183,7 @@ func (m *Manager) maybeFinishDrops() error {
}

log.Infof("Resuming %s drop", indexer.Name())
err := dropIndex(m.db, indexer.Key(), indexer.Name())
err := dropIndex(m.db, indexer.Key(), indexer.Name(), interrupt)
if err != nil {
return err
}
Expand Down Expand Up @@ -225,14 +229,18 @@ func (m *Manager) maybeCreateIndexes(dbTx database.Tx) error {
// catch up due to the I/O contention.
//
// This is part of the blockchain.IndexManager interface.
func (m *Manager) Init(chain *blockchain.BlockChain) error {
func (m *Manager) Init(chain *blockchain.BlockChain, interrupt <-chan struct{}) error {
// Nothing to do when no indexes are enabled.
if len(m.enabledIndexes) == 0 {
return nil
}

if interruptRequested(interrupt) {
return errInterruptRequested
}

// Finish and drops that were previously interrupted.
if err := m.maybeFinishDrops(); err != nil {
if err := m.maybeFinishDrops(interrupt); err != nil {
return err
}

Expand Down Expand Up @@ -308,7 +316,8 @@ func (m *Manager) Init(chain *blockchain.BlockChain) error {
var view *blockchain.UtxoViewpoint
if indexNeedsInputs(indexer) {
var err error
view, err = makeUtxoView(dbTx, block)
view, err = makeUtxoView(dbTx, block,
interrupt)
if err != nil {
return err
}
Expand All @@ -331,6 +340,10 @@ func (m *Manager) Init(chain *blockchain.BlockChain) error {
if err != nil {
return err
}

if interruptRequested(interrupt) {
return errInterruptRequested
}
}

if initialHeight != height {
Expand Down Expand Up @@ -389,6 +402,10 @@ func (m *Manager) Init(chain *blockchain.BlockChain) error {
return err
}

if interruptRequested(interrupt) {
return errInterruptRequested
}

// Connect the block for all indexes that need it.
var view *blockchain.UtxoViewpoint
for i, indexer := range m.enabledIndexes {
Expand All @@ -405,7 +422,8 @@ func (m *Manager) Init(chain *blockchain.BlockChain) error {
// index.
if view == nil && indexNeedsInputs(indexer) {
var err error
view, err = makeUtxoView(dbTx, block)
view, err = makeUtxoView(dbTx, block,
interrupt)
if err != nil {
return err
}
Expand All @@ -421,6 +439,10 @@ func (m *Manager) Init(chain *blockchain.BlockChain) error {

// Log indexing progress.
progressLogger.LogBlockHeight(block)

if interruptRequested(interrupt) {
return errInterruptRequested
}
}

log.Infof("Indexes caught up to height %d", bestHeight)
Expand Down Expand Up @@ -470,7 +492,7 @@ func dbFetchTx(dbTx database.Tx, hash *chainhash.Hash) (*wire.MsgTx, error) {
// transactions in the block. This is sometimes needed when catching indexes up
// because many of the txouts could actually already be spent however the
// associated scripts are still required to index them.
func makeUtxoView(dbTx database.Tx, block *btcutil.Block) (*blockchain.UtxoViewpoint, error) {
func makeUtxoView(dbTx database.Tx, block *btcutil.Block, interrupt <-chan struct{}) (*blockchain.UtxoViewpoint, error) {
view := blockchain.NewUtxoViewpoint()
for txIdx, tx := range block.Transactions() {
// Coinbases do not reference any inputs. Since the block is
Expand All @@ -492,6 +514,10 @@ func makeUtxoView(dbTx database.Tx, block *btcutil.Block) (*blockchain.UtxoViewp

view.AddTxOuts(btcutil.NewTx(originTx), 0)
}

if interruptRequested(interrupt) {
return nil, errInterruptRequested
}
}

return view, nil
Expand Down Expand Up @@ -548,7 +574,7 @@ func NewManager(db database.DB, enabledIndexes []Indexer) *Manager {
// keep memory usage to reasonable levels. It also marks the drop in progress
// so the drop can be resumed if it is stopped before it is done before the
// index can be used again.
func dropIndex(db database.DB, idxKey []byte, idxName string) error {
func dropIndex(db database.DB, idxKey []byte, idxName string, interrupt <-chan struct{}) error {
// Nothing to do if the index doesn't already exist.
var needsDelete bool
err := db.View(func(dbTx database.Tx) error {
Expand Down Expand Up @@ -610,6 +636,10 @@ func dropIndex(db database.DB, idxKey []byte, idxName string) error {
log.Infof("Deleted %d keys (%d total) from %s",
numDeleted, totalDeleted, idxName)
}

if interruptRequested(interrupt) {
return errInterruptRequested
}
}

// Call extra index specific deinitialization for the transaction index.
Expand Down
7 changes: 4 additions & 3 deletions blockchain/indexers/txindex.go
Original file line number Diff line number Diff line change
Expand Up @@ -469,10 +469,11 @@ func dropBlockIDIndex(db database.DB) error {
// DropTxIndex drops the transaction index from the provided database if it
// exists. Since the address index relies on it, the address index will also be
// dropped when it exists.
func DropTxIndex(db database.DB) error {
if err := dropIndex(db, addrIndexKey, addrIndexName); err != nil {
func DropTxIndex(db database.DB, interrupt <-chan struct{}) error {
err := dropIndex(db, addrIndexKey, addrIndexName, interrupt)
if err != nil {
return err
}

return dropIndex(db, txIndexKey, txIndexName)
return dropIndex(db, txIndexKey, txIndexName, interrupt)
}
15 changes: 8 additions & 7 deletions btcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func btcdMain(serverChan chan<- *server) error {
// Get a channel that will be closed when a shutdown signal has been
// triggered either from an OS signal such as SIGINT (Ctrl+C) or from
// another subsystem such as the RPC server.
interruptedChan := interruptListener()
interrupt := interruptListener()
defer btcdLog.Info("Shutdown complete")

// Show version at startup.
Expand Down Expand Up @@ -94,7 +94,7 @@ func btcdMain(serverChan chan<- *server) error {
}

// Return now if an interrupt signal was triggered.
if interruptRequested(interruptedChan) {
if interruptRequested(interrupt) {
return nil
}

Expand All @@ -111,7 +111,7 @@ func btcdMain(serverChan chan<- *server) error {
}()

// Return now if an interrupt signal was triggered.
if interruptRequested(interruptedChan) {
if interruptRequested(interrupt) {
return nil
}

Expand All @@ -120,15 +120,15 @@ func btcdMain(serverChan chan<- *server) error {
// NOTE: The order is important here because dropping the tx index also
// drops the address index since it relies on it.
if cfg.DropAddrIndex {
if err := indexers.DropAddrIndex(db); err != nil {
if err := indexers.DropAddrIndex(db, interrupt); err != nil {
btcdLog.Errorf("%v", err)
return err
}

return nil
}
if cfg.DropTxIndex {
if err := indexers.DropTxIndex(db); err != nil {
if err := indexers.DropTxIndex(db, interrupt); err != nil {
btcdLog.Errorf("%v", err)
return err
}
Expand All @@ -137,7 +137,8 @@ func btcdMain(serverChan chan<- *server) error {
}

// Create server and start it.
server, err := newServer(cfg.Listeners, db, activeNetParams.Params)
server, err := newServer(cfg.Listeners, db, activeNetParams.Params,
interrupt)
if err != nil {
// TODO: this logging could do with some beautifying.
btcdLog.Errorf("Unable to start server on %v: %v",
Expand All @@ -158,7 +159,7 @@ func btcdMain(serverChan chan<- *server) error {
// Wait until the interrupt signal is received from an OS signal or
// shutdown is requested through one of the subsystems such as the RPC
// server.
<-interruptedChan
<-interrupt
return nil
}

Expand Down
3 changes: 2 additions & 1 deletion server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2153,7 +2153,7 @@ func setupRPCListeners() ([]net.Listener, error) {
// newServer returns a new btcd server configured to listen on addr for the
// bitcoin network type specified by chainParams. Use start to begin accepting
// connections from peers.
func newServer(listenAddrs []string, db database.DB, chainParams *chaincfg.Params) (*server, error) {
func newServer(listenAddrs []string, db database.DB, chainParams *chaincfg.Params, interrupt <-chan struct{}) (*server, error) {
services := defaultServices
if cfg.NoPeerBloomFilters {
services &^= wire.SFNodeBloom
Expand Down Expand Up @@ -2237,6 +2237,7 @@ func newServer(listenAddrs []string, db database.DB, chainParams *chaincfg.Param
var err error
s.chain, err = blockchain.New(&blockchain.Config{
DB: s.db,
Interrupt: interrupt,
ChainParams: s.chainParams,
Checkpoints: checkpoints,
TimeSource: s.timeSource,
Expand Down

0 comments on commit 8c883d1

Please sign in to comment.