
Python asyncio

Single-threaded concurrency via cooperative multitasking. One thread, one event loop, many coroutines. Best for I/O-bound tasks: HTTP, DB, file reads.


Core Concepts

import asyncio

async def fetch(url: str) -> str:
    await asyncio.sleep(1)   # simulate I/O, yields control to event loop
    return f"data from {url}"

async def main():
    result = await fetch("https://api.example.com")
    print(result)

asyncio.run(main())   # entry point — creates event loop, runs until done
  • async def → coroutine function (returns coroutine object when called)
  • await → yield control until I/O completes; only inside async def
  • asyncio.run() → starts event loop (use at top level, not inside async code)
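A minimal runnable sketch of the "coroutine object" point: calling an async function runs nothing until the coroutine is awaited (no real I/O; the URL is a placeholder).

```python
import asyncio

async def fetch(url: str) -> str:
    await asyncio.sleep(0)               # yield to the event loop once; no real delay
    return f"data from {url}"

coro = fetch("https://api.example.com")  # nothing runs yet: just a coroutine object
name = type(coro).__name__               # 'coroutine'
coro.close()                             # discard it cleanly (avoids a "never awaited" warning)

result = asyncio.run(fetch("https://api.example.com"))   # run() drives it to completion
```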

Concurrent I/O with gather

await on one thing = sequential. gather = concurrent.

import asyncio
import aiohttp

async def fetch(session, url):
    async with session.get(url) as resp:
        return await resp.json()

async def main():
    urls = ["https://api.example.com/users/1",
            "https://api.example.com/users/2",
            "https://api.example.com/users/3"]

    async with aiohttp.ClientSession() as session:
        # Sequential: 3 × 200ms = 600ms
        # results = [await fetch(session, url) for url in urls]

        # Concurrent: max(200ms each) ≈ 200ms total
        results = await asyncio.gather(*[fetch(session, url) for url in urls])

    return results

gather runs all coroutines concurrently, returns results in input order.
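The input-order guarantee is easy to verify with staggered delays (a minimal sketch, no network):

```python
import asyncio

async def work(label: str, delay: float) -> str:
    await asyncio.sleep(delay)
    return label

async def main() -> list:
    # "c" completes first, "a" last -- gather still returns input order
    return await asyncio.gather(work("a", 0.03), work("b", 0.02), work("c", 0.01))

results = asyncio.run(main())
print(results)   # ['a', 'b', 'c']
```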


Tasks — Fire and Forget

async def main():
    # create_task schedules coroutine to run "now" in background
    task1 = asyncio.create_task(fetch("url1"))
    task2 = asyncio.create_task(fetch("url2"))

    # do other work here while tasks run...
    result1 = await task1
    result2 = await task2

# gather is cleaner for this pattern, but create_task allows more control

gather vs wait vs TaskGroup

# gather — first exception propagates to the awaiter (remaining tasks keep running
# unless you cancel them); return_exceptions=True returns exceptions as results
results = await asyncio.gather(coro1(), coro2(), return_exceptions=True)

# wait — fine-grained control (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED)
done, pending = await asyncio.wait(
    {asyncio.create_task(c) for c in coros},
    timeout=5.0,
    return_when=asyncio.FIRST_COMPLETED
)

# TaskGroup (Python 3.11+) — structured concurrency, cancel on any failure
async with asyncio.TaskGroup() as tg:
    task1 = tg.create_task(coro1())
    task2 = tg.create_task(coro2())
# all tasks done when exiting 'async with'
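A self-contained sketch of the return_exceptions flag: failures come back as result values instead of propagating.

```python
import asyncio

async def ok() -> str:
    return "fine"

async def boom() -> None:
    raise ValueError("failed")

async def main() -> list:
    # the ValueError is returned in the results list, not raised
    return await asyncio.gather(ok(), boom(), return_exceptions=True)

results = asyncio.run(main())
```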

Semaphore — Limit Concurrency

async def fetch_limited(session, url, sem):
    async with sem:   # only N concurrent at once
        return await fetch(session, url)

async def main():
    sem = asyncio.Semaphore(10)  # max 10 concurrent requests
    async with aiohttp.ClientSession() as session:
        results = await asyncio.gather(*[
            fetch_limited(session, url, sem) for url in urls
        ])
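A self-contained version (no aiohttp) that checks the cap actually holds, using a hypothetical peak counter:

```python
import asyncio

async def limited(sem: asyncio.Semaphore, counters: dict) -> None:
    async with sem:   # at most 3 tasks inside this block at once
        counters["active"] += 1
        counters["peak"] = max(counters["peak"], counters["active"])
        await asyncio.sleep(0.01)   # simulate I/O while holding the semaphore
        counters["active"] -= 1

async def main() -> int:
    sem = asyncio.Semaphore(3)
    counters = {"active": 0, "peak": 0}
    await asyncio.gather(*[limited(sem, counters) for _ in range(10)])
    return counters["peak"]

peak = asyncio.run(main())   # 3 -- never exceeds the semaphore's limit
```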

Async Context Managers & Iterators

# async context manager
class AsyncDB:
    async def __aenter__(self):
        self.conn = await create_connection()
        return self.conn

    async def __aexit__(self, *args):
        await self.conn.close()

async with AsyncDB() as conn:
    await conn.execute("SELECT ...")

# async iterator
class AsyncStream:
    def __aiter__(self):   # plain def: __aiter__ must return the iterator synchronously
        return self

    async def __anext__(self):
        data = await self.read_chunk()
        if not data:
            raise StopAsyncIteration
        return data

async for chunk in AsyncStream():
    process(chunk)
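The class-based iterator above can usually be replaced by an async generator, which implements __aiter__/__anext__ for you (a minimal sketch; the chunk list stands in for real reads):

```python
import asyncio

async def stream(chunks: list):
    # async generator: `yield` inside `async def` gives the iterator protocol for free
    for chunk in chunks:
        await asyncio.sleep(0)   # stand-in for a real async read
        yield chunk

async def main() -> list:
    received = []
    async for chunk in stream([b"one", b"two", b"three"]):
        received.append(chunk)
    return received

received = asyncio.run(main())
```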

asyncio.Queue — Producer/Consumer

async def producer(queue: asyncio.Queue):
    for i in range(10):
        await queue.put(i)
        await asyncio.sleep(0.1)
    await queue.put(None)   # sentinel

async def consumer(queue: asyncio.Queue):
    while True:
        item = await queue.get()
        if item is None:
            break
        process(item)
        queue.task_done()

async def main():
    queue = asyncio.Queue(maxsize=5)  # backpressure: put() suspends the producer when full
    await asyncio.gather(producer(queue), consumer(queue))
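An alternative to the sentinel: queue.join() waits until every put() has a matching task_done(), so the consumer can simply be cancelled when the work is drained (a runnable sketch):

```python
import asyncio

async def producer(queue: asyncio.Queue) -> None:
    for i in range(5):
        await queue.put(i)

async def consumer(queue: asyncio.Queue, out: list) -> None:
    while True:
        item = await queue.get()
        out.append(item * 2)
        queue.task_done()          # pairs with queue.join() below

async def main() -> list:
    queue = asyncio.Queue(maxsize=2)
    out: list = []
    worker = asyncio.create_task(consumer(queue, out))
    await producer(queue)
    await queue.join()             # all items processed
    worker.cancel()                # no sentinel needed with join()
    return out

out = asyncio.run(main())
```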

Event Loop — What's Happening

Event Loop
├── Task 1: fetch(url1) → awaits network → suspended
├── Task 2: fetch(url2) → awaits network → suspended
├── Task 3: fetch(url3) → awaits network → suspended
└── When I/O ready → resume task → execute until next await

Only one coroutine runs at a time. No true parallelism. CPU-bound code blocks the entire loop.

GIL + asyncio: GIL is irrelevant — it's single-threaded. asyncio solves I/O concurrency, not CPU parallelism.
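A quick demonstration that blocking code stalls the whole loop (timings approximate): both tasks "sleep" 0.1s, but the blocking one serializes them.

```python
import asyncio
import time

async def blocking_task() -> None:
    time.sleep(0.1)            # blocks the WHOLE event loop; nothing else runs

async def cooperative_task() -> None:
    await asyncio.sleep(0.1)   # suspends only this task; others keep running

async def main() -> float:
    start = time.perf_counter()
    # ~0.2s total, not ~0.1s: time.sleep prevents the tasks from overlapping
    await asyncio.gather(blocking_task(), cooperative_task())
    return time.perf_counter() - start

elapsed = asyncio.run(main())
```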


Running Sync Code in Async Context

import asyncio
from concurrent.futures import ProcessPoolExecutor

# Run blocking code in thread pool — doesn't block event loop
async def fetch_sync_data():
    loop = asyncio.get_running_loop()   # get_event_loop() is deprecated inside coroutines
    result = await loop.run_in_executor(None, blocking_db_call, arg1)
    return result

# Run CPU-bound in process pool
async def compute():
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor() as pool:
        result = await loop.run_in_executor(pool, cpu_heavy_fn, data)
    return result
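Since Python 3.9, asyncio.to_thread is a shorthand for the thread-pool pattern above (a minimal sketch; blocking_call is a stand-in for any blocking library call):

```python
import asyncio
import time

def blocking_call(x: int) -> int:
    time.sleep(0.05)   # stand-in for blocking I/O (DB driver, requests, etc.)
    return x * 2

async def main() -> int:
    # equivalent to loop.run_in_executor(None, blocking_call, 21)
    return await asyncio.to_thread(blocking_call, 21)

result = asyncio.run(main())
```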

Common Mistakes

  • asyncio.run() inside an async fn → use await instead
  • Calling sync blocking I/O in an async fn → use run_in_executor
  • Sequential awaits when you want concurrency → gather or create_task
  • Forgetting async with for the aiohttp session → session must be a context manager
  • Too many concurrent tasks → use Semaphore to cap
  • Creating the event loop manually → use asyncio.run()

asyncio in Production (FastAPI / LangChain)

# FastAPI route — async by default
@app.get("/users/{user_id}")
async def get_user(user_id: int, db: AsyncSession = Depends(get_db)):
    user = await db.get(User, user_id)   # async SQLAlchemy
    return user

# LangChain async
result = await chain.ainvoke({"question": "..."})   # async invoke
results = await asyncio.gather(*[chain.ainvoke(q) for q in questions])

Interview Talking Points

  • "asyncio is cooperative concurrency — coroutines yield control at await points. It's not parallel, but it handles thousands of concurrent I/O operations efficiently on one thread."
  • "The GIL doesn't matter for asyncio because it's single-threaded. asyncio solves I/O concurrency; multiprocessing solves CPU parallelism."
  • "For large numbers of outbound calls, I use asyncio.gather with a Semaphore to cap concurrency — otherwise you can exhaust file descriptors or hit rate limits."

Related

  • [[Python/Language Core/Python Programming]] — core language
  • [[Python/Libraries/FastAPI]] — async HTTP framework
  • [[synthesis/Concurrency Deep Dive]] — asyncio vs Go goroutines vs threading
  • [[Go/Channels]] — Go equivalent: goroutines + channels