Merge branch 'main' into github-dls

timgrein authored Nov 30, 2023
2 parents 971c7e2 + 57217c9 commit 6b3d19d
Showing 26 changed files with 1,453 additions and 44 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -11,3 +11,4 @@ __pycache__
# jetbrains files
.idea
*.iml
.cli
8 changes: 0 additions & 8 deletions config.yml
@@ -171,14 +171,6 @@
#service.log_level: INFO
#
#
## Whether telemetry is enabled
#service.telemetry.enabled: true
#
#
## The interval (in seconds) to run telemetry job
#service.telemetry.interval: 3600
#
#
## ------------------------------- Extraction Service ----------------------------------
#
## Local extraction service-related configurations.
Empty file added connectors/cli/.gitkeep
40 changes: 40 additions & 0 deletions connectors/cli/auth.py
@@ -0,0 +1,40 @@
import asyncio
import os

import yaml
from elasticsearch import ApiError

from connectors.es import ESClient

CONFIG_FILE_PATH = ".cli/config.yml"


class Auth:
    def __init__(self, host, username, password):
        self.elastic_config = {"host": host, "username": username, "password": password}
        self.es_client = ESClient(self.elastic_config)

    def authenticate(self):
        if asyncio.run(self.__ping_es_client()):
            self.__save_config()
            return True
        else:
            return False

    def is_config_present(self):
        return os.path.isfile(CONFIG_FILE_PATH)

    async def __ping_es_client(self):
        try:
            return await self.es_client.ping()
        except ApiError:
            return False
        finally:
            await self.es_client.close()

    def __save_config(self):
        yaml_content = yaml.dump({"elasticsearch": self.elastic_config})
        os.makedirs(os.path.dirname(CONFIG_FILE_PATH), exist_ok=True)

        with open(CONFIG_FILE_PATH, "w") as f:
            f.write(yaml_content)
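For orientation, a minimal sketch of how the CLI might drive this class; the host and credentials below are hypothetical placeholders, not values from this commit:

    # Hypothetical usage of the new Auth class (illustrative only).
    from connectors.cli.auth import CONFIG_FILE_PATH, Auth

    auth = Auth("http://localhost:9200", "elastic", "changeme")  # placeholder credentials
    if auth.authenticate():
        # The ping succeeded, so the credentials were written to .cli/config.yml.
        print(f"Saved CLI config to {CONFIG_FILE_PATH}")
    else:
        print("Could not reach Elasticsearch with the given credentials")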
183 changes: 183 additions & 0 deletions connectors/cli/connector.py
@@ -0,0 +1,183 @@
import asyncio
from collections import OrderedDict

from connectors.es.client import ESClient
from connectors.es.settings import DEFAULT_LANGUAGE, Mappings, Settings
from connectors.protocol import (
    CONCRETE_CONNECTORS_INDEX,
    CONCRETE_JOBS_INDEX,
    ConnectorIndex,
)
from connectors.source import get_source_klass
from connectors.utils import iso_utc


class IndexAlreadyExists(Exception):
    pass


class Connector:
    def __init__(self, config):
        self.config = config

        # initialize ES client
        self.es_client = ESClient(self.config)

        self.connector_index = ConnectorIndex(self.config)

    async def list_connectors(self):
        # TODO move this on top
        try:
            await self.es_client.ensure_exists(
                indices=[CONCRETE_CONNECTORS_INDEX, CONCRETE_JOBS_INDEX]
            )

            return [
                connector async for connector in self.connector_index.all_connectors()
            ]

        # TODO catch exceptions
        finally:
            await self.connector_index.close()
            await self.es_client.close()

    def service_type_configuration(self, source_class):
        source_klass = get_source_klass(source_class)
        configuration = source_klass.get_default_configuration()

        return OrderedDict(sorted(configuration.items(), key=lambda x: x[1]["order"]))

    def create(
        self, index_name, service_type, configuration, language=DEFAULT_LANGUAGE
    ):
        return asyncio.run(
            self.__create(index_name, service_type, configuration, language)
        )

    async def __create(
        self, index_name, service_type, configuration, language=DEFAULT_LANGUAGE
    ):
        try:
            return await asyncio.gather(
                self.__create_search_index(index_name, language),
                self.__create_connector(
                    index_name, service_type, configuration, language
                ),
            )
        except Exception as e:
            raise e
        finally:
            await self.es_client.close()

    async def __create_search_index(self, index_name, language):
        mappings = Mappings.default_text_fields_mappings(
            is_connectors_index=True,
        )

        settings = Settings(language_code=language, analysis_icu=False).to_hash()

        settings["auto_expand_replicas"] = "0-3"
        settings["number_of_shards"] = 2

        await self.es_client.client.indices.create(
            index=index_name, mappings=mappings, settings=settings
        )

    async def __create_connector(
        self, index_name, service_type, configuration, language
    ):
        try:
            await self.es_client.ensure_exists(
                indices=[CONCRETE_CONNECTORS_INDEX, CONCRETE_JOBS_INDEX]
            )
            timestamp = iso_utc()

            doc = {
                "api_key_id": "",
                "configuration": configuration,
                "index_name": index_name,
                "service_type": service_type,
                "status": "configured",  # TODO use a predefined constant
                "is_native": True,  # TODO make it optional
                "language": language,
                "last_access_control_sync_error": None,
                "last_access_control_sync_scheduled_at": None,
                "last_access_control_sync_status": None,
                "last_sync_status": None,
                "last_sync_error": None,
                "last_sync_scheduled_at": None,
                "last_synced": None,
                "last_seen": None,
                "created_at": timestamp,
                "updated_at": timestamp,
                "filtering": self.default_filtering(timestamp),
                "scheduling": self.default_scheduling(),
                "custom_scheduling": {},
                "pipeline": {
                    "extract_binary_content": True,
                    "name": "ent-search-generic-ingestion",
                    "reduce_whitespace": True,
                    "run_ml_inference": True,
                },
                "last_indexed_document_count": 0,
                "last_deleted_document_count": 0,
            }

            connector = await self.connector_index.index(doc)
            return connector["_id"]
        finally:
            await self.connector_index.close()

    def default_scheduling(self):
        return {
            "access_control": {"enabled": False, "interval": "0 0 0 * * ?"},
            "full": {"enabled": False, "interval": "0 0 0 * * ?"},
            "incremental": {"enabled": False, "interval": "0 0 0 * * ?"},
        }

    def default_filtering(self, timestamp):
        return [
            {
                "active": {
                    "advanced_snippet": {
                        "created_at": timestamp,
                        "updated_at": timestamp,
                        "value": {},
                    },
                    "rules": [
                        {
                            "created_at": timestamp,
                            "field": "_",
                            "id": "DEFAULT",
                            "order": 0,
                            "policy": "include",
                            "rule": "regex",
                            "updated_at": timestamp,
                            "value": ".*",
                        }
                    ],
                    "validation": {"errors": [], "state": "valid"},
                },
                "domain": "DEFAULT",
                "draft": {
                    "advanced_snippet": {
                        "created_at": timestamp,
                        "updated_at": timestamp,
                        "value": {},
                    },
                    "rules": [
                        {
                            "created_at": timestamp,
                            "field": "_",
                            "id": "DEFAULT",
                            "order": 0,
                            "policy": "include",
                            "rule": "regex",
                            "updated_at": timestamp,
                            "value": ".*",
                        }
                    ],
                    "validation": {"errors": [], "state": "valid"},
                },
            }
        ]
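A hedged sketch of the intended call sequence, assuming a config dict shaped like the one Auth persists; the index name, service type, and source class path are illustrative, not values from this diff:

    # Hypothetical usage of the new Connector class (illustrative only).
    from connectors.cli.connector import Connector

    config = {"host": "http://localhost:9200", "username": "elastic", "password": "changeme"}
    connector = Connector(config)

    # Default configuration for a source, sorted by each field's "order" key;
    # a real CLI flow would fill in values before creating the connector.
    configuration = connector.service_type_configuration(
        "connectors.sources.mongo:MongoDataSource"
    )

    # create() runs index and connector creation concurrently via asyncio.gather,
    # so it returns a pair; the second element is the new connector document's _id.
    _, connector_id = connector.create("search-mongo", "mongodb", configuration)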
45 changes: 45 additions & 0 deletions connectors/cli/index.py
@@ -0,0 +1,45 @@
import asyncio

from elasticsearch import ApiError

from connectors.es import ESClient


class Index:
    def __init__(self, config):
        self.elastic_config = config
        self.es_client = ESClient(self.elastic_config)

    def list_indices(self):
        return asyncio.run(self.__list_indices())["indices"]

    def clean(self, index_name):
        return asyncio.run(self.__clean_index(index_name))

    def delete(self, index_name):
        return asyncio.run(self.__delete_index(index_name))

    async def __list_indices(self):
        try:
            return await self.es_client.list_indices()
        except ApiError as e:
            raise e
        finally:
            await self.es_client.close()

    async def __clean_index(self, index_name):
        try:
            return await self.es_client.clean_index(index_name)
        except ApiError:
            return False
        finally:
            await self.es_client.close()

    async def __delete_index(self, index_name):
        try:
            await self.es_client.delete_indices([index_name])
            return True
        except ApiError:
            return False
        finally:
            await self.es_client.close()
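A similar sketch for the index helpers; the index name is a placeholder, and the shape of the list_indices() result is assumed from how the code above unwraps it:

    # Hypothetical usage of the new Index class (illustrative only).
    from connectors.cli.index import Index

    config = {"host": "http://localhost:9200", "username": "elastic", "password": "changeme"}
    index_cli = Index(config)

    # list_indices() unwraps the "indices" key of ESClient.list_indices(),
    # assumed here to be a mapping keyed by index name.
    for name in index_cli.list_indices():
        print(name)

    index_cli.clean("search-mongo")   # wipe documents but keep the index; False on ApiError
    index_cli.delete("search-mongo")  # drop the index entirely; False on ApiError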
95 changes: 95 additions & 0 deletions connectors/cli/job.py
@@ -0,0 +1,95 @@
import asyncio

from elasticsearch import ApiError

from connectors.es.client import ESClient
from connectors.protocol import (
    CONCRETE_CONNECTORS_INDEX,
    CONCRETE_JOBS_INDEX,
    ConnectorIndex,
    JobStatus,
    JobTriggerMethod,
    JobType,
    Sort,
    SyncJobIndex,
)


class Job:
    def __init__(self, config):
        self.config = config
        self.es_client = ESClient(self.config)
        self.sync_job_index = SyncJobIndex(self.config)
        self.connector_index = ConnectorIndex(self.config)

    def list_jobs(self, connector_id=None, index_name=None, job_id=None):
        return asyncio.run(self.__async_list_jobs(connector_id, index_name, job_id))

    def cancel(self, connector_id=None, index_name=None, job_id=None):
        return asyncio.run(self.__async_cancel_jobs(connector_id, index_name, job_id))

    def start(self, connector_id, job_type):
        return asyncio.run(self.__async_start(connector_id, job_type))

    async def __async_start(self, connector_id, job_type):
        try:
            connector = await self.connector_index.fetch_by_id(connector_id)
            await self.sync_job_index.create(
                connector=connector,
                trigger_method=JobTriggerMethod.ON_DEMAND,
                job_type=JobType(job_type),
            )

            return True
        finally:
            await self.sync_job_index.close()
            await self.connector_index.close()
            await self.es_client.close()

    async def __async_list_jobs(self, connector_id, index_name, job_id):
        try:
            await self.es_client.ensure_exists(
                indices=[CONCRETE_CONNECTORS_INDEX, CONCRETE_JOBS_INDEX]
            )
            jobs = self.sync_job_index.get_all_docs(
                query=self.__job_list_query(connector_id, index_name, job_id),
                sort=self.__job_list_sort(),
            )

            return [job async for job in jobs]

        # TODO catch exceptions
        finally:
            await self.sync_job_index.close()
            await self.es_client.close()

    async def __async_cancel_jobs(self, connector_id, index_name, job_id):
        try:
            jobs = await self.__async_list_jobs(connector_id, index_name, job_id)

            for job in jobs:
                await job._terminate(JobStatus.CANCELING)

            return True
        except ApiError:
            return False
        finally:
            await self.sync_job_index.close()
            await self.es_client.close()

    def __job_list_query(self, connector_id, index_name, job_id):
        if job_id:
            return {"bool": {"must": [{"term": {"_id": job_id}}]}}

        if index_name:
            return {
                "bool": {"filter": [{"term": {"connector.index_name": index_name}}]}
            }

        if connector_id:
            return {"bool": {"must": [{"term": {"connector.id": connector_id}}]}}

        return None

    def __job_list_sort(self):
        return [{"created_at": Sort.ASC.value}]
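Finally, a sketch of driving sync jobs end to end; the connector id is a placeholder, and the id/status accessors are assumed from the protocol layer rather than defined in this diff:

    # Hypothetical usage of the new Job class (illustrative only).
    from connectors.cli.job import Job

    config = {"host": "http://localhost:9200", "username": "elastic", "password": "changeme"}
    job_cli = Job(config)

    job_cli.start("connector-id-123", "full")  # queue an on-demand full sync
    for job in job_cli.list_jobs(connector_id="connector-id-123"):
        print(job.id, job.status)  # assumed SyncJob accessors
    job_cli.cancel(connector_id="connector-id-123")  # move matching jobs to CANCELING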
4 changes: 0 additions & 4 deletions connectors/config.py
@@ -79,10 +79,6 @@ def _default_config():
            "max_concurrent_access_control_syncs": 1,
            "job_cleanup_interval": 300,
            "log_level": "INFO",
            "telemetry": {
                "enabled": True,
                "interval": 3600,
            },
        },
        "sources": {
            "azure_blob_storage": "connectors.sources.azure_blob_storage:AzureBlobStorageDataSource",
