Lifespan
Every application has a lifecycle: it starts up, it runs (serving requests), and eventually, it shuts down. Managing this lifecycle is crucial for robust applications.
If your API needs to load a heavy machine learning model, or establish a connection to a message broker (like RabbitMQ or Kafka), you cannot do this inside every single request handler; that would be incredibly slow. Instead, you need to perform these actions once when the application starts, and clean them up once when the application stops.
Flama provides a Lifespan system to handle these scenarios effectively.
You should use lifespan logic whenever you have resources that need to live as long as the application itself:
- Resource Management: Establishing connection pools to databases or caches (Redis/Memcached).
- Heavy Initialisation: Loading configuration files or ML models into memory so they are ready for inference.
- Clean-up: Gracefully closing connections, flushing logs, or saying "goodbye" to external services to prevent resource leaks.
Event handlers
The simplest way to hook into the lifecycle is using the @app.on_event decorator.
This is ideal for straightforward scripts where you might rely on global variables or external singletons.
Flama exposes two specific events: startup and shutdown.
app = Flama()
@app.on_event("startup")async def start_service(): print("Connecting to service...") await service.connect()
@app.on_event("shutdown")async def stop_service(): print("Disconnecting from service...") await service.disconnect()Custom Lifespans
Custom Lifespans
For more structured applications, you can define a custom lifespan handler when initialising Flama.
Because Flama instantiates the lifespan handler separately for the startup and shutdown phases, the most robust way to implement a custom lifespan is a class-based async context manager.
Crucially, you must store your resources on the app instance itself, not on the lifespan instance, so that they persist between the startup and shutdown calls.
Class-based pattern
This pattern encapsulates your resource logic in a single class with __aenter__ (startup) and __aexit__ (shutdown) methods.
class ServiceLifespan:
    def __init__(self, app):
        self.app = app

    async def __aenter__(self):
        # 1. Startup: Connect and attach to app
        if self.app:
            service = Service()  # placeholder for your actual resource
            await service.connect()
            setattr(self.app, "service", service)
        return self

    async def __aexit__(self, exc_type, exc_value, traceback):
        # 2. Shutdown: Retrieve from app and close
        if self.app and hasattr(self.app, "service"):
            await self.app.service.disconnect()

app = Flama(lifespan=ServiceLifespan)
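Request handlers can then read the resource back from the application instance, since that is where the lifespan stored it. A minimal sketch, assuming the app defined above and a hypothetical is_online flag on the service:

@app.route("/status", methods=["GET"])
async def status():
    # The lifespan attached the service to the app at startup
    service = getattr(app, "service", None)
    return {"service_ready": service.is_online if service else False}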
Example
The following example compares both approaches using a simulated Notification Service.
import asyncio
import typing as t

from flama import Flama
from flama.client import Client
class NotificationService:
    def __init__(self, name: str):
        self.name = name
        self.is_online = False

    async def connect(self):
        print(f"[{self.name}] 📡 Connecting to gateway...")
        await asyncio.sleep(1)
        self.is_online = True
        print(f"[{self.name}] ✅ Connected.")

    async def disconnect(self):
        print(f"[{self.name}] 🔌 Closing connection...")
        await asyncio.sleep(1)
        self.is_online = False
        print(f"[{self.name}] 💤 Disconnected.")
# Approach: Event Handlers
app_events = Flama()
service_events = NotificationService("Events App")

@app_events.on_event("startup")
async def startup_service():
    await service_events.connect()

@app_events.on_event("shutdown")
async def shutdown_service():
    await service_events.disconnect()

@app_events.route("/events", methods=["GET"])
async def status_events():
    return {"status": "online", "service_ready": service_events.is_online}
# Approach: Custom Lifespan (Class-Based)
# We use a class to persist logic, but we MUST store the state on 'app'
# because 'ServiceLifespan' is re-instantiated for startup and shutdown.
class ServiceLifespan:
    def __init__(self, app: Flama | None):
        self.app = app

    async def __aenter__(self):
        if self.app:
            service = NotificationService("Context App")
            await service.connect()
            setattr(self.app, "service", service)
        return self

    async def __aexit__(self, exc_type, exc_value, traceback):
        if self.app and hasattr(self.app, "service"):
            await self.app.service.disconnect()

app_context = Flama(lifespan=ServiceLifespan)

@app_context.route("/context", methods=["GET"])
async def status_context():
    service = getattr(app_context, "service", None)
    is_ready = service.is_online if service else False
    return {"status": "online", "service_ready": is_ready}
async def main():
    print("Testing Event Handlers...")
    async with Client(app=app_events) as client:
        response = await client.get("/events")
        print(f"[Client] Response: {response.json()}")

    print("\nTesting Custom Lifespan...")
    async with Client(app=app_context) as client:
        response = await client.get("/context")
        print(f"[Client] Response: {response.json()}")

if __name__ == "__main__":
    asyncio.run(main())

Running this script produces the following output:

Testing Event Handlers...
[Events App] 📡 Connecting to gateway...
[Events App] ✅ Connected.
[Client] Response: {'status': 'online', 'service_ready': True}
[Events App] 🔌 Closing connection...
[Events App] 💤 Disconnected.

Testing Custom Lifespan...
[Context App] 📡 Connecting to gateway...
[Context App] ✅ Connected.
[Client] Response: {'status': 'online', 'service_ready': True}
[Context App] 🔌 Closing connection...
[Context App] 💤 Disconnected.
Conclusion
Managing the lifespan of your application is essential for performance and reliability. By initialising heavy resources like service connections or ML models during startup and cleaning them up during shutdown, you ensure your API remains efficient and leak-free.
While Event Handlers offer a quick and easy way to add startup logic for simple scripts, custom Lifespans (using class-based context managers) provide the robust, encapsulated structure required for production-grade applications where state persistence and clean architecture are paramount.