Skip to content

Commit

Permalink
feat(project): improve huge knowledge base (#386)
Browse files Browse the repository at this point in the history
* feat(primitive): support batching build feature store

* feat(primitive): support TPM limitter

* feat(end2end/main.py): add precision evaluation
  • Loading branch information
tpoisonooo authored Sep 10, 2024
1 parent 97d3ceb commit ff48875
Show file tree
Hide file tree
Showing 22 changed files with 347 additions and 120 deletions.
23 changes: 3 additions & 20 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
models/
repodir/
workdir/
write_toml.py
modeling_internlm2.py
Expand All @@ -9,7 +7,6 @@ logs/
logs/work.txt
server.log
**/__pycache__
pk/
badcase.txt
config.bak
config.ini
Expand All @@ -28,33 +25,19 @@ nohup.out
start-web.sh
web/proxy/config-template.ini
web/env.sh
sft-data
config-alignment.ini
logs/work.txt
web/tools/query.jsonl
query.jsonl
web/tools/groups/
tests/history_recv_send.txt
web/tools/chat.txt
web/tools/filter.jsonl
unittest/token.json
config.test
wkteam/
config-wechat.ini
sft/groups/
evaluation/queries/
candidates/
evaluation/candidates.zip
evaluation/feature_stores.zip
evaluation/query.log
bceodir/
odir/
repodir-full/
web.log
workdir-full/
evaluation/rejection/gt_bad.txt
evaluation/rejection/gt_good.txt
workdir832/
workdir.bak/
workdir-20240729-kg-included/
bm25.pkl
repodir/
repodir.full/
workdir.full/
1 change: 1 addition & 0 deletions config-cpu.ini
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ reranker_model_path = "https://api.siliconflow.cn/v1/rerank"
# if using the `siliconcloud` API as `embedding_model_path` or `reranker_model_path`, provide the API token
api_token = ""
api_rpm = 800
api_tpm = 40000
work_dir = "workdir"

[web_search]
Expand Down
1 change: 1 addition & 0 deletions config.ini
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ reranker_model_path = "maidalun1020/bce-reranker-base_v1"
# if using the `siliconcloud` API as `embedding_model_path` or `reranker_model_path`, provide the API token
api_token = ""
api_rpm = 1000
api_tpm = 40000
work_dir = "workdir"

[web_search]
Expand Down
79 changes: 79 additions & 0 deletions evaluation/end2end/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
from huixiangdou.service import ParallelPipeline, start_llm_server
from huixiangdou.primitive import Query
import json
import asyncio
import pdb
from typing import List
from rouge import Rouge
from loguru import logger

# Pipeline under evaluation.
# NOTE(review): work_dir / config_path are machine-specific (hard-coded to
# /home/khj) — adjust or externalize before running on another host.
assistant = ParallelPipeline(work_dir='/home/khj/hxd-ci/workdir', config_path='/home/khj/hxd-ci/config.ini')

def format_refs(refs: List[str]) -> str:
    """Render a deduplicated reference list as a markdown bullet block.

    Args:
        refs: file paths or URLs; duplicates are dropped, first-seen
            order is kept.

    Returns:
        A ``**References:**`` block terminated by a blank line, or an
        empty string when ``refs`` is empty.
    """
    if not refs:
        return ''
    # dict.fromkeys dedups while preserving insertion order; a plain
    # set() would make the rendered order nondeterministic across runs.
    unique_refs = list(dict.fromkeys(refs))
    lines = ['**References:**\r\n']
    for file_or_url in unique_refs:
        lines.append('* {}\r\n'.format(file_or_url))
    lines.append('\r\n')
    return ''.join(lines)

async def run(query_text: str):
    """Drive the pipeline for one query.

    Streams the pipeline output, concatenating every non-empty delta into
    the final answer and keeping the references reported by the first
    session object.

    Returns:
        (answer_text, references) tuple.
    """
    parts = []
    references = None
    async for sess in assistant.generate(query=Query(query_text),
                                         enable_web_search=False):
        if sess.delta:
            parts.append(sess.delta)
        if references is None:
            references = sess.references
    return ''.join(parts), references

# Ground truths and model answers accumulated across the whole run.
gts = []
dts = []

output_filepath = 'out.jsonl'

# Resume support: collect queries already answered by a previous run.
# Records were dumped with indent=2, so each record ends with a lone '}'
# line — that is what the '}\n' sentinel detects.
finished_query = set()
try:
    with open(output_filepath, encoding='utf-8') as fin:
        json_str = ""
        for line in fin:
            json_str += line

            if '}\n' == line:
                print(json_str)
                json_obj = json.loads(json_str)
                finished_query.add(json_obj['query'].strip())
                json_str = ""
except FileNotFoundError:
    # First run: no previous output to resume from.
    pass

# Hoisted out of the loop: one scorer and one event loop for the whole run.
rouge = Rouge()
loop = asyncio.get_event_loop()

with open('evaluation/end2end/qa.jsonl', encoding='utf-8') as fin:
    for json_str in fin:
        json_obj = json.loads(json_str)
        query = json_obj['query'].strip()
        if query in finished_query:
            continue

        gt = json_obj['resp']
        gts.append(gt)

        dt, refs = loop.run_until_complete(run(query_text=query))
        dts.append(dt)

        # Embedding distance between ground truth and answer; .tolist()
        # converts the numpy scalar so json.dumps can serialize it.
        distance = assistant.retriever.embedder.distance(text1=gt,
                                                         text2=dt).tolist()

        scores = rouge.get_scores(gt, dt)
        json_obj['distance'] = distance
        json_obj['rouge_scores'] = scores
        json_obj['dt'] = dt
        json_obj['dt_refs'] = refs

        out_json_str = json.dumps(json_obj, ensure_ascii=False, indent=2)
        logger.info(out_json_str)

        # Append immediately so an interrupted run can be resumed.
        with open(output_filepath, 'a', encoding='utf-8') as fout:
            fout.write(out_json_str)
            fout.write('\n')
3 changes: 1 addition & 2 deletions evaluation/rerank/step1_create_candidates.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,7 @@ def process(param: tuple):
json_str = json.dumps({
'query': query,
'candidates': candidates
},
ensure_ascii=False)
}, ensure_ascii=False)

