diff --git a/common/db/connections.py b/common/db/connections.py
index 80088dfd..b1c73581 100644
--- a/common/db/connections.py
+++ b/common/db/connections.py
@@ -3,7 +3,7 @@
 
 from fastapi import Depends, HTTPException, status
 from fastapi.security import HTTPBasicCredentials, HTTPAuthorizationCredentials
-from pyTigerGraph import TigerGraphConnection
+from pyTigerGraph import TigerGraphConnection, AsyncTigerGraphConnection
 from pyTigerGraph.common.exception import TigerGraphException
 from requests import HTTPError
 
@@ -21,14 +21,24 @@ def get_db_connection_id_token(
     graphname: str,
     credentials: Annotated[HTTPAuthorizationCredentials, Depends(security)],
+    async_conn: bool = False
 ) -> TigerGraphConnectionProxy:
-    conn = TigerGraphConnection(
-        host=db_config["hostname"],
-        graphname=graphname,
-        apiToken=credentials,
-        tgCloud=True,
-        sslPort=14240,
-    )
+    if async_conn:
+        conn = AsyncTigerGraphConnection(
+            host=db_config["hostname"],
+            graphname=graphname,
+            apiToken=credentials,
+            tgCloud=True,
+            sslPort=14240,
+        )
+    else:
+        conn = TigerGraphConnection(
+            host=db_config["hostname"],
+            graphname=graphname,
+            apiToken=credentials,
+            tgCloud=True,
+            sslPort=14240,
+        )
     conn.customizeHeader(
         timeout=db_config["default_timeout"] * 1000, responseSize=5000000
     )
@@ -55,9 +65,10 @@ def get_db_connection_id_token(
 
 
 def get_db_connection_pwd(
-    graphname, credentials: Annotated[HTTPBasicCredentials, Depends(security)]
+    graphname, credentials: Annotated[HTTPBasicCredentials, Depends(security)],
+    async_conn: bool = False
 ) -> TigerGraphConnectionProxy:
-    conn = elevate_db_connection_to_token(db_config["hostname"], credentials.username, credentials.password, graphname)
+    conn = elevate_db_connection_to_token(db_config["hostname"], credentials.username, credentials.password, graphname, async_conn)
     conn.customizeHeader(
         timeout=db_config["default_timeout"] * 1000, responseSize=5000000
@@ -70,12 +81,13 @@ def get_db_connection_pwd(
 
 def get_db_connection_pwd_manual(
     graphname, username: str, password: str,
+    async_conn: bool = False
 ) -> TigerGraphConnectionProxy:
     """
     Manual auth - pass in user/pass not from basic auth
     """
     conn = elevate_db_connection_to_token(
-        db_config["hostname"], username, password, graphname
+        db_config["hostname"], username, password, graphname, async_conn
     )
     conn.customizeHeader(
@@ -85,22 +97,19 @@ def get_db_connection_pwd_manual(
 
     LogWriter.info("Connected to TigerGraph with password")
     return conn
 
-def elevate_db_connection_to_token(host, username, password, graphname) -> TigerGraphConnectionProxy:
+def elevate_db_connection_to_token(host, username, password, graphname, async_conn: bool = False) -> TigerGraphConnectionProxy:
     conn = TigerGraphConnection(
         host=host,
         username=username,
         password=password,
-        graphname=graphname
+        graphname=graphname,
+        restppPort=db_config.get("restppPort", "9000"),
+        gsPort=db_config.get("gsPort", "14240")
     )
 
     if db_config["getToken"]:
         try:
-            apiToken = conn._post(
-                conn.restppUrl + "/requesttoken",
-                authMode="pwd",
-                data=str({"graph": conn.graphname}),
-                resKey="results",
-            )["token"]
+            apiToken = conn.getToken()[0]
         except HTTPError:
             LogWriter.error("Failed to get token")
             raise HTTPException(
@@ -115,13 +124,38 @@ def elevate_db_connection_to_token(host, username, password, graphname) -> Tiger
                 detail="Failed to get token - is the database running?"
             )
 
+        if async_conn:
+            conn = AsyncTigerGraphConnection(
+                host=host,
+                username=username,
+                password=password,
+                graphname=graphname,
+                apiToken=apiToken,
+                restppPort=db_config.get("restppPort", "9000"),
+                gsPort=db_config.get("gsPort", "14240")
+            )
+        else:
+            conn = TigerGraphConnection(
+                host=db_config["hostname"],
+                username=username,
+                password=password,
+                graphname=graphname,
+                apiToken=apiToken,
+                restppPort=db_config.get("restppPort", "9000"),
+                gsPort=db_config.get("gsPort", "14240")
+            )
+    else:
+        if async_conn:
+            conn = AsyncTigerGraphConnection(
+                host=host,
+                username=username,
+                password=password,
+                graphname=graphname,
+                restppPort=db_config.get("restppPort", "9000"),
+                gsPort=db_config.get("gsPort", "14240")
+            )
 
-    conn = TigerGraphConnection(
-        host=db_config["hostname"],
-        username=username,
-        password=password,
-        graphname=graphname,
-        apiToken=apiToken
-    )
+    # temp fix for path
+    conn.restppUrl = conn.restppUrl+"/restpp"
 
     return conn
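Note: a minimal sketch of how a caller opts into the new async path. The graph name and credentials below are placeholders, and db_config is assumed to be loaded exactly as in common/db/connections.py:

import asyncio

from common.db.connections import get_db_connection_pwd_manual

async def demo():
    # async_conn=True now yields an AsyncTigerGraphConnection, so every
    # database call on it is a coroutine and must be awaited.
    conn = get_db_connection_pwd_manual(
        "MyGraph", "tigergraph", "tigergraph", async_conn=True
    )
    print(await conn.getVer())

asyncio.run(demo())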
diff --git a/common/gsql/supportai/SupportAI_Schema_Native_Vector.gsql b/common/gsql/supportai/SupportAI_Schema_Native_Vector.gsql
new file mode 100644
index 00000000..dbd16aa7
--- /dev/null
+++ b/common/gsql/supportai/SupportAI_Schema_Native_Vector.gsql
@@ -0,0 +1,36 @@
+CREATE SCHEMA_CHANGE JOB add_supportai_schema {
+    ADD VERTEX DocumentChunk(id STRING PRIMARY KEY, idx INT, epoch_added UINT, epoch_processing UINT, epoch_processed UINT) WITH EMBEDDING ATTRIBUTE embedding(dimension=1536, metric=cosine) STATS="OUTDEGREE_BY_EDGETYPE";
+    ADD VERTEX Document(id STRING PRIMARY KEY, epoch_added UINT, epoch_processing UINT, epoch_processed UINT) WITH EMBEDDING ATTRIBUTE embedding(dimension=1536, metric=cosine) STATS="OUTDEGREE_BY_EDGETYPE";
+    ADD VERTEX Concept(id STRING PRIMARY KEY, description STRING, concept_type STRING, human_curated BOOL, epoch_added UINT, epoch_processing UINT, epoch_processed UINT) WITH EMBEDDING ATTRIBUTE embedding(dimension=1536, metric=cosine) STATS="OUTDEGREE_BY_EDGETYPE";
+    ADD VERTEX Entity(id STRING PRIMARY KEY, definition STRING, description SET<STRING>, entity_type STRING, epoch_added UINT, epoch_processing UINT, epoch_processed UINT) WITH EMBEDDING ATTRIBUTE embedding(dimension=1536, metric=cosine) STATS="OUTDEGREE_BY_EDGETYPE";
+    ADD VERTEX Relationship(id STRING PRIMARY KEY, definition STRING, short_name STRING, epoch_added UINT, epoch_processing UINT, epoch_processed UINT) WITH EMBEDDING ATTRIBUTE embedding(dimension=1536, metric=cosine) STATS="OUTDEGREE_BY_EDGETYPE";
+    ADD VERTEX DocumentCollection(id STRING PRIMARY KEY, epoch_added UINT) WITH STATS="OUTDEGREE_BY_EDGETYPE";
+    ADD VERTEX Content(id STRING PRIMARY KEY, text STRING, epoch_added UINT) WITH STATS="OUTDEGREE_BY_EDGETYPE";
+    ADD VERTEX EntityType(id STRING PRIMARY KEY, description STRING, epoch_added UINT) WITH STATS="OUTDEGREE_BY_EDGETYPE";
+    ADD DIRECTED EDGE HAS_CONTENT(FROM Document, TO Content|FROM DocumentChunk, TO Content) WITH REVERSE_EDGE="reverse_HAS_CONTENT";
+    ADD DIRECTED EDGE IS_CHILD_OF(FROM Concept, TO Concept) WITH REVERSE_EDGE="reverse_IS_CHILD_OF";
+    ADD DIRECTED EDGE IS_HEAD_OF(FROM Entity, TO Relationship) WITH REVERSE_EDGE="reverse_IS_HEAD_OF";
+    ADD DIRECTED EDGE HAS_TAIL(FROM Relationship, TO Entity) WITH REVERSE_EDGE="reverse_HAS_TAIL";
+    ADD DIRECTED EDGE DESCRIBES_RELATIONSHIP(FROM Concept, TO Relationship) WITH REVERSE_EDGE="reverse_DESCRIBES_RELATIONSHIP";
+    ADD DIRECTED EDGE DESCRIBES_ENTITY(FROM Concept, TO Entity) WITH REVERSE_EDGE="reverse_DESCRIBES_ENTITY";
+    ADD DIRECTED EDGE CONTAINS_ENTITY(FROM DocumentChunk, TO Entity|FROM Document, TO Entity) WITH REVERSE_EDGE="reverse_CONTAINS_ENTITY";
+    ADD DIRECTED EDGE MENTIONS_RELATIONSHIP(FROM DocumentChunk, TO Relationship|FROM Document, TO Relationship) WITH REVERSE_EDGE="reverse_MENTIONS_RELATIONSHIP";
+    ADD DIRECTED EDGE IS_AFTER(FROM DocumentChunk, TO DocumentChunk) WITH REVERSE_EDGE="reverse_IS_AFTER";
+    ADD DIRECTED EDGE HAS_CHILD(FROM Document, TO DocumentChunk) WITH REVERSE_EDGE="reverse_HAS_CHILD";
+    ADD DIRECTED EDGE HAS_RELATIONSHIP(FROM Concept, TO Concept, relation_type STRING) WITH REVERSE_EDGE="reverse_HAS_RELATIONSHIP";
+    ADD DIRECTED EDGE CONTAINS_DOCUMENT(FROM DocumentCollection, TO Document) WITH REVERSE_EDGE="reverse_CONTAINS_DOCUMENT";
+    ADD DIRECTED EDGE ENTITY_HAS_TYPE(FROM Entity, TO EntityType) WITH REVERSE_EDGE="reverse_ENTITY_HAS_TYPE";
+    ADD DIRECTED EDGE RELATIONSHIP_TYPE(FROM EntityType, TO EntityType, DISCRIMINATOR(relation_type STRING), frequency INT) WITH REVERSE_EDGE="reverse_RELATIONSHIP_TYPE";
+
+    // GraphRAG
+    ADD VERTEX Community (id STRING PRIMARY KEY, iteration UINT, description STRING) WITH EMBEDDING ATTRIBUTE embedding(dimension=1536, metric=cosine) STATS="OUTDEGREE_BY_EDGETYPE";
+    ADD VERTEX ResolvedEntity(id STRING PRIMARY KEY, entity_type STRING) WITH EMBEDDING ATTRIBUTE embedding(dimension=1536, metric=cosine) STATS="OUTDEGREE_BY_EDGETYPE";
+
+    ADD DIRECTED EDGE RELATIONSHIP(FROM Entity, TO Entity, relation_type STRING) WITH REVERSE_EDGE="reverse_RELATIONSHIP";
+    ADD DIRECTED EDGE RESOLVES_TO(FROM Entity, TO ResolvedEntity, relation_type STRING) WITH REVERSE_EDGE="reverse_RESOLVES_TO"; // Connect ResolvedEntities with their children entities
+    ADD DIRECTED EDGE RESOLVED_RELATIONSHIP(FROM ResolvedEntity, TO ResolvedEntity, relation_type STRING) WITH REVERSE_EDGE="reverse_RESOLVED_RELATIONSHIP"; // store edges between entities after they're resolved
+
+    ADD DIRECTED EDGE IN_COMMUNITY(FROM ResolvedEntity, TO Community) WITH REVERSE_EDGE="reverse_IN_COMMUNITY";
+    ADD DIRECTED EDGE LINKS_TO (from Community, to Community, weight DOUBLE) WITH REVERSE_EDGE="reverse_LINKS_TO";
+    ADD DIRECTED EDGE HAS_PARENT (from Community, to Community) WITH REVERSE_EDGE="reverse_HAS_PARENT";
+}
diff --git a/common/metrics/tg_proxy.py b/common/metrics/tg_proxy.py
index 5bfcb494..d6291c6f 100644
--- a/common/metrics/tg_proxy.py
+++ b/common/metrics/tg_proxy.py
@@ -33,7 +33,7 @@ def hooked(*args, **kwargs):
         else:
             return original_attr
 
-    def _req(self, method: str, url: str, authMode: str, *args, **kwargs):
+    def _req(self, method: str, url: str, authMode: str = "token", *args, **kwargs):
         # we always use token auth
         # always use proxy endpoint in GUI for restpp and gsql
         if self.auth_mode == "pwd":
diff --git a/copilot/app/supportai/supportai.py b/copilot/app/supportai/supportai.py
index e96663a3..0591319f 100644
--- a/copilot/app/supportai/supportai.py
+++ b/copilot/app/supportai/supportai.py
@@ -15,7 +15,11 @@
 
 def init_supportai(conn: TigerGraphConnection, graphname: str) -> tuple[dict, dict]:
     # need to open the file using the absolute path
-    file_path = "common/gsql/supportai/SupportAI_Schema.gsql"
+    ver = conn.getVer().split(".")
+    if int(ver[0]) > 4 or (int(ver[0]) == 4 and int(ver[1]) >= 2):
+        file_path = "common/gsql/supportai/SupportAI_Schema_Native_Vector.gsql"
+    else:
+        file_path = "common/gsql/supportai/SupportAI_Schema.gsql"
     with open(file_path, "r") as f:
         schema = f.read()
     schema_res = conn.gsql(
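The getVer() gate above is sensitive to how the two version components are compared; a standalone sketch of the tuple comparison it relies on (assuming getVer() returns strings like "4.2.0"):

def wants_native_vector(ver_str: str) -> bool:
    # "4.2.0" -> (4, 2); comparing tuples avoids the trap where a plain
    # `major >= 4 and minor >= 2` check would wrongly reject "5.0.0".
    major, minor = (int(p) for p in ver_str.split(".")[:2])
    return (major, minor) >= (4, 2)

assert wants_native_vector("4.2.0")
assert wants_native_vector("5.0.1")
assert not wants_native_vector("4.1.9")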
diff --git a/docs/notebooks/SupportAIDemo.ipynb b/docs/notebooks/SupportAIDemo.ipynb
index f7a69b35..1b1e477b 100644
--- a/docs/notebooks/SupportAIDemo.ipynb
+++ b/docs/notebooks/SupportAIDemo.ipynb
@@ -11,7 +11,7 @@
   },
   {
    "cell_type": "code",
-   "execution_count": null,
+   "execution_count": 50,
    "metadata": {},
    "outputs": [],
    "source": [
@@ -21,13 +21,14 @@
     "\n",
     "load_dotenv()\n",
     "# We first create a connection to the database\n",
-    "host = os.environ[\"HOST\"]\n",
+    "host = \"http://192.168.99.201\" #os.environ[\"HOST\"]\n",
     "username = os.getenv(\"USERNAME\", \"tigergraph\")\n",
     "password = os.getenv(\"PASS\", \"tigergraph\")\n",
     "conn = TigerGraphConnection(\n",
     "    host=host,\n",
     "    username=username,\n",
     "    password=password,\n",
+    "    gsPort=\"31409\"\n",
     ")\n",
     "\n",
     "# And then add CoPilot's address to the connection. This address\n",
@@ -54,28 +55,52 @@
   },
   {
    "cell_type": "code",
-   "execution_count": null,
+   "execution_count": 2,
    "metadata": {},
-   "outputs": [],
+   "outputs": [
+    {
+     "data": {
+      "text/plain": [
+       "'The graph pyTigerGraphRAG is created.'"
+      ]
+     },
+     "execution_count": 2,
+     "metadata": {},
+     "output_type": "execute_result"
+    }
+   ],
    "source": [
     "conn.gsql(\"\"\"CREATE GRAPH pyTigerGraphRAG()\"\"\")"
    ]
   },
   {
    "cell_type": "code",
-   "execution_count": null,
+   "execution_count": 51,
    "metadata": {},
    "outputs": [],
    "source": [
     "conn.graphname = \"pyTigerGraphRAG\"\n",
-    "conn.getToken()"
+    "#conn.getToken()"
    ]
   },
   {
    "cell_type": "code",
-   "execution_count": null,
+   "execution_count": 9,
    "metadata": {},
-   "outputs": [],
+   "outputs": [
+    {
+     "data": {
+      "text/plain": [
+       "{'host_name': 'http://192.168.99.201',\n",
+       " 'schema_creation_status': '\"Using graph \\'pyTigerGraphRAG\\'\\\\nSuccessfully created schema change jobs: [add_supportai_schema].\\\\nWARNING: When modifying the graph schema, reinstalling all affected queries is required, and the duration of this process may vary based on the number and complexity of the queries. To skip query reinstallation, you can run with the \\'-N\\' option, but manual reinstallation of queries will be necessary afterwards.\\\\nKick off schema change job add_supportai_schema\\\\nDoing schema change on graph \\'pyTigerGraphRAG\\' (current version: 0)\\\\nTrying to add local vertex \\'DocumentChunk\\' to the graph \\'pyTigerGraphRAG\\'.\\\\nTrying to add local vertex \\'Document\\' to the graph \\'pyTigerGraphRAG\\'.\\\\nTrying to add local vertex \\'Concept\\' to the graph \\'pyTigerGraphRAG\\'.\\\\nTrying to add local vertex \\'Entity\\' to the graph \\'pyTigerGraphRAG\\'.\\\\nTrying to add local vertex \\'Relationship\\' to the graph \\'pyTigerGraphRAG\\'.\\\\nTrying to add local vertex \\'DocumentCollection\\' to the graph \\'pyTigerGraphRAG\\'.\\\\nTrying to add local vertex \\'Content\\' to the graph \\'pyTigerGraphRAG\\'.\\\\nTrying to add local vertex \\'EntityType\\' to the graph \\'pyTigerGraphRAG\\'.\\\\nTrying to add local vertex \\'Community\\' to the graph \\'pyTigerGraphRAG\\'.\\\\nTrying to add local vertex \\'ResolvedEntity\\' to the graph \\'pyTigerGraphRAG\\'.\\\\nTrying to add local edge \\'HAS_CONTENT\\' and its reverse edge \\'reverse_HAS_CONTENT\\' to the graph \\'pyTigerGraphRAG\\'.\\\\nTrying to add local edge \\'IS_CHILD_OF\\' and its reverse edge \\'reverse_IS_CHILD_OF\\' to the graph \\'pyTigerGraphRAG\\'.\\\\nTrying to add local edge \\'IS_HEAD_OF\\' and its reverse edge \\'reverse_IS_HEAD_OF\\' to the graph \\'pyTigerGraphRAG\\'.\\\\nTrying to add local edge \\'HAS_TAIL\\' and its reverse edge \\'reverse_HAS_TAIL\\' to the graph \\'pyTigerGraphRAG\\'.\\\\nTrying to add local edge \\'DESCRIBES_RELATIONSHIP\\' and its reverse edge \\'reverse_DESCRIBES_RELATIONSHIP\\' to the graph \\'pyTigerGraphRAG\\'.\\\\nTrying to add local edge \\'DESCRIBES_ENTITY\\' and its reverse edge \\'reverse_DESCRIBES_ENTITY\\' to the graph \\'pyTigerGraphRAG\\'.\\\\nTrying to add local edge \\'CONTAINS_ENTITY\\' and its reverse edge \\'reverse_CONTAINS_ENTITY\\' to the graph \\'pyTigerGraphRAG\\'.\\\\nTrying to add local edge \\'MENTIONS_RELATIONSHIP\\' and its reverse edge \\'reverse_MENTIONS_RELATIONSHIP\\' to the graph \\'pyTigerGraphRAG\\'.\\\\nTrying to add local edge \\'IS_AFTER\\' and its reverse edge \\'reverse_IS_AFTER\\' to the graph \\'pyTigerGraphRAG\\'.\\\\nTrying to add local edge \\'HAS_CHILD\\' and its reverse edge \\'reverse_HAS_CHILD\\' to the graph \\'pyTigerGraphRAG\\'.\\\\nTrying to add local edge \\'HAS_RELATIONSHIP\\' and its reverse edge \\'reverse_HAS_RELATIONSHIP\\' to the graph \\'pyTigerGraphRAG\\'.\\\\nTrying to add local edge \\'CONTAINS_DOCUMENT\\' and its reverse edge \\'reverse_CONTAINS_DOCUMENT\\' to the graph \\'pyTigerGraphRAG\\'.\\\\nTrying to add local edge \\'ENTITY_HAS_TYPE\\' and its reverse edge \\'reverse_ENTITY_HAS_TYPE\\' to the graph \\'pyTigerGraphRAG\\'.\\\\nTrying to add local edge \\'RELATIONSHIP_TYPE\\' and its reverse edge \\'reverse_RELATIONSHIP_TYPE\\' to the graph \\'pyTigerGraphRAG\\'.\\\\nTrying to add local edge \\'RELATIONSHIP\\' and its reverse edge \\'reverse_RELATIONSHIP\\' to the graph \\'pyTigerGraphRAG\\'.\\\\nTrying to add local edge \\'RESOLVES_TO\\' and its reverse edge \\'reverse_RESOLVES_TO\\' to the graph \\'pyTigerGraphRAG\\'.\\\\nTrying to add local edge \\'RESOLVED_RELATIONSHIP\\' and its reverse edge \\'reverse_RESOLVED_RELATIONSHIP\\' to the graph \\'pyTigerGraphRAG\\'.\\\\nTrying to add local edge \\'IN_COMMUNITY\\' and its reverse edge \\'reverse_IN_COMMUNITY\\' to the graph \\'pyTigerGraphRAG\\'.\\\\nTrying to add local edge \\'LINKS_TO\\' and its reverse edge \\'reverse_LINKS_TO\\' to the graph \\'pyTigerGraphRAG\\'.\\\\nTrying to add local edge \\'HAS_PARENT\\' and its reverse edge \\'reverse_HAS_PARENT\\' to the graph \\'pyTigerGraphRAG\\'.\\\\n\\\\nGraph pyTigerGraphRAG updated to new version 1\\\\nValidating existing queries for graph pyTigerGraphRAG ...\\\\nThe job add_supportai_schema completes in 1.371 seconds!\\\\nLocal schema change succeeded.\"',\n",
+       " 'index_creation_status': '\"Using graph \\'pyTigerGraphRAG\\'\\\\nSuccessfully created schema change jobs: [add_supportai_indexes].\\\\nWARNING: When modifying the graph schema, reinstalling all affected queries is required, and the duration of this process may vary based on the number and complexity of the queries. To skip query reinstallation, you can run with the \\'-N\\' option, but manual reinstallation of queries will be necessary afterwards.\\\\nKick off schema change job add_supportai_indexes\\\\nDoing schema change on graph \\'pyTigerGraphRAG\\' (current version: 1)\\\\nTrying to add index \\'doc_epoch_added_index\\' on the attribute \\'epoch_added\\' of local vertex \\'Document\\' on the graph \\'pyTigerGraphRAG\\'.\\\\nTrying to add index \\'doc_epoch_processing_index\\' on the attribute \\'epoch_processing\\' of local vertex \\'Document\\' on the graph \\'pyTigerGraphRAG\\'.\\\\nTrying to add index \\'doc_epoch_processing_indexepoch_processed_index\\' on the attribute \\'epoch_processed\\' of local vertex \\'Document\\' on the graph \\'pyTigerGraphRAG\\'.\\\\nTrying to add index \\'doc_chunk_epoch_added_index\\' on the attribute \\'epoch_added\\' of local vertex \\'DocumentChunk\\' on the graph \\'pyTigerGraphRAG\\'.\\\\nTrying to add index \\'doc_chunk_epoch_processing_index\\' on the attribute \\'epoch_processing\\' of local vertex \\'DocumentChunk\\' on the graph \\'pyTigerGraphRAG\\'.\\\\nTrying to add index \\'doc_chunk_epoch_processed_index\\' on the attribute \\'epoch_processed\\' of local vertex \\'DocumentChunk\\' on the graph \\'pyTigerGraphRAG\\'.\\\\nTrying to add index \\'concept_epoch_added_index\\' on the attribute \\'epoch_added\\' of local vertex \\'Concept\\' on the graph \\'pyTigerGraphRAG\\'.\\\\nTrying to add index \\'concept_epoch_processing_index\\' on the attribute \\'epoch_processing\\' of local vertex \\'Concept\\' on the graph \\'pyTigerGraphRAG\\'.\\\\nTrying to add index \\'concept_epoch_processed_index\\' on the attribute \\'epoch_processed\\' of local vertex \\'Concept\\' on the graph \\'pyTigerGraphRAG\\'.\\\\n\\\\nGraph pyTigerGraphRAG updated to new version 2\\\\nValidating existing queries for graph pyTigerGraphRAG ...\\\\nThe job add_supportai_indexes completes in 1.212 seconds!\\\\nLocal schema change succeeded.\"'}"
+      ]
+     },
+     "execution_count": 9,
+     "metadata": {},
+     "output_type": "execute_result"
+    }
+   ],
    "source": [
     "conn.ai.initializeSupportAI()"
    ]
   },
@@ -86,8 +111,8 @@
    "metadata": {},
    "outputs": [],
    "source": [
-    "access = os.environ[\"AWS_ACCESS_KEY_ID\"]\n",
-    "sec = os.environ[\"AWS_SECRET_ACCESS_KEY\"]\n",
+    "access = \"\"\n",
+    "sec = \"\"\n",
     "res = conn.ai.createDocumentIngest(\n",
     "    data_source=\"s3\",\n",
     "    data_source_config={\"aws_access_key\": access, \"aws_secret_key\": sec},\n",
@@ -98,20 +123,86 @@
   },
   {
    "cell_type": "code",
-   "execution_count": null,
+   "execution_count": 11,
    "metadata": {},
-   "outputs": [],
+   "outputs": [
+    {
+     "data": {
+      "text/plain": [
+       "{'job_name': 'load_documents_content_json_4cc0b2115f754540b4543469612743f6',\n",
+       " 'job_id': 'pyTigerGraphRAG.load_documents_content_json_4cc0b2115f754540b4543469612743f6.stream.SupportAI_pyTigerGraphRAG_bc71b650248d41df83eae15155c2bce5.1733184693598',\n",
+       " 'log_location': '/home/tigergraph/tigergraph/log/kafkaLoader/pyTigerGraphRAG.load_documents_content_json_4cc0b2115f754540b4543469612743f6.stream.SupportAI_pyTigerGraphRAG_bc71b650248d41df83eae15155c2bce5.1733184693598'}"
+      ]
+     },
+     "execution_count": 11,
+     "metadata": {},
+     "output_type": "execute_result"
+    }
+   ],
    "source": [
     "conn.ai.runDocumentIngest(res[\"load_job_id\"], res[\"data_source_id\"], \"s3://tg-documentation/pytg_current/pytg_current.jsonl\")"
    ]
   },
   {
    "cell_type": "code",
-   "execution_count": null,
+   "execution_count": 52,
+   "metadata": {},
+   "outputs": [
+    {
+     "data": {
+      "text/plain": [
+       "{'status': 'submitted'}"
+      ]
+     },
+     "execution_count": 52,
+     "metadata": {},
+     "output_type": "execute_result"
+    }
+   ],
+   "source": [
+    "conn.ai.forceConsistencyUpdate(\"graphrag\")"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 49,
    "metadata": {},
    "outputs": [],
    "source": [
-    "conn.ai.forceConsistencyUpdate(method=\"graphrag\")"
+    "from pyTigerGraph import AsyncTigerGraphConnection\n",
+    "\n",
+    "conn = AsyncTigerGraphConnection(\n",
+    "    host=host,\n",
+    "    username=username,\n",
+    "    password=password,\n",
+    "    gsPort=\"31409\"\n",
+    ")\n",
+    "\n",
+    "conn.graphname = \"pyTigerGraphRAG\"\n",
+    "\n",
+    "res = await conn.gsql(\"\"\"USE GRAPH pyTigerGraphRAG\n",
+    "    CREATE QUERY hello() FOR GRAPH pyTigerGraphRAG{PRINT \"hello\";}\n",
+    "    INSTALL QUERY hello\"\"\")"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 40,
+   "metadata": {},
+   "outputs": [
+    {
+     "data": {
+      "text/plain": [
+       "\"Using graph 'pyTigerGraphRAG'\\nline 2:53 extraneous input ''' expecting {ABORT, ALL, AND, ANY, AVG, BY, CASE, COALESCE, COLLECT, COLUMN, COMMIT, COUNT, DATETIME_ADD, DATETIME_SUB, DEFAULT, DIRECTED, DISTINCT, FALSE, FILE, GROUP, INSERT, ISEMPTY, JOIN, LASTHOP, LEFT, LIST, LOG, MAP, MATCH, MAX, MIN, NOT, NOW, NULL, ON, OR, OUTER, PATH, PER, PROJECT, RANGE, REPLACE, STDEV, STDEVP, SELECT_VERTEX, SEMIJOIN, SET, SRC, SUM, TGT, TO_DATETIME, TRIM, TRUE, UNDIRECTED, UPDATE, VIRTUAL, FILTERTYPE, GSQL_INT_MAX, GSQL_INT_MIN, GSQL_UINT_MAX, '__ENGINE__E_ATTR', '__ENGINE__SRC_ATTR', '__ENGINE__TGT_ATTR', '__ENGINE__V_ATTR', '__ENGINE__SRC_VAL', '__ENGINE__TGT_VAL', '__ENGINE__V_VAL', '__ENGINE__MESSAGE', '__ENGINE__CONTEXT', '__ENGINE__REQUEST', '__ENGINE__SERVICEAPI', 'type', '(', '[', '-', '.', '_', CONST_INT, CONST_STR, NAME, GACCNAME}\\nline 2:59 extraneous input ''' expecting {TO_CSV, WHERE, WITH_EMBEDDING, ',', ';'}\\nParsing encountered 2 syntax error(s)\\n\\nSaved as draft query with type/semantic error: [hello].\\nThe status of query hello is DRAFT, skip.\\nSemantic Check Fails: Graph pyTigerGraphRAG: all queries in this catalog have been installed already.\\nQuery installation finished.\""
+      ]
+     },
+     "execution_count": 40,
+     "metadata": {},
+     "output_type": "execute_result"
+    }
+   ],
+   "source": [
+    "res"
    ]
   },
   {
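The last notebook cells await calls at the top level, which Jupyter permits; outside a notebook the same round trip needs an explicit event loop. A minimal sketch, using the notebook's host and gsPort values rather than defaults:

import asyncio

from pyTigerGraph import AsyncTigerGraphConnection

async def smoke_test():
    conn = AsyncTigerGraphConnection(
        host="http://192.168.99.201",
        username="tigergraph",
        password="tigergraph",
        gsPort="31409",
    )
    conn.graphname = "pyTigerGraphRAG"
    # gsql() is a coroutine on the async connection class.
    print(await conn.gsql("USE GRAPH pyTigerGraphRAG SHOW QUERY hello"))

asyncio.run(smoke_test())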
diff --git a/eventual-consistency-service/app/graphrag/graph_rag.py b/eventual-consistency-service/app/graphrag/graph_rag.py
index 70a966bc..7a9b101a 100644
--- a/eventual-consistency-service/app/graphrag/graph_rag.py
+++ b/eventual-consistency-service/app/graphrag/graph_rag.py
@@ -21,7 +21,7 @@
     upsert_batch,
     add_rels_between_types
 )
-from pyTigerGraph import TigerGraphConnection
+from pyTigerGraph import AsyncTigerGraphConnection
 
 from common.config import embedding_service
 from common.embeddings.milvus_embedding_store import MilvusEmbeddingStore
@@ -33,7 +33,7 @@
 
 
 async def stream_docs(
-    conn: TigerGraphConnection,
+    conn: AsyncTigerGraphConnection,
    docs_chan: Channel,
     ttl_batches: int = 10,
 ):
@@ -41,33 +41,26 @@ async def stream_docs(
     Streams the document contents into the docs_chan
     """
     logger.info("streaming docs")
-    headers = make_headers(conn)
-    async with httpx.AsyncClient(timeout=http_timeout) as client:
-        for i in range(ttl_batches):
-            doc_ids = await stream_ids(conn, "Document", i, ttl_batches)
-            if doc_ids["error"]:
-                # continue to the next batch.
-                # These docs will not be marked as processed, so the ecc will process it eventually.
-                continue
-
-            for d in doc_ids["ids"]:
-                try:
-                    async with tg_sem:
-                        res = await client.get(
-                            f"{conn.restppUrl}/query/{conn.graphname}/StreamDocContent/",
-                            params={"doc": d},
-                            headers=headers,
-                        )
-                    if res.status_code != 200:
-                        # continue to the next doc.
-                        # This doc will not be marked as processed, so the ecc will process it eventually.
-                        continue
-                    logger.info("stream_docs writes to docs")
-                    await docs_chan.put(res.json()["results"][0]["DocContent"][0])
-                except Exception as e:
-                    exc = traceback.format_exc()
-                    logger.error(f"Error retrieving doc: {d} --> {e}\n{exc}")
-                    continue  # try retrieving the next doc
+    for i in range(ttl_batches):
+        doc_ids = await stream_ids(conn, "Document", i, ttl_batches)
+        if doc_ids["error"]:
+            # continue to the next batch.
+            # These docs will not be marked as processed, so the ecc will process it eventually.
+            continue
+
+        for d in doc_ids["ids"]:
+            try:
+                async with tg_sem:
+                    res = await conn.runInstalledQuery(
+                        "StreamDocContent",
+                        params={"doc": d},
+                    )
+                logger.info("stream_docs writes to docs")
+                await docs_chan.put(res[0]["DocContent"][0])
+            except Exception as e:
+                exc = traceback.format_exc()
+                logger.error(f"Error retrieving doc: {d} --> {e}\n{exc}")
+                continue  # try retrieving the next doc
 
     logger.info("stream_docs done")
     # close the docs chan -- this function is the only sender
@@ -76,7 +69,7 @@ async def stream_docs(
 
 
 async def chunk_docs(
-    conn: TigerGraphConnection,
+    conn: AsyncTigerGraphConnection,
     docs_chan: Channel,
     embed_chan: Channel,
     upsert_chan: Channel,
@@ -123,7 +116,7 @@
         load_q.close()
 
 
-async def load(conn: TigerGraphConnection):
+async def load(conn: AsyncTigerGraphConnection):
     logger.info("Reading from load_q")
     dd = lambda: defaultdict(dd)  # infinite default dict
     batch_size = 500
@@ -215,7 +208,7 @@
     upsert_chan: Channel,
     embed_chan: Channel,
     extractor: BaseExtractor,
-    conn: TigerGraphConnection,
+    conn: AsyncTigerGraphConnection,
 ):
     """
     Creates and starts one worker for each extract job
@@ -238,7 +231,7 @@
 
 
 async def stream_entities(
-    conn: TigerGraphConnection,
+    conn: AsyncTigerGraphConnection,
     entity_chan: Channel,
     ttl_batches: int = 50,
 ):
@@ -264,7 +257,7 @@
 
 
 async def resolve_entities(
-    conn: TigerGraphConnection,
+    conn: AsyncTigerGraphConnection,
     emb_store: MilvusEmbeddingStore,
     entity_chan: Channel,
     upsert_chan: Channel,
@@ -285,41 +278,31 @@
         upsert_chan.close()
 
     # Copy RELATIONSHIP edges to RESOLVED_RELATIONSHIP
-    headers = make_headers(conn)
-    async with httpx.AsyncClient(timeout=http_timeout) as client:
-        async with tg_sem:
-            res = await client.get(
-                f"{conn.restppUrl}/query/{conn.graphname}/ResolveRelationships/",
-                headers=headers,
-            )
-            res.raise_for_status()
+    async with tg_sem:
+        res = await conn.runInstalledQuery(
+            "ResolveRelationships",
+        )
 
 
-async def communities(conn: TigerGraphConnection, comm_process_chan: Channel):
+async def communities(conn: AsyncTigerGraphConnection, comm_process_chan: Channel):
     """
     Run louvain
     """
     # first pass: Group ResolvedEntities into Communities
     logger.info("Initializing Communities (first louvain pass)")
-    headers = make_headers(conn)
-    async with httpx.AsyncClient(timeout=None) as client:
-        async with tg_sem:
-            res = await client.get(
-                f"{conn.restppUrl}/query/{conn.graphname}/graphrag_louvain_init",
-                params={"n_batches": 1},
-                headers=headers,
-            )
-            res.raise_for_status()
+
+    async with tg_sem:
+        res = await conn.runInstalledQuery(
+            "graphrag_louvain_init",
+            params={"n_batches": 1}
+        )
     # get the modularity
-    async with httpx.AsyncClient(timeout=None) as client:
-        async with tg_sem:
-            res = await client.get(
-                f"{conn.restppUrl}/query/{conn.graphname}/modularity",
-                params={"iteration": 1, "batch_num": 1},
-                headers=headers,
-            )
-            res.raise_for_status()
-    mod = res.json()["results"][0]["mod"]
+    async with tg_sem:
+        res = await conn.runInstalledQuery(
+            "modularity",
+            params={"iteration": 1, "batch_num": 1}
+        )
+    mod = res[0]["mod"]
     logger.info(f"****mod pass 1: {mod}")
     await stream_communities(conn, 1, comm_process_chan)
 
@@ -331,26 +314,19 @@
         i += 1
         logger.info(f"Running louvain on Communities (iteration: {i})")
         # louvain pass
-        async with httpx.AsyncClient(timeout=None) as client:
-            async with tg_sem:
-                res = await client.get(
-                    f"{conn.restppUrl}/query/{conn.graphname}/graphrag_louvain_communities",
-                    params={"n_batches": 1, "iteration": i},
-                    headers=headers,
-                )
-
-            res.raise_for_status()
+        async with tg_sem:
+            res = await conn.runInstalledQuery(
+                "graphrag_louvain_communities",
+                params={"n_batches": 1, "iteration": i},
+            )
 
         # get the modularity
-        async with httpx.AsyncClient(timeout=None) as client:
-            async with tg_sem:
-                res = await client.get(
-                    f"{conn.restppUrl}/query/{conn.graphname}/modularity",
-                    params={"iteration": i + 1, "batch_num": 1},
-                    headers=headers,
-                )
-            res.raise_for_status()
-        mod = res.json()["results"][0]["mod"]
+        async with tg_sem:
+            res = await conn.runInstalledQuery(
+                "modularity",
+                params={"iteration": i + 1, "batch_num": 1},
+            )
+        mod = res[0]["mod"]
         logger.info(f"mod pass {i+1}: {mod} (diff= {abs(prev_mod - mod)})")
         if mod == 0 or mod - prev_mod <= -0.05:
             break
@@ -364,7 +340,7 @@
 
 
 async def stream_communities(
-    conn: TigerGraphConnection,
+    conn: AsyncTigerGraphConnection,
     i: int,
     comm_process_chan: Channel,
 ):
@@ -377,15 +353,12 @@
 
     # async for i in community_chan:
     # get the community from that layer
-    async with httpx.AsyncClient(timeout=None) as client:
-        async with tg_sem:
-            resp = await client.get(
-                f"{conn.restppUrl}/query/{conn.graphname}/stream_community",
-                params={"iter": i},
-                headers=headers,
-            )
-        resp.raise_for_status()
-    comms = resp.json()["results"][0]["Comms"]
+    async with tg_sem:
+        resp = await conn.runInstalledQuery(
+            "stream_community",
+            params={"iter": i}
+        )
+    comms = resp[0]["Comms"]
 
     for c in comms:
         await comm_process_chan.put((i, c["v_id"]))
@@ -405,7 +378,7 @@
 
 
 async def summarize_communities(
-    conn: TigerGraphConnection,
+    conn: AsyncTigerGraphConnection,
     comm_process_chan: Channel,
     upsert_chan: Channel,
     embed_chan: Channel,
@@ -419,7 +392,7 @@ async def summarize_communities(
     embed_chan.close()
 
 
-async def run(graphname: str, conn: TigerGraphConnection):
+async def run(graphname: str, conn: AsyncTigerGraphConnection):
     """
     Set up GraphRAG:
         - Install necessary queries.
@@ -467,7 +440,7 @@
         type_start = time.perf_counter()
         logger.info("Type Processing Start")
         res = await add_rels_between_types(conn)
-        if res["error"]:
+        if res.get("error", False):
             logger.error(f"Error adding relationships between types: {res}")
         else:
             logger.info(f"Added relationships between types: {res}")
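Every hunk above follows the same rewrite: a hand-rolled httpx call against /query/<graph>/<query name> becomes a semaphore-guarded runInstalledQuery call, and the manual res.json()["results"] unwrapping disappears because the async client returns the results list directly. Condensed into one self-contained helper (the semaphore limit here is illustrative):

import asyncio

from pyTigerGraph import AsyncTigerGraphConnection

tg_sem = asyncio.Semaphore(2)  # cap concurrent requests against TigerGraph

async def get_modularity(conn: AsyncTigerGraphConnection, iteration: int) -> float:
    # runInstalledQuery builds the /query/<graph>/modularity URL, attaches
    # auth headers, and unwraps the REST++ "results" envelope -- hence
    # res[0]["mod"] instead of res.json()["results"][0]["mod"].
    async with tg_sem:
        res = await conn.runInstalledQuery(
            "modularity", params={"iteration": iteration, "batch_num": 1}
        )
    return res[0]["mod"]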
diff --git a/eventual-consistency-service/app/graphrag/util.py b/eventual-consistency-service/app/graphrag/util.py
index 873d856a..5b949253 100644
--- a/eventual-consistency-service/app/graphrag/util.py
+++ b/eventual-consistency-service/app/graphrag/util.py
@@ -7,7 +7,7 @@
 
 import httpx
 from graphrag import reusable_channel, workers
-from pyTigerGraph import TigerGraphConnection
+from pyTigerGraph import AsyncTigerGraphConnection
 
 from common.config import (
     doc_processing_config,
@@ -33,10 +33,10 @@
 
 async def install_queries(
     requried_queries: list[str],
-    conn: TigerGraphConnection,
+    conn: AsyncTigerGraphConnection,
 ):
     # queries that are currently installed
-    installed_queries = [q.split("/")[-1] for q in conn.getEndpoints(dynamic=True)]
+    installed_queries = [q.split("/")[-1] for q in await conn.getEndpoints(dynamic=True)]
 
     # doesn't need to be parallel since tg only does it one at a time
     for q in requried_queries:
@@ -58,7 +58,7 @@ async def init_embedding_index(s: MilvusEmbeddingStore, vertex_field: str):
 
 
 async def init(
-    conn: TigerGraphConnection,
+    conn: AsyncTigerGraphConnection,
 ) -> tuple[BaseExtractor, dict[str, MilvusEmbeddingStore]]:
     # install requried queries
     requried_queries = [
@@ -126,7 +126,7 @@
     return extractor, index_stores
 
 
-def make_headers(conn: TigerGraphConnection):
+def make_headers(conn: AsyncTigerGraphConnection):
     if conn.apiToken is None or conn.apiToken == "":
         tkn = base64.b64encode(f"{conn.username}:{conn.password}".encode()).decode()
         headers = {"Authorization": f"Basic {tkn}"}
@@ -137,23 +137,19 @@
 
 
 async def stream_ids(
-    conn: TigerGraphConnection, v_type: str, current_batch: int, ttl_batches: int
+    conn: AsyncTigerGraphConnection, v_type: str, current_batch: int, ttl_batches: int
 ) -> dict[str, str | list[str]]:
-    headers = make_headers(conn)
-
     try:
-        async with httpx.AsyncClient(timeout=http_timeout) as client:
-            async with tg_sem:
-                res = await client.post(
-                    f"{conn.restppUrl}/query/{conn.graphname}/StreamIds",
-                    params={
-                        "current_batch": current_batch,
-                        "ttl_batches": ttl_batches,
-                        "v_type": v_type,
-                    },
-                    headers=headers,
-                )
-        ids = res.json()["results"][0]["@@ids"]
+        async with tg_sem:
+            res = await conn.runInstalledQuery(
+                "StreamIds",
+                params={
+                    "current_batch": current_batch,
+                    "ttl_batches": ttl_batches,
+                    "v_type": v_type,
+                }
+            )
+        ids = res[0]["@@ids"]
         return {"error": False, "ids": ids}
 
     except Exception as e:
@@ -192,7 +188,7 @@ def process_id(v_id: str):
 
 
 async def upsert_vertex(
-    conn: TigerGraphConnection,
+    conn: AsyncTigerGraphConnection,
     vertex_type: str,
     vertex_id: str,
     attributes: dict,
@@ -203,43 +199,32 @@
     await load_q.put(("vertices", (vertex_type, vertex_id, attrs)))
 
 
-async def upsert_batch(conn: TigerGraphConnection, data: str):
-    headers = make_headers(conn)
-    async with httpx.AsyncClient(timeout=http_timeout) as client:
-        async with tg_sem:
-            try:
-                res = await client.post(
-                    f"{conn.restppUrl}/graph/{conn.graphname}", data=data, headers=headers
-                )
-            except Exception as e:
-                err = traceback.format_exc()
-                logger.error(f"Upsert err:\n{err}")
-        res.raise_for_status()
+async def upsert_batch(conn: AsyncTigerGraphConnection, data: str):
+    async with tg_sem:
+        try:
+            res = await conn.upsertData(data)
+            logger.info(f"Upsert res: {res}")
+        except Exception as e:
+            err = traceback.format_exc()
+            logger.error(f"Upsert err:\n{err}")
+            return {"error": True, "message": str(e)}
 
 
 async def check_vertex_exists(conn, v_id: str):
-    headers = make_headers(conn)
-    async with httpx.AsyncClient(timeout=http_timeout) as client:
-        async with tg_sem:
-            try:
-                res = await client.get(
-                    f"{conn.restppUrl}/graph/{conn.graphname}/vertices/Entity/{v_id}",
-                    headers=headers,
-                )
-
-            except Exception as e:
-                err = traceback.format_exc()
-                logger.error(f"Check err:\n{err}")
+    async with tg_sem:
         try:
-            res.raise_for_status()
-            return res.json()
+            res = await conn.getVerticesById("Entity", v_id)
+
         except Exception as e:
-            logger.error(f"Check err:\n{e}\n{e}")
-            return {"error": True, "message": e}
+            err = traceback.format_exc()
+            logger.error(f"Check err:\n{err}")
+            return {"error": True, "message": str(e)}
+
+    return {"error": False, "resp": res}
 
 
 async def upsert_edge(
-    conn: TigerGraphConnection,
+    conn: AsyncTigerGraphConnection,
     src_v_type: str,
     src_v_id: str,
     edge_type: str,
@@ -269,24 +254,18 @@
 
 
 async def get_commuinty_children(conn, i: int, c: str):
-    headers = make_headers(conn)
-    async with httpx.AsyncClient(timeout=None) as client:
-        async with tg_sem:
-            try:
-                resp = await client.get(
-                    f"{conn.restppUrl}/query/{conn.graphname}/get_community_children",
-                    params={"comm": c, "iter": i},
-                    headers=headers,
-                )
-            except:
-                logger.error(f"Get Children err:\n{traceback.format_exc()}")
+    async with tg_sem:
         try:
-            resp.raise_for_status()
-        except Exception as e:
-            logger.error(f"Get Children err:\n{e}")
+            resp = await conn.runInstalledQuery(
+                "get_community_children",
+                params={"comm": c, "iter": i}
+            )
+        except:
+            logger.error(f"Get Children err:\n{traceback.format_exc()}")
+
     descrs = []
     try:
-        res = resp.json()["results"][0]["children"]
+        res = resp[0]["children"]
     except Exception as e:
         logger.error(f"Get Children err:\n{e}")
         res = []
@@ -307,59 +286,41 @@
 
 
 async def check_all_ents_resolved(conn):
-    headers = make_headers(conn)
-    async with httpx.AsyncClient(timeout=None) as client:
+    try:
         async with tg_sem:
-            resp = await client.get(
-                f"{conn.restppUrl}/query/{conn.graphname}/entities_have_resolution",
-                headers=headers,
+            resp = await conn.runInstalledQuery(
+                "entities_have_resolution"
             )
-        try:
-            resp.raise_for_status()
-        except Exception as e:
-            logger.error(f"Check Vert Desc err:\n{e}")
+    except Exception as e:
+        logger.error(f"Check Vert Desc err:\n{e}")
 
-    res = resp.json()["results"][0]["all_resolved"]
-    logger.info(resp.json()["results"])
+    res = resp[0]["all_resolved"]
+    logger.info(resp)
 
     return res
 
 
 async def add_rels_between_types(conn):
-    headers = make_headers(conn)
-    async with httpx.AsyncClient(timeout=None) as client:
+    try:
         async with tg_sem:
-            resp = await client.get(
-                f"{conn.restppUrl}/query/{conn.graphname}/create_entity_type_relationships",
-                headers=headers,
+            resp = await conn.runInstalledQuery(
+                "create_entity_type_relationships"
             )
-        try:
-            resp.raise_for_status()
-        except Exception as e:
-            logger.error(f"Check Vert EntityType err:\n{e}")
-
-        if resp.status_code != 200:
-            logger.error(f"Check Vert EntityType err:\n{resp.text}")
-            return {"error": True, "message": resp.text}
-        else:
-            res = resp.json()
-            logger.info(resp.json())
-            return res
+    except Exception as e:
+        logger.error(f"Check Vert EntityType err:\n{e}")
+        return {"error": True, "message": e}
+
+    return resp[0]
 
 
 async def check_vertex_has_desc(conn, i: int):
-    headers = make_headers(conn)
-    async with httpx.AsyncClient(timeout=None) as client:
+    try:
         async with tg_sem:
-            resp = await client.get(
-                f"{conn.restppUrl}/query/{conn.graphname}/communities_have_desc",
+            resp = await conn.runInstalledQuery(
+                "communities_have_desc",
                 params={"iter": i},
-                headers=headers,
             )
-        try:
-            resp.raise_for_status()
-        except Exception as e:
-            logger.error(f"Check Vert Desc err:\n{e}")
+    except Exception as e:
+        logger.error(f"Check Vert Desc err:\n{e}")
 
-    res = resp.json()["results"][0]["all_have_desc"]
-    logger.info(resp.json()["results"])
+    res = resp[0]["all_have_desc"]
+    logger.info(res)
 
     return res
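upsert_vertex and upsert_edge above still assemble the raw REST++ upsert document themselves and only hand the POST to upsertData. For reference, a sketch of the payload shape the helpers build; the vertex, edge, and ID values are illustrative, and each attribute is wrapped as {"value": ...}, which is what map_attrs produces:

import json

payload = {
    "vertices": {
        "Document": {
            "doc_1": {"epoch_added": {"value": 1733184693}},
        }
    },
    "edges": {
        "Document": {
            "doc_1": {
                "HAS_CHILD": {
                    "DocumentChunk": {
                        "doc_1_chunk_0": {}
                    }
                }
            }
        }
    },
}
data = json.dumps(payload)
# then, as in upsert_batch(): async with tg_sem: await conn.upsertData(data)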
diff --git a/eventual-consistency-service/app/graphrag/workers.py b/eventual-consistency-service/app/graphrag/workers.py
index 48ebf0f9..1052cde8 100644
--- a/eventual-consistency-service/app/graphrag/workers.py
+++ b/eventual-consistency-service/app/graphrag/workers.py
@@ -10,7 +10,7 @@
 from aiochannel import Channel
 from graphrag import community_summarizer, util
 from langchain_community.graphs.graph_document import GraphDocument, Node
-from pyTigerGraph import TigerGraphConnection
+from pyTigerGraph import AsyncTigerGraphConnection
 
 from common.config import milvus_config
 from common.embeddings.embedding_services import EmbeddingModel
@@ -24,7 +24,7 @@
 
 
 async def install_query(
-    conn: TigerGraphConnection, query_path: str
+    conn: AsyncTigerGraphConnection, query_path: str
 ) -> dict[str, httpx.Response | str | None]:
     LogWriter.info(f"Installing query {query_path}")
     with open(f"{query_path}.gsql", "r") as f:
@@ -35,19 +35,12 @@
     USE GRAPH {conn.graphname}
     {query}
     INSTALL QUERY {query_name}"""
-    tkn = base64.b64encode(f"{conn.username}:{conn.password}".encode()).decode()
-    headers = {"Authorization": f"Basic {tkn}"}
 
-    async with httpx.AsyncClient(timeout=None) as client:
         async with util.tg_sem:
-            res = await client.post(
-                conn.gsUrl + "/gsqlserver/gsql/file",
-                data=quote_plus(query.encode("utf-8")),
-                headers=headers,
-            )
+            res = await conn.gsql(query)
 
-        if "error" in res.text.lower():
-            LogWriter.error(res.text)
+        if "error" in res:
+            LogWriter.error(res)
             return {
                 "result": None,
                 "error": True,
@@ -61,7 +54,7 @@
 
 
 async def chunk_doc(
-    conn: TigerGraphConnection,
+    conn: AsyncTigerGraphConnection,
     doc: dict[str, str],
     upsert_chan: Channel,
     embed_chan: Channel,
@@ -100,7 +93,7 @@
     return doc["v_id"]
 
 
-async def upsert_chunk(conn: TigerGraphConnection, doc_id, chunk_id, chunk):
+async def upsert_chunk(conn: AsyncTigerGraphConnection, doc_id, chunk_id, chunk):
     logger.info(f"Upserting chunk {chunk_id}")
     date_added = int(time.time())
     await util.upsert_vertex(
@@ -180,7 +173,7 @@ async def get_vert_desc(conn, v_id, node: Node):
     # if vertex exists, get description content and append this description to it
     if not exists.get("error", False):
         # deduplicate descriptions
-        desc.extend(exists["results"][0]["attributes"]["description"])
+        desc.extend(exists["resp"][0]["attributes"]["description"])
         desc = list(set(desc))
     return desc
 
@@ -192,7 +185,7 @@ async def extract(
     upsert_chan: Channel,
     embed_chan: Channel,
     extractor: BaseExtractor,
-    conn: TigerGraphConnection,
+    conn: AsyncTigerGraphConnection,
     chunk: str,
     chunk_id: str,
 ):
@@ -375,7 +368,7 @@
 
 
 async def resolve_entity(
-    conn: TigerGraphConnection,
+    conn: AsyncTigerGraphConnection,
     upsert_chan: Channel,
     emb_store: MilvusEmbeddingStore,
     entity_id: str,
@@ -454,7 +447,7 @@
 
 
 async def process_community(
-    conn: TigerGraphConnection,
+    conn: AsyncTigerGraphConnection,
     upsert_chan: Channel,
     embed_chan: Channel,
     i: int,
diff --git a/eventual-consistency-service/app/main.py b/eventual-consistency-service/app/main.py
index d0c9afd8..aef08219 100644
--- a/eventual-consistency-service/app/main.py
+++ b/eventual-consistency-service/app/main.py
@@ -47,6 +47,7 @@ async def lifespan(_: FastAPI):
         db_config["username"],
         db_config["password"],
         graphname,
+        async_conn=True
     )
     start_ecc_in_thread(graphname, conn)
     yield
@@ -178,6 +179,7 @@ def consistency_status(
         credentials.username,
         credentials.password,
         graphname,
+        async_conn=True
     )
     match ecc_method:
         case SupportAIMethod.SUPPORTAI:
diff --git a/eventual-consistency-service/app/supportai/supportai_init.py b/eventual-consistency-service/app/supportai/supportai_init.py
index 287d367b..c542cc90 100644
--- a/eventual-consistency-service/app/supportai/supportai_init.py
+++ b/eventual-consistency-service/app/supportai/supportai_init.py
@@ -43,15 +43,12 @@ async def stream_docs(
             for d in doc_ids["ids"]:
                 try:
                     async with tg_sem:
-                        res = await client.get(
-                            f"{conn.restppUrl}/query/{conn.graphname}/StreamDocContent/",
+                        res = await conn.runInstalledQuery(
+                            "StreamDocContent",
                             params={"doc": d},
-                            headers=headers,
                         )
-                    if res.status_code != 200:
-                        continue
                     logger.info("stream_docs writes to docs")
-                    await docs_chan.put(res.json()["results"][0]["DocContent"][0])
+                    await docs_chan.put(res[0]["DocContent"][0])
                 except Exception as e:
                     exc = traceback.format_exc()
                     logger.error(f"Error retrieveing doc: {d} --> {e}\n{exc}")
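stream_docs is the only producer for docs_chan, so it alone closes the channel and consumers simply drain it until closed. A minimal sketch of that producer/consumer contract with aiochannel; the doc payloads here are fake:

import asyncio

from aiochannel import Channel

async def producer(chan: Channel):
    for i in range(3):
        await chan.put({"v_id": f"doc_{i}"})
    chan.close()  # only the single sender closes the channel

async def consumer(chan: Channel):
    async for doc in chan:  # ends once the channel is closed and drained
        print("processing", doc["v_id"])

async def main():
    chan = Channel(48)
    await asyncio.gather(producer(chan), consumer(chan))

asyncio.run(main())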
diff --git a/eventual-consistency-service/app/supportai/util.py b/eventual-consistency-service/app/supportai/util.py
index b6cdc948..ba415014 100644
--- a/eventual-consistency-service/app/supportai/util.py
+++ b/eventual-consistency-service/app/supportai/util.py
@@ -33,7 +33,7 @@ async def install_queries(
     conn: TigerGraphConnection,
 ):
     # queries that are currently installed
-    installed_queries = [q.split("/")[-1] for q in conn.getEndpoints(dynamic=True)]
+    installed_queries = [q.split("/")[-1] for q in await conn.getEndpoints(dynamic=True)]
 
     # doesn't need to be parallel since tg only does it one at a time
     for q in requried_queries:
@@ -142,20 +142,18 @@ async def stream_ids(
     headers = make_headers(conn)
 
     try:
-        async with httpx.AsyncClient(timeout=http_timeout) as client:
-            async with tg_sem:
-                res = await client.post(
-                    f"{conn.restppUrl}/query/{conn.graphname}/StreamIds",
-                    params={
-                        "current_batch": current_batch,
-                        "ttl_batches": ttl_batches,
-                        "v_type": v_type,
-                    },
-                    headers=headers,
-                )
-        ids = res.json()["results"][0]["@@ids"]
+        async with tg_sem:
+            res = await conn.runInstalledQuery(
+                "StreamIds",
+                params={
+                    "current_batch": current_batch,
+                    "ttl_batches": ttl_batches,
+                    "v_type": v_type,
+                }
+            )
+        ids = res[0]["@@ids"]
         return {"error": False, "ids": ids}
-
+
     except Exception as e:
         exc = traceback.format_exc()
         LogWriter.error(f"/{conn.graphname}/query/StreamIds\nException Trace:\n{exc}")
@@ -201,50 +199,28 @@
     attrs = map_attrs(attributes)
     data = json.dumps({"vertices": {vertex_type: {vertex_id: attrs}}})
     headers = make_headers(conn)
-    async with httpx.AsyncClient(timeout=http_timeout) as client:
-        async with tg_sem:
-            try:
-                res = await client.post(
-                    f"{conn.restppUrl}/graph/{conn.graphname}", data=data, headers=headers
-                )
-
-                res.raise_for_status()
-            except httpx.RequestError as exc:
-                logger.error(f"An error occurred while requesting {exc.request.url!r}.")
-                logger.error(f"Request body: {data}")
-                logger.error(f"Details: {exc}")
-                # Check if the exception has a response attribute
-                if hasattr(exc, 'response') and exc.response is not None:
-                    logger.error(f"Response content: {exc.response.content}")
-            except httpx.HTTPStatusError as exc:
-                logger.error(f"Error response {exc.response.status_code} while requesting {exc.request.url!r}.")
-                logger.error(f"Response content: {exc.response.content}")
-                logger.error(f"Request body: {data}")
+    async with tg_sem:
+        try:
+            res = await conn.upsertData(data)
+
+            logger.info(f"Upsert res: {res}")
+        except Exception as e:
+            err = traceback.format_exc()
+            logger.error(f"Upsert err:\n{err}")
+            return {"error": True, "message": str(e)}
 
 
 async def check_vertex_exists(conn, v_id: str):
-    headers = make_headers(conn)
-    async with httpx.AsyncClient(timeout=http_timeout) as client:
-        async with tg_sem:
-            try:
-                res = await client.get(
-                    f"{conn.restppUrl}/graph/{conn.graphname}/vertices/Entity/{v_id}",
-                    headers=headers,
-                )
-
-                res.raise_for_status()
-                return res.json()
-            except httpx.RequestError as exc:
-                logger.error(f"An error occurred while requesting {exc.request.url!r}.")
-                logger.error(f"Details: {exc}")
-                # Check if the exception has a response attribute
-                if hasattr(exc, 'response') and exc.response is not None:
-                    logger.error(f"Response content: {exc.response.content}")
-                return {"error": "Request failed"}
-            except httpx.HTTPStatusError as exc:
-                logger.error(f"Error response {exc.response.status_code} while requesting {exc.request.url!r}.")
-                logger.error(f"Response content: {exc.response.content}")
-                return {"error": f"HTTP status error {exc.response.status_code}"}
+    async with tg_sem:
+        try:
+            res = await conn.getVerticesById("Entity", v_id)
+
+        except Exception as e:
+            err = traceback.format_exc()
+            logger.error(f"Check err:\n{err}")
+            return {"error": True, "message": str(e)}
+
+    return {"error": False, "resp": res}
 
 
@@ -278,22 +254,12 @@
         }
     )
-    headers = make_headers(conn)
-    async with httpx.AsyncClient(timeout=http_timeout) as client:
-        async with tg_sem:
-            try:
-                res = await client.post(
-                    f"{conn.restppUrl}/graph/{conn.graphname}", data=data, headers=headers
-                )
-                res.raise_for_status()
-            except httpx.RequestError as exc:
-                logger.error(f"An error occurred while requesting {exc.request.url!r}.")
-                logger.error(f"Request body: {data}")
-                logger.error(f"Details: {exc}")
-                # Check if the exception has a response attribute
-                if hasattr(exc, 'response') and exc.response is not None:
-                    logger.error(f"Response content: {exc.response.content}")
-            except httpx.HTTPStatusError as exc:
-                logger.error(f"Error response {exc.response.status_code} while requesting {exc.request.url!r}.")
-                logger.error(f"Response content: {exc.response.content}")
-                logger.error(f"Request body: {data}")
+    async with tg_sem:
+        try:
+            res = await conn.upsertData(data)
+
+            logger.info(f"Upsert res: {res}")
+        except Exception as e:
+            err = traceback.format_exc()
+            logger.error(f"Upsert err:\n{err}")
+            return {"error": True, "message": str(e)}
diff --git a/eventual-consistency-service/app/supportai/workers.py b/eventual-consistency-service/app/supportai/workers.py
index 828655e8..e02ca5ad 100644
--- a/eventual-consistency-service/app/supportai/workers.py
+++ b/eventual-consistency-service/app/supportai/workers.py
@@ -34,19 +34,12 @@
     USE GRAPH {conn.graphname}
     {query}
     INSTALL QUERY {query_name}"""
-    tkn = base64.b64encode(f"{conn.username}:{conn.password}".encode()).decode()
-    headers = {"Authorization": f"Basic {tkn}"}
-
-    async with httpx.AsyncClient(timeout=None) as client:
-        async with util.tg_sem:
-            res = await client.post(
-                conn.gsUrl + "/gsqlserver/gsql/file",
-                data=quote_plus(query.encode("utf-8")),
-                headers=headers,
-            )
-        if "error" in res.text.lower():
-            LogWriter.error(res.text)
+    async with util.tg_sem:
+        res = await conn.gsql(query)
+
+        if "error" in res:
+            LogWriter.error(res)
             return {
                 "result": None,
                 "error": True,
@@ -56,6 +49,7 @@
 
     return {"result": res, "error": False}
 
+
 async def chunk_doc(
     conn: TigerGraphConnection,
     doc: dict[str, str],