Mark partition as busy when a new batch is sent to it #281
base: master
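In outline, the change drops the explicit mark_ready()/mark_busy() bookkeeping on the spider feed stream and derives availability from offsets instead: the stream keeps a single cached producer, records the last offset each spider reports via set_spider_offset(), and treats a partition as available only while the producer-to-spider lag stays under MAX_NEXT_REQUESTS. A minimal standalone sketch of that rule (the function name and the numbers below are illustrative, not frontera's API):

    def available_partitions(produced, reported, max_next_requests):
        """Partition ids whose backlog (producer offset minus the offset the
        spider last reported) is still below max_next_requests."""
        available = []
        for partition_id, last_offset in reported.items():
            lag = produced.get(partition_id, 0) - last_offset
            if lag < max_next_requests:
                available.append(partition_id)
        return available

    # Partition 0 already has a full batch outstanding, partition 1 still has room.
    print(available_partitions({0: 1024, 1: 300}, {0: 512, 1: 280}, 256))  # [1]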
ZeroMQ message bus (frontera.contrib.messagebus.zeromq):

@@ -8,7 +8,7 @@
 import six

 from frontera.core.messagebus import BaseMessageBus, BaseSpiderLogStream, BaseStreamConsumer, \
-    BaseSpiderFeedStream, BaseScoringLogStream
+    BaseSpiderFeedStream, BaseScoringLogStream, BaseStreamProducer
 from frontera.contrib.backends.partitioners import FingerprintPartitioner, Crc32NamePartitioner
 from frontera.contrib.messagebus.zeromq.socket_config import SocketConfig
 from six.moves import range
@@ -61,7 +61,7 @@ def get_offset(self, partition_id):
         return self.counter


-class Producer(object):
+class Producer(BaseStreamProducer):
     def __init__(self, context, location, identity):
         self.identity = identity
         self.sender = context.zeromq.socket(zmq.PUB)
@@ -80,7 +80,7 @@ def send(self, key, *messages):
         # Raise TypeError if any message is not encoded as bytes
         if any(not isinstance(m, six.binary_type) for m in messages):
             raise TypeError("all produce message payloads must be type bytes")
-        partition = self.partitioner.partition(key)
+        partition = self.partition(key)
         counter = self.counters.get(partition, 0)
         for msg in messages:
             self.sender.send_multipart([self.identity + pack(">B", partition), msg,
@@ -100,6 +100,9 @@ def flush(self):
     def get_offset(self, partition_id):
         return self.counters.get(partition_id, None)

+    def partition(self, key):
+        return self.partitioner.partition(key)
+

 class SpiderLogProducer(Producer):
     def __init__(self, context, location, partitions):
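The producer-side hunks above are the plumbing for this: Producer now subclasses BaseStreamProducer, and send() resolves the target partition through a new partition() method instead of calling the partitioner directly, so subclasses and test doubles can change the key-to-partition mapping in one place (the test mock later in this diff simply returns 0). A rough standalone sketch of that shape, not frontera's actual classes:

    class SketchProducer(object):
        """Toy producer: partition() is the single override point for routing."""

        def __init__(self, num_partitions):
            self.num_partitions = num_partitions
            self.sent = {}  # partition_id -> list of payloads

        def partition(self, key):
            return hash(key) % self.num_partitions

        def send(self, key, *messages):
            target = self.partition(key)  # subclasses change routing here only
            self.sent.setdefault(target, []).extend(messages)
            return target

    class SinglePartitionProducer(SketchProducer):
        """Mirrors the test double below: every key maps to partition 0."""

        def partition(self, key):
            return 0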
@@ -176,22 +179,32 @@ def __init__(self, messagebus):
         self.consumer_hwm = messagebus.spider_feed_rcvhwm
         self.producer_hwm = messagebus.spider_feed_sndhwm
         self.hostname_partitioning = messagebus.hostname_partitioning
+        self.max_next_requests = messagebus.max_next_requests
+        self._producer = None

     def consumer(self, partition_id):
         return Consumer(self.context, self.out_location, partition_id, b'sf', seq_warnings=True, hwm=self.consumer_hwm)

     def producer(self):
-        return SpiderFeedProducer(self.context, self.in_location, self.partitions,
-                                  self.producer_hwm, self.hostname_partitioning)
+        if not self._producer:
+            self._producer = SpiderFeedProducer(
+                self.context, self.in_location, self.partitions,
+                self.producer_hwm, self.hostname_partitioning)
+        return self._producer

     def available_partitions(self):
-        return self.ready_partitions
+        if not self._producer:
+            return []

-    def mark_ready(self, partition_id):
-        self.ready_partitions.add(partition_id)
+        partitions = []
+        for partition_id, last_offset in self.partitions_offset.items():
+            lag = self._producer.get_offset(partition_id) - last_offset
+            if lag < self.max_next_requests:
+                partitions.append(partition_id)
+        return partitions

-    def mark_busy(self, partition_id):
-        self.ready_partitions.discard(partition_id)
+    def set_spider_offset(self, partition_id, offset):
+        self.partitions_offset[partition_id] = offset


 class Context(object):

Review comment: this variable isn't defined
Reply: Ha good catch, fixed
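The thread above points at a real gap in this revision: available_partitions() and set_spider_offset() use self.partitions_offset, but the ZeroMQ SpiderFeedStream.__init__ shown here never creates it. The fix the author mentions is not part of this snapshot; presumably it seeds the map with a zero offset per partition, as the test mock further down does. A hypothetical sketch of that seeding:

    def initial_offsets(partitions):
        """Zero starting offset for every spider feed partition (hypothetical
        helper; the actual fix lands in a later commit of this PR)."""
        return {partition_id: 0 for partition_id in partitions}

    print(initial_offsets(range(2)))  # {0: 0, 1: 0}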
@@ -210,6 +223,7 @@ def __init__(self, settings):
         self.spider_feed_sndhwm = int(settings.get('MAX_NEXT_REQUESTS') * len(self.spider_feed_partitions) * 1.2)
         self.spider_feed_rcvhwm = int(settings.get('MAX_NEXT_REQUESTS') * 2.0)
         self.hostname_partitioning = settings.get('QUEUE_HOSTNAME_PARTITIONING')
+        self.max_next_requests = settings.get('MAX_NEXT_REQUESTS')
         if self.socket_config.is_ipv6:
             self.context.zeromq.setsockopt(zmq.IPV6, True)
Message bus test doubles (FakeMessageBus):

@@ -1,5 +1,5 @@
 from frontera.core.messagebus import BaseMessageBus, BaseSpiderLogStream, BaseStreamConsumer, \
-    BaseScoringLogStream, BaseSpiderFeedStream
+    BaseScoringLogStream, BaseSpiderFeedStream, BaseStreamProducer


 class Consumer(BaseStreamConsumer):
@@ -27,7 +27,7 @@ def get_offset(self, partition_id):
         return self.offset


-class Producer(object):
+class Producer(BaseStreamProducer):

     def __init__(self):
         self.messages = []
@@ -42,6 +42,9 @@ def flush(self):
     def get_offset(self, partition_id):
         return self.offset

+    def partition(self, key):
+        return 0
+

 class ScoringLogStream(BaseScoringLogStream):
@@ -70,23 +73,28 @@ def consumer(self, partition_id, type):
 class SpiderFeedStream(BaseSpiderFeedStream):

     def __init__(self, messagebus):
-        self.ready_partitions = set(messagebus.spider_feed_partitions)
+        self._producer = Producer()
+        self.max_next_requests = messagebus.max_next_requests
+        self.partitions_offset = {}
+        for partition_id in messagebus.spider_feed_partitions:
+            self.partitions_offset[partition_id] = 0

     def producer(self):
-        return Producer()
+        return self._producer

     def consumer(self, partition_id):
         return Consumer()

     def available_partitions(self):
-        return self.ready_partitions
-
-    def mark_ready(self, partition_id):
-        self.ready_partitions.add(partition_id)
-
-    def mark_busy(self, partition_id):
-        self.ready_partitions.discard(partition_id)
+        partitions = []
+        for partition_id, last_offset in self.partitions_offset.items():
+            lag = self._producer.get_offset(partition_id) - last_offset
+            if lag < self.max_next_requests or last_offset == 0:
+                partitions.append(partition_id)
+        return partitions
+
+    def set_spider_offset(self, partition_id, offset):
+        self.partitions_offset[partition_id] = offset


 class FakeMessageBus(BaseMessageBus):

Review comment: Here, if partition doesn't exist (yet) in this dict - it will not be returned as available, which is wrong.
Reply: line 86 does create the keys for each partition. In the worst case, a new partition will first send an
Reply: Ok!
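The extra "or last_offset == 0" guard in the mock's loop is what the exchange above is about: a partition that has not reported any spider offset yet is still handed its first batch, even if the raw lag computation would exceed the threshold. A small standalone check with made-up numbers:

    max_next_requests = 64
    producer_offset = 500  # offset the producer has written so far (assumed)

    for partition_id, last_offset in {0: 0, 1: 400, 2: 480}.items():
        lag = producer_offset - last_offset
        available = lag < max_next_requests or last_offset == 0
        print(partition_id, "available" if available else "busy")
    # partition 0: available (never reported), 1: busy (lag 100), 2: available (lag 20)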
Review comment: this isn't needed probably