Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added OpenAI function calls #427

Merged
merged 1 commit into from
Jan 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/lint_test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ jobs:
- name: ruff
run: ruff sgpt tests scripts
- name: mypy
run: mypy sgpt
run: mypy sgpt --exclude function.py --exclude handler.py --exclude default_functions
- name: unittests
run: |
export OPENAI_API_KEY=test_api_key
Expand Down
129 changes: 100 additions & 29 deletions README.md

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ dependencies = [
"rich >= 13.1.0, < 14.0.0",
"distro >= 1.8.0, < 2.0.0",
"openai >= 1.6.1, < 2.0.0",
"instructor >= 0.4.5, < 1.0.0",
'pyreadline3 >= 3.4.1, < 4.0.0; sys_platform == "win32"',
]

Expand Down Expand Up @@ -81,6 +82,7 @@ skip = "__init__.py"

[tool.mypy]
strict = true
exclude = ["function.py", "handler.py", "default_functions"]

[tool.ruff]
select = [
Expand Down
2 changes: 1 addition & 1 deletion sgpt/__version__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "1.0.1"
__version__ = "1.1.0"
25 changes: 25 additions & 0 deletions sgpt/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
from click.types import Choice

from sgpt.config import cfg
from sgpt.default_functions.init_functions import install_functions as inst_funcs
from sgpt.function import get_openai_schemas
from sgpt.handlers.chat_handler import ChatHandler
from sgpt.handlers.default_handler import DefaultHandler
from sgpt.handlers.repl_handler import ReplHandler
Expand Down Expand Up @@ -57,9 +59,16 @@ def main(
),
code: bool = typer.Option(
False,
"--code",
"-c",
help="Generate only code.",
rich_help_panel="Assistance Options",
),
functions: bool = typer.Option(
cfg.get("OPENAI_USE_FUNCTIONS") == "true",
help="Allow function calls.",
rich_help_panel="Assistance Options",
),
editor: bool = typer.Option(
False,
help="Open $EDITOR to provide a prompt.",
Expand Down Expand Up @@ -92,6 +101,8 @@ def main(
),
list_chats: bool = typer.Option(
False,
"--list-chats",
"-lc",
help="List all existing chat ids.",
callback=ChatHandler.list_ids,
rich_help_panel="Chat Options",
Expand All @@ -115,6 +126,8 @@ def main(
),
list_roles: bool = typer.Option(
False,
"--list-roles",
"-lr",
help="List roles.",
callback=SystemRole.list,
rich_help_panel="Role Options",
Expand All @@ -125,6 +138,12 @@ def main(
callback=install_shell_integration,
hidden=True, # Hiding since should be used only once.
),
install_functions: bool = typer.Option(
False,
help="Install default functions.",
callback=inst_funcs,
hidden=True, # Hiding since should be used only once.
),
) -> None:
stdin_passed = not sys.stdin.isatty()

Expand Down Expand Up @@ -154,6 +173,8 @@ def main(
else SystemRole.get(role)
)

function_schemas = (get_openai_schemas() or None) if functions else None

if repl:
# Will be in infinite loop here until user exits with Ctrl+C.
ReplHandler(repl, role_class).handle(
Expand All @@ -163,6 +184,7 @@ def main(
top_p=top_probability,
chat_id=repl,
caching=cache,
functions=function_schemas,
)

if chat:
Expand All @@ -173,6 +195,7 @@ def main(
top_p=top_probability,
chat_id=chat,
caching=cache,
functions=function_schemas,
)
else:
full_completion = DefaultHandler(role_class).handle(
Expand All @@ -181,6 +204,7 @@ def main(
temperature=temperature,
top_p=top_probability,
caching=cache,
functions=function_schemas,
)

while shell and not stdin_passed:
Expand All @@ -201,6 +225,7 @@ def main(
temperature=temperature,
top_p=top_probability,
caching=cache,
functions=function_schemas,
)
continue
break
Expand Down
3 changes: 2 additions & 1 deletion sgpt/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ def wrapper(*args: Any, **kwargs: Any) -> Generator[str, None, None]:
for i in func(*args, **kwargs):
result += i
yield i
cache_file.write_text(result)
if "@FunctionCall" not in result:
cache_file.write_text(result)
self._delete_oldest_files(self.length) # type: ignore

return wrapper
Expand Down
4 changes: 4 additions & 0 deletions sgpt/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
SHELL_GPT_CONFIG_FOLDER = Path(CONFIG_FOLDER) / "shell_gpt"
SHELL_GPT_CONFIG_PATH = SHELL_GPT_CONFIG_FOLDER / ".sgptrc"
ROLE_STORAGE_PATH = SHELL_GPT_CONFIG_FOLDER / "roles"
FUNCTIONS_PATH = SHELL_GPT_CONFIG_FOLDER / "functions"
CHAT_CACHE_PATH = Path(gettempdir()) / "chat_cache"
CACHE_PATH = Path(gettempdir()) / "cache"

Expand All @@ -28,6 +29,9 @@
"DEFAULT_EXECUTE_SHELL_CMD": os.getenv("DEFAULT_EXECUTE_SHELL_CMD", "false"),
"DISABLE_STREAMING": os.getenv("DISABLE_STREAMING", "false"),
"CODE_THEME": os.getenv("CODE_THEME", "dracula"),
"OPENAI_FUNCTIONS_PATH": os.getenv("OPENAI_FUNCTIONS_PATH", str(FUNCTIONS_PATH)),
"OPENAI_USE_FUNCTIONS": os.getenv("OPENAI_USE_FUNCTIONS", "true"),
"SHOW_FUNCTIONS_OUTPUT": os.getenv("SHOW_FUNCTIONS_OUTPUT", "false"),
# New features might add their own config variables here.
}

Expand Down
Empty file.
28 changes: 28 additions & 0 deletions sgpt/default_functions/common/execute_shell.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import subprocess

from instructor import OpenAISchema
from pydantic import Field


class Function(OpenAISchema):
"""
Executes a shell command and returns the output (result).
"""

shell_command: str = Field(
...,
example="ls -la",
descriptions="Shell command to execute.",
)

class Config:
title = "execute_shell_command"

@classmethod
def execute(cls, shell_command: str) -> str:
process = subprocess.Popen(
shell_command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT
)
output, _ = process.communicate()
exit_code = process.returncode
return f"Exit code: {exit_code}, Output:\n{output.decode()}"
35 changes: 35 additions & 0 deletions sgpt/default_functions/init_functions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import os
import platform
import shutil
from pathlib import Path
from typing import Any

from ..config import cfg
from ..utils import option_callback

FUNCTIONS_FOLDER = Path(cfg.get("OPENAI_FUNCTIONS_PATH"))


@option_callback
def install_functions(*_args: Any) -> None:
current_folder = os.path.dirname(os.path.abspath(__file__))
common_folder = Path(current_folder + "/common")
common_files = [Path(path) for path in common_folder.glob("*.py")]
print("Installing default functions...")

for file in common_files:
print(f"Installed {FUNCTIONS_FOLDER}/{file.name}")
shutil.copy(file, FUNCTIONS_FOLDER, follow_symlinks=True)

current_platform = platform.system()
if current_platform == "Linux":
print("Installing Linux functions...")
if current_platform == "Windows":
print("Installing Windows functions...")
if current_platform == "Darwin":
print("Installing Mac functions...")
mac_folder = Path(current_folder + "/mac")
mac_files = [Path(path) for path in mac_folder.glob("*.py")]
for file in mac_files:
print(f"Installed {FUNCTIONS_FOLDER}/{file.name}")
shutil.copy(file, FUNCTIONS_FOLDER, follow_symlinks=True)
33 changes: 33 additions & 0 deletions sgpt/default_functions/mac/apple_script.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import subprocess

from instructor import OpenAISchema
from pydantic import Field


class Function(OpenAISchema):
"""
Executes Apple Script on macOS and returns the output (result).
Can be used for actions like: draft (prepare) an email, show calendar events, create a note.
"""

apple_script: str = Field(
...,
example='tell application "Finder" to get the name of every disk',
descriptions="Apple Script to execute.",
)

class Config:
title = "execute_apple_script"

@classmethod
def execute(cls, apple_script):
script_command = ["osascript", "-e", apple_script]
try:
process = subprocess.Popen(
script_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE
)
output, _ = process.communicate()
output = output.decode("utf-8").strip()
return f"Output: {output}"
except Exception as e:
return f"Error: {e}"
62 changes: 62 additions & 0 deletions sgpt/function.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
import importlib.util
import sys
from abc import ABCMeta
from pathlib import Path
from typing import Any, Callable

from .config import cfg


class Function:
def __init__(self, path: str):
module = self._read(path)
self._function = module.Function.execute
self._openai_schema = module.Function.openai_schema
self._name = self._openai_schema["name"]

@property
def name(self) -> str:
return self._name

@property
def openai_schema(self) -> dict[str, Any]:
return self._openai_schema

@property
def execute(self) -> Callable[..., str]:
return self._function

@classmethod
def _read(cls, path: str) -> Any:
module_name = path.replace("/", ".").rstrip(".py")
spec = importlib.util.spec_from_file_location(module_name, path)
module = importlib.util.module_from_spec(spec)
sys.modules[module_name] = module
spec.loader.exec_module(module)

if not isinstance(module.Function, ABCMeta):
raise TypeError(
f"Function {module_name} must be a subclass of pydantic.BaseModel"
)
if not hasattr(module.Function, "execute"):
raise TypeError(
f"Function {module_name} must have a 'execute' static method"
)

return module


functions_folder = Path(cfg.get("OPENAI_FUNCTIONS_PATH"))
functions_folder.mkdir(parents=True, exist_ok=True)
functions = [Function(str(path)) for path in functions_folder.glob("*.py")]


def get_function(name: str) -> Callable[..., Any]:
for function in functions:
if function.name == name:
return function.execute
raise ValueError(f"Function {name} not found")


def get_openai_schemas() -> [dict[str, Any]]:
return [function.openai_schema for function in functions]
Loading
Loading