Skip to content

Commit

Permalink
Code review changes
Browse files Browse the repository at this point in the history
  • Loading branch information
jpmcmu committed Jul 3, 2024
1 parent ea88b69 commit 581e0ab
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
import org.hpccsystems.dfs.client.RowServiceOutputStream;
import org.hpccsystems.dfs.client.Utils;

import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.semconv.ServerAttributes;

import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;
Expand Down Expand Up @@ -115,6 +117,20 @@ public HPCCRemoteFileWriter(DataPartition dp, FieldDef recordDef, IRecordAccesso
this.writeSpanName = "HPCCRemoteFileWriter.RowService/Write_" + dp.getFileName() + "_" + dp.getThisPart();
this.writeSpan = Utils.createSpan(writeSpanName);

String IPs = "";
for (int i = 0; i < dp.getCopyCount() ; i++)
{
IPs += dp.getCopyIP(i);
if (i < dp.getCopyCount() - 1)
{
IPs += ",";
}
}

Attributes attributes = Attributes.of( ServerAttributes.SERVER_ADDRESS, IPs,
ServerAttributes.SERVER_PORT, Long.valueOf(dp.getPort()));
writeSpan.setAllAttributes(attributes);

this.outputStream = new RowServiceOutputStream(dataPartition.getCopyIP(0), dataPartition.getPort(), dataPartition.getUseSsl(),
dataPartition.getFileAccessBlob(), this.recordDef, this.dataPartition.getThisPart(), this.dataPartition.getCopyPath(0),
fileCompression, connectTimeoutMs, socketOpTimeoutMs, this.writeSpan);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,7 @@ public RowServiceInputStream(DataPartition dp, FieldDef rd, FieldDef pRd, int co

this.dataPart = dp;

if (rdSpan != null && rdSpan.getSpanContext().isValid())
if (rdSpan != null)
{
this.readSpan = rdSpan;
this.traceContextHeader = org.hpccsystems.ws.client.utils.Utils.getTraceParentHeader(readSpan);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public class RowServiceOutputStream extends OutputStream
private ByteBuffer scratchBuffer = ByteBuffer.allocate(SCRATCH_BUFFER_LEN);

private Span writeSpan = null;
private String writeSpanTraceID = "";
private String traceContextHeader = "";

private static class RowServiceResponse
{
Expand Down Expand Up @@ -236,7 +236,12 @@ private static class RowServiceResponse
this.accessToken = accessToken;
this.compressionAlgo = fileCompression;
this.sockOpTimeoutMs = sockOpTimeoutMS;
this.writeSpan = writeSpan;

if (writeSpan != null && writeSpan.getSpanContext().isValid())
{
this.writeSpan = writeSpan;
this.traceContextHeader = org.hpccsystems.ws.client.utils.Utils.getTraceParentHeader(writeSpan);
}

if (this.writeSpan != null)
{
Expand Down Expand Up @@ -288,7 +293,7 @@ private static class RowServiceResponse
log.error(errorMessage);

Exception wrappedException = new Exception(errorMessage, e);
if (writeSpan != null && writeSpan.getSpanContext().isValid())
if (writeSpan != null)
{
Attributes attributes = Attributes.of(ServerAttributes.SERVER_ADDRESS, this.rowServiceIP,
ServerAttributes.SERVER_PORT, Long.valueOf(this.rowServicePort));
Expand Down Expand Up @@ -369,7 +374,7 @@ private static class RowServiceResponse

private String makeGetVersionRequest()
{
final String trace = writeSpan != null ? "\"_trace/traceparent\" : \"" + writeSpanTraceID + "\",\n" : "";
final String trace = writeSpan != null ? "\"_trace/traceparent\" : \"" + traceContextHeader + "\",\n" : "";
final String versionMsg = RFCCodes.RFCStreamReadCmd + "{ \"command\" : \"version\", \"handle\": \"-1\", " + trace + " \"format\": \"binary\" }";
return versionMsg;
}
Expand All @@ -378,7 +383,7 @@ private void makeInitialWriteRequest() throws Exception
{
String jsonRecordDef = RecordDefinitionTranslator.toJsonRecord(this.recordDef).toString();

final String trace = writeSpan != null ? "\"_trace/traceparent\" : \"" + writeSpanTraceID + "\",\n" : "";
final String trace = writeSpan != null ? "\"_trace/traceparent\" : \"" + traceContextHeader + "\",\n" : "";
String initialRequest = "\n{\n"
+ " \"format\" : \"binary\",\n"
+ trace
Expand Down Expand Up @@ -435,7 +440,7 @@ private void makeInitialWriteRequest() throws Exception

private String makeCloseHandleRequest()
{
final String trace = writeSpan != null ? "\"_trace/traceparent\" : \"" + writeSpanTraceID + "\",\n" : "";
final String trace = writeSpan != null ? "\"_trace/traceparent\" : \"" + traceContextHeader + "\",\n" : "";

StringBuilder sb = new StringBuilder(256);
sb.delete(0, sb.length());
Expand Down Expand Up @@ -657,7 +662,7 @@ public void write(byte[] b) throws IOException
*/
public void write(byte[] b, int off, int len) throws IOException
{
final String trace = writeSpan != null ? "\"_trace/traceparent\" : \"" + writeSpanTraceID + "\",\n" : "";
final String trace = writeSpan != null ? "\"_trace/traceparent\" : \"" + traceContextHeader + "\",\n" : "";

String request = "{ \"format\" : \"binary\", \"handle\" : \"" + this.handle + "\","
+ trace
Expand Down

0 comments on commit 581e0ab

Please sign in to comment.