diff --git a/README.md b/README.md index db9d165..f054a89 100644 --- a/README.md +++ b/README.md @@ -93,7 +93,7 @@ func main() { if err != nil { log.Fatal(err) } - go ns.Start() + ns.Start() defer func() { ns.Shutdown() ns.WaitForShutdown() diff --git a/examples/simple/server/main.go b/examples/simple/server/main.go index 835715f..2a89494 100644 --- a/examples/simple/server/main.go +++ b/examples/simple/server/main.go @@ -33,7 +33,7 @@ func main() { if err != nil { log.Fatal(err) } - go ns.Start() + ns.Start() defer func() { ns.Shutdown() ns.WaitForShutdown() diff --git a/server.go b/server.go index 30045a1..b9f4de3 100644 --- a/server.go +++ b/server.go @@ -40,14 +40,16 @@ func (s *ServerConfig) setDefaults() { // Server represents a stormRPC server. It contains all functionality for handling RPC requests. type Server struct { - mu sync.Mutex - nc *nats.Conn + mu sync.Mutex + // nc *nats.Conn shutdownSignal chan struct{} handlerFuncs map[string]HandlerFunc errorHandler ErrorHandler timeout time.Duration mw []Middleware + running bool + svc micro.Service } @@ -81,12 +83,23 @@ func NewServer(cfg *ServerConfig, opts ...ServerOption) (*Server, error) { return nil, err } + err = svc.AddEndpoint("123123123123123", micro.HandlerFunc(func(r micro.Request) {})) + if err != nil { + return nil, err + } + + err = svc.AddEndpoint("0909090", micro.HandlerFunc(func(r micro.Request) {})) + if err != nil { + return nil, err + } + return &Server{ - nc: nc, + // nc: nc, shutdownSignal: make(chan struct{}), handlerFuncs: make(map[string]HandlerFunc), timeout: defaultServerTimeout, errorHandler: cfg.errorHandler, + running: false, svc: svc, }, nil } @@ -128,12 +141,21 @@ func (s *Server) Handle(subject string, fn HandlerFunc) { func (s *Server) Run() error { s.mu.Lock() s.applyMiddlewares() - for sub, fn := range s.handlerFuncs { - if err := s.createMicroEndpoint(sub, fn); err != nil { + s.mu.Unlock() + for sub := range s.handlerFuncs { + if err := s.createMicroEndpoint(s.svc, sub, s.handlerFuncs[sub]); err != nil { return err } } + // for sub, fn := range s.handlerFuncs { + // if err := s.createMicroEndpoint(sub, fn); err != nil { + // return err + // } + // } + s.mu.Lock() + s.running = true s.mu.Unlock() + // time.Sleep(1 * time.Second) <-s.shutdownSignal return nil @@ -143,15 +165,16 @@ func (s *Server) Run() error { func (s *Server) Shutdown(ctx context.Context) error { s.mu.Lock() defer s.mu.Unlock() - if err := s.nc.FlushWithContext(ctx); err != nil { - return err - } + // if err := s.nc.FlushWithContext(ctx); err != nil { + // return err + // } if err := s.svc.Stop(); err != nil { return err } - s.nc.Close() + // s.nc.Close() + s.running = false s.shutdownSignal <- struct{}{} return nil } @@ -174,7 +197,9 @@ func (s *Server) Use(mw ...Middleware) { s.mu.Lock() defer s.mu.Unlock() - s.mw = mw + if !s.running { + s.mw = append(s.mw, mw...) + } } func (s *Server) applyMiddlewares() { @@ -189,7 +214,9 @@ func (s *Server) applyMiddlewares() { // createMicroEndpoint registers a HandlerFunc as a micro Endpoint // allowing for automatic service discovery and observability. -func (s *Server) createMicroEndpoint(subject string, handlerFunc HandlerFunc) error { +func (s *Server) createMicroEndpoint(svc micro.Service, subject string, handlerFunc HandlerFunc) error { + s.mu.Lock() + defer s.mu.Unlock() err := s.svc.AddEndpoint( nameFromSubject(subject), micro.ContextHandler(context.Background(), func(ctx context.Context, r micro.Request) { @@ -210,10 +237,8 @@ func (s *Server) createMicroEndpoint(subject string, handlerFunc HandlerFunc) er resp := handlerFunc(ctx, Request{ Msg: &nats.Msg{ Subject: r.Subject(), - Reply: "", Header: nats.Header(r.Headers()), Data: r.Data(), - Sub: &nats.Subscription{}, }, }) diff --git a/server_test.go b/server_test.go index a751e83..f0b42f9 100644 --- a/server_test.go +++ b/server_test.go @@ -3,11 +3,8 @@ package stormrpc import ( "context" - "errors" "fmt" - "math/rand" "reflect" - "strconv" "testing" "time" @@ -94,22 +91,7 @@ func TestNewServer(t *testing.T) { t.Run(tt.name, func(t *testing.T) { t.Cleanup(teh.clear) if tt.runNats { - ns, err := server.NewServer(&server.Options{ - Port: 40897, - }) - if err != nil { - t.Fatal(err) - } - go ns.Start() - t.Cleanup(func() { - ns.Shutdown() - ns.WaitForShutdown() - }) - - if !ns.ReadyForConnections(1 * time.Second) { - t.Error("timeout waiting for nats server") - return - } + startNatsServer(t) } got, err := NewServer(tt.args.cfg, tt.args.opts...) @@ -145,25 +127,10 @@ func TestNewServer(t *testing.T) { } func TestServer_RunAndShutdown(t *testing.T) { - ns, err := server.NewServer(&server.Options{ - Port: 40897, - }) - if err != nil { - t.Fatal(err) - } - go ns.Start() - t.Cleanup(func() { - ns.Shutdown() - ns.WaitForShutdown() - }) - - if !ns.ReadyForConnections(1 * time.Second) { - t.Error("timeout waiting for nats server") - return - } + clientURL := startNatsServer(t) srv, err := NewServer(&ServerConfig{ - NatsURL: ns.ClientURL(), + NatsURL: clientURL, Name: "test", }) if err != nil { @@ -191,321 +158,428 @@ func TestServer_RunAndShutdown(t *testing.T) { } func TestServer_handler(t *testing.T) { - t.Run("successful handle", func(t *testing.T) { - ns, err := server.NewServer(&server.Options{ - Port: 40897, - }) - if err != nil { - t.Fatal(err) - } - go ns.Start() - t.Cleanup(func() { - ns.Shutdown() - ns.WaitForShutdown() - }) - - if !ns.ReadyForConnections(1 * time.Second) { - t.Error("timeout waiting for nats server") - return - } - - srv, err := NewServer(&ServerConfig{ - NatsURL: ns.ClientURL(), - Name: "test", - }) - if err != nil { - t.Fatal(err) - } - - subject := strconv.Itoa(rand.Int()) - srv.Handle(subject, func(ctx context.Context, r Request) Response { - _, ok := ctx.Deadline() - if !ok { - t.Error("context should have deadline") - } - return Response{ - Msg: &nats.Msg{ - Subject: r.Reply, - Data: []byte(`{"response":"1"}`), + type args struct { + ctx context.Context + req Request + } + type endpoint struct { + name string + handler HandlerFunc + } + tests := []struct { + name string + endpoints []endpoint + args args + wantErr bool + }{ + { + name: "ok", + endpoints: []endpoint{ + { + name: "test", + handler: HandlerFunc(func(ctx context.Context, r Request) Response { + _, ok := ctx.Deadline() + if !ok { + t.Error("context should have deadline") + } + return Response{ + Msg: &nats.Msg{ + Subject: r.Reply, + Data: []byte(`{"response":"1"}`), + }, + Err: nil, + } + }), }, - Err: nil, - } - }) - - go func() { - _ = srv.Run() - }() - - client, err := NewClient(ns.ClientURL()) - if err != nil { - t.Fatal(err) - } - - req, err := NewRequest(subject, map[string]string{"x": "D"}) - if err != nil { - t.Fatal(err) - } - resp := client.Do(context.Background(), req) - if resp.Err != nil { - t.Fatal(resp.Err) - } - - var result map[string]string - if err = resp.Decode(&result); err != nil { - t.Fatal(err) - } - - if result["response"] != "1" { - t.Fatalf("got = %v, want %v", result["response"], "1") - } - - ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) - defer cancel() - if err = srv.Shutdown(ctx); err != nil { - t.Fatal(err) - } - }) - - t.Run("context deadline exceeded", func(t *testing.T) { - ns, err := server.NewServer(&server.Options{ - Port: 40897, - }) - if err != nil { - t.Fatal(err) - } - go ns.Start() - t.Cleanup(func() { - ns.Shutdown() - ns.WaitForShutdown() - }) - - if !ns.ReadyForConnections(1 * time.Second) { - t.Error("timeout waiting for nats server") - return - } - - srv, err := NewServer(&ServerConfig{ - NatsURL: ns.ClientURL(), - Name: "test", - }) - if err != nil { - t.Fatal(err) - } + }, + args: args{ + ctx: ctxWithTimeout(5 * time.Second), + req: mustNewRequest(t, "test", map[string]string{"hello": "world"}), + }, + wantErr: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + clientURL := startNatsServer(t) - subject := strconv.Itoa(rand.Int()) - srv.Handle(subject, func(ctx context.Context, r Request) Response { - _, ok := ctx.Deadline() - if !ok { - t.Error("context should have deadline") - } - ticker := time.NewTicker(2 * time.Second) - defer ticker.Stop() - for { - select { - case <-ctx.Done(): - return NewErrorResponse(r.Reply, Error{ - Code: ErrorCodeDeadlineExceeded, - Message: ctx.Err().Error(), - }) - case <-ticker.C: - return NewErrorResponse(r.Reply, fmt.Errorf("somethings wrong")) - } + srv, err := NewServer(&ServerConfig{ + NatsURL: clientURL, + Name: "test", + }) + if err != nil { + t.Fatal(err) } - }) - - go func() { - _ = srv.Run() - }() - - client, err := NewClient(ns.ClientURL()) - if err != nil { - t.Fatal(err) - } - - ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) - defer cancel() - req, err := NewRequest(subject, map[string]string{"x": "D"}) - if err != nil { - t.Fatal(err) - } - resp := client.Do(ctx, req) - var e *Error - ok := errors.As(resp.Err, &e) - if !ok { - t.Fatalf("expected error to be of type Error, got %T", resp.Err) - } - if e.Code != ErrorCodeDeadlineExceeded { - t.Fatalf("e.Code got = %v, want %v", e.Code, ErrorCodeDeadlineExceeded) - } else if e.Message != context.DeadlineExceeded.Error() { - t.Fatalf("e.Message got = %v, want %v", e.Message, context.DeadlineExceeded.Error()) - } - - if err = srv.Shutdown(ctx); err != nil { - t.Fatal(err) - } - }) - - t.Run("context deadline longer than default timeout", func(t *testing.T) { - ns, err := server.NewServer(&server.Options{ - Port: 40897, - }) - if err != nil { - t.Fatal(err) - } - go ns.Start() - t.Cleanup(func() { - ns.Shutdown() - ns.WaitForShutdown() - }) - - if !ns.ReadyForConnections(1 * time.Second) { - t.Error("timeout waiting for nats server") - return - } - - srv, err := NewServer(&ServerConfig{ - NatsURL: ns.ClientURL(), - Name: "test", - }) - if err != nil { - t.Fatal(err) - } + for _, ep := range tt.endpoints { + srv.Handle(ep.name, ep.handler) + } - timeout := 7 * time.Second + errs := make(chan error) + go func(srv *Server, errs chan error) { + errs <- srv.Run() + }(srv, errs) - subject := strconv.Itoa(rand.Int()) - srv.Handle(subject, func(ctx context.Context, r Request) Response { - dl, ok := ctx.Deadline() - if !ok { - t.Error("context should have deadline") + client, err := NewClient(clientURL) + if err != nil { + t.Fatal(err) } - var req map[string]time.Time - _ = r.Decode(&req) - - if req["default"].After(dl) { - t.Errorf("req[default] got = %v, want before %v", req["default"], dl) + resp := client.Do(tt.args.ctx, tt.args.req) + if (resp.Err != nil) != tt.wantErr { + t.Errorf("Client.Do() error = %v, wantErr %v", resp.Err, tt.wantErr) } - var resp Response - resp, err = NewResponse(r.Reply, map[string]string{"success": "ok"}) - if err != nil { - return NewErrorResponse(r.Reply, err) + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + if err = srv.Shutdown(ctx); err != nil { + t.Fatal(err) } - return resp + err = <-errs + fmt.Println(err) }) - - go func() { - _ = srv.Run() - }() - - client, err := NewClient(ns.ClientURL()) - if err != nil { - t.Fatal(err) - } - - ctx, cancel := context.WithTimeout(context.Background(), timeout) - defer cancel() - - ctxWithDefaultServerTimeout, cancel2 := context.WithTimeout(ctx, srv.timeout) - defer cancel2() - - defaultDeadline, _ := ctxWithDefaultServerTimeout.Deadline() - - req, err := NewRequest(subject, map[string]time.Time{"default": defaultDeadline}) - if err != nil { - t.Fatal(err) - } - _ = client.Do(ctx, req) - - if err = srv.Shutdown(ctx); err != nil { - t.Fatal(err) - } - }) + } + // t.Run("successful handle", func(t *testing.T) { + // ns, err := server.NewServer(&server.Options{ + // Port: 40897, + // }) + // if err != nil { + // t.Fatal(err) + // } + // go ns.Start() + // t.Cleanup(func() { + // ns.Shutdown() + // ns.WaitForShutdown() + // }) + + // if !ns.ReadyForConnections(1 * time.Second) { + // t.Error("timeout waiting for nats server") + // return + // } + + // srv, err := NewServer(&ServerConfig{ + // NatsURL: ns.ClientURL(), + // Name: "test", + // }) + // if err != nil { + // t.Fatal(err) + // } + + // subject := strconv.Itoa(rand.Int()) + // srv.Handle(subject, func(ctx context.Context, r Request) Response { + // _, ok := ctx.Deadline() + // if !ok { + // t.Error("context should have deadline") + // } + // return Response{ + // Msg: &nats.Msg{ + // Subject: r.Reply, + // Data: []byte(`{"response":"1"}`), + // }, + // Err: nil, + // } + // }) + + // go func() { + // _ = srv.Run() + // }() + + // client, err := NewClient(ns.ClientURL()) + // if err != nil { + // t.Fatal(err) + // } + + // req, err := NewRequest(subject, map[string]string{"x": "D"}) + // if err != nil { + // t.Fatal(err) + // } + // resp := client.Do(context.Background(), req) + // if resp.Err != nil { + // t.Fatal(resp.Err) + // } + + // var result map[string]string + // if err = resp.Decode(&result); err != nil { + // t.Fatal(err) + // } + + // if result["response"] != "1" { + // t.Fatalf("got = %v, want %v", result["response"], "1") + // } + + // ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + // defer cancel() + // if err = srv.Shutdown(ctx); err != nil { + // t.Fatal(err) + // } + // }) + + // t.Run("context deadline exceeded", func(t *testing.T) { + // ns, err := server.NewServer(&server.Options{ + // Port: 40897, + // }) + // if err != nil { + // t.Fatal(err) + // } + // go ns.Start() + // t.Cleanup(func() { + // ns.Shutdown() + // ns.WaitForShutdown() + // }) + + // if !ns.ReadyForConnections(1 * time.Second) { + // t.Error("timeout waiting for nats server") + // return + // } + + // srv, err := NewServer(&ServerConfig{ + // NatsURL: ns.ClientURL(), + // Name: "test", + // }) + // if err != nil { + // t.Fatal(err) + // } + + // subject := strconv.Itoa(rand.Int()) + // srv.Handle(subject, func(ctx context.Context, r Request) Response { + // _, ok := ctx.Deadline() + // if !ok { + // t.Error("context should have deadline") + // } + // ticker := time.NewTicker(2 * time.Second) + // defer ticker.Stop() + // for { + // select { + // case <-ctx.Done(): + // return NewErrorResponse(r.Reply, Error{ + // Code: ErrorCodeDeadlineExceeded, + // Message: ctx.Err().Error(), + // }) + // case <-ticker.C: + // return NewErrorResponse(r.Reply, fmt.Errorf("somethings wrong")) + // } + // } + // }) + + // go func() { + // _ = srv.Run() + // }() + + // client, err := NewClient(ns.ClientURL()) + // if err != nil { + // t.Fatal(err) + // } + + // ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + // defer cancel() + + // req, err := NewRequest(subject, map[string]string{"x": "D"}) + // if err != nil { + // t.Fatal(err) + // } + // resp := client.Do(ctx, req) + // var e *Error + // ok := errors.As(resp.Err, &e) + // if !ok { + // t.Fatalf("expected error to be of type Error, got %T", resp.Err) + // } + // if e.Code != ErrorCodeDeadlineExceeded { + // t.Fatalf("e.Code got = %v, want %v", e.Code, ErrorCodeDeadlineExceeded) + // } else if e.Message != context.DeadlineExceeded.Error() { + // t.Fatalf("e.Message got = %v, want %v", e.Message, context.DeadlineExceeded.Error()) + // } + + // if err = srv.Shutdown(ctx); err != nil { + // t.Fatal(err) + // } + // }) + + // t.Run("context deadline longer than default timeout", func(t *testing.T) { + // ns, err := server.NewServer(&server.Options{ + // Port: 40897, + // }) + // if err != nil { + // t.Fatal(err) + // } + // go ns.Start() + // t.Cleanup(func() { + // ns.Shutdown() + // ns.WaitForShutdown() + // }) + + // if !ns.ReadyForConnections(1 * time.Second) { + // t.Error("timeout waiting for nats server") + // return + // } + + // srv, err := NewServer(&ServerConfig{ + // NatsURL: ns.ClientURL(), + // Name: "test", + // }) + // if err != nil { + // t.Fatal(err) + // } + + // timeout := 7 * time.Second + + // subject := strconv.Itoa(rand.Int()) + // srv.Handle(subject, func(ctx context.Context, r Request) Response { + // dl, ok := ctx.Deadline() + // if !ok { + // t.Error("context should have deadline") + // } + + // var req map[string]time.Time + // _ = r.Decode(&req) + + // if req["default"].After(dl) { + // t.Errorf("req[default] got = %v, want before %v", req["default"], dl) + // } + + // var resp Response + // resp, err = NewResponse(r.Reply, map[string]string{"success": "ok"}) + // if err != nil { + // return NewErrorResponse(r.Reply, err) + // } + + // return resp + // }) + + // go func() { + // _ = srv.Run() + // }() + + // client, err := NewClient(ns.ClientURL()) + // if err != nil { + // t.Fatal(err) + // } + + // ctx, cancel := context.WithTimeout(context.Background(), timeout) + // defer cancel() + + // ctxWithDefaultServerTimeout, cancel2 := context.WithTimeout(ctx, srv.timeout) + // defer cancel2() + + // defaultDeadline, _ := ctxWithDefaultServerTimeout.Deadline() + + // req, err := NewRequest(subject, map[string]time.Time{"default": defaultDeadline}) + // if err != nil { + // t.Fatal(err) + // } + // _ = client.Do(ctx, req) + + // if err = srv.Shutdown(ctx); err != nil { + // t.Fatal(err) + // } + // }) } func TestServer_Handle(t *testing.T) { - ns, err := server.NewServer(&server.Options{ - Port: 40897, + clientURL := startNatsServer(t) + + s, err := NewServer(&ServerConfig{ + Name: "test", + NatsURL: clientURL, }) if err != nil { t.Fatal(err) } - go ns.Start() - t.Cleanup(func() { - ns.Shutdown() - ns.WaitForShutdown() - }) - - if !ns.ReadyForConnections(1 * time.Second) { - t.Error("timeout waiting for nats server") - return - } - - t.Run("OK", func(t *testing.T) { - s, err := NewServer(&ServerConfig{ - Name: "test", - NatsURL: ns.ClientURL(), - }) - if err != nil { - t.Fatal(err) - } - s.Handle("testing", func(ctx context.Context, r Request) Response { return Response{} }) - if err != nil { - t.Fatal(err) - } - - _, ok := s.handlerFuncs["testing"] - if !ok { - t.Errorf("expected handler to exist for subject %s", "testing") - } - }) -} - -func TestServer_Subjects(t *testing.T) { - ns, err := server.NewServer(&server.Options{ - Port: 40897, - }) + s.Handle("testing", func(ctx context.Context, r Request) Response { return Response{} }) if err != nil { t.Fatal(err) } - go ns.Start() - t.Cleanup(func() { - ns.Shutdown() - ns.WaitForShutdown() - }) - if !ns.ReadyForConnections(1 * time.Second) { - t.Error("timeout waiting for nats server") - return + _, ok := s.handlerFuncs["testing"] + if !ok { + t.Errorf("expected handler to exist for subject %s", "testing") } +} - s, err := NewServer(&ServerConfig{ - Name: "test", - NatsURL: ns.ClientURL(), - }) - if err != nil { - t.Fatal(err) +func TestServer_Subjects(t *testing.T) { + type endpoint struct { + name string + handler HandlerFunc + } + tests := []struct { + name string + endpoints []endpoint + want []string + }{ + { + name: "ok", + endpoints: []endpoint{ + { + name: "test", + handler: HandlerFunc(func(ctx context.Context, r Request) Response { return Response{} }), + }, + }, + want: []string{"test"}, + }, + { + name: "multiple endpoints", + endpoints: []endpoint{ + { + name: "1", + handler: HandlerFunc(func(ctx context.Context, r Request) Response { return Response{} }), + }, + { + name: "2", + handler: HandlerFunc(func(ctx context.Context, r Request) Response { return Response{} }), + }, + { + name: "3", + handler: HandlerFunc(func(ctx context.Context, r Request) Response { return Response{} }), + }, + }, + want: []string{"1", "2", "3"}, + }, + { + name: "duplicate endpoints", + endpoints: []endpoint{ + { + name: "1", + handler: HandlerFunc(func(ctx context.Context, r Request) Response { return Response{} }), + }, + { + name: "1", + handler: HandlerFunc(func(ctx context.Context, r Request) Response { return Response{} }), + }, + { + name: "2", + handler: HandlerFunc(func(ctx context.Context, r Request) Response { return Response{} }), + }, + }, + want: []string{"1", "2"}, + }, } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + clientURL := startNatsServer(t) - s.Handle("testing", func(ctx context.Context, r Request) Response { return Response{} }) - s.Handle("testing", func(ctx context.Context, r Request) Response { return Response{} }) - s.Handle("1", func(ctx context.Context, r Request) Response { return Response{} }) + srv, err := NewServer(&ServerConfig{ + Name: "test", + NatsURL: clientURL, + }) + if err != nil { + t.Fatal(err) + } - want := []string{"testing", "1"} + for _, ep := range tt.endpoints { + srv.Handle(ep.name, ep.handler) + } - got := s.Subjects() + got := srv.Subjects() - if !sameStringSlice(got, want) { - t.Fatalf("got = %v, want %v", got, want) + if !sameStringSlice(got, tt.want) { + t.Fatalf("got = %v, want %v", got, tt.want) + } + }) } } func TestServer_Use(t *testing.T) { type fields struct { - nc *nats.Conn shutdownSignal chan struct{} handlerFuncs map[string]HandlerFunc errorHandler ErrorHandler @@ -539,7 +613,6 @@ func TestServer_Use(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { s := &Server{ - nc: tt.fields.nc, shutdownSignal: tt.fields.shutdownSignal, handlerFuncs: tt.fields.handlerFuncs, errorHandler: tt.fields.errorHandler, @@ -548,8 +621,8 @@ func TestServer_Use(t *testing.T) { } s.Use(tt.args.mw...) - if !reflect.DeepEqual(tt.args.mw, s.mw) { - t.Fatalf("got = %v, want %v", s.mw, tt.args.mw) + if len(tt.args.mw) != len(s.mw) { + t.Fatalf("expected slices to be the same length got = %v, want %v", s.mw, tt.args.mw) } }) } @@ -557,7 +630,6 @@ func TestServer_Use(t *testing.T) { func TestServer_applyMiddlewares(t *testing.T) { type fields struct { - nc *nats.Conn shutdownSignal chan struct{} handlerFuncs map[string]HandlerFunc errorHandler ErrorHandler @@ -597,7 +669,6 @@ func TestServer_applyMiddlewares(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { s := &Server{ - nc: tt.fields.nc, shutdownSignal: tt.fields.shutdownSignal, handlerFuncs: tt.fields.handlerFuncs, errorHandler: tt.fields.errorHandler, @@ -619,6 +690,44 @@ func TestServer_applyMiddlewares(t *testing.T) { } } +func startNatsServer(tb testing.TB) string { + tb.Helper() + + ns, err := server.NewServer(&server.Options{ + Port: 40897, + }) + if err != nil { + tb.Fatal(err) + } + + ns.Start() + + tb.Cleanup(func() { + ns.Shutdown() + ns.WaitForShutdown() + }) + + if !ns.ReadyForConnections(1 * time.Second) { + tb.Fatal("timeout waiting for nats server") + } + + return ns.ClientURL() +} + +func mustNewRequest(tb testing.TB, subject string, body any, opts ...RequestOption) Request { + req, err := NewRequest(subject, body, opts...) + if err != nil { + tb.Fatal(err) + } + + return req +} + +func ctxWithTimeout(timeout time.Duration) context.Context { + ctx, _ := context.WithTimeout(context.Background(), timeout) + return ctx +} + func sameStringSlice(x, y []string) bool { if len(x) != len(y) { return false