Skip to content

Commit

Permalink
trying to track down this data race and determine if its my code or n…
Browse files Browse the repository at this point in the history
…ats.go
  • Loading branch information
actatum committed Dec 6, 2023
1 parent 9f32382 commit b892ea1
Show file tree
Hide file tree
Showing 4 changed files with 469 additions and 335 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func main() {
if err != nil {
log.Fatal(err)
}
go ns.Start()
ns.Start()
defer func() {
ns.Shutdown()
ns.WaitForShutdown()
Expand Down
2 changes: 1 addition & 1 deletion examples/simple/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func main() {
if err != nil {
log.Fatal(err)
}
go ns.Start()
ns.Start()
defer func() {
ns.Shutdown()
ns.WaitForShutdown()
Expand Down
51 changes: 38 additions & 13 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,16 @@ func (s *ServerConfig) setDefaults() {

// Server represents a stormRPC server. It contains all functionality for handling RPC requests.
type Server struct {
mu sync.Mutex
nc *nats.Conn
mu sync.Mutex
// nc *nats.Conn
shutdownSignal chan struct{}
handlerFuncs map[string]HandlerFunc
errorHandler ErrorHandler
timeout time.Duration
mw []Middleware

running bool

svc micro.Service
}

Expand Down Expand Up @@ -81,12 +83,23 @@ func NewServer(cfg *ServerConfig, opts ...ServerOption) (*Server, error) {
return nil, err
}

err = svc.AddEndpoint("123123123123123", micro.HandlerFunc(func(r micro.Request) {}))
if err != nil {
return nil, err
}

err = svc.AddEndpoint("0909090", micro.HandlerFunc(func(r micro.Request) {}))
if err != nil {
return nil, err
}

return &Server{
nc: nc,
// nc: nc,
shutdownSignal: make(chan struct{}),
handlerFuncs: make(map[string]HandlerFunc),
timeout: defaultServerTimeout,
errorHandler: cfg.errorHandler,
running: false,
svc: svc,
}, nil
}
Expand Down Expand Up @@ -128,12 +141,21 @@ func (s *Server) Handle(subject string, fn HandlerFunc) {
func (s *Server) Run() error {
s.mu.Lock()
s.applyMiddlewares()
for sub, fn := range s.handlerFuncs {
if err := s.createMicroEndpoint(sub, fn); err != nil {
s.mu.Unlock()
for sub := range s.handlerFuncs {
if err := s.createMicroEndpoint(s.svc, sub, s.handlerFuncs[sub]); err != nil {
return err
}
}
// for sub, fn := range s.handlerFuncs {
// if err := s.createMicroEndpoint(sub, fn); err != nil {
// return err
// }
// }
s.mu.Lock()
s.running = true
s.mu.Unlock()
// time.Sleep(1 * time.Second)

<-s.shutdownSignal
return nil
Expand All @@ -143,15 +165,16 @@ func (s *Server) Run() error {
func (s *Server) Shutdown(ctx context.Context) error {

Check warning on line 165 in server.go

View workflow job for this annotation

GitHub Actions / test (1.21.x, ubuntu-latest)

unused-parameter: parameter 'ctx' seems to be unused, consider removing or renaming it as _ (revive)
s.mu.Lock()
defer s.mu.Unlock()
if err := s.nc.FlushWithContext(ctx); err != nil {
return err
}
// if err := s.nc.FlushWithContext(ctx); err != nil {
// return err
// }

if err := s.svc.Stop(); err != nil {
return err
}

s.nc.Close()
// s.nc.Close()
s.running = false
s.shutdownSignal <- struct{}{}
return nil
}
Expand All @@ -174,7 +197,9 @@ func (s *Server) Use(mw ...Middleware) {
s.mu.Lock()
defer s.mu.Unlock()

s.mw = mw
if !s.running {
s.mw = append(s.mw, mw...)
}
}

func (s *Server) applyMiddlewares() {
Expand All @@ -189,7 +214,9 @@ func (s *Server) applyMiddlewares() {

// createMicroEndpoint registers a HandlerFunc as a micro Endpoint
// allowing for automatic service discovery and observability.
func (s *Server) createMicroEndpoint(subject string, handlerFunc HandlerFunc) error {
func (s *Server) createMicroEndpoint(svc micro.Service, subject string, handlerFunc HandlerFunc) error {

Check warning on line 217 in server.go

View workflow job for this annotation

GitHub Actions / test (1.21.x, ubuntu-latest)

unused-parameter: parameter 'svc' seems to be unused, consider removing or renaming it as _ (revive)
s.mu.Lock()
defer s.mu.Unlock()
err := s.svc.AddEndpoint(
nameFromSubject(subject),
micro.ContextHandler(context.Background(), func(ctx context.Context, r micro.Request) {
Expand All @@ -210,10 +237,8 @@ func (s *Server) createMicroEndpoint(subject string, handlerFunc HandlerFunc) er
resp := handlerFunc(ctx, Request{
Msg: &nats.Msg{
Subject: r.Subject(),
Reply: "",
Header: nats.Header(r.Headers()),
Data: r.Data(),
Sub: &nats.Subscription{},
},
})

Expand Down
Loading

0 comments on commit b892ea1

Please sign in to comment.