Saving geoJSON Data To Cassandra Using User-Defined Types, Spark Dataframes and Spark SQL

The geoJSON data format is described at geojson.org as "a format for encoding a variety of geographic data structures".

In this example I'll be using a set of oil/gas well data supplied by the State of Colorado describing approximately 110,000 wells in the state.

You can find a copy of the data at my GitHub here

I'll start out with some examples of how to manipulate the data loaded into a dataframe, followed by the complete exercise to clean up and store the JSON data in Cassandra.

The format of the geoJSON source data looks like this:

{
	"type": "Feature",
	"properties": {
		"API": "12508888",
		"API_Label": "05-125-08888",
		"Operator": "OMIMEX PETROLEUM INC",
		"Well_Title": "8-9-5-45 FERGUSON",
		"Facil_Id": 0,                          <-- integer
		"Facil_Type": "WELL",
		"Facil_Stat": "PR",
		"Operat_Num": 66190,                    <-- big int
		"Well_Num": "8-9-5-45",
		"Well_Name": "FERGUSON",
		"Field_Code": 1970,                     <-- integer
		"Dist_N_S": 1980,                       <-- integer
		"Dir_N_S": "N",
		"Dist_E_W": 600,                        <-- integer
		"Dir_E_W": "E",
		"Qtr_Qtr": "SENE",
		"Section": "9",
		"Township": "5N",
		"Range": "45W",
		"Meridian": "6",
		"Latitude": 40.419416,                  <-- decimal
		"Longitude": -102.379999,               <-- decimal
		"Ground_Ele": 0,                        <-- decimal
		"Utm_X": 722281,                        <-- decimal
		"Utm_Y": 4477606,                       <-- decimal
		"Loc_Qual": "PLANNED Footage",
		"Field_Name": "BALLYNEAL",
		"Api_Seq": "08888",
		"API_County": "125",
		"Loc_ID": 304702,                       <-- decimal
		"Loc_Name": "FERGUSON-65N45W 9SENE",
		"Spud_Date": "2004\/09\/07",
		"Citing_Typ": "ACTUAL",
		"Max_MD": 2727,                         <-- decimal
		"Max_TVD": 2727                         <-- decimal
	},
	"geometry": {
		"type": "Point",
		"coordinates": [722281.0, 4477606.0]
	}
}

Our objective is to load the data into Cassandra so that we can use it in other applications. For this we are going to use Apache Spark, via DataFrames and Spark SQL.

For this exercise I'm using DataStax Enterprise 5.0.1, which contains Apache Cassandra 3.0.7 and comes integrated with a distribution of Apache Spark 1.6.1.

Start The Spark REPL

The Spark REPL (Read Evaluate Print Loop) is a shell that allows you to interact with Spark, principally using Scala.

# dse spark
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/ '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.6.1
      /_/

Using Scala version 2.10.5 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_72)
Type in expressions to have them evaluated.
Type :help for more information.
Initializing SparkContext with MASTER: spark://127.0.0.1:7077
Created spark context..
Spark context available as sc.
Hive context available as sqlContext. Will be initialized on first use.

Read The JSON File Into A Spark DataFrame

Using the 'read' method loads the JSON file into a dataframe:
scala> val df = sqlContext.read.json("file:///tmp/colorado_wells.geojson")
You will get some system output like this:
df: org.apache.spark.sql.DataFrame = [_corrupt_record: string, geometry: struct<coordinates:array,type:string>, properties: struct<API:string,API_County:string,API_Label:string,Api_Seq:string,Citing_Typ:string,Dir_E_W:string,Dir_N_S:string,Dist_E_W:bigint,Dist_N_S:bigint,Facil_Id:bigint,Facil_Stat:string,Facil_Type:string,Field_Code:bigint,Field_Name:string,Ground_Ele:bigint,Latitude:double,Loc_ID:bigint,Loc_Name:string,Loc_Qual:string,Longitude:double,Max_MD:bigint,Max_TVD:bigint,Meridian:string,Operat_Num:bigint,Operator:string,Qtr_Qtr:string,Range:string,Section:string,Spud_Date:string,Township:string,Utm_X:bigint,Utm_Y:bigint,Well_Name:string,Well_Num:string,Well_Title:string>, type: string]

We can now look at the schema of the JSON data that we've loaded into the dataframe:

scala> df.printSchema()

root
|-- _corrupt_record: string (nullable = true)
|-- geometry: struct (nullable = true)
| |-- coordinates: array (nullable = true)
| | |-- element: double (containsNull = true)
| |-- type: string (nullable = true)
|-- properties: struct (nullable = true)
| |-- API: string (nullable = true)
| |-- API_County: string (nullable = true)
| |-- API_Label: string (nullable = true)
| |-- Api_Seq: string (nullable = true)
| |-- Citing_Typ: string (nullable = true)
| |-- Dir_E_W: string (nullable = true)
| |-- Dir_N_S: string (nullable = true)
| |-- Dist_E_W: long (nullable = true)
| |-- Dist_N_S: long (nullable = true)
| |-- Facil_Id: long (nullable = true)
| |-- Facil_Stat: string (nullable = true)
| |-- Facil_Type: string (nullable = true)
| |-- Field_Code: long (nullable = true)
| |-- Field_Name: string (nullable = true)
| |-- Ground_Ele: long (nullable = true)
| |-- Latitude: double (nullable = true)
| |-- Loc_ID: long (nullable = true)
| |-- Loc_Name: string (nullable = true)
| |-- Loc_Qual: string (nullable = true)
| |-- Longitude: double (nullable = true)
| |-- Max_MD: long (nullable = true)
| |-- Max_TVD: long (nullable = true)
| |-- Meridian: string (nullable = true)
| |-- Operat_Num: long (nullable = true)
| |-- Operator: string (nullable = true)
| |-- Qtr_Qtr: string (nullable = true)
| |-- Range: string (nullable = true)
| |-- Section: string (nullable = true)
| |-- Spud_Date: string (nullable = true)
| |-- Township: string (nullable = true)
| |-- Utm_X: long (nullable = true)
| |-- Utm_Y: long (nullable = true)
| |-- Well_Name: string (nullable = true)
| |-- Well_Num: string (nullable = true)
| |-- Well_Title: string (nullable = true)
|-- type: string (nullable = true)

And we can examine the data that has been read:

scala> df.show()

+--------------------+--------------------+--------------------+-------+
|     _corrupt_record|            geometry|          properties|   type|
+--------------------+--------------------+--------------------+-------+
|                   {|                null|                null|   null|
|"type": "FeatureC...|                null|                null|   null|
|"crs": { "type": ...|                null|                null|   null|
|       "features": [|                null|                null|   null|
|                null|[WrappedArray(722...|[12508888,125,05-...|Feature|
|                null|[WrappedArray(524...|[12323461,123,05-...|Feature|
|                null|[WrappedArray(530...|[12323462,123,05-...|Feature|
|                null|[WrappedArray(523...|[12323463,123,05-...|Feature|
|                null|[WrappedArray(523...|[12323464,123,05-...|Feature|
|                null|[WrappedArray(235...|[04511663,045,05-...|Feature|
|                null|[WrappedArray(235...|[04511664,045,05-...|Feature|
|                null|[WrappedArray(236...|[04511665,045,05-...|Feature|
|                null|[WrappedArray(236...|[04511666,045,05-...|Feature|
|                null|[WrappedArray(244...|[04511667,045,05-...|Feature|
|                null|[WrappedArray(524...|[12323467,123,05-...|Feature|
|                null|[WrappedArray(494...|[01306522,013,05-...|Feature|
|                null|[WrappedArray(244...|[04511668,045,05-...|Feature|
|                null|[WrappedArray(244...|[04511669,045,05-...|Feature|
|                null|[WrappedArray(244...|[04511670,045,05-...|Feature|
|                null|[WrappedArray(245...|[04511671,045,05-...|Feature|
+--------------------+--------------------+--------------------+-------+
only showing top 20 rows

Once the data is in the dataframe we can register it as a SparkSQL table (I've called it "jsonTable") so that we can select data using SQL - for example, selecting the API and geometry coordinates for all wells into another dataframe called "well_locs":

scala> df.registerTempTable("jsonTable")

Now we can play with the data using SQL syntax:

scala> val well_locs = sqlContext.sql("SELECT properties.API, geometry.coordinates FROM jsontable")
well_locs: org.apache.spark.sql.DataFrame = [API: string, coordinates: array]

scala> well_locs.show()

+--------+--------------------+
|     API|         coordinates|
+--------+--------------------+
|    null|                null|
|    null|                null|
|    null|                null|
|    null|                null|
|12508888|[722281.0, 447760...|
|12323461|[524048.0, 444462...|
|12323462|[530187.0, 445971...|
|12323463|[523218.0, 444455...|
|12323464|[523598.0, 444340...|
|04511663|[235668.0, 437192...|
|04511664|[235672.0, 437193...|
|04511665|[236287.0, 437168...|
|04511666|[236284.0, 437168...|
|04511667|[244604.0, 437456...|
|12323467|[524594.0, 445802...|
|01306522|[494622.0, 444139...|
|04511668|[244666.0, 437484...|
|04511669|[244661.0, 437484...|
|04511670|[244656.0, 437484...|
|04511671|[245144.0, 437490...|
+--------+--------------------+
only showing top 20 rows

Spark's JSON reader expects one complete JSON object per line, so it can't parse the lines that make up the FeatureCollection wrapper around the features and collects them in the _corrupt_record column. We will remove this when we save to Cassandra:

scala> val x = sqlContext.sql("SELECT _corrupt_record FROM jsontable")
x: org.apache.spark.sql.DataFrame = [_corrupt_record: string]

scala> x.show()

+--------------------+
|     _corrupt_record|
+--------------------+
|                   {|
|"type": "FeatureC...|
|"crs": { "type": ...|
|       "features": [|
|                null|
|                null|
|                null|
|                null|
|                null|
|                null|
|                null|
|                null|
|                null|
|                null|
|                null|
|                null|
|                null|
|                null|
|                null|
|                null|
+--------------------+
only showing top 20 rows

There are some null records in there that we don't want to see. We can create "well_locs" again and exclude those null records from the query results:

scala> val well_locs = sqlContext.sql("SELECT properties.API, geometry.coordinates FROM jsontable where properties.API is not null")
well_locs: org.apache.spark.sql.DataFrame = [API: string, coordinates: array]

scala> well_locs.show()

+--------+--------------------+
|     API|         coordinates|
+--------+--------------------+
|12508888|[722281.0, 447760...|
|12323461|[524048.0, 444462...|
|12323462|[530187.0, 445971...|
|12323463|[523218.0, 444455...|
|12323464|[523598.0, 444340...|
|04511663|[235668.0, 437192...|
|04511664|[235672.0, 437193...|
|04511665|[236287.0, 437168...|
|04511666|[236284.0, 437168...|
|04511667|[244604.0, 437456...|
|12323467|[524594.0, 445802...|
|01306522|[494622.0, 444139...|
|04511668|[244666.0, 437484...|
|04511669|[244661.0, 437484...|
|04511670|[244656.0, 437484...|
|04511671|[245144.0, 437490...|
|04511672|[245141.0, 437490...|
|04511673|[265880.0, 437586...|
|10310665|[204771.0, 441822...|
|04511674|[233487.0, 438827...|
+--------+--------------------+
only showing top 20 rows
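The same query can be expressed with the DataFrame API instead of SQL. A roughly equivalent sketch (the resulting column names may differ slightly from the SQL version):

scala> val well_locs2 = df.select("properties.API", "geometry.coordinates").filter("properties.API is not null")
scala> well_locs2.show()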

We can use Spark SQL to extract geoJSON properties from our Spark SQL table - let's look at the API values for the wells in the dataset:

scala> val API = sqlContext.sql("SELECT properties.API FROM jsonTable")
API: org.apache.spark.sql.DataFrame = [API: string]

scala> API.collect.foreach(println)

-> prints all ~110,000 API numbers (plus a handful of nulls)

There are null records in the output - they appear at the beginning and the end (they come from the FeatureCollection wrapper lines), and we need to remove them before we write the data to the Cassandra database. We can see the nulls at the start:

scala> API.collect.take(10).foreach(println)

[null]
[null]
[null]
[null]
[12508888]
[12323461]
[12323462]
[12323463]
[12323464]
[04511663]
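We can peek at the tail of the collected array in the same way (a quick sketch using takeRight on the collected array):

scala> API.collect.takeRight(10).foreach(println)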

There are some nulls at the end too. We can use Spark SQL to extract the well API numbers from the table and count them - there are two ways to do this: (1) create a new dataframe and count the rows, or (2) count the rows directly in the table using SQL:

scala> val API = sqlContext.sql("SELECT properties.API FROM jsonTable")
scala> API.count
res17: Long = 110280

or

scala> sqlContext.sql("SELECT count (*) FROM jsonTable").show
+------+
|   _c0|
+------+
|110280|
+------+

Doing the same thing with null records excluded shows that the 6 null records have been removed. With a dataframe:

scala> val API = sqlContext.sql("SELECT properties.API FROM jsonTable where properties.API is not null")
API: org.apache.spark.sql.DataFrame = [API: string]

scala> API.count
res21: Long = 110274

...and with SQL:

scala> sqlContext.sql("SELECT count (*) FROM jsonTable where properties.API is not null").show
+------+                                                                        
|   _c0|
+------+
|110274|
+------+

Tidy Up The geoJSON Data And Save It To Cassandra

OK, so we've seen how we can manipulate and examine the source geoJSON data that we loaded into a Spark dataframe and then registered as a Spark SQL table. Now we can go through the process of cleaning up the data and formatting it correctly so that we can load it into Cassandra.

1. Process The geoJSON Source File To Convert All Field (Column) Names To Lower Case

There appears to be an issue with upper-case column names when saving from Spark to Cassandra (I'm not sure whether it's the Spark Cassandra connector or Spark itself - there's a JIRA for it - I'm using Spark 1.6.1 here). So we first need to convert our column names to lower case. I've used a script containing the following sed commands:
sed -i -e 's/"Api_Seq"/"api_seq"/' colorado_wells.geojson
sed -i -e 's/"API_County"/"api_county"/' colorado_wells.geojson
sed -i -e 's/"API_Label"/"api_label"/' colorado_wells.geojson
sed -i -e 's/"API"/"api"/' colorado_wells.geojson
sed -i -e 's/"Api"/"api"/' colorado_wells.geojson
sed -i -e 's/"APi"/"api"/' colorado_wells.geojson
sed -i -e 's/"Operator"/"operator"/' colorado_wells.geojson
sed -i -e 's/"Well_Title"/"well_title"/' colorado_wells.geojson
sed -i -e 's/"Facil_Id"/"facil_id"/' colorado_wells.geojson
sed -i -e 's/"Facil_Type"/"facil_type"/' colorado_wells.geojson
sed -i -e 's/"Facil_Stat"/"facil_stat"/' colorado_wells.geojson
sed -i -e 's/"Operat_Num"/"operat_num"/' colorado_wells.geojson
sed -i -e 's/"Well_Num"/"well_num"/' colorado_wells.geojson
sed -i -e 's/"Well_Name"/"well_name"/' colorado_wells.geojson
sed -i -e 's/"Field_Code"/"field_code"/' colorado_wells.geojson
sed -i -e 's/"Dist_N_S"/"dist_n_s"/' colorado_wells.geojson
sed -i -e 's/"Dir_N_S"/"dir_n_s"/' colorado_wells.geojson
sed -i -e 's/"Dist_E_W"/"dist_e_w"/' colorado_wells.geojson
sed -i -e 's/"Dir_E_W"/"dir_e_w"/' colorado_wells.geojson
sed -i -e 's/"Qtr_Qtr"/"qtr_qtr"/' colorado_wells.geojson
sed -i -e 's/"Section"/"section"/' colorado_wells.geojson
sed -i -e 's/"Township"/"township"/' colorado_wells.geojson
sed -i -e 's/"Range"/"range"/' colorado_wells.geojson
sed -i -e 's/"Meridian"/"meridian"/' colorado_wells.geojson
sed -i -e 's/"Latitude"/"latitude"/' colorado_wells.geojson
sed -i -e 's/"Longitude"/"longitude"/' colorado_wells.geojson
sed -i -e 's/"Ground_Ele"/"ground_ele"/' colorado_wells.geojson
sed -i -e 's/"Utm_X"/"utm_x"/' colorado_wells.geojson
sed -i -e 's/"Utm_Y"/"utm_y"/' colorado_wells.geojson
sed -i -e 's/"Loc_Qual"/"loc_qual"/' colorado_wells.geojson
sed -i -e 's/"Field_Name"/"field_name"/' colorado_wells.geojson
sed -i -e 's/"Loc_ID"/"loc_id"/' colorado_wells.geojson
sed -i -e 's/"Loc_Name"/"loc_name"/' colorado_wells.geojson
sed -i -e 's/"Spud_Date"/"spud_date"/' colorado_wells.geojson
sed -i -e 's/"Citing_Typ"/"citing_typ"/' colorado_wells.geojson
sed -i -e 's/"Max_MD"/"max_md"/' colorado_wells.geojson
sed -i -e 's/"Max_TVD"/"max_tvd"/' colorado_wells.geojson

2. Load The geoJSON File Into A Spark DataFrame

scala> val df = sqlContext.read.json("file:///tmp/colorado_wells.geojson")

3. Identify The Corrupt Record

The FeatureCollection wrapper of the JSON object isn't understood and needs to be removed:
scala> df.printSchema()

root
 |-- _corrupt_record: string (nullable = true)
 |-- geometry: struct (nullable = true)
 |    |-- coordinates: array (nullable = true)
 |    |    |-- element: double (containsNull = true)
 |    |-- type: string (nullable = true)
 |-- properties: struct (nullable = true)
 |    |-- api: string (nullable = true)
 |    |-- api_county: string (nullable = true)
 |    |-- api_label: string (nullable = true)
 |    |-- api_seq: string (nullable = true)
 |    |-- citing_typ: string (nullable = true)
 |    |-- dir_e_w: string (nullable = true)
 |    |-- dir_n_s: string (nullable = true)
 |    |-- dist_e_w: long (nullable = true)
 |    |-- dist_n_s: long (nullable = true)
 |    |-- facil_id: long (nullable = true)
 |    |-- facil_stat: string (nullable = true)
 |    |-- facil_type: string (nullable = true)
 |    |-- field_code: long (nullable = true)
 |    |-- field_name: string (nullable = true)
 |    |-- ground_ele: long (nullable = true)
 |    |-- latitude: double (nullable = true)
 |    |-- loc_id: long (nullable = true)
 |    |-- loc_name: string (nullable = true)
 |    |-- loc_qual: string (nullable = true)
 |    |-- longitude: double (nullable = true)
 |    |-- max_md: long (nullable = true)
 |    |-- max_tvd: long (nullable = true)
 |    |-- meridian: string (nullable = true)
 |    |-- operat_num: long (nullable = true)
 |    |-- operator: string (nullable = true)
 |    |-- qtr_qtr: string (nullable = true)
 |    |-- range: string (nullable = true)
 |    |-- section: string (nullable = true)
 |    |-- spud_date: string (nullable = true)
 |    |-- township: string (nullable = true)
 |    |-- utm_x: long (nullable = true)
 |    |-- utm_y: long (nullable = true)
 |    |-- well_name: string (nullable = true)
 |    |-- well_num: string (nullable = true)
 |    |-- well_title: string (nullable = true)
 |-- type: string (nullable = true)

scala> df.show()

+--------------------+--------------------+--------------------+-------+
|     _corrupt_record|            geometry|          properties|   type|
+--------------------+--------------------+--------------------+-------+
|                   {|                null|                null|   null|
|"type": "FeatureC...|                null|                null|   null|
|"crs": { "type": ...|                null|                null|   null|
|       "features": [|                null|                null|   null|
|                null|[WrappedArray(722...|[12508888,125,05-...|Feature|
|                null|[WrappedArray(524...|[12323461,123,05-...|Feature|
|                null|[WrappedArray(530...|[12323462,123,05-...|Feature|
|                null|[WrappedArray(523...|[12323463,123,05-...|Feature|
|                null|[WrappedArray(523...|[12323464,123,05-...|Feature|
|                null|[WrappedArray(235...|[04511663,045,05-...|Feature|
|                null|[WrappedArray(235...|[04511664,045,05-...|Feature|
|                null|[WrappedArray(236...|[04511665,045,05-...|Feature|
|                null|[WrappedArray(236...|[04511666,045,05-...|Feature|
|                null|[WrappedArray(244...|[04511667,045,05-...|Feature|
|                null|[WrappedArray(524...|[12323467,123,05-...|Feature|
|                null|[WrappedArray(494...|[01306522,013,05-...|Feature|
|                null|[WrappedArray(244...|[04511668,045,05-...|Feature|
|                null|[WrappedArray(244...|[04511669,045,05-...|Feature|
|                null|[WrappedArray(244...|[04511670,045,05-...|Feature|
|                null|[WrappedArray(245...|[04511671,045,05-...|Feature|
+--------------------+--------------------+--------------------+-------+
only showing top 20 rows

Register the dataframe as a SparkSQL table so that we can query it more easily:

scala> df.registerTempTable("jsonTable");

4. Drop The _corrupt_record Column From Our DataFrame:

We need to get rid of that _corrupt_record column - we do that using the dataframe drop method.
scala> val df2=df.drop(df.col("_corrupt_record"))
df2: org.apache.spark.sql.DataFrame = [geometry: struct<coordinates:array,type:string>, properties: struct<API:string,API_County:string,API_Label:string,Api_Seq:string,Citing_Typ:string,Dir_E_W:string,Dir_N_S:string,Dist_E_W:bigint,Dist_N_S:bigint,Facil_Id:bigint,Facil_Stat:string,Facil_Type:string,Field_Code:bigint,Field_Name:string,Ground_Ele:bigint,Latitude:double,Loc_ID:bigint,Loc_Name:string,Loc_Qual:string,Longitude:double,Max_MD:bigint,Max_TVD:bigint,Meridian:string,Operat_Num:bigint,Operator:string,Qtr_Qtr:string,Range:string,Section:string,Spud_Date:string,Township:string,Utm_X:bigint,Utm_Y:bigint,Well_Name:string,Well_Num:string,Well_Title:string>, type: string]

scala> df2.show()

+--------------------+--------------------+-------+
|            geometry|          properties|   type|
+--------------------+--------------------+-------+
|                null|                null|   null|
|                null|                null|   null|
|                null|                null|   null|
|                null|                null|   null|
|[WrappedArray(722...|[12508888,125,05-...|Feature|
|[WrappedArray(524...|[12323461,123,05-...|Feature|
|[WrappedArray(530...|[12323462,123,05-...|Feature|
|[WrappedArray(523...|[12323463,123,05-...|Feature|
|[WrappedArray(523...|[12323464,123,05-...|Feature|
|[WrappedArray(235...|[04511663,045,05-...|Feature|
|[WrappedArray(235...|[04511664,045,05-...|Feature|
|[WrappedArray(236...|[04511665,045,05-...|Feature|
|[WrappedArray(236...|[04511666,045,05-...|Feature|
|[WrappedArray(244...|[04511667,045,05-...|Feature|
|[WrappedArray(524...|[12323467,123,05-...|Feature|
|[WrappedArray(494...|[01306522,013,05-...|Feature|
|[WrappedArray(244...|[04511668,045,05-...|Feature|
|[WrappedArray(244...|[04511669,045,05-...|Feature|
|[WrappedArray(244...|[04511670,045,05-...|Feature|
|[WrappedArray(245...|[04511671,045,05-...|Feature|
+--------------------+--------------------+-------+
only showing top 20 rows

5. Filter Out The Null Records

Remove the unwanted null records from the data set:
scala> val df3=df2.filter("type is not null")
scala> df3.show()

+--------------------+--------------------+-------+
|            geometry|          properties|   type|
+--------------------+--------------------+-------+
|[WrappedArray(722...|[12508888,125,05-...|Feature|
|[WrappedArray(524...|[12323461,123,05-...|Feature|
|[WrappedArray(530...|[12323462,123,05-...|Feature|
|[WrappedArray(523...|[12323463,123,05-...|Feature|
|[WrappedArray(523...|[12323464,123,05-...|Feature|
|[WrappedArray(235...|[04511663,045,05-...|Feature|
|[WrappedArray(235...|[04511664,045,05-...|Feature|
|[WrappedArray(236...|[04511665,045,05-...|Feature|
|[WrappedArray(236...|[04511666,045,05-...|Feature|
|[WrappedArray(244...|[04511667,045,05-...|Feature|
|[WrappedArray(524...|[12323467,123,05-...|Feature|
|[WrappedArray(494...|[01306522,013,05-...|Feature|
|[WrappedArray(244...|[04511668,045,05-...|Feature|
|[WrappedArray(244...|[04511669,045,05-...|Feature|
|[WrappedArray(244...|[04511670,045,05-...|Feature|
|[WrappedArray(245...|[04511671,045,05-...|Feature|
|[WrappedArray(245...|[04511672,045,05-...|Feature|
|[WrappedArray(265...|[04511673,045,05-...|Feature|
|[WrappedArray(204...|[10310665,103,05-...|Feature|
|[WrappedArray(233...|[04511674,045,05-...|Feature|
+--------------------+--------------------+-------+
only showing top 20 rows

Check that we have the correct record count:

scala> df3.count
res12: Long = 110274
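
As a quick sanity check, the difference between the original and filtered dataframes should be exactly the six wrapper/closing lines we removed:

scala> df.count - df3.count    // 110280 - 110274 = 6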

Create a new SparkSQL table for our new filtered dataframe df3:

scala> df3.registerTempTable("jsonTable");
scala> val API = sqlContext.sql("SELECT properties.API FROM jsonTable")
API: org.apache.spark.sql.DataFrame = [API: string]

scala> API.show()
+--------+
|     API|
+--------+
|12508888|
|12323461|
|12323462|
|12323463|
|12323464|
|04511663|
|04511664|
|04511665|
|04511666|
|04511667|
|12323467|
|01306522|
|04511668|
|04511669|
|04511670|
|04511671|
|04511672|
|04511673|
|10310665|
|04511674|
+--------+
only showing top 20 rows

6. Add A Primary Key Column

We need a primary key for our Cassandra table. I'm going to use the well API number as my partition key - it's unique and will distribute the data nicely around our cluster.

Here's how we add a column to a dataframe, selecting the values from the existing properties.API column:

scala> val df4 = df3.withColumn("api", df3("properties.API"))

NB you can also add literal columns using this syntax:

dataframe.withColumn("newName", lit("newValue"))
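
For example (a small sketch - the column name "source" and its value are made up for illustration, and lit comes from org.apache.spark.sql.functions):

scala> import org.apache.spark.sql.functions.lit
scala> val dfTagged = df4.withColumn("source", lit("colorado_geojson"))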

After adding the new column we can see it in the dataframe - the api column now appears at the top level rather than only as a sub-element of properties:

scala> df4.show()
+--------------------+--------------------+-------+--------+
|            geometry|          properties|   type|     api|
+--------------------+--------------------+-------+--------+
|[WrappedArray(722...|[12508888,125,05-...|Feature|12508888|
|[WrappedArray(524...|[12323461,123,05-...|Feature|12323461|
|[WrappedArray(530...|[12323462,123,05-...|Feature|12323462|
|[WrappedArray(523...|[12323463,123,05-...|Feature|12323463|
|[WrappedArray(523...|[12323464,123,05-...|Feature|12323464|
|[WrappedArray(235...|[04511663,045,05-...|Feature|04511663|
|[WrappedArray(235...|[04511664,045,05-...|Feature|04511664|
|[WrappedArray(236...|[04511665,045,05-...|Feature|04511665|
|[WrappedArray(236...|[04511666,045,05-...|Feature|04511666|
|[WrappedArray(244...|[04511667,045,05-...|Feature|04511667|
|[WrappedArray(524...|[12323467,123,05-...|Feature|12323467|
|[WrappedArray(494...|[01306522,013,05-...|Feature|01306522|
|[WrappedArray(244...|[04511668,045,05-...|Feature|04511668|
|[WrappedArray(244...|[04511669,045,05-...|Feature|04511669|
|[WrappedArray(244...|[04511670,045,05-...|Feature|04511670|
|[WrappedArray(245...|[04511671,045,05-...|Feature|04511671|
|[WrappedArray(245...|[04511672,045,05-...|Feature|04511672|
|[WrappedArray(265...|[04511673,045,05-...|Feature|04511673|
|[WrappedArray(204...|[10310665,103,05-...|Feature|10310665|
|[WrappedArray(233...|[04511674,045,05-...|Feature|04511674|
+--------------------+--------------------+-------+--------+
only showing top 20 rows

7. Create A New SparkSQL Table For The Dataframe df4

Register a new SparkSQL table based on the new dataframe we just created that contains the primary key column we need:
scala> df4.registerTempTable("jsonTable")

And again, we can check the data is there:

scala> val API = sqlContext.sql("SELECT api, properties.api FROM jsonTable")
API: org.apache.spark.sql.DataFrame = [api: string, api: string]

scala> API.show()
+--------+--------+
|     api|     api|
+--------+--------+
|12508888|12508888|
|12323461|12323461|
|12323462|12323462|
|12323463|12323463|
|12323464|12323464|
|04511663|04511663|
|04511664|04511664|
|04511665|04511665|
|04511666|04511666|
|04511667|04511667|
|12323467|12323467|
|01306522|01306522|
|04511668|04511668|
|04511669|04511669|
|04511670|04511670|
|04511671|04511671|
|04511672|04511672|
|04511673|04511673|
|10310665|10310665|
|04511674|04511674|
+--------+--------+
only showing top 20 rows

8. Use The SparkSQL Table To Build A New DF With The Columns In The Correct Order

We want the columns in our dataframe to match the layout of the table we are going to save into - we can achieve that by selecting the data into a new dataframe from our latest SparkSQL table:

scala> val wells=sqlContext.sql("SELECT api,geometry, properties, type from jsontable")

wells: org.apache.spark.sql.DataFrame = [api: string, geometry: struct, properties: struct, type: string]

And here is our nicely formatted dataset ready to go into Cassandra:

scala> wells.show()
+--------+--------------------+--------------------+-------+
|     api|            geometry|          properties|   type|
+--------+--------------------+--------------------+-------+
|12508888|[WrappedArray(722...|[12508888,125,05-...|Feature|
|12323461|[WrappedArray(524...|[12323461,123,05-...|Feature|
|12323462|[WrappedArray(530...|[12323462,123,05-...|Feature|
|12323463|[WrappedArray(523...|[12323463,123,05-...|Feature|
|12323464|[WrappedArray(523...|[12323464,123,05-...|Feature|
|04511663|[WrappedArray(235...|[04511663,045,05-...|Feature|
|04511664|[WrappedArray(235...|[04511664,045,05-...|Feature|
|04511665|[WrappedArray(236...|[04511665,045,05-...|Feature|
|04511666|[WrappedArray(236...|[04511666,045,05-...|Feature|
|04511667|[WrappedArray(244...|[04511667,045,05-...|Feature|
|12323467|[WrappedArray(524...|[12323467,123,05-...|Feature|
|01306522|[WrappedArray(494...|[01306522,013,05-...|Feature|
|04511668|[WrappedArray(244...|[04511668,045,05-...|Feature|
|04511669|[WrappedArray(244...|[04511669,045,05-...|Feature|
|04511670|[WrappedArray(244...|[04511670,045,05-...|Feature|
|04511671|[WrappedArray(245...|[04511671,045,05-...|Feature|
|04511672|[WrappedArray(245...|[04511672,045,05-...|Feature|
|04511673|[WrappedArray(265...|[04511673,045,05-...|Feature|
|10310665|[WrappedArray(204...|[10310665,103,05-...|Feature|
|04511674|[WrappedArray(233...|[04511674,045,05-...|Feature|
+--------+--------------------+--------------------+-------+
only showing top 20 rows

scala> wells.printSchema()
root
 |-- api: string (nullable = true)
 |-- geometry: struct (nullable = true)
 |    |-- coordinates: array (nullable = true)
 |    |    |-- element: double (containsNull = true)
 |    |-- type: string (nullable = true)
 |-- properties: struct (nullable = true)
 |    |-- api: string (nullable = true)
 |    |-- api_county: string (nullable = true)
 |    |-- api_label: string (nullable = true)
 |    |-- api_seq: string (nullable = true)
 |    |-- citing_typ: string (nullable = true)
 |    |-- dir_e_w: string (nullable = true)
 |    |-- dir_n_s: string (nullable = true)
 |    |-- dist_e_w: long (nullable = true)
 |    |-- dist_n_s: long (nullable = true)
 |    |-- facil_id: long (nullable = true)
 |    |-- facil_stat: string (nullable = true)
 |    |-- facil_type: string (nullable = true)
 |    |-- field_code: long (nullable = true)
 |    |-- field_name: string (nullable = true)
 |    |-- ground_ele: long (nullable = true)
 |    |-- latitude: double (nullable = true)
 |    |-- loc_id: long (nullable = true)
 |    |-- loc_name: string (nullable = true)
 |    |-- loc_qual: string (nullable = true)
 |    |-- longitude: double (nullable = true)
 |    |-- max_md: long (nullable = true)
 |    |-- max_tvd: long (nullable = true)
 |    |-- meridian: string (nullable = true)
 |    |-- operat_num: long (nullable = true)
 |    |-- operator: string (nullable = true)
 |    |-- qtr_qtr: string (nullable = true)
 |    |-- range: string (nullable = true)
 |    |-- section: string (nullable = true)
 |    |-- spud_date: string (nullable = true)
 |    |-- township: string (nullable = true)
 |    |-- utm_x: long (nullable = true)
 |    |-- utm_y: long (nullable = true)
 |    |-- well_name: string (nullable = true)
 |    |-- well_num: string (nullable = true)
 |    |-- well_title: string (nullable = true)
 |-- type: string (nullable = true)

9. Create The Cassandra Database Schema

We need to create the table to store our data. The table structure needs to reflect the nested nature of the JSON data that we want to store.

Create A Cassandra Keyspace

I'm running this on a single node of Cassandra and Spark. If you're using multiple nodes you can increase the replication factor as required. If you're using multiple datacentres you should replace the replication strategy with 'NetworkTopologyStrategy'.
CREATE KEYSPACE IF NOT EXISTS wells WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1 };
USE wells;

Create A User-Defined Type For The Well Properties Structure

CREATE TYPE well_data (
api text,
api_label text,
operator text,
well_title text,
facil_id int,
facil_type text,
facil_stat text,
operat_num bigint,
well_num text,
well_name text,
field_code int,
dist_n_s int,
dir_n_s text,
dist_e_w int,
dir_e_w text,
qtr_qtr text,
section text,
township text,
range text,
meridian text,
latitude decimal,
longitude decimal,
ground_ele decimal,
utm_x decimal,
utm_y decimal,
loc_qual text,
field_name text,
api_seq text,
api_county text,
loc_id decimal,
loc_name text,
spud_date text,
citing_typ text,
max_md decimal,
max_tvd decimal);

Create A User-Defined Type For the Geometry Structure

CREATE TYPE geometry_data (
type text,
coordinates list<double>);

Create The Wells Table

CREATE TABLE wells (
api text,
type text,
properties frozen<well_data>,
geometry frozen<geometry_data>,
PRIMARY KEY(api));

Let's test that the table matches the JSON layout using a single record insert:

INSERT into wells.wells JSON '{"API": "12508888", "type": "Feature","properties": {"API": "12508888","API_Label": "05-125-08888","Operator": "OMIMEX PETROLEUM INC","Well_Title": "8-9-5-45 FERGUSON","Facil_Id": "0","Facil_Type": "WELL","Facil_Stat": "PR","Operat_Num": "66190","Well_Num": "8-9-5-45","Well_Name": "FERGUSON","Field_Code": "1970","Dist_N_S": "1980","Dir_N_S": "N","Dist_E_W": "600","Dir_E_W": "E","Qtr_Qtr": "SENE","Section": "9","Township": "5N","Range": "45W","Meridian": "6","Latitude": "40.419416","Longitude": "-102.379999","Ground_Ele": 0,"Utm_X": "722281","Utm_Y": "4477606","Loc_Qual": "PLANNED Footage","Field_Name": "BALLYNEAL","Api_Seq": "08888","API_County": "125","Loc_ID": "304702","Loc_Name": "FERGUSON-65N45W 9SENE","Spud_Date": "2004\/09\/07","Citing_Typ": "ACTUAL","Max_MD": "2727","Max_TVD": "2727"},"geometry": {"type": "Point","coordinates": [722281.0, 4477606.0]}}';

Update A Record Using An Upsert

INSERT into wells.wells JSON '{"API": "12508888", "type": "Feature","properties": {"Operator": "CAPIREX PETROLEUM INC"}}';

10. Write The Dataframe Back to Cassandra

Now that we have the data in a correctly formatted dataframe we can write it to a Cassandra table using the Spark Cassandra connector.
scala> wells.write.format("org.apache.spark.sql.cassandra").options(Map( "table" -> "wells", "keyspace" -> "wells")).save()
This command expects the target table to be empty. There are other save modes that you can explore, for example appending to existing data with .mode(SaveMode.Append), as sketched below.
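
A fuller version of the append variant might look like this (a sketch only - SaveMode lives in org.apache.spark.sql):

scala> import org.apache.spark.sql.SaveMode
scala> wells.write.format("org.apache.spark.sql.cassandra").options(Map("table" -> "wells", "keyspace" -> "wells")).mode(SaveMode.Append).save()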

Once the data is saved into Cassandra, go into cqlsh and query the records in the wells.wells table.

Have fun!

11. Reading From A Cassandra Table

Of course, you may also want to read data BACK into a dataframe from a Cassandra table. To achieve this we can use the following Scala command:
val df5 = sqlContext.read.format("org.apache.spark.sql.cassandra").options(Map("table" -> "wells", "keyspace" -> "wells")).load()
And we can demonstrate that we have all the records returned in our new dataframe:
scala> df5.count()
res29: Long = 110274
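
Assuming the connector maps the UDT columns back to struct columns (as they were on the way in), we can register the reloaded dataframe and query it just like the one we built from the geoJSON file - a quick sketch:

scala> df5.registerTempTable("wells_from_cassandra")
scala> sqlContext.sql("SELECT api, properties.operator FROM wells_from_cassandra").show(5)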
