Skip to content

Commit

Permalink
HPCC-586 WsClient OTEL trace support
Browse files Browse the repository at this point in the history
- Injects trace context in traceparent http header
- Integrates autoconfigure otel SDK
- Adds manual WsWUClient.ping span
- Adds manual WsWUClientTest.ping span
- Adds manual span around getHPCCver and getHPCCContainerizedMode
- Adds manual http span on Utils.Connection.sendHTTPRequest
- Injects traceparent http header on outgoing Utils.Connection.sendHTTPRequest
- Only attempts to autoconfig remotetest if otel.java.global-autoconfig enabled
- Adds root span around platform tester

Signed-off-by: Rodrigo Pastrana <[email protected]>
  • Loading branch information
rpastrana committed Jun 6, 2024
1 parent 422ee47 commit cd8a465
Show file tree
Hide file tree
Showing 9 changed files with 530 additions and 247 deletions.
40 changes: 39 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@
<groups>org.hpccsystems.commons.annotations.BaseTests</groups>
<codehaus.template.version>1.0.0</codehaus.template.version>
<project.benchmarking>false</project.benchmarking>
<opentelemetry.bom.version>1.38.0</opentelemetry.bom.version>
<opentelemetry.semconv.version>1.25.0-alpha</opentelemetry.semconv.version>
</properties>

<scm>
Expand Down Expand Up @@ -98,8 +100,44 @@
</snapshots>
</repository>
</repositories>

