From bb849a37330fe16dba4d39f4fe86bdba78e834b5 Mon Sep 17 00:00:00 2001 From: Dmitrii Burlutckii Date: Wed, 23 Aug 2023 12:17:15 +0200 Subject: [PATCH 01/33] CLI base tool --- connectors/connectors_cli.py | 320 +++++++++++++++++++++++++++++++++++ setup.py | 1 + 2 files changed, 321 insertions(+) create mode 100644 connectors/connectors_cli.py diff --git a/connectors/connectors_cli.py b/connectors/connectors_cli.py new file mode 100644 index 000000000..ec99b47b5 --- /dev/null +++ b/connectors/connectors_cli.py @@ -0,0 +1,320 @@ +# +# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +# or more contributor license agreements. Licensed under the Elastic License 2.0; +# you may not use this file except in compliance with the Elastic License 2.0. +# +""" +Command Line Interface. + +This is the main entry point of the framework. When the project is installed as +a Python package, an `elastic-ingest` executable is added in the PATH and +executes the `main` function of this module, which starts the service. +""" +import asyncio +import functools +import json +import logging +import os +import signal +from argparse import ArgumentParser + +from connectors import __version__ +from connectors.config import load_config +from connectors.logger import logger, set_logger +from connectors.preflight_check import PreflightCheck +from connectors.services import get_services +from connectors.source import get_source_klass, get_source_klasses +from connectors.utils import ExtractionService, get_event_loop + +from connectors.es.settings import DEFAULT_LANGUAGE, Mappings, Settings +from connectors.es.sink import SyncOrchestrator +from connectors.logger import logger, set_logger +from connectors.source import get_source_klass +from connectors.utils import validate_index_name + +__all__ = ["main"] + +CONNECTORS_INDEX = ".elastic-connectors" +JOBS_INDEX = ".elastic-connectors-sync-jobs" +DEFAULT_CONFIG = os.path.join(os.path.dirname(__file__), "..", "config.yml") +DEFAULT_FILTERING = [ + { + "domain": "DEFAULT", + "draft": { + "advanced_snippet": { + "updated_at": "2023-01-31T16:41:27.341Z", + "created_at": "2023-01-31T16:38:49.244Z", + "value": {}, + }, + "rules": [ + { + "field": "_", + "updated_at": "2023-01-31T16:41:27.341Z", + "created_at": "2023-01-31T16:38:49.244Z", + "rule": "regex", + "id": "DEFAULT", + "value": ".*", + "order": 1, + "policy": "include", + } + ], + "validation": {"state": "valid", "errors": []}, + }, + "active": { + "advanced_snippet": { + "updated_at": "2023-01-31T16:41:27.341Z", + "created_at": "2023-01-31T16:38:49.244Z", + "value": {}, + }, + "rules": [ + { + "field": "_", + "updated_at": "2023-01-31T16:41:27.341Z", + "created_at": "2023-01-31T16:38:49.244Z", + "rule": "regex", + "id": "DEFAULT", + "value": ".*", + "order": 1, + "policy": "include", + } + ], + "validation": {"state": "valid", "errors": []}, + }, + } +] +DEFAULT_PIPELINE = { + "version": 1, + "description": "For testing", + "processors": [ + { + "remove": { + "tag": "remove_meta_fields", + "description": "Remove meta fields", + "field": [ + "_attachment", + "_attachment_indexed_chars", + "_extracted_attachment", + "_extract_binary_content", + "_reduce_whitespace", + "_run_ml_inference", + ], + "ignore_missing": True, + } + } + ], +} + +# This should be updated when ftest starts to hate on it for any reason +# Later we won't need it at all +JOB_INDEX_MAPPINGS = { + "dynamic": "false", + "_meta": {"version": 1}, + "properties": { + "cancelation_requested_at": {"type": "date"}, + "canceled_at": {"type": "date"}, + "completed_at": {"type": "date"}, + "connector": { + "properties": { + "configuration": {"type": "object"}, + "filtering": { + "properties": { + "advanced_snippet": { + "properties": { + "created_at": {"type": "date"}, + "updated_at": {"type": "date"}, + "value": {"type": "object"}, + } + }, + "domain": {"type": "keyword"}, + "rules": { + "properties": { + "created_at": {"type": "date"}, + "field": {"type": "keyword"}, + "id": {"type": "keyword"}, + "order": {"type": "short"}, + "policy": {"type": "keyword"}, + "rule": {"type": "keyword"}, + "updated_at": {"type": "date"}, + "value": {"type": "keyword"}, + } + }, + "warnings": { + "properties": { + "ids": {"type": "keyword"}, + "messages": {"type": "text"}, + } + }, + } + }, + "id": {"type": "keyword"}, + "index_name": {"type": "keyword"}, + "language": {"type": "keyword"}, + "pipeline": { + "properties": { + "extract_binary_content": {"type": "boolean"}, + "name": {"type": "keyword"}, + "reduce_whitespace": {"type": "boolean"}, + "run_ml_inference": {"type": "boolean"}, + } + }, + "service_type": {"type": "keyword"}, + } + }, + "created_at": {"type": "date"}, + "deleted_document_count": {"type": "integer"}, + "error": {"type": "keyword"}, + "indexed_document_count": {"type": "integer"}, + "indexed_document_volume": {"type": "integer"}, + "last_seen": {"type": "date"}, + "metadata": {"type": "object"}, + "started_at": {"type": "date"}, + "status": {"type": "keyword"}, + "total_document_count": {"type": "integer"}, + "trigger_method": {"type": "keyword"}, + "worker_hostname": {"type": "keyword"}, + }, +} + + +def _parser(): + """Parses command-line arguments using ArgumentParser and returns it""" + parser = ArgumentParser(prog="elastic-ingest") + + parser.add_argument( + "-c", + "--config-file", + type=str, + help="Configuration file", + default=os.path.join(os.path.dirname(__file__), "..", "config.yml"), + ) + + parser.add_argument( + 'category', + type=str, + choices=['connectors', 'jobs'] + ) + + parser.add_argument( + 'action', + type=str, + choices=['list', 'create'] + ) + + parser.add_argument( + "--version", + action="store_true", + default=False, + help="Display the version and exit.", + ) + + return parser + +async def ensure_index_exists(): + + +async def create_connectors_index(): + +# async def _start_service(actions, config, loop): +# """Starts the service. + +# Steps: +# - performs a preflight check using `PreflightCheck` +# - instantiates a `MultiService` instance and runs its `run` async function +# """ +# preflight = PreflightCheck(config) +# for sig in (signal.SIGINT, signal.SIGTERM): +# loop.add_signal_handler(sig, functools.partial(preflight.shutdown, sig)) +# try: +# if not await preflight.run(): +# return -1 +# finally: +# for sig in (signal.SIGINT, signal.SIGTERM): +# loop.remove_signal_handler(sig) + +# multiservice = get_services(actions, config) +# for sig in (signal.SIGINT, signal.SIGTERM): +# loop.add_signal_handler(sig, functools.partial(multiservice.shutdown, sig.name)) + +# if "PERF8" in os.environ: +# import perf8 + +# async with perf8.measure(): +# return await multiservice.run() +# else: +# return await multiservice.run() + + +def run(args): + """Loads the config file, sets the logger and executes an action. + Actions: + - list: prints out a list of all connectors and exits + - poll: starts the event loop and run forever (default) + """ + + import pdb; pdb.set_trace(); + print(f"Framework version is {__version__}") + + # # load config + config = {} + try: + config = load_config(args.config_file) + import pdb; pdb.set_trace(); + except Exception as e: + # # If something goes wrong while parsing config file, we still want + # # to set up the logger so that Cloud deployments report errors to + # # logs properly + print(f"Could not parse {args.config_file}:\n{e}") + raise + + # import pdb; pdb.set_trace(); + # # just display the list of connectors + # if args.action == ["list"]: + # print("Registered connectors:") + # for source in get_source_klasses(config): + # print(f"- {source.name}") + # print("Bye") + # return 0 + + # if args.action == ["config"]: + # service_type = args.service_type + # print(f"Getting default configuration for service type {service_type}") + + # source_list = config["sources"] + # if service_type not in source_list: + # print(f"Could not find a connector for service type {service_type}") + # return -1 + + # source_klass = get_source_klass(source_list[service_type]) + # print(json.dumps(source_klass.get_simple_configuration(), indent=2)) + # print("Bye") + # return 0 + + # if "list" in args.action: + # print("Cannot use the `list` action with other actions") + # return -1 + + # if "config" in args.action: + # print("Cannot use the `config` action with other actions") + # return -1 + + # loop = get_event_loop(args.uvloop) + # coro = _start_service(args.action, config, loop) + + # try: + # return loop.run_until_complete(coro) + # except asyncio.CancelledError: + # return 0 + # finally: + # logger.info("Bye") + + return -1 + + +def main(args=None): + parser = _parser() + args = parser.parse_args(args=args) + if args.version: + print(__version__) + return 0 + + return run(args) diff --git a/setup.py b/setup.py index 0e1e1b432..30c74b46c 100644 --- a/setup.py +++ b/setup.py @@ -99,5 +99,6 @@ def read_reqs(req_file): [console_scripts] elastic-ingest = connectors.cli:main fake-kibana = connectors.kibana:main + connectors-cli = connectors.connectors_cli:main """, ) From 5964ac8b0e8db90dbca68469b4bd06a9b31ef75a Mon Sep 17 00:00:00 2001 From: Dmitrii Burlutckii Date: Mon, 11 Sep 2023 16:00:45 +0200 Subject: [PATCH 02/33] Add a cli folder --- connectors/cli/.gitkeep | 0 1 file changed, 0 insertions(+), 0 deletions(-) create mode 100644 connectors/cli/.gitkeep diff --git a/connectors/cli/.gitkeep b/connectors/cli/.gitkeep new file mode 100644 index 000000000..e69de29bb From 4a611b57e12d9279474537b307e47534d95b9fbd Mon Sep 17 00:00:00 2001 From: Dmitriy Burlutskiy Date: Tue, 12 Sep 2023 11:05:12 +0200 Subject: [PATCH 03/33] Main parser (#1617) * Rename the tool * Create main parser --- connectors/connectors_cli.py | 24 +++++++++--------------- setup.py | 2 +- 2 files changed, 10 insertions(+), 16 deletions(-) diff --git a/connectors/connectors_cli.py b/connectors/connectors_cli.py index ec99b47b5..dfb64a423 100644 --- a/connectors/connectors_cli.py +++ b/connectors/connectors_cli.py @@ -178,9 +178,9 @@ def _parser(): """Parses command-line arguments using ArgumentParser and returns it""" - parser = ArgumentParser(prog="elastic-ingest") + main_parser = ArgumentParser(prog="connectors") - parser.add_argument( + main_parser.add_argument( "-c", "--config-file", type=str, @@ -188,31 +188,25 @@ def _parser(): default=os.path.join(os.path.dirname(__file__), "..", "config.yml"), ) - parser.add_argument( - 'category', + main_parser.add_argument( + 'namespace', type=str, - choices=['connectors', 'jobs'] + choices=['connector', 'job', 'index'] ) - parser.add_argument( - 'action', - type=str, - choices=['list', 'create'] - ) - - parser.add_argument( + main_parser.add_argument( "--version", action="store_true", default=False, help="Display the version and exit.", ) - return parser + return main_parser -async def ensure_index_exists(): +# async def ensure_index_exists(): -async def create_connectors_index(): +# async def create_connectors_index(): # async def _start_service(actions, config, loop): # """Starts the service. diff --git a/setup.py b/setup.py index 30c74b46c..371bd25ec 100644 --- a/setup.py +++ b/setup.py @@ -99,6 +99,6 @@ def read_reqs(req_file): [console_scripts] elastic-ingest = connectors.cli:main fake-kibana = connectors.kibana:main - connectors-cli = connectors.connectors_cli:main + connectors = connectors.connectors_cli:main """, ) From 1f8e18ea54ed15ef461928bfdcb0c87155f7e563 Mon Sep 17 00:00:00 2001 From: Dmitriy Burlutskiy Date: Tue, 12 Sep 2023 14:03:08 +0200 Subject: [PATCH 04/33] Replace argparser with click (#1621) --- connectors/connectors_cli.py | 262 ++++++++++++++++------------------- requirements/framework.txt | 1 + 2 files changed, 122 insertions(+), 141 deletions(-) diff --git a/connectors/connectors_cli.py b/connectors/connectors_cli.py index dfb64a423..821e92741 100644 --- a/connectors/connectors_cli.py +++ b/connectors/connectors_cli.py @@ -10,27 +10,29 @@ a Python package, an `elastic-ingest` executable is added in the PATH and executes the `main` function of this module, which starts the service. """ -import asyncio -import functools -import json -import logging +# import asyncio +# import functools +# import json +# import logging import os -import signal -from argparse import ArgumentParser - -from connectors import __version__ -from connectors.config import load_config -from connectors.logger import logger, set_logger -from connectors.preflight_check import PreflightCheck -from connectors.services import get_services -from connectors.source import get_source_klass, get_source_klasses -from connectors.utils import ExtractionService, get_event_loop - -from connectors.es.settings import DEFAULT_LANGUAGE, Mappings, Settings -from connectors.es.sink import SyncOrchestrator -from connectors.logger import logger, set_logger -from connectors.source import get_source_klass -from connectors.utils import validate_index_name +# import signal +# from argparse import ArgumentParser + +# from connectors.config import load_config +# from connectors.logger import logger, set_logger +# from connectors.preflight_check import PreflightCheck +# from connectors.services import get_services +# from connectors.source import get_source_klass, get_source_klasses +# from connectors.utils import ExtractionService, get_event_loop + +# from connectors.es.settings import DEFAULT_LANGUAGE, Mappings, Settings +# from connectors.es.sink import SyncOrchestrator +# from connectors.logger import logger, set_logger +# from connectors.source import get_source_klass +# from connectors.utils import validate_index_name + +import click +from connectors import __version__ # NOQA __all__ = ["main"] @@ -175,140 +177,118 @@ }, } +# Main group +def print_version(ctx, param, value): + if not value or ctx.resilient_parsing: + return + click.echo(__version__) + +# @TODO print help page when no arguments passed +@click.group(invoke_without_command=True) +@click.option('-v', '--version', is_flag=True, callback=print_version, + expose_value=False, is_eager=True) +def cli(): + pass + # ctx = click.get_current_context() + # click.echo(ctx.get_help()) + # ctx.exit() -def _parser(): - """Parses command-line arguments using ArgumentParser and returns it""" - main_parser = ArgumentParser(prog="connectors") +@click.command(help="Authenticate Connectors CLI with an Elasticsearch instance") +def login(): + click.echo('login command') - main_parser.add_argument( - "-c", - "--config-file", - type=str, - help="Configuration file", - default=os.path.join(os.path.dirname(__file__), "..", "config.yml"), - ) +cli.add_command(login) - main_parser.add_argument( - 'namespace', - type=str, - choices=['connector', 'job', 'index'] - ) +# Connector group +@click.group(invoke_without_command=True) +def connector(): + click.echo('test connector') - main_parser.add_argument( - "--version", - action="store_true", - default=False, - help="Display the version and exit.", - ) +cli.add_command(connector) - return main_parser -# async def ensure_index_exists(): +# Index group +@click.group(invoke_without_command=True) +def index(): + click.echo('testing index') +cli.add_command(index) -# async def create_connectors_index(): +# Job group +@click.group(invoke_without_command=True) +def job(): + click.echo('testing job') -# async def _start_service(actions, config, loop): -# """Starts the service. +cli.add_command(job) -# Steps: -# - performs a preflight check using `PreflightCheck` -# - instantiates a `MultiService` instance and runs its `run` async function + +# def run(args): +# """Loads the config file, sets the logger and executes an action. +# Actions: +# - list: prints out a list of all connectors and exits +# - poll: starts the event loop and run forever (default) # """ -# preflight = PreflightCheck(config) -# for sig in (signal.SIGINT, signal.SIGTERM): -# loop.add_signal_handler(sig, functools.partial(preflight.shutdown, sig)) + +# print(f"Framework version is {__version__}") + +# # # load config +# config = {} # try: -# if not await preflight.run(): -# return -1 -# finally: -# for sig in (signal.SIGINT, signal.SIGTERM): -# loop.remove_signal_handler(sig) - -# multiservice = get_services(actions, config) -# for sig in (signal.SIGINT, signal.SIGTERM): -# loop.add_signal_handler(sig, functools.partial(multiservice.shutdown, sig.name)) - -# if "PERF8" in os.environ: -# import perf8 - -# async with perf8.measure(): -# return await multiservice.run() -# else: -# return await multiservice.run() - - -def run(args): - """Loads the config file, sets the logger and executes an action. - Actions: - - list: prints out a list of all connectors and exits - - poll: starts the event loop and run forever (default) - """ - - import pdb; pdb.set_trace(); - print(f"Framework version is {__version__}") - - # # load config - config = {} - try: - config = load_config(args.config_file) - import pdb; pdb.set_trace(); - except Exception as e: - # # If something goes wrong while parsing config file, we still want - # # to set up the logger so that Cloud deployments report errors to - # # logs properly - print(f"Could not parse {args.config_file}:\n{e}") - raise - - # import pdb; pdb.set_trace(); - # # just display the list of connectors - # if args.action == ["list"]: - # print("Registered connectors:") - # for source in get_source_klasses(config): - # print(f"- {source.name}") - # print("Bye") - # return 0 - - # if args.action == ["config"]: - # service_type = args.service_type - # print(f"Getting default configuration for service type {service_type}") - - # source_list = config["sources"] - # if service_type not in source_list: - # print(f"Could not find a connector for service type {service_type}") - # return -1 - - # source_klass = get_source_klass(source_list[service_type]) - # print(json.dumps(source_klass.get_simple_configuration(), indent=2)) - # print("Bye") - # return 0 - - # if "list" in args.action: - # print("Cannot use the `list` action with other actions") - # return -1 - - # if "config" in args.action: - # print("Cannot use the `config` action with other actions") - # return -1 - - # loop = get_event_loop(args.uvloop) - # coro = _start_service(args.action, config, loop) - - # try: - # return loop.run_until_complete(coro) - # except asyncio.CancelledError: - # return 0 - # finally: - # logger.info("Bye") - - return -1 +# config = load_config(args.config_file) +# import pdb; pdb.set_trace(); +# except Exception as e: +# # # If something goes wrong while parsing config file, we still want +# # # to set up the logger so that Cloud deployments report errors to +# # # logs properly +# print(f"Could not parse {args.config_file}:\n{e}") +# raise + +# # import pdb; pdb.set_trace(); +# # # just display the list of connectors +# # if args.action == ["list"]: +# # print("Registered connectors:") +# # for source in get_source_klasses(config): +# # print(f"- {source.name}") +# # print("Bye") +# # return 0 + +# # if args.action == ["config"]: +# # service_type = args.service_type +# # print(f"Getting default configuration for service type {service_type}") + +# # source_list = config["sources"] +# # if service_type not in source_list: +# # print(f"Could not find a connector for service type {service_type}") +# # return -1 + +# # source_klass = get_source_klass(source_list[service_type]) +# # print(json.dumps(source_klass.get_simple_configuration(), indent=2)) +# # print("Bye") +# # return 0 + +# # if "list" in args.action: +# # print("Cannot use the `list` action with other actions") +# # return -1 + +# # if "config" in args.action: +# # print("Cannot use the `config` action with other actions") +# # return -1 + +# # loop = get_event_loop(args.uvloop) +# # coro = _start_service(args.action, config, loop) + +# # try: +# # return loop.run_until_complete(coro) +# # except asyncio.CancelledError: +# # return 0 +# # finally: +# # logger.info("Bye") + +# return -1 def main(args=None): - parser = _parser() - args = parser.parse_args(args=args) - if args.version: - print(__version__) - return 0 + cli() - return run(args) +if __name__ == '__main__': + main() diff --git a/requirements/framework.txt b/requirements/framework.txt index 3b479c90f..5c90426ae 100644 --- a/requirements/framework.txt +++ b/requirements/framework.txt @@ -29,3 +29,4 @@ exchangelib==5.0.3 ldap3==2.9.1 lxml==4.9.3 pywinrm==0.4.3 +click==8.1.7 From 1d0a7e7f2cd30e726bbff87a647d09c7c573f535 Mon Sep 17 00:00:00 2001 From: Dmitriy Burlutskiy Date: Tue, 12 Sep 2023 16:38:56 +0200 Subject: [PATCH 05/33] Add -c option (#1625) --- connectors/connectors_cli.py | 24 ++++++++++++++++++------ 1 file changed, 18 insertions(+), 6 deletions(-) diff --git a/connectors/connectors_cli.py b/connectors/connectors_cli.py index 821e92741..c6ed5af3c 100644 --- a/connectors/connectors_cli.py +++ b/connectors/connectors_cli.py @@ -32,8 +32,10 @@ # from connectors.utils import validate_index_name import click +import yaml from connectors import __version__ # NOQA + __all__ = ["main"] CONNECTORS_INDEX = ".elastic-connectors" @@ -187,36 +189,46 @@ def print_version(ctx, param, value): @click.group(invoke_without_command=True) @click.option('-v', '--version', is_flag=True, callback=print_version, expose_value=False, is_eager=True) -def cli(): +@click.option('-c', '--config', type=click.File('rb'), default='bin/config.yml') +@click.pass_context +def cli(ctx, config): + if config: + ctx.ensure_object(dict) + ctx.obj['config'] = yaml.safe_load(config) pass + # ctx = click.get_current_context() # click.echo(ctx.get_help()) # ctx.exit() @click.command(help="Authenticate Connectors CLI with an Elasticsearch instance") -def login(): +@click.pass_obj +def login(obj): click.echo('login command') cli.add_command(login) # Connector group @click.group(invoke_without_command=True) -def connector(): - click.echo('test connector') +@click.pass_obj +def connector(obj): + click.echo('testing connector') cli.add_command(connector) # Index group @click.group(invoke_without_command=True) -def index(): +@click.pass_obj +def index(obj): click.echo('testing index') cli.add_command(index) # Job group @click.group(invoke_without_command=True) -def job(): +@click.pass_obj +def job(obj): click.echo('testing job') cli.add_command(job) From cb7092c15c9b6a1cf2d6558c1b86e5d4739ded96 Mon Sep 17 00:00:00 2001 From: Klim Markelov Date: Wed, 13 Sep 2023 14:40:56 +0200 Subject: [PATCH 06/33] Introduce bin/connectors login (#1626) --- .gitignore | 1 + connectors/cli/auth.py | 37 +++++++++++++++++++++++++++++++ connectors/connectors_cli.py | 22 ++++++++++++++---- connectors/{cli.py => old_cli.py} | 0 4 files changed, 56 insertions(+), 4 deletions(-) create mode 100644 connectors/cli/auth.py rename connectors/{cli.py => old_cli.py} (100%) diff --git a/.gitignore b/.gitignore index 2585d4d36..f602e6700 100644 --- a/.gitignore +++ b/.gitignore @@ -11,3 +11,4 @@ __pycache__ # jetbrains files .idea *.iml +.cli diff --git a/connectors/cli/auth.py b/connectors/cli/auth.py new file mode 100644 index 000000000..c0967d98c --- /dev/null +++ b/connectors/cli/auth.py @@ -0,0 +1,37 @@ +from connectors.es import ESClient + +import yaml +import asyncio +import os + +CONFIG_FILE_PATH = '.cli/config.yml' + +class Auth: + def __init__(self, host, username, password): + self.elastic_config = { 'host': host, 'username': username, 'password': password } + self.es_client = ESClient(self.elastic_config) + + def authenticate(self): + if asyncio.run(self.__ping_es_client()): + self.__save_config() + return True + else: + return False + + def is_config_present(self): + return os.path.isfile(CONFIG_FILE_PATH) + + async def __ping_es_client(self): + try: + return await self.es_client.ping() + except: + return False + finally: + await self.es_client.close() + + def __save_config(self): + yaml_content = yaml.dump({'elasticsearch': self.elastic_config }) + os.makedirs(os.path.dirname(CONFIG_FILE_PATH), exist_ok=True) + + with open(CONFIG_FILE_PATH, "w") as f: + f.write(yaml_content) diff --git a/connectors/connectors_cli.py b/connectors/connectors_cli.py index c6ed5af3c..c53ada1f7 100644 --- a/connectors/connectors_cli.py +++ b/connectors/connectors_cli.py @@ -33,6 +33,7 @@ import click import yaml +from connectors.cli.auth import Auth from connectors import __version__ # NOQA @@ -189,7 +190,7 @@ def print_version(ctx, param, value): @click.group(invoke_without_command=True) @click.option('-v', '--version', is_flag=True, callback=print_version, expose_value=False, is_eager=True) -@click.option('-c', '--config', type=click.File('rb'), default='bin/config.yml') +@click.option('-c', '--config', type=click.File('rb')) @click.pass_context def cli(ctx, config): if config: @@ -202,9 +203,22 @@ def cli(ctx, config): # ctx.exit() @click.command(help="Authenticate Connectors CLI with an Elasticsearch instance") -@click.pass_obj -def login(obj): - click.echo('login command') +@click.option('--host', prompt="Elastic host") +@click.option('--username', prompt="Username") +@click.option('--password', prompt="Password", hide_input=True) + +def login(host, username, password): + auth = Auth(host, username, password) + if auth.is_config_present(): + click.confirm(click.style('Config is already present. Are you sure you want to override it?😱', fg='yellow'), abort=True) + if auth.authenticate(): + click.echo(click.style("Authentication successful. You're breathtaking.", fg='green')) + else: + click.echo('') + click.echo(click.style("Authentication failed. Please check your credentials.", fg='red'), err=True) + return + + cli.add_command(login) diff --git a/connectors/cli.py b/connectors/old_cli.py similarity index 100% rename from connectors/cli.py rename to connectors/old_cli.py From d2d68db5ab810b47c53d483b312b453b573fa45d Mon Sep 17 00:00:00 2001 From: Dmitriy Burlutskiy Date: Wed, 13 Sep 2023 15:08:40 +0200 Subject: [PATCH 07/33] Introduce connector list command (#1636) * Introduce connectors list command * Read existing config --- connectors/cli/connector.py | 40 +++++ connectors/connectors_cli.py | 281 ++++++----------------------------- requirements/framework.txt | 2 + 3 files changed, 89 insertions(+), 234 deletions(-) create mode 100644 connectors/cli/connector.py diff --git a/connectors/cli/connector.py b/connectors/cli/connector.py new file mode 100644 index 000000000..444d2e7e6 --- /dev/null +++ b/connectors/cli/connector.py @@ -0,0 +1,40 @@ +from connectors.es.client import ESClient +from connectors.protocol import CONCRETE_CONNECTORS_INDEX, CONCRETE_JOBS_INDEX +from connectors.protocol import ConnectorIndex + +class Connector: + def __init__(self, config): + self.config = config + + # initialize ES client + self.es_client = ESClient(self.config) + + self.connector_index = ConnectorIndex(self.config) + + + async def list_connectors(self): + # TODO move this on top + try: + await self.es_client.ensure_exists( + indices=[CONCRETE_CONNECTORS_INDEX, CONCRETE_JOBS_INDEX] + ) + + connectors = [] + async for connector in self.connector_index.all_connectors(): + connectors.append(connector) + + return connectors + + # TODO catch exceptions + finally: + + await self.connector_index.close() + await self.es_client.close() + + async def ping(self): + if await self.es_client.ping(): + await self.es_client.close() + return True + else: + return False + diff --git a/connectors/connectors_cli.py b/connectors/connectors_cli.py index c53ada1f7..6dd9db52d 100644 --- a/connectors/connectors_cli.py +++ b/connectors/connectors_cli.py @@ -10,176 +10,18 @@ a Python package, an `elastic-ingest` executable is added in the PATH and executes the `main` function of this module, which starts the service. """ -# import asyncio -# import functools -# import json -# import logging -import os -# import signal -# from argparse import ArgumentParser - -# from connectors.config import load_config -# from connectors.logger import logger, set_logger -# from connectors.preflight_check import PreflightCheck -# from connectors.services import get_services -# from connectors.source import get_source_klass, get_source_klasses -# from connectors.utils import ExtractionService, get_event_loop - -# from connectors.es.settings import DEFAULT_LANGUAGE, Mappings, Settings -# from connectors.es.sink import SyncOrchestrator -# from connectors.logger import logger, set_logger -# from connectors.source import get_source_klass -# from connectors.utils import validate_index_name - +import asyncio import click import yaml -from connectors.cli.auth import Auth +from connectors.cli.auth import Auth, CONFIG_FILE_PATH from connectors import __version__ # NOQA +from connectors.cli.connector import Connector +from tabulate import tabulate +import os __all__ = ["main"] -CONNECTORS_INDEX = ".elastic-connectors" -JOBS_INDEX = ".elastic-connectors-sync-jobs" -DEFAULT_CONFIG = os.path.join(os.path.dirname(__file__), "..", "config.yml") -DEFAULT_FILTERING = [ - { - "domain": "DEFAULT", - "draft": { - "advanced_snippet": { - "updated_at": "2023-01-31T16:41:27.341Z", - "created_at": "2023-01-31T16:38:49.244Z", - "value": {}, - }, - "rules": [ - { - "field": "_", - "updated_at": "2023-01-31T16:41:27.341Z", - "created_at": "2023-01-31T16:38:49.244Z", - "rule": "regex", - "id": "DEFAULT", - "value": ".*", - "order": 1, - "policy": "include", - } - ], - "validation": {"state": "valid", "errors": []}, - }, - "active": { - "advanced_snippet": { - "updated_at": "2023-01-31T16:41:27.341Z", - "created_at": "2023-01-31T16:38:49.244Z", - "value": {}, - }, - "rules": [ - { - "field": "_", - "updated_at": "2023-01-31T16:41:27.341Z", - "created_at": "2023-01-31T16:38:49.244Z", - "rule": "regex", - "id": "DEFAULT", - "value": ".*", - "order": 1, - "policy": "include", - } - ], - "validation": {"state": "valid", "errors": []}, - }, - } -] -DEFAULT_PIPELINE = { - "version": 1, - "description": "For testing", - "processors": [ - { - "remove": { - "tag": "remove_meta_fields", - "description": "Remove meta fields", - "field": [ - "_attachment", - "_attachment_indexed_chars", - "_extracted_attachment", - "_extract_binary_content", - "_reduce_whitespace", - "_run_ml_inference", - ], - "ignore_missing": True, - } - } - ], -} - -# This should be updated when ftest starts to hate on it for any reason -# Later we won't need it at all -JOB_INDEX_MAPPINGS = { - "dynamic": "false", - "_meta": {"version": 1}, - "properties": { - "cancelation_requested_at": {"type": "date"}, - "canceled_at": {"type": "date"}, - "completed_at": {"type": "date"}, - "connector": { - "properties": { - "configuration": {"type": "object"}, - "filtering": { - "properties": { - "advanced_snippet": { - "properties": { - "created_at": {"type": "date"}, - "updated_at": {"type": "date"}, - "value": {"type": "object"}, - } - }, - "domain": {"type": "keyword"}, - "rules": { - "properties": { - "created_at": {"type": "date"}, - "field": {"type": "keyword"}, - "id": {"type": "keyword"}, - "order": {"type": "short"}, - "policy": {"type": "keyword"}, - "rule": {"type": "keyword"}, - "updated_at": {"type": "date"}, - "value": {"type": "keyword"}, - } - }, - "warnings": { - "properties": { - "ids": {"type": "keyword"}, - "messages": {"type": "text"}, - } - }, - } - }, - "id": {"type": "keyword"}, - "index_name": {"type": "keyword"}, - "language": {"type": "keyword"}, - "pipeline": { - "properties": { - "extract_binary_content": {"type": "boolean"}, - "name": {"type": "keyword"}, - "reduce_whitespace": {"type": "boolean"}, - "run_ml_inference": {"type": "boolean"}, - } - }, - "service_type": {"type": "keyword"}, - } - }, - "created_at": {"type": "date"}, - "deleted_document_count": {"type": "integer"}, - "error": {"type": "keyword"}, - "indexed_document_count": {"type": "integer"}, - "indexed_document_volume": {"type": "integer"}, - "last_seen": {"type": "date"}, - "metadata": {"type": "object"}, - "started_at": {"type": "date"}, - "status": {"type": "keyword"}, - "total_document_count": {"type": "integer"}, - "trigger_method": {"type": "keyword"}, - "worker_hostname": {"type": "keyword"}, - }, -} - # Main group def print_version(ctx, param, value): if not value or ctx.resilient_parsing: @@ -193,10 +35,15 @@ def print_version(ctx, param, value): @click.option('-c', '--config', type=click.File('rb')) @click.pass_context def cli(ctx, config): + ctx.ensure_object(dict) if config: - ctx.ensure_object(dict) ctx.obj['config'] = yaml.safe_load(config) - pass + elif os.path.isfile(CONFIG_FILE_PATH): + with open(CONFIG_FILE_PATH, "r") as f: + ctx.obj['config'] = yaml.safe_load(f.read()) + else: + # todo raise an exception + pass # ctx = click.get_current_context() # click.echo(ctx.get_help()) @@ -217,7 +64,7 @@ def login(host, username, password): click.echo('') click.echo(click.style("Authentication failed. Please check your credentials.", fg='red'), err=True) return - + cli.add_command(login) @@ -226,11 +73,43 @@ def login(host, username, password): @click.group(invoke_without_command=True) @click.pass_obj def connector(obj): - click.echo('testing connector') + pass -cli.add_command(connector) +@click.command() +@click.pass_obj +def list(obj): + click.echo("") + connector = Connector(config=obj['config']['elasticsearch']) + coro = connector.list_connectors() + + try: + connectors = asyncio.get_event_loop().run_until_complete(coro) + if len(connectors) == 0: + click.echo("No connectors found") + + click.echo(f"Showing {len(connectors)} connectors \n") + + table_rows = [] + for connector in connectors: + + formatted_connector = [ + click.style(connector.id, blink=True, fg="green"), + click.style(connector.index_name, blink=True, fg="white"), + click.style(connector.service_type, blink=True, fg='white'), + click.style(connector.status.value, fg="white"), + click.style(connector.last_sync_status.value, fg="white") + ] + table_rows.append(formatted_connector) + + click.echo(tabulate(table_rows, headers=["ID", "Index name", "Service type", "Status", "Last sync job status"])) + except asyncio.CancelledError as e: + click.echo(e) +connector.add_command(list) + +cli.add_command(connector) + # Index group @click.group(invoke_without_command=True) @click.pass_obj @@ -247,72 +126,6 @@ def job(obj): cli.add_command(job) - -# def run(args): -# """Loads the config file, sets the logger and executes an action. -# Actions: -# - list: prints out a list of all connectors and exits -# - poll: starts the event loop and run forever (default) -# """ - -# print(f"Framework version is {__version__}") - -# # # load config -# config = {} -# try: -# config = load_config(args.config_file) -# import pdb; pdb.set_trace(); -# except Exception as e: -# # # If something goes wrong while parsing config file, we still want -# # # to set up the logger so that Cloud deployments report errors to -# # # logs properly -# print(f"Could not parse {args.config_file}:\n{e}") -# raise - -# # import pdb; pdb.set_trace(); -# # # just display the list of connectors -# # if args.action == ["list"]: -# # print("Registered connectors:") -# # for source in get_source_klasses(config): -# # print(f"- {source.name}") -# # print("Bye") -# # return 0 - -# # if args.action == ["config"]: -# # service_type = args.service_type -# # print(f"Getting default configuration for service type {service_type}") - -# # source_list = config["sources"] -# # if service_type not in source_list: -# # print(f"Could not find a connector for service type {service_type}") -# # return -1 - -# # source_klass = get_source_klass(source_list[service_type]) -# # print(json.dumps(source_klass.get_simple_configuration(), indent=2)) -# # print("Bye") -# # return 0 - -# # if "list" in args.action: -# # print("Cannot use the `list` action with other actions") -# # return -1 - -# # if "config" in args.action: -# # print("Cannot use the `config` action with other actions") -# # return -1 - -# # loop = get_event_loop(args.uvloop) -# # coro = _start_service(args.action, config, loop) - -# # try: -# # return loop.run_until_complete(coro) -# # except asyncio.CancelledError: -# # return 0 -# # finally: -# # logger.info("Bye") - -# return -1 - - def main(args=None): cli() diff --git a/requirements/framework.txt b/requirements/framework.txt index 5c90426ae..651a3aebe 100644 --- a/requirements/framework.txt +++ b/requirements/framework.txt @@ -30,3 +30,5 @@ ldap3==2.9.1 lxml==4.9.3 pywinrm==0.4.3 click==8.1.7 +colorama==0.4.6 +tabulate==0.9.0 From f3c14fe12d12b07cb91de9e1b3daeeeee66ac808 Mon Sep 17 00:00:00 2001 From: Klim Markelov Date: Thu, 14 Sep 2023 11:28:46 +0200 Subject: [PATCH 08/33] Add index cli (#1637) --- connectors/cli/index.py | 43 ++++++++++++++++++++++++++++ connectors/connectors_cli.py | 55 +++++++++++++++++++++++++++++++++++- connectors/es/client.py | 6 ++++ 3 files changed, 103 insertions(+), 1 deletion(-) create mode 100644 connectors/cli/index.py diff --git a/connectors/cli/index.py b/connectors/cli/index.py new file mode 100644 index 000000000..51521696c --- /dev/null +++ b/connectors/cli/index.py @@ -0,0 +1,43 @@ +from connectors.es import ESClient + +import asyncio + +class Index: + def __init__(self, config): + self.elastic_config = config + self.es_client = ESClient(self.elastic_config) + + def list_indices(self): + return asyncio.run(self.__list_indices())['indices'] + + def clean(self, index_name): + return asyncio.run(self.__clean_index(index_name)) + + def delete(self, index_name): + return asyncio.run(self.__delete_index(index_name)) == None + + async def __list_indices(self): + try: + return await self.es_client.list_indices() + except: + # TODO raise + return [] + finally: + await self.es_client.close() + + async def __clean_index(self, index_name): + try: + return await self.es_client.clean_index(index_name) + except: + return False + finally: + await self.es_client.close() + + async def __delete_index(self, index_name): + try: + return await self.es_client.delete_indices([index_name]) + except: + return False + finally: + await self.es_client.close() + diff --git a/connectors/connectors_cli.py b/connectors/connectors_cli.py index 6dd9db52d..3e2ceab08 100644 --- a/connectors/connectors_cli.py +++ b/connectors/connectors_cli.py @@ -14,6 +14,7 @@ import click import yaml from connectors.cli.auth import Auth, CONFIG_FILE_PATH +from connectors.cli.index import Index from connectors import __version__ # NOQA from connectors.cli.connector import Connector from tabulate import tabulate @@ -114,7 +115,59 @@ def list(obj): @click.group(invoke_without_command=True) @click.pass_obj def index(obj): - click.echo('testing index') + pass + +@click.command(help="Show all indices") +@click.pass_obj +def list(obj): + click.echo("") + index = Index(config=obj['config']['elasticsearch']) + indices = index.list_indices() + + if len(indices) == 0: + click.echo("No indices found") + + click.echo(f"Showing {len(indices)} indices \n") + table_rows = [] + for index in indices: + formatted_index = [ + click.style(index, blink=True, fg="white"), + click.style(indices[index]['primaries']['docs']['count']) + ] + table_rows.append(formatted_index) + + click.echo(tabulate(table_rows, headers=["Index name", "Number of documents"])) + + +index.add_command(list) + +@click.command(help="Remove all documents from the index") +@click.pass_obj +@click.argument('index', nargs=1) +def clean(obj, index): + index_cli = Index(config=obj['config']['elasticsearch']) + click.confirm(click.style('Are you sure you want to clean ' + index + '?😱', fg='yellow'), abort=True) + if index_cli.clean(index): + click.echo(click.style("The index has been cleaned. You're breathtaking.", fg='green')) + else: + click.echo('') + click.echo(click.style("Something went wrong. Please try again later or check your credentials", fg='red'), err=True) + +index.add_command(clean) + +@click.command(help="Delete an index") +@click.pass_obj +@click.argument('index', nargs=1) +def delete(obj, index): + index_cli = Index(config=obj['config']['elasticsearch']) + click.confirm(click.style('Are you sure you want to delete ' + index + '?😱', fg='yellow'), abort=True) + if index_cli.delete(index): + click.echo(click.style("The index has been deleted. You're breathtaking.", fg='green')) + else: + click.echo('') + click.echo(click.style("Something went wrong. Please try again later or check your credentials", fg='red'), err=True) + +index.add_command(delete) cli.add_command(index) diff --git a/connectors/es/client.py b/connectors/es/client.py index 52b054669..cf305d012 100644 --- a/connectors/es/client.py +++ b/connectors/es/client.py @@ -176,6 +176,12 @@ async def ensure_exists(self, indices=None): async def delete_indices(self, indices): await self.client.indices.delete(index=indices, ignore_unavailable=True) + async def clean_index(self, index_name): + return await self.client.delete_by_query(index=index_name, body={ 'query': { 'match_all': {} } }, ignore_unavailable=True) + + async def list_indices(self): + return await self.client.indices.stats(index='search-*') + def with_concurrency_control(retries=3): def wrapper(func): From f0226b85f403ebaacc09585a618e8ae9d8be64d4 Mon Sep 17 00:00:00 2001 From: Klim Markelov Date: Thu, 14 Sep 2023 17:02:35 +0200 Subject: [PATCH 09/33] Introduce Job cli (#1643) --- connectors/cli/job.py | 88 ++++++++++++++++++++++++++++++++++++ connectors/connectors_cli.py | 71 +++++++++++++++++++++++++++-- 2 files changed, 156 insertions(+), 3 deletions(-) create mode 100644 connectors/cli/job.py diff --git a/connectors/cli/job.py b/connectors/cli/job.py new file mode 100644 index 000000000..ac5e53942 --- /dev/null +++ b/connectors/cli/job.py @@ -0,0 +1,88 @@ +from connectors.es.client import ESClient +from connectors.protocol import CONCRETE_CONNECTORS_INDEX, CONCRETE_JOBS_INDEX +from connectors.protocol import SyncJobIndex +from connectors.protocol import ConnectorIndex +from connectors.protocol import Sort, JobTriggerMethod, JobType +import asyncio + +class Job: + def __init__(self, config): + self.config = config + self.es_client = ESClient(self.config) + self.sync_job_index = SyncJobIndex(self.config) + self.connector_index = ConnectorIndex(self.config) + + def list_jobs(self, connector_id=None, index_name=None, job_id=None): + return asyncio.run(self.__async_list_jobs(connector_id, index_name, job_id)) + + def cancel(self, connector_id=None, index_name=None, job_id=None): + return asyncio.run(self.__async_cancel_jobs(connector_id, index_name, job_id)) + + def start(self, connector_id, job_type): + return asyncio.run(self.__async_start(connector_id, job_type)) + + async def __async_start(self, connector_id, job_type): + try: + connector = await self.connector_index.fetch_by_id(connector_id) + await self.sync_job_index.create( + connector=connector, + trigger_method=JobTriggerMethod.ON_DEMAND, + job_type=JobType(job_type), + ) + + return True + except Exception as e: + raise e + finally: + await self.sync_job_index.close() + await self.connector_index.close() + await self.es_client.close() + + async def __async_list_jobs(self, connector_id, index_name, job_id): + try: + await self.es_client.ensure_exists( + indices=[CONCRETE_CONNECTORS_INDEX, CONCRETE_JOBS_INDEX] + ) + jobs = self.sync_job_index.get_all_docs( + query=self.__job_list_query(connector_id, index_name, job_id), + sort=self.__job_list_sort(), + ) + formatted_jobs = [] + async for job in jobs: + formatted_jobs.append(job) + + return formatted_jobs + + # TODO catch exceptions + finally: + await self.sync_job_index.close() + await self.es_client.close() + + async def __async_cancel_jobs(self, connector_id, index_name, job_id): + try: + jobs = await self.__async_list_jobs(connector_id, index_name, job_id) + + for job in jobs: + await job.cancel() + + return True + except: + return False + finally: + await self.sync_job_index.close() + await self.es_client.close() + + def __job_list_query(self, connector_id, index_name, job_id): + if job_id: + return { "bool": { "must": [{ "term": { "_id": job_id } }] } } + + if index_name: + return { "bool": { "filter": [{ "term": { "connector.index_name": index_name } }] } } + + if connector_id: + return { "bool": { "must": [{ "term": { "connector.id": connector_id } }] } } + + return None + + def __job_list_sort(self): + return [{"created_at": Sort.ASC.value}] diff --git a/connectors/connectors_cli.py b/connectors/connectors_cli.py index 3e2ceab08..9a0cc551f 100644 --- a/connectors/connectors_cli.py +++ b/connectors/connectors_cli.py @@ -15,6 +15,7 @@ import yaml from connectors.cli.auth import Auth, CONFIG_FILE_PATH from connectors.cli.index import Index +from connectors.cli.job import Job from connectors import __version__ # NOQA from connectors.cli.connector import Connector from tabulate import tabulate @@ -148,7 +149,7 @@ def clean(obj, index): index_cli = Index(config=obj['config']['elasticsearch']) click.confirm(click.style('Are you sure you want to clean ' + index + '?😱', fg='yellow'), abort=True) if index_cli.clean(index): - click.echo(click.style("The index has been cleaned. You're breathtaking.", fg='green')) + click.echo(click.style("The index has been cleaned. You're awesome.", fg='green')) else: click.echo('') click.echo(click.style("Something went wrong. Please try again later or check your credentials", fg='red'), err=True) @@ -162,7 +163,7 @@ def delete(obj, index): index_cli = Index(config=obj['config']['elasticsearch']) click.confirm(click.style('Are you sure you want to delete ' + index + '?😱', fg='yellow'), abort=True) if index_cli.delete(index): - click.echo(click.style("The index has been deleted. You're breathtaking.", fg='green')) + click.echo(click.style("The index has been deleted. You're amazing.", fg='green')) else: click.echo('') click.echo(click.style("Something went wrong. Please try again later or check your credentials", fg='red'), err=True) @@ -175,7 +176,71 @@ def delete(obj, index): @click.group(invoke_without_command=True) @click.pass_obj def job(obj): - click.echo('testing job') + pass + + +@click.command(help="Start a job. -i to pass connector id, -t to pass job type") +@click.pass_obj +@click.option('-i') +@click.option('-t') +def start(obj, i, t): + job_cli = Job(config=obj['config']['elasticsearch']) + click.echo('Starting a job...') + if job_cli.start(connector_id=i, job_type=t): + click.echo(click.style("The job has been started. You're phenomenous!", fg='green')) + else: + click.echo('') + click.echo(click.style("Something went wrong. Please try again later or check your credentials", fg='red'), err=True) + +job.add_command(start) + +@click.command(help="List of jobs sorted by date. -i to pass connector name, -n to pass index name, -j to pass job id") +@click.pass_obj +@click.option('-i') +@click.option('-n') +@click.option('-j') +def list(obj, i, n, j): + job_cli = Job(config=obj['config']['elasticsearch']) + jobs = job_cli.list_jobs(connector_id=i, index_name=n, job_id=j) + + if len(jobs) == 0: + click.echo("No jobs found") + + click.echo(f"Showing {len(jobs)} jobs \n") + table_rows = [] + for job in jobs: + formatted_job = [ + click.style(job.id, blink=True, fg="green"), + click.style(job.connector_id, blink=True, fg="white"), + click.style(job.index_name, blink=True, fg="white"), + click.style(job.status.value, blink=True, fg="white"), + click.style(job.job_type.value, blink=True, fg="white"), + click.style(job.indexed_document_count, blink=True, fg="white"), + click.style(job.indexed_document_volume, blink=True, fg="white"), + click.style(job.deleted_document_count, blink=True, fg="white"), + ] + table_rows.append(formatted_job) + + click.echo(tabulate(table_rows, headers=["Job id", "Connector id", "Index name", "Job status", "Job type", "Documents indexed", "Volume documents indexed (MiB)", "Documents deleted"])) + +job.add_command(list) + +@click.command(help="Cancel jobs. -i to pass connector name, -n to pass index name, -j to pass job id") +@click.pass_obj +@click.option('-i') +@click.option('-n') +@click.option('-j') +def cancel(obj, i, n, j): + job_cli = Job(config=obj['config']['elasticsearch']) + click.confirm(click.style('Are you sure you want to cancel jobs?😱', fg='yellow'), abort=True) + click.echo('Deleting jobs...') + if job_cli.cancel(connector_id=i, index_name=n, job_id=j): + click.echo(click.style("All jobs have been cancelled. You're incredible!", fg='green')) + else: + click.echo('') + click.echo(click.style("Something went wrong. Please try again later or check your credentials", fg='red'), err=True) + +job.add_command(cancel) cli.add_command(job) From 1cd7644847b655b96ae03889dbc2887473ab00a7 Mon Sep 17 00:00:00 2001 From: Dmitriy Burlutskiy Date: Thu, 14 Sep 2023 17:08:43 +0200 Subject: [PATCH 10/33] Introduce connector create command (#1645) --- connectors/cli/connector.py | 165 +++++++++++++++++++++++++++++++++++ connectors/connectors_cli.py | 52 ++++++++++- connectors/es/client.py | 3 + setup.py | 2 +- 4 files changed, 219 insertions(+), 3 deletions(-) diff --git a/connectors/cli/connector.py b/connectors/cli/connector.py index 444d2e7e6..ef008cc97 100644 --- a/connectors/cli/connector.py +++ b/connectors/cli/connector.py @@ -1,6 +1,14 @@ from connectors.es.client import ESClient from connectors.protocol import CONCRETE_CONNECTORS_INDEX, CONCRETE_JOBS_INDEX from connectors.protocol import ConnectorIndex +from connectors.source import get_source_klass +from collections import OrderedDict +from connectors.es.settings import DEFAULT_LANGUAGE, Mappings, Settings +from connectors.utils import iso_utc +import asyncio + +class IndexAlreadyExists(Exception): + pass class Connector: def __init__(self, config): @@ -38,3 +46,160 @@ async def ping(self): else: return False + + def service_type_configuration(self, source_class): + source_klass = get_source_klass(source_class) + configuration = source_klass.get_default_configuration() + + return OrderedDict(sorted(configuration.items(), key=lambda x: x[1]['order'])) + + def create(self, index_name, service_type, configuration, language=DEFAULT_LANGUAGE): + return asyncio.run(self.__create(index_name, service_type, configuration, language)) + + async def __create(self, index_name, service_type, configuration, language=DEFAULT_LANGUAGE): + try: + return await asyncio.gather( + self.__create_search_index(index_name, language), + self.__create_connector(index_name, service_type, configuration, language) + ) + except Exception as e: + raise e + finally: + await self.es_client.close() + + async def __create_search_index(self, index_name, language): + mappings = Mappings.default_text_fields_mappings( + is_connectors_index=True, + ) + + settings = Settings( + language_code=language, analysis_icu=False + ).to_hash() + + settings["auto_expand_replicas"] = '0-3' + settings["number_of_shards"] = 2 + + try: + await self.es_client.client.indices.create(index=index_name, mappings=mappings, settings=settings) + except Exception as e: + # todo handle exception + raise e + + async def __create_connector(self, index_name, service_type, configuration, language): + try: + await self.es_client.ensure_exists( + indices=[CONCRETE_CONNECTORS_INDEX, CONCRETE_JOBS_INDEX] + ) + timestamp = iso_utc() + + doc = { + "api_key_id": "", + "configuration": configuration, + "index_name": index_name, + "service_type": service_type, + "status": "configured", # TODO use a predefined constant + "is_native": False, # figure out how to check if it's native or not + "language": language, + "last_access_control_sync_error": None, + "last_access_control_sync_scheduled_at": None, + "last_access_control_sync_status": None, + "last_sync_status": None, + "last_sync_error": None, + "last_sync_scheduled_at": None, + "last_synced": None, + "last_seen": None, + "created_at": timestamp, + "updated_at": timestamp, + "filtering": self.default_filtering(timestamp), + "scheduling": self.default_scheduling(), + "custom_scheduling": {}, + "pipeline": { + "extract_binary_content": True, + "name": "ent-search-generic-ingestion", + "reduce_whitespace": True, + "run_ml_inference": True + }, + "last_indexed_document_count": 0, + "last_deleted_document_count": 0 + } + + connector = await self.connector_index.index(doc) + return connector['_id'] + except Exception as e: + raise e + finally: + + await self.connector_index.close() + + def default_scheduling(self): + return ( + { + "access_control": { + "enabled": False, + "interval": "0 0 0 * * ?" + }, + "full": { + "enabled": False, + "interval": "0 0 0 * * ?" + }, + "incremental": { + "enabled": False, + "interval": "0 0 0 * * ?" + } + } + ) + + def default_filtering(self, timestamp): + return ( + [ + { + "active": { + "advanced_snippet": { + "created_at": timestamp, + "updated_at": timestamp, + "value": {} + }, + "rules": [ + { + "created_at": timestamp, + "field": "_", + "id": "DEFAULT", + "order": 0, + "policy": "include", + "rule": "regex", + "updated_at": timestamp, + "value": ".*" + } + ], + "validation": { + "errors": [], + "state": "valid" + } + }, + "domain": "DEFAULT", + "draft": { + "advanced_snippet": { + "created_at": timestamp, + "updated_at": timestamp, + "value": {} + }, + "rules": [ + { + "created_at": timestamp, + "field": "_", + "id": "DEFAULT", + "order": 0, + "policy": "include", + "rule": "regex", + "updated_at": timestamp, + "value": ".*" + } + ], + "validation": { + "errors": [], + "state": "valid" + } + } + } + ] + ) diff --git a/connectors/connectors_cli.py b/connectors/connectors_cli.py index 9a0cc551f..f3b8e5c2d 100644 --- a/connectors/connectors_cli.py +++ b/connectors/connectors_cli.py @@ -21,6 +21,25 @@ from tabulate import tabulate import os +SERVICE_TYPES = { + 'mongodb': 'connectors.sources.mongo:MongoDataSource', + 's3': 'connectors.sources.s3:S3DataSource', + 'dir': 'connectors.sources.directory:DirectoryDataSource', + 'mysql': 'connectors.sources.mysql:MySqlDataSource', + 'network_drive': 'connectors.sources.network_drive:NASDataSource', + 'google_cloud_storage': 'connectors.sources.google_cloud_storage:GoogleCloudStorageDataSource', + 'azure_blob_storage': 'connectors.sources.azure_blob_storage:AzureBlobStorageDataSource', + 'postgresql': 'connectors.sources.postgresql:PostgreSQLDataSource', + 'oracle': 'connectors.sources.oracle:OracleDataSource', + 'sharepoint_server': 'connectors.sources.sharepoint_server:SharepointServerDataSource', + 'mssql': 'connectors.sources.mssql:MSSQLDataSource', + 'jira': 'connectors.sources.jira:JiraDataSource', + 'confluence': 'connectors.sources.confluence:ConfluenceDataSource', + 'dropbox': 'connectors.sources.dropbox:DropboxDataSource', + 'servicenow': 'connectors.sources.servicenow:ServiceNowDataSource', + 'sharepoint_online': 'connectors.sources.sharepoint_online:SharepointOnlineDataSource', + 'github': 'connectors.sources.github:GitHubDataSource' +} __all__ = ["main"] @@ -77,7 +96,7 @@ def login(host, username, password): def connector(obj): pass -@click.command() +@click.command(help="List all existing connectors") @click.pass_obj def list(obj): click.echo("") @@ -107,7 +126,36 @@ def list(obj): except asyncio.CancelledError as e: click.echo(e) +# TODO add language prompt +@click.command(help="Creates a new connector and a search index") +@click.option('--index_name', prompt=f"{click.style('?', blink=True, fg='green')} Search index name (search-)") +@click.option('--service_type', prompt=f"{click.style('?', blink=True, fg='green')} Service type", type=click.Choice(SERVICE_TYPES.keys(), case_sensitive=False)) +@click.pass_obj +def create(obj, index_name, service_type): + index_name = f"search-{index_name}" + connector = Connector(obj['config']['elasticsearch']) + configuration = connector.service_type_configuration(source_class=SERVICE_TYPES[service_type]) + + prompt = lambda : click.prompt(f"{click.style('?', blink=True, fg='green')} {item['label']}", default=item.get('value', None), hide_input=(True if item.get('sensitive') == True else False)) + + # first fill in the fields that do not depend on other fields + for key, item in configuration.items(): + if 'depends_on' in item: + continue + + configuration[key]['value'] = prompt() + for key, item in configuration.items(): + if not 'depends_on' in item: + continue + + if all(configuration[field_item['field']]['value'] == field_item['value'] for field_item in item['depends_on']): + configuration[key]['value'] = prompt() + + result = connector.create(index_name, service_type, configuration) + click.echo("Connector (ID: " + click.style(result[1], fg="green") + ", service_type: " + click.style(service_type, fg='green') + ") has been created!") + +connector.add_command(create) connector.add_command(list) cli.add_command(connector) @@ -160,7 +208,7 @@ def clean(obj, index): @click.pass_obj @click.argument('index', nargs=1) def delete(obj, index): - index_cli = Index(config=obj['config']['elasticsearch']) + index_cli = Index(config=obj['config']['elasticsearch']) click.confirm(click.style('Are you sure you want to delete ' + index + '?😱', fg='yellow'), abort=True) if index_cli.delete(index): click.echo(click.style("The index has been deleted. You're amazing.", fg='green')) diff --git a/connectors/es/client.py b/connectors/es/client.py index cf305d012..db29d55e0 100644 --- a/connectors/es/client.py +++ b/connectors/es/client.py @@ -182,6 +182,9 @@ async def clean_index(self, index_name): async def list_indices(self): return await self.client.indices.stats(index='search-*') + def client(self): + return self.client + def with_concurrency_control(retries=3): def wrapper(func): diff --git a/setup.py b/setup.py index 371bd25ec..3178b5f8d 100644 --- a/setup.py +++ b/setup.py @@ -97,7 +97,7 @@ def read_reqs(req_file): install_requires=install_requires, entry_points=""" [console_scripts] - elastic-ingest = connectors.cli:main + elastic-ingest = connectors.old_cli:main fake-kibana = connectors.kibana:main connectors = connectors.connectors_cli:main """, From 860846864d1ffa2d78b2fd6f13705f80c83f646f Mon Sep 17 00:00:00 2001 From: Dmitrii Burlutckii Date: Mon, 18 Sep 2023 15:15:57 +0200 Subject: [PATCH 11/33] Small improvements --- connectors/cli/connector.py | 2 +- connectors/cli/job.py | 16 ++++++++------- connectors/connectors_cli.py | 39 +++++++++++++++++++----------------- 3 files changed, 31 insertions(+), 26 deletions(-) diff --git a/connectors/cli/connector.py b/connectors/cli/connector.py index ef008cc97..93c607d56 100644 --- a/connectors/cli/connector.py +++ b/connectors/cli/connector.py @@ -98,7 +98,7 @@ async def __create_connector(self, index_name, service_type, configuration, lang "index_name": index_name, "service_type": service_type, "status": "configured", # TODO use a predefined constant - "is_native": False, # figure out how to check if it's native or not + "is_native": True, # figure out how to check if it's native or not "language": language, "last_access_control_sync_error": None, "last_access_control_sync_scheduled_at": None, diff --git a/connectors/cli/job.py b/connectors/cli/job.py index ac5e53942..bb838c055 100644 --- a/connectors/cli/job.py +++ b/connectors/cli/job.py @@ -5,6 +5,8 @@ from connectors.protocol import Sort, JobTriggerMethod, JobType import asyncio +from connectors.protocol import JobStatus + class Job: def __init__(self, config): self.config = config @@ -29,7 +31,7 @@ async def __async_start(self, connector_id, job_type): trigger_method=JobTriggerMethod.ON_DEMAND, job_type=JobType(job_type), ) - + return True except Exception as e: raise e @@ -50,21 +52,21 @@ async def __async_list_jobs(self, connector_id, index_name, job_id): formatted_jobs = [] async for job in jobs: formatted_jobs.append(job) - + return formatted_jobs # TODO catch exceptions finally: await self.sync_job_index.close() await self.es_client.close() - + async def __async_cancel_jobs(self, connector_id, index_name, job_id): try: jobs = await self.__async_list_jobs(connector_id, index_name, job_id) for job in jobs: - await job.cancel() - + await job._terminate(JobStatus.CANCELED) + return True except: return False @@ -78,10 +80,10 @@ def __job_list_query(self, connector_id, index_name, job_id): if index_name: return { "bool": { "filter": [{ "term": { "connector.index_name": index_name } }] } } - + if connector_id: return { "bool": { "must": [{ "term": { "connector.id": connector_id } }] } } - + return None def __job_list_sort(self): diff --git a/connectors/connectors_cli.py b/connectors/connectors_cli.py index f3b8e5c2d..5a23c0d5f 100644 --- a/connectors/connectors_cli.py +++ b/connectors/connectors_cli.py @@ -91,7 +91,7 @@ def login(host, username, password): cli.add_command(login) # Connector group -@click.group(invoke_without_command=True) +@click.group(invoke_without_command=True, help="Connectors mangement") @click.pass_obj def connector(obj): pass @@ -99,14 +99,15 @@ def connector(obj): @click.command(help="List all existing connectors") @click.pass_obj def list(obj): - click.echo("") connector = Connector(config=obj['config']['elasticsearch']) coro = connector.list_connectors() try: connectors = asyncio.get_event_loop().run_until_complete(coro) + click.echo("") if len(connectors) == 0: click.echo("No connectors found") + return click.echo(f"Showing {len(connectors)} connectors \n") @@ -169,12 +170,14 @@ def index(obj): @click.command(help="Show all indices") @click.pass_obj def list(obj): - click.echo("") index = Index(config=obj['config']['elasticsearch']) indices = index.list_indices() + click.echo("") + if len(indices) == 0: click.echo("No indices found") + return click.echo(f"Showing {len(indices)} indices \n") table_rows = [] @@ -227,12 +230,12 @@ def job(obj): pass -@click.command(help="Start a job. -i to pass connector id, -t to pass job type") +@click.command(help="Start a sync job.") @click.pass_obj -@click.option('-i') -@click.option('-t') +@click.option('-i', help='Connector ID', required=True) +@click.option('-t', help='Job type', type=click.Choice(['full', 'incremental', 'access_control'], case_sensitive=False), required=True) def start(obj, i, t): - job_cli = Job(config=obj['config']['elasticsearch']) + job_cli = Job(config=obj['config']['elasticsearch']) click.echo('Starting a job...') if job_cli.start(connector_id=i, job_type=t): click.echo(click.style("The job has been started. You're phenomenous!", fg='green')) @@ -242,13 +245,13 @@ def start(obj, i, t): job.add_command(start) -@click.command(help="List of jobs sorted by date. -i to pass connector name, -n to pass index name, -j to pass job id") +@click.command(help="List of jobs sorted by date.") @click.pass_obj -@click.option('-i') -@click.option('-n') -@click.option('-j') +@click.option('-i', help='Connector ID') +@click.option('-n', help='Index name') +@click.option('-j', help='Job id') def list(obj, i, n, j): - job_cli = Job(config=obj['config']['elasticsearch']) + job_cli = Job(config=obj['config']['elasticsearch']) jobs = job_cli.list_jobs(connector_id=i, index_name=n, job_id=j) if len(jobs) == 0: @@ -273,15 +276,15 @@ def list(obj, i, n, j): job.add_command(list) -@click.command(help="Cancel jobs. -i to pass connector name, -n to pass index name, -j to pass job id") +@click.command(help="Cancel a job") @click.pass_obj -@click.option('-i') -@click.option('-n') -@click.option('-j') +@click.option('-i', help='Connector ID') +@click.option('-n', help='Index name') +@click.option('-j', help='Job id', required=True) def cancel(obj, i, n, j): - job_cli = Job(config=obj['config']['elasticsearch']) + job_cli = Job(config=obj['config']['elasticsearch']) click.confirm(click.style('Are you sure you want to cancel jobs?😱', fg='yellow'), abort=True) - click.echo('Deleting jobs...') + click.echo('Canceling jobs...') if job_cli.cancel(connector_id=i, index_name=n, job_id=j): click.echo(click.style("All jobs have been cancelled. You're incredible!", fg='green')) else: From a4427dd75bfb77367f0f3a4449d9e0031b866fef Mon Sep 17 00:00:00 2001 From: Dmitrii Burlutckii Date: Mon, 18 Sep 2023 16:43:04 +0200 Subject: [PATCH 12/33] Fix cancelling jobs --- connectors/cli/job.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/connectors/cli/job.py b/connectors/cli/job.py index bb838c055..6ca10b43b 100644 --- a/connectors/cli/job.py +++ b/connectors/cli/job.py @@ -65,7 +65,7 @@ async def __async_cancel_jobs(self, connector_id, index_name, job_id): jobs = await self.__async_list_jobs(connector_id, index_name, job_id) for job in jobs: - await job._terminate(JobStatus.CANCELED) + await job._terminate(JobStatus.CANCELING) return True except: From cfa251ef9ca98f2d0fbf7116242936d6c472087d Mon Sep 17 00:00:00 2001 From: Dmitrii Burlutckii Date: Wed, 18 Oct 2023 12:11:12 +0200 Subject: [PATCH 13/33] Remove emojis --- connectors/connectors_cli.py | 22 +++++++++------------- 1 file changed, 9 insertions(+), 13 deletions(-) diff --git a/connectors/connectors_cli.py b/connectors/connectors_cli.py index 5a23c0d5f..168e0ff38 100644 --- a/connectors/connectors_cli.py +++ b/connectors/connectors_cli.py @@ -66,10 +66,6 @@ def cli(ctx, config): # todo raise an exception pass - # ctx = click.get_current_context() - # click.echo(ctx.get_help()) - # ctx.exit() - @click.command(help="Authenticate Connectors CLI with an Elasticsearch instance") @click.option('--host', prompt="Elastic host") @click.option('--username', prompt="Username") @@ -78,9 +74,9 @@ def cli(ctx, config): def login(host, username, password): auth = Auth(host, username, password) if auth.is_config_present(): - click.confirm(click.style('Config is already present. Are you sure you want to override it?😱', fg='yellow'), abort=True) + click.confirm(click.style('Config is already present. Are you sure you want to override it?', fg='yellow'), abort=True) if auth.authenticate(): - click.echo(click.style("Authentication successful. You're breathtaking.", fg='green')) + click.echo(click.style("Authentication successful", fg='green')) else: click.echo('') click.echo(click.style("Authentication failed. Please check your credentials.", fg='red'), err=True) @@ -198,9 +194,9 @@ def list(obj): @click.argument('index', nargs=1) def clean(obj, index): index_cli = Index(config=obj['config']['elasticsearch']) - click.confirm(click.style('Are you sure you want to clean ' + index + '?😱', fg='yellow'), abort=True) + click.confirm(click.style('Are you sure you want to clean ' + index + '?', fg='yellow'), abort=True) if index_cli.clean(index): - click.echo(click.style("The index has been cleaned. You're awesome.", fg='green')) + click.echo(click.style("The index has been cleaned.", fg='green')) else: click.echo('') click.echo(click.style("Something went wrong. Please try again later or check your credentials", fg='red'), err=True) @@ -212,9 +208,9 @@ def clean(obj, index): @click.argument('index', nargs=1) def delete(obj, index): index_cli = Index(config=obj['config']['elasticsearch']) - click.confirm(click.style('Are you sure you want to delete ' + index + '?😱', fg='yellow'), abort=True) + click.confirm(click.style('Are you sure you want to delete ' + index + '?', fg='yellow'), abort=True) if index_cli.delete(index): - click.echo(click.style("The index has been deleted. You're amazing.", fg='green')) + click.echo(click.style("The index has been deleted.", fg='green')) else: click.echo('') click.echo(click.style("Something went wrong. Please try again later or check your credentials", fg='red'), err=True) @@ -238,7 +234,7 @@ def start(obj, i, t): job_cli = Job(config=obj['config']['elasticsearch']) click.echo('Starting a job...') if job_cli.start(connector_id=i, job_type=t): - click.echo(click.style("The job has been started. You're phenomenous!", fg='green')) + click.echo(click.style("The job has been started.", fg='green')) else: click.echo('') click.echo(click.style("Something went wrong. Please try again later or check your credentials", fg='red'), err=True) @@ -283,10 +279,10 @@ def list(obj, i, n, j): @click.option('-j', help='Job id', required=True) def cancel(obj, i, n, j): job_cli = Job(config=obj['config']['elasticsearch']) - click.confirm(click.style('Are you sure you want to cancel jobs?😱', fg='yellow'), abort=True) + click.confirm(click.style('Are you sure you want to cancel jobs?', fg='yellow'), abort=True) click.echo('Canceling jobs...') if job_cli.cancel(connector_id=i, index_name=n, job_id=j): - click.echo(click.style("All jobs have been cancelled. You're incredible!", fg='green')) + click.echo(click.style("All jobs have been cancelled.", fg='green')) else: click.echo('') click.echo(click.style("Something went wrong. Please try again later or check your credentials", fg='red'), err=True) From 24bdf1108e45ff64f037600275450dab4408b91e Mon Sep 17 00:00:00 2001 From: Dmitrii Burlutckii Date: Wed, 18 Oct 2023 16:18:27 +0200 Subject: [PATCH 14/33] Drop index_name and job_id params --- connectors/connectors_cli.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/connectors/connectors_cli.py b/connectors/connectors_cli.py index 168e0ff38..ee52fdae4 100644 --- a/connectors/connectors_cli.py +++ b/connectors/connectors_cli.py @@ -243,12 +243,10 @@ def start(obj, i, t): @click.command(help="List of jobs sorted by date.") @click.pass_obj -@click.option('-i', help='Connector ID') -@click.option('-n', help='Index name') -@click.option('-j', help='Job id') -def list(obj, i, n, j): +@click.argument("connector_id", nargs=1) +def list(obj, connector_id): job_cli = Job(config=obj['config']['elasticsearch']) - jobs = job_cli.list_jobs(connector_id=i, index_name=n, job_id=j) + jobs = job_cli.list_jobs(connector_id=connector_id) if len(jobs) == 0: click.echo("No jobs found") From 35a414a65973be522314b342be74c0345a967e60 Mon Sep 17 00:00:00 2001 From: Dmitrii Burlutckii Date: Wed, 18 Oct 2023 16:24:44 +0200 Subject: [PATCH 15/33] Drop connector_id and index_id for job cancelling --- connectors/connectors_cli.py | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/connectors/connectors_cli.py b/connectors/connectors_cli.py index ee52fdae4..4aa896e54 100644 --- a/connectors/connectors_cli.py +++ b/connectors/connectors_cli.py @@ -272,15 +272,13 @@ def list(obj, connector_id): @click.command(help="Cancel a job") @click.pass_obj -@click.option('-i', help='Connector ID') -@click.option('-n', help='Index name') -@click.option('-j', help='Job id', required=True) -def cancel(obj, i, n, j): +@click.argument('job_id') +def cancel(obj, job_id): job_cli = Job(config=obj['config']['elasticsearch']) click.confirm(click.style('Are you sure you want to cancel jobs?', fg='yellow'), abort=True) click.echo('Canceling jobs...') - if job_cli.cancel(connector_id=i, index_name=n, job_id=j): - click.echo(click.style("All jobs have been cancelled.", fg='green')) + if job_cli.cancel(job_id=job_id): + click.echo(click.style("The jobs is cancelling.", fg='green')) else: click.echo('') click.echo(click.style("Something went wrong. Please try again later or check your credentials", fg='red'), err=True) From 76cf92108b2c555ab4691dcab0907850310e667c Mon Sep 17 00:00:00 2001 From: Dmitrii Burlutckii Date: Tue, 21 Nov 2023 17:39:28 +0100 Subject: [PATCH 16/33] Print help page when no subcommands provided --- connectors/connectors_cli.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/connectors/connectors_cli.py b/connectors/connectors_cli.py index 4aa896e54..c5139e74d 100644 --- a/connectors/connectors_cli.py +++ b/connectors/connectors_cli.py @@ -49,13 +49,16 @@ def print_version(ctx, param, value): return click.echo(__version__) -# @TODO print help page when no arguments passed @click.group(invoke_without_command=True) -@click.option('-v', '--version', is_flag=True, callback=print_version, - expose_value=False, is_eager=True) +@click.version_option(__version__, '-v', '--version', message="%(version)s") @click.option('-c', '--config', type=click.File('rb')) @click.pass_context def cli(ctx, config): + # print help page if no subcommands provided + if ctx.invoked_subcommand is None: + click.echo(ctx.get_help()) + return + ctx.ensure_object(dict) if config: ctx.obj['config'] = yaml.safe_load(config) @@ -63,8 +66,7 @@ def cli(ctx, config): with open(CONFIG_FILE_PATH, "r") as f: ctx.obj['config'] = yaml.safe_load(f.read()) else: - # todo raise an exception - pass + raise FileNotFoundError(f"{CONFIG_FILE_PATH} is not found") @click.command(help="Authenticate Connectors CLI with an Elasticsearch instance") @click.option('--host', prompt="Elastic host") From cec531acd6eacf308b08a677c852e31661b04f0b Mon Sep 17 00:00:00 2001 From: Dmitrii Burlutckii Date: Tue, 21 Nov 2023 21:52:48 +0100 Subject: [PATCH 17/33] Add language option --- connectors/connectors_cli.py | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/connectors/connectors_cli.py b/connectors/connectors_cli.py index c5139e74d..e8f1c22f8 100644 --- a/connectors/connectors_cli.py +++ b/connectors/connectors_cli.py @@ -21,6 +21,8 @@ from tabulate import tabulate import os +from connectors.es.settings import Settings + SERVICE_TYPES = { 'mongodb': 'connectors.sources.mongo:MongoDataSource', 's3': 'connectors.sources.s3:S3DataSource', @@ -125,12 +127,21 @@ def list(obj): except asyncio.CancelledError as e: click.echo(e) -# TODO add language prompt +language_keys = [*Settings().language_data.keys()] + +# Support blank values for languge +def validate_language(ctx, param, value): + if value not in language_keys: + return None + + return value + @click.command(help="Creates a new connector and a search index") @click.option('--index_name', prompt=f"{click.style('?', blink=True, fg='green')} Search index name (search-)") @click.option('--service_type', prompt=f"{click.style('?', blink=True, fg='green')} Service type", type=click.Choice(SERVICE_TYPES.keys(), case_sensitive=False)) +@click.option('--index_language', prompt=f"{click.style('?', blink=True, fg='green')} Index language (leave empty for universal) {language_keys}", default='', callback=validate_language) @click.pass_obj -def create(obj, index_name, service_type): +def create(obj, index_name, service_type, index_language): index_name = f"search-{index_name}" connector = Connector(obj['config']['elasticsearch']) configuration = connector.service_type_configuration(source_class=SERVICE_TYPES[service_type]) @@ -151,7 +162,7 @@ def create(obj, index_name, service_type): if all(configuration[field_item['field']]['value'] == field_item['value'] for field_item in item['depends_on']): configuration[key]['value'] = prompt() - result = connector.create(index_name, service_type, configuration) + result = connector.create(index_name, service_type, configuration, index_language) click.echo("Connector (ID: " + click.style(result[1], fg="green") + ", service_type: " + click.style(service_type, fg='green') + ") has been created!") connector.add_command(create) From c0b34a6ebf728999484a69dac167b6e9f1cf6c47 Mon Sep 17 00:00:00 2001 From: Dmitrii Burlutckii Date: Tue, 21 Nov 2023 22:27:10 +0100 Subject: [PATCH 18/33] Autoformat --- connectors/cli/auth.py | 21 ++- connectors/cli/connector.py | 147 ++++++++-------- connectors/cli/index.py | 18 +- connectors/cli/job.py | 30 ++-- connectors/connectors_cli.py | 313 ++++++++++++++++++++++++----------- connectors/es/client.py | 6 +- 6 files changed, 334 insertions(+), 201 deletions(-) diff --git a/connectors/cli/auth.py b/connectors/cli/auth.py index c0967d98c..9c80a6b00 100644 --- a/connectors/cli/auth.py +++ b/connectors/cli/auth.py @@ -1,14 +1,17 @@ -from connectors.es import ESClient - -import yaml import asyncio import os -CONFIG_FILE_PATH = '.cli/config.yml' +import yaml +from elasticsearch import ApiError + +from connectors.es import ESClient + +CONFIG_FILE_PATH = ".cli/config.yml" + class Auth: def __init__(self, host, username, password): - self.elastic_config = { 'host': host, 'username': username, 'password': password } + self.elastic_config = {"host": host, "username": username, "password": password} self.es_client = ESClient(self.elastic_config) def authenticate(self): @@ -20,18 +23,18 @@ def authenticate(self): def is_config_present(self): return os.path.isfile(CONFIG_FILE_PATH) - + async def __ping_es_client(self): try: return await self.es_client.ping() - except: + except ApiError: return False finally: await self.es_client.close() def __save_config(self): - yaml_content = yaml.dump({'elasticsearch': self.elastic_config }) + yaml_content = yaml.dump({"elasticsearch": self.elastic_config}) os.makedirs(os.path.dirname(CONFIG_FILE_PATH), exist_ok=True) - + with open(CONFIG_FILE_PATH, "w") as f: f.write(yaml_content) diff --git a/connectors/cli/connector.py b/connectors/cli/connector.py index 93c607d56..30407fec3 100644 --- a/connectors/cli/connector.py +++ b/connectors/cli/connector.py @@ -1,15 +1,21 @@ -from connectors.es.client import ESClient -from connectors.protocol import CONCRETE_CONNECTORS_INDEX, CONCRETE_JOBS_INDEX -from connectors.protocol import ConnectorIndex -from connectors.source import get_source_klass +import asyncio from collections import OrderedDict + +from connectors.es.client import ESClient from connectors.es.settings import DEFAULT_LANGUAGE, Mappings, Settings +from connectors.protocol import ( + CONCRETE_CONNECTORS_INDEX, + CONCRETE_JOBS_INDEX, + ConnectorIndex, +) +from connectors.source import get_source_klass from connectors.utils import iso_utc -import asyncio + class IndexAlreadyExists(Exception): pass + class Connector: def __init__(self, config): self.config = config @@ -19,7 +25,6 @@ def __init__(self, config): self.connector_index = ConnectorIndex(self.config) - async def list_connectors(self): # TODO move this on top try: @@ -35,7 +40,6 @@ async def list_connectors(self): # TODO catch exceptions finally: - await self.connector_index.close() await self.es_client.close() @@ -46,21 +50,28 @@ async def ping(self): else: return False - def service_type_configuration(self, source_class): source_klass = get_source_klass(source_class) configuration = source_klass.get_default_configuration() - return OrderedDict(sorted(configuration.items(), key=lambda x: x[1]['order'])) + return OrderedDict(sorted(configuration.items(), key=lambda x: x[1]["order"])) - def create(self, index_name, service_type, configuration, language=DEFAULT_LANGUAGE): - return asyncio.run(self.__create(index_name, service_type, configuration, language)) + def create( + self, index_name, service_type, configuration, language=DEFAULT_LANGUAGE + ): + return asyncio.run( + self.__create(index_name, service_type, configuration, language) + ) - async def __create(self, index_name, service_type, configuration, language=DEFAULT_LANGUAGE): + async def __create( + self, index_name, service_type, configuration, language=DEFAULT_LANGUAGE + ): try: return await asyncio.gather( self.__create_search_index(index_name, language), - self.__create_connector(index_name, service_type, configuration, language) + self.__create_connector( + index_name, service_type, configuration, language + ), ) except Exception as e: raise e @@ -72,20 +83,22 @@ async def __create_search_index(self, index_name, language): is_connectors_index=True, ) - settings = Settings( - language_code=language, analysis_icu=False - ).to_hash() + settings = Settings(language_code=language, analysis_icu=False).to_hash() - settings["auto_expand_replicas"] = '0-3' + settings["auto_expand_replicas"] = "0-3" settings["number_of_shards"] = 2 try: - await self.es_client.client.indices.create(index=index_name, mappings=mappings, settings=settings) + await self.es_client.client.indices.create( + index=index_name, mappings=mappings, settings=settings + ) except Exception as e: # todo handle exception raise e - async def __create_connector(self, index_name, service_type, configuration, language): + async def __create_connector( + self, index_name, service_type, configuration, language + ): try: await self.es_client.ensure_exists( indices=[CONCRETE_CONNECTORS_INDEX, CONCRETE_JOBS_INDEX] @@ -97,8 +110,8 @@ async def __create_connector(self, index_name, service_type, configuration, lang "configuration": configuration, "index_name": index_name, "service_type": service_type, - "status": "configured", # TODO use a predefined constant - "is_native": True, # figure out how to check if it's native or not + "status": "configured", # TODO use a predefined constant + "is_native": True, # figure out how to check if it's native or not "language": language, "last_access_control_sync_error": None, "last_access_control_sync_scheduled_at": None, @@ -117,50 +130,37 @@ async def __create_connector(self, index_name, service_type, configuration, lang "extract_binary_content": True, "name": "ent-search-generic-ingestion", "reduce_whitespace": True, - "run_ml_inference": True + "run_ml_inference": True, }, "last_indexed_document_count": 0, - "last_deleted_document_count": 0 + "last_deleted_document_count": 0, } connector = await self.connector_index.index(doc) - return connector['_id'] + return connector["_id"] except Exception as e: raise e finally: - await self.connector_index.close() def default_scheduling(self): - return ( - { - "access_control": { - "enabled": False, - "interval": "0 0 0 * * ?" - }, - "full": { - "enabled": False, - "interval": "0 0 0 * * ?" - }, - "incremental": { - "enabled": False, - "interval": "0 0 0 * * ?" - } - } - ) + return { + "access_control": {"enabled": False, "interval": "0 0 0 * * ?"}, + "full": {"enabled": False, "interval": "0 0 0 * * ?"}, + "incremental": {"enabled": False, "interval": "0 0 0 * * ?"}, + } def default_filtering(self, timestamp): - return ( - [ - { - "active": { - "advanced_snippet": { - "created_at": timestamp, - "updated_at": timestamp, - "value": {} - }, - "rules": [ - { + return [ + { + "active": { + "advanced_snippet": { + "created_at": timestamp, + "updated_at": timestamp, + "value": {}, + }, + "rules": [ + { "created_at": timestamp, "field": "_", "id": "DEFAULT", @@ -168,23 +168,20 @@ def default_filtering(self, timestamp): "policy": "include", "rule": "regex", "updated_at": timestamp, - "value": ".*" - } - ], - "validation": { - "errors": [], - "state": "valid" + "value": ".*", } + ], + "validation": {"errors": [], "state": "valid"}, + }, + "domain": "DEFAULT", + "draft": { + "advanced_snippet": { + "created_at": timestamp, + "updated_at": timestamp, + "value": {}, }, - "domain": "DEFAULT", - "draft": { - "advanced_snippet": { - "created_at": timestamp, - "updated_at": timestamp, - "value": {} - }, - "rules": [ - { + "rules": [ + { "created_at": timestamp, "field": "_", "id": "DEFAULT", @@ -192,14 +189,10 @@ def default_filtering(self, timestamp): "policy": "include", "rule": "regex", "updated_at": timestamp, - "value": ".*" - } - ], - "validation": { - "errors": [], - "state": "valid" + "value": ".*", } - } - } - ] - ) + ], + "validation": {"errors": [], "state": "valid"}, + }, + } + ] diff --git a/connectors/cli/index.py b/connectors/cli/index.py index 51521696c..d00c8d534 100644 --- a/connectors/cli/index.py +++ b/connectors/cli/index.py @@ -1,6 +1,9 @@ +import asyncio + +from elasticsearch import ApiError + from connectors.es import ESClient -import asyncio class Index: def __init__(self, config): @@ -8,27 +11,27 @@ def __init__(self, config): self.es_client = ESClient(self.elastic_config) def list_indices(self): - return asyncio.run(self.__list_indices())['indices'] + return asyncio.run(self.__list_indices())["indices"] def clean(self, index_name): return asyncio.run(self.__clean_index(index_name)) def delete(self, index_name): - return asyncio.run(self.__delete_index(index_name)) == None + return asyncio.run(self.__delete_index(index_name)) is None async def __list_indices(self): try: return await self.es_client.list_indices() - except: + except ApiError: # TODO raise return [] finally: await self.es_client.close() - + async def __clean_index(self, index_name): try: return await self.es_client.clean_index(index_name) - except: + except ApiError: return False finally: await self.es_client.close() @@ -36,8 +39,7 @@ async def __clean_index(self, index_name): async def __delete_index(self, index_name): try: return await self.es_client.delete_indices([index_name]) - except: + except ApiError: return False finally: await self.es_client.close() - diff --git a/connectors/cli/job.py b/connectors/cli/job.py index 6ca10b43b..a9380ead0 100644 --- a/connectors/cli/job.py +++ b/connectors/cli/job.py @@ -1,11 +1,19 @@ -from connectors.es.client import ESClient -from connectors.protocol import CONCRETE_CONNECTORS_INDEX, CONCRETE_JOBS_INDEX -from connectors.protocol import SyncJobIndex -from connectors.protocol import ConnectorIndex -from connectors.protocol import Sort, JobTriggerMethod, JobType import asyncio -from connectors.protocol import JobStatus +from elasticsearch import ApiError + +from connectors.es.client import ESClient +from connectors.protocol import ( + CONCRETE_CONNECTORS_INDEX, + CONCRETE_JOBS_INDEX, + ConnectorIndex, + JobStatus, + JobTriggerMethod, + JobType, + Sort, + SyncJobIndex, +) + class Job: def __init__(self, config): @@ -68,7 +76,7 @@ async def __async_cancel_jobs(self, connector_id, index_name, job_id): await job._terminate(JobStatus.CANCELING) return True - except: + except ApiError: return False finally: await self.sync_job_index.close() @@ -76,13 +84,15 @@ async def __async_cancel_jobs(self, connector_id, index_name, job_id): def __job_list_query(self, connector_id, index_name, job_id): if job_id: - return { "bool": { "must": [{ "term": { "_id": job_id } }] } } + return {"bool": {"must": [{"term": {"_id": job_id}}]}} if index_name: - return { "bool": { "filter": [{ "term": { "connector.index_name": index_name } }] } } + return { + "bool": {"filter": [{"term": {"connector.index_name": index_name}}]} + } if connector_id: - return { "bool": { "must": [{ "term": { "connector.id": connector_id } }] } } + return {"bool": {"must": [{"term": {"connector.id": connector_id}}]}} return None diff --git a/connectors/connectors_cli.py b/connectors/connectors_cli.py index e8f1c22f8..271e2156a 100644 --- a/connectors/connectors_cli.py +++ b/connectors/connectors_cli.py @@ -11,49 +11,52 @@ executes the `main` function of this module, which starts the service. """ import asyncio +import os + import click import yaml -from connectors.cli.auth import Auth, CONFIG_FILE_PATH -from connectors.cli.index import Index -from connectors.cli.job import Job -from connectors import __version__ # NOQA -from connectors.cli.connector import Connector from tabulate import tabulate -import os +from connectors import __version__ # NOQA +from connectors.cli.auth import CONFIG_FILE_PATH, Auth +from connectors.cli.connector import Connector +from connectors.cli.index import Index +from connectors.cli.job import Job from connectors.es.settings import Settings SERVICE_TYPES = { - 'mongodb': 'connectors.sources.mongo:MongoDataSource', - 's3': 'connectors.sources.s3:S3DataSource', - 'dir': 'connectors.sources.directory:DirectoryDataSource', - 'mysql': 'connectors.sources.mysql:MySqlDataSource', - 'network_drive': 'connectors.sources.network_drive:NASDataSource', - 'google_cloud_storage': 'connectors.sources.google_cloud_storage:GoogleCloudStorageDataSource', - 'azure_blob_storage': 'connectors.sources.azure_blob_storage:AzureBlobStorageDataSource', - 'postgresql': 'connectors.sources.postgresql:PostgreSQLDataSource', - 'oracle': 'connectors.sources.oracle:OracleDataSource', - 'sharepoint_server': 'connectors.sources.sharepoint_server:SharepointServerDataSource', - 'mssql': 'connectors.sources.mssql:MSSQLDataSource', - 'jira': 'connectors.sources.jira:JiraDataSource', - 'confluence': 'connectors.sources.confluence:ConfluenceDataSource', - 'dropbox': 'connectors.sources.dropbox:DropboxDataSource', - 'servicenow': 'connectors.sources.servicenow:ServiceNowDataSource', - 'sharepoint_online': 'connectors.sources.sharepoint_online:SharepointOnlineDataSource', - 'github': 'connectors.sources.github:GitHubDataSource' + "mongodb": "connectors.sources.mongo:MongoDataSource", + "s3": "connectors.sources.s3:S3DataSource", + "dir": "connectors.sources.directory:DirectoryDataSource", + "mysql": "connectors.sources.mysql:MySqlDataSource", + "network_drive": "connectors.sources.network_drive:NASDataSource", + "google_cloud_storage": "connectors.sources.google_cloud_storage:GoogleCloudStorageDataSource", + "azure_blob_storage": "connectors.sources.azure_blob_storage:AzureBlobStorageDataSource", + "postgresql": "connectors.sources.postgresql:PostgreSQLDataSource", + "oracle": "connectors.sources.oracle:OracleDataSource", + "sharepoint_server": "connectors.sources.sharepoint_server:SharepointServerDataSource", + "mssql": "connectors.sources.mssql:MSSQLDataSource", + "jira": "connectors.sources.jira:JiraDataSource", + "confluence": "connectors.sources.confluence:ConfluenceDataSource", + "dropbox": "connectors.sources.dropbox:DropboxDataSource", + "servicenow": "connectors.sources.servicenow:ServiceNowDataSource", + "sharepoint_online": "connectors.sources.sharepoint_online:SharepointOnlineDataSource", + "github": "connectors.sources.github:GitHubDataSource", } __all__ = ["main"] + # Main group def print_version(ctx, param, value): if not value or ctx.resilient_parsing: return click.echo(__version__) + @click.group(invoke_without_command=True) -@click.version_option(__version__, '-v', '--version', message="%(version)s") -@click.option('-c', '--config', type=click.File('rb')) +@click.version_option(__version__, "-v", "--version", message="%(version)s") +@click.option("-c", "--config", type=click.File("rb")) @click.pass_context def cli(ctx, config): # print help page if no subcommands provided @@ -63,43 +66,56 @@ def cli(ctx, config): ctx.ensure_object(dict) if config: - ctx.obj['config'] = yaml.safe_load(config) + ctx.obj["config"] = yaml.safe_load(config) elif os.path.isfile(CONFIG_FILE_PATH): with open(CONFIG_FILE_PATH, "r") as f: - ctx.obj['config'] = yaml.safe_load(f.read()) + ctx.obj["config"] = yaml.safe_load(f.read()) else: - raise FileNotFoundError(f"{CONFIG_FILE_PATH} is not found") + msg = f"{CONFIG_FILE_PATH} is not found" + raise FileNotFoundError(msg) -@click.command(help="Authenticate Connectors CLI with an Elasticsearch instance") -@click.option('--host', prompt="Elastic host") -@click.option('--username', prompt="Username") -@click.option('--password', prompt="Password", hide_input=True) +@click.command(help="Authenticate Connectors CLI with an Elasticsearch instance") +@click.option("--host", prompt="Elastic host") +@click.option("--username", prompt="Username") +@click.option("--password", prompt="Password", hide_input=True) def login(host, username, password): auth = Auth(host, username, password) if auth.is_config_present(): - click.confirm(click.style('Config is already present. Are you sure you want to override it?', fg='yellow'), abort=True) + click.confirm( + click.style( + "Config is already present. Are you sure you want to override it?", + fg="yellow", + ), + abort=True, + ) if auth.authenticate(): - click.echo(click.style("Authentication successful", fg='green')) + click.echo(click.style("Authentication successful", fg="green")) else: - click.echo('') - click.echo(click.style("Authentication failed. Please check your credentials.", fg='red'), err=True) + click.echo("") + click.echo( + click.style( + "Authentication failed. Please check your credentials.", fg="red" + ), + err=True, + ) return - cli.add_command(login) + # Connector group @click.group(invoke_without_command=True, help="Connectors mangement") @click.pass_obj def connector(obj): pass -@click.command(help="List all existing connectors") + +@click.command(name="list", help="List all existing connectors") @click.pass_obj -def list(obj): - connector = Connector(config=obj['config']['elasticsearch']) +def list_command(obj): + connector = Connector(config=obj["config"]["elasticsearch"]) coro = connector.list_connectors() try: @@ -113,22 +129,34 @@ def list(obj): table_rows = [] for connector in connectors: - formatted_connector = [ click.style(connector.id, blink=True, fg="green"), click.style(connector.index_name, blink=True, fg="white"), - click.style(connector.service_type, blink=True, fg='white'), + click.style(connector.service_type, blink=True, fg="white"), click.style(connector.status.value, fg="white"), - click.style(connector.last_sync_status.value, fg="white") + click.style(connector.last_sync_status.value, fg="white"), ] table_rows.append(formatted_connector) - click.echo(tabulate(table_rows, headers=["ID", "Index name", "Service type", "Status", "Last sync job status"])) + click.echo( + tabulate( + table_rows, + headers=[ + "ID", + "Index name", + "Service type", + "Status", + "Last sync job status", + ], + ) + ) except asyncio.CancelledError as e: click.echo(e) + language_keys = [*Settings().language_data.keys()] + # Support blank values for languge def validate_language(ctx, param, value): if value not in language_keys: @@ -136,50 +164,82 @@ def validate_language(ctx, param, value): return value + @click.command(help="Creates a new connector and a search index") -@click.option('--index_name', prompt=f"{click.style('?', blink=True, fg='green')} Search index name (search-)") -@click.option('--service_type', prompt=f"{click.style('?', blink=True, fg='green')} Service type", type=click.Choice(SERVICE_TYPES.keys(), case_sensitive=False)) -@click.option('--index_language', prompt=f"{click.style('?', blink=True, fg='green')} Index language (leave empty for universal) {language_keys}", default='', callback=validate_language) +@click.option( + "--index_name", + prompt=f"{click.style('?', blink=True, fg='green')} Search index name (search-)", +) +@click.option( + "--service_type", + prompt=f"{click.style('?', blink=True, fg='green')} Service type", + type=click.Choice(SERVICE_TYPES.keys(), case_sensitive=False), +) +@click.option( + "--index_language", + prompt=f"{click.style('?', blink=True, fg='green')} Index language (leave empty for universal) {language_keys}", + default="", + callback=validate_language, +) @click.pass_obj def create(obj, index_name, service_type, index_language): index_name = f"search-{index_name}" - connector = Connector(obj['config']['elasticsearch']) - configuration = connector.service_type_configuration(source_class=SERVICE_TYPES[service_type]) - - prompt = lambda : click.prompt(f"{click.style('?', blink=True, fg='green')} {item['label']}", default=item.get('value', None), hide_input=(True if item.get('sensitive') == True else False)) + connector = Connector(obj["config"]["elasticsearch"]) + configuration = connector.service_type_configuration( + source_class=SERVICE_TYPES[service_type] + ) + + def prompt(): + return click.prompt( + f"{click.style('?', blink=True, fg='green')} {item['label']}", + default=item.get("value", None), + hide_input=True if item.get("sensitive") is True else False, + ) # first fill in the fields that do not depend on other fields for key, item in configuration.items(): - if 'depends_on' in item: + if "depends_on" in item: continue - configuration[key]['value'] = prompt() + configuration[key]["value"] = prompt() for key, item in configuration.items(): - if not 'depends_on' in item: + if "depends_on" not in item: continue - if all(configuration[field_item['field']]['value'] == field_item['value'] for field_item in item['depends_on']): - configuration[key]['value'] = prompt() + if all( + configuration[field_item["field"]]["value"] == field_item["value"] + for field_item in item["depends_on"] + ): + configuration[key]["value"] = prompt() result = connector.create(index_name, service_type, configuration, index_language) - click.echo("Connector (ID: " + click.style(result[1], fg="green") + ", service_type: " + click.style(service_type, fg='green') + ") has been created!") + click.echo( + "Connector (ID: " + + click.style(result[1], fg="green") + + ", service_type: " + + click.style(service_type, fg="green") + + ") has been created!" + ) + connector.add_command(create) -connector.add_command(list) +connector.add_command(list_command) cli.add_command(connector) + # Index group @click.group(invoke_without_command=True) @click.pass_obj def index(obj): pass -@click.command(help="Show all indices") + +@click.command(name="list", help="Show all indices") @click.pass_obj -def list(obj): - index = Index(config=obj['config']['elasticsearch']) +def list_command(obj): + index = Index(config=obj["config"]["elasticsearch"]) indices = index.list_indices() click.echo("") @@ -193,45 +253,68 @@ def list(obj): for index in indices: formatted_index = [ click.style(index, blink=True, fg="white"), - click.style(indices[index]['primaries']['docs']['count']) + click.style(indices[index]["primaries"]["docs"]["count"]), ] table_rows.append(formatted_index) click.echo(tabulate(table_rows, headers=["Index name", "Number of documents"])) -index.add_command(list) +index.add_command(list_command) + @click.command(help="Remove all documents from the index") @click.pass_obj -@click.argument('index', nargs=1) +@click.argument("index", nargs=1) def clean(obj, index): - index_cli = Index(config=obj['config']['elasticsearch']) - click.confirm(click.style('Are you sure you want to clean ' + index + '?', fg='yellow'), abort=True) + index_cli = Index(config=obj["config"]["elasticsearch"]) + click.confirm( + click.style("Are you sure you want to clean " + index + "?", fg="yellow"), + abort=True, + ) if index_cli.clean(index): - click.echo(click.style("The index has been cleaned.", fg='green')) + click.echo(click.style("The index has been cleaned.", fg="green")) else: - click.echo('') - click.echo(click.style("Something went wrong. Please try again later or check your credentials", fg='red'), err=True) + click.echo("") + click.echo( + click.style( + "Something went wrong. Please try again later or check your credentials", + fg="red", + ), + err=True, + ) + index.add_command(clean) + @click.command(help="Delete an index") @click.pass_obj -@click.argument('index', nargs=1) +@click.argument("index", nargs=1) def delete(obj, index): - index_cli = Index(config=obj['config']['elasticsearch']) - click.confirm(click.style('Are you sure you want to delete ' + index + '?', fg='yellow'), abort=True) + index_cli = Index(config=obj["config"]["elasticsearch"]) + click.confirm( + click.style("Are you sure you want to delete " + index + "?", fg="yellow"), + abort=True, + ) if index_cli.delete(index): - click.echo(click.style("The index has been deleted.", fg='green')) + click.echo(click.style("The index has been deleted.", fg="green")) else: - click.echo('') - click.echo(click.style("Something went wrong. Please try again later or check your credentials", fg='red'), err=True) + click.echo("") + click.echo( + click.style( + "Something went wrong. Please try again later or check your credentials", + fg="red", + ), + err=True, + ) + index.add_command(delete) cli.add_command(index) + # Job group @click.group(invoke_without_command=True) @click.pass_obj @@ -241,24 +324,37 @@ def job(obj): @click.command(help="Start a sync job.") @click.pass_obj -@click.option('-i', help='Connector ID', required=True) -@click.option('-t', help='Job type', type=click.Choice(['full', 'incremental', 'access_control'], case_sensitive=False), required=True) +@click.option("-i", help="Connector ID", required=True) +@click.option( + "-t", + help="Job type", + type=click.Choice(["full", "incremental", "access_control"], case_sensitive=False), + required=True, +) def start(obj, i, t): - job_cli = Job(config=obj['config']['elasticsearch']) - click.echo('Starting a job...') + job_cli = Job(config=obj["config"]["elasticsearch"]) + click.echo("Starting a job...") if job_cli.start(connector_id=i, job_type=t): - click.echo(click.style("The job has been started.", fg='green')) + click.echo(click.style("The job has been started.", fg="green")) else: - click.echo('') - click.echo(click.style("Something went wrong. Please try again later or check your credentials", fg='red'), err=True) + click.echo("") + click.echo( + click.style( + "Something went wrong. Please try again later or check your credentials", + fg="red", + ), + err=True, + ) + job.add_command(start) -@click.command(help="List of jobs sorted by date.") + +@click.command(name="list", help="List of jobs sorted by date.") @click.pass_obj @click.argument("connector_id", nargs=1) -def list(obj, connector_id): - job_cli = Job(config=obj['config']['elasticsearch']) +def list_command(obj, connector_id): + job_cli = Job(config=obj["config"]["elasticsearch"]) jobs = job_cli.list_jobs(connector_id=connector_id) if len(jobs) == 0: @@ -279,29 +375,56 @@ def list(obj, connector_id): ] table_rows.append(formatted_job) - click.echo(tabulate(table_rows, headers=["Job id", "Connector id", "Index name", "Job status", "Job type", "Documents indexed", "Volume documents indexed (MiB)", "Documents deleted"])) + click.echo( + tabulate( + table_rows, + headers=[ + "Job id", + "Connector id", + "Index name", + "Job status", + "Job type", + "Documents indexed", + "Volume documents indexed (MiB)", + "Documents deleted", + ], + ) + ) + + +job.add_command(list_command) -job.add_command(list) @click.command(help="Cancel a job") @click.pass_obj -@click.argument('job_id') +@click.argument("job_id") def cancel(obj, job_id): - job_cli = Job(config=obj['config']['elasticsearch']) - click.confirm(click.style('Are you sure you want to cancel jobs?', fg='yellow'), abort=True) - click.echo('Canceling jobs...') + job_cli = Job(config=obj["config"]["elasticsearch"]) + click.confirm( + click.style("Are you sure you want to cancel jobs?", fg="yellow"), abort=True + ) + click.echo("Canceling jobs...") if job_cli.cancel(job_id=job_id): - click.echo(click.style("The jobs is cancelling.", fg='green')) + click.echo(click.style("The jobs is cancelling.", fg="green")) else: - click.echo('') - click.echo(click.style("Something went wrong. Please try again later or check your credentials", fg='red'), err=True) + click.echo("") + click.echo( + click.style( + "Something went wrong. Please try again later or check your credentials", + fg="red", + ), + err=True, + ) + job.add_command(cancel) cli.add_command(job) + def main(args=None): cli() -if __name__ == '__main__': + +if __name__ == "__main__": main() diff --git a/connectors/es/client.py b/connectors/es/client.py index db29d55e0..33602bbe3 100644 --- a/connectors/es/client.py +++ b/connectors/es/client.py @@ -177,10 +177,12 @@ async def delete_indices(self, indices): await self.client.indices.delete(index=indices, ignore_unavailable=True) async def clean_index(self, index_name): - return await self.client.delete_by_query(index=index_name, body={ 'query': { 'match_all': {} } }, ignore_unavailable=True) + return await self.client.delete_by_query( + index=index_name, body={"query": {"match_all": {}}}, ignore_unavailable=True + ) async def list_indices(self): - return await self.client.indices.stats(index='search-*') + return await self.client.indices.stats(index="search-*") def client(self): return self.client From f91c17c901675612e0f78401809f3c8684357527 Mon Sep 17 00:00:00 2001 From: Dmitrii Burlutckii Date: Tue, 21 Nov 2023 22:50:04 +0100 Subject: [PATCH 19/33] More linter --- connectors/cli/index.py | 5 ++--- connectors/connectors_cli.py | 14 +++++++------- 2 files changed, 9 insertions(+), 10 deletions(-) diff --git a/connectors/cli/index.py b/connectors/cli/index.py index d00c8d534..9bf622093 100644 --- a/connectors/cli/index.py +++ b/connectors/cli/index.py @@ -22,9 +22,8 @@ def delete(self, index_name): async def __list_indices(self): try: return await self.es_client.list_indices() - except ApiError: - # TODO raise - return [] + except ApiError as e: + raise e finally: await self.es_client.close() diff --git a/connectors/connectors_cli.py b/connectors/connectors_cli.py index 271e2156a..dd43dd93e 100644 --- a/connectors/connectors_cli.py +++ b/connectors/connectors_cli.py @@ -114,7 +114,7 @@ def connector(obj): @click.command(name="list", help="List all existing connectors") @click.pass_obj -def list_command(obj): +def list_connectors(obj): connector = Connector(config=obj["config"]["elasticsearch"]) coro = connector.list_connectors() @@ -173,7 +173,7 @@ def validate_language(ctx, param, value): @click.option( "--service_type", prompt=f"{click.style('?', blink=True, fg='green')} Service type", - type=click.Choice(SERVICE_TYPES.keys(), case_sensitive=False), + type=click.Choice(list(SERVICE_TYPES.keys()), case_sensitive=False), ) @click.option( "--index_language", @@ -224,7 +224,7 @@ def prompt(): connector.add_command(create) -connector.add_command(list_command) +connector.add_command(list_connectors) cli.add_command(connector) @@ -238,7 +238,7 @@ def index(obj): @click.command(name="list", help="Show all indices") @click.pass_obj -def list_command(obj): +def list_indices(obj): index = Index(config=obj["config"]["elasticsearch"]) indices = index.list_indices() @@ -260,7 +260,7 @@ def list_command(obj): click.echo(tabulate(table_rows, headers=["Index name", "Number of documents"])) -index.add_command(list_command) +index.add_command(list_indices) @click.command(help="Remove all documents from the index") @@ -353,7 +353,7 @@ def start(obj, i, t): @click.command(name="list", help="List of jobs sorted by date.") @click.pass_obj @click.argument("connector_id", nargs=1) -def list_command(obj, connector_id): +def list_jobs(obj, connector_id): job_cli = Job(config=obj["config"]["elasticsearch"]) jobs = job_cli.list_jobs(connector_id=connector_id) @@ -392,7 +392,7 @@ def list_command(obj, connector_id): ) -job.add_command(list_command) +job.add_command(list_jobs) @click.command(help="Cancel a job") From d4bdc8d08a47577b2aeafd0dbbfe652978a43723 Mon Sep 17 00:00:00 2001 From: Dmitrii Burlutckii Date: Wed, 22 Nov 2023 14:22:39 +0100 Subject: [PATCH 20/33] Fix tests --- tests/test_cli.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/test_cli.py b/tests/test_cli.py index bab9886fc..a9b9d5226 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -14,7 +14,7 @@ import pytest from connectors import __version__ -from connectors.cli import main, run +from connectors.old_cli import main, run HERE = os.path.dirname(__file__) FIXTURES_DIR = os.path.abspath(os.path.join(HERE, "fixtures")) @@ -90,8 +90,8 @@ def test_run_snowflake(mock_responses, set_env): assert "Cannot use the `list` action with other actions" in output -@patch("connectors.cli.set_logger") -@patch("connectors.cli.load_config", side_effect=Exception("something went wrong")) +@patch("connectors.old_cli.set_logger") +@patch("connectors.old_cli.load_config", side_effect=Exception("something went wrong")) def test_main_with_invalid_configuration(load_config, set_logger): args = mock.MagicMock() args.log_level = logging.DEBUG # should be ignored! From a2da0073235cc6616f32da93a1810efaab1e4380 Mon Sep 17 00:00:00 2001 From: Dmitrii Burlutckii Date: Wed, 22 Nov 2023 17:16:54 +0100 Subject: [PATCH 21/33] Add tests for cli, login commands --- tests/test_connectors_cli.py | 64 ++++++++++++++++++++++++++++++++++++ 1 file changed, 64 insertions(+) create mode 100644 tests/test_connectors_cli.py diff --git a/tests/test_connectors_cli.py b/tests/test_connectors_cli.py new file mode 100644 index 000000000..e1d4c0355 --- /dev/null +++ b/tests/test_connectors_cli.py @@ -0,0 +1,64 @@ +from click.testing import CliRunner +import pytest +from unittest.mock import AsyncMock, patch, MagicMock + + +from connectors.connectors_cli import cli, login +from connectors.cli.auth import Auth, CONFIG_FILE_PATH +import os +from connectors import __version__ # NOQA + +def test_version(): + runner = CliRunner() + result = runner.invoke(cli, ['-v']) + assert result.exit_code == 0 + assert result.output.strip() == __version__ + +def test_help_page(): + runner = CliRunner() + result = runner.invoke(cli, ['--help']) + assert "Usage:" in result.output + assert "Options:" in result.output + assert "Commands:" in result.output + +def test_help_page_when_no_arguments(): + runner = CliRunner() + result = runner.invoke(cli, []) + assert "Usage:" in result.output + assert "Options:" in result.output + assert "Commands:" in result.output + + +@patch("connectors.cli.auth.Auth._Auth__ping_es_client", AsyncMock(return_value=False)) +def test_login_unsuccessful(tmp_path): + runner = CliRunner() + with runner.isolated_filesystem(temp_dir=tmp_path) as temp_dir: + result = runner.invoke(login, input="http://localhost:9200/\nwrong_username\nwrong_password\n") + assert result.exit_code == 0 + assert "Authentication failed" in result.output + assert not os.path.isfile(os.path.join(temp_dir, CONFIG_FILE_PATH)) + + +@patch("connectors.cli.auth.Auth._Auth__ping_es_client", AsyncMock(return_value=True)) +def test_login_successful(tmp_path): + runner = CliRunner() + with runner.isolated_filesystem(temp_dir=tmp_path) as temp_dir: + result = runner.invoke(login, input="http://localhost:9200/\nwrong_username\nwrong_password\n") + assert result.exit_code == 0 + assert "Authentication successful" in result.output + assert os.path.isfile(os.path.join(temp_dir, CONFIG_FILE_PATH)) + +@patch("click.confirm") +def test_login_when_credentials_file_exists(mocked_confirm, tmp_path): + runner = CliRunner() + with runner.isolated_filesystem(temp_dir=tmp_path) as temp_dir: + mocked_confirm.return_value = True + + # Create config file + os.makedirs(os.path.dirname(CONFIG_FILE_PATH)) + with open(os.path.join(temp_dir, CONFIG_FILE_PATH), "w") as f: + f.write("fake config file") + + result = runner.invoke(login, input="http://localhost:9200/\ncorrect_username\ncorrect_password\n") + assert result.exit_code == 0 + assert mocked_confirm.called_once() From 56956668604dd4166b13f03895a556d9ad377da4 Mon Sep 17 00:00:00 2001 From: Dmitrii Burlutckii Date: Wed, 22 Nov 2023 17:42:31 +0100 Subject: [PATCH 22/33] Print help page when no commands provided --- connectors/connectors_cli.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/connectors/connectors_cli.py b/connectors/connectors_cli.py index dd43dd93e..c9a150909 100644 --- a/connectors/connectors_cli.py +++ b/connectors/connectors_cli.py @@ -107,9 +107,12 @@ def login(host, username, password): # Connector group @click.group(invoke_without_command=True, help="Connectors mangement") -@click.pass_obj -def connector(obj): - pass +@click.pass_context +def connector(ctx): + # print help page if no subcommands provided + if ctx.invoked_subcommand is None: + click.echo(ctx.get_help()) + return @click.command(name="list", help="List all existing connectors") From fc0ab421a7de4801b37346de75284045a3f02626 Mon Sep 17 00:00:00 2001 From: Dmitrii Burlutckii Date: Thu, 23 Nov 2023 15:38:17 +0100 Subject: [PATCH 23/33] Tests for connect and index commands --- connectors/connectors_cli.py | 2 +- tests/test_connectors_cli.py | 196 +++++++++++++++++++++++++++-------- 2 files changed, 156 insertions(+), 42 deletions(-) diff --git a/connectors/connectors_cli.py b/connectors/connectors_cli.py index c9a150909..9be06d1ca 100644 --- a/connectors/connectors_cli.py +++ b/connectors/connectors_cli.py @@ -122,7 +122,7 @@ def list_connectors(obj): coro = connector.list_connectors() try: - connectors = asyncio.get_event_loop().run_until_complete(coro) + connectors = asyncio.run(coro) click.echo("") if len(connectors) == 0: click.echo("No connectors found") diff --git a/tests/test_connectors_cli.py b/tests/test_connectors_cli.py index e1d4c0355..c01ee1cbe 100644 --- a/tests/test_connectors_cli.py +++ b/tests/test_connectors_cli.py @@ -1,64 +1,178 @@ -from click.testing import CliRunner -import pytest -from unittest.mock import AsyncMock, patch, MagicMock +import os +from unittest.mock import AsyncMock, MagicMock, patch +from click.testing import CliRunner -from connectors.connectors_cli import cli, login -from connectors.cli.auth import Auth, CONFIG_FILE_PATH -import os from connectors import __version__ # NOQA +from connectors.cli.auth import CONFIG_FILE_PATH +from connectors.connectors_cli import cli, login +from connectors.protocol.connectors import Connector as ConnectorObject +from tests.commons import AsyncIterator + def test_version(): - runner = CliRunner() - result = runner.invoke(cli, ['-v']) - assert result.exit_code == 0 - assert result.output.strip() == __version__ + runner = CliRunner() + result = runner.invoke(cli, ["-v"]) + assert result.exit_code == 0 + assert result.output.strip() == __version__ + def test_help_page(): - runner = CliRunner() - result = runner.invoke(cli, ['--help']) - assert "Usage:" in result.output - assert "Options:" in result.output - assert "Commands:" in result.output + runner = CliRunner() + result = runner.invoke(cli, ["--help"]) + assert "Usage:" in result.output + assert "Options:" in result.output + assert "Commands:" in result.output + def test_help_page_when_no_arguments(): - runner = CliRunner() - result = runner.invoke(cli, []) - assert "Usage:" in result.output - assert "Options:" in result.output - assert "Commands:" in result.output + runner = CliRunner() + result = runner.invoke(cli, []) + assert "Usage:" in result.output + assert "Options:" in result.output + assert "Commands:" in result.output @patch("connectors.cli.auth.Auth._Auth__ping_es_client", AsyncMock(return_value=False)) def test_login_unsuccessful(tmp_path): - runner = CliRunner() - with runner.isolated_filesystem(temp_dir=tmp_path) as temp_dir: - result = runner.invoke(login, input="http://localhost:9200/\nwrong_username\nwrong_password\n") - assert result.exit_code == 0 - assert "Authentication failed" in result.output - assert not os.path.isfile(os.path.join(temp_dir, CONFIG_FILE_PATH)) + runner = CliRunner() + with runner.isolated_filesystem(temp_dir=tmp_path) as temp_dir: + result = runner.invoke( + login, input="http://localhost:9200/\nwrong_username\nwrong_password\n" + ) + assert result.exit_code == 0 + assert "Authentication failed" in result.output + assert not os.path.isfile(os.path.join(temp_dir, CONFIG_FILE_PATH)) @patch("connectors.cli.auth.Auth._Auth__ping_es_client", AsyncMock(return_value=True)) def test_login_successful(tmp_path): - runner = CliRunner() - with runner.isolated_filesystem(temp_dir=tmp_path) as temp_dir: - result = runner.invoke(login, input="http://localhost:9200/\nwrong_username\nwrong_password\n") - assert result.exit_code == 0 - assert "Authentication successful" in result.output - assert os.path.isfile(os.path.join(temp_dir, CONFIG_FILE_PATH)) + runner = CliRunner() + with runner.isolated_filesystem(temp_dir=tmp_path) as temp_dir: + result = runner.invoke( + login, input="http://localhost:9200/\nwrong_username\nwrong_password\n" + ) + assert result.exit_code == 0 + assert "Authentication successful" in result.output + assert os.path.isfile(os.path.join(temp_dir, CONFIG_FILE_PATH)) + @patch("click.confirm") def test_login_when_credentials_file_exists(mocked_confirm, tmp_path): - runner = CliRunner() - with runner.isolated_filesystem(temp_dir=tmp_path) as temp_dir: - mocked_confirm.return_value = True + runner = CliRunner() + with runner.isolated_filesystem(temp_dir=tmp_path) as temp_dir: + mocked_confirm.return_value = True + + # Create config file + os.makedirs(os.path.dirname(CONFIG_FILE_PATH)) + with open(os.path.join(temp_dir, CONFIG_FILE_PATH), "w") as f: + f.write("fake config file") + + result = runner.invoke( + login, input="http://localhost:9200/\ncorrect_username\ncorrect_password\n" + ) + assert result.exit_code == 0 + assert mocked_confirm.called_once() + + +def test_connector_help_page(): + runner = CliRunner() + result = runner.invoke(cli, ["connector", "--help"]) + assert result.exit_code == 0 + assert "Usage:" in result.output + assert "Options:" in result.output + assert "Commands:" in result.output - # Create config file - os.makedirs(os.path.dirname(CONFIG_FILE_PATH)) - with open(os.path.join(temp_dir, CONFIG_FILE_PATH), "w") as f: - f.write("fake config file") - result = runner.invoke(login, input="http://localhost:9200/\ncorrect_username\ncorrect_password\n") +@patch("connectors.cli.connector.Connector.list_connectors", AsyncMock(return_value=[])) +def test_connector_list_no_connectors(): + runner = CliRunner() + result = runner.invoke(cli, ["connector", "list"]) assert result.exit_code == 0 - assert mocked_confirm.called_once() + assert "No connectors found" in result.output + + +def test_connector_list_one_connector(): + runner = CliRunner() + connector_index = MagicMock() + + doc = { + "_source": { + "index_name": "test_connector", + "service_type": "mongodb", + "last_sync_status": "error", + "status": "connected", + }, + "_id": "test_id", + } + connectors = [ConnectorObject(connector_index, doc)] + + with patch( + "connectors.protocol.ConnectorIndex.all_connectors", AsyncIterator(connectors) + ): + result = runner.invoke(cli, ["connector", "list"]) + + assert result.exit_code == 0 + assert "test_connector" in result.output + assert "test_id" in result.output + assert "mongodb" in result.output + assert "error" in result.output + assert "connected" in result.output + + +def test_index_help_page(): + runner = CliRunner() + result = runner.invoke(cli, ["index", "--help"]) + assert result.exit_code == 0 + assert "Usage:" in result.output + assert "Options:" in result.output + assert "Commands:" in result.output + + +@patch("connectors.cli.index.Index.list_indices", MagicMock(return_value=[])) +def test_index_list_no_indexes(): + runner = CliRunner() + result = runner.invoke(cli, ["index", "list"]) + assert result.exit_code == 0 + assert "No indices found" in result.output + + +def test_index_list_one_index(): + runner = CliRunner() + indices = {"test_index": {"primaries": {"docs": {"count": 10}}}} + + with patch( + "connectors.cli.index.Index.list_indices", MagicMock(return_value=indices) + ): + result = runner.invoke(cli, ["index", "list"]) + + assert result.exit_code == 0 + assert "test_index" in result.output + + +@patch("click.confirm", MagicMock(return_value=True)) +def test_index_clean(): + runner = CliRunner() + index_name = "test_index" + with patch( + "connectors.es.client.ESClient.clean_index", AsyncMock(return_value=True) + ) as mocked_method: + result = runner.invoke(cli, ["index", "clean", index_name]) + + assert "The index has been cleaned" in result.output + mocked_method.assert_called_once_with(index_name) + assert result.exit_code == 0 + + +@patch("click.confirm", MagicMock(return_value=True)) +def test_index_delete(): + runner = CliRunner() + index_name = "test_index" + with patch( + "connectors.es.client.ESClient.delete_indices", AsyncMock(return_value=None) + ) as mocked_method: + result = runner.invoke(cli, ["index", "delete", index_name]) + + assert "The index has been deleted" in result.output + mocked_method.assert_called_once_with([index_name]) + assert result.exit_code == 0 From 772053109310e34d150f8ccf190ed83539afd988 Mon Sep 17 00:00:00 2001 From: Dmitrii Burlutckii Date: Thu, 23 Nov 2023 15:57:49 +0100 Subject: [PATCH 24/33] Update help page --- connectors/connectors_cli.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/connectors/connectors_cli.py b/connectors/connectors_cli.py index 9be06d1ca..9a7cac1a1 100644 --- a/connectors/connectors_cli.py +++ b/connectors/connectors_cli.py @@ -106,7 +106,7 @@ def login(host, username, password): # Connector group -@click.group(invoke_without_command=True, help="Connectors mangement") +@click.group(invoke_without_command=True, help="Connectors management") @click.pass_context def connector(ctx): # print help page if no subcommands provided @@ -233,7 +233,7 @@ def prompt(): # Index group -@click.group(invoke_without_command=True) +@click.group(invoke_without_command=True, help="Search indices management") @click.pass_obj def index(obj): pass @@ -319,7 +319,7 @@ def delete(obj, index): # Job group -@click.group(invoke_without_command=True) +@click.group(invoke_without_command=True, help="Sync jobs management") @click.pass_obj def job(obj): pass From 8cd39a19f3aca2f547ad14b37810aee63ad7f442 Mon Sep 17 00:00:00 2001 From: Dmitrii Burlutckii Date: Thu, 23 Nov 2023 17:40:02 +0100 Subject: [PATCH 25/33] Add TODO --- connectors/cli/connector.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/connectors/cli/connector.py b/connectors/cli/connector.py index 30407fec3..f7250d479 100644 --- a/connectors/cli/connector.py +++ b/connectors/cli/connector.py @@ -111,7 +111,7 @@ async def __create_connector( "index_name": index_name, "service_type": service_type, "status": "configured", # TODO use a predefined constant - "is_native": True, # figure out how to check if it's native or not + "is_native": True, # TODO make it optional "language": language, "last_access_control_sync_error": None, "last_access_control_sync_scheduled_at": None, From e2e119420d2f657efe0dd5a78bc6bcc9e4c60351 Mon Sep 17 00:00:00 2001 From: Dmitrii Burlutckii Date: Fri, 24 Nov 2023 18:11:26 +0100 Subject: [PATCH 26/33] Adress feedback --- connectors/cli/connector.py | 8 +------ connectors/cli/index.py | 5 +++-- connectors/cli/job.py | 7 +----- connectors/connectors_cli.py | 31 +++----------------------- tests/{test_cli.py => test_old_cli.py} | 0 5 files changed, 8 insertions(+), 43 deletions(-) rename tests/{test_cli.py => test_old_cli.py} (100%) diff --git a/connectors/cli/connector.py b/connectors/cli/connector.py index f7250d479..78efc85d9 100644 --- a/connectors/cli/connector.py +++ b/connectors/cli/connector.py @@ -32,11 +32,7 @@ async def list_connectors(self): indices=[CONCRETE_CONNECTORS_INDEX, CONCRETE_JOBS_INDEX] ) - connectors = [] - async for connector in self.connector_index.all_connectors(): - connectors.append(connector) - - return connectors + return [connector async for connector in self.connector_index.all_connectors()] # TODO catch exceptions finally: @@ -138,8 +134,6 @@ async def __create_connector( connector = await self.connector_index.index(doc) return connector["_id"] - except Exception as e: - raise e finally: await self.connector_index.close() diff --git a/connectors/cli/index.py b/connectors/cli/index.py index 9bf622093..319ee3026 100644 --- a/connectors/cli/index.py +++ b/connectors/cli/index.py @@ -17,7 +17,7 @@ def clean(self, index_name): return asyncio.run(self.__clean_index(index_name)) def delete(self, index_name): - return asyncio.run(self.__delete_index(index_name)) is None + return asyncio.run(self.__delete_index(index_name)) async def __list_indices(self): try: @@ -37,7 +37,8 @@ async def __clean_index(self, index_name): async def __delete_index(self, index_name): try: - return await self.es_client.delete_indices([index_name]) + await self.es_client.delete_indices([index_name]) + return True except ApiError: return False finally: diff --git a/connectors/cli/job.py b/connectors/cli/job.py index a9380ead0..3e1cc8f56 100644 --- a/connectors/cli/job.py +++ b/connectors/cli/job.py @@ -41,8 +41,6 @@ async def __async_start(self, connector_id, job_type): ) return True - except Exception as e: - raise e finally: await self.sync_job_index.close() await self.connector_index.close() @@ -57,11 +55,8 @@ async def __async_list_jobs(self, connector_id, index_name, job_id): query=self.__job_list_query(connector_id, index_name, job_id), sort=self.__job_list_sort(), ) - formatted_jobs = [] - async for job in jobs: - formatted_jobs.append(job) - return formatted_jobs + return [job async for job in jobs] # TODO catch exceptions finally: diff --git a/connectors/connectors_cli.py b/connectors/connectors_cli.py index 9a7cac1a1..84e262b4b 100644 --- a/connectors/connectors_cli.py +++ b/connectors/connectors_cli.py @@ -23,37 +23,12 @@ from connectors.cli.index import Index from connectors.cli.job import Job from connectors.es.settings import Settings - -SERVICE_TYPES = { - "mongodb": "connectors.sources.mongo:MongoDataSource", - "s3": "connectors.sources.s3:S3DataSource", - "dir": "connectors.sources.directory:DirectoryDataSource", - "mysql": "connectors.sources.mysql:MySqlDataSource", - "network_drive": "connectors.sources.network_drive:NASDataSource", - "google_cloud_storage": "connectors.sources.google_cloud_storage:GoogleCloudStorageDataSource", - "azure_blob_storage": "connectors.sources.azure_blob_storage:AzureBlobStorageDataSource", - "postgresql": "connectors.sources.postgresql:PostgreSQLDataSource", - "oracle": "connectors.sources.oracle:OracleDataSource", - "sharepoint_server": "connectors.sources.sharepoint_server:SharepointServerDataSource", - "mssql": "connectors.sources.mssql:MSSQLDataSource", - "jira": "connectors.sources.jira:JiraDataSource", - "confluence": "connectors.sources.confluence:ConfluenceDataSource", - "dropbox": "connectors.sources.dropbox:DropboxDataSource", - "servicenow": "connectors.sources.servicenow:ServiceNowDataSource", - "sharepoint_online": "connectors.sources.sharepoint_online:SharepointOnlineDataSource", - "github": "connectors.sources.github:GitHubDataSource", -} +from connectors.config import _default_config __all__ = ["main"] # Main group -def print_version(ctx, param, value): - if not value or ctx.resilient_parsing: - return - click.echo(__version__) - - @click.group(invoke_without_command=True) @click.version_option(__version__, "-v", "--version", message="%(version)s") @click.option("-c", "--config", type=click.File("rb")) @@ -176,7 +151,7 @@ def validate_language(ctx, param, value): @click.option( "--service_type", prompt=f"{click.style('?', blink=True, fg='green')} Service type", - type=click.Choice(list(SERVICE_TYPES.keys()), case_sensitive=False), + type=click.Choice(list(_default_config()['sources'].keys()), case_sensitive=False), ) @click.option( "--index_language", @@ -189,7 +164,7 @@ def create(obj, index_name, service_type, index_language): index_name = f"search-{index_name}" connector = Connector(obj["config"]["elasticsearch"]) configuration = connector.service_type_configuration( - source_class=SERVICE_TYPES[service_type] + source_class=_default_config()['sources'][service_type] ) def prompt(): diff --git a/tests/test_cli.py b/tests/test_old_cli.py similarity index 100% rename from tests/test_cli.py rename to tests/test_old_cli.py From aaf816ff1fff6bbb37934fa20812a128f6e771b5 Mon Sep 17 00:00:00 2001 From: Dmitrii Burlutckii Date: Mon, 27 Nov 2023 15:06:37 +0100 Subject: [PATCH 27/33] Remove redundant client() method --- connectors/es/client.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/connectors/es/client.py b/connectors/es/client.py index 33602bbe3..3261fa8af 100644 --- a/connectors/es/client.py +++ b/connectors/es/client.py @@ -184,9 +184,6 @@ async def clean_index(self, index_name): async def list_indices(self): return await self.client.indices.stats(index="search-*") - def client(self): - return self.client - def with_concurrency_control(retries=3): def wrapper(func): From d2adb8ad1ef4ef740c9a5269b730510d6beb59f8 Mon Sep 17 00:00:00 2001 From: Dmitrii Burlutckii Date: Tue, 28 Nov 2023 16:31:48 +0100 Subject: [PATCH 28/33] Add more tests --- connectors/cli/connector.py | 21 +--- connectors/connectors_cli.py | 17 ++- tests/test_connectors_cli.py | 220 ++++++++++++++++++++++++++++++++++- 3 files changed, 231 insertions(+), 27 deletions(-) diff --git a/connectors/cli/connector.py b/connectors/cli/connector.py index 78efc85d9..e7b7d779e 100644 --- a/connectors/cli/connector.py +++ b/connectors/cli/connector.py @@ -32,20 +32,15 @@ async def list_connectors(self): indices=[CONCRETE_CONNECTORS_INDEX, CONCRETE_JOBS_INDEX] ) - return [connector async for connector in self.connector_index.all_connectors()] + return [ + connector async for connector in self.connector_index.all_connectors() + ] # TODO catch exceptions finally: await self.connector_index.close() await self.es_client.close() - async def ping(self): - if await self.es_client.ping(): - await self.es_client.close() - return True - else: - return False - def service_type_configuration(self, source_class): source_klass = get_source_klass(source_class) configuration = source_klass.get_default_configuration() @@ -84,13 +79,9 @@ async def __create_search_index(self, index_name, language): settings["auto_expand_replicas"] = "0-3" settings["number_of_shards"] = 2 - try: - await self.es_client.client.indices.create( - index=index_name, mappings=mappings, settings=settings - ) - except Exception as e: - # todo handle exception - raise e + await self.es_client.client.indices.create( + index=index_name, mappings=mappings, settings=settings + ) async def __create_connector( self, index_name, service_type, configuration, language diff --git a/connectors/connectors_cli.py b/connectors/connectors_cli.py index 84e262b4b..faeb6c9e0 100644 --- a/connectors/connectors_cli.py +++ b/connectors/connectors_cli.py @@ -22,8 +22,8 @@ from connectors.cli.connector import Connector from connectors.cli.index import Index from connectors.cli.job import Job -from connectors.es.settings import Settings from connectors.config import _default_config +from connectors.es.settings import Settings __all__ = ["main"] @@ -81,13 +81,10 @@ def login(host, username, password): # Connector group -@click.group(invoke_without_command=True, help="Connectors management") +@click.group(invoke_without_command=False, help="Connectors management") @click.pass_context def connector(ctx): - # print help page if no subcommands provided - if ctx.invoked_subcommand is None: - click.echo(ctx.get_help()) - return + pass @click.command(name="list", help="List all existing connectors") @@ -151,7 +148,7 @@ def validate_language(ctx, param, value): @click.option( "--service_type", prompt=f"{click.style('?', blink=True, fg='green')} Service type", - type=click.Choice(list(_default_config()['sources'].keys()), case_sensitive=False), + type=click.Choice(list(_default_config()["sources"].keys()), case_sensitive=False), ) @click.option( "--index_language", @@ -164,7 +161,7 @@ def create(obj, index_name, service_type, index_language): index_name = f"search-{index_name}" connector = Connector(obj["config"]["elasticsearch"]) configuration = connector.service_type_configuration( - source_class=_default_config()['sources'][service_type] + source_class=_default_config()["sources"][service_type] ) def prompt(): @@ -294,7 +291,7 @@ def delete(obj, index): # Job group -@click.group(invoke_without_command=True, help="Sync jobs management") +@click.group(invoke_without_command=False, help="Sync jobs management") @click.pass_obj def job(obj): pass @@ -383,7 +380,7 @@ def cancel(obj, job_id): ) click.echo("Canceling jobs...") if job_cli.cancel(job_id=job_id): - click.echo(click.style("The jobs is cancelling.", fg="green")) + click.echo(click.style("The job has been cancelled", fg="green")) else: click.echo("") click.echo( diff --git a/tests/test_connectors_cli.py b/tests/test_connectors_cli.py index c01ee1cbe..0d51f05e6 100644 --- a/tests/test_connectors_cli.py +++ b/tests/test_connectors_cli.py @@ -1,12 +1,16 @@ import os from unittest.mock import AsyncMock, MagicMock, patch +import pytest from click.testing import CliRunner +from elasticsearch import ApiError from connectors import __version__ # NOQA from connectors.cli.auth import CONFIG_FILE_PATH from connectors.connectors_cli import cli, login from connectors.protocol.connectors import Connector as ConnectorObject +from connectors.protocol.connectors import JobStatus +from connectors.protocol.connectors import SyncJob as SyncJobObject from tests.commons import AsyncIterator @@ -120,6 +124,44 @@ def test_connector_list_one_connector(): assert "connected" in result.output +@pytest.fixture(autouse=True) +def mock_es_client(): + with patch("connectors.cli.connector.ESClient") as mock: + mock.return_value = AsyncMock() + yield mock + + +@patch("click.confirm") +def test_connector_create(patch_click_confirm): + runner = CliRunner() + + # configuration for the MongoDB connector + input_params = "\n".join( + [ + "test_connector", + "mongodb", + "en", + "http://localhost/", + "username", + "password", + "database", + "collection", + "False", + ] + ) + + with patch( + "connectors.protocol.connectors.ConnectorIndex.index", + AsyncMock(return_value={"_id": "new_connector_id"}), + ) as patched_create: + result = runner.invoke(cli, ["connector", "create"], input=input_params) + + patched_create.assert_called_once() + assert result.exit_code == 0 + + assert "has been created" in result.output + + def test_index_help_page(): runner = CliRunner() result = runner.invoke(cli, ["index", "--help"]) @@ -139,10 +181,10 @@ def test_index_list_no_indexes(): def test_index_list_one_index(): runner = CliRunner() - indices = {"test_index": {"primaries": {"docs": {"count": 10}}}} + indices = {"indices": {"test_index": {"primaries": {"docs": {"count": 10}}}}} with patch( - "connectors.cli.index.Index.list_indices", MagicMock(return_value=indices) + "connectors.es.client.ESClient.list_indices", AsyncMock(return_value=indices) ): result = runner.invoke(cli, ["index", "list"]) @@ -164,6 +206,20 @@ def test_index_clean(): assert result.exit_code == 0 +@patch("click.confirm", MagicMock(return_value=True)) +def test_index_clean_error(): + runner = CliRunner() + index_name = "test_index" + with patch( + "connectors.es.client.ESClient.clean_index", + side_effect=ApiError(500, meta="meta", body="error"), + ): + result = runner.invoke(cli, ["index", "clean", index_name]) + + assert "Something went wrong." in result.output + assert result.exit_code == 0 + + @patch("click.confirm", MagicMock(return_value=True)) def test_index_delete(): runner = CliRunner() @@ -176,3 +232,163 @@ def test_index_delete(): assert "The index has been deleted" in result.output mocked_method.assert_called_once_with([index_name]) assert result.exit_code == 0 + + +@patch("click.confirm", MagicMock(return_value=True)) +def test_delete_index_error(): + runner = CliRunner() + index_name = "test_index" + with patch( + "connectors.es.client.ESClient.delete_indices", + side_effect=ApiError(500, meta="meta", body="error"), + ): + result = runner.invoke(cli, ["index", "delete", index_name]) + + assert "Something went wrong." in result.output + assert result.exit_code == 0 + + +def test_job_help_page(): + runner = CliRunner() + result = runner.invoke(cli, ["job", "--help"]) + assert result.exit_code == 0 + assert "Usage:" in result.output + assert "Options:" in result.output + assert "Commands:" in result.output + + +def test_job_help_page_without_subcommands(): + runner = CliRunner() + result = runner.invoke(cli, ["job"]) + assert result.exit_code == 0 + assert "Usage:" in result.output + assert "Options:" in result.output + assert "Commands:" in result.output + + +@patch("click.confirm", MagicMock(return_value=True)) +def test_job_cancel(): + runner = CliRunner() + job_id = "test_job_id" + + job_index = MagicMock() + + doc = { + "_source": { + "connector": { + "index_name": "test_connector", + "service_type": "mongodb", + "last_sync_status": "error", + "status": "connected", + }, + "status": "running", + "job_type": "full", + }, + "_id": job_id, + } + + job = SyncJobObject(job_index, doc) + + with patch( + "connectors.cli.job.Job._Job__async_list_jobs", AsyncMock(return_value=[job]) + ): + with patch.object(job, "_terminate") as mocked_method: + result = runner.invoke(cli, ["job", "cancel", job_id]) + + mocked_method.assert_called_once_with(JobStatus.CANCELING) + assert "The job has been cancelled" in result.output + assert result.exit_code == 0 + + +@patch("click.confirm", MagicMock(return_value=True)) +def test_job_cancel_error(): + runner = CliRunner() + job_id = "test_job_id" + with patch( + "connectors.cli.job.Job._Job__async_list_jobs", + side_effect=ApiError(500, meta="meta", body="error"), + ): + result = runner.invoke(cli, ["job", "cancel", job_id]) + + assert "Something went wrong." in result.output + assert result.exit_code == 0 + + +def test_job_list_no_jobs(): + runner = CliRunner() + connector_id = "test_connector_id" + + with patch( + "connectors.cli.job.Job._Job__async_list_jobs", AsyncMock(return_value=[]) + ): + result = runner.invoke(cli, ["job", "list", connector_id]) + + assert "No jobs found" in result.output + assert result.exit_code == 0 + + +@patch("click.confirm", MagicMock(return_value=True)) +def test_job_list_one_job(): + runner = CliRunner() + job_id = "test_job_id" + connector_id = "test_connector_id" + index_name = "test_index_name" + status = "canceled" + deleted_document_count = 123 + indexed_document_count = 123123 + indexed_document_volume = 100500 + + job_index = MagicMock() + + doc = { + "_source": { + "connector": { + "id": connector_id, + "index_name": index_name, + "service_type": "mongodb", + "last_sync_status": "error", + "status": "connected", + }, + "status": status, + "deleted_document_count": deleted_document_count, + "indexed_document_count": indexed_document_count, + "indexed_document_volume": indexed_document_volume, + "job_type": "full", + }, + "_id": job_id, + } + + job = SyncJobObject(job_index, doc) + + with patch( + "connectors.protocol.connectors.SyncJobIndex.get_all_docs", AsyncIterator([job]) + ): + result = runner.invoke(cli, ["job", "list", connector_id]) + + assert job_id in result.output + assert connector_id in result.output + assert index_name in result.output + assert status in result.output + assert str(deleted_document_count) in result.output + assert str(indexed_document_count) in result.output + assert str(indexed_document_volume) in result.output + assert result.exit_code == 0 + + +@patch( + "connectors.protocol.connectors.ConnectorIndex.fetch_by_id", + AsyncMock(return_value=MagicMock()), +) +def test_job_start(): + runner = CliRunner() + connector_id = "test_connector_id" + + with patch( + "connectors.protocol.connectors.SyncJobIndex.create", + AsyncMock(return_value=True), + ) as patched_create: + result = runner.invoke(cli, ["job", "start", "-i", connector_id, "-t", "full"]) + + patched_create.assert_called_once() + assert "The job has been started" in result.output + assert result.exit_code == 0 From 1ecefdc4276f77e72697a07fb1723351e11da15f Mon Sep 17 00:00:00 2001 From: Dmitrii Burlutckii Date: Tue, 28 Nov 2023 17:53:17 +0100 Subject: [PATCH 29/33] Fix tests --- connectors/connectors_cli.py | 15 +++++++++++++++ tests/test_connectors_cli.py | 26 +++++++++++++++++++------- 2 files changed, 34 insertions(+), 7 deletions(-) diff --git a/connectors/connectors_cli.py b/connectors/connectors_cli.py index faeb6c9e0..94f622f43 100644 --- a/connectors/connectors_cli.py +++ b/connectors/connectors_cli.py @@ -28,6 +28,18 @@ __all__ = ["main"] +def load_config(ctx, config): + import pdb; pdb.set_trace() + if config: + return yaml.safe_load(config) + elif os.path.isfile(CONFIG_FILE_PATH): + with open(CONFIG_FILE_PATH, "r") as f: + return yaml.safe_load(f.read()) + else: + msg = f"{CONFIG_FILE_PATH} is not found" + raise FileNotFoundError(msg) + + # Main group @click.group(invoke_without_command=True) @click.version_option(__version__, "-v", "--version", message="%(version)s") @@ -45,6 +57,8 @@ def cli(ctx, config): elif os.path.isfile(CONFIG_FILE_PATH): with open(CONFIG_FILE_PATH, "r") as f: ctx.obj["config"] = yaml.safe_load(f.read()) + elif ctx.invoked_subcommand == "login": + pass else: msg = f"{CONFIG_FILE_PATH} is not found" raise FileNotFoundError(msg) @@ -307,6 +321,7 @@ def job(obj): required=True, ) def start(obj, i, t): + # import pdb; pdb.set_trace() job_cli = Job(config=obj["config"]["elasticsearch"]) click.echo("Starting a job...") if job_cli.start(connector_id=i, job_type=t): diff --git a/tests/test_connectors_cli.py b/tests/test_connectors_cli.py index 0d51f05e6..c695a58c4 100644 --- a/tests/test_connectors_cli.py +++ b/tests/test_connectors_cli.py @@ -14,6 +14,25 @@ from tests.commons import AsyncIterator +@pytest.fixture(autouse=True) +def mock_cli_config(): + with patch('connectors.connectors_cli.load_config') as mock: + mock.return_value = {"elasticsearch": { "host": "http://localhost:9211/"} } + yield mock + +@pytest.fixture(autouse=True) +def mock_connector_es_client(): + with patch('connectors.cli.connector.ESClient') as mock: + mock.return_value = AsyncMock() + yield mock + +@pytest.fixture(autouse=True) +def mock_job_es_client(): + with patch('connectors.cli.job.ESClient') as mock: + mock.return_value = AsyncMock() + yield mock + + def test_version(): runner = CliRunner() result = runner.invoke(cli, ["-v"]) @@ -124,13 +143,6 @@ def test_connector_list_one_connector(): assert "connected" in result.output -@pytest.fixture(autouse=True) -def mock_es_client(): - with patch("connectors.cli.connector.ESClient") as mock: - mock.return_value = AsyncMock() - yield mock - - @patch("click.confirm") def test_connector_create(patch_click_confirm): runner = CliRunner() From b6fa068a27ed6fadda85b6cb204ac6d4c60e4034 Mon Sep 17 00:00:00 2001 From: Dmitrii Burlutckii Date: Tue, 28 Nov 2023 17:55:56 +0100 Subject: [PATCH 30/33] Remove pdb --- connectors/connectors_cli.py | 2 -- tests/test_connectors_cli.py | 10 ++++++---- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/connectors/connectors_cli.py b/connectors/connectors_cli.py index 94f622f43..0efb21725 100644 --- a/connectors/connectors_cli.py +++ b/connectors/connectors_cli.py @@ -29,7 +29,6 @@ def load_config(ctx, config): - import pdb; pdb.set_trace() if config: return yaml.safe_load(config) elif os.path.isfile(CONFIG_FILE_PATH): @@ -321,7 +320,6 @@ def job(obj): required=True, ) def start(obj, i, t): - # import pdb; pdb.set_trace() job_cli = Job(config=obj["config"]["elasticsearch"]) click.echo("Starting a job...") if job_cli.start(connector_id=i, job_type=t): diff --git a/tests/test_connectors_cli.py b/tests/test_connectors_cli.py index c695a58c4..1cd778367 100644 --- a/tests/test_connectors_cli.py +++ b/tests/test_connectors_cli.py @@ -16,19 +16,21 @@ @pytest.fixture(autouse=True) def mock_cli_config(): - with patch('connectors.connectors_cli.load_config') as mock: - mock.return_value = {"elasticsearch": { "host": "http://localhost:9211/"} } + with patch("connectors.connectors_cli.load_config") as mock: + mock.return_value = {"elasticsearch": {"host": "http://localhost:9211/"}} yield mock + @pytest.fixture(autouse=True) def mock_connector_es_client(): - with patch('connectors.cli.connector.ESClient') as mock: + with patch("connectors.cli.connector.ESClient") as mock: mock.return_value = AsyncMock() yield mock + @pytest.fixture(autouse=True) def mock_job_es_client(): - with patch('connectors.cli.job.ESClient') as mock: + with patch("connectors.cli.job.ESClient") as mock: mock.return_value = AsyncMock() yield mock From 5610ca192f4f12484e80015f72e24946d124a399 Mon Sep 17 00:00:00 2001 From: Dmitrii Burlutckii Date: Tue, 28 Nov 2023 18:17:05 +0100 Subject: [PATCH 31/33] Fix loading configuration --- connectors/connectors_cli.py | 13 +++---------- 1 file changed, 3 insertions(+), 10 deletions(-) diff --git a/connectors/connectors_cli.py b/connectors/connectors_cli.py index 0efb21725..b582d6f8d 100644 --- a/connectors/connectors_cli.py +++ b/connectors/connectors_cli.py @@ -34,6 +34,8 @@ def load_config(ctx, config): elif os.path.isfile(CONFIG_FILE_PATH): with open(CONFIG_FILE_PATH, "r") as f: return yaml.safe_load(f.read()) + elif ctx.invoked_subcommand == "login": + pass else: msg = f"{CONFIG_FILE_PATH} is not found" raise FileNotFoundError(msg) @@ -51,16 +53,7 @@ def cli(ctx, config): return ctx.ensure_object(dict) - if config: - ctx.obj["config"] = yaml.safe_load(config) - elif os.path.isfile(CONFIG_FILE_PATH): - with open(CONFIG_FILE_PATH, "r") as f: - ctx.obj["config"] = yaml.safe_load(f.read()) - elif ctx.invoked_subcommand == "login": - pass - else: - msg = f"{CONFIG_FILE_PATH} is not found" - raise FileNotFoundError(msg) + ctx.obj["config"] = load_config(ctx, config) @click.command(help="Authenticate Connectors CLI with an Elasticsearch instance") From b1ecb794b45249d4fbb826b2cca410e86840a5fb Mon Sep 17 00:00:00 2001 From: Dmitrii Burlutckii Date: Tue, 28 Nov 2023 19:02:40 +0100 Subject: [PATCH 32/33] Rename old cli --- connectors/{old_cli.py => service_cli.py} | 0 setup.py | 2 +- tests/{test_old_cli.py => test_service_cli.py} | 6 +++--- 3 files changed, 4 insertions(+), 4 deletions(-) rename connectors/{old_cli.py => service_cli.py} (100%) rename tests/{test_old_cli.py => test_service_cli.py} (94%) diff --git a/connectors/old_cli.py b/connectors/service_cli.py similarity index 100% rename from connectors/old_cli.py rename to connectors/service_cli.py diff --git a/setup.py b/setup.py index 3178b5f8d..4a2d9b5d6 100644 --- a/setup.py +++ b/setup.py @@ -97,7 +97,7 @@ def read_reqs(req_file): install_requires=install_requires, entry_points=""" [console_scripts] - elastic-ingest = connectors.old_cli:main + elastic-ingest = connectors.service_cli:main fake-kibana = connectors.kibana:main connectors = connectors.connectors_cli:main """, diff --git a/tests/test_old_cli.py b/tests/test_service_cli.py similarity index 94% rename from tests/test_old_cli.py rename to tests/test_service_cli.py index a9b9d5226..7d6d94860 100644 --- a/tests/test_old_cli.py +++ b/tests/test_service_cli.py @@ -14,7 +14,7 @@ import pytest from connectors import __version__ -from connectors.old_cli import main, run +from connectors.service_cli import main, run HERE = os.path.dirname(__file__) FIXTURES_DIR = os.path.abspath(os.path.join(HERE, "fixtures")) @@ -90,8 +90,8 @@ def test_run_snowflake(mock_responses, set_env): assert "Cannot use the `list` action with other actions" in output -@patch("connectors.old_cli.set_logger") -@patch("connectors.old_cli.load_config", side_effect=Exception("something went wrong")) +@patch("connectors.service_cli.set_logger") +@patch("connectors.service_cli.load_config", side_effect=Exception("something went wrong")) def test_main_with_invalid_configuration(load_config, set_logger): args = mock.MagicMock() args.log_level = logging.DEBUG # should be ignored! From 6c9363b04772da6fcd8635525a7baf628a3b48a3 Mon Sep 17 00:00:00 2001 From: Dmitrii Burlutckii Date: Tue, 28 Nov 2023 19:17:01 +0100 Subject: [PATCH 33/33] Lint --- tests/test_service_cli.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tests/test_service_cli.py b/tests/test_service_cli.py index 7d6d94860..635f399bb 100644 --- a/tests/test_service_cli.py +++ b/tests/test_service_cli.py @@ -91,7 +91,9 @@ def test_run_snowflake(mock_responses, set_env): @patch("connectors.service_cli.set_logger") -@patch("connectors.service_cli.load_config", side_effect=Exception("something went wrong")) +@patch( + "connectors.service_cli.load_config", side_effect=Exception("something went wrong") +) def test_main_with_invalid_configuration(load_config, set_logger): args = mock.MagicMock() args.log_level = logging.DEBUG # should be ignored!