diff --git a/.gitignore b/.gitignore
index 2e33e3c..e41461d 100644
--- a/.gitignore
+++ b/.gitignore
@@ -3,6 +3,8 @@
 __pycache__/
 *.py[cod]
 *$py.class
+.idea
+
 # C extensions
 *.so
diff --git a/README.md b/README.md
index d365482..a18bae7 100644
--- a/README.md
+++ b/README.md
@@ -5,6 +5,18 @@
 ![GitHub last commit](https://img.shields.io/github/last-commit/crawlmap/scrapy-redis-sentinel)
 ![PyPI - Downloads](https://img.shields.io/pypi/dw/scrapy-redis-sentinel)
 
+This project is based on the original [scrapy-redis-sentinel](https://github.com/crawlaio/scrapy-redis-sentinel)
+
+with the following changes:
+
+1. Added support for Redis Sentinel connections that involve two passwords (one for the sentinels, one for the Redis nodes)
+2. Python 3.8+ support (the `collections.abc` import path)
+3. Added the "dupefilter/filtered" stat that `dupefilter.py` was missing, which helps when analysing crawl progress
+4. Automatically attach a track_id to the "make request from data" and "get request from next_request" logs
+5. Task-loss protection: the previous batch of tasks is backed up each time, and when the spider starts the backed-up tasks are pushed back to the head of the queue (`defaults.LATEST_QUEUE_KEY`)
+6. Added task scheduling through shield: `MQ_USED`
+-----
+
 This project is based on the original [scrapy-redis](https://github.com/rmax/scrapy-redis)
 
 with the following changes:
@@ -53,6 +65,8 @@ REDIS_SENTINELS = [
     ('172.25.2.26', 26379),
     ('172.25.2.27', 26379)
 ]
+# SENTINEL_KWARGS is optional; use it to set the sentinel password, see https://github.com/redis/redis-py/issues/1219
+SENTINEL_KWARGS = {'password': 'sentinel_password'}
 
 # REDIS_SENTINEL_PARAMS: sentinel-mode configuration parameters.
 REDIS_SENTINEL_PARAMS = {
@@ -80,22 +94,22 @@ REDIS_CLUSTER_PARAMS = {
 
 # Keep the queues used by scrapy-redis in redis, allowing pause and resume (i.e. do not clean up the redis queues)
 SCHEDULER_PERSIST = True
 # Scheduler queue
-SCHEDULER = "scrapy_redis_sentinel.scheduler.Scheduler"
+SCHEDULER = "mob_scrapy_redis_sentinel.scheduler.Scheduler"
 # Basic dupefilter
-DUPEFILTER_CLASS = "scrapy_redis_sentinel.dupefilter.RedisDupeFilter"
+DUPEFILTER_CLASS = "mob_scrapy_redis_sentinel.dupefilter.RedisDupeFilter"
 # BloomFilter
-# DUPEFILTER_CLASS = "scrapy_redis_sentinel.dupefilter.RedisBloomFilter"
+# DUPEFILTER_CLASS = "mob_scrapy_redis_sentinel.dupefilter.RedisBloomFilter"
 # Enable redis-based stats collection
-STATS_CLASS = "scrapy_redis_sentinel.stats.RedisStatsCollector"
+STATS_CLASS = "mob_scrapy_redis_sentinel.stats.RedisStatsCollector"
 
 # Queue class used to order the crawl
 # Default: ordered by priority (Scrapy's default), a non-FIFO/LIFO ordering implemented with a sorted set.
-# SCHEDULER_QUEUE_CLASS = 'scrapy_redis_sentinel.queue.SpiderPriorityQueue'
+# SCHEDULER_QUEUE_CLASS = 'mob_scrapy_redis_sentinel.queue.SpiderPriorityQueue'
 # Optional: first in, first out (FIFO)
-# SCHEDULER_QUEUE_CLASS = 'scrapy_redis_sentinel.queue.SpiderStack'
+# SCHEDULER_QUEUE_CLASS = 'mob_scrapy_redis_sentinel.queue.SpiderStack'
 # Optional: last in, first out (LIFO)
-# SCHEDULER_QUEUE_CLASS = 'scrapy_redis_sentinel.queue.SpiderStack'
+# SCHEDULER_QUEUE_CLASS = 'mob_scrapy_redis_sentinel.queue.SpiderStack'
 ```
 
 > Note: when the cluster is used, the standalone configuration does not take effect
diff --git a/mob_scrapy_redis_sentinel/__init__.py b/mob_scrapy_redis_sentinel/__init__.py
new file mode 100644
index 0000000..a95c19f
--- /dev/null
+++ b/mob_scrapy_redis_sentinel/__init__.py
@@ -0,0 +1,19 @@
+# -*- coding: utf-8 -*-
+
+__original_author__ = "Rolando Espinoza"
+__author__ = "luzihang"
+__email__ = "luzihang@mob.com"
+__version__ = "1.0"
+
+from mob_tools.mobLog import MobLoguru
+from mob_tools.inner_ip import get_inner_ip
+
+inner_ip = get_inner_ip()
+
+PRODUCTION_ENV_TAG = '10.90'
+# An IP that does not start with 10.90 is treated as a non-production environment
+if inner_ip.startswith(PRODUCTION_ENV_TAG):
+    mob_log = MobLoguru(deep=2, log_file='/data/logs/crawler/crawler.log.es')
+else:
+    mob_log = MobLoguru()
+    inner_ip = "127.0.0.1"
diff --git a/scrapy_redis_sentinel/bloomfilter.py b/mob_scrapy_redis_sentinel/bloomfilter.py
similarity index 100%
rename from scrapy_redis_sentinel/bloomfilter.py
rename to mob_scrapy_redis_sentinel/bloomfilter.py
diff --git a/scrapy_redis_sentinel/connection.py b/mob_scrapy_redis_sentinel/connection.py
similarity index 94%
rename from scrapy_redis_sentinel/connection.py
rename to mob_scrapy_redis_sentinel/connection.py
index c25f008..78235b5 100644
--- a/scrapy_redis_sentinel/connection.py
+++ b/mob_scrapy_redis_sentinel/connection.py
@@ -152,6 +152,7 @@ def get_redis_sentinel_from_settings(settings):
     params.update(settings.getdict("REDIS_SENTINEL_PARAMS"))
     params.setdefault("sentinels", settings.get("REDIS_SENTINELS"))
     params.setdefault("socket_timeout", settings.get("REDIS_SENTINELS_SOCKET_TIMEOUT"))
+    params.setdefault("sentinel_kwargs", settings.get("SENTINEL_KWARGS"))
 
     return get_redis_sentinel(**params)
 
@@ -165,7 +166,10 @@ def get_redis_sentinel(**kwargs):
     redis_sentinel_cls = kwargs.get("redis_cluster_cls", defaults.REDIS_SENTINEL_CLS)
     sentinels = kwargs.pop("sentinels", None)
     socket_timeout = kwargs.pop("socket_timeout", 0.5)
-    redis_sentinel_cls = redis_sentinel_cls(sentinels=sentinels, socket_timeout=socket_timeout)
+    sentinel_kwargs = kwargs.pop("sentinel_kwargs", None)
+    redis_sentinel_cls = redis_sentinel_cls(sentinels=sentinels,
+                                            socket_timeout=socket_timeout,
+                                            sentinel_kwargs=sentinel_kwargs)
     redis_cls = redis_sentinel_cls.master_for(**kwargs)
 
     return redis_cls
diff --git a/mob_scrapy_redis_sentinel/defaults.py b/mob_scrapy_redis_sentinel/defaults.py
new file mode 100644
index 0000000..be2f988
--- /dev/null
+++ b/mob_scrapy_redis_sentinel/defaults.py
@@ -0,0 +1,66 @@
+# -*- coding: utf-8 -*-
+import redis
+import os
+import rediscluster
+from redis.sentinel import Sentinel
+
+from mob_scrapy_redis_sentinel import inner_ip, mob_log
+
+# For standalone use.
+DUPEFILTER_KEY = "dupefilter:%(timestamp)s"
+
+PIPELINE_KEY = "%(spider)s:items"
+
+STATS_KEY = '%(spider)s:stats'
+
+REDIS_CLS = redis.StrictRedis
+REDIS_CLUSTER_CLS = rediscluster.RedisCluster
+REDIS_SENTINEL_CLS = Sentinel
+
+REDIS_ENCODING = "utf-8"
+# Sane connection defaults.
+REDIS_PARAMS = {
+    "socket_timeout": 30,
+    "socket_connect_timeout": 30,
+    "retry_on_timeout": True,
+    "encoding": REDIS_ENCODING
+}
+
+SCHEDULER_QUEUE_KEY = "%(spider)s:requests"
+SCHEDULER_QUEUE_CLASS = "mob_scrapy_redis_sentinel.queue.PriorityQueue"
+SCHEDULER_DUPEFILTER_KEY = "%(spider)s:dupefilter"
+SCHEDULER_DUPEFILTER_CLASS = "mob_scrapy_redis_sentinel.dupefilter.RedisDupeFilter"
+
+SCHEDULER_PERSIST = False
+
+START_URLS_KEY = "%(name)s:start_urls"
+START_URLS_AS_SET = False
+START_URLS_AS_ZSET = False
+
+# Backup of the most recent batch of tasks (task-loss protection)
+"""
+On spider open, read LATEST_QUEUE_KEY to get the last batch of queue data saved before the previous stop;
+on every "make request from data", back the batch up to LATEST_QUEUE_KEY and delete the previous backup. (Multiple workers delete the same LATEST_QUEUE_KEY; how do we keep them from interfering with each other?)
+"""
+LATEST_QUEUE_KEY = "%(name)s:latest_queue"
+
+"""
+Fetch tasks from MQ
+"""
+MQ_USED = False  # disabled by default
+
+MQ_HOST = "http://10.89.104.148:10011"
+# Create a queue
+CREATE_QUEUE = MQ_HOST + "/rest/ms/GemMQ/createQueue?queueName={queueName}"
+# Pop a message from the given queue
+POP_MESSAGE = MQ_HOST + "/rest/ms/GemMQ/popMessage?queueName={queueName}"
+# Get the size of a message queue
+GET_QUEUE_SIZE = MQ_HOST + "/rest/ms/GemMQ/getQueueSize?queueName={queueName}"
+
+# Environment-dependent configuration
+PRODUCTION_ENV_TAG = '10.90'
+# An IP that does not start with 10.90 is treated as a non-production environment
+if inner_ip.startswith(PRODUCTION_ENV_TAG):
+    QUEUE_NAME_PREFIX = "CRAWLER-UQ-{}"
+else:
+    QUEUE_NAME_PREFIX = "CRAWLER-SANDBOX-UQ-{}"
diff --git a/scrapy_redis_sentinel/dupefilter.py b/mob_scrapy_redis_sentinel/dupefilter.py
similarity index 74%
rename from scrapy_redis_sentinel/dupefilter.py
rename to mob_scrapy_redis_sentinel/dupefilter.py
index 4ff51e5..f4e601c 100644
--- a/scrapy_redis_sentinel/dupefilter.py
+++ b/mob_scrapy_redis_sentinel/dupefilter.py
@@ -2,6 +2,9 @@
 import logging
 import time
 
+import pymongo
+from pymongo.errors import DuplicateKeyError
+
 from scrapy.dupefilters import BaseDupeFilter
 from scrapy.utils.request import request_fingerprint
 
@@ -12,6 +15,74 @@
 logger = logging.getLogger(__name__)
 
 
+class MongoDupeFilter(BaseDupeFilter):
+    def __init__(self, mongo_uri, db, collection, debug=False, *args, **kwargs):
+        self.mongo = pymongo.MongoClient(mongo_uri)
+        self.mongo_db = db
+        self.collection = collection
+        self.debug = debug
+        self.logdupes = True
+        self.mongo[self.mongo_db][self.collection].create_index("fp", unique=True)
+
+    @classmethod
+    def from_settings(cls, settings):
+        mongo_uri = settings.get("MongoFilter_URI")
+        mongo_db = settings.get("MongoFilter_DB")
+        collection = defaults.DUPEFILTER_KEY % {"timestamp": int(time.time())}
+        debug = settings.getbool("DUPEFILTER_DEBUG", False)
+        return cls(mongo_uri=mongo_uri, db=mongo_db, collection=collection, debug=debug)
+
+    @classmethod
+    def from_crawler(cls, crawler):
+        return cls.from_settings(crawler.settings)
+
+    def request_seen(self, request):
+        fp = self.request_fingerprint(request)
+        # Look the fingerprint up first; insert it if it is not there yet.
+        doc = self.mongo[self.mongo_db][self.collection].find_one({"fp": fp})
+        if doc:
+            return True
+        try:
+            self.mongo[self.mongo_db][self.collection].insert_one({"fp": fp})
+        except DuplicateKeyError:
+            pass
+        return False
+
+    def request_fingerprint(self, request):
+        return request_fingerprint(request)
+
+    @classmethod
+    def from_spider(cls, spider):
+        settings = spider.settings
+        mongo_uri = settings.get("MongoFilter_URI")
+        mongo_db = settings.get("MongoFilter_DB")
+        dupefilter_key = settings.get("SCHEDULER_DUPEFILTER_KEY", defaults.SCHEDULER_DUPEFILTER_KEY)
+        collection = dupefilter_key % {"spider": spider.name}
+        debug = settings.getbool("DUPEFILTER_DEBUG")
+        return cls(mongo_uri=mongo_uri, db=mongo_db, collection=collection, debug=debug)
+
+    def close(self, reason=""):
+        self.clear()
+
+    def clear(self):
+        self.mongo[self.mongo_db].drop_collection(self.collection)  # drop_collection lives on the database object, not on MongoClient
+
+    def log(self, request, spider):
+        if self.debug:
+            msg = "Filtered duplicate request: %(request)s"
+            logger.debug(msg, {"request": request}, extra={"spider": spider})
+        elif self.logdupes:
+            msg = (
+                "Filtered duplicate request %(request)s"
+                " - no more duplicates will be shown"
+                " (see DUPEFILTER_DEBUG to show all duplicates)"
+            )
+            logger.debug(msg, {"request": request}, extra={"spider": spider})
+            self.logdupes = False
+
+        spider.crawler.stats.inc_value("dupefilter/filtered", spider=spider)
+
+
 # TODO: Rename class to RedisDupeFilter.
 class RedisDupeFilter(BaseDupeFilter):
     """Redis-based request duplicates filter.
@@ -159,6 +230,8 @@ def log(self, request, spider):
             self.logger.debug(msg, {"request": request}, extra={"spider": spider})
             self.logdupes = False
 
+        spider.crawler.stats.inc_value("dupefilter/filtered", spider=spider)
+
 
 class RedisBloomFilter(BaseDupeFilter):
     """Redis-based request duplicates filter.
@@ -195,7 +268,7 @@ def from_settings(cls, settings):
         """Returns an instance from given settings.
 
         This uses by default the key ``dupefilter:<timestamp>``. When using the
-        ``scrapy_redis_sentinel.scheduler.Scheduler`` class, this method is not used as
+        ``mob_scrapy_redis_sentinel.scheduler.Scheduler`` class, this method is not used as
         it needs to pass the spider name in the key.
 
         Parameters
diff --git a/scrapy_redis_sentinel/picklecompat.py b/mob_scrapy_redis_sentinel/picklecompat.py
similarity index 100%
rename from scrapy_redis_sentinel/picklecompat.py
rename to mob_scrapy_redis_sentinel/picklecompat.py
diff --git a/scrapy_redis_sentinel/pipelines.py b/mob_scrapy_redis_sentinel/pipelines.py
similarity index 100%
rename from scrapy_redis_sentinel/pipelines.py
rename to mob_scrapy_redis_sentinel/pipelines.py
diff --git a/scrapy_redis_sentinel/queue.py b/mob_scrapy_redis_sentinel/queue.py
similarity index 100%
rename from scrapy_redis_sentinel/queue.py
rename to mob_scrapy_redis_sentinel/queue.py
diff --git a/scrapy_redis_sentinel/scheduler.py b/mob_scrapy_redis_sentinel/scheduler.py
similarity index 89%
rename from scrapy_redis_sentinel/scheduler.py
rename to mob_scrapy_redis_sentinel/scheduler.py
index c351306..587fb53 100644
--- a/scrapy_redis_sentinel/scheduler.py
+++ b/mob_scrapy_redis_sentinel/scheduler.py
@@ -6,6 +6,9 @@
 
 from . import connection, defaults
 
+from mob_scrapy_redis_sentinel import mob_log
+from mob_scrapy_redis_sentinel.utils import get_track_id
+
 
 # TODO: add SCRAPY_JOB support.
 class Scheduler(object):
@@ -33,16 +36,16 @@ class Scheduler(object):
     """
 
     def __init__(
-        self,
-        server,
-        persist=False,
-        flush_on_start=False,
-        queue_key=defaults.SCHEDULER_QUEUE_KEY,
-        queue_cls=defaults.SCHEDULER_QUEUE_CLASS,
-        dupefilter_key=defaults.SCHEDULER_DUPEFILTER_KEY,
-        dupefilter_cls=defaults.SCHEDULER_DUPEFILTER_CLASS,
-        idle_before_close=0,
-        serializer=None
+            self,
+            server,
+            persist=False,
+            flush_on_start=False,
+            queue_key=defaults.SCHEDULER_QUEUE_KEY,
+            queue_cls=defaults.SCHEDULER_QUEUE_CLASS,
+            dupefilter_key=defaults.SCHEDULER_DUPEFILTER_KEY,
+            dupefilter_cls=defaults.SCHEDULER_DUPEFILTER_CLASS,
+            idle_before_close=0,
+            serializer=None
     ):
         """Initialize scheduler.
@@ -174,7 +177,9 @@ def enqueue_request(self, request):
 
     def next_request(self):
         block_pop_timeout = self.idle_before_close
        request = self.queue.pop(block_pop_timeout)
+
         if request and self.stats:
+            mob_log.info(f"get request from next_request: spider name: {self.spider.name}, {request.__dict__}").track_id(get_track_id(request)).commit()
             self.stats.inc_value("scheduler/dequeued/redis", spider=self.spider)
         return request
diff --git a/scrapy_redis_sentinel/spiders.py b/mob_scrapy_redis_sentinel/spiders.py
similarity index 55%
rename from scrapy_redis_sentinel/spiders.py
rename to mob_scrapy_redis_sentinel/spiders.py
index 9d3ee63..bf3c499 100644
--- a/scrapy_redis_sentinel/spiders.py
+++ b/mob_scrapy_redis_sentinel/spiders.py
@@ -1,6 +1,10 @@
 # -*- coding: utf-8 -*-
 import time
-from collections import Iterable
+
+try:
+    from collections import Iterable
+except ImportError:
+    from collections.abc import Iterable
 
 from scrapy import signals
 from scrapy.exceptions import DontCloseSpider
@@ -9,11 +13,21 @@
 from . import connection, defaults
 from .utils import bytes_to_str
 
+from mob_scrapy_redis_sentinel import mob_log
+import json
+from mob_scrapy_redis_sentinel.utils import make_md5
+from mob_scrapy_redis_sentinel import inner_ip
+
+import requests
+import base64
+import traceback
+
 
 class RedisMixin(object):
     """Mixin class to implement reading urls from a redis queue."""
-
+    queue_name = None  # mob rocket mq name
     redis_key = None
+    latest_queue = None
     redis_batch_size = None
     redis_encoding = None
@@ -51,9 +65,18 @@ def setup_redis(self, crawler=None):
 
         self.redis_key = self.redis_key % {"name": self.name}
 
+        if settings.getbool("MQ_USED", defaults.MQ_USED):  # MQ in use; keep the queue name distinct from the production redis key
+            self.redis_key = self.name
+            self.queue_name = defaults.QUEUE_NAME_PREFIX.format(self.name)
+            self.logger.info(f"mq queue_name: {self.queue_name}, redis_key: {self.redis_key}")
+
         if not self.redis_key.strip():
             raise ValueError("redis_key must not be empty")
 
+        if self.latest_queue is None:
+            self.latest_queue = settings.get("LATEST_QUEUE_KEY", defaults.LATEST_QUEUE_KEY)
+        self.latest_queue = self.latest_queue % {"name": self.name}
+
         if self.redis_batch_size is None:
             # TODO: Deprecate this setting (REDIS_START_URLS_BATCH_SIZE).
             self.redis_batch_size = settings.getint(
@@ -82,14 +105,27 @@ def setup_redis(self, crawler=None):
         elif self.settings.getbool("REDIS_START_URLS_AS_ZSET", defaults.START_URLS_AS_ZSET):
             self.fetch_data = self.pop_priority_queue
             self.count_size = self.server.zcard
+        elif self.settings.getbool("MQ_USED", defaults.MQ_USED):  # MQ in use
+            self.fetch_data = self.pop_batch_mq
+            self.count_size = self.get_queue_size
+            # When the spider starts, check whether the queue exists and create it if it does not
+            crawler.signals.connect(self.check_queue, signal=signals.spider_opened)
         else:
             self.fetch_data = self.pop_list_queue
             self.count_size = self.server.llen
 
+        # When the spider starts, recover tasks from the backup queue first
+        crawler.signals.connect(self.spider_opened_latest_pop, signal=signals.spider_opened)
+
         # The idle signal is called when the spider has no requests left,
         # that's when we will schedule new requests from redis queue
         crawler.signals.connect(self.spider_idle, signal=signals.spider_idle)
 
+    def check_queue(self):
+        if not self.get_queue_size(self.queue_name):
+            mob_log.warning(f"{self.queue_name} queue does not exist, creating it now")
+            self.create_queue(self.queue_name)
+
     def pop_list_queue(self, redis_key, batch_size):
         with self.server.pipeline() as pipe:
             pipe.lrange(redis_key, 0, batch_size - 1)
@@ -97,6 +133,57 @@ def pop_list_queue(self, redis_key, batch_size):
             pipe.ltrim(redis_key, batch_size, -1)
             datas, _ = pipe.execute()
         return datas
 
+    def get_queue_size(self, redis_key):
+        try:
+            r = requests.get(defaults.GET_QUEUE_SIZE.format(queueName=self.queue_name), timeout=5)
+            return int(r.json()['data']['queueSize'])
+        except Exception:
+            mob_log.error(f"spider name: {self.name}, inner ip: {inner_ip}, get mq queue size error: {traceback.format_exc()}").track_id("").commit()
+
+    def pop_batch_mq(self, redis_key, batch_size):
+        datas = []
+        for i in range(batch_size):
+            queue_data = self.pop_mq(self.queue_name)
+            if queue_data:
+                datas.append(queue_data)
+        return datas
+
+    def pop_mq(self, queue_name):
+        try:
+            r = requests.get(defaults.POP_MESSAGE.format(queueName=queue_name), timeout=5)
+            resp = r.json()
+            if resp.get("error_code") == 0 and resp.get("data"):
+                message = resp["data"]["message"]
+                queue_data = base64.b64decode(message)
+                return queue_data
+        except Exception:
+            mob_log.error(f"spider name: {self.name}, inner ip: {inner_ip}, pop mq error: {traceback.format_exc()}").track_id("").commit()
+
+    def create_queue(self, queue_name):
+        try:
+            r = requests.get(defaults.CREATE_QUEUE.format(queueName=queue_name), timeout=5)
+            return r.json()
+        except Exception:
+            mob_log.error(f"spider name: {self.name}, inner ip: {inner_ip}, create mq error: {traceback.format_exc()}").track_id("").commit()
+
+    def send_message2mq(self, queue_name, queue_data, priority=0, delay_seconds=""):
+        """
+        Send a message to the given queue
+        """
+        try:
+            message = (base64.b64encode(queue_data.encode())).decode()
+            form_data = {
+                "message": message,
+                "queueName": queue_name,
+                "priority": priority,
+                "delaySeconds": delay_seconds
+            }
+            r = requests.post(f"{defaults.MQ_HOST}/rest/ms/GemMQ/sendMessage", json=form_data)
+            resp = r.json()
+            mob_log.info(f"spider name: {self.name}, inner ip: {inner_ip}, send message to mq success, resp: {resp}").track_id("").commit()
+        except Exception:
+            mob_log.error(f"spider name: {self.name}, inner ip: {inner_ip}, send message to mq error: {traceback.format_exc()}").track_id("").commit()
+
     def pop_priority_queue(self, redis_key, batch_size):
         with self.server.pipeline() as pipe:
             pipe.zrevrange(redis_key, 0, batch_size - 1)
@@ -104,12 +191,63 @@ def pop_priority_queue(self, redis_key, batch_size):
             pipe.ltrim(redis_key, batch_size, -1)
             datas, _ = pipe.execute()
         return datas
 
+    def latest_queue_mark(self, datas):
+        """Back up the latest batch (redis list or hash)"""
+        # 1. Delete the previous backup (with multiple workers, how do we keep the deletion consistent?)
+        # self.server.delete(self.latest_queue)
+        self.server.hdel(self.latest_queue, inner_ip)
+        # 2. Store the new backup
+        # with self.server.pipeline() as pipe:
+        #     for data in datas:
+        #         pipe.rpush(self.latest_queue, data)
+        #     pipe.execute()
+        latest_datas = []
+        for data in datas:
+            latest_datas.append(bytes_to_str(data))
+        mob_log.info(f"spider name: {self.name}, latest_queue_mark, inner_ip: {inner_ip}, latest_datas: {latest_datas}").track_id("").commit()
+        self.server.hset(self.latest_queue, inner_ip, str(latest_datas))  # hset needs a str value; spider_opened_latest_pop eval()s it back
+
+    def spider_opened_latest_pop(self):
+        """Bound to the spider_opened signal; restore the last batch of datas saved before the spider stopped"""
+        mob_log.info(f"spider name: {self.name}, spider_opened_latest_pop, inner_ip: {inner_ip}").track_id("").commit()
+        if self.server.hexists(self.latest_queue, inner_ip):
+            latest_datas = self.server.hget(self.latest_queue, inner_ip)
+            mob_log.info(f"spider name: {self.name}, latest task back to queue, inner_ip: {inner_ip}, latest_datas: {bytes_to_str(latest_datas)}").track_id("").commit()
+            self.server.hdel(self.latest_queue, inner_ip)
+            for data in eval(bytes_to_str(latest_datas)):  # stored via str(list) in latest_queue_mark
+                mob_log.info(f"spider name: {self.name}, latest task back to queue, inner_ip: {inner_ip}, data: {data}").track_id("").commit()
+                if self.settings.getbool("MQ_USED", defaults.MQ_USED):  # MQ in use
+                    self.send_message2mq(queue_name=self.queue_name, queue_data=str(data), priority=1)
+                else:  # redis in use
+                    self.server.lpush(self.redis_key, json.dumps(data, ensure_ascii=False))
+
+        # if self.count_size(self.latest_queue) == 0:
+        #     return
+        # datas = self.fetch_data(self.latest_queue, self.redis_batch_size)
+
     def next_requests(self):
         """Returns a request to be scheduled or none."""
         # XXX: Do we need to use a timeout here?
         found = 0
         datas = self.fetch_data(self.redis_key, self.redis_batch_size)
+        self.latest_queue_mark(datas)
         for data in datas:
+            # Attach a track_id to the logs
+            try:
+                queue_data = json.loads(data)
+            except Exception:
+                queue_data = bytes_to_str(data)
+            track_id = make_md5(queue_data)
+            mob_log.info(f"spider name: {self.name}, make request from data, queue_data: {queue_data}").track_id(track_id).commit()
+
+            # Handle duplicates caused by MQ concurrency
+            if self.settings.getbool("MQ_USED", defaults.MQ_USED):  # MQ in use
+                if self.server.exists(track_id):  # already seen, treat as a duplicate
+                    mob_log.info(f"spider name: {self.name}, mq repetition, track_id: {track_id}").track_id(track_id).commit()
+                    continue
+                else:
+                    self.server.set(track_id, "1", ex=60 * 3)
+
             reqs = self.make_request_from_data(data)
             if isinstance(reqs, Iterable):
                 for req in reqs:
diff --git a/scrapy_redis_sentinel/stats.py b/mob_scrapy_redis_sentinel/stats.py
similarity index 100%
rename from scrapy_redis_sentinel/stats.py
rename to mob_scrapy_redis_sentinel/stats.py
diff --git a/mob_scrapy_redis_sentinel/utils.py b/mob_scrapy_redis_sentinel/utils.py
new file mode 100644
index 0000000..1b1de3a
--- /dev/null
+++ b/mob_scrapy_redis_sentinel/utils.py
@@ -0,0 +1,26 @@
+# -*- coding: utf-8 -*-
+import six
+from hashlib import md5
+
+
+def bytes_to_str(s, encoding="utf-8"):
+    """Returns a str if a bytes object is given."""
+    if six.PY3 and isinstance(s, bytes):
+        return s.decode(encoding)
+    return s
+
+
+def make_md5(text):
+    """
+    Return the md5 hex digest of the given text.
+    """
+    return md5(str(text).encode('utf-8')).hexdigest()
+
+
+def get_track_id(request):
+    track_id = ''
+    try:
+        track_id = request.meta.get("track_id")
+    except Exception:
+        pass
+    return track_id
diff --git a/requirements.txt b/requirements.txt
index ab6874f..5c34758 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,3 +1,4 @@
 redis==3.5.3
 redis-py-cluster==2.1.3
-Scrapy
\ No newline at end of file
+Scrapy
+mob-tools==0.0.17
\ No newline at end of file
diff --git a/scrapy_redis_sentinel/__init__.py b/scrapy_redis_sentinel/__init__.py
deleted file mode 100644
index cf1d558..0000000
--- a/scrapy_redis_sentinel/__init__.py
+++ /dev/null
@@ -1,6 +0,0 @@
-# -*- coding: utf-8 -*-
-
-__original_author__ = "Rolando Espinoza"
-__author__ = "Shi Tao"
-__email__ = "shitao0418@gmail.com"
-__version__ = "0.7.2"
diff --git a/scrapy_redis_sentinel/defaults.py b/scrapy_redis_sentinel/defaults.py
deleted file mode 100644
index ea7863e..0000000
--- a/scrapy_redis_sentinel/defaults.py
+++ /dev/null
@@ -1,35 +0,0 @@
-# -*- coding: utf-8 -*-
-import redis
-
-import rediscluster
-from redis.sentinel import Sentinel
-
-DUPEFILTER_KEY = "dupefilter:%(timestamp)s"
-
-PIPELINE_KEY = "%(spider)s:items"
-
-STATS_KEY = '%(spider)s:stats'
-
-REDIS_CLS = redis.StrictRedis
-REDIS_CLUSTER_CLS = rediscluster.RedisCluster
-REDIS_SENTINEL_CLS = Sentinel
-
-REDIS_ENCODING = "utf-8"
-# Sane connection defaults.
-REDIS_PARAMS = {
-    "socket_timeout": 30,
-    "socket_connect_timeout": 30,
-    "retry_on_timeout": True,
-    "encoding": REDIS_ENCODING
-}
-
-SCHEDULER_QUEUE_KEY = "%(spider)s:requests"
-SCHEDULER_QUEUE_CLASS = "scrapy_redis_sentinel.queue.PriorityQueue"
-SCHEDULER_DUPEFILTER_KEY = "%(spider)s:dupefilter"
-SCHEDULER_DUPEFILTER_CLASS = "scrapy_redis_sentinel.dupefilter.RedisDupeFilter"
-
-SCHEDULER_PERSIST = False
-
-START_URLS_KEY = "%(name)s:start_urls"
-START_URLS_AS_SET = False
-START_URLS_AS_ZSET = False
diff --git a/scrapy_redis_sentinel/utils.py b/scrapy_redis_sentinel/utils.py
deleted file mode 100644
index e3b830f..0000000
--- a/scrapy_redis_sentinel/utils.py
+++ /dev/null
@@ -1,9 +0,0 @@
-# -*- coding: utf-8 -*-
-import six
-
-
-def bytes_to_str(s, encoding="utf-8"):
-    """Returns a str if a bytes object is given."""
-    if six.PY3 and isinstance(s, bytes):
-        return s.decode(encoding)
-    return s
diff --git a/setup.py b/setup.py
index 4689138..9fddb81 100644
--- a/setup.py
+++ b/setup.py
@@ -7,13 +7,13 @@
 
 from setuptools import Command, find_packages, setup
 
-NAME = "scrapy-redis-sentinel"
-FOLDER = "scrapy_redis_sentinel"
+NAME = "mob-scrapy-redis-sentinel"
+FOLDER = "mob_scrapy_redis_sentinel"
 DESCRIPTION = "Redis Cluster for Scrapy."
-EMAIL = "shitao0418@gmail.com"
-AUTHOR = "Shi Tao"
+EMAIL = "luzhang@mob.com"
+AUTHOR = "luzihang"
 REQUIRES_PYTHON = ">=3.6.0"
-VERSION = None
+VERSION = "1.0"
 
 
 def read_file(filename):
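
For quick reference, the settings below sketch how a consuming Scrapy project might wire up the two-password sentinel support and the optional features introduced by this patch. This is only an illustration based on the README excerpt above: the hosts and both passwords are placeholders, and the keys inside `REDIS_SENTINEL_PARAMS` (`service_name`, `password`, `db`) are assumptions mapped onto redis-py's `Sentinel.master_for()` arguments rather than values taken from this changeset.

```
# settings.py -- illustrative sketch only; hosts and credentials are placeholders.

SCHEDULER = "mob_scrapy_redis_sentinel.scheduler.Scheduler"
DUPEFILTER_CLASS = "mob_scrapy_redis_sentinel.dupefilter.RedisDupeFilter"
STATS_CLASS = "mob_scrapy_redis_sentinel.stats.RedisStatsCollector"
SCHEDULER_PERSIST = True

# Sentinel nodes (addresses as listed in the README section above).
REDIS_SENTINELS = [
    ("172.25.2.26", 26379),
    ("172.25.2.27", 26379)
]

# Password #1: authenticates against the sentinel processes themselves.
SENTINEL_KWARGS = {"password": "sentinel_password"}

# Password #2: authenticates against the Redis data nodes.
# These keys are assumptions based on redis-py's Sentinel.master_for() signature.
REDIS_SENTINEL_PARAMS = {
    "service_name": "mymaster",
    "password": "redis_password",
    "db": 0
}

# Optional features added by this fork.
MQ_USED = False                              # set True to pull tasks from the GemMQ HTTP API instead of redis
LATEST_QUEUE_KEY = "%(name)s:latest_queue"   # per-worker backup key used for task-loss protection
```

With `SENTINEL_KWARGS` set, `connection.get_redis_sentinel()` forwards it to `redis.sentinel.Sentinel(sentinels=..., sentinel_kwargs=...)` as shown in the connection.py hunk above, which is how the two-password setup from change 1 in the README is wired through.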