From 25421fb15f811c7a64dcc4c2a9c186875158b684 Mon Sep 17 00:00:00 2001
From: Mateo Gonzales Navarrete <38146507+mgonnav@users.noreply.github.com>
Date: Tue, 11 Jul 2023 09:18:30 -0500
Subject: [PATCH] Cache stats in Redis while jobs are running (#28)

* Add RedisStatsCollector
* Add redis to requirements
* Require redis in setup.py
* Move job update to RUNNING to RedisStatsCollector
---
 estela_scrapy/extensions.py | 111 +++++++++++++++++++++++-------------
 estela_scrapy/log.py        |   1 -
 estela_scrapy/settings.py   |   3 +-
 estela_scrapy/utils.py      |  25 +++++++-
 requirements.in             |   1 +
 requirements.txt            |   8 ++-
 setup.py                    |   1 +
 7 files changed, 104 insertions(+), 46 deletions(-)

diff --git a/estela_scrapy/extensions.py b/estela_scrapy/extensions.py
index ae47bbb..d59ce2c 100644
--- a/estela_scrapy/extensions.py
+++ b/estela_scrapy/extensions.py
@@ -1,62 +1,46 @@
 import json
 import os
-from datetime import timedelta
+from datetime import datetime
 
-import requests
+import redis
 from scrapy import signals
+from scrapy.exceptions import NotConfigured
 from scrapy.exporters import PythonItemExporter
+from twisted.internet import task
 
 from estela_scrapy.utils import json_serializer, producer
+from .utils import json_serializer, update_job
+
 RUNNING_STATUS = "RUNNING"
 COMPLETED_STATUS = "COMPLETED"
 
 
-class ItemStorageExtension:
-    def __init__(self, stats):
+class BaseExtension:
+    def __init__(self, stats, *args, **kwargs):
         self.stats = stats
-        exporter_kwargs = {"binary": False}
-        self.exporter = PythonItemExporter(**exporter_kwargs)
+        self.auth_token = os.getenv("ESTELA_AUTH_TOKEN")
         job = os.getenv("ESTELA_SPIDER_JOB")
         host = os.getenv("ESTELA_API_HOST")
-        self.auth_token = os.getenv("ESTELA_AUTH_TOKEN")
         self.job_jid, spider_sid, project_pid = job.split(".")
         self.job_url = "{}/api/projects/{}/spiders/{}/jobs/{}".format(
             host, project_pid, spider_sid, self.job_jid
         )
 
-    def spider_opened(self, spider):
-        self.update_job(status=RUNNING_STATUS)
-
-    def update_job(
-        self,
-        status,
-        lifespan=timedelta(seconds=0),
-        total_bytes=0,
-        item_count=0,
-        request_count=0,
-    ):
-        requests.patch(
-            self.job_url,
-            data={
-                "status": status,
-                "lifespan": lifespan,
-                "total_response_bytes": total_bytes,
-                "item_count": item_count,
-                "request_count": request_count,
-            },
-            headers={"Authorization": "Token {}".format(self.auth_token)},
-        )
+
+class ItemStorageExtension(BaseExtension):
+    def __init__(self, stats):
+        super().__init__(stats)
+        exporter_kwargs = {"binary": False}
+        self.exporter = PythonItemExporter(**exporter_kwargs)
 
     @classmethod
     def from_crawler(cls, crawler):
         ext = cls(crawler.stats)
         crawler.signals.connect(ext.item_scraped, signals.item_scraped)
-        crawler.signals.connect(ext.spider_opened, signals.spider_opened)
-        crawler.signals.connect(ext.spider_closed, signals.spider_closed)
         return ext
 
-    def item_scraped(self, item):
+    def item_scraped(self, item, spider):
         item = self.exporter.export_item(item)
         data = {
             "jid": os.getenv("ESTELA_COLLECTION"),
@@ -65,19 +49,64 @@ def item_scraped(self, item):
         }
         producer.send("job_items", data)
 
+
+class RedisStatsCollector(BaseExtension):
+    def __init__(self, stats):
+        super().__init__(stats)
+
+        redis_url = os.getenv("REDIS_URL")
+        if not redis_url:
+            raise NotConfigured("REDIS_URL not found in the settings")
+        self.redis_conn = redis.from_url(redis_url)
+
+        self.stats_key = os.getenv("REDIS_STATS_KEY")
+        self.interval = float(os.getenv("REDIS_STATS_INTERVAL"))
+
+    @classmethod
+    def from_crawler(cls, crawler):
+        ext = cls(crawler.stats)
+
+        crawler.signals.connect(ext.spider_opened, signal=signals.spider_opened)
+        crawler.signals.connect(ext.spider_closed, signal=signals.spider_closed)
+
+        return ext
+
+    def spider_opened(self, spider):
+        update_job(self.job_url, self.auth_token, status=RUNNING_STATUS)
+        self.task = task.LoopingCall(self.store_stats, spider)
+        self.task.start(self.interval)
+
     def spider_closed(self, spider, reason):
-        spider_stats = self.stats.get_stats()
-        self.update_job(
+        if self.task.running:
+            self.task.stop()
+
+        try:
+            self.redis_conn.delete(self.stats_key)
+        except Exception:
+            pass
+
+        stats = self.stats.get_stats()
+        update_job(
+            self.job_url,
+            self.auth_token,
             status=COMPLETED_STATUS,
-            lifespan=spider_stats.get("elapsed_time_seconds", 0),
-            total_bytes=spider_stats.get("downloader/response_bytes", 0),
-            item_count=spider_stats.get("item_scraped_count", 0),
-            request_count=spider_stats.get("downloader/request_count", 0),
+            lifespan=int(stats.get("elapsed_time_seconds", 0)),
+            total_bytes=stats.get("downloader/response_bytes", 0),
+            item_count=stats.get("item_scraped_count", 0),
+            request_count=stats.get("downloader/request_count", 0),
         )
-        parser_stats = json.dumps(spider_stats, default=json_serializer)
+        parsed_stats = json.dumps(stats, default=json_serializer)
         data = {
             "jid": os.getenv("ESTELA_SPIDER_JOB"),
-            "payload": json.loads(parser_stats),
+            "payload": json.loads(parsed_stats),
         }
-        producer.send("job_stats", data)
+        producer.send("job_stats", value=data)
+
+    def store_stats(self, spider):
+        stats = self.stats.get_stats()
+        elapsed_time = int((datetime.now() - stats.get("start_time")).total_seconds())
+        stats.update({"elapsed_time_seconds": elapsed_time})
+
+        parsed_stats = json.dumps(stats, default=json_serializer)
+        self.redis_conn.hmset(self.stats_key, json.loads(parsed_stats))
diff --git a/estela_scrapy/log.py b/estela_scrapy/log.py
index 9adeab5..9055717 100644
--- a/estela_scrapy/log.py
+++ b/estela_scrapy/log.py
@@ -9,7 +9,6 @@
 
 from estela_scrapy.utils import producer, to_standard_str
 
-
 _stderr = sys.stderr
 
diff --git a/estela_scrapy/settings.py b/estela_scrapy/settings.py
index 00a2b34..fdd937c 100644
--- a/estela_scrapy/settings.py
+++ b/estela_scrapy/settings.py
@@ -29,7 +29,8 @@ def load_default_settings(settings):
     }
     spider_middlewares = {}
     extensions = {
-        "estela_scrapy.extensions.ItemStorageExtension": 1000,
+        "estela_scrapy.extensions.ItemStorageExtension": 999,
+        "estela_scrapy.extensions.RedisStatsCollector": 1000,
     }
     settings.get("DOWNLOADER_MIDDLEWARES_BASE").update(downloader_middlewares)
     settings.get("EXTENSIONS_BASE").update(extensions)
diff --git a/estela_scrapy/utils.py b/estela_scrapy/utils.py
index 71b6f39..fcf4b2c 100644
--- a/estela_scrapy/utils.py
+++ b/estela_scrapy/utils.py
@@ -1,5 +1,6 @@
-from datetime import date, datetime
+from datetime import date, datetime, timedelta
 
+import requests
 from estela_queue_adapter import get_producer_interface
 
 
@@ -26,4 +27,26 @@ def to_standard_str(text, encoding="utf-8", errors="strict"):
     return text.decode(encoding, errors)
 
 
+def update_job(
+    job_url,
+    auth_token,
+    status,
+    lifespan=timedelta(seconds=0),
+    total_bytes=0,
+    item_count=0,
+    request_count=0,
+):
+    requests.patch(
+        job_url,
+        data={
+            "status": status,
+            "lifespan": lifespan,
+            "total_response_bytes": total_bytes,
+            "item_count": item_count,
+            "request_count": request_count,
+        },
+        headers={"Authorization": "Token {}".format(auth_token)},
+    )
+
+
 producer = get_producer_interface()
diff --git a/requirements.in b/requirements.in
index f68aeec..b0da8b8 100644
--- a/requirements.in
+++ b/requirements.in
@@ -1,6 +1,7 @@
 Scrapy>=1.0
 requests
 black
+redis
 pytest
 pytest-env
 git+https://github.com/bitmakerla/estela-queue-adapter.git
diff --git a/requirements.txt b/requirements.txt
index db6993a..d522df5 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,11 +1,13 @@
 #
-# This file is autogenerated by pip-compile with python 3.9
-# To update, run:
+# This file is autogenerated by pip-compile with Python 3.9
+# by the following command:
 #
 #    pip-compile requirements.in
 #
 appdirs==1.4.4
     # via black
+async-timeout==4.0.2
+    # via redis
 attrs==21.2.0
     # via
     #   automat
@@ -109,6 +111,8 @@ pytest-env==0.8.1
     # via -r requirements.in
 queuelib==1.6.1
     # via scrapy
+redis==4.6.0
+    # via -r requirements.in
 regex==2021.7.6
     # via black
 requests==2.26.0
diff --git a/setup.py b/setup.py
index 4b53cf1..10da2e1 100644
--- a/setup.py
+++ b/setup.py
@@ -10,6 +10,7 @@
     install_requires=[
         "Scrapy>=1.0",
         "requests",
+        "redis",
         "estela-queue-adapter @ git+https://github.com/bitmakerla/estela-queue-adapter.git"
     ],
     entry_points={
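
Reviewer note, not part of the patch: a minimal sketch of how a consumer
could read the cached stats back while a job is running. It assumes
REDIS_URL and REDIS_STATS_KEY carry the same values the RedisStatsCollector
extension reads; the consumer script itself is hypothetical.

    import os

    import redis

    # Connect with the same URL the extension uses.
    conn = redis.from_url(os.getenv("REDIS_URL"))

    # store_stats() dumps the JSON-serialized Scrapy stats into a Redis
    # hash via hmset(), so hgetall() returns bytes keys and values that
    # need decoding before use.
    raw = conn.hgetall(os.getenv("REDIS_STATS_KEY"))
    stats = {key.decode(): value.decode() for key, value in raw.items()}

    print(stats.get("elapsed_time_seconds"))
    print(stats.get("item_scraped_count"))

Since spider_closed() deletes the hash, an empty result can be treated as
"job no longer running", with the final counts coming from the job record
that update_job() PATCHes at job_url.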