Skip to content

Commit

Permalink
NIFI-14220 allow use of Expression Language for PutElasticsearchJson …
Browse files Browse the repository at this point in the history
…and PutElasticsearchRecord boolean fields

Signed-off-by: Pierre Villard <[email protected]>

This closes #9688.
  • Loading branch information
ChrisSamo632 authored and pvillard31 committed Feb 4, 2025
1 parent 46cbada commit 9408292
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -110,14 +110,14 @@ public class PutElasticsearchJson extends AbstractPutElasticsearch {
.name("put-es-json-scripted-upsert")
.displayName("Scripted Upsert")
.description("Whether to add the scripted_upsert flag to the Upsert Operation. " +
"Forces Elasticsearch to execute the Script whether or not the document exists, defaults to false. " +
"If true, forces Elasticsearch to execute the Script whether or not the document exists, defaults to false. " +
"If the Upsert Document provided (from FlowFile content) will be empty, but sure to set the " +
CLIENT_SERVICE.getDisplayName() + " controller service's " + ElasticSearchClientService.SUPPRESS_NULLS.getDisplayName() +
" to " + ElasticSearchClientService.NEVER_SUPPRESS.getDisplayName() + " or no \"upsert\" doc will be, " +
"included in the request to Elasticsearch and the operation will not create a new document for the script " +
"to execute against, resulting in a \"not_found\" error")
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.allowableValues("true", "false")
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.defaultValue("false")
.build();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,6 @@ public class PutElasticsearchRecord extends AbstractPutElasticsearch {
.displayName("Retain ID (Record Path)")
.description("Whether to retain the existing field used as the ID Record Path.")
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.allowableValues("true", "false")
.defaultValue("false")
.required(false)
.dependsOn(ID_RECORD_PATH)
Expand Down Expand Up @@ -242,7 +241,6 @@ public class PutElasticsearchRecord extends AbstractPutElasticsearch {
.displayName("Retain @timestamp (Record Path)")
.description("Whether to retain the existing field used as the @timestamp Record Path.")
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.allowableValues("true", "false")
.defaultValue("false")
.required(false)
.dependsOn(AT_TIMESTAMP_RECORD_PATH)
Expand All @@ -268,7 +266,6 @@ public class PutElasticsearchRecord extends AbstractPutElasticsearch {
"and the error related to the first record within the FlowFile added to the FlowFile as \"elasticsearch.bulk.error\". " +
"If \"" + NOT_FOUND_IS_SUCCESSFUL.getDisplayName() + "\" is \"false\" then records associated with \"not_found\" " +
"Elasticsearch document responses will also be send to the \"" + REL_ERRORS.getName() + "\" relationship.")
.allowableValues("true", "false")
.defaultValue("false")
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.required(false)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,12 @@ void basicTest(final int failure, final int retry, final int successful) {
}

void basicTest(final int failure, final int retry, final int successful, final Consumer<List<IndexOperationRequest>> consumer) {
basicTest(failure, retry, successful, consumer, Collections.emptyMap());
}

void basicTest(final int failure, final int retry, final int successful, final Consumer<List<IndexOperationRequest>> consumer, final Map<String, String> attr) {
clientService.setEvalConsumer(consumer);
basicTest(failure, retry, successful, Collections.emptyMap());
basicTest(failure, retry, successful, attr);
}

void basicTest(final int failure, final int retry, final int successful, final Map<String, String> attr) {
Expand Down Expand Up @@ -289,6 +293,24 @@ void simpleTestWithScriptedUpsert() {
basicTest(0, 0, 1, consumer);
}

@Test
void simpleTestWithScriptedUpsertEL() {
runner.setProperty(PutElasticsearchJson.SCRIPT, script);
runner.setProperty(PutElasticsearchJson.DYNAMIC_TEMPLATES, dynamicTemplates);
runner.setProperty(PutElasticsearchJson.INDEX_OP, IndexOperationRequest.Operation.Upsert.getValue().toLowerCase());
runner.setProperty(PutElasticsearchJson.SCRIPTED_UPSERT, "${scripted}");
final Consumer<List<IndexOperationRequest>> consumer = (final List<IndexOperationRequest> items) -> {
final long scriptCount = items.stream().filter(item -> item.getScript().equals(expectedScript)).count();
final long trueScriptedUpsertCount = items.stream().filter(IndexOperationRequest::isScriptedUpsert).count();
final long dynamicTemplatesCount = items.stream().filter(item -> item.getDynamicTemplates().equals(expectedDynamicTemplate)).count();

assertEquals(1L, scriptCount);
assertEquals(1L, trueScriptedUpsertCount);
assertEquals(1L, dynamicTemplatesCount);
};
basicTest(0, 0, 1, consumer, Map.of("scripted", "true"));
}

@Test
void testNonJsonScript() {
runner.setProperty(PutElasticsearchJson.SCRIPT, "not-json");
Expand Down

0 comments on commit 9408292

Please sign in to comment.