This lab demonstrates using Synapse Analytics Spark to work with the Common Data Model.
After completing the lab, you will know how to interact with the Common Data Model in Synapse Analytics.
- Lab 04 - Working with the Common Data Model in Synapse Analytics
Before stepping through the exercises in this lab, make sure you have properly configured your Azure Synapse Analytics workspace. Perform the tasks below to configure the workspace.
NOTE
If you have already created and configured the Synapse Analytics workspace while running one of the other labs available in this repo, you do not need to perform this task again and can move on to the next task. The labs are designed to share the Synapse Analytics workspace, so you only need to create it once.
Follow the instructions in Deploy your Azure Synapse Analytics workspace to create and configure the workspace.
Details coming soon ...
Our CDM data is already stored in Azure Data Lake Storage Gen2. Open the corresponding container/folder:
To view the actual data, we can use a SQL query. Right-click the corresponding data folder, choose **New SQL script**, then choose **Select TOP 100 rows**.
You may right-click and choose **Preview** to view the contents of the `default.manifest.cdm.json` file. This file points to our entity, which is located in the `SaleSmall` subfolder, as shown below.
You may right-click and choose **Preview** to view the contents of the `SaleSmall.cdm.json` file. The data itself is stored in a subfolder named `2020-12-12`, as shown below.
Click the **Run** button in the toolbar to execute the SQL query. Once executed, the results are visible in the lower pane.
Alternatively, we can load the CDM data into a Spark dataframe. Choose a Spark pool first, then click the **Run** button in the toolbar to run the notebook. Once executed, the results are visible in the lower pane.
We will load the existing CDM data into Spark dataframes. Open a notebook and use the following Python code to load the CDM data:
```python
from pyspark.sql.types import *
from pyspark.sql import functions, Row
from decimal import Decimal
from datetime import datetime
storageAccountName = "asadatalake01.dfs.core.windows.net"
container = "wwi-02"
outputContainer = "wwi-02"
abfssRoot = "abfss://" + outputContainer + "@" + storageAccountName
folder1 = "/cdm-data/input"
#read CDM entities
df = (spark.read.format("com.microsoft.cdm")
.option("storage", storageAccountName)
.option("manifestPath", container + folder1 + "/salesmall/default.manifest.cdm.json")
.option("entity", "SaleSmall")
.load())
df.printSchema()
df.select("*").show()
```
The loaded CDM data is now displayed as a Spark dataframe in the lower pane. The dataframe schema matches the structure of our CDM entity.
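As a quick sanity check, you can list the dataframe's columns and types and compare them with the `hasAttributes` entries of the entity definition shown further below. This is a minimal sketch reusing the `df` loaded above:

```python
# List the columns and types of the loaded dataframe so they can be compared
# with the "hasAttributes" section of the SaleSmall entity definition.
for field in df.schema.fields:
    print(field.name, field.dataType.simpleString())
```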
Note how our manifest file `default.manifest.cdm.json` points to our model:
```json
...
"entities" : [
  {
    "type" : "LocalEntity",
    "entityName" : "SaleSmall",
    "entityPath" : "SaleSmall/SaleSmall.cdm.json/SaleSmall",
    ...
  }
]
...
```
which in turn contains the definition of our `SaleSmall` entity. Some contents of `SaleSmall.cdm.json` are skipped for brevity:
```json
{
  "definitions" : [
    {
      "entityName" : "SaleSmall",
      "attributeContext" : {
        "type" : "entity",
        "name" : "SaleSmall",
        "definition" : "resolvedFrom/SaleSmall",
        "contents" : [
          ...
        ]
      },
      "hasAttributes" : [
        {
          "name" : "TransactionId",
          "dataFormat" : "String"
        },
        {
          "name" : "CustomerId",
          "dataFormat" : "Int32"
        },
        {
          "name" : "ProductId",
          "dataFormat" : "Int16"
        },
        {
          "name" : "Quantity",
          "dataFormat" : "Byte"
        },
        {
          "name" : "Price",
          "dataFormat" : "Decimal"
        },
        {
          "name" : "TotalAmount",
          "dataFormat" : "Decimal"
        },
        {
          "name" : "TransactionDate",
          "dataFormat" : "Int32"
        },
        {
          "name" : "ProfitAmount",
          "dataFormat" : "Decimal"
        },
        {
          "name" : "Hour",
          "dataFormat" : "Byte"
        },
        {
          "name" : "Minute",
          "dataFormat" : "Byte"
        },
        {
          "name" : "StoreId",
          "dataFormat" : "Int16"
        }
      ]
    }
  ]
}
```
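For reference, the attribute list above corresponds roughly to the following PySpark schema. This is only an illustrative sketch: the `saleSmallSchema` name is hypothetical, and the exact Spark types produced by the CDM connector (in particular the `Decimal` precision and scale) may differ.

```python
from pyspark.sql.types import (StructType, StructField, StringType, IntegerType,
                               ShortType, ByteType, DecimalType)

# Approximate Spark equivalent of the SaleSmall "hasAttributes" list above.
# DecimalType(18, 2) is an assumption; the connector derives the actual
# precision/scale from the CDM definition.
saleSmallSchema = StructType([
    StructField("TransactionId", StringType()),
    StructField("CustomerId", IntegerType()),
    StructField("ProductId", ShortType()),
    StructField("Quantity", ByteType()),
    StructField("Price", DecimalType(18, 2)),
    StructField("TotalAmount", DecimalType(18, 2)),
    StructField("TransactionDate", IntegerType()),
    StructField("ProfitAmount", DecimalType(18, 2)),
    StructField("Hour", ByteType()),
    StructField("Minute", ByteType()),
    StructField("StoreId", ShortType()),
])
```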
We will update existing CDM data by modifying the Spark dataframe schema.
```python
from pyspark.sql.types import *
from pyspark.sql import functions as f, Row
from decimal import Decimal
from datetime import datetime
storageAccountName = "asadatalake01.dfs.core.windows.net"
container = "wwi-02"
outputContainer = "wwi-02"
abfssRoot = "abfss://" + outputContainer + "@" + storageAccountName
folder1 = "/cdm-data/input"
folder2 = "/cdm-data/update-implicit"
#read CDM
df = (spark.read.format("com.microsoft.cdm")
.option("storage", storageAccountName)
.option("manifestPath", container + folder1 + "/salesmall/default.manifest.cdm.json")
.option("entity", "SaleSmall")
.load())
df.printSchema()
df.select("*").show()
#update dataframe/schema
df2 = df.withColumn("x4", f.lit(0))
df2.printSchema()
df2.select("*").show()
#write CDM; entity manifest is implicitly created based on df schema
(df2.write.format("com.microsoft.cdm")
.option("storage", storageAccountName)
.option("manifestPath", container + folder2 + "/salesmall/default.manifest.cdm.json")
.option("entity", "SaleSmall")
.option("format", "parquet")
.option("compression", "gzip")
.save())
readDf2 = (spark.read.format("com.microsoft.cdm")
.option("storage", storageAccountName)
.option("manifestPath", container + folder2 + "/salesmall/default.manifest.cdm.json")
.option("entity", "SaleSmall")
.load())
readDf2.select("*").show()
```
This is the new schema as shown by the Spark dataframe:
Note that the CDM manifest was updated:
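If you want to inspect the updated manifest directly from the notebook, one option is to read it back as plain text. This is only a sketch, reusing the `abfssRoot` and `folder2` variables defined in the cell above; the exact layout of the generated manifest may vary.

```python
# Read back the manifest written to the update-implicit folder and print it,
# so you can see the entity entry created by the connector.
manifestLines = spark.read.text(abfssRoot + folder2 + "/salesmall/default.manifest.cdm.json").collect()
print("\n".join(row.value for row in manifestLines))
```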
We will update existing CDM data by using a modified CDM manifest.
```python
from pyspark.sql.types import *
from pyspark.sql import functions as f, Row
from decimal import Decimal
from datetime import datetime
storageAccountName = "asadatalake01.dfs.core.windows.net"
container = "wwi-02"
outputContainer = "wwi-02"
abfssRoot = "abfss://" + outputContainer + "@" + storageAccountName
folder1 = "/cdm-data/input"
folder2 = "/cdm-data/update-explicit"
#read CDM
df = (spark.read.format("com.microsoft.cdm")
.option("storage", storageAccountName)
.option("manifestPath", container + folder1 + "/salesmall/default.manifest.cdm.json")
.option("entity", "SaleSmall")
.load())
df.printSchema()
df.select("*").show()
#update dataframe/schema
df2 = df.withColumn("x5", f.lit(0))
df2.printSchema()
df2.select("*").show()
#write CDM; entity manifest is explicitly passed as SaleSmall-Modified.cdm.json
(df2.write.format("com.microsoft.cdm")
.option("storage", storageAccountName)
.option("manifestPath", container + folder2 + "/salesmall/default.manifest.cdm.json")
.option("entity", "SaleSmall")
.option("entityDefinitionModelRoot", container + folder1) #root folder for our own CDM models
.option("entityDefinitionPath", "/salesmall/SaleSmall/SaleSmall-Modified.cdm.json/SaleSmall") #relative path for our own CDM model
.option("format", "parquet")
.option("compression", "gzip")
.save())
readDf2 = (spark.read.format("com.microsoft.cdm")
.option("storage", storageAccountName)
.option("manifestPath", container + folder2 + "/salesmall/default.manifest.cdm.json")
.option("entity", "SaleSmall")
.load())
readDf2.select("*").show()
```
This is the new schema as shown by the Spark dataframe:
Note that the CDM manifest contains a member that matches our new column.
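A quick way to confirm this from the notebook is to check that the round-tripped dataframe exposes the new column, as in this minimal sketch using `readDf2` from above:

```python
# The dataframe read back from the update-explicit folder should expose the
# x5 column that was added before the write.
print("x5" in readDf2.columns)  # expected: True
```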
We will write new CDM data based on existing Spark dataframes; the CDM manifest will be inferred from the dataframe's schema. Open a notebook and use the following Python code to save the CDM data:
```python
storageAccountName = "asadatalake01.dfs.core.windows.net"
container = "wwi-02"
outputContainer = "wwi-02"
abfssRoot = "abfss://" + outputContainer + "@" + storageAccountName
# WARNING: if output folder exists, writer will fail with a java.lang.NullPointerException
folder1 = "/cdm-data/output-implicit"
from pyspark.sql.types import *
from pyspark.sql import functions, Row
from decimal import Decimal
from datetime import datetime
df = spark.read.parquet(abfssRoot + "/sale-small/Year=2019/Quarter=Q4/Month=12/*/*.parquet")
df.show(10)
df.printSchema()
#save dataframe using CDM format; entity manifest is implicitly created based on df schema
(df.write.format("com.microsoft.cdm")
.option("storage", storageAccountName)
.option("manifestPath", container + folder1 + "/salesmall/default.manifest.cdm.json")
.option("entity", "SaleSmall")
.option("format", "parquet")
.option("compression", "gzip")
.save())
readDf = (spark.read.format("com.microsoft.cdm")
.option("storage", storageAccountName)
.option("manifestPath", container + folder1 + "/salesmall/default.manifest.cdm.json")
.option("entity", "SaleSmall")
.load())
readDf.select("*").show()
```
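As a sanity check, you might compare the number of rows read back through the CDM connector with the number of rows in the source parquet dataframe. This is a sketch using the `df` and `readDf` variables defined above:

```python
# If the CDM write/read round-trip succeeded, both counts should match.
print("source parquet rows:", df.count())
print("rows read via CDM:  ", readDf.count())
```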
Writing new CDM data based on an existing CDM model is just as easy. Here the CDM entity definition is explicit, specified by the user. Open a notebook and use the following Python code to save the CDM data:
```python
storageAccountName = "asadatalake01.dfs.core.windows.net"
container = "wwi-02"
outputContainer = "wwi-02"
abfssRoot = "abfss://" + outputContainer + "@" + storageAccountName
# WARNING: if output folder exists, writer will fail with a java.lang.NullPointerException
folderInput = "/cdm-data/input"
folder2 = "/cdm-data/output-explicit"
from pyspark.sql.types import *
from pyspark.sql import functions, Row
from decimal import Decimal
from datetime import datetime
df2 = spark.read.parquet(abfssRoot + "/sale-small/Year=2019/Quarter=Q4/Month=12/*/*.parquet")
df2.show(10)
df2.printSchema()
#save dataframe using CDM format; entity manifest is explicitly passed as SaleSmall/SaleSmall.cdm.json
(df2.write.format("com.microsoft.cdm")
.option("storage", storageAccountName)
.option("manifestPath", container + folder2 + "/salesmall/default.manifest.cdm.json")
.option("entity", "SaleSmall")
.option("entityDefinitionModelRoot", container + folderInput) #root folder for our own CDM models
.option("entityDefinitionPath", "/salesmall/SaleSmall/SaleSmall.cdm.json/SaleSmall") #relative path for our own CDM model
.mode("overwrite")
.save())
readDf2 = (spark.read.format("com.microsoft.cdm")
.option("storage", storageAccountName)
.option("manifestPath", container + folder2 + "/salesmall/default.manifest.cdm.json")
.option("entity", "SaleSmall")
.option("entityDefinitionModelRoot", container + folderInput)
.load())
readDf2.select("*").show()
```
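To see how the explicitly provided CDM entity definition affects the data types, you can compare the schema of the source parquet dataframe with the schema that comes back through the connector. A short sketch, reusing `df2` and `readDf2` from above:

```python
# Any differences between the two schemas reflect the data formats declared in
# SaleSmall.cdm.json rather than the types inferred from the parquet files.
df2.printSchema()
readDf2.printSchema()
```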
Go to the **Integrate** hub, press the **+** button, then choose **Pipeline** to create a new pipeline.
Name your pipeline, then pick an activity from the **Activities** list: drag a **Notebook** element, found under the **Synapse** group.
In the lower pane, choose the **Settings** tab and select the notebook you want the pipeline to use. In our example, we chose the notebook that converts raw data to the CDM format.
Press **Add trigger** to use a condition that will automatically launch your new pipeline, then press **New** to create a new trigger.
- Name your trigger, and choose **Event** as its type.
- Choose the Azure subscription, storage account name, and container name.
- Set up a path prefix that will be used to detect changes in a specific location of our data lake.
- Check the **Blob created** event to make sure the pipeline is launched when a new blob appears in your blob path.
Save your trigger, then click the **Validate** button and press **Publish** to save your new pipeline.
We chose to use a trigger that detects changes in our data lake, which means that every time new data lands in the data lake, our pipeline will run. In our case, the notebook will be executed and will convert the raw data to the CDM format.
Follow the instructions in Clean-up your subscription to clean-up your environment after the hands-on lab.
To learn more about the topics covered in this lab, use these resources:
- Azure.Analytics.Synapse.Artifacts Namespace Classes
- Using the Spark CDM Connector
- Common Data Model (CDM) Schema
In case you encounter any issues with the content in this repository, please follow the How to report issues guideline. We will try to address them as soon as possible. Please check the open issues to learn about their status.