Query Amazon Redshift from YugabyteDB though PostgreSQL Foreign Data Wrapper and VPC peering

Franck Pachot - Jul 5 '23 - - Dev Community

Amazon Redshift is a scalable database service for Data Warehouse and YugabyteDB a distributed SQL database for OLTP. Both of them have their query layer based on PostgreSQL which makes it easy to inter-connect (same datatypes, same catalog views...) and are horizontally scalable. The operational database may need to query the datawarehouse. Here is an example to show how it is easy to query Redshift from YugabyteDB thanks to the Foreign Data Wrapper.

Create a Redshift cluster

I'll create a sample Data Warehouse on Amazon Redshift v2 Serverless (I'm saving my AWS Heros credits by using the Free Trial - $300.00 for 3 months):

Redshift

The workgroup (think of it as a PostgreSQL database cluster):
Workgroup

The namespace (think of it as a PostgreSQL database)
Namespace

The creation takes a few minutes:
Creation

I need to remember the user and password I've provided for the namespace, and the endpoint for the workgroup:
Workgroup Endpoint

I also need to check the VPC to enable VPC peering:

Image description

I connect to my Redshift cluster with the Query Editor:

Image description

Redshift has its roots in ParAcell which query layer is based on a fork of PostgreSQL 8 and that's why we can connect with the PostgreSQL protocol and use the PostgreSQL Foreign Data Wrapper.

Create sample database

It is easy to create a sample database from the Redshift Query Editor v2:
Create connection

Sample notebooks

Create sample database

The notebook has also some sample queries that I'll use:
Create view

Create a YugabyteDB Managed on AWS

This can be provisioned from https://cloud.yugabyte.com

I create a YugabyteDB database on AWS, managed by Yugabyte, with VPC peering to my VPC where I've created the Redshift workgroup.

I check the VPC peering:
VPC

I choose to deploy on Amazon Web Services infrastructure:
Cloud

I define the deployment in terms of resources and fault tolerance:
Nodes

I connect to my YugabyteDB cluster:

$ PGPASSWORD=****** psql -p 5433 -h pub-eu-west-1.cd3e3992-5565-42a4-8ae4-6bd21161ba80.cloudportal.yugabyte.com -U admin yugabyte
Pager usage is off.
psql (16beta2, server 11.2-YB-2.19.0.0-b0)
SSL connection (protocol: TLSv1.3, cipher: TLS_AES_256_GCM_SHA384, compression: off)
Type "help" for help.

yugabyte=> select version();
                                                                                         version

