Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[do not review]tracing stream #2312

Open
wants to merge 29 commits into
base: dev-tracing-fix
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions api/transport/response.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,41 @@ type ResponseWriter interface {
SetApplicationError()
}

// ExtendedResponseWriter extends the ResponseWriter interface to allow setting
// and retrieving additional metadata related to application errors, and to track
// the size of the response.
//
// Functions on ExtendedResponseWriter are not thread-safe.
type ExtendedResponseWriter interface {
ResponseWriter

// SetApplicationError specifies that this response contains an application error.
// If called, this MUST be called before any invocation of Write().
// This signals that the response is an application-level error, rather than
// a system or transport error.
SetApplicationError()

// SetApplicationErrorMeta allows setting additional metadata related to the
// application error. The metadata can contain fields such as error code, name,
// or details that provide more context about the error.
SetApplicationErrorMeta(applicationErrorMeta *ApplicationErrorMeta)

// IsApplicationError returns a boolean indicating whether the response
// contains an application error. This helps consumers of the response to know
// if the response was flagged as an application error.
IsApplicationError() bool

// ApplicationErrorMeta returns the application error metadata that was set
// by the SetApplicationErrorMeta method. If no metadata was set, this returns nil.
// This can provide further details about the nature of the application error.
ApplicationErrorMeta() *ApplicationErrorMeta

// ResponseSize returns the number of bytes written to the response. This method
// allows tracking of the size of the response, which can be useful for monitoring,
// logging, or debugging purposes.
ResponseSize() int
}

// ApplicationErrorMeta contains additional information to describe the
// application error, using an error name, code and details string.
//
Expand Down
227 changes: 204 additions & 23 deletions internal/tracinginterceptor/interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,16 @@ package tracinginterceptor

import (
"context"
"time"

"github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/ext"
"github.com/opentracing/opentracing-go/log"
"go.uber.org/yarpc/api/transport"
"go.uber.org/yarpc/internal/interceptor"
"go.uber.org/yarpc/transport/tchannel/tracing"
"go.uber.org/yarpc/yarpcerrors"
"go.uber.org/zap"
)

