Python Async Programming: asyncio and aiohttp

Master Python async: coroutines, event loops, aiohttp, and concurrent execution.

Tags: Python, Async, asyncio, Backend

By MinhVo

Introduction

Asynchronous programming in Python solves a fundamental problem: how to handle thousands of concurrent I/O-bound operations without spawning thousands of threads. Traditional synchronous Python code blocks the entire thread while waiting for network responses, file reads, or database queries. If your web server handles 1000 requests per second and each request makes 3 sequential external API calls of 200ms each, every request spends 600ms waiting. That adds up to 600 seconds of blocked thread time for each second of traffic, so roughly 600 threads are needed just to keep up. Async Python lets a single thread handle all 1000 requests concurrently by switching between them during I/O waits, using a fraction of the memory and avoiding much of the complexity of thread synchronization.
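The thread estimate is an application of Little's law: the average number of requests in flight equals the arrival rate multiplied by the time each request spends waiting. A quick back-of-the-envelope check with the figures from the paragraph (the function name is only illustrative):

```python
def threads_needed(requests_per_second: int, calls_per_request: int, ms_per_call: int) -> float:
    """Average number of blocked requests when each request's calls run one after another."""
    ms_blocked_per_request = calls_per_request * ms_per_call
    return requests_per_second * ms_blocked_per_request / 1000


if __name__ == "__main__":
    # 1000 req/s, 3 calls per request, 200 ms per call
    print(threads_needed(1000, 3, 200))
```

The same formula shows why latency matters as much as traffic: halving the per-call latency halves the number of threads a synchronous server would need.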

The asyncio module, included in Python's standard library since Python 3.4 and matured significantly through Python 3.12, provides the foundation: an event loop, coroutines, tasks, and synchronization primitives. On top of this foundation, libraries like aiohttp provide asynchronous HTTP clients and servers, asyncpg provides async PostgreSQL access, and redis-py's redis.asyncio module (the successor to the now-deprecated aioredis package) provides async Redis. The ecosystem has matured to the point where async Python is a common choice for I/O-bound server applications, microservices, and API gateways.

This guide covers Python's async programming model from first principles: how coroutines work under the hood, how the event loop schedules them, how to write and compose async code correctly, how to use aiohttp for HTTP operations, how to handle errors and cancellation, and how to avoid the subtle bugs that async code introduces. We include real-world patterns for web scraping, API aggregation, database access, and background task processing. By the end, you will be able to write production-grade async Python applications.

Understanding Async Python: Core Concepts

Coroutines and the Event Loop

A coroutine is a function defined with async def that can be suspended and resumed. When a coroutine calls await, it yields control back to the event loop, which can then run other coroutines. When the awaited operation completes, the event loop resumes the coroutine where it left off. This is cooperative multitasking—coroutines must explicitly yield control by using await.

import asyncio
 
async def fetch_data(url: str) -> dict:
    print(f"Starting fetch: {url}")
    await asyncio.sleep(1)  # Simulate I/O
    print(f"Completed fetch: {url}")
    return {"url": url, "data": "response"}
 
# Running a coroutine
async def main():
    result = await fetch_data("https://api.example.com")
    print(result)
 
asyncio.run(main())

Tasks and Concurrent Execution

A Task wraps a coroutine and schedules it for execution on the event loop. asyncio.gather() creates tasks for multiple coroutines and runs them concurrently, waiting for all to complete.

async def main():
    # Sequential: takes 3 seconds
    r1 = await fetch_data("url1")  # 1 second
    r2 = await fetch_data("url2")  # 1 second
    r3 = await fetch_data("url3")  # 1 second
 
    # Concurrent: takes 1 second
    r1, r2, r3 = await asyncio.gather(
        fetch_data("url1"),
        fetch_data("url2"),
        fetch_data("url3"),
    )

The Difference Between Async and Threading

Threading uses OS threads and preemptive multitasking: the OS switches between threads at arbitrary points. Async uses coroutines and cooperative multitasking: coroutines switch only at await points. For I/O-bound work with many connections, async usually has lower overhead (no per-connection thread creation or OS context switching), but an event loop runs on a single thread, so async code alone cannot use multiple CPU cores. The GIL imposes a similar limit on threaded Python code.
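To make "switch only at await points" concrete, here is a minimal sketch. The names worker and interleave are illustrative, not part of asyncio. Two coroutines append to a shared log. When they await between steps, their entries interleave. When they never await, the first one runs to completion before the second gets a turn.

```python
import asyncio


async def worker(name: str, log: list[str], yield_control: bool) -> None:
    for step in range(2):
        log.append(f"{name}:{step}")
        if yield_control:
            # The only place where the event loop may switch to another task
            await asyncio.sleep(0)


async def interleave(yield_control: bool) -> list[str]:
    log: list[str] = []
    await asyncio.gather(
        worker("a", log, yield_control),
        worker("b", log, yield_control),
    )
    return log


if __name__ == "__main__":
    print(asyncio.run(interleave(True)))   # entries from a and b alternate
    print(asyncio.run(interleave(False)))  # all of a, then all of b
```

This is also why a long computation or a blocking call inside a coroutine stalls every other task: nothing else can run until the next await.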

Tasks, Futures, and the Awaitable Protocol

Everything that can be await-ed implements the awaitable protocol: it has an __await__() method that returns an iterator. Coroutines, Tasks, and Futures are all awaitable. Understanding this hierarchy is important: a coroutine is not scheduled until you await it or wrap it in a Task. Simply calling fetch_data("url") returns a coroutine object that does nothing until awaited.

async def demonstrate_awaitables():
    # This creates a coroutine but does NOT run it
    coro = fetch_data("url1")
 
    # To run it concurrently, wrap in a task
    task = asyncio.create_task(coro)
    result = await task
 
    # Or await directly (sequential)
    result = await fetch_data("url2")
 
    # gather() creates tasks internally
    results = await asyncio.gather(
        fetch_data("url3"),
        fetch_data("url4"),
    )
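The awaitable protocol can also be implemented by hand. The sketch below uses a hypothetical Delay class (not part of asyncio) whose __await__() method delegates to asyncio.sleep() and then returns its own value, which is what makes an instance usable with await.

```python
import asyncio


class Delay:
    """Hypothetical awaitable: waits `seconds`, then produces `value`."""

    def __init__(self, seconds: float, value: object) -> None:
        self.seconds = seconds
        self.value = value

    def __await__(self):
        # Delegate to the iterator of an existing awaitable, then return our own result
        yield from asyncio.sleep(self.seconds).__await__()
        return self.value


async def use_delay() -> object:
    return await Delay(0.01, "done")


if __name__ == "__main__":
    print(asyncio.run(use_delay()))
```

In application code you rarely need this; async def covers almost every case. It is mostly useful for understanding what await does with Tasks and Futures.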

Architecture and Design Patterns

Structured Concurrency with TaskGroups

Python 3.11 introduced TaskGroup for structured concurrency, ensuring that all tasks are properly awaited and exceptions are propagated correctly. Unlike gather(), TaskGroup uses async with to scope the lifetime of tasks.

async def fetch_all(urls: list[str]) -> list[dict]:
    async with asyncio.TaskGroup() as tg:
        tasks = [tg.create_task(fetch_data(url)) for url in urls]
    # Leaving the block waits for every task; if any failed, an ExceptionGroup is raised
    return [task.result() for task in tasks]

Producer-Consumer Pattern

async def producer(queue: asyncio.Queue, items: list[str]):
    for item in items:
        await queue.put(item)
        print(f"Produced: {item}")
    await queue.put(None)  # Sentinel to signal completion
 
async def consumer(queue: asyncio.Queue, name: str):
    while True:
        item = await queue.get()
        if item is None:
            await queue.put(None)  # Propagate sentinel for other consumers
            break
        await asyncio.sleep(0.1)  # Simulate processing
        print(f"{name} consumed: {item}")
        queue.task_done()
 
async def main():
    queue = asyncio.Queue(maxsize=10)
    items = [f"item-{i}" for i in range(20)]
 
    await asyncio.gather(
        producer(queue, items),
        consumer(queue, "worker-1"),
        consumer(queue, "worker-2"),
    )
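A common alternative to the sentinel is to let workers loop forever, wait for the queue to drain with queue.join(), and then cancel them. The sketch below is one way to arrange that. The function names and the processed list are illustrative.

```python
import asyncio


async def worker(queue: asyncio.Queue, processed: list[str]) -> None:
    while True:
        item = await queue.get()
        try:
            await asyncio.sleep(0.01)  # Simulate processing
            processed.append(item)
        finally:
            queue.task_done()  # Always balance get() so join() can return


async def run_pipeline(items: list[str], worker_count: int = 2) -> list[str]:
    queue: asyncio.Queue = asyncio.Queue(maxsize=10)
    processed: list[str] = []
    workers = [
        asyncio.create_task(worker(queue, processed)) for _ in range(worker_count)
    ]
    for item in items:
        await queue.put(item)  # Waits while the queue is full (backpressure)
    await queue.join()  # Returns once every item has been marked done
    for w in workers:
        w.cancel()  # Workers are idle in queue.get() at this point
    await asyncio.gather(*workers, return_exceptions=True)
    return processed


if __name__ == "__main__":
    print(len(asyncio.run(run_pipeline([f"item-{i}" for i in range(20)]))))
```

This version needs no sentinel bookkeeping, at the cost of an explicit cancellation step at shutdown.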

Semaphore-Based Rate Limiting

async def fetch_with_rate_limit(urls: list[str], max_concurrent: int = 10):
    semaphore = asyncio.Semaphore(max_concurrent)
 
    async def limited_fetch(url: str) -> dict:
        async with semaphore:
            return await fetch_data(url)
 
    return await asyncio.gather(*[limited_fetch(url) for url in urls])

Retry with Exponential Backoff

import asyncio
from functools import wraps
 
import aiohttp
 
def retry(max_attempts: int = 3, base_delay: float = 1.0):
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
 
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            for attempt in range(max_attempts):
                try:
                    return await func(*args, **kwargs)
                except Exception:
                    if attempt == max_attempts - 1:
                        raise  # Last attempt failed: propagate the error
                    # Wait base_delay, then 2x, 4x, ... before the next attempt
                    delay = base_delay * (2 ** attempt)
                    await asyncio.sleep(delay)
        return wrapper
    return decorator
 
@retry(max_attempts=3, base_delay=1.0)
async def fetch_with_retry(session: aiohttp.ClientSession, url: str) -> dict:
    async with session.get(url) as response:
        response.raise_for_status()
        return await response.json()
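The delay formula in the decorator, base_delay * (2 ** attempt), is easiest to reason about as a schedule. The helper below is purely illustrative. It lists the sleeps that happen between attempts; there is no sleep after the final attempt because the error is re-raised instead.

```python
def backoff_delays(max_attempts: int = 3, base_delay: float = 1.0) -> list[float]:
    """Delays slept between attempts when every attempt fails."""
    return [base_delay * (2 ** attempt) for attempt in range(max_attempts - 1)]


if __name__ == "__main__":
    print(backoff_delays(3, 1.0))  # sleeps before the 2nd and 3rd attempts
    print(backoff_delays(5, 0.5))
```

With the defaults, a call that fails three times therefore takes at least 3 seconds of waiting on top of the request time, which is worth keeping in mind when choosing timeouts upstream.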

Implementation

HTTP Requests with aiohttp

aiohttp is one of the most widely used async HTTP libraries for Python. Unlike requests, it is built on asyncio and does not block the event loop. A single ClientSession manages a connection pool internally, so it should be shared across requests.

import aiohttp
import asyncio
 
async def fetch_json(session: aiohttp.ClientSession, url: str) -> dict:
    async with session.get(url) as response:
        response.raise_for_status()
        return await response.json()
 
async def main():
    urls = [
        "https://api.github.com/users/octocat",
        "https://api.github.com/users/torvalds",
        "https://api.github.com/users/gvanrossum",
    ]
 
    async with aiohttp.ClientSession() as session:
        results = await asyncio.gather(
            *[fetch_json(session, url) for url in urls]
        )
        for result in results:
            print(f"{result['login']}: {result['public_repos']} repos")
 
asyncio.run(main())

Building a Robust Async HTTP Client

import aiohttp
import asyncio
import logging
 
logger = logging.getLogger(__name__)
 
class AsyncHttpClient:
    def __init__(self, base_url: str, max_connections: int = 100, timeout: int = 30):
        self.base_url = base_url
        self.max_connections = max_connections
        self._session: aiohttp.ClientSession | None = None
        self._timeout = aiohttp.ClientTimeout(total=timeout)
 
    async def __aenter__(self):
        # Create the connector here so it is bound to the running event loop
        connector = aiohttp.TCPConnector(
            limit=self.max_connections,
            limit_per_host=20,
            ttl_dns_cache=300,
        )
        self._session = aiohttp.ClientSession(
            connector=connector,
            timeout=self._timeout,
        )
        return self
 
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self._session:
            await self._session.close()
 
    async def get(self, path: str, params: dict | None = None) -> dict:
        url = f"{self.base_url}{path}"
        for attempt in range(3):
            try:
                async with self._session.get(url, params=params) as response:
                    response.raise_for_status()
                    return await response.json()
            except (aiohttp.ClientError, asyncio.TimeoutError) as e:
                if attempt == 2:
                    raise  # Out of attempts: surface the last error
                logger.warning("Retry %d after error: %s", attempt + 1, e)
                await asyncio.sleep(2 ** attempt)  # Back off 1s, then 2s
 
    async def post(self, path: str, data: dict = None) -> dict:
        url = f"{self.base_url}{path}"
        async with self._session.post(url, json=data) as response:
            response.raise_for_status()
            return await response.json()
 
# Usage
async def main():
    async with AsyncHttpClient("https://api.example.com") as client:
        users = await client.get("/users")
        results = await asyncio.gather(
            client.get("/users/1"),
            client.get("/users/2"),
            client.get("/users/3"),
        )

Web Scraping with Rate Limiting

import asyncio
 
import aiohttp
from bs4 import BeautifulSoup
 
async def scrape_page(session: aiohttp.ClientSession, url: str) -> dict:
    async with session.get(url) as response:
        html = await response.text()
        soup = BeautifulSoup(html, 'html.parser')
        return {
            "url": url,
            "title": soup.title.string if soup.title else None,
            "links": [a['href'] for a in soup.find_all('a', href=True)],
        }
 
async def scrape_all(urls: list[str], concurrency: int = 5):
    semaphore = asyncio.Semaphore(concurrency)
    connector = aiohttp.TCPConnector(limit=concurrency, limit_per_host=2)
 
    async with aiohttp.ClientSession(connector=connector) as session:
        async def limited_scrape(url):
            async with semaphore:
                try:
                    return await scrape_page(session, url)
                except Exception as e:
                    return {"url": url, "error": str(e)}
 
        return await asyncio.gather(*[limited_scrape(url) for url in urls])

Async Database Access with asyncpg

import asyncpg
 
async def get_users(pool: asyncpg.Pool, limit: int = 100) -> list[dict]:
    async with pool.acquire() as conn:
        rows = await conn.fetch(
            'SELECT id, name, email FROM users WHERE active = $1 ORDER BY created_at DESC LIMIT $2',
            True, limit
        )
        return [dict(row) for row in rows]
 
async def get_user_with_posts(pool: asyncpg.Pool, user_id: int):
    async with pool.acquire() as conn:
        # A single asyncpg connection runs one query at a time, so these run
        # sequentially; acquire separate pooled connections to overlap them
        user = await conn.fetchrow('SELECT * FROM users WHERE id = $1', user_id)
        posts = await conn.fetch(
            'SELECT * FROM posts WHERE user_id = $1 ORDER BY created_at DESC', user_id
        )
        return {"user": dict(user) if user else None, "posts": [dict(p) for p in posts]}
 
async def main():
    pool = await asyncpg.create_pool(
        'postgresql://user:pass@localhost/mydb',
        min_size=5,
        max_size=20,
    )
    try:
        users = await get_users(pool)
        print(f"Found {len(users)} users")
    finally:
        await pool.close()

Async Context Managers and Cleanup

class AsyncDatabase:
    def __init__(self, dsn: str):
        self.dsn = dsn
        self.pool = None
 
    async def __aenter__(self):
        self.pool = await asyncpg.create_pool(self.dsn)
        return self
 
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.pool:
            await self.pool.close()
        return False
 
    async def query(self, sql: str, *args):
        async with self.pool.acquire() as conn:
            return await conn.fetch(sql, *args)
 
# Usage
async def main():
    async with AsyncDatabase('postgresql://localhost/mydb') as db:
        users = await db.query('SELECT * FROM users')

Real-World Use Cases

Use Case 1: API Gateway Aggregating Microservices

An API gateway receives a request for a user dashboard and needs to fetch data from 5 different microservices. Instead of calling them sequentially (5 × 200ms = 1000ms), it calls them all concurrently with asyncio.gather() and returns the combined result in ~200ms.
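A minimal sketch of that aggregation, with asyncio.sleep() standing in for the HTTP calls. The service names and the call_service helper are invented for illustration. In a real gateway each call would go through a shared aiohttp session.

```python
import asyncio
import time


async def call_service(name: str, user_id: int, latency: float = 0.2) -> tuple[str, str]:
    await asyncio.sleep(latency)  # Stands in for an HTTP call to the service
    return name, f"{name}-data-for-{user_id}"


async def build_dashboard(user_id: int) -> dict[str, str]:
    services = ["profile", "orders", "billing", "notifications", "recommendations"]
    # All five calls are in flight at once; total time is about the slowest call
    pairs = await asyncio.gather(*(call_service(s, user_id) for s in services))
    return dict(pairs)


async def timed_dashboard() -> tuple[dict[str, str], float]:
    start = time.perf_counter()
    dashboard = await build_dashboard(user_id=1)
    return dashboard, time.perf_counter() - start


if __name__ == "__main__":
    dashboard, elapsed = asyncio.run(timed_dashboard())
    print(sorted(dashboard), f"{elapsed:.2f}s")
```

The total latency is bounded by the slowest service rather than the sum, so a per-call timeout is usually worth adding to keep one slow dependency from delaying the whole response.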

Use Case 2: Bulk Data Import Pipeline

A data import service processes CSV files with millions of rows. It uses an async producer-consumer pipeline: one coroutine reads rows and puts them into a queue, multiple worker coroutines consume rows and batch-insert them into the database using asyncpg's copy_records_to_table().
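The pipeline could be sketched as follows. insert_batch is a hypothetical stand-in for the real bulk insert (for example asyncpg's copy_records_to_table()), and the in-memory stored list replaces the database so the example runs on its own.

```python
import asyncio


async def insert_batch(batch: list[tuple], stored: list[tuple]) -> None:
    """Hypothetical stand-in for a bulk insert into the database."""
    await asyncio.sleep(0.001)
    stored.extend(batch)


async def batch_worker(queue: asyncio.Queue, stored: list[tuple], batch_size: int) -> None:
    batch: list[tuple] = []
    while True:
        row = await queue.get()
        if row is None:  # Sentinel: stop reading and flush what is left
            break
        batch.append(row)
        if len(batch) >= batch_size:
            await insert_batch(batch, stored)
            batch = []
    if batch:
        await insert_batch(batch, stored)


async def import_rows(rows: list[tuple], workers: int = 3, batch_size: int = 100) -> list[tuple]:
    queue: asyncio.Queue = asyncio.Queue(maxsize=1000)
    stored: list[tuple] = []
    tasks = [
        asyncio.create_task(batch_worker(queue, stored, batch_size))
        for _ in range(workers)
    ]
    for row in rows:
        await queue.put(row)  # Bounded queue keeps memory use flat
    for _ in range(workers):
        await queue.put(None)  # One sentinel per worker
    await asyncio.gather(*tasks)
    return stored


if __name__ == "__main__":
    rows = [(i, f"name-{i}") for i in range(1050)]
    print(len(asyncio.run(import_rows(rows))))
```

The bounded queue is the important design choice: the reader can never get more than maxsize rows ahead of the writers, so a multi-million-row file does not have to fit in memory.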

Use Case 3: Real-Time WebSocket Server

A WebSocket server handling thousands of concurrent connections uses aiohttp's WebSocket support to manage connections asynchronously. Each connection is a lightweight coroutine that reads messages, processes them, and sends responses without blocking other connections.

Use Case 4: Scheduled Task Runner

A background task runner uses asyncio.create_task() to schedule recurring tasks like cache warming, health checks, and log rotation. Tasks run concurrently with the main application without needing separate processes or threads.
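A recurring job can be sketched with a simple loop wrapped in a task. The run_every helper and the health_check job are illustrative names. The sketch also shows two details that are easy to miss: keeping a reference to the task, and cancelling it on shutdown.

```python
import asyncio
from typing import Awaitable, Callable


async def run_every(interval: float, job: Callable[[], Awaitable[None]]) -> None:
    while True:
        await job()
        await asyncio.sleep(interval)


async def demo() -> int:
    runs = 0

    async def health_check() -> None:
        nonlocal runs
        runs += 1

    # Keep a reference: the event loop only holds weak references to tasks
    task = asyncio.create_task(run_every(0.01, health_check))
    await asyncio.sleep(0.055)  # The "main application" doing its own work
    task.cancel()  # Stop the recurring job on shutdown
    await asyncio.gather(task, return_exceptions=True)
    return runs


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

In this sketch an exception inside the job ends the loop. A production runner would typically catch and log job errors inside run_every so one bad run does not stop future ones.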

Best Practices for Production

  1. Never use blocking calls in async code: requests.get(), time.sleep(), and synchronous file I/O block the entire event loop. Use aiohttp, asyncio.sleep(), and aiofiles instead.
  2. Use async with for connection management: Always use async context managers for HTTP sessions, database connections, and other resources to ensure proper cleanup.
  3. Limit concurrency with semaphores: Unbounded concurrent requests can overwhelm downstream services. Use asyncio.Semaphore() to cap concurrent operations.
  4. Handle cancellation gracefully: Async tasks can be cancelled at any await point. Use try/finally (or catch asyncio.CancelledError and re-raise it) to clean up resources when tasks are cancelled.
  5. Use asyncio.gather(*tasks, return_exceptions=True) for fault-tolerant batch operations: Exceptions are returned in the result list next to the successful results, instead of the first failure propagating out of gather() and hiding the other outcomes.
  6. Create a single aiohttp.ClientSession per application: Don't create a new session for each request—sessions manage connection pools internally and should be reused.
  7. Use TaskGroup for structured concurrency (Python 3.11+): TaskGroups ensure all tasks complete (or are cancelled) before the group exits, preventing leaked tasks.
  8. Profile with asyncio debug mode: Set PYTHONASYNCIODEBUG=1 or asyncio.run(main(), debug=True) to detect blocking calls and other async anti-patterns.
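As an illustration of item 5, the sketch below runs a batch in which every odd-numbered job fails and then separates results from errors. The job and run_batch names are made up for the example.

```python
import asyncio


async def job(n: int) -> int:
    await asyncio.sleep(0.01)
    if n % 2:
        raise ValueError(f"job {n} failed")
    return n * 10


async def run_batch(count: int = 4) -> tuple[list[int], list[str]]:
    # Exceptions come back as values, in the same order as the inputs
    outcomes = await asyncio.gather(
        *(job(n) for n in range(count)), return_exceptions=True
    )
    results = [o for o in outcomes if not isinstance(o, BaseException)]
    errors = [str(o) for o in outcomes if isinstance(o, BaseException)]
    return results, errors


if __name__ == "__main__":
    print(asyncio.run(run_batch()))
```

Because failures arrive as ordinary values, the caller has to check for them explicitly. Forgetting the isinstance check is the usual way errors get silently dropped with this option.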

Common Pitfalls and Solutions

| Pitfall | Impact | Solution |
| --- | --- | --- |
| Using requests in async code | Blocks entire event loop | Use aiohttp for HTTP requests |
| Forgetting to await a coroutine | Coroutine never executes | Always await coroutines or pass to asyncio.create_task() |
| Creating too many concurrent tasks | Memory exhaustion, downstream overload | Use semaphores or bounded queues to limit concurrency |
| Not closing resources (sessions, pools) | Connection leaks | Use async context managers (async with) |
| Using time.sleep() instead of asyncio.sleep() | Blocks event loop | Always use asyncio.sleep() in async code |
| Synchronous database drivers | Blocks event loop | Use async drivers (asyncpg, aiomysql, motor) |
| Ignoring cancellation | Resource leaks, zombie tasks | Handle CancelledError and clean up resources |
| Creating a session per request | No connection reuse, socket exhaustion | Reuse a single session across all requests |
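When a blocking library has no async equivalent, one workaround is to push the call onto a worker thread with asyncio.to_thread() (Python 3.9+). In the sketch below, blocking_lookup is a made-up stand-in for such a call, and the heartbeat coroutine shows that the event loop keeps running while the thread is busy.

```python
import asyncio
import time


def blocking_lookup(key: str) -> str:
    time.sleep(0.3)  # Stands in for a synchronous driver or SDK call
    return key.upper()


async def heartbeat(ticks: list[float]) -> None:
    for _ in range(3):
        ticks.append(time.perf_counter())
        await asyncio.sleep(0.05)


async def main_offloaded() -> tuple[str, list[float]]:
    ticks: list[float] = []
    # The blocking call runs in a thread; the heartbeat keeps ticking meanwhile
    value, _ = await asyncio.gather(
        asyncio.to_thread(blocking_lookup, "abc"),
        heartbeat(ticks),
    )
    return value, ticks


if __name__ == "__main__":
    value, ticks = asyncio.run(main_offloaded())
    print(value, f"{ticks[-1] - ticks[0]:.2f}s between first and last tick")
```

This keeps the loop responsive but does not remove the thread cost, so it suits occasional blocking calls rather than the hot path of every request.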

Performance Optimization

# Connection pooling with aiohttp for maximum throughput
async def optimized_client():
    connector = aiohttp.TCPConnector(
        limit=100,              # Total connection pool size
        limit_per_host=30,      # Max connections per host
        ttl_dns_cache=300,      # DNS cache TTL in seconds
        enable_cleanup_closed=True,
    )
    timeout = aiohttp.ClientTimeout(
        total=30,       # Total request timeout
        connect=5,      # Connection establishment timeout
        sock_read=10,   # Socket read timeout
    )
    return aiohttp.ClientSession(connector=connector, timeout=timeout)
 
# Batch processing with controlled concurrency
async def process_batch(items: list, batch_size: int = 50):
    results = []
    for i in range(0, len(items), batch_size):
        batch = items[i:i + batch_size]
        batch_results = await asyncio.gather(
            *[process_item(item) for item in batch]
        )
        results.extend(batch_results)
    return results

Comparison with Alternatives

| Feature | asyncio + aiohttp | Threading + requests | gevent | Twisted |
| --- | --- | --- | --- | --- |
| Concurrency model | Cooperative (coroutines) | Preemptive (OS threads) | Green threads | Event-driven callbacks |
| Memory per connection | ~4 KB (coroutine) | ~8 MB (thread stack) | ~4 KB (greenlet) | ~2 KB (callback) |
| Max concurrent connections | 100K+ | ~1K (thread limit) | 100K+ | 100K+ |
| Debugging | Standard (improving) | Standard | Moderate | Difficult |
| Learning curve | Moderate (async/await) | Easy | Easy (monkey-patching) | High |
| CPU-bound support | No (use multiprocessing) | No (GIL) | No (GIL) | No (GIL) |
| Ecosystem maturity | Excellent | Excellent | Moderate | Mature |

Advanced Patterns

Async Generator for Streaming Data

async def stream_paginated_api(session: aiohttp.ClientSession, base_url: str):
    page = 1
    while True:
        async with session.get(f"{base_url}?page={page}") as response:
            data = await response.json()
            if not data["results"]:
                break
            for item in data["results"]:
                yield item
            page += 1
 
async def process_stream():
    async with aiohttp.ClientSession() as session:
        async for item in stream_paginated_api(session, "https://api.example.com/items"):
            await process_item(item)

Combining Async with CPU-Bound Work

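import asyncio
from concurrent.futures import ProcessPoolExecutor
 
async def mixed_workload(cpu_heavy_function, data):
    # cpu_heavy_function must be a picklable, module-level function
    loop = asyncio.get_running_loop()
    # Run the CPU-bound call in a separate process so the event loop stays responsive
    with ProcessPoolExecutor() as pool:
        cpu_result = await loop.run_in_executor(pool, cpu_heavy_function, data)
    io_result = await fetch_data("https://api.example.com/data")
    return cpu_result, io_result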

Testing Strategies

# These tests rely on the pytest-asyncio plugin, which provides @pytest.mark.asyncio
import pytest
import asyncio
 
@pytest.mark.asyncio
async def test_fetch_data():
    result = await fetch_data("https://api.example.com/test")
    assert result["url"] == "https://api.example.com/test"
 
@pytest.mark.asyncio
async def test_concurrent_fetch():
    urls = [f"https://api.example.com/{i}" for i in range(10)]
    results = await asyncio.gather(*[fetch_data(url) for url in urls])
    assert len(results) == 10
 
@pytest.mark.asyncio
async def test_semaphore_limits_concurrency():
    semaphore = asyncio.Semaphore(2)
    running = 0
    max_running = 0
 
    async def tracked_fetch(url):
        nonlocal running, max_running
        async with semaphore:
            running += 1
            max_running = max(max_running, running)
            await asyncio.sleep(0.1)
            running -= 1
 
    await asyncio.gather(*[tracked_fetch(f"url{i}") for i in range(10)])
    assert max_running <= 2
 
@pytest.mark.asyncio
async def test_cancellation_cleanup():
    cleanup_ran = False
 
    async def long_task():
        nonlocal cleanup_ran
        try:
            await asyncio.sleep(100)
        except asyncio.CancelledError:
            cleanup_ran = True
            raise
 
    task = asyncio.create_task(long_task())
    await asyncio.sleep(0.01)
    task.cancel()
    with pytest.raises(asyncio.CancelledError):
        await task
    assert cleanup_ran

Future Outlook

Python's async ecosystem continues to mature. Python 3.11 added asyncio.TaskGroup, asyncio.timeout(), and exception groups, and Python 3.12 followed with eager task execution and further performance work. The trend is toward structured concurrency (popularized by Trio and also adopted by Swift and Kotlin), where all concurrent operations are scoped within a block and automatically cleaned up. Libraries like anyio provide a compatibility layer between asyncio and trio, allowing code to run on either backend. Some in the community would like to see Python become async-first, with async as the default and synchronous wrappers as the exception, though sync and async code will likely coexist for the foreseeable future.

Conclusion

Python's async programming model with asyncio and aiohttp enables building high-performance, I/O-bound applications that handle thousands of concurrent operations on a single thread. The key concepts are: (1) coroutines are functions that can be suspended and resumed at await points; (2) the event loop schedules coroutines and runs them cooperatively; (3) asyncio.gather() and TaskGroup enable concurrent execution of multiple coroutines; (4) aiohttp provides async HTTP client and server with connection pooling; and (5) semaphores and connection pools control resource usage.

The critical rules for production async code are: never block the event loop with synchronous calls, always use async context managers for resource cleanup, limit concurrency to protect downstream services, and handle cancellation gracefully. With these practices, async Python can handle workloads that would require hundreds of threads in synchronous code, using a fraction of the memory and with cleaner, more maintainable code.