Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: llm bridge server communicates with zipper in memory way #996

Merged
merged 8 commits into from
Jan 26, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 16 additions & 3 deletions cli/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@

"github.com/spf13/cobra"
"github.com/yomorun/yomo"
"github.com/yomorun/yomo/core/auth"
"github.com/yomorun/yomo/core/ylog"
pkgconfig "github.com/yomorun/yomo/pkg/config"
"github.com/yomorun/yomo/pkg/listener/mem"
"github.com/yomorun/yomo/pkg/log"
"github.com/yomorun/yomo/pkg/trace"

Expand Down Expand Up @@ -68,15 +70,18 @@
// listening address.
listenAddr := fmt.Sprintf("%s:%d", conf.Host, conf.Port)

// memory listener
var listener *mem.Listener

Check warning on line 75 in cli/serve.go

View check run for this annotation

Codecov / codecov/patch

cli/serve.go#L73-L75

Added lines #L73 - L75 were not covered by tests
options := []yomo.ZipperOption{}
tokenString := ""
if _, ok := conf.Auth["type"]; ok {
if tokenString, ok = conf.Auth["token"]; ok {
options = append(options, yomo.WithAuth("token", tokenString))
}
}
// check llm bridge server config
// parse the llm bridge config

// check and parse the llm bridge server config
bridgeConf := conf.Bridge
aiConfig, err := ai.ParseConfig(bridgeConf)
if err != nil {
Expand All @@ -88,8 +93,10 @@
}
}
if aiConfig != nil {
listener = mem.Listen()

Check warning on line 96 in cli/serve.go

View check run for this annotation

Codecov / codecov/patch

cli/serve.go#L96

Added line #L96 was not covered by tests
// add AI connection middleware
options = append(options, yomo.WithZipperConnMiddleware(ai.RegisterFunctionMW()))
options = append(options, yomo.WithFrameListener(listener))

Check warning on line 99 in cli/serve.go

View check run for this annotation

Codecov / codecov/patch

cli/serve.go#L99

Added line #L99 was not covered by tests
}
// new zipper
zipper, err := yomo.NewZipper(
Expand All @@ -108,7 +115,13 @@
registerAIProvider(aiConfig)
// start the llm api server
go func() {
err := ai.Serve(aiConfig, listenAddr, fmt.Sprintf("token:%s", tokenString), ylog.Default())
conn, _ := listener.Dial()
source := ai.NewSource(conn, auth.NewCredential(fmt.Sprintf("token:%s", tokenString)))

conn2, _ := listener.Dial()
reducer := ai.NewReducer(conn2, auth.NewCredential(fmt.Sprintf("token:%s", tokenString)))

err := ai.Serve(aiConfig, ylog.Default(), source, reducer)

Check warning on line 124 in cli/serve.go

View check run for this annotation

Codecov / codecov/patch

cli/serve.go#L118-L124

Added lines #L118 - L124 were not covered by tests
if err != nil {
log.FailureStatusEvent(os.Stdout, err.Error())
return
Expand Down
2 changes: 0 additions & 2 deletions cli/serverless/golang/serverless.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,6 @@ func (s *GolangServerless) Init(opts *serverless.Options) error {

// Build compiles the serverless to executable
func (s *GolangServerless) Build(clean bool) error {
log.PendingStatusEvent(os.Stdout, "Building YoMo Stream Function instance...")
// check if the file exists
appPath := s.source
if _, err := os.Stat(appPath); os.IsNotExist(err) {
Expand Down Expand Up @@ -203,7 +202,6 @@ func (s *GolangServerless) Build(clean bool) error {
if clean {
file.Remove(s.tempDir)
}
log.SuccessStatusEvent(os.Stdout, "YoMo Stream Function build successful!")
return nil
}

Expand Down
42 changes: 29 additions & 13 deletions core/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,20 +143,32 @@

defer closeServer(s.downstreams, s.connector, s.listener, s.router)

errCount := 0
for {
fconn, err := s.listener.Accept(s.ctx)
if err != nil {
if err == s.ctx.Err() {
return ErrServerClosed
listeners := append(s.opts.listeners, s.listener)

var wg sync.WaitGroup
for _, l := range listeners {
wg.Add(1)
go func(l frame.Listener) {
errCount := 0
for {
fconn, err := l.Accept(s.ctx)
if err != nil {
if err == s.ctx.Err() {
wg.Done()
return
}
errCount++
s.logger.Error("accepted an error when accepting a connection", "err", err, "err_count", errCount)
continue

Check warning on line 162 in core/server.go

View check run for this annotation

Codecov / codecov/patch

core/server.go#L160-L162

Added lines #L160 - L162 were not covered by tests
}

go s.handleFrameConn(fconn, s.logger)
}
errCount++
s.logger.Error("accepted an error when accepting a connection", "err", err, "err_count", errCount)
continue
}

go s.handleFrameConn(fconn, s.logger)
}(l)
}

wg.Wait()
return ErrServerClosed
}

func (s *Server) handleFrameConn(fconn frame.Conn, logger *slog.Logger) {
Expand Down Expand Up @@ -380,7 +392,11 @@

// dispatch every DataFrames to all downstreams
func (s *Server) dispatchToDownstreams(c *Context) error {
dataFrame := c.Frame
dataFrame := &frame.DataFrame{
Tag: c.Frame.Tag,
Payload: c.Frame.Payload,
Metadata: c.Frame.Metadata,
}
if c.Connection.ClientType() == ClientTypeUpstreamZipper {
c.Logger.Debug("ignored client", "client_type", c.Connection.ClientType().String())
// loop protection
Expand Down
9 changes: 9 additions & 0 deletions core/server_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

"github.com/quic-go/quic-go"
"github.com/yomorun/yomo/core/auth"
"github.com/yomorun/yomo/core/frame"
"github.com/yomorun/yomo/core/router"
"github.com/yomorun/yomo/core/ylog"
)
Expand Down Expand Up @@ -38,6 +39,7 @@
router router.Router
connMiddlewares []ConnMiddleware
frameMiddlewares []FrameMiddleware
listeners []frame.Listener
}

func defaultServerOptions() *serverOptions {
Expand Down Expand Up @@ -120,3 +122,10 @@
o.connMiddlewares = append(o.connMiddlewares, mws...)
}
}

// WithFrameListener adds a Listener other than a quic.Listener.
func WithFrameListener(l ...frame.Listener) ServerOption {
return func(o *serverOptions) {
o.listeners = append(o.listeners, l...)
}

Check warning on line 130 in core/server_options.go

View check run for this annotation

Codecov / codecov/patch

core/server_options.go#L127-L130

Added lines #L127 - L130 were not covered by tests
}
8 changes: 8 additions & 0 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

"github.com/quic-go/quic-go"
"github.com/yomorun/yomo/core"
"github.com/yomorun/yomo/core/frame"
"github.com/yomorun/yomo/core/router"
)

Expand Down Expand Up @@ -147,4 +148,11 @@
o.serverOption = append(o.serverOption, core.WithFrameMiddleware(mw...))
}
}

// WithFrameListener adds a Listener other than a quic.Listener.
WithFrameListener = func(l ...frame.Listener) ZipperOption {
return func(o *zipperOptions) {
o.serverOption = append(o.serverOption, core.WithFrameListener(l...))
}

Check warning on line 156 in options.go

View check run for this annotation

Codecov / codecov/patch

options.go#L153-L156

Added lines #L153 - L156 were not covered by tests
}
)
23 changes: 10 additions & 13 deletions pkg/bridge/ai/api_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
"time"

openai "github.com/sashabaranov/go-openai"
"github.com/yomorun/yomo"
"github.com/yomorun/yomo/ai"
"github.com/yomorun/yomo/pkg/bridge/ai/provider"
"github.com/yomorun/yomo/pkg/bridge/ai/register"
Expand All @@ -34,23 +35,20 @@

// BasicAPIServer provides restful service for end user
type BasicAPIServer struct {
zipperAddr string
credential string
httpHandler http.Handler
}

// Serve starts the Basic API Server
func Serve(config *Config, zipperListenAddr string, credential string, logger *slog.Logger) error {
func Serve(config *Config, logger *slog.Logger, source yomo.Source, reducer yomo.StreamFunction) error {

Check warning on line 42 in pkg/bridge/ai/api_server.go

View check run for this annotation

Codecov / codecov/patch

pkg/bridge/ai/api_server.go#L42

Added line #L42 was not covered by tests
provider, err := provider.GetProvider(config.Server.Provider)
if err != nil {
return err
}
srv, err := NewBasicAPIServer(config, zipperListenAddr, credential, provider, logger)
srv, err := NewBasicAPIServer(config, provider, source, reducer, logger)

Check warning on line 47 in pkg/bridge/ai/api_server.go

View check run for this annotation

Codecov / codecov/patch

pkg/bridge/ai/api_server.go#L47

Added line #L47 was not covered by tests
if err != nil {
return err
}

logger.Info("start AI Bridge service", "addr", config.Server.Addr, "provider", provider.Name())
return http.ListenAndServe(config.Server.Addr, srv.httpHandler)
}

Expand Down Expand Up @@ -80,24 +78,23 @@
}

// NewBasicAPIServer creates a new restful service
func NewBasicAPIServer(config *Config, zipperAddr, credential string, provider provider.LLMProvider, logger *slog.Logger) (*BasicAPIServer, error) {
zipperAddr = parseZipperAddr(zipperAddr)

func NewBasicAPIServer(config *Config, provider provider.LLMProvider, source yomo.Source, reducer yomo.StreamFunction, logger *slog.Logger) (*BasicAPIServer, error) {

Check warning on line 81 in pkg/bridge/ai/api_server.go

View check run for this annotation

Codecov / codecov/patch

pkg/bridge/ai/api_server.go#L81

Added line #L81 was not covered by tests
logger = logger.With("service", "llm-bridge")

service := NewService(zipperAddr, provider, &ServiceOptions{
opts := &ServiceOptions{

Check warning on line 84 in pkg/bridge/ai/api_server.go

View check run for this annotation

Codecov / codecov/patch

pkg/bridge/ai/api_server.go#L84

Added line #L84 was not covered by tests
Logger: logger,
CredentialFunc: func(r *http.Request) (string, error) { return credential, nil },
})
SourceBuilder: func() yomo.Source { return source },
ReducerBuilder: func() yomo.StreamFunction { return reducer },

Check warning on line 87 in pkg/bridge/ai/api_server.go

View check run for this annotation

Codecov / codecov/patch

pkg/bridge/ai/api_server.go#L86-L87

Added lines #L86 - L87 were not covered by tests
}
service := NewService(provider, opts)

Check warning on line 89 in pkg/bridge/ai/api_server.go

View check run for this annotation

Codecov / codecov/patch

pkg/bridge/ai/api_server.go#L89

Added line #L89 was not covered by tests

mux := NewServeMux(service)

server := &BasicAPIServer{
zipperAddr: zipperAddr,
credential: credential,
httpHandler: DecorateHandler(mux, decorateReqContext(service, logger)),
}

logger.Info("start AI Bridge service", "addr", config.Server.Addr, "provider", provider.Name())

Check warning on line 97 in pkg/bridge/ai/api_server.go

View check run for this annotation

Codecov / codecov/patch

pkg/bridge/ai/api_server.go#L97

Added line #L97 was not covered by tests
return server, nil
}

Expand Down
6 changes: 3 additions & 3 deletions pkg/bridge/ai/api_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,9 @@ func TestServer(t *testing.T) {
return mockCaller(nil), err
}

service := newService("fake_zipper_addr", pd, newCaller, &ServiceOptions{
SourceBuilder: func(_, _ string) yomo.Source { return flow },
ReducerBuilder: func(_, _ string) yomo.StreamFunction { return flow },
service := newService(pd, newCaller, &ServiceOptions{
SourceBuilder: func() yomo.Source { return flow },
ReducerBuilder: func() yomo.StreamFunction { return flow },
MetadataExchanger: func(_ string) (metadata.M, error) { return metadata.M{"hello": "llm bridge"}, nil },
})

Expand Down
129 changes: 129 additions & 0 deletions pkg/bridge/ai/reducer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
package ai

import (
"github.com/yomorun/yomo"
"github.com/yomorun/yomo/core"
"github.com/yomorun/yomo/core/auth"
"github.com/yomorun/yomo/core/frame"
"github.com/yomorun/yomo/core/metadata"
"github.com/yomorun/yomo/core/serverless"
"github.com/yomorun/yomo/pkg/id"
"github.com/yomorun/yomo/pkg/listener/mem"
)

var _ yomo.Source = &memSource{}

type memSource struct {
cred *auth.Credential
conn *mem.FrameConn
}

func NewSource(conn *mem.FrameConn, cred *auth.Credential) yomo.Source {
return &memSource{
conn: conn,
cred: cred,
}

Check warning on line 25 in pkg/bridge/ai/reducer.go

View check run for this annotation

Codecov / codecov/patch

pkg/bridge/ai/reducer.go#L21-L25

Added lines #L21 - L25 were not covered by tests
}

func (m *memSource) Connect() error {
hf := &frame.HandshakeFrame{
Name: "fc-source",
ID: id.New(),
ClientType: byte(core.ClientTypeSource),
AuthName: m.cred.Name(),
AuthPayload: m.cred.Payload(),
Version: core.Version,
}

return m.conn.Handshake(hf)

Check warning on line 38 in pkg/bridge/ai/reducer.go

View check run for this annotation

Codecov / codecov/patch

pkg/bridge/ai/reducer.go#L28-L38

Added lines #L28 - L38 were not covered by tests
}

func (m *memSource) Write(tag uint32, data []byte) error {
df := &frame.DataFrame{
Tag: tag,
Payload: data,
}
return m.conn.WriteFrame(df)

Check warning on line 46 in pkg/bridge/ai/reducer.go

View check run for this annotation

Codecov / codecov/patch

pkg/bridge/ai/reducer.go#L41-L46

Added lines #L41 - L46 were not covered by tests
}

// Close is a no-op that always returns nil; it does not touch the underlying
// connection.
func (m *memSource) Close() error {
	return nil
}

// SetErrorHandler is a no-op; the supplied handler is discarded.
func (m *memSource) SetErrorHandler(_ func(_ error)) {
}

// WriteWithTarget is a no-op that always returns nil; nothing is written to
// the connection.
func (m *memSource) WriteWithTarget(_ uint32, _ []byte, _ string) error {
	return nil
}

Check warning on line 51 in pkg/bridge/ai/reducer.go

View check run for this annotation

Codecov / codecov/patch

pkg/bridge/ai/reducer.go#L49-L51

Added lines #L49 - L51 were not covered by tests

type memStreamFunction struct {
observedTags []uint32
handler core.AsyncHandler
cred *auth.Credential
conn *mem.FrameConn
}

// NewReducer creates a new instance of memory StreamFunction.
func NewReducer(conn *mem.FrameConn, cred *auth.Credential) yomo.StreamFunction {
return &memStreamFunction{
conn: conn,
cred: cred,
}

Check warning on line 65 in pkg/bridge/ai/reducer.go

View check run for this annotation

Codecov / codecov/patch

pkg/bridge/ai/reducer.go#L61-L65

Added lines #L61 - L65 were not covered by tests
}

func (m *memStreamFunction) Close() error {
return nil

Check warning on line 69 in pkg/bridge/ai/reducer.go

View check run for this annotation

Codecov / codecov/patch

pkg/bridge/ai/reducer.go#L68-L69

Added lines #L68 - L69 were not covered by tests
}

func (m *memStreamFunction) Connect() error {
hf := &frame.HandshakeFrame{
Name: "fc-reducer",
ID: id.New(),
ClientType: byte(core.ClientTypeStreamFunction),
AuthName: m.cred.Name(),
AuthPayload: m.cred.Payload(),
ObserveDataTags: m.observedTags,
Version: core.Version,
}

if err := m.conn.Handshake(hf); err != nil {
return nil
}

Check warning on line 85 in pkg/bridge/ai/reducer.go

View check run for this annotation

Codecov / codecov/patch

pkg/bridge/ai/reducer.go#L72-L85

Added lines #L72 - L85 were not covered by tests

go func() {
for {
f, err := m.conn.ReadFrame()
if err != nil {
return
}

Check warning on line 92 in pkg/bridge/ai/reducer.go

View check run for this annotation

Codecov / codecov/patch

pkg/bridge/ai/reducer.go#L87-L92

Added lines #L87 - L92 were not covered by tests

switch ff := f.(type) {
case *frame.DataFrame:
go m.onDataFrame(ff)
default:
return

Check warning on line 98 in pkg/bridge/ai/reducer.go

View check run for this annotation

Codecov / codecov/patch

pkg/bridge/ai/reducer.go#L94-L98

Added lines #L94 - L98 were not covered by tests
}
}
}()

return nil

Check warning on line 103 in pkg/bridge/ai/reducer.go

View check run for this annotation

Codecov / codecov/patch

pkg/bridge/ai/reducer.go#L103

Added line #L103 was not covered by tests
}

func (m *memStreamFunction) onDataFrame(dataFrame *frame.DataFrame) {
md, err := metadata.Decode(dataFrame.Metadata)
if err != nil {
return
}

Check warning on line 110 in pkg/bridge/ai/reducer.go

View check run for this annotation

Codecov / codecov/patch

pkg/bridge/ai/reducer.go#L106-L110

Added lines #L106 - L110 were not covered by tests

serverlessCtx := serverless.NewContext(m.conn, dataFrame.Tag, md, dataFrame.Payload)
m.handler(serverlessCtx)

Check warning on line 113 in pkg/bridge/ai/reducer.go

View check run for this annotation

Codecov / codecov/patch

pkg/bridge/ai/reducer.go#L112-L113

Added lines #L112 - L113 were not covered by tests
}

func (m *memStreamFunction) SetHandler(fn core.AsyncHandler) error {
m.handler = fn
return nil

Check warning on line 118 in pkg/bridge/ai/reducer.go

View check run for this annotation

Codecov / codecov/patch

pkg/bridge/ai/reducer.go#L116-L118

Added lines #L116 - L118 were not covered by tests
}

// Init is a no-op that always returns nil; the supplied function is never
// called.
func (m *memStreamFunction) Init(_ func() error) error {
	return nil
}

// SetCronHandler is a no-op that always returns nil; the spec and handler are
// discarded.
func (m *memStreamFunction) SetCronHandler(_ string, _ core.CronHandler) error {
	return nil
}

// SetErrorHandler is a no-op; the supplied handler is discarded.
func (m *memStreamFunction) SetErrorHandler(_ func(err error)) {
}

// SetObserveDataTags records the tags that Connect sends as ObserveDataTags
// in the handshake frame, replacing any previously recorded tags.
func (m *memStreamFunction) SetObserveDataTags(tags ...uint32) {
	m.observedTags = tags
}

// SetPipeHandler is a no-op that always returns nil; the handler is discarded.
func (m *memStreamFunction) SetPipeHandler(fn core.PipeHandler) error {
	return nil
}

// SetWantedTarget is a no-op; the target is discarded.
func (m *memStreamFunction) SetWantedTarget(string) {
}

// Wait returns immediately without blocking.
func (m *memStreamFunction) Wait() {
}

Check warning on line 127 in pkg/bridge/ai/reducer.go

View check run for this annotation

Codecov / codecov/patch

pkg/bridge/ai/reducer.go#L121-L127

Added lines #L121 - L127 were not covered by tests

// Compile-time check that *memStreamFunction satisfies yomo.StreamFunction.
var _ yomo.StreamFunction = (*memStreamFunction)(nil)
Loading
Loading