
Commit

Updated CDC workshop for CSA 1.7.0.1.
asdaraujo committed Aug 1, 2022
1 parent 8ced5f9 commit d6aad81
Showing 5 changed files with 39 additions and 27 deletions.
Binary file modified images/cdc/postgres-cdc-template.png
Binary file modified images/cdc/transactions-cdc-details.png
8 changes: 6 additions & 2 deletions setup/terraform/resources/labs/utils/ssb.py
@@ -38,7 +38,7 @@ def _get_csrf_token(txt, quiet=True):

def _get_ui_port():
    if is_csa17_or_later():
-        return '8470' if is_tls_enabled() else '8070'
+        return '18121'
    else:
        return '8001' if is_tls_enabled() else '8000'
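The port change above can be sketched as a standalone function; the repo's `is_csa17_or_later()`/`is_tls_enabled()` helpers are replaced here by plain boolean parameters for illustration:

```python
# Sketch of the updated port selection: CSA 1.7+ serves the SSB UI on a
# single port (18121) regardless of TLS, while older releases use
# separate TLS/non-TLS ports.
def get_ui_port(csa17_or_later: bool, tls_enabled: bool) -> str:
    if csa17_or_later:
        # CSA 1.7+: one fixed UI port; TLS no longer changes it
        return '18121'
    # Pre-1.7: distinct ports for the TLS and plaintext listeners
    return '8001' if tls_enabled else '8000'

print(get_ui_port(True, True))    # '18121'
print(get_ui_port(False, False))  # '8000'
```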

@@ -114,7 +114,11 @@ def _get_session():

    _api_get('/login', api_type=_API_UI)
    if is_csa17_or_later():
-        _api_get('/internal/user/current', auth=(_SSB_USER, get_the_pwd()))
+        if is_kerberos_enabled():
+            auth = HTTPKerberosAuth(mutual_authentication=DISABLED)
+        else:
+            auth = (_SSB_USER, get_the_pwd())
+        _api_get('/internal/user/current', auth=auth)
    else:
        _api_post('/login', {'next': '', 'login': _SSB_USER, 'password': get_the_pwd()}, api_type=_API_UI, token=True)
    return _SSB_SESSION
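The auth-selection branch added above picks Kerberos authentication when the cluster is kerberized and falls back to HTTP basic auth (a `(user, password)` tuple) otherwise. A minimal sketch of that pattern, with a stub class standing in for `requests_kerberos.HTTPKerberosAuth` (the constant value mirrors that library's `DISABLED`, but is illustrative here):

```python
# Stub for requests_kerberos.HTTPKerberosAuth, so the selection logic
# can be shown without the real dependency.
class KerberosAuthStub:
    def __init__(self, mutual_authentication):
        self.mutual_authentication = mutual_authentication

DISABLED = 3  # illustrative; requests_kerberos exposes a DISABLED constant

def choose_auth(kerberos_enabled: bool, user: str, pwd: str):
    """Return a Kerberos auth handler or a basic-auth tuple for requests."""
    if kerberos_enabled:
        return KerberosAuthStub(mutual_authentication=DISABLED)
    return (user, pwd)

print(choose_auth(False, 'admin', 'supersecret1'))  # ('admin', 'supersecret1')
```

`requests` accepts either form via its `auth=` parameter, which is why the selected value can be passed straight through to the API call.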
42 changes: 25 additions & 17 deletions workshop_cdc.adoc
@@ -148,7 +148,7 @@ When connected you should see the `psql` prompt, with the database name, as show
+
[source]
----
-cdc_test=#
+cdc_test=>
----

. Run the following commands to create your test table:
@@ -186,17 +186,24 @@ SELECT * FROM transactions;

In this lab you will set up an SSB table to capture the changelog stream for the `transactions` table.

-. In the SSB UI, click on *Console* (on the left bar) *> Compose*.
+. In the SSB UI, if you haven't authenticated yet, log in with user `admin` and password `supersecret1`.

-. Click on *Templates > postgres-cdc*
+. Create a new job and call it `cdc_labs`.

+. In the SQL editor, click on *Templates > postgres-cdc*
+
-image::images/cdc/postgres-cdc-template.png[width=800]
+image::images/cdc/postgres-cdc-template.png[width=150]
+
-You will notice that the SQL editor box will be filled with a generic template of a statement to create a table using the `postgres-cdc` connector.
+A CREATE TABLE statement template, using the `postgres-cdc` connector, will be added to the SQL editor.
+
-In the next steps you will tailor this statement to match the structure of the PostgreSQL's `transaction` table and configure it with the necessary properties.
+In the next steps you will customize this statement to match the structure of the PostgreSQL's `transaction` table and configure it with the necessary properties.

-. The `transactions` table you created in step 1 has two columns: `id`, of type `integer`, and `name`, of type `text`.
+. The `transactions` table you created in step 1 has two columns:
++
+--
+* `id`: `integer`
+* `name`: `text`.
+--
+
In the Flink ANSI SQL dialect, the equivalent data types of the ones above are the following:
+
@@ -221,7 +228,7 @@ CREATE TABLE transactions_cdc (
----

. The template has lots of properties inside the `WITH` clause that allow you to configure the table to connect to your database and table.
-There are properties that must be specified and others that are optional and are commented out in the template.
+There are some properties that must be specified and others that are optional and are commented out in the template.
+
In this lab you will set all the required properties plus a few of the optional ones.
You can ignore the other optional properties.
@@ -231,12 +238,12 @@ Set the following required properties in your statement:
[source,yaml]
----
connector: postgres-cdc
-database-name: cdc_test
hostname: <CLUSTER_HOSTNAME>
username: cdc_user
password: supersecret1
+database-name: cdc_test
-table-name: transactions
schema-name: public
+table-name: transactions
----

. _Uncomment_ and set the following optional properties in your statement.
@@ -245,9 +252,10 @@ NOTE: Make sure your final statement has all the commas separating the property
+
[source,yaml]
----
-decoding.plugin.name: pgoutput
+decoding.plugin.name: pgoutput
+debezium.publication.name: dbz_publication
-debezium.slot.name: flink
-debezium.snapshot.mode: initial
+slot.name: flink
----
+
Your final `CREATE TABLE` statement should look like the one below:
@@ -263,20 +271,20 @@ CREATE TABLE transactions_cdc (
'username' = 'cdc_user',
'password' = 'supersecret1',
'database-name' = 'cdc_test',
-    'table-name' = 'transactions',
    'schema-name' = 'public',
-    'decoding.plugin.name' = 'pgoutput',
+    'table-name' = 'transactions',
+    'debezium.publication.name' = 'dbz_publication',
-    'debezium.slot.name' = 'flink',
-    'debezium.snapshot.mode' = 'initial'
+    'decoding.plugin.name' = 'pgoutput',
+    'slot.name' = 'flink'
);
----
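The `WITH` clause above is just a flat list of string key/value properties. A hypothetical helper (not part of the workshop code) makes the mapping between the property list and the rendered DDL clause explicit:

```python
# Render a Flink SQL WITH (...) clause from a property dict, mirroring
# the CREATE TABLE statement above. Purely illustrative.
def render_with_clause(props: dict) -> str:
    lines = [f"  '{k}' = '{v}'" for k, v in props.items()]
    return 'WITH (\n' + ',\n'.join(lines) + '\n)'

props = {
    'connector': 'postgres-cdc',
    'hostname': '<CLUSTER_HOSTNAME>',       # placeholder, as in the lab
    'username': 'cdc_user',
    'password': 'supersecret1',
    'database-name': 'cdc_test',
    'schema-name': 'public',
    'table-name': 'transactions',
    'debezium.publication.name': 'dbz_publication',
    'decoding.plugin.name': 'pgoutput',
    'slot.name': 'flink',
}
print(render_with_clause(props))
```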

. Click on the *Execute* button to execute the statement and create the `transactions_cdc` table.

-. Click on the *Tables* tab and navigate to the newly created table to verify its details:
+. On the *Virtual Tables* pane you can inspect the details of the newly created table:
+
-image::images/cdc/transactions-cdc-details.png[width=600]
+image::images/cdc/transactions-cdc-details.png[width=300]

[[lab_3, Lab 3]]
== Lab 3 - Capturing table changes
16 changes: 8 additions & 8 deletions workshop_ssb.adoc
@@ -49,9 +49,9 @@ image::images/ssb/register-kafka-provider.png[width=800]
+
[source,yaml]
----
-Name: edge2ai-kafka
-Brokers: <CLUSTER_HOSTNAME>:9092
-Connection protocol: PLAINTEXT
+Name: edge2ai-kafka
+Brokers: <CLUSTER_HOSTNAME>:9092
+Connection protocol: PLAINTEXT
----
+
image::images/ssb/add-kafka-provider.png[width=400]
@@ -114,7 +114,7 @@ image::images/ssb/source-transformations.png[width=400]
+
To do this, click on the *Event Time* tab and configure the following properties:
+
-[source]
+[source,yaml]
----
Use Kafka Timestamps: Uncheck it
Input Timestamp Column: sensor_ts
@@ -183,7 +183,7 @@ Select and copy the contents of the page.

. Complete the schema creation by filling the following properties and save the schema.
+
-[source]
+[source,yaml]
----
Name: iot_enriched_avro
Description: Schema for the data in the iot_enriched_avro topic
@@ -201,7 +201,7 @@ image::images/ssb/add-catalog-sr.png[width=800]

. In the *Catalog* dialog box, enter the following details:
+
-[source]
+[source,yaml]
----
Name: sr
Catalog Type: Schema Registry
@@ -212,7 +212,7 @@ Enable TLS: No

. Click on the *Add Filter* button and enter the following configuration for the filter:
+
-[source]
+[source,yaml]
----
Database Filter: .*
Table Filter: iot.*
@@ -410,7 +410,7 @@ On the job page, click the *Stop* button to pause the job.

. Click on the *Materialized View* button and enter the following properties:
+
-[source,python]
+[source,yaml]
----
Enable MV: Yes
Recreate on Job Start: No
