From d2160dc3fc0eea837b5df5748bb544c1934a9b43 Mon Sep 17 00:00:00 2001 From: Parker Erickson Date: Sun, 1 Dec 2024 10:55:06 -0600 Subject: [PATCH 1/8] fix(connections): add port configs --- common/db/connections.py | 4 +++- common/requirements.txt | 2 +- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/common/db/connections.py b/common/db/connections.py index 95f5abf7..4ea1ec8a 100644 --- a/common/db/connections.py +++ b/common/db/connections.py @@ -90,7 +90,9 @@ def elevate_db_connection_to_token(host, username, password, graphname) -> Tiger host=host, username=username, password=password, - graphname=graphname + graphname=graphname, + restppPort=db_config["restppPort"], + gsPort=db_config["gsPort"] ) if db_config["getToken"]: diff --git a/common/requirements.txt b/common/requirements.txt index 86bdc50c..e0457518 100644 --- a/common/requirements.txt +++ b/common/requirements.txt @@ -133,7 +133,7 @@ python-dotenv==1.0.1 python-iso639==2024.4.27 python-magic==0.4.27 pyTigerDriver==1.0.15 -pyTigerGraph==1.6.5 +pyTigerGraph==1.8.1 pytz==2024.1 PyYAML==6.0.2 rapidfuzz==3.9.6 From c4936616052de81591af65025f555988b34d195c Mon Sep 17 00:00:00 2001 From: Parker Erickson Date: Tue, 17 Dec 2024 08:47:40 -0600 Subject: [PATCH 2/8] updates --- common/db/connections.py | 81 ++++++++++++++----- .../SupportAI_Schema_Native_Vector.gsql | 36 +++++++++ common/requirements.txt | 2 +- copilot/app/agent/agent_graph.py | 2 +- copilot/app/supportai/supportai.py | 6 +- docs/notebooks/SupportAIDemo.ipynb | 79 ++++++++++++++---- .../app/graphrag/graph_rag.py | 22 ++--- .../app/graphrag/util.py | 18 ++--- .../app/graphrag/workers.py | 27 +++---- eventual-consistency-service/app/main.py | 2 + 10 files changed, 198 insertions(+), 77 deletions(-) create mode 100644 common/gsql/supportai/SupportAI_Schema_Native_Vector.gsql diff --git a/common/db/connections.py b/common/db/connections.py index 4ea1ec8a..1ab6a8ac 100644 --- a/common/db/connections.py +++ b/common/db/connections.py @@ -3,8 +3,8 @@ from fastapi import Depends, HTTPException, status from fastapi.security import HTTPBasicCredentials, HTTPAuthorizationCredentials -from pyTigerGraph import TigerGraphConnection -from pyTigerGraph.pyTigerGraphException import TigerGraphException +from pyTigerGraph import TigerGraphConnection, AsyncTigerGraphConnection +from pyTigerGraph.common.exception import TigerGraphException from requests import HTTPError from common.config import ( @@ -21,14 +21,24 @@ def get_db_connection_id_token( graphname: str, credentials: Annotated[HTTPAuthorizationCredentials, Depends(security)], + async_conn: bool = False ) -> TigerGraphConnectionProxy: - conn = TigerGraphConnection( - host=db_config["hostname"], - graphname=graphname, - apiToken=credentials, - tgCloud=True, - sslPort=14240, - ) + if async_conn: + conn = AsyncTigerGraphConnection( + host=db_config["hostname"], + graphname=graphname, + apiToken=credentials, + tgCloud=True, + sslPort=14240, + ) + else: + conn = TigerGraphConnection( + host=db_config["hostname"], + graphname=graphname, + apiToken=credentials, + tgCloud=True, + sslPort=14240, + ) conn.customizeHeader( timeout=db_config["default_timeout"] * 1000, responseSize=5000000 ) @@ -55,9 +65,10 @@ def get_db_connection_id_token( def get_db_connection_pwd( - graphname, credentials: Annotated[HTTPBasicCredentials, Depends(security)] + graphname, credentials: Annotated[HTTPBasicCredentials, Depends(security)], + async_conn: bool = False ) -> TigerGraphConnectionProxy: - conn = elevate_db_connection_to_token(db_config["hostname"], credentials.username, credentials.password, graphname) + conn = elevate_db_connection_to_token(db_config["hostname"], credentials.username, credentials.password, graphname, async_conn) conn.customizeHeader( timeout=db_config["default_timeout"] * 1000, responseSize=5000000 @@ -70,12 +81,13 @@ def get_db_connection_pwd( def get_db_connection_pwd_manual( graphname, username: str, password: str, + async_conn: bool = False ) -> TigerGraphConnectionProxy: """ Manual auth - pass in user/pass not from basic auth """ conn = elevate_db_connection_to_token( - db_config["hostname"], username, password, graphname + db_config["hostname"], username, password, graphname, async_conn ) conn.customizeHeader( @@ -85,14 +97,14 @@ def get_db_connection_pwd_manual( LogWriter.info("Connected to TigerGraph with password") return conn -def elevate_db_connection_to_token(host, username, password, graphname) -> TigerGraphConnectionProxy: +def elevate_db_connection_to_token(host, username, password, graphname, async_conn: bool = False) -> TigerGraphConnectionProxy: conn = TigerGraphConnection( host=host, username=username, password=password, graphname=graphname, - restppPort=db_config["restppPort"], - gsPort=db_config["gsPort"] + restppPort=db_config.get("restppPort", "9000"), + gsPort=db_config.get("gsPort", "14240") ) if db_config["getToken"]: @@ -117,13 +129,38 @@ def elevate_db_connection_to_token(host, username, password, graphname) -> Tiger detail="Failed to get token - is the database running?" ) + if async_conn: + conn = AsyncTigerGraphConnection( + host=host, + username=username, + password=password, + graphname=graphname, + apiToken=apiToken, + restppPort=db_config.get("restppPort", "9000"), + gsPort=db_config.get("gsPort", "14240") + ) + else: + conn = TigerGraphConnection( + host=db_config["hostname"], + username=username, + password=password, + graphname=graphname, + apiToken=apiToken, + restppPort=db_config.get("restppPort", "9000"), + gsPort=db_config.get("gsPort", "14240") + ) + else: + if async_conn: + conn = AsyncTigerGraphConnection( + host=host, + username=username, + password=password, + graphname=graphname, + restppPort=db_config.get("restppPort", "9000"), + gsPort=db_config.get("gsPort", "14240") + ) - conn = TigerGraphConnection( - host=db_config["hostname"], - username=username, - password=password, - graphname=graphname, - apiToken=apiToken - ) + # temp fix for path + conn.restppUrl = conn.restppUrl+"/restpp" return conn diff --git a/common/gsql/supportai/SupportAI_Schema_Native_Vector.gsql b/common/gsql/supportai/SupportAI_Schema_Native_Vector.gsql new file mode 100644 index 00000000..dbd16aa7 --- /dev/null +++ b/common/gsql/supportai/SupportAI_Schema_Native_Vector.gsql @@ -0,0 +1,36 @@ +CREATE SCHEMA_CHANGE JOB add_supportai_schema { + ADD VERTEX DocumentChunk(id STRING PRIMARY KEY, idx INT, epoch_added UINT, epoch_processing UINT, epoch_processed UINT) WITH EMBEDDING ATTRIBUTE embedding(dimension=1536, metric=cosine) STATS="OUTDEGREE_BY_EDGETYPE"; + ADD VERTEX Document(id STRING PRIMARY KEY, epoch_added UINT, epoch_processing UINT, epoch_processed UINT) WITH EMBEDDING ATTRIBUTE embedding(dimension=1536, metric=cosine) STATS="OUTDEGREE_BY_EDGETYPE"; + ADD VERTEX Concept(id STRING PRIMARY KEY, description STRING, concept_type STRING, human_curated BOOL, epoch_added UINT, epoch_processing UINT, epoch_processed UINT) WITH EMBEDDING ATTRIBUTE embedding(dimension=1536, metric=cosine) STATS="OUTDEGREE_BY_EDGETYPE"; + ADD VERTEX Entity(id STRING PRIMARY KEY, definition STRING, description SET, entity_type STRING, epoch_added UINT, epoch_processing UINT, epoch_processed UINT) WITH EMBEDDING ATTRIBUTE embedding(dimension=1536, metric=cosine) STATS="OUTDEGREE_BY_EDGETYPE"; + ADD VERTEX Relationship(id STRING PRIMARY KEY, definition STRING, short_name STRING, epoch_added UINT, epoch_processing UINT, epoch_processed UINT) WITH EMBEDDING ATTRIBUTE embedding(dimension=1536, metric=cosine) STATS="OUTDEGREE_BY_EDGETYPE"; + ADD VERTEX DocumentCollection(id STRING PRIMARY KEY, epoch_added UINT) WITH STATS="OUTDEGREE_BY_EDGETYPE"; + ADD VERTEX Content(id STRING PRIMARY KEY, text STRING, epoch_added UINT) WITH STATS="OUTDEGREE_BY_EDGETYPE"; + ADD VERTEX EntityType(id STRING PRIMARY KEY, description STRING, epoch_added UINT) WITH STATS="OUTDEGREE_BY_EDGETYPE"; + ADD DIRECTED EDGE HAS_CONTENT(FROM Document, TO Content|FROM DocumentChunk, TO Content) WITH REVERSE_EDGE="reverse_HAS_CONTENT"; + ADD DIRECTED EDGE IS_CHILD_OF(FROM Concept, TO Concept) WITH REVERSE_EDGE="reverse_IS_CHILD_OF"; + ADD DIRECTED EDGE IS_HEAD_OF(FROM Entity, TO Relationship) WITH REVERSE_EDGE="reverse_IS_HEAD_OF"; + ADD DIRECTED EDGE HAS_TAIL(FROM Relationship, TO Entity) WITH REVERSE_EDGE="reverse_HAS_TAIL"; + ADD DIRECTED EDGE DESCRIBES_RELATIONSHIP(FROM Concept, TO Relationship) WITH REVERSE_EDGE="reverse_DESCRIBES_RELATIONSHIP"; + ADD DIRECTED EDGE DESCRIBES_ENTITY(FROM Concept, TO Entity) WITH REVERSE_EDGE="reverse_DESCRIBES_ENTITY"; + ADD DIRECTED EDGE CONTAINS_ENTITY(FROM DocumentChunk, TO Entity|FROM Document, TO Entity) WITH REVERSE_EDGE="reverse_CONTAINS_ENTITY"; + ADD DIRECTED EDGE MENTIONS_RELATIONSHIP(FROM DocumentChunk, TO Relationship|FROM Document, TO Relationship) WITH REVERSE_EDGE="reverse_MENTIONS_RELATIONSHIP"; + ADD DIRECTED EDGE IS_AFTER(FROM DocumentChunk, TO DocumentChunk) WITH REVERSE_EDGE="reverse_IS_AFTER"; + ADD DIRECTED EDGE HAS_CHILD(FROM Document, TO DocumentChunk) WITH REVERSE_EDGE="reverse_HAS_CHILD"; + ADD DIRECTED EDGE HAS_RELATIONSHIP(FROM Concept, TO Concept, relation_type STRING) WITH REVERSE_EDGE="reverse_HAS_RELATIONSHIP"; + ADD DIRECTED EDGE CONTAINS_DOCUMENT(FROM DocumentCollection, TO Document) WITH REVERSE_EDGE="reverse_CONTAINS_DOCUMENT"; + ADD DIRECTED EDGE ENTITY_HAS_TYPE(FROM Entity, TO EntityType) WITH REVERSE_EDGE="reverse_ENTITY_HAS_TYPE"; + ADD DIRECTED EDGE RELATIONSHIP_TYPE(FROM EntityType, TO EntityType, DISCRIMINATOR(relation_type STRING), frequency INT) WITH REVERSE_EDGE="reverse_RELATIONSHIP_TYPE"; + + // GraphRAG + ADD VERTEX Community (id STRING PRIMARY KEY, iteration UINT, description STRING) WITH EMBEDDING ATTRIBUTE embedding(dimension=1536, metric=cosine) STATS="OUTDEGREE_BY_EDGETYPE"; + ADD VERTEX ResolvedEntity(id STRING PRIMARY KEY, entity_type STRING )WITH EMBEDDING ATTRIBUTE embedding(dimension=1536, metric=cosine) STATS="OUTDEGREE_BY_EDGETYPE"; + + ADD DIRECTED EDGE RELATIONSHIP(FROM Entity, TO Entity, relation_type STRING) WITH REVERSE_EDGE="reverse_RELATIONSHIP"; + ADD DIRECTED EDGE RESOLVES_TO(FROM Entity, TO ResolvedEntity, relation_type STRING) WITH REVERSE_EDGE="reverse_RESOLVES_TO"; // Connect ResolvedEntities with their children entities + ADD DIRECTED EDGE RESOLVED_RELATIONSHIP(FROM ResolvedEntity, TO ResolvedEntity, relation_type STRING) WITH REVERSE_EDGE="reverse_RESOLVED_RELATIONSHIP"; // store edges between entities after they're resolved + + ADD DIRECTED EDGE IN_COMMUNITY(FROM ResolvedEntity, TO Community) WITH REVERSE_EDGE="reverse_IN_COMMUNITY"; + ADD DIRECTED EDGE LINKS_TO (from Community, to Community, weight DOUBLE) WITH REVERSE_EDGE="reverse_LINKS_TO"; + ADD DIRECTED EDGE HAS_PARENT (from Community, to Community) WITH REVERSE_EDGE="reverse_HAS_PARENT"; +} diff --git a/common/requirements.txt b/common/requirements.txt index e0457518..1570c969 100644 --- a/common/requirements.txt +++ b/common/requirements.txt @@ -133,7 +133,7 @@ python-dotenv==1.0.1 python-iso639==2024.4.27 python-magic==0.4.27 pyTigerDriver==1.0.15 -pyTigerGraph==1.8.1 +pyTigerGraph==1.8.3 pytz==2024.1 PyYAML==6.0.2 rapidfuzz==3.9.6 diff --git a/copilot/app/agent/agent_graph.py b/copilot/app/agent/agent_graph.py index 93afb79c..b15a21c0 100644 --- a/copilot/app/agent/agent_graph.py +++ b/copilot/app/agent/agent_graph.py @@ -9,7 +9,7 @@ from agent.agent_usefulness_check import TigerGraphAgentUsefulnessCheck from agent.Q import DONE, Q from langgraph.graph import END, StateGraph -from pyTigerGraph.pyTigerGraphException import TigerGraphException +from pyTigerGraph.common.exception import TigerGraphException from supportai.retrievers import (HNSWOverlapRetriever, HNSWRetriever, HNSWSiblingRetriever, GraphRAG) from tools import MapQuestionToSchemaException diff --git a/copilot/app/supportai/supportai.py b/copilot/app/supportai/supportai.py index e96663a3..0591319f 100644 --- a/copilot/app/supportai/supportai.py +++ b/copilot/app/supportai/supportai.py @@ -15,7 +15,11 @@ def init_supportai(conn: TigerGraphConnection, graphname: str) -> tuple[dict, dict]: # need to open the file using the absolute path - file_path = "common/gsql/supportai/SupportAI_Schema.gsql" + ver = conn.getVer().split(".") + if int(ver[0]) >= 4 and int(ver[1]) >= 2: + file_path = "common/gsql/supportai/SupportAI_Schema_Native_Vector.gsql" + else: + file_path = "common/gsql/supportai/SupportAI_Schema.gsql" with open(file_path, "r") as f: schema = f.read() schema_res = conn.gsql( diff --git a/docs/notebooks/SupportAIDemo.ipynb b/docs/notebooks/SupportAIDemo.ipynb index f7a69b35..4b21a0c5 100644 --- a/docs/notebooks/SupportAIDemo.ipynb +++ b/docs/notebooks/SupportAIDemo.ipynb @@ -11,7 +11,7 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": 1, "metadata": {}, "outputs": [], "source": [ @@ -21,13 +21,14 @@ "\n", "load_dotenv()\n", "# We first create a connection to the database\n", - "host = os.environ[\"HOST\"]\n", + "host = \"http://192.168.99.201\" #os.environ[\"HOST\"]\n", "username = os.getenv(\"USERNAME\", \"tigergraph\")\n", "password = os.getenv(\"PASS\", \"tigergraph\")\n", "conn = TigerGraphConnection(\n", " host=host,\n", " username=username,\n", " password=password,\n", + " gsPort=\"31409\"\n", ")\n", "\n", "# And then add CoPilot's address to the connection. This address\n", @@ -54,40 +55,64 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": 2, "metadata": {}, - "outputs": [], + "outputs": [ + { + "data": { + "text/plain": [ + "'The graph pyTigerGraphRAG is created.'" + ] + }, + "execution_count": 2, + "metadata": {}, + "output_type": "execute_result" + } + ], "source": [ "conn.gsql(\"\"\"CREATE GRAPH pyTigerGraphRAG()\"\"\")" ] }, { "cell_type": "code", - "execution_count": null, + "execution_count": 3, "metadata": {}, "outputs": [], "source": [ "conn.graphname = \"pyTigerGraphRAG\"\n", - "conn.getToken()" + "#conn.getToken()" ] }, { "cell_type": "code", - "execution_count": null, + "execution_count": 9, "metadata": {}, - "outputs": [], + "outputs": [ + { + "data": { + "text/plain": [ + "{'host_name': 'http://192.168.99.201',\n", + " 'schema_creation_status': '\"Using graph \\'pyTigerGraphRAG\\'\\\\nSuccessfully created schema change jobs: [add_supportai_schema].\\\\nWARNING: When modifying the graph schema, reinstalling all affected queries is required, and the duration of this process may vary based on the number and complexity of the queries. To skip query reinstallation, you can run with the \\'-N\\' option, but manual reinstallation of queries will be necessary afterwards.\\\\nKick off schema change job add_supportai_schema\\\\nDoing schema change on graph \\'pyTigerGraphRAG\\' (current version: 0)\\\\nTrying to add local vertex \\'DocumentChunk\\' to the graph \\'pyTigerGraphRAG\\'.\\\\nTrying to add local vertex \\'Document\\' to the graph \\'pyTigerGraphRAG\\'.\\\\nTrying to add local vertex \\'Concept\\' to the graph \\'pyTigerGraphRAG\\'.\\\\nTrying to add local vertex \\'Entity\\' to the graph \\'pyTigerGraphRAG\\'.\\\\nTrying to add local vertex \\'Relationship\\' to the graph \\'pyTigerGraphRAG\\'.\\\\nTrying to add local vertex \\'DocumentCollection\\' to the graph \\'pyTigerGraphRAG\\'.\\\\nTrying to add local vertex \\'Content\\' to the graph \\'pyTigerGraphRAG\\'.\\\\nTrying to add local vertex \\'EntityType\\' to the graph \\'pyTigerGraphRAG\\'.\\\\nTrying to add local vertex \\'Community\\' to the graph \\'pyTigerGraphRAG\\'.\\\\nTrying to add local vertex \\'ResolvedEntity\\' to the graph \\'pyTigerGraphRAG\\'.\\\\nTrying to add local edge \\'HAS_CONTENT\\' and its reverse edge \\'reverse_HAS_CONTENT\\' to the graph \\'pyTigerGraphRAG\\'.\\\\nTrying to add local edge \\'IS_CHILD_OF\\' and its reverse edge \\'reverse_IS_CHILD_OF\\' to the graph \\'pyTigerGraphRAG\\'.\\\\nTrying to add local edge \\'IS_HEAD_OF\\' and its reverse edge \\'reverse_IS_HEAD_OF\\' to the graph \\'pyTigerGraphRAG\\'.\\\\nTrying to add local edge \\'HAS_TAIL\\' and its reverse edge \\'reverse_HAS_TAIL\\' to the graph \\'pyTigerGraphRAG\\'.\\\\nTrying to add local edge \\'DESCRIBES_RELATIONSHIP\\' and its reverse edge \\'reverse_DESCRIBES_RELATIONSHIP\\' to the graph \\'pyTigerGraphRAG\\'.\\\\nTrying to add local edge \\'DESCRIBES_ENTITY\\' and its reverse edge \\'reverse_DESCRIBES_ENTITY\\' to the graph \\'pyTigerGraphRAG\\'.\\\\nTrying to add local edge \\'CONTAINS_ENTITY\\' and its reverse edge \\'reverse_CONTAINS_ENTITY\\' to the graph \\'pyTigerGraphRAG\\'.\\\\nTrying to add local edge \\'MENTIONS_RELATIONSHIP\\' and its reverse edge \\'reverse_MENTIONS_RELATIONSHIP\\' to the graph \\'pyTigerGraphRAG\\'.\\\\nTrying to add local edge \\'IS_AFTER\\' and its reverse edge \\'reverse_IS_AFTER\\' to the graph \\'pyTigerGraphRAG\\'.\\\\nTrying to add local edge \\'HAS_CHILD\\' and its reverse edge \\'reverse_HAS_CHILD\\' to the graph \\'pyTigerGraphRAG\\'.\\\\nTrying to add local edge \\'HAS_RELATIONSHIP\\' and its reverse edge \\'reverse_HAS_RELATIONSHIP\\' to the graph \\'pyTigerGraphRAG\\'.\\\\nTrying to add local edge \\'CONTAINS_DOCUMENT\\' and its reverse edge \\'reverse_CONTAINS_DOCUMENT\\' to the graph \\'pyTigerGraphRAG\\'.\\\\nTrying to add local edge \\'ENTITY_HAS_TYPE\\' and its reverse edge \\'reverse_ENTITY_HAS_TYPE\\' to the graph \\'pyTigerGraphRAG\\'.\\\\nTrying to add local edge \\'RELATIONSHIP_TYPE\\' and its reverse edge \\'reverse_RELATIONSHIP_TYPE\\' to the graph \\'pyTigerGraphRAG\\'.\\\\nTrying to add local edge \\'RELATIONSHIP\\' and its reverse edge \\'reverse_RELATIONSHIP\\' to the graph \\'pyTigerGraphRAG\\'.\\\\nTrying to add local edge \\'RESOLVES_TO\\' and its reverse edge \\'reverse_RESOLVES_TO\\' to the graph \\'pyTigerGraphRAG\\'.\\\\nTrying to add local edge \\'RESOLVED_RELATIONSHIP\\' and its reverse edge \\'reverse_RESOLVED_RELATIONSHIP\\' to the graph \\'pyTigerGraphRAG\\'.\\\\nTrying to add local edge \\'IN_COMMUNITY\\' and its reverse edge \\'reverse_IN_COMMUNITY\\' to the graph \\'pyTigerGraphRAG\\'.\\\\nTrying to add local edge \\'LINKS_TO\\' and its reverse edge \\'reverse_LINKS_TO\\' to the graph \\'pyTigerGraphRAG\\'.\\\\nTrying to add local edge \\'HAS_PARENT\\' and its reverse edge \\'reverse_HAS_PARENT\\' to the graph \\'pyTigerGraphRAG\\'.\\\\n\\\\nGraph pyTigerGraphRAG updated to new version 1\\\\nValidating existing queries for graph pyTigerGraphRAG ...\\\\nThe job add_supportai_schema completes in 1.371 seconds!\\\\nLocal schema change succeeded.\"',\n", + " 'index_creation_status': '\"Using graph \\'pyTigerGraphRAG\\'\\\\nSuccessfully created schema change jobs: [add_supportai_indexes].\\\\nWARNING: When modifying the graph schema, reinstalling all affected queries is required, and the duration of this process may vary based on the number and complexity of the queries. To skip query reinstallation, you can run with the \\'-N\\' option, but manual reinstallation of queries will be necessary afterwards.\\\\nKick off schema change job add_supportai_indexes\\\\nDoing schema change on graph \\'pyTigerGraphRAG\\' (current version: 1)\\\\nTrying to add index \\'doc_epoch_added_index\\' on the attribute \\'epoch_added\\' of local vertex \\'Document\\' on the graph \\'pyTigerGraphRAG\\'.\\\\nTrying to add index \\'doc_epoch_processing_index\\' on the attribute \\'epoch_processing\\' of local vertex \\'Document\\' on the graph \\'pyTigerGraphRAG\\'.\\\\nTrying to add index \\'doc_epoch_processing_indexepoch_processed_index\\' on the attribute \\'epoch_processed\\' of local vertex \\'Document\\' on the graph \\'pyTigerGraphRAG\\'.\\\\nTrying to add index \\'doc_chunk_epoch_added_index\\' on the attribute \\'epoch_added\\' of local vertex \\'DocumentChunk\\' on the graph \\'pyTigerGraphRAG\\'.\\\\nTrying to add index \\'doc_chunk_epoch_processing_index\\' on the attribute \\'epoch_processing\\' of local vertex \\'DocumentChunk\\' on the graph \\'pyTigerGraphRAG\\'.\\\\nTrying to add index \\'doc_chunk_epoch_processed_index\\' on the attribute \\'epoch_processed\\' of local vertex \\'DocumentChunk\\' on the graph \\'pyTigerGraphRAG\\'.\\\\nTrying to add index \\'concept_epoch_added_index\\' on the attribute \\'epoch_added\\' of local vertex \\'Concept\\' on the graph \\'pyTigerGraphRAG\\'.\\\\nTrying to add index \\'concept_epoch_processing_index\\' on the attribute \\'epoch_processing\\' of local vertex \\'Concept\\' on the graph \\'pyTigerGraphRAG\\'.\\\\nTrying to add index \\'concept_epoch_processed_index\\' on the attribute \\'epoch_processed\\' of local vertex \\'Concept\\' on the graph \\'pyTigerGraphRAG\\'.\\\\n\\\\nGraph pyTigerGraphRAG updated to new version 2\\\\nValidating existing queries for graph pyTigerGraphRAG ...\\\\nThe job add_supportai_indexes completes in 1.212 seconds!\\\\nLocal schema change succeeded.\"'}" + ] + }, + "execution_count": 9, + "metadata": {}, + "output_type": "execute_result" + } + ], "source": [ "conn.ai.initializeSupportAI()" ] }, { "cell_type": "code", - "execution_count": null, + "execution_count": 10, "metadata": {}, "outputs": [], "source": [ - "access = os.environ[\"AWS_ACCESS_KEY_ID\"]\n", - "sec = os.environ[\"AWS_SECRET_ACCESS_KEY\"]\n", + "access = \"AKIARJ6KUJUIS7KJ27YO\"\n", + "sec = \"swYmXU+4yZbXiYCMfwSFxrcS0hNiOd6nzYog6VCZ\"\n", "res = conn.ai.createDocumentIngest(\n", " data_source=\"s3\",\n", " data_source_config={\"aws_access_key\": access, \"aws_secret_key\": sec},\n", @@ -98,9 +123,22 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": 11, "metadata": {}, - "outputs": [], + "outputs": [ + { + "data": { + "text/plain": [ + "{'job_name': 'load_documents_content_json_4cc0b2115f754540b4543469612743f6',\n", + " 'job_id': 'pyTigerGraphRAG.load_documents_content_json_4cc0b2115f754540b4543469612743f6.stream.SupportAI_pyTigerGraphRAG_bc71b650248d41df83eae15155c2bce5.1733184693598',\n", + " 'log_location': '/home/tigergraph/tigergraph/log/kafkaLoader/pyTigerGraphRAG.load_documents_content_json_4cc0b2115f754540b4543469612743f6.stream.SupportAI_pyTigerGraphRAG_bc71b650248d41df83eae15155c2bce5.1733184693598'}" + ] + }, + "execution_count": 11, + "metadata": {}, + "output_type": "execute_result" + } + ], "source": [ "conn.ai.runDocumentIngest(res[\"load_job_id\"], res[\"data_source_id\"], \"s3://tg-documentation/pytg_current/pytg_current.jsonl\")" ] @@ -109,9 +147,20 @@ "cell_type": "code", "execution_count": null, "metadata": {}, - "outputs": [], + "outputs": [ + { + "data": { + "text/plain": [ + "{'status': 'submitted'}" + ] + }, + "execution_count": 15, + "metadata": {}, + "output_type": "execute_result" + } + ], "source": [ - "conn.ai.forceConsistencyUpdate(method=\"graphrag\")" + "conn.ai.forceConsistencyUpdate()" ] }, { diff --git a/eventual-consistency-service/app/graphrag/graph_rag.py b/eventual-consistency-service/app/graphrag/graph_rag.py index 70a966bc..23f66f4b 100644 --- a/eventual-consistency-service/app/graphrag/graph_rag.py +++ b/eventual-consistency-service/app/graphrag/graph_rag.py @@ -21,7 +21,7 @@ upsert_batch, add_rels_between_types ) -from pyTigerGraph import TigerGraphConnection +from pyTigerGraph import AsyncTigerGraphConnection from common.config import embedding_service from common.embeddings.milvus_embedding_store import MilvusEmbeddingStore @@ -33,7 +33,7 @@ async def stream_docs( - conn: TigerGraphConnection, + conn: AsyncTigerGraphConnection, docs_chan: Channel, ttl_batches: int = 10, ): @@ -76,7 +76,7 @@ async def stream_docs( async def chunk_docs( - conn: TigerGraphConnection, + conn: AsyncTigerGraphConnection, docs_chan: Channel, embed_chan: Channel, upsert_chan: Channel, @@ -123,7 +123,7 @@ async def upsert(upsert_chan: Channel): load_q.close() -async def load(conn: TigerGraphConnection): +async def load(conn: AsyncTigerGraphConnection): logger.info("Reading from load_q") dd = lambda: defaultdict(dd) # infinite default dict batch_size = 500 @@ -215,7 +215,7 @@ async def extract( upsert_chan: Channel, embed_chan: Channel, extractor: BaseExtractor, - conn: TigerGraphConnection, + conn: AsyncTigerGraphConnection, ): """ Creates and starts one worker for each extract job @@ -238,7 +238,7 @@ async def extract( async def stream_entities( - conn: TigerGraphConnection, + conn: AsyncTigerGraphConnection, entity_chan: Channel, ttl_batches: int = 50, ): @@ -264,7 +264,7 @@ async def stream_entities( async def resolve_entities( - conn: TigerGraphConnection, + conn: AsyncTigerGraphConnection, emb_store: MilvusEmbeddingStore, entity_chan: Channel, upsert_chan: Channel, @@ -295,7 +295,7 @@ async def resolve_entities( res.raise_for_status() -async def communities(conn: TigerGraphConnection, comm_process_chan: Channel): +async def communities(conn: AsyncTigerGraphConnection, comm_process_chan: Channel): """ Run louvain """ @@ -364,7 +364,7 @@ async def communities(conn: TigerGraphConnection, comm_process_chan: Channel): async def stream_communities( - conn: TigerGraphConnection, + conn: AsyncTigerGraphConnection, i: int, comm_process_chan: Channel, ): @@ -405,7 +405,7 @@ async def stream_communities( async def summarize_communities( - conn: TigerGraphConnection, + conn: AsyncTigerGraphConnection, comm_process_chan: Channel, upsert_chan: Channel, embed_chan: Channel, @@ -419,7 +419,7 @@ async def summarize_communities( embed_chan.close() -async def run(graphname: str, conn: TigerGraphConnection): +async def run(graphname: str, conn: AsyncTigerGraphConnection): """ Set up GraphRAG: - Install necessary queries. diff --git a/eventual-consistency-service/app/graphrag/util.py b/eventual-consistency-service/app/graphrag/util.py index 873d856a..c4497ab9 100644 --- a/eventual-consistency-service/app/graphrag/util.py +++ b/eventual-consistency-service/app/graphrag/util.py @@ -7,7 +7,7 @@ import httpx from graphrag import reusable_channel, workers -from pyTigerGraph import TigerGraphConnection +from pyTigerGraph import AsyncTigerGraphConnection from common.config import ( doc_processing_config, @@ -33,10 +33,10 @@ async def install_queries( requried_queries: list[str], - conn: TigerGraphConnection, + conn: AsyncTigerGraphConnection, ): # queries that are currently installed - installed_queries = [q.split("/")[-1] for q in conn.getEndpoints(dynamic=True)] + installed_queries = [q.split("/")[-1] for q in await conn.getEndpoints(dynamic=True)] # doesn't need to be parallel since tg only does it one at a time for q in requried_queries: @@ -58,7 +58,7 @@ async def init_embedding_index(s: MilvusEmbeddingStore, vertex_field: str): async def init( - conn: TigerGraphConnection, + conn: AsyncTigerGraphConnection, ) -> tuple[BaseExtractor, dict[str, MilvusEmbeddingStore]]: # install requried queries requried_queries = [ @@ -126,7 +126,7 @@ async def init( return extractor, index_stores -def make_headers(conn: TigerGraphConnection): +def make_headers(conn: AsyncTigerGraphConnection): if conn.apiToken is None or conn.apiToken == "": tkn = base64.b64encode(f"{conn.username}:{conn.password}".encode()).decode() headers = {"Authorization": f"Basic {tkn}"} @@ -137,7 +137,7 @@ def make_headers(conn: TigerGraphConnection): async def stream_ids( - conn: TigerGraphConnection, v_type: str, current_batch: int, ttl_batches: int + conn: AsyncTigerGraphConnection, v_type: str, current_batch: int, ttl_batches: int ) -> dict[str, str | list[str]]: headers = make_headers(conn) @@ -192,7 +192,7 @@ def process_id(v_id: str): async def upsert_vertex( - conn: TigerGraphConnection, + conn: AsyncTigerGraphConnection, vertex_type: str, vertex_id: str, attributes: dict, @@ -203,7 +203,7 @@ async def upsert_vertex( await load_q.put(("vertices", (vertex_type, vertex_id, attrs))) -async def upsert_batch(conn: TigerGraphConnection, data: str): +async def upsert_batch(conn: AsyncTigerGraphConnection, data: str): headers = make_headers(conn) async with httpx.AsyncClient(timeout=http_timeout) as client: async with tg_sem: @@ -239,7 +239,7 @@ async def check_vertex_exists(conn, v_id: str): async def upsert_edge( - conn: TigerGraphConnection, + conn: AsyncTigerGraphConnection, src_v_type: str, src_v_id: str, edge_type: str, diff --git a/eventual-consistency-service/app/graphrag/workers.py b/eventual-consistency-service/app/graphrag/workers.py index 48ebf0f9..4fafa6d9 100644 --- a/eventual-consistency-service/app/graphrag/workers.py +++ b/eventual-consistency-service/app/graphrag/workers.py @@ -10,7 +10,7 @@ from aiochannel import Channel from graphrag import community_summarizer, util from langchain_community.graphs.graph_document import GraphDocument, Node -from pyTigerGraph import TigerGraphConnection +from pyTigerGraph import AsyncTigerGraphConnection from common.config import milvus_config from common.embeddings.embedding_services import EmbeddingModel @@ -24,7 +24,7 @@ async def install_query( - conn: TigerGraphConnection, query_path: str + conn: AsyncTigerGraphConnection, query_path: str ) -> dict[str, httpx.Response | str | None]: LogWriter.info(f"Installing query {query_path}") with open(f"{query_path}.gsql", "r") as f: @@ -35,19 +35,12 @@ async def install_query( USE GRAPH {conn.graphname} {query} INSTALL QUERY {query_name}""" - tkn = base64.b64encode(f"{conn.username}:{conn.password}".encode()).decode() - headers = {"Authorization": f"Basic {tkn}"} - async with httpx.AsyncClient(timeout=None) as client: async with util.tg_sem: - res = await client.post( - conn.gsUrl + "/gsqlserver/gsql/file", - data=quote_plus(query.encode("utf-8")), - headers=headers, - ) + res = await conn.gsql(query) - if "error" in res.text.lower(): - LogWriter.error(res.text) + if "error" in res: + LogWriter.error(res) return { "result": None, "error": True, @@ -61,7 +54,7 @@ async def install_query( async def chunk_doc( - conn: TigerGraphConnection, + conn: AsyncTigerGraphConnection, doc: dict[str, str], upsert_chan: Channel, embed_chan: Channel, @@ -100,7 +93,7 @@ async def chunk_doc( return doc["v_id"] -async def upsert_chunk(conn: TigerGraphConnection, doc_id, chunk_id, chunk): +async def upsert_chunk(conn: AsyncTigerGraphConnection, doc_id, chunk_id, chunk): logger.info(f"Upserting chunk {chunk_id}") date_added = int(time.time()) await util.upsert_vertex( @@ -192,7 +185,7 @@ async def extract( upsert_chan: Channel, embed_chan: Channel, extractor: BaseExtractor, - conn: TigerGraphConnection, + conn: AsyncTigerGraphConnection, chunk: str, chunk_id: str, ): @@ -375,7 +368,7 @@ async def extract( async def resolve_entity( - conn: TigerGraphConnection, + conn: AsyncTigerGraphConnection, upsert_chan: Channel, emb_store: MilvusEmbeddingStore, entity_id: str, @@ -454,7 +447,7 @@ async def resolve_entity( async def process_community( - conn: TigerGraphConnection, + conn: AsyncTigerGraphConnection, upsert_chan: Channel, embed_chan: Channel, i: int, diff --git a/eventual-consistency-service/app/main.py b/eventual-consistency-service/app/main.py index d0c9afd8..aef08219 100644 --- a/eventual-consistency-service/app/main.py +++ b/eventual-consistency-service/app/main.py @@ -47,6 +47,7 @@ async def lifespan(_: FastAPI): db_config["username"], db_config["password"], graphname, + async_conn=True ) start_ecc_in_thread(graphname, conn) yield @@ -178,6 +179,7 @@ def consistency_status( credentials.username, credentials.password, graphname, + async_conn=True ) match ecc_method: case SupportAIMethod.SUPPORTAI: From 5d7076a66d6c69f6f56ab894c4ea4b721f66e2f3 Mon Sep 17 00:00:00 2001 From: Parker Erickson Date: Tue, 17 Dec 2024 08:54:52 -0600 Subject: [PATCH 3/8] fix --- docs/notebooks/SupportAIDemo.ipynb | 58 +++++++++++++++++++++++++----- 1 file changed, 50 insertions(+), 8 deletions(-) diff --git a/docs/notebooks/SupportAIDemo.ipynb b/docs/notebooks/SupportAIDemo.ipynb index 4b21a0c5..1b1e477b 100644 --- a/docs/notebooks/SupportAIDemo.ipynb +++ b/docs/notebooks/SupportAIDemo.ipynb @@ -11,7 +11,7 @@ }, { "cell_type": "code", - "execution_count": 1, + "execution_count": 50, "metadata": {}, "outputs": [], "source": [ @@ -75,7 +75,7 @@ }, { "cell_type": "code", - "execution_count": 3, + "execution_count": 51, "metadata": {}, "outputs": [], "source": [ @@ -107,12 +107,12 @@ }, { "cell_type": "code", - "execution_count": 10, + "execution_count": null, "metadata": {}, "outputs": [], "source": [ - "access = \"AKIARJ6KUJUIS7KJ27YO\"\n", - "sec = \"swYmXU+4yZbXiYCMfwSFxrcS0hNiOd6nzYog6VCZ\"\n", + "access = \"\"\n", + "sec = \"\"\n", "res = conn.ai.createDocumentIngest(\n", " data_source=\"s3\",\n", " data_source_config={\"aws_access_key\": access, \"aws_secret_key\": sec},\n", @@ -145,7 +145,7 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": 52, "metadata": {}, "outputs": [ { @@ -154,13 +154,55 @@ "{'status': 'submitted'}" ] }, - "execution_count": 15, + "execution_count": 52, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "conn.ai.forceConsistencyUpdate(\"graphrag\")" + ] + }, + { + "cell_type": "code", + "execution_count": 49, + "metadata": {}, + "outputs": [], + "source": [ + "from pyTigerGraph import AsyncTigerGraphConnection\n", + "\n", + "conn = AsyncTigerGraphConnection(\n", + " host=host,\n", + " username=username,\n", + " password=password,\n", + " gsPort=\"31409\"\n", + ")\n", + "\n", + "conn.graphname = \"pyTigerGraphRAG\"\n", + "\n", + "res = await conn.gsql(\"\"\"USE GRAPH pyTigerGraphRAG\n", + " CREATE QUERY hello() FOR GRAPH pyTigerGraphRAG{PRINT \"hello\";}\n", + " INSTALL QUERY hello\"\"\")" + ] + }, + { + "cell_type": "code", + "execution_count": 40, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "\"Using graph 'pyTigerGraphRAG'\\nline 2:53 extraneous input ''' expecting {ABORT, ALL, AND, ANY, AVG, BY, CASE, COALESCE, COLLECT, COLUMN, COMMIT, COUNT, DATETIME_ADD, DATETIME_SUB, DEFAULT, DIRECTED, DISTINCT, FALSE, FILE, GROUP, INSERT, ISEMPTY, JOIN, LASTHOP, LEFT, LIST, LOG, MAP, MATCH, MAX, MIN, NOT, NOW, NULL, ON, OR, OUTER, PATH, PER, PROJECT, RANGE, REPLACE, STDEV, STDEVP, SELECT_VERTEX, SEMIJOIN, SET, SRC, SUM, TGT, TO_DATETIME, TRIM, TRUE, UNDIRECTED, UPDATE, VIRTUAL, FILTERTYPE, GSQL_INT_MAX, GSQL_INT_MIN, GSQL_UINT_MAX, '__ENGINE__E_ATTR', '__ENGINE__SRC_ATTR', '__ENGINE__TGT_ATTR', '__ENGINE__V_ATTR', '__ENGINE__SRC_VAL', '__ENGINE__TGT_VAL', '__ENGINE__V_VAL', '__ENGINE__MESSAGE', '__ENGINE__CONTEXT', '__ENGINE__REQUEST', '__ENGINE__SERVICEAPI', 'type', '(', '[', '-', '.', '_', CONST_INT, CONST_STR, NAME, GACCNAME}\\nline 2:59 extraneous input ''' expecting {TO_CSV, WHERE, WITH_EMBEDDING, ',', ';'}\\nParsing encountered 2 syntax error(s)\\n\\nSaved as draft query with type/semantic error: [hello].\\nThe status of query hello is DRAFT, skip.\\nSemantic Check Fails: Graph pyTigerGraphRAG: all queries in this catalog have been installed already.\\nQuery installation finished.\"" + ] + }, + "execution_count": 40, "metadata": {}, "output_type": "execute_result" } ], "source": [ - "conn.ai.forceConsistencyUpdate()" + "res" ] }, { From c7df94f2013e1603a4dbdd0036023513653f4d90 Mon Sep 17 00:00:00 2001 From: Parker Erickson Date: Sun, 22 Dec 2024 15:06:19 -0600 Subject: [PATCH 4/8] fix graphrag to use pytg --- common/db/connections.py | 7 +- .../app/graphrag/graph_rag.py | 135 +++++++--------- .../app/graphrag/util.py | 151 +++++++----------- .../app/graphrag/workers.py | 2 +- 4 files changed, 112 insertions(+), 183 deletions(-) diff --git a/common/db/connections.py b/common/db/connections.py index 1ab6a8ac..b1c73581 100644 --- a/common/db/connections.py +++ b/common/db/connections.py @@ -109,12 +109,7 @@ def elevate_db_connection_to_token(host, username, password, graphname, async_co if db_config["getToken"]: try: - apiToken = conn._post( - conn.restppUrl + "/requesttoken", - authMode="pwd", - data=str({"graph": conn.graphname}), - resKey="results", - )["token"] + apiToken = conn.getToken()[0] except HTTPError: LogWriter.error("Failed to get token") raise HTTPException( diff --git a/eventual-consistency-service/app/graphrag/graph_rag.py b/eventual-consistency-service/app/graphrag/graph_rag.py index 23f66f4b..7a9b101a 100644 --- a/eventual-consistency-service/app/graphrag/graph_rag.py +++ b/eventual-consistency-service/app/graphrag/graph_rag.py @@ -41,33 +41,26 @@ async def stream_docs( Streams the document contents into the docs_chan """ logger.info("streaming docs") - headers = make_headers(conn) - async with httpx.AsyncClient(timeout=http_timeout) as client: - for i in range(ttl_batches): - doc_ids = await stream_ids(conn, "Document", i, ttl_batches) - if doc_ids["error"]: - # continue to the next batch. - # These docs will not be marked as processed, so the ecc will process it eventually. - continue - - for d in doc_ids["ids"]: - try: - async with tg_sem: - res = await client.get( - f"{conn.restppUrl}/query/{conn.graphname}/StreamDocContent/", - params={"doc": d}, - headers=headers, - ) - if res.status_code != 200: - # continue to the next doc. - # This doc will not be marked as processed, so the ecc will process it eventually. - continue - logger.info("stream_docs writes to docs") - await docs_chan.put(res.json()["results"][0]["DocContent"][0]) - except Exception as e: - exc = traceback.format_exc() - logger.error(f"Error retrieving doc: {d} --> {e}\n{exc}") - continue # try retrieving the next doc + for i in range(ttl_batches): + doc_ids = await stream_ids(conn, "Document", i, ttl_batches) + if doc_ids["error"]: + # continue to the next batch. + # These docs will not be marked as processed, so the ecc will process it eventually. + continue + + for d in doc_ids["ids"]: + try: + async with tg_sem: + res = await conn.runInstalledQuery( + "StreamDocContent", + params={"doc": d}, + ) + logger.info("stream_docs writes to docs") + await docs_chan.put(res[0]["DocContent"][0]) + except Exception as e: + exc = traceback.format_exc() + logger.error(f"Error retrieving doc: {d} --> {e}\n{exc}") + continue # try retrieving the next doc logger.info("stream_docs done") # close the docs chan -- this function is the only sender @@ -285,14 +278,10 @@ async def resolve_entities( upsert_chan.close() # Copy RELATIONSHIP edges to RESOLVED_RELATIONSHIP - headers = make_headers(conn) - async with httpx.AsyncClient(timeout=http_timeout) as client: - async with tg_sem: - res = await client.get( - f"{conn.restppUrl}/query/{conn.graphname}/ResolveRelationships/", - headers=headers, - ) - res.raise_for_status() + async with tg_sem: + res = await conn.runInstalledQuery( + "ResolveRelationships", + ) async def communities(conn: AsyncTigerGraphConnection, comm_process_chan: Channel): @@ -301,25 +290,19 @@ async def communities(conn: AsyncTigerGraphConnection, comm_process_chan: Channe """ # first pass: Group ResolvedEntities into Communities logger.info("Initializing Communities (first louvain pass)") - headers = make_headers(conn) - async with httpx.AsyncClient(timeout=None) as client: - async with tg_sem: - res = await client.get( - f"{conn.restppUrl}/query/{conn.graphname}/graphrag_louvain_init", - params={"n_batches": 1}, - headers=headers, - ) - res.raise_for_status() + + async with tg_sem: + res = await conn.runInstalledQuery( + "graphrag_louvain_init", + params={"n_batches": 1} + ) # get the modularity - async with httpx.AsyncClient(timeout=None) as client: - async with tg_sem: - res = await client.get( - f"{conn.restppUrl}/query/{conn.graphname}/modularity", - params={"iteration": 1, "batch_num": 1}, - headers=headers, - ) - res.raise_for_status() - mod = res.json()["results"][0]["mod"] + async with tg_sem: + res = await conn.runInstalledQuery( + "modularity", + params={"iteration": 1, "batch_num": 1} + ) + mod = res[0]["mod"] logger.info(f"****mod pass 1: {mod}") await stream_communities(conn, 1, comm_process_chan) @@ -331,26 +314,19 @@ async def communities(conn: AsyncTigerGraphConnection, comm_process_chan: Channe i += 1 logger.info(f"Running louvain on Communities (iteration: {i})") # louvain pass - async with httpx.AsyncClient(timeout=None) as client: - async with tg_sem: - res = await client.get( - f"{conn.restppUrl}/query/{conn.graphname}/graphrag_louvain_communities", - params={"n_batches": 1, "iteration": i}, - headers=headers, - ) - - res.raise_for_status() + async with tg_sem: + res = await conn.runInstalledQuery( + "graphrag_louvain_communities", + params={"n_batches": 1, "iteration": i}, + ) # get the modularity - async with httpx.AsyncClient(timeout=None) as client: - async with tg_sem: - res = await client.get( - f"{conn.restppUrl}/query/{conn.graphname}/modularity", - params={"iteration": i + 1, "batch_num": 1}, - headers=headers, - ) - res.raise_for_status() - mod = res.json()["results"][0]["mod"] + async with tg_sem: + res = await conn.runInstalledQuery( + "modularity", + params={"iteration": i + 1, "batch_num": 1}, + ) + mod = res[0]["mod"] logger.info(f"mod pass {i+1}: {mod} (diff= {abs(prev_mod - mod)})") if mod == 0 or mod - prev_mod <= -0.05: break @@ -377,15 +353,12 @@ async def stream_communities( # async for i in community_chan: # get the community from that layer - async with httpx.AsyncClient(timeout=None) as client: - async with tg_sem: - resp = await client.get( - f"{conn.restppUrl}/query/{conn.graphname}/stream_community", - params={"iter": i}, - headers=headers, - ) - resp.raise_for_status() - comms = resp.json()["results"][0]["Comms"] + async with tg_sem: + resp = await conn.runInstalledQuery( + "stream_community", + params={"iter": i} + ) + comms = resp[0]["Comms"] for c in comms: await comm_process_chan.put((i, c["v_id"])) @@ -467,7 +440,7 @@ async def run(graphname: str, conn: AsyncTigerGraphConnection): type_start = time.perf_counter() logger.info("Type Processing Start") res = await add_rels_between_types(conn) - if res["error"]: + if res.get("error", False): logger.error(f"Error adding relationships between types: {res}") else: logger.info(f"Added relationships between types: {res}") diff --git a/eventual-consistency-service/app/graphrag/util.py b/eventual-consistency-service/app/graphrag/util.py index c4497ab9..5b949253 100644 --- a/eventual-consistency-service/app/graphrag/util.py +++ b/eventual-consistency-service/app/graphrag/util.py @@ -139,21 +139,17 @@ def make_headers(conn: AsyncTigerGraphConnection): async def stream_ids( conn: AsyncTigerGraphConnection, v_type: str, current_batch: int, ttl_batches: int ) -> dict[str, str | list[str]]: - headers = make_headers(conn) - try: - async with httpx.AsyncClient(timeout=http_timeout) as client: - async with tg_sem: - res = await client.post( - f"{conn.restppUrl}/query/{conn.graphname}/StreamIds", - params={ - "current_batch": current_batch, - "ttl_batches": ttl_batches, - "v_type": v_type, - }, - headers=headers, - ) - ids = res.json()["results"][0]["@@ids"] + async with tg_sem: + res = await conn.runInstalledQuery( + "StreamIds", + params={ + "current_batch": current_batch, + "ttl_batches": ttl_batches, + "v_type": v_type, + } + ) + ids = res[0]["@@ids"] return {"error": False, "ids": ids} except Exception as e: @@ -204,38 +200,27 @@ async def upsert_vertex( async def upsert_batch(conn: AsyncTigerGraphConnection, data: str): - headers = make_headers(conn) - async with httpx.AsyncClient(timeout=http_timeout) as client: - async with tg_sem: - try: - res = await client.post( - f"{conn.restppUrl}/graph/{conn.graphname}", data=data, headers=headers - ) - except Exception as e: - err = traceback.format_exc() - logger.error(f"Upsert err:\n{err}") - res.raise_for_status() + async with tg_sem: + try: + res = await conn.upsertData(data) + logger.info(f"Upsert res: {res}") + except Exception as e: + err = traceback.format_exc() + logger.error(f"Upsert err:\n{err}") + return {"error": True, "message": str(e)} async def check_vertex_exists(conn, v_id: str): - headers = make_headers(conn) - async with httpx.AsyncClient(timeout=http_timeout) as client: - async with tg_sem: - try: - res = await client.get( - f"{conn.restppUrl}/graph/{conn.graphname}/vertices/Entity/{v_id}", - headers=headers, - ) - - except Exception as e: - err = traceback.format_exc() - logger.error(f"Check err:\n{err}") + async with tg_sem: try: - res.raise_for_status() - return res.json() + res = await conn.getVerticesById("Entity", v_id) + except Exception as e: - logger.error(f"Check err:\n{e}\n{e}") - return {"error": True, "message": e} + err = traceback.format_exc() + logger.error(f"Check err:\n{err}") + return {"error": True, "message": str(e)} + + return {"error": False, "resp": res} async def upsert_edge( @@ -269,24 +254,18 @@ async def upsert_edge( async def get_commuinty_children(conn, i: int, c: str): - headers = make_headers(conn) - async with httpx.AsyncClient(timeout=None) as client: - async with tg_sem: - try: - resp = await client.get( - f"{conn.restppUrl}/query/{conn.graphname}/get_community_children", - params={"comm": c, "iter": i}, - headers=headers, - ) - except: - logger.error(f"Get Children err:\n{traceback.format_exc()}") + async with tg_sem: try: - resp.raise_for_status() - except Exception as e: - logger.error(f"Get Children err:\n{e}") + resp = await conn.runInstalledQuery( + "get_community_children", + params={"comm": c, "iter": i} + ) + except: + logger.error(f"Get Children err:\n{traceback.format_exc()}") + descrs = [] try: - res = resp.json()["results"][0]["children"] + res = resp[0]["children"] except Exception as e: logger.error(f"Get Children err:\n{e}") res = [] @@ -307,59 +286,41 @@ async def get_commuinty_children(conn, i: int, c: str): async def check_all_ents_resolved(conn): - headers = make_headers(conn) - async with httpx.AsyncClient(timeout=None) as client: + try: async with tg_sem: - resp = await client.get( - f"{conn.restppUrl}/query/{conn.graphname}/entities_have_resolution", - headers=headers, + resp = await conn.runInstalledQuery( + "entities_have_resolution" ) - try: - resp.raise_for_status() - except Exception as e: - logger.error(f"Check Vert Desc err:\n{e}") + except Exception as e: + logger.error(f"Check Vert Desc err:\n{e}") - res = resp.json()["results"][0]["all_resolved"] - logger.info(resp.json()["results"]) + res = resp[0]["all_resolved"] + logger.info(resp) return res async def add_rels_between_types(conn): - headers = make_headers(conn) - async with httpx.AsyncClient(timeout=None) as client: + try: async with tg_sem: - resp = await client.get( - f"{conn.restppUrl}/query/{conn.graphname}/create_entity_type_relationships", - headers=headers, + resp = await conn.runInstalledQuery( + "create_entity_type_relationships" ) - try: - resp.raise_for_status() - except Exception as e: - logger.error(f"Check Vert EntityType err:\n{e}") - - if resp.status_code != 200: - logger.error(f"Check Vert EntityType err:\n{resp.text}") - return {"error": True, "message": resp.text} - else: - res = resp.json() - logger.info(resp.json()) - return res + except Exception as e: + logger.error(f"Check Vert EntityType err:\n{e}") + return {"error": True, "message": e} + return resp[0] async def check_vertex_has_desc(conn, i: int): - headers = make_headers(conn) - async with httpx.AsyncClient(timeout=None) as client: + try: async with tg_sem: - resp = await client.get( - f"{conn.restppUrl}/query/{conn.graphname}/communities_have_desc", + resp = await conn.runInstalledQuery( + "communities_have_desc", params={"iter": i}, - headers=headers, ) - try: - resp.raise_for_status() - except Exception as e: - logger.error(f"Check Vert Desc err:\n{e}") + except Exception as e: + logger.error(f"Check Vert Desc err:\n{e}") - res = resp.json()["results"][0]["all_have_desc"] - logger.info(resp.json()["results"]) + res = resp[0]["all_have_desc"] + logger.info(res) return res diff --git a/eventual-consistency-service/app/graphrag/workers.py b/eventual-consistency-service/app/graphrag/workers.py index 4fafa6d9..1052cde8 100644 --- a/eventual-consistency-service/app/graphrag/workers.py +++ b/eventual-consistency-service/app/graphrag/workers.py @@ -173,7 +173,7 @@ async def get_vert_desc(conn, v_id, node: Node): # if vertex exists, get description content and append this description to it if not exists.get("error", False): # deduplicate descriptions - desc.extend(exists["results"][0]["attributes"]["description"]) + desc.extend(exists["resp"][0]["attributes"]["description"]) desc = list(set(desc)) return desc From 7babc269d5e2a904daf34b882595365da56aebc0 Mon Sep 17 00:00:00 2001 From: Parker Erickson Date: Wed, 25 Dec 2024 11:39:56 -0600 Subject: [PATCH 5/8] update supportai --- .../app/supportai/util.py | 112 ++++++------------ 1 file changed, 39 insertions(+), 73 deletions(-) diff --git a/eventual-consistency-service/app/supportai/util.py b/eventual-consistency-service/app/supportai/util.py index b6cdc948..202c1ff4 100644 --- a/eventual-consistency-service/app/supportai/util.py +++ b/eventual-consistency-service/app/supportai/util.py @@ -142,20 +142,18 @@ async def stream_ids( headers = make_headers(conn) try: - async with httpx.AsyncClient(timeout=http_timeout) as client: - async with tg_sem: - res = await client.post( - f"{conn.restppUrl}/query/{conn.graphname}/StreamIds", - params={ - "current_batch": current_batch, - "ttl_batches": ttl_batches, - "v_type": v_type, - }, - headers=headers, - ) - ids = res.json()["results"][0]["@@ids"] + async with tg_sem: + res = await conn.runInstalledQuery( + "StreamIds", + params={ + "current_batch": current_batch, + "ttl_batches": ttl_batches, + "v_type": v_type, + } + ) + ids = res[0]["@@ids"] return {"error": False, "ids": ids} - + except Exception as e: exc = traceback.format_exc() LogWriter.error(f"/{conn.graphname}/query/StreamIds\nException Trace:\n{exc}") @@ -201,50 +199,28 @@ async def upsert_vertex( attrs = map_attrs(attributes) data = json.dumps({"vertices": {vertex_type: {vertex_id: attrs}}}) headers = make_headers(conn) - async with httpx.AsyncClient(timeout=http_timeout) as client: - async with tg_sem: - try: - res = await client.post( - f"{conn.restppUrl}/graph/{conn.graphname}", data=data, headers=headers - ) - - res.raise_for_status() - except httpx.RequestError as exc: - logger.error(f"An error occurred while requesting {exc.request.url!r}.") - logger.error(f"Request body: {data}") - logger.error(f"Details: {exc}") - # Check if the exception has a response attribute - if hasattr(exc, 'response') and exc.response is not None: - logger.error(f"Response content: {exc.response.content}") - except httpx.HTTPStatusError as exc: - logger.error(f"Error response {exc.response.status_code} while requesting {exc.request.url!r}.") - logger.error(f"Response content: {exc.response.content}") - logger.error(f"Request body: {data}") + async with tg_sem: + try: + res = await conn.upsertData(data) + + logger.info(f"Upsert res: {res}") + except Exception as e: + err = traceback.format_exc() + logger.error(f"Upsert err:\n{err}") + return {"error": True, "message": str(e)} async def check_vertex_exists(conn, v_id: str): - headers = make_headers(conn) - async with httpx.AsyncClient(timeout=http_timeout) as client: - async with tg_sem: - try: - res = await client.get( - f"{conn.restppUrl}/graph/{conn.graphname}/vertices/Entity/{v_id}", - headers=headers, - ) - - res.raise_for_status() - return res.json() - except httpx.RequestError as exc: - logger.error(f"An error occurred while requesting {exc.request.url!r}.") - logger.error(f"Details: {exc}") - # Check if the exception has a response attribute - if hasattr(exc, 'response') and exc.response is not None: - logger.error(f"Response content: {exc.response.content}") - return {"error": "Request failed"} - except httpx.HTTPStatusError as exc: - logger.error(f"Error response {exc.response.status_code} while requesting {exc.request.url!r}.") - logger.error(f"Response content: {exc.response.content}") - return {"error": f"HTTP status error {exc.response.status_code}"} + async with tg_sem: + try: + res = await conn.getVerticesById("Entity", v_id) + + except Exception as e: + err = traceback.format_exc() + logger.error(f"Check err:\n{err}") + return {"error": True, "message": str(e)} + + return {"error": False, "resp": res} @@ -278,22 +254,12 @@ async def upsert_edge( } } ) - headers = make_headers(conn) - async with httpx.AsyncClient(timeout=http_timeout) as client: - async with tg_sem: - try: - res = await client.post( - f"{conn.restppUrl}/graph/{conn.graphname}", data=data, headers=headers - ) - res.raise_for_status() - except httpx.RequestError as exc: - logger.error(f"An error occurred while requesting {exc.request.url!r}.") - logger.error(f"Request body: {data}") - logger.error(f"Details: {exc}") - # Check if the exception has a response attribute - if hasattr(exc, 'response') and exc.response is not None: - logger.error(f"Response content: {exc.response.content}") - except httpx.HTTPStatusError as exc: - logger.error(f"Error response {exc.response.status_code} while requesting {exc.request.url!r}.") - logger.error(f"Response content: {exc.response.content}") - logger.error(f"Request body: {data}") + async with tg_sem: + try: + res = await conn.upsertData(data) + + logger.info(f"Upsert res: {res}") + except Exception as e: + err = traceback.format_exc() + logger.error(f"Upsert err:\n{err}") + return {"error": True, "message": str(e)} From 162de49c49f77ab0209df6226b5af642dffb6291 Mon Sep 17 00:00:00 2001 From: Parker Erickson Date: Wed, 25 Dec 2024 11:41:04 -0600 Subject: [PATCH 6/8] update supportai --- .../app/supportai/workers.py | 18 ++++++------------ 1 file changed, 6 insertions(+), 12 deletions(-) diff --git a/eventual-consistency-service/app/supportai/workers.py b/eventual-consistency-service/app/supportai/workers.py index 828655e8..e02ca5ad 100644 --- a/eventual-consistency-service/app/supportai/workers.py +++ b/eventual-consistency-service/app/supportai/workers.py @@ -34,19 +34,12 @@ async def install_query( USE GRAPH {conn.graphname} {query} INSTALL QUERY {query_name}""" - tkn = base64.b64encode(f"{conn.username}:{conn.password}".encode()).decode() - headers = {"Authorization": f"Basic {tkn}"} - - async with httpx.AsyncClient(timeout=None) as client: - async with util.tg_sem: - res = await client.post( - conn.gsUrl + "/gsqlserver/gsql/file", - data=quote_plus(query.encode("utf-8")), - headers=headers, - ) - if "error" in res.text.lower(): - LogWriter.error(res.text) + async with util.tg_sem: + res = await conn.gsql(query) + + if "error" in res: + LogWriter.error(res) return { "result": None, "error": True, @@ -56,6 +49,7 @@ async def install_query( return {"result": res, "error": False} + async def chunk_doc( conn: TigerGraphConnection, doc: dict[str, str], From f54094dd4c93f88e9f03e53de0223adb6329f978 Mon Sep 17 00:00:00 2001 From: Parker Erickson Date: Wed, 25 Dec 2024 11:58:33 -0600 Subject: [PATCH 7/8] supportai pytg async --- .../app/supportai/supportai_init.py | 9 +++------ eventual-consistency-service/app/supportai/util.py | 2 +- 2 files changed, 4 insertions(+), 7 deletions(-) diff --git a/eventual-consistency-service/app/supportai/supportai_init.py b/eventual-consistency-service/app/supportai/supportai_init.py index 287d367b..c542cc90 100644 --- a/eventual-consistency-service/app/supportai/supportai_init.py +++ b/eventual-consistency-service/app/supportai/supportai_init.py @@ -43,15 +43,12 @@ async def stream_docs( for d in doc_ids["ids"]: try: async with tg_sem: - res = await client.get( - f"{conn.restppUrl}/query/{conn.graphname}/StreamDocContent/", + res = await conn.runInstalledQuery( + "StreamDocContent", params={"doc": d}, - headers=headers, ) - if res.status_code != 200: - continue logger.info("stream_docs writes to docs") - await docs_chan.put(res.json()["results"][0]["DocContent"][0]) + await docs_chan.put(res[0]["DocContent"][0]) except Exception as e: exc = traceback.format_exc() logger.error(f"Error retrieveing doc: {d} --> {e}\n{exc}") diff --git a/eventual-consistency-service/app/supportai/util.py b/eventual-consistency-service/app/supportai/util.py index 202c1ff4..ba415014 100644 --- a/eventual-consistency-service/app/supportai/util.py +++ b/eventual-consistency-service/app/supportai/util.py @@ -33,7 +33,7 @@ async def install_queries( conn: TigerGraphConnection, ): # queries that are currently installed - installed_queries = [q.split("/")[-1] for q in conn.getEndpoints(dynamic=True)] + installed_queries = [q.split("/")[-1] for q in await conn.getEndpoints(dynamic=True)] # doesn't need to be parallel since tg only does it one at a time for q in requried_queries: From d6f9810cbb8e99d78c91f972a75441f4ea3ac0ff Mon Sep 17 00:00:00 2001 From: Parker Erickson Date: Wed, 25 Dec 2024 12:06:20 -0600 Subject: [PATCH 8/8] fix tgproxy --- common/metrics/tg_proxy.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/common/metrics/tg_proxy.py b/common/metrics/tg_proxy.py index 5bfcb494..d6291c6f 100644 --- a/common/metrics/tg_proxy.py +++ b/common/metrics/tg_proxy.py @@ -33,7 +33,7 @@ def hooked(*args, **kwargs): else: return original_attr - def _req(self, method: str, url: str, authMode: str, *args, **kwargs): + def _req(self, method: str, url: str, authMode: str = "token", *args, **kwargs): # we always use token auth # always use proxy endpoint in GUI for restpp and gsql if self.auth_mode == "pwd":