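Pattern Matching with MATCH_RECOGNIZE
This session discusses how to write SQL queries that perform pattern matching on streaming data.
You will learn about:
- the MATCH_RECOGNIZE clause as defined in SQL:2016;
- the semantics that Flink supports and their limitations;
- how to perform pattern matching on append-only tables.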
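This exercise teaches you how to apply pattern matching to a data stream and how to compute values over the set of matched rows.
In this exercise, we want to compute the duration of each ride in minutes, i.e., the ride's end time minus its start time. This means we need a pattern query, partitioned on rideId, that matches a start event and an end event.
Note: This exercise is identical to the join exercise, but here we want to solve it with MATCH_RECOGNIZE.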
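Click to see hints
- Find the fixed pattern for each rideId.
- The pattern consists of two kinds of events: a start event and an end event.
- Use the provided timeDiff function to get the interval between two timestamps in milliseconds.
The output should look like this: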
rideId durationMin
52693 13
46868 24
53226 12
53629 11
55651 7
43220 31
53082 12
54716 9
55125 9
57211 4
44795 28
53563 12
Click to see the solution
SELECT rideId, timeDiff(startT, endT) / 60000 AS durationMin  -- milliseconds to minutes
FROM Rides
MATCH_RECOGNIZE (
  PARTITION BY rideId              -- match within each ride
  ORDER BY rideTime
  MEASURES
    S.rideTime AS startT,
    E.rideTime AS endT
  AFTER MATCH SKIP PAST LAST ROW
  PATTERN (S E)                    -- a start event followed by an end event
  DEFINE
    S AS S.isStart,
    E AS NOT E.isStart
);
This query matches the start and end events of the same ride, i.e., events that share the same ride id.
Partitioning the table on rideId ensures that only events with the same ride id are processed together.
Within each partition, events are classified as a start event S or an end event E, and the pattern to match is defined as (S E), i.e., exactly one S followed by exactly one E. Finally, for each match we emit the rideId and the timestamps of the start and end events, and compute the ride duration in the SELECT clause using the timeDiff function.
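To make the matching steps concrete, here is a minimal plain-Python sketch of what the query computes. It is a simulation for illustration only, not how Flink evaluates MATCH_RECOGNIZE internally. The function name match_start_end and the sample events are made up, and the sketch assumes that timeDiff returns the difference between two timestamps in milliseconds, as the hint states.

```python
from collections import defaultdict
from datetime import datetime, timedelta


def match_start_end(events):
    """Return (rideId, durationMin) for each start event directly followed by an end event."""
    # PARTITION BY rideId
    partitions = defaultdict(list)
    for ev in events:
        partitions[ev["rideId"]].append(ev)

    results = []
    for ride_id, rows in partitions.items():
        # ORDER BY rideTime
        rows.sort(key=lambda r: r["rideTime"])
        i = 0
        while i + 1 < len(rows):
            s, e = rows[i], rows[i + 1]
            # DEFINE S AS S.isStart, E AS NOT E.isStart; PATTERN (S E)
            if s["isStart"] and not e["isStart"]:
                # Assumed behaviour of timeDiff: difference in milliseconds
                diff_ms = (e["rideTime"] - s["rideTime"]) // timedelta(milliseconds=1)
                results.append((ride_id, diff_ms // 60000))
                i += 2  # AFTER MATCH SKIP PAST LAST ROW
            else:
                i += 1
    return results


# Hypothetical sample events, interleaved as they might arrive on a stream
events = [
    {"rideId": 1, "isStart": True, "rideTime": datetime(2013, 1, 1, 0, 0, 0)},
    {"rideId": 2, "isStart": True, "rideTime": datetime(2013, 1, 1, 0, 1, 0)},
    {"rideId": 2, "isStart": False, "rideTime": datetime(2013, 1, 1, 0, 8, 0)},
    {"rideId": 1, "isStart": False, "rideTime": datetime(2013, 1, 1, 0, 13, 30)},
]
print(match_start_end(events))
```

Because the division is an integer division of milliseconds, a ride of 13 minutes and 30 seconds is reported as 13 minutes.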
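In this exercise, we want to find out how long a taxi stays idle before it starts a new ride.
Use the MATCH_RECOGNIZE clause to detect the following pattern:
- a start event;
- any number of intermediate events that belong to other rides of the same taxi;
- an end event;
- the next start event.
Compute the rest time in minutes.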
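Click to see hints
- Use an AFTER MATCH SKIP TO LAST variable strategy so that the most recent start event is included in the next match.
- Use the built-in TIMESTAMPDIFF function to compute the time difference in minutes.
The output should look similar to this: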
taxiId      ride_start             ride_end               next_ride_start        minutes_of_rest
2013000002  2013-01-01 00:00:00.0  2013-01-01 00:06:00.0  2013-01-01 00:16:00.0  10
2013000004  2013-01-01 00:00:00.0  2013-01-01 00:08:00.0  2013-01-01 00:13:00.0  5
2013000032  2013-01-01 00:00:00.0  2013-01-01 00:05:00.0  2013-01-01 00:06:00.0  1
2013000128  2013-01-01 00:01:00.0  2013-01-01 00:04:00.0  2013-01-01 00:12:00.0  8
2013000256  2013-01-01 00:02:00.0  2013-01-01 00:10:00.0  2013-01-01 00:10:00.0  0
2013000512  2013-01-01 00:03:25.0  2013-01-01 00:04:51.0  2013-01-01 00:10:00.0  5
2013000512  2013-01-01 00:10:00.0  2013-01-01 00:13:31.0  2013-01-01 00:14:19.0  0
2013001028  2013-01-01 00:05:52.0  2013-01-01 00:13:20.0  2013-01-01 00:14:12.0  0
2013000258  2013-01-01 00:02:00.0  2013-01-01 00:08:00.0  2013-01-01 00:11:00.0  3
2013002070  2013-01-01 00:08:57.0  2013-01-01 00:12:26.0  2013-01-01 00:13:46.0  1
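The minutes_of_rest values in the sample rows are consistent with counting only whole minutes between ride_end and next_ride_start, with leftover seconds dropped. The following Python sketch checks that arithmetic against a few rows copied from the output above. The helper minutes_between is a hypothetical stand-in written for this illustration; it is not the Flink TIMESTAMPDIFF function itself.

```python
from datetime import datetime, timedelta

TS_FORMAT = "%Y-%m-%d %H:%M:%S.%f"


def minutes_between(start, end):
    """Whole minutes elapsed from start to end; leftover seconds are dropped."""
    return (end - start) // timedelta(minutes=1)


# (ride_end, next_ride_start, minutes_of_rest) copied from the sample output
sample_rows = [
    ("2013-01-01 00:06:00.0", "2013-01-01 00:16:00.0", 10),
    ("2013-01-01 00:04:51.0", "2013-01-01 00:10:00.0", 5),   # 5 min 9 s
    ("2013-01-01 00:13:31.0", "2013-01-01 00:14:19.0", 0),   # 48 s
    ("2013-01-01 00:12:26.0", "2013-01-01 00:13:46.0", 1),   # 1 min 20 s
]

for ride_end, next_start, expected in sample_rows:
    rest = minutes_between(
        datetime.strptime(ride_end, TS_FORMAT),
        datetime.strptime(next_start, TS_FORMAT),
    )
    print(ride_end, next_start, rest, rest == expected)
```

A rest of less than one minute therefore shows up as 0, which explains the zero values in the table.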
Click to see the solution
SELECT * FROM Rides
MATCH_RECOGNIZE (
  PARTITION BY taxiId              -- match within each taxi
  ORDER BY rideTime
  MEASURES
    START_RIDE.rideTime AS ride_start,
    END_RIDE.rideTime AS ride_end,
    NEXT_RIDE.rideTime AS next_ride_start,
    TIMESTAMPDIFF(MINUTE, END_RIDE.rideTime, NEXT_RIDE.rideTime) AS minutes_of_rest
  AFTER MATCH SKIP TO LAST NEXT_RIDE   -- the next match begins at this start event
  PATTERN (START_RIDE M* END_RIDE NEXT_RIDE)
  DEFINE
    START_RIDE AS START_RIDE.isStart = true,
    M AS M.rideId <> START_RIDE.rideId,   -- events of other rides
    END_RIDE AS END_RIDE.isStart = false,
    NEXT_RIDE AS NEXT_RIDE.isStart = true
);
The query matches the pattern described above. The variable START_RIDE detects the start event. M greedily matches any number of ride events that do not belong to the same ride as the start event. END_RIDE detects the end event. The NEXT_RIDE variable matches the following start event, which is needed to compute the resting time. Once a match has been detected, we measure the difference in minutes and return it.
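The roles of the four pattern variables and of the skip strategy can be sketched in plain Python. This is a simplified simulation under stated assumptions, not Flink's actual matching engine: the function names match_at and find_rest_periods and the sample events are hypothetical, and the rest time is computed as whole minutes.

```python
from collections import defaultdict
from datetime import datetime, timedelta


def match_at(rows, i):
    """Try to match START_RIDE M* END_RIDE NEXT_RIDE with START_RIDE at index i.

    Returns (end_idx, next_idx) on success, otherwise None.
    """
    start = rows[i]
    if not start["isStart"]:
        return None
    # Longest run of rows after the start that satisfy M (events of other rides)
    m_end = i + 1
    while m_end < len(rows) and rows[m_end]["rideId"] != start["rideId"]:
        m_end += 1
    # Greedy M*: try the longest run first, then back off one row at a time
    for end_idx in range(m_end, i, -1):
        next_idx = end_idx + 1
        if (next_idx < len(rows)
                and not rows[end_idx]["isStart"]
                and rows[next_idx]["isStart"]):
            return end_idx, next_idx
    return None


def find_rest_periods(events):
    """Return (taxiId, ride_start, ride_end, next_ride_start, minutes_of_rest) tuples."""
    partitions = defaultdict(list)
    for ev in events:
        partitions[ev["taxiId"]].append(ev)  # PARTITION BY taxiId

    results = []
    for taxi_id, rows in partitions.items():
        rows.sort(key=lambda r: r["rideTime"])  # ORDER BY rideTime
        i = 0
        while i < len(rows):
            match = match_at(rows, i)
            if match is None:
                i += 1
                continue
            end_idx, next_idx = match
            ride_end = rows[end_idx]["rideTime"]
            next_start = rows[next_idx]["rideTime"]
            rest = (next_start - ride_end) // timedelta(minutes=1)
            results.append((taxi_id, rows[i]["rideTime"], ride_end, next_start, rest))
            # SKIP TO LAST NEXT_RIDE: the next attempt starts at the row matched as NEXT_RIDE
            i = next_idx
    return results


# Hypothetical events for one taxi: three consecutive rides A, B, C
events = [
    {"taxiId": 7, "rideId": "A", "isStart": True, "rideTime": datetime(2013, 1, 1, 0, 0)},
    {"taxiId": 7, "rideId": "A", "isStart": False, "rideTime": datetime(2013, 1, 1, 0, 6)},
    {"taxiId": 7, "rideId": "B", "isStart": True, "rideTime": datetime(2013, 1, 1, 0, 16)},
    {"taxiId": 7, "rideId": "B", "isStart": False, "rideTime": datetime(2013, 1, 1, 0, 20)},
    {"taxiId": 7, "rideId": "C", "isStart": True, "rideTime": datetime(2013, 1, 1, 0, 21)},
]
for row in find_rest_periods(events):
    print(row)
```

Resuming at the row matched as NEXT_RIDE is what lets the start of ride B close the first match and also open the second one. With a skip past the last row, the second rest period would be missed.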
Apache Flink, Flink®, Apache®, the squirrel logo, and the Apache feather logo are either registered trademarks or trademarks of The Apache Software Foundation.