Fast loading into a database table is a feature we need on any database. Data warehouses use it daily. OLTP requires it regularly, and probably from the very beginning, to migrate data into it. With Python, the psycopg2 client is commonly used and I'll start there (this is the first post in a series). The main motivation is that psycopg2 doesn't have prepared statements, and parsing each INSERT, even with a list of rows, is not efficient for loading millions of rows. But it has a nice alternative as it can call the COPY command.
All this works on any PostgreSQL-compatible engine. I'll use YugabyteDB here because loading data involves multiple nodes, so it is critical to do it right. I'll simulate an IoT ingest, taking the generator function from the "Database performance comparison for IoT use cases" project. This project uses INSERT ... VALUES to load data, but I'll use COPY, which is the most efficient for any PostgreSQL-compatible database that supports it. I've submitted a PR with some optimizations (now merged).
I'll define the IoT events with dataclasses_json and will use the psycopg2 driver, which is the most frequently used with Python. This requires:
pip install psycopg2 dataclasses_json
I re-use the generate_events code from the MaibornWolff project:
import random
import string
from dataclasses import dataclass
from dataclasses_json import dataclass_json
from datetime import datetime, timezone

@dataclass_json
@dataclass
class Event:
    timestamp: datetime
    device_id: str
    sequence_number: str  # passed as an integer by generate_events
    temperature: float

def _rand_string(length):
    return ''.join(random.choices(string.ascii_lowercase + string.digits, k=length))

def generate_events(device_id, start_timestamp, num_events, sequence_number=1, device_spread=1):
    device_ids = [f"{_rand_string(4)}{device_id}{_rand_string(4)}" for i in range(device_spread)]
    # start_timestamp is an epoch in milliseconds; 0 means "now"
    if start_timestamp == 0:
        start_timestamp = int(datetime.now().timestamp()*1000)
    for i in range(num_events):
        event = Event(
            datetime.fromtimestamp(start_timestamp/1000, timezone.utc)
            , device_ids[i%device_spread], sequence_number, random.uniform(-20, 35))
        yield event
        sequence_number += 1
        start_timestamp += random.randint(5, 10)*60
The only thing I changed here is to define the timestamp as a datetime rather than an epoch.
I create an iot_demo table to store those events:
import psycopg2
yb = psycopg2.connect('postgres://yugabyte:franck@yb1.pachot.net:5433/yugabyte')
yb.cursor().execute("""
    drop table if exists iot_demo;
    create table if not exists iot_demo (
        timestamp timestamp,
        device_id text,
        sequence_number bigint,
        temperature real,
        primary key(device_id,timestamp,sequence_number)
    );
""")
# psycopg2 opens a transaction implicitly: commit so that the DDL persists on PostgreSQL too
yb.commit()
This connects to my very small Yugabyte database that I keep publicly open. You can easily create one following the documentation or our free tier cloud.
I have defined the primary key without mentioning HASH or RANGE partitioning so that the code is compatible with any PostgreSQL database. In YugabyteDB, the defaults are HASH for the first column of the primary key and ASC for the others, which is what I want here: hash sharding to distribute the devices, and range ordering on time to keep each device's measures together.
As generate_events is a generator (there's a yield statement that returns each event in the loop), I just loop on it to build a string version of the events (tab-separated columns):
StringIO
import io
import psycopg2

def load_events_with_copy_psycopg2(device_id,num_events):
    yb=psycopg2.connect('postgres://yugabyte:franck@yb1.pachot.net:5433/yugabyte')
    ysql=yb.cursor()
    # YugabyteDB-specific parameter: commit every 1000 rows during COPY
    ysql.execute("set yb_default_copy_from_rows_per_transaction=1000")
    tsv=io.StringIO()
    for event in generate_events(
        device_id=device_id, start_timestamp=0
        , num_events=num_events, sequence_number=1
        , device_spread=1):
        # one tab-separated line per event
        tsv.write(f'{event.timestamp}\t{event.device_id}\t{event.sequence_number}\t{event.temperature}\n')
    # rewind so that copy_from() reads from the first line
    tsv.seek(0)
    ysql.copy_from(tsv,'iot_demo',sep="\t",columns=('timestamp', 'device_id', 'sequence_number', 'temperature'))
    # empty the buffer
    tsv.seek(0)
    tsv.truncate(0)
    yb.commit()
    ysql.close()
    yb.close()
In psycopg2, copy_from() reads from a file-like object, but here I generate events into an in-memory StringIO that copy_from() reads like a file. Don't forget to seek(0) after writing to it so that copy_from() starts at the beginning, and truncate(0) when done.
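A quick way to see why the seek(0) matters, using only the standard library and no database: after writing, the position is at the end of the buffer, so a reader such as copy_from() would find nothing to read. This is only a small sketch, and the two sample rows are made up.

```python
import io

buf = io.StringIO()
buf.write("2022-01-01 00:00:00\tdev1\t1\t20.5\n")
buf.write("2022-01-01 00:05:00\tdev1\t2\t21.0\n")

# The position is now at the end of the buffer: reading returns an empty string
nothing = buf.read()

# Rewind so that a reader starts at the first row
buf.seek(0)
rows = buf.read().splitlines()

# Empty the buffer before reusing it for another batch
buf.seek(0)
buf.truncate(0)
remaining = buf.getvalue()
```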
I'm keeping it simple here. If some character strings contain tabs or newlines, you have to escape them as the TSV format expects. But in a future post I'll show that you can do better.
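As a rough idea of what that handling could look like, here is a minimal sketch that escapes values for the COPY text format, where backslash, tab, newline and carriage return inside a value are written as backslash sequences and NULL is written as \N. The helper names escape_copy_text and to_copy_line are hypothetical, not part of psycopg2, and the sample values are made up.

```python
def escape_copy_text(value):
    """Escape one column value for the COPY text format (hypothetical helper)."""
    if value is None:
        return r"\N"  # NULL marker in COPY text format
    text = str(value)
    # Escape the backslash first, so the escapes added below are not doubled
    return (text.replace("\\", "\\\\")
                .replace("\t", "\\t")
                .replace("\n", "\\n")
                .replace("\r", "\\r"))

def to_copy_line(values):
    """Join escaped values with tabs and terminate the row with a newline."""
    return "\t".join(escape_copy_text(v) for v in values) + "\n"

# A device id containing a tab and a text containing a newline
line = to_copy_line(["dev\t1", "two\nlines", None, 20.5])
```

Each row still ends up as exactly one line with three separators, whatever the values contain, so it can be written to the StringIO in place of the f-string.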
Pandas
You can also use pandas to format the CSV correctly and use copy_expert, which allows for more options:
import io
import pandas
import psycopg2

def load_events_with_copy_from_psycopg2(device_id,num_events):
    yb=psycopg2.connect('postgres://yugabyte:franck@yb1.pachot.net:5433/yugabyte')
    ysql=yb.cursor()
    # YugabyteDB-specific parameter: commit every 1000 rows during COPY
    ysql.execute("set yb_default_copy_from_rows_per_transaction=1000")
    events=[]
    for event in generate_events(
        device_id=device_id, start_timestamp=0
        , num_events=num_events, sequence_number=1
        , device_spread=1):
        events.append(event)
    # pandas builds the CSV, with a header line, from the list of dataclass instances
    csv=io.StringIO(pandas.DataFrame(events).to_csv(header=True,index=False))
    ysql.copy_expert("""
        COPY iot_demo(timestamp,device_id,sequence_number,temperature)
        FROM STDIN WITH DELIMITER ',' CSV HEADER
        """,csv)
    yb.commit()
    ysql.close()
    yb.close()
The pandas version is easier, as it takes care of the CSV format.
Threads
A distributed database scales, so you may want to load from multiple threads. This is easy because psycopg2 connections are thread-safe, and here each load opens its own connection anyway.
Here is the main program that loads events in loops in threads:
import threading

def loop_in_thread(device_id,num_loops,num_events):
    t1=datetime.now().timestamp()
    for _ in range(num_loops):
        load_events_with_copy_psycopg2(device_id,num_events)
    t2=datetime.now().timestamp()
    # events of one loop divided by the elapsed time of all loops
    print(f'{device_id:2d}: {(num_events)/(t2-t1):8.2f} rows/seconds/thread')

threads=[]
# three threads, as in the output shown below
for i in range(3):
    t=threading.Thread(target=loop_in_thread,args=[i,10,10000])
    threads.append(t)
    t.start()
# wait for every thread, not only the last one started
for t in threads:
    t.join()
Here is the result on my tiny database:
[opc@C tmp]$ python3 copy_from.py
0: 724.47 rows/seconds/thread
1: 709.02 rows/seconds/thread
2: 687.75 rows/seconds/thread
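Each printed figure divides num_events by the time the thread spent on all its loops. For an overall rate, one option is to time the whole run and divide the total number of rows by the elapsed time. Here is a minimal sketch of that idea, where fake_load is a hypothetical stand-in that sleeps instead of calling COPY, so it says nothing about real database throughput:

```python
import threading
import time

def fake_load(device_id, num_events):
    """Hypothetical stand-in for the real loader: sleeps instead of calling COPY."""
    time.sleep(0.01)

def loop_in_thread(device_id, num_loops, num_events):
    for _ in range(num_loops):
        fake_load(device_id, num_events)

def run(num_threads, num_loops, num_events):
    """Run all threads and return (total rows, overall rows per second)."""
    start = time.perf_counter()
    threads = [threading.Thread(target=loop_in_thread, args=(i, num_loops, num_events))
               for i in range(num_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    elapsed = time.perf_counter() - start
    total_rows = num_threads * num_loops * num_events
    return total_rows, total_rows / elapsed

total_rows, rate = run(num_threads=3, num_loops=2, num_events=100)
print(f"{total_rows} rows, {rate:.2f} rows/second overall")
```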
The goal of this post was to show that even with this old psycopg2 driver, which doesn't support prepared statements, we can do fast loads. I'll write more on bulk inserts. In PostgreSQL, as in YugabyteDB, COPY is the optimal way to ingest data. The difference is that PostgreSQL loads all data in one transaction, whereas YugabyteDB lets you define intermediate commits with the ROWS_PER_TRANSACTION option of COPY. In YugabyteDB, the inserted rows have to be buffered to be distributed efficiently to the tablet servers, so it is more efficient to limit the size of transactions. The yb_default_copy_from_rows_per_transaction parameter has been introduced with a default value of 1000 to avoid memory issues when loading large files on small servers. Setting it to 0 reverts to the PostgreSQL behavior, in case you want to be sure that everything is rolled back on failure.
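On an engine where this parameter is not available, a similar effect can be approximated from the client by sending one COPY per batch and committing after each. The batching part can be sketched with itertools.islice. The helper name batched_tsv and the sample rows are hypothetical, and the copy_from() call is only shown as a comment because it needs a live connection.

```python
import io
from itertools import islice

def batched_tsv(rows, batch_size):
    """Yield one rewound StringIO per batch of rows (hypothetical helper)."""
    it = iter(rows)
    while True:
        batch = list(islice(it, batch_size))
        if not batch:
            return
        buf = io.StringIO()
        for row in batch:
            buf.write("\t".join(str(col) for col in row) + "\n")
        buf.seek(0)  # ready to be read from the first row
        yield buf

# 2500 made-up rows: (timestamp, device_id, sequence_number, temperature)
rows = (("2022-01-01 00:00:00", f"dev{i % 2}", i, 20.0) for i in range(2500))

sizes = []
for buf in batched_tsv(rows, 1000):
    # With a psycopg2 cursor `cur` and connection `conn` (assumed, not created here):
    #   cur.copy_from(buf, 'iot_demo', sep="\t",
    #                 columns=('timestamp', 'device_id', 'sequence_number', 'temperature'))
    #   conn.commit()
    sizes.append(len(buf.getvalue().splitlines()))
```

With a batch size of 1000, the 2500 rows are sent as three COPY calls, and a failure in the last one would leave the first two committed.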