A user described their "ETL" use case as follows:
- a transaction table holds an ID (k1 int in my example) for which there are multiple records (k2 int for their key, and v1 int and v2 text for their values in my small example)
- the application receives a payload, in JSON, describing the new data
- new IDs will be INSERTed
- existing IDs will be UPDATEd
- missing IDs will be marked as deleted (deleted boolean)
- I add a timestamp (ts timestamptz) to record the last change time
Their current application uses Python Pandas to retrieve the current records, compare them with the new payload, and write the result back to the database. With a distributed database like YugabyteDB, it is more efficient to process this in the database, because that can scale (we can connect to any node), run in one server-side transaction (easier to handle transparent retries in case of clock skew) and reduce the roundtrips (working on batches of rows). And with PostgreSQL compatibility, JSON processing and the INSERT ... ON CONFLICT ... DO UPDATE behavior can do the same with clean and simple SQL.
Example
Here is a simple example declaring my table:
create table demo (
k1 int, k2 int, v1 int, v2 text
, deleted boolean, ts timestamptz
, primary key ( k1,k2 )
);
This works in PostgreSQL and YugabyteDB. With YugabyteDB the primary key defaults to lsm (k1 HASH, k2 ASC), which is good for my use case: it distributes on the ID and colocates the records of each ID.
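For readers who prefer to spell out the sharding rather than rely on the default, the same key can be declared explicitly. This is a small sketch that assumes YugabyteDB (the HASH and ASC keywords inside a primary key are not accepted by PostgreSQL) and uses a hypothetical table name, demo_explicit, so that it does not clash with demo:

```sql
-- YugabyteDB only: hash-shard on k1, then sort by k2 within each k1.
-- demo_explicit is a hypothetical name used for illustration.
create table demo_explicit (
 k1 int, k2 int, v1 int, v2 text
 , deleted boolean, ts timestamptz
 , primary key ( k1 hash, k2 asc )
);
```

The explicit form should be equivalent to the default described above. It mainly documents the intent in the DDL.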
I'll implement the whole logic in one SQL query. WITH clauses (aka CTE - Common Table Expressions) maintain the readability:
- payload reads my JSON payload, passed as $2, into records, thanks to jsonb_to_recordset, adding the ID $1 as k1
- to_upsert formats the payload into the destination format, setting deleted to false and adding the timestamp
- to_soft_delete retrieves the current records and compares them with the payload, to format those missing from it as soft deletes with deleted set to true
- finally, the union of to_upsert and to_soft_delete is merged with an INSERT ... ON CONFLICT ... DO UPDATE
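To see what the first CTE produces on its own, jsonb_to_recordset can be run standalone. This is only an illustration: the literal 0 stands in for the $1 parameter, and the JSON values are made up for the example.

```sql
-- Expand a JSON array of objects into typed rows, one row per object.
-- The column definition list ( k2 int, v1 int, v2 text ) tells PostgreSQL
-- which keys to extract and which types to cast them to.
select 0 as k1, payload.*
from jsonb_to_recordset('[
  {"k2":1,"v1":1,"v2":"a"},
  {"k2":2,"v1":1,"v2":"b"}
]'::jsonb) as payload( k2 int, v1 int, v2 text );
```

This returns two rows with the columns k1, k2, v1 and v2. A key that is absent from a JSON object comes back as NULL in the corresponding column.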
Here it is as a prepared statement:
prepare etl (int, jsonb) as
with
payload as (
select $1 as k1,* from jsonb_to_recordset($2) as payload( k2 int, v1 int, v2 text )
),
to_upsert as (
select k1, k2, v1, v2, false as deleted, now() as ts from payload
),
to_soft_delete as (
select k1, k2, v1, v2, true as deleted, now() as ts from demo
where k1=$1
and (k1, k2) not in ( select k1, k2 from payload)
)
insert into demo select * from to_upsert union select * from to_soft_delete
on conflict (k1, k2) do
update set v1=excluded.v1, v2=excluded.v2, deleted=excluded.deleted, ts=excluded.ts
;
This can be coded as a stored procedure, but a prepared statement avoids parsing and optimizing it for each call. It is easy to prepare it in the connection pool initialization command.
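For comparison, here is a sketch of the stored procedure variant mentioned above. The procedure name (etl_proc) and the parameter names (p_k1, p_payload) are hypothetical, and the table definition is repeated only so that the block runs on its own:

```sql
-- Same definition as the demo table, repeated so the sketch is self-contained.
create table if not exists demo (
 k1 int, k2 int, v1 int, v2 text
 , deleted boolean, ts timestamptz
 , primary key ( k1,k2 )
);

-- Hypothetical procedure wrapping the same query; p_k1 and p_payload
-- replace the positional parameters $1 and $2 of the prepared statement.
create or replace procedure etl_proc(p_k1 int, p_payload jsonb)
language sql
as $$
 with
 payload as (
  select p_k1 as k1,* from jsonb_to_recordset(p_payload) as payload( k2 int, v1 int, v2 text )
 ),
 to_upsert as (
  select k1, k2, v1, v2, false as deleted, now() as ts from payload
 ),
 to_soft_delete as (
  select k1, k2, v1, v2, true as deleted, now() as ts from demo
  where k1=p_k1
  and (k1, k2) not in ( select k1, k2 from payload)
 )
 insert into demo select * from to_upsert union select * from to_soft_delete
 on conflict (k1, k2) do
 update set v1=excluded.v1, v2=excluded.v2, deleted=excluded.deleted, ts=excluded.ts
$$;

-- Usage (not run here): call etl_proc(0, '[{"k2":1,"v1":1,"v2":"x"}]'::jsonb);
```

The procedure keeps the logic in the database catalog instead of in each session, at the cost of one more object to deploy. The examples that follow use the prepared statement.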
My table is empty. I'll insert a payload for k1=0 with 3 records:
execute etl(0, $$
[
{"k2":1,"v1":1,"v2":"my first insert"},
{"k2":2,"v1":1,"v2":"my first insert"},
{"k2":3,"v1":1,"v2":"my first insert"}
]
$$::jsonb);
This has inserted 3 rows:
yugabyte=# select * from demo;
k1 | k2 | v1 | v2 | deleted | ts
----+----+----+-----------------+---------+-------------------------------
0 | 1 | 1 | my first insert | f | 2022-04-29 14:39:08.467744+00
0 | 2 | 1 | my first insert | f | 2022-04-29 14:39:08.467744+00
0 | 3 | 1 | my first insert | f | 2022-04-29 14:39:08.467744+00
(3 rows)
Now, on the same k1=0, the new payload updates 2 rows, removes another one and adds a new one:
execute etl(0,$$
[
{"k2":1,"v1":1,"v2":"my update"},
{"k2":2,"v1":1,"v2":"my update"},
{"k2":4,"v1":1,"v2":"my second insert"}
]
$$::jsonb);
Here is the result:
yugabyte=# select * from demo;
k1 | k2 | v1 | v2 | deleted | ts
----+----+----+------------------+---------+-------------------------------
0 | 1 | 1 | my update | f | 2022-04-29 14:41:00.109561+00
0 | 2 | 1 | my update | f | 2022-04-29 14:41:00.109561+00
0 | 3 | 1 | my first insert | t | 2022-04-29 14:41:00.109561+00
0 | 4 | 1 | my second insert | f | 2022-04-29 14:41:00.109561+00
(4 rows)
Performance
Always check the execution plan:
yugabyte=# explain (costs off, analyze) execute etl(0,$$
[
{"k2":1,"v1":1,"v2":"my update"},
{"k2":2,"v1":1,"v2":"my update"},
{"k2":4,"v1":1,"v2":"my second insert"}
]
$$::jsonb);
QUERY PLAN
------------------------------------------------------------------------------------------------------------
Insert on demo (actual time=11.787..11.787 rows=0 loops=1)
Conflict Resolution: UPDATE
Conflict Arbiter Indexes: demo_pkey
Tuples Inserted: 0
Conflicting Tuples: 4
CTE payload
-> Function Scan on jsonb_to_recordset payload (actual time=0.011..0.012 rows=3 loops=1)
CTE to_upsert
-> CTE Scan on payload payload_1 (actual time=0.013..0.014 rows=3 loops=1)
CTE to_soft_delete
-> Index Scan using demo_pkey on demo demo_1 (actual time=1.079..1.081 rows=1 loops=1)
Index Cond: (k1 = 0)
Filter: (NOT (hashed SubPlan 3))
Rows Removed by Filter: 3
SubPlan 3
-> CTE Scan on payload payload_2 (actual time=0.001..0.001 rows=3 loops=1)
-> HashAggregate (actual time=1.105..1.110 rows=4 loops=1)
Group Key: to_upsert.k1, to_upsert.k2, to_upsert.v1, to_upsert.v2, to_upsert.deleted, to_upsert.ts
-> Append (actual time=0.014..1.099 rows=4 loops=1)
-> CTE Scan on to_upsert (actual time=0.014..0.016 rows=3 loops=1)
-> CTE Scan on to_soft_delete (actual time=1.080..1.082 rows=1 loops=1)
Planning Time: 0.262 ms
Execution Time: 11.864 ms
(23 rows)
This reads only one tablet, the one for Index Cond: (k1 = 0), with an Index Scan on the primary key, where all rows are colocated. This is where the (k1 HASH, k2 ASC) is important.
Of course, this could be optimized, for example by not updating the values that are the same. But keeping the code simple is probably the best choice. Further optimization will depend on a more precise use case.
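As an illustration of that optimization, a WHERE clause on the DO UPDATE can skip the rows whose values did not change. This is one possible sketch, not a measured improvement. The statement name etl_skip_unchanged is hypothetical, and the table definition is repeated only so that the block runs on its own:

```sql
-- Same definition as the demo table, repeated so the sketch is self-contained.
create table if not exists demo (
 k1 int, k2 int, v1 int, v2 text
 , deleted boolean, ts timestamptz
 , primary key ( k1,k2 )
);

-- Hypothetical variant of the etl statement: identical CTEs, but the
-- conflicting row is updated only when a value or the deleted flag differs.
prepare etl_skip_unchanged (int, jsonb) as
with
payload as (
 select $1 as k1,* from jsonb_to_recordset($2) as payload( k2 int, v1 int, v2 text )
),
to_upsert as (
 select k1, k2, v1, v2, false as deleted, now() as ts from payload
),
to_soft_delete as (
 select k1, k2, v1, v2, true as deleted, now() as ts from demo
 where k1=$1
 and (k1, k2) not in ( select k1, k2 from payload)
)
insert into demo select * from to_upsert union select * from to_soft_delete
on conflict (k1, k2) do
update set v1=excluded.v1, v2=excluded.v2, deleted=excluded.deleted, ts=excluded.ts
-- IS DISTINCT FROM treats NULL as a comparable value, unlike <>
where (demo.v1, demo.v2, demo.deleted)
 is distinct from (excluded.v1, excluded.v2, excluded.deleted)
;
```

With this variant, ts is left untouched for rows that arrive with identical values, so it records the last actual change rather than the last time the row appeared in a payload. Whether that is the desired meaning depends on the use case.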
This query can also be easily generated from the Information Schema. You need the column definitions for jsonb_to_recordset (k2 int, v1 int, v2 text), the column list (k1, k2, v1, v2), the key columns (k1, k2) and the set clauses (v1=excluded.v1, v2=excluded.v2).
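As a rough sketch of that generation, the query below builds those text fragments for the demo table from information_schema. It rests on assumptions that are mine, not the original design: the table is in the current schema, the ID passed as a parameter is the column named k1, and deleted and ts are the fixed housekeeping columns. Here "column definitions" means the list given to jsonb_to_recordset.

```sql
with cols as (
 select c.column_name::text as column_name
      , c.data_type::text as data_type
      , c.ordinal_position
      , k.ordinal_position as key_position  -- null for non-key columns
 from information_schema.columns c
 left join (
  -- columns of the primary key, with their position in the key
  select kcu.column_name, kcu.ordinal_position
  from information_schema.table_constraints tc
  join information_schema.key_column_usage kcu
    on kcu.constraint_schema = tc.constraint_schema
   and kcu.constraint_name = tc.constraint_name
   and kcu.table_schema = tc.table_schema
   and kcu.table_name = tc.table_name
  where tc.table_schema = current_schema()
    and tc.table_name = 'demo'
    and tc.constraint_type = 'PRIMARY KEY'
 ) k on k.column_name = c.column_name
 where c.table_schema = current_schema()
   and c.table_name = 'demo'
   and c.column_name not in ('deleted', 'ts')  -- assumed housekeeping columns
)
select
 -- list for jsonb_to_recordset: everything except the ID passed as $1
 string_agg(column_name || ' ' || data_type, ', ' order by ordinal_position)
  filter (where column_name <> 'k1') as column_definitions
 , string_agg(column_name, ', ' order by ordinal_position) as column_list
 , string_agg(column_name, ', ' order by key_position)
  filter (where key_position is not null) as key_columns
 , string_agg(column_name || '=excluded.' || column_name, ', ' order by ordinal_position)
  filter (where key_position is null) as set_clauses
from cols;
```

The data types come back under their standard names (integer rather than int), which jsonb_to_recordset accepts equally. The fragments can then be concatenated into the statement text with format() or in the application code.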