How Redis Addressed Our Dynamic Task Scheduling and Concurrent Execution Issues with Celery & Postgres

Nik L. - Mar 18 - Dev Community

Developing a system designed for scheduled and dynamic synchronization of data between clients' data warehouses and our own datastore posed several intriguing challenges.

Initially, we used Go's goroutines to schedule database queries, running the whole setup on minimal resources with SQLite and a single Go service.

Nevertheless, migrating this functionality onto our Software as a Service (SaaS) platform introduced fresh hurdles linked to dynamic scheduling and concurrent task execution.

Problem Statement

Our primary goal was to set up periodic data synchronization from clients' data warehouses to our datastore. This required an effective way to sync data consistently without causing disruptions.

Previous Architecture Overview

Previously, our architecture enabled end users to connect to their corresponding data warehouses, carry out database queries, and synchronize subscribers following predefined schedules (hourly or daily, for instance).

This arrangement was reasonably straightforward: we embedded a SQLite database inside the service, expected few simultaneous executions (since most customers updated specific tables individually), and leaned on Go's goroutines, together with the lightweight Asynq library, to manage scheduling.

Old SuprSend Architecture

Figure 1 - Old SuprSend architecture catering to multi-tenant requirements

Nonetheless, porting this capability to our SaaS platform exposed two major obstacles: dynamic scheduling from disparate sources, and sequential execution of those schedules, which involves running the query, processing the results, and syncing subscribers.

Solution Implementation

Delving deeper into the underlying difficulties helped clarify matters:

Dynamic Scheduling

Imagine a scenario where anyone can schedule a job at any time, configured either as a cron expression or as a fixed interval. A good scheduler must consistently prioritize jobs while adapting dynamically to changes.

Fortunately, Redis' Sorted Set is well suited to scheduling problems, providing efficient ways to store and retrieve tasks based on their execution times or priorities.

Redis' ZADD command adds task representations to a sorted set, indexed by execution timestamps or priorities expressed as floating-point scores. Internally, a sorted set is a hybrid of a hash map and a skip list: the hash map grants fast access to elements by key, while the skip list keeps elements ordered by score. This dual structure lets Redis perform sorted-set operations quickly.

In task-scheduling terms, scores usually denote execution times or priority levels. Since Redis keeps the set in ascending score order, prioritization hinges on the numerical scores themselves.

Lower scores correspond to greater urgency, so imminently executable tasks can be found and pulled quickly. Should two members have equal scores, Redis falls back to lexicographic ordering of the member strings.
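
As a quick illustration of that ordering (a minimal sketch assuming a local Redis instance and the redis-py client; the key name demo_tasks is made up):


import redis

r = redis.Redis()

# Two members share score 5.0; "task_a" sorts before "task_b" lexicographically
r.zadd("demo_tasks", {"task_b": 5.0, "task_a": 5.0, "task_c": 1.0})

# Returns members in ascending score order, ties broken lexicographically:
# [(b'task_c', 1.0), (b'task_a', 5.0), (b'task_b', 5.0)]
print(r.zrange("demo_tasks", 0, -1, withscores=True))
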

A Redis-centric scheduler typically uses ZADD to insert task representations into the sorted set, and ZRANGEBYSCORE to fetch the highest-priority entries from it.

Let's examine an example with varying priorities and scheduled execution times:



def calculate_score(priority, execution_time):
    # Convert execution_time (a datetime) into a UNIX timestamp
    unix_timestamp = execution_time.timestamp()

    # Map priority labels to a numeric scale (smaller number = higher precedence)
    priority_values = {"low": 3, "medium": 2, "high": 1}

    # Composite score: each priority occupies its own band, and the timestamp
    # orders tasks within a band. The band width (10**10 seconds) must exceed
    # any realistic UNIX timestamp so that bands never overlap.
    score = unix_timestamp + (10 ** 10 * priority_values[priority])
    return score



Next, populate Redis' sorted set using the ZADD operation:



import datetime
import redis

# Initialize connection to Redis
r = redis.Redis()

# A fixed example execution time (UTC)
run_at = datetime.datetime(2023, 3, 15, 10, 0, 0, tzinfo=datetime.timezone.utc)

# Insert tasks together with computed scores
r.zadd("scheduled_tasks", {
    f"Task A ({priority})": calculate_score(priority, run_at)
    for priority in ["low", "medium", "high"]
})



Retrieving due tasks means invoking ZRANGEBYSCORE with the right bounds. With plain timestamp scores you would query from -inf up to the current UNIX timestamp; with the composite scores above, each priority level occupies its own band, so we query each band from its offset up to the offset plus the current timestamp:



import datetime

# Current UTC time as a UNIX timestamp
now = datetime.datetime.now(datetime.timezone.utc).timestamp()

# Query each priority band, highest priority (lowest band) first;
# a task is due once its timestamp component is <= now
due_tasks = []
for band in (1, 2, 3):  # high, medium, low
    offset = 10 ** 10 * band
    due_tasks += r.zrangebyscore("scheduled_tasks", offset, offset + now)



This scheme guarantees that higher-priority tasks are always picked up before lower-priority ones, irrespective of their scheduled dates, because the band offset dominates any timestamp difference.
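
A quick worked example (plain Python, reusing calculate_score from above; the specific dates are arbitrary) makes the band effect concrete:


import datetime

utc = datetime.timezone.utc

# A high-priority task scheduled a day *later* than a low-priority one...
high_late = calculate_score("high", datetime.datetime(2023, 3, 16, 10, 0, tzinfo=utc))
low_early = calculate_score("low", datetime.datetime(2023, 3, 15, 10, 0, tzinfo=utc))

# ...still gets the smaller score, so Redis ranks it first:
# high_late ~= 1e10 + 1.679e9, low_early ~= 3e10 + 1.679e9
assert high_late < low_early
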

With the scoring and scheduling principles established, let's look at building a solid setup in which independent producers, schedulers, workers, and Redis work together.

Building the System

Key aspects include:

  • A producer that injects tasks into Redis via ZADD calls against the sorted set.
  • A scheduler that repeatedly scans Redis for due tasks via ZRANGEBYSCORE with the current timestamp, then hands them off to available workers.
  • One or more workers that execute the tasks and emit heartbeats once assignments complete, notifying the scheduler of updated execution statuses; a minimal sketch of this loop follows below.

In practice, our Application Programming Interface (API) server assumed the role of the producer.
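
Here is a minimal sketch of that producer/scheduler/worker loop, assuming deadline-only scores (no priority bands) for brevity; the key names scheduled_tasks, work_queue, and heartbeats are illustrative, not our production code:


import datetime
import time

import redis

r = redis.Redis()

def produce(task_id, run_at):
    # Producer: schedule a task under its execution time
    r.zadd("scheduled_tasks", {task_id: run_at.timestamp()})

def scheduler_loop():
    # Scheduler: move due tasks from the sorted set onto a work queue
    while True:
        now = datetime.datetime.now(datetime.timezone.utc).timestamp()
        for task in r.zrangebyscore("scheduled_tasks", "-inf", now):
            # ZREM returns 1 only for the instance that actually removed the
            # member, so two scheduler processes never double-dispatch a task
            if r.zrem("scheduled_tasks", task):
                r.lpush("work_queue", task)
        time.sleep(1)

def worker_loop():
    # Worker: block until a task arrives, execute it, report a heartbeat
    while True:
        _queue, task = r.brpop("work_queue")
        print(f"executing {task!r}")
        r.lpush("heartbeats", task)  # completion signal for the scheduler
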

Library Evaluation

After weighing the pros and cons, we favored RQ-scheduler, a Python library that matches the strategy outlined above. Other candidates included:

  • APScheduler: insufficient separation between scheduler and worker roles, which made it inconvenient to detach workers from a centralized API server.
  • Celery beat: schedules are static by default, which made dynamic scheduling a poor fit.

Ultimately, RQ-scheduler won out: it aligned closely with our needs and integrates readily with Django.
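
For flavor, here is a minimal sketch of scheduling jobs with rq-scheduler (it assumes the rqscheduler process is running; the sync_warehouse function and its arguments are hypothetical, not our actual task):


from datetime import timedelta

from redis import Redis
from rq_scheduler import Scheduler

from myapp.tasks import sync_warehouse  # hypothetical task function

scheduler = Scheduler(connection=Redis())

# Run once, one hour from now
scheduler.enqueue_in(timedelta(hours=1), sync_warehouse, warehouse_id=42)

# Run on a cron schedule (hourly, at minute 0)
scheduler.cron("0 * * * *", func=sync_warehouse, kwargs={"warehouse_id": 42})
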

New SuprSend Architecture

Figure 2 - Revised SuprSend architecture catering to multi-tenant requirements

Handling Multi-Tenant Data Stores

Initially leaning toward SQLite, we hit significant constraints when scaling horizontally across distributed applications with growing node counts and increasing concurrent writes. Key concerns included:

  • Limited parallelism, a consequence of SQLite's exclusive file-based locking.
  • No native network protocol, which makes SQLite awkward in distributed settings.
  • Difficulty guaranteeing ACID compliance across distributed nodes.
  • Data consistency and integrity risks stemming from SQLite's file-level locks.

Given escalating demands for synchronized multi-process writes against shared databases, we sought alternatives better suited to distributed systems.

We eventually narrowed the field to Redis and PostgreSQL, both built for horizontal scaling. Weighing factors like state maintenance during query execution, result buffering to reduce server load, and overall system stability, PostgreSQL emerged as the better choice.

Despite slightly higher compute costs than SQLite, the investment in reliability and performance was worth it.

Processing Queries Efficiently

Resource-intensive query processing was yet another formidable test: some scenarios involved datasets spanning millions, or even billions, of rows.
To keep load under control, we built a dedicated service devoted exclusively to processing these tasks. Once a query resolved successfully, downstream modules received internal signals to register subscribers transparently.
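
To illustrate the kind of technique such a service can use, here is a sketch built on psycopg2's server-side cursors; the connection string, the query, and sync_subscriber are placeholders, not our actual pipeline:


import psycopg2

def sync_subscriber(email, attrs):
    # Hypothetical downstream hook that registers a subscriber
    ...

conn = psycopg2.connect("dbname=warehouse")  # placeholder DSN

# A named cursor is server-side: PostgreSQL streams rows in batches instead
# of materializing millions of rows in the client's memory at once
with conn.cursor(name="subscriber_sync") as cur:
    cur.itersize = 10_000  # rows fetched per network round trip
    cur.execute("SELECT email, attrs FROM subscribers")  # placeholder query
    for email, attrs in cur:
        sync_subscriber(email, attrs)
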

Happy coding!
