
Why Temporal Join is Stream Processing’s Superpower

· 9 min read
Matthias Broecheler

Stream processing technologies like Apache Flink introduce a new type of data transformation that’s very powerful: the temporal join. Temporal joins add context to data streams while being efficient and fast to execute.


This article introduces the temporal join, compares it to the traditional inner join, explains when to use it, and why it is a secret superpower.


The Join: A Quick Review

Let's take a quick detour down memory lane and revisit the good ol' join operation. That trusty sidekick in your SQL utility belt helps you link data from two or more tables based on a related column between them.

Suppose we are operating a factory with a number of machines that roast and package coffee. We place sensors on each machine to monitor the temperature and detect overheating.

We keep track of the sensors and machines in two database tables.

The Sensor table contains each sensor's serial number and the id of the machine the sensor is placed on.

id  serialNo  machineid
1   X57-774   501
2   X33-453   203
3   X54-554   501

The Machine table contains the name of each machine.

id   name
203  Iron Roaster
501  Gritty Grinder

To identify all the sensors on the machine “Iron Roaster”, we use the following SQL query, which joins the Sensor and Machine tables:

SELECT s.id, s.serialNo FROM Sensor s
JOIN Machine m ON s.machineid = m.id
WHERE m.name = 'Iron Roaster'
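To try the query against the two sample tables, here is a minimal sketch using Python's built-in sqlite3 module. SQLite is only a stand-in here and the column types are assumptions; the article does not say which database the factory uses.

```python
import sqlite3

# In-memory database holding the article's sample data (column types are assumed).
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE Machine (id INTEGER PRIMARY KEY, name TEXT);
    CREATE TABLE Sensor (id INTEGER PRIMARY KEY, serialNo TEXT, machineid INTEGER);
    INSERT INTO Machine VALUES (203, 'Iron Roaster'), (501, 'Gritty Grinder');
    INSERT INTO Sensor VALUES (1, 'X57-774', 501), (2, 'X33-453', 203), (3, 'X54-554', 501);
""")

# The join query from the article.
rows = conn.execute("""
    SELECT s.id, s.serialNo FROM Sensor s
    JOIN Machine m ON s.machineid = m.id
    WHERE m.name = 'Iron Roaster'
""").fetchall()

print(rows)  # → [(2, 'X33-453')]
```

Only sensor 2 is returned because it is the only sensor whose machineid matches the Iron Roaster's id.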

Why are joins as important as your morning coffee? Without them, your data tables are like islands, isolated and lonely. Joins bring them together, creating meaningful relationships between data, and enriching data records with context to see the bigger picture.

By default, databases execute joins as inner joins, which means only records that have a match on both sides are included in the result.

So, now that we've refreshed our memory about the classic join, let's dive into the exciting world of temporal joins in stream processing systems like Apache Flink.

The Temporal Join: Linking Stream and State


Picture this: you're a time traveler. You have the power to access any point in time, past or future, at your will. Now, imagine that your data could do the same. Enter the Temporal Join, the DeLorean of data operations, capable of taking your data on a time-traveling adventure.

A Temporal Join is like a regular join but with a twist. It allows you to join a stream of data (the time traveler) with a versioned table (the timeline) based on the time attribute of the data stream. This means that for each record in the stream, the join finds the most recent version of the matching record in the versioned table whose timestamp is less than or equal to the stream record's time.

The versioned table is a normal state table where we keep track of data changes over time. That is, we keep older versions of each record around to allow the stream to match the correct version in time. Like time travel, temporal joins can make your head spin a bit. Let’s look at an example to break it down.
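The version lookup described above can be sketched in a few lines of Python. This is a minimal illustration, not how a stream engine implements it: the `version_as_of` helper, the `versions` list, and the 12:00 move of the sensor are all hypothetical.

```python
from bisect import bisect_right
from datetime import datetime

# Hypothetical version history of one sensor record: (valid_from, machineid),
# sorted by valid_from. The move to machine 203 at 12:00 is an assumed value.
versions = [
    (datetime(2023, 7, 10, 0, 0), 501),
    (datetime(2023, 7, 10, 12, 0), 203),
]

def version_as_of(versions, ts):
    """Return the latest version with valid_from <= ts, or None if none exists yet."""
    times = [valid_from for valid_from, _ in versions]
    idx = bisect_right(times, ts)  # number of versions that started at or before ts
    return versions[idx - 1] if idx > 0 else None

morning = version_as_of(versions, datetime(2023, 7, 10, 7, 11, 8))
afternoon = version_as_of(versions, datetime(2023, 7, 10, 13, 25, 16))
print(morning[1], afternoon[1])  # → 501 203
```

A stream record from the morning sees the sensor on machine 501, while one from the afternoon sees it on machine 203, even though both look up the same sensor id.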

Temporal Join vs Inner Join

Back in our coffee roasting factory, we collect the temperature readings from each sensor in a data stream.

timestamp            sensorid  temperature
2023-07-10T07:11:08  1         105.2
2023-07-10T07:11:08  2         83.1
...
2023-07-10T13:25:16  1         77.8
2023-07-10T13:25:16  2         83.5

And we want to know the maximum temperature recorded for each machine.

Easy enough, let’s join the temperature data stream with the Sensor table and aggregate by machine id:

SELECT s.machineid, MAX(r.temperature) AS maxTemp 
FROM SensorReading r INNER JOIN Sensor s
ON r.sensorid = s.id GROUP BY s.machineid

But here is a problem: What if we moved a sensor from one machine to another during the day? With an inner join, all of the sensor’s readings would be linked to the machine it was last placed on. So, if sensor 1 records a high temperature of 105 degrees in the morning and we move the sensor to the “Iron Roaster” machine in the afternoon, then we might see the 105 degrees falsely show up as the maximum temperature for the Iron Roaster. See how time played a trick on our join?

And this happens whenever we join a data stream with a state table that changes over time, like our sensors that get moved around the factory. What to do? Let’s call the temporal join to our rescue:

SELECT s.machineid, MAX(r.temperature) AS maxTemp 
FROM SensorReading r TEMPORAL JOIN Sensor s
ON r.sensorid = s.id GROUP BY s.machineid

Pretty much the same query, just a different join type. Just a heads-up: the syntax for temporal joins in Flink SQL is more complex - we'll get to that later.

With a temporal join, each sensor reading is joined with the version of the sensor record that was valid at the time of the reading. In other words, the join matches the sensor reading with the sensor record not only on the id but also on the timestamp of the reading, to ensure it picks the right version of the sensor record. Pretty neat, right?
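To see the difference in numbers, here is a small Python simulation of both joins over the sample readings. It is a sketch under stated assumptions: the `sensor_versions` history (sensor 1 moving from machine 501 to machine 203 at noon) and the helper names are made up for illustration, and a real stream engine works incrementally rather than over a list.

```python
from bisect import bisect_right

# Readings from the article: (timestamp, sensorid, temperature).
# ISO-8601 strings of the same format sort chronologically.
readings = [
    ("2023-07-10T07:11:08", 1, 105.2),
    ("2023-07-10T07:11:08", 2, 83.1),
    ("2023-07-10T13:25:16", 1, 77.8),
    ("2023-07-10T13:25:16", 2, 83.5),
]

# Assumed version history: sensorid -> [(valid_from, machineid)], sorted by valid_from.
sensor_versions = {
    1: [("2023-07-10T00:00:00", 501), ("2023-07-10T12:00:00", 203)],
    2: [("2023-07-10T00:00:00", 203)],
}

def latest(sensorid, ts):
    """Inner-join behavior: always use the current (last) sensor record."""
    return sensor_versions[sensorid][-1][1]

def as_of(sensorid, ts):
    """Temporal-join behavior: use the version valid at the reading's timestamp."""
    history = sensor_versions[sensorid]
    idx = bisect_right([valid_from for valid_from, _ in history], ts)
    return history[idx - 1][1] if idx > 0 else None

def max_temp(readings, machine_for):
    """Maximum temperature per machine, using machine_for to resolve the machine."""
    result = {}
    for ts, sensorid, temp in readings:
        machine = machine_for(sensorid, ts)
        if machine is not None:
            result[machine] = max(temp, result.get(machine, float("-inf")))
    return result

inner_result = max_temp(readings, latest)
temporal_result = max_temp(readings, as_of)
print(inner_result)     # → {203: 105.2}
print(temporal_result)  # → {501: 105.2, 203: 83.5}
```

With the inner-join behavior, the morning spike of 105.2 is attributed to machine 203 (the Iron Roaster). With the temporal lookup it stays with machine 501, where sensor 1 was placed when it took the reading.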

Whenever you join a data stream with a state that changes over time, you want to use the temporal join to make sure your data is lined up correctly in time. Temporal joins are a powerful feature of stream processing engines that would be difficult to implement in a database.

Why Temporal Joins are Fast and Efficient


Not only do temporal joins solve the time-alignment problem when joining data streams with changing state, modern stream processors like Apache Flink are also incredibly efficient at executing temporal joins. A powerful feature with great performance? Sounds too good to be true. Let’s peek behind the stream processing curtain to find out why.

In stream processing, joins are maintained as the underlying data changes over time. That requires the stream engine to hold all the data it needs to update join records when either side of the join changes. This makes inner joins pretty expensive on data streams.

Consider our max-temperature query with the inner join: When we join a temperature reading with the corresponding sensor record, and that record changes, the engine has to update the resulting join record. To do so, it has to store all the sensor readings to determine which join results are affected by a change in a sensor record. This can lead to a lot of updates and hence a lot of downstream computation. It can also cause the system to fail when the data stream carries a lot of temperature readings, because the stream engine has to store all of them and its state keeps growing.

Temporal joins, on the other hand, can be executed much more efficiently. The stream engine only needs to store the versions of the sensor table that are within the time bounds of the sensor reading data stream. And it only has to briefly store (if at all) the sensor reading records to ensure they are joined with the most up-to-date sensor records. Moreover, temporal joins don’t require sending out a massive amount of updated join records when sensors change placement since the join is fixed in time.
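The difference in state size can be illustrated with a toy model. This is a rough sketch, not Flink's actual state management: `prune_versions` is a hypothetical helper, timestamps are simplified to the hour of the day, and the watermark is assumed to equal the latest event time (no out-of-order readings).

```python
def prune_versions(history, watermark):
    """Drop versions that no reading at or after `watermark` can still match.

    `history` is a list of (valid_from, machineid) sorted by valid_from. The
    newest version with valid_from <= watermark is kept because it is still
    valid at the watermark; all versions before it are unreachable.
    """
    keep_from = 0
    for i, (valid_from, _) in enumerate(history):
        if valid_from <= watermark:
            keep_from = i
    return history[keep_from:]

# Assumed version history of one sensor, timestamps as hour of day.
history = [(0, 501), (12, 203), (18, 501)]

inner_join_state = []  # a regular streaming join has to remember every reading
for hour in range(24):
    inner_join_state.append((hour, 1, 80.0))  # (timestamp, sensorid, temperature)
    history = prune_versions(history, watermark=hour)

print(len(inner_join_state), history)  # → 24 [(18, 501)]
```

In this model the inner join's state grows with every reading, while the temporal join's state shrinks to the versions that later readings can still match.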

Temporal Joins Made Easy to Use


Because temporal joins are so powerful, we made them easy to use in DataSQRL. DataSQRL is a compiler for Apache Flink that builds integrated data pipelines for your event-driven or streaming applications. DataSQRL supports the simplified temporal join syntax shown in the queries above. In addition, DataSQRL defaults to a temporal join whenever you join a state and a stream table on the state table’s primary key. In that way, DataSQRL helps you pick the right join for your data and makes it easy for developers new to stream processing.

Apache Flink supports temporal joins in Flink SQL using the following syntax:

SELECT s.machineid, MAX(r.temperature) AS maxTemp
FROM SensorReading AS r JOIN Sensor
FOR SYSTEM_TIME AS OF r.`timestamp` AS s
ON r.sensorid = s.id GROUP BY s.machineid

You need to make sure that the join column of the state table is the primary key of that table and that the timestamp column of the SensorReading table is set up as its time attribute. DataSQRL does that for you automatically based on the watermark.

Time to Wrap Up This Temporal Journey

We've reached the end of our time-traveling adventure through the universe of temporal joins. We've seen how they're like the DeLorean of data operations, zipping us back and forth through time to make sure our data matches up just right. We've also compared them to the good ol' inner join.

Temporal joins help us avoid the pitfalls of time-alignment problems when joining data streams with changing state. They're also super efficient, making them a great choice for high-volume, real-time data processing.

And that’s why the temporal join is stream processing's secret superpower.

DataSQRL makes using temporal joins a breeze. With its simplified syntax and smart defaults, it's like having a personal tour guide leading you through the sometimes bewildering landscape of stream processing. Take a look at our IoT tutorial to see a complete example of temporal joins in action, or work through our extended tutorial for a step-by-step guide to stream processing, including temporal joins. And we are here to help if you have any questions.

Happy data time-traveling, folks!