Skip to content

Commit

Permalink
fix: jeager tracer
Browse files Browse the repository at this point in the history
  • Loading branch information
linyyyang committed Nov 8, 2022
1 parent 1bcee94 commit 8e1ed25
Show file tree
Hide file tree
Showing 6 changed files with 68 additions and 34 deletions.
2 changes: 1 addition & 1 deletion client/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ func (c *Client) Request(ctx context.Context, method,
if c.options.tracer != nil {
clientSpan := parseTrace(ctx, method, "httpClient-"+method, c.options.tracer)
carrier := opentracing.HTTPHeadersCarrier(header)
_ = c.options.tracer.Inject(clientSpan.Context(), opentracing.HTTPHeaders, carrier)
_ = clientSpan.Tracer().Inject(clientSpan.Context(), opentracing.HTTPHeaders, carrier)
header = http.Header(carrier)
defer clientSpan.Finish()
}
Expand Down
9 changes: 4 additions & 5 deletions middleware/tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,7 @@ func TracerMiddleWare(t opentracing.Tracer) MuxMiddleware {
t = opentracing.GlobalTracer()
}

ctx := ChainHeader(w, r)

ctx := r.Context()
parentSpanContext, err := t.Extract(
opentracing.HTTPHeaders,
opentracing.HTTPHeadersCarrier(r.Header))
Expand All @@ -50,20 +49,20 @@ func TracerMiddleWare(t opentracing.Tracer) MuxMiddleware {
defer serverSpan.Finish()
}

r = r.WithContext(ctx)
r = r.WithContext(ChainHeader(ctx, w, r))
h.ServeHTTP(w, r)

}
}
}

