Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(graphrag): add type information upsert #271

Merged
merged 7 commits into from
Sep 5, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions common/gsql/supportai/SupportAI_Schema.gsql
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ CREATE SCHEMA_CHANGE JOB add_supportai_schema {
ADD VERTEX Relationship(PRIMARY_ID id STRING, definition STRING, short_name STRING, epoch_added UINT, epoch_processing UINT, epoch_processed UINT) WITH STATS="OUTDEGREE_BY_EDGETYPE", PRIMARY_ID_AS_ATTRIBUTE="true";
ADD VERTEX DocumentCollection(PRIMARY_ID id STRING, epoch_added UINT) WITH STATS="OUTDEGREE_BY_EDGETYPE", PRIMARY_ID_AS_ATTRIBUTE="true";
ADD VERTEX Content(PRIMARY_ID id STRING, text STRING, epoch_added UINT) WITH STATS="OUTDEGREE_BY_EDGETYPE", PRIMARY_ID_AS_ATTRIBUTE="true";
ADD VERTEX EntityType(PRIMARY_ID id STRING, description STRING, epoch_added UINT) WITH STATS="OUTDEGREE_BY_EDGETYPE", PRIMARY_ID_AS_ATTRIBUTE="true";
ADD DIRECTED EDGE HAS_CONTENT(FROM Document, TO Content|FROM DocumentChunk, TO Content) WITH REVERSE_EDGE="reverse_HAS_CONTENT";
ADD DIRECTED EDGE IS_CHILD_OF(FROM Concept, TO Concept) WITH REVERSE_EDGE="reverse_IS_CHILD_OF";
ADD DIRECTED EDGE IS_HEAD_OF(FROM Entity, TO Relationship) WITH REVERSE_EDGE="reverse_IS_HEAD_OF";
Expand All @@ -18,6 +19,8 @@ CREATE SCHEMA_CHANGE JOB add_supportai_schema {
ADD DIRECTED EDGE HAS_CHILD(FROM Document, TO DocumentChunk) WITH REVERSE_EDGE="reverse_HAS_CHILD";
ADD DIRECTED EDGE HAS_RELATIONSHIP(FROM Concept, TO Concept, relation_type STRING) WITH REVERSE_EDGE="reverse_HAS_RELATIONSHIP";
ADD DIRECTED EDGE CONTAINS_DOCUMENT(FROM DocumentCollection, TO Document) WITH REVERSE_EDGE="reverse_CONTAINS_DOCUMENT";
ADD DIRECTED EDGE ENTITY_HAS_TYPE(FROM Entity, TO EntityType) WITH REVERSE_EDGE="reverse_ENTITY_HAS_TYPE";
ADD DIRECTED EDGE RELATIONSHIP_TYPE(FROM EntityType, TO EntityType, DISCRIMINATOR(relation_type STRING), frequency INT) WITH REVERSE_EDGE="reverse_RELATIONSHIP_TYPE";

// GraphRAG
ADD VERTEX Community (PRIMARY_ID id STRING, iteration UINT, description STRING) WITH PRIMARY_ID_AS_ATTRIBUTE="true";
Expand Down
20 changes: 20 additions & 0 deletions common/gsql/supportai/create_entity_type_relationships.gsql
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
CREATE DISTRIBUTED QUERY create_entity_type_relationships(/* Parameters here */) SYNTAX v2{
// Purpose: derive type-level RELATIONSHIP_TYPE edges between EntityType
// vertices from the entity-level RELATIONSHIP edges, and print how many
// INSERT statements were executed as "relationships_inserted".
// The query takes no parameters (the parameter list only holds a placeholder comment).

// Per-Entity accumulator: target entity type id -> (relation type -> count).
MapAccum<STRING, MapAccum<STRING, SumAccum<INT>>> @rel_type_count; // entity type, relationship type for entity type, frequency
// Global counter, incremented once per INSERT executed in the second SELECT.
SumAccum<INT> @@rels_inserted;
// Seed set: every Entity vertex.
ents = {Entity.*};
// Pass 1: for each Entity e, follow outgoing RELATIONSHIP edges to e2 and then
// e2's ENTITY_HAS_TYPE edge to its EntityType et. Edges whose relation_type is
// "DOC_CHUNK_COOCCURRENCE" are skipped. The count is stored on the SOURCE
// entity e, keyed by the TARGET entity's type id and the relation type.
accum_types = SELECT et FROM ents:e -(RELATIONSHIP>:r)- Entity:e2 -(ENTITY_HAS_TYPE>:eht)- EntityType:et
WHERE r.relation_type != "DOC_CHUNK_COOCCURRENCE"
ACCUM
e.@rel_type_count += (et.id -> (r.relation_type -> 1));

// Pass 2: for each Entity e and its own EntityType et, walk the counts gathered
// in pass 1 and insert one RELATIONSHIP_TYPE edge per (target type, relation type).
// The VALUES tuple is presumably (from id, to id, relation_type discriminator,
// frequency), matching the RELATIONSHIP_TYPE edge definition -- TODO confirm.
// NOTE(review): freq is the count held on a single entity e. Several entities
// sharing the same type pair and relation type each issue their own INSERT with
// their own count; whether those are summed or the last one wins is not evident
// from this query -- confirm the resulting frequency is what is intended.
// NOTE(review): the outer loop variable rel_type_freq is never used; the inner
// loop looks the same map up again via e.@rel_type_count.get(entity_type).
ets = SELECT et FROM ents:e -(ENTITY_HAS_TYPE>:eht)- EntityType:et
ACCUM
FOREACH (entity_type, rel_type_freq) IN e.@rel_type_count DO
FOREACH (rel_type, freq) IN e.@rel_type_count.get(entity_type) DO
INSERT INTO RELATIONSHIP_TYPE VALUES (et.id, entity_type, rel_type, freq),
@@rels_inserted += 1
END
END;

// Reports the number of INSERT statements executed; this may exceed the number
// of distinct edges created if the same edge is inserted more than once.
PRINT @@rels_inserted as relationships_inserted;
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,13 @@ async def summarize(self, name: str, text: list[str]) -> CommunitySummary:

# remove iteration tags from name
name = id_pat.sub("", name)
summary = await chain.ainvoke(
{
"entity_name": name,
"description_list": text,
}
)
try:
summary = await chain.ainvoke(
{
"entity_name": name,
"description_list": text,
}
)
except Exception as e:
return {"error": True, "summary": ""}
return summary.summary
parkererickson-tg marked this conversation as resolved.
Show resolved Hide resolved
8 changes: 8 additions & 0 deletions eventual-consistency-service/app/graphrag/graph_rag.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
stream_ids,
tg_sem,
upsert_batch,
add_rels_between_types
)
from pyTigerGraph import TigerGraphConnection

Expand Down Expand Up @@ -462,6 +463,12 @@ async def run(graphname: str, conn: TigerGraphConnection):
init_end = time.perf_counter()
logger.info("Doc Processing End")

# Type Resolution
type_start = time.perf_counter()
logger.info("Type Processing Start")
await add_rels_between_types(conn)
logger.info("Type Processing End")
type_end = time.perf_counter()
# Entity Resolution
entity_start = time.perf_counter()

Expand Down Expand Up @@ -516,6 +523,7 @@ async def run(graphname: str, conn: TigerGraphConnection):
end = time.perf_counter()
logger.info(f"DONE. graphrag system initializer dT: {init_end-init_start}")
logger.info(f"DONE. graphrag entity resolution dT: {entity_end-entity_start}")
logger.info(f"DONE. graphrag type creation dT: {type_end-type_start}")
logger.info(
f"DONE. graphrag community initializer dT: {community_end-community_start}"
)
Expand Down
33 changes: 27 additions & 6 deletions eventual-consistency-service/app/graphrag/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ async def init(
"common/gsql/graphRAG/louvain/graphrag_louvain_communities",
"common/gsql/graphRAG/louvain/modularity",
"common/gsql/graphRAG/louvain/stream_community",
"common/gsql/supportai/create_entity_type_relationships"
]
# add louvain to queries
q = [x.split(".gsql")[0] for x in glob("common/gsql/graphRAG/louvain/*")]
Expand Down Expand Up @@ -206,9 +207,13 @@ async def upsert_batch(conn: TigerGraphConnection, data: str):
headers = make_headers(conn)
async with httpx.AsyncClient(timeout=http_timeout) as client:
async with tg_sem:
res = await client.post(
f"{conn.restppUrl}/graph/{conn.graphname}", data=data, headers=headers
)
try:
res = await client.post(
f"{conn.restppUrl}/graph/{conn.graphname}", data=data, headers=headers
)
except Exception as e:
err = traceback.format_exc()
logger.error(f"Upsert err:\n{err}")
res.raise_for_status()


Expand All @@ -225,14 +230,11 @@ async def check_vertex_exists(conn, v_id: str):
except Exception as e:
err = traceback.format_exc()
logger.error(f"Check err:\n{err}")
return {"error": True}

try:
res.raise_for_status()
return res.json()
except Exception as e:
logger.error(f"Check err:\n{e}\n{res.text}")
return {"error": True}

parkererickson-tg marked this conversation as resolved.
Show resolved Hide resolved

async def upsert_edge(
Expand Down Expand Up @@ -321,6 +323,25 @@ async def check_all_ents_resolved(conn):

return res

async def add_rels_between_types(conn):
    """Run the installed ``create_entity_type_relationships`` query.

    Issues a GET against ``{conn.restppUrl}/query/{conn.graphname}/
    create_entity_type_relationships`` while holding ``tg_sem``.

    Args:
        conn: connection object providing ``restppUrl`` and ``graphname``
            (and whatever ``make_headers`` reads from it).

    Returns:
        The ``relationships_inserted`` value from the first entry of the
        response's ``results`` list, or ``None`` when the response status
        is not 200 (the failure is logged, not raised).
    """
    headers = make_headers(conn)
    # timeout=None disables the client timeout for this request.
    async with httpx.AsyncClient(timeout=None) as client:
        async with tg_sem:
            resp = await client.get(
                f"{conn.restppUrl}/query/{conn.graphname}/create_entity_type_relationships",
                headers=headers,
            )

    # Log the HTTP-level error (status line and URL) without propagating it;
    # callers treat this step as best-effort.
    try:
        resp.raise_for_status()
    except httpx.HTTPStatusError as e:
        logger.error(f"Check Vert EntityType err:\n{e}")

    if resp.status_code != 200:
        logger.error(f"Check Vert EntityType err:\n{resp.text}")
        # Explicit failure result: there is no payload to read a count from.
        return None

    # Parse the body once and reuse it for both the log line and the result.
    results = resp.json()["results"]
    logger.info(results)
    return results[0]["relationships_inserted"]

async def check_vertex_has_desc(conn, i: int):
headers = make_headers(conn)
Expand Down
35 changes: 34 additions & 1 deletion eventual-consistency-service/app/graphrag/workers.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from common.config import milvus_config
from common.embeddings.embedding_services import EmbeddingModel
from common.embeddings.milvus_embedding_store import MilvusEmbeddingStore
from common.extractors.BaseExtractor import BaseExtractor
from common.extractors import BaseExtractor, LLMEntityRelationshipExtractor
from common.logs.logwriter import LogWriter

vertex_field = milvus_config.get("vertex_field", "vertex_id")
Expand Down Expand Up @@ -242,6 +242,39 @@ async def extract(
),
)
)
if isinstance(extractor, LLMEntityRelationshipExtractor):
logger.info("extract writes type vert to upsert")
type_id = util.process_id(node.type)
if len(type_id) == 0:
continue
await upsert_chan.put(
(
util.upsert_vertex, # func to call
(
conn,
"EntityType", # v_type
type_id, # v_id
{ # attrs
"epoch_added": int(time.time()),
},
)
)
)
logger.info("extract writes entity_has_type edge to upsert")
await upsert_chan.put(
(
util.upsert_edge,
(
conn,
"Entity", # src_type
v_id, # src_id
"ENTITY_HAS_TYPE", # edgeType
"EntityType", # tgt_type
type_id, # tgt_id
None, # attributes
),
)
)

# link the entity to the chunk it came from
logger.info("extract writes contains edge to upsert")
Expand Down
Loading