From a6e48164eba7343a5f54ff1a693df6daa33d34fe Mon Sep 17 00:00:00 2001 From: Adam Mollenkopf Date: Fri, 5 Aug 2016 06:51:22 -0700 Subject: [PATCH] Incremental updates to support Esri User Conference demo. --- elasticsearch-marathon.json | 54 ++++++ event-source/Dockerfile | 1 + .../org/cam/geo/source/FileEventSource.scala | 7 +- spatiotemporal-esri-analytics/rat01.json | 3 +- ...icTaskWithElasticsearchAndTShirtSink.scala | 6 +- ...sriAnalyticTaskWithElasticsearchSink.scala | 10 +- .../cam/geo/analytics/esri/TShirtSink.scala | 26 ++- .../org/cam/geo/sink/ElasticsearchUtils.scala | 161 ++---------------- spatiotemporal-event-source/Dockerfile | 2 + spatiotemporal-event-source/source01.json | 3 +- 10 files changed, 109 insertions(+), 164 deletions(-) create mode 100644 elasticsearch-marathon.json diff --git a/elasticsearch-marathon.json b/elasticsearch-marathon.json new file mode 100644 index 0000000..8a94f25 --- /dev/null +++ b/elasticsearch-marathon.json @@ -0,0 +1,54 @@ +{ + "id": "elasticsearch", + "cpus": 0.2, + "mem": 512, + "ports": [0], + "instances": 1, + "healthChecks": [ + { + "gracePeriodSeconds": 120, + "intervalSeconds": 30, + "maxConsecutiveFailures": 0, + "path": "/", + "portIndex": 0, + "protocol": "HTTP", + "timeoutSeconds": 5 + } + ], + "args": [ + "--zookeeperMesosUrl", "zk://master.mesos:2181/mesos", + "--zookeeperMesosTimeout", "20000", + "--webUiPort", "31100", + "--frameworkFailoverTimeout", "2592000.0", + "--frameworkName", "elasticsearch", + + "--elasticsearchNodes", "5", + "--elasticsearchClusterName", "spatiotemporal-store", + "--elasticsearchPorts", "9200,9300", + "--elasticsearchCpu", "4.0", + "--elasticsearchDisk", "10240", + "--elasticsearchRam", "12228", + "--elasticsearchDockerImage", "elasticsearch:2.3.4", + + "--executorForcePullImage", "false", + "--executorName", "spatiotemporal-store" + ], + "env": { + "JAVA_OPTS": "-Xms128m -Xmx256m" + }, + "container": { + "type": "DOCKER", + "docker": { + "image": "mesos/elasticsearch-scheduler:1.0.1", + "network": "HOST", + "forcePullImage": false + } + }, + "ports": [31100], + "requirePorts": true, + "labels": { + "DCOS_PACKAGE_FRAMEWORK_NAME": "elasticsearch", + "DCOS_PACKAGE_IS_FRAMEWORK": "true" + } + } + diff --git a/event-source/Dockerfile b/event-source/Dockerfile index c2521b5..df24d2e 100644 --- a/event-source/Dockerfile +++ b/event-source/Dockerfile @@ -2,4 +2,5 @@ FROM centos RUN yum install -y java-1.8.0-openjdk.x86_64 ADD ./data/parolee/parolee.csv /data/parolee/parolee.csv +ADD ./data/people/people.csv /data/people/people.csv ADD ./target/scala-2.11/event-source-assembly-1.0.jar /jars/event-source-assembly-1.0.jar diff --git a/event-source/src/main/scala/org/cam/geo/source/FileEventSource.scala b/event-source/src/main/scala/org/cam/geo/source/FileEventSource.scala index 83b95e6..f2d6ac4 100644 --- a/event-source/src/main/scala/org/cam/geo/source/FileEventSource.scala +++ b/event-source/src/main/scala/org/cam/geo/source/FileEventSource.scala @@ -33,6 +33,7 @@ object FileEventSource extends App { val producer: Producer[String, String] = new KafkaProducer[String, String](props) var ix = 0 + var count = 0 println("Writing " + eventsPerSecond + " e/s every " + intervalInMillis + " millis to topic " + topic + " on brokers " + brokers + " ...") while(true) { if (ix + eventsPerSecond > tracks.length) @@ -41,10 +42,14 @@ object FileEventSource extends App { val eventString = tracks(jx).replace("TIME", System.currentTimeMillis().toString).trim producer.send(new ProducerRecord[String, String](topic, eventString.split(",")(0), eventString)) ix += 1 + count += 1 if (verbose) println(jx + ": " + eventString) } - println() + if (verbose) + println() + println("Time %s: File Event Source sent %s events.".format(System.currentTimeMillis, count)) + count = 0 Thread.sleep(intervalInMillis) } producer.close() diff --git a/spatiotemporal-esri-analytics/rat01.json b/spatiotemporal-esri-analytics/rat01.json index b2a6875..532479c 100644 --- a/spatiotemporal-esri-analytics/rat01.json +++ b/spatiotemporal-esri-analytics/rat01.json @@ -11,7 +11,8 @@ } }, "id": "rat01", - "instances": 5, + "instances": 3, "cpus": 4, "mem": 2048 } + diff --git a/spatiotemporal-esri-analytics/src/main/scala/org/cam/geo/analytics/esri/SpatiotemporalEsriAnalyticTaskWithElasticsearchAndTShirtSink.scala b/spatiotemporal-esri-analytics/src/main/scala/org/cam/geo/analytics/esri/SpatiotemporalEsriAnalyticTaskWithElasticsearchAndTShirtSink.scala index d85fae7..a62aa28 100644 --- a/spatiotemporal-esri-analytics/src/main/scala/org/cam/geo/analytics/esri/SpatiotemporalEsriAnalyticTaskWithElasticsearchAndTShirtSink.scala +++ b/spatiotemporal-esri-analytics/src/main/scala/org/cam/geo/analytics/esri/SpatiotemporalEsriAnalyticTaskWithElasticsearchAndTShirtSink.scala @@ -57,7 +57,11 @@ object SpatiotemporalEsriAnalyticTaskWithElasticsearchAndTShirtSink { val Array(zkQuorum, topic, geofenceFilteringOnStr, url, tShirtMessage, phoneNumber, verboseStr, esNodes, esClusterName, esIndexName) = args val geofenceFilteringOn = geofenceFilteringOnStr.toBoolean val verbose = verboseStr.toBoolean + val shards = 20 //TODO: expose as param, default is 3 + val replicas = 0 //TODO: expose as param, default is 1 + println("es.cluster.name: " + esClusterName) + println("es.nodes: " + esNodes) val sparkConf = new SparkConf() .setAppName("rat2") .set("es.cluster.name", esClusterName) @@ -134,7 +138,7 @@ object SpatiotemporalEsriAnalyticTaskWithElasticsearchAndTShirtSink { EsField("geometry", EsFieldType.GeoPoint) ) if (!ElasticsearchUtils.doesDataSourceExists(esIndexName, esNode, esPort)) - ElasticsearchUtils.createDataSource(esIndexName, esFields, esNode, esPort) + ElasticsearchUtils.createDataSource(esIndexName, esFields, esNode, esPort, shards, replicas) datasource.foreachRDD((rdd: RDD[Map[String, Any]], time: Time) => { println("Time %s: Updating Elasticsearch (%s total records)".format(time, rdd.count)) diff --git a/spatiotemporal-esri-analytics/src/main/scala/org/cam/geo/analytics/esri/SpatiotemporalEsriAnalyticTaskWithElasticsearchSink.scala b/spatiotemporal-esri-analytics/src/main/scala/org/cam/geo/analytics/esri/SpatiotemporalEsriAnalyticTaskWithElasticsearchSink.scala index 98821c6..2b64a11 100644 --- a/spatiotemporal-esri-analytics/src/main/scala/org/cam/geo/analytics/esri/SpatiotemporalEsriAnalyticTaskWithElasticsearchSink.scala +++ b/spatiotemporal-esri-analytics/src/main/scala/org/cam/geo/analytics/esri/SpatiotemporalEsriAnalyticTaskWithElasticsearchSink.scala @@ -33,9 +33,11 @@ object SpatiotemporalEsriAnalyticTaskWithElasticsearchSink { val Array(zkQuorum, topic, geofenceFilteringOnStr, verboseStr, esNodes, esClusterName, esIndexName) = args val geofenceFilteringOn = geofenceFilteringOnStr.toBoolean val verbose = verboseStr.toBoolean + val shards = 20 //TODO: expose as param, default is 3 + val replicas = 0 //TODO: expose as param, default is 1 val sparkConf = new SparkConf() - .setAppName("rat1") + .setAppName("taxi-rat") .set("es.cluster.name", esClusterName) .set("es.nodes", esNodes) .set("es.index.auto.create", "true") @@ -76,8 +78,8 @@ object SpatiotemporalEsriAnalyticTaskWithElasticsearchSink { "pickup_latitude" -> fields(16).toDouble, "dropoff_longitude" -> fields(17).toDouble, "dropoff_latitude" -> fields(18).toDouble, - "geometry" -> s"${point._1},${point._2}", - "---geo_hash---" -> s"${point._1},${point._2}" + "geometry" -> s"${point._1},${point._2}"//, + //"---geo_hash---" -> s"${point._1},${point._2}" ) } ) @@ -107,7 +109,7 @@ object SpatiotemporalEsriAnalyticTaskWithElasticsearchSink { EsField("geometry", EsFieldType.GeoPoint) ) if (!ElasticsearchUtils.doesDataSourceExists(esIndexName, esNode, esPort)) - ElasticsearchUtils.createDataSource(esIndexName, esFields, esNode, esPort) + ElasticsearchUtils.createDataSource(esIndexName, esFields, esNode, esPort, shards, replicas) datasource.foreachRDD((rdd: RDD[Map[String, Any]], time: Time) => { rdd.saveToEs(esIndexName + "/" + esIndexName) // ES index/type diff --git a/spatiotemporal-esri-analytics/src/main/scala/org/cam/geo/analytics/esri/TShirtSink.scala b/spatiotemporal-esri-analytics/src/main/scala/org/cam/geo/analytics/esri/TShirtSink.scala index 77113eb..f8b335f 100644 --- a/spatiotemporal-esri-analytics/src/main/scala/org/cam/geo/analytics/esri/TShirtSink.scala +++ b/spatiotemporal-esri-analytics/src/main/scala/org/cam/geo/analytics/esri/TShirtSink.scala @@ -8,7 +8,7 @@ import org.apache.http.entity.StringEntity import org.apache.http.impl.client.DefaultHttpClient import org.apache.http.params.{BasicHttpParams, HttpConnectionParams} -object TShirtSink { +object TShirtSink extends App { var alerted = false def sink(url: String, message: String): String = { @@ -29,21 +29,37 @@ object TShirtSink { @throws(classOf[java.net.SocketTimeoutException]) def post(url: String, message: String): String = { val httpParams = new BasicHttpParams() - HttpConnectionParams.setConnectionTimeout(httpParams, 3000) - HttpConnectionParams.setSoTimeout(httpParams, 3000) + HttpConnectionParams.setConnectionTimeout(httpParams, 5000) + HttpConnectionParams.setSoTimeout(httpParams, 5000) val client = new DefaultHttpClient(httpParams) val post = new HttpPost(url) post.setHeader("Content-Type", "application/json") - post.setEntity(new StringEntity("{ \"message\": \"" + message + "\" }")) + val body = "{ \"message\":\"" + message + "\", \"rgb\":[255,255,255], \"animation\":3000 }" + val entity = new StringEntity(body) + println(body) + post.setEntity(entity) + + /* + { + "message":"My message", + "rgb":[255,255,255], + "animation":3000 + } + */ //val client = new DefaultHttpClient val response = client.execute(post).getStatusLine + println(response) response.getStatusCode + ": " + response.getReasonPhrase } //TODO: To resolve deprecations, see http://stackoverflow.com/questions/3000214/java-http-client-request-with-defined-timeout // bad: http://clcamesos25agents.westus.cloudapp.azure.com:10001/api/tshirt // good: http://clcamesos25agents.westus.cloudapp.azure.com:10001/api/tshirtfake - //println(sink("http://clcamesos25agents.westus.cloudapp.azure.com:10001/api/tshirt", "MesosCon 2016 Test")) + + //println(sink("http://clcamesos25agents.westus.cloudapp.azure.com:10001/api/tshirt", "Adam Test 1")) + val message: String = args(0).toString + println("sending " + message) + println(sink("http://esri12agents.westus.cloudapp.azure.com:10005/api/tshirt", message)) } \ No newline at end of file diff --git a/spatiotemporal-esri-analytics/src/main/scala/org/cam/geo/sink/ElasticsearchUtils.scala b/spatiotemporal-esri-analytics/src/main/scala/org/cam/geo/sink/ElasticsearchUtils.scala index a54ad18..df13645 100644 --- a/spatiotemporal-esri-analytics/src/main/scala/org/cam/geo/sink/ElasticsearchUtils.scala +++ b/spatiotemporal-esri-analytics/src/main/scala/org/cam/geo/sink/ElasticsearchUtils.scala @@ -21,6 +21,7 @@ object ElasticsearchUtils { * @param esPort the es port * @return */ + //TODO: Change DataSource references to Index def doesDataSourceExists(dataSourceName:String, esHostName:String, esPort:Int = 9200):Boolean = { val client = HttpClients.createDefault() try { @@ -62,19 +63,17 @@ object ElasticsearchUtils { * @param fields the fields * @param esHostName the es host name * @param esPort the es port name - * @param geometryType the geometry type, could be "esriGeometryPoint", "esriGeometryPolygon" or "esriGeometryPolyline" */ - def createDataSource(dataSourceName:String, fields:Array[EsField], esHostName:String, esPort:Int = 9200, geometryType:String = "esriGeometryPoint"):Boolean = { + def createDataSource(dataSourceName:String, fields:Array[EsField], esHostName:String, esPort:Int = 9200, shards:Int=3, replicas:Int=1):Boolean = { val client = HttpClients.createDefault() try { val dataSourceNameToLowercase = dataSourceName.toLowerCase() - // step 1 - create the mapping with metadata val createMappingURLStr = s"http://$esHostName:$esPort/$dataSourceNameToLowercase" val createMappingURL:URL = new URL(createMappingURLStr) val httpPut:HttpPut = new HttpPut(createMappingURL.toURI) httpPut.setHeader(HttpHeaders.CONTENT_TYPE, "application/json") httpPut.setHeader("charset", "utf-8") - val createMappingRequest = createMappingJsonAsStr(dataSourceNameToLowercase, fields) + val createMappingRequest = createMappingJsonAsStr(dataSourceNameToLowercase, fields, shards, replicas) httpPut.setEntity(new StringEntity(createMappingRequest, ContentType.APPLICATION_JSON)) // execute @@ -101,55 +100,7 @@ object ElasticsearchUtils { parseError.printStackTrace() return false } - - // step 2 - create the metadata documents - val geometryField = fields.find( field => field.fieldType == EsFieldType.GeoPoint || field.fieldType == EsFieldType.GeoShape ) - val geometryFieldName = geometryField match { - case Some(field) if field.fieldType == EsFieldType.GeoPoint || field.fieldType == EsFieldType.GeoShape=> - field.name - case _ => - "Geometry" - } - val createOIDDoc = - s"""{ "index" : { "_index" : "$dataSourceNameToLowercase", "_type" : "---metadata---", "_id" : "object_id_generator" } } - {} - """.stripMargin - val createMetadataDoc = - s"""{ "index" : { "_index" : "$dataSourceNameToLowercase", "_type" : "---metadata---", "_id" : "data_source_metadata" } } - {"aliasName":"$dataSourceNameToLowercase","indexNamePrefix":"${UUID.randomUUID().toString}","indexTypeName":"$dataSourceNameToLowercase","oidFieldName":"objectid","globalIdFieldName":"globalid","trackIdFieldName":null,"startTimeFieldName":null,"endTimeFieldName":null,"geometryFieldName":"$geometryFieldName","geometryType":"$geometryType","timeInterval":10,"timeIntervalUnits":"esriTimeUnitsSeconds","hasLiveData":true,"objectIdStrategy":"ObjectId64Bit","rollingIndexStrategy":"Hourly","oidSeed":0,"oidBlockSize":1,"esriGeoHashes":"[{\\"field_name\\":\\"triangle_h_102100\\",\\"type\\":\\"triangle\\",\\"wkid\\":102100,\\"precision\\":20,\\"orientation\\":\\"horizontal\\"},{\\"field_name\\":\\"triangle_v_102100\\",\\"type\\":\\"triangle\\",\\"wkid\\":102100,\\"precision\\":20,\\"orientation\\":\\"vertical\\"},{\\"field_name\\":\\"square_102100\\",\\"type\\":\\"square\\",\\"wkid\\":102100,\\"precision\\":20}]"} - """.stripMargin - val bulkRequest = createOIDDoc + "\n" +createMetadataDoc - val bulkURLStr = s"http://$esHostName:$esPort/$dataSourceNameToLowercase/_bulk" - val bulkURL:URL = new URL(bulkURLStr) - val httpPost:HttpPost = new HttpPost(bulkURL.toURI) - httpPost.setEntity(new StringEntity(bulkRequest, ContentType.DEFAULT_TEXT)) - - // execute - val bulkResponse = client.execute(httpPost) - val bulkResponseAsString = getHttpResponseAsString(bulkResponse, httpPut) - try { - val createSuccessOpt = bulkResponseAsString.map(str => { - implicit val formats = DefaultFormats - (parse(str) \ "errors").extract[Boolean] - }) - createSuccessOpt match { - case Some(errors) => - if (errors) { - println(s"Failed to create the metadata and oid documents for DataSource $dataSourceName! Errors: $bulkResponseAsString") - return false - } - case _ => - println(s"Failed to create the mapping for DataSource $dataSourceName! Errors: $bulkResponseAsString") - return false - } - } catch { - case parseError: Throwable => - println(s"Failed to create the mapping for DataSource $dataSourceName!") - parseError.printStackTrace() - return false - } true - } finally { client.close() } @@ -164,7 +115,6 @@ object ElasticsearchUtils { * @return a String */ private def getHttpResponseAsString(response:CloseableHttpResponse, request:HttpUriRequest):Option[String] = { - // make sure we close the response try { val entity = response.getEntity val responseString = { @@ -192,9 +142,14 @@ object ElasticsearchUtils { * @param fields the es fields to create * @return a json str */ - private def createMappingJsonAsStr(datSourceName:String, fields:Array[EsField]):String = { + private def createMappingJsonAsStr(datSourceName:String, fields:Array[EsField], shards:Int=3, replicas:Int=1):String = { s""" |{ + | "settings": { + | "number_of_shards" : $shards, + | "auto_expand_replicas" : 0, + | "number_of_replicas" : $replicas + | }, | "mappings": { | "$datSourceName": { | "_timestamp": { @@ -203,70 +158,6 @@ object ElasticsearchUtils { | "properties": { | ${createFieldMappingJsonAsStr(fields)} | } - | }, - | "---metadata---": { - | "_timestamp": { - | "enabled": true - | }, - | "properties": { - | "aliasName": { - | "type": "string" - | }, - | "endTimeFieldName": { - | "type": "string" - | }, - | "esriGeoHashes": { - | "type": "string" - | }, - | "geometryFieldName": { - | "type": "string" - | }, - | "geometryType": { - | "type": "string" - | }, - | "globalIdFieldName": { - | "type": "string" - | }, - | "hasLiveData": { - | "type": "string" - | }, - | "indexNamePrefix": { - | "type": "string" - | }, - | "indexTypeName": { - | "type": "string" - | }, - | "objectIdStrategy": { - | "type": "string" - | }, - | "oidBlockSize": { - | "type": "string" - | }, - | "oidFieldName": { - | "type": "string" - | }, - | "oidGuid": { - | "type": "string" - | }, - | "oidSeed": { - | "type": "string" - | }, - | "rollingIndexStrategy": { - | "type": "string" - | }, - | "startTimeFieldName": { - | "type": "string" - | }, - | "timeInterval": { - | "type": "string" - | }, - | "timeIntervalUnits": { - | "type": "string" - | }, - | "trackIdFieldName": { - | "type": "string" - | } - | } | } | } |} @@ -274,16 +165,7 @@ object ElasticsearchUtils { } private def createFieldMappingJsonAsStr(fields:Array[EsField]):String = { - val idFields = - """ - |"globalid": { - | "type": "string" - |}, - |"objectid": { - | "type": "long" - |} - """.stripMargin - fields.foldRight(idFields)( (field, str) => { + fields.foldRight("")( (field, str) => { val fieldJson = field.fieldType match { case EsFieldType.Unknown => "" case EsFieldType.GeoPoint => @@ -293,29 +175,6 @@ object ElasticsearchUtils { "lat_lon": "true", "geohash": "true", "geohash_prefix": "true" - }, - "---geo_hash---": { - "type": "esri_geo_hash", - "hashes": [{ - "orientation": "horizontal", - "precision": 20, - "wkid": 102100, - "type": "triangle", - "field_name": "triangle_h_102100" - }, - { - "orientation": "vertical", - "precision": 20, - "wkid": 102100, - "type": "triangle", - "field_name": "triangle_v_102100" - }, - { - "precision": 20, - "wkid": 102100, - "type": "square", - "field_name": "square_102100" - }] } """.stripMargin case EsFieldType.GeoShape => diff --git a/spatiotemporal-event-source/Dockerfile b/spatiotemporal-event-source/Dockerfile index 97089c5..850dc9f 100644 --- a/spatiotemporal-event-source/Dockerfile +++ b/spatiotemporal-event-source/Dockerfile @@ -3,4 +3,6 @@ FROM centos RUN yum install -y java-1.8.0-openjdk.x86_64 ADD ./data/taxi/taxi-simulation-01-25.csv /data/taxi/taxi-simulation-01-25.csv ADD ./data/taxi/taxi-route.csv /data/taxi/taxi-route.csv +ADD ./data/bus/bus928.csv /data/bus/bus928.csv +ADD ./data/vehicle/vehicle412.csv /data/vehicle/vehicle412.csv ADD ./target/scala-2.11/spatiotemporal-event-source-assembly-1.0.jar /jars/spatiotemporal-event-source-assembly-1.0.jar diff --git a/spatiotemporal-event-source/source01.json b/spatiotemporal-event-source/source01.json index 07eaa65..61fbd97 100644 --- a/spatiotemporal-event-source/source01.json +++ b/spatiotemporal-event-source/source01.json @@ -6,9 +6,10 @@ "forcePullImage": false } }, - "cmd": "java -Xms4096m -Xmx4096m -jar ./jars/spatiotemporal-event-source-assembly-1.0.jar broker-0.kafka.mesos:9671,broker-1.kafka.mesos:9543,broker-2.kafka.mesos:9142,broker-3.kafka.mesos:9602,broker-4.kafka.mesos:9472 taxi 3000 /data/taxi/taxi-simulation-01-25.csv 1 false", + "cmd": "java -Xms4096m -Xmx4096m -jar ./jars/spatiotemporal-event-source-assembly-1.0.jar broker-0.kafka.mesos:9995,broker-1.kafka.mesos:9185,broker-2.kafka.mesos:9398 taxi 3000 /data/taxi/taxi-simulation-01-25.csv 1 false", "id": "source01", "instances": 1, "cpus": 1.0, "mem": 5120 } +