-
Notifications
You must be signed in to change notification settings - Fork 9
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Working demo of Kafka & FIssion as part of IOT demo.
- Loading branch information
1 parent
fdbab7b
commit 9d6cddf
Showing
39 changed files
with
1,703 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,4 @@ | ||
.project | ||
.settings | ||
.classpath | ||
target/ |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,25 @@ | ||
# Fission Kafka - IOT Demo | ||
|
||
## Setup Kafka | ||
For setting up Kafka on Kubernetes, we will use [kubernetes-kafka repo here](https://github.com/Yolean/kubernetes-kafka). | ||
|
||
Follow the instructions in [Getting started section](https://github.com/Yolean/kubernetes-kafka#getting-started) and install Kafka & Zookeeper after appropriate storage class is configured. | ||
|
||
**OPTIONAL** | ||
|
||
In addition you can install components below: | ||
- yahoo-kafka-manager : Provides a nice UI to look at replication & rebalance stats across topics | ||
|
||
- pixy: Provides a REST/GRPC API wrapper to Kafka. You can list, publish & consume messages using REST or GRPC. | ||
|
||
## Setup Redis | ||
|
||
We will setup redis without persistence and as a single instance (i.e. no master & slave). You can customize the command as per your needs based on [instrcutions from here](https://github.com/helm/charts/tree/master/stable/redis#configuration). Also note that for simplicity - Redis has been setup without a password. | ||
|
||
``` | ||
helm install --name redis-single --namespace redis \ | ||
--set usePassword=false \ | ||
--set cluster.enabled=false \ | ||
--set master.persistence.enabled=false \ | ||
stable/redis | ||
``` |
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,88 @@ | ||
<project xmlns="http://maven.apache.org/POM/4.0.0" | ||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" | ||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> | ||
|
||
<modelVersion>4.0.0</modelVersion> | ||
<groupId>io.fission.kafka</groupId> | ||
<artifactId>iot-kafka-producer</artifactId> | ||
<version>1.0.0</version> | ||
<name>IoT Kafka Producer</name> | ||
|
||
<dependencies> | ||
<dependency> | ||
<groupId>org.springframework.boot</groupId> | ||
<artifactId>spring-boot-starter-web</artifactId> | ||
<version>2.0.1.RELEASE</version> | ||
<scope>provided</scope> | ||
</dependency> | ||
<dependency> | ||
<groupId>io.fission</groupId> | ||
<artifactId>fission-java-core</artifactId> | ||
<version>0.0.2-SNAPSHOT</version> | ||
<scope>provided</scope> | ||
</dependency> | ||
<dependency> | ||
<groupId>org.apache.kafka</groupId> | ||
<artifactId>kafka-clients</artifactId> | ||
<version>2.0.0</version> | ||
</dependency> | ||
<dependency> | ||
<groupId>com.fasterxml.jackson.core</groupId> | ||
<artifactId>jackson-core</artifactId> | ||
<version>2.6.6</version> | ||
</dependency> | ||
<dependency> | ||
<groupId>com.fasterxml.jackson.core</groupId> | ||
<artifactId>jackson-databind</artifactId> | ||
<version>2.6.6</version> | ||
</dependency> | ||
<dependency> | ||
<groupId>com.fasterxml.jackson.core</groupId> | ||
<artifactId>jackson-annotations</artifactId> | ||
<version>2.6.6</version> | ||
</dependency> | ||
<dependency> | ||
<groupId>junit</groupId> | ||
<artifactId>junit</artifactId> | ||
<version>4.12</version> | ||
</dependency> | ||
<!-- https://mvnrepository.com/artifact/org.osgi/org.osgi.core --> | ||
<dependency> | ||
<groupId>org.osgi</groupId> | ||
<artifactId>org.osgi.core</artifactId> | ||
<version>4.3.0</version> | ||
</dependency> | ||
</dependencies> | ||
|
||
<build> | ||
<plugins> | ||
<plugin> | ||
<artifactId>maven-assembly-plugin</artifactId> | ||
<configuration> | ||
<descriptorRefs> | ||
<descriptorRef>jar-with-dependencies</descriptorRef> | ||
</descriptorRefs> | ||
</configuration> | ||
<executions> | ||
<execution> | ||
<id>make-assembly</id> <!-- this is used for inheritance merges --> | ||
<phase>package</phase> <!-- bind to the packaging phase --> | ||
<goals> | ||
<goal>single</goal> | ||
</goals> | ||
</execution> | ||
</executions> | ||
</plugin> | ||
</plugins> | ||
</build> | ||
|
||
<!-- Adding Sonatype repository to pull snapshots --> | ||
<repositories> | ||
<repository> | ||
<id>fission-java-core</id> | ||
<name>fission-java-core-snapshot</name> | ||
<url>https://oss.sonatype.org/content/repositories/snapshots/</url> | ||
</repository> | ||
</repositories> | ||
|
||
</project> |
75 changes: 75 additions & 0 deletions
75
01_iot_data_producer/src/main/java/io/fission/kafka/IoTData.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,75 @@ | ||
package io.fission.kafka; | ||
|
||
import java.io.Serializable; | ||
import java.util.Date; | ||
|
||
import com.fasterxml.jackson.annotation.JsonFormat; | ||
|
||
/** | ||
* Class to represent the IoT vehicle data. | ||
* | ||
* @author abaghel | ||
* | ||
*/ | ||
public class IoTData implements Serializable{ | ||
|
||
private String vehicleId; | ||
private String vehicleType; | ||
private String routeId; | ||
private String longitude; | ||
private String latitude; | ||
@JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd HH:mm:ss", timezone="MST") | ||
private Date timestamp; | ||
private double speed; | ||
private double fuelLevel; | ||
|
||
public IoTData(){ | ||
|
||
} | ||
|
||
public IoTData(String vehicleId, String vehicleType, String routeId, String latitude, String longitude, | ||
Date timestamp, double speed, double fuelLevel) { | ||
super(); | ||
this.vehicleId = vehicleId; | ||
this.vehicleType = vehicleType; | ||
this.routeId = routeId; | ||
this.longitude = longitude; | ||
this.latitude = latitude; | ||
this.timestamp = timestamp; | ||
this.speed = speed; | ||
this.fuelLevel = fuelLevel; | ||
} | ||
|
||
public String getVehicleId() { | ||
return vehicleId; | ||
} | ||
|
||
public String getVehicleType() { | ||
return vehicleType; | ||
} | ||
|
||
public String getRouteId() { | ||
return routeId; | ||
} | ||
|
||
public String getLongitude() { | ||
return longitude; | ||
} | ||
|
||
public String getLatitude() { | ||
return latitude; | ||
} | ||
|
||
public Date getTimestamp() { | ||
return timestamp; | ||
} | ||
|
||
public double getSpeed() { | ||
return speed; | ||
} | ||
|
||
public double getFuelLevel() { | ||
return fuelLevel; | ||
} | ||
|
||
} |
47 changes: 47 additions & 0 deletions
47
01_iot_data_producer/src/main/java/io/fission/kafka/IoTDataEncoder.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,47 @@ | ||
package io.fission.kafka; | ||
|
||
|
||
import java.util.Map; | ||
import java.util.logging.Logger; | ||
|
||
import org.apache.kafka.common.serialization.Serializer; | ||
|
||
import com.fasterxml.jackson.core.JsonProcessingException; | ||
import com.fasterxml.jackson.databind.ObjectMapper; | ||
|
||
import io.fission.kafka.IoTData; | ||
|
||
|
||
/** | ||
* Class to convert IoTData java object to JSON String | ||
* | ||
* @author abaghel | ||
* | ||
*/ | ||
public class IoTDataEncoder implements Serializer<IoTData> { | ||
|
||
private static Logger logger = Logger.getGlobal(); | ||
|
||
private static ObjectMapper objectMapper = new ObjectMapper(); | ||
|
||
public IoTDataEncoder() { | ||
|
||
} | ||
|
||
public void configure(Map<String, ?> configs, boolean isKey) { | ||
} | ||
|
||
public byte[] serialize(String topic, IoTData data) { | ||
try { | ||
String msg = objectMapper.writeValueAsString(data); | ||
logger.info(msg); | ||
return msg.getBytes(); | ||
} catch (JsonProcessingException e) { | ||
logger.severe("Error in Serialization" + e); | ||
} | ||
return null; | ||
} | ||
|
||
public void close() { | ||
} | ||
} |
138 changes: 138 additions & 0 deletions
138
01_iot_data_producer/src/main/java/io/fission/kafka/IotProducer.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,138 @@ | ||
package io.fission.kafka; | ||
|
||
import io.fission.Function; | ||
|
||
import org.apache.kafka.clients.producer.KafkaProducer; | ||
import org.apache.kafka.clients.producer.Producer; | ||
import org.apache.kafka.clients.producer.ProducerRecord; | ||
|
||
import java.text.DateFormat; | ||
import java.util.ArrayList; | ||
import java.util.Arrays; | ||
import java.util.Calendar; | ||
import java.util.Collections; | ||
import java.util.Date; | ||
import java.util.List; | ||
import java.util.Properties; | ||
import java.util.Random; | ||
import java.util.UUID; | ||
import java.util.concurrent.ThreadLocalRandom; | ||
import java.util.logging.Logger; | ||
|
||
import org.springframework.http.HttpHeaders; | ||
import org.springframework.http.HttpStatus; | ||
import org.springframework.http.RequestEntity; | ||
import org.springframework.http.ResponseEntity; | ||
|
||
import io.fission.kafka.IoTData; | ||
|
||
import io.fission.Context; | ||
|
||
public class IotProducer implements Function { | ||
|
||
private static Logger logger = Logger.getGlobal(); | ||
static final long FIVE_MINUTE_IN_MILLIS=300000;//millisecs | ||
|
||
|
||
public ResponseEntity call(RequestEntity req, Context context) { | ||
|
||
String brokerList = System.getenv("KAFKA_ADDR"); | ||
String topic = System.getenv("TOPIC_NAME"); | ||
if (brokerList == null || topic == null) { | ||
return ResponseEntity.badRequest().build(); | ||
} | ||
|
||
// Related issue: https://stackoverflow.com/questions/37363119/kafka-producer-org-apache-kafka-common-serialization-stringserializer-could-no | ||
Thread.currentThread().setContextClassLoader(null); | ||
|
||
Properties properties = new Properties(); | ||
properties.put("bootstrap.servers", brokerList); | ||
properties.put("acks", "all"); | ||
properties.put("value.serializer", "io.fission.kafka.IoTDataEncoder"); | ||
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); | ||
Producer<String, IoTData> producer = new KafkaProducer<String, IoTData>(properties); | ||
IotProducer iotProducer = new IotProducer(); | ||
try { | ||
iotProducer.generateIoTEvent(producer,topic); | ||
} catch (InterruptedException e) { | ||
e.printStackTrace(); | ||
logger.severe("Failed to send events to Kafka"+ e); | ||
} | ||
producer.close(); | ||
HttpHeaders headers = new HttpHeaders(); | ||
headers.add("Access-Control-Allow-Origin", "*"); | ||
headers.add("Access-Control-Allow-Headers", "*"); | ||
headers.add("Access-Control-Allow-Credentials", "true"); | ||
headers.add("Access-Control-Allow-Methods", "*"); | ||
headers.add("Access-Control-Expose-Headers", "*"); | ||
return ResponseEntity.status(HttpStatus.OK).headers(headers).build(); | ||
} | ||
|
||
/** | ||
* Method runs 100s of times and generates random IoT data in JSON with below | ||
* format. | ||
* | ||
* {"vehicleId":"52f08f03-cd14-411a-8aef-ba87c9a99997","vehicleType":"Public | ||
* Transport","routeId":"route-43","latitude":",-85.583435","longitude":"38.892395","timestamp":1465471124373,"speed":80.0,"fuelLevel":28.0} | ||
* | ||
* @throws InterruptedException | ||
* | ||
*/ | ||
private void generateIoTEvent(Producer<String, IoTData> producer, String topic) throws InterruptedException { | ||
List<String> routeList = Arrays.asList(new String[] { "Route-37", "Route-43", "Route-82" }); | ||
List<String> vehicleTypeList = Arrays | ||
.asList(new String[] { "Large Truck", "Small Truck", "Van", "18 Wheeler", "Car" }); | ||
Random rand = new Random(); | ||
logger.info("Sending events"); | ||
// generate event in loop | ||
List<IoTData> eventList = new ArrayList<IoTData>(); | ||
for (int i = 0; i < 10; i++) {// create 1000 vehicles and push to Kafka on every function invocation | ||
String vehicleId = UUID.randomUUID().toString(); | ||
String vehicleType = vehicleTypeList.get(rand.nextInt(5)); | ||
String routeId = routeList.get(rand.nextInt(3)); | ||
|
||
|
||
Calendar d1 = Calendar.getInstance(); | ||
long t = d1.getTimeInMillis(); | ||
Date d2 = new Date(t + (FIVE_MINUTE_IN_MILLIS)); | ||
Date timestamp = new Date(ThreadLocalRandom.current().nextLong(t, d2.getTime())); | ||
|
||
double speed = rand.nextInt(100 - 20) + 20;// random speed between 20 to 100 | ||
double fuelLevel = rand.nextInt(40 - 10) + 10; | ||
for (int j = 0; j < 5; j++) {// Add 5 events for each vehicle | ||
String coords = getCoordinates(routeId); | ||
String latitude = coords.substring(0, coords.indexOf(",")); | ||
String longitude = coords.substring(coords.indexOf(",") + 1, coords.length()); | ||
IoTData event = new IoTData(vehicleId, vehicleType, routeId, latitude, longitude, timestamp, speed, | ||
fuelLevel); | ||
eventList.add(event); | ||
} | ||
} | ||
Collections.shuffle(eventList);// shuffle for random events | ||
for (IoTData event : eventList) { | ||
producer.send(new ProducerRecord<String, IoTData>(topic, event)); | ||
} | ||
|
||
} | ||
|
||
// Method to generate random latitude and longitude for routes | ||
private String getCoordinates(String routeId) { | ||
Random rand = new Random(); | ||
int latPrefix = 0; | ||
int longPrefix = -0; | ||
if (routeId.equals("Route-37")) { | ||
latPrefix = 33; | ||
longPrefix = -96; | ||
} else if (routeId.equals("Route-82")) { | ||
latPrefix = 34; | ||
longPrefix = -97; | ||
} else if (routeId.equals("Route-43")) { | ||
latPrefix = 35; | ||
longPrefix = -98; | ||
} | ||
Float lati = latPrefix + rand.nextFloat(); | ||
Float longi = longPrefix + rand.nextFloat(); | ||
return lati + "," + longi; | ||
} | ||
|
||
} |
Empty file.
Oops, something went wrong.