The Transportation Hub is RDI's internal data streaming and data warehousing project for evaluating new tools and their functionality. Several technologies are used here, including:
- Docker Compose - bringing up multiple containers on the same local network
- WSL - Linux on Windows
- Shell Scripts - running commands on containers
- Python - small text file dataframe operations
- Java - data requests to the TriMet API and data writes to the kafka broker
- Confluent
  - Kafka Broker - durable queues and stream processing
  - Schema Registry - tracks schemas for data inputs; also used for serializing and deserializing binary data
  - ksqlDB - streaming ETL
  - Connect - configuring output data destinations
- React - JavaScript web development framework
- React Leaflet - React components for Leaflet maps
- Snowflake - data warehouse operations
- DBT - framework for writing ETL operations in Snowflake
- AWS
  - S3 - storing data on the cloud
  - Glue - data catalogs and ETL jobs
  - Athena - moderate sized queries
- An appid is required from TriMet. Register your contact information to get one; they will send the appid in an email. Enter the appid obtained into RealtimeStreaming/sh/run-trimet.sh for the APPID variable.
- An ssh key must be generated for communicating with Snowflake. For directions on setting this up, consult the Snowflake reference. Note that the ALTER USER step must be performed by someone with ACCOUNTADMIN credentials. The ssh key factors into the correct settings for the various Snowflake connect configurations (SnowflakeSinkConfig.json, SnowflakeSingleSinkConfig.json). Additionally, to configure the S3 connector for kafka (S3SinkConfig.json), AWS access credentials, namely AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY, must be obtained.
- The main requirement for running the realtime pipeline is Docker Desktop with WSL, to run it from Linux on a Windows workstation.
The realtime component of the pipeline is launched from a Linux shell prompt as follows:
cd RealtimeStreaming
source sh/run-trimet.sh
do_all
If you want to run select sections of the startup sequence, look in sh/run.sh.
This shell script will do the following:
- Deploy all containers using docker according to the docker-compose.yml file.
- Add static data sets.
- Set up kafka topics.
- Initialize feeds to the broker and schema registry.
- Deploy ksql ETL transformations in lexicographic order of file name.
- Deploy the Snowflake kafka connect sink for continuous feeds.
- Deploy the Snowflake kafka connect sink for one-time feeds.
- Deploy the S3 kafka connect sink.
The containers can be stopped and associated data deleted with
./sh/stop.sh
This project demonstrates the lambda architecture. Data is fed into a message queue, and consumed by a speed layer and batch processing layer simultaneously. Because messages are sent in batches to the batch processing layer, higher latency is introduced. However, the batch processing layer will contain much higher volumes of data, and can process queries that run over a longer history. In contrast, the speed layer will process messages immediately, and make messages available to queries with much less latency. The speed layer does not normally retain a large volume of messages.
In our project the:
- message queue is a Kafka broker
- speed layer processing is performed by the ksqldb sql flow backed by kafka topics
- batch processing is performed by Snowflake and alternately on AWS by S3, Glue, and Athena
The kafka connector configurations determine how messages are buffered to the batch processing layer. The architecture of the pipeline is shown below; descriptions of all components pictured follow:
The various locally deployed docker containers are depicted as squares. The components that are deployed on Snowflake are shown in blue. Containers that have visual components that can be accessed in the browser are shown in red. The yellow elements are in AWS. Most references to files are with respect to the RealtimeStreaming directory.
The Transportation Hub warehouses data for Portland's local transit system, TriMet. The starting point for data is the GTFS API. JSON and protobuf payloads are streamed and buffered in this project.
The various data formats used in this project each have their advantages and use cases, and it is worth pointing them out. When we refer to schema changes, we mean addition or removal of fields and changes of field data types. We will show a sample of each just to get a sense of what each format entails.
- json - the simplest format because it is human readable, but it can be confounding for a consumer of data. First, json is usually large and therefore takes longer to transmit. Second, schema changes are difficult to adapt to: when the data is consumed, the assumption is that the schema will not change. Inevitably, this assumption does not hold and problems follow. The only solution for the consumer is to maintain their own manually constructed schema and take action when non-conforming messages with a new schema come in; schema adaptation is thus data driven. A third problem is that traversal is slow, because string parsing must occur in order to find data. The need for human-readable json is merely a choice of which machine translation layer will be used to view data.
{
"f1": 2,
"f2": [3,2,5],
"pairs": {
"key": "hello",
"value": "world"
},
"description": "this is a JSON file"
}
- json schema - if the producer can provide a schema, at least the adaptation to a new schema is producer driven. Nonetheless, the other two problems of json, lack of compression and slow traversal, persist. The schema itself tends to be on the verbose side and harder to understand, which is why we serialized with Jackson as opposed to writing json schema.
{
"$id": "https://example.com/test.json",
"$schema": "https://json-schema.org/draft/2020-12/schema",
"title": "Test",
"type": "object",
"required": [
"f1"
],
"properties": {
"f1": {
"type": "number"
},
"f2": {
"type": "array",
"items": {
"type": "number"
}
},
"pairs": {
"type": "object",
"required": [
"key",
"value"
],
"properties": {
"key": {
"type": "string"
},
"value": {
"type": "string"
}
}
},
"description": {
"type": "string"
}
}
}
- protobuf - is a great format for single records. It is compressed, all messages must conform to a schema and thus schema evolution is straightforward, and because it is in a binary format, it is relatively easy to traverse the data. Its only downside is that it is not human-readable. This is usually not a problem, because protobuf is almost always an intermediate data format not used for reading data. (A minimal Java round trip with the sample message below is sketched after the format samples.)
message Test {
required int32 f1 = 1;
repeated int32 f2 = 2;
repeated Pair pairs = 3;
optional string description = 4;
}
message Pair {
required string key = 1;
required string value = 2;
}
- parquet - is a column oriented format that is fantastic as an end data format: large aggregate queries are faster, due to the physically contiguous columnar layout and projection. Converting between multiple schematized messages and parquet is relatively straightforward, since each side has a schema and the mapping between them is direct. We use parquet in this project exclusively in S3.
message schema {
required int32 f1;
optional group f2 (LIST) {
repeated group list {
optional int32 element;
}
}
optional group pairs {
required binary key (STRING);
required binary value (STRING);
}
optional binary description (STRING);
}
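To make the protobuf sample above concrete, here is a minimal Java round trip through the classes protoc would generate for the Test message. This is an illustration only; the generated class and package names depend on the protoc options in use (java_multiple_files is assumed here so that Test and Pair are top-level classes).

import com.google.protobuf.InvalidProtocolBufferException;

public class ProtobufRoundTrip {
    public static void main(String[] args) throws InvalidProtocolBufferException {
        // Build a Test message matching the sample protobuf schema above.
        Test original = Test.newBuilder()
                .setF1(2)
                .addF2(3).addF2(2).addF2(5)
                .addPairs(Pair.newBuilder().setKey("hello").setValue("world"))
                .setDescription("this is a protobuf message")
                .build();

        // Serialize to the compact binary wire format.
        byte[] wire = original.toByteArray();

        // Parse it back; unknown fields from a newer schema version are tolerated,
        // which is what makes schema evolution straightforward.
        Test parsed = Test.parseFrom(wire);
        System.out.println(parsed.getDescription());
    }
}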
The custom Java application consumes data from the selected TriMet API feed and pushes it to kafka. The application is driven by command line arguments.
Usage: gtfs-streamer [-fhV] -b=<bootstrapServers> -d=<dataClass>
[-n=<numLoops>] -r=<schemaRegistry> -t=<topic> -u=<url>
[-w=<waitTimeMs>] [-p=<String=String>[,
<String=String>...]]...
streams gtfs realtime data to a confluent kafka broker and schema registry
-b, --bootstrap-servers=<bootstrapServers>
hostname:port
-d, --data-class=<dataClass>
Valid values: ResultSetAlert, ResultSetRoute,
ResultSetVehicle, GtfsRealtime
-f file writes of each request payload requested, default
is false
-h, --help Show this help message and exit.
-n=<numLoops> number of get requests to make, -1 (default) for
infinite (stream)
-p, --get-parameters=<String=String>[,<String=String>...]
additional parameters for the get request in form
key1=value1,key2=value2,...
-r, --schema-registry=<schemaRegistry>
http://hostname:port
-t, --topic=<topic> topic to stream data to
-u, --url=<url> url https://<hostname>/<ext>/<servicename>
-V, --version Print version information and exit.
-w, --wait-time=<waitTimeMs>
wait time in ms between successive get requests,
default (1000)
All the custom code written is in the com.resourcedata.transportationhub.realtime package. The com.google.transit.realtime package contains POJOs that are used to send JSON with JSON Schema. An example is shown below, followed by a short Jackson usage sketch:
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import java.math.BigInteger;
import java.util.List;
@JsonAutoDetect(fieldVisibility=JsonAutoDetect.Visibility.ANY)
public class ResultSetAlert {
@JsonAutoDetect(fieldVisibility=JsonAutoDetect.Visibility.ANY)
static class AlertSet {
@JsonAutoDetect(fieldVisibility=JsonAutoDetect.Visibility.ANY)
static class Alert {
@JsonAutoDetect(fieldVisibility=JsonAutoDetect.Visibility.ANY)
static class Location {
Double lng;
Boolean no_service_flag;
String passengerCode;
BigInteger id;
String dir;
Double lat;
String desc;
}
@JsonAutoDetect(fieldVisibility=JsonAutoDetect.Visibility.ANY)
static class Route {
String routeColor;
Boolean frequentService;
Integer route;
Boolean no_service_flag;
String routeSubType;
Integer id;
String type;
String desc;
BigInteger routeSortOrder;
}
List<Route> route;
String info_link_url;
BigInteger end;
Boolean system_wide_flag;
List<Location> location;
BigInteger id;
String header_text;
BigInteger begin;
String desc;
}
List<Alert> alert;
BigInteger queryTime;
}
AlertSet resultSet;
}
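A rough sketch of how such a POJO can be read and written with Jackson follows. The ObjectMapper usage is standard Jackson, though the exact wiring inside the application may differ.

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;

public class AlertJson {
    private static final ObjectMapper MAPPER = new ObjectMapper();

    // Deserialize the raw TriMet alert payload into the POJO graph above.
    public static ResultSetAlert parse(byte[] payload) throws IOException {
        return MAPPER.readValue(payload, ResultSetAlert.class);
    }

    // Serialize the POJO back to JSON bytes; @JsonAutoDetect exposes the
    // non-public fields, so no getters or setters are needed.
    public static byte[] toJson(ResultSetAlert alerts) throws JsonProcessingException {
        return MAPPER.writeValueAsBytes(alerts);
    }
}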
Generated sources are produced from protobuf using the respective maven plugin. The section of pom.xml that makes this happen is shown below:
<plugin>
<groupId>com.github.os72</groupId>
<artifactId>protoc-jar-maven-plugin</artifactId>
<version>3.11.4</version>
<executions>
<execution>
<phase>generate-sources</phase>
<goals>
<goal>run</goal>
</goals>
<configuration>
<protocVersion>3.24.3</protocVersion>
<inputDirectories>
<include>src/main/resources</include>
</inputDirectories>
<outputDirectory>target/generated-sources</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
The Java code first gets a json or protobuf data object by supplying the appID request parameter in its http request. The payload is returned as an array of bytes. If the user requests to write the payload to a file, it is written. Thereafter, in the protobuf case, the message is deserialized as a FeedMessage object, as defined in the gtfs protobuf specification; in the json case, the message is deserialized using the various classes defined in com.google.transit.realtime. In either case, the use of the Java Stream package makes the logic more functional. In addition, the use of generics unifies the final steps of stream processing, as shown below (a hypothetical call-site fragment follows it):
public static <T> void streamData(Stream<T> stream, Properties properties, String topic){
try(Producer<String, T> producer = new KafkaProducer<>(properties)){
final DataStreamer<T> dataStreamer = new DataStreamer<>(producer, topic);
stream.forEach(dataStreamer::produce);
}
}
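For illustration, a fragment like the following could sit in the same class as streamData above: it parses the raw protobuf payload and hands it off as a single-element stream. The method name, and the assumption that FeedMessage is the standard generated GTFS-realtime class, are ours.

// Hypothetical helper, assumed to live alongside streamData above.
// FeedMessage is com.google.transit.realtime.GtfsRealtime.FeedMessage,
// generated from the gtfs-realtime protobuf specification.
static void publish(byte[] payload, Properties properties, String topic) throws Exception {
    // payload holds the raw bytes returned from the GTFS-realtime http request.
    FeedMessage feed = FeedMessage.parseFrom(payload);
    // Wrap the parsed message in a Stream and reuse the generic producer logic.
    streamData(Stream.of(feed), properties, topic);
}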
Once the protobuf payload has been deserialized, it is pushed by the Producer to kafka. Note the configuration parameters BOOTSTRAP_SERVERS_CONFIG and schema.registry.url in producer.properties. These are the locations of the broker and schema registry. The port for the broker is set to the listener port 29092, which is different from the host network port 9092. If 9092 is used, communication with the broker will not occur.
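For orientation, the same settings could be assembled in Java roughly as below. The hostnames assume the service names from docker-compose.yml, and the Confluent protobuf serializer is our guess at the value serializer; check producer.properties for the authoritative values.

import org.apache.kafka.clients.producer.ProducerConfig;
import java.util.Properties;

public class ProducerSettings {
    static Properties build() {
        Properties properties = new Properties();
        // Internal listener port 29092, reachable on the docker-compose network;
        // the host-mapped port 9092 will not work from inside the containers.
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:29092");
        // Location of the schema registry used to register and fetch schemas.
        properties.put("schema.registry.url", "http://schema-registry:8081");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer");
        return properties;
    }
}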
The preset command given in the dockerfile_datastreamer ensures that the container stays open indefinitely. Normally, containers terminate if there is no active command running.
command: ["tail", "-f", "/dev/null"]
The datastreamer is written as a maven package and is made to deploy into an Ubuntu Java docker container that contains both the Java runtime and the SDK. For more information, consult the dockerfile_datastreamer file.
The broker is the data storage layer of kafka. Each separate data stream is stored in a durable queue. Each separate data queue is organized by topic. The datastreamer container from above is a producer of data to kafka. In our usage of kafka, we will only be using one partition per topic. More partitions can be used in the distributed case for faster throughput. A schematic of how a broker works with a topic on one partition is shown below:
The messages from the topic are maintained in a queue regardless of the consumer's activity (unlike a queue service such as SQS). Based on whichever is exceeded first, retention time or size, the oldest messages are deleted (in a FIFO manner). Consumers of topics can decide, per session, from which point they want to read the queue: earliest, latest, or a defined offset.
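As a small, self-contained illustration of that consumer-side choice (not part of the pipeline itself; the group id, offset, and string deserializers are made up for brevity), a Java consumer can start from the earliest retained message, the latest, or seek to a defined offset:

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.List;
import java.util.Properties;

public class OffsetChoiceExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:29092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "offset-demo");
        // "earliest" or "latest" decides where a new consumer group begins reading.
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // Alternatively, pin the read position to a defined offset on the
            // topic's single partition before polling.
            TopicPartition partition = new TopicPartition("VehiclePositions", 0);
            consumer.assign(List.of(partition));
            consumer.seek(partition, 42L);
        }
    }
}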
When using structured data such as json schema, protobuf, or avro, a schema registry is necessary to assist with serializing/deserializing data as well as evolving schema. The schema registry and broker work together to handle all read and write requests.
The realtime-visualizer provides a user interface for displaying realtime data. It consists of a React.js application that makes requests to the ksqldb-server and plots the returned information on a Leaflet.js map (a sketch of such a request follows the feature list below). The ksqldb-server, and ultimately the kafka broker backing it, provides data to the realtime visualizer. In a locally deployed version, it can be seen at https://localhost:8090.
The visualizer is powered by React, react-leaflet, and react-redux.
Prominent features include:
- route selection
- updated vehicle locations color coded by status and rotated to reflect current orientation
- bus delay statuses tabulated
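The requests the visualizer makes can be reproduced from any HTTP client. A rough Java sketch against the ksqldb-server REST /query endpoint is shown below; the payload shape follows the ksqlDB REST API, the query is the VEHICLESLATEST push query discussed in the ksql section further down, and the React app of course uses its own client code rather than this.

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class KsqlQuerySketch {
    public static void main(String[] args) throws Exception {
        // The push query the visualizer relies on, issued as a ksqlDB REST request.
        String body = "{\"ksql\": \"SELECT * FROM VEHICLESLATEST EMIT CHANGES;\", \"streamsProperties\": {}}";
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://ksqldb-server:8088/query"))
                .header("Content-Type", "application/vnd.ksql.v1+json; charset=utf-8")
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();
        // Push queries stream results indefinitely; print each chunk as it arrives.
        HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofLines())
                .body()
                .forEach(System.out::println);
    }
}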
ksql is kafka's most accessible, realtime ETL language. The ksqldb-server handles all ETL requests. The current flow of streaming transformations is shown below:
One of the ksql queries used in the pipeline is shown below as an example. We are joining the stream VehicleEntitiesExploded against the table StopsTable. Note that this query generates a stream: an append-only, unbounded sequence of data. The kafka topic VehicleStopConnectorExp backs this stream as a persistence layer. Note also that here we are creating an additional key to be used for a join upstream; ksql only supports joins against single keys, so this is our way of supporting multi-key joins.
CREATE STREAM VehicleStopConnectorExp
WITH (KAFKA_TOPIC='VehicleStopConnectorExp', VALUE_FORMAT='PROTOBUF')
AS
SELECT
s.stop_id as stop_id,
concat(v.entity->vehicle->trip->trip_id,'_',cast(s.stop_sequence as string)) as trip_seq_id,
s.stop_lat as stop_lat,
s.stop_lon as stop_lon,
s.stop_name as stop_name
FROM VehicleEntitiesExploded v
JOIN StopsTable s ON s.stop_id = v.entity->vehicle->stop_id
EMIT CHANGES;
In contrast, a table is a key-value store that contains the latest state for any given key. In our example below, a vehicle id, entity->vehicle->vehicle->id, is the key, and for every vehicle this table holds the latest state of various properties of the vehicle. The query SELECT * FROM VEHICLESLATEST will then show this latest state. In fact, this exact query can be seen in the visualizer app.
CREATE TABLE VehiclesLatest
WITH (KAFKA_TOPIC='VehiclesLatest', VALUE_FORMAT='PROTOBUF')
AS SELECT
entity->vehicle->vehicle->id as vehicle_id,
LATEST_BY_OFFSET(entity->vehicle->position->latitude) as latitude,
LATEST_BY_OFFSET(entity->vehicle->position->longitude) as longitude,
LATEST_BY_OFFSET(entity->vehicle->current_status) as current_status,
LATEST_BY_OFFSET(entity->vehicle->current_stop_sequence) as current_stop_sequence,
LATEST_BY_OFFSET(entity->vehicle->stop_id) as stop_id,
LATEST_BY_OFFSET(entity->vehicle->trip->route_id) as route_id,
LATEST_BY_OFFSET(entity->vehicle->timestamp) as timestmp,
LATEST_BY_OFFSET(entity->vehicle->position->bearing) as bearing,
LATEST_BY_OFFSET(entity->vehicle->position->speed) as speed
FROM VehicleEntitiesExploded
GROUP BY entity->vehicle->vehicle->id
EMIT CHANGES;
An in-depth discussion of the difference between streams and tables is given by Confluent.
The ksqldb-cli provides a cli for issuing ksql requests. A ksql interactive session can be started using the alias ksql. Example commands are shown below; consult the official ksql reference here.
ksql> SHOW TABLES;
Table Name | Kafka Topic | Key Format | Value Format | Windowed
----------------------------------------------------------------------------
CALENDARDATESTABLE | CalendarDates | KAFKA | JSON | false
CALENDARTABLE | Calendar | KAFKA | JSON | false
ROUTEPATHSTABLE | Shapes | KAFKA | JSON | false
ROUTESTABLE | Routes | KAFKA | JSON | false
STOPSTABLE | Stops | KAFKA | JSON | false
TRIPSTABLE | Trips | KAFKA | JSON | false
VEHICLESLATEST | VehiclesLatest | KAFKA | PROTOBUF | false
----------------------------------------------------------------------------
ksql> SHOW STREAMS;
Stream Name | Kafka Topic | Key Format | Value Format | Windowed
--------------------------------------------------------------------------------------------------------------
ALERTENTITIESEXPLODED | AlertEntitiesExploded | KAFKA | PROTOBUF | false
ALERTS | FeedSpecAlerts | KAFKA | PROTOBUF | false
ALERTSALT | alerts | KAFKA | JSON_SR | false
ALERTSALTEXPLODED | AlertsAltExploded | KAFKA | PROTOBUF | false
KSQL_PROCESSING_LOG | default_ksql_processing_log | KAFKA | JSON | false
TRIPENTITIESEXPLODED | TripEntitiesExploded | KAFKA | PROTOBUF | false
TRIPENTITIESEXPLODEDSTOPSEXPLODED | TripEntitiesExplodedStopsExploded | KAFKA | PROTOBUF | false
TRIPS | TripUpdate | KAFKA | PROTOBUF | false
VEHICLEENTITIESEXPLODED | VehicleEntitiesExploded | KAFKA | PROTOBUF | false
VEHICLES | VehiclePositions | KAFKA | PROTOBUF | false
VEHICLESALT | vehicles | KAFKA | JSON_SR | false
VEHICLESALTEXPLODED | VehiclesAltExploded | KAFKA | PROTOBUF | false
--------------------------------------------------------------------------------------------------------------
ksql> DESCRIBE VEHICLESLATEST EXTENDED;
Name : VEHICLESLATEST
Type : TABLE
Timestamp field : Not set - using <ROWTIME>
Key format : KAFKA
Value format : PROTOBUF
Kafka topic : VehiclesLatest (partitions: 1, replication: 1)
Statement : CREATE TABLE VEHICLESLATEST WITH (CLEANUP_POLICY='compact', KAFKA_TOPIC='VehiclesLatest', PARTITIONS=1, REPLICAS=1, RETENTION_MS=604800000, VALUE_FORMAT='PROTOBUF') AS SELECT
V.ENTITY->VEHICLE->VEHICLE->ID VEHICLE_ID,
LATEST_BY_OFFSET(V.ENTITY->VEHICLE->POSITION->LATITUDE) LATITUDE,
LATEST_BY_OFFSET(V.ENTITY->VEHICLE->POSITION->LONGITUDE) LONGITUDE,
LATEST_BY_OFFSET(V.ENTITY->VEHICLE->CURRENT_STATUS) CURRENT_STATUS,
LATEST_BY_OFFSET(V.ENTITY->VEHICLE->CURRENT_STOP_SEQUENCE) CURRENT_STOP_SEQUENCE,
LATEST_BY_OFFSET(V.ENTITY->VEHICLE->STOP_ID) STOP_ID,
LATEST_BY_OFFSET(T.ROUTE_ID) ROUTE_ID,
LATEST_BY_OFFSET(T.DIRECTION_ID) DIRECTION_ID,
LATEST_BY_OFFSET(V.ENTITY->VEHICLE->TIMESTAMP) TIMESTAMP,
LATEST_BY_OFFSET(V.ENTITY->VEHICLE->POSITION->BEARING) BEARING,
LATEST_BY_OFFSET(V.ENTITY->VEHICLE->POSITION->SPEED) SPEED,
LATEST_BY_OFFSET(T.TRIP_ID) TRIP_ID
FROM VEHICLEENTITIESEXPLODED V
INNER JOIN TRIPSTABLE T ON ((T.TRIP_ID = V.ENTITY->VEHICLE->TRIP->TRIP_ID))
GROUP BY V.ENTITY->VEHICLE->VEHICLE->ID
EMIT CHANGES;
Field | Type
--------------------------------------------------------
VEHICLE_ID | VARCHAR(STRING) (primary key)
LATITUDE | DOUBLE
LONGITUDE | DOUBLE
CURRENT_STATUS | VARCHAR(STRING)
CURRENT_STOP_SEQUENCE | BIGINT
STOP_ID | VARCHAR(STRING)
ROUTE_ID | VARCHAR(STRING)
DIRECTION_ID | INTEGER
TIMESTAMP | BIGINT
BEARING | DOUBLE
SPEED | DOUBLE
TRIP_ID | VARCHAR(STRING)
--------------------------------------------------------
Queries that write from this TABLE
-----------------------------------
CTAS_VEHICLESLATEST_33 (RUNNING) : CREATE TABLE VEHICLESLATEST WITH (CLEANUP_POLICY='compact', KAFKA_TOPIC='VehiclesLatest', PARTITIONS=1, REPLICAS=1, RETENTION_MS=604800000, VALUE_FORMAT='PROTOBUF') AS SELECT V.ENTITY->VEHICLE->VEHICLE->ID VEHICLE_ID, LATEST_BY_OFFSET(V.ENTITY->VEHICLE->POSITION->LATITUDE) LATITUDE, LATEST_BY_OFFSET(V.ENTITY->VEHICLE->POSITION->LONGITUDE) LONGITUDE, LATEST_BY_OFFSET(V.ENTITY->VEHICLE->CURRENT_STATUS) CURRENT_STATUS, LATEST_BY_OFFSET(V.ENTITY->VEHICLE->CURRENT_STOP_SEQUENCE) CURRENT_STOP_SEQUENCE, LATEST_BY_OFFSET(V.ENTITY->VEHICLE->STOP_ID) STOP_ID, LATEST_BY_OFFSET(T.ROUTE_ID) ROUTE_ID, LATEST_BY_OFFSET(T.DIRECTION_ID) DIRECTION_ID, LATEST_BY_OFFSET(V.ENTITY->VEHICLE->TIMESTAMP) TIMESTAMP, LATEST_BY_OFFSET(V.ENTITY->VEHICLE->POSITION->BEARING) BEARING, LATEST_BY_OFFSET(V.ENTITY->VEHICLE->POSITION->SPEED) SPEED, LATEST_BY_OFFSET(T.TRIP_ID) TRIP_ID FROM VEHICLEENTITIESEXPLODED V INNER JOIN TRIPSTABLE T ON ((T.TRIP_ID = V.ENTITY->VEHICLE->TRIP->TRIP_ID)) GROUP BY V.ENTITY->VEHICLE->VEHICLE->ID EMIT CHANGES;
For query topology and execution plan please run: EXPLAIN <QueryId>
Runtime statistics by host
-------------------------
Host | Metric | Value | Last Message
-------------------------------------------------------------------------------
ksqldb-server:8088 | messages-per-sec | 388 | 2023-10-25T21:44:15.418Z
ksqldb-server:8088 | total-messages | 7047872 | 2023-10-25T21:44:15.418Z
-------------------------------------------------------------------------------
(Statistics of the local KSQL server interaction with the Kafka topic VehiclesLatest)
Consumer Groups summary:
Consumer Group : _confluent-ksql-default_query_CTAS_VEHICLESLATEST_33
Kafka topic : _confluent-ksql-default_query_CTAS_VEHICLESLATEST_33-Join-repartition
Max lag : 433
Partition | Start Offset | End Offset | Offset | Lag
-------------------------------------------------------
0 | 7185246 | 7188282 | 7187849 | 433
-------------------------------------------------------
Kafka topic : Trips
Max lag : 0
Partition | Start Offset | End Offset | Offset | Lag
------------------------------------------------------
0 | 0 | 64119 | 64119 | 0
------------------------------------------------------
Kafka topic : _confluent-ksql-default_query_CTAS_VEHICLESLATEST_33-Aggregate-GroupBy-repartition
Max lag : 425
Partition | Start Offset | End Offset | Offset | Lag
-------------------------------------------------------
0 | 7042738 | 7047872 | 7047447 | 425
-------------------------------------------------------
Kafka topic : VehicleEntitiesExploded
Max lag : 0
Partition | Start Offset | End Offset | Offset | Lag
-------------------------------------------------------
0 | 0 | 7192767 | 7192767 | 0
-------------------------------------------------------
ksql> SELECT * FROM VEHICLESLATEST EMIT CHANGES;
+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+
|VEHICLE_|LATITUDE|LONGITUD|CURRENT_|CURRENT_|STOP_ID |ROUTE_ID|DIRECTIO|TIMESTAM|BEARING |SPEED |TRIP_ID |
|ID | |E |STATUS |STOP_SEQ| | |N_ID |P | | | |
| | | | |UENCE | | | | | | | |
+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+
|233 |45.53004|-122.653|STOPPED_|29 |8343 |100 |0 |16982512|90.0 |0.0 |12957121|
| |83703613|67889404|AT | | | | |16 | | | |
| |3 |297 | | | | | | | | | |
|103 |45.53004|-122.653|STOPPED_|29 |8343 |100 |0 |16982512|90.0 |0.0 |12957121|
| |83703613|67889404|AT | | | | |16 | | | |
| |3 |297 | | | | | | | | | |
|201 |45.51786|-122.676|IN_TRANS|22 |8336 |100 |0 |16982512|110.0 |0.0 |12957122|
| |42272949|86462402|IT_TO | | | | |05 | | | |
| |2 |344 | | | | | | | | | |
...
The kafka connect plugin is a suite of tools for connecting outside data sources as sinks and sources, places for sending and getting data respectively. In our case, we are sending the data to Snowflake and S3. The only customization we make to the regular kafka connect container is to install the snowflake connector by copying the jar file for the snowflake connect app along with bouncycastle, which is needed for decrypting ssh passphrases. The snowflake sink connector is configured using SnowflakeSinkConfig.json. Current connector configurations can be accessed from the control center under the connect section.
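Connector deployment itself goes through the Kafka Connect REST interface, which the startup script drives; an equivalent request could be issued from Java roughly as follows. The connect hostname and port are assumptions, and whether the config file is POSTed whole or applied per connector depends on the shape of SnowflakeSinkConfig.json.

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.file.Path;

public class DeploySnowflakeSink {
    public static void main(String[] args) throws Exception {
        // Submit the connector configuration to the Kafka Connect REST interface.
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://connect:8083/connectors"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofFile(Path.of("SnowflakeSinkConfig.json")))
                .build();
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body());
    }
}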
Selected topics, specified in the "topics" field of the SnowflakeSinkConfig.json file, are sent to Snowflake staging tables. S3 data can also be used as input for Snowflake. A range of ETL jobs defined with DBT then transforms that input data into a form appropriate for BI reporting in the hub tables. The ETL sql code for Snowflake is defined in the DBT directory.
The second destination to which we send data is S3 on AWS. We use S3 as a data lake. What differentiates a data lake from a data warehouse such as Snowflake is the transparent use of data directly on S3, natively allowing heterogeneous data sources. Data lakes do not provide ACID transactions out of the box, but in an append-only/write-only scenario this is not a major downside (further technologies such as Delta Lake and Apache Iceberg can be leveraged for these purposes). A further advantage of a data lake is usually lower cost, since less data management is involved. In our case, data is buffered into S3 using the S3SinkConnector provided by Confluent. The settings for buffering are controlled in S3SinkConfig.json.
In order to surface data for use in analytics and ETL jobs, the AWS Glue crawler must be run over S3 periodically to update the Glue Data Catalog. Initially, the crawler imports the schema from parquet files and infers a schema from csv files. The crawler adds data as tables to the Glue database; additionally, as new partitions are added, the crawler registers those partitions. Up to a moderate level of complexity, Athena is a good choice for running queries over this table data. Once materialization of transformations is desired for much more complicated use cases, a Glue ETL job can be run with code written in Spark.
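Running the crawler on a schedule can be done from the Glue console, EventBridge, or code. As one hedged illustration using the AWS SDK for Java v2, with a hypothetical crawler name and credentials picked up from the environment (the same AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY mentioned earlier):

import software.amazon.awssdk.services.glue.GlueClient;

public class RunGlueCrawler {
    public static void main(String[] args) {
        try (GlueClient glue = GlueClient.create()) {
            // Kick off the crawler so new S3 objects and partitions are
            // registered in the Glue Data Catalog.
            glue.startCrawler(request -> request.name("transportation-hub-crawler"));
        }
    }
}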