Write Buffering to Reduce Raft Consensus Latency in YugabyteDB

Franck Pachot - Sep 14 - - Dev Community

YugabyteDB is an open-source distributed SQL database that is compatible with PostgreSQL. It optimizes performance using write buffering, which batches SQL write operations to reduce Raft consensus latency. This means multiple write operations, such as inserts, updates, and deletes, are grouped into a single Raft write, minimizing the latency impact on individual row operations. The writes are flushed and sent to Raft leaders only when necessary, typically when a transaction commits or when a subsequent operation, like a read or a dependent write, requires acknowledgment.

Understanding when these flushes occur is crucial for optimizing performance in SQL-based operations and ensuring efficient database use. Writing set-based SQL, rather than row-by-row logic in stored procedures or triggers, lets the database buffer writes more effectively.


Introduction to Write Buffering

Buffering is a crucial technique in distributed databases like YugabyteDB. It enhances performance and reduces write latency by accumulating multiple write operations before sending them to the storage layer, which is distributed across multiple servers. Buffering significantly reduces the number of round trips between nodes in the database cluster, minimizing the impact of network latency. This allows for more efficient resource usage, reduces the overhead associated with frequent RPCs (remote procedure calls), and increases overall throughput.

Some distributed databases use asynchronous writes with synchronous commits. This method allows a transaction to be canceled if not all of its writes have succeeded by commit time. However, this approach impacts availability because the database is not resilient to infrastructure failures. When a failure occurs, a new Raft leader can be elected seamlessly, but without the assurance that it holds the most recent writes, the transaction must be terminated and an error sent to the application.

YugabyteDB is resilient to failures. If a node fails and a new Raft leader is elected, the application can continue its transactions without errors. To achieve this level of fault tolerance while maintaining high performance, all writes are synchronous to the Raft quorum but batched, meaning that one Raft log record contains multiple write operations. The buffered writes are flushed only when delaying them would change the PostgreSQL-compatible transaction behavior.

Why Buffer Writes?

The primary reasons for buffering write operations in YugabyteDB include:

  • Reduced Latency: The database can reduce the number of RPCs to the tablet servers by accumulating multiple write operations into a single batch. This reduces network overhead and lowers the time spent waiting for individual operations to complete.

  • Increased Throughput: Buffering helps in batching operations, which reduces the number of interactions with the storage layer. This increases throughput as multiple write operations can be processed together, efficiently using CPU and network resources.

  • Improved Resource Utilization: Batching and buffering also reduce the consumption of system resources such as CPU and memory by avoiding the constant initiation and termination of individual RPCs.

However, to maintain database correctness without affecting availability, YugabyteDB must flush these buffers at specific points. The following sections discuss the technical details of how and when write buffering is flushed.

Buffering in YugabyteDB SQL

In the YSQL (Yugabyte SQL) layer, buffering allows the query execution engine (a PostgreSQL fork) to continue processing statements without waiting for operations to finish at the tablet servers. Buffering is straightforward when dealing with tasks like loading data in bulk (for example, using the COPY command or multi-value INSERT) because a single statement performs multiple writes.

YugabyteDB extends this capability to handle successive INSERT, UPDATE, and DELETE operations. Applications written for PostgreSQL did not face this latency on a single-node PostgreSQL setup, and the goal of YugabyteDB is to add resilience without requiring changes to the existing code while maintaining similar performance.

Key Requirements for Correct Buffering

SQL puts some constraints on read and write ordering to ensure data consistency and correctness, which sets boundaries for write buffering.

  • Isolation of Read Operations: A read operation within an SQL statement must not see the writes performed by the same statement. This is essential to avoid scenarios where a statement like INSERT INTO table SELECT ... FROM table creates recursive reads from the data it wrote, leading to potential infinite loops (see the Halloween Problem). If you need to read the result of INSERT, UPDATE, DELETE within a statement, use the RETURNING clause.

  • Visibility of Write Operations: Write operations may internally read data in the storage layer. They must not see their own write from the same operation, but they must be able to read data modified by previous operations in the same transaction, including the earlier operations of the same statement.

These constraints are enforced by the YSQL engine, which sends a timestamp limit (in_txn_limit_ht in the code) using the Hybrid Logical Time so that the provisional writes of the same transaction are visible only up to this timestamp limit.
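As a minimal sketch of the RETURNING advice above, a data-modifying CTE can pass the written rows to the rest of the statement, so the statement never needs to read its own writes back from the table. The table name returning_sketch is hypothetical and exists only for this illustration.

```sql
-- Hypothetical table, used only for this illustration.
create table if not exists returning_sketch (id bigserial primary key, a int);

-- The outer SELECT consumes the rows exposed by RETURNING
-- instead of scanning the table that the same statement writes to.
with inserted as (
  insert into returning_sketch (a)
  select n from generate_series(1, 3) as n
  returning id, a
)
select count(*) as inserted_rows, sum(a) as total_a
from inserted;
```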

When Must Writes Be Flushed?

Although buffering is beneficial for performance, there are scenarios where buffered writes must be flushed to ensure consistency and correctness. Flushing can occur due to various factors:

Buffer Full

The most straightforward reason for flushing is when the buffer reaches its maximum capacity. The ysql_session_max_batch_size parameter determines the buffer's size. When this limit is reached, the buffer is flushed.

The buffer size is large enough (3072 operations by default) to maintain high throughput. Moreover, a flush triggered by the buffer reaching its capacity does not add latency because it is asynchronous: the YSQL engine continues processing while the buffered writes are sent to the storage layer in the background. This is particularly advantageous for high-throughput tasks like COPY that produce substantial writes.

With asynchronous flushes, the ysql_max_in_flight_ops parameter limits the number of in-flight operations to 10000 by default. A synchronous flush occurs when this limit is reached.

Those parameters can be set in a SQL session:

yugabyte=# \dconfig ysql*max*
  List of configuration parameters
          Parameter          | Value
-----------------------------+-------
 ysql_max_in_flight_ops      | 10000
 ysql_session_max_batch_size | 0
(2 rows)

When ysql_session_max_batch_size is set to zero, it uses the value set at the cluster level (default: --ysql_session_max_batch_size=3072).
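A short sketch of inspecting and changing these parameters within a session, using only the two parameter names listed above. The value 1024 is an arbitrary example, not a recommendation.

```sql
-- Show the current session-level values.
show ysql_session_max_batch_size;
show ysql_max_in_flight_ops;

-- Override the batch size for this session only
-- (1024 is an arbitrary value chosen for illustration).
set ysql_session_max_batch_size = 1024;

-- Go back to 0 so that the cluster-level setting applies again.
set ysql_session_max_batch_size = 0;
```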

Dependency on Write Completion

Sometimes, the query execution engine must wait for writes to complete before proceeding. This happens when a subsequent operation depends on the result of the buffered writes. For example:

  • Read Operations: If a read operation is issued, it must be able to see the results of any previous writes. Therefore, if there are any buffered writes, they must be flushed before the read can proceed.

  • Write to the Same Row: If multiple write operations target the same row, the system must apply them in order. The buffer is flushed to avoid conflicting writes to the same data.

  • Transaction Boundaries: Flushing occurs at critical transaction boundaries, such as when committing or rolling back a transaction. This ensures that any pending writes are safely persisted before the transaction concludes.

More Explicit Flush Requests

When the writes are deferred, the session may assume success and continue. This is acceptable only if a failure detected later can roll back to the same state as if it had been detected immediately. If an operation can break this transactional guarantee, the YSQL engine explicitly requests a flush before executing it.

  • Savepoints: When dealing with sub-transactions, flushing is required to ensure that operations within the sub-transaction are completed before progressing.

  • PL/pgSQL with exception block: Flushing may be necessary for procedural code to catch any exceptions from buffered operations before running the following statement. Like in PostgreSQL, PL/pgSQL blocks with exceptions run in a sub-transaction, similar to implicit savepoints.

  • Read Committed: Flushing occurs at the end of each SQL statement in READ COMMITTED. YugabyteDB automatically creates a savepoint before each statement within a Read Committed isolation level transaction. This allows the statement to be restarted transparently at a more recent read time if there's a conflict.

  • Executing Non-transactional Side-effects: When a statement that may cause non-transactional side effects is encountered, the system flushes the buffer to detect any write failure, as it cannot roll back after a non-transactional statement.

  • Output to the Application: When the application receives some output, such as a RAISE statement, or even output generated by client_min_messages, the system flushes the buffer to check for any write failure. This is necessary because the system won't be able to roll back later due to any non-transactional work the application may have performed based on the output.

  • Procedural code, Triggers, DDL, INSERT ON CONFLICT, SELECT FOR UPDATE: YugabyteDB flushes the buffered writes for any PostgreSQL behavior that may depend on the ordering of the writes (when using the PostgreSQL code with no specific optimization). To avoid it, it is preferable to use declarative SQL (WITH clause, Common Table Expression, RETURNING) rather than procedural logic.
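To illustrate the last point, here is a hedged sketch contrasting a procedural loop with a single set-based statement. The table flush_sketch is hypothetical. Based on the list above, the loop with an exception block would be expected to run each iteration in a sub-transaction and flush each time, while the set-based INSERT leaves the engine free to batch; the actual flush counts are best verified on your own version.

```sql
-- Hypothetical table, used only for this illustration.
create table if not exists flush_sketch (id bigserial primary key, a int);

-- Procedural, row-by-row: each iteration enters a block with an
-- exception handler, which runs as a sub-transaction.
do $$
begin
  for i in 1..1000 loop
    begin
      insert into flush_sketch (a) values (i);
    exception when unique_violation then
      null;  -- ignore duplicates and continue
    end;
  end loop;
end $$;

-- Declarative, set-based: one statement writes the same 1000 rows.
insert into flush_sketch (a)
select i from generate_series(1, 1000) as i;
```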

Examples and EXPLAIN (ANALYZE, DIST)

When you use EXPLAIN (ANALYZE, DIST), it will display the number of write operations being buffered as Storage Write Requests and the number of flushes as Storage Flush Requests. This information is shown for each operation and can be compared to the number of rows identified as Actual rows=. It's crucial to ensure that queries writing many rows are not causing row-by-row flushes, as each flush can be a network call.

I create a table and insert a hundred thousand rows:

yugabyte=# create table demo (id bigserial primary key, a int, b int, c int);
CREATE TABLE

yugabyte=# explain (analyze, dist, costs off)
 insert into demo (a,b,c) select 0,0,0 
 from generate_series(1,100000)
;

                                        QUERY PLAN
-------------------------------------------------------------------------------------------
 Insert on demo (actual time=3589.386..3589.386 rows=0 loops=1)
   ->  Function Scan on generate_series (actual time=10.653..2426.924 rows=100000 loops=1)
 Planning Time: 0.053 ms
 Execution Time: 3619.362 ms
 Storage Read Requests: 0
 Storage Rows Scanned: 0
 Storage Write Requests: 100000
 Catalog Read Requests: 11
 Catalog Read Execution Time: 5.948 ms
 Catalog Write Requests: 0
 Storage Flush Requests: 33
 Storage Flush Execution Time: 27.559 ms
 Storage Execution Time: 33.508 ms
 Peak Memory Usage: 4790 kB


The explain (analyze, dist) shows 33 flush requests, which is due to the buffer size. This is acceptable because a flush occurs only every 3072 rows, and not every flush waits on the network, since up to 10,000 operations can be in flight.
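The figure of 33 can be checked with simple arithmetic, assuming the default batch size of 3072 operations and one write request per inserted row:

```sql
-- 100000 writes in batches of at most 3072 operations:
-- 32 full batches (98304 writes) plus one partial batch.
select ceil(100000 / 3072.0) as expected_flushes;  -- returns 33
```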

It is different when I insert the same with ON CONFLICT DO NOTHING:

yugabyte=# explain (analyze, dist, costs off)
 insert into demo (id,a,b,c)
 select generate_series(100000,200000),0,0,0
 on conflict do nothing;

                                     QUERY PLAN
------------------------------------------------------------------------------------
 Insert on demo (actual time=139679.279..139679.279 rows=0 loops=1)
   Conflict Resolution: NOTHING
   Tuples Inserted: 100000
   Conflicting Tuples: 1
   ->  Subquery Scan on "*SELECT*" (actual time=0.016..292.301 rows=100001 loops=1)
         ->  ProjectSet (actual time=0.006..130.422 rows=100001 loops=1)
               ->  Result (actual time=0.001..0.001 rows=1 loops=1)
                     Storage Table Read Requests: 1
                     Storage Table Read Execution Time: 0.285 ms
                     Storage Table Write Requests: 1
                     Storage Flush Requests: 1
                     Storage Flush Execution Time: 0.822 ms
 Planning Time: 0.049 ms
 Execution Time: 139684.138 ms
 Storage Read Requests: 100001
 Storage Read Execution Time: 43570.783 ms
 Storage Rows Scanned: 1
 Storage Write Requests: 100000
 Catalog Read Requests: 7
 Catalog Read Execution Time: 3.645 ms
 Catalog Write Requests: 0
 Storage Flush Requests: 100000
 Storage Flush Execution Time: 85545.980 ms
 Storage Execution Time: 129120.408 ms
 Peak Memory Usage: 13835 kB

There are 100,000 flush requests, one per row, which significantly impacts the response time. I'm running this on version 2024.1.2, and work is ongoing to enable batching for such a construct to help PostgreSQL applications migrate to YugabyteDB.

The best practice when writing SQL is to use standard SQL to describe exactly what must happen in case of conflicting rows:

yugabyte=# explain (analyze, dist, costs off)
 with "new rows"(id,a,b,c) as (
 -- rows to be inserted with an on conflict do nothing logic
 select generate_series(200000,300000),0,0,0
), "conflict"(id) as (
 -- rows that already exist and match the on conflict do nothing
 select * from demo  where id in (select id from "new rows")
)
 -- final insert
 insert into demo (id,a,b,c)
 select * from "new rows" where id not in (select id from "conflict")
;

                                               QUERY PLAN
---------------------------------------------------------------------------------------------------------
 Insert on demo (actual time=1550.319..1550.319 rows=0 loops=1)
   CTE new rows
     ->  ProjectSet (actual time=0.007..7.209 rows=100001 loops=1)
           ->  Result (actual time=0.001..0.001 rows=1 loops=1)
   CTE conflict
     ->  YB Batched Nested Loop Join (actual time=273.535..523.581 rows=1 loops=1)
           Join Filter: (demo_1.id = "new rows_1".id)
           ->  HashAggregate (actual time=63.780..84.904 rows=100001 loops=1)
                 Group Key: "new rows_1".id
                 ->  CTE Scan on "new rows" "new rows_1" (actual time=0.000..37.487 rows=100001 loops=1)
           ->  Index Scan using demo_pkey on demo demo_1 (actual time=3.842..3.843 rows=0 loops=98)
                 Index Cond: (id = ANY (ARRAY["new rows_1".id, $2, $3, ..., $1024]))
                 Storage Table Read Requests: 1
                 Storage Table Read Execution Time: 3.110 ms
                 Storage Table Rows Scanned: 0
   ->  CTE Scan on "new rows" (actual time=523.656..595.856 rows=100000 loops=1)
         Filter: (NOT (hashed SubPlan 3))
         Rows Removed by Filter: 1
         Storage Table Write Requests: 100000
         Storage Flush Requests: 29
         Storage Flush Execution Time: 103.504 ms
         SubPlan 3
           ->  CTE Scan on conflict (actual time=273.537..523.586 rows=1 loops=1)
 Planning Time: 9.168 ms
 Execution Time: 1609.649 ms
 Storage Read Requests: 98
 Storage Read Execution Time: 304.823 ms
 Storage Rows Scanned: 1
 Storage Write Requests: 100000
 Catalog Read Requests: 27
 Catalog Read Execution Time: 11.521 ms
 Catalog Write Requests: 0
 Storage Flush Requests: 33
 Storage Flush Execution Time: 157.586 ms
 Storage Execution Time: 473.929 ms
 Peak Memory Usage: 17864 kB

This gets back to 33 flush requests and a quick response time. The SQL query might appear more complex, but it clearly explains the behavior, while ON CONFLICT could be full of surprises because it deviates from standard SQL.

Here is another example of ON CONFLICT DO UPDATE:
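As a sketch only, and assuming the same demo table with id as its primary key, the upsert logic can be written declaratively in the same style as the previous query: update the rows that already exist in one CTE, then insert the ones that were not updated. The id range and the values are arbitrary. Unlike ON CONFLICT DO UPDATE, this form does not handle rows inserted concurrently by other sessions, and its flush behavior should be checked with explain (analyze, dist).

```sql
with "new rows"(id, a, b, c) as (
  -- rows to be upserted
  select generate_series(300000, 400000), 1, 1, 1
), "updated" as (
  -- rows that already exist: apply the "do update" part
  update demo
     set a = n.a, b = n.b, c = n.c
    from "new rows" n
   where demo.id = n.id
  returning demo.id
)
-- rows that did not exist: apply the insert part
insert into demo (id, a, b, c)
select * from "new rows"
where id not in (select id from "updated");
```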

Triggers are executed row by row, and each execution includes a flush:

yugabyte=# create or replace function set_c_value()
 returns trigger as $$
 begin new.c := new.a + new.b; return new;
 end; $$ language plpgsql;
CREATE FUNCTION
yugabyte=# create trigger demo_c_trigger
 before insert or update on demo
for each row execute function set_c_value();
CREATE TRIGGER
yugabyte=# explain (analyze, dist, costs off)
 insert into demo (a,b) 
 select 0,0 from generate_series(1,100000)
;

                                        QUERY PLAN
-------------------------------------------------------------------------------------------
 Insert on demo (actual time=94315.943..94315.943 rows=0 loops=1)
   ->  Function Scan on generate_series (actual time=11.294..1390.580 rows=100000 loops=1)
 Planning Time: 0.079 ms
 Trigger demo_c_trigger: time=90394.312 calls=100000
 Execution Time: 94317.812 ms
 Storage Read Requests: 0
 Storage Rows Scanned: 0
 Storage Write Requests: 100000
 Catalog Read Requests: 11
 Catalog Read Execution Time: 8.266 ms
 Catalog Write Requests: 0
 Storage Flush Requests: 100000
 Storage Flush Execution Time: 86278.397 ms
 Storage Execution Time: 86286.663 ms
 Peak Memory Usage: 4896 kB

You can avoid those flushes for bulk loads by disabling the trigger and including its logic in the SQL statement.
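A minimal sketch of that approach for the trigger defined above, assuming no other session depends on the trigger while it is disabled. The expression a + b in the SELECT list reproduces what set_c_value() computes.

```sql
-- Disable the row-level trigger for the duration of the bulk load.
alter table demo disable trigger demo_c_trigger;

-- Compute c in the statement itself (same logic as the trigger: c := a + b).
insert into demo (a, b, c)
select a, b, a + b
from (select 0 as a, 0 as b from generate_series(1, 100000)) as src;

-- Re-enable the trigger for regular application traffic.
alter table demo enable trigger demo_c_trigger;
```

Running the INSERT under explain (analyze, dist) is a way to confirm that the number of flush requests is back in line with the buffer size.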

Here is an example with INSERT using a function:

Here is an example using WITH to run multiple DML into one WITH clause:
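A hedged sketch of such a statement, assuming the demo table from the earlier examples and a hypothetical demo_archive table introduced only for this illustration. The DELETE and the INSERT are declared together, and RETURNING carries the deleted rows from one to the other.

```sql
-- Hypothetical archive table, not part of the original examples.
create table if not exists demo_archive (id bigint primary key, a int, b int, c int);

with moved as (
  -- first DML: remove a range of rows and expose them through RETURNING
  delete from demo
  where id between 1 and 1000
  returning id, a, b, c
)
-- second DML: write the removed rows to the archive in the same statement
insert into demo_archive (id, a, b, c)
select id, a, b, c from moved;
```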

When working with SQL databases, it is best to operate on sets of rows with SQL, using common table expressions (the WITH clause) and the RETURNING clause of INSERT, UPDATE, and DELETE statements. When a large number of rows is manipulated, avoid adding row-by-row procedural logic with PL/pgSQL, triggers, or other constructs, as this might prevent the database from efficiently batching operations. Still, YugabyteDB optimizes those constructs to provide acceptable performance for legacy applications.

Conclusion

YugabyteDB utilizes write buffering as a crucial performance optimization to minimize RPC overhead, decrease latency, and enhance throughput. This is achieved by carefully managing when buffers are flushed to guarantee correctness while maximizing efficiency. Unlike databases that use asynchronous Raft or parallel commits, this approach offers improved resilience and prevents infrastructure errors from being passed on to the application.

However, flushing becomes mandatory to maintain data integrity in specific cases, such as read-after-write scenarios, dependencies between operations, transaction boundaries, and non-transactional effects. Understanding these mechanisms can help database administrators and developers fine-tune system performance and achieve optimal results in write-heavy workloads. Above all, use SQL to declare operations on sets of rows rather than a procedural row-by-row approach.
