Skip to content

Commit

Permalink
Incremental updates to support Esri User Conference demo.
Browse files Browse the repository at this point in the history
  • Loading branch information
amollenkopf committed Aug 5, 2016
1 parent d30e89c commit a6e4816
Show file tree
Hide file tree
Showing 10 changed files with 109 additions and 164 deletions.
54 changes: 54 additions & 0 deletions elasticsearch-marathon.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
{
"id": "elasticsearch",
"cpus": 0.2,
"mem": 512,
"ports": [31100],
"instances": 1,
"healthChecks": [
{
"gracePeriodSeconds": 120,
"intervalSeconds": 30,
"maxConsecutiveFailures": 0,
"path": "/",
"portIndex": 0,
"protocol": "HTTP",
"timeoutSeconds": 5
}
],
"args": [
"--zookeeperMesosUrl", "zk://master.mesos:2181/mesos",
"--zookeeperMesosTimeout", "20000",
"--webUiPort", "31100",
"--frameworkFailoverTimeout", "2592000.0",
"--frameworkName", "elasticsearch",

"--elasticsearchNodes", "5",
"--elasticsearchClusterName", "spatiotemporal-store",
"--elasticsearchPorts", "9200,9300",
"--elasticsearchCpu", "4.0",
"--elasticsearchDisk", "10240",
"--elasticsearchRam", "12228",
"--elasticsearchDockerImage", "elasticsearch:2.3.4",

"--executorForcePullImage", "false",
"--executorName", "spatiotemporal-store"
],
"env": {
"JAVA_OPTS": "-Xms128m -Xmx256m"
},
"container": {
"type": "DOCKER",
"docker": {
"image": "mesos/elasticsearch-scheduler:1.0.1",
"network": "HOST",
"forcePullImage": false
}
},
"ports": [31100],
"requirePorts": true,
"labels": {
"DCOS_PACKAGE_FRAMEWORK_NAME": "elasticsearch",
"DCOS_PACKAGE_IS_FRAMEWORK": "true"
}
}

