Merge pull request #416 from devchat-ai/optimize_devchat_error_handler
Improve OpenAI API Integration: Refactor and Error Handling Enhancements
kagami-l authored Sep 23, 2024
2 parents ef59342 + 3fbbfd7 commit ea3ba91
Showing 3 changed files with 223 additions and 38 deletions.
115 changes: 91 additions & 24 deletions devchat/llm/openai.py
@@ -1,4 +1,9 @@
"""
openai api utils
"""

# flake8: noqa: E402
# Import necessary libraries
import json
import os
import re
@@ -10,68 +15,82 @@
from devchat.ide import IDEService

from .pipeline import (
-    RetryException,
-    exception_handle,
-    exception_output_handle,
-    parallel,
-    pipeline,
-    retry,
+    RetryException,  # Import RetryException class
+    exception_handle,  # Function to handle exceptions
+    parallel,  # Function to run tasks in parallel
+    pipeline,  # Function to create a pipeline of tasks
+    retry,  # Function to retry a task
)


def _try_remove_markdown_block_flag(content):
    """
-    如果content是一个markdown块,则删除它的头部```xxx和尾部```
+    If the content is a markdown block, this function removes the header ```xxx and footer ```
    """
-    # 定义正则表达式模式,用于匹配markdown块的头部和尾部
+    # Define a regex pattern to match the header and footer of a markdown block
    pattern = r"^\s*```\s*(\w+)\s*\n(.*?)\n\s*```\s*$"

-    # 使用re模块进行匹配
+    # Use the re module to match the pattern
    match = re.search(pattern, content, re.DOTALL | re.MULTILINE)

    if match:
-        # 如果匹配成功,则提取出markdown块的内容并返回
+        # If a match is found, extract the content of the markdown block and return it
        _ = match.group(1)  # language
        markdown_content = match.group(2)
        return markdown_content.strip()
-    # 如果匹配失败,则返回原始内容
+    # If no match is found, return the original content
    return content
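A quick sketch of the helper's behavior, using hypothetical inputs that are not part of this commit:

# Hypothetical usage sketch: a fenced ```json block is unwrapped, anything
# else passes through unchanged.
assert _try_remove_markdown_block_flag('```json\n{"name": "devchat"}\n```') == '{"name": "devchat"}'
assert _try_remove_markdown_block_flag("plain text") == "plain text"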


def chat_completion_stream_commit(
messages: List[Dict], # [{"role": "user", "content": "hello"}]
llm_config: Dict, # {"model": "...", ...}
):
"""
This function is used to commit chat completion stream
"""
proxy_url = os.environ.get("DEVCHAT_PROXY", "")
proxy_setting = {"proxy": {"https://": proxy_url, "http://": proxy_url}} if proxy_url else {}

# Initialize OpenAI client with API key, base URL and http client
client = openai.OpenAI(
api_key=os.environ.get("OPENAI_API_KEY", None),
base_url=os.environ.get("OPENAI_API_BASE", None),
http_client=httpx.Client(**proxy_setting, trust_env=False),
)

# Update llm_config dictionary
llm_config["stream"] = True
llm_config["timeout"] = 60
# Return chat completions
return client.chat.completions.create(messages=messages, **llm_config)
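A minimal usage sketch, assuming OPENAI_API_KEY (and optionally OPENAI_API_BASE and DEVCHAT_PROXY) are set in the environment; the model name below is illustrative, not mandated by this commit:

stream = chat_completion_stream_commit(
    messages=[{"role": "user", "content": "hello"}],
    llm_config={"model": "gpt-4o-mini"},  # illustrative model name
)
for chunk in stream:
    # Print streamed deltas as they arrive; some chunks carry no content.
    if chunk.choices and chunk.choices[0].delta.content:
        print(chunk.choices[0].delta.content, end="")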


def chat_completion_stream_raw(**kwargs):
"""
This function is used to get raw chat completion stream
"""
proxy_url = os.environ.get("DEVCHAT_PROXY", "")
proxy_setting = {"proxy": {"https://": proxy_url, "http://": proxy_url}} if proxy_url else {}

# Initialize OpenAI client with API key, base URL and http client
client = openai.OpenAI(
api_key=os.environ.get("OPENAI_API_KEY", None),
base_url=os.environ.get("OPENAI_API_BASE", None),
http_client=httpx.Client(**proxy_setting, trust_env=False),
)

# Update kwargs dictionary
kwargs["stream"] = True
kwargs["timeout"] = 60
# Return chat completions
return client.chat.completions.create(**kwargs)


def stream_out_chunk(chunks):
"""
This function is used to print out chunks of data
"""
for chunk in chunks:
chunk_dict = chunk.dict()
if len(chunk_dict["choices"]) > 0:
@@ -82,6 +101,9 @@ def stream_out_chunk(chunks):


def retry_timeout(chunks):
"""
This function is used to handle timeout errors
"""
try:
for chunk in chunks:
yield chunk
@@ -91,10 +113,16 @@


def chunk_list(chunks):
"""
This function is used to convert chunks into a list
"""
return [chunk for chunk in chunks]


def chunks_content(chunks):
"""
This function is used to extract content from chunks
"""
content = None
for chunk in chunks:
chunk_dict = chunk.dict()
@@ -108,6 +136,10 @@ def chunks_content(chunks):


def chunks_call(chunks):
"""
This function is used to extract tool
calls from chunks
"""
tool_calls = []

for chunk in chunks:
@@ -128,19 +160,26 @@


def content_to_json(content):
+    """
+    This function is used to convert content to JSON
+    """
    try:
        content_no_block = _try_remove_markdown_block_flag(content)
-        response_obj = json.loads(content_no_block)
+        response_obj = json.loads(content_no_block, strict=False)
        return response_obj
    except json.JSONDecodeError as err:
-        IDEService().ide_logging("info", f"in content_to_json: json decode error: {err}")
+        IDEService().ide_logging("debug", f"Receive content: {content}")
+        IDEService().ide_logging("debug", f"in content_to_json: json decode error: {err}")
        raise RetryException(err) from err
    except Exception as err:
-        IDEService().ide_logging("info", f"in content_to_json: other error: {err}")
+        IDEService().ide_logging("debug", f"in content_to_json: other error: {err}")
        raise err
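The retry contract, sketched with hypothetical inputs: parseable content (fenced or not) returns a dict, while malformed content raises RetryException so the surrounding retry(...) pipelines re-run the whole completion. The strict=False added here additionally tolerates control characters inside JSON strings.

content_to_json('```json\n{"ok": true}\n```')  # -> {"ok": True}
try:
    content_to_json("not json at all")
except RetryException:
    pass  # retry(...) catches this and re-invokes the pipeline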


def to_dict_content_and_call(content, tool_calls=None):
"""
This function is used to convert content and tool calls to a dictionary
"""
if tool_calls is None:
tool_calls = []
return {
@@ -151,36 +190,63 @@ def to_dict_content_and_call(content, tool_calls=None):
}


# Define a pipeline function for chat completion content.
# This pipeline first commits a chat completion stream, handles any timeout errors,
# and then extracts the content from the chunks.
# If any step in the pipeline fails, it will retry the entire pipeline up to 3 times.
chat_completion_content = retry(
pipeline(chat_completion_stream_commit, retry_timeout, chunks_content), times=3
)
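Assuming pipeline(f, g, h)(**kw) behaves as h(g(f(**kw))) and retry(fn, times=3) re-runs fn on RetryException (both live in .pipeline, whose internals are not part of this diff), the composition above unrolls roughly to:

def _chat_completion_content_unrolled(**kwargs):  # sketch under the assumption above
    chunks = chat_completion_stream_commit(**kwargs)  # open the completion stream
    chunks = retry_timeout(chunks)  # surface timeouts as RetryException
    return chunks_content(chunks)  # join the streamed deltas into one string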

# Define a pipeline function for chat completion stream content.
# This pipeline first commits a chat completion stream, handles any timeout errors,
# streams out the chunk, and then extracts the content from the chunks.
# If any step in the pipeline fails, it will retry the entire pipeline up to 3 times.
chat_completion_stream_content = retry(
pipeline(chat_completion_stream_commit, retry_timeout, stream_out_chunk, chunks_content),
times=3,
)

# Define a pipeline function for chat completion call.
# This pipeline first commits a chat completion stream, handles any timeout errors,
# and then extracts the tool calls from the chunks.
# If any step in the pipeline fails, it will retry the entire pipeline up to 3 times.
chat_completion_call = retry(
pipeline(chat_completion_stream_commit, retry_timeout, chunks_call), times=3
)

# Define a pipeline function for chat completion without streaming and return a JSON object.
# This pipeline first commits a chat completion stream, handles any timeout errors, extracts
# the content from the chunks and then converts the content to JSON.
# If any step in the pipeline fails, it will retry the entire pipeline up to 3 times.
# If a JSONDecodeError is encountered during the content to JSON conversion, it will log the
# error and retry the pipeline.
# If any other exception is encountered, it will log the error and raise it.
chat_completion_no_stream_return_json_with_retry = exception_handle(
retry(
pipeline(chat_completion_stream_commit, retry_timeout, chunks_content, content_to_json),
times=3,
),
-    exception_output_handle(lambda err: None),
+    None,
)


def chat_completion_no_stream_return_json(messages: List[Dict], llm_config: Dict):
"""call llm without stream, return json object"""
"""
This function is used to get chat completion without streaming and return JSON object
"""
llm_config["response_format"] = {"type": "json_object"}
return chat_completion_no_stream_return_json_with_retry(
messages=messages, llm_config=llm_config
)
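A usage sketch, assuming the target model supports OpenAI's JSON mode (the response_format set above); the model name is illustrative. What happens when every retry fails depends on exception_handle's behavior with a None handler, which this diff does not show:

result = chat_completion_no_stream_return_json(
    messages=[{"role": "user", "content": 'Return {"lang": "python"} as JSON.'}],
    llm_config={"model": "gpt-4o-mini"},  # illustrative
)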


# Define a pipeline function for chat completion stream.
# This pipeline first commits a chat completion stream, handles any timeout errors,
# extracts the content from the chunks, and then converts the content and tool calls
# to a dictionary.
# If any step in the pipeline fails, it will retry the entire pipeline up to 3 times.
# If an exception is encountered, it will return a dictionary with None values and the error.
chat_completion_stream = exception_handle(
retry(
pipeline(
@@ -191,9 +257,16 @@ def chat_completion_no_stream_return_json(messages: List[Dict], llm_config: Dict
),
times=3,
),
-    lambda err: {"content": None, "function_name": None, "parameters": "", "error": err},
+    None,
)
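A usage sketch: on success the pipeline ends in to_dict_content_and_call, so the result is a dict carrying the streamed text and any tool calls (model name illustrative; the exact key set comes from to_dict_content_and_call, which this diff only partially shows):

reply = chat_completion_stream(
    messages=[{"role": "user", "content": "hello"}],
    llm_config={"model": "gpt-4o-mini"},  # illustrative
)
print(reply["content"])  # "tool_calls" is [] when the model made no calls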

# Define a pipeline function for chat call completion stream.
# This pipeline first commits a chat completion stream, handles any timeout errors,
# converts the chunks to a list, extracts the content and tool calls from the chunks
# in parallel, and then converts the content and tool calls to a dictionary.
# If any step in the pipeline fails, it will retry the entire pipeline up to 3 times.
# If an exception is encountered, it will return a dictionary with None values, an empty
# tool calls list, and the error.
chat_call_completion_stream = exception_handle(
retry(
pipeline(
@@ -205,11 +278,5 @@ def chat_completion_no_stream_return_json(messages: List[Dict], llm_config: Dict
),
times=3,
),
-    lambda err: {
-        "content": None,
-        "function_name": None,
-        "parameters": "",
-        "tool_calls": [],
-        "error": err,
-    },
+    None,
)
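A usage sketch for the tool-call path; the "tools" entry follows the OpenAI chat-completions schema rather than anything introduced by this diff, and the model and function names are hypothetical:

reply = chat_call_completion_stream(
    messages=[{"role": "user", "content": "What's the weather in Paris?"}],
    llm_config={
        "model": "gpt-4o-mini",  # illustrative
        "tools": [{
            "type": "function",
            "function": {
                "name": "get_weather",  # hypothetical tool
                "parameters": {"type": "object", "properties": {"city": {"type": "string"}}},
            },
        }],
    },
)
print(reply["tool_calls"])  # extracted by chunks_call; [] if none were made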
