JobMaster

The Problem

While building Macaroni at Blink SEO, we needed a way to trigger long-running Python tasks from our Retool frontend. Click a button in the web app, and somewhere a Python script starts processing data. But the existing solutions were either overkill (Celery with Redis/RabbitMQ) or too limited for our needs.

We already had PostgreSQL as our database, so I wondered: could the database itself serve as the job queue? The answer was JobMaster: a lightweight job-queue system that uses PostgreSQL for persistence and coordination, with a decorator to turn any function into a queueable task:

Basic task definition
from jobmaster import task

@task
def generate_report(client_id: int, month: str, format: str = 'pdf'):
    """Generate a monthly report for a client."""
    # Your business logic here
    report = create_report(client_id, month)
    export_report(report, format)
    return f"Report generated: {client_id}_{month}.{format}"

That's it. Add the decorator, and the function becomes a queueable task. JobMaster inspects the signature to register parameters and their types, which are validated when jobs are inserted.
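Under the hood, a decorator like this can use Python's inspect module to record each parameter's name, annotation, and whether it has a default. The sketch below illustrates the idea only; the registry and validate_args helper are hypothetical, not JobMaster's actual internals:

```python
import inspect

# Hypothetical registry mapping task names to (function, parameter spec)
TASK_REGISTRY = {}

def task(fn):
    """Register a function as a task, recording its parameter types."""
    params = {}
    for name, p in inspect.signature(fn).parameters.items():
        params[name] = {
            "type": p.annotation,
            "required": p.default is inspect.Parameter.empty,
        }
    TASK_REGISTRY[fn.__name__] = (fn, params)
    return fn

def validate_args(task_key, args):
    """Reject a job whose arguments don't match the registered types."""
    _, params = TASK_REGISTRY[task_key]
    for name, spec in params.items():
        if name not in args:
            if spec["required"]:
                raise ValueError(f"missing argument: {name}")
            continue
        # Only check annotations that are actual types (not list literals etc.)
        if isinstance(spec["type"], type) and not isinstance(args[name], spec["type"]):
            raise TypeError(f"{name} must be {spec['type'].__name__}")

@task
def generate_report(client_id: int, month: str, format: str = "pdf"):
    return f"{client_id}_{month}.{format}"

validate_args("generate_report", {"client_id": 42, "month": "2024-01"})  # passes
```

An insert with client_id="abc" would raise a TypeError from validate_args before the job ever reaches the queue.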

Architecture

It turns out we didn't need a separate message broker. PostgreSQL's transactional guarantees and row-level locking are perfect for job coordination. Add some stored procedures for atomic job insertion and retrieval, and you have a robust queue without the operational complexity of managing additional infrastructure.

JobMaster has three main components: the database schema (tables and stored procedures), the task decorator (which registers Python functions as tasks), and the JobMaster class (which coordinates job execution).

Terminology: A task is a registered function definition (like a class). A job is a specific invocation of that task with arguments (like an instance).

Your web application calls a stored procedure to insert a job into the queue. Separately, a Python worker process polls the queue, pops jobs, and executes them. The database handles all the coordination, so there are no race conditions or lost jobs.

Inserting a job from your web app
-- Call this from Retool, your API, or anywhere with DB access
CALL jobmaster.insert_job(
    'reports',           -- type_key (module name)
    'generate_monthly',  -- task_key (function name)
    10,                  -- priority (higher = sooner)
    '{"client_id": 42, "month": "2024-01"}'::json
);

Advanced Task Options

JobMaster uses type hints to validate arguments and generate the database schema. If someone tries to insert a job with client_id="abc", it is rejected at insertion time, before any worker picks it up.

Select-From Parameters

For parameters with a fixed set of valid values, you can specify them directly in the type hint. This creates an enum-like constraint enforced at the database level:

Constrained parameter values
@task
def export_data(client_id: int, format: ['csv', 'json', 'parquet']):
    """Export client data in the specified format."""
    # format is guaranteed to be one of: 'csv', 'json', 'parquet'
    ...

Write-All Parameters

Sometimes you want to run a task for all valid values of a parameter, like exporting data in every format at once. The write_all option enables this:

Batch job insertion
@task(write_all=['format'])
def export_data(client_id: int, format: ['csv', 'json', 'parquet']):
    ...

# Then insert with 'ALL' to create three separate jobs:
# CALL jobmaster.insert_job('exports', 'export_data', 10,
#      '{"client_id": 42, "format": "ALL"}'::json)
#
# This creates jobs for: csv, json, and parquet
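Expanding 'ALL' into individual jobs amounts to substituting every allowed value for each expandable parameter. The helper below is a hypothetical illustration of that expansion, not JobMaster's actual code:

```python
from itertools import product

def expand_all(args, allowed, write_all):
    """Expand any write_all parameter set to 'ALL' into one job per allowed value.

    args:      arguments supplied at insert time
    allowed:   {param: [valid values]} from the select-from constraints
    write_all: parameters that accept 'ALL'
    """
    # For each expandable parameter, either fan out or keep the given value
    expand = {
        p: allowed[p] if args.get(p) == "ALL" else [args[p]]
        for p in write_all
    }
    fixed = {k: v for k, v in args.items() if k not in write_all}
    jobs = []
    for combo in product(*expand.values()):
        job = dict(fixed)
        job.update(zip(expand.keys(), combo))
        jobs.append(job)
    return jobs

jobs = expand_all(
    {"client_id": 42, "format": "ALL"},
    allowed={"format": ["csv", "json", "parquet"]},
    write_all=["format"],
)
# One job per format, all sharing client_id=42
```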

Task Dependencies

Real-world workflows often have dependencies: you can't send a report until it's generated, and you can't generate it until the data is refreshed. JobMaster handles this with the Dependency class:

Defining task dependencies
from jobmaster import task, Dependency, same

@task
def refresh_data(client_id: int):
    """Pull latest data from APIs."""
    ...

@task
def generate_report(client_id: int, month: str):
    """Generate report from refreshed data."""
    ...

@task(
    dependencies=Dependency(
        generate_report,  # This task must complete first
        hours=2,          # Within the last 2 hours
        client_id=same    # With the same client_id
    )
)
def send_report(client_id: int, email: str):
    """Send the generated report via email."""
    ...

The same keyword means "use the same value as this job." When JobMaster pops a send_report job from the queue, it checks if generate_report has completed for that client in the last 2 hours. If not, it automatically inserts the dependency job and re-queues the original.

This creates a self-healing workflow: you can just insert the final job you want, and JobMaster will work backwards to ensure all prerequisites are satisfied.
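The pop-time check boils down to a simple rule: a dependency is satisfied if it completed within the time window with matching arguments. The in-memory function below sketches that rule under assumed data shapes; the real check lives in JobMaster's stored procedures:

```python
from datetime import datetime, timedelta

def check_dependency(job_args, completions, dep_task, hours, match_keys, now=None):
    """Return True if dep_task completed within `hours` with matching args.

    completions: list of {"task": ..., "args": ..., "finished_at": ...}
    match_keys:  parameters that must equal the popped job's values
                 (the `same` constraint)
    """
    now = now or datetime.utcnow()
    cutoff = now - timedelta(hours=hours)
    for c in completions:
        if c["task"] != dep_task or c["finished_at"] < cutoff:
            continue
        if all(c["args"].get(k) == job_args.get(k) for k in match_keys):
            return True
    return False
```

If this returns False, the worker inserts the dependency job and re-queues the original, which is what makes the workflow self-healing.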

Resource Management

Not all tasks are equal. Some are lightweight API calls; others load gigabytes of data into memory. JobMaster uses "process units" to prevent resource exhaustion:

Resource-aware task definition
# Configure total available resources (e.g., MB of RAM)
jobmaster = JobMaster(
    db_engine=engine,
    system_process_units=8000  # 8GB available
)

@task(process_units=100)  # Light task: 100MB
def quick_check(client_id: int):
    ...

@task(process_units=2000)  # Heavy task: 2GB
def full_data_refresh(client_id: int):
    ...

When popping jobs, JobMaster sums the process units of all currently running jobs and only picks up new jobs that fit within the remaining capacity. This prevents the common problem of multiple heavy jobs starting simultaneously and crashing the worker.
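The capacity check itself is simple arithmetic: subtract the units of running jobs from the system total, then accept waiting jobs (highest priority first) that fit in the remainder. A minimal sketch, with illustrative helper and field names rather than JobMaster's API:

```python
def pop_within_capacity(waiting, running, system_units):
    """Pick waiting jobs, highest priority first, that fit in spare capacity.

    Each job is a dict with "priority" and "process_units".
    """
    remaining = system_units - sum(j["process_units"] for j in running)
    picked = []
    for job in sorted(waiting, key=lambda j: -j["priority"]):
        if job["process_units"] <= remaining:
            picked.append(job)
            remaining -= job["process_units"]
    return picked

waiting = [
    {"priority": 10, "process_units": 2000},  # heavy refresh
    {"priority": 5, "process_units": 100},    # light check
]
running = [{"process_units": 6500}]
picked = pop_within_capacity(waiting, running, 8000)
# Only 1500 units are free, so the 2000-unit job waits
# and the 100-unit job is picked up.
```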

Database Schema

Running jobmaster.deploy() creates a dedicated schema with tables for tasks, parameters, and jobs. The key tables are:

Table          Purpose
task_types     Module-level groupings (type_key)
tasks          Registered task definitions
task_params    Parameter definitions with types and constraints
jobs           Job queue with status, priority, arguments

Jobs move through statuses: waiting → running → complete (or failed). The stored procedures use row-level locking to ensure exactly-once execution, even with multiple workers.
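The row-level locking here is PostgreSQL's well-known FOR UPDATE SKIP LOCKED idiom for queue tables. The query below is a generic sketch of that idiom (column names like job_id and started_at are assumed for illustration), not the body of JobMaster's actual stored procedure:

```sql
-- Atomically claim the highest-priority waiting job.
-- SKIP LOCKED makes concurrent workers pass over rows another
-- worker has already locked, so no two workers claim the same job.
UPDATE jobmaster.jobs
SET status = 'running', started_at = now()
WHERE job_id = (
    SELECT job_id
    FROM jobmaster.jobs
    WHERE status = 'waiting'
    ORDER BY priority DESC
    LIMIT 1
    FOR UPDATE SKIP LOCKED
)
RETURNING *;
```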

Running the Worker

The worker script just needs to import your tasks (to register them) and call jobmaster.run():

Worker script
# worker.py
import sqlalchemy
from jobmaster import JobMaster

# Import all modules containing @task decorators
from myapp import reports, exports, notifications

# Create engine and JobMaster instance
engine = sqlalchemy.create_engine("postgresql+pg8000://...")
jobmaster = JobMaster(db_engine=engine)

# Run until queue is empty
if __name__ == '__main__':
    jobs_run = jobmaster.run()
    print(f"Completed {jobs_run} jobs")

In production, we ran this from cron every few minutes. For near-real-time processing, you could run it in a loop with a short sleep between iterations. The database handles all the coordination, so you can safely run multiple workers in parallel.
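The near-real-time variant is just a drain-then-sleep loop around the worker's run call. A sketch, where poll_forever and its parameters are illustrative (jobmaster.run() returning a job count matches the worker script above):

```python
import time

def poll_forever(run_once, interval_seconds=30, sleep_fn=time.sleep, max_cycles=None):
    """Repeatedly drain the queue, sleeping between polls.

    run_once:   callable that drains the queue and returns the number of
                jobs completed (e.g. jobmaster.run)
    max_cycles: optional cap on iterations, useful for tests and shutdown
    """
    cycles = 0
    total = 0
    while max_cycles is None or cycles < max_cycles:
        total += run_once()
        sleep_fn(interval_seconds)
        cycles += 1
    return total

# In production this would be: poll_forever(jobmaster.run, interval_seconds=30)
```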

Summary

  • PostgreSQL-backed job queue (no Redis or RabbitMQ needed)
  • Simple @task decorator with automatic parameter validation
  • Declarative task dependencies with automatic prerequisite execution
  • Resource management via process units
  • Batch job insertion with write_all
  • Published to PyPI

Takeaways

JobMaster solved our specific problem at Blink SEO: bridging the gap between a low-code frontend and Python data processing. Using PostgreSQL as the message broker meant one fewer system to maintain, and we got transactional guarantees for free.

The dependency system was probably the most valuable feature in practice. Instead of carefully orchestrating job sequences, we could just define "this task needs that task to have run recently" and let JobMaster figure out the execution order.