The goal of this exercise is to join together the TaxiRide and TaxiFare records for each ride.
For each distinct rideId, there are exactly three events:
- a TaxiRide START event
- a TaxiRide END event
- a TaxiFare event (whose timestamp happens to match the start time)
The result should be a DataStream<Tuple2<TaxiRide, TaxiFare>>, with one record for each distinct rideId. Each tuple should pair the TaxiRide START event for some rideId with its matching TaxiFare.
For this exercise you will work with two data streams, one with TaxiRide events generated by a TaxiRideSource and the other with TaxiFare events generated by a TaxiFareSource. See Using the Taxi Data Streams for information on how to download the data and how to work with these stream generators.
The result of this exercise is a data stream of Tuple2<TaxiRide, TaxiFare> records, one for each distinct rideId. The exercise is set up to ignore the END events, and you should join the START event of each ride with its corresponding fare event.
The resulting stream is printed to standard out.
ℹ️ Rather than following links to the sources, you might prefer to open these classes in your IDE.
Exercise Classes
- Java: org.apache.flink.training.exercises.ridesandfares.RidesAndFaresExercise
- Scala: org.apache.flink.training.exercises.ridesandfares.scala.RidesAndFaresExercise
Tests
- Java: org.apache.flink.training.exercises.ridesandfares.RidesAndFaresTest
- Scala: org.apache.flink.training.exercises.ridesandfares.scala.RidesAndFaresTest
Program Structure
You can use a RichCoFlatMapFunction to implement this join operation, applied after each stream has been keyed by rideId and the two keyed streams have been connected. You have no control over the order in which the ride and fare records for a given rideId arrive, so be prepared to store either one until its match arrives, at which point you can emit a Tuple2<TaxiRide, TaxiFare> that joins the two records together.
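The order-independent buffering described above can be sketched without any Flink dependency. This is a plain-Java simulation under stated assumptions, not the reference solution: the two HashMaps stand in for keyed state (in Flink you might keep, for example, one ValueState<TaxiRide> and one ValueState<TaxiFare> per rideId), and onRide/onFare play the roles of flatMap1/flatMap2. The Ride, Fare, and Pair records and the amount field are hypothetical minimal stand-ins for TaxiRide, TaxiFare, and Tuple2<TaxiRide, TaxiFare>, and END events are assumed to be filtered out upstream, as in the exercise.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java simulation (no Flink dependency) of the ride/fare join logic.
// The two maps stand in for Flink's keyed state; onRide/onFare play the
// roles of flatMap1/flatMap2 in a RichCoFlatMapFunction.
class RideFareJoin {
    // Hypothetical minimal stand-ins for TaxiRide, TaxiFare and Tuple2<TaxiRide, TaxiFare>.
    record Ride(long rideId) {}
    record Fare(long rideId, double amount) {}
    record Pair(Ride ride, Fare fare) {}

    final Map<Long, Ride> pendingRides = new HashMap<>(); // rides waiting for a fare
    final Map<Long, Fare> pendingFares = new HashMap<>(); // fares waiting for a ride

    // Called for each START ride (END events assumed filtered out upstream).
    void onRide(Ride ride, List<Pair> out) {
        Fare fare = pendingFares.remove(ride.rideId()); // read and clear the buffered fare
        if (fare != null) {
            out.add(new Pair(ride, fare));               // both halves present: emit
        } else {
            pendingRides.put(ride.rideId(), ride);       // buffer until the fare arrives
        }
    }

    // Called for each fare; mirror image of onRide.
    void onFare(Fare fare, List<Pair> out) {
        Ride ride = pendingRides.remove(fare.rideId());
        if (ride != null) {
            out.add(new Pair(ride, fare));
        } else {
            pendingFares.put(fare.rideId(), fare);
        }
    }

    public static void main(String[] args) {
        RideFareJoin join = new RideFareJoin();
        List<Pair> out = new ArrayList<>();
        join.onRide(new Ride(1L), out);        // ride 1 arrives before its fare
        join.onFare(new Fare(2L, 7.5), out);   // fare 2 arrives before its ride
        join.onFare(new Fare(1L, 12.0), out);  // completes ride 1
        join.onRide(new Ride(2L), out);        // completes ride 2
        int buffered = join.pendingRides.size() + join.pendingFares.size();
        System.out.println(out.size() + " pairs, " + buffered + " buffered"); // 2 pairs, 0 buffered
    }
}
```

In a real job, Flink scopes each ValueState to the current key, so a single-value state per input takes the place of each map, and the remove-then-emit step corresponds to reading the state with value() and then calling clear().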
Working with State
You should use Flink's managed, keyed state to buffer each record until its matching event arrives, and be sure to clear that state once it is no longer needed.
For the purposes of this exercise it's okay to assume that the START and fare events are perfectly paired. In a real-world application, however, you should account for missing events: whenever one of the two events never arrives, the other event for the same rideId will be held in state forever. A later lab looks at the ProcessFunction and timers, which may also help in this situation.
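One way to bound this leak, in the spirit of the timers mentioned above, is to remember when each unmatched half was buffered and periodically evict entries that have waited too long. The following plain-Java simulation illustrates the idea under stated assumptions: the map stands in for keyed state, expire(now) stands in for a timer firing at event time now (in Flink, roughly a timer registered through the TimerService of a ProcessFunction), and the 60-second timeout is an arbitrary, hypothetical value. It is not necessarily how the later lab approaches the problem.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java simulation (no Flink dependency) of timer-style cleanup for the
// ride/fare join. The map stands in for keyed state; expire(now) stands in
// for a timer firing at event time `now`.
class ExpiringJoinBuffer {
    // One buffered half: which input it came from, and when it was stored.
    record Pending(boolean isRide, long bufferedAt) {}

    final long timeoutMillis;                            // hypothetical expiry window
    final Map<Long, Pending> pending = new HashMap<>();  // rideId -> unmatched half
    int matched = 0;

    ExpiringJoinBuffer(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    void onRide(long rideId, long eventTime) { onEvent(true, rideId, eventTime); }

    void onFare(long rideId, long eventTime) { onEvent(false, rideId, eventTime); }

    private void onEvent(boolean isRide, long rideId, long eventTime) {
        Pending other = pending.get(rideId);
        if (other != null && other.isRide() != isRide) {
            pending.remove(rideId); // matched: clear the buffered half
            matched++;              // (the joined pair would be emitted here)
        } else {
            pending.put(rideId, new Pending(isRide, eventTime)); // wait for the partner
        }
    }

    // Evicts every half that has waited at least timeoutMillis; returns how many.
    int expire(long now) {
        int before = pending.size();
        pending.values().removeIf(p -> now - p.bufferedAt() >= timeoutMillis);
        return before - pending.size();
    }

    public static void main(String[] args) {
        ExpiringJoinBuffer buf = new ExpiringJoinBuffer(60_000L);
        buf.onRide(1L, 0L);
        buf.onFare(1L, 0L);                      // ride 1 matched; nothing left buffered
        buf.onRide(2L, 1_000L);                  // the fare for ride 2 never arrives
        System.out.println(buf.pending.size());  // 1: ride 2 is stuck in state
        System.out.println(buf.expire(30_000L)); // 0: not yet past the timeout
        System.out.println(buf.expire(61_000L)); // 1: ride 2 evicted
    }
}
```

Without the expire step, the entry for ride 2 would stay in the map indefinitely, which is exactly the failure mode described above for keyed state.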
Reference solutions are available in this project:
- Java: org.apache.flink.training.solutions.ridesandfares.RidesAndFaresSolution
- Scala: org.apache.flink.training.solutions.ridesandfares.scala.RidesAndFaresSolution