Let’s imagine we’ve got a source of data with a nested array of multiple values. The data comes from IoT devices: each device has multiple sensors, and each sensor provides a reading.
A sample record might look like this:
{
  "device_id": "device-001",
  "reading_time": "2025-03-03T12:00:00",
  "measurements": [
    {
      "sensor_id": "sensor-1",
      "value": 22.5
    },
    {
      "sensor_id": "sensor-2",
      "value": 45.2
    },
    {
      "sensor_id": "sensor-3",
      "value": 1013.25
    }
  ]
}
What I want to understand is how to unnest the array in Flink SQL, so that the record above becomes three records like this:
{
  "device_id": "device-001",
  "reading_time": "2025-03-03T12:00:00",
  "sensor_id": "sensor-1",
  "value": 22.5
}
{
  "device_id": "device-001",
  "reading_time": "2025-03-03T12:00:00",
  "sensor_id": "sensor-2",
  "value": 45.2
}
{
  "device_id": "device-001",
  "reading_time": "2025-03-03T12:00:00",
  "sensor_id": "sensor-3",
  "value": 1013.25
}
Let’s set up the table that will hold the source data. I’m using the filesystem connector, but the data could come from anywhere, including Kafka.
CREATE TABLE iot_1 (
    device_id STRING,
    reading_time TIMESTAMP(3),
    measurements ARRAY<ROW<
        `sensor_id` STRING NOT NULL,
        `value` DOUBLE NOT NULL
    > NOT NULL>
) WITH (
    'connector' = 'filesystem',
    'path' = '/tmp/measurements.json',
    'format' = 'json'
);
When we query the data we can see the measurements array:
Flink SQL> SET 'sql-client.execution.result-mode' = 'tableau';
Flink SQL> SELECT * FROM iot_1 LIMIT 1;
+----+-------------+-------------------------+--------------------------------+
| op |   device_id |            reading_time |                   measurements |
+----+-------------+-------------------------+--------------------------------+
| +I |  device-001 | 2025-03-03 12:00:00.000 | [(sensor-1, 22.5), (sensor-... |
+----+-------------+-------------------------+--------------------------------+
Received a total of 1 rows (0.23 seconds)
Array elements can be accessed by index:
Flink SQL> SELECT measurements[1] FROM iot_1 LIMIT 1;
+----+--------------------------------+
| op |                         EXPR$0 |
+----+--------------------------------+
| +I |               (sensor-1, 22.5) |
+----+--------------------------------+
Received a total of 1 row (0.29 seconds)
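Two related things are worth knowing here: array indexes in Flink SQL start at 1, not 0, and CARDINALITY returns the number of elements in an array. As a rough sketch (assuming the iot_1 table above, and that your Flink version supports field access on an indexed array element), the two could be combined like this. The aliases sensor_count and first_sensor_id are just names I’ve made up for illustration:

```sql
-- Sketch only: assumes the iot_1 table defined above.
SELECT device_id,
       CARDINALITY(measurements) AS sensor_count,    -- number of elements in the array
       measurements[1].sensor_id AS first_sensor_id  -- arrays are 1-indexed
FROM iot_1;
```

This is fine for peeking at a known position, but it doesn’t scale to arrays of unknown length.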
To explode the array (also known as 'array expansion' or 'unnesting'), use UNNEST with the CROSS JOIN syntax:
SELECT device_id, reading_time, m.sensor_id, m.`value`
FROM iot_1
CROSS JOIN UNNEST(measurements) AS m;
+----+------------+-------------------------+-----------+---------+
| op |  device_id |            reading_time | sensor_id |   value |
+----+------------+-------------------------+-----------+---------+
| +I | device-001 | 2025-03-03 12:00:00.000 |  sensor-1 |    22.5 |
| +I | device-001 | 2025-03-03 12:00:00.000 |  sensor-2 |    45.2 |
| +I | device-001 | 2025-03-03 12:00:00.000 |  sensor-3 | 1013.25 |
[…]
Here, m is the alias for the UNNEST of the measurements column. In the SELECT column projection we then just reference the nested field names from the array directly (backtick-quoting value, since it is a reserved word). You can check the field names with DESCRIBE:
Flink SQL> DESCRIBE iot_1;
+--------------+---------------------------------------------------------------------------+------+-----+--------+-----------+
|         name |                                                                      type | null | key | extras | watermark |
+--------------+---------------------------------------------------------------------------+------+-----+--------+-----------+
|    device_id |                                                                    STRING | TRUE |     |        |           |
| reading_time |                                                              TIMESTAMP(3) | TRUE |     |        |           |
| measurements | ARRAY<ROW<`sensor_id` STRING NOT NULL, `value` DOUBLE NOT NULL> NOT NULL> | TRUE |     |        |           |
+--------------+---------------------------------------------------------------------------+------+-----+--------+-----------+
3 rows in set
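One thing to be aware of: CROSS JOIN UNNEST emits one row per array element, so a device record whose measurements array is empty or NULL produces no output at all. If you need to keep those records, a LEFT JOIN against the UNNEST should do it. This is a sketch rather than something I’ve run against the sample data (which has no empty arrays), and it assumes the same iot_1 table:

```sql
-- Sketch only: keeps device records even when there is nothing to unnest.
-- ON TRUE is required because LEFT JOIN needs a join condition.
SELECT device_id, reading_time, m.sensor_id, m.`value`
FROM iot_1
LEFT JOIN UNNEST(measurements) AS m ON TRUE;
```

For a record with no measurements you’d expect a single row in which sensor_id and value are both NULL.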
Another way to name the unnested columns is to alias them in the UNNEST based on their position:
SELECT device_id, reading_time, foo, bar
FROM iot_1
CROSS JOIN UNNEST(measurements) AS m(foo, bar);
+----+------------+-------------------------+----------+---------+
| op |  device_id |            reading_time |      foo |     bar |
+----+------------+-------------------------+----------+---------+
| +I | device-001 | 2025-03-03 12:00:00.000 | sensor-1 |    22.5 |
| +I | device-001 | 2025-03-03 12:00:00.000 | sensor-2 |    45.2 |
| +I | device-001 | 2025-03-03 12:00:00.000 | sensor-3 | 1013.25 |
[…]
Here sensor_id becomes foo, because it’s the first column in the nested array and we’ve aliased it in the UNNEST clause; likewise, value becomes bar. This approach is useful if you want to avoid the issue with reserved words (as seen above, with value). However, it also risks problems if the schema changes so that the columns are reordered or new ones are added, since the aliases apply only to the position of the columns.
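If it’s only the column names you want to change, a less fragile alternative is to keep referencing the nested fields by name and alias them in the SELECT instead. A minimal sketch, where sensor_name and reading_value are arbitrary aliases of my own choosing:

```sql
-- Sketch only: rename by field name rather than by position.
SELECT device_id,
       reading_time,
       m.sensor_id AS sensor_name,
       m.`value`   AS reading_value
FROM iot_1
CROSS JOIN UNNEST(measurements) AS m;
```

Because the fields are matched by name rather than position, reordering the fields of the ROW shouldn’t change what ends up in each output column.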
What if we don’t care about the column names, and just want everything out of the array? In this example there are only two columns but if there were dozens then typing all of these into the query would be somewhat tedious.
Fortunately, * against m (the alias of the UNNEST) is valid:
SELECT device_id, reading_time, m.*
FROM iot_1
CROSS JOIN UNNEST(measurements) AS m;
+----+------------+-------------------------+-----------+---------+
| op |  device_id |            reading_time | sensor_id |   value |
+----+------------+-------------------------+-----------+---------+
| +I | device-001 | 2025-03-03 12:00:00.000 |  sensor-1 |    22.5 |
| +I | device-001 | 2025-03-03 12:00:00.000 |  sensor-2 |    45.2 |
| +I | device-001 | 2025-03-03 12:00:00.000 |  sensor-3 | 1013.25 |
[…]
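Once the array is unnested, each element is just a regular row, so the usual SQL filtering and aggregation applies. As an illustration (again assuming the iot_1 table; reading_count and avg_value are made-up aliases), this sketch summarises the readings per sensor across all devices:

```sql
-- Sketch only: aggregate over the unnested elements.
SELECT m.sensor_id,
       COUNT(*)       AS reading_count,
       AVG(m.`value`) AS avg_value
FROM iot_1
CROSS JOIN UNNEST(measurements) AS m
GROUP BY m.sensor_id;
```

In streaming mode the result is an updating changelog, so expect to see -U and +U rows alongside +I in the SQL client as the aggregates change.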
To persist the unnested result we use the CREATE TABLE AS SELECT syntax:
CREATE TABLE iot_1_exploded
WITH (
    'connector' = 'filesystem',
    'path' = '/tmp/measurements-exploded.json',
    'format' = 'json'
) AS
SELECT device_id, reading_time, m.*
FROM iot_1
CROSS JOIN UNNEST(measurements) AS m;
Since we’re using the filesystem connector (though any other would work), we can examine the resulting JSON:
$ cat /tmp/measurements-exploded.json/*
{"device_id":"device-001","reading_time":"2025-03-03 12:00:00","sensor_id":"sensor-1","value":22.5}
{"device_id":"device-001","reading_time":"2025-03-03 12:00:00","sensor_id":"sensor-2","value":45.2}
{"device_id":"device-001","reading_time":"2025-03-03 12:00:00","sensor_id":"sensor-3","value":1013.25}
{"device_id":"device-002","reading_time":"2025-03-03 12:05:00","sensor_id":"sensor-1","value":23.1}
{"device_id":"device-002","reading_time":"2025-03-03 12:05:00","sensor_id":"sensor-2","value":44.8}
{"device_id":"device-002","reading_time":"2025-03-03 12:05:00","sensor_id":"sensor-3","value":1012.75}
{"device_id":"device-003","reading_time":"2025-03-03 12:10:00","sensor_id":"sensor-1","value":23.8}
{"device_id":"device-003","reading_time":"2025-03-03 12:10:00","sensor_id":"sensor-2","value":43.5}
{"device_id":"device-003","reading_time":"2025-03-03 12:10:00","sensor_id":"sensor-3","value":1012.5}
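CREATE TABLE AS SELECT derives the target table’s schema from the query. If you’d rather control the column names and types yourself, you can declare the table up front and populate it with INSERT INTO. A sketch, assuming the same source table; the table name iot_1_exploded_explicit and its path are hypothetical:

```sql
-- Sketch only: explicit target schema instead of CTAS.
-- Table name and path are hypothetical.
CREATE TABLE iot_1_exploded_explicit (
    device_id    STRING,
    reading_time TIMESTAMP(3),
    sensor_id    STRING,
    `value`      DOUBLE
) WITH (
    'connector' = 'filesystem',
    'path' = '/tmp/measurements-exploded-explicit.json',
    'format' = 'json'
);

-- Columns are matched to the target table by position.
INSERT INTO iot_1_exploded_explicit
SELECT device_id, reading_time, m.sensor_id, m.`value`
FROM iot_1
CROSS JOIN UNNEST(measurements) AS m;
```

Listing the fields explicitly instead of using m.* means a change to the nested ROW won’t silently alter what gets written.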