Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding async support for tools #735

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 28 additions & 4 deletions src/crewai/tools/tool_usage.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from difflib import SequenceMatcher
from textwrap import dedent
from typing import Any, List, Union
import asyncio

from langchain_core.tools import BaseTool
from langchain_openai import ChatOpenAI
Expand Down Expand Up @@ -123,6 +124,17 @@ def _use(
tool=calling.tool_name, input=calling.arguments
)

tool_method = tool.func or tool.coroutine
is_async_tool = asyncio.iscoroutinefunction(tool_method)

if is_async_tool:
if tool.func:
# async tool defined using BaseTool class from crewai_tools
async_tool_run = tool._run
elif tool.coroutine:
# async tool defined using @tool decorator from langchain
async_tool_run = tool._arun
Comment on lines +130 to +136
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently when we define an async function using @tool decorator or BaseTools, they get created differently.

For @tool decorator - if the function is a coroutine, func=None and the function gets added to the coroutine attribute.

However, for BaseTools, that's not the case: it remains part of the func attribute. To absorb this complexity from the user's perspective, we do it this way so that users can still define their async tools using _run when using BaseTools. There is no change from the user's perspective other than adding the `async` keyword while defining the tool.


if not result:
try:
if calling.tool_name in [
Expand All @@ -139,16 +151,28 @@ def _use(
for k, v in calling.arguments.items()
if k in acceptable_args
}
result = tool._run(**arguments)
if is_async_tool:
result = asyncio.run(async_tool_run(**arguments))
else:
result = tool._run(**arguments)
except Exception:
if tool.args_schema:
arguments = calling.arguments
result = tool._run(**arguments)
if is_async_tool:
result = asyncio.run(async_tool_run(**arguments))
else:
result = tool._run(**arguments)
else:
arguments = calling.arguments.values() # type: ignore # Incompatible types in assignment (expression has type "dict_values[str, Any]", variable has type "dict[str, Any]")
result = tool._run(*arguments)
if is_async_tool:
result = asyncio.run(async_tool_run(*arguments))
else:
result = tool._run(*arguments)
else:
result = tool._run()
if is_async_tool:
result = asyncio.run(async_tool_run())
else:
result = tool._run()
except Exception as e:
self._run_attempts += 1
if self._run_attempts > self._max_parsing_attempts:
Expand Down
75 changes: 75 additions & 0 deletions tests/crew_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -941,6 +941,81 @@ def test_manager_agent():
execute.assert_called()


def test_crew_async_tool_execution_langchain():
    """Verify that a crew can execute an async tool defined via langchain's @tool decorator.

    The decorator stores a coroutine in the tool's `coroutine` attribute
    (func=None); the crew's tool-usage layer is expected to detect this and
    drive it with asyncio, returning the tool's answer as the task result.
    """
    import asyncio

    from langchain.tools import tool
    from langchain_openai import ChatOpenAI

    @tool
    async def async_search(query: str) -> str:
        """Useful to get the answer to a user query."""
        # Simulated async work (e.g. a network call).
        await asyncio.sleep(1)
        return "The capital of France is Paris."

    researcher = Agent(
        role="Research",
        goal="Research the user query and provide a brief and concise response. If you need more information, ask the user for it.",
        backstory=(
            "You are a virtual concierge specialized in research. Respond to our first-class users with the latest information."
        ),
        tools=[async_search],
        llm=ChatOpenAI(temperature=0, model="gpt-4"),
    )

    research_task = Task(
        description="Find the capital of France",
        expected_output="A response to the user query.",
        agent=researcher,
    )

    outcome = Crew(agents=[researcher], tasks=[research_task]).kickoff()
    assert outcome == "The capital of France is Paris."


def test_crew_async_tool_execution():
    """Verify that a crew can execute an async tool defined via crewai_tools.BaseTool.

    Unlike the @tool decorator path, a BaseTool subclass keeps its (async)
    `_run` in the `func` attribute; the tool-usage layer must detect the
    coroutine function and run it with asyncio so that, from the user's
    perspective, only the `async` keyword on `_run` changes.
    """
    import asyncio

    from crewai_tools import BaseTool
    from langchain_openai import ChatOpenAI

    class AsyncSearch(BaseTool):
        def __init__(self):
            super().__init__(
                name="AsyncSearch",
                description="Performs searches based on query",
            )

        async def _run(self, query: str):
            # Simulated async work (e.g. a network call).
            await asyncio.sleep(1)
            return "The capital of France is Paris."

    async_search = AsyncSearch()

    agent = Agent(
        role="Research",
        goal="Research the user query and provide a brief and concise response. If you need more information, ask the user for it.",
        backstory=(
            "You are a virtual concierge specialized in research. Respond to our first-class users with the latest information."
        ),
        tools=[async_search],
        llm=ChatOpenAI(temperature=0, model="gpt-4"),
    )
    task_description = "Find the capital of France"
    expected_output = "A response to the user query."

    async_task = Task(
        description=task_description, expected_output=expected_output, agent=agent
    )

    crew = Crew(agents=[agent], tasks=[async_task])

    result = crew.kickoff()
    assert result == "The capital of France is Paris."


def test_manager_agent_in_agents_raises_exception():
task = Task(
description="Come up with a list of 5 interesting ideas to explore for an article, then write one amazing paragraph highlight for each idea that showcases how good an article about this topic could be. Return the list of ideas with their paragraph and your notes.",
Expand Down