Skip to content

Commit

Permalink
Custom data new fields (#51)
Browse files Browse the repository at this point in the history
* Custom data field (#48)

* Added custom data parser

* custom data code level and config mapping

* removed comments and reconciled field (#49)

* changed reporting to false

* removed indentation

* removed indentation
  • Loading branch information
truthfool authored Apr 6, 2023
1 parent a31db97 commit 4a5cde4
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ public void bulk(IndexRequest indexRequest) {
logger.info("Calling bulk request for insert");
bulkRequest.add(indexRequest);
}

public void bulk(UpdateRequest updateRequest) {
logger.info("Calling bulk request for upsert");
bulkRequest.add(updateRequest);
Expand All @@ -122,20 +123,20 @@ public void index(JSONObject record) {
if (reportingEnabled) {
upsertToReportingIndex(record);
}
logger.info("Pushing index for " + indexFor(record));
IndexRequest request =
new IndexRequest(indexFor(record), typeFor(record), idFor(record))
.source(record.toString(), XContentType.JSON)
.routing(Integer.toString(record.getInt("partitionId")));
bulk(request);
}
logger.info("Pushing index for " + indexFor(record));
IndexRequest request =
new IndexRequest(indexFor(record), typeFor(record), idFor(record))
.source(record.toString(), XContentType.JSON)
.routing(Integer.toString(record.getInt("partitionId")));
bulk(request);
}

public void upsertToReportingIndex(JSONObject record){
public void upsertToReportingIndex(JSONObject record) {
JSONObject newRecord = new JSONObject();
if (record.getString("valueType").equalsIgnoreCase("variable")) {
JSONObject valueObj = record.getJSONObject("value");
if (valueObj.has("name")) {
if(paymentsIndexConfiguration.getVariables().contains(valueObj.getString("name"))) {
if (valueObj.has("name")) { // Checking for variable events
if (paymentsIndexConfiguration.getVariables().contains(valueObj.getString("name"))) {
if (valueObj.getString("name").equalsIgnoreCase("amount")) {
newRecord.put((String) valueObj.get("name"),
Double.parseDouble(valueObj.getString("value").replaceAll("\"",
Expand All @@ -159,22 +160,13 @@ public void upsertToReportingIndex(JSONObject record){
Instant timestamp = Instant.ofEpochMilli(record.getLong("timestamp"));
String name = "zeebe-payments" + INDEX_DELIMITER + version + INDEX_DELIMITER +
formatter.format(timestamp);
if (!newRecord.has("initiator") || !newRecord.has("isNotificationsFailureEnabled") ||
!newRecord.has("isNotificationsSuccessEnabled") || !newRecord.has("mpesaTransactionId")
|| !newRecord.has("partyLookupFailed") || !newRecord.has("tenantId") ||
!newRecord.has("timer") || !newRecord.has("transactionId") ||
!newRecord.has("transferCreateFailed") || !newRecord.has("getTransactionStatusHttpCode")
|| !newRecord.has("getTransactionStatusHttpCode") || !newRecord.has("errorCode") ||
!newRecord.has("getTransactionStatusResponse") || !newRecord.has("isCallbackReceived")
|| !newRecord.has("transferResponse-CREATE")
) {
UpdateRequest request1 = new UpdateRequest(name, valueObj.get("processInstanceKey").toString())
.doc(newRecord.toMap())
.upsert(newRecord.toString(), XContentType.JSON);
bulk(request1);
}
UpdateRequest request1 = new UpdateRequest(name, valueObj.get("processInstanceKey").toString())
.doc(newRecord.toMap())
.upsert(newRecord.toString(), XContentType.JSON);
bulk(request1);
}
}

public synchronized int flush() {
boolean success;
int bulkSize = bulkRequest.numberOfActions();
Expand Down Expand Up @@ -251,7 +243,7 @@ public boolean putIndexTemplate(
// update alias in template in case it was changed in configuration
template.put("aliases", Collections.singletonMap(aliasName, Collections.EMPTY_MAP));

if(reportingEnabled) {
if (reportingEnabled) {
if (templateName.equals("zeebe-record")) {
Map<String, Object> template1;
try (InputStream inputStream1 =
Expand Down Expand Up @@ -289,8 +281,7 @@ private boolean putIndexTemplate(PutIndexTemplateRequest putIndexTemplateRequest
.isAcknowledged();
}catch (ElasticsearchException exception){
throw new ElasticsearchExporterException("Failed to Connect ES", exception);
}
catch (IOException e) {
} catch (IOException e) {
throw new ElasticsearchExporterException("Failed to put index template", e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,26 @@ public class PaymentsIndexConfiguration {
@Value("${reporting.fields.transferResponseCREATE}")
private Boolean transferResponseCREATEVal;

public PaymentsIndexConfiguration(){}
@Value("${reporting.fields.currency}")
private Boolean currencyVal;

@Value("${reporting.fields.errorInformation}")
private Boolean errorInformationVal;

@Value("${reporting.fields.customData}")
private Boolean customDataVal;

@Value("${reporting.fields.confirmationReceived}")
private Boolean confirmationReceivedVal;

@Value("${reporting.fields.clientCorrelationId}")
private Boolean clientCorrelationIdVal;

@Value("${reporting.fields.ams}")
private Boolean amsVal;

public PaymentsIndexConfiguration() {
}

public List<String> getVariables() {
Boolean amount = amountVal;
Expand Down Expand Up @@ -115,6 +134,13 @@ public List<String> getVariables() {
Boolean transferCreateFailed = transferCreateFailedVal;
Boolean transferSettlementFailed = transferSettlementFailedVal;
Boolean transferResponseCREATE = transferResponseCREATEVal;
Boolean currency = currencyVal;
Boolean errorInformation = errorInformationVal;
Boolean customData = customDataVal;
Boolean confirmationReceived = confirmationReceivedVal;
Boolean clientCorrelationId = clientCorrelationIdVal;
Boolean ams = amsVal;

if (amount) {
variables.add("amount");
}
Expand Down Expand Up @@ -190,10 +216,25 @@ public List<String> getVariables() {
if (transferResponseCREATE) {
variables.add("transferResponse-CREATE");
}


if (currency) {
variables.add("currency");
}
if (errorInformation) {
variables.add("errorInformation");
}
if (customData) {
variables.add("customData");
}
if (confirmationReceived) {
variables.add("confirmationReceived");
}
if (clientCorrelationId) {
variables.add("clientCorrelationId");
}
if (ams) {
variables.add("ams");
}
return variables;
}

}

}
5 changes: 3 additions & 2 deletions src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,10 @@ reporting:
amount: true
accountId: true
ams: true
clientCorrelationId : true
clientCorrelationId: true
currency: true
customData: true
customData: false
confirmationReceived: false
errorCode: false
errorDescription: true
errorInformation: true
Expand Down

0 comments on commit 4a5cde4

Please sign in to comment.