[Paddle-pipelines] remove redundant requirements, upgrade to 0.6.2 (PaddlePaddle#7518)

* Remove redundant requirements

* Upgrade to 0.6.2

* Update txt converter text processing

* Fix CI error

* Recover markdown
w5688414 authored Nov 29, 2023
1 parent be318c5 commit 5eb2eae
Showing 6 changed files with 78 additions and 76 deletions.
2 changes: 1 addition & 1 deletion pipelines/VERSION
@@ -1 +1 @@
0.6.1
0.6.2
1 change: 1 addition & 0 deletions pipelines/examples/semantic-search/README.md
@@ -171,6 +171,7 @@ curl -X POST -k http://localhost:8891/query -H 'Content-Type: application/json'

```bash
pip install streamlit==1.11.1
pip install markdown
# 配置模型服务地址
export API_ENDPOINT=http://127.0.0.1:8891
# 在指定端口 8502 启动 WebUI
99 changes: 65 additions & 34 deletions pipelines/pipelines/document_stores/elasticsearch.py
@@ -88,6 +88,7 @@ def __init__(
chunk_size: int = 500,
thread_count: int = 32,
queue_size: int = 32,
**kwargs,
):
"""
A DocumentStore using Elasticsearch to store and query the documents for our search.
@@ -239,6 +240,8 @@ def __init__(
self.scroll = scroll
self.skip_missing_embeddings: bool = skip_missing_embeddings
self.vector_type = vector_type
self.number_of_shards = kwargs.get("number_of_shards", 1)
self.number_of_replicas = kwargs.get("number_of_replicas", 2)

self.similarity_check(similarity)
if index_type in ["flat", "hnsw"]:
@@ -309,6 +312,8 @@ def _init_elastic_client(
verify_certs=verify_certs,
timeout=timeout,
connection_class=connection_class,
max_retries=5,
retry_on_timeout=True,
)
elif aws4auth:
# aws elasticsearch with IAM
@@ -320,6 +325,8 @@ def _init_elastic_client(
use_ssl=True,
verify_certs=True,
timeout=timeout,
max_retries=5,
retry_on_timeout=True,
)
elif username:
# standard http_auth
@@ -331,6 +338,8 @@ def _init_elastic_client(
verify_certs=verify_certs,
timeout=timeout,
connection_class=connection_class,
max_retries=5,
retry_on_timeout=True,
)
else:
# there is no authentication for this elasticsearch instance
@@ -341,6 +350,8 @@ def _init_elastic_client(
verify_certs=verify_certs,
timeout=timeout,
connection_class=connection_class,
max_retries=5,
retry_on_timeout=True,
)

# Test connection
@@ -451,8 +462,8 @@ def _create_document_index(self, index_name: str, headers: Optional[Dict[str, st
mapping["mappings"]["properties"].update({field: {"type": "text"}})

if self.embedding_field:
mapping["settings"]["number_of_shards"] = 1
mapping["settings"]["number_of_replicas"] = 2
mapping["settings"]["number_of_shards"] = self.number_of_shards
mapping["settings"]["number_of_replicas"] = self.number_of_replicas
mapping["mappings"]["properties"][self.embedding_field] = {
"type": self.vector_type,
"dims": self.embedding_dim,
@@ -488,7 +499,7 @@ def _create_label_index(self, index_name: str, headers: Optional[Dict[str, str]]
# TODO add pipeline_hash and pipeline_name once we migrated the REST API to pipelines
}
},
"settings": {"number_of_shards": 1, "number_of_replicas": 2},
"settings": {"number_of_shards": self.number_of_shards, "number_of_replicas": self.number_of_replicas},
}
try:
self.client.indices.create(index=index_name, body=mapping, headers=headers)
@@ -527,9 +538,15 @@ def get_documents_by_id(
to performance issues. Note that Elasticsearch limits the number of results to 10,000 documents by default.
"""
index = index or self.index
query = {"size": len(ids), "query": {"ids": {"values": ids}}}
result = self.client.search(index=index, body=query, headers=headers)["hits"]["hits"]
documents = [self._convert_es_hit_to_document(hit, return_embedding=self.return_embedding) for hit in result]
documents = []
for i in range(0, len(ids), batch_size):
ids_for_batch = ids[i : i + batch_size]
query = {"size": len(ids_for_batch), "query": {"ids": {"values": ids_for_batch}}}
result = self.client.search(index=index, body=query, request_timeout=600, headers=headers)["hits"]["hits"]
# documents = [self._convert_es_hit_to_document(hit, return_embedding=self.return_embedding) for hit in result]
documents.extend(
[self._convert_es_hit_to_document(hit, return_embedding=self.return_embedding) for hit in result]
)
return documents

def get_metadata_values_by_key(
@@ -659,6 +676,7 @@ def write_documents(
documents=document_objects, index=index, duplicate_documents=duplicate_documents, headers=headers
)
documents_to_index = []

for doc in document_objects:
_doc = {
"_op_type": "index" if duplicate_documents == "overwrite" else "create",
@@ -1203,7 +1221,7 @@ def query(

logger.debug(f"Retriever query: {body}")
logging.getLogger("elasticsearch").setLevel(logging.CRITICAL)
result = self.client.search(index=index, body=body, headers=headers)["hits"]["hits"]
result = self.client.search(index=index, body=body, request_timeout=600, headers=headers)["hits"]["hits"]

documents = [self._convert_es_hit_to_document(hit, return_embedding=self.return_embedding) for hit in result]
return documents
@@ -1303,14 +1321,17 @@ def query_by_embedding(
# +1 in similarity to avoid negative numbers (for cosine sim)
body = {"size": top_k, "query": self._get_vector_similarity_query(query_emb, top_k)}
if filters:
filter_ = {"bool": {"filter": LogicalFilterClause.parse(filters).convert_to_elasticsearch()}}
if body["query"]["script_score"]["query"] == {"match_all": {}}:
body["query"]["script_score"]["query"] = filter_
if self.index_type == "hnsw":
filter_ = {"filter": LogicalFilterClause.parse(filters).convert_to_elasticsearch()}
body["query"]["knn"][self.embedding_field].update(filter_)
else:
body["query"]["script_score"]["query"]["bool"]["filter"]["bool"]["must"].append(filter_)
filter_ = {"bool": {"filter": LogicalFilterClause.parse(filters).convert_to_elasticsearch()}}
if body["query"]["script_score"]["query"] == {"match_all": {}}:
body["query"]["script_score"]["query"] = filter_
else:
body["query"]["script_score"]["query"]["bool"]["filter"]["bool"]["must"].append(filter_)

excluded_meta_data: Optional[list] = None

if self.excluded_meta_data:
excluded_meta_data = deepcopy(self.excluded_meta_data)

@@ -1324,9 +1345,9 @@ def query_by_embedding(
if excluded_meta_data:
body["_source"] = {"excludes": excluded_meta_data}

logger.debug(f"Retriever query: {body}")
# logger.debug(f"Retriever query: {body}")
try:
result = self.client.search(index=index, body=body, request_timeout=300, headers=headers)["hits"]["hits"]
result = self.client.search(index=index, body=body, request_timeout=600, headers=headers)["hits"]["hits"]
if len(result) == 0:
count_embeddings = self.get_embedding_count(index=index, headers=headers)
if count_embeddings == 0:
@@ -2073,7 +2094,7 @@ def _get_embedding_field_mapping(self, similarity: Optional[str]):
# use default parameters
pass
elif self.index_type == "hnsw":
method["parameters"] = {"ef_construction": 80, "m": 64}
method["parameters"] = {"ef_construction": self.ef_construction, "m": self.m}
else:
logger.error("Please set index_type to either 'flat' or 'hnsw'")

@@ -2242,18 +2263,29 @@ def _get_vector_similarity_query(self, query_emb: np.ndarray, top_k: int):
script_score_query = {
"bool": {"filter": {"bool": {"must": [{"exists": {"field": self.embedding_field}}]}}}
}

query = {
"script_score": {
"query": script_score_query,
"script": {
# offset score to ensure a positive range as required by Elasticsearch
"source": "bpack_knn_script",
"lang": "knn",
"params": {"space": self.similarity, "field": "embedding", "vector": query_emb.tolist()},
},
if self.index_type == "hnsw":
query = {
"knn": {
self.embedding_field: {
"vector": query_emb.tolist(),
"k": 16,
"ef": self.ef_construction,
}
}
}
}
else:
query = {
"script_score": {
"query": script_score_query,
"script": {
# offset score to ensure a positive range as required by Elasticsearch
"source": "bpack_knn_script",
"lang": "knn",
"params": {"space": self.similarity, "field": "embedding", "vector": query_emb.tolist()},
},
}
}

return query

def _create_label_index(self, index_name: str, headers: Optional[Dict[str, str]] = None):
@@ -2368,8 +2400,8 @@ def _create_document_index(self, index_name: str, headers: Optional[Dict[str, st
mapping["mappings"]["properties"].update({field: {"type": "text"}})

if self.embedding_field:
mapping["settings"]["number_of_shards"] = 1
mapping["settings"]["number_of_replicas"] = 2
mapping["settings"]["number_of_shards"] = self.number_of_shards
mapping["settings"]["number_of_replicas"] = self.number_of_replicas
if self.index_type == "hnsw":
mapping["mappings"]["properties"][self.embedding_field] = {
"type": self.vector_type,
Expand All @@ -2378,12 +2410,11 @@ def _create_document_index(self, index_name: str, headers: Optional[Dict[str, st
"space_type": self.space_type,
"parameters": {"ef_construction": self.ef_construction, "m": self.m},
}

else:
mapping["mappings"]["properties"][self.embedding_field] = {
"type": self.vector_type,
"dims": self.embedding_dim,
}
else:
mapping["mappings"]["properties"][self.embedding_field] = {
"type": self.vector_type,
"dims": self.embedding_dim,
}

if self.index_type == "hnsw":
mapping["settings"]["index"] = {"knn": True}
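Taken together, the elasticsearch.py changes make the index shard and replica counts configurable through `**kwargs` (previously hard-coded to 1 and 2), add `max_retries=5` and `retry_on_timeout=True` to every Elasticsearch client variant, raise the search `request_timeout` to 600 seconds, batch `get_documents_by_id` lookups, and route `hnsw` indexes through a native `knn` query with the filter attached to the `knn` clause. Below is a minimal usage sketch; the class name, import path, and most constructor arguments are illustrative assumptions — only `number_of_shards`, `number_of_replicas`, and the `flat`/`hnsw` `index_type` values are confirmed by this diff.

```python
# Hypothetical sketch: only number_of_shards / number_of_replicas (and the
# "flat" / "hnsw" index_type values) are confirmed by this commit; the class
# name, import path and remaining keyword arguments are illustrative guesses.
from pipelines.document_stores import ElasticsearchDocumentStore  # assumed export

document_store = ElasticsearchDocumentStore(
    host="localhost",        # assumed connection settings; requires a running ES
    port=9200,
    index="my_documents",
    index_type="hnsw",       # hnsw now issues a native knn query with filters
    number_of_shards=2,      # new in 0.6.2: no longer fixed at 1
    number_of_replicas=1,    # new in 0.6.2: no longer fixed at 2
)
```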
32 changes: 2 additions & 30 deletions pipelines/pipelines/nodes/file_converter/txt.py
@@ -59,36 +59,8 @@ def convert(

with open(file_path, encoding=encoding, errors="ignore") as f:
text = f.read()
pages = text.split("\n\n")

cleaned_pages = []
for page in pages:
lines = page.splitlines()
cleaned_lines = []
for line in lines:
words = line.split()
digits = [word for word in words if any(i.isdigit() for i in word)]

# remove lines having > 40% of words as digits AND not ending with a period(.)
if remove_numeric_tables:
if words and len(digits) / len(words) > 0.4 and not line.strip().endswith("."):
logger.debug(f"Removing line '{line}' from {file_path}")
continue

cleaned_lines.append(line)

page = "\n".join(cleaned_lines)
cleaned_pages.append(page)

if valid_languages:
document_text = "".join(cleaned_pages)
if not self.validate_language(document_text, valid_languages):
logger.warning(
f"The language for {file_path} is not one of {valid_languages}. The file may not have "
f"been decoded in the correct text format."
)
documents = []
for page in cleaned_pages:
document = {"content": page, "content_type": "text", "meta": meta}
documents.append(document)
document = {"content": text, "content_type": "text", "meta": meta}
documents.append(document)
return documents
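With this change the txt converter no longer splits the file on blank lines into "pages", filters numeric-heavy lines via `remove_numeric_tables`, or runs the `valid_languages` check; `convert()` simply wraps the whole file in a single document. A rough behavioural sketch follows, assuming the class defined in this module is `TextConverter` (only the body of `convert()` appears in the diff):

```python
# Rough sketch of the new behaviour; the class name is an assumption and the
# parameter names follow those referenced in the diff.
from pipelines.nodes.file_converter.txt import TextConverter  # assumed name

converter = TextConverter()
# example.txt is any local plain-text file
docs = converter.convert(file_path="example.txt", meta={"name": "example.txt"})
# docs is now a one-element list:
# [{"content": "<entire file text>", "content_type": "text", "meta": {"name": "example.txt"}}]
```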
12 changes: 6 additions & 6 deletions pipelines/pipelines/nodes/retriever/parallel_retriever.py
Expand Up @@ -33,7 +33,6 @@
from pipelines.document_stores import BaseDocumentStore
from pipelines.nodes.retriever.base import BaseRetriever
from pipelines.schema import ContentTypes, Document
from pipelines.utils.common_utils import initialize_device_settings

logger = logging.getLogger(__name__)

@@ -211,10 +210,6 @@ def __init__(
progress_bar=progress_bar,
)

self.devices, _ = initialize_device_settings(use_cuda=use_gpu, multi_gpu=True)
if batch_size < len(self.devices):
logger.warning("Batch size is less than the number of devices. All gpus will not be utilized.")

self.document_store = document_store
self.batch_size = batch_size
self.progress_bar = progress_bar
@@ -223,6 +218,7 @@ def __init__(
self.mode = mode
self.url = url
self.num_process = num_process
self.model_name = kwargs.get("model_name", "m3e")

if document_store is None:
logger.warning("DensePassageRetriever initialized without a document store. ")
@@ -325,7 +321,11 @@ def run_indexing(self, documents: List[dict], **kwargs):
def run_indexing(self, documents: List[dict], **kwargs):
time1 = time.time()
documents_list = embeddings_multi_doc(
documents, batch_size=self.batch_size, num_process=self.num_process, url=self.url
documents,
batch_size=self.batch_size,
num_process=self.num_process,
url=self.url,
model_name=self.model_name,
)
documents = []
for i in documents_list:
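The retriever changes remove the GPU device bookkeeping (and the `initialize_device_settings` import) and instead read a `model_name` from `**kwargs`, defaulting to `"m3e"`, which `run_indexing` now forwards to `embeddings_multi_doc` alongside `batch_size`, `num_process`, and the service `url`. A hedged sketch of how indexing might be driven after this change — the class name and the exact constructor are assumptions; only the `model_name` kwarg and the forwarding in `run_indexing` are confirmed here:

```python
# Hypothetical sketch: the class name and most constructor arguments are
# assumptions; the commit only confirms that model_name (default "m3e") is
# read from kwargs and passed through to embeddings_multi_doc when indexing.
from pipelines.nodes.retriever.parallel_retriever import ParallelRetriever  # assumed name

retriever = ParallelRetriever(
    document_store=None,             # a document store instance in real use; None only logs a warning
    url="http://127.0.0.1:8080",     # embedding service endpoint (assumed value)
    batch_size=32,
    num_process=4,
    model_name="m3e",                # new: forwarded to embeddings_multi_doc
)

documents = [{"content": "PaddleNLP pipelines example text."}]  # List[dict], per run_indexing's signature
retriever.run_indexing(documents)    # requires the embedding service at `url` to be reachable
```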
8 changes: 3 additions & 5 deletions pipelines/requirements.txt
@@ -15,11 +15,7 @@ pdfplumber
faiss-cpu>=1.7.2
opencv-python>=4.4
opencv-contrib-python-headless
python-multipart
click==8.0
fastapi
uvicorn
markdown
numba
pymilvus>=2.1
wordcloud==1.8.2.2
@@ -28,4 +24,6 @@ events
sseclient-py==1.7.2
spacy
tritonclient[all]
typing_extensions==4.5.0
typing_extensions==4.5.0
aistudio_sdk
markdown
