-
Notifications
You must be signed in to change notification settings - Fork 465
/
Copy pathct_transform.slt
144 lines (108 loc) · 2.72 KB
/
ct_transform.slt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.
mode cockroach
# Stream Table Join
statement ok
CREATE TABLE small (pk INT, val STRING)
statement ok
INSERT INTO small VALUES (1, 'v0')
statement ok
CREATE TABLE big (fk INT)
statement ok
CREATE CONTINUAL TASK stj
FROM TRANSFORM big USING
(SELECT big.fk, small.val FROM big JOIN small ON big.fk = small.pk)
statement ok
INSERT INTO big VALUES (1)
statement ok
UPDATE small SET val = 'v1'
statement ok
INSERT INTO big VALUES (1)
query IT
SELECT * FROM stj
----
1 v0
1 v1
# Audit Log
statement ok
CREATE TABLE anomalies (a INT);
statement ok
CREATE MATERIALIZED VIEW anomalies_mv AS SELECT * FROM anomalies
statement ok
CREATE CONTINUAL TASK audit_log
FROM TRANSFORM anomalies_mv USING
(TABLE anomalies_mv)
statement ok
INSERT INTO anomalies VALUES (1)
statement ok
DELETE FROM anomalies
query I
SELECT * FROM anomalies_mv
----
query I
SELECT * FROM audit_log
----
1
# Stateless Source Transformation
statement ok
CREATE TABLE source_raw (ts STRING);
statement ok
CREATE CONTINUAL TASK source_cleaned
FROM TRANSFORM source_raw USING
(SELECT ts::timestamptz FROM source_raw)
query T
SELECT try_parse_monotonic_iso8601_timestamp('2024-10-11T15:28:01')
----
NULL
statement ok
INSERT INTO source_raw VALUES ('2024-10-11T15:28:01')
query T
SELECT * FROM source_cleaned
----
2024-10-11 15:28:01+00
# Idempotency Keys
statement ok
CREATE TABLE maybe_dup (val STRING, idem_key INT)
statement ok
CREATE CONTINUAL TASK deduped
FROM TRANSFORM maybe_dup USING
(SELECT * FROM maybe_dup WHERE idem_key NOT IN (SELECT idem_key FROM deduped))
statement ok
INSERT INTO maybe_dup VALUES ('orig-1', 1), ('orig-2', 2)
statement ok
INSERT INTO maybe_dup VALUES ('nope', 1), ('yep', 3)
query T
SELECT val FROM deduped ORDER BY val
----
orig-1
orig-2
yep
# Demultiplexing Webhook Sources
statement ok
CREATE TABLE events (data JSONB)
statement ok
CREATE CONTINUAL TASK events_load
FROM TRANSFORM events USING
(SELECT data->>'user' FROM events WHERE data->>'type' = 'load')
statement ok
CREATE CONTINUAL TASK events_click
FROM TRANSFORM events USING
(SELECT data->>'foo' FROM events WHERE data->>'type' = 'click')
statement ok
INSERT INTO events VALUES ('{"type": "load", "user": "alice"}')
statement ok
INSERT INTO events VALUES ('{"type": "click", "foo": "bar"}')
query T
SELECT * FROM events_load
----
alice
query T
SELECT * FROM events_click
----
bar