Skip to content

Commit

Permalink
dep:upgrade polaris-go version (#66)
Browse files Browse the repository at this point in the history
  • Loading branch information
chuntaojun authored Jun 22, 2024
1 parent 14c7543 commit a4b578d
Show file tree
Hide file tree
Showing 6 changed files with 78 additions and 53 deletions.
31 changes: 11 additions & 20 deletions balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import (
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/base"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/serviceconfig"
Expand Down Expand Up @@ -72,11 +71,11 @@ type (

// Build creates polaris balancer.Balancer implement
func (bb *balancerBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
grpclog.Infof("[Polaris][Balancer] start to build polaris balancer")
GetLogger().Info("[Polaris][Balancer] start to build polaris balancer")
target := opts.Target
host, _, err := parseHost(target.URL.Host)
if err != nil {
grpclog.Errorln("[Polaris][Balancer] failed to create balancer: " + err.Error())
GetLogger().Error("[Polaris][Balancer] failed to create balancer: " + err.Error())
return nil
}
return &polarisNamingBalancer{
Expand Down Expand Up @@ -174,7 +173,7 @@ func (p *polarisNamingBalancer) createSubConnection(addr resolver.Address) {
// is a new address (not existing in b.subConns).
sc, err := p.cc.NewSubConn([]resolver.Address{addr}, balancer.NewSubConnOptions{HealthCheckEnabled: true})
if err != nil {
grpclog.Warningf("[Polaris][Balancer] failed to create new SubConn: %v", err)
GetLogger().Warn("[Polaris][Balancer] failed to create new SubConn: %v", err)
return
}
p.subConns[key] = sc
Expand All @@ -198,9 +197,7 @@ func (p *polarisNamingBalancer) UpdateClientConnState(state balancer.ClientConnS
if state.BalancerConfig != nil {
p.lbCfg = state.BalancerConfig.(*LBConfig)
}
if grpclog.V(2) {
grpclog.Infoln("[Polaris][Balancer] got new ClientConn state: ", state)
}
GetLogger().Debug("[Polaris][Balancer] got new ClientConn state: ", state)
if len(state.ResolverState.Addresses) == 0 {
p.ResolverError(errors.New("produced zero addresses"))
return balancer.ErrBadResolverState
Expand Down Expand Up @@ -257,17 +254,13 @@ func (p *polarisNamingBalancer) ResolverError(err error) {
// UpdateSubConnState is called by gRPC when the state of a SubConn changes.
func (p *polarisNamingBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
s := state.ConnectivityState
if grpclog.V(2) {
grpclog.Infof("[Polaris][Balancer] handle SubConn state change: %p, %v", sc, s)
}
GetLogger().Info("[Polaris][Balancer] handle SubConn state change: %p, %v", sc, s)
oldS, quit := func() (connectivity.State, bool) {
p.rwMutex.Lock()
defer p.rwMutex.Unlock()
oldS, ok := p.scStates[sc]
if !ok {
if grpclog.V(2) {
grpclog.Infof("[Polaris][Balancer] got state changes for an unknown SubConn: %p, %v", sc, s)
}
GetLogger().Info("[Polaris][Balancer] got state changes for an unknown SubConn: %p, %v", sc, s)
return connectivity.TransientFailure, true
}
if oldS == connectivity.TransientFailure && s == connectivity.Connecting {
Expand Down Expand Up @@ -420,13 +413,11 @@ func (pnp *polarisNamingPicker) Pick(info balancer.PickInfo) (balancer.PickResul
request.SourceService = *sourceService
} else {
if err := pnp.addTrafficLabels(info, request); err != nil {
grpclog.Errorf("[Polaris][Balancer] fetch traffic labels fail : %+v", err)
GetLogger().Error("[Polaris][Balancer] fetch traffic labels fail : %+v", err)
}
}

if grpclog.V(2) {
grpclog.Infof("[Polaris][Balancer] get one instance request : %+v", request)
}
GetLogger().Debug("[Polaris][Balancer] get one instance request : %+v", request)
var err error
resp, err = pnp.balancer.routerAPI.ProcessRouters(request)
if err != nil {
Expand Down Expand Up @@ -500,13 +491,13 @@ func (pnp *polarisNamingPicker) addTrafficLabels(info balancer.PickInfo, insReq
engine := pnp.balancer.consumerAPI.SDKContext().GetEngine()
resp, err := engine.SyncGetServiceRule(model.EventRouting, req)
if err != nil {
grpclog.Errorf("[Polaris][Balancer] ns:%s svc:%s get route rule fail : %+v",
GetLogger().Error("[Polaris][Balancer] ns:%s svc:%s get route rule fail : %+v",
req.GetNamespace(), req.GetService(), err)
return err
}

if resp == nil || resp.GetValue() == nil {
grpclog.Errorf("[Polaris][Balancer] ns:%s svc:%s get route rule empty", req.GetNamespace(), req.GetService())
GetLogger().Error("[Polaris][Balancer] ns:%s svc:%s get route rule empty", req.GetNamespace(), req.GetService())
return ErrorPolarisServiceRouteRuleEmpty
}

Expand Down Expand Up @@ -573,6 +564,6 @@ func (r *resultReporter) report(info balancer.DoneInfo) {
callResult.SetDelay(time.Since(r.startTime))
callResult.SetRetCode(int32(code))
if err := r.consumerAPI.UpdateServiceCallResult(callResult); err != nil {
grpclog.Errorf("[Polaris][Balancer] report grpc call info fail : %+v", err)
GetLogger().Error("[Polaris][Balancer] report grpc call info fail : %+v", err)
}
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ require (
github.com/golang/protobuf v1.5.4
github.com/google/uuid v1.6.0
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/natefinch/lumberjack v2.0.0+incompatible
github.com/polarismesh/polaris-go v1.6.0-beta.5
github.com/polarismesh/specification v1.5.1
github.com/prometheus/client_golang v1.19.1 // indirect
Expand Down
68 changes: 51 additions & 17 deletions logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@
package grpcpolaris

import (
"io"
"log"
"sync/atomic"

"github.com/natefinch/lumberjack"
)

type LogLevel int

Check failure on line 27 in logger.go

View workflow job for this annotation

GitHub Actions / build (1.17)

exported type LogLevel should have comment or be unexported

Check failure on line 27 in logger.go

View workflow job for this annotation

GitHub Actions / build (1.15)

exported type LogLevel should have comment or be unexported

Check failure on line 27 in logger.go

View workflow job for this annotation

GitHub Actions / build (1.18)

exported type LogLevel should have comment or be unexported

Check failure on line 27 in logger.go

View workflow job for this annotation

GitHub Actions / build (1.16)

exported type LogLevel should have comment or be unexported

Check warning on line 27 in logger.go

View workflow job for this annotation

GitHub Actions / Run Revive Action (1.18.x)

exported type LogLevel should have comment or be unexported
Expand All @@ -32,40 +34,72 @@ const (
LogError
)

var _log Logger = newDefaultLogger()

func SetLogger(logger Logger) {

Check failure on line 39 in logger.go

View workflow job for this annotation

GitHub Actions / build (1.17)

exported function SetLogger should have comment or be unexported

Check failure on line 39 in logger.go

View workflow job for this annotation

GitHub Actions / build (1.15)

exported function SetLogger should have comment or be unexported

Check failure on line 39 in logger.go

View workflow job for this annotation

GitHub Actions / build (1.18)

exported function SetLogger should have comment or be unexported

Check failure on line 39 in logger.go

View workflow job for this annotation

GitHub Actions / build (1.16)

exported function SetLogger should have comment or be unexported
_log = logger
}

func GetLogger() Logger {

Check failure on line 43 in logger.go

View workflow job for this annotation

GitHub Actions / build (1.17)

exported function GetLogger should have comment or be unexported

Check failure on line 43 in logger.go

View workflow job for this annotation

GitHub Actions / build (1.15)

exported function GetLogger should have comment or be unexported

Check failure on line 43 in logger.go

View workflow job for this annotation

GitHub Actions / build (1.18)

exported function GetLogger should have comment or be unexported

Check failure on line 43 in logger.go

View workflow job for this annotation

GitHub Actions / build (1.16)

exported function GetLogger should have comment or be unexported
return _log
}

type Logger interface {

Check failure on line 47 in logger.go

View workflow job for this annotation

GitHub Actions / build (1.17)

exported type Logger should have comment or be unexported

Check failure on line 47 in logger.go

View workflow job for this annotation

GitHub Actions / build (1.15)

exported type Logger should have comment or be unexported

Check failure on line 47 in logger.go

View workflow job for this annotation

GitHub Actions / build (1.18)

exported type Logger should have comment or be unexported

Check failure on line 47 in logger.go

View workflow job for this annotation

GitHub Actions / build (1.16)

exported type Logger should have comment or be unexported
SetWriter(io.WriteCloser)
SetLevel()
Debug(format string, args interface{})
Info(format string, args interface{})
Warn(format string, args interface{})
Error(format string, args interface{})
SetLevel(LogLevel)
Debug(format string, args ...interface{})
Info(format string, args ...interface{})
Warn(format string, args ...interface{})
Error(format string, args ...interface{})
}

type defaultLogger struct {
writerRef atomic.Value
levelRef atomic.Value
writer *log.Logger
levelRef atomic.Value
}

func (l *defaultLogger) SetWriter(writer io.WriteCloser) {
l.writerRef.Store(writer)
func newDefaultLogger() *defaultLogger {
lumberJackLogger := &lumberjack.Logger{
Filename: "./logs/grpc-go-polaris.log", // 文件位置
MaxSize: 100, // 进行切割之前,日志文件的最大大小(MB为单位)
MaxAge: 7, // 保留旧文件的最大天数
MaxBackups: 100, // 保留旧文件的最大个数
Compress: true, // 是否压缩/归档旧文件
}

levelRef := atomic.Value{}

levelRef.Store(LogInfo)
return &defaultLogger{
writer: log.New(lumberJackLogger, "", log.Llongfile|log.Ldate|log.Ltime),
levelRef: levelRef,
}
}

func (l *defaultLogger) SetLevel(level LogLevel) {
l.levelRef.Store(level)
}

func (l *defaultLogger) Debug(format string, args interface{}) {

func (l *defaultLogger) Debug(format string, args ...interface{}) {
l.printf(LogDebug, format, args...)
}

func (l *defaultLogger) Info(format string, args interface{}) {
func (l *defaultLogger) Info(format string, args ...interface{}) {
l.printf(LogInfo, format, args...)

}

func (l *defaultLogger) Warn(format string, args interface{}) {

func (l *defaultLogger) Warn(format string, args ...interface{}) {
l.printf(LogWarn, format, args...)
}

func (l *defaultLogger) Error(format string, args interface{}) {
func (l *defaultLogger) Error(format string, args ...interface{}) {
l.printf(LogError, format, args...)
}

func (l *defaultLogger) printf(expectLevel LogLevel, format string, args ...interface{}) {
curLevel := l.levelRef.Load().(LogLevel)
if curLevel > expectLevel {
return
}
l.writer.Printf(format, args...)
}
9 changes: 4 additions & 5 deletions ratelimit.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
"github.com/polarismesh/specification/source/go/api/v1/traffic_manage"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/status"
Expand Down Expand Up @@ -80,7 +79,7 @@ func (p *RateLimitInterceptor) UnaryInterceptor(ctx context.Context, req interfa

future, err := p.limitAPI.GetQuota(quotaReq)
if nil != err {
grpclog.Errorf("[Polaris][RateLimit] fail to get quota %#v: %v", quotaReq, err)
GetLogger().Error("[Polaris][RateLimit] fail to get quota %#v: %v", quotaReq, err)
return handler(ctx, req)
}

Expand Down Expand Up @@ -167,21 +166,21 @@ func (p *RateLimitInterceptor) fetchArguments(req *model.QuotaRequestImpl) ([]*t
}

if err := engine.SyncGetResources(getRuleReq); err != nil {
grpclog.Errorf("[Polaris][RateLimit] ns:%s svc:%s get RateLimit Rule fail : %+v",
GetLogger().Error("[Polaris][RateLimit] ns:%s svc:%s get RateLimit Rule fail : %+v",
req.GetNamespace(), req.GetService(), err)
return nil, false
}

svcRule := getRuleReq.RateLimitRule
if svcRule == nil || svcRule.GetValue() == nil {
grpclog.Warningf("[Polaris][RateLimit] ns:%s svc:%s get RateLimit Rule is nil",
GetLogger().Warn("[Polaris][RateLimit] ns:%s svc:%s get RateLimit Rule is nil",
req.GetNamespace(), req.GetService())
return nil, false
}

rules, ok := svcRule.GetValue().(*traffic_manage.RateLimit)
if !ok {
grpclog.Errorf("[Polaris][RateLimit] ns:%s svc:%s get RateLimit Rule invalid",
GetLogger().Error("[Polaris][RateLimit] ns:%s svc:%s get RateLimit Rule invalid",
req.GetNamespace(), req.GetService())
return nil, false
}
Expand Down
5 changes: 2 additions & 3 deletions resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import (
"github.com/polarismesh/polaris-go/api"
"github.com/polarismesh/polaris-go/pkg/model"
"google.golang.org/grpc/attributes"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/resolver"
)

Expand Down Expand Up @@ -280,12 +279,12 @@ func (pr *polarisNamingResolver) watcher() {
continue
}
if err = pr.cc.UpdateState(*state); nil != err {
grpclog.Errorf("fail to do update service %s: %v", pr.target.URL.Host, err)
GetLogger().Error("fail to do update service %s: %v", pr.target.URL.Host, err)
}
var svcKey model.ServiceKey
svcKey, eventChan, err = pr.doWatch(consumerAPI)
if nil != err {
grpclog.Errorf("fail to do watch for service %s: %v", svcKey, err)
GetLogger().Error("fail to do watch for service %s: %v", svcKey, err)
}
}
}
Expand Down
17 changes: 9 additions & 8 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ package grpcpolaris
import (
"context"
"fmt"
"log"
"net"
"os"
"os/signal"
Expand Down Expand Up @@ -99,11 +98,13 @@ func (srv *Server) doRegister(lis net.Listener) error {
}
srv.serverOptions.host = host
}
port, err := parsePort(lis.Addr().String())
if nil != err {
return fmt.Errorf("error occur while parsing port from listener: %w", err)
if srv.serverOptions.port == 0 {
port, err := parsePort(lis.Addr().String())
if nil != err {
return fmt.Errorf("error occur while parsing port from listener: %w", err)
}
srv.serverOptions.port = port
}
srv.serverOptions.port = port
svcInfos := buildServiceNames(srv.Server, srv)

for _, name := range svcInfos {
Expand All @@ -130,7 +131,7 @@ func (srv *Server) Serve(lis net.Listener) error {
c := make(chan os.Signal, 1)
signal.Notify(c, syscall.SIGSEGV, syscall.SIGINT, syscall.SIGTERM)
s := <-c
log.Printf("[Polaris][Naming] receive quit signal: %v", s)
GetLogger().Info("[Polaris][Naming] receive quit signal: %v", s)
signal.Stop(c)
srv.Stop()
}()
Expand Down Expand Up @@ -170,14 +171,14 @@ func (srv *Server) Deregister() {
func Serve(gSrv *grpc.Server, lis net.Listener, opts ...ServerOption) error {
pSrv, err := Register(gSrv, lis, opts...)
if err != nil {
log.Fatalf("polaris register err: %v", err)
GetLogger().Error("polaris register err: %v", err)
}

go func() {
c := make(chan os.Signal, 1)
signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
s := <-c
log.Printf("[Polaris][Naming] receive quit signal: %v", s)
GetLogger().Info("[Polaris][Naming] receive quit signal: %v", s)
signal.Stop(c)
pSrv.Stop()
}()
Expand Down

0 comments on commit a4b578d

Please sign in to comment.