diff --git a/pyethapp/synchronizer.py b/pyethapp/synchronizer.py index 06b79179..beab5a7c 100644 --- a/pyethapp/synchronizer.py +++ b/pyethapp/synchronizer.py @@ -50,8 +50,9 @@ def __init__(self, synchronizer, proto, blockhash, chain_difficulty=0, originato self.chain_difficulty = chain_difficulty self.requests = dict() # proto: Event self.header_request = None + self.header_processed = 0 self.batch_requests = [] #batch header request - self.batch_result= []*self.max_skeleton_size*self.max_blockheaders_per_request + self.batch_result= [None]*self.max_skeleton_size*self.max_blockheaders_per_request self.start_block_number = self.chain.head.number self.end_block_number = self.start_block_number + 1 # minimum synctask self.max_block_revert = 3600*24 / self.chainservice.config['eth']['block']['DIFF_ADJUSTMENT_CUTOFF'] @@ -101,10 +102,10 @@ def fetch_hashchain(self): if self.originating_proto.is_stopped: - if protos: - self.skeleton_peer= protos[0] - else: - log_st.warn('no protocols available') + # if protos: + # self.skeleton_peer= protos[0] + # else: + log_st.warn('originating_proto not available') self.exit(success=False) else: self.skeleton_peer=self.originating_proto @@ -112,11 +113,13 @@ def fetch_hashchain(self): self.skeleton_peer.send_getblockheaders(from0+self.max_blockheaders_per_request,self.max_skeleton_size,self.max_blockheaders_per_request-1,0) try: skeleton = deferred.get(block=True,timeout=self.blockheaders_request_timeout) - # assert isinstance(skeleton,list) - # log_st.debug('skeleton received %u',len(skeleton)) + # assert isinstance(skeleton,list) + log_st.debug('skeleton received %u',len(skeleton),skeleton=skeleton) except gevent.Timeout: log_st.warn('syncing skeleton timed out') #todo drop originating proto + + # del self.requests[self.skeleton_peer] self.exit(success=False) finally: # # is also executed 'on the way out' when any other clause of the try statement @@ -126,30 +129,35 @@ def fetch_hashchain(self): del self.requests[self.skeleton_peer] - log_st.debug('skeleton received',num= len(skeleton), skeleton=skeleton) + #log_st.debug('skeleton received',num= len(skeleton), skeleton=skeleton) if not skeleton: - # self.fetch_headers(from0) - # skeleton_fetch_done = True + log_st.warn('no more skeleton received') + # self.fetch_headers(from0) + skeleton_fetch_done = True + self.exit(success=False) continue else: - self.fetch_headerbatch(skeleton) + header_batch = self.fetch_headerbatch(from0,skeleton) + log_st.debug('header batch', headerbatch= header_batch) # processed= process_headerbatch(batch_header) # self.batch_header = filled[:processed] - # fetch_blocks(header_batch) - from0 = from0 + self.max_skeleton_size*self.max_blockheaders_per_request + if header_batch: + # self.fetch_blocks(header_batch) + from0 = from0 + self.max_skeleton_size*self.max_blockheaders_per_request #insert batch_header to hashchain #send requests in batches, receive one header batch at a time - def fetch_headerbatch(self, skeleton): - + def fetch_headerbatch(self,origin,skeleton): + log_st.debug('origin from',origin=origin) - # while True - from0=skeleton[0] + # while True + #header_processed = 0 + #from0=skeleton[0] self.batch_requests=[] - batch_result= []*self.max_skeleton_size*self.max_blockheaders_per_request + self.batch_result= [None]*self.max_skeleton_size*self.max_blockheaders_per_request headers= [] proto = None proto_received=None #proto which delivered the header @@ -185,9 +193,9 @@ def fetch_headerbatch(self, skeleton): log_st.debug('batch header fetching done') return self.batch_result try: - proto_received = deferred.get(timeout=self.blockheaders_request_timeout) - log_st.debug('headers batch received from proto',proto=proto_received) - del self.header_request + proto_received = deferred.get(timeout=self.blockheaders_request_timeout)['proto'] + header=deferred.get(timeout=self.blockheaders_request_timeout)['headers'] + log_st.debug('headers batch received from proto', header=header) except gevent.Timeout: log_st.warn('syncing batch hashchain timed out') retry += 1 @@ -199,8 +207,34 @@ def fetch_headerbatch(self, skeleton): log_st.info('headers sync failed with peers, retry', retry=retry) gevent.sleep(self.retry_delay) continue + finally: + del self.header_request - + if header[0] not in self.batch_requests: + continue + + #verified = self.verify_headers(self,proto_received, header) + batch_header= header[::-1] #in hight rising order + self.batch_result[(batch_header[0].number-origin-1):batch_header[0].number-origin+len(batch_header)]= batch_header + # log_st.debug('batch result',batch_result= self.batch_result) + self.batch_requests.remove(header[0]) + proto_received.set_idle() + del self.requests[proto_received] + + header_ready = 0 + while (self.header_processed + header_ready) < len(self.batch_result) and self.batch_result[self.header_processed + header_ready]: + header_ready += self.max_blockheaders_per_request + + if header_ready > 0 : + # Headers are ready for delivery, gather them + processed = self.batch_result[self.header_processed:self.header_processed+header_ready] + log_st.debug('issue fetching blocks',header_processed=self.header_processed, blocks=processed, proto=proto_received,count=len(processed),start=processed[0].number) + self.fetch_blocks(processed) + self.header_processed += len(processed) + + log_st.debug('remaining headers',num=len(self.batch_requests),headers=self.batch_requests) + + def idle_protocols(self): @@ -235,7 +269,7 @@ def fetch_blocks(self, blockheaders_chain): # fetch blocks (no parallelism here) log_st.debug('fetching blocks', num=len(blockheaders_chain)) assert blockheaders_chain - blockheaders_chain.reverse() # height rising order + # blockheaders_chain.reverse() # height rising order num_blocks = len(blockheaders_chain) num_fetched = 0 retry = 0 @@ -244,7 +278,7 @@ def fetch_blocks(self, blockheaders_chain): bodies = [] # try with protos - protocols = self.protocols + protocols = self.idle_protocols() if not protocols: log_st.warn('no protocols available') return self.exit(success=False) @@ -304,13 +338,13 @@ def fetch_blocks(self, blockheaders_chain): # done last_block = t_block assert not len(blockheaders_chain) - assert last_block.header.hash == self.blockhash - log_st.debug('syncing finished') + # assert last_block.header.hash == self.blockhash + # log_st.debug('syncing finished') # at this point blocks are not in the chain yet, but in the add_block queue if self.chain_difficulty >= self.chain.head.chain_difficulty(): self.chainservice.broadcast_newblock(last_block, self.chain_difficulty, origin=proto) - - self.exit(success=True) + return + #self.exit(success=True) def receive_newblockhashes(self, proto, newblockhashes): """ @@ -349,12 +383,14 @@ def receive_blockheaders(self, proto, blockheaders): return if self.batch_requests and blockheaders: # check header validity - - if blockheaders[0] in self.batch_requests: - self.batch_requests.remove(blockheaders[0]) - log_st.debug('remaining headers',num=len(self.batch_requests),headers=self.batch_requests) - proto.set_idle() - del self.requests[proto] + # if not valid(blockheaders): + # return self.exit(success=False) + + # if blockheaders[0] in self.batch_requests: + # self.batch_requests.remove(blockheaders[0]) + # log_st.debug('remaining headers',num=len(self.batch_requests),headers=self.batch_requests) + # proto.set_idle() + # del self.requests[proto] #deliver to header processer #pack batch headers @@ -365,7 +401,7 @@ def receive_blockheaders(self, proto, blockheaders): #evoke next header fetching task # self.requests[proto].set(proto) - self.header_request.set(proto) + self.header_request.set({'proto':proto,'headers':blockheaders}) elif proto == self.skeleton_peer: #make sure it's from the originating proto self.requests[proto].set(blockheaders)