with open(os.path.join('candidates', fsid + '.jsonl'), 'a') as f:
f.write(json_str)
Expand Down
2 changes: 1 addition & 1 deletion huixiangdou/primitive/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,5 @@
MarkdownTextRefSplitter,
RecursiveCharacterTextSplitter,
nested_split_markdown, split_python_code)
from .rpm import RPM
from .limitter import RPM, TPM
from .bm250kapi import BM25Okapi
45 changes: 42 additions & 3 deletions huixiangdou/primitive/embedder.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@
import numpy as np
from loguru import logger
from .query import DistanceStrategy
from .rpm import RPM
from .limitter import RPM, TPM
from .chunk import Chunk

class Embedder:
"""Wrap text2vec (multimodal) model."""
Expand Down Expand Up @@ -43,10 +44,13 @@ def __init__(self, model_config: dict):

if 'Bearer' not in api_token:
api_token = 'Bearer ' + api_token
api_rpm = max(1, int(model_config['api_rpm']))
api_rpm = max(1000, int(model_config['api_rpm']))
api_tpm = max(40000, int(model_config['api_tpm']))

self.client = {
'api_token': api_token,
'api_rpm': RPM(api_rpm)
'api_rpm': RPM(api_rpm),
'api_tpm': TPM(api_tpm)
}

else:
Expand Down Expand Up @@ -74,6 +78,15 @@ def token_length(self, text: str) -> int:
else:
return len(text) // 2

def distance(self, text1: str, text2: str) -> float:
    """Embed two texts and return the distance between their features.

    Both texts are embedded first (two backend calls), then compared.
    Only the Euclidean strategy is supported; any other configured
    strategy raises ValueError.
    """
    feature_a = self.embed_query(text=text1)
    feature_b = self.embed_query(text=text2)

    if self.distance_strategy == DistanceStrategy.EUCLIDEAN_DISTANCE:
        return np.linalg.norm(feature_a - feature_b)
    raise ValueError('Unsupported distance strategy')

def embed_query(self, text: str = None, path: str = None) -> np.ndarray:
"""Embed input text or image as feature, output np.ndarray with np.float32"""
if 'bge' in self._type:
Expand All @@ -91,6 +104,7 @@ def embed_query(self, text: str = None, path: str = None) -> np.ndarray:
return emb
else:
self.client['api_rpm'].wait(silent=True)
self.client['api_tpm'].wait(silent=True, token_count=len(text))

# siliconcloud bce API
if text is None:
Expand All @@ -115,3 +129,28 @@ def embed_query(self, text: str = None, path: str = None) -> np.ndarray:
emb_list = json_obj['data'][0]['embedding']
emb = np.array(emb_list).astype(np.float32).reshape(1, -1)
return emb

def embed_query_batch_text(self, chunks: List[Chunk] = None) -> np.ndarray:
    """Embed a batch of text chunks into a float32 feature matrix.

    Args:
        chunks: text chunks to embed. ``None`` is treated as an empty
            batch — the previous ``[]`` default was a shared mutable
            default argument.

    Returns:
        np.ndarray of shape (len(chunks), dim), dtype float32.
    """
    if chunks is None:
        chunks = []
    if 'bge' in self._type:
        import torch
        with torch.no_grad():
            features = []
            for chunk in chunks:
                feature = self.client.encode(text=chunk.content_or_path)
                features.append(feature.cpu().numpy())
            return np.concatenate(features).reshape(len(chunks), -1).astype(np.float32)

    elif 'bce' in self._type:
        # The bce client accepts the whole batch in a single call.
        texts = [chunk.content_or_path for chunk in chunks]
        emb = self.client.encode(texts, show_progress_bar=False, normalize_embeddings=True)
        return emb.astype(np.float32)

    else:
        # Remote API backend: embed one chunk at a time (rate-limited
        # inside embed_query), then stack.
        features = [self.embed_query(text=chunk.content_or_path) for chunk in chunks]
        return np.concatenate(features).reshape(len(chunks), -1).astype(np.float32)
