Skip to content

Commit

Permalink
add code origin support to kafka client
Browse files Browse the repository at this point in the history
# Conflicts:
#	dd-java-agent/agent-debugger/src/main/java/com/datadog/debugger/probe/CodeOriginProbe.java
  • Loading branch information
evanchooly committed Feb 10, 2025
1 parent added74 commit 0eddcc4
Show file tree
Hide file tree
Showing 11 changed files with 318 additions and 114 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ public byte[] transform(

private boolean skipInstrumentation(ClassLoader loader, String classFilePath) {
if (definitionMatcher.isEmpty()) {
log.warn("No debugger definitions present.");
log.debug("No debugger definitions present.");
return true;
}
if (classFilePath == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,4 +58,10 @@ public String getSourceFileName() {
public String getTypeName() {
return Strings.getClassName(classNode.name);
}

@Override
public String toString() {
return String.format(
"MethodInfo{classNode=%s, methodNode=%s}", classNode.name, methodNode.desc);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -118,20 +118,8 @@ public int hashCode() {

@Override
public String toString() {
return "CodeOriginProbe{"
+ "id='"
+ id
+ '\''
+ ", version="
+ version
+ ", tags="
+ Arrays.toString(tags)
+ ", where="
+ where
+ ", evaluateAt="
+ evaluateAt
+ ", entrySpanProbe="
+ entrySpanProbe
+ "} ";
return String.format(
"CodeOriginProbe{probeId=%s, entrySpanProbe=%s, signature=%s, location=%s}",
probeId, entrySpanProbe, signature, location);
}
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
import datadog.trace.bootstrap.debugger.DebuggerContext.CodeOriginRecorder
import com.google.common.util.concurrent.MoreExecutors
import datadog.trace.agent.test.naming.VersionedNamingTestBase
import datadog.trace.api.DDSpanTypes
import datadog.trace.bootstrap.debugger.DebuggerContext
import datadog.trace.bootstrap.instrumentation.api.Tags
import example.GreeterGrpc
import example.Helloworld
import io.grpc.BindableService
Expand All @@ -13,17 +10,15 @@ import io.grpc.inprocess.InProcessChannelBuilder
import io.grpc.inprocess.InProcessServerBuilder
import io.grpc.stub.StreamObserver

import java.lang.reflect.Method
import java.util.concurrent.CopyOnWriteArrayList
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicReference

import static datadog.trace.api.config.TraceInstrumentationConfig.*
import static datadog.trace.agent.test.asserts.TagsAssert.assertTags

abstract class GrpcCodeOriginTest extends VersionedNamingTestBase {
def codeOriginRecorder

abstract class GrpcCodeOriginTest extends VersionedNamingTestBase {
@Override
final String service() {
return null
Expand Down Expand Up @@ -56,7 +51,7 @@ abstract class GrpcCodeOriginTest extends VersionedNamingTestBase {
codeOriginSetup()
}

def "test conversation #name"() {
def "code origin test #name"() {
setup:

def msgCount = serverMessageCount
Expand Down Expand Up @@ -155,77 +150,13 @@ abstract class GrpcCodeOriginTest extends VersionedNamingTestBase {
}
}.flatten().sort()


assert codeOriginRecorder.invoked
assertTraces(2) {
trace((hasClientMessageSpans() ? clientMessageCount * serverMessageCount : 0) + 1) {
span {
operationName clientOperation()
resourceName "example.Greeter/Conversation"
spanType DDSpanTypes.RPC
parent()
errored false
tags {
"$Tags.COMPONENT" "grpc-client"
"$Tags.SPAN_KIND" Tags.SPAN_KIND_CLIENT
"$Tags.RPC_SERVICE" "example.Greeter"
"status.code" "OK"
"request.type" "example.Helloworld\$Response"
"response.type" "example.Helloworld\$Response"
peerServiceFrom(Tags.RPC_SERVICE)
defaultTags()
}
}
if (hasClientMessageSpans()) {
(1..(clientMessageCount * serverMessageCount)).each {
span {
operationName "grpc.message"
resourceName "grpc.message"
spanType DDSpanTypes.RPC
childOf span(0)
errored false
tags {
"$Tags.COMPONENT" "grpc-client"
"$Tags.SPAN_KIND" Tags.SPAN_KIND_CLIENT
"message.type" "example.Helloworld\$Response"
defaultTagsNoPeerService()
}
}
}
}
}
trace(clientMessageCount + 1) {
span {
operationName serverOperation()
resourceName "example.Greeter/Conversation"
spanType DDSpanTypes.RPC
childOf trace(0).get(0)
errored false
tags {
"$Tags.COMPONENT" "grpc-server"
"$Tags.SPAN_KIND" Tags.SPAN_KIND_SERVER
"status.code" "OK"

defaultTags(true)
}
}
clientRange.each {
span {
operationName "grpc.message"
resourceName "grpc.message"
spanType DDSpanTypes.RPC
childOf span(0)
errored false
tags {
"$Tags.COMPONENT" "grpc-server"
"$Tags.SPAN_KIND" Tags.SPAN_KIND_SERVER
"message.type" "example.Helloworld\$Response"
defaultTags()
}
}
}
}
assert DebuggerContext.codeOriginRecorder != null
def span = TEST_WRITER.flatten().find {
it.operationName.toString() == "grpc.server.request"
}
assertTags(span, {
it.codeOriginTags()
}, false)

cleanup:
channel?.shutdownNow()?.awaitTermination(10, TimeUnit.SECONDS)
Expand All @@ -247,26 +178,6 @@ abstract class GrpcCodeOriginTest extends VersionedNamingTestBase {
clientRange = 1..clientMessageCount
serverRange = 1..serverMessageCount
}


void codeOriginSetup() {
injectSysConfig(CODE_ORIGIN_FOR_SPANS_ENABLED, "true", true)
codeOriginRecorder = new CodeOriginRecorder() {
def invoked = false
@Override
String captureCodeOrigin(boolean entry) {
invoked = true
return "done"
}

@Override
String captureCodeOrigin(Method method, boolean entry) {
invoked = true
return "done"
}
}
DebuggerContext.initCodeOrigin(codeOriginRecorder)
}
}

class GrpcCodeOriginForkedTest extends GrpcCodeOriginTest {
Expand Down
3 changes: 3 additions & 0 deletions dd-java-agent/instrumentation/kafka-clients-3.8/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ dependencies {
implementation project(':dd-java-agent:instrumentation:kafka-common')
main_java17Implementation project(':dd-java-agent:instrumentation:kafka-common')

implementation project(':dd-java-agent:instrumentation:span-origin')
main_java17Implementation project(':dd-java-agent:instrumentation:span-origin')

testImplementation group: 'org.apache.kafka', name: 'kafka-clients', version: '3.8.0'
testImplementation group: 'org.springframework.kafka', name: 'spring-kafka', version: '3.1.0'
testImplementation group: 'org.springframework.kafka', name: 'spring-kafka-test', version: '3.1.0'
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package datadog.trace.instrumentation.kafka_clients38;

import static datadog.trace.agent.tooling.bytebuddy.matcher.ClassLoaderMatchers.hasClassNamed;
import static datadog.trace.agent.tooling.bytebuddy.matcher.HierarchyMatchers.implementsInterface;
import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;

import com.google.auto.service.AutoService;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.agent.tooling.InstrumenterModule;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;

@AutoService(InstrumenterModule.class)
public class MessageListenerInstrumentation extends InstrumenterModule.Tracing
implements Instrumenter.ForTypeHierarchy, Instrumenter.HasMethodAdvice {

public MessageListenerInstrumentation() {
super("kafka", "kafka-3.8");
}

@Override
public ElementMatcher.Junction<ClassLoader> classLoaderMatcher() {
return hasClassNamed("org.apache.kafka.clients.MetadataRecoveryStrategy"); // since 3.8
}

@Override
public String hierarchyMarkerType() {
return "org.springframework.kafka.listener.MessageListener";
}

@Override
public ElementMatcher<TypeDescription> hierarchyMatcher() {
return implementsInterface(named(hierarchyMarkerType()));
}

@Override
public void methodAdvice(MethodTransformer transformer) {
transformer.applyAdvice(
isMethod().and(named("onMessage")),
datadog.trace.instrumentation.codeorigin.EntrySpanOriginAdvice.class.getName());
}
}
Loading

0 comments on commit 0eddcc4

Please sign in to comment.