From aa53f8bc48c7cc749134647ad4a82ede07f2a6a9 Mon Sep 17 00:00:00 2001 From: David vonThenen <12752197+dvonthenen@users.noreply.github.com> Date: Tue, 5 Nov 2024 10:52:53 -0800 Subject: [PATCH 1/6] Initial Agent API for EA --- examples/agent/websocket/simple/README.md | 20 + examples/agent/websocket/simple/main.go | 424 ++++++++++++++++ pkg/api/agent/v1/websocket/chan_default.go | 466 ++++++++++++++++++ pkg/api/agent/v1/websocket/chan_router.go | 404 +++++++++++++++ pkg/api/agent/v1/websocket/constants.go | 19 + .../v1/websocket/interfaces/constants.go | 39 ++ .../v1/websocket/interfaces/interfaces.go | 26 + .../agent/v1/websocket/interfaces/types.go | 134 +++++ pkg/api/agent/v1/websocket/types.go | 53 ++ pkg/api/live/v1/deprecated.go | 41 -- pkg/api/prerecorded/v1/deprecated.go | 29 -- pkg/api/version/agent-version.go | 32 ++ pkg/api/version/constants.go | 4 + pkg/api/version/version.go | 10 +- pkg/client/agent/client.go | 93 ++++ pkg/client/agent/init.go | 44 ++ .../agent/v1/websocket/client_channel.go | 312 ++++++++++++ pkg/client/agent/v1/websocket/constants.go | 32 ++ .../agent/v1/websocket/new_using_chan.go | 109 ++++ pkg/client/agent/v1/websocket/types.go | 35 ++ pkg/client/common/v1/websocket.go | 1 + pkg/client/interfaces/interfaces.go | 7 + pkg/client/interfaces/v1/constants.go | 4 + pkg/client/interfaces/v1/options.go | 37 ++ pkg/client/interfaces/v1/types-agent.go | 84 ++++ pkg/client/listen/deprecated.go | 123 ----- pkg/common/constants.go | 3 + 27 files changed, 2391 insertions(+), 194 deletions(-) create mode 100644 examples/agent/websocket/simple/README.md create mode 100644 examples/agent/websocket/simple/main.go create mode 100644 pkg/api/agent/v1/websocket/chan_default.go create mode 100644 pkg/api/agent/v1/websocket/chan_router.go create mode 100644 pkg/api/agent/v1/websocket/constants.go create mode 100644 pkg/api/agent/v1/websocket/interfaces/constants.go create mode 100644 pkg/api/agent/v1/websocket/interfaces/interfaces.go create mode 100644 pkg/api/agent/v1/websocket/interfaces/types.go create mode 100644 pkg/api/agent/v1/websocket/types.go delete mode 100644 pkg/api/live/v1/deprecated.go delete mode 100644 pkg/api/prerecorded/v1/deprecated.go create mode 100644 pkg/api/version/agent-version.go create mode 100644 pkg/client/agent/client.go create mode 100644 pkg/client/agent/init.go create mode 100644 pkg/client/agent/v1/websocket/client_channel.go create mode 100644 pkg/client/agent/v1/websocket/constants.go create mode 100644 pkg/client/agent/v1/websocket/new_using_chan.go create mode 100644 pkg/client/agent/v1/websocket/types.go create mode 100644 pkg/client/interfaces/v1/types-agent.go delete mode 100644 pkg/client/listen/deprecated.go diff --git a/examples/agent/websocket/simple/README.md b/examples/agent/websocket/simple/README.md new file mode 100644 index 00000000..9b18df3f --- /dev/null +++ b/examples/agent/websocket/simple/README.md @@ -0,0 +1,20 @@ +# Agent API (Real-Time) Example + +This example uses the Microphone as input in order to detect conversation insights in what is being said. This example required additional components (for the microphone) to be installed in order for this example to function correctly. + +## Configuration + +The SDK (and this example) needs to be initialized with your account's credentials `DEEPGRAM_API_KEY`, which are available in your [Deepgram Console][dg-console]. If you don't have a Deepgram account, you can [sign up here][dg-signup] for free. + +You must add your `DEEPGRAM_API_KEY` to your list of environment variables. We use environment variables because they are easy to configure, support PaaS-style deployments, and work well in containerized environments like Docker and Kubernetes. + +```sh +export DEEPGRAM_API_KEY=YOUR-APP-KEY-HERE +``` + +## Installation + +The Live API (Real-Time) example makes use of a [microphone package](https://github.com/deepgram/deepgram-go-sdk/tree/main/pkg/audio/microphone) contained within the repository. That package makes use of the [PortAudio library](http://www.portaudio.com/) which is a cross-platform open source audio library. If you are on Linux, you can install this library using whatever package manager is available (yum, apt, etc.) on your operating system. If you are on macOS, you can install this library using [brew](https://brew.sh/). + +[dg-console]: https://console.deepgram.com/ +[dg-signup]: https://console.deepgram.com/signup diff --git a/examples/agent/websocket/simple/main.go b/examples/agent/websocket/simple/main.go new file mode 100644 index 00000000..c79b9f50 --- /dev/null +++ b/examples/agent/websocket/simple/main.go @@ -0,0 +1,424 @@ +// Copyright 2024 Deepgram SDK contributors. All Rights Reserved. +// Use of this source code is governed by a MIT license that can be found in the LICENSE file. +// SPDX-License-Identifier: MIT + +package main + +// streaming +import ( + "bufio" + "context" + "fmt" + "os" + "sync" + "time" + + msginterfaces "github.com/deepgram/deepgram-go-sdk/pkg/api/agent/v1/websocket/interfaces" + microphone "github.com/deepgram/deepgram-go-sdk/pkg/audio/microphone" + client "github.com/deepgram/deepgram-go-sdk/pkg/client/agent" + interfaces "github.com/deepgram/deepgram-go-sdk/pkg/client/interfaces" +) + +type MyHandler struct { + binaryChan chan *[]byte + openChan chan *msginterfaces.OpenResponse + welcomeResponse chan *msginterfaces.WelcomeResponse + conversationTextResponse chan *msginterfaces.ConversationTextResponse + userStartedSpeakingResponse chan *msginterfaces.UserStartedSpeakingResponse + agentThinkingResponse chan *msginterfaces.AgentThinkingResponse + functionCallRequestResponse chan *msginterfaces.FunctionCallRequestResponse + functionCallingResponse chan *msginterfaces.FunctionCallingResponse + agentStartedSpeakingResponse chan *msginterfaces.AgentStartedSpeakingResponse + agentAudioDoneResponse chan *msginterfaces.AgentAudioDoneResponse + closeChan chan *msginterfaces.CloseResponse + errorChan chan *msginterfaces.ErrorResponse + unhandledChan chan *[]byte +} + +func NewMyHandler() *MyHandler { + handler := &MyHandler{ + binaryChan: make(chan *[]byte), + openChan: make(chan *msginterfaces.OpenResponse), + welcomeResponse: make(chan *msginterfaces.WelcomeResponse), + conversationTextResponse: make(chan *msginterfaces.ConversationTextResponse), + userStartedSpeakingResponse: make(chan *msginterfaces.UserStartedSpeakingResponse), + agentThinkingResponse: make(chan *msginterfaces.AgentThinkingResponse), + functionCallRequestResponse: make(chan *msginterfaces.FunctionCallRequestResponse), + functionCallingResponse: make(chan *msginterfaces.FunctionCallingResponse), + agentStartedSpeakingResponse: make(chan *msginterfaces.AgentStartedSpeakingResponse), + agentAudioDoneResponse: make(chan *msginterfaces.AgentAudioDoneResponse), + closeChan: make(chan *msginterfaces.CloseResponse), + errorChan: make(chan *msginterfaces.ErrorResponse), + unhandledChan: make(chan *[]byte), + } + + go func() { + handler.Run() + }() + + return handler +} + +// GetBinary returns the binary channels +func (dch MyHandler) GetBinary() []*chan *[]byte { + return []*chan *[]byte{&dch.binaryChan} +} + +// GetOpen returns the open channels +func (dch MyHandler) GetOpen() []*chan *msginterfaces.OpenResponse { + return []*chan *msginterfaces.OpenResponse{&dch.openChan} +} + +// GetWelcomeResponse returns the welcome response channels +func (dch MyHandler) GetWelcome() []*chan *msginterfaces.WelcomeResponse { + return []*chan *msginterfaces.WelcomeResponse{&dch.welcomeResponse} +} + +// GetConversationTextResponse returns the conversation text response channels +func (dch MyHandler) GetConversationText() []*chan *msginterfaces.ConversationTextResponse { + return []*chan *msginterfaces.ConversationTextResponse{&dch.conversationTextResponse} +} + +// GetUserStartedSpeakingResponse returns the user started speaking response channels +func (dch MyHandler) GetUserStartedSpeaking() []*chan *msginterfaces.UserStartedSpeakingResponse { + return []*chan *msginterfaces.UserStartedSpeakingResponse{&dch.userStartedSpeakingResponse} +} + +// GetAgentThinkingResponse returns the agent thinking response channels +func (dch MyHandler) GetAgentThinking() []*chan *msginterfaces.AgentThinkingResponse { + return []*chan *msginterfaces.AgentThinkingResponse{&dch.agentThinkingResponse} +} + +// GetFunctionCallRequestResponse returns the function call request response channels +func (dch MyHandler) GetFunctionCallRequest() []*chan *msginterfaces.FunctionCallRequestResponse { + return []*chan *msginterfaces.FunctionCallRequestResponse{&dch.functionCallRequestResponse} +} + +// GetFunctionCallingResponse returns the function calling response channels +func (dch MyHandler) GetFunctionCalling() []*chan *msginterfaces.FunctionCallingResponse { + return []*chan *msginterfaces.FunctionCallingResponse{&dch.functionCallingResponse} +} + +// GetAgentStartedSpeakingResponse returns the agent started speaking response channels +func (dch MyHandler) GetAgentStartedSpeaking() []*chan *msginterfaces.AgentStartedSpeakingResponse { + return []*chan *msginterfaces.AgentStartedSpeakingResponse{&dch.agentStartedSpeakingResponse} +} + +// GetAgentAudioDoneResponse returns the agent audio done response channels +func (dch MyHandler) GetAgentAudioDone() []*chan *msginterfaces.AgentAudioDoneResponse { + return []*chan *msginterfaces.AgentAudioDoneResponse{&dch.agentAudioDoneResponse} +} + +func (dch MyHandler) GetEndOfThought() []*chan *msginterfaces.EndOfThoughtResponse { + return []*chan *msginterfaces.EndOfThoughtResponse{&dch.endOfThoughtResponse} +} +// GetClose returns the close channels +func (dch MyHandler) GetClose() []*chan *msginterfaces.CloseResponse { + return []*chan *msginterfaces.CloseResponse{&dch.closeChan} +} + +// GetError returns the error channels +func (dch MyHandler) GetError() []*chan *msginterfaces.ErrorResponse { + return []*chan *msginterfaces.ErrorResponse{&dch.errorChan} +} + +// GetUnhandled returns the unhandled event channels +func (dch MyHandler) GetUnhandled() []*chan *[]byte { + return []*chan *[]byte{&dch.unhandledChan} +} + +// Open is the callback for when the connection opens +// golintci: funlen +func (dch MyHandler) Run() error { + wgReceivers := sync.WaitGroup{} + + // binary channel + wgReceivers.Add(1) + go func() { + defer wgReceivers.Done() + + counter := 0 + lastBytesReceived := time.Now().Add(-7 * time.Second) + + for br := range dch.binaryChan { + fmt.Printf("\n\n[Binary Data]\n\n") + fmt.Printf("Size: %d\n\n", len(*br)) + + if lastBytesReceived.Add(5 * time.Second).Before(time.Now()) { + counter = counter + 1 + file, err := os.OpenFile(fmt.Sprintf("output_%d.wav", counter), os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0o666) + if err != nil { + fmt.Printf("Failed to open file. Err: %v\n", err) + continue + } + // Add a wav audio container header to the file if you want to play the audio + // using a media player like VLC, Media Player, or Apple Music + header := []byte{ + 0x52, 0x49, 0x46, 0x46, // "RIFF" + 0x00, 0x00, 0x00, 0x00, // Placeholder for file size + 0x57, 0x41, 0x56, 0x45, // "WAVE" + 0x66, 0x6d, 0x74, 0x20, // "fmt " + 0x10, 0x00, 0x00, 0x00, // Chunk size (16) + 0x01, 0x00, // Audio format (1 for PCM) + 0x01, 0x00, // Number of channels (1) + 0x80, 0x3e, 0x00, 0x00, // Sample rate (16000) + 0x00, 0x7d, 0x00, 0x00, // Byte rate (16000 * 2) + 0x02, 0x00, // Block align (2) + 0x10, 0x00, // Bits per sample (16) + 0x64, 0x61, 0x74, 0x61, // "data" + 0x00, 0x00, 0x00, 0x00, // Placeholder for data size + } + + _, err = file.Write(header) + if err != nil { + fmt.Printf("Failed to write header to file. Err: %v\n", err) + continue + } + file.Close() + } + + fmt.Printf("Dumping to WAV file\n") + file, err := os.OpenFile(fmt.Sprintf("output_%d.wav", counter), os.O_APPEND|os.O_WRONLY, 0o644) + if err != nil { + fmt.Printf("Failed to open file. Err: %v\n", err) + continue + } + + _, err = file.Write(*br) + file.Close() + + if err != nil { + fmt.Printf("Failed to write to file. Err: %v\n", err) + continue + } + + lastBytesReceived = time.Now() + } + }() + + // open channel + wgReceivers.Add(1) + go func() { + defer wgReceivers.Done() + + for _ = range dch.openChan { + fmt.Printf("\n\n[OpenResponse]\n\n") + } + }() + + // welcome response channel + wgReceivers.Add(1) + go func() { + defer wgReceivers.Done() + + for _ = range dch.welcomeResponse { + fmt.Printf("\n\n[WelcomeResponse]\n\n") + } + }() + + // conversation text response channel + wgReceivers.Add(1) + go func() { + defer wgReceivers.Done() + + for ctr := range dch.conversationTextResponse { + fmt.Printf("\n\n[ConversationTextResponse]\n") + fmt.Printf("%s: %s\n\n", ctr.Role, ctr.Content) + } + }() + + // user started speaking response channel + wgReceivers.Add(1) + go func() { + defer wgReceivers.Done() + + for _ = range dch.userStartedSpeakingResponse { + fmt.Printf("\n\n[UserStartedSpeakingResponse]\n\n") + } + }() + + // agent thinking response channel + wgReceivers.Add(1) + go func() { + defer wgReceivers.Done() + + for _ = range dch.agentThinkingResponse { + fmt.Printf("\n\n[AgentThinkingResponse]\n\n") + } + }() + + // function call request response channel + wgReceivers.Add(1) + go func() { + defer wgReceivers.Done() + + for _ = range dch.functionCallRequestResponse { + fmt.Printf("\n\n[FunctionCallRequestResponse]\n\n") + } + }() + + // function calling response channel + wgReceivers.Add(1) + go func() { + defer wgReceivers.Done() + + for _ = range dch.functionCallingResponse { + fmt.Printf("\n\n[FunctionCallingResponse]\n\n") + } + }() + + // agent started speaking response channel + wgReceivers.Add(1) + go func() { + defer wgReceivers.Done() + + for _ = range dch.agentStartedSpeakingResponse { + fmt.Printf("\n\n[AgentStartedSpeakingResponse]\n\n") + } + }() + + // agent audio done response channel + wgReceivers.Add(1) + go func() { + defer wgReceivers.Done() + + for _ = range dch.agentAudioDoneResponse { + fmt.Printf("\n\n[AgentAudioDoneResponse]\n\n") + } + }() + + // close channel + wgReceivers.Add(1) + go func() { + defer wgReceivers.Done() + + for _ = range dch.closeChan { + fmt.Printf("\n\n[CloseResponse]\n\n") + } + }() + + // error channel + wgReceivers.Add(1) + go func() { + defer wgReceivers.Done() + + for er := range dch.errorChan { + fmt.Printf("\n[ErrorResponse]\n") + fmt.Printf("\nError.Type: %s\n", er.ErrCode) + fmt.Printf("Error.Message: %s\n", er.ErrMsg) + fmt.Printf("Error.Description: %s\n\n", er.Description) + fmt.Printf("Error.Variant: %s\n\n", er.Variant) + } + }() + + // unhandled event channel + wgReceivers.Add(1) + go func() { + defer wgReceivers.Done() + + for byData := range dch.unhandledChan { + fmt.Printf("\n[UnhandledEvent]") + fmt.Printf("Dump:\n%s\n\n", string(*byData)) + } + }() + + // wait for all receivers to finish + wgReceivers.Wait() + + return nil +} + +func main() { + // init library + microphone.Initialize() + + // print instructions + fmt.Print("\n\nPress ENTER to exit!\n\n") + + /* + DG Streaming API + */ + // init library + client.Init(client.InitLib{ + LogLevel: client.LogLevelDefault, // LogLevelDefault, LogLevelFull, LogLevelDebug, LogLevelTrace + }) + + // Go context + ctx := context.Background() + // client options + cOptions := &interfaces.ClientOptions{ + EnableKeepAlive: true, + } + + // set the Transcription options + tOptions := client.NewSettingsConfigurationOptions() + tOptions.Agent.Think.Provider.Type = "open_ai" + tOptions.Agent.Think.Model = "gpt-4o-mini" + tOptions.Agent.Think.Instructions = "You are a helpful AI assistant." + + // implement your own callback + var callback msginterfaces.AgentMessageChan + callback = *NewMyHandler() + + // create a Deepgram client + dgClient, err := client.NewWSUsingChan(ctx, "", cOptions, tOptions, callback) + if err != nil { + fmt.Println("ERROR creating LiveTranscription connection:", err) + return + } + + // connect the websocket to Deepgram + fmt.Printf("Starting Agent...\n") + bConnected := dgClient.Connect() + if !bConnected { + fmt.Println("Client.Connect failed") + os.Exit(1) + } + + /* + Microphone package + */ + // mic stuf + mic, err := microphone.New(microphone.AudioConfig{ + InputChannels: 1, + SamplingRate: 16000, + }) + if err != nil { + fmt.Printf("Initialize failed. Err: %v\n", err) + os.Exit(1) + } + + // start the mic + fmt.Printf("Starting Microphone...\n") + err = mic.Start() + if err != nil { + fmt.Printf("mic.Start failed. Err: %v\n", err) + os.Exit(1) + } + + go func() { + // feed the microphone stream to the Deepgram client (this is a blocking call) + mic.Stream(dgClient) + }() + + // wait for user input to exit + input := bufio.NewScanner(os.Stdin) + input.Scan() + + // close mic stream + fmt.Printf("Stopping Microphone...\n") + err = mic.Stop() + if err != nil { + fmt.Printf("mic.Stop failed. Err: %v\n", err) + os.Exit(1) + } + + // teardown library + microphone.Teardown() + + // close DG client + fmt.Printf("Stopping Agent...\n") + dgClient.Stop() + + fmt.Printf("\n\nProgram exiting...\n") +} diff --git a/pkg/api/agent/v1/websocket/chan_default.go b/pkg/api/agent/v1/websocket/chan_default.go new file mode 100644 index 00000000..4678e80d --- /dev/null +++ b/pkg/api/agent/v1/websocket/chan_default.go @@ -0,0 +1,466 @@ +// Copyright 2023-2024 Deepgram SDK contributors. All Rights Reserved. +// Use of this source code is governed by a MIT license that can be found in the LICENSE file. +// SPDX-License-Identifier: MIT + +package websocketv1 + +import ( + "encoding/json" + "fmt" + "os" + "strings" + "sync" + + prettyjson "github.com/hokaccha/go-prettyjson" + klog "k8s.io/klog/v2" + + interfaces "github.com/deepgram/deepgram-go-sdk/pkg/api/agent/v1/websocket/interfaces" +) + +// NewDefaultChanHandler creates a new DefaultChanHandler +func NewDefaultChanHandler() *DefaultChanHandler { + var debugStr string + if v := os.Getenv("DEEPGRAM_DEBUG"); v != "" { + klog.V(4).Infof("DEEPGRAM_DEBUG found") + debugStr = v + } + var debugExtStr string + if v := os.Getenv("DEEPGRAM_DEBUG_VERBOSE"); v != "" { + klog.V(4).Infof("DEEPGRAM_DEBUG_VERBOSE found") + debugExtStr = v + } + handler := &DefaultChanHandler{ + debugWebsocket: strings.EqualFold(debugStr, "true"), + debugWebsocketVerbose: strings.EqualFold(debugExtStr, "true"), + binaryChan: make(chan *[]byte), + openChan: make(chan *interfaces.OpenResponse), + welcomeResponse: make(chan *interfaces.WelcomeResponse), + conversationTextResponse: make(chan *interfaces.ConversationTextResponse), + userStartedSpeakingResponse: make(chan *interfaces.UserStartedSpeakingResponse), + agentThinkingResponse: make(chan *interfaces.AgentThinkingResponse), + functionCallRequestResponse: make(chan *interfaces.FunctionCallRequestResponse), + functionCallingResponse: make(chan *interfaces.FunctionCallingResponse), + agentStartedSpeakingResponse: make(chan *interfaces.AgentStartedSpeakingResponse), + agentAudioDoneResponse: make(chan *interfaces.AgentAudioDoneResponse), + endOfThoughtResponse: make(chan *interfaces.EndOfThoughtResponse), + closeChan: make(chan *interfaces.CloseResponse), + errorChan: make(chan *interfaces.ErrorResponse), + unhandledChan: make(chan *[]byte), + } + + go func() { + err := handler.Run() + if err != nil { + klog.V(1).Infof("handler.Run failed. Err: %v\n", err) + } + }() + + return handler +} + +// GetBinary returns the binary channels +func (dch DefaultChanHandler) GetBinary() []*chan *[]byte { + return []*chan *[]byte{&dch.binaryChan} +} + +// GetOpen returns the open channels +func (dch DefaultChanHandler) GetOpen() []*chan *interfaces.OpenResponse { + return []*chan *interfaces.OpenResponse{&dch.openChan} +} + +// GetWelcomeResponse returns the welcome response channels +func (dch DefaultChanHandler) GetWelcome() []*chan *interfaces.WelcomeResponse { + return []*chan *interfaces.WelcomeResponse{&dch.welcomeResponse} +} + +// GetConversationTextResponse returns the conversation text response channels +func (dch DefaultChanHandler) GetConversationText() []*chan *interfaces.ConversationTextResponse { + return []*chan *interfaces.ConversationTextResponse{&dch.conversationTextResponse} +} + +// GetUserStartedSpeakingResponse returns the user started speaking response channels +func (dch DefaultChanHandler) GetUserStartedSpeaking() []*chan *interfaces.UserStartedSpeakingResponse { + return []*chan *interfaces.UserStartedSpeakingResponse{&dch.userStartedSpeakingResponse} +} + +// GetAgentThinkingResponse returns the agent thinking response channels +func (dch DefaultChanHandler) GetAgentThinking() []*chan *interfaces.AgentThinkingResponse { + return []*chan *interfaces.AgentThinkingResponse{&dch.agentThinkingResponse} +} + +// GetFunctionCallRequestResponse returns the function call request response channels +func (dch DefaultChanHandler) GetFunctionCallRequest() []*chan *interfaces.FunctionCallRequestResponse { + return []*chan *interfaces.FunctionCallRequestResponse{&dch.functionCallRequestResponse} +} + +// GetFunctionCallingResponse returns the function calling response channels +func (dch DefaultChanHandler) GetFunctionCalling() []*chan *interfaces.FunctionCallingResponse { + return []*chan *interfaces.FunctionCallingResponse{&dch.functionCallingResponse} +} + +// GetAgentStartedSpeakingResponse returns the agent started speaking response channels +func (dch DefaultChanHandler) GetAgentStartedSpeaking() []*chan *interfaces.AgentStartedSpeakingResponse { + return []*chan *interfaces.AgentStartedSpeakingResponse{&dch.agentStartedSpeakingResponse} +} + +// GetAgentAudioDoneResponse returns the agent audio done response channels +func (dch DefaultChanHandler) GetAgentAudioDone() []*chan *interfaces.AgentAudioDoneResponse { + return []*chan *interfaces.AgentAudioDoneResponse{&dch.agentAudioDoneResponse} +} + +// GetClose returns the close channels +func (dch DefaultChanHandler) GetClose() []*chan *interfaces.CloseResponse { + return []*chan *interfaces.CloseResponse{&dch.closeChan} +} + +// GetError returns the error channels +func (dch DefaultChanHandler) GetError() []*chan *interfaces.ErrorResponse { + return []*chan *interfaces.ErrorResponse{&dch.errorChan} +} + +// GetUnhandled returns the unhandled event channels +func (dch DefaultChanHandler) GetUnhandled() []*chan *[]byte { + return []*chan *[]byte{&dch.unhandledChan} +} + +// Open is the callback for when the connection opens +// +//nolint:funlen,gocyclo // this is a complex function. keep as is +func (dch DefaultChanHandler) Run() error { + wgReceivers := sync.WaitGroup{} + + // binary channel + wgReceivers.Add(1) + go func() { + defer wgReceivers.Done() + + for br := range dch.binaryChan { + fmt.Printf("\n\n[Binary Data]\n\n") + fmt.Printf("Size: %d\n\n", len(*br)) + + if dch.debugWebsocket { + fmt.Printf("Hex Dump: %x...\n\n", (*br)[:20]) + } + if dch.debugWebsocketVerbose { + fmt.Printf("Dumping to verbose.wav\n") + file, err := os.OpenFile("verbose.wav", os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0o644) + if err != nil { + fmt.Printf("Failed to open file. Err: %v\n", err) + continue + } + + _, err = file.Write(*br) + file.Close() + + if err != nil { + fmt.Printf("Failed to write to file. Err: %v\n", err) + continue + } + } + } + }() + + // open channel + wgReceivers.Add(1) + go func() { + defer wgReceivers.Done() + + for or := range dch.openChan { + if dch.debugWebsocket { + data, err := json.Marshal(or) + if err != nil { + klog.V(1).Infof("Open json.Marshal failed. Err: %v\n", err) + continue + } + + prettyJSON, err := prettyjson.Format(data) + if err != nil { + klog.V(1).Infof("prettyjson.Marshal failed. Err: %v\n", err) + continue + } + klog.V(2).Infof("\n\nOpen Object:\n%s\n\n", prettyJSON) + } + + fmt.Printf("\n\n[OpenResponse]\n\n") + } + }() + + // welcome response channel + wgReceivers.Add(1) + go func() { + defer wgReceivers.Done() + + for wr := range dch.welcomeResponse { + if dch.debugWebsocket { + data, err := json.Marshal(wr) + if err != nil { + klog.V(1).Infof("Welcome json.Marshal failed. Err: %v\n", err) + continue + } + + prettyJSON, err := prettyjson.Format(data) + if err != nil { + klog.V(1).Infof("prettyjson.Marshal failed. Err: %v\n", err) + continue + } + klog.V(2).Infof("\n\nWelcome Object:\n%s\n\n", prettyJSON) + } + + fmt.Printf("\n\n[WelcomeResponse]\n\n") + } + }() + + // conversation text response channel + wgReceivers.Add(1) + go func() { + defer wgReceivers.Done() + + for ctr := range dch.conversationTextResponse { + if dch.debugWebsocket { + data, err := json.Marshal(ctr) + if err != nil { + klog.V(1).Infof("ConversationText json.Marshal failed. Err: %v\n", err) + continue + } + + prettyJSON, err := prettyjson.Format(data) + if err != nil { + klog.V(1).Infof("prettyjson.Marshal failed. Err: %v\n", err) + continue + } + klog.V(2).Infof("\n\nConversationText Object:\n%s\n\n", prettyJSON) + } + + fmt.Printf("\n\n[ConversationTextResponse]\n\n") + } + }() + + // user started speaking response channel + wgReceivers.Add(1) + go func() { + defer wgReceivers.Done() + + for ussr := range dch.userStartedSpeakingResponse { + if dch.debugWebsocket { + data, err := json.Marshal(ussr) + if err != nil { + klog.V(1).Infof("UserStartedSpeaking json.Marshal failed. Err: %v\n", err) + continue + } + + prettyJSON, err := prettyjson.Format(data) + if err != nil { + klog.V(1).Infof("prettyjson.Marshal failed. Err: %v\n", err) + continue + } + klog.V(2).Infof("\n\nUserStartedSpeaking Object:\n%s\n\n", prettyJSON) + } + + fmt.Printf("\n\n[UserStartedSpeakingResponse]\n\n") + } + }() + + // agent thinking response channel + wgReceivers.Add(1) + go func() { + defer wgReceivers.Done() + + for atr := range dch.agentThinkingResponse { + if dch.debugWebsocket { + data, err := json.Marshal(atr) + if err != nil { + klog.V(1).Infof("AgentThinking json.Marshal failed. Err: %v\n", err) + continue + } + + prettyJSON, err := prettyjson.Format(data) + if err != nil { + klog.V(1).Infof("prettyjson.Marshal failed. Err: %v\n", err) + continue + } + klog.V(2).Infof("\n\nAgentThinking Object:\n%s\n\n", prettyJSON) + } + + fmt.Printf("\n\n[AgentThinkingResponse]\n\n") + } + }() + + // function call request response channel + wgReceivers.Add(1) + go func() { + defer wgReceivers.Done() + + for fcrr := range dch.functionCallRequestResponse { + if dch.debugWebsocket { + data, err := json.Marshal(fcrr) + if err != nil { + klog.V(1).Infof("FunctionCallRequest json.Marshal failed. Err: %v\n", err) + continue + } + + prettyJSON, err := prettyjson.Format(data) + if err != nil { + klog.V(1).Infof("prettyjson.Marshal failed. Err: %v\n", err) + continue + } + klog.V(2).Infof("\n\nFunctionCallRequest Object:\n%s\n\n", prettyJSON) + } + + fmt.Printf("\n\n[FunctionCallRequestResponse]\n\n") + } + }() + + // function calling response channel + wgReceivers.Add(1) + go func() { + defer wgReceivers.Done() + + for fcr := range dch.functionCallingResponse { + if dch.debugWebsocket { + data, err := json.Marshal(fcr) + if err != nil { + klog.V(1).Infof("FunctionCalling json.Marshal failed. Err: %v\n", err) + continue + } + + prettyJSON, err := prettyjson.Format(data) + if err != nil { + klog.V(1).Infof("prettyjson.Marshal failed. Err: %v\n", err) + continue + } + klog.V(2).Infof("\n\nFunctionCalling Object:\n%s\n\n", prettyJSON) + } + + fmt.Printf("\n\n[FunctionCallingResponse]\n\n") + } + }() + + // agent started speaking response channel + wgReceivers.Add(1) + go func() { + defer wgReceivers.Done() + + for assr := range dch.agentStartedSpeakingResponse { + if dch.debugWebsocket { + data, err := json.Marshal(assr) + if err != nil { + klog.V(1).Infof("AgentStartedSpeaking json.Marshal failed. Err: %v\n", err) + continue + } + + prettyJSON, err := prettyjson.Format(data) + if err != nil { + klog.V(1).Infof("prettyjson.Marshal failed. Err: %v\n", err) + continue + } + klog.V(2).Infof("\n\nAgentStartedSpeaking Object:\n%s\n\n", prettyJSON) + } + + fmt.Printf("\n\n[AgentStartedSpeakingResponse]\n\n") + } + }() + + // agent audio done response channel + wgReceivers.Add(1) + go func() { + defer wgReceivers.Done() + + for aadr := range dch.agentAudioDoneResponse { + if dch.debugWebsocket { + data, err := json.Marshal(aadr) + if err != nil { + klog.V(1).Infof("AgentAudioDone json.Marshal failed. Err: %v\n", err) + continue + } + + prettyJSON, err := prettyjson.Format(data) + if err != nil { + klog.V(1).Infof("prettyjson.Marshal failed. Err: %v\n", err) + continue + } + klog.V(2).Infof("\n\nAgentAudioDone Object:\n%s\n\n", prettyJSON) + } + + fmt.Printf("\n\n[AgentAudioDoneResponse]\n\n") + } + }() + + // close channel + wgReceivers.Add(1) + go func() { + defer wgReceivers.Done() + + for cr := range dch.closeChan { + if dch.debugWebsocket { + data, err := json.Marshal(cr) + if err != nil { + klog.V(1).Infof("Close json.Marshal failed. Err: %v\n", err) + continue + } + + prettyJSON, err := prettyjson.Format(data) + if err != nil { + klog.V(1).Infof("prettyjson.Marshal failed. Err: %v\n", err) + continue + } + klog.V(2).Infof("\n\nClose Object:\n%s\n\n", prettyJSON) + } + + fmt.Printf("\n\n[CloseResponse]\n\n") + } + }() + + // error channel + wgReceivers.Add(1) + go func() { + defer wgReceivers.Done() + + for er := range dch.errorChan { + if dch.debugWebsocket { + data, err := json.Marshal(er) + if err != nil { + klog.V(1).Infof("Close json.Marshal failed. Err: %v\n", err) + continue + } + + prettyJSON, err := prettyjson.Format(data) + if err != nil { + klog.V(1).Infof("prettyjson.Marshal failed. Err: %v\n", err) + continue + } + klog.V(2).Infof("\n\nError Object:\n%s\n\n", prettyJSON) + } + + fmt.Printf("\n[ErrorResponse]\n") + fmt.Printf("\nError.Type: %s\n", er.ErrCode) + fmt.Printf("Error.Message: %s\n", er.ErrMsg) + fmt.Printf("Error.Description: %s\n\n", er.Description) + fmt.Printf("Error.Variant: %s\n\n", er.Variant) + } + }() + + // unhandled event channel + wgReceivers.Add(1) + go func() { + defer wgReceivers.Done() + + for byData := range dch.unhandledChan { + if dch.debugWebsocket { + prettyJSON, err := prettyjson.Format(*byData) + if err != nil { + klog.V(2).Infof("\n\nRaw Data:\n%s\n\n", string(*byData)) + } else { + klog.V(2).Infof("\n\nError Object:\n%s\n\n", prettyJSON) + } + } + + fmt.Printf("\n[UnhandledEvent]") + fmt.Printf("Dump:\n%s\n\n", string(*byData)) + } + }() + + // wait for all receivers to finish + wgReceivers.Wait() + + return nil +} diff --git a/pkg/api/agent/v1/websocket/chan_router.go b/pkg/api/agent/v1/websocket/chan_router.go new file mode 100644 index 00000000..0d171bf3 --- /dev/null +++ b/pkg/api/agent/v1/websocket/chan_router.go @@ -0,0 +1,404 @@ +// Copyright 2024 Deepgram SDK contributors. All Rights Reserved. +// Use of this source code is governed by a MIT license that can be found in the LICENSE file. +// SPDX-License-Identifier: MIT + +package websocketv1 + +import ( + "encoding/hex" + "encoding/json" + "os" + "strings" + + prettyjson "github.com/hokaccha/go-prettyjson" + klog "k8s.io/klog/v2" + + interfaces "github.com/deepgram/deepgram-go-sdk/pkg/api/agent/v1/websocket/interfaces" +) + +// NewWithDefault creates a ChanRouter with the default callback handler +func NewChanWithDefault() *ChanRouter { + chans := NewDefaultChanHandler() + go func() { + err := chans.Run() + if err != nil { + klog.V(1).Infof("chans.Run failed. Err: %v\n", err) + } + }() + + return NewChanRouter(chans) +} + +// New creates a ChanRouter with a user-defined channels +// gocritic:ignore +func NewChanRouter(chans interfaces.AgentMessageChan) *ChanRouter { + var debugStr string + if v := os.Getenv("DEEPGRAM_DEBUG"); v != "" { + klog.V(4).Infof("DEEPGRAM_DEBUG found") + debugStr = v + } + + router := &ChanRouter{ + debugWebsocket: strings.EqualFold(strings.ToLower(debugStr), "true"), + binaryChan: make([]*chan *[]byte, 0), + openChan: make([]*chan *interfaces.OpenResponse, 0), + welcomeResponse: make([]*chan *interfaces.WelcomeResponse, 0), + conversationTextResponse: make([]*chan *interfaces.ConversationTextResponse, 0), + userStartedSpeakingResponse: make([]*chan *interfaces.UserStartedSpeakingResponse, 0), + agentThinkingResponse: make([]*chan *interfaces.AgentThinkingResponse, 0), + functionCallRequestResponse: make([]*chan *interfaces.FunctionCallRequestResponse, 0), + functionCallingResponse: make([]*chan *interfaces.FunctionCallingResponse, 0), + agentStartedSpeakingResponse: make([]*chan *interfaces.AgentStartedSpeakingResponse, 0), + agentAudioDoneResponse: make([]*chan *interfaces.AgentAudioDoneResponse, 0), + closeChan: make([]*chan *interfaces.CloseResponse, 0), + errorChan: make([]*chan *interfaces.ErrorResponse, 0), + unhandledChan: make([]*chan *[]byte, 0), + } + + if chans != nil { + router.binaryChan = append(router.binaryChan, chans.GetBinary()...) + router.openChan = append(router.openChan, chans.GetOpen()...) + router.welcomeResponse = append(router.welcomeResponse, chans.GetWelcome()...) + router.conversationTextResponse = append(router.conversationTextResponse, chans.GetConversationText()...) + router.userStartedSpeakingResponse = append(router.userStartedSpeakingResponse, chans.GetUserStartedSpeaking()...) + router.agentThinkingResponse = append(router.agentThinkingResponse, chans.GetAgentThinking()...) + router.functionCallRequestResponse = append(router.functionCallRequestResponse, chans.GetFunctionCallRequest()...) + router.functionCallingResponse = append(router.functionCallingResponse, chans.GetFunctionCalling()...) + router.agentStartedSpeakingResponse = append(router.agentStartedSpeakingResponse, chans.GetAgentStartedSpeaking()...) + router.agentAudioDoneResponse = append(router.agentAudioDoneResponse, chans.GetAgentAudioDone()...) + router.closeChan = append(router.closeChan, chans.GetClose()...) + router.errorChan = append(router.errorChan, chans.GetError()...) + router.unhandledChan = append(router.unhandledChan, chans.GetUnhandled()...) + } + + return router +} + +// Open sends an OpenResponse message to the callback +func (r *ChanRouter) Open(or *interfaces.OpenResponse) error { + byMsg, err := json.Marshal(or) + if err != nil { + klog.V(1).Infof("json.Marshal(or) failed. Err: %v\n", err) + return err + } + + action := func(data []byte) error { + var msg interfaces.OpenResponse + if err := json.Unmarshal(data, &msg); err != nil { + klog.V(1).Infof("json.Unmarshal(OpenResponse) failed. Err: %v\n", err) + return err + } + + for _, ch := range r.openChan { + *ch <- &msg + } + return nil + } + + return r.processGeneric(string(interfaces.TypeOpenResponse), byMsg, action) +} + +// Close sends an CloseResponse message to the callback +func (r *ChanRouter) Close(cr *interfaces.CloseResponse) error { + byMsg, err := json.Marshal(cr) + if err != nil { + klog.V(1).Infof("json.Marshal(or) failed. Err: %v\n", err) + return err + } + + action := func(data []byte) error { + var msg interfaces.CloseResponse + if err := json.Unmarshal(data, &msg); err != nil { + klog.V(1).Infof("json.Unmarshal(CloseResponse) failed. Err: %v\n", err) + return err + } + + for _, ch := range r.closeChan { + *ch <- &msg + } + return nil + } + + return r.processGeneric(string(interfaces.TypeCloseResponse), byMsg, action) +} + +// Error sends an ErrorResponse message to the callback +func (r *ChanRouter) Error(er *interfaces.ErrorResponse) error { + byMsg, err := json.Marshal(er) + if err != nil { + klog.V(1).Infof("json.Marshal(er) failed. Err: %v\n", err) + return err + } + + action := func(data []byte) error { + var msg interfaces.ErrorResponse + if err := json.Unmarshal(data, &msg); err != nil { + klog.V(1).Infof("json.Unmarshal(ErrorResponse) failed. Err: %v\n", err) + return err + } + + for _, ch := range r.errorChan { + *ch <- &msg + } + return nil + } + + return r.processGeneric(string(interfaces.TypeErrorResponse), byMsg, action) +} + +// processGeneric generalizes the handling of all message types +func (r *ChanRouter) processGeneric(msgType string, byMsg []byte, action func(data []byte) error) error { + klog.V(6).Infof("router.%s ENTER\n", msgType) + + r.printDebugMessages(5, msgType, byMsg) + + var err error + if err = action(byMsg); err != nil { + klog.V(1).Infof("callback.%s failed. Err: %v\n", msgType, err) + } else { + klog.V(5).Infof("callback.%s succeeded\n", msgType) + } + klog.V(6).Infof("router.%s LEAVE\n", msgType) + + return err +} + +func (r *ChanRouter) processWelcome(byMsg []byte) error { + action := func(byMsg []byte) error { + var msg interfaces.WelcomeResponse + if err := json.Unmarshal(byMsg, &msg); err != nil { + klog.V(1).Infof("json.Unmarshal(WelcomeResponse) failed. Err: %v\n", err) + return err + } + + for _, ch := range r.welcomeResponse { + *ch <- &msg + } + return nil + } + + return r.processGeneric(string(interfaces.TypeWelcomeResponse), byMsg, action) +} + +func (r *ChanRouter) processConversationText(byMsg []byte) error { + action := func(data []byte) error { + var msg interfaces.ConversationTextResponse + if err := json.Unmarshal(byMsg, &msg); err != nil { + klog.V(1).Infof("json.Unmarshal(ConversationTextResponse) failed. Err: %v\n", err) + return err + } + + for _, ch := range r.conversationTextResponse { + *ch <- &msg + } + return nil + } + + return r.processGeneric(string(interfaces.TypeConversationTextResponse), byMsg, action) +} + +func (r *ChanRouter) processUserStartedSpeaking(byMsg []byte) error { + action := func(data []byte) error { + var msg interfaces.UserStartedSpeakingResponse + if err := json.Unmarshal(byMsg, &msg); err != nil { + klog.V(1).Infof("json.Unmarshal(UserStartedSpeakingResponse) failed. Err: %v\n", err) + return err + } + + for _, ch := range r.userStartedSpeakingResponse { + *ch <- &msg + } + return nil + } + + return r.processGeneric(string(interfaces.TypeUserStartedSpeakingResponse), byMsg, action) +} + +func (r *ChanRouter) processAgentThinking(byMsg []byte) error { + action := func(data []byte) error { + var msg interfaces.AgentThinkingResponse + if err := json.Unmarshal(byMsg, &msg); err != nil { + klog.V(1).Infof("json.Unmarshal(AgentThinkingResponse) failed. Err: %v\n", err) + return err + } + + for _, ch := range r.agentThinkingResponse { + *ch <- &msg + } + return nil + } + + return r.processGeneric(string(interfaces.TypeAgentThinkingResponse), byMsg, action) +} + +func (r *ChanRouter) processFunctionCallRequest(byMsg []byte) error { + action := func(data []byte) error { + var msg interfaces.FunctionCallRequestResponse + if err := json.Unmarshal(byMsg, &msg); err != nil { + klog.V(1).Infof("json.Unmarshal(FunctionCallRequestResponse) failed. Err: %v\n", err) + return err + } + + for _, ch := range r.functionCallRequestResponse { + *ch <- &msg + } + return nil + } + + return r.processGeneric(string(interfaces.TypeFunctionCallRequestResponse), byMsg, action) +} + +func (r *ChanRouter) processFunctionCalling(byMsg []byte) error { + action := func(data []byte) error { + var msg interfaces.FunctionCallingResponse + if err := json.Unmarshal(byMsg, &msg); err != nil { + klog.V(1).Infof("json.Unmarshal(FunctionCallingResponse) failed. Err: %v\n", err) + return err + } + + for _, ch := range r.functionCallingResponse { + *ch <- &msg + } + return nil + } + + return r.processGeneric(string(interfaces.TypeFunctionCallingResponse), byMsg, action) +} + +func (r *ChanRouter) processAgentStartedSpeaking(byMsg []byte) error { + action := func(data []byte) error { + var msg interfaces.AgentStartedSpeakingResponse + if err := json.Unmarshal(byMsg, &msg); err != nil { + klog.V(1).Infof("json.Unmarshal(AgentStartedSpeakingResponse) failed. Err: %v\n", err) + return err + } + + for _, ch := range r.agentStartedSpeakingResponse { + *ch <- &msg + } + return nil + } + + return r.processGeneric(string(interfaces.TypeAgentStartedSpeakingResponse), byMsg, action) +} + +func (r *ChanRouter) processAgentAudioDone(byMsg []byte) error { + action := func(data []byte) error { + var msg interfaces.AgentAudioDoneResponse + if err := json.Unmarshal(byMsg, &msg); err != nil { + klog.V(1).Infof("json.Unmarshal(AgentAudioDoneResponse) failed. Err: %v\n", err) + return err + } + + for _, ch := range r.agentAudioDoneResponse { + *ch <- &msg + } + return nil + } + + return r.processGeneric(string(interfaces.TypeAgentAudioDoneResponse), byMsg, action) +} + +func (r *ChanRouter) processErrorResponse(byMsg []byte) error { + action := func(data []byte) error { + var msg interfaces.ErrorResponse + if err := json.Unmarshal(byMsg, &msg); err != nil { + klog.V(1).Infof("json.Unmarshal(MessageResponse) failed. Err: %v\n", err) + return err + } + + for _, ch := range r.errorChan { + *ch <- &msg + } + return nil + } + + return r.processGeneric(string(interfaces.TypeErrorResponse), byMsg, action) +} + +// Message handles platform messages and routes them appropriately based on the MessageType +func (r *ChanRouter) Message(byMsg []byte) error { + klog.V(6).Infof("router.Message ENTER\n") + + if r.debugWebsocket { + klog.V(5).Infof("Raw Message:\n%s\n", string(byMsg)) + } + + var mt interfaces.MessageType + if err := json.Unmarshal(byMsg, &mt); err != nil { + klog.V(1).Infof("json.Unmarshal(MessageType) failed. Err: %v\n", err) + klog.V(6).Infof("router.Message LEAVE\n") + return err + } + + var err error + switch interfaces.TypeResponse(mt.Type) { + case interfaces.TypeWelcomeResponse: + err = r.processWelcome(byMsg) + case interfaces.TypeConversationTextResponse: + err = r.processConversationText(byMsg) + case interfaces.TypeUserStartedSpeakingResponse: + err = r.processUserStartedSpeaking(byMsg) + case interfaces.TypeAgentThinkingResponse: + err = r.processAgentThinking(byMsg) + case interfaces.TypeFunctionCallRequestResponse: + err = r.processFunctionCallRequest(byMsg) + case interfaces.TypeFunctionCallingResponse: + err = r.processFunctionCalling(byMsg) + case interfaces.TypeAgentStartedSpeakingResponse: + err = r.processAgentStartedSpeaking(byMsg) + case interfaces.TypeAgentAudioDoneResponse: + err = r.processAgentAudioDone(byMsg) + case interfaces.TypeResponse(interfaces.TypeErrorResponse): + err = r.processErrorResponse(byMsg) + default: + err = r.UnhandledMessage(byMsg) + } + + if err == nil { + klog.V(6).Infof("MessageType(%s) after - Result: succeeded\n", mt.Type) + } else { + klog.V(5).Infof("MessageType(%s) after - Result: %v\n", mt.Type, err) + } + klog.V(6).Infof("router.Message LEAVE\n") + return err +} + +// Binary handles platform messages and routes them appropriately based on the MessageType +func (r *ChanRouter) Binary(byMsg []byte) error { + klog.V(6).Infof("router.Binary ENTER\n") + + klog.V(5).Infof("Binary Message:\n%s...\n", hex.EncodeToString(byMsg[:20])) + for _, ch := range r.binaryChan { + *ch <- &byMsg + } + + klog.V(6).Infof("router.Binary LEAVE\n") + return nil +} + +// UnhandledMessage logs and handles any unexpected message types +func (r *ChanRouter) UnhandledMessage(byMsg []byte) error { + klog.V(6).Infof("router.UnhandledMessage ENTER\n") + r.printDebugMessages(3, "UnhandledMessage", byMsg) + + for _, ch := range r.unhandledChan { + *ch <- &byMsg + } + + klog.V(1).Infof("Unknown Event was received\n") + klog.V(6).Infof("router.UnhandledMessage LEAVE\n") + return ErrInvalidMessageType +} + +// printDebugMessages formats and logs debugging messages +func (r *ChanRouter) printDebugMessages(level klog.Level, function string, byMsg []byte) { + prettyJSON, err := prettyjson.Format(byMsg) + if err != nil { + klog.V(1).Infof("prettyjson.Format failed. Err: %v\n", err) + return + } + klog.V(level).Infof("\n\n-----------------------------------------------\n") + klog.V(level).Infof("%s RAW:\n%s\n", function, prettyJSON) + klog.V(level).Infof("-----------------------------------------------\n\n\n") +} diff --git a/pkg/api/agent/v1/websocket/constants.go b/pkg/api/agent/v1/websocket/constants.go new file mode 100644 index 00000000..15a467f3 --- /dev/null +++ b/pkg/api/agent/v1/websocket/constants.go @@ -0,0 +1,19 @@ +// Copyright 2023-2024 Deepgram SDK contributors. All Rights Reserved. +// Use of this source code is governed by a MIT license that can be found in the LICENSE file. +// SPDX-License-Identifier: MIT + +package websocketv1 + +import "errors" + +const ( + PackageVersion string = "v1.0" +) + +var ( + // ErrInvalidMessageType invalid message type + ErrInvalidMessageType = errors.New("invalid message type") + + // ErrUserChanNotDefined user chan not defined or invalid + ErrUserChanNotDefined = errors.New("user chan not defined or invalid") +) diff --git a/pkg/api/agent/v1/websocket/interfaces/constants.go b/pkg/api/agent/v1/websocket/interfaces/constants.go new file mode 100644 index 00000000..201fe905 --- /dev/null +++ b/pkg/api/agent/v1/websocket/interfaces/constants.go @@ -0,0 +1,39 @@ +// Copyright 2024 Deepgram SDK contributors. All Rights Reserved. +// Use of this source code is governed by a MIT license that can be found in the LICENSE file. +// SPDX-License-Identifier: MIT + +package interfacesv1 + +import ( + commoninterfaces "github.com/deepgram/deepgram-go-sdk/pkg/client/common/v1/interfaces" +) + +// These are the message types that can be received from the live API +type TypeResponse commoninterfaces.TypeResponse + +// client message types +const ( + TypeSettingsConfiguration = "SettingsConfiguration" + TypeUpdateInstructions = "UpdateInstructions" + TypeUpdateSpeak = "UpdateSpeak" + TypeInjectAgentMessage = "InjectAgentMessage" + TypeFunctionCallResponse = "FunctionCallResponse" + TypeKeepAlive = "KeepAlive" + TypeClose = "Close" +) + +// server message types +const ( + // message types + TypeOpenResponse = commoninterfaces.TypeOpenResponse + TypeWelcomeResponse = "Welcome" + TypeConversationTextResponse = "ConversationText" + TypeUserStartedSpeakingResponse = "UserStartedSpeaking" + TypeAgentThinkingResponse = "AgentThinking" + TypeFunctionCallRequestResponse = "FunctionCallRequest" + TypeFunctionCallingResponse = "FunctionCalling" + TypeAgentStartedSpeakingResponse = "AgentStartedSpeaking" + TypeAgentAudioDoneResponse = "AgentAudioDone" + TypeCloseResponse = commoninterfaces.TypeCloseResponse + TypeErrorResponse = commoninterfaces.TypeErrorResponse +) diff --git a/pkg/api/agent/v1/websocket/interfaces/interfaces.go b/pkg/api/agent/v1/websocket/interfaces/interfaces.go new file mode 100644 index 00000000..79ce0e86 --- /dev/null +++ b/pkg/api/agent/v1/websocket/interfaces/interfaces.go @@ -0,0 +1,26 @@ +// Copyright 2024 Deepgram SDK contributors. All Rights Reserved. +// Use of this source code is governed by a MIT license that can be found in the LICENSE file. +// SPDX-License-Identifier: MIT + +// This package defines interfaces for the live API +package interfacesv1 + +/* +Chan Interfaces +*/ +// AgentMessageChan is a callback used to receive notifcations for platforms messages +type AgentMessageChan interface { + GetBinary() []*chan *[]byte + GetOpen() []*chan *OpenResponse + GetWelcome() []*chan *WelcomeResponse + GetConversationText() []*chan *ConversationTextResponse + GetUserStartedSpeaking() []*chan *UserStartedSpeakingResponse + GetAgentThinking() []*chan *AgentThinkingResponse + GetFunctionCallRequest() []*chan *FunctionCallRequestResponse + GetFunctionCalling() []*chan *FunctionCallingResponse + GetAgentStartedSpeaking() []*chan *AgentStartedSpeakingResponse + GetAgentAudioDone() []*chan *AgentAudioDoneResponse + GetClose() []*chan *CloseResponse + GetError() []*chan *ErrorResponse + GetUnhandled() []*chan *[]byte +} diff --git a/pkg/api/agent/v1/websocket/interfaces/types.go b/pkg/api/agent/v1/websocket/interfaces/types.go new file mode 100644 index 00000000..2074dba7 --- /dev/null +++ b/pkg/api/agent/v1/websocket/interfaces/types.go @@ -0,0 +1,134 @@ +// Copyright 2024 Deepgram SDK contributors. All Rights Reserved. +// Use of this source code is governed by a MIT license that can be found in the LICENSE file. +// SPDX-License-Identifier: MIT + +package interfacesv1 + +import ( + commoninterfaces "github.com/deepgram/deepgram-go-sdk/pkg/client/common/v1/interfaces" + interfaces "github.com/deepgram/deepgram-go-sdk/pkg/client/interfaces" +) + +/***********************************/ +// Request/Input structs +/***********************************/ +type SettingsConfigurationOptions interfaces.SettingsConfigurationOptions + +// UpdateInstructions is the request to update the Agent instructions +type UpdateInstructions struct { + Type string `json:"type,omitempty"` + Instructions string `json:"instructions,omitempty"` +} + +// UpdateSpeak is the request to update model for speaking +type UpdateSpeak struct { + Type string `json:"type,omitempty"` + Model string `json:"model,omitempty"` +} + +// InjectAgentMessage is the request to inject a message into the Agent +type InjectAgentMessage struct { + Type string `json:"type,omitempty"` + Message string `json:"message,omitempty"` +} + +// FunctionCallResponse is the response from a function call +type FunctionCallResponse struct { + Type string `json:"type,omitempty"` + FunctionCallID string `json:"function_call_id,omitempty"` + Output string `json:"output,omitempty"` +} + +// KeepAlive is the request to keep the connection alive +type KeepAlive struct { + Type string `json:"type,omitempty"` +} + +// Close terminates the connection +type Close struct { + Type string `json:"type,omitempty"` +} + +/***********************************/ +// MessageType is the header to bootstrap you way unmarshalling other messages +/***********************************/ +/* + Example: + { + "type": "message", + "message": { + ... + } + } +*/ +type MessageType struct { + Type string `json:"type"` +} + +/***********************************/ +// shared/common structs +/***********************************/ +// None is a placeholder + +/***********************************/ +// Results from Agent/Server +/***********************************/ +// OpenResponse is the response from opening the connection +type OpenResponse = commoninterfaces.OpenResponse + +// WelcomeResponse is the response from the welcome message +type WelcomeResponse struct { + Type string `json:"type,omitempty"` + SessionID string `json:"session_id,omitempty"` +} + +// ConversationTextResponse is the response from the conversation text +type ConversationTextResponse struct { + Type string `json:"type,omitempty"` + Role string `json:"role,omitempty"` + Content string `json:"content,omitempty"` +} + +// UserStartedSpeakingResponse is the response from the user starting to speak +type UserStartedSpeakingResponse struct { + Type string `json:"type,omitempty"` +} + +// AgentThinkingResponse is the response from the Agent thinking +type AgentThinkingResponse struct { + Type string `json:"type,omitempty"` + Content string `json:"content,omitempty"` +} + +// FunctionCallRequestResponse is the response from a function call request +type FunctionCallRequestResponse struct { + Type string `json:"type,omitempty"` + FunctionName string `json:"function_name,omitempty"` + FunctionCallID string `json:"function_call_id,omitempty"` + Input map[string]string `json:"input,omitempty"` // TODO: this is still undefined +} + +// FunctionCallingResponse is the response from a function calling +type FunctionCallingResponse struct { + Type string `json:"type,omitempty"` + Output map[string]string `json:"output,omitempty"` // TODO: this is still undefined +} + +// AgentStartedSpeakingResponse is the response from the Agent starting to speak +type AgentStartedSpeakingResponse struct { + Type string `json:"type,omitempty"` + TotalLatency float64 `json:"total_latency,omitempty"` + TtsLatency float64 `json:"tts_latency,omitempty"` + TttLatency float64 `json:"ttt_latency,omitempty"` +} + +// AgentAudioDoneResponse is the response from the Agent audio done +type AgentAudioDoneResponse struct { + Type string `json:"type,omitempty"` +} + +// CloseResponse is the response from closing the connection +type CloseResponse = commoninterfaces.CloseResponse + +// ErrorResponse is the Deepgram specific response error +type ErrorResponse = interfaces.DeepgramError diff --git a/pkg/api/agent/v1/websocket/types.go b/pkg/api/agent/v1/websocket/types.go new file mode 100644 index 00000000..125408d5 --- /dev/null +++ b/pkg/api/agent/v1/websocket/types.go @@ -0,0 +1,53 @@ +// Copyright 2024 Deepgram SDK contributors. All Rights Reserved. +// Use of this source code is governed by a MIT license that can be found in the LICENSE file. +// SPDX-License-Identifier: MIT + +package websocketv1 + +import ( + interfaces "github.com/deepgram/deepgram-go-sdk/pkg/api/agent/v1/websocket/interfaces" +) + +/* +Using Channels +*/ +// DefaultCallbackHandler is a default callback handler for live transcription +// Simply prints the transcript to stdout +type DefaultChanHandler struct { + debugWebsocket bool + debugWebsocketVerbose bool + + binaryChan chan *[]byte + openChan chan *interfaces.OpenResponse + welcomeResponse chan *interfaces.WelcomeResponse + conversationTextResponse chan *interfaces.ConversationTextResponse + userStartedSpeakingResponse chan *interfaces.UserStartedSpeakingResponse + agentThinkingResponse chan *interfaces.AgentThinkingResponse + functionCallRequestResponse chan *interfaces.FunctionCallRequestResponse + functionCallingResponse chan *interfaces.FunctionCallingResponse + agentStartedSpeakingResponse chan *interfaces.AgentStartedSpeakingResponse + agentAudioDoneResponse chan *interfaces.AgentAudioDoneResponse + closeChan chan *interfaces.CloseResponse + errorChan chan *interfaces.ErrorResponse + unhandledChan chan *[]byte +} + +// ChanRouter routes events +type ChanRouter struct { + debugWebsocket bool + + // call out to channels + binaryChan []*chan *[]byte + openChan []*chan *interfaces.OpenResponse + welcomeResponse []*chan *interfaces.WelcomeResponse + conversationTextResponse []*chan *interfaces.ConversationTextResponse + userStartedSpeakingResponse []*chan *interfaces.UserStartedSpeakingResponse + agentThinkingResponse []*chan *interfaces.AgentThinkingResponse + functionCallRequestResponse []*chan *interfaces.FunctionCallRequestResponse + functionCallingResponse []*chan *interfaces.FunctionCallingResponse + agentStartedSpeakingResponse []*chan *interfaces.AgentStartedSpeakingResponse + agentAudioDoneResponse []*chan *interfaces.AgentAudioDoneResponse + closeChan []*chan *interfaces.CloseResponse + errorChan []*chan *interfaces.ErrorResponse + unhandledChan []*chan *[]byte +} diff --git a/pkg/api/live/v1/deprecated.go b/pkg/api/live/v1/deprecated.go deleted file mode 100644 index 24c65c23..00000000 --- a/pkg/api/live/v1/deprecated.go +++ /dev/null @@ -1,41 +0,0 @@ -// Copyright 2023-2024 Deepgram SDK contributors. All Rights Reserved. -// Use of this source code is governed by a MIT license that can be found in the LICENSE file. -// SPDX-License-Identifier: MIT - -// *********** WARNING *********** -// This package provides the Live API -// -// Deprecated: This package is deprecated. Use the listen package instead. This will be removed in a future release. -// -// This package is frozen and no new functionality will be added. -// *********** WARNING *********** -package legacy - -import ( - websocketv1 "github.com/deepgram/deepgram-go-sdk/pkg/api/listen/v1/websocket" - interfacesv1 "github.com/deepgram/deepgram-go-sdk/pkg/api/listen/v1/websocket/interfaces" -) - -const ( - PackageVersion = websocketv1.PackageVersion -) - -// Alias -type LiveMessageCallback = interfacesv1.LiveMessageCallback -type DefaultCallbackHandler = websocketv1.DefaultCallbackHandler -type MessageRouter = websocketv1.MessageRouter - -// NewDefaultCallbackHandler -func NewDefaultCallbackHandler() websocketv1.DefaultCallbackHandler { - return DefaultCallbackHandler{} -} - -// MessageRouter -func NewWithDefault() *websocketv1.MessageRouter { - return websocketv1.NewWithDefault() -} - -// New creates a MessageRouter with a user-defined callback -func New(callback LiveMessageCallback) *websocketv1.MessageRouter { - return websocketv1.New(callback) -} diff --git a/pkg/api/prerecorded/v1/deprecated.go b/pkg/api/prerecorded/v1/deprecated.go deleted file mode 100644 index 5fbab1fd..00000000 --- a/pkg/api/prerecorded/v1/deprecated.go +++ /dev/null @@ -1,29 +0,0 @@ -// Copyright 2024 Deepgram SDK contributors. All Rights Reserved. -// Use of this source code is governed by a MIT license that can be found in the LICENSE file. -// SPDX-License-Identifier: MIT - -// *********** WARNING *********** -// This package provides the PreRecorded API -// -// Deprecated: This package is deprecated. Use the listen package instead. This will be removed in a future release. -// -// This package is frozen and no new functionality will be added. -// *********** WARNING *********** -package legacy - -import ( - restv1 "github.com/deepgram/deepgram-go-sdk/pkg/api/listen/v1/rest" - client "github.com/deepgram/deepgram-go-sdk/pkg/client/prerecorded" //lint:ignore -) - -const ( - PackageVersion = restv1.PackageVersion -) - -// Alias -type Client = restv1.Client - -// New creates a new Client -func New(c *client.Client) *restv1.Client { - return restv1.New(c) -} diff --git a/pkg/api/version/agent-version.go b/pkg/api/version/agent-version.go new file mode 100644 index 00000000..ea3fd11c --- /dev/null +++ b/pkg/api/version/agent-version.go @@ -0,0 +1,32 @@ +// Copyright 2024 Deepgram SDK contributors. All Rights Reserved. +// Use of this source code is governed by a MIT license that can be found in the LICENSE file. +// SPDX-License-Identifier: MIT + +package version + +import ( + "context" +) + +const ( + // TODO: The API currently isn't versioned... this seems like a bad idea + // APIVersion current supported version + // AgentAPIVersion string = "v1" + + // AgentPath is the current path for agent API + AgentPath string = "agent" +) + +/* +GetAgentAPI is a function which controls the versioning of the agent API and provides mechanism for: + +- overriding the host endpoint +- overriding the version used +- overriding the endpoint path +- additional arguments to the query string/parameters + +The return value is the complete URL endpoint to be used for the agent API +*/ +func GetAgentAPI(ctx context.Context, host, version, path string /*options *interfaces.SettingsConfigurationOptions,*/, args ...interface{}) (string, error) { + return getAPIURL(ctx, "agent", host, version, path, nil, args...) +} diff --git a/pkg/api/version/constants.go b/pkg/api/version/constants.go index 33e23e3d..5fcedaeb 100644 --- a/pkg/api/version/constants.go +++ b/pkg/api/version/constants.go @@ -17,6 +17,9 @@ const ( // APIPathListen APIPathListen string = "listen" + // APITypeAgent + APITypeAgent string = "agent" + // APITypeLive APITypeLive string = "live" @@ -35,6 +38,7 @@ var ( // APIPathMap maps the API types to their default paths APIPathMap = map[string]string{ + "agent": "agent", "analyze": "read", "prerecorded": APIPathListen, "speak": "speak", diff --git a/pkg/api/version/version.go b/pkg/api/version/version.go index d5dcb190..33961041 100644 --- a/pkg/api/version/version.go +++ b/pkg/api/version/version.go @@ -28,12 +28,16 @@ func getAPIURL(ctx context.Context, apiType, host, version, path string, options // set the protocol protocol := HTTPProtocol - if apiType == APITypeLive || apiType == APITypeSpeakStream { + if apiType == APITypeLive || apiType == APITypeSpeakStream || apiType == APITypeAgent { protocol = WSProtocol } if host == "" { host = common.DefaultHost } + if apiType == APITypeAgent && host == common.DefaultHost { + klog.V(4).Infof("overriding with agent host\n") + host = common.DefaultAgentHost + } // check if the host has a protocol r := regexp.MustCompile(`^(https?)://(.+)$`) @@ -62,6 +66,10 @@ func getAPIURL(ctx context.Context, apiType, host, version, path string, options if version == "" { version = DefaultAPIVersion } + if apiType == APITypeAgent { + klog.V(4).Infof("overriding agent version with empty string since API isn't versioned\n") + version = "" + } // remove the version from the path if it exists r = regexp.MustCompile(`^(v\d+|%%s)/`) diff --git a/pkg/client/agent/client.go b/pkg/client/agent/client.go new file mode 100644 index 00000000..f2fdd39d --- /dev/null +++ b/pkg/client/agent/client.go @@ -0,0 +1,93 @@ +// Copyright 2024 Deepgram SDK contributors. All Rights Reserved. +// Use of this source code is governed by a MIT license that can be found in the LICENSE file. +// SPDX-License-Identifier: MIT + +/* +This package provides the agent client implementation for the Deepgram API +*/ +package agent + +import ( + "context" + + msginterfaces "github.com/deepgram/deepgram-go-sdk/pkg/api/agent/v1/websocket/interfaces" + listenv1ws "github.com/deepgram/deepgram-go-sdk/pkg/client/agent/v1/websocket" + interfaces "github.com/deepgram/deepgram-go-sdk/pkg/client/interfaces" +) + +/***********************************/ +// WebSocket Agent +/***********************************/ +const ( + WebSocketPackageVersion = listenv1ws.PackageVersion +) + +// WSChannel is an alias for listenv1ws.WSChannel +type WSChannel = listenv1ws.WSChannel + +// options +func NewSettingsConfigurationOptions() *interfaces.SettingsConfigurationOptions { + return interfaces.NewSettingsConfigurationOptions() +} + +/* + Using Channels +*/ +/* +NewWSUsingChanForDemo creates a new websocket connection for demo purposes only + +Input parameters: +- ctx: context.Context object +- tOptions: SettingsConfigurationOptions which allows overriding things like language, model, etc. + +Notes: + - The Deepgram API KEY is read from the environment variable DEEPGRAM_API_KEY +*/ +func NewWSUsingChanForDemo(ctx context.Context, options *interfaces.SettingsConfigurationOptions) (*listenv1ws.WSChannel, error) { + return listenv1ws.NewUsingChanForDemo(ctx, options) +} + +/* +NewWebSocketUsingChanWithDefaults creates a new websocket connection with all default options + +Input parameters: +- ctx: context.Context object +- tOptions: SettingsConfigurationOptions which allows overriding things like language, model, etc. + +Notes: + - The Deepgram API KEY is read from the environment variable DEEPGRAM_API_KEY + - The chans handler is set to the default handler which just prints all messages to the console +*/ +func NewWSUsingChanWithDefaults(ctx context.Context, options *interfaces.SettingsConfigurationOptions, chans msginterfaces.AgentMessageChan) (*listenv1ws.WSChannel, error) { + return listenv1ws.NewUsingChanWithDefaults(ctx, options, chans) +} + +/* +NewWSUsingChan creates a new websocket connection with the specified options + +Input parameters: +- ctx: context.Context object +- apiKey: string containing the Deepgram API key +- cOptions: ClientOptions which allows overriding things like hostname, version of the API, etc. +- tOptions: SettingsConfigurationOptions which allows overriding things like language, model, etc. +- chans: AgentMessageChan which is a chans that allows you to perform actions based on the transcription +*/ +func NewWSUsingChan(ctx context.Context, apiKey string, cOptions *interfaces.ClientOptions, tOptions *interfaces.SettingsConfigurationOptions, chans msginterfaces.AgentMessageChan) (*listenv1ws.WSChannel, error) { + ctx, ctxCancel := context.WithCancel(ctx) + return listenv1ws.NewUsingChanWithCancel(ctx, ctxCancel, apiKey, cOptions, tOptions, chans) +} + +/* +NewWSUsingChanWithCancel creates a new websocket connection with the specified options + +Input parameters: +- ctx: context.Context object +- ctxCancel: allow passing in own cancel +- apiKey: string containing the Deepgram API key +- cOptions: ClientOptions which allows overriding things like hostname, version of the API, etc. +- tOptions: SettingsConfigurationOptions which allows overriding things like language, model, etc. +- chans: AgentMessageChan which is a chans that allows you to perform actions based on the transcription +*/ +func NewWSUsingChanWithCancel(ctx context.Context, ctxCancel context.CancelFunc, apiKey string, cOptions *interfaces.ClientOptions, tOptions *interfaces.SettingsConfigurationOptions, chans msginterfaces.AgentMessageChan) (*listenv1ws.WSChannel, error) { + return listenv1ws.NewUsingChanWithCancel(ctx, ctxCancel, apiKey, cOptions, tOptions, chans) +} diff --git a/pkg/client/agent/init.go b/pkg/client/agent/init.go new file mode 100644 index 00000000..414997f7 --- /dev/null +++ b/pkg/client/agent/init.go @@ -0,0 +1,44 @@ +// Copyright 2024 Deepgram SDK contributors. All Rights Reserved. +// Use of this source code is governed by a MIT license that can be found in the LICENSE file. +// SPDX-License-Identifier: MIT + +package agent + +import ( + common "github.com/deepgram/deepgram-go-sdk/pkg/common" +) + +// please see pkg/common/init.go for more information +const ( + LogLevelDefault = common.LogLevelDefault + LogLevelErrorOnly = common.LogLevelErrorOnly + LogLevelStandard = common.LogLevelStandard + LogLevelElevated = common.LogLevelElevated + LogLevelFull = common.LogLevelFull + LogLevelDebug = common.LogLevelDebug + LogLevelTrace = common.LogLevelTrace + LogLevelVerbose = common.LogLevelVerbose +) + +// Initialization options for this SDK. +type InitLib struct { + LogLevel common.LogLevel + DebugFilePath string +} + +// InitWithDefault is the SDK Init function for this library using default values. +func InitWithDefault() { + Init(InitLib{ + LogLevel: LogLevelDefault, + }) +} + +// The SDK Init function for this library. +// Allows you to set the logging level and use of a log file. +// Default is output to the stdout. +func Init(init InitLib) { + common.Init(common.InitLib{ + LogLevel: init.LogLevel, + DebugFilePath: init.DebugFilePath, + }) +} diff --git a/pkg/client/agent/v1/websocket/client_channel.go b/pkg/client/agent/v1/websocket/client_channel.go new file mode 100644 index 00000000..55ecb196 --- /dev/null +++ b/pkg/client/agent/v1/websocket/client_channel.go @@ -0,0 +1,312 @@ +// Copyright 2024 Deepgram SDK contributors. All Rights Reserved. +// Use of this source code is governed by a MIT license that can be found in the LICENSE file. +// SPDX-License-Identifier: MIT + +// This package provides the live/streaming client implementation for the Deepgram API +package websocketv1 + +import ( + "context" + "encoding/json" + "fmt" + "io" + "regexp" + "strings" + "time" + + "github.com/dvonthenen/websocket" + klog "k8s.io/klog/v2" + + msginterfaces "github.com/deepgram/deepgram-go-sdk/pkg/api/agent/v1/websocket/interfaces" + version "github.com/deepgram/deepgram-go-sdk/pkg/api/version" + common "github.com/deepgram/deepgram-go-sdk/pkg/client/common/v1" +) + +// Connect performs a websocket connection with "DefaultConnectRetry" number of retries. +func (c *WSChannel) Connect() bool { + c.ctx, c.ctxCancel = context.WithCancel(c.ctx) + return c.ConnectWithCancel(c.ctx, c.ctxCancel, int(DefaultConnectRetry)) +} + +// ConnectWithCancel performs a websocket connection with specified number of retries and providing a +// cancel function to stop the connection +func (c *WSChannel) ConnectWithCancel(ctx context.Context, ctxCancel context.CancelFunc, retryCnt int) bool { + c.ctx = ctx + c.ctxCancel = ctxCancel + return c.WSClient.ConnectWithCancel(ctx, ctxCancel, retryCnt) +} + +// AttemptReconnect performs a reconnect after failing retries +func (c *WSChannel) AttemptReconnect(ctx context.Context, retries int64) bool { + c.ctx, c.ctxCancel = context.WithCancel(ctx) + return c.AttemptReconnectWithCancel(c.ctx, c.ctxCancel, retries) +} + +// AttemptReconnect performs a reconnect after failing retries and providing a cancel function +func (c *WSChannel) AttemptReconnectWithCancel(ctx context.Context, ctxCancel context.CancelFunc, retries int64) bool { + c.ctx = ctx + c.ctxCancel = ctxCancel + return c.WSClient.AttemptReconnectWithCancel(ctx, ctxCancel, retries) +} + +// GetURL returns the websocket URL +func (c *WSChannel) GetURL(host string) (string, error) { + // we dont send the SettingsConfigurationOptions because that is sent as a WS message to the server + url, err := version.GetAgentAPI(c.ctx, c.cOptions.Host, c.cOptions.APIVersion, c.cOptions.Path /*, c.tOptions*/) + if err != nil { + klog.V(1).Infof("version.GetAgentAPI failed. Err: %v\n", err) + return "", err + } + klog.V(5).Infof("Connecting to %s\n", url) + return url, nil +} + +// Start the keepalive and flush threads +func (c *WSChannel) Start() { + // send ConfigurationOptions to server + if c.tOptions != nil { + // send the configuration settings to the server + klog.V(4).Infof("Sending ConfigurationSettings to server\n") + err := c.WriteJSON(c.tOptions) + if err != nil { + klog.V(1).Infof("w.WriteJSON ConfigurationSettings failed. Err: %v\n", err) + + // terminate the connection + c.WSClient.Stop() + + return + } + } + + if c.cOptions.EnableKeepAlive { + go c.ping() + } +} + +// ProcessMessage processes the message and sends it to the callback +func (c *WSChannel) ProcessMessage(wsType int, byMsg []byte) error { + klog.V(6).Infof("ProcessMessage() ENTER\n") + + switch wsType { + case websocket.TextMessage: + // route the message + err := (*c.router).Message(byMsg) + if err != nil { + klog.V(1).Infof("agent.listen(): router.Message failed. Err: %v\n", err) + } + case websocket.BinaryMessage: + // audio data! + err := (*c.router).Binary(byMsg) + if err != nil { + klog.V(1).Infof("agent.listen(): router.Binary failed. Err: %v\n", err) + } + default: + klog.V(7).Infof("agent.listen(): msg recv: type %d, len: %d\n", wsType, len(byMsg)) + } + + klog.V(6).Infof("ProcessMessage Succeeded\n") + klog.V(6).Infof("ProcessMessage() LEAVE\n") + + return nil +} + +// Stream is a helper function to stream audio data from a io.Reader object to deepgram +func (c *WSChannel) Stream(r io.Reader) error { + klog.V(6).Infof("agent.Stream() ENTER\n") + + chunk := make([]byte, ChunkSize) + + for { + select { + case <-c.ctx.Done(): + klog.V(2).Infof("stream object Done()\n") + klog.V(6).Infof("agent.Stream() LEAVE\n") + return nil + default: + bytesRead, err := r.Read(chunk) + if err != nil { + errStr := err.Error() + switch { + case strings.Contains(errStr, common.SuccessfulSocketErr): + klog.V(3).Infof("Graceful websocket close\n") + klog.V(6).Infof("agent.Stream() LEAVE\n") + return nil + case strings.Contains(errStr, common.UseOfClosedSocket): + klog.V(3).Infof("Graceful websocket close\n") + klog.V(6).Infof("agent.Stream() LEAVE\n") + return nil + case strings.Contains(errStr, common.FatalReadSocketErr): + klog.V(1).Infof("Fatal socket error: %v\n", err) + klog.V(6).Infof("agent.Stream() LEAVE\n") + return err + case (err == io.EOF || err == io.ErrUnexpectedEOF): + klog.V(3).Infof("stream object EOF\n") + klog.V(6).Infof("agent.Stream() LEAVE\n") + return err + default: + klog.V(1).Infof("r.Read error. Err: %v\n", err) + klog.V(6).Infof("agent.Stream() LEAVE\n") + return err + } + } + + if bytesRead == 0 { + klog.V(7).Infof("Skipping. bytesRead == 0\n") + continue + } + + err = c.WriteBinary(chunk[:bytesRead]) + if err != nil { + klog.V(1).Infof("w.Write failed. Err: %v\n", err) + klog.V(6).Infof("agent.Stream() LEAVE\n") + return err + } + klog.V(7).Infof("io.Writer succeeded\n") + } + } +} + +/* +Write performs the lower level websocket write operation. +This is needed to implement the io.Writer interface. (aka the streaming interface) +*/ +func (c *WSChannel) Write(p []byte) (int, error) { + klog.V(7).Infof("agent.Write() ENTER\n") + + byteLen := len(p) + err := c.WriteBinary(p) + if err != nil { + klog.V(1).Infof("Write failed. Err: %v\n", err) + klog.V(7).Infof("agent.Write() LEAVE\n") + return 0, err + } + + klog.V(7).Infof("agent.Write Succeeded\n") + klog.V(7).Infof("agent.Write() LEAVE\n") + return byteLen, nil +} + +/* +Kick off the keepalive message to the server +*/ +func (c *WSChannel) KeepAlive() error { + klog.V(7).Infof("agent.KeepAlive() ENTER\n") + + keepAlive := msginterfaces.KeepAlive{ + Type: msginterfaces.TypeKeepAlive, + } + err := c.WriteJSON(keepAlive) + if err != nil { + klog.V(1).Infof("KeepAlive failed. Err: %v\n", err) + klog.V(7).Infof("agent.KeepAlive() LEAVE\n") + + return err + } + + klog.V(4).Infof("KeepAlive Succeeded\n") + klog.V(7).Infof("agent.KeepAlive() LEAVE\n") + + return err +} + +// GetCloseMsg sends an application level message to Deepgram +func (c *WSChannel) GetCloseMsg() []byte { + close := msginterfaces.Close{ + Type: msginterfaces.TypeClose, + } + + byMsg, err := json.Marshal(close) + if err != nil { + klog.V(1).Infof("GetCloseMsg failed. Err: %v\n", err) + return nil + } + + return byMsg +} + +// Finish the websocket connection +func (c *WSChannel) Finish() { + // NA +} + +// ProcessError processes the error and sends it to the callback +func (c *WSChannel) ProcessError(err error) error { + response := c.errorToResponse(err) + sendErr := (*c.router).Error(response) + if err != nil { + klog.V(1).Infof("ProcessError failed. Err: %v\n", sendErr) + } + + return err +} + +// ping thread +func (c *WSChannel) ping() { + klog.V(6).Infof("agent.ping() ENTER\n") + + defer func() { + if r := recover(); r != nil { + klog.V(1).Infof("Panic triggered\n") + + // send error on callback + err := common.ErrFatalPanicRecovered + sendErr := c.ProcessError(err) + if sendErr != nil { + klog.V(1).Infof("listen: Fatal socket error. Err: %v\n", sendErr) + } + + klog.V(6).Infof("agent.ping() LEAVE\n") + return + } + }() + + ticker := time.NewTicker(pingPeriod) + defer ticker.Stop() + for { + select { + case <-c.ctx.Done(): + klog.V(3).Infof("agent.ping() Exiting\n") + klog.V(6).Infof("agent.ping() LEAVE\n") + return + case <-ticker.C: + klog.V(5).Infof("Starting ping...") + + // deepgram keepalive message + klog.V(5).Infof("Sending Deepgram KeepAlive message...\n") + err := c.KeepAlive() + if err == nil { + klog.V(5).Infof("Ping sent!") + } else { + klog.V(1).Infof("Failed to send Deepgram KeepAlive. Err: %v\n", err) + } + } + } +} + +// errorToResponse converts an error into a Deepgram error response +func (c *WSChannel) errorToResponse(err error) *msginterfaces.ErrorResponse { + r := regexp.MustCompile(`websocket: ([a-z]+) (\d+) .+: (.+)`) + + var errorCode string + var errorNum string + var errorDesc string + + matches := r.FindStringSubmatch(err.Error()) + if len(matches) > 3 { + errorCode = matches[1] + errorNum = matches[2] + errorDesc = matches[3] + } else { + errorCode = common.UnknownDeepgramErr + errorNum = common.UnknownDeepgramErr + errorDesc = err.Error() + } + + response := &msginterfaces.ErrorResponse{ + Type: string(msginterfaces.TypeErrorResponse), + ErrMsg: strings.TrimSpace(fmt.Sprintf("%s %s", errorCode, errorNum)), + Description: strings.TrimSpace(errorDesc), + Variant: errorNum, + } + return response +} diff --git a/pkg/client/agent/v1/websocket/constants.go b/pkg/client/agent/v1/websocket/constants.go new file mode 100644 index 00000000..94e2ba98 --- /dev/null +++ b/pkg/client/agent/v1/websocket/constants.go @@ -0,0 +1,32 @@ +// Copyright 2024 Deepgram SDK contributors. All Rights Reserved. +// Use of this source code is governed by a MIT license that can be found in the LICENSE file. +// SPDX-License-Identifier: MIT + +package websocketv1 + +import ( + "time" +) + +const ( + PackageVersion string = "v1.0" +) + +// external constants +const ( + DefaultConnectRetry int64 = 3 + + ChunkSize = 1024 * 2 + TerminationSleep = 100 * time.Millisecond +) + +const ( + // MessageTypeKeepAlive keep the connection alive + MessageTypeKeepAlive string = "KeepAlive" +) + +// internal constants for retry, waits, back-off, etc. +const ( + flushPeriod = 500 * time.Millisecond + pingPeriod = 5 * time.Second +) diff --git a/pkg/client/agent/v1/websocket/new_using_chan.go b/pkg/client/agent/v1/websocket/new_using_chan.go new file mode 100644 index 00000000..23662356 --- /dev/null +++ b/pkg/client/agent/v1/websocket/new_using_chan.go @@ -0,0 +1,109 @@ +// Copyright 2024 Deepgram SDK contributors. All Rights Reserved. +// Use of this source code is governed by a MIT license that can be found in the LICENSE file. +// SPDX-License-Identifier: MIT + +package websocketv1 + +import ( + "context" + + klog "k8s.io/klog/v2" + + websocketv1api "github.com/deepgram/deepgram-go-sdk/pkg/api/agent/v1/websocket" + msginterfaces "github.com/deepgram/deepgram-go-sdk/pkg/api/agent/v1/websocket/interfaces" + common "github.com/deepgram/deepgram-go-sdk/pkg/client/common/v1" + commoninterfaces "github.com/deepgram/deepgram-go-sdk/pkg/client/common/v1/interfaces" + clientinterfaces "github.com/deepgram/deepgram-go-sdk/pkg/client/interfaces" +) + +/* +NewForDemo creates a new websocket connection with all default options + +Notes: + - The Deepgram API KEY is read from the environment variable DEEPGRAM_API_KEY +*/ +func NewUsingChanForDemo(ctx context.Context, options *clientinterfaces.SettingsConfigurationOptions) (*WSChannel, error) { + return NewUsingChan(ctx, "", &clientinterfaces.ClientOptions{}, options, nil) +} + +/* +NewWithDefaults creates a new websocket connection with all default options + +Notes: + - The Deepgram API KEY is read from the environment variable DEEPGRAM_API_KEY + - The chans handler is set to the default handler which just prints all messages to the console +*/ +func NewUsingChanWithDefaults(ctx context.Context, options *clientinterfaces.SettingsConfigurationOptions, chans msginterfaces.AgentMessageChan) (*WSChannel, error) { // gocritic:ignore + return NewUsingChan(ctx, "", &clientinterfaces.ClientOptions{}, options, chans) +} + +/* +New creates a new websocket connection with the specified options + +Input parameters: +- ctx: context.Context object +- apiKey: string containing the Deepgram API key +- cOptions: ClientOptions which allows overriding things like hostname, version of the API, etc. +- tOptions: SettingsConfigurationOptions which allows overriding things like language, model, etc. +- chans: AgentMessageChan which is a chans that allows you to perform actions based on the transcription +*/ +func NewUsingChan(ctx context.Context, apiKey string, cOptions *clientinterfaces.ClientOptions, tOptions *clientinterfaces.SettingsConfigurationOptions, chans msginterfaces.AgentMessageChan) (*WSChannel, error) { + ctx, ctxCancel := context.WithCancel(ctx) + return NewUsingChanWithCancel(ctx, ctxCancel, apiKey, cOptions, tOptions, chans) +} + +/* +New creates a new websocket connection with the specified options + +Input parameters: +- ctx: context.Context object +- ctxCancel: allow passing in own cancel +- apiKey: string containing the Deepgram API key +- cOptions: ClientOptions which allows overriding things like hostname, version of the API, etc. +- tOptions: SettingsConfigurationOptions which allows overriding things like language, model, etc. +- chans: AgentMessageChan which is a chans that allows you to perform actions based on the transcription +*/ +func NewUsingChanWithCancel(ctx context.Context, ctxCancel context.CancelFunc, apiKey string, cOptions *clientinterfaces.ClientOptions, tOptions *clientinterfaces.SettingsConfigurationOptions, chans msginterfaces.AgentMessageChan) (*WSChannel, error) { + klog.V(6).Infof("agent.New() ENTER\n") + + if apiKey != "" { + cOptions.APIKey = apiKey + } + err := cOptions.Parse() + if err != nil { + klog.V(1).Infof("ClientOptions.Parse() failed. Err: %v\n", err) + return nil, err + } + err = tOptions.Check() + if err != nil { + klog.V(1).Infof("TranscribeOptions.Check() failed. Err: %v\n", err) + return nil, err + } + + if chans == nil { + klog.V(2).Infof("Using DefaultCallbackHandler.\n") + chans = websocketv1api.NewDefaultChanHandler() + } + + // init + var router commoninterfaces.Router + router = websocketv1api.NewChanRouter(chans) + + conn := WSChannel{ + cOptions: cOptions, + tOptions: tOptions, + chans: make([]*msginterfaces.AgentMessageChan, 0), + router: &router, + ctx: ctx, + ctxCancel: ctxCancel, + } + + var handler commoninterfaces.WebSocketHandler + handler = &conn + conn.WSClient = common.NewWS(ctx, ctxCancel, apiKey, cOptions, &handler, &router) + + klog.V(3).Infof("NewDeepGramWSClient Succeeded\n") + klog.V(6).Infof("agent.New() LEAVE\n") + + return &conn, nil +} diff --git a/pkg/client/agent/v1/websocket/types.go b/pkg/client/agent/v1/websocket/types.go new file mode 100644 index 00000000..03fdbf12 --- /dev/null +++ b/pkg/client/agent/v1/websocket/types.go @@ -0,0 +1,35 @@ +// Copyright 2024 Deepgram SDK contributors. All Rights Reserved. +// Use of this source code is governed by a MIT license that can be found in the LICENSE file. +// SPDX-License-Identifier: MIT + +package websocketv1 + +import ( + "context" + + msginterface "github.com/deepgram/deepgram-go-sdk/pkg/api/agent/v1/websocket/interfaces" + common "github.com/deepgram/deepgram-go-sdk/pkg/client/common/v1" + commoninterfaces "github.com/deepgram/deepgram-go-sdk/pkg/client/common/v1/interfaces" + interfaces "github.com/deepgram/deepgram-go-sdk/pkg/client/interfaces" +) + +// client messages +type SettingsConfigurationOptions interfaces.SettingsConfigurationOptions +type UpdateInstructions msginterface.UpdateInstructions +type UpdateSpeak msginterface.UpdateSpeak +type InjectAgentMessage msginterface.InjectAgentMessage +type FunctionCallResponse msginterface.FunctionCallResponse +type KeepAlive msginterface.KeepAlive + +// WSChannel is a struct representing the websocket client connection using channels +type WSChannel struct { + *common.WSClient + ctx context.Context + ctxCancel context.CancelFunc + + cOptions *interfaces.ClientOptions + tOptions *interfaces.SettingsConfigurationOptions + + chans []*msginterface.AgentMessageChan + router *commoninterfaces.Router +} diff --git a/pkg/client/common/v1/websocket.go b/pkg/client/common/v1/websocket.go index ab9ec66b..576dc1e5 100644 --- a/pkg/client/common/v1/websocket.go +++ b/pkg/client/common/v1/websocket.go @@ -206,6 +206,7 @@ func (c *WSClient) internalConnectWithCancel(ctx context.Context, ctxCancel cont SkipServerAuth: c.cOptions.SkipServerAuth, } } + // perform the websocket connection ws, res, err := dialer.DialContext(c.ctx, url, myHeader) if res != nil { diff --git a/pkg/client/interfaces/interfaces.go b/pkg/client/interfaces/interfaces.go index 8fb3050b..35ae3f85 100644 --- a/pkg/client/interfaces/interfaces.go +++ b/pkg/client/interfaces/interfaces.go @@ -12,7 +12,14 @@ const ( PackageVersion = interfacesv1.PackageVersion ) +// NewSettingsConfigurationOptions creates a new SettingsConfigurationOptions object +func NewSettingsConfigurationOptions() *interfacesv1.SettingsConfigurationOptions { + return interfacesv1.NewSettingsConfigurationOptions() +} + +// options type ClientOptions = interfacesv1.ClientOptions +type SettingsConfigurationOptions = interfacesv1.SettingsConfigurationOptions type PreRecordedTranscriptionOptions = interfacesv1.PreRecordedTranscriptionOptions type LiveTranscriptionOptions = interfacesv1.LiveTranscriptionOptions type AnalyzeOptions = interfacesv1.AnalyzeOptions diff --git a/pkg/client/interfaces/v1/constants.go b/pkg/client/interfaces/v1/constants.go index 5a15518d..3cc9d2de 100644 --- a/pkg/client/interfaces/v1/constants.go +++ b/pkg/client/interfaces/v1/constants.go @@ -12,6 +12,10 @@ const ( PackageVersion string = "v1.0" ) +const ( + TypeSettingsConfiguration = "SettingsConfiguration" +) + // errors var ( // ErrNoAPIKey no api key found diff --git a/pkg/client/interfaces/v1/options.go b/pkg/client/interfaces/v1/options.go index a89433b0..e838c49c 100644 --- a/pkg/client/interfaces/v1/options.go +++ b/pkg/client/interfaces/v1/options.go @@ -130,3 +130,40 @@ func (o *WSSpeakOptions) Check() error { return nil } + +func NewSettingsConfigurationOptions() *SettingsConfigurationOptions { + return &SettingsConfigurationOptions{ + Type: TypeSettingsConfiguration, + Audio: Audio{ + Input: &Input{ + Encoding: "linear16", + SampleRate: 16000, + }, + Output: &Output{ + Encoding: "linear16", + SampleRate: 16000, + Container: "none", + }, + }, + Agent: Agent{ + Listen: Listen{ + Model: "nova-2", + }, + Think: Think{ + Provider: Provider{ + Type: "", // Required to be set + }, + Model: "", // Required to be set + }, + Speak: Speak{ + Model: "aura-asteria-en", + }, + }, + } +} +func (o *SettingsConfigurationOptions) Check() error { + // checks + // currently no op + + return nil +} diff --git a/pkg/client/interfaces/v1/types-agent.go b/pkg/client/interfaces/v1/types-agent.go new file mode 100644 index 00000000..61280d71 --- /dev/null +++ b/pkg/client/interfaces/v1/types-agent.go @@ -0,0 +1,84 @@ +// Copyright 2024 Deepgram SDK contributors. All Rights Reserved. +// Use of this source code is governed by a MIT license that can be found in the LICENSE file. +// SPDX-License-Identifier: MIT + +package interfacesv1 + +/* +SettingsConfigurationOptions contain all of the knobs and dials to control the Agent API + +Please see the live/streaming documentation for more details: +XXXX +*/ +type SettingsConfigurationOptions struct { + Type string `json:"type"` + Audio Audio `json:"audio"` + Agent Agent `json:"agent"` + Context *Context `json:"context,omitempty"` +} + +/* +Sub-structs in SettingsConfigurationOptions +*/ +type Input struct { + Encoding string `json:"encoding,omitempty"` + SampleRate int `json:"sample_rate,omitempty"` +} +type Output struct { + Encoding string `json:"encoding,omitempty"` + SampleRate int `json:"sample_rate,omitempty"` + Bitrate int `json:"bitrate,omitempty"` + Container string `json:"container,omitempty"` +} +type Audio struct { + Input *Input `json:"input,omitempty"` + Output *Output `json:"output,omitempty"` +} +type Listen struct { + Model string `json:"model,omitempty"` +} +type Provider struct { + Type string `json:"type,omitempty"` +} +type Item struct { + Type string `json:"type,omitempty"` + Description string `json:"description,omitempty"` +} +type Properties struct { + Item Item `json:"item,omitempty"` +} +type Parameters struct { + Type string `json:"type,omitempty"` + Properties Properties `json:"properties,omitempty"` + Required []string `json:"required,omitempty"` +} +type Headers struct { + Key string `json:"key,omitempty"` + Value string `json:"value,omitempty"` +} +type Functions struct { + Name string `json:"name,omitempty"` + Description string `json:"description,omitempty"` + Parameters Parameters `json:"parameters,omitempty"` + URL *string `json:"url,omitempty"` + Headers *[]Headers `json:"headers,omitempty"` + Method *string `json:"method,omitempty"` +} +type Think struct { + Provider Provider `json:"provider"` + Model string `json:"model,omitempty"` + Instructions string `json:"instructions,omitempty"` + Functions []Functions `json:"functions,omitempty"` +} +type Speak struct { + Model string `json:"model,omitempty"` +} +type Agent struct { + Listen Listen `json:"listen"` + Think Think `json:"think"` + Speak Speak `json:"speak"` +} +type Context struct { + Messages map[string]string `json:"messages,omitempty"` + Replay bool `json:"replay,omitempty"` +} diff --git a/pkg/client/listen/deprecated.go b/pkg/client/listen/deprecated.go deleted file mode 100644 index fb523877..00000000 --- a/pkg/client/listen/deprecated.go +++ /dev/null @@ -1,123 +0,0 @@ -// Copyright 2024 Deepgram SDK contributors. All Rights Reserved. -// Use of this source code is governed by a MIT license that can be found in the LICENSE file. -// SPDX-License-Identifier: MIT - -package listen - -import ( - "context" - - msginterfaces "github.com/deepgram/deepgram-go-sdk/pkg/api/listen/v1/websocket/interfaces" - interfaces "github.com/deepgram/deepgram-go-sdk/pkg/client/interfaces" - listenv1rest "github.com/deepgram/deepgram-go-sdk/pkg/client/listen/v1/rest" - listenv1ws "github.com/deepgram/deepgram-go-sdk/pkg/client/listen/v1/websocket" -) - -/***********************************/ -// Deprecated (THESE WILL STILL WORK, -// BUT WILL BE REMOVED IN A FUTURE RELEASE) -/***********************************/ -/* -NewWebSocketForDemo creates a new websocket connection with all default options - -Please see NewWebSocketUsingCallbackForDemo for more information. - -TODO: Deprecate this function later -*/ -func NewWebSocketForDemo(ctx context.Context, options *interfaces.LiveTranscriptionOptions) (*listenv1ws.Client, error) { - return NewWSUsingCallbackForDemo(ctx, options) -} - -/* -NewWebSocketWithDefaults creates a new websocket connection with all default options - -Please see NewWebSocketUsingCallbackWithDefaults for more information. - -TODO: Deprecate this function later -*/ -func NewWebSocketWithDefaults(ctx context.Context, tOptions *interfaces.LiveTranscriptionOptions, callback msginterfaces.LiveMessageCallback) (*listenv1ws.Client, error) { - return NewWSUsingCallbackWithDefaults(ctx, tOptions, callback) -} - -/* -NewWebSocket creates a new websocket connection with the specified options - -Please see NewWebSocketUsingCallback for more information. - -TODO: Deprecate this function later -*/ -func NewWebSocket(ctx context.Context, apiKey string, cOptions *interfaces.ClientOptions, tOptions *interfaces.LiveTranscriptionOptions, callback msginterfaces.LiveMessageCallback) (*listenv1ws.Client, error) { - return NewWSUsingCallback(ctx, apiKey, cOptions, tOptions, callback) -} - -/* -NewWebSocketWithCancel creates a new websocket connection but has facilities to BYOC (Bring Your Own Cancel) - -Please see NewWebSocketUsingCallbackWithCancel for more information. - -TODO: Deprecate this function later -*/ -func NewWebSocketWithCancel(ctx context.Context, ctxCancel context.CancelFunc, apiKey string, cOptions *interfaces.ClientOptions, tOptions *interfaces.LiveTranscriptionOptions, callback msginterfaces.LiveMessageCallback) (*listenv1ws.Client, error) { - return NewWSUsingCallbackWithCancel(ctx, ctxCancel, apiKey, cOptions, tOptions, callback) -} - -/***********************************/ -// REST Client -/***********************************/ -// PreRecordedClient is an alias for listenv1rest.Client -// -// Deprecated: This package is deprecated. Use RestClient instead. This will be removed in a future release. -type PreRecordedClient = listenv1rest.RESTClient - -// NewPreRecordedWithDefaults is an alias for NewRESTWithDefaults -// -// Deprecated: This package is deprecated. Use NewRESTWithDefaults instead. This will be removed in a future release. -func NewPreRecordedWithDefaults() *listenv1rest.RESTClient { - return NewRESTWithDefaults() -} - -// NewPreRecorded is an alias for NewREST -// -// Deprecated: This package is deprecated. Use NewREST instead. This will be removed in a future release. -func NewPreRecorded(apiKey string, options *interfaces.ClientOptions) *listenv1rest.RESTClient { - return NewREST(apiKey, options) -} - -/***********************************/ -// WebSocket / Streaming / Live -/***********************************/ -// LiveClient is an alias for listenv1rest.Client -// -// Deprecated: This alias is deprecated. Use WSCallback instead. This will be removed in a future release. -type LiveClient = listenv1ws.Client - -/* - Older "Live" functions -*/ -// NewLiveForDemo is an alias for NewWebSocketForDemo -// -// Deprecated: This package is deprecated. Use NewWebSocketForDemo instead. This will be removed in a future release. -func NewLiveForDemo(ctx context.Context, options *interfaces.LiveTranscriptionOptions) (*listenv1ws.WSCallback, error) { - return NewWebSocketForDemo(ctx, options) -} - -// NewLiveWithDefaults is an alias for NewWebSocketWithDefaults -// -// Deprecated: This package is deprecated. Use NewWebSocketWithDefaults instead. This will be removed in a future release. -func NewLiveWithDefaults(ctx context.Context, tOptions *interfaces.LiveTranscriptionOptions, callback msginterfaces.LiveMessageCallback) (*listenv1ws.WSCallback, error) { - return NewWebSocketWithDefaults(ctx, tOptions, callback) -} - -// NewLive is an alias for NewWebSocket -// -// Deprecated: This package is deprecated. Use NewWebSocket instead. This will be removed in a future release. -func NewLive(ctx context.Context, apiKey string, cOptions *interfaces.ClientOptions, tOptions *interfaces.LiveTranscriptionOptions, callback msginterfaces.LiveMessageCallback) (*listenv1ws.WSCallback, error) { - return NewWebSocket(ctx, apiKey, cOptions, tOptions, callback) -} - -// NewLiveWithCancel is an alias for NewWebSocketWithCancel -// -// Deprecated: This package is deprecated. Use NewWebSocketWithCancel instead. This will be removed in a future release. -func NewLiveWithCancel(ctx context.Context, ctxCancel context.CancelFunc, apiKey string, cOptions *interfaces.ClientOptions, tOptions *interfaces.LiveTranscriptionOptions, callback msginterfaces.LiveMessageCallback) (*listenv1ws.WSCallback, error) { - return NewWebSocketWithCancel(ctx, ctxCancel, apiKey, cOptions, tOptions, callback) -} diff --git a/pkg/common/constants.go b/pkg/common/constants.go index 4e5a9dba..010c096f 100644 --- a/pkg/common/constants.go +++ b/pkg/common/constants.go @@ -7,6 +7,9 @@ package common const ( // default host DefaultHost string = "api.deepgram.com" + + // default agent host + DefaultAgentHost string = "agent.deepgram.com" ) // LogLevel expressed as an int64 From ede21b18765d5b793db75f9fbcbd2d83ee23173e Mon Sep 17 00:00:00 2001 From: John Vajda Date: Thu, 30 Jan 2025 17:44:29 -0700 Subject: [PATCH 2/6] adds agent api --- examples/agent/websocket/simple/main.go | 27 ++++++++++++++++++++++--- 1 file changed, 24 insertions(+), 3 deletions(-) diff --git a/examples/agent/websocket/simple/main.go b/examples/agent/websocket/simple/main.go index c79b9f50..41cd2736 100644 --- a/examples/agent/websocket/simple/main.go +++ b/examples/agent/websocket/simple/main.go @@ -33,6 +33,8 @@ type MyHandler struct { closeChan chan *msginterfaces.CloseResponse errorChan chan *msginterfaces.ErrorResponse unhandledChan chan *[]byte + injectionRefusedResponse chan *msginterfaces.InjectionRefusedResponse + keepAliveResponse chan *msginterfaces.KeepAlive } func NewMyHandler() *MyHandler { @@ -50,6 +52,8 @@ func NewMyHandler() *MyHandler { closeChan: make(chan *msginterfaces.CloseResponse), errorChan: make(chan *msginterfaces.ErrorResponse), unhandledChan: make(chan *[]byte), + injectionRefusedResponse: make(chan *msginterfaces.InjectionRefusedResponse), + keepAliveResponse: make(chan *msginterfaces.KeepAlive), } go func() { @@ -109,9 +113,6 @@ func (dch MyHandler) GetAgentAudioDone() []*chan *msginterfaces.AgentAudioDoneRe return []*chan *msginterfaces.AgentAudioDoneResponse{&dch.agentAudioDoneResponse} } -func (dch MyHandler) GetEndOfThought() []*chan *msginterfaces.EndOfThoughtResponse { - return []*chan *msginterfaces.EndOfThoughtResponse{&dch.endOfThoughtResponse} -} // GetClose returns the close channels func (dch MyHandler) GetClose() []*chan *msginterfaces.CloseResponse { return []*chan *msginterfaces.CloseResponse{&dch.closeChan} @@ -127,6 +128,16 @@ func (dch MyHandler) GetUnhandled() []*chan *[]byte { return []*chan *[]byte{&dch.unhandledChan} } +// GetInjectionRefused returns the injection refused response channels +func (dch MyHandler) GetInjectionRefused() []*chan *msginterfaces.InjectionRefusedResponse { + return []*chan *msginterfaces.InjectionRefusedResponse{&dch.injectionRefusedResponse} +} + +// GetKeepAlive returns the keep alive channels +func (dch MyHandler) GetKeepAlive() []*chan *msginterfaces.KeepAlive { + return []*chan *msginterfaces.KeepAlive{&dch.keepAliveResponse} +} + // Open is the callback for when the connection opens // golintci: funlen func (dch MyHandler) Run() error { @@ -287,6 +298,16 @@ func (dch MyHandler) Run() error { } }() + // keep alive response channel + wgReceivers.Add(1) + go func() { + defer wgReceivers.Done() + + for _ = range dch.keepAliveResponse { + fmt.Printf("\n\n[KeepAliveResponse]\n\n") + } + }() + // close channel wgReceivers.Add(1) go func() { From 9d0bacfec5e69ba793f1f37c7cb2a532271c4eb7 Mon Sep 17 00:00:00 2001 From: John Vajda Date: Fri, 31 Jan 2025 11:24:45 -0700 Subject: [PATCH 3/6] fixes failed tests and code review feedback --- README.md | 32 ++++++++++++---- pkg/api/agent/v1/websocket/chan_default.go | 38 ++++++++++++++++++- pkg/api/agent/v1/websocket/chan_router.go | 35 +++++++++++++++++ .../v1/websocket/interfaces/constants.go | 1 + .../v1/websocket/interfaces/interfaces.go | 2 + .../agent/v1/websocket/interfaces/types.go | 6 +++ pkg/api/agent/v1/websocket/types.go | 4 ++ pkg/client/interfaces/v1/types-agent.go | 4 +- tests/daily_test/prerecorded_test.go | 2 +- tests/edge_cases/cancel/main.go | 2 +- tests/edge_cases/failed_retry/main.go | 2 +- tests/edge_cases/keepalive/main.go | 2 +- tests/edge_cases/reconnect_client/main.go | 3 +- tests/edge_cases/timeout/main.go | 2 +- ...99cf97131d81a63a2711f0563d37-response.json | 2 +- ...fe51eff47984886930b71fae0929-response.json | 2 +- 16 files changed, 121 insertions(+), 18 deletions(-) diff --git a/README.md b/README.md index 79a4ed09..ebaeab97 100644 --- a/README.md +++ b/README.md @@ -196,28 +196,32 @@ There are examples for **every*- API call in this SDK. You can find all of these These examples provide: -Speech-to-Text - Live Audio / WebSocket: +### Agent + +- Agent Simple - [examples/agent/simple](https://github.com/deepgram/deepgram-go-sdk/blob/main/examples/agent/simple/main.go) + +### Speech-to-Text - Live Audio / WebSocket: - From a Microphone - [examples/speech-to-text/websocket/microphone](https://github.com/deepgram/deepgram-go-sdk/blob/main/examples/speech-to-text/websocket/microphone/main.go) - From an HTTP Endpoint - [examples/speech-to-text/websocket/http](https://github.com/deepgram/deepgram-go-sdk/blob/main/examples/speech-to-text/websocket/http/main.go) -Speech-to-Text - PreRecorded / REST: +### Speech-to-Text - PreRecorded / REST: - From an Audio File - [examples/speech-to-text/rest/file](https://github.com/deepgram/deepgram-go-sdk/blob/main/examples/speech-to-text/rest/file/main.go) - From an URL - [examples/speech-to-text/rest/url](https://github.com/deepgram/deepgram-go-sdk/blob/main/examples/speech-to-text/rest/url/main.go) - From an Audio Stream - [examples/speech-to-text/rest/stream](https://github.com/deepgram/deepgram-go-sdk/blob/main/examples/speech-to-text/rest/stream/main.go) -Speech-to-Text - Live Audio: +### Speech-to-Text - Live Audio: - From a Microphone - [examples/speech-to-text/websocket/microphone](https://github.com/deepgram/deepgram-go-sdk/blob/main/examples/speech-to-text/websocket/microphone/main.go) - From an HTTP Endpoint - [examples/speech-to-text/websocket/http](https://github.com/deepgram/deepgram-go-sdk/blob/main/examples/speech-to-text/websocket/http/main.go) -Text-to-Speech - WebSocket +### Text-to-Speech - WebSocket - Websocket Simple Example - [examples/text-to-speech/websocket/simple](https://github.com/deepgram/deepgram-go-sdk/blob/main/examples/text-to-speech/websocket/simple/main.go) - Interactive Websocket - [examples/text-to-speech/websocket/interactive](https://github.com/deepgram/deepgram-go-sdk/blob/main/examples/text-to-speech/websocket/interactive/main.go) -Text-to-Speech - REST +### Text-to-Speech - REST - Save audio to a Path - [examples/text-to-speech/rest/file](https://github.com/deepgram/deepgram-go-sdk/blob/main/examples/text-to-speech/rest/file/main.go) - Save audio to a Stream/Buffer - [examples/text-to-speech/rest/stream](https://github.com/deepgram/deepgram-go-sdk/blob/main/examples/text-to-speech/rest/stream/main.go) @@ -256,7 +260,21 @@ client.Init(client.InitLib{ ## Testing -TBD +There are several test folders in [/tests](https://github.com/deepgram/deepgram-go-sdk/tree/main/tests) you can run: + +- unit_test/ - Unit tests +- daily_test/ - Integration/daily tests +- edge_cases/ - Edge case testing +- response_data/ - Test data +- utils/ - Test utilities + +To run the tests, you can use the following commands: + +Run specific tests in a directory: + +```bash +go run filename +``` ## Backwards Compatibility @@ -273,6 +291,6 @@ To make sure our community is safe for all, be sure to review and agree to our [ We love to hear from you so if you have questions, comments or find a bug in the project, let us know! You can either: -- [Open an issue in this repository](https://github.com/deepgram/deepgram-dotnet-sdk/issues/new) +- [Open an issue in this repository](https://github.com/deepgram/deepgram-go-sdk/issues/new) - [Join the Deepgram Github Discussions Community](https://github.com/orgs/deepgram/discussions) - [Join the Deepgram Discord Community](https://discord.gg/xWRaCDBtW4) diff --git a/pkg/api/agent/v1/websocket/chan_default.go b/pkg/api/agent/v1/websocket/chan_default.go index 4678e80d..e2652d82 100644 --- a/pkg/api/agent/v1/websocket/chan_default.go +++ b/pkg/api/agent/v1/websocket/chan_default.go @@ -42,7 +42,8 @@ func NewDefaultChanHandler() *DefaultChanHandler { functionCallingResponse: make(chan *interfaces.FunctionCallingResponse), agentStartedSpeakingResponse: make(chan *interfaces.AgentStartedSpeakingResponse), agentAudioDoneResponse: make(chan *interfaces.AgentAudioDoneResponse), - endOfThoughtResponse: make(chan *interfaces.EndOfThoughtResponse), + injectionRefusedResponse: make(chan *interfaces.InjectionRefusedResponse), + keepAliveResponse: make(chan *interfaces.KeepAlive), closeChan: make(chan *interfaces.CloseResponse), errorChan: make(chan *interfaces.ErrorResponse), unhandledChan: make(chan *[]byte), @@ -118,6 +119,16 @@ func (dch DefaultChanHandler) GetError() []*chan *interfaces.ErrorResponse { return []*chan *interfaces.ErrorResponse{&dch.errorChan} } +// GetInjectionRefused returns the injection refused channels +func (dch DefaultChanHandler) GetInjectionRefused() []*chan *interfaces.InjectionRefusedResponse { + return []*chan *interfaces.InjectionRefusedResponse{&dch.injectionRefusedResponse} +} + +// GetKeepAlive returns the keep alive channels +func (dch DefaultChanHandler) GetKeepAlive() []*chan *interfaces.KeepAlive { + return []*chan *interfaces.KeepAlive{&dch.keepAliveResponse} +} + // GetUnhandled returns the unhandled event channels func (dch DefaultChanHandler) GetUnhandled() []*chan *[]byte { return []*chan *[]byte{&dch.unhandledChan} @@ -385,6 +396,31 @@ func (dch DefaultChanHandler) Run() error { } }() + // keep alive response channel + wgReceivers.Add(1) + go func() { + defer wgReceivers.Done() + + for ka := range dch.keepAliveResponse { + if dch.debugWebsocket { + data, err := json.Marshal(ka) + if err != nil { + klog.V(1).Infof("KeepAlive json.Marshal failed. Err: %v\n", err) + continue + } + + prettyJSON, err := prettyjson.Format(data) + if err != nil { + klog.V(1).Infof("prettyjson.Marshal failed. Err: %v\n", err) + continue + } + klog.V(2).Infof("\n\nKeepAlive Object:\n%s\n\n", prettyJSON) + } + + fmt.Printf("\n\n[KeepAliveResponse]\n\n") + } + }() + // close channel wgReceivers.Add(1) go func() { diff --git a/pkg/api/agent/v1/websocket/chan_router.go b/pkg/api/agent/v1/websocket/chan_router.go index 0d171bf3..4403e26a 100644 --- a/pkg/api/agent/v1/websocket/chan_router.go +++ b/pkg/api/agent/v1/websocket/chan_router.go @@ -50,6 +50,7 @@ func NewChanRouter(chans interfaces.AgentMessageChan) *ChanRouter { functionCallingResponse: make([]*chan *interfaces.FunctionCallingResponse, 0), agentStartedSpeakingResponse: make([]*chan *interfaces.AgentStartedSpeakingResponse, 0), agentAudioDoneResponse: make([]*chan *interfaces.AgentAudioDoneResponse, 0), + injectionRefusedResponse: make([]*chan *interfaces.InjectionRefusedResponse, 0), closeChan: make([]*chan *interfaces.CloseResponse, 0), errorChan: make([]*chan *interfaces.ErrorResponse, 0), unhandledChan: make([]*chan *[]byte, 0), @@ -69,6 +70,7 @@ func NewChanRouter(chans interfaces.AgentMessageChan) *ChanRouter { router.closeChan = append(router.closeChan, chans.GetClose()...) router.errorChan = append(router.errorChan, chans.GetError()...) router.unhandledChan = append(router.unhandledChan, chans.GetUnhandled()...) + router.injectionRefusedResponse = append(router.injectionRefusedResponse, chans.GetInjectionRefused()...) } return router @@ -316,6 +318,35 @@ func (r *ChanRouter) processErrorResponse(byMsg []byte) error { return r.processGeneric(string(interfaces.TypeErrorResponse), byMsg, action) } +func (r *ChanRouter) processInjectionRefused(byMsg []byte) error { + var response interfaces.InjectionRefusedResponse + if err := json.Unmarshal(byMsg, &response); err != nil { + return err + } + + for _, ch := range r.injectionRefusedResponse { + *ch <- &response + } + return nil +} + +func (r *ChanRouter) processKeepAlive(byMsg []byte) error { + action := func(data []byte) error { + var msg interfaces.KeepAlive + if err := json.Unmarshal(byMsg, &msg); err != nil { + klog.V(1).Infof("json.Unmarshal(KeepAlive) failed. Err: %v\n", err) + return err + } + + for _, ch := range r.keepAliveResponse { + *ch <- &msg + } + return nil + } + + return r.processGeneric(string(interfaces.TypeKeepAlive), byMsg, action) +} + // Message handles platform messages and routes them appropriately based on the MessageType func (r *ChanRouter) Message(byMsg []byte) error { klog.V(6).Infof("router.Message ENTER\n") @@ -351,6 +382,10 @@ func (r *ChanRouter) Message(byMsg []byte) error { err = r.processAgentAudioDone(byMsg) case interfaces.TypeResponse(interfaces.TypeErrorResponse): err = r.processErrorResponse(byMsg) + case interfaces.TypeInjectionRefusedResponse: + err = r.processInjectionRefused(byMsg) + case interfaces.TypeKeepAlive: + err = r.processKeepAlive(byMsg) default: err = r.UnhandledMessage(byMsg) } diff --git a/pkg/api/agent/v1/websocket/interfaces/constants.go b/pkg/api/agent/v1/websocket/interfaces/constants.go index 201fe905..eeaa8522 100644 --- a/pkg/api/agent/v1/websocket/interfaces/constants.go +++ b/pkg/api/agent/v1/websocket/interfaces/constants.go @@ -36,4 +36,5 @@ const ( TypeAgentAudioDoneResponse = "AgentAudioDone" TypeCloseResponse = commoninterfaces.TypeCloseResponse TypeErrorResponse = commoninterfaces.TypeErrorResponse + TypeInjectionRefusedResponse = "InjectionRefused" ) diff --git a/pkg/api/agent/v1/websocket/interfaces/interfaces.go b/pkg/api/agent/v1/websocket/interfaces/interfaces.go index 79ce0e86..85144c7a 100644 --- a/pkg/api/agent/v1/websocket/interfaces/interfaces.go +++ b/pkg/api/agent/v1/websocket/interfaces/interfaces.go @@ -23,4 +23,6 @@ type AgentMessageChan interface { GetClose() []*chan *CloseResponse GetError() []*chan *ErrorResponse GetUnhandled() []*chan *[]byte + GetInjectionRefused() []*chan *InjectionRefusedResponse + GetKeepAlive() []*chan *KeepAlive } diff --git a/pkg/api/agent/v1/websocket/interfaces/types.go b/pkg/api/agent/v1/websocket/interfaces/types.go index 2074dba7..d82bfd71 100644 --- a/pkg/api/agent/v1/websocket/interfaces/types.go +++ b/pkg/api/agent/v1/websocket/interfaces/types.go @@ -132,3 +132,9 @@ type CloseResponse = commoninterfaces.CloseResponse // ErrorResponse is the Deepgram specific response error type ErrorResponse = interfaces.DeepgramError + +// InjectionRefusedResponse is the response when an agent message injection is refused +type InjectionRefusedResponse struct { + Type string `json:"type,omitempty"` + Message string `json:"message,omitempty"` +} diff --git a/pkg/api/agent/v1/websocket/types.go b/pkg/api/agent/v1/websocket/types.go index 125408d5..ef1da7dc 100644 --- a/pkg/api/agent/v1/websocket/types.go +++ b/pkg/api/agent/v1/websocket/types.go @@ -27,6 +27,8 @@ type DefaultChanHandler struct { functionCallingResponse chan *interfaces.FunctionCallingResponse agentStartedSpeakingResponse chan *interfaces.AgentStartedSpeakingResponse agentAudioDoneResponse chan *interfaces.AgentAudioDoneResponse + injectionRefusedResponse chan *interfaces.InjectionRefusedResponse + keepAliveResponse chan *interfaces.KeepAlive closeChan chan *interfaces.CloseResponse errorChan chan *interfaces.ErrorResponse unhandledChan chan *[]byte @@ -47,6 +49,8 @@ type ChanRouter struct { functionCallingResponse []*chan *interfaces.FunctionCallingResponse agentStartedSpeakingResponse []*chan *interfaces.AgentStartedSpeakingResponse agentAudioDoneResponse []*chan *interfaces.AgentAudioDoneResponse + injectionRefusedResponse []*chan *interfaces.InjectionRefusedResponse + keepAliveResponse []*chan *interfaces.KeepAlive closeChan []*chan *interfaces.CloseResponse errorChan []*chan *interfaces.ErrorResponse unhandledChan []*chan *[]byte diff --git a/pkg/client/interfaces/v1/types-agent.go b/pkg/client/interfaces/v1/types-agent.go index 61280d71..376fcfcc 100644 --- a/pkg/client/interfaces/v1/types-agent.go +++ b/pkg/client/interfaces/v1/types-agent.go @@ -71,7 +71,9 @@ type Think struct { Functions []Functions `json:"functions,omitempty"` } type Speak struct { - Model string `json:"model,omitempty"` + Model string `json:"model,omitempty"` + Provider string `json:"provider,omitempty"` + VoiceID string `json:"voice_id,omitempty"` } type Agent struct { Listen Listen `json:"listen"` diff --git a/tests/daily_test/prerecorded_test.go b/tests/daily_test/prerecorded_test.go index f848714f..8f03f829 100644 --- a/tests/daily_test/prerecorded_test.go +++ b/tests/daily_test/prerecorded_test.go @@ -29,7 +29,7 @@ const ( ) const ( - FromURLSmartFormat = "Yep. I said it before and I'll say it again. Life moves pretty fast. You don't stop and look around once in a while, you could miss it." + FromURLSmartFormat = "Yep. I said it before, and I'll say it again. Life moves pretty fast. You don't stop and look around once in a while, you could miss it." FromURLSummarize = "Yep. I said it before, and I'll say it again. Life moves pretty fast. You don't stop and look around once in a while, you could miss it." ) diff --git a/tests/edge_cases/cancel/main.go b/tests/edge_cases/cancel/main.go index 7204b471..eaf76180 100644 --- a/tests/edge_cases/cancel/main.go +++ b/tests/edge_cases/cancel/main.go @@ -36,7 +36,7 @@ func main() { } // use the default callback handler which just dumps all messages to the screen - dgClient, err := client.NewWebSocketWithCancel(ctx, ctxCancel, "", cOptions, tOptions, nil) + dgClient, err := client.NewWebSocketUsingChanWithCancel(ctx, ctxCancel, "", cOptions, tOptions, nil) if err != nil { fmt.Println("ERROR creating LiveClient connection:", err) return diff --git a/tests/edge_cases/failed_retry/main.go b/tests/edge_cases/failed_retry/main.go index 5a8a9606..92589338 100644 --- a/tests/edge_cases/failed_retry/main.go +++ b/tests/edge_cases/failed_retry/main.go @@ -34,7 +34,7 @@ func main() { } // use the default callback handler which just dumps all messages to the screen - dgClient, err := client.NewWebSocket(ctx, "", cOptions, tOptions, nil) + dgClient, err := client.NewWebSocketUsingChan(ctx, "", cOptions, tOptions, nil) if err != nil { fmt.Println("ERROR creating LiveClient connection:", err) return diff --git a/tests/edge_cases/keepalive/main.go b/tests/edge_cases/keepalive/main.go index 052b85a9..64fd0ba9 100644 --- a/tests/edge_cases/keepalive/main.go +++ b/tests/edge_cases/keepalive/main.go @@ -33,7 +33,7 @@ func main() { } // use the default callback handler which just dumps all messages to the screen - dgClient, err := client.NewWebSocket(ctx, "", cOptions, tOptions, nil) + dgClient, err := client.NewWebSocketUsingChan(ctx, "", cOptions, tOptions, nil) if err != nil { fmt.Println("ERROR creating LiveClient connection:", err) return diff --git a/tests/edge_cases/reconnect_client/main.go b/tests/edge_cases/reconnect_client/main.go index 0a5163cd..960606d1 100644 --- a/tests/edge_cases/reconnect_client/main.go +++ b/tests/edge_cases/reconnect_client/main.go @@ -74,7 +74,6 @@ func (c MyCallback) UtteranceEnd(ur *api.UtteranceEndResponse) error { } else { fmt.Printf("\n[UtteranceEnd] Received\n") } - return nil } @@ -142,7 +141,7 @@ func main() { } // create a Deepgram client - dgClient, err := client.NewWebSocket(ctx, "", cOptions, tOptions, callback) + dgClient, err := client.NewWebSocketUsingCallback(ctx, "", cOptions, tOptions, callback) if err != nil { fmt.Println("ERROR creating LiveTranscription connection:", err) return diff --git a/tests/edge_cases/timeout/main.go b/tests/edge_cases/timeout/main.go index 9f16ff22..2b5d6891 100644 --- a/tests/edge_cases/timeout/main.go +++ b/tests/edge_cases/timeout/main.go @@ -28,7 +28,7 @@ func main() { } // use the default callback handler which just dumps all messages to the screen - dgClient, err := client.NewWebSocketWithDefaults(ctx, tOptions, nil) + dgClient, err := client.NewWSUsingChanWithDefaults(ctx, tOptions, nil) if err != nil { fmt.Println("ERROR creating LiveClient connection:", err) return diff --git a/tests/response_data/642c86c60eedbc4af873632b86d68164149599cf97131d81a63a2711f0563d37-response.json b/tests/response_data/642c86c60eedbc4af873632b86d68164149599cf97131d81a63a2711f0563d37-response.json index 249d61cc..ecddfe98 100755 --- a/tests/response_data/642c86c60eedbc4af873632b86d68164149599cf97131d81a63a2711f0563d37-response.json +++ b/tests/response_data/642c86c60eedbc4af873632b86d68164149599cf97131d81a63a2711f0563d37-response.json @@ -1 +1 @@ -{"metadata":{"transaction_key":"deprecated","request_id":"29040720-d419-48c5-88b1-5933fa01c132","sha256":"5324da68ede209a16ac69a38e8cd29cee4d754434a041166cda3a1f5e0b24566","created":"2024-08-26T16:47:46.221Z","duration":17.566313,"channels":1,"models":["1abfe86b-e047-4eed-858a-35e5625b41ee"],"model_info":{"1abfe86b-e047-4eed-858a-35e5625b41ee":{"name":"2-general-nova","version":"2024-01-06.5664","arch":"nova-2"}},"summary_info":{"model_uuid":"67875a7f-c9c4-48a0-aa55-5bdb8a91c34a"}},"results":{"channels":[{"alternatives":[{"transcript":"Yep. I said it before, and I'll say it again. Life moves pretty fast. You don't stop and look around once in a while, you could miss it.","confidence":0.99953806,"words":[{"word":"yep","start":5.6,"end":6.1,"confidence":0.9976238,"punctuated_word":"Yep."},{"word":"i","start":7.04,"end":7.2799997,"confidence":0.71035343,"punctuated_word":"I"},{"word":"said","start":7.2799997,"end":7.52,"confidence":0.96610147,"punctuated_word":"said"},{"word":"it","start":7.52,"end":7.6,"confidence":0.99953806,"punctuated_word":"it"},{"word":"before","start":7.6,"end":7.9199996,"confidence":0.8144645,"punctuated_word":"before,"},{"word":"and","start":7.9199996,"end":8.08,"confidence":0.9998975,"punctuated_word":"and"},{"word":"i'll","start":8.08,"end":8.24,"confidence":0.99988437,"punctuated_word":"I'll"},{"word":"say","start":8.24,"end":8.48,"confidence":0.9997116,"punctuated_word":"say"},{"word":"it","start":8.48,"end":8.639999,"confidence":0.9998079,"punctuated_word":"it"},{"word":"again","start":8.639999,"end":9.139999,"confidence":0.95415795,"punctuated_word":"again."},{"word":"life","start":9.991312,"end":10.391313,"confidence":0.99934644,"punctuated_word":"Life"},{"word":"moves","start":10.391313,"end":10.711312,"confidence":0.99980146,"punctuated_word":"moves"},{"word":"pretty","start":10.711312,"end":11.031313,"confidence":0.9998349,"punctuated_word":"pretty"},{"word":"fast","start":11.031313,"end":11.531313,"confidence":0.9997705,"punctuated_word":"fast."},{"word":"you","start":11.991312,"end":12.231313,"confidence":0.9602717,"punctuated_word":"You"},{"word":"don't","start":12.231313,"end":12.4713125,"confidence":0.99991965,"punctuated_word":"don't"},{"word":"stop","start":12.4713125,"end":12.711312,"confidence":0.99985266,"punctuated_word":"stop"},{"word":"and","start":12.711312,"end":12.871312,"confidence":0.99942976,"punctuated_word":"and"},{"word":"look","start":12.871312,"end":13.031313,"confidence":0.999892,"punctuated_word":"look"},{"word":"around","start":13.031313,"end":13.351313,"confidence":0.9998568,"punctuated_word":"around"},{"word":"once","start":13.351313,"end":13.591312,"confidence":0.99925345,"punctuated_word":"once"},{"word":"in","start":13.591312,"end":13.671312,"confidence":0.9984509,"punctuated_word":"in"},{"word":"a","start":13.671312,"end":13.831312,"confidence":0.9846156,"punctuated_word":"a"},{"word":"while","start":13.831312,"end":14.331312,"confidence":0.94432104,"punctuated_word":"while,"},{"word":"you","start":14.631312,"end":14.791312,"confidence":0.9986889,"punctuated_word":"you"},{"word":"could","start":14.791312,"end":14.951313,"confidence":0.9996587,"punctuated_word":"could"},{"word":"miss","start":14.951313,"end":15.191313,"confidence":0.99969184,"punctuated_word":"miss"},{"word":"it","start":15.191313,"end":15.691313,"confidence":0.99777055,"punctuated_word":"it."}]}]}],"summary":{"short":"Yep. I said it before, and I'll say it again. Life moves pretty fast. You don't stop and look around once in a while, you could miss it.","result":"success"}}} \ No newline at end of file +{"metadata":{"transaction_key":"deprecated","request_id":"3407ef63-d66a-4aad-893b-b0df0a89840d","sha256":"5324da68ede209a16ac69a38e8cd29cee4d754434a041166cda3a1f5e0b24566","created":"2025-01-31T00:32:07.106Z","duration":17.566313,"channels":1,"models":["1abfe86b-e047-4eed-858a-35e5625b41ee"],"model_info":{"1abfe86b-e047-4eed-858a-35e5625b41ee":{"name":"2-general-nova","version":"2024-01-06.5664","arch":"nova-2"}},"summary_info":{"model_uuid":"67875a7f-c9c4-48a0-aa55-5bdb8a91c34a"}},"results":{"channels":[{"alternatives":[{"transcript":"Yep. I said it before, and I'll say it again. Life moves pretty fast. You don't stop and look around once in a while, you could miss it.","confidence":0.9995364,"words":[{"word":"yep","start":5.6,"end":6.1,"confidence":0.99766016,"punctuated_word":"Yep."},{"word":"i","start":7.04,"end":7.2799997,"confidence":0.708042,"punctuated_word":"I"},{"word":"said","start":7.2799997,"end":7.44,"confidence":0.96501267,"punctuated_word":"said"},{"word":"it","start":7.44,"end":7.6,"confidence":0.9995364,"punctuated_word":"it"},{"word":"before","start":7.6,"end":7.9199996,"confidence":0.8164387,"punctuated_word":"before,"},{"word":"and","start":7.9199996,"end":8.08,"confidence":0.99989605,"punctuated_word":"and"},{"word":"i'll","start":8.08,"end":8.24,"confidence":0.99988526,"punctuated_word":"I'll"},{"word":"say","start":8.24,"end":8.48,"confidence":0.999711,"punctuated_word":"say"},{"word":"it","start":8.48,"end":8.639999,"confidence":0.9998086,"punctuated_word":"it"},{"word":"again","start":8.639999,"end":9.139999,"confidence":0.9539417,"punctuated_word":"again."},{"word":"life","start":9.991312,"end":10.391313,"confidence":0.999337,"punctuated_word":"Life"},{"word":"moves","start":10.391313,"end":10.711312,"confidence":0.99979717,"punctuated_word":"moves"},{"word":"pretty","start":10.711312,"end":11.031313,"confidence":0.99983025,"punctuated_word":"pretty"},{"word":"fast","start":11.031313,"end":11.531313,"confidence":0.999768,"punctuated_word":"fast."},{"word":"you","start":11.991312,"end":12.231313,"confidence":0.959791,"punctuated_word":"You"},{"word":"don't","start":12.231313,"end":12.4713125,"confidence":0.9999174,"punctuated_word":"don't"},{"word":"stop","start":12.4713125,"end":12.711312,"confidence":0.9998504,"punctuated_word":"stop"},{"word":"and","start":12.711312,"end":12.871312,"confidence":0.9994228,"punctuated_word":"and"},{"word":"look","start":12.871312,"end":13.031313,"confidence":0.99988985,"punctuated_word":"look"},{"word":"around","start":13.031313,"end":13.351313,"confidence":0.9998529,"punctuated_word":"around"},{"word":"once","start":13.351313,"end":13.591312,"confidence":0.9992393,"punctuated_word":"once"},{"word":"in","start":13.591312,"end":13.751312,"confidence":0.99844617,"punctuated_word":"in"},{"word":"a","start":13.751312,"end":13.831312,"confidence":0.9845093,"punctuated_word":"a"},{"word":"while","start":13.831312,"end":14.331312,"confidence":0.9434917,"punctuated_word":"while,"},{"word":"you","start":14.631312,"end":14.791312,"confidence":0.9986513,"punctuated_word":"you"},{"word":"could","start":14.791312,"end":14.951313,"confidence":0.9996413,"punctuated_word":"could"},{"word":"miss","start":14.951313,"end":15.191313,"confidence":0.99967337,"punctuated_word":"miss"},{"word":"it","start":15.191313,"end":15.691313,"confidence":0.99765414,"punctuated_word":"it."}]}]}],"summary":{"short":"Yep. I said it before, and I'll say it again. Life moves pretty fast. You don't stop and look around once in a while, you could miss it.","result":"success"}}} \ No newline at end of file diff --git a/tests/response_data/bfae00d50d521f470ff9d1943f32225fcfeffe51eff47984886930b71fae0929-response.json b/tests/response_data/bfae00d50d521f470ff9d1943f32225fcfeffe51eff47984886930b71fae0929-response.json index 2c78f154..4ad4e561 100755 --- a/tests/response_data/bfae00d50d521f470ff9d1943f32225fcfeffe51eff47984886930b71fae0929-response.json +++ b/tests/response_data/bfae00d50d521f470ff9d1943f32225fcfeffe51eff47984886930b71fae0929-response.json @@ -1 +1 @@ -{"metadata":{"transaction_key":"deprecated","request_id":"b4692c7a-6db1-45b5-9c41-65054f098083","sha256":"5324da68ede209a16ac69a38e8cd29cee4d754434a041166cda3a1f5e0b24566","created":"2024-08-26T16:47:43.269Z","duration":17.566313,"channels":1,"models":["30089e05-99d1-4376-b32e-c263170674af"],"model_info":{"30089e05-99d1-4376-b32e-c263170674af":{"name":"2-general-nova","version":"2024-01-09.29447","arch":"nova-2"}}},"results":{"channels":[{"alternatives":[{"transcript":"Yep. I said it before, and I'll say it again. Life moves pretty fast. You don't stop and look around once in a while, you could miss it.","confidence":0.99853826,"words":[{"word":"yep","start":5.52,"end":6.02,"confidence":0.99584043,"punctuated_word":"Yep."},{"word":"i","start":7.04,"end":7.2799997,"confidence":0.5126306,"punctuated_word":"I"},{"word":"said","start":7.2799997,"end":7.44,"confidence":0.9672295,"punctuated_word":"said"},{"word":"it","start":7.44,"end":7.6,"confidence":0.9997284,"punctuated_word":"it"},{"word":"before","start":7.6,"end":7.9199996,"confidence":0.7846241,"punctuated_word":"before,"},{"word":"and","start":7.9199996,"end":8.16,"confidence":0.9998627,"punctuated_word":"and"},{"word":"i'll","start":8.16,"end":8.32,"confidence":0.9998944,"punctuated_word":"I'll"},{"word":"say","start":8.32,"end":8.48,"confidence":0.9996517,"punctuated_word":"say"},{"word":"it","start":8.48,"end":8.639999,"confidence":0.99982834,"punctuated_word":"it"},{"word":"again","start":8.639999,"end":9.139999,"confidence":0.97370577,"punctuated_word":"again."},{"word":"life","start":9.991312,"end":10.391313,"confidence":0.9957366,"punctuated_word":"Life"},{"word":"moves","start":10.391313,"end":10.711312,"confidence":0.9988586,"punctuated_word":"moves"},{"word":"pretty","start":10.711312,"end":11.031313,"confidence":0.9996014,"punctuated_word":"pretty"},{"word":"fast","start":11.031313,"end":11.531313,"confidence":0.9995537,"punctuated_word":"fast."},{"word":"you","start":12.071312,"end":12.231313,"confidence":0.9514749,"punctuated_word":"You"},{"word":"don't","start":12.231313,"end":12.4713125,"confidence":0.99988735,"punctuated_word":"don't"},{"word":"stop","start":12.4713125,"end":12.711312,"confidence":0.99979633,"punctuated_word":"stop"},{"word":"and","start":12.711312,"end":12.871312,"confidence":0.9987136,"punctuated_word":"and"},{"word":"look","start":12.871312,"end":13.031313,"confidence":0.9996673,"punctuated_word":"look"},{"word":"around","start":13.031313,"end":13.351313,"confidence":0.9995766,"punctuated_word":"around"},{"word":"once","start":13.351313,"end":13.591312,"confidence":0.998198,"punctuated_word":"once"},{"word":"in","start":13.591312,"end":13.751312,"confidence":0.99853826,"punctuated_word":"in"},{"word":"a","start":13.751312,"end":13.831312,"confidence":0.9861093,"punctuated_word":"a"},{"word":"while","start":13.831312,"end":14.331312,"confidence":0.92627394,"punctuated_word":"while,"},{"word":"you","start":14.631312,"end":14.791312,"confidence":0.997024,"punctuated_word":"you"},{"word":"could","start":14.791312,"end":14.951313,"confidence":0.9983543,"punctuated_word":"could"},{"word":"miss","start":14.951313,"end":15.191313,"confidence":0.9984425,"punctuated_word":"miss"},{"word":"it","start":15.191313,"end":15.691313,"confidence":0.9950415,"punctuated_word":"it."}],"paragraphs":{"transcript":"\nYep. I said it before, and I'll say it again. Life moves pretty fast. You don't stop and look around once in a while, you could miss it.","paragraphs":[{"sentences":[{"text":"Yep.","start":5.52,"end":6.02},{"text":"I said it before, and I'll say it again.","start":7.04,"end":9.139999},{"text":"Life moves pretty fast.","start":9.991312,"end":11.531313},{"text":"You don't stop and look around once in a while, you could miss it.","start":12.071312,"end":15.691313}],"num_words":28,"start":5.52,"end":15.691313}]}}]}]}} \ No newline at end of file +{"metadata":{"transaction_key":"deprecated","request_id":"4e9a855f-ca0f-4274-b411-ea32dbc16d29","sha256":"5324da68ede209a16ac69a38e8cd29cee4d754434a041166cda3a1f5e0b24566","created":"2025-01-31T00:32:06.663Z","duration":17.566313,"channels":1,"models":["30089e05-99d1-4376-b32e-c263170674af"],"model_info":{"30089e05-99d1-4376-b32e-c263170674af":{"name":"2-general-nova","version":"2024-01-09.29447","arch":"nova-2"}}},"results":{"channels":[{"alternatives":[{"transcript":"Yep. I said it before, and I'll say it again. Life moves pretty fast. You don't stop and look around once in a while, you could miss it.","confidence":0.9984518,"words":[{"word":"yep","start":5.52,"end":6.02,"confidence":0.99584556,"punctuated_word":"Yep."},{"word":"i","start":7.04,"end":7.2799997,"confidence":0.5232183,"punctuated_word":"I"},{"word":"said","start":7.2799997,"end":7.44,"confidence":0.964605,"punctuated_word":"said"},{"word":"it","start":7.44,"end":7.6,"confidence":0.9997186,"punctuated_word":"it"},{"word":"before","start":7.6,"end":7.9199996,"confidence":0.7852936,"punctuated_word":"before,"},{"word":"and","start":7.9199996,"end":8.16,"confidence":0.999858,"punctuated_word":"and"},{"word":"i'll","start":8.16,"end":8.24,"confidence":0.99988794,"punctuated_word":"I'll"},{"word":"say","start":8.24,"end":8.48,"confidence":0.99964166,"punctuated_word":"say"},{"word":"it","start":8.48,"end":8.639999,"confidence":0.9998179,"punctuated_word":"it"},{"word":"again","start":8.639999,"end":9.139999,"confidence":0.97208285,"punctuated_word":"again."},{"word":"life","start":9.991312,"end":10.391313,"confidence":0.9956626,"punctuated_word":"Life"},{"word":"moves","start":10.391313,"end":10.711312,"confidence":0.9987895,"punctuated_word":"moves"},{"word":"pretty","start":10.711312,"end":11.031313,"confidence":0.99958473,"punctuated_word":"pretty"},{"word":"fast","start":11.031313,"end":11.531313,"confidence":0.9995389,"punctuated_word":"fast."},{"word":"you","start":12.071312,"end":12.231313,"confidence":0.9496514,"punctuated_word":"You"},{"word":"don't","start":12.231313,"end":12.4713125,"confidence":0.9998871,"punctuated_word":"don't"},{"word":"stop","start":12.4713125,"end":12.711312,"confidence":0.99978834,"punctuated_word":"stop"},{"word":"and","start":12.711312,"end":12.871312,"confidence":0.99870694,"punctuated_word":"and"},{"word":"look","start":12.871312,"end":13.031313,"confidence":0.9996247,"punctuated_word":"look"},{"word":"around","start":13.031313,"end":13.351313,"confidence":0.9995322,"punctuated_word":"around"},{"word":"once","start":13.351313,"end":13.591312,"confidence":0.9980415,"punctuated_word":"once"},{"word":"in","start":13.591312,"end":13.751312,"confidence":0.99843293,"punctuated_word":"in"},{"word":"a","start":13.751312,"end":13.831312,"confidence":0.98599744,"punctuated_word":"a"},{"word":"while","start":13.831312,"end":14.331312,"confidence":0.92610466,"punctuated_word":"while,"},{"word":"you","start":14.631312,"end":14.791312,"confidence":0.9967163,"punctuated_word":"you"},{"word":"could","start":14.791312,"end":14.951313,"confidence":0.9983016,"punctuated_word":"could"},{"word":"miss","start":14.951313,"end":15.191313,"confidence":0.9984518,"punctuated_word":"miss"},{"word":"it","start":15.191313,"end":15.691313,"confidence":0.9958323,"punctuated_word":"it."}],"paragraphs":{"transcript":"\nYep. I said it before, and I'll say it again. Life moves pretty fast. You don't stop and look around once in a while, you could miss it.","paragraphs":[{"sentences":[{"text":"Yep.","start":5.52,"end":6.02},{"text":"I said it before, and I'll say it again.","start":7.04,"end":9.139999},{"text":"Life moves pretty fast.","start":9.991312,"end":11.531313},{"text":"You don't stop and look around once in a while, you could miss it.","start":12.071312,"end":15.691313}],"num_words":28,"start":5.52,"end":15.691313}]}}]}]}} \ No newline at end of file From d6f1755df1484399556d732d318ac2db31eb0124 Mon Sep 17 00:00:00 2001 From: John Vajda Date: Fri, 31 Jan 2025 11:33:42 -0700 Subject: [PATCH 4/6] fixes MD linting errors --- README.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index ebaeab97..94e2b4dd 100644 --- a/README.md +++ b/README.md @@ -200,18 +200,18 @@ These examples provide: - Agent Simple - [examples/agent/simple](https://github.com/deepgram/deepgram-go-sdk/blob/main/examples/agent/simple/main.go) -### Speech-to-Text - Live Audio / WebSocket: +### Speech-to-Text - Live Audio / WebSocket - From a Microphone - [examples/speech-to-text/websocket/microphone](https://github.com/deepgram/deepgram-go-sdk/blob/main/examples/speech-to-text/websocket/microphone/main.go) - From an HTTP Endpoint - [examples/speech-to-text/websocket/http](https://github.com/deepgram/deepgram-go-sdk/blob/main/examples/speech-to-text/websocket/http/main.go) -### Speech-to-Text - PreRecorded / REST: +### Speech-to-Text - PreRecorded / REST - From an Audio File - [examples/speech-to-text/rest/file](https://github.com/deepgram/deepgram-go-sdk/blob/main/examples/speech-to-text/rest/file/main.go) - From an URL - [examples/speech-to-text/rest/url](https://github.com/deepgram/deepgram-go-sdk/blob/main/examples/speech-to-text/rest/url/main.go) - From an Audio Stream - [examples/speech-to-text/rest/stream](https://github.com/deepgram/deepgram-go-sdk/blob/main/examples/speech-to-text/rest/stream/main.go) -### Speech-to-Text - Live Audio: +### Speech-to-Text - Live Audio - From a Microphone - [examples/speech-to-text/websocket/microphone](https://github.com/deepgram/deepgram-go-sdk/blob/main/examples/speech-to-text/websocket/microphone/main.go) - From an HTTP Endpoint - [examples/speech-to-text/websocket/http](https://github.com/deepgram/deepgram-go-sdk/blob/main/examples/speech-to-text/websocket/http/main.go) From 914f66f26be5138835f9ff40a2075f9fa515b004 Mon Sep 17 00:00:00 2001 From: John Vajda Date: Fri, 31 Jan 2025 11:42:39 -0700 Subject: [PATCH 5/6] Fixes incorrect error message in error channel handler --- pkg/api/agent/v1/websocket/chan_router.go | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/pkg/api/agent/v1/websocket/chan_router.go b/pkg/api/agent/v1/websocket/chan_router.go index 4403e26a..8c30031a 100644 --- a/pkg/api/agent/v1/websocket/chan_router.go +++ b/pkg/api/agent/v1/websocket/chan_router.go @@ -319,15 +319,20 @@ func (r *ChanRouter) processErrorResponse(byMsg []byte) error { } func (r *ChanRouter) processInjectionRefused(byMsg []byte) error { - var response interfaces.InjectionRefusedResponse - if err := json.Unmarshal(byMsg, &response); err != nil { - return err - } + action := func(data []byte) error { + var msg interfaces.InjectionRefusedResponse + if err := json.Unmarshal(byMsg, &msg); err != nil { + klog.V(1).Infof("json.Unmarshal(InjectionRefusedResponse) failed. Err: %v\n", err) + return err + } - for _, ch := range r.injectionRefusedResponse { - *ch <- &response + for _, ch := range r.injectionRefusedResponse { + *ch <- &msg + } + return nil } - return nil + + return r.processGeneric(string(interfaces.TypeInjectionRefusedResponse), byMsg, action) } func (r *ChanRouter) processKeepAlive(byMsg []byte) error { From cc14d32d90a17cbc7275786130887f12c5186bb4 Mon Sep 17 00:00:00 2001 From: John Vajda Date: Fri, 31 Jan 2025 15:14:20 -0700 Subject: [PATCH 6/6] adds SettingsAppliedResponse --- examples/agent/websocket/simple/main.go | 43 ++++++++++++++----- pkg/api/agent/v1/websocket/chan_default.go | 30 +++++++++++++ pkg/api/agent/v1/websocket/chan_router.go | 23 ++++++++++ .../v1/websocket/interfaces/constants.go | 1 + .../v1/websocket/interfaces/interfaces.go | 1 + .../agent/v1/websocket/interfaces/types.go | 5 +++ pkg/api/agent/v1/websocket/types.go | 2 + 7 files changed, 95 insertions(+), 10 deletions(-) diff --git a/examples/agent/websocket/simple/main.go b/examples/agent/websocket/simple/main.go index 41cd2736..5898092a 100644 --- a/examples/agent/websocket/simple/main.go +++ b/examples/agent/websocket/simple/main.go @@ -35,6 +35,7 @@ type MyHandler struct { unhandledChan chan *[]byte injectionRefusedResponse chan *msginterfaces.InjectionRefusedResponse keepAliveResponse chan *msginterfaces.KeepAlive + settingsAppliedResponse chan *msginterfaces.SettingsAppliedResponse } func NewMyHandler() *MyHandler { @@ -54,6 +55,7 @@ func NewMyHandler() *MyHandler { unhandledChan: make(chan *[]byte), injectionRefusedResponse: make(chan *msginterfaces.InjectionRefusedResponse), keepAliveResponse: make(chan *msginterfaces.KeepAlive), + settingsAppliedResponse: make(chan *msginterfaces.SettingsAppliedResponse), } go func() { @@ -138,6 +140,11 @@ func (dch MyHandler) GetKeepAlive() []*chan *msginterfaces.KeepAlive { return []*chan *msginterfaces.KeepAlive{&dch.keepAliveResponse} } +// GetSettingsApplied returns the settings applied response channels +func (dch MyHandler) GetSettingsApplied() []*chan *msginterfaces.SettingsAppliedResponse { + return []*chan *msginterfaces.SettingsAppliedResponse{&dch.settingsAppliedResponse} +} + // Open is the callback for when the connection opens // golintci: funlen func (dch MyHandler) Run() error { @@ -152,8 +159,8 @@ func (dch MyHandler) Run() error { lastBytesReceived := time.Now().Add(-7 * time.Second) for br := range dch.binaryChan { - fmt.Printf("\n\n[Binary Data]\n\n") - fmt.Printf("Size: %d\n\n", len(*br)) + fmt.Printf("\n\n[Binary Data Received]\n") + fmt.Printf("Size: %d bytes\n", len(*br)) if lastBytesReceived.Add(5 * time.Second).Before(time.Now()) { counter = counter + 1 @@ -308,6 +315,16 @@ func (dch MyHandler) Run() error { } }() + // settings applied response channel + wgReceivers.Add(1) + go func() { + defer wgReceivers.Done() + + for _ = range dch.settingsAppliedResponse { + fmt.Printf("\n\n[SettingsAppliedResponse]\n\n") + } + }() + // close channel wgReceivers.Add(1) go func() { @@ -338,8 +355,8 @@ func (dch MyHandler) Run() error { defer wgReceivers.Done() for byData := range dch.unhandledChan { - fmt.Printf("\n[UnhandledEvent]") - fmt.Printf("Dump:\n%s\n\n", string(*byData)) + fmt.Printf("\n[UnhandledEvent]\n") + fmt.Printf("Raw message: %s\n", string(*byData)) } }() @@ -378,28 +395,30 @@ func main() { tOptions.Agent.Think.Instructions = "You are a helpful AI assistant." // implement your own callback - var callback msginterfaces.AgentMessageChan - callback = *NewMyHandler() + callback := msginterfaces.AgentMessageChan(*NewMyHandler()) // create a Deepgram client + fmt.Printf("Creating new Deepgram WebSocket client...\n") dgClient, err := client.NewWSUsingChan(ctx, "", cOptions, tOptions, callback) if err != nil { - fmt.Println("ERROR creating LiveTranscription connection:", err) + fmt.Printf("ERROR creating LiveTranscription connection:\n- Error: %v\n- Type: %T\n", err, err) return } // connect the websocket to Deepgram - fmt.Printf("Starting Agent...\n") + fmt.Printf("Attempting to connect to Deepgram WebSocket...\n") bConnected := dgClient.Connect() if !bConnected { - fmt.Println("Client.Connect failed") + fmt.Printf("WebSocket connection failed - check your API key and network connection\n") os.Exit(1) } + fmt.Printf("Successfully connected to Deepgram WebSocket\n") /* Microphone package */ - // mic stuf + // mic stuff + fmt.Printf("Initializing microphone...\n") mic, err := microphone.New(microphone.AudioConfig{ InputChannels: 1, SamplingRate: 16000, @@ -408,6 +427,7 @@ func main() { fmt.Printf("Initialize failed. Err: %v\n", err) os.Exit(1) } + fmt.Printf("Microphone initialized successfully\n") // start the mic fmt.Printf("Starting Microphone...\n") @@ -416,10 +436,13 @@ func main() { fmt.Printf("mic.Start failed. Err: %v\n", err) os.Exit(1) } + fmt.Printf("Microphone started successfully\n") go func() { + fmt.Printf("Starting audio stream...\n") // feed the microphone stream to the Deepgram client (this is a blocking call) mic.Stream(dgClient) + fmt.Printf("Audio stream ended\n") }() // wait for user input to exit diff --git a/pkg/api/agent/v1/websocket/chan_default.go b/pkg/api/agent/v1/websocket/chan_default.go index e2652d82..586cb908 100644 --- a/pkg/api/agent/v1/websocket/chan_default.go +++ b/pkg/api/agent/v1/websocket/chan_default.go @@ -134,6 +134,11 @@ func (dch DefaultChanHandler) GetUnhandled() []*chan *[]byte { return []*chan *[]byte{&dch.unhandledChan} } +// GetSettingsApplied returns the settings applied response channels +func (dch DefaultChanHandler) GetSettingsApplied() []*chan *interfaces.SettingsAppliedResponse { + return []*chan *interfaces.SettingsAppliedResponse{&dch.settingsAppliedResponse} +} + // Open is the callback for when the connection opens // //nolint:funlen,gocyclo // this is a complex function. keep as is @@ -421,6 +426,31 @@ func (dch DefaultChanHandler) Run() error { } }() + // settings applied response channel + wgReceivers.Add(1) + go func() { + defer wgReceivers.Done() + + for sa := range dch.settingsAppliedResponse { + if dch.debugWebsocket { + data, err := json.Marshal(sa) + if err != nil { + klog.V(1).Infof("SettingsApplied json.Marshal failed. Err: %v\n", err) + continue + } + + prettyJSON, err := prettyjson.Format(data) + if err != nil { + klog.V(1).Infof("prettyjson.Marshal failed. Err: %v\n", err) + continue + } + klog.V(2).Infof("\n\nSettingsApplied Object:\n%s\n\n", prettyJSON) + } + + fmt.Printf("\n\n[SettingsAppliedResponse]\n\n") + } + }() + // close channel wgReceivers.Add(1) go func() { diff --git a/pkg/api/agent/v1/websocket/chan_router.go b/pkg/api/agent/v1/websocket/chan_router.go index 8c30031a..d7cef31d 100644 --- a/pkg/api/agent/v1/websocket/chan_router.go +++ b/pkg/api/agent/v1/websocket/chan_router.go @@ -51,6 +51,8 @@ func NewChanRouter(chans interfaces.AgentMessageChan) *ChanRouter { agentStartedSpeakingResponse: make([]*chan *interfaces.AgentStartedSpeakingResponse, 0), agentAudioDoneResponse: make([]*chan *interfaces.AgentAudioDoneResponse, 0), injectionRefusedResponse: make([]*chan *interfaces.InjectionRefusedResponse, 0), + keepAliveResponse: make([]*chan *interfaces.KeepAlive, 0), + settingsAppliedResponse: make([]*chan *interfaces.SettingsAppliedResponse, 0), closeChan: make([]*chan *interfaces.CloseResponse, 0), errorChan: make([]*chan *interfaces.ErrorResponse, 0), unhandledChan: make([]*chan *[]byte, 0), @@ -71,6 +73,8 @@ func NewChanRouter(chans interfaces.AgentMessageChan) *ChanRouter { router.errorChan = append(router.errorChan, chans.GetError()...) router.unhandledChan = append(router.unhandledChan, chans.GetUnhandled()...) router.injectionRefusedResponse = append(router.injectionRefusedResponse, chans.GetInjectionRefused()...) + router.keepAliveResponse = append(router.keepAliveResponse, chans.GetKeepAlive()...) + router.settingsAppliedResponse = append(router.settingsAppliedResponse, chans.GetSettingsApplied()...) } return router @@ -352,6 +356,23 @@ func (r *ChanRouter) processKeepAlive(byMsg []byte) error { return r.processGeneric(string(interfaces.TypeKeepAlive), byMsg, action) } +func (r *ChanRouter) processSettingsApplied(byMsg []byte) error { + action := func(data []byte) error { + var msg interfaces.SettingsAppliedResponse + if err := json.Unmarshal(byMsg, &msg); err != nil { + klog.V(1).Infof("json.Unmarshal(SettingsAppliedResponse) failed. Err: %v\n", err) + return err + } + + for _, ch := range r.settingsAppliedResponse { + *ch <- &msg + } + return nil + } + + return r.processGeneric(string(interfaces.TypeSettingsAppliedResponse), byMsg, action) +} + // Message handles platform messages and routes them appropriately based on the MessageType func (r *ChanRouter) Message(byMsg []byte) error { klog.V(6).Infof("router.Message ENTER\n") @@ -391,6 +412,8 @@ func (r *ChanRouter) Message(byMsg []byte) error { err = r.processInjectionRefused(byMsg) case interfaces.TypeKeepAlive: err = r.processKeepAlive(byMsg) + case interfaces.TypeSettingsAppliedResponse: + err = r.processSettingsApplied(byMsg) default: err = r.UnhandledMessage(byMsg) } diff --git a/pkg/api/agent/v1/websocket/interfaces/constants.go b/pkg/api/agent/v1/websocket/interfaces/constants.go index eeaa8522..ab3e6954 100644 --- a/pkg/api/agent/v1/websocket/interfaces/constants.go +++ b/pkg/api/agent/v1/websocket/interfaces/constants.go @@ -37,4 +37,5 @@ const ( TypeCloseResponse = commoninterfaces.TypeCloseResponse TypeErrorResponse = commoninterfaces.TypeErrorResponse TypeInjectionRefusedResponse = "InjectionRefused" + TypeSettingsAppliedResponse = "SettingsApplied" ) diff --git a/pkg/api/agent/v1/websocket/interfaces/interfaces.go b/pkg/api/agent/v1/websocket/interfaces/interfaces.go index 85144c7a..884d5809 100644 --- a/pkg/api/agent/v1/websocket/interfaces/interfaces.go +++ b/pkg/api/agent/v1/websocket/interfaces/interfaces.go @@ -25,4 +25,5 @@ type AgentMessageChan interface { GetUnhandled() []*chan *[]byte GetInjectionRefused() []*chan *InjectionRefusedResponse GetKeepAlive() []*chan *KeepAlive + GetSettingsApplied() []*chan *SettingsAppliedResponse } diff --git a/pkg/api/agent/v1/websocket/interfaces/types.go b/pkg/api/agent/v1/websocket/interfaces/types.go index d82bfd71..3082e078 100644 --- a/pkg/api/agent/v1/websocket/interfaces/types.go +++ b/pkg/api/agent/v1/websocket/interfaces/types.go @@ -138,3 +138,8 @@ type InjectionRefusedResponse struct { Type string `json:"type,omitempty"` Message string `json:"message,omitempty"` } + +// SettingsAppliedResponse is the response confirming settings were applied +type SettingsAppliedResponse struct { + Type string `json:"type,omitempty"` +} diff --git a/pkg/api/agent/v1/websocket/types.go b/pkg/api/agent/v1/websocket/types.go index ef1da7dc..ce31c5d6 100644 --- a/pkg/api/agent/v1/websocket/types.go +++ b/pkg/api/agent/v1/websocket/types.go @@ -29,6 +29,7 @@ type DefaultChanHandler struct { agentAudioDoneResponse chan *interfaces.AgentAudioDoneResponse injectionRefusedResponse chan *interfaces.InjectionRefusedResponse keepAliveResponse chan *interfaces.KeepAlive + settingsAppliedResponse chan *interfaces.SettingsAppliedResponse closeChan chan *interfaces.CloseResponse errorChan chan *interfaces.ErrorResponse unhandledChan chan *[]byte @@ -51,6 +52,7 @@ type ChanRouter struct { agentAudioDoneResponse []*chan *interfaces.AgentAudioDoneResponse injectionRefusedResponse []*chan *interfaces.InjectionRefusedResponse keepAliveResponse []*chan *interfaces.KeepAlive + settingsAppliedResponse []*chan *interfaces.SettingsAppliedResponse closeChan []*chan *interfaces.CloseResponse errorChan []*chan *interfaces.ErrorResponse unhandledChan []*chan *[]byte