Skip to content
This repository has been archived by the owner on Aug 8, 2018. It is now read-only.

Commit

Permalink
block sync after batch header fetching
Browse files Browse the repository at this point in the history
  • Loading branch information
rairyx committed Aug 10, 2017
1 parent 6272453 commit 938a52d
Showing 1 changed file with 71 additions and 35 deletions.
106 changes: 71 additions & 35 deletions pyethapp/synchronizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,9 @@ def __init__(self, synchronizer, proto, blockhash, chain_difficulty=0, originato
self.chain_difficulty = chain_difficulty
self.requests = dict() # proto: Event
self.header_request = None
self.header_processed = 0
self.batch_requests = [] #batch header request
self.batch_result= []*self.max_skeleton_size*self.max_blockheaders_per_request
self.batch_result= [None]*self.max_skeleton_size*self.max_blockheaders_per_request
self.start_block_number = self.chain.head.number
self.end_block_number = self.start_block_number + 1 # minimum synctask
self.max_block_revert = 3600*24 / self.chainservice.config['eth']['block']['DIFF_ADJUSTMENT_CUTOFF']
Expand Down Expand Up @@ -101,22 +102,24 @@ def fetch_hashchain(self):


if self.originating_proto.is_stopped:
if protos:
self.skeleton_peer= protos[0]
else:
log_st.warn('no protocols available')
# if protos:
# self.skeleton_peer= protos[0]
# else:
log_st.warn('originating_proto not available')
self.exit(success=False)
else:
self.skeleton_peer=self.originating_proto
self.requests[self.skeleton_peer] = deferred
self.skeleton_peer.send_getblockheaders(from0+self.max_blockheaders_per_request,self.max_skeleton_size,self.max_blockheaders_per_request-1,0)
try:
skeleton = deferred.get(block=True,timeout=self.blockheaders_request_timeout)
# assert isinstance(skeleton,list)
# log_st.debug('skeleton received %u',len(skeleton))
# assert isinstance(skeleton,list)
log_st.debug('skeleton received %u',len(skeleton),skeleton=skeleton)
except gevent.Timeout:
log_st.warn('syncing skeleton timed out')
#todo drop originating proto

# del self.requests[self.skeleton_peer]
self.exit(success=False)
finally:
# # is also executed 'on the way out' when any other clause of the try statement
Expand All @@ -126,30 +129,35 @@ def fetch_hashchain(self):
del self.requests[self.skeleton_peer]


log_st.debug('skeleton received',num= len(skeleton), skeleton=skeleton)
#log_st.debug('skeleton received',num= len(skeleton), skeleton=skeleton)

if not skeleton:
# self.fetch_headers(from0)
# skeleton_fetch_done = True
log_st.warn('no more skeleton received')
# self.fetch_headers(from0)
skeleton_fetch_done = True
self.exit(success=False)
continue
else:
self.fetch_headerbatch(skeleton)
header_batch = self.fetch_headerbatch(from0,skeleton)
log_st.debug('header batch', headerbatch= header_batch)
# processed= process_headerbatch(batch_header)
# self.batch_header = filled[:processed]
# fetch_blocks(header_batch)
from0 = from0 + self.max_skeleton_size*self.max_blockheaders_per_request
if header_batch:
# self.fetch_blocks(header_batch)
from0 = from0 + self.max_skeleton_size*self.max_blockheaders_per_request
#insert batch_header to hashchain



#send requests in batches, receive one header batch at a time
def fetch_headerbatch(self, skeleton):

def fetch_headerbatch(self,origin,skeleton):
log_st.debug('origin from',origin=origin)

# while True
from0=skeleton[0]
# while True
#header_processed = 0
#from0=skeleton[0]
self.batch_requests=[]
batch_result= []*self.max_skeleton_size*self.max_blockheaders_per_request
self.batch_result= [None]*self.max_skeleton_size*self.max_blockheaders_per_request
headers= []
proto = None
proto_received=None #proto which delivered the header
Expand Down Expand Up @@ -185,9 +193,9 @@ def fetch_headerbatch(self, skeleton):
log_st.debug('batch header fetching done')
return self.batch_result
try:
proto_received = deferred.get(timeout=self.blockheaders_request_timeout)
log_st.debug('headers batch received from proto',proto=proto_received)
del self.header_request
proto_received = deferred.get(timeout=self.blockheaders_request_timeout)['proto']
header=deferred.get(timeout=self.blockheaders_request_timeout)['headers']
log_st.debug('headers batch received from proto', header=header)
except gevent.Timeout:
log_st.warn('syncing batch hashchain timed out')
retry += 1
Expand All @@ -199,8 +207,34 @@ def fetch_headerbatch(self, skeleton):
log_st.info('headers sync failed with peers, retry', retry=retry)
gevent.sleep(self.retry_delay)
continue
finally:
del self.header_request


if header[0] not in self.batch_requests:
continue

#verified = self.verify_headers(self,proto_received, header)
batch_header= header[::-1] #in hight rising order
self.batch_result[(batch_header[0].number-origin-1):batch_header[0].number-origin+len(batch_header)]= batch_header
# log_st.debug('batch result',batch_result= self.batch_result)
self.batch_requests.remove(header[0])
proto_received.set_idle()
del self.requests[proto_received]

header_ready = 0
while (self.header_processed + header_ready) < len(self.batch_result) and self.batch_result[self.header_processed + header_ready]:
header_ready += self.max_blockheaders_per_request

if header_ready > 0 :
# Headers are ready for delivery, gather them
processed = self.batch_result[self.header_processed:self.header_processed+header_ready]
log_st.debug('issue fetching blocks',header_processed=self.header_processed, blocks=processed, proto=proto_received,count=len(processed),start=processed[0].number)
self.fetch_blocks(processed)
self.header_processed += len(processed)

log_st.debug('remaining headers',num=len(self.batch_requests),headers=self.batch_requests)




def idle_protocols(self):
Expand Down Expand Up @@ -235,7 +269,7 @@ def fetch_blocks(self, blockheaders_chain):
# fetch blocks (no parallelism here)
log_st.debug('fetching blocks', num=len(blockheaders_chain))
assert blockheaders_chain
blockheaders_chain.reverse() # height rising order
# blockheaders_chain.reverse() # height rising order
num_blocks = len(blockheaders_chain)
num_fetched = 0
retry = 0
Expand All @@ -244,7 +278,7 @@ def fetch_blocks(self, blockheaders_chain):
bodies = []

# try with protos
protocols = self.protocols
protocols = self.idle_protocols()
if not protocols:
log_st.warn('no protocols available')
return self.exit(success=False)
Expand Down Expand Up @@ -304,13 +338,13 @@ def fetch_blocks(self, blockheaders_chain):
# done
last_block = t_block
assert not len(blockheaders_chain)
assert last_block.header.hash == self.blockhash
log_st.debug('syncing finished')
# assert last_block.header.hash == self.blockhash
# log_st.debug('syncing finished')
# at this point blocks are not in the chain yet, but in the add_block queue
if self.chain_difficulty >= self.chain.head.chain_difficulty():
self.chainservice.broadcast_newblock(last_block, self.chain_difficulty, origin=proto)

self.exit(success=True)
return
#self.exit(success=True)

def receive_newblockhashes(self, proto, newblockhashes):
"""
Expand Down Expand Up @@ -349,12 +383,14 @@ def receive_blockheaders(self, proto, blockheaders):
return
if self.batch_requests and blockheaders:
# check header validity

if blockheaders[0] in self.batch_requests:
self.batch_requests.remove(blockheaders[0])
log_st.debug('remaining headers',num=len(self.batch_requests),headers=self.batch_requests)
proto.set_idle()
del self.requests[proto]
# if not valid(blockheaders):
# return self.exit(success=False)

# if blockheaders[0] in self.batch_requests:
# self.batch_requests.remove(blockheaders[0])
# log_st.debug('remaining headers',num=len(self.batch_requests),headers=self.batch_requests)
# proto.set_idle()
# del self.requests[proto]
#deliver to header processer

#pack batch headers
Expand All @@ -365,7 +401,7 @@ def receive_blockheaders(self, proto, blockheaders):

#evoke next header fetching task
# self.requests[proto].set(proto)
self.header_request.set(proto)
self.header_request.set({'proto':proto,'headers':blockheaders})
elif proto == self.skeleton_peer: #make sure it's from the originating proto
self.requests[proto].set(blockheaders)

Expand Down

0 comments on commit 938a52d

Please sign in to comment.