// ChainHeader
func ChainHeader(w http.ResponseWriter, r *http.Request) context.Context {
func ChainHeader(ctx context.Context, w http.ResponseWriter, r *http.Request) context.Context {
var (
reqId string
otHeaders = HeaderMap
)
ctx := r.Context()

preTags := tags.Extract(ctx)

for k, v := range otHeaders {
Expand Down
6 changes: 6 additions & 0 deletions server/mux/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,12 @@ func fullOptions(logger *zap.Logger,
o.bodyWriter = rewriter.DefaultBodyWriter(o.bodyMarshaler, o.bodyMarshaler, o.withoutHTTPStatus)
}

// tracer
if o.tracer != nil {
o.middleWares = append(o.middleWares,
middleware.TracerMiddleWare(o.tracer))
}

// limiter
if o.limiter != nil {
o.middleWares = append(o.middleWares,
Expand Down
1 change: 0 additions & 1 deletion server/mux/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ func NewMuxServe(logger *zap.Logger, opts ...Optional) *MuxServe {
// default middleWares
var middlewares = []middleware.MuxMiddleware{
middleware.RecoverMiddleWare(o.logger, o.bodyMarshaler, o.errorMarshaler, o.withoutHTTPStatus),
middleware.TracerMiddleWare(o.tracer),
health.HealthMiddleware,
}
if len(o.middleWares) > 0 {
Expand Down
55 changes: 50 additions & 5 deletions server/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@ package server

import (
"context"
"fmt"
"os"
"strings"
"time"

"github.com/goriller/ginny-util/ip"
"github.com/goriller/ginny/interceptor"
"github.com/goriller/ginny/interceptor/limit"
"github.com/goriller/ginny/interceptor/logging"
Expand All @@ -25,9 +27,11 @@ import (
)

type options struct {
grpcAddr string
httpAddr string
tags []string // for register service
grpcAddr string
grpcSevAddr string
httpAddr string
httpSevAddr string
tags []string // for register service

discover Discover
tracer opentracing.Tracer
Expand Down Expand Up @@ -67,13 +71,47 @@ func evaluateOptions(opts []Option) *options {
var tag []string
t := os.Getenv("SERVICE_TAG")
tag = strings.Split(t, ",")

optCopy := &options{
tags: tag,
}
*optCopy = *defaultOptions
for _, o := range opts {
o(optCopy)
}

var (
host string
port string
ghost string
gport string
)
grpcAddrs := strings.Split(optCopy.grpcAddr, ":")
if len(grpcAddrs) == 2 {
ghost = grpcAddrs[0]
gport = grpcAddrs[1]
}
httpAddrs := strings.Split(optCopy.httpAddr, ":")
if len(httpAddrs) == 2 {
host = httpAddrs[0]
port = httpAddrs[1]
}
if ghost == "" {
ghost = ip.GetLocalIP4()
}
if host == "" {
host = ghost
}
if !strings.Contains(ghost, "://") {
ghost = fmt.Sprintf("grpc://%s:%s", ghost, gport)
}
if !strings.Contains(host, "://") {
host = fmt.Sprintf("http://%s:%s", host, port)
}

optCopy.grpcSevAddr = ghost
optCopy.httpSevAddr = host

return optCopy
}

Expand Down Expand Up @@ -240,7 +278,6 @@ func fullOptions(logger *zap.Logger,
muxLoggingOpts...,
),
validator.UnaryServerInterceptor(false),
interceptor.TracerServerUnaryInterceptor(opt.tracer),
}

streamServerInterceptors := []grpc.StreamServerInterceptor{
Expand All @@ -251,7 +288,15 @@ func fullOptions(logger *zap.Logger,
muxLoggingOpts...,
),
validator.StreamServerInterceptor(false),
interceptor.TracerServerStreamInterceptor(opt.tracer),
}

// tracer
if opt.tracer != nil {
opt.muxOptions = append(opt.muxOptions, mux.WithTracer(opt.tracer))
unaryServerInterceptors = append(unaryServerInterceptors,
interceptor.TracerServerUnaryInterceptor(opt.tracer))
streamServerInterceptors = append(streamServerInterceptors,
interceptor.TracerServerStreamInterceptor(opt.tracer))
}
// limiter
if opt.limiter != nil {
Expand Down
29 changes: 7 additions & 22 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,11 @@ import (
"fmt"
"net"
"net/http"
"net/url"
"os"
"strings"
"sync"
"time"

"github.com/goriller/ginny-util/graceful"
"github.com/goriller/ginny-util/ip"
"github.com/goriller/ginny/health"
"github.com/goriller/ginny/server/mux"
"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/logging"
Expand Down Expand Up @@ -171,22 +168,10 @@ func (s *Server) register(ctx context.Context) error {
return nil
}

if !strings.Contains(s.options.grpcAddr, "://") {
s.options.grpcAddr = fmt.Sprintf("grpc://%s", s.options.grpcAddr)
}
u, err := url.Parse(s.options.grpcAddr)
if err != nil {
return err
}
var host = u.Hostname()
if host == "" {
host = ip.GetLocalIP4()
}

for key := range s.grpcServer.GetServiceInfo() {
// gRPC
name := fmt.Sprintf("%s[%s]", key, "grpc")
err := s.options.discover.ServiceRegister(ctx, name, fmt.Sprintf("%s:%s", host, u.Port()), s.options.tags, nil)
err := s.options.discover.ServiceRegister(ctx, name, s.options.grpcSevAddr, s.options.tags, nil)
if err != nil {
return errors.Wrap(err, "register grpc service error")
}
Expand All @@ -195,7 +180,7 @@ func (s *Server) register(ctx context.Context) error {
// HTTP
if s.options.httpAddr != "" {
hName := fmt.Sprintf("%s[%s]", key, "http")
err = s.options.discover.ServiceRegister(ctx, hName, fmt.Sprintf("%s:%s", host, u.Port()), s.options.tags, nil)
err = s.options.discover.ServiceRegister(ctx, hName, s.options.httpSevAddr, s.options.tags, nil)
if err != nil {
return errors.Wrap(err, "register http service error")
}
Expand All @@ -214,21 +199,21 @@ func (s *Server) deRegister(ctx context.Context) error {

for key := range s.grpcServer.GetServiceInfo() {
// gRPC
name := fmt.Sprintf("%s[%s/%s]", "http", s.options.grpcAddr, key)
name := fmt.Sprintf("%s[%s]", key, "grpc")
err := s.options.discover.ServiceDeregister(ctx, name)
if err != nil {
return errors.Wrapf(err, "deregister service error[id=%s]", name)
return errors.Wrapf(err, "deregister grpc service error[id=%s]", name)
}
s.logger.Log(logging.INFO, "deregister service success: "+name)
s.logger.Log(logging.INFO, "deregister grpc service success: "+name)

// HTTP
if s.options.httpAddr != "" {
hName := fmt.Sprintf("%s[%s]", key, "http")
err = s.options.discover.ServiceDeregister(ctx, hName)
if err != nil {
return errors.Wrapf(err, "deregister http server error[id=%s]", hName)
return errors.Wrapf(err, "deregister http service error[id=%s]", hName)
}
s.logger.Log(logging.INFO, "deregister http server success: "+hName)
s.logger.Log(logging.INFO, "deregister http service success: "+hName)
}
}

Expand Down

0 comments on commit 8e1ed25

Please sign in to comment.