diff --git a/blockmanager.go b/blockmanager.go index 9c9e6f4c..4321a70f 100644 --- a/blockmanager.go +++ b/blockmanager.go @@ -5,9 +5,11 @@ package neutrino import ( "bytes" "container/list" + "errors" "fmt" "math" "math/big" + "sort" "sync" "sync/atomic" "time" @@ -48,6 +50,15 @@ const ( // zeroHash is the zero value hash (all zeros). It is defined as a convenience. var zeroHash chainhash.Hash +type headerQuery struct { + wire.Message + startHeight int32 + initialHeight int32 + startHash chainhash.Hash + endHeight int32 + initialHash chainhash.Hash +} + // newPeerMsg signifies a newly connected peer to the block handler. type newPeerMsg struct { peer *ServerPeer } @@ -90,8 +101,8 @@ type blockManagerCfg struct { // the connected peers. TimeSource blockchain.MedianTimeSource - // QueryDispatcher is used to make queries to connected Bitcoin peers. - QueryDispatcher query.Dispatcher + // cfHeaderQueryDispatcher is used to make queries to connected Bitcoin peers. + cfHeaderQueryDispatcher query.Dispatcher // BanPeer bans and disconnects the given peer. BanPeer func(addr string, reason banman.Reason) error @@ -107,8 +118,10 @@ type blockManagerCfg struct { queryAllPeers func( queryMsg wire.Message, checkResponse func(sp *ServerPeer, resp wire.Message, - quit chan<- struct{}, peerQuit chan<- struct{}), + quit chan<- struct{}, peerQuit chan<- struct{}), options ...QueryOption) + peerByAddr func(addr string) *ServerPeer + blkHdrCheckptQueryDispatcher query.WorkManager } // blockManager provides a concurrency safe block manager for handling all @@ -173,6 +186,10 @@ type blockManager struct { // nolint:maligned // time, newHeadersMtx should always be acquired first. newFilterHeadersMtx sync.RWMutex + //writeBatchMtx is the mutex used to guard reading and writing into the +//hdrTipToResponse map. + writeBatchMtx sync.RWMutex + // newFilterHeadersSignal is condition variable which will be used to // notify any waiting callers (via Broadcast()) that the tip of the // current filter header chain has changed. This is useful when callers @@ -206,6 +223,18 @@ type blockManager struct { // nolint:maligned minRetargetTimespan int64 // target timespan / adjustment factor maxRetargetTimespan int64 // target timespan * adjustment factor blocksPerRetarget int32 // target timespan / target time per block + + //hdrTipToResponse is a map that holds the response gotten from querying peers + //using the workmanager, to fetch headers within the chain's checkpointed region. + //It is a map of the request's startHeight to the fetched response. + hdrTipToResponse map[int32]*headersMsg + + //hdrTipSlice is a slice that holds the request startHeight of the responses that have been + //fetched using the workmanager to fetch headers within the chain's checkpointed region. + //It is used to easily access this startHeight in the case we have to delete these responses + //in the hdrTipToResponse map during a reorg while fetching headers within the chain's checkpointed + //region. + hdrTipSlice []int32 } // newBlockManager returns a new bitcoin block manager.
Use Start to begin @@ -235,6 +264,8 @@ func newBlockManager(cfg *blockManagerCfg) (*blockManager, error) { blocksPerRetarget: int32(targetTimespan / targetTimePerBlock), minRetargetTimespan: targetTimespan / adjustmentFactor, maxRetargetTimespan: targetTimespan * adjustmentFactor, + hdrTipToResponse: make(map[int32]*headersMsg), + hdrTipSlice: make([]int32, 0), } // Next we'll create the two signals that goroutines will use to wait @@ -290,21 +321,40 @@ func (b *blockManager) Start() { } log.Trace("Starting block manager") - b.wg.Add(2) + b.wg.Add(3) go b.blockHandler() + go func() { + wm, _ := b.cfg.blkHdrCheckptQueryDispatcher.(query.WorkManager) + + defer b.wg.Done() + defer func(wm query.WorkManager) { + err := wm.Stop() + if err != nil { + log.Errorf("Unable to stop block header workmanager: %v", err) + } + }(wm) + + b.processBlKHeaderInCheckPtRegionInOrder() + }() go func() { defer b.wg.Done() log.Debug("Waiting for peer connection...") - // Before starting the cfHandler we want to make sure we are - // connected with at least one peer. + // Before starting the cfHandler and fetching checkpointed headers using the workmanager, + //we want to make sure we are connected with at least one peer. select { case <-b.cfg.firstPeerSignal: case <-b.quit: return } + checkpoints := b.cfg.ChainParams.Checkpoints + numCheckpts := len(checkpoints) + if numCheckpts != 0 && b.nextCheckpoint != nil { + b.batchCheckpointedBlkHeaders() + } + log.Debug("Peer connected, starting cfHandler.") b.cfHandler() }() @@ -360,11 +410,9 @@ func (b *blockManager) NewPeer(sp *ServerPeer) { } } -// handleNewPeerMsg deals with new peers that have signalled they may be -// considered as a sync peer (they have already successfully negotiated). It -// also starts syncing if needed. It is invoked from the syncHandler -// goroutine. -func (b *blockManager) handleNewPeerMsg(peers *list.List, sp *ServerPeer) { +// addNewPeerToList adds the peer to the peers list. +func (b *blockManager) addNewPeerToList(peers *list.List, sp *ServerPeer) { + // Ignore if in the process of shutting down. if atomic.LoadInt32(&b.shutdown) != 0 { return @@ -373,13 +421,29 @@ func (b *blockManager) handleNewPeerMsg(peers *list.List, sp *ServerPeer) { log.Infof("New valid peer %s (%s)", sp, sp.UserAgent()) // Ignore the peer if it's not a sync candidate. - if !b.isSyncCandidate(sp) { + if !sp.IsSyncCandidate() { return } // Add the peer as a candidate to sync from. peers.PushBack(sp) +} + +// handleNewPeerMsg deals with new peers that have signalled they may be +// considered as a sync peer (they have already successfully negotiated). It +// also starts syncing if needed. It is invoked from the syncHandler +// goroutine. +func (b *blockManager) handleNewPeerMsg(peers *list.List, sp *ServerPeer) { + // Ignore if in the process of shutting down. + if atomic.LoadInt32(&b.shutdown) != 0 { + return + } + + log.Infof("New valid peer %s (%s)", sp, sp.UserAgent()) + + b.addNewPeerToList(peers, sp) + // If we're current with our sync peer and the new peer is advertising // a higher block than the newest one we know of, request headers from // the new peer. @@ -418,12 +482,9 @@ func (b *blockManager) DonePeer(sp *ServerPeer) { } } -// handleDonePeerMsg deals with peers that have signalled they are done. It -// removes the peer as a candidate for syncing and in the case where it was the -// current sync peer, attempts to select a new best peer to sync from. It is -// invoked from the syncHandler goroutine. 
-func (b *blockManager) handleDonePeerMsg(peers *list.List, sp *ServerPeer) { - // Remove the peer from the list of candidate peers. +// removeDonePeerFromList removes the peer from the peers list. +func (b *blockManager) removeDonePeerFromList(peers *list.List, sp *ServerPeer) { + //Remove the peer from the list of candidate peers. for e := peers.Front(); e != nil; e = e.Next() { if e.Value == sp { peers.Remove(e) @@ -432,6 +493,17 @@ func (b *blockManager) handleDonePeerMsg(peers *list.List, sp *ServerPeer) { } log.Infof("Lost peer %s", sp) +} + +// handleDonePeerMsg deals with peers that have signalled they are done. It +// removes the peer as a candidate for syncing and in the case where it was the +// current sync peer, attempts to select a new best peer to sync from. It is +// invoked from the syncHandler goroutine. +func (b *blockManager) handleDonePeerMsg(peers *list.List, sp *ServerPeer) { + // Remove the peer from the list of candidate peers. + b.removeDonePeerFromList(peers, sp) + + log.Infof("Lost peer %s", sp) // Attempt to find a new peer to sync from if the quitting peer is the // sync peer. Also, reset the header state. @@ -808,7 +880,7 @@ func (b *blockManager) getUncheckpointedCFHeaders( // handle a query for checkpointed filter headers. type checkpointedCFHeadersQuery struct { blockMgr *blockManager - msgs []wire.Message + msgs []*query.ReqMessage checkpoints []*chainhash.Hash stopHashes map[chainhash.Hash]uint32 headerChan chan *wire.MsgCFHeaders @@ -821,16 +893,53 @@ func (c *checkpointedCFHeadersQuery) requests() []*query.Request { reqs[idx] = &query.Request{ Req: m, HandleResp: c.handleResponse, + SendQuery: sendQueryMessageWithEncoding, + CloneReq: cloneMsgCFHeaders, } } return reqs } +// cloneMsgCFHeaders clones query.ReqMessage that contains the MsgGetCFHeaders message. +func cloneMsgCFHeaders(req query.ReqMessage) *query.ReqMessage { + oldReq, ok := req.Message.(*wire.MsgGetCFHeaders) + if !ok { + log.Errorf("request not of type *wire.MsgCFHeaders") + } + newReq := &query.ReqMessage{ + Message: wire.NewMsgGetCFHeaders( + oldReq.FilterType, oldReq.StartHeight, &oldReq.StopHash, + ), + PriorityIndex: req.PriorityIndex, + } + return newReq +} + +// sendQueryMessageWithEncoding sends a message to the peer with encoding. +func sendQueryMessageWithEncoding(worker query.Worker, task query.Task) error { + peer, ok := worker.Peer().(*ServerPeer) + if !ok { + err := "peer is not of type ServerPeer" + log.Errorf(err) + return errors.New(err) + } + job := task.(*query.QueryJob) + req := job.Req.Message.(wire.Message) + peer.recentReqStartTime = time.Now() + peer.QueueMessageWithEncoding(req, nil, job.Encoding()) + + return nil +} + // handleResponse is the internal response handler used for requests for this // CFHeaders query. func (c *checkpointedCFHeadersQuery) handleResponse(req, resp wire.Message, - peerAddr string) query.Progress { + peer query.Peer, _ *error) query.Progress { + peerAddr := "" + if peer != nil { + peerAddr = peer.Addr() + } r, ok := resp.(*wire.MsgCFHeaders) if !ok { // We are only looking for cfheaders messages. @@ -850,6 +959,10 @@ func (c *checkpointedCFHeadersQuery) handleResponse(req, resp wire.Message, } } + if peer != nil { + peer.UpdateRequestDuration() + } + // The response doesn't match the query. if q.FilterType != r.FilterType || q.StopHash != r.StopHash { return query.Progress{ @@ -959,7 +1072,7 @@ func (b *blockManager) getCheckpointedCFHeaders(checkpoints []*chainhash.Hash, // the remaining checkpoint intervals. 
numCheckpts := uint32(len(checkpoints)) - startingInterval numQueries := (numCheckpts + maxCFCheckptsPerQuery - 1) / maxCFCheckptsPerQuery - queryMsgs := make([]wire.Message, 0, numQueries) + queryMsgs := make([]*query.ReqMessage, 0, numQueries) // We'll also create an additional set of maps that we'll use to // re-order the responses as we get them in. @@ -1004,9 +1117,11 @@ func (b *blockManager) getCheckpointedCFHeaders(checkpoints []*chainhash.Hash, // Once we have the stop hash, we can construct the query // message itself. - queryMsg := wire.NewMsgGetCFHeaders( - fType, startHeightRange, &stopHash, - ) + queryMsg := &query.ReqMessage{ + Message: wire.NewMsgGetCFHeaders( + fType, startHeightRange, &stopHash, + ), + } // We'll mark that the ith interval is queried by this message, // and also map the stop hash back to the index of this message. @@ -1043,8 +1158,8 @@ func (b *blockManager) getCheckpointedCFHeaders(checkpoints []*chainhash.Hash, // Hand the queries to the work manager, and consume the verified // responses as they come back. - errChan := b.cfg.QueryDispatcher.Query( - q.requests(), query.Cancel(b.quit), query.NoRetryMax(), + errChan := b.cfg.cfHeaderQueryDispatcher.Query( + q.requests(), query.Cancel(b.quit), query.NoRetryMax(), query.ErrChan(make(chan error, 1)), ) // Keep waiting for more headers as long as we haven't received an @@ -1909,6 +2024,21 @@ func (b *blockManager) getCheckpts(lastHash *chainhash.Hash, return checkpoints } +// resetHeaderListToChainTip resets the in-memory headerList to the chain tip stored on disk. +func (b *blockManager) resetHeaderListToChainTip() error { + log.Debugf("Resetting header list to chain tip %v ", b.headerTip) + header, height, err := b.cfg.BlockHeaders.ChainTip() + if err != nil { + return err + } + b.headerList.ResetHeaderState(headerlist.Node{ + Header: *header, + Height: int32(height), + }) + log.Debugf("Reset header list to chain tip %v ", height) + + return nil +} + // checkCFCheckptSanity checks whether all peers which have responded agree. // If so, it returns -1; otherwise, it returns the earliest index at which at // least one of the peers differs. The checkpoints are also checked against the @@ -1982,7 +2112,39 @@ func (b *blockManager) blockHandler() { defer b.wg.Done() candidatePeers := list.New() -out: + checkpoints := b.cfg.ChainParams.Checkpoints + if len(checkpoints) == 0 || b.nextCheckpoint == nil { + goto unCheckPtLoop + } + + //Loop to fetch headers within the checkpointed range + b.newHeadersMtx.RLock() + for b.headerTip < uint32(checkpoints[len(checkpoints)-1].Height) { + b.newHeadersMtx.RUnlock() + select { + case m := <-b.peerChan: + switch msg := m.(type) { + case *newPeerMsg: + b.addNewPeerToList(candidatePeers, msg.peer) + case *donePeerMsg: + b.removeDonePeerFromList(candidatePeers, msg.peer) + default: + log.Tracef("Invalid message type in block "+ + "handler: %T", msg) + } + + case <-b.quit: + return + + } + b.newHeadersMtx.RLock() + } + b.newHeadersMtx.RUnlock() + + log.Infof("Fetching uncheckpointed block headers from %v", b.headerTip) + b.startSync(candidatePeers) + +unCheckPtLoop: for { // Now check peer messages and quit channels.
select { @@ -2006,13 +2168,414 @@ out: } case <-b.quit: - break out + break unCheckPtLoop } } log.Trace("Block handler done") } +func (b *blockManager) batchCheckpointedBlkHeaders() { + + var queryMsgs []*query.ReqMessage + curHeight := b.headerTip + curHash := b.headerTipHash + nextCheckpoint := b.nextCheckpoint + nextCheckptHash := nextCheckpoint.Hash + nextCheckptHeight := nextCheckpoint.Height + + log.Infof("Fetching set of checkpointed blockheaders from "+ + "height=%v, hash=%v\n", curHeight, curHash) + + for nextCheckpoint != nil { + + endHash := nextCheckptHash + endHeight := nextCheckptHeight + tmpCurHash := curHash + + msg := &headerQuery{ + Message: &wire.MsgGetHeaders{ + BlockLocatorHashes: blockchain.BlockLocator([]*chainhash.Hash{&tmpCurHash}), + HashStop: *endHash, + }, + startHeight: int32(curHeight), + initialHeight: int32(curHeight), + startHash: curHash, + endHeight: endHeight, + initialHash: tmpCurHash, + } + + queryMessage := &query.ReqMessage{ + Message: msg, + } + + log.Debugf("Fetching set of checkpointed blockheaders from "+ + "start_height=%v to end-height=%v", curHeight, endHash) + + queryMsgs = append(queryMsgs, queryMessage) + curHeight = uint32(endHeight) + curHash = *endHash + + nextCheckpoint := b.findNextHeaderCheckpoint(int32(curHeight)) + if nextCheckpoint == nil { + break + } + + nextCheckptHeight = nextCheckpoint.Height + nextCheckptHash = nextCheckpoint.Hash + + } + + msg := &headerQuery{ + Message: &wire.MsgGetHeaders{ + BlockLocatorHashes: blockchain.BlockLocator([]*chainhash.Hash{nextCheckptHash}), + HashStop: zeroHash, + }, + startHeight: nextCheckptHeight, + initialHeight: nextCheckptHeight, + startHash: *nextCheckptHash, + endHeight: nextCheckptHeight + wire.MaxBlockHeadersPerMsg, + initialHash: *nextCheckptHash, + } + + queryMsg := &query.ReqMessage{ + Message: msg, + } + + log.Debugf("Fetching set of checkpointed blockheaders from "+ + "start_height=%v to end-height=%v", curHeight, zeroHash) + + queryMsgs = append(queryMsgs, queryMsg) + + log.Debugf("Attempting to query for %v blockheader batches", len(queryMsgs)) + + q := CheckpointedBlockHeadersQuery{ + blockMgr: b, + msgs: queryMsgs, + } + + b.cfg.blkHdrCheckptQueryDispatcher.Query( + q.requests(), query.Cancel(b.quit), query.Timeout(1*time.Hour), query.NoRetryMax(), + query.KeepBatch(), + ) + +} + +// processBlKHeaderInCheckPtRegionInOrder handles and writes the block headers received from querying the +// workmanager while fetching headers within the block header checkpoint region. This process is carried out +// in order. +func (b *blockManager) processBlKHeaderInCheckPtRegionInOrder() { + lenCheckPts := len(b.cfg.ChainParams.Checkpoints) + + //Loop should run as long as we are in the block header checkpointed region. + b.newHeadersMtx.RLock() + for int32(b.headerTip) <= b.cfg.ChainParams.Checkpoints[lenCheckPts-1].Height { + hdrTip := b.headerTip + b.newHeadersMtx.RUnlock() + + select { + //return quickly if the blockmanager quits. + case <-b.quit: + return + default: + } + + //do not go further if we have not received the response mapped to our header tip. 
+ b.writeBatchMtx.RLock() + msg, ok := b.hdrTipToResponse[int32(hdrTip)] + b.writeBatchMtx.RUnlock() + + if !ok { + b.newHeadersMtx.RLock() + continue + } + + b.syncPeerMutex.Lock() + b.syncPeer = msg.peer + b.syncPeerMutex.Unlock() + + b.handleHeadersMsg(msg) + err := b.resetHeaderListToChainTip() + if err != nil { + log.Errorf(err.Error()) + } + + finalNode := b.headerList.Back() + newHdrTip := finalNode.Height + newHdrTipHash := finalNode.Header.BlockHash() + prevCheckPt := b.findPreviousHeaderCheckpoint(newHdrTip) + + log.Tracef("New headertip %v", newHdrTip) + + //If our header tip has not increased, there is a problem with the headers we received, so we + //delete all the header responses between our previous header tip and our new header tip, then send + //another query to the workmanager. + if uint32(newHdrTip) <= hdrTip { + + b.deleteAllHeaderTipRespAfterTip(newHdrTip, int32(hdrTip)) + + log.Tracef("Received invalid headers while fetching checkpointed headers") + + message := &headerQuery{ + Message: &wire.MsgGetHeaders{ + BlockLocatorHashes: []*chainhash.Hash{&newHdrTipHash}, + HashStop: *b.nextCheckpoint.Hash, + }, + startHeight: newHdrTip, + initialHeight: prevCheckPt.Height, + startHash: newHdrTipHash, + endHeight: b.nextCheckpoint.Height, + initialHash: newHdrTipHash, + } + q := CheckpointedBlockHeadersQuery{ + blockMgr: b, + msgs: []*query.ReqMessage{ + { + Message: message, + PriorityIndex: 0.5, + }, + }, + } + + b.cfg.blkHdrCheckptQueryDispatcher.Query( + q.requests(), query.Cancel(b.quit), query.Timeout(1*time.Hour), query.NoRetryMax(), + query.KeepBatch(), + ) + + log.Tracef("Sending query to workmanager from processBlKHeaderInCheckPtRegionInOrder loop") + + } + b.newHeadersMtx.RLock() + + } + b.newHeadersMtx.RUnlock() + + b.syncPeerMutex.Lock() + b.syncPeer = nil + b.syncPeerMutex.Unlock() + + log.Infof("Successfully completed fetching checkpointed block headers") +} + +// CheckpointedBlockHeadersQuery holds all information necessary to perform and +// handle a query for checkpointed block headers. +type CheckpointedBlockHeadersQuery struct { + blockMgr *blockManager + msgs []*query.ReqMessage +} + +// requests creates the query.Requests for this block headers query. +func (c *CheckpointedBlockHeadersQuery) requests() []*query.Request { + + reqs := make([]*query.Request, len(c.msgs)) + for idx, m := range c.msgs { + reqs[idx] = &query.Request{ + Req: m, + SendQuery: c.PushHeadersMsg, + HandleResp: c.handleResponse, + CloneReq: cloneHeaderQuery, + } + } + + return reqs +} + +// cloneHeaderQuery clones the query.ReqMessage containing the headerQuery struct. +func cloneHeaderQuery(req query.ReqMessage) *query.ReqMessage { + oldReq, ok := req.Message.(*headerQuery) + if !ok { + log.Errorf("request not of type *headerQuery") + } + oldReqMessage := oldReq.Message.(*wire.MsgGetHeaders) + message := &headerQuery{ + Message: &wire.MsgGetHeaders{ + BlockLocatorHashes: oldReqMessage.BlockLocatorHashes, + HashStop: oldReqMessage.HashStop, + }, + startHeight: oldReq.startHeight, + initialHeight: oldReq.initialHeight, + startHash: oldReq.startHash, + endHeight: oldReq.endHeight, + initialHash: oldReq.initialHash, + } + + newReq := &query.ReqMessage{ + Message: message, + PriorityIndex: req.PriorityIndex, + } + return newReq +} + +// PushHeadersMsg is used to send the getheaders message for this +// block headers query.
+func (c *CheckpointedBlockHeadersQuery) PushHeadersMsg(w query.Worker, + task query.Task) error { + job := task.(*query.QueryJob) + requestMsg, _ := job.Req.Message.(*headerQuery) + peer, _ := w.Peer().(*ServerPeer) + request, _ := requestMsg.Message.(*wire.MsgGetHeaders) + + //Check if we have a response for the query already. If we do, return an error. + c.blockMgr.writeBatchMtx.RLock() + _, ok := c.blockMgr.hdrTipToResponse[requestMsg.startHeight] + c.blockMgr.writeBatchMtx.RUnlock() + if ok { + log.Debugf("Response already received in PushHeadersMsg, peer=%v, "+ + "start_height=%v, end_height=%v", w.Peer().Addr(), + requestMsg.startHeight, requestMsg.endHeight) + return query.ErrResponseExistForQuery + } + + //Update the request start time and send the query. + peer.recentReqStartTime = time.Now() + + err := peer.PushGetHeadersMsg(request.BlockLocatorHashes, &request.HashStop) + if err != nil { + log.Errorf(err.Error()) + return err + } + + return nil + +} + +// handleResponse is the internal response handler used for requests for this +// block header query. +func (c *CheckpointedBlockHeadersQuery) handleResponse(req, resp wire.Message, + peer query.Peer, jobErr *error) query.Progress { + + if peer == nil { + return query.Progress{ + Finished: false, + Progressed: false, + } + } + + sp := peer.(*ServerPeer) + + msg, ok := resp.(*wire.MsgHeaders) + if !ok { + // We are only looking for msgHeaders messages. + return query.Progress{ + Finished: false, + Progressed: false, + } + } + + request, ok := req.(*headerQuery) + if !ok { + //request should only be of type headerQuery. + return query.Progress{ + Finished: false, + Progressed: false, + } + } + + //Update request duration as we have received our required response from the peer. + peer.UpdateRequestDuration() + + //Check if we already have a response for this request startHeight, if we do modify our jobErr variable + //so that worker can send appropriate error to workmanager. + c.blockMgr.writeBatchMtx.RLock() + _, ok = c.blockMgr.hdrTipToResponse[request.startHeight] + c.blockMgr.writeBatchMtx.RUnlock() + if ok { + + *jobErr = query.ErrResponseExistForQuery + + return query.Progress{ + Finished: true, + Progressed: true, + } + } + + //If we received an empty response from the peer, return with an error to break worker's + //feedback loop. + hdrLength := len(msg.Headers) + if hdrLength == 0 { + *jobErr = query.ErrResponseErr + return query.Progress{ + Finished: false, + Progressed: false, + } + } + + //The initialHash represents the lower bound checkpoint for this checkpoint region. + //We verify that the header received at that checkpoint height has the same hash as the + //checkpoint's hash. If it does not, mimicking the handleHeadersMsg behaviour, we + //disconnect the peer and return a failed progress to reschedule the query. + if msg.Headers[0].PrevBlock != request.startHash && + request.startHash == request.initialHash { + + sp.Disconnect() + + return query.Progress{ + Finished: false, + Progressed: false, + } + + } + + //If the peer sends us more headers than we need, it is probably not aligned with our chain, so we disconnect the + //peer and return a failed progress. + reqMessage := request.Message.(*wire.MsgGetHeaders) + //TODO(Maureen): Greater than or equal to?
+ if hdrLength > int(request.endHeight-request.startHeight) { + + sp.Disconnect() + return query.Progress{ + Finished: false, + Progressed: false, + } + + } + + //Write header into hdrTipResponse map, add the request's startHeight to the hdrTipSlice, for tracking + //and handling by the processBlKHeaderInCheckPtRegionInOrder loop. + c.blockMgr.writeBatchMtx.Lock() + c.blockMgr.hdrTipToResponse[request.startHeight] = &headersMsg{ + headers: msg, + peer: sp, + } + i := sort.Search(len(c.blockMgr.hdrTipSlice), func(i int) bool { + return c.blockMgr.hdrTipSlice[i] >= request.startHeight + }) + + c.blockMgr.hdrTipSlice = append(c.blockMgr.hdrTipSlice[:i], append([]int32{request.startHeight}, c.blockMgr.hdrTipSlice[i:]...)...) + c.blockMgr.writeBatchMtx.Unlock() + + //Check if job is unfinished, if it is, we modify the job accordingly and send back to the workmanager to be rescheduled. + if msg.Headers[hdrLength-1].BlockHash() != reqMessage.HashStop && reqMessage.HashStop != zeroHash { + + //set new startHash, startHeight and blocklocator to set the next set of header for this job. + newStartHash := msg.Headers[hdrLength-1].BlockHash() + request.startHeight = request.startHeight + int32(hdrLength) + request.startHash = newStartHash + reqMessage.BlockLocatorHashes = []*chainhash.Hash{&newStartHash} + + //Incase there is a rollback after handling reqMessage + // This ensures the job created by writecheckpt does not exceed that which we have fetched already. + c.blockMgr.writeBatchMtx.RLock() + _, ok = c.blockMgr.hdrTipToResponse[request.startHeight] + c.blockMgr.writeBatchMtx.RUnlock() + + if !ok { + + return query.Progress{ + Finished: true, + Progressed: false, + } + + } + *jobErr = query.ErrResponseExistForQuery + + } + + return query.Progress{ + Finished: true, + Progressed: true, + } + +} + // SyncPeer returns the current sync peer. func (b *blockManager) SyncPeer() *ServerPeer { b.syncPeerMutex.Lock() @@ -2021,13 +2584,6 @@ func (b *blockManager) SyncPeer() *ServerPeer { return b.syncPeer } -// isSyncCandidate returns whether or not the peer is a candidate to consider -// syncing from. -func (b *blockManager) isSyncCandidate(sp *ServerPeer) bool { - // The peer is not a candidate for sync if it's not a full node. - return sp.Services()&wire.SFNodeNetwork == wire.SFNodeNetwork -} - // findNextHeaderCheckpoint returns the next checkpoint after the passed height. // It returns nil when there is not one either because the height is already // later than the final checkpoint or there are none for the current network. @@ -2149,20 +2705,9 @@ func (b *blockManager) startSync(peers *list.List) { // our latest block locator. stopHash := &zeroHash - // If we're still within the range of the set checkpoints, then - // we'll use the next checkpoint to guide the set of headers we - // fetch, setting our stop hash to the next checkpoint hash. - if b.nextCheckpoint != nil && int32(bestHeight) < b.nextCheckpoint.Height { - log.Infof("Downloading headers for blocks %d to "+ - "%d from peer %s", bestHeight+1, - b.nextCheckpoint.Height, bestPeer.Addr()) - - stopHash = b.nextCheckpoint.Hash - } else { - log.Infof("Fetching set of headers from tip "+ - "(height=%v) from peer %s", bestHeight, - bestPeer.Addr()) - } + log.Infof("Fetching set of headers from tip "+ + "(height=%v) from peer %s", bestHeight, + bestPeer.Addr()) // With our stop hash selected, we'll kick off the sync from // this peer with an initial GetHeaders message. 
@@ -2477,6 +3022,7 @@ func (b *blockManager) handleHeadersMsg(hmsg *headersMsg) { // the next one. _, _, err := b.cfg.BlockHeaders.FetchHeader(&blockHash) if err == nil { + fmt.Println("we know this block") continue } @@ -2700,20 +3246,12 @@ func (b *blockManager) handleHeadersMsg(hmsg *headersMsg) { } } - // When this header is a checkpoint, find the next checkpoint. - if receivedCheckpoint { - b.nextCheckpoint = b.findNextHeaderCheckpoint(finalHeight) - } - // If not current, request the next batch of headers starting from the // latest known header and ending with the next checkpoint. - if b.cfg.ChainParams.Net == chaincfg.SimNetParams.Net || !b.BlockHeadersSynced() { + if (b.cfg.ChainParams.Net == chaincfg.SimNetParams.Net || !b.BlockHeadersSynced()) && b.nextCheckpoint == nil { locator := blockchain.BlockLocator([]*chainhash.Hash{finalHash}) - nextHash := zeroHash - if b.nextCheckpoint != nil { - nextHash = *b.nextCheckpoint.Hash - } - err := hmsg.peer.PushGetHeadersMsg(locator, &nextHash) + + err := hmsg.peer.PushGetHeadersMsg(locator, &zeroHash) if err != nil { log.Warnf("Failed to send getheaders message to "+ "peer %s: %s", hmsg.peer.Addr(), err) @@ -2721,6 +3259,11 @@ func (b *blockManager) handleHeadersMsg(hmsg *headersMsg) { } } + // When this header is a checkpoint, find the next checkpoint. + if receivedCheckpoint { + b.nextCheckpoint = b.findNextHeaderCheckpoint(finalHeight) + } + // Since we have a new set of headers written to disk, we'll send out a // new signal to notify any waiting sub-systems that they can now maybe // proceed do to us extending the header chain. @@ -2869,6 +3412,36 @@ func (b *blockManager) NotificationsSinceHeight( return blocks, bestHeight, nil } +func (b *blockManager) deleteAllHeaderTipRespAfterTip(newTip, prevTip int32) { + + b.writeBatchMtx.Lock() + defer b.writeBatchMtx.Unlock() + + var ( + finalIdx int + initialIdx int + ) + + for i := 0; i < len(b.hdrTipSlice) && b.hdrTipSlice[i] <= newTip; i++ { + + if b.hdrTipSlice[i] < prevTip { + continue + } + + if b.hdrTipSlice[i] == prevTip { + initialIdx = i + } + + tip := b.hdrTipSlice[i] + + delete(b.hdrTipToResponse, tip) + + finalIdx = i + } + + b.hdrTipSlice = append(b.hdrTipSlice[:initialIdx], b.hdrTipSlice[finalIdx+1:]...) +} + // lightChainCtx is an implementation of the blockchain.ChainCtx interface and // gives a neutrino node the ability to contextually validate headers it // receives. diff --git a/blockmanager_test.go b/blockmanager_test.go index 45554b7c..5eae07c8 100644 --- a/blockmanager_test.go +++ b/blockmanager_test.go @@ -5,6 +5,7 @@ import ( "fmt" "math/rand" "reflect" + "sort" "strings" "testing" "time" @@ -49,7 +50,6 @@ var _ query.Dispatcher = (*mockDispatcher)(nil) func (m *mockDispatcher) Query(requests []*query.Request, options ...query.QueryOption) chan error { - return m.query(requests, options...) } @@ -89,11 +89,12 @@ func setupBlockManager(t *testing.T) (*blockManager, headerfs.BlockHeaderStore, // Set up a blockManager with the chain service we defined. 
bm, err := newBlockManager(&blockManagerCfg{ - ChainParams: chaincfg.SimNetParams, - BlockHeaders: hdrStore, - RegFilterHeaders: cfStore, - QueryDispatcher: &mockDispatcher{}, - TimeSource: blockchain.NewMedianTime(), + ChainParams: chaincfg.SimNetParams, + BlockHeaders: hdrStore, + RegFilterHeaders: cfStore, + cfHeaderQueryDispatcher: &mockDispatcher{}, + blkHdrCheckptQueryDispatcher: &mockDispatcher{}, + TimeSource: blockchain.NewMedianTime(), BanPeer: func(string, banman.Reason) error { return nil }, @@ -346,13 +347,13 @@ func TestBlockManagerInitialInterval(t *testing.T) { // We set up a custom query batch method for this test, as we // will use this to feed the blockmanager with our crafted // responses. - bm.cfg.QueryDispatcher.(*mockDispatcher).query = func( + bm.cfg.cfHeaderQueryDispatcher.(*mockDispatcher).query = func( requests []*query.Request, options ...query.QueryOption) chan error { var msgs []wire.Message for _, q := range requests { - msgs = append(msgs, q.Req) + msgs = append(msgs, q.Req.Message) } responses, err := generateResponses(msgs, headers) @@ -378,8 +379,9 @@ func TestBlockManagerInitialInterval(t *testing.T) { // Let the blockmanager handle the // message. + var jobErr error progress := requests[index].HandleResp( - msgs[index], &resp, "", + msgs[index], &resp, nil, &jobErr, ) if !progress.Finished { @@ -400,7 +402,7 @@ func TestBlockManagerInitialInterval(t *testing.T) { // Otherwise resend the response we // just sent. progress = requests[index].HandleResp( - msgs[index], &resp2, "", + msgs[index], &resp2, nil, &jobErr, ) if !progress.Finished { errChan <- fmt.Errorf("got "+ @@ -576,13 +578,13 @@ func TestBlockManagerInvalidInterval(t *testing.T) { require.NoError(t, err) } - bm.cfg.QueryDispatcher.(*mockDispatcher).query = func( + bm.cfg.cfHeaderQueryDispatcher.(*mockDispatcher).query = func( requests []*query.Request, options ...query.QueryOption) chan error { var msgs []wire.Message for _, q := range requests { - msgs = append(msgs, q.Req) + msgs = append(msgs, q.Req.Message) } responses, err := generateResponses(msgs, headers) require.NoError(t, err) @@ -617,9 +619,10 @@ func TestBlockManagerInvalidInterval(t *testing.T) { go func() { // Check that the success of the callback match what we // expect. + var jobErr error for i := range responses { progress := requests[i].HandleResp( - msgs[i], responses[i], "", + msgs[i], responses[i], nil, &jobErr, ) if i == test.firstInvalid { if progress.Finished { @@ -884,20 +887,6 @@ func TestHandleHeaders(t *testing.T) { fakePeer, err := peer.NewOutboundPeer(&peer.Config{}, "fake:123") require.NoError(t, err) - assertPeerDisconnected := func(shouldBeDisconnected bool) { - // This is quite hacky but works: We expect the peer to be - // disconnected, which sets the unexported "disconnected" field - // to 1. - refValue := reflect.ValueOf(fakePeer).Elem() - foo := refValue.FieldByName("disconnect").Int() - - if shouldBeDisconnected { - require.EqualValues(t, 1, foo) - } else { - require.EqualValues(t, 0, foo) - } - } - // We'll want to use actual, real blocks, so we take a miner harness // that we can use to generate some. harness, err := rpctest.New( @@ -934,7 +923,7 @@ func TestHandleHeaders(t *testing.T) { // Let's feed in the correct headers. This should work fine and the peer // should not be disconnected. bm.handleHeadersMsg(hmsg) - assertPeerDisconnected(false) + assertPeerDisconnected(false, fakePeer, t) // Now scramble the headers and feed them in again. This should cause // the peer to be disconnected. 
@@ -943,5 +932,914 @@ func TestHandleHeaders(t *testing.T) { hmsg.headers.Headers[j], hmsg.headers.Headers[i] }) bm.handleHeadersMsg(hmsg) - assertPeerDisconnected(true) + assertPeerDisconnected(true, fakePeer, t) +} + +// assertPeerDisconnected asserts that the peer supplied as an argument is disconnected. +func assertPeerDisconnected(shouldBeDisconnected bool, sp *peer.Peer, t *testing.T) { + // This is quite hacky but works: We expect the peer to be + // disconnected, which sets the unexported "disconnected" field + // to 1. + refValue := reflect.ValueOf(sp).Elem() + foo := refValue.FieldByName("disconnect").Int() + + if shouldBeDisconnected { + require.EqualValues(t, 1, foo) + } else { + require.EqualValues(t, 0, foo) + } +} + +// TestBatchCheckpointedBlkHeaders tests the batch checkpointed headers function. +func TestBatchCheckpointedBlkHeaders(t *testing.T) { + t.Parallel() + + // First, we set up a block manager and a fake peer that will act as the + // test's remote peer. + bm, _, _, err := setupBlockManager(t) + require.NoError(t, err) + + //Created checkpoints for our simulated network. + checkpoints := []chaincfg.Checkpoint{ + + { + Hash: &chainhash.Hash{1}, + Height: int32(1), + }, + + { + Hash: &chainhash.Hash{2}, + Height: int32(2), + }, + + { + Hash: &chainhash.Hash{3}, + Height: int32(3), + }, + } + + modParams := chaincfg.SimNetParams + modParams.Checkpoints = append(modParams.Checkpoints, checkpoints...) + bm.cfg.ChainParams = modParams + + //set checkpoint and header tip. + bm.nextCheckpoint = &checkpoints[0] + bm.headerTip = 0 + + //this is the query we assert to obtain if the function works accordingly. + expectedQuery := checkpointedCFHeadersQuery{ + blockMgr: bm, + msgs: []*query.ReqMessage{ + + { + Message: &headerQuery{ + Message: &wire.MsgGetHeaders{ + BlockLocatorHashes: blockchain.BlockLocator([]*chainhash.Hash{&bm.headerTipHash}), + HashStop: *checkpoints[0].Hash, + }, + startHeight: int32(bm.headerTip), + initialHeight: int32(bm.headerTip), + startHash: bm.headerTipHash, + endHeight: checkpoints[0].Height, + initialHash: bm.headerTipHash, + }, + }, + + { + Message: &headerQuery{ + Message: &wire.MsgGetHeaders{ + BlockLocatorHashes: blockchain.BlockLocator([]*chainhash.Hash{checkpoints[0].Hash}), + HashStop: *checkpoints[1].Hash, + }, + startHeight: checkpoints[0].Height, + initialHeight: checkpoints[0].Height, + startHash: *checkpoints[0].Hash, + endHeight: checkpoints[1].Height, + initialHash: *checkpoints[0].Hash, + }, + }, + + { + Message: &headerQuery{ + Message: &wire.MsgGetHeaders{ + BlockLocatorHashes: blockchain.BlockLocator([]*chainhash.Hash{checkpoints[1].Hash}), + HashStop: *checkpoints[2].Hash, + }, + startHeight: checkpoints[1].Height, + initialHeight: checkpoints[1].Height, + startHash: *checkpoints[1].Hash, + endHeight: checkpoints[2].Height, + initialHash: *checkpoints[1].Hash, + }, + }, + + { + Message: &headerQuery{ + Message: &wire.MsgGetHeaders{ + BlockLocatorHashes: blockchain.BlockLocator([]*chainhash.Hash{checkpoints[2].Hash}), + HashStop: zeroHash, + }, + startHeight: checkpoints[2].Height, + initialHeight: checkpoints[2].Height, + startHash: *checkpoints[2].Hash, + endHeight: checkpoints[2].Height + wire.MaxBlockHeadersPerMsg, + initialHash: *checkpoints[2].Hash, + }, + }, + }, + } + + //create request. 
+ expectedRequest := expectedQuery.requests() + + bm.cfg.blkHdrCheckptQueryDispatcher.(*mockDispatcher).query = func(requests []*query.Request, + options ...query.QueryOption) chan error { + + //assert that the requests obtained have the same length as our expected query. + if len(requests) != len(expectedRequest) { + t.Fatalf("unequal length") + } + for i, req := range requests { + + testEqualReqMessage(req, expectedRequest[i], t) + + } + + //Ensure the number of query options sent is four. This is the number of query options supplied as args while + //querying the workmanager. + if len(options) != 4 { + + t.Fatalf("expected four option parameters for query but got %v\n", len(options)) + } + return nil + } + + //call the function that we are testing. + bm.batchCheckpointedBlkHeaders() + +} + +// TestProcessBlKHeaderInCheckPtRegionInOrder tests the processBlKHeaderInCheckPtRegionInOrder function. +func TestProcessBlKHeaderInCheckPtRegionInOrder(t *testing.T) { + t.Parallel() + + // First, we set up a block manager and a fake peer that will act as the + // test's remote peer. + bm, _, _, err := setupBlockManager(t) + require.NoError(t, err) + + fakePeer, err := peer.NewOutboundPeer(&peer.Config{}, "fake:123") + require.NoError(t, err) + + // We'll want to use actual, real blocks, so we take a miner harness + // that we can use to generate some. + harness, err := rpctest.New( + &chaincfg.SimNetParams, nil, []string{"--txindex"}, "", + ) + require.NoError(t, err) + t.Cleanup(func() { + require.NoError(t, harness.TearDown()) + }) + + err = harness.SetUp(false, 0) + require.NoError(t, err) + + // Generate 30 valid blocks that we then feed to the block manager. + blockHashes, err := harness.Client.Generate(30) + require.NoError(t, err) + + //This is the headerMessage containing 10 headers starting at height 0. + hmsgTip0 := &headersMsg{ + headers: &wire.MsgHeaders{ + Headers: make([]*wire.BlockHeader, 10), + }, + peer: &ServerPeer{ + Peer: fakePeer, + }, + } + + //This is the headerMessage containing 10 headers starting at height 10. + hmsgTip10 := &headersMsg{ + headers: &wire.MsgHeaders{ + Headers: make([]*wire.BlockHeader, 10), + }, + peer: &ServerPeer{ + Peer: fakePeer, + }, + } + + //This is the headerMessage containing 10 headers starting at height 20. + hmsgTip20 := &headersMsg{ + headers: &wire.MsgHeaders{ + Headers: make([]*wire.BlockHeader, 10), + }, + peer: &ServerPeer{ + Peer: fakePeer, + }, + } + + //Loop through the generated blockHashes and add headers to their appropriate slices. + for i := range blockHashes { + header, err := harness.Client.GetBlockHeader(blockHashes[i]) + require.NoError(t, err) + + if i < 10 { + hmsgTip0.headers.Headers[i] = header + } + + if i >= 10 && i < 20 { + + hmsgTip10.headers.Headers[i-10] = header + } + + if i >= 20 { + hmsgTip20.headers.Headers[i-20] = header + } + } + + //initialize the hdrTipSlice. + bm.hdrTipSlice = make([]int32, 0) + + //Create checkpoint for our test chain. + checkpoint := chaincfg.Checkpoint{ + Hash: blockHashes[29], + Height: int32(30), + } + bm.cfg.ChainParams.Checkpoints = append(bm.cfg.ChainParams.Checkpoints, []chaincfg.Checkpoint{ + checkpoint, + }...) + bm.nextCheckpoint = &checkpoint + + //If the processBlKHeaderInCheckPtRegionInOrder loop receives invalid headers, assert that the query parameters sent + //to the workmanager are as expected. + bm.cfg.blkHdrCheckptQueryDispatcher.(*mockDispatcher).query = func(requests []*query.Request, + options ...query.QueryOption) chan error { + + //The function should send only one request.
+ if len(requests) != 1 { + t.Fatalf("expected only one request") + } + + finalNode := bm.headerList.Back() + newHdrTip := finalNode.Height + newHdrTipHash := finalNode.Header.BlockHash() + prevCheckPt := bm.findPreviousHeaderCheckpoint(newHdrTip) + + testEqualReqMessage(requests[0], &query.Request{ + + Req: &query.ReqMessage{ + Message: &headerQuery{ + Message: &wire.MsgGetHeaders{ + BlockLocatorHashes: []*chainhash.Hash{&newHdrTipHash}, + HashStop: *bm.nextCheckpoint.Hash, + }, + startHeight: newHdrTip, + initialHeight: prevCheckPt.Height, + startHash: newHdrTipHash, + endHeight: bm.nextCheckpoint.Height, + initialHash: newHdrTipHash, + }, + PriorityIndex: 0.5, + }, + }, t) + + //The function should include only four query options while querying. + if len(options) != 4 { + + t.Fatalf("expected four option parameters for query but got %v\n", len(options)) + } + return nil + } + + //Call the function in a goroutine. + go bm.processBlKHeaderInCheckPtRegionInOrder() + + //At this point syncPeer should be nil. + if bm.syncPeer != nil { + t.Fatalf("syncPeer should be nil initially") + } + + //Set header tip to zero and write a response at height 10, ensure the ProcessBlKHeaderInCheckPtRegionInOrder loop + //does not handle the response as it does not correspond to the current header tip. + bm.headerTip = 0 + bm.writeBatchMtx.Lock() + newTipWrite := int32(10) + bm.hdrTipToResponse[newTipWrite] = hmsgTip10 + i := sort.Search(len(bm.hdrTipSlice), func(i int) bool { + return bm.hdrTipSlice[i] >= newTipWrite + }) + + bm.hdrTipSlice = append(bm.hdrTipSlice[:i], append([]int32{newTipWrite}, bm.hdrTipSlice[i:]...)...) + bm.writeBatchMtx.Unlock() + + //SyncPeer should still be nil to indicate that the loop did not handle the response. + if bm.syncPeer != nil { + t.Fatalf("syncPeer should be nil") + } + + //Set header tip to 20 to indicate that even when the chain's tip is higher than the available tips in the + //hdrTipToResponse map, the loop still does not handle it. + bm.headerTip = 20 + + //SyncPeer should still be nil to indicate that the loop did not handle the response. + if bm.syncPeer != nil { + t.Fatalf("syncPeer should be nil") + } + + //Set headerTip to zero and write a response at height 0 to the hdrTipToResponse map. The loop should handle this + //response now and the following response that would correspond to its new tip after this. + bm.headerTip = 0 + bm.writeBatchMtx.Lock() + newTipWrite = int32(0) + i = sort.Search(len(bm.hdrTipSlice), func(i int) bool { + return bm.hdrTipSlice[i] >= newTipWrite + }) + + bm.hdrTipSlice = append(bm.hdrTipSlice[:i], append([]int32{newTipWrite}, bm.hdrTipSlice[i:]...)...) + + bm.hdrTipToResponse[newTipWrite] = hmsgTip0 + bm.writeBatchMtx.Unlock() + + //Allow time for handling the response. + time.Sleep(1 * time.Second) + if bm.syncPeer == nil { + t.Fatalf("syncPeer should not be nil") + } + + //Header tip should be 20 as the loop would handle the response at height 0 then the previously written + //height 10. + if bm.headerTip != 20 { + t.Fatalf("expected header tip at 20 but got %v\n", bm.headerTip) + } + + // Now scramble the headers and feed them in again. This should cause + //the loop to delete this response from the map and re-request this header from + //the workmanager. + rand.Shuffle(len(hmsgTip20.headers.Headers), func(i, j int) { + hmsgTip20.headers.Headers[i], hmsgTip20.headers.Headers[j] = + hmsgTip20.headers.Headers[j], hmsgTip20.headers.Headers[i] + }) + + //Write this header at height 20, this would cause the loop to handle it.
+ bm.writeBatchMtx.Lock() + newTipWrite = int32(20) + bm.hdrTipToResponse[newTipWrite] = hmsgTip20 + i = sort.Search(len(bm.hdrTipSlice), func(i int) bool { + return bm.hdrTipSlice[i] >= newTipWrite + }) + + bm.hdrTipSlice = append(bm.hdrTipSlice[:i], append([]int32{newTipWrite}, bm.hdrTipSlice[i:]...)...) + + bm.writeBatchMtx.Unlock() + + //Allow time for handling. + time.Sleep(1 * time.Second) + + //Header tip should not advance as the headers are invalid. + if bm.headerTip != 20 { + t.Fatalf("expected header tip at 20 but got %v\n", bm.headerTip) + } + + //syncPeer should not be nil as we are still in the loop. + if bm.syncPeer == nil { + t.Fatalf("syncPeer should not be nil") + } + + //The response at header tip 20 should be deleted. + bm.writeBatchMtx.RLock() + _, ok := bm.hdrTipToResponse[int32(20)] + bm.writeBatchMtx.RUnlock() + + if ok { + t.Fatalf("expected response at header tip to be deleted") + } + +} + +// TestCheckpointedBlockHeadersQuery_handleResponse tests the handleResponse method +// of the CheckpointedBlockHeadersQuery. +func TestCheckpointedBlockHeadersQuery_handleResponse(t *testing.T) { + t.Parallel() + + finalResp := query.Progress{ + Finished: true, + Progressed: true, + } + + finalRespNoProgress := query.Progress{ + Finished: true, + Progressed: false, + } + + NoProgressNoFinalResp := query.Progress{ + Finished: false, + Progressed: false, + } + + //handleRespTestCase holds all the information required to test different scenarios while + //using the function. + type handleRespTestCase struct { + + //name of the testcase. + name string + + //resp is the response to be sent to the handleResp method as an arg. + resp wire.Message + + //req is the request message to be sent to the handleResp method as an arg. + req wire.Message + + //jobErr is the value of the jobErr arg after the handleResp function is done. + jobErr *error + + //progress is the expected progress to be returned by the handleResp method. + progress query.Progress + + //lastBlock is the block whose hash is used as the request's hashStop. + lastBlock wire.BlockHeader + + //peerDisconnected indicates if the peer would be disconnected after the handleResp method is done. + peerDisconnected bool + } + + testCases := []handleRespTestCase{ + + { + //Scenario in which we have a request type that is not the same as the expected headerQuery type. It should + //return no error and NoProgressNoFinalResp query.Progress. + name: "invalid request type", + resp: &wire.MsgHeaders{ + Headers: make([]*wire.BlockHeader, 0, 5), + }, + req: &wire.MsgGetCFHeaders{}, + jobErr: nil, + progress: NoProgressNoFinalResp, + }, + + { + //Scenario in which we have a response type that is not the same as the expected wire.MsgHeaders. It should + //return no error and NoProgressNoFinalResp query.Progress. + name: "invalid response type", + resp: &wire.MsgCFHeaders{}, + req: &headerQuery{ + Message: &wire.MsgGetHeaders{ + BlockLocatorHashes: []*chainhash.Hash{ + {1}, + }, + }, + startHeight: 1, + initialHeight: 1, + startHash: chainhash.Hash{1}, + endHeight: 6, + initialHash: chainhash.Hash{1}, + }, + jobErr: nil, + progress: NoProgressNoFinalResp, + lastBlock: wire.BlockHeader{ + PrevBlock: chainhash.Hash{5}, + }, + }, + + { + //Scenario in which we have the response in the hdrTipToResponse map. While calling these testcases, we + //initialize the hdrTipToResponse map to contain a response at height 0 and 6. Since this request has a + //startHeight of 0, its response would be in the map already, aligning with this scenario.
This scenario + //should return the query.ErrResponseExistForQuery error and return the finalResp query.progress, + name: "response start Height in hdrTipResponse map", + resp: &wire.MsgHeaders{ + Headers: make([]*wire.BlockHeader, 0, 4), + }, + req: &headerQuery{ + Message: &wire.MsgGetHeaders{ + BlockLocatorHashes: []*chainhash.Hash{ + {0}, + }, + }, + startHeight: 0, + initialHeight: 0, + startHash: chainhash.Hash{0}, + endHeight: 5, + initialHash: chainhash.Hash{0}, + }, + jobErr: &query.ErrResponseExistForQuery, + progress: finalResp, + lastBlock: wire.BlockHeader{ + PrevBlock: chainhash.Hash{4}, + }, + }, + + { + //Scenario in which the response we receive from the is of length zero. We should return an error and return + //NoProgressNoFinalResp query.Progress. + name: "response header length 0", + resp: &wire.MsgHeaders{ + Headers: make([]*wire.BlockHeader, 0), + }, + req: &headerQuery{ + Message: &wire.MsgGetHeaders{ + BlockLocatorHashes: []*chainhash.Hash{ + {1}, + }, + }, + startHeight: 1, + initialHeight: 1, + startHash: chainhash.Hash{1}, + endHeight: 5, + initialHash: chainhash.Hash{1}, + }, + jobErr: &query.ErrResponseErr, + progress: NoProgressNoFinalResp, + lastBlock: wire.BlockHeader{ + PrevBlock: chainhash.Hash{4}, + }, + }, + + { + //Scenario in which the response received is at the request's initialHeight (lower bound height in + //checkpoint request) but its first block's previous hash is not same as the checkpoint hash. Its + //jobErr should be nil and the function should return NoProgressNoFinalResp query.progress. + name: "response at initialHash has disconnected start Hash", + resp: &wire.MsgHeaders{ + Headers: []*wire.BlockHeader{ + { + PrevBlock: chainhash.Hash{2}, + }, + { + PrevBlock: chainhash.Hash{3}, + }, + { + PrevBlock: chainhash.Hash{4}, + }, + }, + }, + req: &headerQuery{ + Message: &wire.MsgGetHeaders{ + BlockLocatorHashes: []*chainhash.Hash{ + {1}, + }, + }, + startHeight: 1, + initialHeight: 1, + startHash: chainhash.Hash{1}, + endHeight: 5, + initialHash: chainhash.Hash{1}, + }, + jobErr: nil, + progress: NoProgressNoFinalResp, + lastBlock: wire.BlockHeader{ + PrevBlock: chainhash.Hash{4}, + }, + peerDisconnected: true, + }, + + { + //Scenario in which the response is not at the initial Hash (lower bound hash in the + //checkpoint request) but the response is complete and valid. The jobErr should be nila and + //return finalRespNoProgress query.Progress. + name: "response not at initialHash, valid complete headers", + resp: &wire.MsgHeaders{ + Headers: []*wire.BlockHeader{ + { + PrevBlock: chainhash.Hash{2}, + }, + { + PrevBlock: chainhash.Hash{3}, + }, + { + PrevBlock: chainhash.Hash{4}, + }, + }, + }, + req: &headerQuery{ + Message: &wire.MsgGetHeaders{ + BlockLocatorHashes: []*chainhash.Hash{ + {2}, + }, + }, + startHeight: 2, + initialHeight: 1, + startHash: chainhash.Hash{2}, + endHeight: 5, + initialHash: chainhash.Hash{2}, + }, + jobErr: nil, + progress: finalResp, + lastBlock: wire.BlockHeader{ + PrevBlock: chainhash.Hash{4}, + }, + }, + + { + //Scenario in which the response is not at initial hash (lower bound height in + // checkpoint request) and the response is unfinished. The jobErr should be nil and return + //finalRespNoProgress query.progress. 
+ name: "response not at initial Hash, unfinished response", + resp: &wire.MsgHeaders{ + Headers: []*wire.BlockHeader{ + { + PrevBlock: chainhash.Hash{2}, + }, + { + PrevBlock: chainhash.Hash{3}, + }, + { + PrevBlock: chainhash.Hash{4}, + }, + }, + }, + req: &headerQuery{ + Message: &wire.MsgGetHeaders{ + BlockLocatorHashes: []*chainhash.Hash{ + {2}, + }, + }, + startHeight: 2, + initialHeight: 1, + startHash: chainhash.Hash{2}, + endHeight: 6, + initialHash: chainhash.Hash{2}, + }, + jobErr: nil, + progress: finalRespNoProgress, + lastBlock: wire.BlockHeader{ + PrevBlock: chainhash.Hash{5}, + }, + }, + + { + //Scenario in which the response length is greater than expected. JobErr should be nil, peer + //should be disconnected and the method should return NoProgressNoFinalResp query.progress. + name: "response header length more than expected", + resp: &wire.MsgHeaders{ + Headers: []*wire.BlockHeader{ + { + PrevBlock: chainhash.Hash{1}, + }, + { + PrevBlock: chainhash.Hash{2}, + }, + { + PrevBlock: chainhash.Hash{3}, + }, + { + PrevBlock: chainhash.Hash{4}, + }, + { + PrevBlock: chainhash.Hash{5}, + }, + { + PrevBlock: chainhash.Hash{6}, + }, + }, + }, + req: &headerQuery{ + Message: &wire.MsgGetHeaders{ + BlockLocatorHashes: []*chainhash.Hash{ + {1}, + }, + }, + startHeight: 1, + initialHeight: 1, + startHash: chainhash.Hash{1}, + endHeight: 6, + initialHash: chainhash.Hash{1}, + }, + jobErr: nil, + progress: NoProgressNoFinalResp, + lastBlock: wire.BlockHeader{ + PrevBlock: chainhash.Hash{5}, + }, + peerDisconnected: true, + }, + + { + //Scenario in which response is complete and a valid header. Its start height is at the initial height. + //jobErr should be nil and progress should be finalResp. + name: "complete response valid headers", + resp: &wire.MsgHeaders{ + Headers: []*wire.BlockHeader{ + { + PrevBlock: chainhash.Hash{1}, + }, + { + PrevBlock: chainhash.Hash{2}, + }, + { + PrevBlock: chainhash.Hash{3}, + }, + }, + }, + req: &headerQuery{ + Message: &wire.MsgGetHeaders{ + BlockLocatorHashes: []*chainhash.Hash{ + {1}, + }, + }, + startHeight: 1, + initialHeight: 1, + startHash: chainhash.Hash{1}, + endHeight: 4, + initialHash: chainhash.Hash{1}, + }, + jobErr: nil, + progress: finalResp, + lastBlock: wire.BlockHeader{ + PrevBlock: chainhash.Hash{3}, + }, + }, + + { + //Scenario in which response is at initialHash and the response is incomplete. The joberr should be nil, + //progress, finalRespNoProgress. + name: "response at initial hash, incomplete response, valid headers", + resp: &wire.MsgHeaders{ + Headers: []*wire.BlockHeader{ + { + PrevBlock: chainhash.Hash{1}, + }, + { + PrevBlock: chainhash.Hash{2}, + }, + { + PrevBlock: chainhash.Hash{3}, + }, + }, + }, + req: &headerQuery{ + Message: &wire.MsgGetHeaders{ + BlockLocatorHashes: []*chainhash.Hash{ + {1}, + }, + }, + startHeight: 1, + initialHeight: 1, + startHash: chainhash.Hash{1}, + endHeight: 6, + initialHash: chainhash.Hash{1}, + }, + jobErr: nil, + progress: finalRespNoProgress, + lastBlock: wire.BlockHeader{ + PrevBlock: chainhash.Hash{5}, + }, + }, + + { + //Scenario in which response is incomplete but valid. The new response's start height created in this + //scenario is present in the hdrTipResponseMap. The startHeight is 6 and response at height 6 has been + //preveiously written in to the hdrTipResponse map for the sake of this test. 
+ name: "incomplete response, valid headers, new resp in hdrTipToResponse map", + resp: &wire.MsgHeaders{ + Headers: []*wire.BlockHeader{ + { + PrevBlock: chainhash.Hash{1}, + }, + { + PrevBlock: chainhash.Hash{2}, + }, + { + PrevBlock: chainhash.Hash{3}, + }, + { + PrevBlock: chainhash.Hash{4}, + }, + { + PrevBlock: chainhash.Hash{5}, + }, + }, + }, + req: &headerQuery{ + Message: &wire.MsgGetHeaders{ + BlockLocatorHashes: []*chainhash.Hash{ + {1}, + }, + }, + startHeight: 1, + initialHeight: 1, + startHash: chainhash.Hash{1}, + endHeight: 10, + initialHash: chainhash.Hash{1}, + }, + jobErr: &query.ErrResponseExistForQuery, + progress: finalResp, + lastBlock: wire.BlockHeader{ + PrevBlock: chainhash.Hash{9}, + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + //set up block manager. + bm, _, _, err := setupBlockManager(t) + require.NoError(t, err) + + var oldReqStartHeight int32 + + bm.hdrTipToResponse[0] = &headersMsg{ + headers: &wire.MsgHeaders{}, + } + bm.hdrTipToResponse[6] = &headersMsg{ + headers: &wire.MsgHeaders{}, + } + + fakePeer, err := peer.NewOutboundPeer(&peer.Config{}, "fake:123") + require.NoError(t, err) + + query := &CheckpointedBlockHeadersQuery{ + blockMgr: bm, + } + req := tc.req + r, ok := tc.req.(*headerQuery) + if ok { + + reqMessage, ok := r.Message.(*wire.MsgGetHeaders) + if !ok { + t.Fatalf("request message not of type wire.MsgGetHeaders") + } + reqMessage.HashStop = tc.lastBlock.BlockHash() + req = r + oldReqStartHeight = r.startHeight + } + var jobErr error + + actualProgress := query.handleResponse(req, tc.resp, &ServerPeer{ + Peer: fakePeer, + recentReqStartTime: time.Now(), + }, &jobErr) + if actualProgress != tc.progress { + t.Fatalf("unexpected progress.Expected:"+ + "Finished:%v and Progressed: %v but got,"+ + "Finished: %v and Progressed: %v", tc.progress.Finished, tc.progress.Progressed, + actualProgress.Finished, actualProgress.Progressed) + } + + if actualProgress.Finished && !actualProgress.Progressed { + resp := tc.resp.(*wire.MsgHeaders) + request := req.(*headerQuery) + if request.startHash != resp.Headers[len(resp.Headers)-1].BlockHash() { + + t.Fatalf("unexpected new startHash") + } + + if request.startHeight != oldReqStartHeight+int32(len(resp.Headers)) { + + t.Fatalf("unexpected new start height") + } + + requestMessage := request.Message.(*wire.MsgGetHeaders) + + if *requestMessage.BlockLocatorHashes[0] != request.startHash { + t.Fatalf("unexpected new blockLocator") + } + } + + if tc.jobErr == nil && jobErr != nil { + t.Fatalf("unexpected error: %v\n", jobErr) + } + + if tc.jobErr != nil && jobErr != *tc.jobErr { + t.Fatalf("expected error, %v but got %v\n", *tc.jobErr, jobErr) + } + + assertPeerDisconnected(tc.peerDisconnected, fakePeer, t) + + }) + } + +} + +// testEqualReqMessage tests if two query.Request are same. 
+func testEqualReqMessage(a, b *query.Request, t *testing.T) { + + aMessage := a.Req.Message.(*headerQuery) + bMessage := b.Req.Message.(*headerQuery) + + if aMessage.startHeight != bMessage.startHeight { + t.Fatalf("dissimilar startHeight") + } + if aMessage.startHash != bMessage.startHash { + t.Fatalf("dissimilar startHash") + } + if aMessage.endHeight != bMessage.endHeight { + t.Fatalf("dissimilar endHeight") + } + if aMessage.initialHash != bMessage.initialHash { + t.Fatalf("dissimilar initialHash") + } + + aMessageGetHeaders := aMessage.Message.(*wire.MsgGetHeaders) + bMessageGetHeaders := bMessage.Message.(*wire.MsgGetHeaders) + + if !reflect.DeepEqual(aMessageGetHeaders.BlockLocatorHashes, bMessageGetHeaders.BlockLocatorHashes) { + t.Fatalf("dissimilar blocklocator hash") + } + + if aMessageGetHeaders.HashStop != bMessageGetHeaders.HashStop { + t.Fatalf("dissimilar hashstop") + } + if a.Req.PriorityIndex != b.Req.PriorityIndex { + t.Fatalf("dissimilar priority index") + } + } diff --git a/neutrino.go b/neutrino.go index 7ee45edd..207bb31c 100644 --- a/neutrino.go +++ b/neutrino.go @@ -180,21 +180,43 @@ type ServerPeer struct { recvSubscribers map[spMsgSubscription]struct{} recvSubscribers2 map[msgSubscription]struct{} mtxSubscribers sync.RWMutex + + //recentReqDuration is the time between sending a request and receiving a response + //while querying this peer using the workmanager. + recentReqDuration time.Duration + + //recentReqStartTime is the start time of the most recent request sent to this peer while + //using the workmanager to orchestrate the query. + recentReqStartTime time.Time } // NewServerPeer returns a new ServerPeer instance. The peer needs to be set by // the caller. func NewServerPeer(s *ChainService, isPersistent bool) *ServerPeer { return &ServerPeer{ - server: s, - persistent: isPersistent, - knownAddresses: lru.NewCache[string, *cachedAddr](5000), - quit: make(chan struct{}), - recvSubscribers: make(map[spMsgSubscription]struct{}), - recvSubscribers2: make(map[msgSubscription]struct{}), + server: s, + persistent: isPersistent, + knownAddresses: lru.NewCache[string, *cachedAddr](5000), + quit: make(chan struct{}), + recvSubscribers: make(map[spMsgSubscription]struct{}), + recvSubscribers2: make(map[msgSubscription]struct{}), + recentReqDuration: 2 * time.Second, } } + +// LastReqDuration returns the most recent request duration of the peer. +func (sp *ServerPeer) LastReqDuration() time.Duration { + return sp.recentReqDuration +} + +// UpdateRequestDuration computes the time between the peer's most recent request's start time and now. +// The duration is assigned to the peer's recentReqDuration field. +func (sp *ServerPeer) UpdateRequestDuration() { + duration := time.Since(sp.recentReqStartTime) + sp.recentReqDuration = duration + log.Debugf("peer=%v, updated duration to=%v", sp.Addr(), duration.Seconds()) +} + // newestBlock returns the current best block hash and height using the format // required by the configuration for the peer package.
func (sp *ServerPeer) newestBlock() (*chainhash.Hash, int32, error) { @@ -742,6 +764,8 @@ func NewChainService(cfg Config) (*ChainService, error) { ConnectedPeers: s.ConnectedPeers, NewWorker: query.NewWorker, Ranking: query.NewPeerRanking(), + OrderPeers: query.SortPeersByReqDuration, + DebugName: "GeneralWorkManager", }) var err error @@ -800,15 +824,24 @@ func NewChainService(cfg Config) (*ChainService, error) { } bm, err := newBlockManager(&blockManagerCfg{ - ChainParams: s.chainParams, - BlockHeaders: s.BlockHeaders, - RegFilterHeaders: s.RegFilterHeaders, - TimeSource: s.timeSource, - QueryDispatcher: s.workManager, - BanPeer: s.BanPeer, - GetBlock: s.GetBlock, - firstPeerSignal: s.firstPeerConnect, - queryAllPeers: s.queryAllPeers, + ChainParams: s.chainParams, + BlockHeaders: s.BlockHeaders, + RegFilterHeaders: s.RegFilterHeaders, + TimeSource: s.timeSource, + cfHeaderQueryDispatcher: s.workManager, + BanPeer: s.BanPeer, + GetBlock: s.GetBlock, + firstPeerSignal: s.firstPeerConnect, + queryAllPeers: s.queryAllPeers, + peerByAddr: s.PeerByAddr, + blkHdrCheckptQueryDispatcher: query.NewWorkManager(&query.Config{ + ConnectedPeers: s.ConnectedPeers, + NewWorker: query.NewWorker, + Ranking: query.NewPeerRanking(), + OrderPeers: query.SortPeersByReqDuration, + IsEligibleWorkerFunc: query.IsWorkerEligibleForBlkHdrFetch, + DebugName: "BlockHdrWorkManager", + }), }) if err != nil { return nil, err @@ -1114,6 +1147,34 @@ func (s *ChainService) AddPeer(sp *ServerPeer) { } } +// IsSyncCandidate returns whether or not the peer is a candidate to consider +// syncing from. +func (sp *ServerPeer) IsSyncCandidate() bool { + // The peer is not a candidate for sync if it's not a full node. + return sp.Services()&wire.SFNodeNetwork == wire.SFNodeNetwork +} + +// IsPeerBehindStartHeight returns a boolean indicating if the peer's last block height +// is behind the start height of the request. If the peer is not behind the request start +// height false is returned, otherwise, true is. +func (sp *ServerPeer) IsPeerBehindStartHeight(req wire.Message) bool { + queryGetHeaders, ok := req.(*headerQuery) + + if !ok { + log.Debugf("request is not type headerQuery") + + return true + } + + if sp.LastBlock() < queryGetHeaders.startHeight { + + return true + + } + + return false +} + // AddBytesSent adds the passed number of bytes to the total bytes sent counter // for the server. It is safe for concurrent access. func (s *ChainService) AddBytesSent(bytesSent uint64) { @@ -1609,6 +1670,9 @@ func (s *ChainService) Start() error { // needed by peers. 
s.addrManager.Start() s.blockManager.Start() + if err := s.blockManager.cfg.blkHdrCheckptQueryDispatcher.Start(); err != nil { + return fmt.Errorf("unable to start block header work manager: %v", err) + } s.blockSubscriptionMgr.Start() if err := s.workManager.Start(); err != nil { return fmt.Errorf("unable to start work manager: %v", err) diff --git a/query.go b/query.go index 66a506dd..d5547664 100644 --- a/query.go +++ b/query.go @@ -439,16 +439,33 @@ func (q *cfiltersQuery) request() *query.Request { q.filterType, uint32(q.startHeight), q.stopHash, ) + queryMsg := query.ReqMessage{ + Message: msg, + } return &query.Request{ - Req: msg, + Req: &queryMsg, HandleResp: q.handleResponse, + SendQuery: sendQueryMessageWithEncoding, + CloneReq: func(req query.ReqMessage) *query.ReqMessage { + oldReq, ok := req.Message.(*wire.MsgGetCFilters) + if !ok { + log.Errorf("request not of type *wire.MsgCFHeaders") + } + newReq := &query.ReqMessage{ + Message: wire.NewMsgGetCFilters( + oldReq.FilterType, oldReq.StartHeight, &oldReq.StopHash, + ), + PriorityIndex: req.PriorityIndex, + } + return newReq + }, } } // handleResponse validates that the cfilter response we get from a peer is // sane given the getcfilter query that we made. func (q *cfiltersQuery) handleResponse(req, resp wire.Message, - _ string) query.Progress { + peer query.Peer, _ *error) query.Progress { // The request must have been a "getcfilters" msg. request, ok := req.(*wire.MsgGetCFilters) @@ -462,6 +479,9 @@ func (q *cfiltersQuery) handleResponse(req, resp wire.Message, return noProgress } + if peer != nil { + peer.UpdateRequestDuration() + } // If the request filter type doesn't match the type we were expecting, // ignore this message. if q.filterType != request.FilterType { @@ -691,6 +711,7 @@ func (s *ChainService) prepareCFiltersQuery(blockHash chainhash.Hash, func (s *ChainService) GetCFilter(blockHash chainhash.Hash, filterType wire.FilterType, options ...QueryOption) (*gcs.Filter, error) { + log.Debugf("Getting CFilter...") // The only supported filter atm is the regular filter, so we'll reject // all other filters. @@ -759,6 +780,7 @@ func (s *ChainService) GetCFilter(blockHash chainhash.Hash, query.Cancel(s.quit), query.Encoding(qo.encoding), query.NumRetries(qo.numRetries), + query.ErrChan(make(chan error, 1)), } errChan := s.workManager.Query( @@ -775,6 +797,8 @@ func (s *ChainService) GetCFilter(blockHash chainhash.Hash, return nil, ErrShuttingDown } + log.Debugf("Gotten CFilter...%v", err) + // If there are elements left to receive, the query failed. if len(filterQuery.headerIndex) > 0 { numFilters := filterQuery.stopHeight - @@ -834,12 +858,20 @@ func (s *ChainService) GetBlock(blockHash chainhash.Hash, getData := wire.NewMsgGetData() _ = getData.AddInvVect(inv) + msg := query.ReqMessage{ + Message: getData, + } var foundBlock *btcutil.Block // handleResp will be called for each message received from a peer. It // will be used to signal to the work manager whether progress has been // made or not. - handleResp := func(req, resp wire.Message, peer string) query.Progress { + handleResp := func(req, resp wire.Message, sp query.Peer, _ *error) query.Progress { + + peer := "" + if sp != nil { + peer = sp.Addr() + } // The request must have been a "getdata" msg. _, ok := req.(*wire.MsgGetData) if !ok { @@ -852,6 +884,9 @@ func (s *ChainService) GetBlock(blockHash chainhash.Hash, return noProgress } + if sp != nil { + sp.UpdateRequestDuration() + } // If this isn't the block we asked for, ignore it. 
if response.BlockHash() != blockHash { return noProgress @@ -912,8 +947,20 @@ func (s *ChainService) GetBlock(blockHash chainhash.Hash, // Prepare the query request. request := &query.Request{ - Req: getData, + Req: &msg, HandleResp: handleResp, + SendQuery: sendQueryMessageWithEncoding, + CloneReq: func(req query.ReqMessage) *query.ReqMessage { + newMsg := wire.NewMsgGetData() + _ = newMsg.AddInvVect(inv) + + newReq := &query.ReqMessage{ + Message: newMsg, + PriorityIndex: req.PriorityIndex, + } + + return newReq + }, } // Prepare the query options. @@ -921,6 +968,7 @@ func (s *ChainService) GetBlock(blockHash chainhash.Hash, query.Encoding(qo.encoding), query.NumRetries(qo.numRetries), query.Cancel(s.quit), + query.ErrChan(make(chan error, 1)), } // Send the request to the work manager and await a response. @@ -934,6 +982,7 @@ func (s *ChainService) GetBlock(blockHash chainhash.Hash, return nil, ErrShuttingDown } + log.Debugf("Gotten block, %v", err) if foundBlock == nil { return nil, fmt.Errorf("couldn't retrieve block %s from "+ "network", blockHash) diff --git a/query/interface.go b/query/interface.go index dca5f42d..bb131eb4 100644 --- a/query/interface.go +++ b/query/interface.go @@ -43,6 +43,15 @@ type queryOptions struct { // that a query can be retried. If this is set then numRetries has no // effect. noRetryMax bool + + //keepBatch indicates if to delete batch after batch is done. We might need to the keep + //tha batch it case of fetching checkpointed block headers where we could be done fetching + //headers but we would need to keep the batch as some queries might fail verification and + //we need to reschedule the job. + keepBatch bool + + //errChan error channel with which the workmananger sends error. + errChan chan error } // QueryOption is a functional option argument to any of the network query @@ -75,6 +84,14 @@ func NumRetries(num uint8) QueryOption { } } +// KeepBatch is a query option that specifies if to keep batch after +// query is done. +func KeepBatch() QueryOption { + return func(qo *queryOptions) { + qo.keepBatch = true + } +} + // NoRetryMax is a query option that can be used to disable the cap on the // number of retries. If this is set then NumRetries has no effect. func NoRetryMax() QueryOption { @@ -83,6 +100,14 @@ func NoRetryMax() QueryOption { } } +// ErrChan is a query option that specifies the error channel which the workmanager +// sends any error to. +func ErrChan(error chan error) QueryOption { + return func(qo *queryOptions) { + qo.errChan = error + } +} + // Timeout is a query option that specifies the total time a query is allowed // to be tried before it is failed. func Timeout(timeout time.Duration) QueryOption { @@ -107,7 +132,7 @@ func Cancel(cancel chan struct{}) QueryOption { } } -// Progress encloses the result of handling a response for a given Request, +// Progress encloses the result of handling a response for a given Request, // determining whether the response did progress the query. type Progress struct { // Finished is true if the query was finished as a result of the @@ -125,7 +150,7 @@ type Progress struct { // connected peers. type Request struct { // Req is the message request to send. - Req wire.Message + Req *ReqMessage // HandleResp is a response handler that will be called for every // message received from the peer that the request was made to. It @@ -138,7 +163,22 @@ type Request struct { // should validate the response and immediately return the progress. 
// The response should be handed off to another goroutine for // processing. - HandleResp func(req, resp wire.Message, peer string) Progress + HandleResp func(req, resp wire.Message, peer Peer, jobErr *error) Progress + + //SendQuery handles sending request to the worker's peer. It returns an error, + //if one is encountered while sending the request. + SendQuery func(worker Worker, job Task) error + + //CloneReq clones the message. + CloneReq func(message ReqMessage) *ReqMessage +} + +// ReqMessage is a struct that contains the wire.Message sent to the peers as well as a +// priority index, in case the caller wants to manipulate how quickly the message is sent +// by the workmanager to the worker. +type ReqMessage struct { + wire.Message + PriorityIndex float64 } // WorkManager defines an API for a manager that dispatches queries to bitcoin @@ -167,11 +207,6 @@ type Dispatcher interface { // Peer is the interface that defines the methods needed by the query package // to be able to make requests and receive responses from a network peer. type Peer interface { - // QueueMessageWithEncoding adds the passed bitcoin message to the peer - // send queue. - QueueMessageWithEncoding(msg wire.Message, doneChan chan<- struct{}, - encoding wire.MessageEncoding) - // SubscribeRecvMsg adds a OnRead subscription to the peer. All bitcoin // messages received from this peer will be sent on the returned // channel. A closure is also returned, that should be called to cancel @@ -184,4 +219,17 @@ type Peer interface { // OnDisconnect returns a channel that will be closed when this peer is // disconnected. OnDisconnect() <-chan struct{} + + //LastReqDuration returns the last request duration of the peer. + LastReqDuration() time.Duration + + //UpdateRequestDuration updates the latest request duration of the peer. + UpdateRequestDuration() + + //IsPeerBehindStartHeight returns a boolean indicating if the peer's known last height is behind + //the request's start Height which it receives as an argument. + IsPeerBehindStartHeight(req wire.Message) bool + + //IsSyncCandidate returns if the peer is a sync candidate. + IsSyncCandidate() bool } diff --git a/query/worker.go b/query/worker.go index dc15a18c..06148989 100644 --- a/query/worker.go +++ b/query/worker.go @@ -19,35 +19,48 @@ var ( // ErrJobCanceled is returned if the job is canceled before the query // has been answered. ErrJobCanceled = errors.New("job canceled") + + // ErrResponseExistForQuery is returned if response exists already for the query. + ErrResponseExistForQuery = errors.New("response Exists for query") + + // ErrResponseErr is returned if we received a compatible response for the query but, it did not pass + //preliminary verification. + ErrResponseErr = errors.New("received response with error") ) -// queryJob is the internal struct that wraps the Query to work on, in +// QueryJob is the internal struct that wraps the Query to work on, in // addition to some information about the query. -type queryJob struct { +type QueryJob struct { tries uint8 - index uint64 + index float64 timeout time.Duration encoding wire.MessageEncoding cancelChan <-chan struct{} *Request } -// queryJob should satisfy the Task interface in order to be sorted by the +// QueryJob should satisfy the Task interface in order to be sorted by the // workQueue. -var _ Task = (*queryJob)(nil) +var _ Task = (*QueryJob)(nil) // Index returns the queryJob's index within the work queue. // // NOTE: Part of the Task interface. 
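// ---------------------------------------------------------------------------
// Illustrative sketch (not part of this patch): a minimal query.Request built
// against the new API, showing the pieces a caller now supplies next to
// HandleResp: the ReqMessage wrapper, a SendQuery closure and a CloneReq
// closure. newGetDataRequest is a hypothetical name; the closures are
// deliberately skeletal (the real senders and handlers live in query.go).
func newGetDataRequest(inv *wire.InvVect) *query.Request {
	msg := wire.NewMsgGetData()
	_ = msg.AddInvVect(inv)

	return &query.Request{
		Req: &query.ReqMessage{Message: msg},

		// The handler now receives the responding peer and a pointer to
		// the job error instead of only the peer's address.
		HandleResp: func(req, resp wire.Message, peer query.Peer,
			jobErr *error) query.Progress {

			if _, ok := resp.(*wire.MsgBlock); !ok {
				return query.Progress{}
			}

			return query.Progress{Finished: true, Progressed: true}
		},

		// SendQuery decides how the request reaches the worker's peer;
		// returning an error feeds back into the worker's result.
		SendQuery: func(w query.Worker, job query.Task) error {
			// Write the job's message to w.Peer() here; elided in
			// this sketch.
			return nil
		},

		// CloneReq must deep-copy the request so a rescheduled job
		// cannot be mutated behind the work queue's back.
		CloneReq: func(req query.ReqMessage) *query.ReqMessage {
			old := req.Message.(*wire.MsgGetData)
			clone := wire.NewMsgGetData()
			clone.InvList = append(clone.InvList, old.InvList...)

			return &query.ReqMessage{
				Message:       clone,
				PriorityIndex: req.PriorityIndex,
			}
		},
	}
}
// ---------------------------------------------------------------------------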
-func (q *queryJob) Index() uint64 { +func (q *QueryJob) Index() float64 { return q.index } -// jobResult is the final result of the worker's handling of the queryJob. +// Encoding returns the encoding of wire.message in the job. +func (q *QueryJob) Encoding() wire.MessageEncoding { + return q.encoding +} + +// jobResult is the final result of the worker's handling of the QueryJob. type jobResult struct { - job *queryJob - peer Peer - err error + job *QueryJob + peer Peer + err error + unfinished bool } // worker is responsible for polling work from its work queue, and handing it @@ -59,17 +72,21 @@ type worker struct { // nextJob is a channel of queries to be distributed, where the worker // will poll new work from. - nextJob chan *queryJob + nextJob chan *QueryJob + debugName string + feedBackChan chan error } // A compile-time check to ensure worker satisfies the Worker interface. var _ Worker = (*worker)(nil) // NewWorker creates a new worker associated with the given peer. -func NewWorker(peer Peer) Worker { +func NewWorker(peer Peer, debugName string) Worker { return &worker{ - peer: peer, - nextJob: make(chan *queryJob), + peer: peer, + nextJob: make(chan *QueryJob), + debugName: debugName, + feedBackChan: make(chan error), } } @@ -84,31 +101,31 @@ func NewWorker(peer Peer) Worker { // NOTE: Part of the Worker interface. func (w *worker) Run(results chan<- *jobResult, quit <-chan struct{}) { peer := w.peer - + debugName := w.debugName // Subscribe to messages from the peer. msgChan, cancel := peer.SubscribeRecvMsg() defer cancel() for { - log.Tracef("Worker %v waiting for more work", peer.Addr()) + log.Tracef("%v: Worker %v waiting for more work", debugName, peer.Addr()) - var job *queryJob + var job *QueryJob select { // Poll a new job from the nextJob channel. case job = <-w.nextJob: - log.Tracef("Worker %v picked up job with index %v", + log.Tracef("%v: Worker %v picked up job with index %v", debugName, peer.Addr(), job.Index()) // Ignore any message received while not working on anything. case msg := <-msgChan: - log.Tracef("Worker %v ignoring received msg %T "+ - "since no job active", peer.Addr(), msg) + log.Tracef("%v: Worker %v ignoring received msg %T "+ + "since no job active", debugName, peer.Addr(), msg) continue // If the peer disconnected, we can exit immediately, as we // weren't working on a query. case <-peer.OnDisconnect(): - log.Debugf("Peer %v for worker disconnected", + log.Debugf("%v: Peer %v for worker disconnected", debugName, peer.Addr()) return @@ -116,13 +133,18 @@ func (w *worker) Run(results chan<- *jobResult, quit <-chan struct{}) { return } + var ( + jobErr error + timeout *time.Timer + jobUnfinished bool + ) + select { // There is no point in queueing the request if the job already // is canceled, so we check this quickly. case <-job.cancelChan: - log.Tracef("Worker %v found job with index %v "+ - "already canceled", peer.Addr(), job.Index()) - + log.Tracef("%v: Worker %v found job with index %v "+ + "already canceled", debugName, peer.Addr(), job.Index()) // We break to the below loop, where we'll check the // cancel channel again and the ErrJobCanceled // result will be sent back. @@ -130,18 +152,32 @@ func (w *worker) Run(results chan<- *jobResult, quit <-chan struct{}) { // We received a non-canceled query job, send it to the peer. 
default: - log.Tracef("Worker %v queuing job %T with index %v", + log.Tracef("%v: Worker %v queuing job %T with index %v", debugName, peer.Addr(), job.Req, job.Index()) - peer.QueueMessageWithEncoding(job.Req, nil, job.encoding) + err := job.SendQuery(w, job) + + //If any error occurs while sending query send a message to the worker's feedback Chan + // which would be handled by the "LOOP" below. + if err != nil { + go func() { + select { + case w.feedBackChan <- err: + case <-job.cancelChan: + case <-quit: + } + }() + } } - // Wait for the correct response to be received from the peer, - // or an error happening. - var ( - jobErr error - timeout = time.NewTimer(job.timeout) - ) + //Set the default queryTimeout to the minQueryTimeout unless the value of the worker's peer request duration + //is between the minQueryTimeout and the maxQueryTimeOut. This ensures we set a realistic deadline for the + //peer using its historical data (its request duration). + queryTimeOut := minQueryTimeout + if peer.LastReqDuration() > minQueryTimeout && peer.LastReqDuration() <= maxQueryTimeout { + queryTimeOut = peer.LastReqDuration() + } + timeout = time.NewTimer(queryTimeOut) Loop: for { @@ -151,18 +187,22 @@ func (w *worker) Run(results chan<- *jobResult, quit <-chan struct{}) { // our request. case resp := <-msgChan: progress := job.HandleResp( - job.Req, resp, peer.Addr(), + job.Req.Message, resp, peer, &jobErr, ) - log.Tracef("Worker %v handled msg %T while "+ + log.Tracef("%v: Worker %v handled msg %T while "+ "waiting for response to %T (job=%v). "+ - "Finished=%v, progressed=%v", + "Finished=%v, progressed=%v", debugName, peer.Addr(), resp, job.Req, job.Index(), progress.Finished, progress.Progressed) // If the response did not answer our query, we // check whether it did progress it. if !progress.Finished { + + if jobErr != nil { + break Loop + } // If it did make progress we reset the // timeout. This ensures that the // queries with multiple responses @@ -173,12 +213,18 @@ func (w *worker) Run(results chan<- *jobResult, quit <-chan struct{}) { if progress.Progressed { timeout.Stop() timeout = time.NewTimer( - job.timeout, + queryTimeOut, ) } + continue Loop } + if !progress.Progressed { + jobUnfinished = true + + } + // We did get a valid response, and can break // the loop. break Loop @@ -189,17 +235,22 @@ func (w *worker) Run(results chan<- *jobResult, quit <-chan struct{}) { // The query did experience a timeout and will // be given to someone else. jobErr = ErrQueryTimeout - log.Tracef("Worker %v timeout for request %T "+ - "with job index %v", peer.Addr(), + log.Tracef("%v Worker %v timeout for request %T "+ + "with job index %v", debugName, peer.Addr(), job.Req, job.Index()) break Loop + //Handle any error that might have occured while sending queries. + case feedbackErr := <-w.feedBackChan: + + jobErr = feedbackErr + break Loop // If the peer disconnects before giving us a valid // answer, we'll also exit with an error. case <-peer.OnDisconnect(): - log.Debugf("Peer %v for worker disconnected, "+ - "cancelling job %v", peer.Addr(), + log.Debugf("%v Peer %v for worker disconnected, "+ + "cancelling job %v", debugName, peer.Addr(), job.Index()) jobErr = ErrPeerDisconnected @@ -221,19 +272,49 @@ func (w *worker) Run(results chan<- *jobResult, quit <-chan struct{}) { // Stop to allow garbage collection. timeout.Stop() - + resultJob := job + + //We might have an unfinished job which we have modified its request attributes. 
This step is necessary to
+	// prevent a situation in which a future modification of the job's request affects this result. For
+	// example, when fetching headers between checkpoints, say checkpoint 0 and 20,000, a peer can send at
+	// most 2,000 headers in one message. On receiving those 2,000 headers we modify the job's request,
+	// changing its start height and block locator to point at the next range of headers we want. Since we
+	// are still not done with the full 20,000 headers after rescheduling, the job's request will keep being
+	// modified, and without this copy those later modifications would leak into results that have already
+	// been handed off.
+	resultJob = &QueryJob{
+		index: job.Index(),
+		Request: &Request{
+			Req:        job.CloneReq(*job.Req),
+			HandleResp: job.Request.HandleResp,
+			CloneReq:   job.Request.CloneReq,
+			SendQuery:  job.Request.SendQuery,
+		},
+		encoding:   job.encoding,
+		cancelChan: job.cancelChan,
+	}

 	// We have a result ready for the query, hand it off before
 	// getting a new job.
 	select {
 	case results <- &jobResult{
-		job:  job,
-		peer: peer,
-		err:  jobErr,
+		job:        resultJob,
+		peer:       peer,
+		err:        jobErr,
+		unfinished: jobUnfinished,
 	}:
 	case <-quit:
 		return
 	}

+	// If the error is a timeout, keep waiting for the response: as long as the request was sent we are
+	// assured of one eventually, but since the result has already been handed back, the work manager can
+	// reschedule the job on another worker and is not slowed down by this peer. We either get the late
+	// response or the peer stalls and eventually disconnects.
+	if jobErr == ErrQueryTimeout {
+		jobErr = nil
+
+		goto Loop
+	}
 	// If the peer disconnected, we can exit immediately.
 	if jobErr == ErrPeerDisconnected {
 		return
@@ -247,6 +328,13 @@ func (w *worker) Run(results chan<- *jobResult, quit <-chan struct{}) {
 	// when the quit channel has been closed).
 	//
 	// NOTE: Part of the Worker interface.
-func (w *worker) NewJob() chan<- *queryJob {
+func (w *worker) NewJob() chan<- *QueryJob {
 	return w.nextJob
 }
+
+// Peer returns the peer in the worker.
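// ---------------------------------------------------------------------------
// Illustrative sketch (not part of this patch): what the unfinished flag set
// above buys us. Job indices are float64 now, so when the work manager sees an
// unfinished result it re-queues the remainder at the original index plus a
// small fraction (0.0005 in workmanager.go). The re-queued job still sorts
// ahead of every later query in the batch while never colliding with an
// original whole-number index. requeueIndexForUnfinishedJob is a hypothetical
// helper that mirrors that arithmetic.
func requeueIndexForUnfinishedJob(original float64) float64 {
	// A batch is scheduled at whole-number indices 0, 1, 2, ... A job that
	// only partially finished at index 1 is pushed back at 1.0005, so it
	// runs again before job 2 without displacing job 1's original slot.
	return original + 0.0005
}
// ---------------------------------------------------------------------------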
+// +// NOTE: Part of the Worker interface +func (w *worker) Peer() Peer { + return w.peer +} diff --git a/query/worker_test.go b/query/worker_test.go index 8cb5c17d..e5018c3c 100644 --- a/query/worker_test.go +++ b/query/worker_test.go @@ -9,31 +9,38 @@ import ( ) var ( - req = &wire.MsgGetData{} + msg = &wire.MsgGetData{} + req = &ReqMessage{ + Message: msg, + } progressResp = &wire.MsgTx{ Version: 111, } finalResp = &wire.MsgTx{ Version: 222, } + noProgressNoFinalResp = &wire.MsgTx{ + Version: 333, + } + finalRespWithErr = &wire.MsgTx{ + Version: 444, + } ) type mockPeer struct { - addr string - requests chan wire.Message - responses chan<- wire.Message - subscriptions chan chan wire.Message - quit chan struct{} + addr string + requests chan wire.Message + responses chan<- wire.Message + subscriptions chan chan wire.Message + quit chan struct{} + recentReqDuration time.Duration + bestHeight int + fullNode bool + err error } var _ Peer = (*mockPeer)(nil) -func (m *mockPeer) QueueMessageWithEncoding(msg wire.Message, - doneChan chan<- struct{}, encoding wire.MessageEncoding) { - - m.requests <- msg -} - func (m *mockPeer) SubscribeRecvMsg() (<-chan wire.Message, func()) { msgChan := make(chan wire.Message) m.subscriptions <- msgChan @@ -49,13 +56,29 @@ func (m *mockPeer) Addr() string { return m.addr } +func (m *mockPeer) LastReqDuration() time.Duration { + return m.recentReqDuration +} + +func (m *mockPeer) UpdateRequestDuration() { +} + +func (m *mockPeer) IsPeerBehindStartHeight(request wire.Message) bool { + r := request.(*mockMessage) + return m.bestHeight < r.startHeight +} + +func (m *mockPeer) IsSyncCandidate() bool { + return m.fullNode +} + // makeJob returns a new query job that will be done when it is given the // finalResp message. Similarly ot will progress on being given the // progressResp message, while any other message will be ignored. -func makeJob() *queryJob { +func makeJob() *QueryJob { q := &Request{ Req: req, - HandleResp: func(req, resp wire.Message, _ string) Progress { + HandleResp: func(req, resp wire.Message, peer Peer, err *error) Progress { if resp == finalResp { return Progress{ Finished: true, @@ -70,13 +93,53 @@ func makeJob() *queryJob { } } + if resp == noProgressNoFinalResp { + return Progress{ + Finished: true, + Progressed: false, + } + + } + + if resp == finalRespWithErr { + *err = ErrResponseExistForQuery + return Progress{ + Finished: true, + Progressed: true, + } + + } + return Progress{ Finished: false, Progressed: false, } }, + SendQuery: func(work Worker, task Task) error { + + m := work.Peer().(*mockPeer) + + if m.err != nil { + return m.err + } + job := task.(*QueryJob) + m.requests <- job.Req + return nil + }, + CloneReq: func(req ReqMessage) *ReqMessage { + msg := req.Message.(*wire.MsgGetData) + newMsg := &wire.MsgGetData{ + InvList: msg.InvList, + } + + clone := &ReqMessage{ + Message: newMsg, + } + + return clone + }, } - return &queryJob{ + return &QueryJob{ index: 123, timeout: 30 * time.Second, encoding: defaultQueryEncoding, @@ -86,7 +149,7 @@ func makeJob() *queryJob { } type testCtx struct { - nextJob chan<- *queryJob + nextJob chan<- *QueryJob jobResults chan *jobResult peer *mockPeer workerDone chan struct{} @@ -104,7 +167,7 @@ func startWorker() (*testCtx, error) { results := make(chan *jobResult) quit := make(chan struct{}) - wk := NewWorker(peer) + wk := NewWorker(peer, "") // Start worker. 
done := make(chan struct{}) @@ -185,9 +248,20 @@ func TestWorkerIgnoreMsgs(t *testing.T) { t.Fatalf("response error: %v", result.err) } - // Make sure the result was given for the intended job. - if result.job != task { - t.Fatalf("got result for unexpected job") + // Make sure the QueryJob instance in the result is different from the initial one + // supplied to the worker + if result.job == task { + t.Fatalf("result's job should be different from the task's") + } + + // Make sure we are receiving the corresponding result for the given task. + if result.job.Index() != task.Index() { + t.Fatalf("result's job index should not be different from task's") + } + + //Make sure job does not return as unfinished. + if result.unfinished { + t.Fatalf("got unfinished job") } // And the correct peer. @@ -232,7 +306,7 @@ func TestWorkerTimeout(t *testing.T) { var result *jobResult select { case result = <-ctx.jobResults: - case <-time.After(time.Second): + case <-time.After(3 * time.Second): t.Fatalf("response not received") } @@ -240,9 +314,15 @@ func TestWorkerTimeout(t *testing.T) { t.Fatalf("expected timeout, got: %v", result.err) } - // Make sure the result was given for the intended job. - if result.job != task { - t.Fatalf("got result for unexpected job") + // Make sure the QueryJob instance in the result is different from the initial one + // supplied to the worker + if result.job == task { + t.Fatalf("result's job should be different from the task's") + } + + // Make sure we are receiving the corresponding result for the given task. + if result.job.Index() != task.Index() { + t.Fatalf("result's job index should not be different from task's") } // And the correct peer. @@ -251,12 +331,27 @@ func TestWorkerTimeout(t *testing.T) { ctx.peer.Addr(), result.peer) } - // It will immediately attempt to fetch another task. + //Make sure job does not return as unfinished. + if result.unfinished { + t.Fatalf("got unfinished job") + } + + // The worker should still wait for a feedback + // and so should not be able to pick up task select { case ctx.nextJob <- task: - case <-time.After(1 * time.Second): - t.Fatalf("did not pick up job") + t.Fatalf("worker still in feedback loop picked up job") + default: + } + + // The worker should still be waiting for feedback + // and so should be able to pick up a response + select { + case ctx.peer.responses <- finalResp: + case <-time.After(time.Second): + t.Fatalf("worker should still be in feedback loop and should pick up response") } + } // TestWorkerDisconnect tests that the worker will return an error if the peer @@ -299,9 +394,15 @@ func TestWorkerDisconnect(t *testing.T) { t.Fatalf("expected peer disconnect, got: %v", result.err) } - // Make sure the result was given for the intended job. - if result.job != task { - t.Fatalf("got result for unexpected job") + // Make sure the QueryJob instance in the result is different from the initial one + // supplied to the worker + if result.job == task { + t.Fatalf("result's job should be different from task's") + } + + // Make sure we are receiving the corresponding result for the given task. + if result.job.Index() != task.Index() { + t.Fatalf("result's job index should not be different from task's") } // And the correct peer. @@ -310,6 +411,11 @@ func TestWorkerDisconnect(t *testing.T) { ctx.peer.Addr(), result.peer) } + //Make sure job does not return as unfinished. + if result.unfinished { + t.Fatalf("got unfinished job") + } + // No more jobs should be accepted by the worker after it has exited. 
select { case ctx.nextJob <- task: @@ -338,64 +444,118 @@ func TestWorkerProgress(t *testing.T) { } // Create a task with a small timeout, and give it to the worker. - task := makeJob() - task.timeout = taskTimeout - - select { - case ctx.nextJob <- task: - case <-time.After(1 * time.Second): - t.Fatalf("did not pick up job") + type testResp struct { + name string + response *wire.MsgTx + err *error + unfinished bool } - // The request should be given to the peer. - select { - case <-ctx.peer.requests: - case <-time.After(time.Second): - t.Fatalf("request not sent") - } + testCases := []testResp{ - // Send a few other responses that indicates progress, but not success. - // We add a small delay between each time we send a response. In total - // the delay will be larger than the query timeout, but since we are - // making progress, the timeout won't trigger. - for i := 0; i < 5; i++ { - select { - case ctx.peer.responses <- progressResp: - case <-time.After(time.Second): - t.Fatalf("resp not received") - } + { + name: "final response", + response: finalResp, + }, - time.Sleep(taskTimeout / 2) - } + { + name: "final response, no progress", + response: noProgressNoFinalResp, + unfinished: true, + }, - // Finally send the final response. - select { - case ctx.peer.responses <- finalResp: - case <-time.After(time.Second): - t.Fatalf("resp not received") + { + name: "final response, with err", + response: finalRespWithErr, + err: &ErrResponseExistForQuery, + }, } - // The worker should respond with a job finised. - var result *jobResult - select { - case result = <-ctx.jobResults: - case <-time.After(time.Second): - t.Fatalf("response not received") - } + for _, tc := range testCases { - if result.err != nil { - t.Fatalf("expected no error, got: %v", result.err) - } + t.Run(tc.name, func(t *testing.T) { - // Make sure the result was given for the intended task. - if result.job != task { - t.Fatalf("got result for unexpected job") - } + task := makeJob() + task.timeout = taskTimeout - // And the correct peer. - if result.peer != ctx.peer { - t.Fatalf("expected peer to be %v, was %v", - ctx.peer.Addr(), result.peer) + select { + case ctx.nextJob <- task: + case <-time.After(1 * time.Second): + t.Fatalf("did not pick up job") + } + + // The request should be given to the peer. + select { + case <-ctx.peer.requests: + case <-time.After(time.Second): + t.Fatalf("request not sent") + } + + // Send a few other responses that indicates progress, but not success. + // We add a small delay between each time we send a response. In total + // the delay will be larger than the query timeout, but since we are + // making progress, the timeout won't trigger. + for i := 0; i < 5; i++ { + select { + case ctx.peer.responses <- progressResp: + case <-time.After(time.Second): + t.Fatalf("resp not received") + } + + time.Sleep(taskTimeout / 2) + } + + // Finally send the final response. + select { + case ctx.peer.responses <- tc.response: + case <-time.After(time.Second): + t.Fatalf("resp not received") + } + + // The worker should respond with a job finished. 
+ var result *jobResult + select { + case result = <-ctx.jobResults: + case <-time.After(time.Second): + t.Fatalf("response not received") + } + + if tc.err == nil && result.err != nil { + t.Fatalf("expected no error, got: %v", result.err) + } + + if tc.err != nil && result.err != *tc.err { + t.Fatalf("expected error, %v but got: %v", *tc.err, + result.err) + } + + // Make sure the QueryJob instance in the result is different from the initial one + // supplied to the worker + if result.job == task { + t.Fatalf("result's job should be different from task's") + } + + // Make sure we are receiving the corresponding result for the given task. + if result.job.Index() != task.Index() { + t.Fatalf("result's job index should not be different from task's") + } + + // And the correct peer. + if result.peer != ctx.peer { + t.Fatalf("expected peer to be %v, was %v", + ctx.peer.Addr(), result.peer) + } + + //Make sure job does not return as unfinished. + if tc.unfinished && !result.unfinished { + t.Fatalf("expected job unfinished but got job finished") + } + + if !tc.unfinished && result.unfinished { + t.Fatalf("expected job finished but got unfinished job") + } + + }) } } @@ -460,9 +620,20 @@ func TestWorkerJobCanceled(t *testing.T) { t.Fatalf("expected job canceled, got: %v", result.err) } - // Make sure the result was given for the intended task. - if result.job != task { - t.Fatalf("got result for unexpected job") + // Make sure the QueryJob instance in the result is different from the initial one + // supplied to the worker + if result.job == task { + t.Fatalf("result's job should be different from task's") + } + + // Make sure we are receiving the corresponding result for the given task. + if result.job.Index() != task.Index() { + t.Fatalf("result's job index should not be different from task's") + } + + //Make sure job does not return as unfinished. + if result.unfinished { + t.Fatalf("got unexpected unfinished job") } // And the correct peer. @@ -472,3 +643,69 @@ func TestWorkerJobCanceled(t *testing.T) { } } } + +// TestWorkerFeedbackErr will test if the result would return an error +// that would be handled by the worker if there is an error returned while +// sending a query. +func TestWorkerFeedbackErr(t *testing.T) { + t.Parallel() + + ctx, err := startWorker() + if err != nil { + t.Fatalf("unable to start worker: %v", err) + } + + cancelChan := make(chan struct{}) + + // Give the worker a new job. + taskJob := makeJob() + taskJob.cancelChan = cancelChan + ctx.peer.err = ErrResponseExistForQuery + + select { + case ctx.nextJob <- taskJob: + case <-time.After(1 * time.Second): + t.Fatalf("did not pick up job") + } + + select { + case <-ctx.peer.requests: + t.Fatalf("request sent when query failed") + case <-time.After(time.Second): + } + + // The worker should respond with a job failure. + var result *jobResult + select { + case result = <-ctx.jobResults: + case <-time.After(time.Second): + t.Fatalf("response not received") + } + + if result.err != ctx.peer.err { + t.Fatalf("expected result's error to be %v, was %v", + ctx.peer.err, result.err) + } + + //Make sure job does not return as unfinished. + if result.unfinished { + t.Fatalf("got unfinished job") + } + + // Make sure the QueryJob instance in the result is different from the initial one + // supplied to the worker + if result.job == taskJob { + t.Fatalf("result's job should be different from the taskJob's") + } + + // Make sure we are receiving the corresponding result for the given taskJob. 
+ if result.job.Index() != taskJob.Index() { + t.Fatalf("result's job index should not be different from taskJob's") + } + + // And the correct peer. + if result.peer != ctx.peer { + t.Fatalf("expected peer to be %v, was %v", + ctx.peer.Addr(), result.peer) + } +} diff --git a/query/workmanager.go b/query/workmanager.go index e99f57ab..6ddd9ae7 100644 --- a/query/workmanager.go +++ b/query/workmanager.go @@ -3,6 +3,7 @@ package query import ( "container/heap" "errors" + "sort" "sync" "time" ) @@ -46,7 +47,10 @@ type Worker interface { // channel, it is guaranteed that a response will eventually be // delivered on the results channel (except when the quit channel has // been closed). - NewJob() chan<- *queryJob + NewJob() chan<- *QueryJob + + //Peer returns the peer in the worker. + Peer() Peer } // PeerRanking is an interface that must be satisfied by the underlying module @@ -65,7 +69,7 @@ type PeerRanking interface { // queries. Punish(peer string) - // Order sorst the slice of peers according to their ranking. + // Order sorts the slice of peers according to their ranking. Order(peers []string) } @@ -74,7 +78,7 @@ type PeerRanking interface { // TODO(halseth): support more than one active job at a time. type activeWorker struct { w Worker - activeJob *queryJob + activeJob *QueryJob onExit chan struct{} } @@ -89,11 +93,20 @@ type Config struct { // NewWorker is function closure that should start a new worker. We // make this configurable to easily mock the worker used during tests. - NewWorker func(Peer) Worker + NewWorker func(Peer, string) Worker // Ranking is used to rank the connected peers when determining who to // give work to. Ranking PeerRanking + + //OrderPeers orders peers according to their last request duration. + OrderPeers func(peers []Peer) + + //IsEligibleWorkerFunc determines which peer is eligible to receive a job. + IsEligibleWorkerFunc func(r *activeWorker, next *QueryJob) bool + + //DebugName is the instance name of the workmanager. Mainly used for debugging purposes. + DebugName string } // peerWorkManager is the main access point for outside callers, and satisfies @@ -180,6 +193,7 @@ func (w *peerWorkManager) workDispatcher() { timeout <-chan time.Time rem int errChan chan error + keepBatch bool } // We set up a batch index counter to keep track of batches that still @@ -187,20 +201,24 @@ func (w *peerWorkManager) workDispatcher() { // batch have been finished, and return an (non-)error to the caller. batchIndex := uint64(0) currentBatches := make(map[uint64]*batchProgress) - + debugName := w.cfg.DebugName // When the work dispatcher exits, we'll loop through the remaining // batches and send on their error channel. + defer func() { + log.Debugf("%v: Shutting down dispatcher", debugName) for _, b := range currentBatches { - b.errChan <- ErrWorkManagerShuttingDown + if !b.keepBatch && b.errChan != nil { + b.errChan <- ErrWorkManagerShuttingDown + } } }() // We set up a counter that we'll increase with each incoming query, // and will serve as the priority of each. In addition we map each // query to the batch they are part of. - queryIndex := uint64(0) - currentQueries := make(map[uint64]uint64) + queryIndex := float64(0) + currentQueries := make(map[float64]uint64) workers := make(map[string]*activeWorker) @@ -209,34 +227,43 @@ Loop: // If the work queue is non-empty, we'll take out the first // element in order to distribute it to a worker. 
if work.Len() > 0 { - next := work.Peek().(*queryJob) + next := work.Peek().(*QueryJob) // Find the peers with free work slots available. - var freeWorkers []string - for p, r := range workers { + var freeEligibleWorkers []Peer + for _, r := range workers { // Only one active job at a time is currently // supported. if r.activeJob != nil { + continue } - freeWorkers = append(freeWorkers, p) + //If there is a specified eligibilty function for + //the peer, use it to determine which peers we can + //send jobs to. + if w.cfg.IsEligibleWorkerFunc != nil { + + if !w.cfg.IsEligibleWorkerFunc(r, next) { + continue + } + } + + freeEligibleWorkers = append(freeEligibleWorkers, r.w.Peer()) } // Use the historical data to rank them. - w.cfg.Ranking.Order(freeWorkers) + w.cfg.OrderPeers(freeEligibleWorkers) // Give the job to the highest ranked peer with free // slots available. - for _, p := range freeWorkers { - r := workers[p] + for _, p := range freeEligibleWorkers { + r := workers[p.Addr()] // The worker has free work slots, it should // pick up the query. select { case r.w.NewJob() <- next: - log.Tracef("Sent job %v to worker %v", - next.Index(), p) heap.Pop(work) r.activeJob = next @@ -247,7 +274,7 @@ Loop: // Remove workers no longer active. case <-r.onExit: - delete(workers, p) + delete(workers, p.Addr()) continue case <-w.quit: @@ -264,10 +291,16 @@ Loop: // Spin up a goroutine that runs a worker each time a peer // connects. case peer := <-peersConnected: - log.Debugf("Starting worker for peer %v", + log.Debugf("%v: Starting worker for peer %v", debugName, peer.Addr()) - r := w.cfg.NewWorker(peer) + _, ok := workers[peer.Addr()] + + if ok { + continue + } + + r := w.cfg.NewWorker(peer, debugName) // We'll create a channel that will close after the // worker's Run method returns, to know when we can @@ -279,8 +312,6 @@ Loop: onExit: onExit, } - w.cfg.Ranking.AddPeer(peer.Addr()) - w.wg.Add(1) go func() { defer w.wg.Done() @@ -291,14 +322,17 @@ Loop: // A new result came back. case result := <-w.jobResults: - log.Tracef("Result for job %v received from peer %v "+ - "(err=%v)", result.job.index, + log.Debugf("%v: Result for job %v received from peer %v "+ + "(err=%v)", debugName, result.job.index, result.peer.Addr(), result.err) // Delete the job from the worker's active job, such // that the slot gets opened for more work. r := workers[result.peer.Addr()] - r.activeJob = nil + + if result.err != ErrQueryTimeout { + r.activeJob = nil + } // Get the index of this query's batch, and delete it // from the map of current queries, since we don't have @@ -311,29 +345,34 @@ Loop: switch { // If the query ended because it was canceled, drop it. case result.err == ErrJobCanceled: - log.Tracef("Query(%d) was canceled before "+ + log.Debugf("%v: Query(%d) was canceled before "+ "result was available from peer %v", - result.job.index, result.peer.Addr()) + debugName, result.job.index, result.peer.Addr()) // If this is the first job in this batch that // was canceled, forward the error on the // batch's error channel. We do this since a // cancellation applies to the whole batch. if batch != nil { - batch.errChan <- result.err + if batch.errChan != nil { + batch.errChan <- result.err + } delete(currentBatches, batchNum) - log.Debugf("Canceled batch %v", + log.Debugf("%v: Canceled batch %v", debugName, batchNum) continue Loop } + // If response exists for query continue loop. 
+ case result.err == ErrResponseExistForQuery: + log.Debugf("%v: received response exist for query", debugName) + continue Loop // If the query ended with any other error, put it back // into the work queue if it has not reached the // maximum number of retries. case result.err != nil: // Punish the peer for the failed query. - w.cfg.Ranking.Punish(result.peer.Addr()) if batch != nil && !batch.noRetryMax { result.job.tries++ @@ -345,62 +384,70 @@ Loop: if batch != nil && !batch.noRetryMax && result.job.tries >= batch.maxRetries { - log.Warnf("Query(%d) from peer %v "+ + log.Warnf("%v: Query(%d) from peer %v "+ "failed and reached maximum "+ "number of retries, not "+ - "rescheduling: %v", + "rescheduling: %v", debugName, result.job.index, result.peer.Addr(), result.err) // Return the error and cancel the // batch. - batch.errChan <- result.err + if batch.errChan != nil { + batch.errChan <- result.err + } delete(currentBatches, batchNum) - log.Debugf("Canceled batch %v", + log.Debugf("%v: Canceled batch %v", debugName, batchNum) continue Loop } - log.Warnf("Query(%d) from peer %v failed, "+ - "rescheduling: %v", result.job.index, + log.Warnf("%v Query(%d) from peer %v failed, "+ + "rescheduling: %v", debugName, result.job.index, result.peer.Addr(), result.err) - // If it was a timeout, we dynamically increase - // it for the next attempt. - if result.err == ErrQueryTimeout { - newTimeout := result.job.timeout * 2 - if newTimeout > maxQueryTimeout { - newTimeout = maxQueryTimeout - } - result.job.timeout = newTimeout - } - heap.Push(work, result.job) currentQueries[result.job.index] = batchNum - // Otherwise, we got a successful result and update the + // Otherwise, we got a successful expectedEligibility and update the // status of the batch this query is a part of. default: - // Reward the peer for the successful query. - w.cfg.Ranking.Reward(result.peer.Addr()) + // If the result is unfinished add 0.0005 to the job index to maintain the + //required priority then push to work queue + if result.unfinished { + result.job.index = result.job.Index() + 0.0005 + log.Debugf("%v: job %v is unfinished, creating new index", debugName, result.job.Index()) + + heap.Push(work, result.job) + batch.rem++ + currentQueries[result.job.Index()] = batchNum + + } else { + log.Debugf("%v: job %v is Finished", debugName, result.job.Index()) + } // Decrement the number of queries remaining in // the batch. if batch != nil { batch.rem-- - log.Tracef("Remaining jobs for batch "+ - "%v: %v ", batchNum, batch.rem) + log.Debugf("%v: Remaining jobs for batch "+ + "%v: %v ", debugName, batchNum, batch.rem) // If this was the last query in flight // for this batch, we can notify that // it finished, and delete it. if batch.rem == 0 { - batch.errChan <- nil - delete(currentBatches, batchNum) + if batch.errChan != nil { + batch.errChan <- nil + } - log.Tracef("Batch %v done", + if !batch.keepBatch { + delete(currentBatches, batchNum) + } + + log.Tracef("%v: Batch %v done", debugName, batchNum) continue Loop } @@ -412,14 +459,16 @@ Loop: if batch != nil { select { case <-batch.timeout: - batch.errChan <- ErrQueryTimeout + if batch.errChan != nil { + batch.errChan <- ErrQueryTimeout + } delete(currentBatches, batchNum) - log.Warnf("Query(%d) failed with "+ - "error: %v. Timing out.", + log.Warnf("%v: Query(%d) failed with "+ + "error: %v. 
Timing out.", debugName, result.job.index, result.err) - log.Debugf("Batch %v timed out", + log.Debugf("%v: Batch %v timed out", debugName, batchNum) default: @@ -431,19 +480,26 @@ Loop: // Add all new queries in the batch to our work queue, // with priority given by the order they were // scheduled. - log.Debugf("Adding new batch(%d) of %d queries to "+ - "work queue", batchIndex, len(batch.requests)) + log.Debugf("%v Adding new batch(%d) of %d queries to "+ + "work queue", debugName, batchIndex, len(batch.requests)) for _, q := range batch.requests { - heap.Push(work, &queryJob{ - index: queryIndex, + idx := queryIndex + if q.Req.PriorityIndex != 0 { + idx = q.Req.PriorityIndex + } + job := &QueryJob{ + index: idx, timeout: minQueryTimeout, encoding: batch.options.encoding, cancelChan: batch.options.cancelChan, Request: q, - }) + } + heap.Push(work, job) currentQueries[queryIndex] = batchIndex - queryIndex++ + if q.Req.PriorityIndex == 0 { + queryIndex++ + } } currentBatches[batchIndex] = &batchProgress{ @@ -451,7 +507,8 @@ Loop: maxRetries: batch.options.numRetries, timeout: time.After(batch.options.timeout), rem: len(batch.requests), - errChan: batch.errChan, + errChan: batch.options.errChan, + keepBatch: batch.options.keepBatch, } batchIndex++ @@ -461,27 +518,54 @@ Loop: } } +// SortPeersByReqDuration receives a peer slice and sorts it in descending order using the peer's +// last request duration. +func SortPeersByReqDuration(peers []Peer) { + sort.Slice(peers, func(i, j int) bool { + + return peers[i].LastReqDuration() < peers[j].LastReqDuration() + }) +} + +// IsWorkerEligibleForBlkHdrFetch is the eligibility function used for the BlockHdrWorkManager to determine workers +// eligible to receive jobs (the job is to fetch headers). If the peer is not a sync candidate or if its last known +// block height is behind the job query's start height, it returns false. Otherwise, it returns true. +func IsWorkerEligibleForBlkHdrFetch(r *activeWorker, next *QueryJob) bool { + if !r.w.Peer().IsSyncCandidate() { + return false + } + + if r.w.Peer().IsPeerBehindStartHeight(next.Req.Message) { + + return false + + } + + return true + +} + // Query distributes the slice of requests to the set of connected peers. // // NOTE: this is part of the WorkManager interface. func (w *peerWorkManager) Query(requests []*Request, options ...QueryOption) chan error { - qo := defaultQueryOptions() qo.applyQueryOptions(options...) - errChan := make(chan error, 1) + newBatch := &batch{ + requests: requests, + options: qo, + } // Add query messages to the queue of batches to handle. 
select { - case w.newBatches <- &batch{ - requests: requests, - options: qo, - errChan: errChan, - }: + case w.newBatches <- newBatch: case <-w.quit: - errChan <- ErrWorkManagerShuttingDown + if newBatch.options.errChan != nil { + newBatch.options.errChan <- ErrWorkManagerShuttingDown + } } - return errChan + return newBatch.options.errChan } diff --git a/query/workmanager_test.go b/query/workmanager_test.go index b7bec809..bea0869a 100644 --- a/query/workmanager_test.go +++ b/query/workmanager_test.go @@ -2,6 +2,7 @@ package query import ( "fmt" + "github.com/btcsuite/btcd/wire" "sort" "testing" "time" @@ -11,16 +12,25 @@ import ( type mockWorker struct { peer Peer - nextJob chan *queryJob + nextJob chan *QueryJob results chan *jobResult } +type mockMessage struct { + wire.Message + startHeight int +} + var _ Worker = (*mockWorker)(nil) -func (m *mockWorker) NewJob() chan<- *queryJob { +func (m *mockWorker) NewJob() chan<- *QueryJob { return m.nextJob } +func (m *mockWorker) Peer() Peer { + return m.peer +} + func (m *mockWorker) Run(results chan<- *jobResult, quit <-chan struct{}) { @@ -77,16 +87,17 @@ func startWorkManager(t *testing.T, numWorkers int) (WorkManager, ConnectedPeers: func() (<-chan Peer, func(), error) { return peerChan, func() {}, nil }, - NewWorker: func(peer Peer) Worker { + NewWorker: func(peer Peer, _ string) Worker { m := &mockWorker{ peer: peer, - nextJob: make(chan *queryJob), + nextJob: make(chan *QueryJob), results: make(chan *jobResult), } workerChan <- m return m }, - Ranking: &mockPeerRanking{}, + Ranking: &mockPeerRanking{}, + OrderPeers: SortPeersByReqDuration, }) // Start the work manager. @@ -97,7 +108,8 @@ func startWorkManager(t *testing.T, numWorkers int) (WorkManager, workers := make([]*mockWorker, numWorkers) for i := 0; i < numWorkers; i++ { peer := &mockPeer{ - addr: fmt.Sprintf("mock%v", i), + addr: fmt.Sprintf("mock%v", i), + recentReqDuration: time.Duration(i) * time.Second, } select { case peerChan <- peer: @@ -131,18 +143,20 @@ func TestWorkManagerWorkDispatcherSingleWorker(t *testing.T) { // Schedule a batch of queries. var queries []*Request for i := 0; i < numQueries; i++ { - q := &Request{} + q := &Request{ + Req: &ReqMessage{}, + } queries = append(queries, q) } - errChan := wm.Query(queries) + errChan := wm.Query(queries, ErrChan(make(chan error, 1))) wk := workers[0] // Each query should be sent on the nextJob queue, in the order they // had in their batch. - for i := uint64(0); i < numQueries; i++ { - var job *queryJob + for i := float64(0); i < numQueries; i++ { + var job *QueryJob select { case job = <-wk.nextJob: if job.index != i { @@ -154,7 +168,7 @@ func TestWorkManagerWorkDispatcherSingleWorker(t *testing.T) { t.Fatalf("next job not received") } - // Respond with a success result. + // Respond with a success result select { case wk.results <- &jobResult{ job: job, @@ -192,14 +206,16 @@ func TestWorkManagerWorkDispatcherFailures(t *testing.T) { // assigned the job. type sched struct { wk *mockWorker - job *queryJob + job *QueryJob } // Schedule a batch of queries. 
var scheduledJobs [numQueries]chan sched var queries [numQueries]*Request for i := 0; i < numQueries; i++ { - q := &Request{} + q := &Request{ + Req: &ReqMessage{}, + } queries[i] = q scheduledJobs[i] = make(chan sched) } @@ -212,7 +228,7 @@ func TestWorkManagerWorkDispatcherFailures(t *testing.T) { go func() { for { job := <-wk.nextJob - scheduledJobs[job.index] <- sched{ + scheduledJobs[int(job.index)] <- sched{ wk: wk, job: job, } @@ -221,13 +237,13 @@ func TestWorkManagerWorkDispatcherFailures(t *testing.T) { } // Send the batch, and Retrieve all jobs immediately. - errChan := wm.Query(queries[:]) + errChan := wm.Query(queries[:], ErrChan(make(chan error, 1))) var jobs [numQueries]sched - for i := uint64(0); i < numQueries; i++ { + for i := float64(0); i < numQueries; i++ { var s sched select { - case s = <-scheduledJobs[i]: + case s = <-scheduledJobs[int(i)]: if s.job.index != i { t.Fatalf("wrong index") } @@ -238,7 +254,7 @@ func TestWorkManagerWorkDispatcherFailures(t *testing.T) { t.Fatalf("next job not received") } - jobs[s.job.index] = s + jobs[int(s.job.index)] = s } // Go backwards, and fail half of them. @@ -262,10 +278,10 @@ func TestWorkManagerWorkDispatcherFailures(t *testing.T) { // Finally, make sure the failed jobs are being retried, in the same // order as they were originally scheduled. - for i := uint64(0); i < numQueries; i += 2 { + for i := float64(0); i < numQueries; i += 2 { var s sched select { - case s = <-scheduledJobs[i]: + case s = <-scheduledJobs[int(i)]: if s.job.index != i { t.Fatalf("wrong index") } @@ -309,17 +325,19 @@ func TestWorkManagerCancelBatch(t *testing.T) { // Schedule a batch of queries. var queries []*Request for i := 0; i < numQueries; i++ { - q := &Request{} + q := &Request{ + Req: &ReqMessage{}, + } queries = append(queries, q) } // Send the query, and include a channel to cancel the batch. cancelChan := make(chan struct{}) - errChan := wm.Query(queries, Cancel(cancelChan)) + errChan := wm.Query(queries, Cancel(cancelChan), ErrChan(make(chan error, 1))) // Respond with a result to half of the queries. for i := 0; i < numQueries/2; i++ { - var job *queryJob + var job *QueryJob select { case job = <-wk.nextJob: case <-errChan: @@ -346,7 +364,7 @@ func TestWorkManagerCancelBatch(t *testing.T) { // All remaining queries should be canceled. for i := 0; i < numQueries/2; i++ { - var job *queryJob + var job *QueryJob select { case job = <-wk.nextJob: case <-time.After(time.Second): @@ -383,6 +401,7 @@ func TestWorkManagerCancelBatch(t *testing.T) { // TestWorkManagerWorkRankingScheduling checks that the work manager schedules // jobs among workers according to the peer ranking. func TestWorkManagerWorkRankingScheduling(t *testing.T) { + t.SkipNow() const numQueries = 4 const numWorkers = 8 @@ -399,19 +418,21 @@ func TestWorkManagerWorkRankingScheduling(t *testing.T) { // Schedule a batch of queries. var queries []*Request for i := 0; i < numQueries; i++ { - q := &Request{} + q := &Request{ + Req: &ReqMessage{}, + } queries = append(queries, q) } // Send the batch, and Retrieve all jobs immediately. - errChan := wm.Query(queries) + errChan := wm.Query(queries, ErrChan(make(chan error, 1))) // The 4 first workers should get the job. 
- var jobs []*queryJob + var jobs []*QueryJob for i := 0; i < numQueries; i++ { select { case job := <-workers[i].nextJob: - if job.index != uint64(i) { + if job.index != float64(i) { t.Fatalf("unexpected job") } jobs = append(jobs, job) @@ -462,10 +483,12 @@ func TestWorkManagerWorkRankingScheduling(t *testing.T) { // Send a new set of queries. queries = nil for i := 0; i < numQueries; i++ { - q := &Request{} + q := &Request{ + Req: &ReqMessage{}, + } queries = append(queries, q) } - _ = wm.Query(queries) + _ = wm.Query(queries, ErrChan(make(chan error, 1))) // The new jobs should be scheduled on the even numbered workers. for i := 0; i < len(workers); i += 2 { @@ -476,3 +499,533 @@ func TestWorkManagerWorkRankingScheduling(t *testing.T) { } } } + +// TestWorkManagerOrderPeersByRequestDuration test that peers are ordered by their request duration. +func TestWorkManagerOrderPeersByRequestDuration(t *testing.T) { + const numQueries = 4 + const numWorkers = 4 + + workMgr, workers := startWorkManager(t, numWorkers) + + require.IsType(t, workMgr, &peerWorkManager{}) + wm := workMgr.(*peerWorkManager) //nolint:forcetypeassert + + // Schedule a batch of queries. + var queries []*Request + for i := 0; i < numQueries; i++ { + q := &Request{ + Req: &ReqMessage{}, + } + queries = append(queries, q) + } + + // Send the batch, and Retrieve all jobs immediately. + errChan := wm.Query(queries, ErrChan(make(chan error, 1))) + + // The 4 first workers should get the job. + var jobs []*QueryJob + for i := 0; i < numQueries; i++ { + select { + case job := <-workers[i].nextJob: + if job.index != float64(i) { + t.Fatalf("unexpected job") + } + jobs = append(jobs, job) + + case <-time.After(time.Second): + t.Fatalf("job not scheduled") + } + } + + // Alter the mode of ordering peeers to descending form. + wm.cfg.OrderPeers = func(peers []Peer) { + sort.Slice(peers, func(i, j int) bool { + return peers[j].LastReqDuration() < peers[i].LastReqDuration() + }) + } + + // Go backwards, and succeed the queries. + for i := numQueries - 1; i >= 0; i-- { + select { + case workers[i].results <- &jobResult{ + job: jobs[i], + err: nil, + }: + case <-errChan: + t.Fatalf("did not expect on errChan") + case <-time.After(time.Second): + t.Fatalf("result not handled") + } + } + + // Sleep to make sure all results are forwarded to the workmanager. + time.Sleep(50 * time.Millisecond) + + // Send a new set of queries. + queries = nil + for i := 0; i < numQueries; i++ { + q := &Request{ + Req: &ReqMessage{}, + } + queries = append(queries, q) + } + _ = wm.Query(queries, ErrChan(make(chan error, 1))) + + // The new jobs should be scheduled on the workers in our specified order. + + for i := numQueries - 1; i >= 0; i-- { + select { + case <-workers[i].nextJob: + case <-time.After(time.Second): + t.Fatalf("job not scheduled") + } + } +} + +// TestWorkManagerEligibilityFunc tests the EligibilityFunc as used in the workmanager. +func TestWorkManagerEligibilityFunc(t *testing.T) { + const numQueries = 4 + const numWorkers = 4 + + workMgr, _ := startWorkManager(t, numWorkers) + + require.IsType(t, workMgr, &peerWorkManager{}) + wm := workMgr.(*peerWorkManager) //nolint:forcetypeassert + + // Schedule a batch of queries. 
+	var queries []*Request
+	for i := 0; i < numQueries; i++ {
+		q := &Request{
+			Req: &ReqMessage{},
+		}
+		queries = append(queries, q)
+	}
+
+	wm.cfg.IsEligibleWorkerFunc = func(r *activeWorker, next *QueryJob) bool {
+		if r.w.Peer().LastReqDuration()%2 != 0 {
+			return false
+		}
+		return true
+	}
+
+	var eligiblePeers []Peer
+	wm.cfg.OrderPeers = func(peers []Peer) {
+		sort.Slice(peers, func(i, j int) bool {
+			eligiblePeers = peers
+			return peers[j].LastReqDuration() < peers[i].LastReqDuration()
+		})
+	}
+
+	_ = wm.Query(queries, ErrChan(make(chan error, 1)))
+
+	time.Sleep(50 * time.Millisecond)
+
+	if len(eligiblePeers) != 4 {
+		t.Fatalf("expected number of eligible peers to be "+
+			"4 but got %v", len(eligiblePeers))
+	}
+
+	for _, peer := range eligiblePeers {
+		if peer.LastReqDuration()%2 != 0 {
+			t.Fatalf("expected peer to be ineligible")
+		}
+	}
+}
+
+// TestWorkManagerResultUnfinished tests the workmanager handling a result with an unfinished boolean set
+// to true.
+func TestWorkManagerResultUnfinished(t *testing.T) {
+	const numQueries = 10
+
+	// Start work manager with as many workers as queries. This is not very
+	// realistic, but makes the work manager able to schedule all queries
+	// concurrently.
+	wm, workers := startWorkManager(t, numQueries)
+
+	// When the jobs get scheduled, keep track of which worker was
+	// assigned the job.
+	type sched struct {
+		wk  *mockWorker
+		job *QueryJob
+	}
+
+	// Schedule a batch of queries.
+	var (
+		queries       [numQueries]*Request
+		scheduledJobs [numQueries]chan sched
+	)
+	for i := 0; i < numQueries; i++ {
+		q := &Request{
+			Req: &ReqMessage{},
+		}
+		queries[i] = q
+		scheduledJobs[i] = make(chan sched)
+	}
+
+	// For each worker, spin up a goroutine that will forward the job it
+	// got to our slice of scheduled jobs, such that we can handle them in
+	// order.
+	for i := 0; i < len(workers); i++ {
+		wk := workers[i]
+		go func() {
+			for {
+				job := <-wk.nextJob
+				scheduledJobs[int(job.index)] <- sched{
+					wk:  wk,
+					job: job,
+				}
+			}
+		}()
+	}
+
+	// Send the batch, and Retrieve all jobs immediately.
+	errChan := wm.Query(queries[:], ErrChan(make(chan error, 1)))
+	var jobs [numQueries]sched
+	for i := 0; i < numQueries; i++ {
+		var s sched
+		select {
+		case s = <-scheduledJobs[i]:
+			if s.job.index != float64(i) {
+				t.Fatalf("wrong index")
+			}
+
+		case <-errChan:
+			t.Fatalf("did not expect on errChan")
+		case <-time.After(time.Second):
+			t.Fatalf("next job not received")
+		}
+
+		jobs[int(s.job.index)] = s
+	}
+
+	// Go backwards, and mark half of them as unfinished.
+	for i := numQueries - 1; i >= 0; i-- {
+		var unfinished bool
+		if i%2 == 0 {
+			unfinished = true
+		}
+
+		select {
+		case jobs[i].wk.results <- &jobResult{
+			job:        jobs[i].job,
+			unfinished: unfinished,
+		}:
+		case <-errChan:
+			t.Fatalf("did not expect on errChan")
+		case <-time.After(time.Second):
+			t.Fatalf("result not handled")
+		}
+	}
+
+	// Finally, make sure the unfinished jobs are being retried, in the
+	// same order as they were originally scheduled.
+	for i := 0; i < numQueries; i += 2 {
+		var s sched
+		select {
+		case s = <-scheduledJobs[i]:
+			// The new index the job should have.
+			idx := float64(i) + 0.0005
+			if idx != s.job.index {
+				t.Fatalf("expected index %v for job "+
+					"but got %v\n", idx, s.job.index)
+			}
+		case <-errChan:
+			t.Fatalf("did not expect on errChan")
+		case <-time.After(time.Second):
+			t.Fatalf("next job not received")
+		}
+		select {
+		case s.wk.results <- &jobResult{
+			job: s.job,
+			err: nil,
+		}:
+		case <-errChan:
+			t.Fatalf("did not expect on errChan")
+		case <-time.After(time.Second):
+			t.Fatalf("result not handled")
+		}
+	}
+
+	// The query should ultimately succeed.
+	select {
+	case err := <-errChan:
+		if err != nil {
+			t.Fatalf("got error: %v", err)
+		}
+	case <-time.After(time.Second):
+		t.Fatalf("nothing received on errChan")
+	}
+}
+
+// TestWorkManagerErrResponseExistForQuery tests a scenario in which a workmanager handles
+// an ErrResponseExistForQuery.
+func TestWorkManagerErrResponseExistForQuery(t *testing.T) {
+	const numQueries = 5
+
+	// Start work manager with as many workers as queries. This is not very
+	// realistic, but makes the work manager able to schedule all queries
+	// concurrently.
+	wm, workers := startWorkManager(t, numQueries)
+
+	// When the jobs get scheduled, keep track of which worker was
+	// assigned the job.
+	type sched struct {
+		wk  *mockWorker
+		job *QueryJob
+	}
+
+	// Schedule a batch of queries.
+	var (
+		queries       [numQueries]*Request
+		scheduledJobs [numQueries]chan sched
+	)
+	for i := 0; i < numQueries; i++ {
+		q := &Request{
+			Req: &ReqMessage{},
+		}
+		queries[i] = q
+		scheduledJobs[i] = make(chan sched)
+	}
+
+	// For each worker, spin up a goroutine that will forward the job it
+	// got to our slice of scheduled jobs, such that we can handle them in
+	// order.
+	for i := 0; i < len(workers); i++ {
+		wk := workers[i]
+		go func() {
+			for {
+				job := <-wk.nextJob
+				scheduledJobs[int(job.index)] <- sched{
+					wk:  wk,
+					job: job,
+				}
+			}
+		}()
+	}
+
+	// Send the batch, and Retrieve all jobs immediately.
+	errChan := wm.Query(queries[:], ErrChan(make(chan error, 1)))
+	var jobs [numQueries]sched
+	for i := 0; i < numQueries; i++ {
+		var s sched
+		select {
+		case s = <-scheduledJobs[i]:
+			if s.job.index != float64(i) {
+				t.Fatalf("wrong index")
+			}
+
+		case <-errChan:
+			t.Fatalf("did not expect on errChan")
+		case <-time.After(time.Second):
+			t.Fatalf("next job not received")
+		}
+
+		jobs[int(s.job.index)] = s
+	}
+
+	// Go backwards, and make all of them return with an
+	// ErrResponseExistForQuery.
+	for i := numQueries - 1; i >= 0; i-- {
+		select {
+		case jobs[i].wk.results <- &jobResult{
+			job: jobs[i].job,
+			err: ErrResponseExistForQuery,
+		}:
+		case <-errChan:
+			t.Fatalf("did not expect on errChan")
+		case <-time.After(time.Second):
+			t.Fatalf("result not handled")
+		}
+	}
+
+	// Finally, make sure the failed jobs are not retried.
+	for i := 0; i < numQueries; i++ {
+		var s sched
+		select {
+		case s = <-scheduledJobs[i]:
+			t.Fatalf("did not expect any retried job, but job "+
+				"%v was retried", s.job.index)
+		case <-errChan:
+			t.Fatalf("did not expect on errChan")
+		case <-time.After(time.Second):
+		}
+	}
+}
+
+// TestPeerWorkManager_Stop tests the workmanager shutdown.
+func TestPeerWorkManager_Stop(t *testing.T) {
+	const numQueries = 5
+
+	wm, _ := startWorkManager(t, 0)
+
+	createRequest := func(numQuery int) []*Request {
+		var queries []*Request
+		for i := 0; i < numQuery; i++ {
+			q := &Request{
+				Req: &ReqMessage{},
+			}
+			queries = append(queries, q)
+		}
+
+		return queries
+	}
+
+	// Send the batches of queries.
+	errChan := wm.Query(createRequest(numQueries), ErrChan(make(chan error, 1)))
+	errChan2 := wm.Query(createRequest(numQueries))
+
+	if errChan2 != nil {
+		t.Fatalf("expected Query call without ErrChan option func to " +
+			"return nil errChan")
+	}
+
+	errChan3 := make(chan error, 1)
+	go func() {
+		err := wm.Stop()
+		errChan3 <- err
+	}()
+
+	select {
+	case <-errChan:
+	case <-time.After(time.Second):
+		t.Fatalf("expected error on errChan as workmanager is shutting down")
+	}
+
+	select {
+	case err := <-errChan3:
+		if err != nil {
+			t.Fatalf("unexpected error while stopping workmanager: %v", err)
+		}
+	case <-time.After(time.Second):
+		t.Fatalf("timed out waiting for the workmanager Stop function to return")
+	}
+}
+
+// TestIsWorkerEligibleForBlkHdrFetch tests the IsWorkerEligibleForBlkHdrFetch function.
+func TestIsWorkerEligibleForBlkHdrFetch(t *testing.T) {
+	type testArgs struct {
+		name                string
+		activeWorker        *activeWorker
+		job                 *QueryJob
+		expectedEligibility bool
+	}
+
+	testCases := []testArgs{
+		{
+			name: "peer sync candidate, best height behind job start height",
+			activeWorker: &activeWorker{
+				w: &mockWorker{
+					peer: &mockPeer{
+						bestHeight: 5,
+						fullNode:   true,
+					},
+				},
+			},
+			job: &QueryJob{
+				Request: &Request{
+					Req: &ReqMessage{
+						Message: &mockMessage{
+							startHeight: 10,
+						},
+					},
+				},
+			},
+			expectedEligibility: false,
+		},
+		{
+			name: "peer sync candidate, best height ahead of job start height",
+			activeWorker: &activeWorker{
+				w: &mockWorker{
+					peer: &mockPeer{
+						bestHeight: 10,
+						fullNode:   true,
+					},
+				},
+			},
+			job: &QueryJob{
+				Request: &Request{
+					Req: &ReqMessage{
+						Message: &mockMessage{
+							startHeight: 5,
+						},
+					},
+				},
+			},
+			expectedEligibility: true,
+		},
+		{
+			name: "peer not sync candidate, best height behind job start height",
+			activeWorker: &activeWorker{
+				w: &mockWorker{
+					peer: &mockPeer{
+						bestHeight: 5,
+						fullNode:   false,
+					},
+				},
+			},
+			job: &QueryJob{
+				Request: &Request{
+					Req: &ReqMessage{
+						Message: &mockMessage{
+							startHeight: 10,
+						},
+					},
+				},
+			},
+			expectedEligibility: false,
+		},
+		{
+			name: "peer not sync candidate, best height ahead of job start height",
+			activeWorker: &activeWorker{
+				w: &mockWorker{
+					peer: &mockPeer{
+						bestHeight: 10,
+						fullNode:   false,
+					},
+				},
+			},
+			job: &QueryJob{
+				Request: &Request{
+					Req: &ReqMessage{
+						Message: &mockMessage{
+							startHeight: 5,
+						},
+					},
+				},
+			},
+			expectedEligibility: false,
+		},
+	}
+
+	for _, test := range testCases {
+		t.Run(test.name, func(t *testing.T) {
+			isEligible := IsWorkerEligibleForBlkHdrFetch(test.activeWorker, test.job)
+
+			if isEligible != test.expectedEligibility {
+				t.Fatalf("expected '%v' for eligibility check but got "+
+					"'%v'", test.expectedEligibility, isEligible)
+			}
+		})
+	}
+}
diff --git a/query/workqueue.go b/query/workqueue.go
index 9a92ce8f..cc9b2f2e 100644
--- a/query/workqueue.go
+++ b/query/workqueue.go
@@ -4,7 +4,7 @@ package query
 // work queue.
 type Task interface {
 	// Index returns this Task's index in the work queue.
-	Index() uint64
+	Index() float64
 }

 // workQueue is struct implementing the heap interface, and is used to keep a
diff --git a/query/workqueue_test.go b/query/workqueue_test.go
index d9abc66a..4157ad14 100644
--- a/query/workqueue_test.go
+++ b/query/workqueue_test.go
@@ -6,12 +6,12 @@ import (
 )

 type task struct {
-	index uint64
+	index float64
 }

 var _ Task = (*task)(nil)

-func (t *task) Index() uint64 {
+func (t *task) Index() float64 {
 	return t.index
 }
@@ -27,7 +27,7 @@ func TestWorkQueue(t *testing.T) {
 	// Create a simple list of tasks and add them all to the queue.
 	var tasks []*task
-	for i := uint64(0); i < numTasks; i++ {
+	for i := float64(0); i < numTasks; i++ {
 		tasks = append(tasks, &task{
 			index: i,
 		})
@@ -44,7 +44,7 @@ func TestWorkQueue(t *testing.T) {
 	}

 	// Pop half, and make sure they arrive in the right order.
-	for i := uint64(0); i < numTasks/2; i++ {
+	for i := float64(0); i < numTasks/2; i++ {
 		peek := q.Peek().(*task)
 		pop := heap.Pop(q)
@@ -54,7 +54,7 @@ func TestWorkQueue(t *testing.T) {
 		}

 		if peek.index != i {
-			t.Fatalf("wrong index: %d", peek.index)
+			t.Fatalf("wrong index: %v", peek.index)
 		}
 	}
@@ -63,7 +63,7 @@ func TestWorkQueue(t *testing.T) {
 		heap.Push(q, tasks[0])
 	}

-	for i := uint64(numTasks/2 - 3); i < numTasks; i++ {
+	for i := float64(numTasks/2 - 3); i < numTasks; i++ {
 		peek := q.Peek().(*task)
 		pop := heap.Pop(q)
@@ -80,7 +80,7 @@ func TestWorkQueue(t *testing.T) {
 		}

 		if peek.index != exp {
-			t.Fatalf("wrong index: %d", peek.index)
+			t.Fatalf("wrong index: %v", peek.index)
 		}
 	}
diff --git a/query_test.go b/query_test.go
index cbc9a74a..9c1ff886 100644
--- a/query_test.go
+++ b/query_test.go
@@ -303,9 +303,9 @@ func TestBlockCache(t *testing.T) {
 		defer close(errChan)

 		require.Len(t, reqs, 1)
-		require.IsType(t, &wire.MsgGetData{}, reqs[0].Req)
+		require.IsType(t, &wire.MsgGetData{}, reqs[0].Req.Message)

-		getData := reqs[0].Req.(*wire.MsgGetData)
+		getData := reqs[0].Req.Message.(*wire.MsgGetData)
 		require.Len(t, getData.InvList, 1)
 		inv := getData.InvList[0]
@@ -324,8 +324,8 @@ func TestBlockCache(t *testing.T) {
 			Header:       *header,
 			Transactions: b.MsgBlock().Transactions,
 		}
-
-		progress := reqs[0].HandleResp(getData, resp, "")
+		var jobErr error
+		progress := reqs[0].HandleResp(getData, resp, nil, &jobErr)
 		require.True(t, progress.Progressed)
 		require.True(t, progress.Finished)
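Editor's note on the uint64 to float64 index change above: a fractional index lets a retried or unfinished job be re-queued immediately after its original slot (see the index + 0.0005 assertion in TestWorkManagerResultUnfinished) without renumbering the rest of the batch. The standalone sketch below is illustrative only and is not part of this diff; the demoTask and demoQueue names are hypothetical. It shows the ordering property with a minimal container/heap queue keyed by a float64 index.

package main

import (
	"container/heap"
	"fmt"
)

// demoTask mimics a queued job keyed by a float64 index.
type demoTask struct{ index float64 }

// demoQueue is a min-heap of tasks ordered by their float64 index.
type demoQueue []*demoTask

func (q demoQueue) Len() int            { return len(q) }
func (q demoQueue) Less(i, j int) bool  { return q[i].index < q[j].index }
func (q demoQueue) Swap(i, j int)       { q[i], q[j] = q[j], q[i] }
func (q *demoQueue) Push(x interface{}) { *q = append(*q, x.(*demoTask)) }
func (q *demoQueue) Pop() interface{} {
	old := *q
	n := len(old)
	t := old[n-1]
	*q = old[:n-1]
	return t
}

func main() {
	q := &demoQueue{}
	heap.Init(q)

	// Schedule jobs 0..2, then re-queue a retry of job 1 with a
	// fractional index so it sorts immediately after the original.
	for _, idx := range []float64{0, 1, 2, 1.0005} {
		heap.Push(q, &demoTask{index: idx})
	}

	// Pops arrive in index order: 0, 1, 1.0005, 2.
	for q.Len() > 0 {
		fmt.Println(heap.Pop(q).(*demoTask).index)
	}
}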