Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mark partition as busy when a new batch is sent to it #281

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
Open
1 change: 1 addition & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ python: 2.7
branches:
only:
- master
- busy-partitions
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this isn't needed probably

- /^\d\.\d+$/
- /^\d\.\d+\.\d+(rc\d+|dev\d+)?$/

Expand Down
3 changes: 3 additions & 0 deletions frontera/contrib/messagebus/kafkabus.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,9 @@ def flush(self):
def get_offset(self, partition_id):
    # No local offset bookkeeping for this producer; the method is a stub
    # and callers always receive None.
    pass

def partition(self, key):
    """Map *key* to a partition id using the configured partitioner callable."""
    chosen = self._partitioner(key)
    return chosen


class SpiderLogStream(BaseSpiderLogStream):
def __init__(self, messagebus):
Expand Down
34 changes: 24 additions & 10 deletions frontera/contrib/messagebus/zeromq/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import six

from frontera.core.messagebus import BaseMessageBus, BaseSpiderLogStream, BaseStreamConsumer, \
BaseSpiderFeedStream, BaseScoringLogStream
BaseSpiderFeedStream, BaseScoringLogStream, BaseStreamProducer
from frontera.contrib.backends.partitioners import FingerprintPartitioner, Crc32NamePartitioner
from frontera.contrib.messagebus.zeromq.socket_config import SocketConfig
from six.moves import range
Expand Down Expand Up @@ -61,7 +61,7 @@ def get_offset(self, partition_id):
return self.counter