103 changes: 79 additions & 24 deletions huixiangdou/primitive/faiss.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

from .embedder import Embedder
from .query import Query, DistanceStrategy
from .chunk import Chunk


# heavily modified from langchain
Expand Down Expand Up @@ -118,6 +119,20 @@ def similarity_search_with_query(self,
logger.info('highest score {}, threshold {}'.format(highest_score, threshold))
return ret

@classmethod
def split_by_batchsize(cls, chunks: List[Chunk] = None, batchsize: int = 4):
    """Partition chunks into fixed-size batches, split by modality.

    Args:
        chunks: chunks to partition; ``None`` is treated as an empty
            list — the previous ``[]`` default was a shared mutable
            default argument. (First parameter renamed ``self`` ->
            ``cls``: this is a classmethod.)
        batchsize: maximum number of chunks per batch.

    Returns:
        (block_text, block_image): two lists of batches, each batch a
        list of at most ``batchsize`` chunks of that modality.
    """
    if chunks is None:
        chunks = []
    texts = [c for c in chunks if c.modal == 'text']
    images = [c for c in chunks if c.modal == 'image']

    block_text = [
        texts[i:i + batchsize] for i in range(0, len(texts), batchsize)
    ]
    block_image = [
        images[i:i + batchsize] for i in range(0, len(images), batchsize)
    ]
    return block_text, block_image

@classmethod
def save_local(self, folder_path: str, chunks: List[Chunk],
embedder: Embedder) -> None:
Expand All @@ -131,32 +146,72 @@ def save_local(self, folder_path: str, chunks: List[Chunk],

faiss = dependable_faiss_import()
index = None
batchsize = 1

for chunk in tqdm(chunks):
np_feature = None
try:
if chunk.modal == 'text':
np_feature = embedder.embed_query(text=chunk.content_or_path)
elif chunk.modal == 'image':
try:
batchsize_str = os.getenv('HUIXIANGDOU_BATCHSIZE')
if batchsize_str is None:
logger.info('`export HUIXIANGDOU_BATCHSIZE=64` for faster feature building.')
else:
batchsize = int(batchsize_str)
except Exception as e:
logger.error(str(e))
batchsize = 1

if batchsize == 1:
for chunk in tqdm(chunks, 'chunks'):
np_feature = None
try:
if chunk.modal == 'text':
np_feature = embedder.embed_query(text=chunk.content_or_path)
elif chunk.modal == 'image':
np_feature = embedder.embed_query(path=chunk.content_or_path)
else:
raise ValueError(f'Unimplement chunk type: {chunk.modal}')
except Exception as e:
logger.error('{}'.format(e))

if np_feature is None:
logger.error('np_feature is None')
continue

if index is None:
dimension = np_feature.shape[-1]

if embedder.distance_strategy == DistanceStrategy.EUCLIDEAN_DISTANCE:
index = faiss.IndexFlatL2(dimension)
elif embedder.distance_strategy == DistanceStrategy.MAX_INNER_PRODUCT:
index = faiss.IndexFlatIP(dimension)
index.add(np_feature)
else:
# batching
block_text, block_image = self.split_by_batchsize(chunks=chunks, batchsize=batchsize)
for subchunks in tqdm(block_text, 'build_text'):
np_features = embedder.embed_query_batch_text(chunks=subchunks)
if index is None:
dimension = np_features[0].shape[-1]

if embedder.distance_strategy == DistanceStrategy.EUCLIDEAN_DISTANCE:
index = faiss.IndexFlatL2(dimension)
elif embedder.distance_strategy == DistanceStrategy.MAX_INNER_PRODUCT:
index = faiss.IndexFlatIP(dimension)
index.add(np_features)

for subchunks in tqdm(block_image, 'build_image'):
for chunk in subchunks:
np_feature = embedder.embed_query(path=chunk.content_or_path)
else:
raise ValueError(f'Unimplement chunk type: {chunk.modal}')
except Exception as e:
logger.error('{}'.format(e))

if np_feature is None:
logger.error('np_feature is None')
continue

if index is None:
dimension = np_feature.shape[-1]

if embedder.distance_strategy == DistanceStrategy.EUCLIDEAN_DISTANCE:
index = faiss.IndexFlatL2(dimension)
elif embedder.distance_strategy == DistanceStrategy.MAX_INNER_PRODUCT:
index = faiss.IndexFlatIP(dimension)

index.add(np_feature)
if np_feature is None:
logger.error('np_feature is None')
continue

if index is None:
dimension = np_feature.shape[-1]

if embedder.distance_strategy == DistanceStrategy.EUCLIDEAN_DISTANCE:
index = faiss.IndexFlatL2(dimension)
elif embedder.distance_strategy == DistanceStrategy.MAX_INNER_PRODUCT:
index = faiss.IndexFlatIP(dimension)
index.add(np_feature)

path = Path(folder_path)
path.mkdir(exist_ok=True, parents=True)
Expand Down
Loading

0 comments on commit ff48875

Please sign in to comment.