var (
Expand All @@ -39,54 +45,229 @@ var (

// Params defines the parameters for creating the Interceptor
type Params struct {
// Tracer is used to propagate context and to generate spans
Tracer opentracing.Tracer
// Transport is the name of the transport, it decides the propagation format and propagation carrier
Tracer opentracing.Tracer
Transport string
Logger *zap.Logger
}

// Interceptor is the tracing interceptor for all RPC types.
// It handles both tracing observability and context propagation using OpenTracing APIs.
type Interceptor struct {
tracer opentracing.Tracer
transport string
propagationFormat opentracing.BuiltinFormat
log *zap.Logger
}

// PropagationCarrier is an interface to combine both reader and writer interface
type PropagationCarrier interface {
opentracing.TextMapReader
opentracing.TextMapWriter
}

// New constructs a tracing interceptor with the provided parameter.
func New(p Params) *Interceptor {
return &Interceptor{}
i := &Interceptor{
tracer: p.Tracer,
transport: p.Transport,
propagationFormat: getPropagationFormat(p.Transport),
log: p.Logger,
}
if i.tracer == nil {
i.tracer = opentracing.GlobalTracer()
}
if i.log == nil {
i.log = zap.NewNop()
}
return i
}

// Handle implements interceptor.UnaryInbound
func (m *Interceptor) Handle(ctx context.Context, req *transport.Request, resw transport.ResponseWriter, h transport.UnaryHandler) error {
// TODO: implement
panic("implement me")
func (i *Interceptor) Handle(ctx context.Context, req *transport.Request, resw transport.ResponseWriter, h transport.UnaryHandler) error {
parentSpanCtx, _ := i.tracer.Extract(i.propagationFormat, getPropagationCarrier(req.Headers.Items(), req.Transport))
extractOpenTracingSpan := &transport.ExtractOpenTracingSpan{
ParentSpanContext: parentSpanCtx,
Tracer: i.tracer,
TransportName: req.Transport,
StartTime: time.Now(),
ExtraTags: commonTracingTags,
}
ctx, span := extractOpenTracingSpan.Do(ctx, req)
defer span.Finish()
err := h.Handle(ctx, req, resw)

extendedWriter, ok := resw.(transport.ExtendedResponseWriter)
if !ok {
i.log.Debug("ResponseWriter does not implement ExtendedResponseWriter, passing false and nil for app error meta")
return updateSpanWithErrorDetails(span, false, nil, err)
}

return updateSpanWithErrorDetails(span, extendedWriter.IsApplicationError(), extendedWriter.ApplicationErrorMeta(), err)
}

// Call implements interceptor.UnaryOutbound
func (m *Interceptor) Call(ctx context.Context, req *transport.Request, out transport.UnaryOutbound) (*transport.Response, error) {
// TODO: implement
panic("implement me")
func (i *Interceptor) Call(ctx context.Context, req *transport.Request, out transport.UnaryOutbound) (*transport.Response, error) {
createOpenTracingSpan := &transport.CreateOpenTracingSpan{
Tracer: i.tracer,
TransportName: i.transport,
StartTime: time.Now(),
ExtraTags: commonTracingTags,
}
ctx, span := createOpenTracingSpan.Do(ctx, req)
defer span.Finish()

tracingHeaders := make(map[string]string)

// We use i.transport here because this is an outbound call made by the interceptor.
// In inbound handlers (e.g., Handle function), req.Transport is used because it's the transport from the incoming request.
if err := i.tracer.Inject(span.Context(), i.propagationFormat, getPropagationCarrier(tracingHeaders, i.transport)); err != nil {
span.LogFields(logFieldEventError, log.String("message", err.Error()))
} else {
for k, v := range tracingHeaders {
req.Headers = req.Headers.With(k, v)
}
}

res, err := out.Call(ctx, req)
if res != nil {
return res, updateSpanWithErrorDetails(span, res.ApplicationError, res.ApplicationErrorMeta, err)
}
return nil, updateSpanWithErrorDetails(span, false, nil, err)
}

// HandleOneway implements interceptor.OnewayInbound
func (m *Interceptor) HandleOneway(ctx context.Context, req *transport.Request, h transport.OnewayHandler) error {
// TODO: implement
panic("implement me")
func (i *Interceptor) HandleOneway(ctx context.Context, req *transport.Request, h transport.OnewayHandler) error {
parentSpanCtx, _ := i.tracer.Extract(i.propagationFormat, getPropagationCarrier(req.Headers.Items(), req.Transport))
extractOpenTracingSpan := &transport.ExtractOpenTracingSpan{
ParentSpanContext: parentSpanCtx,
Tracer: i.tracer,
TransportName: req.Transport,
StartTime: time.Now(),
ExtraTags: commonTracingTags,
}
ctx, span := extractOpenTracingSpan.Do(ctx, req)
defer span.Finish()

err := h.HandleOneway(ctx, req)
return updateSpanWithErrorDetails(span, false, nil, err)
}

// CallOneway implements interceptor.OnewayOutbound
func (m *Interceptor) CallOneway(ctx context.Context, request *transport.Request, out transport.OnewayOutbound) (transport.Ack, error) {
// TODO: implement
panic("implement me")
func (i *Interceptor) CallOneway(ctx context.Context, req *transport.Request, out transport.OnewayOutbound) (transport.Ack, error) {
createOpenTracingSpan := &transport.CreateOpenTracingSpan{
Tracer: i.tracer,
TransportName: i.transport,
StartTime: time.Now(),
ExtraTags: commonTracingTags,
}
ctx, span := createOpenTracingSpan.Do(ctx, req)
defer span.Finish()

tracingHeaders := make(map[string]string)
if err := i.tracer.Inject(span.Context(), i.propagationFormat, getPropagationCarrier(tracingHeaders, i.transport)); err != nil {
span.LogFields(logFieldEventError, log.String("message", err.Error()))
} else {
for k, v := range tracingHeaders {
req.Headers = req.Headers.With(k, v)
}
}

ack, err := out.CallOneway(ctx, req)
return ack, updateSpanWithErrorDetails(span, false, nil, err)
}

// HandleStream implements interceptor.StreamInbound
func (m *Interceptor) HandleStream(s *transport.ServerStream, h transport.StreamHandler) error {
// TODO: implement
panic("implement me")
func (i *Interceptor) HandleStream(s *transport.ServerStream, h transport.StreamHandler) error {
parentSpanCtx, _ := i.tracer.Extract(i.propagationFormat, getPropagationCarrier(s.Request().Meta.Headers.Items(), i.transport))

// Start the server-side span
span := i.tracer.StartSpan(
s.Request().Meta.Procedure,
opentracing.ChildOf(parentSpanCtx),
ext.SpanKindRPCServer,
opentracing.StartTime(time.Now()),
)
defer span.Finish()

// Wrap the ServerStream in a tracedServerStream
err := h.HandleStream(s)
tracedStream := newTracedServerStream(*s, span, i.log)

return updateSpanWithErrorDetails(span, tracedStream.IsApplicationError(), tracedStream.ApplicationErrorMeta(), err)
}

// CallStream implements interceptor.StreamOutbound
func (m *Interceptor) CallStream(ctx context.Context, req *transport.StreamRequest, out transport.StreamOutbound) (*transport.ClientStream, error) {
// TODO: implement
panic("implement me")
func (i *Interceptor) CallStream(ctx context.Context, req *transport.StreamRequest, out transport.StreamOutbound) (*transport.ClientStream, error) {
span := i.tracer.StartSpan(
req.Meta.Procedure,
ext.SpanKindRPCClient,
opentracing.StartTime(time.Now()),
)
defer span.Finish()

// Inject span context into headers for tracing propagation
tracingHeaders := make(map[string]string)
if err := i.tracer.Inject(span.Context(), i.propagationFormat, getPropagationCarrier(tracingHeaders, i.transport)); err != nil {
span.LogFields(logFieldEventError, log.String("message", err.Error()))
} else {
for k, v := range tracingHeaders {
req.Meta.Headers = req.Meta.Headers.With(k, v)
}
}

clientStream, err := out.CallStream(ctx, req)
if err != nil {
if updateErr := updateSpanWithErrorDetails(span, false, nil, err); updateErr != nil {
i.log.Error("Failed to update span with error details", zap.Error(updateErr))
}
return nil, err
}

tracedStream := newTracedClientStream(clientStream, span, i.log)
return &tracedStream.ClientStream, nil
}

func updateSpanWithErrorDetails(
span opentracing.Span,
isApplicationError bool,
appErrorMeta *transport.ApplicationErrorMeta,
err error,
) error {
if err == nil && !isApplicationError {
return nil
}
ext.Error.Set(span, true)
if status := yarpcerrors.FromError(err); status != nil {
errCode := status.Code()
span.SetTag(rpcStatusCodeTag, int(errCode))
return err
}
if isApplicationError {
span.SetTag(rpcStatusCodeTag, applicationError)

if appErrorMeta != nil {
if appErrorMeta.Code != nil {
span.SetTag(rpcStatusCodeTag, int(*appErrorMeta.Code))
}
if appErrorMeta.Name != "" {
span.SetTag(errorNameTag, appErrorMeta.Name)
}
}
return err
}
span.SetTag(rpcStatusCodeTag, int(yarpcerrors.CodeUnknown))
return err
}

func getPropagationFormat(transport string) opentracing.BuiltinFormat {
if transport == "tchannel" {
return opentracing.TextMap
}
return opentracing.HTTPHeaders
}

func getPropagationCarrier(headers map[string]string, transport string) PropagationCarrier {
if transport == "tchannel" {
return tracing.HeadersCarrier(headers)
}
return opentracing.TextMapCarrier(headers)
}
Loading