<dependencyManagement>
<dependencies>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-bom</artifactId>
<version>${opentelemetry.bom.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-api</artifactId>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk</artifactId>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-exporter-logging</artifactId>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk-extension-autoconfigure</artifactId>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk-extension-autoconfigure-spi</artifactId>
</dependency>
<dependency>
<!-- Not managed by opentelemetry-bom -->
<groupId>io.opentelemetry.semconv</groupId>
<artifactId>opentelemetry-semconv</artifactId>
<version>1.25.0-alpha</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
Expand Down
119 changes: 102 additions & 17 deletions wsclient/src/main/java/org/hpccsystems/ws/client/BaseHPCCWsClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,28 @@
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;

import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanBuilder;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.semconv.HttpAttributes;
import io.opentelemetry.semconv.ServerAttributes;

/**
* Defines functionality common to all HPCC Systmes web service clients.
* Defines functionality common to all HPCC Systems web service clients.
*
* Typically implemented by specialized HPCC Web service clients.
*/
public abstract class BaseHPCCWsClient extends DataSingleton
{
public static final String PROJECT_NAME = "WsClient";
private static OpenTelemetry globalOTel = null;
/** Constant <code>log</code> */
protected static final Logger log = LogManager.getLogger(BaseHPCCWsClient.class);
/** Constant <code>DEAFULTECLWATCHPORT="8010"</code> */
Expand Down Expand Up @@ -164,6 +179,53 @@ private String getTargetHPCCBuildVersionString() throws Exception

}

public SpanBuilder getWsClientSpanBuilder(String spanName)
{
SpanBuilder spanBuilder = getWsClientTracer().spanBuilder(spanName)
.setAttribute(ServerAttributes.SERVER_ADDRESS, wsconn.getHost())
.setAttribute(ServerAttributes.SERVER_PORT, Long.getLong(wsconn.getPort()))
.setAttribute(HttpAttributes.HTTP_REQUEST_METHOD, HttpAttributes.HttpRequestMethodValues.GET)
.setSpanKind(SpanKind.CLIENT);

return spanBuilder;
}

static public void injectCurrentSpanTraceParentHeader(Stub clientStub)
{
if (clientStub != null)
{
injectCurrentSpanTraceParentHeader(clientStub._getServiceClient().getOptions());
}
}

static public void injectCurrentSpanTraceParentHeader(Options options)
{
if (options != null)
{
W3CTraceContextPropagator.getInstance().inject(Context.current(), options, Options::setProperty);
}
}

/**
* Performs all Otel initialization
*/
private void initOTel()
{
/*
* If using the OpenTelemetry SDK, you may want to instantiate the OpenTelemetry toprovide configuration, for example of Resource or Sampler. See OpenTelemetrySdk and OpenTelemetrySdk.builder for information on how to construct theSDK's OpenTelemetry implementation.
* WARNING: Due to the inherent complications around initialization order involving this classand its single global instance, we strongly recommend *not* using GlobalOpenTelemetry unless youhave a use-case that absolutely requires it. Please favor using instances of OpenTelemetrywherever possible.
* If you are using the OpenTelemetry javaagent, it is generally best to only callGlobalOpenTelemetry.get() once, and then pass the resulting reference where you need to use it.
*/
globalOTel = GlobalOpenTelemetry.get();
}

public Tracer getWsClientTracer()
{
if (globalOTel == null)
initOTel();

return globalOTel.getTracer(PROJECT_NAME);
}
/**
* All instances of HPCCWsXYZClient should utilize this init function
* Attempts to establish the target HPCC build version and its container mode
Expand All @@ -175,36 +237,55 @@ private String getTargetHPCCBuildVersionString() throws Exception
*/
protected boolean initBaseWsClient(Connection connection, boolean fetchVersionAndContainerMode)
{
initOTel();

boolean success = true;
initErrMessage = "";
setActiveConnectionInfo(connection);

if (fetchVersionAndContainerMode)
{
try
Span fetchHPCCVerSpan = getWsClientSpanBuilder("FetchHPCCVersion").setSpanKind(SpanKind.INTERNAL).startSpan();
try (Scope scope = fetchHPCCVerSpan.makeCurrent())
{
targetHPCCBuildVersion = new Version(getTargetHPCCBuildVersionString());
try
{
targetHPCCBuildVersion = new Version(getTargetHPCCBuildVersionString());
}
catch (Exception e)
{
initErrMessage = "BaseHPCCWsClient: Could not stablish target HPCC bulid version, review all HPCC connection values";
if (!e.getLocalizedMessage().isEmpty())
initErrMessage = initErrMessage + "\n" + e.getLocalizedMessage();
success = false;
}
}
catch (Exception e)
finally
{
initErrMessage = "BaseHPCCWsClient: Could not stablish target HPCC bulid version, review all HPCC connection values";
if (!e.getLocalizedMessage().isEmpty())
initErrMessage = initErrMessage + "\n" + e.getLocalizedMessage();

success = false;
fetchHPCCVerSpan.setStatus(success ? StatusCode.OK : StatusCode.ERROR, initErrMessage);
fetchHPCCVerSpan.end();
}

try
Span fetchHPCCContainerMode = getWsClientSpanBuilder("FetchHPCCContainerMode").startSpan();
try (Scope scope = fetchHPCCContainerMode.makeCurrent())
{
targetsContainerizedHPCC = getTargetHPCCIsContainerized(wsconn);
try
{
targetsContainerizedHPCC = getTargetHPCCIsContainerized(wsconn);
}
catch (Exception e)
{
initErrMessage = initErrMessage + "\nBaseHPCCWsClient: Could not determine target HPCC Containerization mode, review all HPCC connection values";
if (!e.getLocalizedMessage().isEmpty())
initErrMessage = initErrMessage + "\n" + e.getLocalizedMessage();

success = false;
}
}
catch (Exception e)
finally
{
initErrMessage = initErrMessage + "\nBaseHPCCWsClient: Could not determine target HPCC Containerization mode, review all HPCC connection values";
if (!e.getLocalizedMessage().isEmpty())
initErrMessage = initErrMessage + "\n" + e.getLocalizedMessage();

success = false;
fetchHPCCContainerMode.setStatus(success ? StatusCode.OK : StatusCode.ERROR, initErrMessage);
fetchHPCCContainerMode.end();
}
}
if (!initErrMessage.isEmpty())
Expand Down Expand Up @@ -401,7 +482,10 @@ public String getInitError()
protected Stub verifyStub() throws Exception
{
if (stub != null)
{
injectCurrentSpanTraceParentHeader(stub);
return stub;
}
else
throw new Exception("WS Client Stub not available." + (hasInitError() ? "\n" + initErrMessage : ""));
}
Expand Down Expand Up @@ -687,6 +771,7 @@ protected void handleEspExceptions(ArrayOfEspExceptionWrapper exp, String messag
if (message != null && !message.isEmpty()) exp.setWsClientMessage(message);

log.error(exp.toString());

throw exp;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import org.hpccsystems.ws.client.utils.Utils;
import org.hpccsystems.ws.client.wrappers.ArrayOfECLExceptionWrapper;
import org.hpccsystems.ws.client.wrappers.gen.filespray.ProgressResponseWrapper;
import org.hpccsystems.ws.client.wrappers.gen.wstopology.TpGroupQueryResponseWrapper;
import org.hpccsystems.ws.client.wrappers.gen.wstopology.TpGroupWrapper;
import org.hpccsystems.ws.client.wrappers.wsworkunits.WorkunitWrapper;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,10 @@
import org.hpccsystems.ws.client.wrappers.wsworkunits.WorkunitWrapper;
import org.hpccsystems.ws.client.wrappers.wsworkunits.WsWorkunitsClientStubWrapper;

import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.context.Scope;

/**
* Facilitates ECL WorkUnit related actions.
*
Expand Down Expand Up @@ -303,7 +307,6 @@ public void fastWURefresh(WorkunitWrapper wu) throws Exception, ArrayOfEspExcept
request.setCount(1);

WUQueryResponse response = null;

try
{
response = ((WsWorkunits) stub).wUQuery(request);
Expand All @@ -326,7 +329,8 @@ public void fastWURefresh(WorkunitWrapper wu) throws Exception, ArrayOfEspExcept
{
ECLWorkunit[] eclWorkunit = response.getWorkunits().getECLWorkunit();

if (eclWorkunit != null && eclWorkunit.length == 1) wu.update(eclWorkunit[0]);
if (eclWorkunit != null && eclWorkunit.length == 1)
wu.update(eclWorkunit[0]);
}

if (previousState != getStateID(wu))
Expand Down Expand Up @@ -2551,18 +2555,27 @@ public List<QueryResultWrapper> deleteQueries(Set<String> querynames, String clu
*/
public boolean ping() throws Exception
{
verifyStub();

Ping request = new Ping();

try
Span span = getWsClientSpanBuilder("WsWUClient_Ping").startSpan();
try (Scope scope = span.makeCurrent())
{
((WsWorkunitsStub) stub).ping(request);
verifyStub(); // must be called within span scope for proper context propagation

Ping request = new Ping();
try
{
((WsWorkunitsStub) stub).ping(request);
span.setStatus(StatusCode.OK);
}
catch (Exception e)
{
span.recordException(e);
log.error(e.getLocalizedMessage());
return false;
}
}
catch (Exception e)
finally
{
log.error(e.getLocalizedMessage());
return false;
span.end();
}

return true;
Expand Down
Loading

0 comments on commit cd8a465

Please sign in to comment.