class Producer(object):
class Producer(BaseStreamProducer):
def __init__(self, context, location, identity):
self.identity = identity
self.sender = context.zeromq.socket(zmq.PUB)
Expand All @@ -80,7 +80,7 @@ def send(self, key, *messages):
# Raise TypeError if any message is not encoded as bytes
if any(not isinstance(m, six.binary_type) for m in messages):
raise TypeError("all produce message payloads must be type bytes")
partition = self.partitioner.partition(key)
partition = self.partition(key)
counter = self.counters.get(partition, 0)
for msg in messages:
self.sender.send_multipart([self.identity + pack(">B", partition), msg,
Expand All @@ -100,6 +100,9 @@ def flush(self):
def get_offset(self, partition_id):
    """Return the count of messages sent to *partition_id* (None if never sent)."""
    return self.counters.get(partition_id)

def partition(self, key):
    """Delegate key-to-partition mapping to the stream's partitioner object."""
    partition_id = self.partitioner.partition(key)
    return partition_id


class SpiderLogProducer(Producer):
def __init__(self, context, location, partitions):
Expand Down Expand Up @@ -176,22 +179,32 @@ def __init__(self, messagebus):
self.consumer_hwm = messagebus.spider_feed_rcvhwm
self.producer_hwm = messagebus.spider_feed_sndhwm
self.hostname_partitioning = messagebus.hostname_partitioning
self.max_next_requests = messagebus.max_next_requests
self._producer = None

def consumer(self, partition_id):
    """Create a spider-feed consumer bound to a single partition."""
    return Consumer(self.context, self.out_location, partition_id, b'sf',
                    seq_warnings=True, hwm=self.consumer_hwm)

def producer(self):
    """Return the spider-feed producer, creating and caching it on first call."""
    cached = self._producer
    if not cached:
        cached = SpiderFeedProducer(self.context, self.in_location,
                                    self.partitions, self.producer_hwm,
                                    self.hostname_partitioning)
        self._producer = cached
    return cached

def available_partitions(self):
    """
    Return ids of partitions whose spiders can accept a new batch.

    A partition is available when the gap between what the producer has
    sent and what the spider has acknowledged (``set_spider_offset``) is
    below ``max_next_requests``.
    :return: list of int partition ids
    """
    # Until the producer exists nothing has been sent, so no partition
    # has a meaningful lag to compute.
    if not self._producer:
        return []
    partitions = []
    for partition_id, last_offset in self.partitions_offset.items():
        producer_offset = self._producer.get_offset(partition_id)
        if producer_offset is None:
            # Producer never sent to this partition: no backlog can exist,
            # so the partition is trivially available.
            partitions.append(partition_id)
            continue
        lag = producer_offset - last_offset
        # last_offset == 0 covers a freshly started/restarted spider that has
        # not acknowledged anything yet; mirrors the mock message bus and the
        # pre-change DB-worker logic.
        if lag < self.max_next_requests or last_offset == 0:
            partitions.append(partition_id)
    return partitions

def set_spider_offset(self, partition_id, offset):
    """
    Record the consume offset reported by the spider for a partition.
    :param partition_id: int
    :param offset: int
    :return: nothing
    """
    self.partitions_offset[partition_id] = offset
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this variable isn't defined

Copy link
Contributor Author

@isra17 isra17 Jul 11, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ha good catch, fixed



class Context(object):
Expand All @@ -210,6 +223,7 @@ def __init__(self, settings):
self.spider_feed_sndhwm = int(settings.get('MAX_NEXT_REQUESTS') * len(self.spider_feed_partitions) * 1.2)
self.spider_feed_rcvhwm = int(settings.get('MAX_NEXT_REQUESTS') * 2.0)
self.hostname_partitioning = settings.get('QUEUE_HOSTNAME_PARTITIONING')
self.max_next_requests = settings.get('MAX_NEXT_REQUESTS')
if self.socket_config.is_ipv6:
self.context.zeromq.setsockopt(zmq.IPV6, True)

Expand Down
20 changes: 10 additions & 10 deletions frontera/core/messagebus.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,13 @@ def get_offset(self, partition_id):
"""
raise NotImplementedError

def partition(self, key):
    """
    Returns partition id for key.
    :param key: str key used for partitioning, None for non-keyed channels
    :return: int partition id
    :raises NotImplementedError: always; subclasses must override
    """
    raise NotImplementedError

def close(self):
"""
Performs all necessary cleanup and closes the producer.
Expand Down Expand Up @@ -158,18 +165,11 @@ def available_partitions(self):
"""
raise NotImplementedError

def set_spider_offset(self, partition_id, offset):
    """
    Set a partition's message sent offset, as reported by the spider.
    :param partition_id: int
    :param offset: int
    :return: nothing
    """
    pass
Expand Down
13 changes: 1 addition & 12 deletions frontera/worker/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,18 +169,7 @@ def consume_incoming(self, *args, **kwargs):
continue
if type == 'offset':
_, partition_id, offset = msg
producer_offset = self.spider_feed_producer.get_offset(partition_id)
if producer_offset is None:
continue
else:
lag = producer_offset - offset
if lag < 0:
# non-sense in general, happens when SW is restarted and not synced yet with Spiders.
continue
if lag < self.max_next_requests or offset == 0:
self.spider_feed.mark_ready(partition_id)
else:
self.spider_feed.mark_busy(partition_id)
self.spider_feed.set_spider_offset(partition_id, offset)
continue
logger.debug('Unknown message type %s', type)
except Exception as exc:
Expand Down
32 changes: 20 additions & 12 deletions tests/mocks/message_bus.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from frontera.core.messagebus import BaseMessageBus, BaseSpiderLogStream, BaseStreamConsumer, \
BaseScoringLogStream, BaseSpiderFeedStream
BaseScoringLogStream, BaseSpiderFeedStream, BaseStreamProducer


class Consumer(BaseStreamConsumer):
Expand Down Expand Up @@ -27,7 +27,7 @@ def get_offset(self, partition_id):
return self.offset


class Producer(object):
class Producer(BaseStreamProducer):

def __init__(self):
self.messages = []
Expand All @@ -42,6 +42,9 @@ def flush(self):
def get_offset(self, partition_id):
    # Mock: one global offset is shared by every partition; partition_id
    # is accepted for interface compatibility but ignored.
    return self.offset

def partition(self, key):
    """Mock partitioner: every key is routed to partition 0."""
    return 0


class ScoringLogStream(BaseScoringLogStream):

Expand Down Expand Up @@ -70,23 +73,28 @@ def consumer(self, partition_id, type):
class SpiderFeedStream(BaseSpiderFeedStream):

def __init__(self, messagebus):
    """Track per-partition spider offsets against a single shared mock producer."""
    self._producer = Producer()
    self.max_next_requests = messagebus.max_next_requests
    # Every known partition starts at offset 0 (nothing acknowledged yet).
    self.partitions_offset = {}
    for partition_id in messagebus.spider_feed_partitions:
        self.partitions_offset[partition_id] = 0

def producer(self):
    """Return the single producer instance shared by this stream."""
    return self._producer

def consumer(self, partition_id):
    # Mock: every call hands back a fresh, independent consumer;
    # partition_id is accepted for interface compatibility but ignored.
    return Consumer()

def available_partitions(self):
    """
    Return ids of partitions whose spiders can accept a new batch.

    A partition is available when the spider's lag behind the producer is
    below ``max_next_requests``, or when the spider has not acknowledged
    anything yet (``last_offset == 0``).
    :return: list of int partition ids
    """
    partitions = []
    for partition_id, last_offset in self.partitions_offset.items():
        lag = self._producer.get_offset(partition_id) - last_offset
        if lag < self.max_next_requests or last_offset == 0:
            partitions.append(partition_id)
    return partitions

def set_spider_offset(self, partition_id, offset):
    # Record the latest offset the spider reported having consumed for
    # this partition; available_partitions() computes lag against it.
    self.partitions_offset[partition_id] = offset

class FakeMessageBus(BaseMessageBus):

Expand Down
26 changes: 24 additions & 2 deletions tests/test_worker_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@

class TestDBWorker(object):

def dbw_setup(self, distributed=False):
settings = Settings()
def dbw_setup(self, distributed=False, settings=None):
settings = settings or Settings()
settings.MAX_NEXT_REQUESTS = 64
settings.MESSAGE_BUS = 'tests.mocks.message_bus.FakeMessageBus'
if distributed:
Expand Down Expand Up @@ -78,12 +78,34 @@ def test_offset(self):
dbw.spider_feed_producer.offset = 100
dbw.consume_incoming()
assert 2 in dbw.spider_feed.available_partitions()

msg1 = dbw._encoder.encode_offset(2, 20)
msg2 = dbw._encoder.encode_offset(3, 0)
dbw.spider_log_consumer.put_messages([msg1, msg2])
dbw.consume_incoming()
assert 3 in dbw.spider_feed.available_partitions()
assert 2 not in dbw.spider_feed.available_partitions()

dbw._backend.queue.put_requests([r1, r2, r3])
assert dbw.new_batch() == 3
assert 3 in dbw._backend.partitions

def test_partition_available(self):
    """Partitions flip between available/busy as producer and spider offsets move."""
    dbw = self.dbw_setup(True)
    dbw.spider_log_consumer.put_messages([
        dbw._encoder.encode_offset(0, 64),
        dbw._encoder.encode_offset(1, 0),
    ])
    dbw.spider_feed_producer.offset = 64
    dbw.consume_incoming()

    # Partition 0 caught up (lag 0); partition 1 never acknowledged anything
    # after 64 sent messages, so it is busy.
    assert 0 in dbw.spider_feed.available_partitions()
    assert 1 not in dbw.spider_feed.available_partitions()

    # A first acknowledgement from partition 1 brings its lag under the limit.
    dbw.spider_log_consumer.put_messages([dbw._encoder.encode_offset(1, 1)])
    dbw.consume_incoming()
    assert 1 in dbw.spider_feed.available_partitions()

    # Doubling the producer offset pushes both partitions past the limit.
    dbw.spider_feed_producer.offset = 128
    assert 0 not in dbw.spider_feed.available_partitions()
    assert 1 not in dbw.spider_feed.available_partitions()