Updates for naming consistency, formatting etc #1

Open
wants to merge 3 commits into base: master
Changes from 1 commit
@@ -4,16 +4,13 @@
"cell_type": "markdown",
"metadata": {},
"source": [
"# AWS SUMMIT 2019\n",
"## TechFest - Building a Datalake on AWS\n",
"# Analytics On AWS Demo\n",
"\n",
"\n",
"Take your time to read throught the instructions provided in this notebook.\n",
"Take your time to read through the instructions provided in this notebook.\n",
"\n",
"###### Learning Objectives\n",
"- Understand how to interactivly author Glue ETL scripts using Glue Dev Endpoints & SageMaker notebooks\n",
"- Use Boto3 to call Glue APIs to do Glue administrative and operational activities\n",
"\n",
"- Understand how to interactively author Glue ETL scripts using Glue Dev Endpoints & SageMaker notebooks\n",
"- Use boto3 to call Glue APIs to do Glue administrative and operational activities\n",
"\n",
"**Execute the code blocks one cell at a time**"
]
@@ -116,9 +113,9 @@
"metadata": {},
"outputs": [],
"source": [
"raw_data = glueContext.create_dynamic_frame.from_catalog(database = \"summitdb\", table_name = \"raw\")\n",
"raw_data = glueContext.create_dynamic_frame.from_catalog(database=\"analyticsdemodb\", table_name=\"raw\")\n",
"\n",
"reference_data = glueContext.create_dynamic_frame.from_catalog(database = \"summitdb\", table_name = \"reference_data\")"
"reference_data = glueContext.create_dynamic_frame.from_catalog(database=\"analyticsdemodb\", table_name=\"reference_data\")"
]
},
{
@@ -167,8 +164,8 @@
"metadata": {},
"outputs": [],
"source": [
"print('raw_data (Count) = ' + str(raw_data.count()))\n",
"print('reference_data (Count) = ' + str(reference_data.count()))"
"print(f'raw_data (count) = {raw_data.count()}')\n",
"print(f'reference_data (count) = {reference_data.count()}')"
]
},
{
@@ -230,7 +227,7 @@
"\n",
"# Run the SQL statement that selects only the 'Running' activity records\n",
"runningDF = spark.sql(\"select * from temp_raw_data where activity_type = 'Running'\")\n",
"print(\"Running (count) : \" + str(runningDF.count()))\n",
"print(f'Running (count): {runningDF.count()}')\n",
"\n",
"runningDF.show(5)\n"
]
@@ -253,7 +250,7 @@
"source": [
"# Run the SQL statement that selects only the 'Working' activity records\n",
"workingDF = spark.sql(\"select * from temp_raw_data where activity_type = 'Working'\")\n",
"print(\"Working (count) : \" + str(workingDF.count()))\n",
"print(f'Working (count): {workingDF.count()}')\n",
"\n",
"workingDF.show(5)\n"
]
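The two Spark SQL cells above apply the same where-clause to `temp_raw_data`, differing only in the `activity_type` value. A pure-Python sketch of that filter semantics, using made-up sample records rather than the actual lab dataset:

```python
# Hypothetical records standing in for the temp_raw_data view
raw_records = [
    {'track_id': 't1', 'activity_type': 'Running'},
    {'track_id': 't2', 'activity_type': 'Working'},
    {'track_id': 't3', 'activity_type': 'Running'},
]

# Equivalent of: select * from temp_raw_data where activity_type = 'Running'
running = [r for r in raw_records if r['activity_type'] == 'Running']
# Equivalent of: select * from temp_raw_data where activity_type = 'Working'
working = [r for r in raw_records if r['activity_type'] == 'Working']

print(f'Running (count): {len(running)}')  # Running (count): 2
print(f'Working (count): {len(working)}')  # Working (count): 1
```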
@@ -281,14 +278,14 @@
"outputs": [],
"source": [
"\n",
"def filter_function(dynamicRecord):\n",
"\tif dynamicRecord['activity_type'] == 'Running':\n",
"def filter_function(dynamic_record):\n",
"\tif dynamic_record['activity_type'] == 'Running':\n",
"\t\treturn True\n",
"\telse:\n",
"\t\treturn False\n",
"runningDF = Filter.apply(frame = raw_data, f = filter_function)\n",
"runningDF = Filter.apply(frame=raw_data, f=filter_function)\n",
"\n",
"print(\"Running (count) : \" + str(runningDF.count()))"
"print(f'Running (count): {runningDF.count()}')"
]
},
{
@@ -310,9 +307,9 @@
"outputs": [],
"source": [
"\n",
"workingDF = Filter.apply(frame = raw_data, f = lambda x:x['activity_type']=='Working')\n",
"workingDF = Filter.apply(frame=raw_data, f=lambda x: x['activity_type'] == 'Working')\n",
"\n",
"print(\"Working (count) : \" + str(workingDF.count()))"
"print(f'Working (count): {workingDF.count()}')"
]
},
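`Filter.apply` accepts any callable that maps a record to a boolean, so the named `filter_function` from the previous hunk and the lambda above are interchangeable. A sketch of that equivalence over plain dicts (the sample records are invented for illustration):

```python
def filter_function(dynamic_record):
    # Same predicate as the notebook cell, without the redundant if/else
    return dynamic_record['activity_type'] == 'Running'

records = [
    {'activity_type': 'Running'},
    {'activity_type': 'Working'},
    {'activity_type': 'Running'},
]

by_function = [r for r in records if filter_function(r)]
by_lambda = list(filter(lambda x: x['activity_type'] == 'Running', records))

# Both styles select exactly the same rows
assert by_function == by_lambda
print(f'Running (count): {len(by_function)}')  # Running (count): 2
```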
{
@@ -341,7 +338,7 @@
"outputs": [],
"source": [
"\n",
"joined_data = Join.apply(raw_data,reference_data, 'track_id', 'track_id')\n"
"joined_data = Join.apply(raw_data, reference_data, 'track_id', 'track_id')\n"
]
},
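`Join.apply(raw_data, reference_data, 'track_id', 'track_id')` performs an equi-join on the `track_id` column. A pure-Python sketch of the same inner join over hypothetical rows (field names other than `track_id` are invented):

```python
raw_rows = [
    {'track_id': 't1', 'activity_type': 'Running'},
    {'track_id': 't2', 'activity_type': 'Working'},
]
reference_rows = [
    {'track_id': 't1', 'track_name': 'morning-route'},
    {'track_id': 't2', 'track_name': 'office'},
]

# Build a lookup on the join key, then merge each matching pair of rows
lookup = {ref['track_id']: ref for ref in reference_rows}
joined = [{**row, **lookup[row['track_id']]}
          for row in raw_rows if row['track_id'] in lookup]

print(len(joined))  # 2
```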
{
@@ -385,8 +382,7 @@
"outputs": [],
"source": [
"\n",
"joined_data_clean = DropFields.apply(frame = joined_data, paths = ['partition_0','partition_1','partition_2','partition_3'])\n",
"\n"
"joined_data_clean = DropFields.apply(frame=joined_data, paths=['partition_0','partition_1','partition_2','partition_3'])\n"
]
},
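`DropFields.apply` removes the listed columns from every record; here it strips the four `partition_*` columns the crawler derived from the S3 path. The same operation on a single plain-dict record (sample values invented):

```python
record = {
    'track_id': 't1',
    'activity_type': 'Running',
    'partition_0': '2019', 'partition_1': '05',
    'partition_2': '11', 'partition_3': '21',
}

# Drop the partition columns, keeping everything else
paths = ['partition_0', 'partition_1', 'partition_2', 'partition_3']
cleaned = {k: v for k, v in record.items() if k not in paths}

print(sorted(cleaned))  # ['activity_type', 'track_id']
```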
{
@@ -429,7 +425,7 @@
"# Final step of the transform - Writing transformed data to S3\n",
"- In this step we will be using Glue's write_dynamic_frame functionality to write transformed data to S3\n",
"- We will be storing the transformed data in a different directory & in parquet format\n",
"- make sure you change the D3 bucket name **yourname-datalake-demo-bucket** to reflect your bucket name \n",
"- Make sure you change the S3 bucket name **yourname-analytics-demo-bucket** to reflect your bucket name\n",
"\n",
"\n",
"---\n",
@@ -452,8 +448,8 @@
"source": [
"try:\n",
" datasink = glueContext.write_dynamic_frame.from_options(\n",
" frame = joined_data_clean, connection_type = \"s3\",\n",
" connection_options = {\"path\": \"s3://yourname-datalake-demo-bucket/data/processed-data/\"},\n",
"    frame=joined_data_clean, connection_type=\"s3\",\n",
"    connection_options={\"path\": \"s3://yourname-analytics-demo-bucket/data/processed-data/\"},\n",
" format = \"parquet\")\n",
" print('Transformed data written to S3')\n",
"except Exception as ex:\n",
@@ -485,19 +481,19 @@
"outputs": [],
"source": [
"\n",
"glueclient = boto3.client('glue',region_name='us-east-1')\n",
"glueclient = boto3.client('glue', region_name='us-east-1')\n",
"\n",
"response = glueclient.start_crawler(Name='summitcrawler')\n",
"response = glueclient.start_crawler(Name='AnalyticsDemoCrawler')\n",
"\n",
"print('---')\n",
"\n",
"crawler_state = ''\n",
"crawler_state = None\n",
"while (crawler_state != 'STOPPING'):\n",
" response = glueclient.get_crawler(Name='summitcrawler')\n",
" response = glueclient.get_crawler(Name='AnalyticsDemoCrawler')\n",
" crawler_state = str(response['Crawler']['State'])\n",
" time.sleep(1)\n",
"\n",
"print('Crawler : Stopped')\n",
"print('Crawler Stopped')\n",
"print('---')\n",
"time.sleep(3)\n"
]
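The cell above polls `get_crawler` once a second until the crawler's `State` reaches `STOPPING`. The same polling pattern with a stub client standing in for boto3 (the stub class is invented for illustration; the real client comes from `boto3.client('glue', ...)`):

```python
import time

class StubGlueClient:
    """Stand-in for the boto3 Glue client: reports RUNNING twice, then STOPPING."""
    def __init__(self):
        self._states = ['RUNNING', 'RUNNING', 'STOPPING']

    def get_crawler(self, Name):
        return {'Crawler': {'Name': Name, 'State': self._states.pop(0)}}

glueclient = StubGlueClient()

crawler_state = None
while crawler_state != 'STOPPING':
    response = glueclient.get_crawler(Name='AnalyticsDemoCrawler')
    crawler_state = str(response['Crawler']['State'])
    time.sleep(0.01)  # the notebook sleeps 1s between polls

print('Crawler Stopped')
```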
@@ -506,7 +502,7 @@
"cell_type": "markdown",
"metadata": {},
"source": [
"# Use boto to view the list of tables in summitdb database\n",
"# Use boto3 to view the list of tables in the analyticsdemodb database\n",
"\n",
"#### Execute Code 🔻"
]
@@ -518,9 +514,9 @@
"outputs": [],
"source": [
"\n",
"print('** Summitdb has following tables**')\n",
"print('** analyticsdemodb has the following tables **')\n",
"response = glueclient.get_tables(\n",
" DatabaseName='summitdb',\n",
" DatabaseName='analyticsdemodb',\n",
")\n",
"\n",
"for table in response['TableList']:\n",
@@ -547,12 +543,9 @@
"cell_type": "markdown",
"metadata": {},
"source": [
"# 👍 You did an AWSome job today with the lab ...! \n",
"### If you wish to take this notebook and its output back home, you can download / export it:\n",
"\n",
"### If you wish you take this notebook & its output back home - you can download / export it\n",
"\n",
"### In Jupyter's menu bar:\n",
"- Click: **File**\n",
"- In Jupyter's menu bar click **File**:\n",
"  - Download As: Notebook (.ipynb) (you can re-import it as a Jupyter notebook in the future)\n",
"  - Download As: HTML (shows code + results in an easy-to-read format)\n"
]
@@ -593,8 +586,17 @@
"mimetype": "text/x-python",
"name": "pyspark",
"pygments_lexer": "python2"
},
"pycharm": {
"stem_cell": {
"cell_type": "raw",
"source": [],
"metadata": {
"collapsed": false
}
}
}
},
"nbformat": 4,
"nbformat_minor": 2
}
}