# Python asyncio
Single-threaded concurrency via cooperative multitasking. One thread, one event loop, many coroutines. Best for I/O-bound tasks: HTTP, DB, file reads.
## Core Concepts

```python
import asyncio

async def fetch(url: str) -> str:
    await asyncio.sleep(1)  # simulate I/O, yields control to event loop
    return f"data from {url}"

async def main():
    result = await fetch("https://api.example.com")
    print(result)

asyncio.run(main())  # entry point — creates event loop, runs until done
```
- `async def` → coroutine function (returns a coroutine object when called)
- `await` → yield control until the awaited I/O completes; only valid inside `async def`
- `asyncio.run()` → starts the event loop (use at top level, not inside async code)
## Concurrent I/O with gather

`await` on one thing = sequential. `gather` = concurrent.
```python
import asyncio
import aiohttp

async def fetch(session, url):
    async with session.get(url) as resp:
        return await resp.json()

async def main():
    urls = ["https://api.example.com/users/1",
            "https://api.example.com/users/2",
            "https://api.example.com/users/3"]
    async with aiohttp.ClientSession() as session:
        # Sequential: 3 × 200ms = 600ms
        # results = [await fetch(session, url) for url in urls]
        # Concurrent: max(200ms each) ≈ 200ms total
        results = await asyncio.gather(*[fetch(session, url) for url in urls])
        return results
```
`gather` runs all coroutines concurrently and returns results in input order.
## Tasks — Fire and Forget

```python
async def main():
    # create_task schedules coroutine to run "now" in background
    task1 = asyncio.create_task(fetch("url1"))
    task2 = asyncio.create_task(fetch("url2"))
    # do other work here while tasks run...
    result1 = await task1
    result2 = await task2

# gather is cleaner for this pattern, but create_task allows more control
```
## gather vs wait vs TaskGroup

```python
# gather — by default the first exception propagates to the awaiter while the
# other tasks keep running; return_exceptions=True returns exceptions as results
results = await asyncio.gather(coro1(), coro2(), return_exceptions=True)

# wait — fine-grained control (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED)
done, pending = await asyncio.wait(
    {asyncio.create_task(c) for c in coros},
    timeout=5.0,
    return_when=asyncio.FIRST_COMPLETED,
)

# TaskGroup (Python 3.11+) — structured concurrency: any failure cancels siblings
async with asyncio.TaskGroup() as tg:
    task1 = tg.create_task(coro1())
    task2 = tg.create_task(coro2())
# all tasks done when exiting 'async with'
```
## Semaphore — Limit Concurrency

```python
async def fetch_limited(session, url, sem):
    async with sem:  # only N concurrent at once
        return await fetch(session, url)

async def main():
    sem = asyncio.Semaphore(10)  # max 10 concurrent requests
    async with aiohttp.ClientSession() as session:
        results = await asyncio.gather(*[
            fetch_limited(session, url, sem) for url in urls
        ])
```
## Async Context Managers & Iterators

```python
# async context manager
class AsyncDB:
    async def __aenter__(self):
        self.conn = await create_connection()
        return self.conn
    async def __aexit__(self, *args):
        await self.conn.close()

async with AsyncDB() as conn:
    await conn.execute("SELECT ...")

# async iterator — __aiter__ is a plain sync method; only __anext__ is async
class AsyncStream:
    def __aiter__(self):
        return self
    async def __anext__(self):
        data = await self.read_chunk()
        if not data:
            raise StopAsyncIteration
        return data

async for chunk in AsyncStream():
    process(chunk)
```
## asyncio.Queue — Producer/Consumer

```python
async def producer(queue: asyncio.Queue):
    for i in range(10):
        await queue.put(i)
        await asyncio.sleep(0.1)
    await queue.put(None)  # sentinel

async def consumer(queue: asyncio.Queue):
    while True:
        item = await queue.get()
        if item is None:
            break
        process(item)
        queue.task_done()

async def main():
    queue = asyncio.Queue(maxsize=5)  # backpressure: blocks producer when full
    await asyncio.gather(producer(queue), consumer(queue))
```
## Event Loop — What's Happening

```
Event Loop
├── Task 1: fetch(url1) → awaits network → suspended
├── Task 2: fetch(url2) → awaits network → suspended
├── Task 3: fetch(url3) → awaits network → suspended
└── When I/O ready → resume task → execute until next await
```
Only one coroutine runs at a time. No true parallelism. CPU-bound code blocks the entire loop.
GIL + asyncio: GIL is irrelevant — it's single-threaded. asyncio solves I/O concurrency, not CPU parallelism.
## Running Sync Code in Async Context

```python
import asyncio
from concurrent.futures import ProcessPoolExecutor

# Run blocking code in the default thread pool — doesn't block event loop
async def fetch_sync_data():
    loop = asyncio.get_running_loop()  # preferred over get_event_loop() inside async code
    result = await loop.run_in_executor(None, blocking_db_call, arg1)
    return result

# Run CPU-bound work in a process pool
async def compute():
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor() as pool:
        result = await loop.run_in_executor(pool, cpu_heavy_fn, data)
    return result
```
## Common Mistakes

| Mistake | Fix |
|---|---|
| `asyncio.run()` inside an async fn | Use `await` instead |
| Calling sync blocking I/O in an async fn | Use `run_in_executor` |
| `await` in a loop when you want concurrency | Use `gather` or `create_task` |
| Forgetting `async with` for the aiohttp session | Session must be used as a context manager |
| Too many concurrent tasks | Use a `Semaphore` to cap |
| Creating the event loop manually | Use `asyncio.run()` |
## asyncio in Production (FastAPI / LangChain)

```python
# FastAPI route — async by default
@app.get("/users/{user_id}")
async def get_user(user_id: int, db: AsyncSession = Depends(get_db)):
    user = await db.get(User, user_id)  # async SQLAlchemy
    return user

# LangChain async
result = await chain.ainvoke({"question": "..."})  # async invoke
results = await asyncio.gather(*[chain.ainvoke(q) for q in questions])
```
## Interview Talking Points

- "asyncio is cooperative concurrency — coroutines yield control at `await` points. It's not parallel, but it handles thousands of concurrent I/O operations efficiently on one thread."
- "The GIL doesn't matter for asyncio because it's single-threaded. asyncio solves I/O concurrency; multiprocessing solves CPU parallelism."
- "For large numbers of outbound calls, I use `asyncio.gather` with a `Semaphore` to cap concurrency — otherwise you can exhaust file descriptors or hit rate limits."
## Related
- [[Python/Language Core/Python Programming]] — core language
- [[Python/Libraries/FastAPI]] — async HTTP framework
- [[synthesis/Concurrency Deep Dive]] — asyncio vs Go goroutines vs threading
- [[Go/Channels]] — Go equivalent: goroutines + channels