To stream data from Alpha Vantage and store it in Confluent Kafka, we will fetch data from Alpha Vantage continuously and send it to Kafka in real time. We will use one PySpark notebook to simulate this. Then we will create Databricks notebooks to consume data from the Kafka topic and load it into Delta tables.
Prerequisites
- Alpha Vantage API key
- Confluent Kafka cluster
- PySpark environment in Databricks
The API used in this project is https://www.alphavantage.co/query?function=TIME_SERIES_WEEKLY_ADJUSTED&symbol=IBM&apikey=demo
Based on the symbol we provide, the API returns the weekly adjusted time series (OHLCV, adjusted close, and dividend amount) for that ticker. (For the intraday and daily endpoints, the most recent 100 bars are returned by default when the outputsize parameter is not set.)
To consume this API, we have to generate an API key by visiting https://www.alphavantage.co/support/#api-key . It is free of cost; the free plan allows only 5 API requests per minute.
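Here is a rough sketch of calling this endpoint from Python with the requests library; the symbol and the placeholder API key are illustrative values, not the exact code used later in the notebooks.

```python
import requests

API_KEY = "YOUR_API_KEY"  # placeholder - generate yours at https://www.alphavantage.co/support/#api-key
SYMBOL = "IBM"            # example ticker

response = requests.get(
    "https://www.alphavantage.co/query",
    params={
        "function": "TIME_SERIES_WEEKLY_ADJUSTED",
        "symbol": SYMBOL,
        "apikey": API_KEY,
    },
    timeout=30,
)
response.raise_for_status()
payload = response.json()

# The response holds a metadata block plus a weekly time series keyed by date,
# each entry carrying open/high/low/close/adjusted close/volume/dividend fields.
print(list(payload.keys()))
```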
We are using Unity Catalog to store data in a Medallion structure, so we set up ADLS storage to hold the metastore, catalog, schemas, external tables, checkpoints, and external volumes. The structure is shown below.
The catalog and its corresponding schemas are created in Databricks like this. All data will point to the corresponding external storage defined in Azure ADLS, as shown below.
External locations are defined in Databricks as well.
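As a rough illustration, the same external locations, catalog, and schemas can also be created with SQL from a notebook; every name and ADLS path below (the kafka catalog, the bronze/silver/gold schemas, the storage credential, and the storage account) is a placeholder for your own setup.

```python
# Run from a Databricks notebook (spark is predefined there).
# All names and abfss:// paths below are placeholders for illustration.
spark.sql("""
    CREATE EXTERNAL LOCATION IF NOT EXISTS kafka_project_loc
    URL 'abfss://kafka-project@<storageaccount>.dfs.core.windows.net/'
    WITH (STORAGE CREDENTIAL <storage_credential_name>)
""")

spark.sql("CREATE CATALOG IF NOT EXISTS kafka")

for layer in ("bronze", "silver", "gold"):
    spark.sql(f"""
        CREATE SCHEMA IF NOT EXISTS kafka.{layer}
        MANAGED LOCATION 'abfss://kafka-project@<storageaccount>.dfs.core.windows.net/{layer}'
    """)
```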
Created Kafka cluster in Confluent
We will be using a topic to store the messages coming from the API.
The topic is created. At present there are no messages in it.
Set up a new client.
Next, we need to create an API key.
Please make sure to note down the details below; they will be used in the Databricks notebook to connect to Kafka.
Instead of keeping a few of the details in Key Vault, I used environment variables on the cluster to hold the API key, Kafka server, and Kafka topic name. This is not advised in production.
I have created a Key Vault in Azure to store the required secrets: API_ID, Kafka server, Kafka username, Kafka password, and topic name.
Note: make sure to grant the "Azure Databricks" service principal the "Key Vault Secrets User" role. This allows Key Vault secrets to be accessed from Databricks.
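Here is a minimal sketch of reading these values in a Databricks notebook, assuming a Key Vault-backed secret scope named kv-scope and the secret and environment variable names shown (all placeholders):

```python
import os

# Assumes a Key Vault-backed Databricks secret scope named "kv-scope";
# the scope name and key names are placeholders.
kafka_server   = dbutils.secrets.get(scope="kv-scope", key="kafka-server")
kafka_username = dbutils.secrets.get(scope="kv-scope", key="kafka-username")
kafka_password = dbutils.secrets.get(scope="kv-scope", key="kafka-password")
api_key        = dbutils.secrets.get(scope="kv-scope", key="alpha-vantage-api-key")

# Non-sensitive values kept as cluster environment variables, as mentioned above.
kafka_topic = os.environ.get("KAFKA_TOPIC", "stock_data")
```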
We will navigate to Databricks and start creating notebooks.
To load data into the Kafka topic, I created one notebook. It continuously pushes data from the API to the topic, simulating a live stream.
The topic is ready. The configuration needed to connect to Kafka is set in the notebook, along the lines of the sketch below.
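The producer notebook looks roughly like this, using the confluent-kafka client; the polling interval, the symbol, and the connection variables (carried over from the secrets sketch above) are assumptions, not the exact code from the notebook.

```python
import json
import time
import requests
from confluent_kafka import Producer

producer = Producer({
    "bootstrap.servers": kafka_server,   # Confluent bootstrap server
    "security.protocol": "SASL_SSL",
    "sasl.mechanisms": "PLAIN",
    "sasl.username": kafka_username,     # Confluent API key
    "sasl.password": kafka_password,     # Confluent API secret
})

symbol = "IBM"  # change the ticker to stream a different stock

while True:
    # Fetch the latest weekly-adjusted series and publish the full payload as one message.
    resp = requests.get(
        "https://www.alphavantage.co/query",
        params={"function": "TIME_SERIES_WEEKLY_ADJUSTED", "symbol": symbol, "apikey": api_key},
        timeout=30,
    )
    producer.produce(kafka_topic, key=symbol, value=json.dumps(resp.json()))
    producer.flush()
    time.sleep(15)  # stay under the free plan's per-minute request limit
```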
Now you can see data loading into the topic from the API.
Now check the topic.
I changed the stock symbol to Google and loaded more data. You can see additional messages arriving in the topic; the producer is still running.
Next, I created a Bronze notebook to read data from the Kafka topic and load it into the Bronze layer storage.
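A minimal sketch of that Bronze notebook, assuming the table and checkpoint names shown and the connection variables from earlier: it reads the topic with Spark Structured Streaming and appends the raw messages to a Delta table.

```python
# Connection values come from the secrets/environment variables set up earlier.
raw_df = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", kafka_server)
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.mechanism", "PLAIN")
    .option(
        "kafka.sasl.jaas.config",
        "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required "
        f'username="{kafka_username}" password="{kafka_password}";',
    )
    .option("subscribe", kafka_topic)
    .option("startingOffsets", "earliest")
    .load()
)

# Land the messages unchanged (payload as a string plus Kafka metadata) in the Bronze table.
bronze_df = raw_df.selectExpr(
    "CAST(key AS STRING) AS symbol",
    "CAST(value AS STRING) AS payload",
    "topic", "partition", "offset", "timestamp",
)

(
    bronze_df.writeStream.format("delta")
    .option("checkpointLocation", "abfss://kafka-project@<storageaccount>.dfs.core.windows.net/checkpoints/bronze")
    .outputMode("append")
    .toTable("kafka.bronze.stock_raw")
)
```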
Now we will take the Bronze data, clean it up, and load it into the Silver layer.
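As a sketch of what this cleanup can look like (the JSON field names follow Alpha Vantage's weekly-adjusted response; the table and checkpoint names are the placeholders used above):

```python
from pyspark.sql import functions as F
from pyspark.sql.types import MapType, StringType, StructField, StructType

# Each Bronze row holds one full API response; pull out the weekly time-series map
# and explode it into one row per symbol and week, with typed, renamed columns.
payload_schema = StructType([
    StructField(
        "Weekly Adjusted Time Series",
        MapType(StringType(), MapType(StringType(), StringType())),
    ),
])

bronze = spark.readStream.table("kafka.bronze.stock_raw")

silver_df = (
    bronze
    .withColumn("parsed", F.from_json("payload", payload_schema))
    .select("symbol", F.explode(F.col("parsed")["Weekly Adjusted Time Series"]).alias("week", "values"))
    .select(
        "symbol",
        F.to_date("week").alias("week_ending"),
        F.col("values")["1. open"].cast("double").alias("open"),
        F.col("values")["2. high"].cast("double").alias("high"),
        F.col("values")["3. low"].cast("double").alias("low"),
        F.col("values")["4. close"].cast("double").alias("close"),
        F.col("values")["5. adjusted close"].cast("double").alias("adjusted_close"),
        F.col("values")["6. volume"].cast("long").alias("volume"),
    )
)

(
    silver_df.writeStream.format("delta")
    .option("checkpointLocation", "abfss://kafka-project@<storageaccount>.dfs.core.windows.net/checkpoints/silver")
    .outputMode("append")
    .toTable("kafka.silver.stock_weekly")
)
```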
Just noted that the Kafka topic is still receiving data from the API.
The Silver layer table is loading with the data below. It has been cleaned and the column names have been standardized.
Now we will focus on the Gold layer. Here we will capture aggregated data based on the Silver table.
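The exact aggregations are up to you; as one illustrative example, the sketch below builds a per-symbol yearly summary from the Silver table (table names as assumed above):

```python
from pyspark.sql import functions as F

silver = spark.read.table("kafka.silver.stock_weekly")

# One possible Gold aggregation: per-symbol yearly statistics from the weekly Silver data.
gold_df = (
    silver
    .withColumn("year", F.year("week_ending"))
    .groupBy("symbol", "year")
    .agg(
        F.round(F.avg("close"), 2).alias("avg_close"),
        F.max("high").alias("max_high"),
        F.min("low").alias("min_low"),
        F.sum("volume").alias("total_volume"),
    )
)

gold_df.write.format("delta").mode("overwrite").saveAsTable("kafka.gold.stock_yearly_summary")
```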
Meanwhile, keep reviewing the new tables created under the Kafka catalog.
You can also see the table data populating in the ADLS storage created in Azure.
The streaming jobs are still running in the Bronze layer.
After the pending data has loaded, I will interrupt the reads from Kafka and stop the streaming in the Bronze layer.
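One simple way to do that from a notebook cell is to stop the active streaming queries; this sketch stops every stream running on the cluster:

```python
# Stop every active streaming query on this cluster (the Bronze and Silver writers included).
for query in spark.streams.active:
    print(f"Stopping stream {query.name or query.id}")
    query.stop()
```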
The final step is to show the aggregated data in Power BI.
Select Partner Connect from the page and choose Power BI.
A report file will be downloaded to your system.
Open this file in Power BI Desktop.
You can see your tables.
By dragging and dropping fields, you can create appropriate visualizations in Power BI and publish them to various destinations.
Next, we will create a workflow (job) to execute these notebooks at a scheduled interval.