diff --git a/README.md b/README.md
index db1227a..490b04d 100644
--- a/README.md
+++ b/README.md
@@ -1,4 +1,4 @@
-# Estela Entrypoint
+# estela Entrypoint
 
 [![Code style: black](https://img.shields.io/badge/code%20style-black-000000.svg)](https://github.com/psf/black)
 [![version](https://img.shields.io/badge/version-0.1-blue)](https://github.com/bitmakerla/estela-entrypoint)
@@ -34,8 +34,11 @@ Job specifications are passed through env variables:
   - [Required] _api_host_: API host URL.
   - [Optional] _args_: Dictionary with job arguments.
   - [Required] _collection_: String with name of collection where items will be stored.
-- `KAFKA_ADVERTISED_LISTENERS`: List of advertised hosts in a comma-separated style.
-- `KAFKA_ADVERTISED_PORT`: Default value: _9092_.
+  - [Optional] _unique_: String, `"True"` if the data will be stored in a unique collection, `"False"` otherwise. Required only for cronjobs.
+- `QUEUE_PLATFORM`: The queue platform used by estela; review the list of currently
+  [supported platforms](https://estela.bitmaker.la/docs/estela/queueing.html#supported-platforms).
+- `QUEUE_PLATFORM_{PARAMETERS}`: Please refer to the `estela-queue-adapter`
+  [documentation](https://estela.bitmaker.la/docs/estela/queueing.html#estela-queue-adapter) to declare the needed variables.
 
 ## Testing
 
diff --git a/estela_scrapy/__main__.py b/estela_scrapy/__main__.py
index 4903d53..93a561d 100644
--- a/estela_scrapy/__main__.py
+++ b/estela_scrapy/__main__.py
@@ -1,38 +1,37 @@
+import logging
 import os
 import sys
-import logging
+
+from estela_scrapy.env import decode_job, get_args_and_env
+from estela_scrapy.log import init_logging
+from estela_scrapy.settings import populate_settings
 
 
 def run_scrapy(argv, settings):
     from scrapy.cmdline import execute
 
-    # an intermediate function might be needed for other commands [!] missing
     sys.argv = argv
     execute(settings=settings)
 
 
 def run_code(args, log_handler=None, commands_module=None):
     try:
-        from estela_scrapy.settings import populate_settings
-
-        # API data might be sent [!] missing
         settings = populate_settings()
         if commands_module:
             settings.set("COMMANDS_MODULE", commands_module, priority="cmdline")
         if log_handler is not None:
             log_handler.setLevel(settings["LOG_LEVEL"])
     except Exception:
-        logging.exception("Settings initialization failed")
+        logging.exception("Settings initialization failed.")
         raise
     try:
         run_scrapy(args, settings)
-    except Exception:
-        logging.exception("Job runtime exception")
+    except Exception as ex:
+        logging.exception(f"Job runtime exception: {str(ex)}")
         raise
 
 
 def describe_project():
-    """Describe scrapy project."""
     from estela_scrapy.env import setup_scrapy_conf
 
     setup_scrapy_conf()
@@ -45,34 +44,42 @@ def describe_project():
 
 def setup_and_launch():
     try:
-        from estela_scrapy.env import decode_job, get_args_and_env, setup_scrapy_conf
-
         job = decode_job()
         assert job, "JOB_INFO must be set"
         args, env = get_args_and_env(job)
 
+        from estela_scrapy.env import setup_scrapy_conf
+
         os.environ.update(env)
         setup_scrapy_conf()
 
-        from estela_scrapy.log import init_logging
-
         loghdlr = init_logging()
-    except:
-        logging.exception("Environment variables were not defined properly")
+    except Exception:
+        logging.exception("Environment variables were not defined properly.")
         raise
 
     run_code(args, loghdlr)
 
 
 def main():
-    """Start the crawling process."""
+    from estela_scrapy.utils import producer
+
     try:
+        if producer.get_connection():
+            logging.debug("Successful connection to the queue platform.")
+        else:
+            raise Exception("Could not connect to the queue platform.")
         setup_and_launch()
+        code = 0
     except SystemExit as ex:
-        return ex.code
+        code = ex.code
     except:
-        return 1
-    return 0
+        code = 1
+    finally:
+        producer.flush()
+        producer.close()
+
+    return code
 
 
 if __name__ == "__main__":
diff --git a/estela_scrapy/commands/describe_project.py b/estela_scrapy/commands/describe_project.py
index 4168f9b..8f2e148 100644
--- a/estela_scrapy/commands/describe_project.py
+++ b/estela_scrapy/commands/describe_project.py
@@ -1,7 +1,7 @@
 import json
 import subprocess
 
-from pkg_resources import parse_version
 
+from pkg_resources import parse_version
 from scrapy import __version__
 from scrapy.commands import ScrapyCommand
diff --git a/estela_scrapy/env.py b/estela_scrapy/env.py
index aad7fb1..404ed27 100644
--- a/estela_scrapy/env.py
+++ b/estela_scrapy/env.py
@@ -1,5 +1,5 @@
-import os
 import json
+import os
 
 
 def decode_job():
@@ -20,7 +20,6 @@ def get_api_args(args_dict):
 def get_args_and_env(msg):
     args = ["scrapy", "crawl", str(msg["spider"])]
     args += get_api_args(msg.get("args", {}))
-    # consider API settings [!] missing
     env = {
         "ESTELA_SPIDER_JOB": msg["key"],
         "ESTELA_SPIDER_NAME": msg["spider"],
diff --git a/estela_scrapy/extensions.py b/estela_scrapy/extensions.py
index 23238b3..a05bd19 100644
--- a/estela_scrapy/extensions.py
+++ b/estela_scrapy/extensions.py
@@ -1,13 +1,13 @@
 import json
 import os
-from datetime import datetime, timedelta
+from datetime import timedelta
 
 import requests
 from scrapy import signals
 from scrapy.exporters import PythonItemExporter
 
-from estela_scrapy.producer import connect_kafka_producer, on_kafka_send_error
-from estela_scrapy.utils import datetime_to_json
+from estela_scrapy.utils import producer
+
 
 RUNNING_STATUS = "RUNNING"
 COMPLETED_STATUS = "COMPLETED"
@@ -18,7 +18,6 @@ class ItemStorageExtension:
 
     def __init__(self, stats):
         self.stats = stats
-        self.producer = connect_kafka_producer()
         exporter_kwargs = {"binary": False}
         self.exporter = PythonItemExporter(**exporter_kwargs)
         job = os.getenv("ESTELA_SPIDER_JOB")
@@ -67,7 +66,7 @@ def item_scraped(self, item):
             "payload": dict(item),
             "unique": os.getenv("ESTELA_UNIQUE_COLLECTION"),
         }
-        self.producer.send("job_items", value=data).add_errback(on_kafka_send_error)
+        producer.send("job_items", data)
 
     def spider_closed(self, spider, reason):
         spider_stats = self.stats.get_stats()
@@ -84,5 +83,4 @@ def spider_closed(self, spider, reason):
             "jid": os.getenv("ESTELA_SPIDER_JOB"),
             "payload": json.loads(parser_stats),
         }
-        self.producer.send("job_stats", value=data).add_errback(on_kafka_send_error)
-        self.producer.flush()
+        producer.send("job_stats", data)
diff --git a/estela_scrapy/log.py b/estela_scrapy/log.py
index f974ad2..f533508 100644
--- a/estela_scrapy/log.py
+++ b/estela_scrapy/log.py
@@ -1,24 +1,24 @@
 import logging
-import sys
 import os
-import warnings
+import sys
 import time
+import warnings
 
+from estela_queue_adapter import queue_noisy_libraries
 from twisted.python import log as txlog
-from scrapy import __version__
 
-from estela_scrapy.utils import to_standar_str
-from estela_scrapy.producer import connect_kafka_producer, on_kafka_send_error
+
+from estela_scrapy.utils import producer, to_standar_str
+
 
 _stderr = sys.stderr
 
 
 def _logfn(level, message, parent="none"):
-    producer = connect_kafka_producer()
     data = {
         "jid": os.getenv("ESTELA_SPIDER_JOB"),
         "payload": {"log": str(message), "datetime": float(time.time())},
     }
-    response = producer.send("job_logs", value=data)
+    producer.send("job_logs", data)
 
 
 def init_logging():
@@ -35,7 +35,7 @@ def init_logging():
 
     # Silence commonly used noisy libraries
     nh = logging.NullHandler()
-    for ln in ("boto", "requests", "kafka.conn"):
+    for ln in ["requests"] + queue_noisy_libraries:
         lg = logging.getLogger(ln)
         lg.propagate = 0
         lg.addHandler(nh)
diff --git a/estela_scrapy/middlewares.py b/estela_scrapy/middlewares.py
index 0c84ffa..536560f 100644
--- a/estela_scrapy/middlewares.py
+++ b/estela_scrapy/middlewares.py
@@ -1,14 +1,11 @@
 import os
 
-from estela_scrapy.utils import parse_time
-from estela_scrapy.producer import connect_kafka_producer, on_kafka_send_error
 from scrapy.utils.request import request_fingerprint
 
+from estela_scrapy.utils import parse_time, producer
 
-class StorageDownloaderMiddleware:
-    def __init__(self):
-        self.producer = connect_kafka_producer()
 
+class StorageDownloaderMiddleware:
     def process_response(self, request, response, spider):
         data = {
             "jid": os.getenv("ESTELA_SPIDER_JOB"),
@@ -22,6 +19,5 @@ def process_response(self, request, response, spider):
                 "fingerprint": request_fingerprint(request),
             },
         }
-        self.producer.send("job_requests", value=data).add_errback(on_kafka_send_error)
-        # process parent request [!] missing
+        producer.send("job_requests", data)
         return response
diff --git a/estela_scrapy/producer.py b/estela_scrapy/producer.py
deleted file mode 100644
index 483ee90..0000000
--- a/estela_scrapy/producer.py
+++ /dev/null
@@ -1,31 +0,0 @@
-import os
-import json
-import logging
-
-from kafka import KafkaProducer
-
-
-def connect_kafka_producer():
-    _producer = None
-    kafka_advertised_port = os.getenv("KAFKA_ADVERTISED_PORT", "9092")
-    kafka_advertised_listeners = os.getenv("KAFKA_ADVERTISED_LISTENERS").split(",")
-    bootstrap_servers = [
-        "{}:{}".format(kafka_advertised_listener, kafka_advertised_port)
-        for kafka_advertised_listener in kafka_advertised_listeners
-    ]
-    try:
-        _producer = KafkaProducer(
-            bootstrap_servers=bootstrap_servers,
-            value_serializer=lambda x: json.dumps(x).encode("utf-8"),
-            api_version=(0, 10),
-            acks=1,
-            retries=1,
-        )
-    except Exception as ex:
-        logging.error("Exception while connecting Kafka: {}".format(str(ex)))
-    finally:
-        return _producer
-
-
-def on_kafka_send_error(excp):
-    logging.error(str(excp))
diff --git a/estela_scrapy/settings.py b/estela_scrapy/settings.py
index 59ac868..00a2b34 100644
--- a/estela_scrapy/settings.py
+++ b/estela_scrapy/settings.py
@@ -24,7 +24,6 @@ def update_deprecated_classpaths(settings):
 
 
 def load_default_settings(settings):
-    # Load the default ESTELA-APP settings
     downloader_middlewares = {
         "estela_scrapy.middlewares.StorageDownloaderMiddleware": 1000,
     }
@@ -35,8 +34,6 @@ def load_default_settings(settings):
     settings.get("DOWNLOADER_MIDDLEWARES_BASE").update(downloader_middlewares)
     settings.get("EXTENSIONS_BASE").update(extensions)
     settings.get("SPIDER_MIDDLEWARES_BASE").update(spider_middlewares)
-    # memory_limit [!] missing
-    # set other default settings with max priority
     settings.setdict({"LOG_LEVEL": "INFO"}, priority="cmdline")
     settings.setdict({"LOG_ENABLED": False}, priority="cmdline")
 
@@ -45,10 +42,5 @@ def populate_settings():
     assert "scrapy.conf" not in sys.modules, "Scrapy settings already loaded"
     settings = get_project_settings().copy()
     update_deprecated_classpaths(settings)
-    # consider use special class if there're problems with AWS and encoding [!] missing
-    # consider API settings [!] missing
-    # https://shub.readthedocs.io/en/stable/custom-images-contract.html#shub-settings
     load_default_settings(settings)
-    # load and merge API settings according to priority [job > spider > organization > project]
-    # afeter merging, somre enforcement might be done [!] missing
     return settings
diff --git a/estela_scrapy/utils.py b/estela_scrapy/utils.py
index 3b0f012..96c6d7a 100644
--- a/estela_scrapy/utils.py
+++ b/estela_scrapy/utils.py
@@ -1,5 +1,7 @@
 from datetime import datetime
 
+from estela_queue_adapter import get_producer_interface
+
 
 def parse_time(date=None):
     if date is None:
@@ -20,3 +22,6 @@ def to_standar_str(text, encoding="utf-8", errors="strict"):
     if not isinstance(text, bytes):
         raise TypeError("Unable to standardize {} type".format(type(text).__name__))
     return text.decode(encoding, errors)
+
+
+producer = get_producer_interface()
diff --git a/estela_scrapy/writer.py b/estela_scrapy/writer.py
deleted file mode 100644
index 2194870..0000000
--- a/estela_scrapy/writer.py
+++ /dev/null
@@ -1,57 +0,0 @@
-import os
-import json
-import threading
-
-from estela_scrapy.utils import parse_time
-
-
-class PipeWriter:
-    def __init__(self, fifo_path):
-        self.path = fifo_path
-        self.lock = threading.Lock()
-        self.pipe = None
-
-    def open(self):
-        try:
-            with self.lock:
-                self.pipe = open(self.path, "wb")
-        except:
-            raise RuntimeError
-
-    def close(self):
-        with self.lock:
-            self.pipe.close()
-
-    def write(self, command, payload):
-        command = command.encode("utf-8")
-        payload = json.dumps(
-            payload,
-            separators=(",", ":"),
-        ).encode("utf-8")
-        with self.lock:
-            self.pipe.write(command)
-            self.pipe.write(b" ")
-            self.pipe.write(payload)
-            self.pipe.write(b"\n")
-            self.pipe.flush()
-
-    def write_item(self, item):
-        self.write("ITM", item)
-
-    def write_request(self, url, status, fp, duration, method, rsize):
-        req = {
-            "url": url,
-            "status": int(status),
-            "method": method,
-            "duration": int(duration),
-            "time": parse_time(),
-            "response_size": int(rsize),
-            "fingerprint": fp,
-        }
-        self.write("REQ", req)
-
-    def write_fin(self, reason):
-        self.write("FIN", {"finish_reason": reason})
-
-
-pipe_writer = PipeWriter(os.environ.get("FIFO_PATH", ""))
diff --git a/pytest.ini b/pytest.ini
new file mode 100644
index 0000000..829534c
--- /dev/null
+++ b/pytest.ini
@@ -0,0 +1,5 @@
+[pytest]
+env =
+    QUEUE_PLATFORM=kafka
+    QUEUE_PLATFORM_LISTENERS=localhost
+    QUEUE_PLATFORM_PORT=9092
\ No newline at end of file
diff --git a/requirements.in b/requirements.in
index c46e8a9..f68aeec 100644
--- a/requirements.in
+++ b/requirements.in
@@ -1,5 +1,6 @@
 Scrapy>=1.0
-kafka-python
 requests
 black
 pytest
+pytest-env
+git+https://github.com/bitmakerla/estela-queue-adapter.git
diff --git a/requirements.txt b/requirements.txt
index 56035f2..db6993a 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,5 +1,5 @@
 #
-# This file is autogenerated by pip-compile with python 3.6
+# This file is autogenerated by pip-compile with python 3.9
 # To update, run:
 #
 #    pip-compile requirements.in
@@ -35,8 +35,10 @@ cssselect==1.1.0
   # via
   #   parsel
   #   scrapy
-dataclasses==0.8
-  # via black
+estela-queue-adapter @ git+https://github.com/bitmakerla/estela-queue-adapter.git
+  # via -r requirements.in
+exceptiongroup==1.1.0
+  # via pytest
 h2==3.2.0
   # via
   #   scrapy
@@ -51,11 +53,6 @@ idna==3.2
   # via
   #   hyperlink
   #   requests
-importlib-metadata==4.6.1
-  # via
-  #   click
-  #   pluggy
-  #   pytest
 incremental==21.3.0
   # via twisted
 iniconfig==1.1.1
@@ -69,7 +66,7 @@ itemloaders==1.0.4
 jmespath==0.10.0
   # via itemloaders
 kafka-python==2.0.2
-  # via -r requirements.in
+  # via estela-queue-adapter
 lxml==4.6.3
   # via
   #   parsel
@@ -90,8 +87,6 @@ priority==1.3.0
   # via twisted
 protego==0.1.16
   # via scrapy
-py==1.10.0
-  # via pytest
 pyasn1==0.4.8
   # via
   #   pyasn1-modules
@@ -106,7 +101,11 @@ pyopenssl==20.0.1
   # via scrapy
 pyparsing==2.4.7
   # via packaging
-pytest==6.2.4
+pytest==7.2.1
+  # via
+  #   -r requirements.in
+  #   pytest-env
+pytest-env==0.8.1
   # via -r requirements.in
 queuelib==1.6.1
   # via scrapy
@@ -126,18 +125,12 @@ six==1.16.0
   #   pyopenssl
   #   service-identity
   #   w3lib
-toml==0.10.2
-  # via pytest
 tomli==1.0.4
-  # via black
-twisted[http2]==21.2.0
-  # via scrapy
-typed-ast==1.4.3
-  # via black
-typing-extensions==3.10.0.0
   # via
   #   black
-  #   importlib-metadata
+  #   pytest
+twisted[http2]==21.2.0
+  # via scrapy
 urllib3==1.26.6
   # via requests
 w3lib==1.22.0
@@ -145,9 +138,7 @@ w3lib==1.22.0
   #   itemloaders
   #   parsel
   #   scrapy
-zipp==3.5.0
-  # via importlib-metadata
-zope.interface==5.4.0
+zope-interface==5.4.0
   # via
   #   scrapy
   #   twisted
diff --git a/setup.py b/setup.py
index 8960283..4b53cf1 100644
--- a/setup.py
+++ b/setup.py
@@ -1,4 +1,4 @@
-from setuptools import setup, find_packages
+from setuptools import find_packages, setup
 
 setup(
     name="estela-entrypoint",
@@ -9,8 +9,8 @@ setup(
     packages=find_packages(),
     install_requires=[
         "Scrapy>=1.0",
-        "kafka-python",
         "requests",
+        "estela-queue-adapter @ git+https://github.com/bitmakerla/estela-queue-adapter.git"
     ],
     entry_points={
         "console_scripts": [
diff --git a/tests/test_env.py b/tests/test_env.py
index 332b2b4..c084c79 100644
--- a/tests/test_env.py
+++ b/tests/test_env.py
@@ -1,10 +1,12 @@
 import os
 from unittest import mock
 
-from estela_scrapy.env import decode_job
-from estela_scrapy.env import get_api_args
-from estela_scrapy.env import get_args_and_env
-from estela_scrapy.env import setup_scrapy_conf
+from estela_scrapy.env import (
+    decode_job,
+    get_api_args,
+    get_args_and_env,
+    setup_scrapy_conf,
+)
 
 
 @mock.patch.dict(os.environ, {"JOB_INFO": '{"key": "value"}'})
diff --git a/tests/test_main.py b/tests/test_main.py
index 4d642fb..570bc8b 100644
--- a/tests/test_main.py
+++ b/tests/test_main.py
@@ -1,15 +1,25 @@
+import json
 import os
 import sys
 from unittest import mock
 
-from estela_scrapy.__main__ import run_scrapy
-from estela_scrapy.__main__ import run_code
-from estela_scrapy.__main__ import describe_project
-from estela_scrapy.__main__ import setup_and_launch
-from estela_scrapy.__main__ import main
+from estela_scrapy.__main__ import (
+    describe_project,
+    main,
+    run_code,
+    run_scrapy,
+    setup_and_launch,
+)
 
-
-JOB_INFO = '{"spider": "sample", "key": "6-6-6", "api_host": "http://estela-api.com", "auth_token": "", "collection": "sj-1-2", "unique": "True"}'
+job_info_dict = {
+    "key": "6-6-6",
+    "spider": "sample",
+    "auth_token": "",
+    "api_host": "http://estela-api.com",
+    "collection": "sj-1-2",
+    "unique": "True",
+}
+JOB_ENV = {"JOB_INFO": json.dumps(job_info_dict)}
 
 
 @mock.patch("scrapy.cmdline.execute")
@@ -29,29 +39,31 @@ def test_run_code(mock_run_scrapy):
 
 @mock.patch("estela_scrapy.__main__.run_scrapy")
 def test_run_code_commands_module(mock_run_scrapy):
-    run_code(["execution", "args"], "commands_module")
+    run_code(["execution", "args"], commands_module="commands_module")
     settings = mock_run_scrapy.call_args[0][1]
     assert settings["COMMANDS_MODULE"] == "commands_module"
 
 
-@mock.patch.dict(os.environ, {"JOB_INFO": JOB_INFO})
+@mock.patch.dict(os.environ, JOB_ENV)
 @mock.patch("estela_scrapy.env.setup_scrapy_conf")
 @mock.patch("estela_scrapy.__main__.run_code")
 def test_setup_and_launch(mock_run_code, mock_setup_scrapy_conf):
     setup_and_launch()
     assert mock_run_code.called
-    assert mock_setup_scrapy_conf.called
     expected_env = {
         "ESTELA_SPIDER_JOB": "6-6-6",
         "ESTELA_SPIDER_NAME": "sample",
         "ESTELA_API_HOST": "http://estela-api.com",
"ESTELA_AUTH_TOKEN": "", + "ESTELA_COLLECTION": "sj-1-2", + "ESTELA_UNIQUE_COLLECTION": "True", } expected_args = ["scrapy", "crawl", "sample"] run_code_args = mock_run_code.call_args[0] for key, value in expected_env.items(): assert os.environ.get(key) == value assert run_code_args[0] == expected_args + assert mock_setup_scrapy_conf.called @mock.patch("estela_scrapy.env.setup_scrapy_conf") @@ -62,12 +74,19 @@ def test_describe_project(mock_run_code, mock_setup_scrapy_conf): assert mock_run_code.called assert mock_setup_scrapy_conf.called expected_args = ["scrapy", "describe_project"] - run_args = mock_run_code.call_args[0] + run_args, run_kwargs = mock_run_code.call_args assert run_args[0] == expected_args - assert run_args[1] == "estela_scrapy.commands" + assert run_kwargs["commands_module"] == "estela_scrapy.commands" +@mock.patch("estela_scrapy.utils.producer.get_connection", return_value=True) @mock.patch("estela_scrapy.__main__.setup_and_launch") -def test_main(mock_setup_and_launch): - main() +@mock.patch("estela_scrapy.utils.producer.flush") +@mock.patch("estela_scrapy.utils.producer.close") +def test_main(mock_get_conn, mock_close, mock_flush, mock_setup_and_launch): + exit_code = main() + assert mock_get_conn.called assert mock_setup_and_launch.called + assert mock_flush.called + assert mock_close.called + assert exit_code == 0 diff --git a/tests/test_settings.py b/tests/test_settings.py index 13da0e8..6aba68f 100644 --- a/tests/test_settings.py +++ b/tests/test_settings.py @@ -1,9 +1,12 @@ from unittest import mock from scrapy.settings import Settings -from estela_scrapy.settings import update_deprecated_classpaths -from estela_scrapy.settings import load_default_settings -from estela_scrapy.settings import populate_settings + +from estela_scrapy.settings import ( + load_default_settings, + populate_settings, + update_deprecated_classpaths, +) @mock.patch("estela_scrapy.settings.update_deprecated_classpaths") diff --git a/tests/test_writer.py b/tests/test_writer.py deleted file mode 100644 index e4eb5dc..0000000 --- a/tests/test_writer.py +++ /dev/null @@ -1,75 +0,0 @@ -import json -import os -import pytest -import threading - -from queue import Queue -from estela_scrapy.writer import PipeWriter - - -@pytest.fixture -def fifo(tmpdir): - path = os.path.join(str(tmpdir.mkdir("estela")), "temp.fifo") - os.mkfifo(path) - return path - - -@pytest.fixture -def queue(): - return Queue() - - -@pytest.fixture -def reader(fifo, queue): - def read_from_fifo(): - with open(fifo) as f: - for line in iter(f.readline, ""): - queue.put(line) - - reader_thread = threading.Thread(target=read_from_fifo) - reader_thread.start() - try: - yield reader_thread - finally: - reader_thread.join(timeout=1) - - -@pytest.fixture -def writer(fifo, reader): - w = PipeWriter(fifo) - w.open() - try: - yield w - finally: - w.close() - - -def parse_line(msg): - assert msg.endswith("\n") - cmd, _, payload = msg.strip().partition(" ") - return cmd, json.loads(payload) - - -def test_close(writer): - assert writer.pipe.closed is False - writer.close() - assert writer.pipe.closed is True - - -def test_write_item(writer, queue): - writer.write_item({"my": "item"}) - line = queue.get(timeout=1) - cmd, payload = parse_line(line) - assert queue.empty() - assert cmd == "ITM" - assert payload == {"my": "item"} - - -def test_fin(writer, queue): - reason = "boom_reason" - writer.write_fin(reason) - line = queue.get(timeout=1) - cmd, payload = parse_line(line) - assert queue.empty() - assert cmd == "FIN" - assert payload == 
{"finish_reason": reason}