
Mark partition as busy when a new batch is sent to it #281

Open
wants to merge 10 commits into master

Conversation

isra17
Contributor

@isra17 isra17 commented May 29, 2017

I happened to stumble onto a bug where the DBWorker keeps sending requests to the spider until the backend queue is empty. It seems to go as follows:

  1. DBWorker sets the spider's partition as ready.
  2. DBWorker sends a new batch; feed.counter = 256.
  3. Spider receives the new batch and sends its new offset; spider.offset = 256.
  4. DBWorker receives the offset; since spider.offset <= feed.counter, it keeps the partition marked as ready.
  5. Spider is busy scraping.
  6. DBWorker sends a new batch to the spider's partition; feed.counter = 512.
  7. DBWorker keeps sending new batches; feed.counter = 1024.
  8. The spider finally has room for new requests, downloads the next requests and sends its new offset; spider.offset = 512.
  9. DBWorker now sets the partition as busy, but by that time the lag between the spider offset and the feed counter can be huge.

I guess crawling slowly makes this worse, since a single batch can take a few minutes to process, giving the DBWorker plenty of time to overload the feed.
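To make the race clearer, here is a minimal sketch (hypothetical names, not the actual Frontera code) of the readiness check the steps above describe: the partition counts as ready whenever the spider offset is not ahead of the feed counter, so the lag between the two is never bounded.

class NaivePartitionState(object):
    """Illustrative only -- not the real Frontera implementation."""

    def __init__(self):
        self.feed_counter = 0    # requests the DBWorker has pushed to the partition
        self.spider_offset = 0   # requests the spider has acknowledged so far

    def on_batch_sent(self, size):
        self.feed_counter += size          # partition stays "ready" regardless of lag

    def on_spider_offset(self, offset):
        self.spider_offset = offset

    def is_ready(self):
        # Ready as long as the spider has not reported more than was sent,
        # i.e. almost always: the lag (feed_counter - spider_offset) is ignored.
        return self.spider_offset <= self.feed_counter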

My fix here is to mark a partition as busy as soon as messages are sent to it. This way, the worker waits for an update of the spider offset before marking the partition as ready again if needed. This should work well with a bigger MAX_NEXT_REQUESTS value on the worker, to ensure the queue is never empty.
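Compared with the sketch above, a rough sketch of the behaviour this PR aims for (simplified, hypothetical names; the threshold value is an assumption, not the exact code in the diff): sending a batch marks the partition busy, and only a spider offset update showing the spider has caught up marks it ready again.

MAX_NEXT_REQUESTS = 512  # assumed worker-side setting used as the allowed lag


class BusyOnSendPartitionState(object):
    """Simplified sketch of the proposed fix, not the code in this diff."""

    def __init__(self):
        self.feed_counter = 0
        self.spider_offset = 0
        self.ready = True

    def on_batch_sent(self, size):
        self.feed_counter += size
        self.ready = False               # busy until the spider reports back

    def on_spider_offset(self, offset):
        self.spider_offset = offset
        # Ready again only once the unconsumed backlog is small enough.
        self.ready = (self.feed_counter - offset) <= MAX_NEXT_REQUESTS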

PS: Is there any IRC channel where the maintainers and other Frontera users hang out? I tried #scrapy, but it didn't seem like the right place for Frontera discussions.

This prevents a spider from being overwhelmed by requests when it takes too long to process a batch. The DB Worker will be able to send a new batch once a crawler asks for more requests and sends the offset message that updates its state on the DB Worker side.
@codecov-io

codecov-io commented May 29, 2017

Codecov Report

Merging #281 into master will decrease coverage by 0.12%.
The diff coverage is 48%.

Impacted file tree graph

@@            Coverage Diff             @@
##           master     #281      +/-   ##
==========================================
- Coverage   70.16%   70.04%   -0.13%     
==========================================
  Files          68       68              
  Lines        4720     4723       +3     
  Branches      632      635       +3     
==========================================
- Hits         3312     3308       -4     
- Misses       1272     1279       +7     
  Partials      136      136
Impacted Files Coverage Δ
frontera/worker/db.py 63.63% <100%> (+0.45%) ⬆️
frontera/core/messagebus.py 67.3% <100%> (+0.64%) ⬆️
frontera/contrib/messagebus/zeromq/__init__.py 80.11% <43.47%> (-4.23%) ⬇️
frontera/contrib/backends/hbase.py 70.55% <0%> (-0.76%) ⬇️
frontera/__init__.py
...apy_recording/scrapy_recording/spiders/__init__.py 100% <0%> (ø)

Δ = absolute <relative> (impact), ø = not affected, ? = missing data

@sibiryakov
Member

@isra17 thanks for the contribution!
We don't have an IRC or other chat channel, because there isn't that much demand.

I'm not sure I understand what the problem is:

  • the loss of some requests generated by the DBW between steps 7 and 8?
    or
  • an incorrect partition status being set because of a wrong sequence/timing of the offset exchange?

A.

@isra17
Contributor Author

isra17 commented May 30, 2017

The issue is that until the spider finishes its current batch, the DBWorker will just keep sending new ones. In my case, the DBWorker has time to flush the entire backend queue into the message bus before the spider has the opportunity to mark itself as busy. This gets annoying when the spider ends up with a few hours' worth of work waiting in the message bus.

@sibiryakov
Member

sibiryakov commented Jun 7, 2017

The issue is that until the spider finishes its current batch, the DBWorker will just keep sending new ones. In my case, the DBWorker has time to flush the entire backend queue into the message bus before the spider has the opportunity to mark itself as busy.

The idea behind the code you're trying to modify is that the DBW always sends some amount of requests in advance. When there are pauses between batches, needed for passing the states (ready -> busy -> ready) and for waiting for a batch to finish (when a batch is 95% done, the spider is mostly idle, waiting for its longest request), the crawling speed decreases. With some requests always available in the queue, the spider has a chance to get new requests whenever there is space in its internal queue.
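As a rough illustration of that lookahead idea (hypothetical names and cap, not Frontera's actual API): the worker keeps sending batches as long as the unconsumed backlog per partition stays below some bound, so the spider's internal queue never runs dry while waiting for the ready/busy handshake.

def should_send_batch(feed_counter, spider_offset, max_in_flight=2 * 256):
    """Send another batch while the per-partition backlog stays under a cap."""
    in_flight = feed_counter - spider_offset   # requests sent but not yet consumed
    return in_flight < max_in_flight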

This gets annoying when the spider ends up with a few hours' worth of work waiting in the message bus.

I don't understand this. Is the spider waiting because a) messages with batches were lost in ZMQ, or b) the busy status was set incorrectly and didn't change for a long time, even when the spider was already ready?

This is a pretty tough topic to discuss asynchronously/remotely, so please contact me on Skype (alexander.sibiryakov) so we can save some time.

A.

@isra17
Contributor Author

isra17 commented Jul 7, 2017

@sibiryakov I refactored the PR to keep track of the offset, as discussed on Skype. Let me know if anything is missing.

Member

@sibiryakov sibiryakov left a comment


looks good, but needs some work

.travis.yml Outdated
@@ -3,6 +3,7 @@ python: 2.7
 branches:
   only:
   - master
+  - busy-partitions
Member


this probably isn't needed

    def mark_busy(self, partition_id):
        self.ready_partitions.discard(partition_id)

    def set_spider_offset(self, partition_id, offset):
        self.partitions_offset[partition_id] = offset
Member


this variable isn't defined

Contributor Author

@isra17 isra17 Jul 11, 2017


Ha, good catch. Fixed.

        self.ready_partitions.discard(partition_id)

        partitions = []
        for partition_id, last_offset in self.partitions_offset.items():
Member


Here, if a partition doesn't exist (yet) in this dict, it will not be returned as available, which is wrong.
What if the producer offsets end up being lower than the consumer offsets (because of a DB worker restart)?

Contributor Author


Line 86 does create the keys for each partition. In the worst case, a new partition will first send an offset message, which creates its key in partitions_offset. As for the negative offset (producer counter lower than the consumer offset), I don't think anything will break. From my understanding, when a DBWorker restarts, a spider will have a big stale offset, but the partition should still be marked as ready, and the spider's next offset message will fix it.
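A hypothetical sketch of that restart case (illustrative names and threshold, not the code in this PR): after a DB worker restart its counter starts from zero, so the stored spider offset can temporarily exceed it; treating that case as ready lets the spider's next offset message bring the two numbers back in sync.

def is_partition_ready(feed_counter, spider_offset, max_lag=256):
    if spider_offset > feed_counter:
        # Stale offset from before the DB worker restart; don't block the partition.
        return True
    return (feed_counter - spider_offset) <= max_lag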

Member


Ok!
