diff --git a/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/ProvenanceClient.java b/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/ProvenanceClient.java index 74163e0fb18f..9844ac5bf5e1 100644 --- a/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/ProvenanceClient.java +++ b/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/ProvenanceClient.java @@ -22,6 +22,7 @@ import org.apache.nifi.web.api.entity.ReplayLastEventResponseEntity; import java.io.IOException; +import java.io.InputStream; public interface ProvenanceClient { ProvenanceEntity submitProvenanceQuery(ProvenanceEntity provenanceQuery) throws NiFiClientException, IOException; @@ -40,6 +41,10 @@ public interface ProvenanceClient { LatestProvenanceEventsEntity getLatestEvents(String processorId) throws NiFiClientException, IOException; + InputStream getInputFlowFileContent(String provenanceEventId, String nodeId) throws NiFiClientException, IOException; + + InputStream getOutputFlowFileContent(String provenanceEventId, String nodeId) throws NiFiClientException, IOException; + enum ReplayEventNodes { PRIMARY, ALL; diff --git a/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/impl/JerseyProvenanceClient.java b/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/impl/JerseyProvenanceClient.java index bc928c213b30..ec3ca96a3b35 100644 --- a/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/impl/JerseyProvenanceClient.java +++ b/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/impl/JerseyProvenanceClient.java @@ -30,6 +30,7 @@ import org.apache.nifi.web.api.entity.ReplayLastEventResponseEntity; import java.io.IOException; +import java.io.InputStream; import java.util.Objects; public class JerseyProvenanceClient extends AbstractJerseyClient implements ProvenanceClient { @@ -149,4 +150,42 @@ public LatestProvenanceEventsEntity getLatestEvents(final String processorId) th return getRequestBuilder(target).get(LatestProvenanceEventsEntity.class); }); } + + @Override + public InputStream getInputFlowFileContent(final String provenanceEventId, final String nodeId) throws NiFiClientException, IOException { + if (StringUtils.isBlank(provenanceEventId)) { + throw new IllegalArgumentException("Provenance Event ID must be specified"); + } + + return executeAction("Error retrieving Input FlowFile Content from provenance event", () -> { + WebTarget target = provenanceEventsTarget + .path("/{id}/content/input") + .resolveTemplate("id", provenanceEventId); + + if (nodeId != null) { + target = target.queryParam("clusterNodeId", nodeId); + } + + return getRequestBuilder(target).get(InputStream.class); + }); + } + + @Override + public InputStream getOutputFlowFileContent(String provenanceEventId, final String nodeId) throws NiFiClientException, IOException { + if (StringUtils.isBlank(provenanceEventId)) { + throw new IllegalArgumentException("Provenance Event ID must be specified"); + } + + return executeAction("Error retrieving Output FlowFile Content from provenance event", () -> { + WebTarget target = provenanceEventsTarget + .path("/{id}/content/output") + .resolveTemplate("id", provenanceEventId); + + if (nodeId != null) { + target = target.queryParam("clusterNodeId", nodeId); + } + + return getRequestBuilder(target).get(InputStream.class); + }); + } }