Commit fe5dbc0: Added a new workshop - CDC with Flink/SSB

asdaraujo committed Nov 10, 2021
1 parent 981e5cf commit fe5dbc0
Showing 20 changed files with 825 additions and 119 deletions.
1 change: 1 addition & 0 deletions README.adoc
@@ -16,6 +16,7 @@ If you are already familiar with the instructions in the README, time to start worki
* link:workshop_nifi.adoc[NiFi and Streams Processing]
* link:workshop_dataviz.adoc[Creating Dashboards with Cloudera Data Viz]
* link:workshop_ssb.adoc[Querying streams with SQL]
* link:workshop_cdc.adoc[Change Data Capture (CDC) with Flink/SSB]
* link:streams_replication.adoc[Streams Replication]
* link:spark_analytics.adoc[Spark and Fast Analytics with Kudu]
* link:datascience.adoc[CDSW Experiments and Models]
Binary file added images/cdc/initial-snapshot.png
Binary file added images/cdc/jdbc-template.png
Binary file added images/cdc/job-status-running.png
Binary file added images/cdc/kafka-json-template.png
Binary file added images/cdc/magnifier-icon.png
Binary file added images/cdc/postgres-cdc-template.png
Binary file added images/cdc/primary-key-clause.png
Binary file added images/cdc/sample-all-messages.png
Binary file added images/cdc/topics-icon.png
Binary file added images/cdc/trans-changelog-contents.png
Binary file added images/cdc/transactions-capture.png
Binary file added images/cdc/transactions-cdc-details.png
11 changes: 11 additions & 0 deletions setup/terraform/resources/create_db_pg.sql
@@ -51,3 +51,14 @@ CREATE DATABASE eventador_admin OWNER eventador_admin ENCODING 'UTF8';

CREATE USER eventador_snapper WITH PASSWORD :'the_pwd';
CREATE DATABASE eventador_snapper OWNER eventador_snapper ENCODING 'UTF8';

-- Configuration for Flink Debezium connector
CREATE DATABASE cdc_test;
CREATE ROLE cdc_user WITH REPLICATION LOGIN PASSWORD :'the_pwd';
GRANT CONNECT ON DATABASE cdc_test TO cdc_user;
\c cdc_test
SELECT * FROM pg_create_logical_replication_slot('flink', 'pgoutput', false);
CREATE PUBLICATION dbz_publication FOR ALL TABLES;
GRANT SELECT ON ALL TABLES IN SCHEMA public TO cdc_user;
ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT ALL ON TABLES TO cdc_user;
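
The 'flink' slot and dbz_publication created above are what the Flink/SSB postgres-cdc connector attaches to. As a hedged sketch (not part of this commit), the DDL for a matching SSB table could be submitted through the execute_sql helper that this commit adds to labs/utils/ssb.py below; the table name, columns, password, and import path are illustrative assumptions:

from labs.utils import ssb  # hypothetical import path, for illustration only

# Hypothetical CDC table over a 'transactions' table in the cdc_test database;
# the connector options mirror the pgoutput plugin, the 'flink' slot and the
# cdc_user role configured in create_db_pg.sql above.
ddl = """
CREATE TABLE transactions_cdc (
    id INT,
    amount DOUBLE,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'postgres-cdc',
    'hostname' = 'edge2ai-1.dim.local',
    'port' = '5432',
    'username' = 'cdc_user',
    'password' = 'changeme',
    'database-name' = 'cdc_test',
    'schema-name' = 'public',
    'table-name' = 'transactions',
    'decoding.plugin.name' = 'pgoutput',
    'slot.name' = 'flink'
)
"""
resp = ssb.execute_sql(ddl, job_name='transactions_cdc_ddl')
print(resp.json())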

12 changes: 12 additions & 0 deletions setup/terraform/resources/labs/utils/postgres.py
@@ -0,0 +1,12 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from subprocess import Popen, PIPE
from . import *


def execute_sql(cmd, db_name, username, password):
    cmd_line = 'PGPASSWORD={pwd} psql --host edge2ai-1.dim.local --port 5432 --username {usr} {db}'.format(
        usr=username, pwd=password, db=db_name)
    proc = Popen(cmd_line, shell=True, stdin=PIPE, stdout=PIPE, stderr=PIPE)
    stdout, stderr = proc.communicate(cmd.encode('utf-8'))
    return proc.returncode, stdout, stderr
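
A minimal usage sketch for this helper, e.g. to confirm that the replication slot and publication created in create_db_pg.sql exist; the import path and password are placeholders, not part of this commit:

from labs.utils.postgres import execute_sql  # hypothetical import path

# pg_replication_slots should list the 'flink' slot and pg_publication
# should list 'dbz_publication' if the setup script ran successfully.
rc, out, err = execute_sql(
    "SELECT slot_name, plugin FROM pg_replication_slots;"
    " SELECT pubname FROM pg_publication;",
    'cdc_test', 'cdc_user', 'changeme')  # password is a placeholder
if rc != 0:
    raise RuntimeError(err.decode('utf-8'))
print(out.decode('utf-8'))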
199 changes: 81 additions & 118 deletions setup/terraform/resources/labs/utils/ssb.py
@@ -1,30 +1,49 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import json
import uuid

from . import *

_SSB_USER = 'admin'
_SSB_SESSION = None
_SSB_CSRF_TOKEN = None

_API_INTERNAL = 'internal'
_API_EXTERNAL = 'external'
_API_UI = 'ui'

def _get_api_url():
    return get_url_scheme() + '://cdp.{}.nip.io:8000/api/v1'.format(get_public_ip())


def _get_rest_api_url():
    return get_url_scheme() + '://cdp.{}.nip.io:18121/api/v1'.format(get_public_ip())


def _get_ui_url():
    return get_url_scheme() + '://cdp.{}.nip.io:8000/ui'.format(get_public_ip())


-def _api_call(func, path, data=None, files=None, headers=None, ui=False, token=False):
+def _get_url(api_type):
+    if api_type == _API_UI:
+        return _get_ui_url()
+    elif api_type == _API_INTERNAL:
+        return _get_api_url()
+    else:
+        return _get_rest_api_url()
+
+
+def _api_call(func, path, data=None, files=None, headers=None, api_type=_API_INTERNAL, token=False):
    global _SSB_CSRF_TOKEN
    if not headers:
        headers = {}
-    if not ui:
+    if api_type != _API_UI:
        headers['Content-Type'] = 'application/json'
        data = json.dumps(data)
    if token:
        headers['X-CSRF-TOKEN'] = _SSB_CSRF_TOKEN
-    url = (_get_ui_url() if ui else _get_api_url()) + path
+    url = _get_url(api_type) + path
    resp = func(url, data=data, headers=headers, files=files)
    if resp.status_code != requests.codes.ok:
        raise RuntimeError("Call to {} returned status {}. \nData: {}\nResponse: {}".format(
@@ -37,16 +56,16 @@ def _api_call(func, path, data=None, files=None, headers=None, ui=False, token=F
    return resp


-def _api_get(path, data=None, ui=False, token=False):
-    return _api_call(_get_session().get, path, data=data, ui=ui, token=token)
+def _api_get(path, data=None, api_type=_API_INTERNAL, token=False):
+    return _api_call(_get_session().get, path, data=data, api_type=api_type, token=token)


-def _api_post(path, data=None, files=None, headers=None, ui=False, token=False):
-    return _api_call(_get_session().post, path, data=data, files=files, headers=headers, ui=ui, token=token)
+def _api_post(path, data=None, files=None, headers=None, api_type=_API_INTERNAL, token=False):
+    return _api_call(_get_session().post, path, data=data, files=files, headers=headers, api_type=api_type, token=token)


-def _api_delete(path, data=None, ui=False, token=False):
-    return _api_call(_get_session().delete, path, data=data, ui=ui, token=token)
+def _api_delete(path, data=None, api_type=_API_INTERNAL, token=False):
+    return _api_call(_get_session().delete, path, data=data, api_type=api_type, token=token)


def _get_session():
@@ -56,8 +75,8 @@ def _get_session():
        if is_tls_enabled():
            _SSB_SESSION.verify = get_truststore_path()

-        _api_get('/login', ui=True)
-        _api_post('/login', {'next': '', 'login': _SSB_USER, 'password': get_the_pwd()}, ui=True, token=True)
+        _api_get('/login', api_type=_API_UI)
+        _api_post('/login', {'next': '', 'login': _SSB_USER, 'password': get_the_pwd()}, api_type=_API_UI, token=True)
    return _SSB_SESSION


@@ -127,117 +146,61 @@ def delete_table(table_name):
        _api_delete('/sb-source/{}'.format(table['id']), token=True)


-def delete_api_key(apikey):
+def execute_sql(stmt, job_name=None, parallelism=None, sample_interval_millis=None, savepoint_path=None,
+                start_with_savepoint=None):
+    if not job_name:
+        job_name = 'job_{}_{}'.format(uuid.uuid1().hex[0:4], int(1000000*time.time()))
    data = {
-        'apikeys_list': '["{}"]'.format(apikey),
+        'sql': stmt,
+        'job_parameters': {
+            'job_name': job_name,
+            # 'snapshot_config': {
+            #     'name': 'string',
+            #     'key_column_name': 'string',
+            #     'api_key': 'string',
+            #     'recreate': true,
+            #     'ignore_nulls': true,
+            #     'enabled': true
+            # },
+            'parallelism': parallelism,
+            'sample_interval_millis': sample_interval_millis,
+            'savepoint_path': savepoint_path,
+            'start_with_savepoint': start_with_savepoint
+        },
+        'execute_in_session': True
    }
+    headers = {
+        'Accept': 'application/json',
+        'Username': 'admin',
+        'Content-Type': 'application/json',
+    }
-    resp = _api_delete('/apps/apikey_api', data)
+    return _api_post('/ssb/sql/execute', data, headers=headers, api_type=_API_EXTERNAL)


-def create_connection(conn_type, conn_name, params):
+def get_jobs(state='RUNNING'):
+    resp = _api_get('/ssb/jobs', api_type=_API_EXTERNAL)
+    return [j for j in resp.json()['jobs'] if state is None or j['state'] == state]
+
+
+def stop_job(job_name, savepoint=False, savepoint_path=None, timeout=1000, wait_secs=0):
    data = {
-        'dataconnection_type': conn_type,
-        'dataconnection_name': conn_name,
-        'dataconnection_info': json.dumps({'PARAMS': params}),
-        'do_validate': True
+        'savepoint': savepoint,
+        'savepoint_path': savepoint_path,
+        'timeout': timeout,
    }
-    resp = _api_post('/datasets/dataconnection', data)


-def get_connection(conn_name):
-    resp = _api_get('/datasets/dataconnection')
-    conns = [c for c in resp.json() if c['name'] == conn_name]
-    if conns:
-        return conns[0]
-    return None
-
-
-def delete_connection(dc_id=None, dc_name=None):
-    assert dc_id is not None or dc_name is not None, 'One of "dc_id" or "dc_name" must be specified.'
-    assert dc_id is None or dc_name is None, 'Only one of "dc_id" or "dc_name" can be specified.'
-    if dc_id is None:
-        conn = get_connection(dc_name)
-        if conn:
-            dc_id = conn['id']
-    if dc_id:
-        _api_delete('/datasets/dataconnection/{}'.format(dc_id))
-
-
-def create_dataset(data):
-    resp = _api_post('/datasets/dataset', data)
-
-
-def delete_dataset(ds_id=None, ds_name=None, dc_name=None):
-    assert ds_id is not None or ds_name is not None or dc_name is not None,\
-        'One of "ds_id", "ds_name" or "dc_name" must be specified.'
-    assert (0 if ds_id is None else 1) + (0 if ds_name is None else 1) + (0 if dc_name is None else 1),\
-        'Only one of "ds_id", "ds_name" or "dc_name" can be specified.'
-    if ds_id is not None:
-        ds_ids = [ds_id]
-    elif ds_name is not None:
-        ds = get_datasets(ds_name=ds_name)
-        assert len(ds) <= 1, 'More than one dataset found with the same name'
-        ds_ids = [d['id'] for d in ds]
-    else:
-        ds_ids = [d['id'] for d in get_datasets(conn_name=dc_name)]
-    for ds_id in ds_ids:
-        _api_delete('/datasets/dataset/{}?delete_table=false'.format(ds_id))
-
-
-def get_datasets(ds_name=None, conn_name=None):
-    resp = _api_get('/datasets/dataset')
-    return [d for d in resp.json()
-            if (ds_name is None or d['name'] == ds_name)
-            and (conn_name is None or d['dc_name'] == conn_name)]
-
-
-def import_artifacts(dc_name, file_name):
-    apikey = None
-    try:
-        apikey, secret = create_api_key()
-        headers = {'AUTHORIZATION': 'apikey ' + secret}
-        payload = {'dry_run': False, 'dataconnection_name': dc_name}
-        files = {'import_file': open(file_name, 'r')}
-        _api_post('/migration/api/import/', files=files, data=payload, headers=headers)
-    finally:
-        if apikey:
-            delete_api_key(apikey)
-
-
-def _get_model():
-    r = _get_session().post(get_altus_api_url() + '/models/list-models',
-                            json={'projectOwnerName': 'admin',
-                                  'latestModelDeployment': True,
-                                  'latestModelBuild': True})
-    models = [m for m in r.json() if m['name'] == 'IoT Prediction Model']
-    model = None
-    for m in models:
-        if m['name'] == _CDSW_MODEL_NAME:
-            model = m
-    return model
-
-
-def _deploy_model(model):
-    _get_session().post(get_altus_api_url() + '/models/deploy-model', json={
-        'modelBuildId': model['latestModelBuild']['id'],
-        'memoryMb': 4096,
-        'cpuMillicores': 1000,
-    })
-
-
-def get_model_access_key():
+    resp = _api_post('/ssb/jobs/{}/stop'.format(job_name), api_type=_API_EXTERNAL, data=data)
    while True:
-        model = _get_model()
-        if not model:
-            status = 'not created yet'
-        elif 'latestModelDeployment' not in model or 'status' not in model['latestModelDeployment']:
-            status = 'unknown'
-        elif model['latestModelDeployment']['status'] == 'deployed':
-            return model['accessKey']
-        elif model['latestModelDeployment']['status'] == 'stopped':
-            _deploy_model(model)
-            status = 'stopped'
-        else:
-            status = model['latestModelDeployment']['status']
-        LOG.info('Model not deployed yet. Model status is currently "%s". Waiting for deployment to finish.', status)
-        time.sleep(10)
+        jobs = get_jobs()
+        if not any(j['name'] == job_name for j in jobs):
+            break
+        time.sleep(1)

+    # additional wait in case we need to ensure the release of resources, like replication slots
+    time.sleep(wait_secs)
+
+    return resp


def stop_all_jobs():
    for job in get_jobs():
        stop_job(job['name'])
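
Taken together, execute_sql, get_jobs and stop_job cover a simple job lifecycle. A hedged end-to-end sketch (the query, job name and import path are made up for illustration):

from labs.utils import ssb  # hypothetical import path

# Launch a streaming query as a named SSB job.
ssb.execute_sql('SELECT * FROM transactions_cdc', job_name='cdc_demo')

# Confirm it shows up among the RUNNING jobs, then stop it; a non-zero
# wait_secs leaves time for resources such as the PostgreSQL replication
# slot to be released before the job is started again.
assert any(j['name'] == 'cdc_demo' for j in ssb.get_jobs())
ssb.stop_job('cdc_demo', wait_secs=10)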
