Easy async function execution in Python/Django using a queue and consumer Thread

I didn’t want to setup django-q2, Celery, or any of the other heavier background task running options for Django, since I just wanted to do basic API calls and save tracking data to the db, without blocking user requests. My async work also didn’t need to be transactional, so losing the queue due to app server failure or restarts didn’t matter much. This implementation is also quite performant, so likely to be faster than having a bunch of extra plumbing added.

# Usage:
def my_background_method(x='x', y='y', z='z'):
    ... time consuming API call or db save ...
    pass

# method with parameters, use a lambda
get_default_background_worker().add_to_queue(lambda: my_background_method(x="parameters", y="go", z="here"))

# else just pass the method without parentheses
get_default_background_worker().add_to_queue(my_basic_background_method)

import logging
import queue
import time
from threading import Thread
log = logging.getLogger(__name__)


# Change this to fit your memory constraints, as insertion will block when the queue reaches this size,
# tune DEFAULT_PAUSE_SECONDS down if you're hitting this max size often
MAX_QUEUE_SIZE = 10000

# Increase this if your queue is empty often, else your CPU will be at 100% in the while self.running loop
DEFAULT_PAUSE_SECONDS = 10


class BackgroundConsumer(Thread):
    def __init__(self, queue, worker_name):
        Thread.__init__(self)
        self.queue = queue
        self.worker_name = worker_name
        self.daemon = True # don't block on exit
        self.running = True # start running immediately

    def run(self):
        """Runs the consumer."""
        log.info(f"Consumer is running for {self.worker_name}...")
        while self.running:
            log.debug(f"Saving queue in {self.worker_name}")
            self.save_queue()
            # pause before saving queue again to avoid spinning the cpu at 100% with empty queues
            time.sleep(DEFAULT_PAUSE_SECONDS)
        log.info(f"Consumer for {self.worker_name} exited")

    def save_queue(self):
        """Save the queue to the database."""
        while not self.queue.empty():
            fn = self.queue.get(block=True)
            try:
                fn()
            except Exception as e:
                log.exception(f"Error saving queue item in {self.worker_name}, skipping", e)
            self.queue.task_done()

    def pause(self):
        """Pause the consumer."""
        self.running = False


class BackgroundWorker:
    def __init__(self, name):
        self.name = name
        log.info(f"Instantiating background worker {name}")
        self.queue = queue.Queue(maxsize=MAX_QUEUE_SIZE)
        self.consumer = BackgroundConsumer(self.queue, worker_name=self.name)
        self.consumer.start()
    def add_to_queue(self, fn):
        """ Add a function to the queue for later execution by the BackgroundConsumer Thread"""
        self.queue.put(fn)

aqc_default_background_worker = None
def get_default_background_worker():
    global aqc_default_background_worker
    if not aqc_default_background_worker:
        aqc_default_background_worker = BackgroundWorker(name="default_background_worker")
    return aqc_default_background_worker

# Best to use separate workers by duplicating the get_default_background_worker() method and renaming it. 
# Make sure the global doesn't conflict with anything else. 
# Depending on your app server and runtime you might need to keep this global somewhere else.