Handling Errors Gracefully with Asyncio Retries

Mar 25, 2024 · 4 min read

As developers, we strive to write robust applications that handle errors gracefully. This becomes especially important in asynchronous Python applications built with Asyncio. In this article, we'll explore some practical techniques for adding retry logic to Asyncio apps.

The Problem: Transient Errors in Distributed Systems

Modern applications often rely on external services like databases, APIs, message queues, etc. These distributed systems can fail unexpectedly - networks flap, servers restart, load balancers route improperly. Many such failures are transient - succeeding on a retry.

Our Asyncio apps need to handle these transient errors to maintain availability. Simply failing on the first error is fragile. Equally problematic is retrying blindly without limit. This can overload failing services and cause cascading failures.

We need a resilient retry strategy - able to identify transient errors, backing off exponentially between retries, and giving up after a reasonable number of attempts. Let's see how to implement this in Python.

Asyncio Background Tasks and Transients

Consider an Asyncio application that fetches data from a web API and stores it in a database. We implement this as a background asyncio task:

async def fetch_and_store_data():
    data = await fetch_from_api()
    await store_in_database(data)

If fetch_from_api() fails transiently, we want to retry it before giving up. Because the retry loop awaits between attempts, it never blocks the event loop, and we can run it in the background with asyncio.create_task():

import asyncio

async def fetch_with_retries():
    for retry in range(5):
        try:
            data = await fetch_from_api()
            await store_in_database(data)
            return
        except TransientError:
            if retry == 4:
                raise
            wait_time = 2 ** retry
            print(f"Transient error, retrying in {wait_time} seconds")
            await asyncio.sleep(wait_time)

async def main():
    # Run the retry loop as a background task so other work continues
    task = asyncio.create_task(fetch_with_retries())
    await task

This wraps our fetch logic in a retrying loop. On transient failures, it waits exponentially longer between attempts, up to 5 in total. If the final attempt fails, the exception is re-raised and bubbles up. The surrounding Asyncio app stays responsive because the loop runs as a background task created in main().

This pattern works well for background data fetching, processing queues, polling APIs etc. The retries happen asynchronously without blocking the event loop. Next, let's explore another approach for synchronous failures.
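To make the non-blocking behavior concrete, here is a minimal self-contained run of the pattern above. The TransientError class, the failing-twice fetch_from_api(), and the heartbeat() task are all hypothetical stand-ins invented for this demo; backoff delays are shortened so it finishes quickly.

    import asyncio

    class TransientError(Exception):
        """Stands in for a transient failure from an external service."""

    attempts = 0

    async def fetch_from_api():
        # Hypothetical API call that fails twice before succeeding
        global attempts
        attempts += 1
        if attempts < 3:
            raise TransientError("temporary outage")
        return {"status": "ok"}

    async def fetch_with_retries():
        for retry in range(5):
            try:
                return await fetch_from_api()
            except TransientError:
                if retry == 4:
                    raise
                # Short backoff so the demo finishes quickly
                await asyncio.sleep(0.01 * 2 ** retry)

    heartbeats = 0

    async def heartbeat():
        # Unrelated work that keeps running while the fetch backs off
        global heartbeats
        for _ in range(3):
            await asyncio.sleep(0.01)
            heartbeats += 1

    async def main():
        task = asyncio.create_task(fetch_with_retries())
        await heartbeat()  # runs concurrently with the retry loop
        return await task

    data = asyncio.run(main())
    print(attempts, heartbeats, data)

All three heartbeats complete even though the fetch fails twice, showing that the backoff sleeps yield control to the event loop rather than blocking it.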

Retrying Synchronous Work with Exponential Backoff

Sometimes, part of our synchronous logic in an Asyncio app needs retries. For example, making blocking HTTP requests with the requests module. We can wrap the call in an Asyncio utility - asyncio.to_thread():

import asyncio
import requests

async def fetch_sync_with_retries(url):
    for retry in range(5):
        try:
            return await asyncio.to_thread(requests.get, url)
        except TransientError:
            if retry == 4:
                raise
            wait_time = 2 ** retry
            print(f"Transient error, retrying sync call in {wait_time} seconds")
            await asyncio.sleep(wait_time)

This runs the blocking requests.get() in a thread pool, awaiting the result. On transient failures, it retries with exponential backoff before re-raising. The surrounding Asyncio app remains responsive.

For IO-bound sync code like this, to_thread() combined with retries works well. For CPU-bound sync work, loop.run_in_executor() with a ProcessPoolExecutor is the better fit, since threads share the GIL. The same retry pattern applies.
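For reference, a minimal sketch of the CPU-bound variant: the crunch() function and its workload are made up for this example, and the __main__ guard is required because process pools may re-import the module.

    import asyncio
    from concurrent.futures import ProcessPoolExecutor

    def crunch(n):
        # CPU-bound work: runs in a separate process, bypassing the GIL
        return sum(i * i for i in range(n))

    async def main():
        loop = asyncio.get_running_loop()
        with ProcessPoolExecutor() as pool:
            # run_in_executor offloads the call and yields until it completes
            return await loop.run_in_executor(pool, crunch, 1000)

    if __name__ == "__main__":
        result = asyncio.run(main())
        print(result)

The same try/except retry loop from fetch_sync_with_retries() can wrap the run_in_executor() call unchanged.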

Handling Retry Exceptions Granularly

In complex apps with multiple external services, we often want fine-grained control over retries. For example:

  • Retry connection errors more aggressively
  • Retry HTTP 500 errors but not 400 errors
  • Custom backoff for some errors
  • Limit number of retries per service
We can encapsulate this logic in a reusable @retry decorator:

import asyncio
from functools import wraps

class HTTP_500(Exception):
    """Placeholder for an HTTP 500 error raised by your client library."""

RETRY_ERRORS = (ConnectionError, HTTP_500)

def calc_backoff(attempt):
    return 2 ** attempt

def retry(num_retries=5, on_errors=RETRY_ERRORS):
    def wrap(func):
        @wraps(func)
        async def wrapped(*args, **kwargs):
            for attempt in range(num_retries):
                try:
                    return await func(*args, **kwargs)
                except on_errors:
                    if attempt == num_retries - 1:
                        raise
                    wait = calc_backoff(attempt)
                    print(f"Retrying {func.__name__} in {wait} seconds")
                    await asyncio.sleep(wait)
        return wrapped
    return wrap

@retry(on_errors=ConnectionError, num_retries=10)
async def connect_to_service():
    # implementation
    ...
Now individual functions can be decorated with custom retry behavior.

The key ideas are:

  • Encapsulate retry logic in decorators and background tasks
  • Exponential backoff prevents overload
  • Allow configuring per-function retry rules
  • Let unhandled exceptions bubble up

Together these provide a resilient retry capability for Asyncio apps.

There are more advanced approaches like tenacity for complex retry scenarios. But the patterns above serve most basic Asyncio retry needs.
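Before reaching for a library, it's worth seeing the decorator pattern behave end to end. Here is a self-contained run with a toy calc_backoff and a flaky service invented for the demo; the backoff is shortened so it finishes fast.

    import asyncio
    from functools import wraps

    def calc_backoff(attempt):
        # Toy exponential backoff, shortened for the demo
        return 0.01 * 2 ** attempt

    def retry(num_retries=5, on_errors=(ConnectionError,)):
        def wrap(func):
            @wraps(func)
            async def wrapped(*args, **kwargs):
                for attempt in range(num_retries):
                    try:
                        return await func(*args, **kwargs)
                    except on_errors:
                        if attempt == num_retries - 1:
                            raise
                        await asyncio.sleep(calc_backoff(attempt))
            return wrapped
        return wrap

    calls = 0

    @retry(on_errors=(ConnectionError,), num_retries=10)
    async def connect_to_service():
        # Fails twice, then succeeds - simulating a flaky service
        global calls
        calls += 1
        if calls < 3:
            raise ConnectionError("service unavailable")
        return "connected"

    result = asyncio.run(connect_to_service())
    print(calls, result)

The decorated coroutine succeeds on its third call, with the first two failures absorbed by the retry wrapper.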