1 change: 1 addition & 0 deletions event-source/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,5 @@
FROM centos
RUN yum install -y java-1.8.0-openjdk.x86_64
ADD ./data/parolee/parolee.csv /data/parolee/parolee.csv
ADD ./data/people/people.csv /data/people/people.csv
ADD ./target/scala-2.11/event-source-assembly-1.0.jar /jars/event-source-assembly-1.0.jar
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ object FileEventSource extends App {
val producer: Producer[String, String] = new KafkaProducer[String, String](props)

var ix = 0
var count = 0
println("Writing " + eventsPerSecond + " e/s every " + intervalInMillis + " millis to topic " + topic + " on brokers " + brokers + " ...")
while(true) {
if (ix + eventsPerSecond > tracks.length)
Expand All @@ -41,10 +42,14 @@ object FileEventSource extends App {
val eventString = tracks(jx).replace("TIME", System.currentTimeMillis().toString).trim
producer.send(new ProducerRecord[String, String](topic, eventString.split(",")(0), eventString))
ix += 1
count += 1
if (verbose)
println(jx + ": " + eventString)
}
println()
if (verbose)
println()
println("Time %s: File Event Source sent %s events.".format(System.currentTimeMillis, count))
count = 0
Thread.sleep(intervalInMillis)
}
producer.close()
Expand Down
3 changes: 2 additions & 1 deletion spatiotemporal-esri-analytics/rat01.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@
}
},
"id": "rat01",
"instances": 5,
"instances": 3,
"cpus": 4,
"mem": 2048
}

Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,11 @@ object SpatiotemporalEsriAnalyticTaskWithElasticsearchAndTShirtSink {
val Array(zkQuorum, topic, geofenceFilteringOnStr, url, tShirtMessage, phoneNumber, verboseStr, esNodes, esClusterName, esIndexName) = args
val geofenceFilteringOn = geofenceFilteringOnStr.toBoolean
val verbose = verboseStr.toBoolean
val shards = 20 //TODO: expose as param, default is 3
val replicas = 0 //TODO: expose as param, default is 1

println("es.cluster.name: " + esClusterName)
println("es.nodes: " + esNodes)
val sparkConf = new SparkConf()
.setAppName("rat2")
.set("es.cluster.name", esClusterName)
Expand Down Expand Up @@ -134,7 +138,7 @@ object SpatiotemporalEsriAnalyticTaskWithElasticsearchAndTShirtSink {
EsField("geometry", EsFieldType.GeoPoint)
)
if (!ElasticsearchUtils.doesDataSourceExists(esIndexName, esNode, esPort))
ElasticsearchUtils.createDataSource(esIndexName, esFields, esNode, esPort)
ElasticsearchUtils.createDataSource(esIndexName, esFields, esNode, esPort, shards, replicas)

datasource.foreachRDD((rdd: RDD[Map[String, Any]], time: Time) => {
println("Time %s: Updating Elasticsearch (%s total records)".format(time, rdd.count))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,11 @@ object SpatiotemporalEsriAnalyticTaskWithElasticsearchSink {
val Array(zkQuorum, topic, geofenceFilteringOnStr, verboseStr, esNodes, esClusterName, esIndexName) = args
val geofenceFilteringOn = geofenceFilteringOnStr.toBoolean
val verbose = verboseStr.toBoolean
val shards = 20 //TODO: expose as param, default is 3
val replicas = 0 //TODO: expose as param, default is 1

val sparkConf = new SparkConf()
.setAppName("rat1")
.setAppName("taxi-rat")
.set("es.cluster.name", esClusterName)
.set("es.nodes", esNodes)
.set("es.index.auto.create", "true")
Expand Down Expand Up @@ -76,8 +78,8 @@ object SpatiotemporalEsriAnalyticTaskWithElasticsearchSink {
"pickup_latitude" -> fields(16).toDouble,
"dropoff_longitude" -> fields(17).toDouble,
"dropoff_latitude" -> fields(18).toDouble,
"geometry" -> s"${point._1},${point._2}",
"---geo_hash---" -> s"${point._1},${point._2}"
"geometry" -> s"${point._1},${point._2}"//,
//"---geo_hash---" -> s"${point._1},${point._2}"
)
}
)
Expand Down Expand Up @@ -107,7 +109,7 @@ object SpatiotemporalEsriAnalyticTaskWithElasticsearchSink {
EsField("geometry", EsFieldType.GeoPoint)
)
if (!ElasticsearchUtils.doesDataSourceExists(esIndexName, esNode, esPort))
ElasticsearchUtils.createDataSource(esIndexName, esFields, esNode, esPort)
ElasticsearchUtils.createDataSource(esIndexName, esFields, esNode, esPort, shards, replicas)

datasource.foreachRDD((rdd: RDD[Map[String, Any]], time: Time) => {
rdd.saveToEs(esIndexName + "/" + esIndexName) // ES index/type
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import org.apache.http.entity.StringEntity
import org.apache.http.impl.client.DefaultHttpClient
import org.apache.http.params.{BasicHttpParams, HttpConnectionParams}

object TShirtSink {
object TShirtSink extends App {
var alerted = false

def sink(url: String, message: String): String = {
Expand All @@ -29,21 +29,37 @@ object TShirtSink {
@throws(classOf[java.net.SocketTimeoutException])
def post(url: String, message: String): String = {
val httpParams = new BasicHttpParams()
HttpConnectionParams.setConnectionTimeout(httpParams, 3000)
HttpConnectionParams.setSoTimeout(httpParams, 3000)
HttpConnectionParams.setConnectionTimeout(httpParams, 5000)
HttpConnectionParams.setSoTimeout(httpParams, 5000)
val client = new DefaultHttpClient(httpParams)

val post = new HttpPost(url)
post.setHeader("Content-Type", "application/json")
post.setEntity(new StringEntity("{ \"message\": \"" + message + "\" }"))
val body = "{ \"message\":\"" + message + "\", \"rgb\":[255,255,255], \"animation\":3000 }"
val entity = new StringEntity(body)
println(body)
post.setEntity(entity)

/*
{
"message":"My message",
"rgb":[255,255,255],
"animation":3000
}
*/

//val client = new DefaultHttpClient
val response = client.execute(post).getStatusLine
println(response)
response.getStatusCode + ": " + response.getReasonPhrase
}
//TODO: To resolve deprecations, see http://stackoverflow.com/questions/3000214/java-http-client-request-with-defined-timeout

// bad: http://clcamesos25agents.westus.cloudapp.azure.com:10001/api/tshirt
// good: http://clcamesos25agents.westus.cloudapp.azure.com:10001/api/tshirtfake
//println(sink("http://clcamesos25agents.westus.cloudapp.azure.com:10001/api/tshirt", "MesosCon 2016 Test"))

//println(sink("http://clcamesos25agents.westus.cloudapp.azure.com:10001/api/tshirt", "Adam Test 1"))
val message: String = args(0).toString
println("sending " + message)
println(sink("http://esri12agents.westus.cloudapp.azure.com:10005/api/tshirt", message))
}
Loading

0 comments on commit a6e4816

Please sign in to comment.