Parallelizing aggregates in YugabyteDB

Franck Pachot - Aug 19 '22 - - Dev Community

In the previous post we have seen a way to simulate loose index scan wit a recursive CTE and a covering range index. I used it to get the last value for each truck_id. Now I want to count the readings per each truck_id. This has to scan all rows. However, I can use the previous technique to generate the statements to do this in parallel.

Example

I'm using the truck_readingstable and truck_last_reading index created in the previous post where I inserted 10000 readings for each of the 2000 trucks.

Note that I have created the index with 10 tablets (in real live, this is done by auto_splitting when table grows):

create index truck_last_reading on truck_readings 
( truck_id asc, ts asc) 
split at values((100),(200),(300),(400),(500),(600),(700),(800),(1000));
Enter fullscreen mode Exit fullscreen mode

Method 1: generating parallel statements

First, I use the Recursive CTE technique to create a table which lists each truck_id and has a placeholder to put the count:

create table tmp_partial_count as
with recursive truck_last_reading as (
 (
  select
   last_truck_last_reading.truck_id
   from truck_readings last_truck_last_reading
  order by
   last_truck_last_reading.truck_id desc,
   last_truck_last_reading.ts desc
  limit 1
 )
union all
 select
  next_truck_last_reading.truck_id
 from truck_last_reading, lateral
 (
  select truck_id from truck_readings
  where truck_id < truck_last_reading.truck_id
  order by truck_id desc, ts desc limit 1
 )
 as next_truck_last_reading
) select truck_id, null::int as partial_count
from truck_last_reading
;
Enter fullscreen mode Exit fullscreen mode

This takes 2 seconds here:

...
SELECT 2000
Time: 2464.776 ms (00:02.465)
yugabyte=#
Enter fullscreen mode Exit fullscreen mode

Then I generate, from this table, the statements to count the reading for each truck_id and update it in my temporary table. I generate the psql statements to run in background with &. I add a wait every 50 jobs to limit the number of connections

\pset format unaligned
\pset tuples_only on
\pset footer off
\o tmp_partial_count.sh
select format($$
psql -d postgres://yugabyte:YugabyteDB@yb0.pachot.net:5433/yugabyte -c 'update tmp_partial_count 
set partial_count=(
select count(*) from truck_readings where truck_id=%s
) where truck_id=%s' &
%s
$$
,truck_id,truck_id
,case when mod(row_number()over(), 200 )=0 then 'wait' end
)
from tmp_partial_count where partial_count is null
;
select 'wait';
\o
Enter fullscreen mode Exit fullscreen mode

This generates the script in tmp_partial_count.sh:

...
yugabyte-# from tmp_partial_count where partial_count is null
yugabyte-# ;
select 'wait';
\oTime: 47.225 ms
yugabyte=# select 'time wait';
Time: 11.529 ms
yugabyte=# \o
Enter fullscreen mode Exit fullscreen mode

I generate it only for the rows where no partial count is there.

I can run this:

\! time sh tmp_partial_count.sh
Enter fullscreen mode Exit fullscreen mode

This lasted half a minute in my lab (a small lab, but the point is that it can scale):

...
UPDATE 1
UPDATE 1
UPDATE 1

real    0m55.229s
user    0m5.690s
sys     0m4.904s
yugabyte=#

Enter fullscreen mode Exit fullscreen mode

I can check that my index is used with an Index Only Scan, which is why it scales (each parallel query reads a different range):

yugabyte=# explain (analyze, costs off) 
           update tmp_partial_count set partial_count=(
             select count(*) from truck_readings
             where truck_id=42
           ) where truck_id=42;

                                                       QUERY PLAN
-------------------------------------------------------------------------------------------------------------------------
 Update on tmp_partial_count (actual time=50.422..50.422 rows=0 loops=1)
   InitPlan 1 (returns $0)
     ->  Aggregate (actual time=41.160..41.160 rows=1 loops=1)
           ->  Index Only Scan using truck_last_reading on truck_readings (actual time=4.255..40.626 rows=10000 loops=1)
                 Index Cond: (truck_id = 42)
                 Heap Fetches: 0
   ->  Seq Scan on tmp_partial_count (actual time=50.300..50.394 rows=1 loops=1)
         Filter: (truck_id = 42)
         Rows Removed by Filter: 1999
 Planning Time: 0.091 ms
 Execution Time: 53.597 ms
 Peak Memory Usage: 24 kB
(12 rows)
Enter fullscreen mode Exit fullscreen mode

Again, my lab is small here. With a large cluster the index is probably split (by range) on multiple tablets. This is automatic if you enabled auto-split.

Finally, I check that I have all partial updates:

yugabyte=# 
           select count(*),count(partial_count),sum(partial_count)
           from tmp_partial_count;

 count | count |   sum
-------+-------+----------
  2000 |  2000 | 20000000
(1 row)
Enter fullscreen mode Exit fullscreen mode

Method 2: use Seq Scan parallelism

I'm writing this with YugabyteDB 2.15 and you can expect some optimizations in future versions to parallelize and push down the aggregates. First, let's check the execution plan for one query:

yugabyte=#
             explain (costs off, analyze)
             select   truck_id, count(*) from truck_readings 
             where truck_id is not null
             group by truck_id
;

                                      QUERY PLAN
---------------------------------------------------------------------------------------
 HashAggregate (actual time=65548.176..65548.601 rows=2000 loops=1)
   Group Key: truck_id
   ->  Seq Scan on truck_readings (actual time=3.170..62186.109 rows=20000000 loops=1)
 Planning Time: 3.297 ms
 Execution Time: 65549.386 ms
 Peak Memory Usage: 12665 kB
(6 rows)
Enter fullscreen mode Exit fullscreen mode

This takes one minute. On my lab this is not slower than my complex method above, but on a larger volume, this would not scale because one tablet is read at a time.

YugabyteDB can parallelize the Seq Scan to read the tablets in parallel (the default --ysql_select_parallelism=-1 makes it calculated from the number of tservers: 2 per server, bounded between 2 and 16](https://github.com/yugabyte/yugabyte-db/blob/2.15.2/src/yb/yql/pggate/pg_doc_op.cc#L762)) and I have 3 tservers in my lab. Because reading all rows in parallel would saturate the single YSQL process (the PostgreSQL backend), YugabyteDB parallelized the SeqScan only when there is an expression pushed down (Remote Filter). Here I force this with a dummy where truck_id is not null:

yugabyte=#   set yb_enable_expression_pushdown=true;
yugabyte=#
             explain (costs off, analyze) 
             select   truck_id, count(*) from truck_readings 
             where truck_id is not null
             group by truck_id
;
                                      QUERY PLAN
---------------------------------------------------------------------------------------
 HashAggregate (actual time=39623.390..39623.786 rows=2000 loops=1)
   Group Key: truck_id
   ->  Seq Scan on truck_readings (actual time=4.386..34862.379 rows=20000000 loops=1)
         Remote Filter: (truck_id IS NOT NULL)
 Planning Time: 0.071 ms
 Execution Time: 39624.001 ms
 Peak Memory Usage: 417 kB
(7 rows)

Time: 39636.634 ms (00:39.637)
Enter fullscreen mode Exit fullscreen mode

Here, with a much simple method, I get better results. But remember this depends on your server and your data. Reading the tablets was parallelized but all rows has to be fetched and aggregated by a single backend, which means that it is not as scalable as running queries in parallel from multiple connections.

. . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . .