How to explode nested arrays with Flink SQL

Robin Moffatt - Mar 4 - Dev Community

Let’s imagine we’ve got a source of data with a nested array of multiple values: readings from an IoT device. Each device has multiple sensors, and each sensor provides a reading.

A sample record might look like this:

{
    "device_id": "device-001",
    "reading_time": "2025-03-03T12:00:00",
    "measurements": [
        {
            "sensor_id": "sensor-1",
            "value": 22.5
        },
        {
            "sensor_id": "sensor-2",
            "value": 45.2
        },
        {
            "sensor_id": "sensor-3",
            "value": 1013.25
        }
    ]
}

What I want to do is understand how, in Flink SQL, I can unnest the array so that the record above becomes three records, one per sensor:

{
    "device_id": "device-001",
    "reading_time": "2025-03-03T12:00:00",
    "sensor_id": "sensor-1",
    "value": 22.5
}
{
    "device_id": "device-001",
    "reading_time": "2025-03-03T12:00:00",
    "sensor_id": "sensor-2",
    "value": 45.2
}
{
    "device_id": "device-001",
    "reading_time": "2025-03-03T12:00:00",
    "sensor_id": "sensor-3",
    "value": 1013.25
}
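Before turning to SQL, it’s worth being clear that the transformation itself is just a flat map: each element of the array gets combined with its parent record’s top-level fields. A minimal Python sketch of that shape (illustration only, nothing Flink-specific):

```python
# One source record with a nested array of measurements
record = {
    "device_id": "device-001",
    "reading_time": "2025-03-03T12:00:00",
    "measurements": [
        {"sensor_id": "sensor-1", "value": 22.5},
        {"sensor_id": "sensor-2", "value": 45.2},
        {"sensor_id": "sensor-3", "value": 1013.25},
    ],
}

def explode(rec):
    """Yield one flat record per element of the nested array."""
    parent = {k: v for k, v in rec.items() if k != "measurements"}
    for m in rec["measurements"]:
        yield {**parent, **m}

rows = list(explode(record))
# rows now holds three flat dicts, one per sensor reading
```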

Let’s set up the table that will hold the source data. I’m using the filesystem connector, but the data could come from anywhere really, including Kafka.

CREATE TABLE iot_1 (
    device_id STRING,
    reading_time TIMESTAMP(3),
    measurements ARRAY<ROW<
        `sensor_id` STRING NOT NULL,
        `value` DOUBLE NOT NULL
    > NOT NULL>
) WITH (
    'connector' = 'filesystem',
    'path' = '/tmp/measurements.json',
    'format' = 'json'
);

When we query the data we can see the array measurements:

Flink SQL> SET 'sql-client.execution.result-mode' = 'tableau';

Flink SQL> SELECT * FROM iot_1 LIMIT 1;
+----+-------------+-------------------------+--------------------------------+
| op |   device_id |            reading_time |                   measurements |
+----+-------------+-------------------------+--------------------------------+
| +I |  device-001 | 2025-03-03 12:00:00.000 | [(sensor-1, 22.5), (sensor-... |
+----+-------------+-------------------------+--------------------------------+
Received a total of 1 row (0.23 seconds)

Array elements can be accessed by index:

Flink SQL> SELECT measurements[1] FROM iot_1 LIMIT 1;

+----+--------------------------------+
| op |                         EXPR$0 |
+----+--------------------------------+
| +I |               (sensor-1, 22.5) |
+----+--------------------------------+
Received a total of 1 row (0.29 seconds)
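One thing to watch: Flink SQL array indexing is 1-based, so measurements[1] above returns the first element. If you’re used to 0-based languages this is easy to trip over; for contrast, the same access in Python:

```python
# The same array as a Python list of (sensor_id, value) tuples
measurements = [("sensor-1", 22.5), ("sensor-2", 45.2), ("sensor-3", 1013.25)]

# Python is 0-based: the first element lives at index 0...
first = measurements[0]

# ...whereas in Flink SQL, measurements[1] refers to this same first element
```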

To explode the array (also known as 'array expansion' or 'unnesting'), use UNNEST with the CROSS JOIN syntax:

SELECT device_id, reading_time, m.sensor_id, m.`value`
  FROM iot_1
        CROSS JOIN UNNEST(measurements) AS m;

+----+------------+-------------------------+-----------+---------+
| op |  device_id |            reading_time | sensor_id |   value |
+----+------------+-------------------------+-----------+---------+
| +I | device-001 | 2025-03-03 12:00:00.000 |  sensor-1 |    22.5 |
| +I | device-001 | 2025-03-03 12:00:00.000 |  sensor-2 |    45.2 |
| +I | device-001 | 2025-03-03 12:00:00.000 |  sensor-3 | 1013.25 |
[…]

Here, m is the alias for the UNNEST of the measurements column. In the SELECT projection we then reference the nested column names from the array directly, backtick-quoting the reserved word value. You can check the field names with DESCRIBE:

Flink SQL> DESCRIBE iot_1;

+--------------+---------------------------------------------------------------------------+------+-----+--------+-----------+
|         name |                                                                      type | null | key | extras | watermark |
+--------------+---------------------------------------------------------------------------+------+-----+--------+-----------+
|    device_id |                                                                    STRING | TRUE |     |        |           |
| reading_time |                                                              TIMESTAMP(3) | TRUE |     |        |           |
| measurements | ARRAY<ROW<`sensor_id` STRING NOT NULL, `value` DOUBLE NOT NULL> NOT NULL> | TRUE |     |        |           |
+--------------+---------------------------------------------------------------------------+------+-----+--------+-----------+

3 rows in set

Another option is to alias the columns in the UNNEST based on their position:

SELECT device_id, reading_time, foo, bar
  FROM iot_1
        CROSS JOIN UNNEST(measurements) AS m(foo, bar);

+----+------------+-------------------------+----------+---------+
| op |  device_id |            reading_time |      foo |     bar |
+----+------------+-------------------------+----------+---------+
| +I | device-001 | 2025-03-03 12:00:00.000 | sensor-1 |    22.5 |
| +I | device-001 | 2025-03-03 12:00:00.000 | sensor-2 |    45.2 |
| +I | device-001 | 2025-03-03 12:00:00.000 | sensor-3 | 1013.25 |
[…]

Here sensor_id becomes foo, because it’s the first column in the nested array and we’ve aliased it by position in the UNNEST clause. This approach is useful if you want to avoid the issue with reserved words (as seen above, with value). However, it also risks problems if the schema were to change, with columns reordered or new ones added, since the aliases apply purely by position.
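That positional coupling is the same hazard as unpacking a tuple by position in any language. A quick Python illustration (the units field is hypothetical, purely to show a schema change):

```python
# v1 element shape: (sensor_id, value) -- matches m(foo, bar) above
measurement_v1 = ("sensor-1", 22.5)
foo, bar = measurement_v1           # foo = sensor id, bar = reading

# Suppose the schema evolves and a hypothetical units field is
# added in first position: (units, sensor_id, value)
measurement_v2 = ("hPa", "sensor-3", 1013.25)
foo2, bar2, _ = measurement_v2      # foo2 now holds the units, not the sensor id
```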

What if we don’t care about the column names, and just want everything out of the array? In this example there are only two columns, but if there were dozens, typing them all into the query would be somewhat tedious.

Fortunately, m.* (where m is the alias of the UNNEST) is valid:

SELECT device_id, reading_time, m.*
  FROM iot_1
       CROSS JOIN UNNEST(measurements) AS m;

+----+------------+-------------------------+-----------+---------+
| op |  device_id |            reading_time | sensor_id |   value |
+----+------------+-------------------------+-----------+---------+
| +I | device-001 | 2025-03-03 12:00:00.000 |  sensor-1 |    22.5 |
| +I | device-001 | 2025-03-03 12:00:00.000 |  sensor-2 |    45.2 |
| +I | device-001 | 2025-03-03 12:00:00.000 |  sensor-3 | 1013.25 |
[…]

To persist this transformation we use the CREATE TABLE AS SELECT syntax:

CREATE TABLE iot_1_exploded
    WITH (
        'connector' = 'filesystem',
        'path' = '/tmp/measurements-exploded.json',
        'format' = 'json'
    ) AS
    SELECT device_id, reading_time, m.*
    FROM iot_1
        CROSS JOIN UNNEST(measurements) AS m;

Since we’re using the filesystem connector (though any other would work), we can examine the resulting JSON:

$ cat /tmp/measurements-exploded.json/*

{"device_id":"device-001","reading_time":"2025-03-03 12:00:00","sensor_id":"sensor-1","value":22.5}
{"device_id":"device-001","reading_time":"2025-03-03 12:00:00","sensor_id":"sensor-2","value":45.2}
{"device_id":"device-001","reading_time":"2025-03-03 12:00:00","sensor_id":"sensor-3","value":1013.25}
{"device_id":"device-002","reading_time":"2025-03-03 12:05:00","sensor_id":"sensor-1","value":23.1}
{"device_id":"device-002","reading_time":"2025-03-03 12:05:00","sensor_id":"sensor-2","value":44.8}
{"device_id":"device-002","reading_time":"2025-03-03 12:05:00","sensor_id":"sensor-3","value":1012.75}
{"device_id":"device-003","reading_time":"2025-03-03 12:10:00","sensor_id":"sensor-1","value":23.8}
{"device_id":"device-003","reading_time":"2025-03-03 12:10:00","sensor_id":"sensor-2","value":43.5}
{"device_id":"device-003","reading_time":"2025-03-03 12:10:00","sensor_id":"sensor-3","value":1012.5}