The goal of the "Long Ride Alerts" exercise is to provide a real-time warning whenever a taxi ride started two hours ago and is still ongoing.
This should be done using the event time timestamps and watermarks that are provided in the data stream.
The stream is out of order, and it is possible that the END event for a ride will be processed before its START event. In such cases no alert should ever be created, since we already know that the ride has ended.
The input data of this exercise is a DataStream of taxi ride events.
The goal of this exercise is not to find all rides that lasted for more than two hours, but rather to create an alert in real time at the moment it becomes known that a ride has been going on for more than two hours.
The result of the exercise should be a DataStream<TaxiRide> that only contains START events of taxi rides that started two hours earlier and whose END event hasn't yet arrived.
The resulting stream should be printed to standard out.
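"Started two hours ago" is measured in event time, as tracked by the watermark, not by the wall clock. The plain-Java sketch below states the alert condition as a predicate; `alertDue` and its parameters are hypothetical names used only for illustration, not part of the exercise code. Under this assumption, an alert becomes due once the watermark reaches the ride's start time plus two hours, provided the ride's END has not arrived.

```java
import java.time.Duration;
import java.time.Instant;

class LongRideCondition {
    // The exercise's threshold, in milliseconds of event time.
    static final long TWO_HOURS_MS = Duration.ofHours(2).toMillis();

    // Hypothetical helper: is an alert due for a ride that started at startTimeMs,
    // given the current watermark and whether its END event has already arrived?
    static boolean alertDue(long startTimeMs, long watermarkMs, boolean endArrived) {
        return !endArrived && watermarkMs >= startTimeMs + TWO_HOURS_MS;
    }

    public static void main(String[] args) {
        long start = Instant.parse("2020-01-01T10:00:00Z").toEpochMilli();
        long almost = Instant.parse("2020-01-01T11:59:59Z").toEpochMilli();
        long twoHoursLater = Instant.parse("2020-01-01T12:00:00Z").toEpochMilli();

        System.out.println(alertDue(start, almost, false));        // false: not yet two hours of event time
        System.out.println(alertDue(start, twoHoursLater, false)); // true: still ongoing after two hours
        System.out.println(alertDue(start, twoHoursLater, true));  // false: the ride has already ended
    }
}
```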
ℹ️ Rather than browsing the sources online, you might prefer to open these classes in your IDE.
Exercise classes:
- Java: org.apache.flink.training.exercises.longrides.LongRidesExercise
- Scala: org.apache.flink.training.exercises.longrides.scala.LongRidesExercise
Tests:
- Java: org.apache.flink.training.exercises.longrides.LongRidesTest
- Scala: org.apache.flink.training.exercises.longrides.scala.LongRidesTest
Overall approach
This exercise revolves around using a ProcessFunction to manage some keyed state and event time timers, and doing so in a way that works even when the END event for a given rideId arrives before the START event (which can happen). The challenge is figuring out what state to keep, and when to set and clear that state.
You will want to use event time timers that fire two hours after an incoming START event, and in the onTimer() method, collect START events to the output only if a matching END event hasn't yet arrived.
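The approach above can be modelled without Flink to see how the pieces interact. The sketch below is a framework-free simulation, not Flink code: it assumes a hypothetical `Ride` record in place of `TaxiRide`, hash maps in place of keyed state, and a `TreeSet` of pending timers in place of Flink's timer service. The hypothetical `advanceWatermark()` mimics the rule that an event-time timer fires once the watermark reaches its timestamp. In this variant every START registers a timer, and the check for a matching END happens inside `onTimer()`.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

class LongRidesCheckOnTimer {
    // Hypothetical stand-in for TaxiRide: only the fields this logic needs.
    record Ride(long rideId, boolean isStart, long eventTime) {}

    // A pending event-time timer for one key (rideId).
    record Timer(long time, long rideId) {}

    static final long TWO_HOURS = 2 * 60 * 60 * 1000L;

    // Models of per-key state: the START event, and whether the END has arrived.
    final Map<Long, Ride> startByRide = new HashMap<>();
    final Set<Long> endArrived = new HashSet<>();
    // Models the timer service: timers ordered by timestamp, then by key.
    final TreeSet<Timer> timers = new TreeSet<>(
            Comparator.comparingLong(Timer::time).thenComparingLong(Timer::rideId));
    final List<Ride> alerts = new ArrayList<>();

    void processElement(Ride ride) {
        if (ride.isStart()) {
            startByRide.put(ride.rideId(), ride);
            timers.add(new Timer(ride.eventTime() + TWO_HOURS, ride.rideId()));
        } else {
            // Simplification: an END arriving after its alert was emitted is never cleared.
            endArrived.add(ride.rideId());
        }
    }

    // Fire, in timestamp order, every timer whose timestamp the watermark has reached.
    void advanceWatermark(long watermark) {
        while (!timers.isEmpty() && timers.first().time() <= watermark) {
            onTimer(timers.pollFirst().rideId());
        }
    }

    void onTimer(long rideId) {
        Ride start = startByRide.remove(rideId);
        boolean ended = endArrived.remove(rideId); // also clears the END marker
        if (start != null && !ended) {
            alerts.add(start);
        }
    }

    public static void main(String[] args) {
        LongRidesCheckOnTimer f = new LongRidesCheckOnTimer();
        f.processElement(new Ride(1, true, 0));          // ride 1 never ends
        f.processElement(new Ride(2, false, 1_800_000)); // ride 2: END arrives before START
        f.processElement(new Ride(2, true, 0));
        f.advanceWatermark(TWO_HOURS);
        System.out.println(f.alerts); // contains only ride 1
    }
}
```

In an actual ProcessFunction, the timer would be registered through the context's TimerService with `registerEventTimeTimer(long)`, and the per-key values would live in keyed state such as a `ValueState`.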
State and timers
There are many possible solutions for this exercise, but in general it is enough to keep one TaxiRide in state (one TaxiRide for each key, i.e., for each rideId). The approach used in the reference solution is to store whichever event arrives first (the START or the END), and if it's a START event, create a timer for two hours later. If and when the other event for the same rideId arrives, clean up carefully: if the stored event was a START, delete its pending timer, and then clear the state.
It is possible to arrange this so that whenever onTimer() is called, you are guaranteed that an alert should be emitted (i.e., that the ride kept in state is a START whose END has not arrived). Writing the code this way conveniently puts all of the complex business logic together in one place, in the processElement() method.
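The reference approach described above can be sketched in the same framework-free style. Again, `Ride`, `Timer`, `rideState`, `timerTime()`, and `advanceWatermark()` are hypothetical stand-ins for TaxiRide, keyed state, and Flink's timer service; this is not the reference solution's code. Assuming each ride has exactly one START and one END, deleting a START's timer as soon as its END arrives means `onTimer()` can emit the stored ride unconditionally.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

class LongRidesStoreFirst {
    // Hypothetical stand-in for TaxiRide: only the fields this logic needs.
    record Ride(long rideId, boolean isStart, long eventTime) {}

    // A pending event-time timer for one key (rideId).
    record Timer(long time, long rideId) {}

    static final long TWO_HOURS = 2 * 60 * 60 * 1000L;

    // Models keyed state: at most one Ride per rideId.
    final Map<Long, Ride> rideState = new HashMap<>();
    // Models the timer service: timers ordered by timestamp, then by key.
    final TreeSet<Timer> timers = new TreeSet<>(
            Comparator.comparingLong(Timer::time).thenComparingLong(Timer::rideId));
    final List<Ride> alerts = new ArrayList<>();

    static long timerTime(Ride start) {
        return start.eventTime() + TWO_HOURS;
    }

    void processElement(Ride ride) {
        Ride previous = rideState.get(ride.rideId());
        if (previous == null) {
            // First event seen for this ride: store it; start the clock only for a START.
            // Simplification: an END arriving after its alert fired lands here and is never cleared.
            rideState.put(ride.rideId(), ride);
            if (ride.isStart()) {
                timers.add(new Timer(timerTime(ride), ride.rideId()));
            }
        } else {
            if (!ride.isStart()) {
                // The stored event is the START and its timer has not fired yet: cancel it.
                timers.remove(new Timer(timerTime(previous), ride.rideId()));
            }
            // Both events have now been seen, so the state can be cleared.
            rideState.remove(ride.rideId());
        }
    }

    // Fire, in timestamp order, every timer whose timestamp the watermark has reached.
    void advanceWatermark(long watermark) {
        while (!timers.isEmpty() && timers.first().time() <= watermark) {
            onTimer(timers.pollFirst().rideId());
        }
    }

    void onTimer(long rideId) {
        // Reaching this point means a START is two hours old with no END: always alert.
        alerts.add(rideState.remove(rideId));
    }

    public static void main(String[] args) {
        LongRidesStoreFirst f = new LongRidesStoreFirst();
        f.processElement(new Ride(1, true, 0));          // ride 1 never ends
        f.processElement(new Ride(2, false, 2_400_000)); // ride 2: END before START, no timer
        f.processElement(new Ride(2, true, 0));
        f.processElement(new Ride(3, true, 0));
        f.processElement(new Ride(3, false, 5_400_000)); // ride 3 ends: its timer is deleted
        f.advanceWatermark(TWO_HOURS);
        System.out.println(f.alerts);    // contains only ride 1
        System.out.println(f.rideState); // empty: state for all three rides has been cleared
    }
}
```

All branching lives in `processElement()`, and `onTimer()` has nothing left to decide. In Flink, the deletion step would correspond to `TimerService.deleteEventTimeTimer(long)`, which needs the same timestamp that was used at registration; that is why this sketch recomputes it from the stored START event.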
Reference solutions are available on GitHub:
- Java API: org.apache.flink.training.solutions.longrides.LongRidesSolution
- Scala API: org.apache.flink.training.solutions.longrides.scala.LongRidesSolution