From a923633730de968d3bc2b7422876ba1ae7870bdb Mon Sep 17 00:00:00 2001 From: marcoporru Date: Mon, 20 Apr 2020 18:12:51 +0200 Subject: [PATCH 1/4] custom services samples --- .../edgeApiExtension-interface/pom.xml | 68 +++++ .../gateway/extension/IEdgeApiExtension.java | 11 + .../edgeApiExtension/pom.xml | 103 +++++++ .../extension/impl/EdgeApiExtension.java | 50 ++++ .../main/resources/defaultConfiguration.json | 12 + .../extendedOperations/pom.xml | 149 ++++++++++ .../extension/BaseHttpResponse.java | 32 +++ .../InteroperabilityModuleActivator.java | 89 ++++++ .../main/resources/defaultConfiguration.json | 12 + .../external-flow-configuration/pom.xml | 153 +++++++++++ .../interceptor/ExternalFlowActivator.java | 256 ++++++++++++++++++ .../gateway/interceptor/InterceptorImpl.java | 78 ++++++ .../managers/ConfigurationHandler.java | 233 ++++++++++++++++ .../interceptor/proxies/BaseIoTMessage.java | 20 ++ .../proxies/ConfigurationFields.java | 13 + .../proxies/CustomConfiguration.java | 77 ++++++ .../proxies/ExtendedCustomConfiguration.java | 33 +++ .../interceptor/proxies/IoTMessage.java | 39 +++ .../interceptor/proxies/MessageFilter.java | 27 ++ .../proxies/MinimalIoTMessage.java | 35 +++ .../interceptor/proxies/MqttConsumer.java | 43 +++ .../interceptor/proxies/MqttInterop.java | 154 +++++++++++ .../interceptor/proxies/MqttIoTMessage.java | 22 ++ .../main/resources/defaultConfiguration.json | 31 +++ 24 files changed, 1740 insertions(+) create mode 100644 custom-services-additional-apis-messagebus/edgeApiExtension-interface/pom.xml create mode 100644 custom-services-additional-apis-messagebus/edgeApiExtension-interface/src/main/java/com/sap/iotservices/gateway/extension/IEdgeApiExtension.java create mode 100644 custom-services-additional-apis-messagebus/edgeApiExtension/pom.xml create mode 100644 custom-services-additional-apis-messagebus/edgeApiExtension/src/main/java/com/sap/iotservices/gateway/extension/impl/EdgeApiExtension.java create mode 100644 custom-services-additional-apis-messagebus/edgeApiExtension/src/main/resources/defaultConfiguration.json create mode 100644 custom-services-additional-apis-messagebus/extendedOperations/pom.xml create mode 100644 custom-services-additional-apis-messagebus/extendedOperations/src/main/java/com/sap/iotservice/extension/BaseHttpResponse.java create mode 100644 custom-services-additional-apis-messagebus/extendedOperations/src/main/java/com/sap/iotservice/extension/InteroperabilityModuleActivator.java create mode 100644 custom-services-additional-apis-messagebus/extendedOperations/src/main/resources/defaultConfiguration.json create mode 100644 custom-services-additional-apis-messagebus/external-flow-configuration/pom.xml create mode 100644 custom-services-additional-apis-messagebus/external-flow-configuration/src/main/java/com/sap/iotservices/gateway/interceptor/ExternalFlowActivator.java create mode 100644 custom-services-additional-apis-messagebus/external-flow-configuration/src/main/java/com/sap/iotservices/gateway/interceptor/InterceptorImpl.java create mode 100644 custom-services-additional-apis-messagebus/external-flow-configuration/src/main/java/com/sap/iotservices/gateway/interceptor/managers/ConfigurationHandler.java create mode 100644 custom-services-additional-apis-messagebus/external-flow-configuration/src/main/java/com/sap/iotservices/gateway/interceptor/proxies/BaseIoTMessage.java create mode 100644 custom-services-additional-apis-messagebus/external-flow-configuration/src/main/java/com/sap/iotservices/gateway/interceptor/proxies/ConfigurationFields.java create mode 100644 custom-services-additional-apis-messagebus/external-flow-configuration/src/main/java/com/sap/iotservices/gateway/interceptor/proxies/CustomConfiguration.java create mode 100644 custom-services-additional-apis-messagebus/external-flow-configuration/src/main/java/com/sap/iotservices/gateway/interceptor/proxies/ExtendedCustomConfiguration.java create mode 100644 custom-services-additional-apis-messagebus/external-flow-configuration/src/main/java/com/sap/iotservices/gateway/interceptor/proxies/IoTMessage.java create mode 100644 custom-services-additional-apis-messagebus/external-flow-configuration/src/main/java/com/sap/iotservices/gateway/interceptor/proxies/MessageFilter.java create mode 100644 custom-services-additional-apis-messagebus/external-flow-configuration/src/main/java/com/sap/iotservices/gateway/interceptor/proxies/MinimalIoTMessage.java create mode 100644 custom-services-additional-apis-messagebus/external-flow-configuration/src/main/java/com/sap/iotservices/gateway/interceptor/proxies/MqttConsumer.java create mode 100644 custom-services-additional-apis-messagebus/external-flow-configuration/src/main/java/com/sap/iotservices/gateway/interceptor/proxies/MqttInterop.java create mode 100644 custom-services-additional-apis-messagebus/external-flow-configuration/src/main/java/com/sap/iotservices/gateway/interceptor/proxies/MqttIoTMessage.java create mode 100644 custom-services-additional-apis-messagebus/external-flow-configuration/src/main/resources/defaultConfiguration.json diff --git a/custom-services-additional-apis-messagebus/edgeApiExtension-interface/pom.xml b/custom-services-additional-apis-messagebus/edgeApiExtension-interface/pom.xml new file mode 100644 index 0000000..508c4d3 --- /dev/null +++ b/custom-services-additional-apis-messagebus/edgeApiExtension-interface/pom.xml @@ -0,0 +1,68 @@ + + + 4.0.0 + com.sap.iotservices.gateway.extension + edgeApiExtensionInterface + edgeApiExtensionInterface + 1.0.0 + + ${project.artifactId} + ${project.version} + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.3 + + 1.8 + 1.8 + + + + org.apache.maven.plugins + maven-jar-plugin + + + META-INF/MANIFEST.MF + + + + + + org.apache.felix + maven-bundle-plugin + true + + + + scr-metadata + + manifest + + + true + + + + + true + META-INF + + ${bundle.symbolicName} + ${bundle.version} + + <_dsannotations>* + + <_metatypeannotations>* + + + + + + diff --git a/custom-services-additional-apis-messagebus/edgeApiExtension-interface/src/main/java/com/sap/iotservices/gateway/extension/IEdgeApiExtension.java b/custom-services-additional-apis-messagebus/edgeApiExtension-interface/src/main/java/com/sap/iotservices/gateway/extension/IEdgeApiExtension.java new file mode 100644 index 0000000..ea7d8de --- /dev/null +++ b/custom-services-additional-apis-messagebus/edgeApiExtension-interface/src/main/java/com/sap/iotservices/gateway/extension/IEdgeApiExtension.java @@ -0,0 +1,11 @@ +package com.sap.iotservices.gateway.extension; + +public interface IEdgeApiExtension { + + long getUptime(); + + long getDowntime(); + + float getNoisePower(float signal, float variance); + +} diff --git a/custom-services-additional-apis-messagebus/edgeApiExtension/pom.xml b/custom-services-additional-apis-messagebus/edgeApiExtension/pom.xml new file mode 100644 index 0000000..c7c3685 --- /dev/null +++ b/custom-services-additional-apis-messagebus/edgeApiExtension/pom.xml @@ -0,0 +1,103 @@ + + + 4.0.0 + com.sap.iotservices.gateway.extension + edgeApiExtension + edgeApiExtension + 1.0.0 + + ${project.artifactId} + ${project.version} + + + + com.sap.iotservices.gateway.extension + edgeApiExtensionInterface + ${project.version} + provided + + + org.osgi + osgi.cmpn + 6.0.0 + compile + + + org.slf4j + slf4j-api + 1.7.29 + provided + + + com.github.oshi + oshi-core + 4.3.0 + + + net.java.dev.jna + jna-platform + 5.5.0 + + + net.java.dev.jna + jna + 5.5.0 + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.3 + + 1.8 + 1.8 + + + + org.apache.maven.plugins + maven-jar-plugin + + + META-INF/MANIFEST.MF + + + + + + org.apache.felix + maven-bundle-plugin + true + + + + scr-metadata + + manifest + + + true + + + + + true + META-INF + + ${bundle.symbolicName} + ${bundle.version} + + <_dsannotations>* + + <_metatypeannotations>* + + + + + + diff --git a/custom-services-additional-apis-messagebus/edgeApiExtension/src/main/java/com/sap/iotservices/gateway/extension/impl/EdgeApiExtension.java b/custom-services-additional-apis-messagebus/edgeApiExtension/src/main/java/com/sap/iotservices/gateway/extension/impl/EdgeApiExtension.java new file mode 100644 index 0000000..8f60e2e --- /dev/null +++ b/custom-services-additional-apis-messagebus/edgeApiExtension/src/main/java/com/sap/iotservices/gateway/extension/impl/EdgeApiExtension.java @@ -0,0 +1,50 @@ +package com.sap.iotservices.gateway.extension.impl; + +import java.lang.management.ManagementFactory; +import java.util.Date; +import java.util.Random; + +import org.osgi.service.component.annotations.Activate; +import org.osgi.service.component.annotations.Component; +import org.osgi.service.component.annotations.Deactivate; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.sap.iotservices.gateway.extension.IEdgeApiExtension; + +import oshi.SystemInfo; + +@Component(immediate = true, service = IEdgeApiExtension.class) +public class EdgeApiExtension +implements IEdgeApiExtension { + + private static final Logger LOGGER = LoggerFactory.getLogger(EdgeApiExtension.class); + Random r = new java.util.Random(); + + @Activate + public void startService() { + LOGGER.info("Starting Edge API Extension service"); + } + + @Deactivate + public void stopService() { + LOGGER.info("Stopping Edge API Extension service"); + } + + @Override + public long getUptime() { + Date now = new Date(); + return (now.getTime() - ManagementFactory.getRuntimeMXBean().getStartTime()) / 1000; + } + + @Override + public long getDowntime() { + long uptime = getUptime(); + return new SystemInfo().getOperatingSystem().getSystemUptime() - (uptime / 1000); + } + + @Override public float getNoisePower(float signal, float variance) { + double noise = r.nextGaussian() * Math.sqrt(variance) + signal; + return (float) noise; + } +} diff --git a/custom-services-additional-apis-messagebus/edgeApiExtension/src/main/resources/defaultConfiguration.json b/custom-services-additional-apis-messagebus/edgeApiExtension/src/main/resources/defaultConfiguration.json new file mode 100644 index 0000000..42d4c26 --- /dev/null +++ b/custom-services-additional-apis-messagebus/edgeApiExtension/src/main/resources/defaultConfiguration.json @@ -0,0 +1,12 @@ +{ + "businessPartner": "8FA7D41CD4C448BF9A27962E9055C141", + "earliestStartDateTime": "2019-12-05T12:28:57.550000Z", + "dueDateTime": "2019-12-05T14:28:57.550000Z", + "durationInMinutes": "60", + "priority": "HIGH", + "subject": "High temperature on robot_t1", + "status": "-5", + "problemType": "-1", + "origin": "-1", + "remarks": "High temperature on robot_t1" +} \ No newline at end of file diff --git a/custom-services-additional-apis-messagebus/extendedOperations/pom.xml b/custom-services-additional-apis-messagebus/extendedOperations/pom.xml new file mode 100644 index 0000000..19ab895 --- /dev/null +++ b/custom-services-additional-apis-messagebus/extendedOperations/pom.xml @@ -0,0 +1,149 @@ + + + 4.0.0 + com.sap.iot.edge + extendedOperations + extendedOperations + 1.0.0 + + ${project.artifactId} + ${project.version} + 4.51.0 + + + + org.osgi + org.osgi.core + 6.0.0 + provided + + + org.osgi + osgi.cmpn + 6.0.0 + compile + + + com.sap.iotservices.gateway + gateway-topology-edge-api-interface + ${gateway.version} + provided + + + com.sap.iotservices.gateway.extension + edgeApiExtensionInterface + ${bundle.version} + provided + + + com.sap.iotservices.gateway + netty-server-bundle + ${gateway.version} + provided + + + org.apache.logging.log4j + log4j-osgi + 2.9.0 + provided + + + org.slf4j + slf4j-api + 1.7.29 + provided + + + commons-lang + commons-lang + 2.6 + provided + + + com.fasterxml.jackson.core + jackson-core + 2.9.7 + provided + + + com.fasterxml.jackson.core + jackson-databind + 2.9.7 + provided + + + io.swagger.core.v3 + swagger-core + 2.0.9 + provided + + + io.swagger.core.v3 + swagger-annotations + 2.0.9 + provided + + + javax.ws.rs + javax.ws.rs-api + 2.0-m10 + provided + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.3 + + 1.8 + 1.8 + + + + org.apache.maven.plugins + maven-jar-plugin + + + META-INF/MANIFEST.MF + + + + + + org.apache.felix + maven-bundle-plugin + true + + + + scr-metadata + + manifest + + + true + + + + + true + META-INF + + ${bundle.symbolicName} + ${bundle.version} + + <_dsannotations>* + + <_metatypeannotations>* + + + + + + diff --git a/custom-services-additional-apis-messagebus/extendedOperations/src/main/java/com/sap/iotservice/extension/BaseHttpResponse.java b/custom-services-additional-apis-messagebus/extendedOperations/src/main/java/com/sap/iotservice/extension/BaseHttpResponse.java new file mode 100644 index 0000000..e89474f --- /dev/null +++ b/custom-services-additional-apis-messagebus/extendedOperations/src/main/java/com/sap/iotservice/extension/BaseHttpResponse.java @@ -0,0 +1,32 @@ +package com.sap.iotservice.extension; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonInclude; + +import io.swagger.v3.oas.annotations.media.Schema; + +@JsonInclude(JsonInclude.Include.NON_EMPTY) +@JsonIgnoreProperties(ignoreUnknown = true) +public class BaseHttpResponse { + + @Schema(required = true) + private String message; + + public BaseHttpResponse() { + super(); + } + + public BaseHttpResponse(String message) { + super(); + this.message = message; + } + + public String getMessage() { + return message; + } + + public void setMessage(String message) { + this.message = message; + } + +} diff --git a/custom-services-additional-apis-messagebus/extendedOperations/src/main/java/com/sap/iotservice/extension/InteroperabilityModuleActivator.java b/custom-services-additional-apis-messagebus/extendedOperations/src/main/java/com/sap/iotservice/extension/InteroperabilityModuleActivator.java new file mode 100644 index 0000000..0dbe88f --- /dev/null +++ b/custom-services-additional-apis-messagebus/extendedOperations/src/main/java/com/sap/iotservice/extension/InteroperabilityModuleActivator.java @@ -0,0 +1,89 @@ +/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * + * Copyright (c) 2019 SAP SE or an affiliate company. All rights reserved. + * The sample is not intended for production use. Provided "as is". + * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */ +package com.sap.iotservice.extension; + +import static com.sap.iotservice.gateway.edge.api.EdgeApiConstants.EDGE_API_BASE_PATH_TENANT; + +import javax.ws.rs.GET; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; +import javax.ws.rs.core.MediaType; + +import org.osgi.service.component.annotations.Component; +import org.osgi.service.component.annotations.Reference; +import org.osgi.service.component.annotations.ReferenceCardinality; +import org.osgi.service.component.annotations.ReferencePolicy; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.sap.iotservice.gateway.netty.interfaces.NettyService; +import com.sap.iotservices.gateway.extension.IEdgeApiExtension; +import com.sap.iotservices.topology.edge.api.beans.SensorBean; + +import io.swagger.v3.oas.annotations.Operation; +import io.swagger.v3.oas.annotations.Parameter; +import io.swagger.v3.oas.annotations.media.ArraySchema; +import io.swagger.v3.oas.annotations.media.Content; +import io.swagger.v3.oas.annotations.media.Schema; +import io.swagger.v3.oas.annotations.responses.ApiResponse; +import io.swagger.v3.oas.annotations.tags.Tag; + +@Component +@Tag(name = "ExtendedOperations") +@Path(EDGE_API_BASE_PATH_TENANT + "/extended") +public class InteroperabilityModuleActivator +implements NettyService { + private static final Logger LOGGER = LoggerFactory.getLogger(InteroperabilityModuleActivator.class); + private IEdgeApiExtension edgeApiExtension; + + @Reference(cardinality = ReferenceCardinality.MANDATORY, policy = ReferencePolicy.DYNAMIC) + public void setEdgeApiExtension(IEdgeApiExtension edgeApiExtension) { + this.edgeApiExtension = edgeApiExtension; + } + + void unsetEdgeApiExtension(IEdgeApiExtension edgeApiExtension) { + this.edgeApiExtension = null; + } + + @GET + @Path("uptime") + @Produces(MediaType.APPLICATION_JSON) + @Operation(summary = "Returns the UPTIME", description = "The endpoint gives the uptime of the Edge Platform", responses = { + @ApiResponse(responseCode = "200", description = "Successfully returned sensors.", content = @Content(array = @ArraySchema(schema = @Schema(implementation = SensorBean.class)))), + @ApiResponse(responseCode = "400", description = "Malformed HTTP request", content = @Content(schema = @Schema(implementation = BaseHttpResponse.class))) }) + public long getUptime( + @Parameter(description = "TenantId", required = true) @PathParam("TenantId") String tenantId) { + + return edgeApiExtension.getUptime(); + } + + @GET + @Path("downtime") + @Produces(MediaType.APPLICATION_JSON) + @Operation(summary = "Returns the DOWNTIME", description = "The endpoint returns the downtime of the system, computed from the startup of the system", responses = { + @ApiResponse(responseCode = "200", description = "Successfully returned sensors.", content = @Content(array = @ArraySchema(schema = @Schema(implementation = SensorBean.class)))), + @ApiResponse(responseCode = "400", description = "Malformed HTTP request", content = @Content(schema = @Schema(implementation = BaseHttpResponse.class))) }) + public long getDowntime( + @Parameter(description = "TenantId", required = true) @PathParam("tenantId") String tenantId) { + + return edgeApiExtension.getDowntime(); + } + + @GET + @Path("getnoisepower/{signal}/{variance}") + @Produces(MediaType.APPLICATION_JSON) + @Operation(summary = "Compute the quantity of noise power contained in a signal", description = "The endpoint sends a new configuration for the OSGI bundle(s).", responses = { + @ApiResponse(responseCode = "200", description = "Successfully returned power.", content = @Content(array = @ArraySchema(schema = @Schema(implementation = SensorBean.class)))), + @ApiResponse(responseCode = "400", description = "Malformed HTTP request", content = @Content(schema = @Schema(implementation = BaseHttpResponse.class))) }) + public float getNoisePower( + @Parameter(description = "TenantId", required = true) @PathParam("tenantId") String tenantId, + @Parameter(description = "Signal", required = true) @PathParam("signal") float signal, + @Parameter(description = "Variance", required = true) @PathParam("variance") float variance) { + + return edgeApiExtension.getNoisePower(signal, variance); + } + +} diff --git a/custom-services-additional-apis-messagebus/extendedOperations/src/main/resources/defaultConfiguration.json b/custom-services-additional-apis-messagebus/extendedOperations/src/main/resources/defaultConfiguration.json new file mode 100644 index 0000000..42d4c26 --- /dev/null +++ b/custom-services-additional-apis-messagebus/extendedOperations/src/main/resources/defaultConfiguration.json @@ -0,0 +1,12 @@ +{ + "businessPartner": "8FA7D41CD4C448BF9A27962E9055C141", + "earliestStartDateTime": "2019-12-05T12:28:57.550000Z", + "dueDateTime": "2019-12-05T14:28:57.550000Z", + "durationInMinutes": "60", + "priority": "HIGH", + "subject": "High temperature on robot_t1", + "status": "-5", + "problemType": "-1", + "origin": "-1", + "remarks": "High temperature on robot_t1" +} \ No newline at end of file diff --git a/custom-services-additional-apis-messagebus/external-flow-configuration/pom.xml b/custom-services-additional-apis-messagebus/external-flow-configuration/pom.xml new file mode 100644 index 0000000..8bb24ac --- /dev/null +++ b/custom-services-additional-apis-messagebus/external-flow-configuration/pom.xml @@ -0,0 +1,153 @@ + + 4.0.0 + externalFlowConfiguration + externalFlowConfiguration + com.custom.iotservices.gateway + 1.0.0 + + + ${project.artifactId} + ${project.version} + 1.8 + 1.8 + 4.51.0 + + + + + com.sap.iot.edgeservices + ConfigService + 3.2002.0 + provided + + + + com.sap.iotservices.common + common-basic + ${iot.service.version} + provided + + + com.sap.iotservices.common + common-interface + ${iot.service.version} + provided + + + + org.slf4j + slf4j-api + 1.7.29 + provided + + + org.osgi + osgi.cmpn + 6.0.0 + provided + + + org.apache.servicemix.bundles + org.apache.servicemix.bundles.xmlbeans + 2.6.0_2 + provided + + + com.fasterxml.jackson.core + jackson-core + 2.9.7 + provided + + + org.osgi + org.osgi.core + 6.0.0 + provided + + + commons-lang + commons-lang + 2.6 + provided + + + com.fasterxml.jackson.core + jackson-core + 2.9.7 + provided + + + com.fasterxml.jackson.core + jackson-databind + 2.9.7 + provided + + + commons-io + commons-io + 2.2 + provided + + + org.eclipse.paho + org.eclipse.paho.client.mqttv3 + 1.2.2 + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.3 + + 1.8 + 1.8 + + + + org.apache.maven.plugins + maven-jar-plugin + + + META-INF/MANIFEST.MF + + + + + + org.apache.felix + maven-bundle-plugin + true + + + + scr-metadata + + manifest + + + true + + + + + true + META-INF + + ${bundle.symbolicName} + ${bundle.version} + + <_dsannotations>* + + <_metatypeannotations>* + + + + + + diff --git a/custom-services-additional-apis-messagebus/external-flow-configuration/src/main/java/com/sap/iotservices/gateway/interceptor/ExternalFlowActivator.java b/custom-services-additional-apis-messagebus/external-flow-configuration/src/main/java/com/sap/iotservices/gateway/interceptor/ExternalFlowActivator.java new file mode 100644 index 0000000..87e58c4 --- /dev/null +++ b/custom-services-additional-apis-messagebus/external-flow-configuration/src/main/java/com/sap/iotservices/gateway/interceptor/ExternalFlowActivator.java @@ -0,0 +1,256 @@ +package com.sap.iotservices.gateway.interceptor; + +import java.io.File; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.util.Dictionary; +import java.util.Hashtable; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.commons.lang.StringUtils; +import org.osgi.framework.BundleContext; +import org.osgi.framework.ServiceEvent; +import org.osgi.framework.ServiceListener; +import org.osgi.service.component.annotations.Activate; +import org.osgi.service.component.annotations.Component; +import org.osgi.service.component.annotations.Deactivate; +import org.osgi.service.component.annotations.Reference; +import org.osgi.service.component.annotations.ReferenceCardinality; +import org.osgi.service.component.annotations.ReferencePolicy; +import org.osgi.service.event.Event; +import org.osgi.service.event.EventConstants; +import org.osgi.service.event.EventHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.sap.iot.edgeservices.configservice.service.IConfigStatusService; +import com.sap.iotservices.gateway.interceptor.managers.ConfigurationHandler; +import com.sap.iotservices.gateway.interceptor.proxies.ConfigurationFields; +import com.sap.iotservices.gateway.interceptor.proxies.CustomConfiguration; +import com.sap.iotservices.gateway.interceptor.proxies.ExtendedCustomConfiguration; +import com.sap.iotservices.gateway.interceptor.proxies.MqttInterop; +import com.sap.iotservices.hooks.gateway.IGatewayInterceptor; +import com.sap.iotservices.hooks.gateway.IGatewayInterceptorService; +import com.sap.iotservices.utils.DSUtils; + +/** + * This class starts the actual implementation for the Interceptor + */ +@Component(immediate = true, service = {}) +public class ExternalFlowActivator +implements ServiceListener, EventHandler { + + private static final Logger log = LoggerFactory.getLogger(ExternalFlowActivator.class); + private static final String EVENT_TOPIC = "EXTERNALFLOW"; // The Event Admin topic to subscribe to for config + private static CustomConfiguration configuration; + private static boolean initialized = false; + private static String lastSuccessfulFingerprint; + private static AtomicReference configStatusService = new AtomicReference<>(); // NOSONAR + private static Object lock = new Object(); + private static volatile boolean ingestionEnabled = true; + /** + * Interceptor Manager + */ + private static AtomicReference interceptorMngr = new AtomicReference<>(); + private boolean registered = false; + + public static boolean isInitialized() { + return initialized; + } + + public static void setInitialized(boolean initialized) { + com.sap.iotservices.gateway.interceptor.ExternalFlowActivator.initialized = initialized; + } + + public static IConfigStatusService getConfigStatusService() { + return DSUtils.get(log, configStatusService, DSUtils.WAIT_FOR_VALID_REFERENCE); + } + + @Reference(service = IConfigStatusService.class, cardinality = ReferenceCardinality.MANDATORY, policy = ReferencePolicy.STATIC) + void setConfigStatusService(IConfigStatusService arg) { + DSUtils.setRef(log, configStatusService, arg, IConfigStatusService.class, this.getClass()); + } + + public static String getLastSuccessfulFingerprint() { + return lastSuccessfulFingerprint; + } + + /** + * @param lastSuccessfulFingerprint + * last fingerprint + */ + private static void setLastSuccessfulFingerprint(String lastSuccessfulFingerprint) { + com.sap.iotservices.gateway.interceptor.ExternalFlowActivator.lastSuccessfulFingerprint = lastSuccessfulFingerprint; + } + + public static CustomConfiguration getConfiguration() { + return configuration; + } + + /** + * @param conf + * currenct active configuration + */ + private static void setConfiguration(CustomConfiguration conf) { + com.sap.iotservices.gateway.interceptor.ExternalFlowActivator.configuration = conf; + Boolean enabled = conf.getIngestionEnabled(); + if (enabled == null) { + setIngestionEnabled(true); + } else { + setIngestionEnabled(enabled); + } + } + + public static boolean isIngestionEnabled() { + return ingestionEnabled; + } + + public static void setIngestionEnabled(boolean ingestionEnabled) { + ExternalFlowActivator.ingestionEnabled = ingestionEnabled; + } + + /** + * initialize the configuration object + */ + private static void initConfiguration() { + log.debug("Configuration is using topic: {}", EVENT_TOPIC); + // load configuration from file or use a default configuration + CustomConfiguration defaultConfig = ConfigurationHandler.loadDefaultConfiguration(); + setConfiguration(ConfigurationHandler.loadConfigurationFromDisk(defaultConfig, EVENT_TOPIC)); + setLastSuccessfulFingerprint(ConfigurationHandler.getLastFingerprint()); + // fallback to default + if (configuration == null) { + log.debug("Starting with default configuration"); + setConfiguration(defaultConfig); + setLastSuccessfulFingerprint(null); + } + } + + public static void reInitMqtt() { + try { + closeOldMqtt(); + } catch (Exception e) { + log.error("Unable to close MQTT: {}", e.getMessage(), e); + } + try { + startNewMqtt(); + } catch (Exception e) { + log.error("Unable to initialize MQTT: {}", e.getMessage(), e); + } + } + + static void closeOldMqtt() { + MqttInterop.unsubscribeTopics(MqttInterop.getInTopic()); + MqttInterop.disconnect(); + } + + static void startNewMqtt() { + MqttInterop.init(); + MqttInterop.subscribeTopics(MqttInterop.getInTopic()); + } + + public static IGatewayInterceptorService getInterceptorManager() { + return DSUtils.get(log, interceptorMngr, DSUtils.WAIT_FOR_VALID_REFERENCE); + } + + @Reference(cardinality = ReferenceCardinality.AT_LEAST_ONE, policy = ReferencePolicy.DYNAMIC) + void setInterceptorManager(IGatewayInterceptorService arg) { + DSUtils.setRef(log, interceptorMngr, arg, IGatewayInterceptorService.class, this.getClass()); + } + + @Activate + public void start(BundleContext bundleContext) { + log.info("Starting Gateway Interceptor..."); + Dictionary properties = new Hashtable<>(); // NOSONAR + // Register this class to listen over Event Admin for activation requests with the topic EVENT_TOPIC + properties.put(EventConstants.EVENT_TOPIC, EVENT_TOPIC); + bundleContext.registerService(EventHandler.class, this, properties); + com.sap.iotservices.gateway.interceptor.ExternalFlowActivator.initConfiguration(); + startNewMqtt(); + + + new Thread(() -> { + IGatewayInterceptorService interceptorMng = getInterceptorManager(); + + synchronized (lock) { + if ((interceptorMng != null) && (!registered)) { + log.info("Registering implementation of the flow interceptor"); + } + } + }).start(); + + } + + @Deactivate + public void stop(BundleContext bundleContext) { + log.info("Stopping External Flow..."); + } + + void unsetConfigStatusService(IConfigStatusService arg) { + DSUtils.removeRef(log, configStatusService, arg, IConfigStatusService.class, this.getClass()); + } + + /** + * @param event + * handle the event to get new configurations + */ + @Override + public void handleEvent(Event event) { + log.info("RECEIVED CONFIGURATION"); + // Check to see if the event received conforms to a config activation event + // i.e. the event contains the config file to be activated and its associated fingerprint + if (event.getProperty(ConfigurationFields.configFile.name()) instanceof File && + event.getProperty(ConfigurationFields.configFingerprint.name()) instanceof String) { + File configFile = (File) event.getProperty(ConfigurationFields.configFile.name()); + String fingerprint = (String) event.getProperty(ConfigurationFields.configFingerprint.name()); + + // Return if the sent config file has already been activated + if (!StringUtils.isEmpty(lastSuccessfulFingerprint) && lastSuccessfulFingerprint.equals(fingerprint)) { + return; + } + + IConfigStatusService configStatus = getConfigStatusService(); + if (configStatus != null) { + try { + String configFileContents = new String(Files.readAllBytes(configFile.toPath()), + StandardCharsets.UTF_8); + log.info("Config File Contents:\n{}", configFileContents); + ExtendedCustomConfiguration customConfiguration = ConfigurationHandler + .writeConfigurationToDisk(EVENT_TOPIC, configFileContents, fingerprint); + // Set the lastSuccessfulFingerprint to this config file's fingerprint if the config file was + // successfully activated + // Call the activationStatus Declarative Service with the activation result (true or false), + // fingerprint, and a status message + if (customConfiguration != null) { + setConfiguration(customConfiguration); + setLastSuccessfulFingerprint(fingerprint); + configStatus.activationStatus(true, fingerprint, "Activation Succeeded"); + String topic = customConfiguration.getExternalConfigurationTopic(); + if (StringUtils.isEmpty(topic)) { + topic = EVENT_TOPIC; + } + String msg = "{\"file\":\"" + customConfiguration.getConfigurationFile() + "\"}"; + MqttInterop.sendMessage(topic, msg); + } else { + configStatus.activationStatus(false, fingerprint, "Activation Failed"); + } + } catch (IOException e) { + log.error("Cannot read config file: {}", e.getMessage(), e); + configStatus.activationStatus(false, fingerprint, "Cannot read config file: " + e.getMessage()); + } + } + } + } + + @Override + public void serviceChanged(ServiceEvent arg0) { + log.debug("---- serviceChanged - no operation performed."); + } + + void unsetInterceptorManager(IGatewayInterceptorService arg) { + DSUtils.removeRef(log, interceptorMngr, arg, IGatewayInterceptorService.class, this.getClass()); + } +} \ No newline at end of file diff --git a/custom-services-additional-apis-messagebus/external-flow-configuration/src/main/java/com/sap/iotservices/gateway/interceptor/InterceptorImpl.java b/custom-services-additional-apis-messagebus/external-flow-configuration/src/main/java/com/sap/iotservices/gateway/interceptor/InterceptorImpl.java new file mode 100644 index 0000000..a16eb52 --- /dev/null +++ b/custom-services-additional-apis-messagebus/external-flow-configuration/src/main/java/com/sap/iotservices/gateway/interceptor/InterceptorImpl.java @@ -0,0 +1,78 @@ +package com.sap.iotservices.gateway.interceptor; + +import java.util.ArrayList; +import java.util.List; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.sap.iotservices.hooks.gateway.IGatewayInterceptor; +import com.sap.iotservices.hooks.gateway.IoTServicesPointcut; +import com.sap.iotservices.network.node.data.Value; +import com.sap.iotservices.network.node.data.WSNParsedMeasure; + +public class InterceptorImpl +implements IGatewayInterceptor { + + private static final Logger log = LoggerFactory.getLogger(InterceptorImpl.class); + + @Override + public void processObject(String pointcutName, Object... args) + throws Exception { + try { + IoTServicesPointcut pointcut = IoTServicesPointcut.valueOf(pointcutName); + switch (pointcut) { + case GATEWAY_PARSED_DATA_DISPATCH: + // triggered upon dispatch of parsed sensor data + if (!ExternalFlowActivator.isIngestionEnabled()) { + manageIncomingMeasures(args); + } + break; + case GATEWAY_GENERIC_COMMAND: + break; + default: + break; + } + } catch (Exception e) { + log.error(e.getMessage(), e); + } + } + + @Override + public List getBranchPoints() { + List list = new ArrayList<>(); + list.add(IoTServicesPointcut.GATEWAY_PARSED_DATA_DISPATCH.name()); + list.add(IoTServicesPointcut.GATEWAY_GENERIC_COMMAND.name()); + return list; + } + + @Override + public void onError(Object event, String pointcut, Exception e) { + log.info("OnError triggered; pointcut is {}", pointcut); + } + + // Go through measure list + private void manageIncomingMeasures(Object... args) { + @SuppressWarnings("unchecked") + List measures = (List) args[0]; + // list of measures that are going to be dropped + List toBeRemoved = new ArrayList<>(); + + if (measures != null) { + for (WSNParsedMeasure wsnParsedMeasure : measures) { + List> valueList = wsnParsedMeasure.getValues(); + for (int i = 0; i < valueList.size(); i++) { + toBeRemoved.add(wsnParsedMeasure); + } + } + + // final filtering of list when measures are dropped + for (WSNParsedMeasure wsnParsedMeasure : toBeRemoved) { + measures.remove(wsnParsedMeasure); + log.info("Measure for Capability {}, Device {} and Sensor {} will be dropped", + wsnParsedMeasure.getCapabilityAlternateId(), wsnParsedMeasure.getDeviceAlternateId(), + wsnParsedMeasure.getSensorAlternateId()); + } + } + } +} diff --git a/custom-services-additional-apis-messagebus/external-flow-configuration/src/main/java/com/sap/iotservices/gateway/interceptor/managers/ConfigurationHandler.java b/custom-services-additional-apis-messagebus/external-flow-configuration/src/main/java/com/sap/iotservices/gateway/interceptor/managers/ConfigurationHandler.java new file mode 100644 index 0000000..ae77f32 --- /dev/null +++ b/custom-services-additional-apis-messagebus/external-flow-configuration/src/main/java/com/sap/iotservices/gateway/interceptor/managers/ConfigurationHandler.java @@ -0,0 +1,233 @@ +/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * + * Copyright (c) 2019 SAP SE or an affiliate company. All rights reserved. + * The sample is not intended for production use. Provided "as is". + * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */ +package com.sap.iotservices.gateway.interceptor.managers; + +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.Charset; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.io.FileUtils; +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.sap.iotservices.gateway.interceptor.proxies.CustomConfiguration; +import com.sap.iotservices.gateway.interceptor.proxies.ExtendedCustomConfiguration; + +public class ConfigurationHandler { + private static final Logger LOGGER = LoggerFactory.getLogger(ConfigurationHandler.class); // logger + private static final String BASE_PATH = "./../edgeservices/"; // base path for custom configuration, same of other + // services + private static final String UNIFORM_PATH_SEPARATOR = "/"; // linux/windows valid file separator + + private static final ObjectMapper mapper = new ObjectMapper() + .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false) + .configure(DeserializationFeature.FAIL_ON_MISSING_CREATOR_PROPERTIES, false); // json object mapper + private static String lastFingerprint = null; // last used fingerprint + + // Constructors + private ConfigurationHandler() { + super(); + } + + /** + * load an existing configuration + * + * @param serviceName + * name of current service + * @return the existing configuration (if any) + */ + public static CustomConfiguration loadConfigurationFromDisk(CustomConfiguration defaultConfiguration, + String serviceName) { + if (defaultConfiguration == null) { + defaultConfiguration = loadDefaultConfiguration(); + } + // existing file paths + String jsonFile = BASE_PATH + serviceName + "/" + serviceName + ".json"; + String fingerprintFile = BASE_PATH + serviceName + "/" + serviceName + "_fingerprint.txt"; + CustomConfiguration fromFile = null; + String content = null; + String fingerprint = null; + File path = new File(jsonFile); + if (!path.exists()) { + LOGGER.info("Configuration file does not exists: {}", jsonFile); + return defaultConfiguration; + } + try { + byte[] contentBytes = Files.readAllBytes(Paths.get(jsonFile)); + content = new String(contentBytes, Charset.defaultCharset()); + } catch (IOException e) { + LOGGER.error("Unable to read configuration from file: {} due to {}", jsonFile, e.getMessage(), e); + } + // if there is no file there is also no needs to load the fingerprint + if (!StringUtils.isEmpty(content)) { + try { + byte[] contentBytes = Files.readAllBytes(Paths.get(fingerprintFile)); + fingerprint = new String(contentBytes, Charset.defaultCharset()); + } catch (IOException e) { + LOGGER.error("Unable to read configuration from file: {} due to {}", fingerprintFile, e.getMessage(), + e); + } + // convert to a POJO + fromFile = extractCustomConfiguration(content); + } + // populate missing values + if (fromFile != null) { + fromFile.mergeMissingValues(defaultConfiguration); + } else { + LOGGER.error("Unable to extract POJO configuration"); + return defaultConfiguration; + } + // set the fingerprint + lastFingerprint = fingerprint; + return fromFile; + } + + /** + * write a configuration into the disk + * + * @param serviceName + * the service name + * @param content + * string with the content of the configuration + * @param fingerprint + * current fingerprint + * @return the written configuration object + */ + public static ExtendedCustomConfiguration writeConfigurationToDisk(String serviceName, String content, + String fingerprint) { + // convert the string to a POJO + ExtendedCustomConfiguration conf = extractCustomConfiguration(content); + if (conf == null) { + // configuration not valid + return null; + } + List additionalConfFile = extractPathAndName(conf.getConfigurationFile()); + // build the path and make the dirs + String basePath = BASE_PATH + serviceName; + File path = new File(basePath); + if (!path.exists()) { + boolean created = path.mkdirs(); + if (!created) { + LOGGER.error("Unable to create the path tree: {}", basePath); + return null; + } + } + if (!additionalConfFile.isEmpty() && additionalConfFile.size() > 1) { + path = new File(additionalConfFile.get(0)); + if (!path.exists()) { + boolean created = path.mkdirs(); + if (!created) { + LOGGER.error("Unable to create the path tree: {}", additionalConfFile.get(0)); + return null; + } + } + } + // write configuration to json file + try { + String filename = serviceName + ".json"; + File jsonFile = new File(basePath + UNIFORM_PATH_SEPARATOR + filename); + FileUtils.writeStringToFile(jsonFile, content, Charset.defaultCharset().name()); + } catch (IOException e) { + LOGGER.error("Unable to write the file: {}.json due to {}", serviceName, e.getMessage(), e); + return null; + } + if (!additionalConfFile.isEmpty() && additionalConfFile.size() > 1) { + try { + String filename = additionalConfFile.get(1); + File jsonFile = new File(additionalConfFile.get(0) + UNIFORM_PATH_SEPARATOR + filename); + FileUtils.writeStringToFile(jsonFile, content, Charset.defaultCharset().name()); + } catch (IOException e) { + LOGGER.error("Unable to write the file: {} due to {}", additionalConfFile.get(1), e.getMessage(), e); + return null; + } + } + // persist fingerprint + try { + String filename = serviceName + "_fingerprint.txt"; + File fingerprintFile = new File(basePath + UNIFORM_PATH_SEPARATOR + filename); + FileUtils.writeStringToFile(fingerprintFile, fingerprint, Charset.defaultCharset().name()); + } catch (IOException e) { + LOGGER.error("Unable to write the file: {}_fingerprint.txt due to {}", serviceName, e.getMessage(), e); + return null; + } + // set reference for the last fingerprint + lastFingerprint = fingerprint; + return conf; + } + + /** + * convert the configuration from string to object + * + * @param content + * json string of the configuration + * @return configuration object + */ + private static ExtendedCustomConfiguration extractCustomConfiguration(String content) { + try { + return mapper.readValue(content, ExtendedCustomConfiguration.class); + } catch (IOException e) { + LOGGER.error("Unable to read configuration {}", e.getMessage(), e); + } + return null; + } + + /** + * @return default configuration object + */ + public static CustomConfiguration loadDefaultConfiguration() { + // load default file from classloader + InputStream stream = ConfigurationHandler.class.getClassLoader() + .getResourceAsStream("defaultConfiguration.json"); + if (stream == null) { + LOGGER.error("No default configuration file"); + return null; + } + String content = null; + CustomConfiguration config = null; + try { + // convert the stream to a string + content = IOUtils.toString(stream, Charset.defaultCharset().name()); + } catch (IOException e) { + LOGGER.error("Unable to read configuration file {}", e.getMessage(), e); + } + // if the file is potentially valid extract the POJO + if (!StringUtils.isEmpty(content)) { + config = extractCustomConfiguration(content); + } + return config; + } + + // getters + public static String getLastFingerprint() { + return lastFingerprint; + } + + private static List extractPathAndName(String src) { + List path = new ArrayList<>(); + if (!StringUtils.isEmpty(src)) { + int idx = src.lastIndexOf('\\'); + if (idx < 0) { + idx = src.lastIndexOf('/'); + } + if (idx < 0) { + path.add("."); + path.add(src); + } else if (idx != src.length() - 2) { + path.add(src.substring(0, idx)); + path.add(src.substring(idx + 1)); + } + } + return path; + } +} diff --git a/custom-services-additional-apis-messagebus/external-flow-configuration/src/main/java/com/sap/iotservices/gateway/interceptor/proxies/BaseIoTMessage.java b/custom-services-additional-apis-messagebus/external-flow-configuration/src/main/java/com/sap/iotservices/gateway/interceptor/proxies/BaseIoTMessage.java new file mode 100644 index 0000000..b639d6b --- /dev/null +++ b/custom-services-additional-apis-messagebus/external-flow-configuration/src/main/java/com/sap/iotservices/gateway/interceptor/proxies/BaseIoTMessage.java @@ -0,0 +1,20 @@ +package com.sap.iotservices.gateway.interceptor.proxies; + +public class BaseIoTMessage +extends MinimalIoTMessage { + + String deviceAlternateId; + + public BaseIoTMessage() { + super(); + } + + public String getDeviceAlternateId() { + return deviceAlternateId; + } + + public void setDeviceAlternateId(String deviceAlternateId) { + this.deviceAlternateId = deviceAlternateId; + } + +} diff --git a/custom-services-additional-apis-messagebus/external-flow-configuration/src/main/java/com/sap/iotservices/gateway/interceptor/proxies/ConfigurationFields.java b/custom-services-additional-apis-messagebus/external-flow-configuration/src/main/java/com/sap/iotservices/gateway/interceptor/proxies/ConfigurationFields.java new file mode 100644 index 0000000..5a6658d --- /dev/null +++ b/custom-services-additional-apis-messagebus/external-flow-configuration/src/main/java/com/sap/iotservices/gateway/interceptor/proxies/ConfigurationFields.java @@ -0,0 +1,13 @@ +/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * + * Copyright (c) 2020 SAP SE or an affiliate company. All rights reserved. + * The sample is not intended for production use. Provided "as is". + * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */ +package com.sap.iotservices.gateway.interceptor.proxies; + +/** + * Configuration fields (policy service) + */ +public enum ConfigurationFields { + configFile, // NOSONAR + configFingerprint // NOSONAR +} diff --git a/custom-services-additional-apis-messagebus/external-flow-configuration/src/main/java/com/sap/iotservices/gateway/interceptor/proxies/CustomConfiguration.java b/custom-services-additional-apis-messagebus/external-flow-configuration/src/main/java/com/sap/iotservices/gateway/interceptor/proxies/CustomConfiguration.java new file mode 100644 index 0000000..c86831e --- /dev/null +++ b/custom-services-additional-apis-messagebus/external-flow-configuration/src/main/java/com/sap/iotservices/gateway/interceptor/proxies/CustomConfiguration.java @@ -0,0 +1,77 @@ +/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * + * Copyright (c) 2019 SAP SE or an affiliate company. All rights reserved. + * The sample is not intended for production use. Provided "as is". + * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */ +package com.sap.iotservices.gateway.interceptor.proxies; + +import java.util.List; + +public class CustomConfiguration { + + Float variance; + Float pressureScale; + Boolean ingestionEnabled; + Boolean filterMeasurements; + Boolean filterCalculation; + List filteredObjects; + + // default constructor + public CustomConfiguration() { + super(); + } + + /** + * Populate the missing objects + * + * @param defaultConfiguration + * default configuration object (no null inside) + */ + public void mergeMissingValues(CustomConfiguration defaultConfiguration) { + if (filteredObjects == null || filteredObjects.isEmpty()) { + filteredObjects = defaultConfiguration.getFilteredObjects(); + } + if (filterMeasurements == null) { + filterMeasurements = defaultConfiguration.getFilterMeasurements(); + } + if (filterCalculation == null) { + filterCalculation = defaultConfiguration.getFilterCalculation(); + } + if (variance == null) { + variance = defaultConfiguration.getVariance(); + } + if (pressureScale == null) { + pressureScale = defaultConfiguration.getPressureScale(); + } + if (ingestionEnabled == null) { + ingestionEnabled = defaultConfiguration.getIngestionEnabled(); + } + } + + /** + * getters and setters + */ + + public Boolean getFilterMeasurements() { + return filterMeasurements; + } + + public Boolean getFilterCalculation() { + return filterCalculation; + } + + public List getFilteredObjects() { + return filteredObjects; + } + + public Float getVariance() { + return variance; + } + + public Float getPressureScale() { + return pressureScale; + } + + public Boolean getIngestionEnabled() { + return ingestionEnabled; + } +} diff --git a/custom-services-additional-apis-messagebus/external-flow-configuration/src/main/java/com/sap/iotservices/gateway/interceptor/proxies/ExtendedCustomConfiguration.java b/custom-services-additional-apis-messagebus/external-flow-configuration/src/main/java/com/sap/iotservices/gateway/interceptor/proxies/ExtendedCustomConfiguration.java new file mode 100644 index 0000000..a3c9ee6 --- /dev/null +++ b/custom-services-additional-apis-messagebus/external-flow-configuration/src/main/java/com/sap/iotservices/gateway/interceptor/proxies/ExtendedCustomConfiguration.java @@ -0,0 +1,33 @@ +package com.sap.iotservices.gateway.interceptor.proxies; + +import org.apache.commons.lang.StringUtils; + +public class ExtendedCustomConfiguration +extends CustomConfiguration { + String externalConfigurationTopic; + String configurationFile; + + public ExtendedCustomConfiguration() { + super(); + } + + public String getExternalConfigurationTopic() { + return externalConfigurationTopic; + } + + public String getConfigurationFile() { + return configurationFile; + } + + public void mergeMissingValues(ExtendedCustomConfiguration defaultConfiguration) { + super.mergeMissingValues(defaultConfiguration); + + if (StringUtils.isEmpty(externalConfigurationTopic)) { + externalConfigurationTopic = defaultConfiguration.getExternalConfigurationTopic(); + } + + if (StringUtils.isEmpty(configurationFile)) { + configurationFile = defaultConfiguration.getConfigurationFile(); + } + } +} diff --git a/custom-services-additional-apis-messagebus/external-flow-configuration/src/main/java/com/sap/iotservices/gateway/interceptor/proxies/IoTMessage.java b/custom-services-additional-apis-messagebus/external-flow-configuration/src/main/java/com/sap/iotservices/gateway/interceptor/proxies/IoTMessage.java new file mode 100644 index 0000000..a634a3d --- /dev/null +++ b/custom-services-additional-apis-messagebus/external-flow-configuration/src/main/java/com/sap/iotservices/gateway/interceptor/proxies/IoTMessage.java @@ -0,0 +1,39 @@ +package com.sap.iotservices.gateway.interceptor.proxies; + +import java.util.Map; + +public class IoTMessage +extends BaseIoTMessage { + private Map measures; + private Long time; + + public IoTMessage() { + super(); + } + + public Long getTime() { + return time; + } + + public void setTime(Long time) { + this.time = time; + } + + public Map getMeasures() { + return measures; + } + + public void setMeasures(Map measures) { + this.measures = measures; + } + + @Override + public String toString() { + final StringBuilder s = new StringBuilder(); + measures.forEach((k, v) -> s.append("\n\"").append(k).append("\":\"").append(v).append("\",")); + return "{" + "\n\"deviceAlternateId\":\"" + deviceAlternateId + "\"," + "\n\"capabilityAlternateId\":\"" + + capabilityAlternateId + "\"," + "\n\"sensorAlternateId\":\"" + sensorAlternateId + "\"," + + "\n\"sensorTypeAlternateId\":\"" + sensorAlternateId + "\"," + "\n\"time\":" + time + "," + + "\n\"measures\":{" + s.substring(0, s.length() - 1) + "}\n}"; + } +} diff --git a/custom-services-additional-apis-messagebus/external-flow-configuration/src/main/java/com/sap/iotservices/gateway/interceptor/proxies/MessageFilter.java b/custom-services-additional-apis-messagebus/external-flow-configuration/src/main/java/com/sap/iotservices/gateway/interceptor/proxies/MessageFilter.java new file mode 100644 index 0000000..b6ed190 --- /dev/null +++ b/custom-services-additional-apis-messagebus/external-flow-configuration/src/main/java/com/sap/iotservices/gateway/interceptor/proxies/MessageFilter.java @@ -0,0 +1,27 @@ +package com.sap.iotservices.gateway.interceptor.proxies; + +public class MessageFilter +extends BaseIoTMessage { + private String filterType; + private String condition; + + public MessageFilter() { + super(); + } + + public String getFilterType() { + return filterType; + } + + public void setFilterType(String filterType) { + this.filterType = filterType; + } + + public String getCondition() { + return condition; + } + + public void setCondition(String condition) { + this.condition = condition; + } +} diff --git a/custom-services-additional-apis-messagebus/external-flow-configuration/src/main/java/com/sap/iotservices/gateway/interceptor/proxies/MinimalIoTMessage.java b/custom-services-additional-apis-messagebus/external-flow-configuration/src/main/java/com/sap/iotservices/gateway/interceptor/proxies/MinimalIoTMessage.java new file mode 100644 index 0000000..3761dc6 --- /dev/null +++ b/custom-services-additional-apis-messagebus/external-flow-configuration/src/main/java/com/sap/iotservices/gateway/interceptor/proxies/MinimalIoTMessage.java @@ -0,0 +1,35 @@ +package com.sap.iotservices.gateway.interceptor.proxies; + +public class MinimalIoTMessage { + String capabilityAlternateId; + String sensorAlternateId; + String sensorTypeAlternateId; + + public MinimalIoTMessage() { + super(); + } + + public String getCapabilityAlternateId() { + return capabilityAlternateId; + } + + public void setCapabilityAlternateId(String capabilityAlternateId) { + this.capabilityAlternateId = capabilityAlternateId; + } + + public String getSensorAlternateId() { + return sensorAlternateId; + } + + public void setSensorAlternateId(String sensorAlternateId) { + this.sensorAlternateId = sensorAlternateId; + } + + public String getSensorTypeAlternateId() { + return sensorTypeAlternateId; + } + + public void setSensorTypeAlternateId(String sensorTypeAlternateId) { + this.sensorTypeAlternateId = sensorTypeAlternateId; + } +} diff --git a/custom-services-additional-apis-messagebus/external-flow-configuration/src/main/java/com/sap/iotservices/gateway/interceptor/proxies/MqttConsumer.java b/custom-services-additional-apis-messagebus/external-flow-configuration/src/main/java/com/sap/iotservices/gateway/interceptor/proxies/MqttConsumer.java new file mode 100644 index 0000000..c75fb3d --- /dev/null +++ b/custom-services-additional-apis-messagebus/external-flow-configuration/src/main/java/com/sap/iotservices/gateway/interceptor/proxies/MqttConsumer.java @@ -0,0 +1,43 @@ +package com.sap.iotservices.gateway.interceptor.proxies; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.commons.lang.StringUtils; +import org.eclipse.paho.client.mqttv3.IMqttMessageListener; +import org.eclipse.paho.client.mqttv3.MqttMessage; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.sap.iotservices.gateway.interceptor.ExternalFlowActivator; + +public class MqttConsumer +implements IMqttMessageListener { + private static final Logger LOGGER = LoggerFactory.getLogger(MqttConsumer.class); + private static final ObjectMapper mapper = new ObjectMapper() + .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false) + .configure(DeserializationFeature.FAIL_ON_MISSING_CREATOR_PROPERTIES, false); // json object mapper + + @Override + public void messageArrived(String s, MqttMessage mqttMessage) + throws Exception { + LOGGER.debug("Received message in the topic {}: {}", s, mqttMessage); + if (!MqttInterop.isStatus()) { + ExternalFlowActivator.reInitMqtt(); + } + MqttIoTMessage message = mapper.readValue(mqttMessage.toString(), MqttIoTMessage.class); + TypeReference> typeRef = new TypeReference>() { + }; + Map originalMassageMap = mapper.readValue(mqttMessage.toString(), typeRef); + Object device = originalMassageMap.get("deviceAlternateId"); + String jsonMessage = mapper.writeValueAsString(message); + if (device instanceof String && !StringUtils.isEmpty((String) device) && !StringUtils.isEmpty(jsonMessage)) { + MqttInterop.sendMessage("measures/" + device, jsonMessage); + } else { + LOGGER.error("Malformed json message: {}", mqttMessage); + } + } +} diff --git a/custom-services-additional-apis-messagebus/external-flow-configuration/src/main/java/com/sap/iotservices/gateway/interceptor/proxies/MqttInterop.java b/custom-services-additional-apis-messagebus/external-flow-configuration/src/main/java/com/sap/iotservices/gateway/interceptor/proxies/MqttInterop.java new file mode 100644 index 0000000..e12a74b --- /dev/null +++ b/custom-services-additional-apis-messagebus/external-flow-configuration/src/main/java/com/sap/iotservices/gateway/interceptor/proxies/MqttInterop.java @@ -0,0 +1,154 @@ +package com.sap.iotservices.gateway.interceptor.proxies; + +import java.nio.charset.Charset; +import java.util.Arrays; +import java.util.UUID; +import java.util.stream.Collectors; + +import org.eclipse.paho.client.mqttv3.IMqttToken; +import org.eclipse.paho.client.mqttv3.MqttClient; +import org.eclipse.paho.client.mqttv3.MqttConnectOptions; +import org.eclipse.paho.client.mqttv3.MqttException; +import org.eclipse.paho.client.mqttv3.MqttMessage; +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; + +public class MqttInterop { + private static final Logger LOGGER = LoggerFactory.getLogger(MqttInterop.class); + private static MqttClient msgBusMqttClient; + private static String broker = "tcp://127.0.0.1:61618"; + private static String clientId = UUID.randomUUID().toString(); + private static String inTopic = "inTopic"; + private static String outTopic = "outTopic"; + private static ObjectMapper mapper = new ObjectMapper(); + private static int qos = 2; + private static MqttConsumer messageConsumer = new MqttConsumer(); + private static boolean status = false; + + private MqttInterop() { + super(); + } + + public static boolean isStatus() { + return status; + } + + public static void setStatus(boolean status) { + MqttInterop.status = status; + } + + public static String getBroker() { + return broker; + } + + public static void setBroker(String broker) { + MqttInterop.broker = broker; + } + + public static String getClientId() { + return clientId; + } + + public static void setClientId(String clientId) { + MqttInterop.clientId = clientId; + } + + public static String getInTopic() { + return inTopic; + } + + public static void setInTopic(String inTopic) { + MqttInterop.inTopic = inTopic; + } + + public static String getOutTopic() { + return outTopic; + } + + public static void setOutTopic(String outTopic) { + MqttInterop.outTopic = outTopic; + } + + public static int getQos() { + return qos; + } + + public static void setQos(int qos) { + MqttInterop.qos = qos; + } + + public static void init() { + MqttConnectOptions connOpts = new MqttConnectOptions(); + connOpts.setCleanSession(true); + try (MemoryPersistence persistence = new MemoryPersistence()) { + msgBusMqttClient = new MqttClient(broker, clientId, persistence); + LOGGER.debug("Connecting client {} to broker: {}", clientId, broker); + msgBusMqttClient.connect(connOpts); + LOGGER.info("Connected client {}", clientId); + } catch (MqttException me) { + LOGGER.error("Not connected due to: {}", me.getMessage(), me); + status = false; + throw new IllegalStateException(me); + } + status = true; + } + + public static void sendMessage(String content) { + sendMessage(outTopic, content); + } + + public static void sendMessage(String topic, String content) { + MqttMessage message = null; + try { + message = new MqttMessage(mapper.writeValueAsString(content).getBytes(Charset.defaultCharset())); + } catch (JsonProcessingException e) { + LOGGER.error("Unable to send a message due to: {}", e.getMessage(), e); + message = new MqttMessage(content.getBytes(Charset.defaultCharset())); + } + message.setQos(qos); + try { + LOGGER.debug("Publishing message: {}", content); + msgBusMqttClient.publish(topic, message); + } catch (MqttException e) { + LOGGER.error("Unable to send a message due to: {}", e.getMessage(), e); + status = false; + throw new IllegalStateException(e); + } + LOGGER.debug("Message published"); + } + + public static void disconnect() { + try { + msgBusMqttClient.disconnect(); + } catch (MqttException e) { + LOGGER.error("Unable to disconnect due to: {}", e.getMessage(), e); + } + status = false; + LOGGER.debug("Disconnected"); + } + + public static IMqttToken subscribeTopics(String... topic) { + IMqttToken token = null; + try { + token = msgBusMqttClient.subscribeWithResponse(topic, new MqttConsumer[] { messageConsumer }); + } catch (MqttException e) { + LOGGER.error("Unable to subscribe to {} due to: {}", + Arrays.stream(topic).collect(Collectors.joining(", ", "[", "]")), e.getMessage(), e); + } + return token; + } + + public static void unsubscribeTopics(String... topic) { + try { + msgBusMqttClient.unsubscribe(topic); + } catch (MqttException e) { + LOGGER.error("Unable to unsubscribe to {} due to: {}", + Arrays.stream(topic).collect(Collectors.joining(", ", "[", "]")), e.getMessage(), e); + } + } + +} diff --git a/custom-services-additional-apis-messagebus/external-flow-configuration/src/main/java/com/sap/iotservices/gateway/interceptor/proxies/MqttIoTMessage.java b/custom-services-additional-apis-messagebus/external-flow-configuration/src/main/java/com/sap/iotservices/gateway/interceptor/proxies/MqttIoTMessage.java new file mode 100644 index 0000000..91365c9 --- /dev/null +++ b/custom-services-additional-apis-messagebus/external-flow-configuration/src/main/java/com/sap/iotservices/gateway/interceptor/proxies/MqttIoTMessage.java @@ -0,0 +1,22 @@ +package com.sap.iotservices.gateway.interceptor.proxies; + +import java.util.List; +import java.util.Map; + +public class MqttIoTMessage +extends MinimalIoTMessage { + + private List> measures; + + public MqttIoTMessage() { + super(); + } + + public List> getMeasures() { + return measures; + } + + public void setMeasures(List> measures) { + this.measures = measures; + } +} diff --git a/custom-services-additional-apis-messagebus/external-flow-configuration/src/main/resources/defaultConfiguration.json b/custom-services-additional-apis-messagebus/external-flow-configuration/src/main/resources/defaultConfiguration.json new file mode 100644 index 0000000..8356916 --- /dev/null +++ b/custom-services-additional-apis-messagebus/external-flow-configuration/src/main/resources/defaultConfiguration.json @@ -0,0 +1,31 @@ +{ + "variance": 1, + "pressureScale": 1.25, + "ingestionEnabled": true, + "filterMeasurements": false, + "filterCalculation": false, + "externalConfigurationTopic": "configuration", + "configurationFile": "C:\\Users\\i333269\\Desktop\\defaultConfiguration.json", + "filteredObjects": [ + { + "capabilityAlternateId": "power", + "filterType": "msgbus", + "condition": "AND" + }, + { + "capabilityAlternateId": "snr", + "filterType": "ingestion", + "condition": "AND" + }, + { + "capabilityAlternateId": "pressure", + "filterType": "msgbus", + "condition": "AND" + }, + { + "capabilityAlternateId": "pressure_alert", + "filterType": "ingestion", + "condition": "AND" + } + ] +} \ No newline at end of file From ac47d861444132d6695bc26b0c874e33c077f40b Mon Sep 17 00:00:00 2001 From: marcoporru Date: Wed, 29 Apr 2020 09:28:01 +0200 Subject: [PATCH 2/4] Fixed year --- .../iotservice/extension/InteroperabilityModuleActivator.java | 2 +- .../gateway/interceptor/managers/ConfigurationHandler.java | 2 +- .../gateway/interceptor/proxies/CustomConfiguration.java | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/custom-services-additional-apis-messagebus/extendedOperations/src/main/java/com/sap/iotservice/extension/InteroperabilityModuleActivator.java b/custom-services-additional-apis-messagebus/extendedOperations/src/main/java/com/sap/iotservice/extension/InteroperabilityModuleActivator.java index 0dbe88f..a4a0277 100644 --- a/custom-services-additional-apis-messagebus/extendedOperations/src/main/java/com/sap/iotservice/extension/InteroperabilityModuleActivator.java +++ b/custom-services-additional-apis-messagebus/extendedOperations/src/main/java/com/sap/iotservice/extension/InteroperabilityModuleActivator.java @@ -1,5 +1,5 @@ /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * - * Copyright (c) 2019 SAP SE or an affiliate company. All rights reserved. + * Copyright (c) 2020 SAP SE or an affiliate company. All rights reserved. * The sample is not intended for production use. Provided "as is". * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */ package com.sap.iotservice.extension; diff --git a/custom-services-additional-apis-messagebus/external-flow-configuration/src/main/java/com/sap/iotservices/gateway/interceptor/managers/ConfigurationHandler.java b/custom-services-additional-apis-messagebus/external-flow-configuration/src/main/java/com/sap/iotservices/gateway/interceptor/managers/ConfigurationHandler.java index ae77f32..4ac61f4 100644 --- a/custom-services-additional-apis-messagebus/external-flow-configuration/src/main/java/com/sap/iotservices/gateway/interceptor/managers/ConfigurationHandler.java +++ b/custom-services-additional-apis-messagebus/external-flow-configuration/src/main/java/com/sap/iotservices/gateway/interceptor/managers/ConfigurationHandler.java @@ -1,5 +1,5 @@ /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * - * Copyright (c) 2019 SAP SE or an affiliate company. All rights reserved. + * Copyright (c) 2020 SAP SE or an affiliate company. All rights reserved. * The sample is not intended for production use. Provided "as is". * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */ package com.sap.iotservices.gateway.interceptor.managers; diff --git a/custom-services-additional-apis-messagebus/external-flow-configuration/src/main/java/com/sap/iotservices/gateway/interceptor/proxies/CustomConfiguration.java b/custom-services-additional-apis-messagebus/external-flow-configuration/src/main/java/com/sap/iotservices/gateway/interceptor/proxies/CustomConfiguration.java index c86831e..73f9601 100644 --- a/custom-services-additional-apis-messagebus/external-flow-configuration/src/main/java/com/sap/iotservices/gateway/interceptor/proxies/CustomConfiguration.java +++ b/custom-services-additional-apis-messagebus/external-flow-configuration/src/main/java/com/sap/iotservices/gateway/interceptor/proxies/CustomConfiguration.java @@ -1,5 +1,5 @@ /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * - * Copyright (c) 2019 SAP SE or an affiliate company. All rights reserved. + * Copyright (c) 2020 SAP SE or an affiliate company. All rights reserved. * The sample is not intended for production use. Provided "as is". * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */ package com.sap.iotservices.gateway.interceptor.proxies; From 1fe4ce421087cf31868f2c83479dcc454f07a530 Mon Sep 17 00:00:00 2001 From: Marco Porru Date: Thu, 4 Jun 2020 17:04:02 +0200 Subject: [PATCH 3/4] Fixed accordingly with PR --- .../extension/impl/EdgeApiExtension.java | 2 +- .../InteroperabilityModuleActivator.java | 5 ++-- .../interceptor/ExternalFlowActivator.java | 27 ++++++++++++----- .../gateway/interceptor/InterceptorImpl.java | 3 +- .../managers/ConfigurationHandler.java | 29 ++++++++++--------- .../proxies/CustomConfiguration.java | 12 ++++---- .../proxies/ExtendedCustomConfiguration.java | 4 +-- .../proxies/MinimalIoTMessage.java | 6 ++-- .../interceptor/proxies/MqttInterop.java | 2 +- 9 files changed, 50 insertions(+), 40 deletions(-) diff --git a/custom-services-additional-apis-messagebus/edgeApiExtension/src/main/java/com/sap/iotservices/gateway/extension/impl/EdgeApiExtension.java b/custom-services-additional-apis-messagebus/edgeApiExtension/src/main/java/com/sap/iotservices/gateway/extension/impl/EdgeApiExtension.java index 8f60e2e..72c7c17 100644 --- a/custom-services-additional-apis-messagebus/edgeApiExtension/src/main/java/com/sap/iotservices/gateway/extension/impl/EdgeApiExtension.java +++ b/custom-services-additional-apis-messagebus/edgeApiExtension/src/main/java/com/sap/iotservices/gateway/extension/impl/EdgeApiExtension.java @@ -19,7 +19,7 @@ public class EdgeApiExtension implements IEdgeApiExtension { private static final Logger LOGGER = LoggerFactory.getLogger(EdgeApiExtension.class); - Random r = new java.util.Random(); + private Random r = new java.util.Random(); @Activate public void startService() { diff --git a/custom-services-additional-apis-messagebus/extendedOperations/src/main/java/com/sap/iotservice/extension/InteroperabilityModuleActivator.java b/custom-services-additional-apis-messagebus/extendedOperations/src/main/java/com/sap/iotservice/extension/InteroperabilityModuleActivator.java index a4a0277..9a8cab3 100644 --- a/custom-services-additional-apis-messagebus/extendedOperations/src/main/java/com/sap/iotservice/extension/InteroperabilityModuleActivator.java +++ b/custom-services-additional-apis-messagebus/extendedOperations/src/main/java/com/sap/iotservice/extension/InteroperabilityModuleActivator.java @@ -36,7 +36,6 @@ @Path(EDGE_API_BASE_PATH_TENANT + "/extended") public class InteroperabilityModuleActivator implements NettyService { - private static final Logger LOGGER = LoggerFactory.getLogger(InteroperabilityModuleActivator.class); private IEdgeApiExtension edgeApiExtension; @Reference(cardinality = ReferenceCardinality.MANDATORY, policy = ReferencePolicy.DYNAMIC) @@ -52,7 +51,7 @@ void unsetEdgeApiExtension(IEdgeApiExtension edgeApiExtension) { @Path("uptime") @Produces(MediaType.APPLICATION_JSON) @Operation(summary = "Returns the UPTIME", description = "The endpoint gives the uptime of the Edge Platform", responses = { - @ApiResponse(responseCode = "200", description = "Successfully returned sensors.", content = @Content(array = @ArraySchema(schema = @Schema(implementation = SensorBean.class)))), + @ApiResponse(responseCode = "200", description = "Successfully returned time.", content = @Content(schema = @Schema(implementation = Long.class))), @ApiResponse(responseCode = "400", description = "Malformed HTTP request", content = @Content(schema = @Schema(implementation = BaseHttpResponse.class))) }) public long getUptime( @Parameter(description = "TenantId", required = true) @PathParam("TenantId") String tenantId) { @@ -64,7 +63,7 @@ public long getUptime( @Path("downtime") @Produces(MediaType.APPLICATION_JSON) @Operation(summary = "Returns the DOWNTIME", description = "The endpoint returns the downtime of the system, computed from the startup of the system", responses = { - @ApiResponse(responseCode = "200", description = "Successfully returned sensors.", content = @Content(array = @ArraySchema(schema = @Schema(implementation = SensorBean.class)))), + @ApiResponse(responseCode = "200", description = "Successfully returned time.", content = @Content(schema = @Schema(implementation = Long.class))), @ApiResponse(responseCode = "400", description = "Malformed HTTP request", content = @Content(schema = @Schema(implementation = BaseHttpResponse.class))) }) public long getDowntime( @Parameter(description = "TenantId", required = true) @PathParam("tenantId") String tenantId) { diff --git a/custom-services-additional-apis-messagebus/external-flow-configuration/src/main/java/com/sap/iotservices/gateway/interceptor/ExternalFlowActivator.java b/custom-services-additional-apis-messagebus/external-flow-configuration/src/main/java/com/sap/iotservices/gateway/interceptor/ExternalFlowActivator.java index 87e58c4..66aed5d 100644 --- a/custom-services-additional-apis-messagebus/external-flow-configuration/src/main/java/com/sap/iotservices/gateway/interceptor/ExternalFlowActivator.java +++ b/custom-services-additional-apis-messagebus/external-flow-configuration/src/main/java/com/sap/iotservices/gateway/interceptor/ExternalFlowActivator.java @@ -2,12 +2,11 @@ import java.io.File; import java.io.IOException; +import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.util.Dictionary; import java.util.Hashtable; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.atomic.AtomicReference; import org.apache.commons.lang.StringUtils; @@ -39,7 +38,7 @@ /** * This class starts the actual implementation for the Interceptor */ -@Component(immediate = true, service = {}) +@Component(immediate = true) public class ExternalFlowActivator implements ServiceListener, EventHandler { @@ -48,14 +47,15 @@ public class ExternalFlowActivator private static CustomConfiguration configuration; private static boolean initialized = false; private static String lastSuccessfulFingerprint; - private static AtomicReference configStatusService = new AtomicReference<>(); // NOSONAR - private static Object lock = new Object(); + private static AtomicReference configStatusService = new AtomicReference<>(); + private static final Object lock = new Object(); private static volatile boolean ingestionEnabled = true; /** * Interceptor Manager */ private static AtomicReference interceptorMngr = new AtomicReference<>(); private boolean registered = false; + private IGatewayInterceptor interceptor; public static boolean isInitialized() { return initialized; @@ -122,7 +122,7 @@ private static void initConfiguration() { setConfiguration(ConfigurationHandler.loadConfigurationFromDisk(defaultConfig, EVENT_TOPIC)); setLastSuccessfulFingerprint(ConfigurationHandler.getLastFingerprint()); // fallback to default - if (configuration == null) { + if (configuration == null && defaultConfig != null) { log.debug("Starting with default configuration"); setConfiguration(defaultConfig); setLastSuccessfulFingerprint(null); @@ -170,7 +170,7 @@ public void start(BundleContext bundleContext) { bundleContext.registerService(EventHandler.class, this, properties); com.sap.iotservices.gateway.interceptor.ExternalFlowActivator.initConfiguration(); startNewMqtt(); - + this.interceptor = new InterceptorImpl(); new Thread(() -> { IGatewayInterceptorService interceptorMng = getInterceptorManager(); @@ -178,6 +178,7 @@ public void start(BundleContext bundleContext) { synchronized (lock) { if ((interceptorMng != null) && (!registered)) { log.info("Registering implementation of the flow interceptor"); + registered = interceptorMng.addInterceptor(interceptor); } } }).start(); @@ -187,6 +188,16 @@ public void start(BundleContext bundleContext) { @Deactivate public void stop(BundleContext bundleContext) { log.info("Stopping External Flow..."); + if (registered){ + IGatewayInterceptorService interceptorMng = getInterceptorManager(); + + synchronized (lock) { + if ((interceptorMng != null) && (!registered)) { + log.info("Unregistering implementation of the flow interceptor"); + interceptorMng.removeInterceptor(interceptor); + } + } + } } void unsetConfigStatusService(IConfigStatusService arg) { @@ -216,7 +227,7 @@ public void handleEvent(Event event) { if (configStatus != null) { try { String configFileContents = new String(Files.readAllBytes(configFile.toPath()), - StandardCharsets.UTF_8); + Charset.defaultCharset()); log.info("Config File Contents:\n{}", configFileContents); ExtendedCustomConfiguration customConfiguration = ConfigurationHandler .writeConfigurationToDisk(EVENT_TOPIC, configFileContents, fingerprint); diff --git a/custom-services-additional-apis-messagebus/external-flow-configuration/src/main/java/com/sap/iotservices/gateway/interceptor/InterceptorImpl.java b/custom-services-additional-apis-messagebus/external-flow-configuration/src/main/java/com/sap/iotservices/gateway/interceptor/InterceptorImpl.java index a16eb52..e6b8498 100644 --- a/custom-services-additional-apis-messagebus/external-flow-configuration/src/main/java/com/sap/iotservices/gateway/interceptor/InterceptorImpl.java +++ b/custom-services-additional-apis-messagebus/external-flow-configuration/src/main/java/com/sap/iotservices/gateway/interceptor/InterceptorImpl.java @@ -17,8 +17,7 @@ public class InterceptorImpl private static final Logger log = LoggerFactory.getLogger(InterceptorImpl.class); @Override - public void processObject(String pointcutName, Object... args) - throws Exception { + public void processObject(String pointcutName, Object... args) { try { IoTServicesPointcut pointcut = IoTServicesPointcut.valueOf(pointcutName); switch (pointcut) { diff --git a/custom-services-additional-apis-messagebus/external-flow-configuration/src/main/java/com/sap/iotservices/gateway/interceptor/managers/ConfigurationHandler.java b/custom-services-additional-apis-messagebus/external-flow-configuration/src/main/java/com/sap/iotservices/gateway/interceptor/managers/ConfigurationHandler.java index 4ac61f4..a58b8f0 100644 --- a/custom-services-additional-apis-messagebus/external-flow-configuration/src/main/java/com/sap/iotservices/gateway/interceptor/managers/ConfigurationHandler.java +++ b/custom-services-additional-apis-messagebus/external-flow-configuration/src/main/java/com/sap/iotservices/gateway/interceptor/managers/ConfigurationHandler.java @@ -26,9 +26,9 @@ public class ConfigurationHandler { private static final Logger LOGGER = LoggerFactory.getLogger(ConfigurationHandler.class); // logger - private static final String BASE_PATH = "./../edgeservices/"; // base path for custom configuration, same of other - // services - private static final String UNIFORM_PATH_SEPARATOR = "/"; // linux/windows valid file separator + private static final String BASE_PATH = "./../edgeservices/"; // base path for custom configuration, same path + // of the others Edge service core services + private static final String UNIFORM_PATH_SEPARATOR = File.separator; // linux/windows valid file separator private static final ObjectMapper mapper = new ObjectMapper() .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false) @@ -53,8 +53,8 @@ public static CustomConfiguration loadConfigurationFromDisk(CustomConfiguration defaultConfiguration = loadDefaultConfiguration(); } // existing file paths - String jsonFile = BASE_PATH + serviceName + "/" + serviceName + ".json"; - String fingerprintFile = BASE_PATH + serviceName + "/" + serviceName + "_fingerprint.txt"; + String jsonFile = BASE_PATH + serviceName + UNIFORM_PATH_SEPARATOR + serviceName + ".json"; + String fingerprintFile = BASE_PATH + serviceName + UNIFORM_PATH_SEPARATOR + serviceName + "_fingerprint.txt"; CustomConfiguration fromFile = null; String content = null; String fingerprint = null; @@ -110,6 +110,7 @@ public static ExtendedCustomConfiguration writeConfigurationToDisk(String servic ExtendedCustomConfiguration conf = extractCustomConfiguration(content); if (conf == null) { // configuration not valid + LOGGER.warn("Received invalid configuration: {}", content); return null; } List additionalConfFile = extractPathAndName(conf.getConfigurationFile()); @@ -123,7 +124,7 @@ public static ExtendedCustomConfiguration writeConfigurationToDisk(String servic return null; } } - if (!additionalConfFile.isEmpty() && additionalConfFile.size() > 1) { + if (!additionalConfFile.isEmpty() && additionalConfFile.size() > 0) { path = new File(additionalConfFile.get(0)); if (!path.exists()) { boolean created = path.mkdirs(); @@ -186,16 +187,16 @@ private static ExtendedCustomConfiguration extractCustomConfiguration(String con * @return default configuration object */ public static CustomConfiguration loadDefaultConfiguration() { - // load default file from classloader - InputStream stream = ConfigurationHandler.class.getClassLoader() - .getResourceAsStream("defaultConfiguration.json"); - if (stream == null) { - LOGGER.error("No default configuration file"); - return null; - } String content = null; CustomConfiguration config = null; - try { + + // load default file from classloader + try (InputStream stream = ConfigurationHandler.class.getClassLoader() + .getResourceAsStream("defaultConfiguration.json")) { + if (stream == null) { + LOGGER.error("No default configuration file"); + return null; + } // convert the stream to a string content = IOUtils.toString(stream, Charset.defaultCharset().name()); } catch (IOException e) { diff --git a/custom-services-additional-apis-messagebus/external-flow-configuration/src/main/java/com/sap/iotservices/gateway/interceptor/proxies/CustomConfiguration.java b/custom-services-additional-apis-messagebus/external-flow-configuration/src/main/java/com/sap/iotservices/gateway/interceptor/proxies/CustomConfiguration.java index 73f9601..57d4c58 100644 --- a/custom-services-additional-apis-messagebus/external-flow-configuration/src/main/java/com/sap/iotservices/gateway/interceptor/proxies/CustomConfiguration.java +++ b/custom-services-additional-apis-messagebus/external-flow-configuration/src/main/java/com/sap/iotservices/gateway/interceptor/proxies/CustomConfiguration.java @@ -8,12 +8,12 @@ public class CustomConfiguration { - Float variance; - Float pressureScale; - Boolean ingestionEnabled; - Boolean filterMeasurements; - Boolean filterCalculation; - List filteredObjects; + private Float variance; + private Float pressureScale; + private Boolean ingestionEnabled; + private Boolean filterMeasurements; + private Boolean filterCalculation; + private List filteredObjects; // default constructor public CustomConfiguration() { diff --git a/custom-services-additional-apis-messagebus/external-flow-configuration/src/main/java/com/sap/iotservices/gateway/interceptor/proxies/ExtendedCustomConfiguration.java b/custom-services-additional-apis-messagebus/external-flow-configuration/src/main/java/com/sap/iotservices/gateway/interceptor/proxies/ExtendedCustomConfiguration.java index a3c9ee6..4f061d9 100644 --- a/custom-services-additional-apis-messagebus/external-flow-configuration/src/main/java/com/sap/iotservices/gateway/interceptor/proxies/ExtendedCustomConfiguration.java +++ b/custom-services-additional-apis-messagebus/external-flow-configuration/src/main/java/com/sap/iotservices/gateway/interceptor/proxies/ExtendedCustomConfiguration.java @@ -4,8 +4,8 @@ public class ExtendedCustomConfiguration extends CustomConfiguration { - String externalConfigurationTopic; - String configurationFile; + private String externalConfigurationTopic; + private String configurationFile; public ExtendedCustomConfiguration() { super(); diff --git a/custom-services-additional-apis-messagebus/external-flow-configuration/src/main/java/com/sap/iotservices/gateway/interceptor/proxies/MinimalIoTMessage.java b/custom-services-additional-apis-messagebus/external-flow-configuration/src/main/java/com/sap/iotservices/gateway/interceptor/proxies/MinimalIoTMessage.java index 3761dc6..874022f 100644 --- a/custom-services-additional-apis-messagebus/external-flow-configuration/src/main/java/com/sap/iotservices/gateway/interceptor/proxies/MinimalIoTMessage.java +++ b/custom-services-additional-apis-messagebus/external-flow-configuration/src/main/java/com/sap/iotservices/gateway/interceptor/proxies/MinimalIoTMessage.java @@ -1,9 +1,9 @@ package com.sap.iotservices.gateway.interceptor.proxies; public class MinimalIoTMessage { - String capabilityAlternateId; - String sensorAlternateId; - String sensorTypeAlternateId; + private String capabilityAlternateId; + private String sensorAlternateId; + private String sensorTypeAlternateId; public MinimalIoTMessage() { super(); diff --git a/custom-services-additional-apis-messagebus/external-flow-configuration/src/main/java/com/sap/iotservices/gateway/interceptor/proxies/MqttInterop.java b/custom-services-additional-apis-messagebus/external-flow-configuration/src/main/java/com/sap/iotservices/gateway/interceptor/proxies/MqttInterop.java index e12a74b..35c8342 100644 --- a/custom-services-additional-apis-messagebus/external-flow-configuration/src/main/java/com/sap/iotservices/gateway/interceptor/proxies/MqttInterop.java +++ b/custom-services-additional-apis-messagebus/external-flow-configuration/src/main/java/com/sap/iotservices/gateway/interceptor/proxies/MqttInterop.java @@ -102,7 +102,7 @@ public static void sendMessage(String content) { } public static void sendMessage(String topic, String content) { - MqttMessage message = null; + MqttMessage message; try { message = new MqttMessage(mapper.writeValueAsString(content).getBytes(Charset.defaultCharset())); } catch (JsonProcessingException e) { From 68d540f345ad2f6c9270055c51b225b64603f25f Mon Sep 17 00:00:00 2001 From: Marco Porru Date: Thu, 4 Jun 2020 18:42:22 +0200 Subject: [PATCH 4/4] changed documantation Signed-off-by: Marco Porru --- README.md | 3 +- .../README.md | 280 ++++++++++++++++++ .../main/resources/defaultConfiguration.json | 2 +- 3 files changed, 283 insertions(+), 2 deletions(-) create mode 100644 custom-services-additional-apis-messagebus/README.md diff --git a/README.md b/README.md index 807bded..cfceebe 100644 --- a/README.md +++ b/README.md @@ -14,7 +14,8 @@ Take a look at the list below for links to all the separate samples. | Edge Persistence Aggregation | This sample demonstrates use of the Persistence Service Java API. This sample queries the Persistence Service at an interval. The query aggregates data stored in the Persistence Service, and feeds this data back into Edge Services. | [persistence-aggregation-max-temp](https://github.com/SAP/iot-edge-services-samples/tree/master/persistence-aggregation-max-temp) | | Edge Machine learning | This sample demonstrates how a quality machine learning solution can be deployed on SAP Edge Services platform with an example of defective welding detection. | [edge-ml-welding-sound](https://github.com/SAP/iot-edge-services-samples/tree/master/edge-ml-welding-sound) | | Edge Predictive Analytics 1 | This sample demonstrates how to implement a Predictive Analytics Service. The service integrates the usage of the Persistence Service Java APIs to get the data, and the Edge Service Configuration object to support dynamic configurations. It is using an external (not provided) JPMML library to compute the prediction,with the provided PMML model | [predictive-pmml](https://github.com/SAP/iot-edge-services-samples/tree/master/predictive-pmml) | -| Edge Predictive Analytics 2 | This sample demonstrates how to implement a Predictive Analytics Service. The service integrates the usage of the Persistence Service Java APIs to get the data, and the Edge Service Configuration object to support dynamic configurations. It is using an external python moduleto compute the prediction with an existing and already existing python model (optionally trained in the cloud) | [predictive-python](https://github.com/SAP/iot-edge-services-samples/tree/master/predictive-python) | +| Edge Predictive Analytics 2 | This sample demonstrates how to implement a Predictive Analytics Service. The service integrates the usage of the Persistence Service Java APIs to get the data, and the Edge Service Configuration object to support dynamic configurations. It is using an external python module to compute the prediction with an existing and already existing python model (optionally trained in the cloud) | [predictive-python](https://github.com/SAP/iot-edge-services-samples/tree/master/predictive-python) | +| Edge Custom Service | This sample demonstrates how to implement custom logic with an external module intercommunication. The service integrates the usage of the Edge Service Configuration object to support dynamic configurations. It is using an external python module to compute make some computation, SAP IoT Edge Message bus to exchange messages between the JAVA codebase and the python module (such as the configuration), SAP IoT offline operations to publish new endpoints at the edge by leveraging the integrated Netty Server. | [custom-services-additional-apis-messagebus](https://github.com/SAP/iot-edge-services-samples/tree/master/custom-services-additional-apis-messagebus) | ## How to obtain support diff --git a/custom-services-additional-apis-messagebus/README.md b/custom-services-additional-apis-messagebus/README.md new file mode 100644 index 0000000..9317981 --- /dev/null +++ b/custom-services-additional-apis-messagebus/README.md @@ -0,0 +1,280 @@ +# Edge Custom Service Sample + +## Overview +This sample demonstrates how to implement custom logic with an external module intercommunication. The service integrates the usage of the Edge Service Configuration object to support dynamic configurations. It is using an external python module to compute make some computation, SAP IoT Edge Message bus to exchange messages between the JAVA codebase and the python module (such as the configuration), SAP IoT offline operations to publish new endpoints at the edge by leveraging the integrated Netty Server. + +## Product Documentation + +Product Documentation for SAP Edge Services is available as follows: + +[SAP Edge Services, cloud edition](https://help.sap.com/viewer/p/EDGE_SERVICES) + +### Description + +The purpose of this sample is to implement a custom filtering on the ingested measurements, to reduce the network traffic (save costs!) and forward to the cloud only the required data. + +In this sample we are blocking the original measurements and forwarding to the cloud only the value of the SNR (signal to noise ratio), wich is computed by leveraging some additional custom APIs published at the edge, inside an external module written in Python. + +The measurements are consumed and forwarded to the cloud by using the Edge Messagebus capability. + +### Deploying this sample + +This sample is packaged as an OSGI bundle. It is deployed to SAP Edge Services, cloud edition using a Custom Service defined within the Policy Service of SAP Edge Services. + +## Requirements + +The following must be installed for this sample: +1. Java JDK 1.8 or above (https://www.java.com/en/download/) +2. Apache Maven (https://maven.apache.org/download.cgi) +3. Git command line tool (https://git-scm.com/downloads) +4. SAP Edge Services (Cloud or On-premise edition) +5. SAP IoT Edge Platform + +### SAP Edge Services, cloud edition + +For cloud edition, a working SAP IoT Edge Platform is required, with one SAP Edge Services core service (such as Persistence Service) installed. + +The following needs to be setup on SAP IoT as a corect data ingestion + +1. Create the capabilities +- **capabilityAlternateId:** power +- **properties:** + +| Property Name | Property Type | +|:-------------: |:-------------: | +| power | float | +--- +- **capabilityAlternateId:** pressure +- **properties:** + +| Property Name | Property Type | +|:-------------: |:-------------: | +| pressure | float | +--- +- **capabilityAlternateId:** snr +- **properties:** + +| Property Name | Property Type | +|:-------------: |:-------------: | +| snr | float | + +2. Create the sensor type +- **sensorType name:** packagingSensorType +- **sensorTypeAlternateId:** 1001 + +3. Add all the capabilities into the **_packagingSensorType_** + +## Download and Installation + +### Download the sample app +```json +git clone https://github.com/SAP/iot-edge-services-samples.git +cd iot-edge-services-samples +cd custom-services-additional-apis-messagebus +``` + +### Download the SAP Edge Service dependencies bundles and add to Maven + +#### SAP Edge Services Configuration Service + +1. Ensure that from the Policy Service, the Persistence Service is installed on your gateway. +2. Access the files of the device running the SAP IoT Edge Platform +3. cd /gateway_folder/custombundles +4. Copy the file ConfigService-3.2002.0.jar to the project root of this sample. + NOTE: the version number may change, in which case, the version number in the pom.xml file will need to be updated +5. From root directory of this sample, execute the following command: +```json +mvn install:install-file -Dfile=ConfigService-3.2002.0.jar -DgroupId=com.sap.iot.edgeservices -DartifactId=ConfigService -Dversion=3.2002.0 -Dpackaging=jar +``` + NOTE: if the version number has changed, substitute 3.1912.0 in the above command for the appropriate version number as found in the filename. + +### Customize the source + +You can change the parameters dynamically. +Open the file +src\main\resources\defaultConfiguration.json +create and deploy a new configuration for this service within the Policy Service. +in the body of the configuration put a JSON that contains the parameter that you would like to change. The change is not incremental. + +#### SAP Edge Services, cloud edition + +By default, the sample works directly with SAP Edge Services, cloud edition and nothing needs to be changed. + +#### SAP Edge Services, on-premise edition + +This example, with some modifications, could work with SAP Edge Services, on-premise edition. + +### Compile and Package + +1. Open a shell / command prompt (on Windows as Administrator) and navigate to the `custom-services-additional-apis-messagebus/` directory. +2. For each project edit the provided pom.xml and ensure that the version number of the inherited services (ConfigService and SAP IoT Edge Platform services) jar files matches. If it does not match, change the numbers in the pom.xml + +```json + + com.sap.iot.edgeservices + ConfigService + 3.2002.0 + provided + +``` +```json + 4.51.0 +``` +```json + 4.51.0 +``` + +3. For each project, run following command to compile and build the package: +```json +mvn clean install +``` +4. Verify that the file -1.0.0.jar are created in the /target folder. + +### Activate components + +You need to activate the external operations as described here: https://help.sap.com/viewer/643f531cbf50462c8cc45139ba2dd051/Cloud/en-US/a6204032ad6e4377be9e2fbe89cddf6b.html + +Activate the Edge Messagebus. + +It's an ActiveMQ broker available for every SAP Edge Platform, not just MQTT, disabled by default. +- Open the file /config/config*.xml and put the following xml node (please note that for MQTT it's already inside the file, so just skip this operation): +```json + + gateway-mqtt + + + + + + + + + + + + +``` + +- Open the file /configuration/config.ini and ensure that the following services are started: +```json +plugins/gateway-bus-wrapper-mqtt@6:start,\ +plugins/amq-activator@5:start,\ +``` +#### Enable / Disable Edge Messagebus (no restart required): +1) Add (or modify) a Custom Property for the Edge Platform: + BUS_MEASURES_FLOW +2) Use "OnlyBus" to turn off completely the ingestion to the cloud, or "CloudAndBus" to enable the forwarding of the measurements to the Bus, without interrupting the standard flow towards the Cloud. + +#### Notes about data ingestion at the Edge and Cloud ingestion: + +- Output Topic +Here the description of the topic where the Edge Platform publishes the measurements for the Edge Components (cloud forwarding): +iot/edge/v1/tenant//gateway//measures/out +Remark: It works also with "OnlyBus" flag active + +- Input Topic +Here the description of the topic from which the Edge Platform listens for the measurements published by the Edge Components: +iot/edge/v1/tenant//gateway//measures/in + +----- + + +### Deploy + +#### SAP Edge Services, cloud edition + +1. Use the SAP Edge Services Policy service, navigate to the Services list and create new custom services. +2. Use "EXTERNALFLOW" for the event topic field (or what you have defined at line 46 of the file 2. Use "EXTERNALFLOW" for the event topic field (or what you have defined at line 6 of the file ./external-flow-configuration/src/main/java/com/sap/iotservices/gateway/interceptor/ExternalFlowActivator.java for the project external-flow-configuration; use any other unique value for the other projects +3. Use the jar file inside the folder /targetof each single project. +4. Save it. +5. Go in the Gateways and Group of Gateways list and search for your gateway in the list +6. Deploy the created custom services in this order to respect the nested dependencies: +- edgeApiExtension-interface +- edgeApiExtension +- extendedOperations +- external-flow-configuration + +### Deploy Configurations + +If needed you can create and use a custom configuration for the external-flow-configuration service within the Policy Service. The body of the configuration is a JSON object; these are the default values: +```json +{ + "variance": 1, + "pressureScale": 1.25, + "ingestionEnabled": true, + "filterMeasurements": false, + "filterCalculation": false, + "externalConfigurationTopic": "configuration", + "configurationFile": "defaultConfiguration.json", + "filteredObjects": [ + { + "capabilityAlternateId": "power", + "filterType": "msgbus", + "condition": "AND" + }, + { + "capabilityAlternateId": "snr", + "filterType": "ingestion", + "condition": "AND" + }, + { + "capabilityAlternateId": "pressure", + "filterType": "msgbus", + "condition": "AND" + }, + { + "capabilityAlternateId": "pressure_alert", + "filterType": "ingestion", + "condition": "AND" + } + ] +} +``` +If a new configuration is uploaded the old configuration is discarded (it's not incremental). The unspecified values are replaced with the default values. + +## Run + +### SAP Edge Services, cloud edition + +1. Use a supported method to send data to IoT Services Gateway Edge. For example, send data to the SAP IoT Edge Platform MQTT by using a tool like Paho App. +```json +MESSAGE: { + "capabilityAlternateId": "power", + "sensorTypeAlternateId": "1001", + "sensorAlternateId": "packaging", + "measures": [{ + "power": 131 + }] + } +``` +To actually see the snr values created correctly, read the measurements inside the other capability. + +2. Connect SAP IoT APIs +3. Check the values of +```json +sensorAlternateId: packaging +capabilityAlternateId: power +``` + +```json +sensorAlternateId: packaging +capabilityAlternateId: pressure +``` +and +```json +sensorAlternateId: packaging +capabilityAlternateId: snr +``` + +Verify how the values change when you enable or disable the data ingestion or the filtering and while changing the behavior inside the python external module. + +## How to obtain support + +These samples are provided "as-is" basis with detailed documentation on how to use them. + + +## Copyright and License + +Copyright (c) 2020 SAP SE or an SAP affiliate company. All rights reserved. + +License provided by [SAP SAMPLE CODE LICENSE AGREEMENT](https://github.com/SAP-samples/iot-edge-services-samples/blob/master/predictive-python/LICENSE) diff --git a/custom-services-additional-apis-messagebus/external-flow-configuration/src/main/resources/defaultConfiguration.json b/custom-services-additional-apis-messagebus/external-flow-configuration/src/main/resources/defaultConfiguration.json index 8356916..dd08132 100644 --- a/custom-services-additional-apis-messagebus/external-flow-configuration/src/main/resources/defaultConfiguration.json +++ b/custom-services-additional-apis-messagebus/external-flow-configuration/src/main/resources/defaultConfiguration.json @@ -5,7 +5,7 @@ "filterMeasurements": false, "filterCalculation": false, "externalConfigurationTopic": "configuration", - "configurationFile": "C:\\Users\\i333269\\Desktop\\defaultConfiguration.json", + "configurationFile": "defaultConfiguration.json", "filteredObjects": [ { "capabilityAlternateId": "power",