Updated Spark analytics workshop steps and script.
Showing 2 changed files with 98 additions and 91 deletions.
@@ -1,94 +1,101 @@
-import json, configparser, socket, sys, requests
-from pyspark import SparkContext
-from pyspark import SparkConf
-from pyspark.streaming import StreamingContext
-from pyspark.streaming.kafka import KafkaUtils
-from pyspark.storagelevel import StorageLevel
-from pyspark.sql import SQLContext
-from uuid import uuid1
-from pyspark.sql.types import *
+#!/usr/bin/env python
+"""Structured Streaming example
+"""

-access_key = sys.argv[1]
-host_name = socket.getfqdn()
-public_ip = requests.get('http://ifconfig.me').text
+import json
+import requests
+import socket
+import sys
+import time

-zk_broker = host_name + ":2181"
-kafka_topic = "iot"
-group_id = "iot-sensor-consumer"
-kudu_master = host_name
-kudu_table = "impala::default.sensors"
+from pyspark.sql import SparkSession
+from pyspark.sql.types import StructType, StructField, StringType, TimestampType, IntegerType
+from pyspark.sql.functions import window, from_json, decode
+from pyspark.sql.types import *

-# define the table schema
-schema = StructType([StructField("sensor_id", IntegerType(), True),
-                     StructField("sensor_ts", LongType(), True),
-                     StructField("sensor_0", DoubleType(), True),
-                     StructField("sensor_1", DoubleType(), True),
-                     StructField("sensor_2", DoubleType(), True),
-                     StructField("sensor_3", DoubleType(), True),
-                     StructField("sensor_4", DoubleType(), True),
-                     StructField("sensor_5", DoubleType(), True),
-                     StructField("sensor_6", DoubleType(), True),
-                     StructField("sensor_7", DoubleType(), True),
-                     StructField("sensor_8", DoubleType(), True),
-                     StructField("sensor_9", DoubleType(), True),
-                     StructField("sensor_10", DoubleType(), True),
-                     StructField("sensor_11", DoubleType(), True),
-                     StructField("is_healthy", IntegerType(), True)])
+KAFKA_BROKERS = "%s:9092" % (socket.gethostname(),)
+KUDU_MASTER = "%s:7051" % (socket.gethostname(),)
+KUDU_TABLE = "default.sensors"
+KAFKA_TOPIC = "iot"
+OUTPUT_MODE = "complete"
+PUBLIC_IP = requests.get('http://ifconfig.me').text

-#Lazy SqlContext evaluation
-def getSqlContextInstance(sparkContext):
-    if ('sqlContextSingletonInstance' not in globals()):
-        globals()['sqlContextSingletonInstance'] = SQLContext(sc)
-    return globals()['sqlContextSingletonInstance']
+SCHEMA = StructType([
+    StructField("sensor_id", IntegerType(), True),
+    StructField("sensor_ts", LongType(), True),
+    StructField("sensor_0", DoubleType(), True),
+    StructField("sensor_1", DoubleType(), True),
+    StructField("sensor_2", DoubleType(), True),
+    StructField("sensor_3", DoubleType(), True),
+    StructField("sensor_4", DoubleType(), True),
+    StructField("sensor_5", DoubleType(), True),
+    StructField("sensor_6", DoubleType(), True),
+    StructField("sensor_7", DoubleType(), True),
+    StructField("sensor_8", DoubleType(), True),
+    StructField("sensor_9", DoubleType(), True),
+    StructField("sensor_10", DoubleType(), True),
+    StructField("sensor_11", DoubleType(), True),
+    StructField("is_healthy", IntegerType(), True),
+])

-def getPrediction(p):
-    feature = "%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s" % (p['sensor_1'], p['sensor_0'], p['sensor_2'],
-        p['sensor_3'], p['sensor_4'], p['sensor_5'], p['sensor_6'],p['sensor_7'],p['sensor_8'],
-        p['sensor_9'], p['sensor_10'], p['sensor_11'])
+def model_lookup(data):
+    global ACCESS_KEY
+    p = json.loads(data)
+    feature = "%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s" % (p['sensor_1'], p['sensor_0'], p['sensor_2'],
+                                                                  p['sensor_3'], p['sensor_4'], p['sensor_5'],
+                                                                  p['sensor_6'], p['sensor_7'], p['sensor_8'],
+                                                                  p['sensor_9'], p['sensor_10'], p['sensor_11'])

-    return requests.post('http://cdsw.' + public_ip + '.nip.io/api/altus-ds-1/models/call-model',
-        data='{"accessKey":"' + access_key + '", "request":{"feature":"' + feature + '"}}',
-        headers={'Content-Type': 'application/json'}).json()['response']['result']
+    url = 'http://cdsw.' + PUBLIC_IP + '.nip.io/api/altus-ds-1/models/call-model'
+    data = '{"accessKey":"' + ACCESS_KEY + '", "request":{"feature":"' + feature + '"}}'
+    while True:
+        resp = requests.post(url, data=data, headers={'Content-Type': 'application/json'})
+        j = resp.json()
+        if 'response' in j and 'result' in j['response']:
+            return resp.json()['response']['result']
+        print(">>>>>>>>>>>>>>>>>>>>>" + resp.text)
+        time.sleep(.1)

-#Insert data into Kudu
-def insert_into_kudu(time,rdd):
-    sqc = getSqlContextInstance(rdd.context)
-    kudu_df = sqc.createDataFrame(rdd, schema)
-    kudu_df.show()
-    kudu_df.write.format('org.apache.kudu.spark.kudu') \
-        .option('kudu.master',kudu_master) \
-        .option('kudu.table',kudu_table) \
-        .mode("append") \
-        .save()
+def main():
+    """Main"""
+    spark = SparkSession \
+        .builder \
+        .appName("KafkaStructuredStreamingExample") \
+        .getOrCreate()
+    spark.sparkContext.setLogLevel("WARN")

-if __name__ == "__main__":
-    sc = SparkContext(appName="SparkStreaming_IoT")
-    ssc = StreamingContext(sc, 5) # 5 second window
-    kvs = KafkaUtils.createStream(ssc, zk_broker, group_id, {kafka_topic:1})
+    events = spark \
+        .readStream \
+        .format("kafka") \
+        .option("kafka.bootstrap.servers", KAFKA_BROKERS) \
+        .option("startingoffsets", "latest") \
+        .option("subscribe", KAFKA_TOPIC) \
+        .load()

-    kafka_stream = kvs.map(lambda x: x[1]) \
-                      .map(lambda l: json.loads(l)) \
-                      .map(lambda p: (int(p['sensor_id']),
-                                      int(p['sensor_ts']),
-                                      float(p['sensor_0']),
-                                      float(p['sensor_1']),
-                                      float(p['sensor_2']),
-                                      float(p['sensor_3']),
-                                      float(p['sensor_4']),
-                                      float(p['sensor_5']),
-                                      float(p['sensor_6']),
-                                      float(p['sensor_7']),
-                                      float(p['sensor_8']),
-                                      float(p['sensor_9']),
-                                      float(p['sensor_10']),
-                                      float(p['sensor_11']),
-                                      getPrediction(p)))
+    spark.udf.register("model_lookup", model_lookup)
+    data = events \
+        .select(decode("value", "UTF-8").alias("decoded")) \
+        .select(from_json("decoded", SCHEMA).alias("data"), "decoded") \
+        .selectExpr("data.*", 'cast(model_lookup(decoded) as int) as is_healthy')
+    kudu = data \
+        .writeStream \
+        .format("kudu") \
+        .option("kudu.master", KUDU_MASTER) \
+        .option("kudu.table", KUDU_TABLE) \
+        .option("kudu.operation", "upsert") \
+        .option("checkpointLocation", "file:///tmp/checkpoints") \
+        .start()
+    console = data \
+        .writeStream \
+        .format("console") \
+        .start()

-    #For each RDD in the DStream, insert it into Kudu table
-    kafka_stream.foreachRDD(insert_into_kudu)

-    ssc.start()
-    ssc.awaitTermination()
+    kudu.awaitTermination()
+    console.awaitTermination()

+if __name__ == '__main__':
+    ACCESS_KEY = sys.argv[1]
+    main()
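
For reference, the new job decodes each Kafka message value as UTF-8 text and parses it with from_json against SCHEMA, so a message along the following lines would be accepted. The field names come from the schema above; the values are made up, and is_healthy is assumed to be absent from producer messages, since the job derives it with the model_lookup UDF.

    {"sensor_id": 48, "sensor_ts": 1556758787000000, "sensor_0": 2.39, "sensor_1": 63.27,
     "sensor_2": 9.73, "sensor_3": 38.91, "sensor_4": 118.44, "sensor_5": 26.72,
     "sensor_6": 359.80, "sensor_7": 17.45, "sensor_8": 0.22, "sensor_9": 6.81,
     "sensor_10": 4.18, "sensor_11": 33.06}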
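
A minimal sketch of how the updated script might be submitted, assuming a YARN cluster with the Spark Kafka source and the Kudu connector available; the script filename and package versions below are placeholders, not taken from this commit. The CDSW model access key is passed as the first argument, which the __main__ block reads into ACCESS_KEY.

    # Placeholders: script name and connector versions must match the cluster.
    spark-submit \
        --master yarn \
        --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0,org.apache.kudu:kudu-spark2_2.11:1.9.0 \
        <your_streaming_script>.py <CDSW_MODEL_ACCESS_KEY>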