From 065f94d9883af31d788797ed6ad68a46b2073465 Mon Sep 17 00:00:00 2001
From: luzihang123 <526154064@qq.com>
Date: Thu, 27 Jan 2022 17:35:22 +0800
Subject: [PATCH 01/43] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E5=AF=B9=E5=93=A8?=
 =?UTF-8?q?=E5=85=B5=E5=AF=86=E7=A0=81=E7=9A=84=E6=94=AF=E6=8C=81=20Suppor?=
 =?UTF-8?q?t=20connecting=20to=20sentinel=20with=20password?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 README.md                           | 2 ++
 scrapy_redis_sentinel/connection.py | 6 +++++-
 2 files changed, 7 insertions(+), 1 deletion(-)

diff --git a/README.md b/README.md
index d365482..7dd5d8c 100644
--- a/README.md
+++ b/README.md
@@ -53,6 +53,8 @@ REDIS_SENTINELS = [
     ('172.25.2.26', 26379),
     ('172.25.2.27', 26379)
 ]
+# SENTINEL_KWARGS 非必须参数,可以设置sentinel密码,参考 https://github.com/redis/redis-py/issues/1219
+SENTINEL_KWARGS = {'password': 'sentinel_password'}
 
 # REDIS_SENTINEL_PARAMS 哨兵模式配置参数。
 REDIS_SENTINEL_PARAMS = {
diff --git a/scrapy_redis_sentinel/connection.py b/scrapy_redis_sentinel/connection.py
index c25f008..78235b5 100644
--- a/scrapy_redis_sentinel/connection.py
+++ b/scrapy_redis_sentinel/connection.py
@@ -152,6 +152,7 @@ def get_redis_sentinel_from_settings(settings):
     params.update(settings.getdict("REDIS_SENTINEL_PARAMS"))
     params.setdefault("sentinels", settings.get("REDIS_SENTINELS"))
     params.setdefault("socket_timeout", settings.get("REDIS_SENTINELS_SOCKET_TIMEOUT"))
+    params.setdefault("sentinel_kwargs", settings.get("SENTINEL_KWARGS"))
 
     return get_redis_sentinel(**params)
 
@@ -165,7 +166,10 @@ def get_redis_sentinel(**kwargs):
     redis_sentinel_cls = kwargs.get("redis_cluster_cls", defaults.REDIS_SENTINEL_CLS)
     sentinels = kwargs.pop("sentinels", None)
     socket_timeout = kwargs.pop("socket_timeout", 0.5)
-    redis_sentinel_cls = redis_sentinel_cls(sentinels=sentinels, socket_timeout=socket_timeout)
+    sentinel_kwargs = kwargs.pop("sentinel_kwargs", None)
+    redis_sentinel_cls = redis_sentinel_cls(sentinels=sentinels,
+                                            socket_timeout=socket_timeout,
+                                            sentinel_kwargs=sentinel_kwargs)
     redis_cls = redis_sentinel_cls.master_for(**kwargs)
     return redis_cls
 

From 777900a2db671130b0123ae38a62aaf48711a8c9 Mon Sep 17 00:00:00 2001
From: luzihang123 <526154064@qq.com>
Date: Thu, 27 Jan 2022 17:50:54 +0800
Subject: [PATCH 02/43] =?UTF-8?q?=E5=AF=B9collections=E4=B8=ADIterable?=
 =?UTF-8?q?=E7=9A=84=E5=BC=95=E5=85=A5=EF=BC=8C=E5=A2=9E=E5=8A=A0=E5=AF=B9?=
 =?UTF-8?q?=E9=AB=98=E7=89=88=E6=9C=ACPython=E7=9A=84=E5=85=BC=E5=AE=B9?=
 =?UTF-8?q?=E6=80=A7?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 scrapy_redis_sentinel/spiders.py | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/scrapy_redis_sentinel/spiders.py b/scrapy_redis_sentinel/spiders.py
index 9d3ee63..02a4368 100644
--- a/scrapy_redis_sentinel/spiders.py
+++ b/scrapy_redis_sentinel/spiders.py
@@ -1,6 +1,10 @@
 # -*- coding: utf-8 -*-
 import time
-from collections import Iterable
+
+try:
+    from collections import Iterable
+except:
+    from collections.abc import Iterable
 
 from scrapy import signals
 from scrapy.exceptions import DontCloseSpider

From 4e1c5c3fe9c64eddf3705eafdee5c8a60983e751 Mon Sep 17 00:00:00 2001
From: luzihang123 <526154064@qq.com>
Date: Mon, 11 Apr 2022 15:09:56 +0800
Subject: [PATCH 03/43] =?UTF-8?q?add=EF=BC=9Adupefilter/filtered?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 scrapy_redis_sentinel/dupefilter.py | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/scrapy_redis_sentinel/dupefilter.py
b/scrapy_redis_sentinel/dupefilter.py index 4ff51e5..e583a46 100644 --- a/scrapy_redis_sentinel/dupefilter.py +++ b/scrapy_redis_sentinel/dupefilter.py @@ -159,6 +159,8 @@ def log(self, request, spider): self.logger.debug(msg, {"request": request}, extra={"spider": spider}) self.logdupes = False + spider.crawler.stats.inc_value("dupefilter/filtered", spider=spider) + class RedisBloomFilter(BaseDupeFilter): """Redis-based request duplicates filter. From b60c1e0431c997858ebbab887ad42aad3184211b Mon Sep 17 00:00:00 2001 From: luzihang123 <526154064@qq.com> Date: Mon, 11 Apr 2022 15:16:38 +0800 Subject: [PATCH 04/43] update README.md --- README.md | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/README.md b/README.md index 7dd5d8c..3d6fc39 100644 --- a/README.md +++ b/README.md @@ -5,6 +5,16 @@ ![GitHub last commit](https://img.shields.io/github/last-commit/crawlmap/scrapy-redis-sentinel) ![PyPI - Downloads](https://img.shields.io/pypi/dw/scrapy-redis-sentinel) +本项目基于原项目 [scrapy-redis-sentinel](https://github.com/crawlaio/scrapy-redis-sentinel) + +进行修改,修改内容如下: + +1. 添加了 Redis 哨兵,存在2个密码连接的支持 +2. 支持Python3.8+(collection.abc的引入方式) +3. 填补 `dupefilter.py` 丢失的 "dupefilter/filtered" 的stats,利于爬虫进度数据分析 + +----- + 本项目基于原项目 [scrapy-redis](https://github.com/rmax/scrapy-redis) 进行修改,修改内容如下: From ffda2f77a63ec44588fa5b4cbd1d43d7fe196a29 Mon Sep 17 00:00:00 2001 From: luzihang123 <526154064@qq.com> Date: Mon, 11 Apr 2022 16:53:14 +0800 Subject: [PATCH 05/43] update: scrapy_redis_sentinel --> mob_scrapy_redis_sentinel --- README.md | 14 +++++++------- .../__init__.py | 0 .../bloomfilter.py | 0 .../connection.py | 0 .../defaults.py | 4 ++-- .../dupefilter.py | 2 +- .../picklecompat.py | 0 .../pipelines.py | 0 .../queue.py | 0 .../scheduler.py | 0 .../spiders.py | 0 .../stats.py | 0 .../utils.py | 0 setup.py | 2 +- 14 files changed, 11 insertions(+), 11 deletions(-) rename {scrapy_redis_sentinel => mob_scrapy_redis_sentinel}/__init__.py (100%) rename {scrapy_redis_sentinel => mob_scrapy_redis_sentinel}/bloomfilter.py (100%) rename {scrapy_redis_sentinel => mob_scrapy_redis_sentinel}/connection.py (100%) rename {scrapy_redis_sentinel => mob_scrapy_redis_sentinel}/defaults.py (82%) rename {scrapy_redis_sentinel => mob_scrapy_redis_sentinel}/dupefilter.py (98%) rename {scrapy_redis_sentinel => mob_scrapy_redis_sentinel}/picklecompat.py (100%) rename {scrapy_redis_sentinel => mob_scrapy_redis_sentinel}/pipelines.py (100%) rename {scrapy_redis_sentinel => mob_scrapy_redis_sentinel}/queue.py (100%) rename {scrapy_redis_sentinel => mob_scrapy_redis_sentinel}/scheduler.py (100%) rename {scrapy_redis_sentinel => mob_scrapy_redis_sentinel}/spiders.py (100%) rename {scrapy_redis_sentinel => mob_scrapy_redis_sentinel}/stats.py (100%) rename {scrapy_redis_sentinel => mob_scrapy_redis_sentinel}/utils.py (100%) diff --git a/README.md b/README.md index 3d6fc39..663552f 100644 --- a/README.md +++ b/README.md @@ -92,22 +92,22 @@ REDIS_CLUSTER_PARAMS = { # 在 redis 中保持 scrapy-redis 用到的各个队列,从而允许暂停和暂停后恢复,也就是不清理 redis queues SCHEDULER_PERSIST = True # 调度队列 -SCHEDULER = "scrapy_redis_sentinel.scheduler.Scheduler" +SCHEDULER = "mob_scrapy_redis_sentinel.scheduler.Scheduler" # 基础去重 -DUPEFILTER_CLASS = "scrapy_redis_sentinel.dupefilter.RedisDupeFilter" +DUPEFILTER_CLASS = "mob_scrapy_redis_sentinel.dupefilter.RedisDupeFilter" # BloomFilter -# DUPEFILTER_CLASS = "scrapy_redis_sentinel.dupefilter.RedisBloomFilter" +# DUPEFILTER_CLASS = "mob_scrapy_redis_sentinel.dupefilter.RedisBloomFilter" # 启用基于 Redis 统计信息 -STATS_CLASS = 
"scrapy_redis_sentinel.stats.RedisStatsCollector" +STATS_CLASS = "mob_scrapy_redis_sentinel.stats.RedisStatsCollector" # 指定排序爬取地址时使用的队列 # 默认的 按优先级排序( Scrapy 默认),由 sorted set 实现的一种非 FIFO、LIFO 方式。 -# SCHEDULER_QUEUE_CLASS = 'scrapy_redis_sentinel.queue.SpiderPriorityQueue' +# SCHEDULER_QUEUE_CLASS = 'mob_scrapy_redis_sentinel.queue.SpiderPriorityQueue' # 可选的 按先进先出排序(FIFO) -# SCHEDULER_QUEUE_CLASS = 'scrapy_redis_sentinel.queue.SpiderStack' +# SCHEDULER_QUEUE_CLASS = 'mob_scrapy_redis_sentinel.queue.SpiderStack' # 可选的 按后进先出排序(LIFO) -# SCHEDULER_QUEUE_CLASS = 'scrapy_redis_sentinel.queue.SpiderStack' +# SCHEDULER_QUEUE_CLASS = 'mob_scrapy_redis_sentinel.queue.SpiderStack' ``` > 注:当使用集群时单机不生效 diff --git a/scrapy_redis_sentinel/__init__.py b/mob_scrapy_redis_sentinel/__init__.py similarity index 100% rename from scrapy_redis_sentinel/__init__.py rename to mob_scrapy_redis_sentinel/__init__.py diff --git a/scrapy_redis_sentinel/bloomfilter.py b/mob_scrapy_redis_sentinel/bloomfilter.py similarity index 100% rename from scrapy_redis_sentinel/bloomfilter.py rename to mob_scrapy_redis_sentinel/bloomfilter.py diff --git a/scrapy_redis_sentinel/connection.py b/mob_scrapy_redis_sentinel/connection.py similarity index 100% rename from scrapy_redis_sentinel/connection.py rename to mob_scrapy_redis_sentinel/connection.py diff --git a/scrapy_redis_sentinel/defaults.py b/mob_scrapy_redis_sentinel/defaults.py similarity index 82% rename from scrapy_redis_sentinel/defaults.py rename to mob_scrapy_redis_sentinel/defaults.py index ea7863e..59ae7c9 100644 --- a/scrapy_redis_sentinel/defaults.py +++ b/mob_scrapy_redis_sentinel/defaults.py @@ -24,9 +24,9 @@ } SCHEDULER_QUEUE_KEY = "%(spider)s:requests" -SCHEDULER_QUEUE_CLASS = "scrapy_redis_sentinel.queue.PriorityQueue" +SCHEDULER_QUEUE_CLASS = "mob_scrapy_redis_sentinel.queue.PriorityQueue" SCHEDULER_DUPEFILTER_KEY = "%(spider)s:dupefilter" -SCHEDULER_DUPEFILTER_CLASS = "scrapy_redis_sentinel.dupefilter.RedisDupeFilter" +SCHEDULER_DUPEFILTER_CLASS = "mob_scrapy_redis_sentinel.dupefilter.RedisDupeFilter" SCHEDULER_PERSIST = False diff --git a/scrapy_redis_sentinel/dupefilter.py b/mob_scrapy_redis_sentinel/dupefilter.py similarity index 98% rename from scrapy_redis_sentinel/dupefilter.py rename to mob_scrapy_redis_sentinel/dupefilter.py index e583a46..4c869b9 100644 --- a/scrapy_redis_sentinel/dupefilter.py +++ b/mob_scrapy_redis_sentinel/dupefilter.py @@ -197,7 +197,7 @@ def from_settings(cls, settings): """Returns an instance from given settings. This uses by default the key ``dupefilter:``. When using the - ``scrapy_redis_sentinel.scheduler.Scheduler`` class, this method is not used as + ``mob_scrapy_redis_sentinel.scheduler.Scheduler`` class, this method is not used as it needs to pass the spider name in the key. 
Parameters diff --git a/scrapy_redis_sentinel/picklecompat.py b/mob_scrapy_redis_sentinel/picklecompat.py similarity index 100% rename from scrapy_redis_sentinel/picklecompat.py rename to mob_scrapy_redis_sentinel/picklecompat.py diff --git a/scrapy_redis_sentinel/pipelines.py b/mob_scrapy_redis_sentinel/pipelines.py similarity index 100% rename from scrapy_redis_sentinel/pipelines.py rename to mob_scrapy_redis_sentinel/pipelines.py diff --git a/scrapy_redis_sentinel/queue.py b/mob_scrapy_redis_sentinel/queue.py similarity index 100% rename from scrapy_redis_sentinel/queue.py rename to mob_scrapy_redis_sentinel/queue.py diff --git a/scrapy_redis_sentinel/scheduler.py b/mob_scrapy_redis_sentinel/scheduler.py similarity index 100% rename from scrapy_redis_sentinel/scheduler.py rename to mob_scrapy_redis_sentinel/scheduler.py diff --git a/scrapy_redis_sentinel/spiders.py b/mob_scrapy_redis_sentinel/spiders.py similarity index 100% rename from scrapy_redis_sentinel/spiders.py rename to mob_scrapy_redis_sentinel/spiders.py diff --git a/scrapy_redis_sentinel/stats.py b/mob_scrapy_redis_sentinel/stats.py similarity index 100% rename from scrapy_redis_sentinel/stats.py rename to mob_scrapy_redis_sentinel/stats.py diff --git a/scrapy_redis_sentinel/utils.py b/mob_scrapy_redis_sentinel/utils.py similarity index 100% rename from scrapy_redis_sentinel/utils.py rename to mob_scrapy_redis_sentinel/utils.py diff --git a/setup.py b/setup.py index 4689138..78e876a 100644 --- a/setup.py +++ b/setup.py @@ -8,7 +8,7 @@ from setuptools import Command, find_packages, setup NAME = "scrapy-redis-sentinel" -FOLDER = "scrapy_redis_sentinel" +FOLDER = "mob_scrapy_redis_sentinel" DESCRIPTION = "Redis Cluster for Scrapy." EMAIL = "shitao0418@gmail.com" AUTHOR = "Shi Tao" From 024a8ddb5eaafbe331e4394151c0d8a9248a2321 Mon Sep 17 00:00:00 2001 From: luzihang123 <526154064@qq.com> Date: Wed, 13 Apr 2022 22:42:11 +0800 Subject: [PATCH 06/43] =?UTF-8?q?add:=20=E6=B7=BB=E5=8A=A0=20make=20reques?= =?UTF-8?q?t=20=E5=92=8C=20get=20request=20from=20next=5Frequest=EF=BC=8C2?= =?UTF-8?q?=E4=B8=AA=E8=87=AA=E5=8A=A8=E7=9A=84=EF=BC=8C=E5=8C=85=E5=90=AB?= =?UTF-8?q?trace=5Fid=E6=97=A5=E5=BF=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- mob_scrapy_redis_sentinel/__init__.py | 14 ++++++++++++++ mob_scrapy_redis_sentinel/scheduler.py | 25 +++++++++++++++---------- mob_scrapy_redis_sentinel/spiders.py | 14 ++++++++++++++ mob_scrapy_redis_sentinel/utils.py | 17 +++++++++++++++++ 4 files changed, 60 insertions(+), 10 deletions(-) diff --git a/mob_scrapy_redis_sentinel/__init__.py b/mob_scrapy_redis_sentinel/__init__.py index cf1d558..5d88f3c 100644 --- a/mob_scrapy_redis_sentinel/__init__.py +++ b/mob_scrapy_redis_sentinel/__init__.py @@ -4,3 +4,17 @@ __author__ = "Shi Tao" __email__ = "shitao0418@gmail.com" __version__ = "0.7.2" + +from mob_tools.mobLog import MobLoguru +import os + +# 环境常量 +DEV = "dev" +PROD = "prod" + +ENV = os.getenv("ENV", DEV) + +if ENV == "prod": + mob_log = MobLoguru(deep=2, log_file='/data/logs/crawler/crawler.log.es') +else: + mob_log = MobLoguru() diff --git a/mob_scrapy_redis_sentinel/scheduler.py b/mob_scrapy_redis_sentinel/scheduler.py index c351306..587fb53 100644 --- a/mob_scrapy_redis_sentinel/scheduler.py +++ b/mob_scrapy_redis_sentinel/scheduler.py @@ -6,6 +6,9 @@ from . import connection, defaults +from mob_scrapy_redis_sentinel import mob_log +from mob_scrapy_redis_sentinel.utils import get_track_id + # TODO: add SCRAPY_JOB support. 
class Scheduler(object): @@ -33,16 +36,16 @@ class Scheduler(object): """ def __init__( - self, - server, - persist=False, - flush_on_start=False, - queue_key=defaults.SCHEDULER_QUEUE_KEY, - queue_cls=defaults.SCHEDULER_QUEUE_CLASS, - dupefilter_key=defaults.SCHEDULER_DUPEFILTER_KEY, - dupefilter_cls=defaults.SCHEDULER_DUPEFILTER_CLASS, - idle_before_close=0, - serializer=None + self, + server, + persist=False, + flush_on_start=False, + queue_key=defaults.SCHEDULER_QUEUE_KEY, + queue_cls=defaults.SCHEDULER_QUEUE_CLASS, + dupefilter_key=defaults.SCHEDULER_DUPEFILTER_KEY, + dupefilter_cls=defaults.SCHEDULER_DUPEFILTER_CLASS, + idle_before_close=0, + serializer=None ): """Initialize scheduler. @@ -174,7 +177,9 @@ def enqueue_request(self, request): def next_request(self): block_pop_timeout = self.idle_before_close request = self.queue.pop(block_pop_timeout) + if request and self.stats: + mob_log.info(f"get request from next_request: spider name: {self.spider.name}, {request.__dict__}").track_id(get_track_id(request)).commit() self.stats.inc_value("scheduler/dequeued/redis", spider=self.spider) return request diff --git a/mob_scrapy_redis_sentinel/spiders.py b/mob_scrapy_redis_sentinel/spiders.py index 02a4368..5e6864f 100644 --- a/mob_scrapy_redis_sentinel/spiders.py +++ b/mob_scrapy_redis_sentinel/spiders.py @@ -13,6 +13,10 @@ from . import connection, defaults from .utils import bytes_to_str +from mob_scrapy_redis_sentinel import mob_log +import json +from mob_scrapy_redis_sentinel.utils import make_md5 + class RedisMixin(object): """Mixin class to implement reading urls from a redis queue.""" @@ -114,6 +118,16 @@ def next_requests(self): found = 0 datas = self.fetch_data(self.redis_key, self.redis_batch_size) for data in datas: + + # 日志加入track_id + try: + queue_data = json.loads(data) + except: + queue_data = {} + keyword = queue_data.get("keyword") + track_id = make_md5(keyword) + mob_log.info(f"spider name: {self.name}, make request from data, queue_data: {queue_data}").track_id(track_id).commit() + reqs = self.make_request_from_data(data) if isinstance(reqs, Iterable): for req in reqs: diff --git a/mob_scrapy_redis_sentinel/utils.py b/mob_scrapy_redis_sentinel/utils.py index e3b830f..1b1de3a 100644 --- a/mob_scrapy_redis_sentinel/utils.py +++ b/mob_scrapy_redis_sentinel/utils.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- import six +from hashlib import md5 def bytes_to_str(s, encoding="utf-8"): @@ -7,3 +8,19 @@ def bytes_to_str(s, encoding="utf-8"): if six.PY3 and isinstance(s, bytes): return s.decode(encoding) return s + + +def make_md5(text): + """ + make text to md5 + """ + return md5(str(text).encode('utf-8')).hexdigest() + + +def get_track_id(request): + track_id = '' + try: + track_id = request.meta.get("track_id") + except Exception: + pass + return track_id From a76d01efd7d9d87ae90c7ee71325619b812abba9 Mon Sep 17 00:00:00 2001 From: luzihang123 <526154064@qq.com> Date: Wed, 13 Apr 2022 22:45:15 +0800 Subject: [PATCH 07/43] fix --- mob_scrapy_redis_sentinel/spiders.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/mob_scrapy_redis_sentinel/spiders.py b/mob_scrapy_redis_sentinel/spiders.py index 5e6864f..1efe536 100644 --- a/mob_scrapy_redis_sentinel/spiders.py +++ b/mob_scrapy_redis_sentinel/spiders.py @@ -124,8 +124,7 @@ def next_requests(self): queue_data = json.loads(data) except: queue_data = {} - keyword = queue_data.get("keyword") - track_id = make_md5(keyword) + track_id = make_md5(queue_data) mob_log.info(f"spider name: {self.name}, make request from 
data, queue_data: {queue_data}").track_id(track_id).commit() reqs = self.make_request_from_data(data) From 6f784a4f52c6085baf2907ca78cecaf47039059d Mon Sep 17 00:00:00 2001 From: luzihang123 <526154064@qq.com> Date: Thu, 14 Apr 2022 17:38:06 +0800 Subject: [PATCH 08/43] update README.md --- .gitignore | 2 ++ README.md | 1 + 2 files changed, 3 insertions(+) diff --git a/.gitignore b/.gitignore index 2e33e3c..e41461d 100644 --- a/.gitignore +++ b/.gitignore @@ -3,6 +3,8 @@ __pycache__/ *.py[cod] *$py.class +.idea + # C extensions *.so diff --git a/README.md b/README.md index 663552f..560404f 100644 --- a/README.md +++ b/README.md @@ -12,6 +12,7 @@ 1. 添加了 Redis 哨兵,存在2个密码连接的支持 2. 支持Python3.8+(collection.abc的引入方式) 3. 填补 `dupefilter.py` 丢失的 "dupefilter/filtered" 的stats,利于爬虫进度数据分析 +4. 自动添加 track_id: "make request from data" 和 "get request from next_request " ----- From 76263da2bf8ca5d6fe70cd96d8dd6f0a449bd3bf Mon Sep 17 00:00:00 2001 From: luzihang123 <526154064@qq.com> Date: Thu, 2 Jun 2022 16:29:12 +0800 Subject: [PATCH 09/43] =?UTF-8?q?feat:=20add=20LATEST=5FQUEUE=5FKEY,=20?= =?UTF-8?q?=E6=9C=80=E8=BF=91=E4=B8=80=E6=AC=A1=E9=98=9F=E5=88=97=E5=A4=87?= =?UTF-8?q?=E4=BB=BD=EF=BC=88=E4=BB=BB=E5=8A=A1=E9=98=B2=E4=B8=A2=EF=BC=89?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- mob_scrapy_redis_sentinel/__init__.py | 3 ++ mob_scrapy_redis_sentinel/defaults.py | 7 ++++ mob_scrapy_redis_sentinel/spiders.py | 46 ++++++++++++++++++++++++--- 3 files changed, 51 insertions(+), 5 deletions(-) diff --git a/mob_scrapy_redis_sentinel/__init__.py b/mob_scrapy_redis_sentinel/__init__.py index 5d88f3c..b9faafe 100644 --- a/mob_scrapy_redis_sentinel/__init__.py +++ b/mob_scrapy_redis_sentinel/__init__.py @@ -6,6 +6,7 @@ __version__ = "0.7.2" from mob_tools.mobLog import MobLoguru +from mob_tools.inner_ip import get_inner_ip import os # 环境常量 @@ -16,5 +17,7 @@ if ENV == "prod": mob_log = MobLoguru(deep=2, log_file='/data/logs/crawler/crawler.log.es') + inner_ip = os.getenv("LOCAL_IP", get_inner_ip()) else: mob_log = MobLoguru() + inner_ip = "127.0.0.1" diff --git a/mob_scrapy_redis_sentinel/defaults.py b/mob_scrapy_redis_sentinel/defaults.py index 59ae7c9..ff8486e 100644 --- a/mob_scrapy_redis_sentinel/defaults.py +++ b/mob_scrapy_redis_sentinel/defaults.py @@ -33,3 +33,10 @@ START_URLS_KEY = "%(name)s:start_urls" START_URLS_AS_SET = False START_URLS_AS_ZSET = False + +# 最近一次队列备份(任务防丢) +""" +spider opened,读取 LATEST_QUEUE_KEY。获取上一次,stop 之前,最后一次的queue data; +每次make request from data,备份一份数据,到 LATEST_QUEUE_KEY。同时删除上一批的备份。(多个worker,删除同一个 LATEST_QUEUE_KEY,如何做到不互相干扰?) 
+""" +LATEST_QUEUE_KEY = "%(name)s:latest_queue" diff --git a/mob_scrapy_redis_sentinel/spiders.py b/mob_scrapy_redis_sentinel/spiders.py index 1efe536..bc75c4c 100644 --- a/mob_scrapy_redis_sentinel/spiders.py +++ b/mob_scrapy_redis_sentinel/spiders.py @@ -16,12 +16,14 @@ from mob_scrapy_redis_sentinel import mob_log import json from mob_scrapy_redis_sentinel.utils import make_md5 +from mob_scrapy_redis_sentinel import inner_ip class RedisMixin(object): """Mixin class to implement reading urls from a redis queue.""" redis_key = None + latest_queue = None redis_batch_size = None redis_encoding = None @@ -62,6 +64,10 @@ def setup_redis(self, crawler=None): if not self.redis_key.strip(): raise ValueError("redis_key must not be empty") + if self.latest_queue is None: + self.latest_queue = settings.get("LATEST_QUEUE_KEY", defaults.LATEST_QUEUE_KEY) + self.latest_queue = self.latest_queue % {"name": self.name} + if self.redis_batch_size is None: # TODO: Deprecate this setting (REDIS_START_URLS_BATCH_SIZE). self.redis_batch_size = settings.getint( @@ -98,6 +104,9 @@ def setup_redis(self, crawler=None): # that's when we will schedule new requests from redis queue crawler.signals.connect(self.spider_idle, signal=signals.spider_idle) + # 爬虫启动时,会先从备份队列,取出任务 + crawler.signals.connect(self.spider_opened_latest_pop, signal=signals.spider_opened) + def pop_list_queue(self, redis_key, batch_size): with self.server.pipeline() as pipe: pipe.lrange(redis_key, 0, batch_size - 1) @@ -112,13 +121,34 @@ def pop_priority_queue(self, redis_key, batch_size): datas, _ = pipe.execute() return datas - def next_requests(self): - """Returns a request to be scheduled or none.""" - # XXX: Do we need to use a timeout here? + def latest_queue_mark(self, datas): + """备份队列 list or hash""" + # 1、删除上一次(多个worker,如何保证删除的一致性) + # self.server.delete(self.latest_queue) + mob_log.info(f"spider name: {self.name}, latest_queue_mark, inner_ip: {inner_ip}").track_id("").commit() + self.server.hdel(self.latest_queue, inner_ip) + # 2、 存入 + # with self.server.pipeline() as pipe: + # for data in datas: + # pipe.rpush(self.latest_queue, data) + # pipe.execute() + self.server.hset(self.latest_queue, inner_ip, datas) + + def spider_opened_latest_pop(self): + """绑定spider open信号; 取出 stop spider前,最后1次datas""" + # hash + mob_log.info(f"spider name: {self.name}, spider_opened_latest_pop, inner_ip: {inner_ip}").track_id("").commit() + if self.server.hexists(self.latest_queue, inner_ip): + datas = self.server.hget(self.latest_queue, inner_ip) + self.server.hdel(self.latest_queue, inner_ip) + self.req_yield(datas) + # if self.count_size(self.latest_queue) == 0: + # return + # datas = self.fetch_data(self.latest_queue, self.redis_batch_size) + + def req_yield(self, datas): found = 0 - datas = self.fetch_data(self.redis_key, self.redis_batch_size) for data in datas: - # 日志加入track_id try: queue_data = json.loads(data) @@ -143,6 +173,12 @@ def next_requests(self): if found: self.logger.debug("Read %s requests from '%s'", found, self.redis_key) + def next_requests(self): + """Returns a request to be scheduled or none.""" + # XXX: Do we need to use a timeout here? + datas = self.fetch_data(self.redis_key, self.redis_batch_size) + self.req_yield(datas) + def make_request_from_data(self, data): """Returns a Request instance from data coming from Redis. 
From 7d6d3bbd0480b7028e947873396c42ee76821897 Mon Sep 17 00:00:00 2001 From: luzihang123 <526154064@qq.com> Date: Thu, 2 Jun 2022 16:41:04 +0800 Subject: [PATCH 10/43] update: setup.py --- mob_scrapy_redis_sentinel/__init__.py | 6 +++--- requirements.txt | 3 ++- setup.py | 6 +++--- 3 files changed, 8 insertions(+), 7 deletions(-) diff --git a/mob_scrapy_redis_sentinel/__init__.py b/mob_scrapy_redis_sentinel/__init__.py index b9faafe..843d740 100644 --- a/mob_scrapy_redis_sentinel/__init__.py +++ b/mob_scrapy_redis_sentinel/__init__.py @@ -1,9 +1,9 @@ # -*- coding: utf-8 -*- __original_author__ = "Rolando Espinoza" -__author__ = "Shi Tao" -__email__ = "shitao0418@gmail.com" -__version__ = "0.7.2" +__author__ = "luzihang" +__email__ = "luzihang@mob.com" +__version__ = "0.5" from mob_tools.mobLog import MobLoguru from mob_tools.inner_ip import get_inner_ip diff --git a/requirements.txt b/requirements.txt index ab6874f..dec3fd2 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,4 @@ redis==3.5.3 redis-py-cluster==2.1.3 -Scrapy \ No newline at end of file +Scrapy +mob-tools==0.0.16 \ No newline at end of file diff --git a/setup.py b/setup.py index 78e876a..3e3797e 100644 --- a/setup.py +++ b/setup.py @@ -7,11 +7,11 @@ from setuptools import Command, find_packages, setup -NAME = "scrapy-redis-sentinel" +NAME = "mob-scrapy-redis-sentinel" FOLDER = "mob_scrapy_redis_sentinel" DESCRIPTION = "Redis Cluster for Scrapy." -EMAIL = "shitao0418@gmail.com" -AUTHOR = "Shi Tao" +EMAIL = "luzhang@mob.com" +AUTHOR = "luzihang" REQUIRES_PYTHON = ">=3.6.0" VERSION = None From 42dd20ec0e318a9d58543590cd613875e73cfe05 Mon Sep 17 00:00:00 2001 From: luzihang123 <526154064@qq.com> Date: Thu, 2 Jun 2022 16:54:32 +0800 Subject: [PATCH 11/43] fix: generator func --- mob_scrapy_redis_sentinel/spiders.py | 43 +++++++++++++++++++++------- 1 file changed, 33 insertions(+), 10 deletions(-) diff --git a/mob_scrapy_redis_sentinel/spiders.py b/mob_scrapy_redis_sentinel/spiders.py index bc75c4c..ba02c5d 100644 --- a/mob_scrapy_redis_sentinel/spiders.py +++ b/mob_scrapy_redis_sentinel/spiders.py @@ -100,13 +100,13 @@ def setup_redis(self, crawler=None): self.fetch_data = self.pop_list_queue self.count_size = self.server.llen + # 爬虫启动时,会先从备份队列,取出任务 + crawler.signals.connect(self.spider_opened_latest_pop, signal=signals.spider_opened) + # The idle signal is called when the spider has no requests left, # that's when we will schedule new requests from redis queue crawler.signals.connect(self.spider_idle, signal=signals.spider_idle) - # 爬虫启动时,会先从备份队列,取出任务 - crawler.signals.connect(self.spider_opened_latest_pop, signal=signals.spider_opened) - def pop_list_queue(self, redis_key, batch_size): with self.server.pipeline() as pipe: pipe.lrange(redis_key, 0, batch_size - 1) @@ -141,13 +141,40 @@ def spider_opened_latest_pop(self): if self.server.hexists(self.latest_queue, inner_ip): datas = self.server.hget(self.latest_queue, inner_ip) self.server.hdel(self.latest_queue, inner_ip) - self.req_yield(datas) + found = 0 + for data in datas: + # 日志加入track_id + try: + queue_data = json.loads(data) + except: + queue_data = {} + track_id = make_md5(queue_data) + mob_log.info(f"spider name: {self.name}, make request from data, queue_data: {queue_data}").track_id(track_id).commit() + + reqs = self.make_request_from_data(data) + if isinstance(reqs, Iterable): + for req in reqs: + yield req + # XXX: should be here? 
+ found += 1 + self.logger.info(f"start req url:{req.url}") + elif reqs: + yield reqs + found += 1 + else: + self.logger.debug("Request not made from data: %r", data) + + if found: + self.logger.debug("Read %s requests from '%s'", found, self.redis_key) # if self.count_size(self.latest_queue) == 0: # return # datas = self.fetch_data(self.latest_queue, self.redis_batch_size) - def req_yield(self, datas): + def next_requests(self): + """Returns a request to be scheduled or none.""" + # XXX: Do we need to use a timeout here? found = 0 + datas = self.fetch_data(self.redis_key, self.redis_batch_size) for data in datas: # 日志加入track_id try: @@ -173,11 +200,7 @@ def req_yield(self, datas): if found: self.logger.debug("Read %s requests from '%s'", found, self.redis_key) - def next_requests(self): - """Returns a request to be scheduled or none.""" - # XXX: Do we need to use a timeout here? - datas = self.fetch_data(self.redis_key, self.redis_batch_size) - self.req_yield(datas) + self.latest_queue_mark(datas) def make_request_from_data(self, data): """Returns a Request instance from data coming from Redis. From 68d006df11e9c5e6069ee6919d2ab8d0f7499ee5 Mon Sep 17 00:00:00 2001 From: luzihang123 <526154064@qq.com> Date: Thu, 2 Jun 2022 16:59:18 +0800 Subject: [PATCH 12/43] update --- mob_scrapy_redis_sentinel/spiders.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/mob_scrapy_redis_sentinel/spiders.py b/mob_scrapy_redis_sentinel/spiders.py index ba02c5d..c7ab591 100644 --- a/mob_scrapy_redis_sentinel/spiders.py +++ b/mob_scrapy_redis_sentinel/spiders.py @@ -175,6 +175,7 @@ def next_requests(self): # XXX: Do we need to use a timeout here? found = 0 datas = self.fetch_data(self.redis_key, self.redis_batch_size) + self.latest_queue_mark(datas) for data in datas: # 日志加入track_id try: @@ -200,8 +201,6 @@ def next_requests(self): if found: self.logger.debug("Read %s requests from '%s'", found, self.redis_key) - self.latest_queue_mark(datas) - def make_request_from_data(self, data): """Returns a Request instance from data coming from Redis. 
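The track_id logging added in PATCH 06 and refined in the patches above amounts to hashing the queue payload once and carrying that hash in request.meta, so the "make request from data" line (written when a queue entry becomes a Request) and the "get request from next_request" line (written when the scheduler pops it) share one identifier. Below is a rough illustration of that correlation, using the standard logging module in place of mob_log and assuming a JSON payload with a "url" field; the function names here are illustrative, not the project's exact API.

```python
# Rough illustration of track_id correlation between enqueue-side and scheduler-side logs.
# Uses stdlib logging instead of mob_log; assumes the queue payload is JSON with a "url" field.
import json
import logging
from hashlib import md5

from scrapy import Request

logger = logging.getLogger(__name__)


def make_md5(text):
    """Stable hex id derived from the queue payload."""
    return md5(str(text).encode("utf-8")).hexdigest()


def make_request_from_data(data):
    """Turn one raw queue entry into a Request, attaching track_id for log correlation."""
    queue_data = json.loads(data)
    track_id = make_md5(queue_data)
    logger.info("make request from data, track_id=%s, queue_data=%r", track_id, queue_data)
    return Request(url=queue_data["url"], meta={"track_id": track_id})


def log_dequeued(request):
    """Mirror log for the scheduler side, i.e. when next_request() pops the request."""
    logger.info("get request from next_request, track_id=%s, url=%s",
                request.meta.get("track_id", ""), request.url)
```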
From d688adc66c8dca4b7392bd186d0ad77a70dff489 Mon Sep 17 00:00:00 2001 From: luzihang123 <526154064@qq.com> Date: Thu, 2 Jun 2022 17:17:31 +0800 Subject: [PATCH 13/43] update --- mob_scrapy_redis_sentinel/spiders.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/mob_scrapy_redis_sentinel/spiders.py b/mob_scrapy_redis_sentinel/spiders.py index c7ab591..274d0a7 100644 --- a/mob_scrapy_redis_sentinel/spiders.py +++ b/mob_scrapy_redis_sentinel/spiders.py @@ -136,8 +136,12 @@ def latest_queue_mark(self, datas): def spider_opened_latest_pop(self): """绑定spider open信号; 取出 stop spider前,最后1次datas""" - # hash mob_log.info(f"spider name: {self.name}, spider_opened_latest_pop, inner_ip: {inner_ip}").track_id("").commit() + for req in self._spider_opened_latest_pop(): + self.crawler.engine.crawl(req, spider=self) + + def _spider_opened_latest_pop(self): + # hash if self.server.hexists(self.latest_queue, inner_ip): datas = self.server.hget(self.latest_queue, inner_ip) self.server.hdel(self.latest_queue, inner_ip) From e694c6dcf009d017412e1da5af33cbc29d5e12b2 Mon Sep 17 00:00:00 2001 From: luzihang123 <526154064@qq.com> Date: Thu, 2 Jun 2022 17:23:12 +0800 Subject: [PATCH 14/43] update --- mob_scrapy_redis_sentinel/spiders.py | 56 ++++++++++++++-------------- 1 file changed, 28 insertions(+), 28 deletions(-) diff --git a/mob_scrapy_redis_sentinel/spiders.py b/mob_scrapy_redis_sentinel/spiders.py index 274d0a7..8f4adc8 100644 --- a/mob_scrapy_redis_sentinel/spiders.py +++ b/mob_scrapy_redis_sentinel/spiders.py @@ -137,39 +137,39 @@ def latest_queue_mark(self, datas): def spider_opened_latest_pop(self): """绑定spider open信号; 取出 stop spider前,最后1次datas""" mob_log.info(f"spider name: {self.name}, spider_opened_latest_pop, inner_ip: {inner_ip}").track_id("").commit() - for req in self._spider_opened_latest_pop(): - self.crawler.engine.crawl(req, spider=self) + if self.server.hexists(self.latest_queue, inner_ip): + for req in self._spider_opened_latest_pop(): + self.crawler.engine.crawl(req, spider=self) def _spider_opened_latest_pop(self): # hash - if self.server.hexists(self.latest_queue, inner_ip): - datas = self.server.hget(self.latest_queue, inner_ip) - self.server.hdel(self.latest_queue, inner_ip) - found = 0 - for data in datas: - # 日志加入track_id - try: - queue_data = json.loads(data) - except: - queue_data = {} - track_id = make_md5(queue_data) - mob_log.info(f"spider name: {self.name}, make request from data, queue_data: {queue_data}").track_id(track_id).commit() - - reqs = self.make_request_from_data(data) - if isinstance(reqs, Iterable): - for req in reqs: - yield req - # XXX: should be here? - found += 1 - self.logger.info(f"start req url:{req.url}") - elif reqs: - yield reqs + datas = self.server.hget(self.latest_queue, inner_ip) + self.server.hdel(self.latest_queue, inner_ip) + found = 0 + for data in datas: + # 日志加入track_id + try: + queue_data = json.loads(data) + except: + queue_data = {} + track_id = make_md5(queue_data) + mob_log.info(f"spider name: {self.name}, make request from data from latest queue, queue_data: {queue_data}").track_id(track_id).commit() + + reqs = self.make_request_from_data(data) + if isinstance(reqs, Iterable): + for req in reqs: + yield req + # XXX: should be here? 
found += 1 - else: - self.logger.debug("Request not made from data: %r", data) + self.logger.info(f"start req url:{req.url}") + elif reqs: + yield reqs + found += 1 + else: + self.logger.debug("Request not made from data: %r", data) - if found: - self.logger.debug("Read %s requests from '%s'", found, self.redis_key) + if found: + self.logger.debug("Read %s requests from '%s'", found, self.redis_key) # if self.count_size(self.latest_queue) == 0: # return # datas = self.fetch_data(self.latest_queue, self.redis_batch_size) From 352a10a9733e4a8369c55c1688a528ec20363040 Mon Sep 17 00:00:00 2001 From: luzihang123 <526154064@qq.com> Date: Thu, 2 Jun 2022 17:34:24 +0800 Subject: [PATCH 15/43] update --- mob_scrapy_redis_sentinel/spiders.py | 31 ++++------------------------ 1 file changed, 4 insertions(+), 27 deletions(-) diff --git a/mob_scrapy_redis_sentinel/spiders.py b/mob_scrapy_redis_sentinel/spiders.py index 8f4adc8..1242992 100644 --- a/mob_scrapy_redis_sentinel/spiders.py +++ b/mob_scrapy_redis_sentinel/spiders.py @@ -132,44 +132,21 @@ def latest_queue_mark(self, datas): # for data in datas: # pipe.rpush(self.latest_queue, data) # pipe.execute() - self.server.hset(self.latest_queue, inner_ip, datas) + self.server.hset(self.latest_queue, inner_ip, json.dumps(json.loads(datas), ensure_ascii=False)) def spider_opened_latest_pop(self): """绑定spider open信号; 取出 stop spider前,最后1次datas""" mob_log.info(f"spider name: {self.name}, spider_opened_latest_pop, inner_ip: {inner_ip}").track_id("").commit() if self.server.hexists(self.latest_queue, inner_ip): - for req in self._spider_opened_latest_pop(): - self.crawler.engine.crawl(req, spider=self) + self._spider_opened_latest_pop() def _spider_opened_latest_pop(self): # hash datas = self.server.hget(self.latest_queue, inner_ip) self.server.hdel(self.latest_queue, inner_ip) - found = 0 for data in datas: - # 日志加入track_id - try: - queue_data = json.loads(data) - except: - queue_data = {} - track_id = make_md5(queue_data) - mob_log.info(f"spider name: {self.name}, make request from data from latest queue, queue_data: {queue_data}").track_id(track_id).commit() - - reqs = self.make_request_from_data(data) - if isinstance(reqs, Iterable): - for req in reqs: - yield req - # XXX: should be here? 
- found += 1 - self.logger.info(f"start req url:{req.url}") - elif reqs: - yield reqs - found += 1 - else: - self.logger.debug("Request not made from data: %r", data) - - if found: - self.logger.debug("Read %s requests from '%s'", found, self.redis_key) + mob_log.info(f"spider name: {self.name}, latest task back to queue, inner_ip: {inner_ip}").track_id("").commit() + self.server.lpush(self.redis_key, json.dumps(json.loads(data), ensure_ascii=False)) # if self.count_size(self.latest_queue) == 0: # return # datas = self.fetch_data(self.latest_queue, self.redis_batch_size) From 97a5b917d5844d6a8c1ec74f83933583df879025 Mon Sep 17 00:00:00 2001 From: luzihang123 <526154064@qq.com> Date: Thu, 2 Jun 2022 17:42:34 +0800 Subject: [PATCH 16/43] update --- mob_scrapy_redis_sentinel/spiders.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/mob_scrapy_redis_sentinel/spiders.py b/mob_scrapy_redis_sentinel/spiders.py index 1242992..866ef8c 100644 --- a/mob_scrapy_redis_sentinel/spiders.py +++ b/mob_scrapy_redis_sentinel/spiders.py @@ -132,7 +132,10 @@ def latest_queue_mark(self, datas): # for data in datas: # pipe.rpush(self.latest_queue, data) # pipe.execute() - self.server.hset(self.latest_queue, inner_ip, json.dumps(json.loads(datas), ensure_ascii=False)) + latest_datas = [] + for data in datas: + latest_datas.append(json.loads(data)) + self.server.hset(self.latest_queue, inner_ip, latest_datas) def spider_opened_latest_pop(self): """绑定spider open信号; 取出 stop spider前,最后1次datas""" @@ -142,11 +145,11 @@ def spider_opened_latest_pop(self): def _spider_opened_latest_pop(self): # hash - datas = self.server.hget(self.latest_queue, inner_ip) + latest_datas = self.server.hget(self.latest_queue, inner_ip) self.server.hdel(self.latest_queue, inner_ip) - for data in datas: + for data in latest_datas: mob_log.info(f"spider name: {self.name}, latest task back to queue, inner_ip: {inner_ip}").track_id("").commit() - self.server.lpush(self.redis_key, json.dumps(json.loads(data), ensure_ascii=False)) + self.server.lpush(self.redis_key, json.dumps(data, ensure_ascii=False)) # if self.count_size(self.latest_queue) == 0: # return # datas = self.fetch_data(self.latest_queue, self.redis_batch_size) From 35878fa6b444f16000e44220e9b5dbbcb49a3f91 Mon Sep 17 00:00:00 2001 From: luzihang123 <526154064@qq.com> Date: Thu, 2 Jun 2022 17:57:55 +0800 Subject: [PATCH 17/43] update --- mob_scrapy_redis_sentinel/spiders.py | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/mob_scrapy_redis_sentinel/spiders.py b/mob_scrapy_redis_sentinel/spiders.py index 866ef8c..daa4b21 100644 --- a/mob_scrapy_redis_sentinel/spiders.py +++ b/mob_scrapy_redis_sentinel/spiders.py @@ -141,15 +141,12 @@ def spider_opened_latest_pop(self): """绑定spider open信号; 取出 stop spider前,最后1次datas""" mob_log.info(f"spider name: {self.name}, spider_opened_latest_pop, inner_ip: {inner_ip}").track_id("").commit() if self.server.hexists(self.latest_queue, inner_ip): - self._spider_opened_latest_pop() + latest_datas = self.server.hget(self.latest_queue, inner_ip) + self.server.hdel(self.latest_queue, inner_ip) + for data in latest_datas: + mob_log.info(f"spider name: {self.name}, latest task back to queue, inner_ip: {inner_ip}, data: {data}, latest_datas: {latest_datas}").track_id("").commit() + self.server.lpush(self.redis_key, json.dumps(data, ensure_ascii=False)) - def _spider_opened_latest_pop(self): - # hash - latest_datas = self.server.hget(self.latest_queue, inner_ip) - self.server.hdel(self.latest_queue, 
inner_ip) - for data in latest_datas: - mob_log.info(f"spider name: {self.name}, latest task back to queue, inner_ip: {inner_ip}").track_id("").commit() - self.server.lpush(self.redis_key, json.dumps(data, ensure_ascii=False)) # if self.count_size(self.latest_queue) == 0: # return # datas = self.fetch_data(self.latest_queue, self.redis_batch_size) From 6d3345c74b3ed525446996e0faa2e2d7d872a05f Mon Sep 17 00:00:00 2001 From: luzihang123 <526154064@qq.com> Date: Thu, 2 Jun 2022 18:02:37 +0800 Subject: [PATCH 18/43] update --- mob_scrapy_redis_sentinel/spiders.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mob_scrapy_redis_sentinel/spiders.py b/mob_scrapy_redis_sentinel/spiders.py index daa4b21..07b5b9c 100644 --- a/mob_scrapy_redis_sentinel/spiders.py +++ b/mob_scrapy_redis_sentinel/spiders.py @@ -143,7 +143,7 @@ def spider_opened_latest_pop(self): if self.server.hexists(self.latest_queue, inner_ip): latest_datas = self.server.hget(self.latest_queue, inner_ip) self.server.hdel(self.latest_queue, inner_ip) - for data in latest_datas: + for data in eval(bytes_to_str(latest_datas)): mob_log.info(f"spider name: {self.name}, latest task back to queue, inner_ip: {inner_ip}, data: {data}, latest_datas: {latest_datas}").track_id("").commit() self.server.lpush(self.redis_key, json.dumps(data, ensure_ascii=False)) From 825b2aa8bfa137a77b0cdd5637ff55db89a22f7f Mon Sep 17 00:00:00 2001 From: luzihang123 <526154064@qq.com> Date: Thu, 2 Jun 2022 18:14:29 +0800 Subject: [PATCH 19/43] update --- mob_scrapy_redis_sentinel/spiders.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/mob_scrapy_redis_sentinel/spiders.py b/mob_scrapy_redis_sentinel/spiders.py index 07b5b9c..7206b1a 100644 --- a/mob_scrapy_redis_sentinel/spiders.py +++ b/mob_scrapy_redis_sentinel/spiders.py @@ -125,7 +125,6 @@ def latest_queue_mark(self, datas): """备份队列 list or hash""" # 1、删除上一次(多个worker,如何保证删除的一致性) # self.server.delete(self.latest_queue) - mob_log.info(f"spider name: {self.name}, latest_queue_mark, inner_ip: {inner_ip}").track_id("").commit() self.server.hdel(self.latest_queue, inner_ip) # 2、 存入 # with self.server.pipeline() as pipe: @@ -135,6 +134,7 @@ def latest_queue_mark(self, datas): latest_datas = [] for data in datas: latest_datas.append(json.loads(data)) + mob_log.info(f"spider name: {self.name}, latest_queue_mark, inner_ip: {inner_ip}, latest_datas: {latest_datas}").track_id("").commit() self.server.hset(self.latest_queue, inner_ip, latest_datas) def spider_opened_latest_pop(self): @@ -144,7 +144,7 @@ def spider_opened_latest_pop(self): latest_datas = self.server.hget(self.latest_queue, inner_ip) self.server.hdel(self.latest_queue, inner_ip) for data in eval(bytes_to_str(latest_datas)): - mob_log.info(f"spider name: {self.name}, latest task back to queue, inner_ip: {inner_ip}, data: {data}, latest_datas: {latest_datas}").track_id("").commit() + mob_log.info(f"spider name: {self.name}, latest task back to queue, inner_ip: {inner_ip}, data: {data}, latest_datas: {bytes_to_str(latest_datas)}").track_id("").commit() self.server.lpush(self.redis_key, json.dumps(data, ensure_ascii=False)) # if self.count_size(self.latest_queue) == 0: From c045059dd40dcebd57735836e73a04f3be02c652 Mon Sep 17 00:00:00 2001 From: luzihang123 <526154064@qq.com> Date: Thu, 2 Jun 2022 18:36:00 +0800 Subject: [PATCH 20/43] update --- mob_scrapy_redis_sentinel/spiders.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/mob_scrapy_redis_sentinel/spiders.py 
b/mob_scrapy_redis_sentinel/spiders.py index 7206b1a..656c9f9 100644 --- a/mob_scrapy_redis_sentinel/spiders.py +++ b/mob_scrapy_redis_sentinel/spiders.py @@ -142,9 +142,10 @@ def spider_opened_latest_pop(self): mob_log.info(f"spider name: {self.name}, spider_opened_latest_pop, inner_ip: {inner_ip}").track_id("").commit() if self.server.hexists(self.latest_queue, inner_ip): latest_datas = self.server.hget(self.latest_queue, inner_ip) + mob_log.info(f"spider name: {self.name}, latest task back to queue, inner_ip: {inner_ip}, latest_datas: {bytes_to_str(latest_datas)}").track_id("").commit() self.server.hdel(self.latest_queue, inner_ip) for data in eval(bytes_to_str(latest_datas)): - mob_log.info(f"spider name: {self.name}, latest task back to queue, inner_ip: {inner_ip}, data: {data}, latest_datas: {bytes_to_str(latest_datas)}").track_id("").commit() + mob_log.info(f"spider name: {self.name}, latest task back to queue, inner_ip: {inner_ip}, data: {data}").track_id("").commit() self.server.lpush(self.redis_key, json.dumps(data, ensure_ascii=False)) # if self.count_size(self.latest_queue) == 0: From fa7f889f5fa44e105314e7ead50010f1acf9dc13 Mon Sep 17 00:00:00 2001 From: zhangyongqiang Date: Mon, 6 Jun 2022 10:22:03 +0800 Subject: [PATCH 21/43] update version from 0.4 to 0.5 --- setup.py | 135 ++++++++++++++++++++++--------------------------------- 1 file changed, 53 insertions(+), 82 deletions(-) diff --git a/setup.py b/setup.py index 3e3797e..c5159bb 100644 --- a/setup.py +++ b/setup.py @@ -1,104 +1,75 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- -import io -import os -import sys -from shutil import rmtree - -from setuptools import Command, find_packages, setup - -NAME = "mob-scrapy-redis-sentinel" -FOLDER = "mob_scrapy_redis_sentinel" -DESCRIPTION = "Redis Cluster for Scrapy." -EMAIL = "luzhang@mob.com" -AUTHOR = "luzihang" -REQUIRES_PYTHON = ">=3.6.0" -VERSION = None - - -def read_file(filename): - with open(filename) as fp: - return fp.read().strip() - +# !/usr/bin/env python +# _*_coding: utf-8 _*_ -def read_requirements(filename): - return [line.strip() for line in read_file(filename).splitlines() if not line.startswith("#")] +import codecs +import os +try: + from setuptools import setup +except: + from distutils.core import setup +""" +打包的用的setup必须引入, +""" -REQUIRED = read_requirements("requirements.txt") -here = os.path.abspath(os.path.dirname(__file__)) +def read(fname): + """ + 定义一个read方法,用来读取目录下的长描述 + 我们一般是将README文件中的内容读取出来作为长描述,这个会在PyPI中你这个包的页面上展现出来, + 你也可以不用这个方法,自己手动写内容即可, + PyPI上支持.rst格式的文件。暂不支持.md格式的文件,
.rst文件PyPI会自动把它转为HTML形式显示在你包的信息页面上。 + """ + return codecs.open(os.path.join(os.path.dirname(__file__), fname)).read() -try: - with io.open(os.path.join(here, "README.md"), encoding="utf-8") as f: - long_description = "\n" + f.read() -except FileNotFoundError: - long_description = DESCRIPTION -print(long_description) -about = {} -if not VERSION: - with open(os.path.join(here, FOLDER, "__init__.py")) as f: - exec(f.read(), about) -else: - about["__version__"] = VERSION +# 名字,一般放你包的名字即可 +NAME = "mob_scrapy_redis_sentinel" -class UploadCommand(Command): - description = "Build and publish the package." - user_options = [] +# 包含的包,可以多个,这是一个列表 +PACKAGES = ["mob_scrapy_redis_sentinel"] - @staticmethod - def status(s): - """Prints things in bold.""" - print("\033[1m{0}\033[0m".format(s)) +# 关于这个包的描述 +DESCRIPTION = "this is a tool package for mobTech." - def initialize_options(self): - pass +# 参见 read 方法说明 +LONG_DESCRIPTION = read("README.md") - def finalize_options(self): - pass +# 当前包的一些关键字,方便PyPI进行分类 +KEYWORDS = "mob utils" - def run(self): - try: - self.status("Removing previous builds…") - rmtree(os.path.join(here, "dist")) - except OSError: - pass +# 作者 +AUTHOR = "LuZiHang" - self.status("Building Source and Wheel (universal) distribution…") - os.system("{0} setup.py sdist bdist_wheel --universal".format(sys.executable)) +# 作者邮箱 +AUTHOR_EMAIL = "luzihang@mob.com" - self.status("Uploading the package to PyPI via Twine…") - os.system("twine upload dist/*") +# 你这个包的项目地址 +URL = "https://github.com/pypa/sampleproject" - sys.exit() +# 自己控制的版本号 +VERSION = "0.5" +# 授权方式 +LICENSE = "MIT" setup( name=NAME, - version=about["__version__"], + version=VERSION, description=DESCRIPTION, - long_description=long_description, - long_description_content_type="text/markdown", - author=AUTHOR, - author_email=EMAIL, - url='https://github.com/crawlmap/scrapy-redis-sentinel.git', - project_urls={"Documentation": "https://crawlaio.com/scrapy-redis-sentinel/"}, - packages=find_packages(), - install_requires=REQUIRED, - license="MIT", - zip_safe=False, - keywords=[ - 'scrapy-redis', - 'scrapy-redis-sentinel', - 'scrapy-redis-cluster' - ], + long_description=LONG_DESCRIPTION, classifiers=[ - "License :: OSI Approved :: MIT License", - "Programming Language :: Python :: 3.6", - "Programming Language :: Python :: 3.7", - "Programming Language :: Python :: 3.8", - "Programming Language :: Python :: 3.9", + 'License :: OSI Approved :: MIT License', + 'Programming Language :: Python', + 'Intended Audience :: Developers', + 'Operating System :: OS Independent', ], - cmdclass={"upload": UploadCommand}, + keywords=KEYWORDS, + author=AUTHOR, + author_email=AUTHOR_EMAIL, + url=URL, + license=LICENSE, + packages=PACKAGES, + include_package_data=True, + zip_safe=True, ) From 1fd8cba91e31e7388242f426a81f619e70a81af4 Mon Sep 17 00:00:00 2001 From: zhangyongqiang Date: Mon, 6 Jun 2022 10:27:54 +0800 Subject: [PATCH 22/43] update version from 0.4 to 0.5 --- setup.py | 135 +++++++++++++++++++++++++++++++++---------------------- 1 file changed, 82 insertions(+), 53 deletions(-) diff --git a/setup.py b/setup.py index c5159bb..3985baf 100644 --- a/setup.py +++ b/setup.py @@ -1,75 +1,104 @@ -# !/usr/bin/env python -# _*_coding: utf-8 _*_ - -import codecs +#!/usr/bin/env python +# -*- coding: utf-8 -*- +import io import os +import sys +from shutil import rmtree + +from setuptools import Command, find_packages, setup + +NAME = "mob-scrapy-redis-sentinel" +FOLDER = "mob_scrapy_redis_sentinel" +DESCRIPTION = "Redis Cluster for Scrapy." 
+EMAIL = "luzhang@mob.com" +AUTHOR = "luzihang" +REQUIRES_PYTHON = ">=3.6.0" +VERSION = 0.5 + + +def read_file(filename): + with open(filename) as fp: + return fp.read().strip() -try: - from setuptools import setup -except: - from distutils.core import setup -""" -打包的用的setup必须引入, -""" +def read_requirements(filename): + return [line.strip() for line in read_file(filename).splitlines() if not line.startswith("#")] -def read(fname): - """ - 定义一个read方法,用来读取目录下的长描述 - 我们一般是将README文件中的内容读取出来作为长描述,这个会在PyPI中你这个包的页面上展现出来, - 你也可以不用这个方法,自己手动写内容即可, - PyPI上支持.rst格式的文件。暂不支持.md格式的文件,
.rst文件PyPI会自动把它转为HTML形式显示在你包的信息页面上。 - """ - return codecs.open(os.path.join(os.path.dirname(__file__), fname)).read() +REQUIRED = read_requirements("requirements.txt") -# 名字,一般放你包的名字即可 -NAME = "mob_scrapy_redis_sentinel" +here = os.path.abspath(os.path.dirname(__file__)) -# 包含的包,可以多个,这是一个列表 -PACKAGES = ["mob_scrapy_redis_sentinel"] +try: + with io.open(os.path.join(here, "README.md"), encoding="utf-8") as f: + long_description = "\n" + f.read() +except FileNotFoundError: + long_description = DESCRIPTION +print(long_description) +about = {} +if not VERSION: + with open(os.path.join(here, FOLDER, "__init__.py")) as f: + exec(f.read(), about) +else: + about["__version__"] = VERSION + + +class UploadCommand(Command): + description = "Build and publish the package." + user_options = [] -# 关于这个包的描述 -DESCRIPTION = "this is a tool package for mobTech." + @staticmethod + def status(s): + """Prints things in bold.""" + print("\033[1m{0}\033[0m".format(s)) -# 参见 read 方法说明 -LONG_DESCRIPTION = read("README.md") + def initialize_options(self): + pass -# 当前包的一些关键字,方便PyPI进行分类 -KEYWORDS = "mob utils" + def finalize_options(self): + pass -# 作者 -AUTHOR = "LuZiHang" + def run(self): + try: + self.status("Removing previous builds…") + rmtree(os.path.join(here, "dist")) + except OSError: + pass -# 作者邮箱 -AUTHOR_EMAIL = "luzihang@mob.com" + self.status("Building Source and Wheel (universal) distribution…") + os.system("{0} setup.py sdist bdist_wheel --universal".format(sys.executable)) -# 你这个包的项目地址 -URL = "https://github.com/pypa/sampleproject" + self.status("Uploading the package to PyPI via Twine…") + os.system("twine upload dist/*") -# 自己控制的版本号 -VERSION = "0.5" + sys.exit() -# 授权方式 -LICENSE = "MIT" setup( name=NAME, - version=VERSION, + version=about["__version__"], description=DESCRIPTION, - long_description=LONG_DESCRIPTION, + long_description=long_description, + long_description_content_type="text/markdown", + author=AUTHOR, + author_email=EMAIL, + url='https://github.com/crawlmap/scrapy-redis-sentinel.git', + project_urls={"Documentation": "https://crawlaio.com/scrapy-redis-sentinel/"}, + packages=find_packages(), + install_requires=REQUIRED, + license="MIT", + zip_safe=False, + keywords=[ + 'scrapy-redis', + 'scrapy-redis-sentinel', + 'scrapy-redis-cluster' + ], classifiers=[ - 'License :: OSI Approved :: MIT License', - 'Programming Language :: Python', - 'Intended Audience :: Developers', - 'Operating System :: OS Independent', + "License :: OSI Approved :: MIT License", + "Programming Language :: Python :: 3.6", + "Programming Language :: Python :: 3.7", + "Programming Language :: Python :: 3.8", + "Programming Language :: Python :: 3.9", ], - keywords=KEYWORDS, - author=AUTHOR, - author_email=AUTHOR_EMAIL, - url=URL, - license=LICENSE, - packages=PACKAGES, - include_package_data=True, - zip_safe=True, + cmdclass={"upload": UploadCommand}, ) From cd7416500d9528aa069cb8ca2e68ea43d0660717 Mon Sep 17 00:00:00 2001 From: luzihang123 <526154064@qq.com> Date: Mon, 6 Jun 2022 11:40:37 +0800 Subject: [PATCH 23/43] update --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 560404f..4670206 100644 --- a/README.md +++ b/README.md @@ -13,7 +13,7 @@ 2. 支持Python3.8+(collection.abc的引入方式) 3. 填补 `dupefilter.py` 丢失的 "dupefilter/filtered" 的stats,利于爬虫进度数据分析 4. 自动添加 track_id: "make request from data" 和 "get request from next_request " - +5. 
增加任务防丢: 每次备份上一次任务,启动爬虫时,任务回队列首。`defaults.LATEST_QUEUE_KEY` ----- 本项目基于原项目 [scrapy-redis](https://github.com/rmax/scrapy-redis) From 9559b8db73b36e012f63379c80fd1638b3c43c66 Mon Sep 17 00:00:00 2001 From: luzihang123 <526154064@qq.com> Date: Mon, 13 Jun 2022 20:19:23 +0800 Subject: [PATCH 24/43] =?UTF-8?q?=E6=B7=BB=E5=8A=A0=20rocket=20mq=20?= =?UTF-8?q?=E4=BD=BF=E7=94=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- mob_scrapy_redis_sentinel/defaults.py | 11 ++++++++ mob_scrapy_redis_sentinel/spiders.py | 36 +++++++++++++++++++++++++++ 2 files changed, 47 insertions(+) diff --git a/mob_scrapy_redis_sentinel/defaults.py b/mob_scrapy_redis_sentinel/defaults.py index ff8486e..438644c 100644 --- a/mob_scrapy_redis_sentinel/defaults.py +++ b/mob_scrapy_redis_sentinel/defaults.py @@ -40,3 +40,14 @@ 每次make request from data,备份一份数据,到 LATEST_QUEUE_KEY。同时删除上一批的备份。(多个worker,删除同一个 LATEST_QUEUE_KEY,如何做到不互相干扰?) """ LATEST_QUEUE_KEY = "%(name)s:latest_queue" + +""" +从MQ获取任务 +""" +MQ_USED = False # 默认关闭 + +MQ_HOST = "http://10.89.104.148:10011" +# 从指定队列中取出消息 +POP_MESSAGE = MQ_HOST + "/rest/ms/GemMQ/popMessage?queueName={queueName}" +# 获取消息队列的大小 +GET_QUEUE_SIZE = MQ_HOST + "/rest/ms/GemMQ/getQueueSize?queueName={queueName}" diff --git a/mob_scrapy_redis_sentinel/spiders.py b/mob_scrapy_redis_sentinel/spiders.py index 656c9f9..3d6da53 100644 --- a/mob_scrapy_redis_sentinel/spiders.py +++ b/mob_scrapy_redis_sentinel/spiders.py @@ -18,6 +18,10 @@ from mob_scrapy_redis_sentinel.utils import make_md5 from mob_scrapy_redis_sentinel import inner_ip +import requests +import base64 +import traceback + class RedisMixin(object): """Mixin class to implement reading urls from a redis queue.""" @@ -96,6 +100,9 @@ def setup_redis(self, crawler=None): elif self.settings.getbool("REDIS_START_URLS_AS_ZSET", defaults.START_URLS_AS_ZSET): self.fetch_data = self.pop_priority_queue self.count_size = self.server.zcard + elif self.settings.getbool("MQ_USED", defaults.MQ_USED): # 使用MQ + self.fetch_data = self.pop_batch_mq + self.count_size = self.get_queue_size else: self.fetch_data = self.pop_list_queue self.count_size = self.server.llen @@ -114,6 +121,35 @@ def pop_list_queue(self, redis_key, batch_size): datas, _ = pipe.execute() return datas + def get_queue_size(self, queue_name): + try: + r = requests.get(defaults.GET_QUEUE_SIZE.format(queueName=queue_name), timeout=5) + return r.json()['data']['queueSize'] + except: + mob_log.error(f"spider name: {self.name}, inner ip: {inner_ip}, get mq queue size error: {traceback.format_exc()}").track_id("").commit() + + def pop_batch_mq(self, redis_key, batch_size): + """ + redis_key: mq队列名称默认和redis_key同名 + batch_size: 批量取用 + """ + datas = [] + for i in range(batch_size): + queue_data = self.pop_mq(queue_name=redis_key) + datas.append(queue_data) + return datas + + def pop_mq(self, queue_name): + try: + r = requests.get(defaults.POP_MESSAGE.format(queueName=queue_name), timeout=5) + resp = r.json() + if resp.get("error_code") == 0 and resp.get("data"): + message = resp["data"]["message"] + queue_data = base64.b64decode(message) + return queue_data + except: + mob_log.error(f"spider name: {self.name}, inner ip: {inner_ip}, pop mq error: {traceback.format_exc()}").track_id("").commit() + def pop_priority_queue(self, redis_key, batch_size): with self.server.pipeline() as pipe: pipe.zrevrange(redis_key, 0, batch_size - 1) From 6fe8a376a03b060edc3612d446228810b5c2de23 Mon Sep 17 00:00:00 2001 From: luzihang123 <526154064@qq.com> Date: Mon, 13 
Jun 2022 20:20:50 +0800 Subject: [PATCH 25/43] update get_queue_size queueSize type --- mob_scrapy_redis_sentinel/spiders.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mob_scrapy_redis_sentinel/spiders.py b/mob_scrapy_redis_sentinel/spiders.py index 3d6da53..05d4f93 100644 --- a/mob_scrapy_redis_sentinel/spiders.py +++ b/mob_scrapy_redis_sentinel/spiders.py @@ -124,7 +124,7 @@ def pop_list_queue(self, redis_key, batch_size): def get_queue_size(self, queue_name): try: r = requests.get(defaults.GET_QUEUE_SIZE.format(queueName=queue_name), timeout=5) - return r.json()['data']['queueSize'] + return int(r.json()['data']['queueSize']) except: mob_log.error(f"spider name: {self.name}, inner ip: {inner_ip}, get mq queue size error: {traceback.format_exc()}").track_id("").commit() From f99876267590003228cb06ff16cd6285f89cf0ff Mon Sep 17 00:00:00 2001 From: luzihang123 <526154064@qq.com> Date: Tue, 14 Jun 2022 11:21:22 +0800 Subject: [PATCH 26/43] fix None data error --- mob_scrapy_redis_sentinel/spiders.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/mob_scrapy_redis_sentinel/spiders.py b/mob_scrapy_redis_sentinel/spiders.py index 05d4f93..06ed174 100644 --- a/mob_scrapy_redis_sentinel/spiders.py +++ b/mob_scrapy_redis_sentinel/spiders.py @@ -136,7 +136,8 @@ def pop_batch_mq(self, redis_key, batch_size): datas = [] for i in range(batch_size): queue_data = self.pop_mq(queue_name=redis_key) - datas.append(queue_data) + if queue_data: + datas.append(queue_data) return datas def pop_mq(self, queue_name): From 628873c14fd89798d21b1076fcc995251108750b Mon Sep 17 00:00:00 2001 From: luzihang123 <526154064@qq.com> Date: Tue, 14 Jun 2022 12:28:40 +0800 Subject: [PATCH 27/43] modify queue_name, can run according env dev/prod --- mob_scrapy_redis_sentinel/defaults.py | 8 ++++- mob_scrapy_redis_sentinel/spiders.py | 42 ++++++++++++++++++++------- 2 files changed, 38 insertions(+), 12 deletions(-) diff --git a/mob_scrapy_redis_sentinel/defaults.py b/mob_scrapy_redis_sentinel/defaults.py index 438644c..40502cf 100644 --- a/mob_scrapy_redis_sentinel/defaults.py +++ b/mob_scrapy_redis_sentinel/defaults.py @@ -1,6 +1,6 @@ # -*- coding: utf-8 -*- import redis - +import os import rediscluster from redis.sentinel import Sentinel @@ -51,3 +51,9 @@ POP_MESSAGE = MQ_HOST + "/rest/ms/GemMQ/popMessage?queueName={queueName}" # 获取消息队列的大小 GET_QUEUE_SIZE = MQ_HOST + "/rest/ms/GemMQ/getQueueSize?queueName={queueName}" + +# 与环境相关的配置 +if os.getenv('env') == 'prod': + QUEUE_NAME_PREFIX = "CRAWLER-UQ-{}" +else: + QUEUE_NAME_PREFIX = "CRAWLER-SANDBOX-UQ-{}" diff --git a/mob_scrapy_redis_sentinel/spiders.py b/mob_scrapy_redis_sentinel/spiders.py index 06ed174..7a4d9f0 100644 --- a/mob_scrapy_redis_sentinel/spiders.py +++ b/mob_scrapy_redis_sentinel/spiders.py @@ -25,7 +25,7 @@ class RedisMixin(object): """Mixin class to implement reading urls from a redis queue.""" - + queue_name = None # mob rocket mq name redis_key = None latest_queue = None redis_batch_size = None @@ -65,6 +65,9 @@ def setup_redis(self, crawler=None): self.redis_key = self.redis_key % {"name": self.name} + if settings.getbool("MQ_USED", defaults.MQ_USED): # 使用了mq, 区分和生产队列名称 + self.queue_name = defaults.QUEUE_NAME_PREFIX.format(self.redis_key) + if not self.redis_key.strip(): raise ValueError("redis_key must not be empty") @@ -121,21 +124,17 @@ def pop_list_queue(self, redis_key, batch_size): datas, _ = pipe.execute() return datas - def get_queue_size(self, queue_name): + def get_queue_size(self, redis_key): try: - r = 
requests.get(defaults.GET_QUEUE_SIZE.format(queueName=queue_name), timeout=5) + r = requests.get(defaults.GET_QUEUE_SIZE.format(queueName=self.queue_name), timeout=5) return int(r.json()['data']['queueSize']) except: mob_log.error(f"spider name: {self.name}, inner ip: {inner_ip}, get mq queue size error: {traceback.format_exc()}").track_id("").commit() def pop_batch_mq(self, redis_key, batch_size): - """ - redis_key: mq队列名称默认和redis_key同名 - batch_size: 批量取用 - """ datas = [] for i in range(batch_size): - queue_data = self.pop_mq(queue_name=redis_key) + queue_data = self.pop_mq(self.queue_name) if queue_data: datas.append(queue_data) return datas @@ -151,6 +150,24 @@ def pop_mq(self, queue_name): except: mob_log.error(f"spider name: {self.name}, inner ip: {inner_ip}, pop mq error: {traceback.format_exc()}").track_id("").commit() + def send_message2mq(self, queue_name, queue_data, priority=0, delay_seconds=""): + """ + 发送消息到指定队列 + """ + try: + message = (base64.b64encode(queue_data.encode())).decode() + form_data = { + "message": message, + "queueName": queue_name, + "priority": priority, + "delaySeconds": delay_seconds + } + r = requests.post(f"{defaults.MQ_HOST}/rest/ms/GemMQ/sendMessage", json=form_data) + resp = r.json() + mob_log.info(f"spider name: {self.name}, inner ip: {inner_ip}, send message to mq success, resp: {resp}").track_id("").commit() + except: + mob_log.error(f"spider name: {self.name}, inner ip: {inner_ip}, send message to mq error: {traceback.format_exc()}").track_id("").commit() + def pop_priority_queue(self, redis_key, batch_size): with self.server.pipeline() as pipe: pipe.zrevrange(redis_key, 0, batch_size - 1) @@ -170,7 +187,7 @@ def latest_queue_mark(self, datas): # pipe.execute() latest_datas = [] for data in datas: - latest_datas.append(json.loads(data)) + latest_datas.append(bytes_to_str(data)) mob_log.info(f"spider name: {self.name}, latest_queue_mark, inner_ip: {inner_ip}, latest_datas: {latest_datas}").track_id("").commit() self.server.hset(self.latest_queue, inner_ip, latest_datas) @@ -183,7 +200,10 @@ def spider_opened_latest_pop(self): self.server.hdel(self.latest_queue, inner_ip) for data in eval(bytes_to_str(latest_datas)): mob_log.info(f"spider name: {self.name}, latest task back to queue, inner_ip: {inner_ip}, data: {data}").track_id("").commit() - self.server.lpush(self.redis_key, json.dumps(data, ensure_ascii=False)) + if self.settings.getbool("MQ_USED", defaults.MQ_USED): # 使用MQ + self.send_message2mq(queue_name=self.queue_name, queue_data=str(data), priority=1) + else: # 使用reids + self.server.lpush(self.redis_key, json.dumps(data, ensure_ascii=False)) # if self.count_size(self.latest_queue) == 0: # return @@ -200,7 +220,7 @@ def next_requests(self): try: queue_data = json.loads(data) except: - queue_data = {} + queue_data = bytes_to_str(data) track_id = make_md5(queue_data) mob_log.info(f"spider name: {self.name}, make request from data, queue_data: {queue_data}").track_id(track_id).commit() From 6ba35b0cfff2e0970eb94eecfaeece2436483fdb Mon Sep 17 00:00:00 2001 From: luzihang123 <526154064@qq.com> Date: Tue, 14 Jun 2022 14:33:15 +0800 Subject: [PATCH 28/43] =?UTF-8?q?redis=5Fkey=E6=B7=BB=E5=8A=A0=E9=BB=98?= =?UTF-8?q?=E8=AE=A4=E5=80=BC=EF=BC=8C=E9=BB=98=E8=AE=A4spider=20name?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- mob_scrapy_redis_sentinel/spiders.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/mob_scrapy_redis_sentinel/spiders.py b/mob_scrapy_redis_sentinel/spiders.py 
index 7a4d9f0..2af941f 100644 --- a/mob_scrapy_redis_sentinel/spiders.py +++ b/mob_scrapy_redis_sentinel/spiders.py @@ -66,7 +66,8 @@ def setup_redis(self, crawler=None): self.redis_key = self.redis_key % {"name": self.name} if settings.getbool("MQ_USED", defaults.MQ_USED): # 使用了mq, 区分和生产队列名称 - self.queue_name = defaults.QUEUE_NAME_PREFIX.format(self.redis_key) + self.redis_key = self.name + self.queue_name = defaults.QUEUE_NAME_PREFIX.format(self.name) if not self.redis_key.strip(): raise ValueError("redis_key must not be empty") From 7798f1c586ddd62ed706645d9bc38bbb6cbb58cd Mon Sep 17 00:00:00 2001 From: luzihang123 <526154064@qq.com> Date: Tue, 14 Jun 2022 18:49:18 +0800 Subject: [PATCH 29/43] =?UTF-8?q?redis=5Fkey=E6=B7=BB=E5=8A=A0=E9=BB=98?= =?UTF-8?q?=E8=AE=A4=E5=80=BC=EF=BC=8C=E9=BB=98=E8=AE=A4spider=20name?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 3985baf..a507ca9 100644 --- a/setup.py +++ b/setup.py @@ -13,7 +13,7 @@ EMAIL = "luzhang@mob.com" AUTHOR = "luzihang" REQUIRES_PYTHON = ">=3.6.0" -VERSION = 0.5 +VERSION = 0.7 def read_file(filename): From 50669db5a9732aa0128ddb39443ab885ada50c42 Mon Sep 17 00:00:00 2001 From: luzihang123 <526154064@qq.com> Date: Wed, 15 Jun 2022 17:00:21 +0800 Subject: [PATCH 30/43] =?UTF-8?q?=E6=94=B9=E5=8F=98=E8=AF=86=E5=88=AB?= =?UTF-8?q?=E7=8E=AF=E5=A2=83=E7=9A=84=E6=9D=A1=E4=BB=B6=EF=BC=8C=E4=B8=BA?= =?UTF-8?q?ip?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- mob_scrapy_redis_sentinel/defaults.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/mob_scrapy_redis_sentinel/defaults.py b/mob_scrapy_redis_sentinel/defaults.py index 40502cf..4452627 100644 --- a/mob_scrapy_redis_sentinel/defaults.py +++ b/mob_scrapy_redis_sentinel/defaults.py @@ -3,6 +3,7 @@ import os import rediscluster from redis.sentinel import Sentinel +from mob_tools.inner_ip import get_inner_ip DUPEFILTER_KEY = "dupefilter:%(timestamp)s" @@ -53,7 +54,10 @@ GET_QUEUE_SIZE = MQ_HOST + "/rest/ms/GemMQ/getQueueSize?queueName={queueName}" # 与环境相关的配置 -if os.getenv('env') == 'prod': +PRODUCTION_ENV_TAG = '10.90' +local_ip = os.getenv("LOCAL_IP", get_inner_ip()) +# 不是以10.90开头的,认为是非生产环境 +if local_ip.startswith(PRODUCTION_ENV_TAG): QUEUE_NAME_PREFIX = "CRAWLER-UQ-{}" else: QUEUE_NAME_PREFIX = "CRAWLER-SANDBOX-UQ-{}" From c4557f4b409f229e5e55e930e5f7063c6c91fe3a Mon Sep 17 00:00:00 2001 From: luzihang123 <526154064@qq.com> Date: Wed, 15 Jun 2022 17:16:13 +0800 Subject: [PATCH 31/43] =?UTF-8?q?=E6=94=B9=E5=8F=98=E8=AF=86=E5=88=AB?= =?UTF-8?q?=E7=8E=AF=E5=A2=83=E7=9A=84=E6=9D=A1=E4=BB=B6=EF=BC=8C=E4=B8=BA?= =?UTF-8?q?ip?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- mob_scrapy_redis_sentinel/__init__.py | 8 ++++++-- mob_scrapy_redis_sentinel/defaults.py | 7 +++++-- 2 files changed, 11 insertions(+), 4 deletions(-) diff --git a/mob_scrapy_redis_sentinel/__init__.py b/mob_scrapy_redis_sentinel/__init__.py index 843d740..b816079 100644 --- a/mob_scrapy_redis_sentinel/__init__.py +++ b/mob_scrapy_redis_sentinel/__init__.py @@ -15,9 +15,13 @@ ENV = os.getenv("ENV", DEV) -if ENV == "prod": +inner_ip = os.getenv("LOCAL_IP", get_inner_ip()) + +PRODUCTION_ENV_TAG = '10.90' +local_ip = os.getenv("LOCAL_IP", inner_ip) +# 不是以10.90开头的,认为是非生产环境 +if local_ip.startswith(PRODUCTION_ENV_TAG): mob_log = MobLoguru(deep=2, 
log_file='/data/logs/crawler/crawler.log.es') - inner_ip = os.getenv("LOCAL_IP", get_inner_ip()) else: mob_log = MobLoguru() inner_ip = "127.0.0.1" diff --git a/mob_scrapy_redis_sentinel/defaults.py b/mob_scrapy_redis_sentinel/defaults.py index 4452627..309da39 100644 --- a/mob_scrapy_redis_sentinel/defaults.py +++ b/mob_scrapy_redis_sentinel/defaults.py @@ -3,7 +3,8 @@ import os import rediscluster from redis.sentinel import Sentinel -from mob_tools.inner_ip import get_inner_ip + +from mob_scrapy_redis_sentinel import inner_ip, mob_log DUPEFILTER_KEY = "dupefilter:%(timestamp)s" @@ -55,9 +56,11 @@ # 与环境相关的配置 PRODUCTION_ENV_TAG = '10.90' -local_ip = os.getenv("LOCAL_IP", get_inner_ip()) +local_ip = os.getenv("LOCAL_IP", inner_ip) # 不是以10.90开头的,认为是非生产环境 if local_ip.startswith(PRODUCTION_ENV_TAG): QUEUE_NAME_PREFIX = "CRAWLER-UQ-{}" else: QUEUE_NAME_PREFIX = "CRAWLER-SANDBOX-UQ-{}" + +mob_log.debug(f"QUEUE_NAME_PREFIX: {QUEUE_NAME_PREFIX}, local_ip: {local_ip}").track_id("").commit() From 07e3857d77c734873d674de8e2b4fd082e8798da Mon Sep 17 00:00:00 2001 From: luzihang123 <526154064@qq.com> Date: Wed, 15 Jun 2022 17:26:29 +0800 Subject: [PATCH 32/43] =?UTF-8?q?=E6=94=B9=E5=8F=98=E8=AF=86=E5=88=AB?= =?UTF-8?q?=E7=8E=AF=E5=A2=83=E7=9A=84=E6=9D=A1=E4=BB=B6=EF=BC=8C=E4=B8=BA?= =?UTF-8?q?ip?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- mob_scrapy_redis_sentinel/defaults.py | 2 -- mob_scrapy_redis_sentinel/spiders.py | 1 + 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/mob_scrapy_redis_sentinel/defaults.py b/mob_scrapy_redis_sentinel/defaults.py index 309da39..bd1cac0 100644 --- a/mob_scrapy_redis_sentinel/defaults.py +++ b/mob_scrapy_redis_sentinel/defaults.py @@ -62,5 +62,3 @@ QUEUE_NAME_PREFIX = "CRAWLER-UQ-{}" else: QUEUE_NAME_PREFIX = "CRAWLER-SANDBOX-UQ-{}" - -mob_log.debug(f"QUEUE_NAME_PREFIX: {QUEUE_NAME_PREFIX}, local_ip: {local_ip}").track_id("").commit() diff --git a/mob_scrapy_redis_sentinel/spiders.py b/mob_scrapy_redis_sentinel/spiders.py index 2af941f..fe41377 100644 --- a/mob_scrapy_redis_sentinel/spiders.py +++ b/mob_scrapy_redis_sentinel/spiders.py @@ -68,6 +68,7 @@ def setup_redis(self, crawler=None): if settings.getbool("MQ_USED", defaults.MQ_USED): # 使用了mq, 区分和生产队列名称 self.redis_key = self.name self.queue_name = defaults.QUEUE_NAME_PREFIX.format(self.name) + mob_log.debug(f"queue_name: {self.queue_name}, redis_key: {self.redis_key}").track_id("").commit() if not self.redis_key.strip(): raise ValueError("redis_key must not be empty") From a1dd1806433a6a804b3397f26c7a057cfb56a103 Mon Sep 17 00:00:00 2001 From: luzihang123 <526154064@qq.com> Date: Wed, 15 Jun 2022 17:30:29 +0800 Subject: [PATCH 33/43] =?UTF-8?q?=E6=94=B9=E5=8F=98=E8=AF=86=E5=88=AB?= =?UTF-8?q?=E7=8E=AF=E5=A2=83=E7=9A=84=E6=9D=A1=E4=BB=B6=EF=BC=8C=E4=B8=BA?= =?UTF-8?q?ip?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- mob_scrapy_redis_sentinel/spiders.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mob_scrapy_redis_sentinel/spiders.py b/mob_scrapy_redis_sentinel/spiders.py index fe41377..0de431f 100644 --- a/mob_scrapy_redis_sentinel/spiders.py +++ b/mob_scrapy_redis_sentinel/spiders.py @@ -68,7 +68,7 @@ def setup_redis(self, crawler=None): if settings.getbool("MQ_USED", defaults.MQ_USED): # 使用了mq, 区分和生产队列名称 self.redis_key = self.name self.queue_name = defaults.QUEUE_NAME_PREFIX.format(self.name) - mob_log.debug(f"queue_name: {self.queue_name}, redis_key: 
{self.redis_key}").track_id("").commit() + self.logger.info(f"mq queue_name: {self.queue_name}, redis_key: {self.redis_key}") if not self.redis_key.strip(): raise ValueError("redis_key must not be empty") From 927931a6a951027b212fb94a3fb835d8d3ca156d Mon Sep 17 00:00:00 2001 From: luzihang123 <526154064@qq.com> Date: Wed, 15 Jun 2022 18:02:05 +0800 Subject: [PATCH 34/43] =?UTF-8?q?=E6=94=B9=E5=8F=98=E8=AF=86=E5=88=AB?= =?UTF-8?q?=E7=8E=AF=E5=A2=83=E7=9A=84=E6=9D=A1=E4=BB=B6=EF=BC=8C=E4=B8=BA?= =?UTF-8?q?ip?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- mob_scrapy_redis_sentinel/__init__.py | 14 +++----------- mob_scrapy_redis_sentinel/defaults.py | 3 +-- setup.py | 2 +- 3 files changed, 5 insertions(+), 14 deletions(-) diff --git a/mob_scrapy_redis_sentinel/__init__.py b/mob_scrapy_redis_sentinel/__init__.py index b816079..57b9c2a 100644 --- a/mob_scrapy_redis_sentinel/__init__.py +++ b/mob_scrapy_redis_sentinel/__init__.py @@ -3,24 +3,16 @@ __original_author__ = "Rolando Espinoza" __author__ = "luzihang" __email__ = "luzihang@mob.com" -__version__ = "0.5" +__version__ = "0.8" from mob_tools.mobLog import MobLoguru from mob_tools.inner_ip import get_inner_ip -import os -# 环境常量 -DEV = "dev" -PROD = "prod" - -ENV = os.getenv("ENV", DEV) - -inner_ip = os.getenv("LOCAL_IP", get_inner_ip()) +inner_ip = get_inner_ip() PRODUCTION_ENV_TAG = '10.90' -local_ip = os.getenv("LOCAL_IP", inner_ip) # 不是以10.90开头的,认为是非生产环境 -if local_ip.startswith(PRODUCTION_ENV_TAG): +if inner_ip.startswith(PRODUCTION_ENV_TAG): mob_log = MobLoguru(deep=2, log_file='/data/logs/crawler/crawler.log.es') else: mob_log = MobLoguru() diff --git a/mob_scrapy_redis_sentinel/defaults.py b/mob_scrapy_redis_sentinel/defaults.py index bd1cac0..ec4168f 100644 --- a/mob_scrapy_redis_sentinel/defaults.py +++ b/mob_scrapy_redis_sentinel/defaults.py @@ -56,9 +56,8 @@ # 与环境相关的配置 PRODUCTION_ENV_TAG = '10.90' -local_ip = os.getenv("LOCAL_IP", inner_ip) # 不是以10.90开头的,认为是非生产环境 -if local_ip.startswith(PRODUCTION_ENV_TAG): +if inner_ip.startswith(PRODUCTION_ENV_TAG): QUEUE_NAME_PREFIX = "CRAWLER-UQ-{}" else: QUEUE_NAME_PREFIX = "CRAWLER-SANDBOX-UQ-{}" diff --git a/setup.py b/setup.py index a507ca9..01b834f 100644 --- a/setup.py +++ b/setup.py @@ -13,7 +13,7 @@ EMAIL = "luzhang@mob.com" AUTHOR = "luzihang" REQUIRES_PYTHON = ">=3.6.0" -VERSION = 0.7 +VERSION = 0.8 def read_file(filename): From d9ba5b9146021bb36c5bf0bbc8122e788b291423 Mon Sep 17 00:00:00 2001 From: luzihang123 <526154064@qq.com> Date: Thu, 16 Jun 2022 20:59:26 +0800 Subject: [PATCH 35/43] =?UTF-8?q?=E7=88=AC=E8=99=AB=E5=88=9D=E5=A7=8B?= =?UTF-8?q?=E5=8C=96=E7=9A=84=E6=97=B6=E5=80=99=EF=BC=8C=E6=A3=80=E6=9F=A5?= =?UTF-8?q?=E9=98=9F=E5=88=97=E6=98=AF=E5=90=A6=E5=AD=98=E5=9C=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- mob_scrapy_redis_sentinel/defaults.py | 2 ++ mob_scrapy_redis_sentinel/spiders.py | 14 ++++++++++++++ 2 files changed, 16 insertions(+) diff --git a/mob_scrapy_redis_sentinel/defaults.py b/mob_scrapy_redis_sentinel/defaults.py index ec4168f..cc72611 100644 --- a/mob_scrapy_redis_sentinel/defaults.py +++ b/mob_scrapy_redis_sentinel/defaults.py @@ -49,6 +49,8 @@ MQ_USED = False # 默认关闭 MQ_HOST = "http://10.89.104.148:10011" +# 创建队列 +CREATE_QUEUE = MQ_HOST + "/rest/ms/GemMQ/createQueue?queueName={queueName}" # 从指定队列中取出消息 POP_MESSAGE = MQ_HOST + "/rest/ms/GemMQ/popMessage?queueName={queueName}" # 获取消息队列的大小 diff --git a/mob_scrapy_redis_sentinel/spiders.py 
b/mob_scrapy_redis_sentinel/spiders.py index 0de431f..afa7e49 100644 --- a/mob_scrapy_redis_sentinel/spiders.py +++ b/mob_scrapy_redis_sentinel/spiders.py @@ -108,6 +108,8 @@ def setup_redis(self, crawler=None): elif self.settings.getbool("MQ_USED", defaults.MQ_USED): # 使用MQ self.fetch_data = self.pop_batch_mq self.count_size = self.get_queue_size + # 爬虫启动时,检查队列是否存在,不存在则创建 + crawler.signals.connect(self.check_queue, signal=signals.spider_opened) else: self.fetch_data = self.pop_list_queue self.count_size = self.server.llen @@ -119,6 +121,11 @@ def setup_redis(self, crawler=None): # that's when we will schedule new requests from redis queue crawler.signals.connect(self.spider_idle, signal=signals.spider_idle) + def check_queue(self): + if not self.get_queue_size(self.queue_name): + mob_log.warning(f"{self.queue_name} queue is not exist, now create it") + self.create_queue(self.queue_name) + def pop_list_queue(self, redis_key, batch_size): with self.server.pipeline() as pipe: pipe.lrange(redis_key, 0, batch_size - 1) @@ -152,6 +159,13 @@ def pop_mq(self, queue_name): except: mob_log.error(f"spider name: {self.name}, inner ip: {inner_ip}, pop mq error: {traceback.format_exc()}").track_id("").commit() + def create_queue(self, queue_name): + try: + r = requests.get(defaults.CREATE_QUEUE.format(queueName=queue_name), timeout=5) + return r.json() + except: + mob_log.error(f"spider name: {self.name}, inner ip: {inner_ip}, create mq error: {traceback.format_exc()}").track_id("").commit() + def send_message2mq(self, queue_name, queue_data, priority=0, delay_seconds=""): """ 发送消息到指定队列 From 8c49b0fb86518f8b14c2551b944507b3112bda21 Mon Sep 17 00:00:00 2001 From: luzihang123 <526154064@qq.com> Date: Fri, 17 Jun 2022 10:38:16 +0800 Subject: [PATCH 36/43] update version --- mob_scrapy_redis_sentinel/__init__.py | 2 +- setup.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/mob_scrapy_redis_sentinel/__init__.py b/mob_scrapy_redis_sentinel/__init__.py index 57b9c2a..340d14f 100644 --- a/mob_scrapy_redis_sentinel/__init__.py +++ b/mob_scrapy_redis_sentinel/__init__.py @@ -3,7 +3,7 @@ __original_author__ = "Rolando Espinoza" __author__ = "luzihang" __email__ = "luzihang@mob.com" -__version__ = "0.8" +__version__ = "0.9" from mob_tools.mobLog import MobLoguru from mob_tools.inner_ip import get_inner_ip diff --git a/setup.py b/setup.py index 01b834f..db8ceb9 100644 --- a/setup.py +++ b/setup.py @@ -13,7 +13,7 @@ EMAIL = "luzhang@mob.com" AUTHOR = "luzihang" REQUIRES_PYTHON = ">=3.6.0" -VERSION = 0.8 +VERSION = 0.9 def read_file(filename): From a1ffcd833c29fd5ec4531da37bfb7d1cc0013e69 Mon Sep 17 00:00:00 2001 From: luzihang123 <526154064@qq.com> Date: Wed, 22 Jun 2022 11:10:21 +0800 Subject: [PATCH 37/43] update README --- README.md | 1 + 1 file changed, 1 insertion(+) diff --git a/README.md b/README.md index 4670206..a18bae7 100644 --- a/README.md +++ b/README.md @@ -14,6 +14,7 @@ 3. 填补 `dupefilter.py` 丢失的 "dupefilter/filtered" 的stats,利于爬虫进度数据分析 4. 自动添加 track_id: "make request from data" 和 "get request from next_request " 5. 增加任务防丢: 每次备份上一次任务,启动爬虫时,任务回队列首。`defaults.LATEST_QUEUE_KEY` +6. 
增加使用shield进行任务调度: `MQ_USED` ----- 本项目基于原项目 [scrapy-redis](https://github.com/rmax/scrapy-redis) From 540d5c0aa3c091a9e19ba1328a8fee58cdee2257 Mon Sep 17 00:00:00 2001 From: luzihang123 <526154064@qq.com> Date: Wed, 22 Jun 2022 11:10:51 +0800 Subject: [PATCH 38/43] update MongoDupeFilter --- mob_scrapy_redis_sentinel/defaults.py | 1 + mob_scrapy_redis_sentinel/dupefilter.py | 71 +++++++++++++++++++++++++ 2 files changed, 72 insertions(+) diff --git a/mob_scrapy_redis_sentinel/defaults.py b/mob_scrapy_redis_sentinel/defaults.py index cc72611..be2f988 100644 --- a/mob_scrapy_redis_sentinel/defaults.py +++ b/mob_scrapy_redis_sentinel/defaults.py @@ -6,6 +6,7 @@ from mob_scrapy_redis_sentinel import inner_ip, mob_log +# For standalone use. DUPEFILTER_KEY = "dupefilter:%(timestamp)s" PIPELINE_KEY = "%(spider)s:items" diff --git a/mob_scrapy_redis_sentinel/dupefilter.py b/mob_scrapy_redis_sentinel/dupefilter.py index 4c869b9..f4e601c 100644 --- a/mob_scrapy_redis_sentinel/dupefilter.py +++ b/mob_scrapy_redis_sentinel/dupefilter.py @@ -2,6 +2,9 @@ import logging import time +import pymongo +from pymongo.errors import DuplicateKeyError + from scrapy.dupefilters import BaseDupeFilter from scrapy.utils.request import request_fingerprint @@ -12,6 +15,74 @@ logger = logging.getLogger(__name__) +class MongoDupeFilter(BaseDupeFilter): + def __init__(self, mongo_uri, db, collection, debug=False, *args, **kwargs): + self.mongo = pymongo.MongoClient(mongo_uri) + self.mongo_db = db + self.collection = collection + self.debug = debug + self.logdupes = True + self.mongo[self.mongo_db][self.collection].create_index("fp", unique=True) + + @classmethod + def from_settings(cls, settings): + mongo_uri = settings.get("MongoFilter_URI") + mongo_db = settings.get("MongoFilter_DB") + collection = defaults.DUPEFILTER_KEY % {"timestamp": int(time.time())} + debug = settings.getbool("DUPEFILTER_DEBUG", False) + return cls(mongo_uri=mongo_uri, db=mongo_db, collection=collection, debug=debug) + + @classmethod + def from_crawler(cls, crawler): + return cls.from_settings(crawler.settings) + + def request_seen(self, request): + fp = self.request_fingerprint(request) + # This returns the number of values added, zero if already exists. 
+ doc = self.mongo[self.mongo_db][self.collection].find_one({"fp": fp}) + if doc: + return True + try: + self.mongo[self.mongo_db][self.collection].insert_one({"fp": fp}) + except DuplicateKeyError: + pass + return False + + def request_fingerprint(self, request): + return request_fingerprint(request) + + @classmethod + def from_spider(cls, spider): + settings = spider.settings + mongo_uri = settings.get("MongoFilter_URI") + mongo_db = settings.get("MongoFilter_DB") + dupefilter_key = settings.get("SCHEDULER_DUPEFILTER_KEY", defaults.SCHEDULER_DUPEFILTER_KEY) + collection = dupefilter_key % {"spider": spider.name} + debug = settings.getbool("DUPEFILTER_DEBUG") + return cls(mongo_uri=mongo_uri, db=mongo_db, collection=collection, debug=debug) + + def close(self, reason=""): + self.clear() + + def clear(self): + self.mongo.drop_collection(self.collection) + + def log(self, request, spider): + if self.debug: + msg = "Filtered duplicate request: %(request)s" + self.logger.debug(msg, {"request": request}, extra={"spider": spider}) + elif self.logdupes: + msg = ( + "Filtered duplicate request %(request)s" + " - no more duplicates will be shown" + " (see DUPEFILTER_DEBUG to show all duplicates)" + ) + self.logger.debug(msg, {"request": request}, extra={"spider": spider}) + self.logdupes = False + + spider.crawler.stats.inc_value("dupefilter/filtered", spider=spider) + + # TODO: Rename class to RedisDupeFilter. class RedisDupeFilter(BaseDupeFilter): """Redis-based request duplicates filter. From f1074f765f488930f39cb82621ed4c2b8a98be83 Mon Sep 17 00:00:00 2001 From: luzihang123 <526154064@qq.com> Date: Fri, 24 Jun 2022 12:21:34 +0800 Subject: [PATCH 39/43] =?UTF-8?q?update:=20=E5=A4=84=E7=90=86mq=E5=B9=B6?= =?UTF-8?q?=E5=8F=91=E9=87=8D=E5=A4=8D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- mob_scrapy_redis_sentinel/spiders.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/mob_scrapy_redis_sentinel/spiders.py b/mob_scrapy_redis_sentinel/spiders.py index afa7e49..ab17546 100644 --- a/mob_scrapy_redis_sentinel/spiders.py +++ b/mob_scrapy_redis_sentinel/spiders.py @@ -240,6 +240,13 @@ def next_requests(self): track_id = make_md5(queue_data) mob_log.info(f"spider name: {self.name}, make request from data, queue_data: {queue_data}").track_id(track_id).commit() + # 处理mq并发重复 + if self.server.exists(track_id): # 存在则重复 + mob_log.info(f"spider name: {self.name}, mq repetition, track_id: {track_id}").track_id(track_id).commit() + continue + else: + self.server.set(track_id, "1", ex=60 * 5) + reqs = self.make_request_from_data(data) if isinstance(reqs, Iterable): for req in reqs: From c03cb8ff99fb009e14d7805e0d5064049626b2b9 Mon Sep 17 00:00:00 2001 From: luzihang123 <526154064@qq.com> Date: Fri, 24 Jun 2022 12:25:31 +0800 Subject: [PATCH 40/43] update: version --- mob_scrapy_redis_sentinel/__init__.py | 2 +- setup.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/mob_scrapy_redis_sentinel/__init__.py b/mob_scrapy_redis_sentinel/__init__.py index 340d14f..a95c19f 100644 --- a/mob_scrapy_redis_sentinel/__init__.py +++ b/mob_scrapy_redis_sentinel/__init__.py @@ -3,7 +3,7 @@ __original_author__ = "Rolando Espinoza" __author__ = "luzihang" __email__ = "luzihang@mob.com" -__version__ = "0.9" +__version__ = "1.0" from mob_tools.mobLog import MobLoguru from mob_tools.inner_ip import get_inner_ip diff --git a/setup.py b/setup.py index db8ceb9..9fddb81 100644 --- a/setup.py +++ b/setup.py @@ -13,7 +13,7 @@ EMAIL = "luzhang@mob.com" AUTHOR 
= "luzihang" REQUIRES_PYTHON = ">=3.6.0" -VERSION = 0.9 +VERSION = 1.0 def read_file(filename): From 63a69f434f41dfba7f9a3c4bcd2800d2abb3bc8d Mon Sep 17 00:00:00 2001 From: luzihang123 <526154064@qq.com> Date: Fri, 24 Jun 2022 12:26:12 +0800 Subject: [PATCH 41/43] update: mob-tools==0.0.17 --- requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements.txt b/requirements.txt index dec3fd2..5c34758 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,4 @@ redis==3.5.3 redis-py-cluster==2.1.3 Scrapy -mob-tools==0.0.16 \ No newline at end of file +mob-tools==0.0.17 \ No newline at end of file From cfd6029872595459feae51b518ed8e69bd9641a2 Mon Sep 17 00:00:00 2001 From: luzihang123 <526154064@qq.com> Date: Fri, 24 Jun 2022 14:04:24 +0800 Subject: [PATCH 42/43] =?UTF-8?q?update:=20=E5=88=A4=E6=96=AD=E6=98=AF?= =?UTF-8?q?=E5=90=A6=E4=BD=BF=E7=94=A8=E4=BA=86MQ?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- mob_scrapy_redis_sentinel/spiders.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/mob_scrapy_redis_sentinel/spiders.py b/mob_scrapy_redis_sentinel/spiders.py index ab17546..334d731 100644 --- a/mob_scrapy_redis_sentinel/spiders.py +++ b/mob_scrapy_redis_sentinel/spiders.py @@ -241,11 +241,12 @@ def next_requests(self): mob_log.info(f"spider name: {self.name}, make request from data, queue_data: {queue_data}").track_id(track_id).commit() # 处理mq并发重复 - if self.server.exists(track_id): # 存在则重复 - mob_log.info(f"spider name: {self.name}, mq repetition, track_id: {track_id}").track_id(track_id).commit() - continue - else: - self.server.set(track_id, "1", ex=60 * 5) + if self.settings.getbool("MQ_USED", defaults.MQ_USED): # 使用MQ + if self.server.exists(track_id): # 存在则重复 + mob_log.info(f"spider name: {self.name}, mq repetition, track_id: {track_id}").track_id(track_id).commit() + continue + else: + self.server.set(track_id, "1", ex=60 * 5) reqs = self.make_request_from_data(data) if isinstance(reqs, Iterable): From 6cebedbe287b5a3a7351d50dd4e8d17ece93607c Mon Sep 17 00:00:00 2001 From: luzihang123 <526154064@qq.com> Date: Fri, 24 Jun 2022 14:06:13 +0800 Subject: [PATCH 43/43] =?UTF-8?q?update:=20=E7=BC=A9=E7=9F=AD=E8=BF=87?= =?UTF-8?q?=E6=9C=9F=E6=97=B6=E9=97=B4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- mob_scrapy_redis_sentinel/spiders.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mob_scrapy_redis_sentinel/spiders.py b/mob_scrapy_redis_sentinel/spiders.py index 334d731..bf3c499 100644 --- a/mob_scrapy_redis_sentinel/spiders.py +++ b/mob_scrapy_redis_sentinel/spiders.py @@ -246,7 +246,7 @@ def next_requests(self): mob_log.info(f"spider name: {self.name}, mq repetition, track_id: {track_id}").track_id(track_id).commit() continue else: - self.server.set(track_id, "1", ex=60 * 5) + self.server.set(track_id, "1", ex=60 * 3) reqs = self.make_request_from_data(data) if isinstance(reqs, Iterable):