-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 PostgreSQL 11.2-YB-2.19.0.0-b0 on x86_64-pc-linux-gnu, compiled by clang version 15.0.7 (https://github.com/yugabyte/llvm-project.git 6b9d30d80f5cebc66c9fa46013d18f125ea8ec0e), 64-bit
(1 row)
Enter fullscreen mode Exit fullscreen mode

Create Foreign Data Wrapper

I connect to my YugabyteDB database and load the PostgreSQL Foreign Data Wrapper extension:

Pager usage is off.
psql (16beta2, server 11.2-YB-2.19.0.0-b0)
Type "help" for help.

yugabyte=# create extension postgres_fdw;

CREATE EXTENSION
Enter fullscreen mode Exit fullscreen mode

I create a server to connect to the Redshift workspace:

yugabyte=# create server sample_data_dev
            foreign data wrapper postgres_fdw
            options (
            host 'default-workgroup.802756008554.eu-west-1.redshift-serverless.amazonaws.com',
             port '5439', dbname 'sample_data_dev'
           );

CREATE SERVER
Enter fullscreen mode Exit fullscreen mode

I create a mapping for my yugabyte user to connect to the Redshift namespace:

yugabyte=# create user mapping for admin server sample_data_dev
            options (
             user 'franck', password '******'
           );

CREATE USER MAPPING
Enter fullscreen mode Exit fullscreen mode

I'll import the tpch metadata as foreign tables:

yugabyte=# create schema tpch;

CREATE SCHEMA

yugabyte=# import foreign schema tpch 
            from server sample_data_dev
            into tpch
           ;

IMPORT FOREIGN SCHEMA

yugabyte=# \det+ tpch.*

                                     List of foreign tables
 Schema |  Table   |     Server      |                 FDW options                 | Description
--------+----------+-----------------+---------------------------------------------+-------------
 tpch   | customer | sample_data_dev | (schema_name 'tpch', table_name 'customer') |
 tpch   | lineitem | sample_data_dev | (schema_name 'tpch', table_name 'lineitem') |
 tpch   | nation   | sample_data_dev | (schema_name 'tpch', table_name 'nation')   |
 tpch   | orders   | sample_data_dev | (schema_name 'tpch', table_name 'orders')   |
 tpch   | part     | sample_data_dev | (schema_name 'tpch', table_name 'part')     |
 tpch   | partsupp | sample_data_dev | (schema_name 'tpch', table_name 'partsupp') |
 tpch   | region   | sample_data_dev | (schema_name 'tpch', table_name 'region')   |
 tpch   | supplier | sample_data_dev | (schema_name 'tpch', table_name 'supplier') |
(8 rows)
Enter fullscreen mode Exit fullscreen mode

Query Redshift from YugabyteDB

I'm running the sample query as-is:

\timing on
SELECT
        L_ORDERKEY,
        SUM(L_EXTENDEDPRICE*(1-L_DISCOUNT))     AS REVENUE,
        O_ORDERDATE,
        O_SHIPPRIORITY
FROM
        tpch.CUSTOMER,
        tpch.ORDERS,
        tpch.LINEITEM
WHERE   C_MKTSEGMENT    = 'FURNITURE' AND
        C_CUSTKEY       = O_CUSTKEY AND
        L_ORDERKEY      = O_ORDERKEY AND
        O_ORDERDATE     < '1995-03-28' AND
        L_SHIPDATE      > '1995-03-28'
GROUP   BY      L_ORDERKEY,
                O_ORDERDATE,
                O_SHIPPRIORITY
ORDER   BY      REVENUE DESC,
                O_ORDERDATE;

...
    5540389 |    979.1496 | 1995-01-12  |              0
    1112002 |    967.8952 | 1994-12-11  |              0
    4469728 |    955.8990 | 1994-12-06  |              0
(11451 rows)

Time: 366.417 ms
Enter fullscreen mode Exit fullscreen mode

This has joined two tables with million of rows, and aggregated them to eleven thousand groups, in three seconds. The transfer is fast, within the same VPC. To get this performance, the Foreign Data Wrapper has pushed down the joins, the filters and the aggregation, as we can see in the execution plan with explain (costs off, analyze, verbose):

                                                                         QUERY PLAN                                                                                                                                                                                                         
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Sort (actual time=167.723..168.812 rows=11451 loops=1)
   Output: lineitem.l_orderkey, (sum((lineitem.l_extendedprice * ('1'::numeric - lineitem.l_discount)))), orders.o_orderdate, orders.o_shippriority
   Sort Key: (sum((lineitem.l_extendedprice * ('1'::numeric - lineitem.l_discount)))) DESC, orders.o_orderdate
   Sort Method: quicksort  Memory: 1279kB
   ->  Foreign Scan (actual time=6.715..165.108 rows=11451 loops=1)
         Output: lineitem.l_orderkey, (sum((lineitem.l_extendedprice * ('1'::numeric - lineitem.l_discount)))), orders.o_orderdate, orders.o_shippriority
         Relations: Aggregate on (((tpch.customer) INNER JOIN (tpch.orders)) INNER JOIN (tpch.lineitem))
         Remote SQL: SELECT r3.l_orderkey, sum((r3.l_extendedprice * (1::numeric - r3.l_discount))), r2.o_orderdate, r2.o_shippriority FROM ((tpch.customer r1 INNER JOIN tpch.orders r2 ON (((r1.c_custkey = r2.o_custkey)) AND ((r2.o_orderdate < '1995-03-28'::date)) AND ((r1.c_mktsegment = 'FURNITURE'::bpchar)))) INNER JOIN tpch.lineitem r3 ON (((r2.o_orderkey = r3.l_orderkey)) AND ((r3.l_shipdate > '1995-03-28'::date)))) GROUP BY 1, 3, 4
 Planning Time: 1.121 ms
 Execution Time: 172.474 ms
 Peak Memory Usage: 1430 kB
(11 rows)

Time: 208.332 ms
Enter fullscreen mode Exit fullscreen mode

YugabyteDB has fetched the result of rows=11451, with only the required columns, and had only to sort it according to the ORDER BY.

Import data though the Foreign Data Wrapper

There's also the possibility to create the tables and import data though FDW.

I create the empty tables and define the primary key:

create table CUSTOMER as select * from tpch.CUSTOMER where false;
create table ORDERS   as select * from tpch.ORDERS   where false;
create table LINEITEM as select * from tpch.LINEITEM where false;
alter table CUSTOMER add primary key (C_CUSTKEY asc) ;
alter table ORDERS   add primary key (O_ORDERKEY asc) ;
alter table LINEITEM add primary key (L_ORDERKEY asc, L_LINENUMBER) ;
Enter fullscreen mode Exit fullscreen mode

I bulk load the rows:

set yb_disable_transactional_writes=on;
set yb_enable_upsert_mode=on;
insert into CUSTOMER select * from tpch.CUSTOMER;
insert into ORDERS   select * from tpch.ORDERS;
insert into LINEITEM select * from tpch.LINEITEM;
set yb_disable_transactional_writes=off;
set yb_enable_upsert_mode=off;
analyze customer, orders, lineitem;
Enter fullscreen mode Exit fullscreen mode

This takes some time. It is easy but not as efficient as YugabyteDB Voyager that can parallelize the operations according to the resources available.

yugabyte=# insert into CUSTOMER select * from tpch.CUSTOMER;
INSERT 0 150000
Time: 4902.487 ms (00:04.902)
yugabyte=# insert into ORDERS   select * from tpch.ORDERS;
INSERT 0 1500000
Time: 43425.688 ms (00:43.426)
yugabyte=# insert into LINEITEM select * from tpch.LINEITEM;
INSERT 0 6001215
Time: 236597.138 ms (03:56.597)
yugabyte=# analyze customer, orders, lineitem;
ANALYZE
Time: 16092.902 ms (00:16.093)
Enter fullscreen mode Exit fullscreen mode

I'll test the same query and for analytics, better use the Cost Based Optimizer and Batched Nested Loop:

set yb_enable_optimizer_statistics=on;
set yb_bnl_batch_size=1024;
Enter fullscreen mode Exit fullscreen mode

I query the local tables:

explain (costs off, analyze)
SELECT
        L_ORDERKEY,
        SUM(L_EXTENDEDPRICE*(1-L_DISCOUNT))     AS REVENUE,
        O_ORDERDATE,
        O_SHIPPRIORITY
FROM
public.CUSTOMER,
public.ORDERS,
public.LINEITEM
WHERE   C_MKTSEGMENT    = 'FURNITURE' AND
        C_CUSTKEY       = O_CUSTKEY AND
        L_ORDERKEY      = O_ORDERKEY AND
        O_ORDERDATE     < '1995-03-28' AND
        L_SHIPDATE      > '1995-03-28'
GROUP   BY      L_ORDERKEY,
                O_ORDERDATE,
                O_SHIPPRIORITY
ORDER   BY      REVENUE DESC,
                O_ORDERDATE;

                                                        QUERY PLAN
--------------------------------------------------------------------------------------------------------------------------
 Sort (actual time=13844.143..13844.911 rows=11451 loops=1)
   Sort Key: (sum((lineitem.l_extendedprice * ('1'::numeric - lineitem.l_discount)))) DESC, orders.o_orderdate
   Sort Method: quicksort  Memory: 1279kB
   ->  GroupAggregate (actual time=13827.429..13840.671 rows=11451 loops=1)
         Group Key: lineitem.l_orderkey, orders.o_orderdate, orders.o_shippriority
         ->  Sort (actual time=13827.404..13828.610 rows=30426 loops=1)
               Sort Key: lineitem.l_orderkey, orders.o_orderdate, orders.o_shippriority
               Sort Method: quicksort  Memory: 3146kB
               ->  YB Batched Nested Loop Join (actual time=115.450..13818.025 rows=30426 loops=1)
                     Join Filter: (orders.o_orderkey = lineitem.l_orderkey)
                     ->  YB Batched Nested Loop Join (actual time=20.771..9266.851 rows=146777 loops=1)
                           Join Filter: (customer.c_custkey = orders.o_custkey)
                           ->  Seq Scan on orders (actual time=7.349..104.383 rows=735379 loops=1)
                                 Remote Filter: (o_orderdate < '1995-03-28'::date)
                           ->  Index Scan using customer_pkey on customer (actual time=11.817..11.990 rows=203 loops=719)
                                 Index Cond: (c_custkey = ANY (ARRAY[orders.o_custkey, $1, $2, ..., $1023]))
                                 Remote Filter: (c_mktsegment = 'FURNITURE'::bpchar)
                     ->  Index Scan using lineitem_pkey on lineitem (actual time=29.619..30.852 rows=211 loops=144)
                           Index Cond: (l_orderkey = ANY (ARRAY[orders.o_orderkey, $1025, $1026, ..., $2047]))
                           Remote Filter: (l_shipdate > '1995-03-28'::date)
 Planning Time: 16.824 ms
 Execution Time: 13847.103 ms
 Peak Memory Usage: 9728 kB
(23 rows)

Time: 13928.338 ms (00:13.928)

Enter fullscreen mode Exit fullscreen mode

The performance is still correct: 13 seconds to read millions of rows, join them and aggregate them. And without creating any additional index, without partitioning the tables (beyond the automatic sharding), and with query planner hints. For such queries it is crucial to analyze the tables and enable the Cost Based optimizer.

So finally, you have the choice in a cloud environment:

  • read from a datawarehouse-specialized service (Redshift)
  • or duplicate data in your operational database (YugabyteDB) Thanks to AWS VPC peering and PostgreSQL compatibility of the databases used, both are easy to setup.

And if you want to know more about the PostgreSQL Foreign Data Wrapper, the open source contributor who made it working on YugabyteDB will talk about it in our next Community Open Hours:

. . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . .