Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Connectors CLI tool #1616

Merged
merged 34 commits into from
Nov 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
bb849a3
CLI base tool
vidok Aug 23, 2023
5964ac8
Add a cli folder
vidok Sep 11, 2023
4a611b5
Main parser (#1617)
vidok Sep 12, 2023
1f8e18e
Replace argparser with click (#1621)
vidok Sep 12, 2023
1d0a7e7
Add -c option (#1625)
vidok Sep 12, 2023
cb7092c
Introduce bin/connectors login (#1626)
tutelaris Sep 13, 2023
d2d68db
Introduce connector list command (#1636)
vidok Sep 13, 2023
f3c14fe
Add index cli (#1637)
tutelaris Sep 14, 2023
f0226b8
Introduce Job cli (#1643)
tutelaris Sep 14, 2023
1cd7644
Introduce connector create command (#1645)
vidok Sep 14, 2023
8608468
Small improvements
vidok Sep 18, 2023
a4427dd
Fix cancelling jobs
vidok Sep 18, 2023
cfa251e
Remove emojis
vidok Oct 18, 2023
24bdf11
Drop index_name and job_id params
vidok Oct 18, 2023
35a414a
Drop connector_id and index_id for job cancelling
vidok Oct 18, 2023
76cf921
Print help page when no subcommands provided
vidok Nov 21, 2023
cec531a
Add language option
vidok Nov 21, 2023
c0b34a6
Autoformat
vidok Nov 21, 2023
f91c17c
More linter
vidok Nov 21, 2023
d4bdc8d
Fix tests
vidok Nov 22, 2023
a2da007
Add tests for cli, login commands
vidok Nov 22, 2023
5695666
Print help page when no commands provided
vidok Nov 22, 2023
fc0ab42
Tests for connect and index commands
vidok Nov 23, 2023
7720531
Update help page
vidok Nov 23, 2023
8cd39a1
Add TODO
vidok Nov 23, 2023
e2e1194
Adress feedback
vidok Nov 24, 2023
aaf816f
Remove redundant client() method
vidok Nov 27, 2023
d2adb8a
Add more tests
vidok Nov 28, 2023
1ecefdc
Fix tests
vidok Nov 28, 2023
b6fa068
Remove pdb
vidok Nov 28, 2023
5610ca1
Fix loading configuration
vidok Nov 28, 2023
b1ecb79
Rename old cli
vidok Nov 28, 2023
6c9363b
Lint
vidok Nov 28, 2023
1588896
Merge branch 'main' into dmitrii/5185-connectors-clients-serverless
vidok Nov 29, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,4 @@ __pycache__
# jetbrains files
.idea
*.iml
.cli
Empty file added connectors/cli/.gitkeep
Empty file.
40 changes: 40 additions & 0 deletions connectors/cli/auth.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import asyncio
import os

import yaml
from elasticsearch import ApiError

from connectors.es import ESClient

CONFIG_FILE_PATH = ".cli/config.yml"


class Auth:
def __init__(self, host, username, password):
self.elastic_config = {"host": host, "username": username, "password": password}
self.es_client = ESClient(self.elastic_config)

def authenticate(self):
if asyncio.run(self.__ping_es_client()):
self.__save_config()
return True
else:
return False

def is_config_present(self):
return os.path.isfile(CONFIG_FILE_PATH)

async def __ping_es_client(self):
try:
return await self.es_client.ping()
except ApiError:
return False
finally:
await self.es_client.close()

def __save_config(self):
yaml_content = yaml.dump({"elasticsearch": self.elastic_config})
os.makedirs(os.path.dirname(CONFIG_FILE_PATH), exist_ok=True)

with open(CONFIG_FILE_PATH, "w") as f:
f.write(yaml_content)
183 changes: 183 additions & 0 deletions connectors/cli/connector.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
import asyncio
from collections import OrderedDict

from connectors.es.client import ESClient
from connectors.es.settings import DEFAULT_LANGUAGE, Mappings, Settings
from connectors.protocol import (
CONCRETE_CONNECTORS_INDEX,
CONCRETE_JOBS_INDEX,
ConnectorIndex,
)
from connectors.source import get_source_klass
from connectors.utils import iso_utc


class IndexAlreadyExists(Exception):
pass


class Connector:
def __init__(self, config):
self.config = config

# initialize ES client
self.es_client = ESClient(self.config)

self.connector_index = ConnectorIndex(self.config)

async def list_connectors(self):
# TODO move this on top
vidok marked this conversation as resolved.
Show resolved Hide resolved
try:
await self.es_client.ensure_exists(
indices=[CONCRETE_CONNECTORS_INDEX, CONCRETE_JOBS_INDEX]
)

return [
connector async for connector in self.connector_index.all_connectors()
]

# TODO catch exceptions
finally:
await self.connector_index.close()
await self.es_client.close()

def service_type_configuration(self, source_class):
source_klass = get_source_klass(source_class)
configuration = source_klass.get_default_configuration()

return OrderedDict(sorted(configuration.items(), key=lambda x: x[1]["order"]))

def create(
self, index_name, service_type, configuration, language=DEFAULT_LANGUAGE
):
return asyncio.run(
self.__create(index_name, service_type, configuration, language)
)

async def __create(
self, index_name, service_type, configuration, language=DEFAULT_LANGUAGE
):
try:
return await asyncio.gather(
self.__create_search_index(index_name, language),
self.__create_connector(
index_name, service_type, configuration, language
),
)
except Exception as e:
raise e
finally:
await self.es_client.close()

async def __create_search_index(self, index_name, language):
mappings = Mappings.default_text_fields_mappings(
is_connectors_index=True,
)

settings = Settings(language_code=language, analysis_icu=False).to_hash()

settings["auto_expand_replicas"] = "0-3"
settings["number_of_shards"] = 2

await self.es_client.client.indices.create(
index=index_name, mappings=mappings, settings=settings
)

async def __create_connector(
self, index_name, service_type, configuration, language
):
try:
await self.es_client.ensure_exists(
indices=[CONCRETE_CONNECTORS_INDEX, CONCRETE_JOBS_INDEX]
)
timestamp = iso_utc()

doc = {
"api_key_id": "",
"configuration": configuration,
"index_name": index_name,
"service_type": service_type,
"status": "configured", # TODO use a predefined constant
"is_native": True, # TODO make it optional
vidok marked this conversation as resolved.
Show resolved Hide resolved
"language": language,
"last_access_control_sync_error": None,
"last_access_control_sync_scheduled_at": None,
"last_access_control_sync_status": None,
"last_sync_status": None,
"last_sync_error": None,
"last_sync_scheduled_at": None,
"last_synced": None,
"last_seen": None,
"created_at": timestamp,
"updated_at": timestamp,
"filtering": self.default_filtering(timestamp),
"scheduling": self.default_scheduling(),
"custom_scheduling": {},
"pipeline": {
"extract_binary_content": True,
"name": "ent-search-generic-ingestion",
"reduce_whitespace": True,
"run_ml_inference": True,
},
"last_indexed_document_count": 0,
"last_deleted_document_count": 0,
}

connector = await self.connector_index.index(doc)
return connector["_id"]
finally:
await self.connector_index.close()

def default_scheduling(self):
return {
"access_control": {"enabled": False, "interval": "0 0 0 * * ?"},
"full": {"enabled": False, "interval": "0 0 0 * * ?"},
"incremental": {"enabled": False, "interval": "0 0 0 * * ?"},
}

def default_filtering(self, timestamp):
return [
{
"active": {
"advanced_snippet": {
"created_at": timestamp,
"updated_at": timestamp,
"value": {},
},
"rules": [
{
"created_at": timestamp,
"field": "_",
"id": "DEFAULT",
"order": 0,
"policy": "include",
"rule": "regex",
"updated_at": timestamp,
"value": ".*",
}
],
"validation": {"errors": [], "state": "valid"},
},
"domain": "DEFAULT",
"draft": {
"advanced_snippet": {
"created_at": timestamp,
"updated_at": timestamp,
"value": {},
},
"rules": [
{
"created_at": timestamp,
"field": "_",
"id": "DEFAULT",
"order": 0,
"policy": "include",
"rule": "regex",
"updated_at": timestamp,
"value": ".*",
}
],
"validation": {"errors": [], "state": "valid"},
},
}
]
45 changes: 45 additions & 0 deletions connectors/cli/index.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
import asyncio

from elasticsearch import ApiError

from connectors.es import ESClient


class Index:
def __init__(self, config):
self.elastic_config = config
self.es_client = ESClient(self.elastic_config)

def list_indices(self):
return asyncio.run(self.__list_indices())["indices"]

def clean(self, index_name):
return asyncio.run(self.__clean_index(index_name))

def delete(self, index_name):
return asyncio.run(self.__delete_index(index_name))

async def __list_indices(self):
try:
return await self.es_client.list_indices()
except ApiError as e:
raise e
finally:
await self.es_client.close()

async def __clean_index(self, index_name):
try:
return await self.es_client.clean_index(index_name)
except ApiError:
return False
finally:
await self.es_client.close()

async def __delete_index(self, index_name):
try:
await self.es_client.delete_indices([index_name])
return True
except ApiError:
return False
finally:
await self.es_client.close()
95 changes: 95 additions & 0 deletions connectors/cli/job.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
import asyncio

from elasticsearch import ApiError

from connectors.es.client import ESClient
from connectors.protocol import (
CONCRETE_CONNECTORS_INDEX,
CONCRETE_JOBS_INDEX,
ConnectorIndex,
JobStatus,
JobTriggerMethod,
JobType,
Sort,
SyncJobIndex,
)


class Job:
def __init__(self, config):
self.config = config
self.es_client = ESClient(self.config)
self.sync_job_index = SyncJobIndex(self.config)
self.connector_index = ConnectorIndex(self.config)

def list_jobs(self, connector_id=None, index_name=None, job_id=None):
return asyncio.run(self.__async_list_jobs(connector_id, index_name, job_id))

def cancel(self, connector_id=None, index_name=None, job_id=None):
return asyncio.run(self.__async_cancel_jobs(connector_id, index_name, job_id))

def start(self, connector_id, job_type):
return asyncio.run(self.__async_start(connector_id, job_type))

async def __async_start(self, connector_id, job_type):
try:
connector = await self.connector_index.fetch_by_id(connector_id)
await self.sync_job_index.create(
connector=connector,
trigger_method=JobTriggerMethod.ON_DEMAND,
job_type=JobType(job_type),
)

return True
finally:
await self.sync_job_index.close()
await self.connector_index.close()
await self.es_client.close()

async def __async_list_jobs(self, connector_id, index_name, job_id):
try:
await self.es_client.ensure_exists(
indices=[CONCRETE_CONNECTORS_INDEX, CONCRETE_JOBS_INDEX]
)
jobs = self.sync_job_index.get_all_docs(
query=self.__job_list_query(connector_id, index_name, job_id),
sort=self.__job_list_sort(),
)

return [job async for job in jobs]

# TODO catch exceptions
finally:
await self.sync_job_index.close()
await self.es_client.close()

async def __async_cancel_jobs(self, connector_id, index_name, job_id):
try:
jobs = await self.__async_list_jobs(connector_id, index_name, job_id)

for job in jobs:
await job._terminate(JobStatus.CANCELING)

return True
except ApiError:
return False
finally:
await self.sync_job_index.close()
await self.es_client.close()

def __job_list_query(self, connector_id, index_name, job_id):
if job_id:
return {"bool": {"must": [{"term": {"_id": job_id}}]}}

if index_name:
return {
"bool": {"filter": [{"term": {"connector.index_name": index_name}}]}
}

if connector_id:
return {"bool": {"must": [{"term": {"connector.id": connector_id}}]}}

return None

def __job_list_sort(self):
return [{"created_at": Sort.ASC.value}]
Loading
Loading