I was once tasked with improving the database layer and general operations of a set of backend services that used MongoDB as their main database.
These services were part of a large infrastructure in which millions of messages arrived through queues and had to be processed according to their message actions. That meant a very large number of database operations every second, plus additional checks during processing.
The processing layer used PyMongo to talk to MongoDB, and the services themselves ran in a synchronous environment. Even though the database operations were already handled in bulk, performance could not keep up with the incoming data.
The synchronous code made things worse: execution waits for the result of the current operation before it can move on, which is a serious bottleneck in systems that need to scale.
This regularly caused queue overflows and potential data loss.
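To make the cost of waiting concrete, here is a minimal sketch that uses `time.sleep` and `asyncio.sleep` as stand-ins for database calls. The function names and the 0.05-second latency are assumptions chosen for illustration; no real MongoDB call is involved.

```python
import asyncio
import time

LATENCY = 0.05  # assumed round-trip time of one fake database call, in seconds


def sync_call():
    """Blocking stand-in for a synchronous driver call."""
    time.sleep(LATENCY)


async def async_call():
    """Non-blocking stand-in for an asynchronous driver call."""
    await asyncio.sleep(LATENCY)


def run_sync(n):
    """Issue n calls one after another; total time is about n * LATENCY."""
    start = time.perf_counter()
    for _ in range(n):
        sync_call()
    return time.perf_counter() - start


async def run_async(n):
    """Issue n calls concurrently; total time stays close to one LATENCY."""
    start = time.perf_counter()
    await asyncio.gather(*(async_call() for _ in range(n)))
    return time.perf_counter() - start


if __name__ == "__main__":
    n = 10
    print(f"sync:  {run_sync(n):.2f}s")
    print(f"async: {asyncio.run(run_async(n)):.2f}s")
```

In the synchronous version every call pays its full latency in sequence, while the concurrent version overlaps the waits. Real database calls will not overlap this perfectly, since the server and the network still have finite capacity.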
Asynchronous Environment
The solution I implemented was a combination of:
Motor
Asyncio
Uvloop
Let's quickly go through what each of these is, starting with the driver they replaced.
PyMongo is the official MongoDB driver for Python, providing a simple and intuitive way to interact with MongoDB databases. It is synchronous, meaning each database operation blocks the execution of your program until it completes, which can be a bottleneck in I/O-bound tasks.
Motor is the asynchronous MongoDB driver for Python, built on top of PyMongo and designed to work with Python's asyncio library. Motor lets you perform non-blocking database operations, making it suitable for high-performance applications that require concurrency.
asyncio is Python's standard-library framework for writing concurrent code with async/await on top of an event loop.
uvloop is a drop-in replacement for the default asyncio event loop, implemented in Cython on top of libuv.
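As a rough illustration of how uvloop slots in underneath asyncio, the sketch below runs a coroutine on a uvloop event loop when the package is installed and falls back to the standard loop otherwise. The `run` and `answer` helpers are hypothetical names for this example, and the fallback is an assumption added so the snippet works without uvloop; it is not taken from the original services.

```python
import asyncio

try:
    import uvloop  # third-party package; treated as optional in this sketch
    new_loop = uvloop.new_event_loop
except ImportError:
    new_loop = asyncio.new_event_loop  # fall back to the standard event loop


def run(coro):
    """Run a coroutine to completion on the selected event loop."""
    loop = new_loop()
    try:
        return loop.run_until_complete(coro)
    finally:
        loop.close()


async def answer():
    """Trivial coroutine used to show that either loop runs the same code."""
    await asyncio.sleep(0)  # yield to the event loop once
    return 42


if __name__ == "__main__":
    print(run(answer()))
```

The application code does not change between the two loops; only the loop implementation that drives it does.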
To illustrate the performance difference, I prepared a stress test using two scripts: one using Motor (asynchronous) and the other using PyMongo (synchronous). Both scripts perform the same task of reading and writing documents in MongoDB in batches.
Both scripts read 300k documents from a source collection and migrate them to a new target collection.
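Both scripts walk the source collection in `_id` order and ask for documents with an `_id` greater than the last one seen, rather than skipping by offset. The sketch below mimics that loop over an in-memory list so the paging logic can be followed without a database. `fetch_batch` and `iterate_batches` are illustrative names, and the integer `_id` values are an assumption made for readability.

```python
def fetch_batch(documents, batch_size, last_id=None):
    """Return up to batch_size documents whose _id is greater than last_id.

    `documents` must already be sorted by _id, mirroring .sort('_id').
    """
    if last_id is None:
        candidates = documents
    else:
        candidates = [doc for doc in documents if doc["_id"] > last_id]
    return candidates[:batch_size]


def iterate_batches(documents, batch_size):
    """Yield consecutive batches until the source is exhausted."""
    last_id = None
    while True:
        batch = fetch_batch(documents, batch_size, last_id)
        if not batch:
            break
        last_id = batch[-1]["_id"]  # remember where this batch ended
        yield batch


if __name__ == "__main__":
    source = [{"_id": i} for i in range(1, 11)]
    for batch in iterate_batches(source, batch_size=4):
        print([doc["_id"] for doc in batch])
```

Each request starts right after the previous batch's last `_id`, so no document is read twice and none is skipped as long as the sort order is stable.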
Asynchronous Script (Motor)
import logging
import asyncio
import time

from bson import ObjectId
from motor.motor_asyncio import AsyncIOMotorClient

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# MongoDB setup
MONGO_URI = 'mongodb://root:root@localhost:27019'
DB_NAME = 'products'
COLLECTION_NAME = 'gmc_products'

client = AsyncIOMotorClient(MONGO_URI)
db = client[DB_NAME]
collection = db[COLLECTION_NAME]
target_collection = db["new_collection"]


async def fetch_products(batch_size, last_id=None):
    query = {'_id': {'$gt': last_id}} if last_id else {}
    cursor = collection.find(query).sort('_id').limit(batch_size)
    products = await cursor.to_list(length=batch_size)
    return products


async def bulk_write_to_mongo(products):
    for product in products:
        product['_id'] = ObjectId()  # Generate a new ObjectId for each product
    try:
        result = await target_collection.insert_many(products, ordered=False)
        logger.info(f'Inserted {len(result.inserted_ids)} products into MongoDB.')
    except Exception as e:
        logger.error(f'Error inserting products into MongoDB: {e}')


async def process_batches(batch_size, concurrency_limit):
    tasks = []
    last_id = None
    while True:
        products = await fetch_products(batch_size, last_id)
        if not products:
            break
        last_id = products[-1]['_id']
        tasks.append(bulk_write_to_mongo(products))
        if len(tasks) >= concurrency_limit:
            await asyncio.gather(*tasks)
            tasks = []
    # Process remaining tasks if any
    if tasks:
        await asyncio.gather(*tasks)


async def main():
    batch_size = 1000
    concurrency_limit = 10
    start_time = time.time()
    await process_batches(batch_size, concurrency_limit)
    end_time = time.time()
    logger.info(f'Total time: {end_time - start_time:.2f} seconds.')


if __name__ == '__main__':
    asyncio.run(main())
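The asynchronous script waits for each group of `concurrency_limit` writes to finish before it starts the next group. One possible alternative, shown here only as a sketch, is to cap the number of in-flight writes with `asyncio.Semaphore` so that a new write can start as soon as any slot frees up. `fake_insert` is a hypothetical stand-in for `insert_many`, and the `stats` dictionary exists only to make the behaviour observable.

```python
import asyncio


async def fake_insert(batch, stats):
    """Hypothetical stand-in for insert_many; records peak concurrency."""
    stats["in_flight"] += 1
    stats["peak"] = max(stats["peak"], stats["in_flight"])
    await asyncio.sleep(0.01)  # simulated network round trip
    stats["in_flight"] -= 1
    stats["inserted"] += len(batch)


async def bounded_insert(batch, semaphore, stats):
    """Wait for a free slot, then perform the write."""
    async with semaphore:
        await fake_insert(batch, stats)


async def write_all(batches, concurrency_limit):
    """Write every batch with at most concurrency_limit writes in flight."""
    semaphore = asyncio.Semaphore(concurrency_limit)
    stats = {"in_flight": 0, "peak": 0, "inserted": 0}
    await asyncio.gather(*(bounded_insert(b, semaphore, stats) for b in batches))
    return stats


if __name__ == "__main__":
    demo_batches = [[1, 2, 3]] * 20
    print(asyncio.run(write_all(demo_batches, concurrency_limit=5)))
```

This sketch takes all batches up front, which would hold the whole dataset in memory; the script above avoids that by reading and writing in groups. Whether a semaphore actually improves on grouped `gather` calls depends on how uneven the write latencies are, so it would need to be measured.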
Synchronous Script (PyMongo)
import logging
import time

from bson import ObjectId
from pymongo import MongoClient

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# MongoDB setup
MONGO_URI = 'mongodb://root:root@localhost:27019'
DB_NAME = 'products'
COLLECTION_NAME = 'gmc_products'
TARGET_COLLECTION_NAME = 'new_collection'

client = MongoClient(MONGO_URI)
db = client[DB_NAME]
collection = db[COLLECTION_NAME]
target_collection = db[TARGET_COLLECTION_NAME]


def fetch_products(batch_size, last_id=None):
    query = {'_id': {'$gt': last_id}} if last_id else {}
    cursor = collection.find(query).sort('_id').limit(batch_size)
    products = list(cursor)
    return products


def bulk_write_to_mongo(products):
    for product in products:
        product['_id'] = ObjectId()  # Generate a new ObjectId for each product
    try:
        result = target_collection.insert_many(products, ordered=False)
        logger.info(f'Inserted {len(result.inserted_ids)} products into MongoDB.')
    except Exception as e:
        logger.error(f'Error inserting products into MongoDB: {e}')


def process_batches(batch_size):
    last_id = None
    while True:
        products = fetch_products(batch_size, last_id)
        if not products:
            break
        last_id = products[-1]['_id']
        bulk_write_to_mongo(products)


def main():
    batch_size = 1000
    start_time = time.time()
    process_batches(batch_size)
    end_time = time.time()
    logger.info(f'Total time: {end_time - start_time:.2f} seconds.')


if __name__ == '__main__':
    main()
Results and Analysis
Execution time for migrating 300k documents:
Asynchronous script: 17.15 seconds
Synchronous script: 23.26 seconds
The asynchronous script completed the task 6.11 seconds faster than the synchronous script, which is roughly 26% less wall-clock time. While this might not seem like a significant difference for a single run, it becomes more pronounced in high-load scenarios or when processing large datasets continuously.
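For readers who want to see where those figures come from, the arithmetic can be reproduced from the two measured times. The constant and function names below are illustrative; the only inputs are the document count and the timings reported above.

```python
DOCUMENTS = 300_000
ASYNC_SECONDS = 17.15
SYNC_SECONDS = 23.26


def throughput(documents, seconds):
    """Documents migrated per second."""
    return documents / seconds


def summarize():
    """Derive the comparison figures from the two measured run times."""
    saved = SYNC_SECONDS - ASYNC_SECONDS
    return {
        "async_docs_per_sec": throughput(DOCUMENTS, ASYNC_SECONDS),
        "sync_docs_per_sec": throughput(DOCUMENTS, SYNC_SECONDS),
        "seconds_saved": saved,
        "time_reduction_pct": 100 * saved / SYNC_SECONDS,
        "speedup": SYNC_SECONDS / ASYNC_SECONDS,
    }


if __name__ == "__main__":
    for name, value in summarize().items():
        print(f"{name}: {value:.2f}")
```

This works out to about 17,500 documents per second for the asynchronous run against about 12,900 for the synchronous one. Both numbers come from a single run each, so they are an indication rather than a benchmark.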
Benefits of Using Motor and Asynchronous Environment
Improved Throughput: Asynchronous operations can handle more tasks concurrently, increasing overall throughput. This is especially beneficial in applications with high I/O operations, such as web servers handling multiple database queries simultaneously.
Non-Blocking I/O: Asynchronous operations do not block the event loop while they wait on I/O, so other tasks can keep running in the meantime. This results in better CPU utilization and smoother application performance, particularly under load.
Scalability: Asynchronous code scales better with the number of concurrent operations. For example, a web application using Motor can handle more simultaneous requests compared to one using PyMongo.
Resource Efficiency: Asynchronous operations can lead to more efficient use of system resources. For instance, the event loop in asyncio allows the application to switch between tasks, reducing idle times and improving overall efficiency.
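The non-blocking point in the list above can be observed directly. In the sketch below, a background "heartbeat" task counts how often the event loop gets to run it while another piece of work is in progress. All names and timings are assumptions made for the demonstration; `time.sleep` stands in for a blocking driver call and `asyncio.sleep` for a non-blocking one.

```python
import asyncio
import contextlib
import time


async def heartbeat(counter, interval=0.01):
    """Increment a counter every time the event loop lets this task run."""
    while True:
        await asyncio.sleep(interval)
        counter["ticks"] += 1


async def blocking_work():
    time.sleep(0.2)  # blocks the whole event loop; nothing else can run


async def non_blocking_work():
    await asyncio.sleep(0.2)  # yields to the loop while waiting


async def measure(work):
    """Return how many heartbeat ticks happened while `work` was running."""
    counter = {"ticks": 0}
    task = asyncio.create_task(heartbeat(counter))
    await asyncio.sleep(0)  # give the heartbeat task a chance to start
    await work()
    task.cancel()
    with contextlib.suppress(asyncio.CancelledError):
        await task
    return counter["ticks"]


if __name__ == "__main__":
    print("ticks during blocking work:    ", asyncio.run(measure(blocking_work)))
    print("ticks during non-blocking work:", asyncio.run(measure(non_blocking_work)))
```

While the blocking call runs, the heartbeat is starved; during the non-blocking wait it keeps ticking. The same effect is why a single synchronous call inside an otherwise asynchronous service can stall every other task sharing the loop.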
Source Code and Video Explanation
You can find the source code in the GitHub repository below:
The choice between Motor and PyMongo depends on the specific needs of your application. For applications that require high concurrency and efficient I/O handling, Motor and the asynchronous approach offer significant advantages. However, for simpler applications or scripts where ease of implementation is a priority, PyMongo's synchronous approach might be sufficient.
By leveraging asynchronous operations with Motor, you can build more scalable and performant applications, making it a worthwhile consideration for modern web development.