DuckDB on YugabyteDB

Franck Pachot - Nov 22 '22 - Dev Community

Update: the issue described below has been fixed, and the DuckDB postgres_scanner can now push down filters. Here is how I start DuckDB to connect to the YugabyteDB database yb_demo_northwind on my.host:

PGDATABASE=yb_demo_northwind duckdb -cmd "
set pg_debug_show_queries to true;
set pg_experimental_filter_pushdown to true;
set pg_use_ctid_scan to false;
attach 'user=yugabyte host=my.host port=5433' AS db (TYPE POSTGRES, READ_ONLY);
"

I can run a very DuckDB-style query:

FROM db.order_details
JOIN db.products USING (product_id)
JOIN db.orders USING (order_id)
SELECT 
    product_name,
    COUNT(*) AS total_orders,
    SUM(quantity) AS total_quantity,
    SUM(quantity) FILTER (WHERE ship_country = 'France') AS quantity_france
GROUP BY ALL
HAVING quantity_france > 100;


With pg_debug_show_queries set, DuckDB shows the queries it sends to YugabyteDB, with a WHERE clause that YugabyteDB pushes down to the storage layer:

[Image: queries displayed by pg_debug_show_queries]

That's all. What follows below is the old blog post.


DuckDB is an open-source in-process SQL OLAP database that can also query PostgreSQL tables. YugabyteDB is an open-source distributed SQL database, optimized for OLTP and PostgreSQL-compatible. You can see the temptation to marry them and run some OLAP queries on top of the scalable OLTP database.

If you have tried to run DuckDB on top of YugabyteDB, you may have encountered the following error:

ERROR:  System column "ctid" is not supported yet

The reason is that DuckDB uses the PostgreSQL ctid to parallelize the scan of the source table. This is described in the DuckDB Postgres Scanner description.
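To make the idea concrete, here is a minimal Python sketch of how a table scan can be split into page ranges on ctid. It is not the extension's code: the helper name ctid_range_predicates and the default of 1000 pages per task are assumptions for illustration (the 1000-page step is chosen to match the range visible in the pg_stat_activity output further down).

```python
def ctid_range_predicates(relpages, pages_per_task=1000):
    """Return one ctid predicate per chunk of heap pages (hypothetical helper).

    Tuple offsets within a page start at 1, so '(N,0)' never matches a real
    tuple and adjacent BETWEEN ranges do not read the same row twice.
    """
    predicates = []
    for start in range(0, relpages, pages_per_task):
        end = start + pages_per_task
        predicates.append(
            f"ctid BETWEEN '({start},0)'::tid AND '({end},0)'::tid"
        )
    return predicates


# Each predicate could be handed to a separate reader thread.
for predicate in ctid_range_predicates(3000):
    print(predicate)
```

With relpages at zero the list is empty, which is why a table that reports no pages gives nothing to split.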

YugabyteDB is PostgreSQL-compatible, but its distributed storage shards tables and indexes and stores tuples in LSM-trees, where there is no equivalent of the ctid of a PostgreSQL heap table.

Parallel reads could use ranges on the primary key, or on yb_hash_code(). For the moment, my simple workaround is to not parallelize the reads at this level. This is easy because in YugabyteDB pg_class.relpages is zero, so DuckDB should not try to start multiple threads.
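As an illustration of the yb_hash_code() idea, here is a minimal Python sketch that splits the hash space into contiguous ranges, one per reader. This is not implemented in the scanner. The helper name hash_range_predicates is hypothetical, and the example assumes that the table is hash-sharded on order_id.

```python
HASH_SPACE = 65536  # yb_hash_code() returns values from 0 to 65535


def hash_range_predicates(key_columns, tasks):
    """Return one yb_hash_code() range predicate per task (hypothetical helper)."""
    if not 1 <= tasks <= HASH_SPACE:
        raise ValueError("tasks must be between 1 and 65536")
    cols = ", ".join(key_columns)
    step = HASH_SPACE // tasks
    predicates = []
    for i in range(tasks):
        low = i * step
        # The last range absorbs the remainder so the whole space is covered.
        high = HASH_SPACE - 1 if i == tasks - 1 else low + step - 1
        predicates.append(f"yb_hash_code({cols}) BETWEEN {low} AND {high}")
    return predicates


# Assumption: order_id is the hash key of the table being read.
for predicate in hash_range_predicates(["order_id"], 4):
    print(predicate)
```

Each predicate could replace the ctid condition in the query sent by one reader thread.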

Workaround to bypass CTID

I've forked the DuckDB Postgres Scanner extension (https://github.com/duckdblabs/postgres_scanner) into:

DuckDB postgresscanner extension

The postgresscanner extension allows DuckDB to directly read data from a running Postgres instance. The data can be queried directly from the underlying Postgres tables, or read into DuckDB tables.

Usage

To make a Postgres database accessible to DuckDB, use the POSTGRES_ATTACH command:

CALL POSTGRES_ATTACH('');

POSTGRES_ATTACH takes a single required string parameter, which is the libpq connection string. For example you can pass 'dbname=postgresscanner' to select a different database name. In the simplest case, the parameter is just ''. There are three additional named parameters:

  • source_schema the name of a non-standard schema name in Postgres to get tables from. Default is public.
  • sink_schema the schema name in DuckDB to create views. Default is main.
  • overwrite whether we should overwrite existing views in the target schema, default is false.
  • filter_pushdown whether filter predicates that DuckDB derives from the query should…




I changed a single thing to be able to run it: I test the range of ctid to read (task_min, task_max) and replace the ctid BETWEEN '(%d,0)'::tid AND '(%d,0)'::tid predicate with 1=1 when ( task_min == 0 && task_max == POSTGRES_TID_MAX ), that is, when a single task covers the whole table.
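The logic of this change can be sketched in a few lines of Python. The real patch is in the C++ code of the extension. The function names and the value given to POSTGRES_TID_MAX here are assumptions for illustration only.

```python
# Assumed value for illustration, named after the constant used in the patch.
POSTGRES_TID_MAX = 2**32 - 1


def ctid_filter(task_min, task_max):
    """Return the WHERE condition for one scan task (hypothetical helper)."""
    if task_min == 0 and task_max == POSTGRES_TID_MAX:
        # The task covers the whole table: no ctid reference is needed,
        # so the query also runs where ctid is not supported.
        return "1=1"
    return f"ctid BETWEEN '({task_min},0)'::tid AND '({task_max},0)'::tid"


def copy_query(columns, schema, table, task_min, task_max):
    """Build a COPY statement similar in shape to the one the scanner sends."""
    column_list = ", ".join(f'"{c}"' for c in columns)
    condition = ctid_filter(task_min, task_max)
    return (
        f'COPY (SELECT {column_list} FROM "{schema}"."{table}" '
        f"WHERE {condition}) TO STDOUT (FORMAT binary);"
    )


print(copy_query(["order_id", "ship_country"], "public", "orders",
                 0, POSTGRES_TID_MAX))
print(copy_query(["order_id", "ship_country"], "public", "orders",
                 2000, 3000))
```

The first call produces a query without any ctid reference, the second keeps the range predicate for a table that is read in chunks.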

Gitpod

I've also added a gitpod.yml to build it automatically in Gitpod:

tasks:
  - init:   sudo apt-get install -y cmake curl bison && make

Start YugabyteDB

To test it, I start a YugabyteDB container with the Northwind demo tables:

docker run -d --name yb -p 5433:5433 -p 15433:15433 \
yugabytedb/yugabyte bash -c '
yugabyted start 
until yugabyted demo connect <<'SQL'
\x
select *
 from pg_stat_activity
 where pid!=pg_backend_pid() and datid is not null;
\watch 5
SQL
do sleep 1 ; done 2>/dev/null
'

Port 5433 is the PostgreSQL endpoint that I will use to connect to the yugabyte database as the yugabyte user.
Port 15433 is the GUI where you can see the running queries.

Test POSTGRES_ATTACH

Then, I start DuckDB:


./build/release/duckdb -unsigned


I load the Postgres Scanner extension:


LOAD 'build/release/extension/postgres_scanner/postgres_scanner.duckdb_extension';


I create a destination schema franck, attach to my YugabyteDB database schema public, and display the list of views:


CREATE schema franck;

SELECT * FROM POSTGRES_ATTACH(
 'dbname=yugabyte user=yugabyte host=localhost port=5433'
 , source_schema='public'
 , sink_schema='franck'
);

PRAGMA show_tables;


Here is the output:
[Image: result of PRAGMA show_tables]

Query

I query my tables with a syntax that DuckDB adds to PostgreSQL:


select count(*)
 , sum(quantity)
 , sum(quantity) filter(ship_country='France') as quantity_france
from  franck.order_details
 join franck.products using(product_id)
 join franck.orders using(order_id)
;


This reads the base tables and executes the SQL statement on them:
[Image: query output]

I can check the queries that have been run on YugabyteDB:
[Image: queries run on YugabyteDB]

The most important part here is the COPY with FORMAT BINARY. You can see how far the YugabyteDB compatibility goes: the binary format works. Of course, the idea of the PostgreSQL scanner is to read all tables in parallel, by chunks, and this is where CTID is used. With my workaround, each table is read in full with one COPY. I think that if optimization is needed, it should come from parallelism within YugabyteDB.
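For readers curious about what travels over the wire, here is a minimal Python sketch that decodes the PostgreSQL COPY binary layout: an 11-byte signature, a 32-bit flags field, a 32-bit header extension length, then tuples made of a 16-bit field count and length-prefixed fields, ending with a field count of -1. The function name read_copy_binary is hypothetical, the sample bytes are built by hand rather than captured from YugabyteDB, and field values are left as raw bytes.

```python
import struct

SIGNATURE = b"PGCOPY\n\xff\r\n\x00"  # 11-byte signature of the binary format


def read_copy_binary(data):
    """Decode a COPY binary stream into (flags, rows of raw field bytes)."""
    if data[:11] != SIGNATURE:
        raise ValueError("not a COPY binary stream")
    flags, extension_length = struct.unpack_from("!II", data, 11)
    pos = 19 + extension_length  # skip the header extension area
    rows = []
    while True:
        (field_count,) = struct.unpack_from("!h", data, pos)
        pos += 2
        if field_count == -1:  # file trailer
            break
        row = []
        for _ in range(field_count):
            (length,) = struct.unpack_from("!i", data, pos)
            pos += 4
            if length == -1:  # NULL value, no data bytes follow
                row.append(None)
            else:
                row.append(data[pos:pos + length])
                pos += length
        rows.append(row)
    return flags, rows


# Hand-built sample: one row with an int4 value 42 and a NULL.
sample = (
    SIGNATURE
    + struct.pack("!II", 0, 0)
    + struct.pack("!h", 2)
    + struct.pack("!i", 4) + struct.pack("!i", 42)
    + struct.pack("!i", -1)
    + struct.pack("!h", -1)
)
flags, rows = read_copy_binary(sample)
print(flags, rows)
```

Each field still has to be interpreted according to its column type, which is what a client such as the scanner does after reading the stream.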

It still works with vanilla PostgreSQL

To test that this patch still works with normal PostgreSQL, I test it with the RNA Central database:

SELECT * FROM POSTGRES_ATTACH(
'dbname=pfmegrnargs user=reader password=NWDMCE5xdipIjRrp host=hh-pgsql-public.ebi.ac.uk port=5432'
, source_schema='rnacen', sink_schema='franck'
);
 select * from franck.rnc_gene_status limit 100000;

While running I can see the query with CTID:

pfmegrnargs=> select * from pg_stat_Activity where query like '%COPY%';
-[ RECORD 1 ]----+--------------------------------------------------------------------------------------------------------------------------------
---------------------------------------------------------
datid            | 16388
datname          | pfmegrnargs
pid              | 40212
usesysid         | 16391
usename          | reader
application_name |
client_addr      | 130.61.113.39
client_hostname  |
client_port      | 33900
backend_start    | 2022-11-23 08:35:37.888412+00
xact_start       | 2022-11-23 08:35:37.942932+00
query_start      | 2022-11-23 08:35:45.630902+00
state_change     | 2022-11-23 08:35:53.243005+00
wait_event_type  | Client
wait_event       | ClientRead
state            | idle in transaction
backend_xid      |
backend_xmin     | 135314
query            |
                 | COPY (SELECT "id", "assembly_id", "urs_taxid", "region_id", "status" FROM "rnacen"."rnc_gene_status" WHERE ctid BETWEEN '(2000,0)'::tid AND '(3000,0)'::tid ) TO STDOUT (FORMAT binary);
                 |
backend_type     | client backend


Bypassing the CTID filter when it is not needed also performs better in PostgreSQL:

pfmegrnargs=> explain analyze select * from rnc_gene_status where ctid between '(0,0)'::tid and '(4294967294,0)'::tid;
                                                         QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------
 Seq Scan on rnc_gene_status  (cost=0.00..206197.45 rows=40577 width=49) (actual time=0.026..6341.411 rows=8115061 loops=1)
   Filter: ((ctid >= '(0,0)'::tid) AND (ctid <= '(4294967294,0)'::tid))
 Planning time: 0.088 ms
 Execution time: 7115.750 ms
(4 rows)

pfmegrnargs=> explain analyze select * from rnc_gene_status;
                                                          QUERY PLAN
------------------------------------------------------------------------------------------------------------------------------
 Seq Scan on rnc_gene_status  (cost=0.00..165620.63 rows=8115363 width=49) (actual time=2.750..1262.961 rows=8115061 loops=1)
 Planning time: 0.073 ms
 Execution time: 1552.490 ms
(3 rows)

This is 1.3 seconds instead of 6.3 seconds with the full-range CTID condition.

In summary, this optimizes for PostgreSQL and allows queries on YugabyteDB.
