diff --git a/README.rst b/README.rst index d7052ed2eb1..ae33108f9b8 100644 --- a/README.rst +++ b/README.rst @@ -313,6 +313,9 @@ Transports and Backends :``celery[couchbase]``: for using Couchbase as a result backend. +:``celery[arangodb]``: + for using ArangoDB as a result backend. + :``celery[elasticsearch]``: for using Elasticsearch as a result backend. diff --git a/celery/app/backends.py b/celery/app/backends.py index 8a7d54438bf..5092f0d519a 100644 --- a/celery/app/backends.py +++ b/celery/app/backends.py @@ -37,6 +37,7 @@ 'consul': 'celery.backends.consul:ConsulBackend', 'dynamodb': 'celery.backends.dynamodb:DynamoDBBackend', 'azureblockblob': 'celery.backends.azureblockblob:AzureBlockBlobBackend', + 'arangodb': 'celery.backends.arangodb:ArangoDbBackend', 's3': 'celery.backends.s3:S3Backend', } diff --git a/celery/app/defaults.py b/celery/app/defaults.py index 85f29b82c77..6a2a49854e7 100644 --- a/celery/app/defaults.py +++ b/celery/app/defaults.py @@ -158,6 +158,10 @@ def __repr__(self): backend_settings=Option(None, type='dict'), ), + arangodb=Namespace( + __old__=old_ns('celery_arangodb'), + backend_settings=Option(None, type='dict') + ), mongodb=Namespace( __old__=old_ns('celery_mongodb'), diff --git a/celery/backends/arangodb.py b/celery/backends/arangodb.py new file mode 100644 index 00000000000..e88c35da173 --- /dev/null +++ b/celery/backends/arangodb.py @@ -0,0 +1,211 @@ +# -*- coding: utf-8 -*- +"""ArangoDb result store backend.""" + +# pylint: disable=W1202,W0703 + +from __future__ import absolute_import, unicode_literals + +import logging +import json +from kombu.utils.url import _parse_url +from kombu.utils.encoding import str_t + +from celery.exceptions import ImproperlyConfigured + +from .base import KeyValueStoreBackend + +try: + from pyArango import connection as py_arango_connection + from pyArango.theExceptions import AQLQueryError +except ImportError: + py_arango_connection = AQLQueryError = None # noqa + +__all__ = ('ArangoDbBackend',) + + +class ArangoDbBackend(KeyValueStoreBackend): + """ArangoDb backend. + + Sample url + "arangodb://username:password@host:port/database/collection" + *arangodb_backend_settings* is where the settings are present + (in the app.conf) + Settings should contain the host, port, username, password, database name, + collection name else the default will be chosen. + Default database name and collection name is celery. + + Raises + ------ + celery.exceptions.ImproperlyConfigured: + if module :pypi:`pyArango` is not available. + + """ + + host = '127.0.0.1' + port = '8529' + database = 'celery' + collection = 'celery' + username = None + password = None + # protocol is not supported in backend url (http is taken as default) + http_protocol = 'http' + + # Use str as arangodb key not bytes + key_t = str_t + + def __init__(self, url=None, *args, **kwargs): + """Parse the url or load the settings from settings object.""" + super(ArangoDbBackend, self).__init__(*args, **kwargs) + + if py_arango_connection is None: + raise ImproperlyConfigured( + 'You need to install the pyArango library to use the ' + 'ArangoDb backend.', + ) + + self.url = url + + if url is None: + host = port = database = collection = username = password = None + else: + ( + _schema, host, port, username, password, + database_collection, _query + ) = _parse_url(url) + if database_collection is None: + database = collection = None + else: + database, collection = database_collection.split('/') + + config = self.app.conf.get('arangodb_backend_settings', None) + if config is not None: + if not isinstance(config, dict): + raise ImproperlyConfigured( + 'ArangoDb backend settings should be grouped in a dict', + ) + else: + config = {} + + self.host = host or config.get('host', self.host) + self.port = int(port or config.get('port', self.port)) + self.http_protocol = config.get('http_protocol', self.http_protocol) + self.database = database or config.get('database', self.database) + self.collection = \ + collection or config.get('collection', self.collection) + self.username = username or config.get('username', self.username) + self.password = password or config.get('password', self.password) + self.arangodb_url = "{http_protocol}://{host}:{port}".format( + http_protocol=self.http_protocol, host=self.host, port=self.port + ) + self._connection = None + + @property + def connection(self): + """Connect to the arangodb server.""" + if self._connection is None: + self._connection = py_arango_connection.Connection( + arangoURL=self.arangodb_url, username=self.username, + password=self.password + ) + return self._connection + + @property + def db(self): + """Database Object to the given database.""" + return self.connection[self.database] + + def get(self, key): + try: + logging.debug( + 'RETURN DOCUMENT("{collection}/{key}").task'.format( + collection=self.collection, key=key + ) + ) + query = self.db.AQLQuery( + 'RETURN DOCUMENT("{collection}/{key}").task'.format( + collection=self.collection, key=key + ) + ) + result = query.response["result"][0] + if result is None: + return None + return json.dumps(result) + except AQLQueryError as aql_err: + logging.error(aql_err) + return None + except Exception as err: + logging.error(err) + return None + + def set(self, key, value): + """Insert a doc with value into task attribute and _key as key.""" + try: + logging.debug( + 'INSERT {{ task: {task}, _key: "{key}" }} INTO {collection}' + .format( + collection=self.collection, key=key, task=value + ) + ) + self.db.AQLQuery( + 'INSERT {{ task: {task}, _key: "{key}" }} INTO {collection}' + .format( + collection=self.collection, key=key, task=value + ) + ) + except AQLQueryError as aql_err: + logging.error(aql_err) + except Exception as err: + logging.error(err) + + def mget(self, keys): + try: + json_keys = json.dumps(keys) + logging.debug( + """ + FOR key in {keys} + RETURN DOCUMENT(CONCAT("{collection}/", key).task + """.format( + collection=self.collection, keys=json_keys + ) + ) + query = self.db.AQLQuery( + """ + FOR key in {keys} + RETURN DOCUMENT(CONCAT("{collection}/", key).task + """.format( + collection=self.collection, keys=json_keys + ) + ) + results = [] + while True: + results.extend(query.response['result']) + query.nextBatch() + except StopIteration: + values = [ + result if result is None else json.dumps(result) + for result in results + ] + return values + except AQLQueryError as aql_err: + logging.error(aql_err) + return [None] * len(keys) + except Exception as err: + logging.error(err) + return [None] * len(keys) + + def delete(self, key): + try: + logging.debug( + 'REMOVE {{ _key: "{key}" }} IN {collection}'.format( + key=key, collection=self.collection + ) + ) + self.db.AQLQuery( + 'REMOVE {{ _key: "{key}" }} IN {collection}'.format( + key=key, collection=self.collection + ) + ) + except AQLQueryError as aql_err: + logging.error(aql_err) + except Exception as err: + logging.error(err) diff --git a/docs/includes/installation.txt b/docs/includes/installation.txt index 307cc685471..09887edbf0d 100644 --- a/docs/includes/installation.txt +++ b/docs/includes/installation.txt @@ -82,6 +82,9 @@ Transports and Backends :``celery[couchbase]``: for using Couchbase as a result backend. +:``celery[arangodb]``: + for using ArangoDB as a result backend. + :``celery[elasticsearch]``: for using Elasticsearch as a result backend. diff --git a/docs/internals/reference/celery.backends.arangodb.rst b/docs/internals/reference/celery.backends.arangodb.rst new file mode 100644 index 00000000000..c05b0624480 --- /dev/null +++ b/docs/internals/reference/celery.backends.arangodb.rst @@ -0,0 +1,11 @@ +============================================ + ``celery.backends.arangodb`` +============================================ + +.. contents:: + :local: +.. currentmodule:: celery.backends.arangodb + +.. automodule:: celery.backends.arangodb + :members: + :undoc-members: diff --git a/docs/internals/reference/index.rst b/docs/internals/reference/index.rst index 73caac8e93d..a06c2a65282 100644 --- a/docs/internals/reference/index.rst +++ b/docs/internals/reference/index.rst @@ -36,6 +36,7 @@ celery.backends.riak celery.backends.cassandra celery.backends.couchbase + celery.backends.arangodb celery.backends.dynamodb celery.backends.filesystem celery.backends.cosmosdbsql diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt index 6b43018d026..3ba49983e41 100644 --- a/docs/spelling_wordlist.txt +++ b/docs/spelling_wordlist.txt @@ -18,6 +18,7 @@ Andreas Andrey Andriy Aneil +ArangoDB Areski Armin Artyom diff --git a/docs/userguide/configuration.rst b/docs/userguide/configuration.rst index f7944e75a45..d8c31745ee3 100644 --- a/docs/userguide/configuration.rst +++ b/docs/userguide/configuration.rst @@ -96,6 +96,7 @@ have been moved into a new ``task_`` prefix. ``S3_ENDPOINT_URL`` :setting:`s3_endpoint_url` ``S3_REGION`` :setting:`s3_region` ``CELERY_COUCHBASE_BACKEND_SETTINGS`` :setting:`couchbase_backend_settings` +``CELERY_ARANGODB_BACKEND_SETTINGS`` :setting:`arangodb_backend_settings` ``CELERY_MONGODB_BACKEND_SETTINGS`` :setting:`mongodb_backend_settings` ``CELERY_EVENT_QUEUE_EXPIRES`` :setting:`event_queue_expires` ``CELERY_EVENT_QUEUE_TTL`` :setting:`event_queue_ttl` @@ -607,6 +608,10 @@ Can be one of the following: Use `Couchbase`_ to store the results. See :ref:`conf-couchbase-result-backend`. +* ``arangodb`` + Use `ArangoDB`_ to store the results. + See :ref:`conf-arangodb-result-backend`. + * ``couchdb`` Use `CouchDB`_ to store the results. See :ref:`conf-couchdb-result-backend`. @@ -645,6 +650,7 @@ Can be one of the following: .. _`CouchDB`: http://www.couchdb.com/ .. _`CosmosDB`: https://azure.microsoft.com/en-us/services/cosmos-db/ .. _`Couchbase`: https://www.couchbase.com/ +.. _`ArangoDB`: https://www.arangodb.com/ .. _`Consul`: https://consul.io/ .. _`AzureBlockBlob`: https://azure.microsoft.com/en-us/services/storage/blobs/ .. _`S3`: https://aws.amazon.com/s3/ @@ -1613,6 +1619,66 @@ This is a dict supporting the following keys: Password to authenticate to the Couchbase server (optional). +.. _conf-arangodb-result-backend: + +ArangoDB backend settings +-------------------------- + +.. note:: + + The ArangoDB backend requires the :pypi:`pyArango` library. + + To install this package use :command:`pip`: + + .. code-block:: console + + $ pip install celery[arangodb] + + See :ref:`bundles` for instructions how to combine multiple extension + requirements. + +This backend can be configured via the :setting:`result_backend` +set to a ArangoDB URL: + +.. code-block:: python + + result_backend = 'arangodb://username:password@host:port/database/collection' + +.. setting:: arangodb_backend_settings + +``arangodb_backend_settings`` +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Default: ``{}`` (empty mapping). + +This is a dict supporting the following keys: + +* ``host`` + + Host name of the ArangoDB server. Defaults to ``localhost``. + +* ``port`` + + The port the ArangoDB server is listening to. Defaults to ``8529``. + +* ``database`` + + The default database in the ArangoDB server is writing to. + Defaults to ``celery``. + +* ``collection`` + + The default collection in the ArangoDB servers database is writing to. + Defaults to ``celery``. + +* ``username`` + + User name to authenticate to the ArangoDB server as (optional). + +* ``password`` + + Password to authenticate to the ArangoDB server (optional). + .. _conf-cosmosdbsql-result-backend: CosmosDB backend settings (experimental) diff --git a/requirements/extras/arangodb.txt b/requirements/extras/arangodb.txt new file mode 100644 index 00000000000..1a6b85f1294 --- /dev/null +++ b/requirements/extras/arangodb.txt @@ -0,0 +1 @@ +pyArango>=1.3.2 \ No newline at end of file diff --git a/requirements/test-ci-default.txt b/requirements/test-ci-default.txt index 08f766b79eb..f7d59e5737a 100644 --- a/requirements/test-ci-default.txt +++ b/requirements/test-ci-default.txt @@ -14,6 +14,7 @@ -r extras/elasticsearch.txt -r extras/couchdb.txt -r extras/couchbase.txt +-r extras/arangodb.txt -r extras/consul.txt -r extras/cosmosdbsql.txt -r extras/cassandra.txt diff --git a/setup.py b/setup.py index 123a1392f6e..7287852453f 100644 --- a/setup.py +++ b/setup.py @@ -57,6 +57,7 @@ def _pyimp(): 'memcache', 'pymemcache', 'couchbase', + 'arangodb', 'eventlet', 'gevent', 'msgpack', diff --git a/t/unit/backends/test_arangodb.py b/t/unit/backends/test_arangodb.py new file mode 100644 index 00000000000..70cb6d65964 --- /dev/null +++ b/t/unit/backends/test_arangodb.py @@ -0,0 +1,107 @@ +"""Tests for the ArangoDb.""" +from __future__ import absolute_import, unicode_literals + +import pytest +from case import Mock, patch, sentinel, skip + +from celery.app import backends +from celery.backends import arangodb as module +from celery.backends.arangodb import ArangoDbBackend +from celery.exceptions import ImproperlyConfigured + +try: + import pyArango +except ImportError: + pyArango = None # noqa + + +@skip.unless_module('pyArango') +class test_ArangoDbBackend: + + def setup(self): + self.backend = ArangoDbBackend(app=self.app) + + def test_init_no_arangodb(self): + prev, module.py_arango_connection = module.py_arango_connection, None + try: + with pytest.raises(ImproperlyConfigured): + ArangoDbBackend(app=self.app) + finally: + module.py_arango_connection = prev + + def test_init_no_settings(self): + self.app.conf.arangodb_backend_settings = [] + with pytest.raises(ImproperlyConfigured): + ArangoDbBackend(app=self.app) + + def test_init_settings_is_None(self): + self.app.conf.arangodb_backend_settings = None + ArangoDbBackend(app=self.app) + + def test_get_connection_connection_exists(self): + with patch('pyArango.connection.Connection') as mock_Connection: + self.backend._connection = sentinel._connection + + connection = self.backend._connection + + assert sentinel._connection == connection + mock_Connection.assert_not_called() + + def test_get(self): + self.app.conf.arangodb_backend_settings = {} + x = ArangoDbBackend(app=self.app) + x.get = Mock() + x.get.return_value = sentinel.retval + assert x.get('1f3fab') == sentinel.retval + x.get.assert_called_once_with('1f3fab') + + def test_delete(self): + self.app.conf.arangodb_backend_settings = {} + x = ArangoDbBackend(app=self.app) + x.delete = Mock() + x.delete.return_value = None + assert x.delete('1f3fab') is None + + def test_config_params(self): + self.app.conf.arangodb_backend_settings = { + 'host': 'test.arangodb.com', + 'port': '8529', + 'username': 'johndoe', + 'password': 'mysecret', + 'database': 'celery_database', + 'collection': 'celery_collection', + 'http_protocol': 'https' + } + x = ArangoDbBackend(app=self.app) + assert x.host == 'test.arangodb.com' + assert x.port == 8529 + assert x.username == 'johndoe' + assert x.password == 'mysecret' + assert x.database == 'celery_database' + assert x.collection == 'celery_collection' + assert x.http_protocol == 'https' + assert x.arangodb_url == 'https://test.arangodb.com:8529' + + def test_backend_by_url( + self, url="arangodb://username:password@host:port/database/collection" + ): + from celery.backends.arangodb import ArangoDbBackend + backend, url_ = backends.by_url(url, self.app.loader) + assert backend is ArangoDbBackend + assert url_ == url + + def test_backend_params_by_url(self): + url = ( + "arangodb://johndoe:mysecret@test.arangodb.com:8529/" + "celery_database/celery_collection" + ) + with self.Celery(backend=url) as app: + x = app.backend + assert x.host == 'test.arangodb.com' + assert x.port == 8529 + assert x.username == 'johndoe' + assert x.password == 'mysecret' + assert x.database == 'celery_database' + assert x.collection == 'celery_collection' + assert x.http_protocol == 'http' + assert x.arangodb_url == 'http://test.arangodb.com:8529'