Skip to content
This repository has been archived by the owner on Dec 28, 2024. It is now read-only.

Migrate to slog #201

Merged
merged 2 commits into from
Feb 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions broadcast/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@ import (
"context"
"fmt"
"io"
"log/slog"
"net/http"
"strconv"

"github.com/anycable/anycable-go/server"
"github.com/apex/log"
)

const (
Expand Down Expand Up @@ -41,7 +41,7 @@ type HTTPBroadcaster struct {
authHeader string
server *server.HTTPServer
node Handler
log *log.Entry
log *slog.Logger
}

var _ Broadcaster = (*HTTPBroadcaster)(nil)
Expand All @@ -56,7 +56,7 @@ func NewHTTPBroadcaster(node Handler, config *HTTPConfig) *HTTPBroadcaster {

return &HTTPBroadcaster{
node: node,
log: log.WithFields(log.Fields{"context": "broadcast", "provider": "http"}),
log: slog.With("context", "broadcast").With("provider", "http"),
port: config.Port,
path: config.Path,
authHeader: authHeader,
Expand All @@ -78,7 +78,7 @@ func (s *HTTPBroadcaster) Start(done chan (error)) error {
s.server = server
s.server.SetupHandler(s.path, http.HandlerFunc(s.Handler))

s.log.Infof("Accept broadcast requests at %s%s", s.server.Address(), s.path)
s.log.Info(fmt.Sprintf("Accept broadcast requests at %s%s", s.server.Address(), s.path))

go func() {
if err := s.server.StartAndAnnounce("broadcasting HTTP server"); err != nil {
Expand All @@ -103,7 +103,7 @@ func (s *HTTPBroadcaster) Shutdown(ctx context.Context) error {
// Handler processes HTTP requests
func (s *HTTPBroadcaster) Handler(w http.ResponseWriter, r *http.Request) {
if r.Method != "POST" {
s.log.Debugf("Invalid request method: %s", r.Method)
s.log.Debug("invalid request method", "method", r.Method)
w.WriteHeader(422)
return
}
Expand All @@ -118,7 +118,7 @@ func (s *HTTPBroadcaster) Handler(w http.ResponseWriter, r *http.Request) {
body, err := io.ReadAll(r.Body)

if err != nil {
s.log.Error("Failed to read request body")
s.log.Error("failed to read request body")
w.WriteHeader(422)
return
}
Expand Down
14 changes: 7 additions & 7 deletions broadcast/legacy_nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ package broadcast

import (
"context"
"log/slog"

"github.com/apex/log"
"github.com/nats-io/nats.go"

nconfig "github.com/anycable/anycable-go/nats"
Expand All @@ -14,7 +14,7 @@ type LegacyNATSBroadcaster struct {
handler Handler
config *nconfig.NATSConfig

log *log.Entry
log *slog.Logger
}

var _ Broadcaster = (*LegacyNATSBroadcaster)(nil)
Expand All @@ -23,7 +23,7 @@ func NewLegacyNATSBroadcaster(node Handler, c *nconfig.NATSConfig) *LegacyNATSBr
return &LegacyNATSBroadcaster{
config: c,
handler: node,
log: log.WithFields(log.Fields{"context": "broadcast", "provider": "nats"}),
log: slog.With("context", "broadcast").With("provider", "nats"),
}
}

Expand All @@ -37,11 +37,11 @@ func (s *LegacyNATSBroadcaster) Start(done chan (error)) error {
nats.MaxReconnects(s.config.MaxReconnectAttempts),
nats.DisconnectErrHandler(func(nc *nats.Conn, err error) {
if err != nil {
log.Warnf("Connection failed: %v", err)
slog.Warn("connection failed", "error", err.Error())
}
}),
nats.ReconnectHandler(func(nc *nats.Conn) {
log.Infof("Connection restored: %s", nc.ConnectedUrl())
slog.Info("connection restored", "url", nc.ConnectedUrl())
}),
}

Expand All @@ -56,7 +56,7 @@ func (s *LegacyNATSBroadcaster) Start(done chan (error)) error {
}

_, err = nc.Subscribe(s.config.Channel, func(m *nats.Msg) {
s.log.Debugf("Incoming pubsub message: %s", m.Data)
s.log.Debug("incoming pubsub message", "data", m.Data)
s.handler.HandlePubSub(m.Data)
})

Expand All @@ -65,7 +65,7 @@ func (s *LegacyNATSBroadcaster) Start(done chan (error)) error {
return err
}

s.log.Infof("Subscribing for broadcasts to channel: %s", s.config.Channel)
s.log.Info("subscribing for broadcasts", "channel", s.config.Channel)

s.conn = nc

Expand Down
33 changes: 16 additions & 17 deletions broadcast/legacy_redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"log/slog"
"net/url"
"strings"
"time"
Expand All @@ -12,7 +13,6 @@ import (
rconfig "github.com/anycable/anycable-go/redis"
"github.com/anycable/anycable-go/utils"

"github.com/apex/log"
"github.com/gomodule/redigo/redis"
)

Expand All @@ -28,7 +28,7 @@ type LegacyRedisBroadcaster struct {
reconnectAttempt int
maxReconnectAttempts int
uri *url.URL
log *log.Entry
log *slog.Logger
tlsVerify bool
}

Expand All @@ -43,7 +43,7 @@ func NewLegacyRedisBroadcaster(node Handler, config *rconfig.RedisConfig) *Legac
pingInterval: time.Duration(config.KeepalivePingInterval),
reconnectAttempt: 0,
maxReconnectAttempts: config.MaxReconnectAttempts,
log: log.WithFields(log.Fields{"context": "broadcast", "provider": "redis"}),
log: slog.With("context", "broadcast").With("provider", "redis"),
tlsVerify: config.TLSVerify,
}
}
Expand All @@ -67,8 +67,7 @@ func (s *LegacyRedisBroadcaster) Start(done chan (error)) error {
if s.sentinels != "" {
masterName := redisURL.Hostname()

s.log.Debug("Redis sentinel enabled")
s.log.Debugf("Redis sentinel parameters: sentinels: %s, masterName: %s", s.sentinels, masterName)
s.log.Debug("Redis sentinel enabled", "sentinels", s.sentinels, "master", masterName)
sentinels := strings.Split(s.sentinels, ",")
s.sentinelClient = &sentinel.Sentinel{
Addrs: sentinels,
Expand Down Expand Up @@ -100,10 +99,10 @@ func (s *LegacyRedisBroadcaster) Start(done chan (error)) error {
dialOptions...,
)
if err != nil {
s.log.Debugf("Failed to connect to sentinel %s", addr)
s.log.Debug("failed to connect to sentinel", "addr", addr)
return nil, err
}
s.log.Debugf("Successfully connected to sentinel %s", addr)
s.log.Debug("successfully connected to sentinel", "addr", addr)
return c, nil
},
}
Expand Down Expand Up @@ -149,18 +148,18 @@ func (s *LegacyRedisBroadcaster) keepalive(done chan (error)) {
masterAddress, err := s.sentinelClient.MasterAddr()

if err != nil {
s.log.Warn("Failed to get master address from sentinel.")
s.log.Warn("failed to get master address from sentinel")
done <- err
return
}
s.log.Debugf("Got master address from sentinel: %s", masterAddress)
s.log.Debug("obtained master address from sentinel", "addr", masterAddress)

s.uri.Host = masterAddress
s.url = s.uri.String()
}

if err := s.listen(); err != nil {
s.log.Warnf("Redis connection failed: %v", err)
s.log.Warn("Redis connection failed", "error", err.Error())
}

s.reconnectAttempt++
Expand All @@ -172,10 +171,10 @@ func (s *LegacyRedisBroadcaster) keepalive(done chan (error)) {

delay := utils.NextRetry(s.reconnectAttempt)

s.log.Infof("Next Redis reconnect attempt in %s", delay)
s.log.Info(fmt.Sprintf("next Redis reconnect attempt in %s", delay))
time.Sleep(delay)

s.log.Infof("Reconnecting to Redis...")
s.log.Info("reconnecting to Redis...")
}
}

Expand All @@ -198,13 +197,13 @@ func (s *LegacyRedisBroadcaster) listen() error {

if s.sentinels != "" {
if !sentinel.TestRole(c, "master") {
return errors.New("Failed master role check")
return errors.New("failed master role check")
}
}

psc := redis.PubSubConn{Conn: c}
if err = psc.Subscribe(s.channel); err != nil {
s.log.Errorf("Failed to subscribe to Redis channel: %v", err)
s.log.Error("failed to subscribe to Redis channel", "error", err.Error())
return err
}

Expand All @@ -216,12 +215,12 @@ func (s *LegacyRedisBroadcaster) listen() error {
for {
switch v := psc.Receive().(type) {
case redis.Message:
s.log.Debugf("Incoming pubsub message from Redis: %s", v.Data)
s.log.Debug("incoming pubsub message", "data", v.Data)
s.node.HandlePubSub(v.Data)
case redis.Subscription:
s.log.Infof("Subscribed to Redis channel: %s\n", v.Channel)
s.log.Info("subscribed to Redis channel", "channel", v.Channel)
case error:
s.log.Errorf("Redis subscription error: %v", v)
s.log.Error("Redis subscription error", "error", v.Error())
done <- v
}
}
Expand Down
38 changes: 19 additions & 19 deletions broadcast/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@ import (
"context"
"errors"
"fmt"
"log/slog"
"strings"
"sync"
"time"

rconfig "github.com/anycable/anycable-go/redis"
"github.com/anycable/anycable-go/utils"

"github.com/apex/log"
nanoid "github.com/matoous/go-nanoid"
"github.com/redis/rueidis"
)
Expand All @@ -33,7 +33,7 @@ type RedisBroadcaster struct {
shutdownCh chan struct{}
finishedCh chan struct{}

log *log.Entry
log *slog.Logger
}

var _ Broadcaster = (*RedisBroadcaster)(nil)
Expand All @@ -46,7 +46,7 @@ func NewRedisBroadcaster(node Handler, config *rconfig.RedisConfig) *RedisBroadc
node: node,
config: config,
consumerName: name,
log: log.WithFields(log.Fields{"context": "broadcast", "provider": "redisx", "id": name}),
log: slog.With("context", "broadcast").With("provider", "redisx").With("id", name),
shutdownCh: make(chan struct{}),
finishedCh: make(chan struct{}),
}
Expand All @@ -64,11 +64,11 @@ func (s *RedisBroadcaster) Start(done chan error) error {
}

if s.config.IsSentinel() { //nolint:gocritic
s.log.WithField("stream", s.config.Channel).WithField("consumer", s.consumerName).Infof("Starting Redis broadcaster at %v (sentinels)", s.config.Hostnames())
s.log.With("stream", s.config.Channel).With("consumer", s.consumerName).Info(fmt.Sprintf("Starting Redis broadcaster at %v (sentinels)", s.config.Hostnames()))
} else if s.config.IsCluster() {
s.log.WithField("stream", s.config.Channel).WithField("consumer", s.consumerName).Infof("Starting Redis broadcaster at %v (cluster)", s.config.Hostnames())
s.log.With("stream", s.config.Channel).With("consumer", s.consumerName).Info(fmt.Sprintf("Starting Redis broadcaster at %v (cluster)", s.config.Hostnames()))
} else {
s.log.WithField("stream", s.config.Channel).WithField("consumer", s.consumerName).Infof("Starting Redis broadcaster at %s", s.config.Hostname())
s.log.With("stream", s.config.Channel).With("consumer", s.consumerName).Info(fmt.Sprintf("Starting Redis broadcaster at %s", s.config.Hostname()))
}

s.clientOptions = options
Expand All @@ -86,7 +86,7 @@ func (s *RedisBroadcaster) Shutdown(ctx context.Context) error {
return nil
}

s.log.Debugf("Shutting down Redis broadcaster")
s.log.Debug("shutting down Redis broadcaster")

close(s.shutdownCh)

Expand All @@ -100,7 +100,7 @@ func (s *RedisBroadcaster) Shutdown(ctx context.Context) error {
err := res.Error()

if err != nil {
s.log.Errorf("Failed to remove Redis stream consumer: %v", err)
s.log.Error("failed to remove Redis stream consumer", "error", err.Error())
}

s.client.Close()
Expand Down Expand Up @@ -131,7 +131,7 @@ func (s *RedisBroadcaster) runReader(done chan (error)) {
err := s.initClient()

if err != nil {
s.log.Errorf("Failed to connect to Redis: %v", err)
s.log.Error("failed to connect to Redis", "error", err)
s.maybeReconnect(done)
return
}
Expand All @@ -144,9 +144,9 @@ func (s *RedisBroadcaster) runReader(done chan (error)) {
if err != nil {
if redisErr, ok := rueidis.IsRedisErr(err); ok {
if strings.HasPrefix(redisErr.Error(), "BUSYGROUP") {
s.log.Debugf("Redis consumer group already exists")
s.log.Debug("Redis consumer group already exists")
} else {
s.log.Errorf("Failed to create consumer group: %v", err)
s.log.Error("failed to create consumer group", "error", err.Error())
s.maybeReconnect(done)
return
}
Expand All @@ -161,23 +161,23 @@ func (s *RedisBroadcaster) runReader(done chan (error)) {
for {
select {
case <-s.shutdownCh:
s.log.Debugf("Stop consuming stream")
s.log.Debug("stop consuming stream")
close(s.finishedCh)
return
default:
if lastClaimedAt+readBlockMilliseconds < time.Now().UnixMilli() {
reclaimed, err := s.autoclaimMessages(readBlockMilliseconds)

if err != nil {
s.log.Errorf("Failed to claim from Redis stream: %v", err)
s.log.Error("failed to claim from Redis stream", "error", err)
s.maybeReconnect(done)
return
}

lastClaimedAt = time.Now().UnixMilli()

if len(reclaimed) > 0 {
s.log.Debugf("Reclaimed messages: %d", len(reclaimed))
s.log.Debug("reclaimed messages", "size", len(reclaimed))

s.broadcastXrange(reclaimed)
}
Expand All @@ -186,7 +186,7 @@ func (s *RedisBroadcaster) runReader(done chan (error)) {
messages, err := s.readFromStream(readBlockMilliseconds)

if err != nil {
s.log.Errorf("Failed to read from Redis stream: %v", err)
s.log.Error("failed to read from Redis stream", "error", err)
s.maybeReconnect(done)
return
}
Expand Down Expand Up @@ -252,7 +252,7 @@ func (s *RedisBroadcaster) autoclaimMessages(blockTime int64) ([]rueidis.XRangeE
func (s *RedisBroadcaster) broadcastXrange(messages []rueidis.XRangeEntry) {
for _, message := range messages {
if payload, pok := message.FieldValues["payload"]; pok {
s.log.Debugf("Incoming broadcast: %v", payload)
s.log.Debug("incoming broadcast", "data", payload)

s.node.HandleBroadcast([]byte(payload))

Expand All @@ -264,7 +264,7 @@ func (s *RedisBroadcaster) broadcastXrange(messages []rueidis.XRangeEntry) {
err := ackRes[0].Error()

if err != nil {
s.log.Errorf("Failed to ack message: %v", err)
s.log.Error("failed to ack message", "error", err)
}
}
}
Expand All @@ -281,10 +281,10 @@ func (s *RedisBroadcaster) maybeReconnect(done chan (error)) {

delay := utils.NextRetry(s.reconnectAttempt - 1)

s.log.Infof("Next Redis reconnect attempt in %s", delay)
s.log.Info(fmt.Sprintf("next Redis reconnect attempt in %s", delay))
time.Sleep(delay)

s.log.Infof("Reconnecting to Redis...")
s.log.Info("reconnecting to Redis...")

s.clientMu.Lock()

Expand Down
Loading
Loading