diff --git a/frontera/contrib/messagebus/zeromq/__init__.py b/frontera/contrib/messagebus/zeromq/__init__.py index 3f99fc973..6e28f4c5e 100644 --- a/frontera/contrib/messagebus/zeromq/__init__.py +++ b/frontera/contrib/messagebus/zeromq/__init__.py @@ -8,7 +8,7 @@ import six from frontera.core.messagebus import BaseMessageBus, BaseSpiderLogStream, BaseStreamConsumer, \ - BaseSpiderFeedStream, BaseScoringLogStream + BaseSpiderFeedStream, BaseScoringLogStream, BaseStreamProducer from frontera.contrib.backends.partitioners import FingerprintPartitioner, Crc32NamePartitioner from frontera.contrib.messagebus.zeromq.socket_config import SocketConfig from six.moves import range @@ -61,7 +61,7 @@ def get_offset(self, partition_id): return self.counter -class Producer(object): +class Producer(BaseStreamProducer): def __init__(self, context, location, identity): self.identity = identity self.sender = context.zeromq.socket(zmq.PUB) @@ -172,26 +172,41 @@ def __init__(self, messagebus): self.in_location = messagebus.socket_config.db_out() self.out_location = messagebus.socket_config.spiders_in() self.partitions = messagebus.spider_feed_partitions - self.ready_partitions = set(self.partitions) + self.partitions_offset = {} + for partition_id in self.partitions: + self.partitions_offset[partition_id] = 0 self.consumer_hwm = messagebus.spider_feed_rcvhwm self.producer_hwm = messagebus.spider_feed_sndhwm self.hostname_partitioning = messagebus.hostname_partitioning + self.max_next_requests = messagebus.max_next_requests + self._producer = None def consumer(self, partition_id): return Consumer(self.context, self.out_location, partition_id, b'sf', seq_warnings=True, hwm=self.consumer_hwm) def producer(self): - return SpiderFeedProducer(self.context, self.in_location, self.partitions, - self.producer_hwm, self.hostname_partitioning) + if not self._producer: + self._producer = SpiderFeedProducer( + self.context, self.in_location, self.partitions, + self.producer_hwm, self.hostname_partitioning) + return self._producer def available_partitions(self): - return self.ready_partitions + if not self._producer: + return [] - def mark_ready(self, partition_id): - self.ready_partitions.add(partition_id) + partitions = [] + for partition_id, last_offset in self.partitions_offset.items(): + producer_offset = self._producer.get_offset(partition_id) + if producer_offset is None: + producer_offset = 0 + lag = producer_offset - last_offset + if lag < self.max_next_requests or not producer_offset: + partitions.append(partition_id) + return partitions - def mark_busy(self, partition_id): - self.ready_partitions.discard(partition_id) + def set_spider_offset(self, partition_id, offset): + self.partitions_offset[partition_id] = offset class Context(object): @@ -210,6 +225,7 @@ def __init__(self, settings): self.spider_feed_sndhwm = int(settings.get('MAX_NEXT_REQUESTS') * len(self.spider_feed_partitions) * 1.2) self.spider_feed_rcvhwm = int(settings.get('MAX_NEXT_REQUESTS') * 2.0) self.hostname_partitioning = settings.get('QUEUE_HOSTNAME_PARTITIONING') + self.max_next_requests = int(settings.get('MAX_NEXT_REQUESTS')) if self.socket_config.is_ipv6: self.context.zeromq.setsockopt(zmq.IPV6, True) diff --git a/frontera/core/messagebus.py b/frontera/core/messagebus.py index 3782f6c00..53d98c2a5 100644 --- a/frontera/core/messagebus.py +++ b/frontera/core/messagebus.py @@ -158,18 +158,13 @@ def available_partitions(self): """ raise NotImplementedError - def mark_ready(self, partition_id): + def set_spider_offset(self, partition_id, offset): """ - Marks partition as ready/available for receiving new batches. - :param partition_id: int - :return: nothing - """ - pass - - def mark_busy(self, partition_id): - """ - Marks partition as busy, so that spider assigned to this partition is busy processing previous batches. + Set the message processed offset for a given partition. Used to + calculate the lag between the message sent and message processed + to prevent overflowing the queue of an unresponsive partition. :param partition_id: int + :param offset: int :return: nothing """ pass diff --git a/frontera/worker/db.py b/frontera/worker/db.py index 6f9abad85..960d35040 100644 --- a/frontera/worker/db.py +++ b/frontera/worker/db.py @@ -169,18 +169,7 @@ def consume_incoming(self, *args, **kwargs): continue if type == 'offset': _, partition_id, offset = msg - producer_offset = self.spider_feed_producer.get_offset(partition_id) - if producer_offset is None: - continue - else: - lag = producer_offset - offset - if lag < 0: - # non-sense in general, happens when SW is restarted and not synced yet with Spiders. - continue - if lag < self.max_next_requests or offset == 0: - self.spider_feed.mark_ready(partition_id) - else: - self.spider_feed.mark_busy(partition_id) + self.spider_feed.set_spider_offset(partition_id, offset) continue logger.debug('Unknown message type %s', type) except Exception as exc: diff --git a/tests/mocks/message_bus.py b/tests/mocks/message_bus.py index f8b6f582b..528c7ca72 100644 --- a/tests/mocks/message_bus.py +++ b/tests/mocks/message_bus.py @@ -1,5 +1,5 @@ from frontera.core.messagebus import BaseMessageBus, BaseSpiderLogStream, BaseStreamConsumer, \ - BaseScoringLogStream, BaseSpiderFeedStream + BaseScoringLogStream, BaseSpiderFeedStream, BaseStreamProducer class Consumer(BaseStreamConsumer): @@ -27,7 +27,7 @@ def get_offset(self, partition_id): return self.offset -class Producer(object): +class Producer(BaseStreamProducer): def __init__(self): self.messages = [] @@ -70,23 +70,28 @@ def consumer(self, partition_id, type): class SpiderFeedStream(BaseSpiderFeedStream): def __init__(self, messagebus): - self.ready_partitions = set(messagebus.spider_feed_partitions) + self._producer = Producer() + self.max_next_requests = messagebus.max_next_requests + self.partitions_offset = {} + for partition_id in messagebus.spider_feed_partitions: + self.partitions_offset[partition_id] = 0 def producer(self): - return Producer() + return self._producer def consumer(self, partition_id): return Consumer() def available_partitions(self): - return self.ready_partitions - - def mark_ready(self, partition_id): - self.ready_partitions.add(partition_id) - - def mark_busy(self, partition_id): - self.ready_partitions.discard(partition_id) - + partitions = [] + for partition_id, last_offset in self.partitions_offset.items(): + lag = self._producer.get_offset(partition_id) - last_offset + if lag < self.max_next_requests or last_offset == 0: + partitions.append(partition_id) + return partitions + + def set_spider_offset(self, partition_id, offset): + self.partitions_offset[partition_id] = offset class FakeMessageBus(BaseMessageBus): diff --git a/tests/test_worker_db.py b/tests/test_worker_db.py index 05b91d0c2..a4f93d1b8 100644 --- a/tests/test_worker_db.py +++ b/tests/test_worker_db.py @@ -87,3 +87,28 @@ def test_offset(self): dbw._backend.queue.put_requests([r1, r2, r3]) assert dbw.new_batch() == 3 assert 3 in dbw._backend.partitions + + def test_partition_available(self): + dbw = self.dbw_setup(True) + msg1 = dbw._encoder.encode_offset(0, 128) + msg2 = dbw._encoder.encode_offset(1, 0) + dbw.spider_log_consumer.put_messages([msg1, msg2]) + dbw.spider_feed_producer.offset = 128 + dbw.consume_incoming() + + assert 0 in dbw.spider_feed.available_partitions() + assert 1 in dbw.spider_feed.available_partitions() + + msg3 = dbw._encoder.encode_offset(1, 1) + dbw.spider_log_consumer.put_messages([msg3]) + dbw.consume_incoming() + assert 1 not in dbw.spider_feed.available_partitions() + + msg3 = dbw._encoder.encode_offset(1, 1) + dbw.spider_log_consumer.put_messages([msg3]) + dbw.consume_incoming() + assert 1 not in dbw.spider_feed.available_partitions() + + dbw.spider_feed_producer.offset = 256 + assert 0 not in dbw.spider_feed.available_partitions() + assert 1 not in dbw.spider_feed.available_partitions()