The question of co-partitioning or interleaving frequently arises when discussing Distributed SQL features. The idea of co-locating rows that are commonly joined intuitively suggests improved performance. However, it's essential to base this decision on concrete facts by measuring the impact of joining rows from multiple nodes. Optimizing prematurely can hinder the agility of data distribution.
In this blog post, I will provide insights into how cross-node joins perform and scale in YugabyteDB, using a simple test case as an example.
The history of database technologies shows that co-locating rows from different tables never became as popular as expected.
Here are a few examples that come to mind:
- Oracle CLUSTER, which was used for pre-joining rows but proved impractical for tables that evolve in structure or size.
- Google Spanner, which initially supported only interleaved foreign keys but eventually removed this limitation.
- CockroachDB, which finally deprecated interleaved tables, given the small pros and the large number of cons.
- YugabyteDB, which considered those alternatives but opted to improve cross-node joins and co-locate only the small tables that do not need to be distributed. This can be reconsidered if a real use case shows the need to co-partition.
These real-world cases underscore the importance of making informed decisions when considering co-partitioning and interleaving.
Here is a small demo with two tables where I'll show the execution plan to understand the cost of joining two tables that are distributed independently.
create table demo1 (
primary key (id asc)
, id bigserial
, value1 float
);
create table demo2 (
primary key (id asc, num asc)
, id bigint not null references demo1(id)
, num int
, value2 float
);
insert into demo1(value1)
select generate_series
from generate_series(1,100)
;
insert into demo2(id, num, value2)
select id, generate_series, random()
from demo1 cross join generate_series(1,10000)
;
analyze demo1, demo2;
I'm running this on YugabyteDB Managed, distributed over 3 availability zones.
Batched Nested Loops
I'm running this test with Batched Nested Loops and Optimizer Statistics enabled, which are not yet the defaults in the version I'm using (YugabyteDB 2.19.2):
set yb_bnl_batch_size to 1024;
set yb_enable_optimizer_statistics to on;
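To confirm what is in effect in the session before comparing plans, the standard SHOW command can be used. This is only a quick check and assumes the two SET statements above were run in the same session:

```sql
-- display the current session values of the two parameters set above
show yb_bnl_batch_size;
show yb_enable_optimizer_statistics;
```

Both are set at the session level here, so a new connection goes back to the cluster defaults.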
Here is a simple join as an example. Feel free to test with different volumes. What matters most is the execution plan.
yugabyte=# explain (analyze, dist, costs off)
select *
from demo1
join demo2 using(id)
where value1>0.5
and value2>0.5
;
QUERY PLAN
----------------------------------------------------------------------------------------------
YB Batched Nested Loop Join (actual time=9.269..3943.405 rows=500040 loops=1)
Join Filter: (demo1.id = demo2.id)
-> Seq Scan on demo1 (actual time=0.884..0.900 rows=100 loops=1)
Remote Filter: (value1 > '0.5'::double precision)
Storage Table Read Requests: 1
Storage Table Read Execution Time: 0.789 ms
-> Index Scan using demo2_pkey on demo2 (actual time=8.263..3669.042 rows=500040 loops=1)
Index Cond: (id = ANY (ARRAY[demo1.id, $1, $2, ..., $1023]))
Remote Filter: (value2 > '0.5'::double precision)
Storage Table Read Requests: 489
Storage Table Read Execution Time: 3446.736 ms
Planning Time: 0.683 ms
Execution Time: 3971.439 ms
Storage Read Requests: 490
Storage Read Execution Time: 3447.525 ms
Storage Write Requests: 0
Catalog Read Requests: 0
Catalog Write Requests: 0
Storage Flush Requests: 0
Storage Execution Time: 3447.525 ms
Peak Memory Usage: 442 kB
(21 rows)
500040 rows have been fetched from the join to demo2 in 4 seconds from a cluster distributed across multiple availability zones.
What is the cost of joining between tablets that can be on remote nodes? 490 remote read requests: one to get 100 rows from demo1 and 489 to get the matching 500040 rows from demo2.
This response time is predictable and configurable. The number of read requests depends on yb_bnl_batch_size, ysql_request_limit, and ysql_prefetch_limit, which are set to 1024. The read request time depends on the deployment: latency between nodes and number of nodes.
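The 489 read requests on demo2 are consistent with these limits. Here is a quick check of the arithmetic, under the assumption that each read request returns at most 1024 rows (the plan itself does not state this):

```sql
-- 500040 rows fetched from demo2, at most 1024 rows per read request (assumed)
select ceil(500040 / 1024.0) as expected_read_requests;
-- returns 489, matching "Storage Table Read Requests: 489" in the plan
```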
Follower reads
For example, I can avoid cross-AZ calls by reading from a state of 15 seconds ago, which allows reading from the closest replica:
yugabyte=> begin transaction read only;
BEGIN
yugabyte=*> set yb_follower_read_staleness_ms=15000;
SET
yugabyte=*> set local yb_read_from_followers=on;
SET
yugabyte=*> explain (analyze, dist, costs off)
select *
from demo1
join demo2 using(id)
where value1>0.5
and value2>0.5
;
QUERY PLAN
----------------------------------------------------------------------------------------------
YB Batched Nested Loop Join (actual time=7.010..2733.839 rows=500040 loops=1)
Join Filter: (demo1.id = demo2.id)
-> Seq Scan on demo1 (actual time=1.064..1.094 rows=100 loops=1)
Remote Filter: (value1 > '0.5'::double precision)
Storage Table Read Requests: 1
Storage Table Read Execution Time: 0.973 ms
-> Index Scan using demo2_pkey on demo2 (actual time=5.787..2434.887 rows=500040 loops=1)
Index Cond: (id = ANY (ARRAY[demo1.id, $1, $2, ..., $1023]))
Remote Filter: (value2 > '0.5'::double precision)
Storage Table Read Requests: 489
Storage Table Read Execution Time: 2197.448 ms
Planning Time: 0.584 ms
Execution Time: 2763.900 ms
Storage Read Requests: 490
Storage Read Execution Time: 2198.421 ms
Storage Write Requests: 0
Catalog Read Requests: 0
Catalog Write Requests: 0
Storage Flush Requests: 0
Storage Execution Time: 2198.421 ms
Peak Memory Usage: 442 kB
(21 rows)
yugabyte=*> commit;
COMMIT
The difference is small here because the latency between AZs within an AWS region is low, but such features allow fast response times for reporting queries in a multi-region deployment.
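The same settings can be applied to a whole session rather than to one transaction. This is a minimal sketch, assuming a reporting session where every statement can tolerate the 15-second staleness:

```sql
-- follower reads only apply to read-only transactions,
-- so make read-only the default for this session (assumed reporting-only session)
set default_transaction_read_only = on;
set yb_read_from_followers = on;
set yb_follower_read_staleness_ms = 15000;
```

Any write attempted in such a session fails until default_transaction_read_only is switched back off.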
What if we co-locate the rows to join?
Would it be better if colocated? Let's test on a single denormalized table.
yugabyte=# create table demo as
select * from demo1 join demo2 using(id)
;
SELECT 1000000
yugabyte=# explain (analyze, dist, costs off)
select *
from demo
where value1>0.5
and value2>0.5
;
QUERY PLAN
----------------------------------------------------------------------------------------------
Seq Scan on demo (actual time=13.602..2579.236 rows=500040 loops=1)
Remote Filter: ((value1 > '0.5'::double precision) AND (value2 > '0.5'::double precision))
Storage Table Read Requests: 164
Storage Table Read Execution Time: 2493.309 ms
Planning Time: 3.389 ms
Execution Time: 2607.967 ms
Storage Read Requests: 164
Storage Read Execution Time: 2493.309 ms
Storage Write Requests: 0
Catalog Read Requests: 2
Catalog Read Execution Time: 3.076 ms
Catalog Write Requests: 0
Storage Flush Requests: 0
Storage Execution Time: 2496.385 ms
Peak Memory Usage: 14 kB
(15 rows)
It is faster: 2.6 seconds and 164 read requests, compared with 4 seconds and 490 read requests for the join. But that is only about 1.5 times faster, similar to the improvement I had with follower reads. This gain can easily be offset by the scaling possibilities we have when not trying to colocate the table rows. For example, one table can be split, and its shards relocated, without the need to do the same on the other.
Hash Join
Without Batched Nested Loop, a Hash Join is also efficient:
yugabyte=# explain (analyze, dist, costs off)
/*+ hashjoin( demo1 demo2 ) */
select *
from demo1
join demo2 using(id)
where value1>0.5
and value2>0.5
;
QUERY PLAN
---------------------------------------------------------------------------
Hash Join (actual time=8.427..3597.417 rows=500040 loops=1)
Hash Cond: (demo2.id = demo1.id)
-> Seq Scan on demo2 (actual time=7.446..3492.206 rows=500040 loops=1)
Remote Filter: (value2 > '0.5'::double precision)
Storage Table Read Requests: 489
Storage Table Read Execution Time: 3402.490 ms
-> Hash (actual time=0.960..0.960 rows=100 loops=1)
Buckets: 1024 Batches: 1 Memory Usage: 13kB
-> Seq Scan on demo1 (actual time=0.909..0.929 rows=100 loops=1)
Remote Filter: (value1 > '0.5'::double precision)
Storage Table Read Requests: 1
Storage Table Read Execution Time: 0.838 ms
Planning Time: 0.224 ms
Execution Time: 3623.558 ms
Storage Read Requests: 490
Storage Read Execution Time: 3403.328 ms
Storage Write Requests: 0
Catalog Read Requests: 0
Catalog Write Requests: 0
Storage Flush Requests: 0
Storage Execution Time: 3403.328 ms
Peak Memory Usage: 119 kB
(22 rows)
For cases where you don't need to create an index on the foreign key, this execution plan will kick in, with two Seq Scans using a Remote Filter.
Merge Join
As I have range indexes on both tables, an efficient Merge Join is also possible:
yugabyte=# explain (analyze, dist, costs off)
/*+ mergejoin ( demo1 demo2 ) */
select *
from demo1
join demo2 using(id)
where value1>0.5
and value2>0.5
;
QUERY PLAN
----------------------------------------------------------------------------------------------
Merge Join (actual time=8.520..3805.945 rows=500040 loops=1)
Merge Cond: (demo2.id = demo1.id)
-> Index Scan using demo2_pkey on demo2 (actual time=7.565..3713.344 rows=500040 loops=1)
Remote Filter: (value2 > '0.5'::double precision)
Storage Table Read Requests: 489
Storage Table Read Execution Time: 3528.213 ms
-> Index Scan using demo1_pkey on demo1 (actual time=0.949..1.115 rows=100 loops=1)
Remote Filter: (value1 > '0.5'::double precision)
Storage Table Read Requests: 1
Storage Table Read Execution Time: 0.891 ms
Planning Time: 0.214 ms
Execution Time: 3831.961 ms
Storage Read Requests: 490
Storage Read Execution Time: 3529.104 ms
Storage Write Requests: 0
Catalog Read Requests: 0
Catalog Write Requests: 0
Storage Flush Requests: 0
Storage Execution Time: 3529.104 ms
Peak Memory Usage: 87 kB
(20 rows)
Again, the response time is similar. It is important to verify that all execution plans are efficient, as this avoids unpredictable performance if the data distribution changes.
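One join method is not covered by the tests so far: a plain Nested Loop without batching. Here is a minimal sketch to check it. It rests on two assumptions: that a yb_bnl_batch_size of 1 turns batching off in your version, and that the Leading and NestLoop hints from pg_hint_plan are accepted in the same way as the hashjoin and mergejoin hints used earlier:

```sql
-- Assumption: a batch size of 1 disables Batched Nested Loops for this session
set yb_bnl_batch_size to 1;
explain (analyze, dist, costs off)
/*+ leading( (demo1 demo2) ) nestloop( demo1 demo2 ) */
select *
 from demo1
 join demo2 using(id)
 where value1>0.5
 and value2>0.5
;
-- restore the batch size used for the other tests in this post
set yb_bnl_batch_size to 1024;
```

The Leading hint keeps demo1 (100 rows) as the outer table, so the inner scan on demo2 loops once per demo1 row. Comparing the Storage Read Requests line with the 490 reported above shows what batching saves.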
Distributed Join is acceptable
The conclusion is that joining rows from two distributed tables is not a huge performance problem. The cost is the transfer of rows between nodes, and I've taken the worst case here with a 50% selectivity. Do the same with value1>0.9995 and value2>0.9995 and you will see a faster query. Without additional indexes, it is not much faster because the remote calls were not so significant compared to the scan itself:
yugabyte=> explain (analyze, dist, costs off)
select *
from demo1
join demo2 using(id)
where value1>0.9995
and value2>0.9995
;
QUERY PLAN
----------------------------------------------------------------------------------------------
YB Batched Nested Loop Join (actual time=1418.892..1419.367 rows=506 loops=1)
Join Filter: (demo1.id = demo2.id)
-> Seq Scan on demo1 (actual time=0.885..0.910 rows=100 loops=1)
Remote Filter: (value1 > '0.9995'::double precision)
Storage Table Read Requests: 1
Storage Table Read Execution Time: 0.805 ms
-> Index Scan using demo2_pkey on demo2 (actual time=1417.873..1418.074 rows=506 loops=1)
Index Cond: (id = ANY (ARRAY[demo1.id, $1, $2, ..., $1023]))
Remote Filter: (value2 > '0.9995'::double precision)
Storage Table Read Requests: 1
Storage Table Read Execution Time: 1417.727 ms
Planning Time: 0.650 ms
Execution Time: 1419.575 ms
Storage Read Requests: 2
Storage Read Execution Time: 1418.533 ms
Storage Write Requests: 0
Catalog Read Requests: 0
Catalog Write Requests: 0
Storage Flush Requests: 0
Storage Execution Time: 1418.533 ms
Peak Memory Usage: 442 kB
(21 rows)
However, when you have correct indexing on both tables, the response time is back to milliseconds:
yugabyte=> create index on demo1(value1 asc, id asc);
CREATE INDEX
yugabyte=> create index on demo2(value2 asc, id, num);
CREATE INDEX
yugabyte=> explain (analyze, dist, costs off)
select *
from demo1
join demo2 using(id)
where value1>0.9995
and value2>0.9995
;
QUERY PLAN
------------------------------------------------------------------------------------------------------------------
YB Batched Nested Loop Join (actual time=4.215..4.688 rows=506 loops=1)
Join Filter: (demo1.id = demo2.id)
-> Seq Scan on demo1 (actual time=0.786..0.804 rows=100 loops=1)
Remote Filter: (value1 > '0.9995'::double precision)
Storage Table Read Requests: 1
Storage Table Read Execution Time: 0.690 ms
-> Index Only Scan using demo2_value2_id_num_idx on demo2 (actual time=3.273..3.452 rows=506 loops=1)
Index Cond: ((id = ANY (ARRAY[demo1.id, $1, $2, ..., $1023])) AND (value2 > '0.9995'::double precision))
Heap Fetches: 0
Storage Index Read Requests: 1
Storage Index Read Execution Time: 3.156 ms
Planning Time: 0.514 ms
Execution Time: 4.923 ms
Storage Read Requests: 2
Storage Read Execution Time: 3.846 ms
Storage Write Requests: 0
Catalog Read Requests: 0
Catalog Write Requests: 0
Storage Flush Requests: 0
Storage Execution Time: 3.846 ms
Peak Memory Usage: 442 kB
(21 rows)
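The row counts in these plans can be cross-checked against the test data. demo2 holds 100 x 10000 rows with value2 drawn by random(), and every value1 (1 to 100) passes both predicates on demo1, which is why the scan on demo1 always returns 100 rows. A rough estimate, assuming value2 is uniformly distributed between 0 and 1:

```sql
-- expected number of joined rows for each predicate (uniform value2 assumed)
select 100 * 10000 * (1 - 0.5)    as expected_rows_50_pct,    -- plans report 500040
       100 * 10000 * (1 - 0.9995) as expected_rows_selective; -- plans report 506
```

The estimates come out at about 500000 and 500 rows, close to the actual counts.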
The message I want to convey is that you should not jump to the conclusion that joining two distributed tables is a problem. YugabyteDB has implemented many operations with batching to minimize the effect of remote calls. Additionally, features such as follower reads or leader preference reduce the duration of remote calls.
Therefore, interleaving or co-partitioning is probably not necessary, and would reduce agility and scalability more than it would improve performance, unless you have a good reason for it that you can share on Issue #79. But first, test and tune the queries to see if you need something else.