BITMAKER-2425: Decouple Kafka from estela and setup a queuing adapter
rodp63 authored Apr 1, 2023
1 parent e12a465 commit 4142cd8
Showing 19 changed files with 124 additions and 266 deletions.
9 changes: 6 additions & 3 deletions README.md
```diff
@@ -1,4 +1,4 @@
-# Estela Entrypoint
+# estela Entrypoint
 
 [![Code style: black](https://img.shields.io/badge/code%20style-black-000000.svg)](https://github.com/psf/black)
 [![version](https://img.shields.io/badge/version-0.1-blue)](https://github.com/bitmakerla/estela-entrypoint)
@@ -34,8 +34,11 @@ Job specifications are passed through env variables:
 - [Required] _api_host_: API host URL.
 - [Optional] _args_: Dictionary with job arguments.
 - [Required] _collection_: String with name of collection where items will be stored.
-- `KAFKA_ADVERTISED_LISTENERS`: List of advertised hosts in a comma-separated style.
-- `KAFKA_ADVERTISED_PORT`: Default value: _9092_.
+- [Optional] _unique_: String, `"True"` if the data will be stored in a unique collection, `"False"` otherwise. Required only for cronjobs.
+- `QUEUE_PLATFORM`: The queue platform used by estela, review the list of the current
+  [supported platforms](https://estela.bitmaker.la/docs/estela/queueing.html#supported-platforms).
+- `QUEUE_PLATFORM_{PARAMETERS}`: Please, refer to the `estela-queue-adapter`
+  [documentation](https://estela.bitmaker.la/docs/estela/queueing.html#estela-queue-adapter) to declare the needed variables.
 
 ## Testing
```
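With the Kafka-specific variables replaced by adapter-level configuration, a job environment can be sketched as follows. This is illustrative only: the `QUEUE_PLATFORM_*` names are taken from this commit's pytest.ini (later in this commit), and the `JOB_INFO` payload is a hypothetical job specification assembled from the fields listed above.

```python
import json
import os

# Queue adapter configuration (variable names as used in pytest.ini below).
os.environ["QUEUE_PLATFORM"] = "kafka"
os.environ["QUEUE_PLATFORM_LISTENERS"] = "localhost"
os.environ["QUEUE_PLATFORM_PORT"] = "9092"

# Hypothetical job specification; estela_scrapy.env.decode_job() expects it in
# the JOB_INFO variable (see "JOB_INFO must be set" in __main__.py). The exact
# encoding and the "key" format are assumptions for illustration.
os.environ["JOB_INFO"] = json.dumps(
    {
        "key": "1.2.3",
        "spider": "example_spider",
        "api_host": "http://localhost:8000",
        "args": {},
        "collection": "example_collection",
        "unique": "False",
    }
)
```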
45 changes: 26 additions & 19 deletions estela_scrapy/__main__.py
```diff
@@ -1,38 +1,37 @@
+import logging
 import os
 import sys
-import logging
 
+from estela_scrapy.env import decode_job, get_args_and_env
+from estela_scrapy.log import init_logging
+from estela_scrapy.settings import populate_settings
 
 
 def run_scrapy(argv, settings):
     from scrapy.cmdline import execute
 
-    # an intermediate function might be needed for other commands [!] missing
     sys.argv = argv
     execute(settings=settings)
 
 
 def run_code(args, log_handler=None, commands_module=None):
     try:
-        from estela_scrapy.settings import populate_settings
-
-        # API data might be sent [!] missing
         settings = populate_settings()
         if commands_module:
             settings.set("COMMANDS_MODULE", commands_module, priority="cmdline")
         if log_handler is not None:
             log_handler.setLevel(settings["LOG_LEVEL"])
     except Exception:
-        logging.exception("Settings initialization failed")
+        logging.exception("Settings initialization failed.")
         raise
     try:
         run_scrapy(args, settings)
-    except Exception:
-        logging.exception("Job runtime exception")
+    except Exception as ex:
+        logging.exception(f"Job runtime exception: {str(ex)}")
         raise
 
 
 def describe_project():
     """Describe scrapy project."""
     from estela_scrapy.env import setup_scrapy_conf
 
     setup_scrapy_conf()
@@ -45,34 +44,42 @@ def describe_project():
 
 def setup_and_launch():
     try:
-        from estela_scrapy.env import decode_job, get_args_and_env, setup_scrapy_conf
-
         job = decode_job()
         assert job, "JOB_INFO must be set"
         args, env = get_args_and_env(job)
 
+        from estela_scrapy.env import setup_scrapy_conf
+
         os.environ.update(env)
         setup_scrapy_conf()
 
-        from estela_scrapy.log import init_logging
-
         loghdlr = init_logging()
-    except:
-        logging.exception("Environment variables were not defined properly")
+    except Exception:
+        logging.exception("Environment variables were not defined properly.")
         raise
 
     run_code(args, loghdlr)
 
 
 def main():
     """Start the crawling process."""
+    from estela_scrapy.utils import producer
+
     try:
+        if producer.get_connection():
+            logging.debug("Successful connection to the queue platform.")
+        else:
+            raise Exception("Could not connect to the queue platform.")
         setup_and_launch()
+        code = 0
     except SystemExit as ex:
-        return ex.code
+        code = ex.code
     except:
-        return 1
-    return 0
+        code = 1
+    finally:
+        producer.flush()
+        producer.close()
+
+    return code
 
 
 if __name__ == "__main__":
```
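The new `main()` drives the queue through exactly four calls visible in this diff: `get_connection()`, `send(topic, data)`, `flush()`, and `close()`. A minimal in-memory stand-in satisfying that contract, useful for local runs or tests, might look like this sketch (hypothetical, not part of estela-queue-adapter):

```python
import logging


class InMemoryProducer:
    """Hypothetical stand-in for the adapter's producer interface.

    Implements only the calls this commit relies on: get_connection(),
    send(), flush(), and close().
    """

    def __init__(self):
        self.messages = []  # (topic, data) tuples kept in memory

    def get_connection(self):
        # A real adapter would connect to the configured queue platform here.
        return True

    def send(self, topic, data):
        # Mirrors the new call sites, e.g. producer.send("job_items", data).
        self.messages.append((topic, data))

    def flush(self):
        logging.debug("Flushing %d queued message(s).", len(self.messages))

    def close(self):
        self.messages.clear()
```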
2 changes: 1 addition & 1 deletion estela_scrapy/commands/describe_project.py
```diff
@@ -1,7 +1,7 @@
 import json
 import subprocess
-from pkg_resources import parse_version
 
+from pkg_resources import parse_version
 from scrapy import __version__
 from scrapy.commands import ScrapyCommand
```
3 changes: 1 addition & 2 deletions estela_scrapy/env.py
```diff
@@ -1,5 +1,5 @@
-import os
 import json
+import os
 
 
 def decode_job():
@@ -20,7 +20,6 @@ def get_api_args(args_dict):
 def get_args_and_env(msg):
     args = ["scrapy", "crawl", str(msg["spider"])]
     args += get_api_args(msg.get("args", {}))
-    # consider API settings [!] missing
     env = {
         "ESTELA_SPIDER_JOB": msg["key"],
         "ESTELA_SPIDER_NAME": msg["spider"],
```
12 changes: 5 additions & 7 deletions estela_scrapy/extensions.py
```diff
@@ -1,13 +1,13 @@
 import json
 import os
-from datetime import datetime, timedelta
+from datetime import timedelta
 
 import requests
 from scrapy import signals
 from scrapy.exporters import PythonItemExporter
 
-from estela_scrapy.producer import connect_kafka_producer, on_kafka_send_error
-from estela_scrapy.utils import datetime_to_json
+from estela_scrapy.utils import producer
+
 
 RUNNING_STATUS = "RUNNING"
 COMPLETED_STATUS = "COMPLETED"
@@ -18,7 +18,6 @@
 class ItemStorageExtension:
     def __init__(self, stats):
         self.stats = stats
-        self.producer = connect_kafka_producer()
         exporter_kwargs = {"binary": False}
         self.exporter = PythonItemExporter(**exporter_kwargs)
         job = os.getenv("ESTELA_SPIDER_JOB")
@@ -67,7 +66,7 @@ def item_scraped(self, item):
             "payload": dict(item),
             "unique": os.getenv("ESTELA_UNIQUE_COLLECTION"),
         }
-        self.producer.send("job_items", value=data).add_errback(on_kafka_send_error)
+        producer.send("job_items", data)
 
     def spider_closed(self, spider, reason):
         spider_stats = self.stats.get_stats()
@@ -84,5 +83,4 @@ def spider_closed(self, spider, reason):
             "jid": os.getenv("ESTELA_SPIDER_JOB"),
             "payload": json.loads(parser_stats),
         }
-        self.producer.send("job_stats", value=data).add_errback(on_kafka_send_error)
-        self.producer.flush()
+        producer.send("job_stats", data)
```
16 changes: 8 additions & 8 deletions estela_scrapy/log.py
```diff
@@ -1,24 +1,24 @@
 import logging
-import sys
 import os
-import warnings
+import sys
 import time
+import warnings
 
+from estela_queue_adapter import queue_noisy_libraries
 from twisted.python import log as txlog
 from scrapy import __version__
-from estela_scrapy.utils import to_standar_str
-from estela_scrapy.producer import connect_kafka_producer, on_kafka_send_error
+
+from estela_scrapy.utils import producer, to_standar_str
 
 
 _stderr = sys.stderr
 
 
 def _logfn(level, message, parent="none"):
-    producer = connect_kafka_producer()
     data = {
         "jid": os.getenv("ESTELA_SPIDER_JOB"),
         "payload": {"log": str(message), "datetime": float(time.time())},
     }
-    response = producer.send("job_logs", value=data)
+    producer.send("job_logs", data)
@@ -35,7 +35,7 @@ def init_logging():
 
     # Silence commonly used noisy libraries
     nh = logging.NullHandler()
-    for ln in ("boto", "requests", "kafka.conn"):
+    for ln in ["requests"] + queue_noisy_libraries:
         lg = logging.getLogger(ln)
         lg.propagate = 0
         lg.addHandler(nh)
```
10 changes: 3 additions & 7 deletions estela_scrapy/middlewares.py
```diff
@@ -1,14 +1,11 @@
 import os
 
-from estela_scrapy.utils import parse_time
-from estela_scrapy.producer import connect_kafka_producer, on_kafka_send_error
 from scrapy.utils.request import request_fingerprint
 
+from estela_scrapy.utils import parse_time, producer
 
-class StorageDownloaderMiddleware:
-    def __init__(self):
-        self.producer = connect_kafka_producer()
 
+class StorageDownloaderMiddleware:
     def process_response(self, request, response, spider):
         data = {
             "jid": os.getenv("ESTELA_SPIDER_JOB"),
@@ -22,6 +19,5 @@ def process_response(self, request, response, spider):
                 "fingerprint": request_fingerprint(request),
             },
         }
-        self.producer.send("job_requests", value=data).add_errback(on_kafka_send_error)
-        # process parent request [!] missing
+        producer.send("job_requests", data)
         return response
```
31 changes: 0 additions & 31 deletions estela_scrapy/producer.py

This file was deleted.

8 changes: 0 additions & 8 deletions estela_scrapy/settings.py
```diff
@@ -24,7 +24,6 @@ def update_deprecated_classpaths(settings):
 
 
 def load_default_settings(settings):
-    # Load the default ESTELA-APP settings
     downloader_middlewares = {
         "estela_scrapy.middlewares.StorageDownloaderMiddleware": 1000,
     }
@@ -35,8 +34,6 @@ def load_default_settings(settings):
     settings.get("DOWNLOADER_MIDDLEWARES_BASE").update(downloader_middlewares)
     settings.get("EXTENSIONS_BASE").update(extensions)
     settings.get("SPIDER_MIDDLEWARES_BASE").update(spider_middlewares)
-    # memory_limit [!] missing
-    # set other default settings with max priority
     settings.setdict({"LOG_LEVEL": "INFO"}, priority="cmdline")
     settings.setdict({"LOG_ENABLED": False}, priority="cmdline")
 
@@ -45,10 +42,5 @@ def populate_settings():
     assert "scrapy.conf" not in sys.modules, "Scrapy settings already loaded"
     settings = get_project_settings().copy()
     update_deprecated_classpaths(settings)
-    # consider use special class if there're problems with AWS and encoding [!] missing
-    # consider API settings [!] missing
-    # https://shub.readthedocs.io/en/stable/custom-images-contract.html#shub-settings
     load_default_settings(settings)
-    # load and merge API settings according to priority [job > spider > organization > project]
-    # afeter merging, somre enforcement might be done [!] missing
     return settings
```
5 changes: 5 additions & 0 deletions estela_scrapy/utils.py
```diff
@@ -1,5 +1,7 @@
 from datetime import datetime
 
+from estela_queue_adapter import get_producer_interface
+
 
 def parse_time(date=None):
     if date is None:
@@ -20,3 +22,6 @@ def to_standar_str(text, encoding="utf-8", errors="strict"):
     if not isinstance(text, bytes):
         raise TypeError("Unable to standardize {} type".format(type(text).__name__))
     return text.decode(encoding, errors)
+
+
+producer = get_producer_interface()
```
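Because the producer is instantiated at import time as a module-level singleton, every component that does `from estela_scrapy.utils import producer` shares one instance. A toy sketch of the pattern (demo class, not the adapter's real implementation):

```python
class _DemoProducer:
    """Toy producer illustrating the module-level singleton in utils.py."""

    def __init__(self):
        self.sent = []

    def send(self, topic, data):
        self.sent.append((topic, data))


# Created once at import time, exactly like `producer = get_producer_interface()`.
producer = _DemoProducer()

# Every importer shares the same instance, so messages sent by the Scrapy
# extension, the downloader middleware, and the log handler land in one
# buffer that main() can flush and close exactly once.
producer.send("job_items", {"payload": {"k": "v"}})
producer.send("job_logs", {"payload": {"log": "demo"}})
assert len(producer.sent) == 2
```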
57 changes: 0 additions & 57 deletions estela_scrapy/writer.py

This file was deleted.

5 changes: 5 additions & 0 deletions pytest.ini
```diff
@@ -0,0 +1,5 @@
+[pytest]
+env =
+    QUEUE_PLATFORM=kafka
+    QUEUE_PLATFORM_LISTENERS=localhost
+    QUEUE_PLATFORM_PORT=9092
```
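The new pytest.ini relies on the pytest-env plugin (added to requirements.in below) to inject the queue settings before the test session imports `estela_scrapy.utils`. A trivial sanity check under that assumption:

```python
# test_queue_env.py -- hypothetical test; assumes pytest-env has loaded the
# env section of pytest.ini before this module runs.
import os


def test_queue_platform_variables_are_set():
    assert os.environ["QUEUE_PLATFORM"] == "kafka"
    assert os.environ["QUEUE_PLATFORM_LISTENERS"] == "localhost"
    assert os.environ["QUEUE_PLATFORM_PORT"] == "9092"
```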
3 changes: 2 additions & 1 deletion requirements.in
```diff
@@ -1,5 +1,6 @@
 Scrapy>=1.0
-kafka-python
 requests
 black
 pytest
+pytest-env
+git+https://github.com/bitmakerla/estela-queue-adapter.git
```