Async task queues @ Insurami

Async task queues give programmers more control over backend processing, making it more robust against unexpected failures and better able to deliver complex business logic.

Introduction

At Insurami, we provide landlords and their tenants with an alternative to traditional lease deposits. Landlords can refer new and existing tenants on our platform for a deposit guarantee quote.

In our backend, this means a sequence of I/O operations and network requests to third-party services that can take longer than the user might expect. Additionally, an HTTP request can fail temporarily, in which case the entire processing flow breaks and the data in the backend is left in an invalid state.

To tackle this problem, we wanted to break down the backend processing code into individual steps that run in sequence, can each be retried if necessary, and allow the entire flow to be rolled back if needed. Following these principles, we can start treating our backend processing as transactional, giving us confidence that our data is consistent.

Implementation

Task queues provide a way to execute code asynchronously by sending messages to a queue, where they are picked up and processed by a process running in the background. There are many task queue frameworks available, but we chose Celery as it is a well-established solution and compatible with AWS message queues, which we have already integrated into our platform. Amazon Simple Queue Service (SQS) works as our message broker, facilitating the communication between processes.
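To make the idea concrete, here is a minimal sketch of the producer/worker pattern using only the Python standard library. It is not how Celery or SQS work internally: the in-memory `broker` queue, the `worker` function and the message format are all stand-ins invented for this illustration.

```python
import queue
import threading

# In-memory stand-in for the message broker (SQS in our setup).
broker = queue.Queue()
processed = []


def worker():
    """Background worker: pick up messages and process them one by one."""
    while True:
        message = broker.get()
        if message is None:  # sentinel telling the worker to stop
            broker.task_done()
            break
        processed.append(message["payload"].upper())
        broker.task_done()


thread = threading.Thread(target=worker, daemon=True)
thread.start()

# The producer (e.g. a request handler) only enqueues and moves on.
broker.put({"task": "shout", "payload": "quote requested"})
broker.put(None)

broker.join()  # wait until the worker has drained the queue
thread.join()
print(processed)
```

The producer never waits for the processing itself, which is the property we are after for our request handlers.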

Celery provides the means to execute code blocks sequentially with the chain method. When the execution chain is started, every block of code runs as a task, and its result is passed as the input to the next block.
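The data flow of a chain can be pictured with a few lines of plain Python. This is only a sketch of the idea, run synchronously in one process; `run_pipeline`, `normalise` and `greet` are hypothetical names, not part of Celery.

```python
from functools import reduce


def run_pipeline(steps, initial):
    """Run the steps in order, feeding each result into the next step."""
    return reduce(lambda value, step: step(value), steps, initial)


def normalise(name):
    return name.strip().lower()


def greet(name):
    return f"hello {name}"


result = run_pipeline([normalise, greet], "  Alice ")
print(result)
```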

Every task can be configured to retry when an exception is raised inside it. It is possible to provide a maximum number of retries, so tasks don't keep failing indefinitely when recovery is not possible.
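Stripped of the queue, the retry rule is a small loop. The sketch below is an illustration in plain Python, not Celery's implementation; `run_with_retries` and `flaky` are hypothetical names.

```python
def run_with_retries(func, max_retries):
    """Call func, retrying on any exception up to max_retries extra times."""
    attempts = 0
    while True:
        try:
            return func()
        except Exception:
            if attempts >= max_retries:
                raise  # recovery is not possible: surface the error
            attempts += 1


calls = {"count": 0}


def flaky():
    """Hypothetical task that fails twice and then succeeds."""
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("temporary failure")
    return "ok"


outcome = run_with_retries(flaky, max_retries=10)
print(outcome, calls["count"])
```

Once the limit is reached the last exception is re-raised, so a permanently failing task ends after `max_retries + 1` attempts instead of looping forever.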

It is also possible to set an interval between retries, to avoid repeating executions too close together in time. A well-known strategy for calculating the interval is exponential backoff, where the interval grows as more retries are fired. The algorithm to generate such intervals was introduced in this post. Celery version 4 has a native auto-retry option (autoretry_for), and from version 4.2 it supports exponential backoff (retry_backoff).

The code

We have briefly covered the main concepts behind the implementation of async task queues. Now it is time to see what all that looks like in code.

To add the auto-retry configuration and a custom on_failure handler, we subclass celery.Task with our own InsuramiTask class.

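from celery import Task
from sentry_sdk import capture_exception, configure_scope


class InsuramiTaskFailure(Exception):
    """Reported to Sentry when a task exhausts its retries.

    Assumed minimal definition; the original class is not shown in this post.
    """


class InsuramiTask(Task):
    """Implements custom tasks with flask context and autoretry."""

    abstract = True
    retry_kwargs = {"max_retries": 10}
    retry_backoff = 4  # first interval in seconds; doubles on each retry
    retry_backoff_max = 600  # cap on the interval, in seconds
    autoretry_for = (Exception,)  # retry on any exception
    retry_jitter = False  # keep the intervals deterministic

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        # Runs once the task has failed for good (retries exhausted).
        max_retries = self.retry_kwargs.get("max_retries")
        with configure_scope() as scope:
            # self.name is the registered task name, e.g. "steps.step_one".
            scope.set_extra("sub_task", self.name)
            scope.set_extra("celery_task_id", task_id)
            scope.set_extra("args", args)
            capture_exception(
                InsuramiTaskFailure(
                    "Task has failed {0} times: {1}".format(
                        max_retries, str(exc)
                    )
                )
            )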
Abstracts a task in our execution chain.
  • We decided on a maximum of 10 retries.
  • The retry backoff will start at 4 seconds. So the interval sequence in seconds between retries will be 4, 8, 16, and so on.
  • A maximum interval of 600 seconds.
  • The exception class which triggers an auto-retry is Exception, so effectively every exception will trigger one.
  • To get this exact, predictable backoff sequence you need to disable the randomness that retry_jitter introduces (it is enabled by default).
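With these settings the full retry schedule can be worked out by hand. The sketch below assumes the interval before retry number n (counting from 0) is the backoff factor times 2 to the power n, capped at the maximum, which is consistent with the 4, 8, 16 sequence above. `backoff_intervals` is a hypothetical helper, not a Celery function.

```python
def backoff_intervals(factor, max_retries, maximum):
    """Interval in seconds before each retry: factor * 2**n, capped at maximum."""
    return [min(maximum, factor * 2 ** n) for n in range(max_retries)]


intervals = backoff_intervals(factor=4, max_retries=10, maximum=600)
print(intervals)
# [4, 8, 16, 32, 64, 128, 256, 512, 600, 600]
print(sum(intervals))
# 2220
```

So a task that never recovers is reported as failed roughly 37 minutes after its first attempt, not counting the execution time of each attempt.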

To handle the failure after the maximum number of retries, we override the on_failure handler, which we use to send a custom issue to our Sentry account so that we can track problems in the async task queues. This would also be the place to handle roll-backs when a task fails. Since the handler has access to the task name (self.name) and receives the task ID, the arguments, the traceback and other relevant info, it's easy to infer the operations needed to leave the DB in a correct state.
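We have not shown our roll-back code here, so the following is only a sketch of one possible shape for it: a registry that maps a task name to an undo function, which an on_failure handler could look up. Everything in it (`ROLLBACKS`, `rollback_for`, `handle_failure`, the task name `steps.create_quote` and the in-memory `database`) is hypothetical.

```python
ROLLBACKS = {}  # hypothetical registry: task name -> undo function


def rollback_for(task_name):
    """Register the decorated function as the undo step for task_name."""
    def register(func):
        ROLLBACKS[task_name] = func
        return func
    return register


database = {"quotes": ["quote-1"]}  # stand-in for real tables


@rollback_for("steps.create_quote")
def delete_quote(quote_id):
    database["quotes"].remove(quote_id)


def handle_failure(task_name, args):
    """What an on_failure handler could call once retries are exhausted."""
    undo = ROLLBACKS.get(task_name)
    if undo is None:
        return False  # nothing registered for this task
    undo(*args)
    return True


rolled_back = handle_failure("steps.create_quote", ("quote-1",))
print(rolled_back, database)
```

Keeping the undo logic next to the task it reverses makes it harder for the two to drift apart as the steps change.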

To instantiate the celery object:

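# tasks.py
from celery import Celery

from server import app

# InsuramiTask is the custom task class shown earlier in this post.


def make_celery(app):
    celery = Celery(
        app.import_name,
        backend=app.config["CELERY_RESULT_BACKEND"],
        broker=app.config["CELERY_BROKER_URL"],
        include=["steps"],  # modules that contain the tasks
    )

    # Prefix queue names with the environment to keep messages separate.
    queue_name_prefix = app.config["INSURAMI_ENV"]

    celery.conf.task_default_queue = "tasks"
    celery.conf.update(
        {
            "broker_transport_options": {
                "region": "eu-west-2",
                "queue_name_prefix": queue_name_prefix,
            }
        }
    )

    # Use InsuramiTask as the base class for every task.
    celery.Task = InsuramiTask
    return celery


runner = make_celery(app)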
The make_celery method returns an instance of the task runner

The make_celery function takes your Flask app and returns a Celery instance, which we will call runner. Looking at it in more detail:

The Celery constructor needs:

  • A backend parameter, which in our case is a Postgres URI used to store task execution metadata.
  • A broker parameter, which is the SQS URI used for messaging between the tasks.
  • An include parameter, listing the modules that contain the tasks you want to execute.

The configuration that follows sets:

  • The default queue name.
  • Additional transport options for the broker: the AWS region, and a queue name prefix that can be used to separate messages by environment, for instance.

Finally, do not forget to override the default celery.Task with our InsuramiTask.
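One detail worth checking in your own setup is how the prefix combines with the queue name. The sketch below assumes the SQS transport prepends queue_name_prefix to the queue name verbatim, without adding a separator; that is our understanding of the behaviour rather than something shown in the code above, and `sqs_queue_name` is a hypothetical helper, not a Celery or kombu function.

```python
def sqs_queue_name(prefix, queue):
    """Assumed naming rule: the prefix is prepended verbatim to the queue name."""
    return prefix + queue


# Under that assumption the prefix should carry its own separator.
names = {
    env: sqs_queue_name(env + "-", "tasks")
    for env in ("staging", "production")
}
print(names)
print(sqs_queue_name("staging", "tasks"))  # no separator in the prefix
```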

Next, break your code down into individual functions:

# steps.py
from tasks import runner


@runner.task
def step_one(input_data):
    # Some processing here. Placeholder: pass the input through unchanged.
    result = input_data
    return result


@runner.task
def step_two(input_args):
    # input_args receives `result` when step_one and step_two
    # are chained together.
    return input_args
These are all individual tasks executed in sequence

The @runner.task decorator turns our functions into tasks. The last step is defining our chain. To do this we write yet another function.

# process.py
from celery import chain

from steps import step_one, step_two
from tasks import runner


@runner.task
def start_new_processing(input_data):
    # Build the chain from the step signatures and send it to the queue.
    chain(
        step_one.s(input_data),
        step_two.s(),
    ).apply_async()
This module starts the chain processing

Because the chain is initiated inside another task, we can send that task to the queue, and running it starts the processing of the steps defined above.


In more detail, what is passed to the chain are the step signatures, created with the .s() method and separated by commas. The chain itself is started by invoking apply_async(). If you want to know more about signatures and apply_async, check the celery.Task class docs.
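A signature can be thought of as a function bundled with some preset arguments. When signatures are chained, the previous result is placed in front of those preset arguments. The toy model below imitates that behaviour in plain Python; it is not Celery's implementation, and `Signature`, `run_chain`, `add` and `double` are hypothetical names.

```python
class Signature:
    """Toy stand-in for a Celery signature: a function plus preset arguments."""

    def __init__(self, func, *args):
        self.func = func
        self.args = args

    def run(self, *previous):
        # The previous step's result, if any, goes in front of the preset args.
        return self.func(*previous, *self.args)


def run_chain(*signatures):
    """Run the first signature as-is, then feed each result to the next one."""
    first, *rest = signatures
    result = first.run()
    for signature in rest:
        result = signature.run(result)
    return result


def add(x, y):
    return x + y


def double(x):
    return x * 2


# add(2, 3) -> double(5) -> add(10, 10)
total = run_chain(Signature(add, 2, 3), Signature(double), Signature(add, 10))
print(total)
```

This is why step_one.s(input_data) carries its argument while step_two.s() is left empty: the second step gets its only argument from the first step's result.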

Once all your code is in place, a request handler can trigger the async task queue by sending start_new_processing to the queue, for example with start_new_processing.delay(input_data).

Results

The UX in our referral frontend is now much faster since the client is not blocked by operations in the backend. In addition to that, we have been able to automatically recover from a temporary problem with one of the third-party services we use as part of our risk analysis on tenants.