Skip to content

Commit

Permalink
add status to agent, remove scheduler
Browse files Browse the repository at this point in the history
  • Loading branch information
sjorsdev committed May 1, 2024
1 parent c7b5ff6 commit bc0780b
Show file tree
Hide file tree
Showing 6 changed files with 94 additions and 74 deletions.
2 changes: 2 additions & 0 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
[versions]
kotlin = "1.9.23"
kotlinx-datetime = "0.6.0-RC.2"
coroutines = "1.7.2"
serialization = "1.6.3"
ktor = "2.3.10"
Expand All @@ -11,6 +12,7 @@ mockk = "1.13.10"
[libraries]
kotlin-test = { module = "org.jetbrains.kotlin:kotlin-test", version.ref = "kotlin" }
kotlin-reflect = { module = "org.jetbrains.kotlin:kotlin-reflect", version.ref = "kotlin" }
kotlinx-datetime = { module = "org.jetbrains.kotlinx:kotlinx-datetime", version.ref = "kotlinx-datetime" }
# Coroutines
coroutines-core = { group = "org.jetbrains.kotlinx", name = "kotlinx-coroutines-core", version.ref = "coroutines" }
coroutines-test = { group = "org.jetbrains.kotlinx", name = "kotlinx-coroutines-test", version.ref = "coroutines" }
Expand Down
1 change: 1 addition & 0 deletions src/core/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ kotlin {
dependencies {
api(libs.coroutines.core)
implementation(libs.serialization.json)
implementation(libs.kotlinx.datetime)
}
}
val commonTest by getting {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ import community.flock.aigentic.core.model.Model
import community.flock.aigentic.core.tool.InternalTool
import community.flock.aigentic.core.tool.Tool
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.datetime.Clock
import kotlinx.datetime.Instant

data class Task(
val description: String,
Expand All @@ -20,6 +23,21 @@ sealed interface Context {
data class Image(val base64: String) : Context
}

enum class AgentRunningState(val value: String) {
WAITING_TO_START("WAITING_TO_START"),
RUNNING("RUNNING"),
EXECUTING_TOOL("EXECUTING_TOOL"),
WAITING_ON_APPROVAL("WAITING_ON_APPROVAL"),
COMPLETED("COMPLETED"),
STUCK("STUCK"),
}

data class AgentStatus(
var runningState: AgentRunningState = AgentRunningState.WAITING_TO_START,
val startTimestamp: Instant = Clock.System.now(),
var endTimestamp: Instant? = null,
)

data class Agent(
val id: String,
val systemPromptBuilder: SystemPromptBuilder,
Expand All @@ -28,7 +46,7 @@ data class Agent(
val contexts: List<Context>,
val tools: Map<ToolName, Tool>,
) {

internal val messages = MutableSharedFlow<Message>(replay = 100)
val messages = MutableSharedFlow<Message>(replay = 100)
internal val status = MutableStateFlow(AgentStatus())
internal val internalTools = mutableMapOf<ToolName, InternalTool<*>>()
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package community.flock.aigentic.core.agent

import community.flock.aigentic.core.agent.tool.FinishReason
import community.flock.aigentic.core.agent.tool.FinishedOrStuck
import community.flock.aigentic.core.agent.tool.finishOrStuckTool
import community.flock.aigentic.core.message.*
Expand All @@ -9,28 +10,49 @@ import community.flock.aigentic.core.tool.ToolPermissionHandler
import community.flock.aigentic.core.model.ModelResponse
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.MutableSharedFlow

class AgentExecutor(
val schedules: List<Schedule>,
val permissionHandler: ToolPermissionHandler = DefaultToolPermissionHandler()
) {
private val listeners: MutableList<suspend (event: Pair<String, Message>) -> Unit> = mutableListOf()

suspend fun start() =
schedules
.flatMap { it.agents }
.forEach { runAgent(it) }
import kotlinx.datetime.Clock

class AgentExecutor {
var permissionHandler: ToolPermissionHandler = DefaultToolPermissionHandler()
val agents: MutableList<Agent> = mutableListOf()
val startedAgents = MutableSharedFlow<String>(replay = 100)

suspend fun start() {
agents
.filter { it.status.value.runningState == AgentRunningState.WAITING_TO_START }
.forEach {
runAgent(it)
}
}

suspend fun runAgent(agent: Agent): FinishedOrStuck {
applyListeners(agent)
agent.setRunningState(AgentRunningState.RUNNING)
startedAgents.emit(agent.id)
agents.add(agent)

agent.initialize() // Maybe move to Agent builder?
val modelResponse = agent.sendModelRequest()

val result = CompletableDeferred<FinishedOrStuck>()
processResponse(agent, modelResponse) { result.complete(it) }
return result.await()
val resultState = result.await()

agent.updateStatus {
val endRunningState = if (resultState.reason is FinishReason.ImStuck) {
AgentRunningState.STUCK
} else {
AgentRunningState.COMPLETED
}
it.copy(
runningState = endRunningState,
endTimestamp = Clock.System.now()
)
}
return resultState
}

fun getAgent(agentId: String?): Agent = agents.first { it.id == agentId }

private suspend fun Agent.initialize() {
internalTools[finishOrStuckTool.name] = finishOrStuckTool
messages.emit(systemPromptBuilder.buildSystemPrompt(this))
Expand Down Expand Up @@ -79,10 +101,16 @@ class AgentExecutor(
val functionArgs = toolCall.argumentsAsJson()
val tool = tools[ToolName(toolCall.name)] ?: error("Tool not registered: $toolCall")
while (!permissionHandler.hasPermission(tool.toolConfiguration, toolCall)) {
setRunningState(AgentRunningState.WAITING_ON_APPROVAL)
println("Waiting for permission for ${toolCall.name}")
delay(300)
}
if (status.value.runningState == AgentRunningState.WAITING_ON_APPROVAL) {
setRunningState(AgentRunningState.RUNNING)
}
setRunningState(AgentRunningState.EXECUTING_TOOL)
val result = tool.handler(functionArgs)
setRunningState(AgentRunningState.RUNNING)
return Message.ToolResult(toolCall.id, toolCall.name, ToolResultContent(result))
}

Expand All @@ -94,34 +122,17 @@ class AgentExecutor(
private suspend fun Agent.sendModelRequest(): ModelResponse =
model.sendRequest(messages.replayCache, tools.values.toList() + internalTools.values.toList())

fun getMessages(): Map<String, MutableSharedFlow<Message>> =
schedules.flatMap { it.agents }.associate { it.id to it.messages }

fun addListener(function: suspend (event: Pair<String, Message>) -> Unit) {
listeners.add(function)
}

@OptIn(DelicateCoroutinesApi::class)
private fun applyListeners(agent: Agent) {
listeners.forEach { function ->
GlobalScope.launch {
agent.messages.collect { function.invoke(Pair(agent.id, it)) }
}
}
fun loadAgents(agents: MutableList<Agent>) {
this.agents.addAll(agents)
}
}

class Schedule(
val agents: List<Agent>,
val type: ScheduleType
)

sealed interface ScheduleType {
/**
* Just run a single time
*/
data object Single : ScheduleType
suspend fun Agent.updateStatus(update: (currentStatus: AgentStatus) -> AgentStatus) {
update.invoke(status.value).let {
status.emit(it)
}
}


//suspend fun Agent.execute() = runAgent(this)
suspend fun Agent.setRunningState(state: AgentRunningState) {
updateStatus { status.value.copy(runningState = state) }
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ package community.flock.aigentic.core.dsl

import community.flock.aigentic.core.agent.Agent
import community.flock.aigentic.core.agent.AgentExecutor
import community.flock.aigentic.core.agent.Schedule
import community.flock.aigentic.core.agent.ScheduleType
import community.flock.aigentic.core.tool.DefaultToolPermissionHandler
import community.flock.aigentic.core.tool.ToolPermissionHandler

Expand All @@ -14,35 +12,19 @@ fun agentExecutor(agentExecutorBuilder: AgentExecutorConfig.() -> Unit): AgentEx
@AgentDSL
class AgentExecutorConfig : Config<AgentExecutor> {

private val schedules: MutableList<Schedule> = mutableListOf()
private val agents: MutableList<Agent> = mutableListOf()
private var permissionHandler: ToolPermissionHandler = DefaultToolPermissionHandler()

fun AgentExecutorConfig.schedule(scheduleType: ScheduleType, scheduleConfig: ScheduleConfig.() -> Unit) {
ScheduleConfig(scheduleType).apply(scheduleConfig).build().also {
this.schedules.add(it)
}
fun AgentExecutorConfig.addAgent(agent: Agent) {
agents.add(agent)
}

fun AgentExecutorConfig.toolPermissionHandler(toolPermissionHandler: ToolPermissionHandler) {
this.permissionHandler = toolPermissionHandler
}

override fun build(): AgentExecutor =
AgentExecutor(schedules, permissionHandler)
}

@AgentDSL
class ScheduleConfig(
private val scheduleType: ScheduleType
) : Config<Schedule> {

private val agents = mutableListOf<Agent>()

fun addAgent(agent: Agent) {
agents.add(agent)
override fun build(): AgentExecutor = AgentExecutor().also {
it.permissionHandler = this.permissionHandler
it.loadAgents(agents)
}


override fun build(): Schedule =
Schedule(agents, scheduleType)
}
}
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
package community.flock.aigentic.example

import community.flock.aigentic.core.agent.AgentExecutor
import community.flock.aigentic.core.agent.ScheduleType
import community.flock.aigentic.core.agent.events.toEvents
import community.flock.aigentic.core.dsl.agent
import community.flock.aigentic.core.dsl.agentExecutor
import community.flock.aigentic.core.tool.*
import community.flock.aigentic.core.tool.ParameterType.Primitive
import community.flock.aigentic.dsl.openAIModel
import community.flock.aigentic.model.OpenAIModelIdentifier
import kotlinx.coroutines.DelicateCoroutinesApi
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.launch
import kotlinx.serialization.json.JsonObject

suspend fun runAdministrativeAgentExample(openAIAPIKey: String) {
Expand All @@ -28,18 +31,21 @@ suspend fun runAdministrativeAgentExample(openAIAPIKey: String) {
}

val executor = agentExecutor {
schedule(ScheduleType.Single) {
addAgent(agent)
}
}.also { logEvents(it) }

executor.start()
}

fun logEvents(it: AgentExecutor) {
it.addListener { (agentId, message) ->
message.toEvents().forEach {
println("[$agentId] - ${it.text}")
@OptIn(DelicateCoroutinesApi::class)
fun logEvents(executor: AgentExecutor) {
GlobalScope.launch {
executor.startedAgents.collect { agentId ->
executor.getAgent(agentId).messages.map { it.toEvents() }.collect {
it.forEach {
println("[$agentId] - ${it.text}")
}
}
}
}
}
Expand Down

0 comments on commit bc0780b

Please sign in to comment.