Testing
Testing is a critical, non-negotiable component of any production-grade application. While unit tests are excellent for verifying isolated business logic, they are often insufficient when building APIs that interact heavily with a relational database. Mocking database calls can only take you so far; eventually, you need robust integration tests that spin up the full application context, connect to a real database schema, execute migrations, and verify that your routes, models, and serialisation all function harmoniously under real-world conditions.
Flama provides powerful, built-in tools designed specifically for this level of rigorous testing, including the asynchronous Client and native SQLAlchemyModule transaction management. When combined with modern testing ecosystems, you can construct a testing suite that is not only highly reliable but also parallelisable and lightning-fast.
Testing philosophy
A common, yet highly destructive, anti-pattern in database testing is executing tests against a local, persistent development database. Over time, this inevitably leads to "state leakage": polluted data left behind by previous test runs. This results in unpredictable test states, flaky assertions, and severe race conditions when tests attempt to read or mutate the same rows concurrently.
To solve this and guarantee complete determinism, our testing strategy in Flama is built upon three core technical pillars:
Ephemeral databases
Tests should never trust the state of a pre-existing database. Instead, we programmatically create a fresh, entirely empty PostgreSQL database at the exact moment the test session begins, and comprehensively destroy it (drop it) the moment the tests finish. This guarantees that every single test run starts from an identical, pristine environment, completely eliminating the "it worked on the last run" syndrome.
Parallel execution
Integration tests are notoriously I/O bound (waiting on network or database responses). Running them sequentially on a single thread wastes massive amounts of time. By utilising pytest-xdist, we can distribute test execution concurrently across multiple CPU cores.
To prevent these concurrent workers from causing race conditions, we dynamically provision isolated ephemeral databases (e.g., test-gw0, test-gw1, test-gw2) for each parallel worker. This ensures that a test running on Worker A will never step on the toes of a test running on Worker B, allowing massive test suites to finish in seconds rather than minutes.
Transactional rollbacks
While ephemeral databases provide a clean slate, re-creating tables and running Alembic migrations for every single test is incredibly slow and computationally expensive.
Instead, we optimise the process:
- We run the full suite of database migrations exactly once per worker database at the start of the session.
- We then wrap every individual test function inside an active, isolated database transaction.
- When the test finishes its execution, regardless of whether it passed or failed, we simply roll back the transaction.
Because of how PostgreSQL handles transactions, all INSERT, UPDATE, and DELETE operations performed during the test are instantly discarded. This leaves the database perfectly pristine for the next test in mere milliseconds, without incurring the massive overhead of dropping and re-creating tables.
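The mechanics are easy to demonstrate with SQLite from the standard library; the same principle applies to the PostgreSQL transactions described above:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE animals (name TEXT)")
conn.commit()  # schema is committed, like our migrated test database

# Simulate a test: mutate data inside an open transaction
conn.execute("INSERT INTO animals VALUES ('test.animal.first')")
# Inside the transaction, the data looks completely real to the test...
assert conn.execute("SELECT COUNT(*) FROM animals").fetchone()[0] == 1

# ...but rolling back discards every mutation instantly
conn.rollback()
assert conn.execute("SELECT COUNT(*) FROM animals").fetchone()[0] == 0
```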
Set up
Before we write a single line of test code, we must configure pytest to natively understand and manage our asynchronous <FlamaName /> application. Out of the box, pytest is designed for synchronous code. To bridge this gap, we rely on the pytest-asyncio plugin, but it requires strict configuration to behave predictably in a large integration suite.
Configuration
The pytest.ini file acts as the master control switch for your test suite. By explicitly defining our async rules here, we remove boilerplate from our actual test files and enforce a unified standard across the entire engineering team.
Let's break down the necessary configuration piece by piece:
```ini
[pytest]
minversion = 3
norecursedirs = *settings* *urls* *docs*
asyncio_mode = auto
asyncio_default_fixture_loop_scope = function
markers =
    type_integration: marks tests as integration
    type_unit: marks tests as unit
```

Directory traversal and versioning
- `minversion = 3`: This ensures that anyone running this test suite, whether it's a new developer on their local machine or the CI/CD pipeline, is using a compatible version of pytest. It prevents bizarre errors caused by legacy test runners.
- `norecursedirs`: By default, pytest will aggressively search every folder in your project for tests. We explicitly tell it to ignore directories like `settings`, `urls`, and `docs`. This significantly speeds up the "test discovery" phase, especially as your project grows.
Async context
This is where the heavy lifting for Flama happens.
- `asyncio_mode = auto`: Historically, to test an async function, you had to manually add the `@pytest.mark.asyncio` decorator above every single test. This was tedious and highly prone to human error (forgetting the decorator often resulted in the test being skipped or passing silently without executing the coroutine). By setting this to `auto`, pytest automatically detects any `async def test_...` function and wraps it in an event loop for you. It drastically reduces boilerplate.
- `asyncio_default_fixture_loop_scope = function`: This is arguably the most critical setting for database isolation. The "loop scope" defines how long an async event loop stays alive. If you share an event loop across multiple tests (e.g., setting it to `session`), a lingering background task or unclosed database connection from Test A can crash Test B. By restricting the scope to `function`, we guarantee that every single test gets its own fresh, isolated event loop that is destroyed immediately after the test concludes.
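To make this concrete, here is a rough, illustrative approximation of what pytest-asyncio does for you (the helper name here is invented for the sketch, not a pytest internal): it detects the coroutine test and drives it on its own short-lived event loop.

```python
import asyncio


async def test_example():
    # With asyncio_mode = auto, pytest would collect this coroutine directly,
    # no @pytest.mark.asyncio decorator required.
    await asyncio.sleep(0)
    assert 1 + 1 == 2


def run_on_fresh_loop(coro_fn):
    # Roughly what loop scope = "function" means: each test coroutine is driven
    # on a brand-new event loop, torn down as soon as the coroutine finishes.
    asyncio.run(coro_fn())


run_on_fresh_loop(test_example)  # completes without error
```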
Categorisation with markers
As your application scales, running the entire test suite on every minor code change becomes impractically slow.
- `markers`: We explicitly register `type_integration` and `type_unit`. This allows developers to selectively run subsets of tests from the command line (e.g., `pytest -m type_unit`). Registering them here also prevents typos: if a developer accidentally types `@pytest.mark.type_integraton`, pytest will catch the error immediately rather than silently skipping the test.
This is where the magic truly happens. Moving from the configuration to conftest.py is like moving from the blueprints to laying the actual concrete foundation.
In pytest, conftest.py is not just a regular Python module; it acts as a local plugin directory. Any fixtures defined here are automatically discovered and made globally available to all your test files without needing a single import statement. It is the central orchestrator of your test suite’s state.
Architecting the tests
The heart of our testing strategy lives in tests/conftest.py. This file is responsible for the heavy lifting of our environment setup. It orchestrates the lifecycle of the ephemeral databases, safely executes database migrations, dynamically patches the Flama application configuration, and manages the transactional rollbacks that keep our suite fast.
By isolating this infrastructure logic in conftest.py, our actual test files remain completely clean, focused exclusively on business logic and assertions.
Ephemeral database
The first step in our pipeline is to provision a physical database instance for the tests to use. To achieve this, we create a fixture that connects to your PostgreSQL server, executes a CREATE DATABASE command before any tests run, and strictly executes a DROP DATABASE command after all tests have finished.
Session scope
Notice that the fixture is decorated with @pytest.fixture(scope="session"). This is vital for performance. It tells pytest to execute this setup code exactly once per test session, rather than repeating it for every single test function.
Furthermore, we inject a built-in fixture called worker_id. When running tests in parallel using the pytest-xdist plugin, pytest spawns multiple independent processes (workers).
- If you run tests sequentially, `worker_id` evaluates to `"master"`.
- If you run tests in parallel (e.g., across 4 CPU cores), `worker_id` evaluates to `"gw0"`, `"gw1"`, `"gw2"`, and `"gw3"`.
If all four workers tried to write to a single test database simultaneously, they would instantly trigger race conditions and table deadlocks. By parsing the worker_id, we dynamically generate unique database names (test-gw0, test-gw1, etc.), ensuring absolute isolation for every parallel process.
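The naming rule itself is a one-liner; a hypothetical helper (not part of the actual fixture, purely for illustration) makes it explicit:

```python
def database_name_for(worker_id: str) -> str:
    # Sequential runs use a single "test" database; each pytest-xdist worker
    # (gw0, gw1, ...) gets its own isolated, suffixed copy.
    return "test" if worker_id == "master" else f"test-{worker_id}"


assert database_name_for("master") == "test"
assert database_name_for("gw0") == "test-gw0"
assert database_name_for("gw3") == "test-gw3"
```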
The AUTOCOMMIT Imperative
By default, SQLAlchemy wraps all database operations in a transaction (a BEGIN ... COMMIT block) to ensure data integrity. However, PostgreSQL strictly forbids executing structural commands like CREATE DATABASE or DROP DATABASE inside a transaction block. If you try, Postgres will throw a fatal error.
To bypass this, we must explicitly override SQLAlchemy's default behavior by passing isolation_level="AUTOCOMMIT" into our engine. This tells SQLAlchemy to fire the raw SQL commands directly at the server without wrapping them in a transaction, satisfying Postgres's requirements.
The lifecycle
In an ordinary Python function, a return statement immediately ends execution. In pytest fixtures, we use yield instead. The yield statement effectively splits the fixture into two distinct phases:
- Setup (before `yield`): Drops any existing leftover database and creates a fresh one.
- The yield: Pauses the fixture, passes the `Database` namedtuple to the test suite, and allows all the tests in the session to execute.
- Teardown (after `yield`): Once all tests finish (or even if they crash), the code resumes execution and guarantees the database is cleanly dropped, preventing orphaned databases from cluttering your local Postgres server.
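A yield-based fixture is just a generator underneath; this minimal, self-contained sketch shows how pytest drives the two phases:

```python
log = []


def database_fixture():
    log.append("setup: create database")    # runs before the tests
    yield "test-gw0"                        # the value handed to the tests
    log.append("teardown: drop database")   # runs after the tests


gen = database_fixture()
db = next(gen)        # pytest advances the generator to the yield (setup phase)
assert db == "test-gw0"

try:
    next(gen)         # pytest resumes past the yield (teardown phase)
except StopIteration:
    pass

assert log == ["setup: create database", "teardown: drop database"]
```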
```python
import collections

import pytest
import sqlalchemy

from src import config


@pytest.fixture(scope="session")
def sqlalchemy_database(worker_id):
    # Generate unique DB names per parallel worker (e.g., test-gw0, test-gw1),
    # ensuring parallel workers never share the same database state.
    database = "test" if worker_id == "master" else f"test-{worker_id}"

    # We must use AUTOCOMMIT because Postgres rejects CREATE/DROP DATABASE
    # commands if they are wrapped inside standard SQL transactions.
    engine = sqlalchemy.create_engine(
        config.DATABASE.url(
            config.APP.database, config.APP.database_user, config.APP.database_password
        ),
        isolation_level="AUTOCOMMIT",
    )

    # SETUP PHASE: Create the completely ephemeral database
    with engine.connect() as connection:
        connection.execute(sqlalchemy.text(f'DROP DATABASE IF EXISTS "{database}"'))
        connection.execute(sqlalchemy.text(f'CREATE DATABASE "{database}"'))

    # YIELD PHASE: Pause here and let the test session run.
    # We pass down both the test DB name and the original prod DB name
    # so we can dynamically patch configs later.
    yield collections.namedtuple("Database", ["test", "prod"])(
        test=database, prod=config.APP.database
    )

    # TEARDOWN PHASE: Clean up the database after the test session concludes
    with engine.connect() as connection:
        connection.execute(sqlalchemy.text(f'DROP DATABASE "{database}"'))
```

By leveraging this architecture, you guarantee that no matter how complex your test suite becomes, it will always execute against a flawlessly clean, isolated environment.
Applying migrations
Once our ephemeral databases (test-gw0, test-gw1, etc.) are created, they are completely empty. They have no tables, no indexes, and no schema. We must populate them using our Alembic migration scripts.
However, Alembic's programmatic API (command.upgrade) was primarily designed to be run synchronously from a CLI, not invoked simultaneously by four different Python processes sharing the same file system. To safely bridge this gap, we employ dynamic patching and Inter-Process Communication (IPC) via the filesystem using filelock.
Patching
By default, Alembic looks at your application's configuration to determine which database to connect to. If we don't intercept this, Alembic will happily connect to your default database (which might be your local development DB, or worse, production!) and run the migrations there.
To prevent this, we use unittest.mock.patch.object. We dynamically overwrite the database attribute inside our config.APP module in memory. We point it strictly to the sqlalchemy_database.test name we generated in the previous fixture. Because patch is used as a context manager (the with statement), this override is safely isolated to the scope of the setup phase.
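Here is a tiny, self-contained illustration of that behaviour, using an invented `AppConfig` stand-in rather than the real `config.APP`:

```python
from unittest.mock import patch


class AppConfig:
    database = "production_db"  # illustrative stand-in for config.APP


app_config = AppConfig()

with patch.object(app_config, "database", "test-gw0"):
    # Inside the with-block, anything reading app_config.database sees the override
    assert app_config.database == "test-gw0"

# On exit, patch restores the original value automatically, even on exceptions
assert app_config.database == "production_db"
```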
Mutex (Mutual Exclusion) file lock
When running in parallel, we need a "traffic light" to ensure workers take turns reading the Alembic directory. We achieve this using filelock.FileLock.
- The Global Lock (`migration.lock`): This is our primary mutex. When Worker 0 hits this line, it creates a physical file on your hard drive and claims ownership of it. If Worker 1 arrives a millisecond later, it sees the lock file is taken and pauses its execution. It simply waits in line. Once Worker 0 finishes running `command.upgrade("head")` and releases the lock, Worker 1 acquires it and proceeds. This strictly serialises the Alembic execution, preventing I/O race conditions while reading the migration scripts.
- The Worker Lock (`migration.lock.{worker_id}`): This lock serves a completely different purpose: it acts as a tracker. By creating a lock file named after the specific worker (e.g., `migration.lock.gw0`), we leave a footprint on the filesystem proving that this specific worker is currently alive and using the database.
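For intuition, the "traffic light" can be approximated with nothing but atomic file creation. This toy class is purely illustrative; the real `filelock.FileLock` is far more robust (timeouts, reentrancy, cross-platform handling):

```python
import os
import tempfile
import time


class ToyFileLock:
    """Illustrative mutex built on atomic file creation (O_CREAT | O_EXCL)."""

    def __init__(self, path):
        self.path = path

    def __enter__(self):
        while True:
            try:
                # O_CREAT | O_EXCL fails atomically if the file already exists,
                # so exactly one process can hold the lock at a time.
                self.fd = os.open(self.path, os.O_CREAT | os.O_EXCL)
                return self
            except FileExistsError:
                time.sleep(0.01)  # another worker owns it: wait in line

    def __exit__(self, *exc):
        os.close(self.fd)
        os.remove(self.path)  # release so the next worker can acquire it


lock_path = os.path.join(tempfile.mkdtemp(), "migration.lock.demo")
with ToyFileLock(lock_path):
    assert os.path.exists(lock_path)    # the footprint exists while held
assert not os.path.exists(lock_path)    # gone once released
```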
The teardown
A robust testing suite doesn't just test if you can build the database; it tests if you can tear it down. Broken rollback scripts are a massive liability in production. We explicitly call `command.downgrade(alembic_cfg, "base")` to verify that every `down_revision` in your Alembic history is syntactically correct and actually reversible.
The "Last Worker Standing" Logic: In a parallel environment, how do we know when the entire test suite is finished so we can run the downgrade? We can't just run it when Worker 0 finishes, because Worker 1 might still be executing tests.
We solve this using the worker tracking locks we created earlier. During the teardown phase (after the yield), each worker deletes its specific lock file. Then, it uses Path(".").glob("migration.lock.*") to scan the directory. If it finds any other worker lock files, it silently exits, meaning other workers are still running tests. Only the absolute last worker to finish will see an empty directory. That final worker then claims the global lock one last time and executes the downgrade.
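The "last worker standing" check itself is just a glob over the tracking files; a self-contained sketch (with an invented helper name) looks like this:

```python
import pathlib
import tempfile

workdir = pathlib.Path(tempfile.mkdtemp())


def other_workers_running(directory: pathlib.Path) -> bool:
    # Any remaining migration.lock.* file means another worker is still alive.
    return any(directory.glob("migration.lock.*"))


# Two workers are active: both leave a footprint
(workdir / "migration.lock.gw0").touch()
(workdir / "migration.lock.gw1").touch()

# gw0 finishes and removes its footprint; gw1's file keeps the downgrade on hold
(workdir / "migration.lock.gw0").unlink()
assert other_workers_running(workdir)

# gw1 finishes last: the directory is clean, so it may run command.downgrade(...)
(workdir / "migration.lock.gw1").unlink()
assert not other_workers_running(workdir)
```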
Here is the annotated code implementing this architecture:
```python
import pytest
from alembic import command
from alembic.config import Config as AlembicConfig
from filelock import FileLock
from pathlib import Path
from unittest.mock import patch

from src import config


# autouse=True ensures this runs automatically; no test needs to manually request it.
@pytest.fixture(scope="session", autouse=True)
def migrations(worker_id, sqlalchemy_database):
    alembic_cfg = AlembicConfig("alembic.ini")

    # Dynamically point Alembic to the ephemeral database
    with patch.object(config.APP, "database", sqlalchemy_database.test):
        # If running sequentially (no pytest-xdist), just run the migrations.
        if worker_id == "master":
            command.upgrade(alembic_cfg, "head")
            yield  # Let the test session execute
            command.downgrade(alembic_cfg, "base")
        # If running in parallel, orchestrate the file locks
        else:
            # Create a tracking lock to prove this worker is currently active
            with FileLock(f"migration.lock.{worker_id}"):
                # Wait in line for the global lock to prevent I/O collisions
                with FileLock("migration.lock"):
                    command.upgrade(alembic_cfg, "head")

                # The yield statement sits OUTSIDE the global lock. This allows
                # all workers to run their tests concurrently now that their
                # individual databases are fully migrated.
                yield

            # Teardown: Scan the directory. If no other worker locks exist,
            # this is the final worker. It is safe to run the downgrade.
            if not any(Path(".").glob("migration.lock.*")):
                with FileLock("migration.lock"):
                    command.downgrade(alembic_cfg, "base")
```

By combining unittest.mock for dynamic routing and filelock for Inter-Process Communication, we have created a bulletproof, parallel-safe database initialisation pipeline.
Test Client
When writing integration tests, a common mistake is to spin up a live web server (e.g., using Uvicorn on localhost:8000) and make actual HTTP requests over the network using tools like requests or httpx. While this works, it introduces massive network latency, port binding conflicts during parallel execution, and unnecessary overhead.
Instead, we use flama.client.Client. This is an asynchronous client specifically engineered for ASGI (Asynchronous Server Gateway Interface) applications.
Bypassing the socket
When you make a request using flama.client.Client, it does not open a network socket. Instead, it translates your HTTP request (e.g., client.get("/users")) directly into an ASGI dictionary (the standard format that ASGI apps consume) and invokes your application's root callable directly in Python's memory. This entirely bypasses the TCP/IP stack, making requests orders of magnitude faster.
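You can see the principle with a bare-bones ASGI app and no client library at all: build the scope dict by hand and await the application callable directly. This is a deliberately minimal sketch of the mechanics; `flama.client.Client` does all of this for you, plus lifespan handling:

```python
import asyncio


async def app(scope, receive, send):
    # A minimal ASGI application: every request gets a plain-text 200 response
    assert scope["type"] == "http"
    await send({"type": "http.response.start", "status": 200, "headers": []})
    await send({"type": "http.response.body", "body": b"hello"})


async def call_in_memory(path):
    # The "request" is nothing but a dict: no socket, no TCP/IP stack involved
    scope = {"type": "http", "method": "GET", "path": path, "headers": []}
    sent = []

    async def receive():
        return {"type": "http.request", "body": b"", "more_body": False}

    async def send(message):
        sent.append(message)

    await app(scope, receive, send)
    return sent


messages = asyncio.run(call_in_memory("/animals"))
assert messages[0]["status"] == 200
assert messages[1]["body"] == b"hello"
```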
Dynamic dependency injection
Before we can use the client, we must ensure the application it wraps is actually pointing to our sqlalchemy_database.test created in the previous steps.
We do this by intercepting the initialised Flama app and mutating its internal SQLAlchemyModule state. Notice that the scope of these fixtures is scope="function". This guarantees that every single test function gets a fresh instance of the client, preventing internal client state (like cookies or session headers) from leaking between tests.
```python
import pytest

import src.app
from flama.client import Client
from src import config


@pytest.fixture(scope="function")
async def app(sqlalchemy_database):
    # Import the actual, production-ready ASGI application instance
    _app = src.app.app

    # Overwrite the SQLAlchemy database connection string in memory.
    # This guarantees the app connects to 'test-gw0' instead of 'my_prod_db'.
    _app.sqlalchemy.database = config.DATABASE.url(
        sqlalchemy_database.test, config.APP.database_user, config.APP.database_password
    )
    return _app


@pytest.fixture(scope="function")
async def client(app):
    # Use an async context manager to ensure the ASGI app lifecycle
    # (startup and shutdown events) is properly triggered and cleanly closed.
    async with Client(app=app) as _client:
        yield _client
```

Transactional rollbacks
This section is the absolute crown jewel of this testing architecture.
Even with an ephemeral database, executing TRUNCATE TABLE or DROP TABLE between hundreds of tests would bring your test suite to a crawl. Relational databases are optimised for data integrity, not constant schema destruction.
To achieve millisecond-level resets, we exploit how PostgreSQL handles database transactions.
The "autouse" safety net
We define this fixture with @pytest.fixture(scope="function", autouse=True). The autouse=True flag is crucial: it means pytest will invisibly apply this fixture to every single test function in your suite, even if the developer forgets to include connection in the test's arguments. This eliminates human error. You can never accidentally commit test data and pollute the database.
BEGIN ... ROLLBACK Lifecycle
- The Setup (`begin_transaction`): Before the test executes, we ask Flama's native `SQLAlchemyModule` for a raw database connection. We then explicitly issue a `BEGIN` statement to start a new transaction block.
- The Execution (`yield`): We yield the active connection to the test. The test runs, hits the API endpoints, and inserts/updates/deletes data. To the application, these changes appear completely real and permanent. It can query the data back immediately.
- The Erasure (`rollback=True`): The moment the test finishes, the fixture resumes execution. We call `end_transaction(transaction, rollback=True)`. Instead of committing the changes to the disk, PostgreSQL simply discards the transaction log in memory. Every single mutation the test performed vanishes instantly.
The database is restored to the exact pristine state it was in after the Alembic migrations ran, taking effectively zero I/O time.
```python
@pytest.fixture(scope="function", autouse=True)
async def connection(client):
    # Acquire a dedicated connection from the application's connection pool
    connection = await client.app.sqlalchemy.open_connection()
    # Start an active transaction block (BEGIN)
    transaction = await client.app.sqlalchemy.begin_transaction(connection)

    # Yield the connection so the test can use it (and the app can use it)
    yield connection

    # The test has finished. Force a ROLLBACK: Postgres discards all changes
    # and the database is clean instantly.
    await client.app.sqlalchemy.end_transaction(transaction, rollback=True)
    # Return the connection to the pool
    await client.app.sqlalchemy.close_connection(connection)
```

By successfully setting up the ephemeral database, handling the parallel migrations with filelock, and implementing this transactional rollback fixture, you have constructed an enterprise-grade testing environment.
Writing integration tests
With the infrastructure abstracting away the database lifecycle, our test files should be clean, highly readable, and strictly organised. We follow the industry-standard Arrange-Act-Assert (AAA) pattern for every single test.
Furthermore, we group related tests into logical class structures. This allows us to share specific setup logic (like seeding data) across multiple related endpoints without polluting the global namespace.
Seeding data
Before we can test if an API endpoint retrieves data correctly, we must put data into the database. This is the "Arrange" phase.
Because our global connection fixture already wrapped the current test in an active database transaction, we do not need to worry about cleaning up our seeded data. We can safely inject rows directly into the tables.
- Raw SQLAlchemy Core: We recommend using raw SQLAlchemy Core (`sqlalchemy.insert()`) rather than a full ORM session for seeding. It is faster, more explicit, and avoids triggering unintended ORM lifecycle events during the setup phase.
- No Commits Required: You do not need to call `connection.commit()`. Because the application shares the same active transaction block as the test client, the injected data is immediately visible to Flama the moment the `insert` executes.
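A small SQLite sketch shows why sharing the transaction matters: uncommitted rows are visible on the seeding connection, but invisible to any other connection:

```python
import os
import sqlite3
import tempfile

path = os.path.join(tempfile.mkdtemp(), "demo.db")

writer = sqlite3.connect(path)
writer.execute("CREATE TABLE animals (name TEXT)")
writer.commit()  # the schema is committed, as after migrations

# Seed inside the still-open transaction: no commit
writer.execute("INSERT INTO animals VALUES ('test.animal.first')")

# The seeding connection (and anything sharing its transaction) sees the row...
assert writer.execute("SELECT COUNT(*) FROM animals").fetchone()[0] == 1

# ...but a separate connection does not, which is exactly why the app must
# share the test's transaction instead of opening its own connection.
reader = sqlite3.connect(path)
assert reader.execute("SELECT COUNT(*) FROM animals").fetchone()[0] == 0
```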
Executing API calls with dynamic routing
When we are ready to hit the API, we use our asynchronous client.
One of the most powerful features of Flama is its routing engine. Instead of hardcoding fragile URL strings like client.get("/api/v1/animals/"), which will break instantly if you change your API prefix, we use client.app.resolve_url("animal:list"). This dynamically resolves the correct URL path based on your route names, making your test suite highly resilient to architectural changes.
The Pagination Gotcha: If you are utilising Flama's built-in CRUDResource, remember that list endpoints are paginated by default. If your test expects to see all seeded data in a single response, you must explicitly pass a page_size parameter in your request that exceeds your seeded row count, or the assertions will fail on missing data.
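The arithmetic behind the gotcha is easy to show with a toy paginator (assuming a conventional offset/limit scheme, used here purely for illustration):

```python
def paginate(rows, page=1, page_size=10):
    # Typical offset/limit pagination, standing in for a paginated list endpoint
    start = (page - 1) * page_size
    return rows[start:start + page_size]


seeded = [f"animal-{i}" for i in range(25)]

# The default page size silently hides 15 of the 25 seeded rows
assert len(paginate(seeded)) == 10

# Overriding page_size to exceed the row count returns everything
assert len(paginate(seeded, page_size=100)) == 25
```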
Parametrisation
The assertion phase is where we verify the application's behavior. To avoid writing ten different test functions for ten different input scenarios, we heavily leverage @pytest.mark.parametrize.
Parametrisation allows us to separate our test data from our test logic. We write the execution code once, and pytest loops through our array of inputs, running a distinct, isolated test for each set of parameters.
The Secondary Database Assertion:
For read-only operations (like GET), checking the HTTP response is usually sufficient. However, for write operations (like POST, PUT, PATCH), relying solely on the HTTP 201 response is dangerous. A badly written endpoint might return a success code without actually saving the data! Therefore, mutation tests should always include a secondary raw DB assertion to verify that the API actually mutated the database state correctly.
Complete implementation
Here is how all these concepts come together in a production-ready test class:
```python
import http

import pytest
import sqlalchemy

from src import models


class TestCaseIntegration:
    # This fixture only runs for tests inside this specific class.
    # It seeds the database with baseline data we expect to be there.
    @pytest.fixture(scope="function", autouse=True)
    async def animals(self, connection):
        """Seed the database with default testing data."""
        data = [
            {"name": "test.animal.first"},
            {"name": "test.animal.second"},
        ]
        # Insert data using the transaction-wrapped connection. No commit needed!
        await connection.execute(sqlalchemy.insert(models.table).values(data))
        return data

    # Parametrisation extracts the varying data from the test logic.
    @pytest.mark.parametrize(
        ["expected_status", "expected_names"],
        [
            pytest.param(
                http.HTTPStatus.OK,
                ["test.animal.first", "test.animal.second"],
                id="ok_list_animals",  # The 'id' makes the terminal output highly readable
            ),
        ],
    )
    async def test_list(self, client, expected_status, expected_names):
        """Test retrieving a list of resources."""
        # Resolve the URL dynamically and pass the pagination override
        url = str(client.app.resolve_url("animal:list").path)
        r = await client.get(url, params={"page_size": 100})

        # Always print the JSON body on failure to help debugging
        assert r.status_code == expected_status, r.json()

        if r.status_code == http.HTTPStatus.OK:
            # Extract names from the paginated "data" key and compare as sets
            # to avoid strict ordering issues
            assert {x["name"] for x in r.json()["data"]} == set(expected_names)

    @pytest.mark.parametrize(
        ["payload", "expected_status"],
        [
            pytest.param(
                {"name": "new.animal.third"},
                http.HTTPStatus.CREATED,
                id="ok_create_valid_payload",
            ),
            pytest.param(
                {},  # Missing the required 'name' field
                http.HTTPStatus.BAD_REQUEST,
                id="fails_missing_required_name",
            ),
        ],
    )
    async def test_create(self, connection, client, payload, expected_status):
        """Test creating a resource and verify database state."""
        url = str(client.app.resolve_url("animal:create").path)
        r = await client.post(url, json=payload)

        assert r.status_code == expected_status, r.json()

        # Secondary Assertion: Prove the database actually changed
        if r.status_code == http.HTTPStatus.CREATED:
            response_data = r.json()

            # Query the database directly for the newly created ID
            db_animal = (
                (
                    await connection.execute(
                        sqlalchemy.select(models.table).where(
                            models.table.c["id"] == response_data["id"]
                        )
                    )
                )
                .fetchone()
                ._asdict()
            )
            # Verify the row exists and the data matches our payload
            assert db_animal["name"] == payload["name"]
```

Conclusion
Adhering to this architecture, we fundamentally transform how an engineering team interacts with the codebase. Testing database-heavy applications is traditionally a slow, brittle chore that developers dread. With this setup, it becomes a seamless, invisible safety net that accelerates development rather than hindering it.
Let's recap the engineering milestones we have achieved with this infrastructure:
- Absolute Determinism: By leveraging ephemeral databases and strictly isolating state via the `worker_id` and `filelock`, we completely eliminate the "it worked on the last run" and "flaky test" anti-patterns. If a test fails, it is because the code is broken, not because the environment is polluted.
- Pristine Local Environments: Your local development PostgreSQL instance remains completely untouched. No more writing manual cleanup scripts or chasing down ghost data left behind by a failed test run last Tuesday.
- Blazing Execution Speed: Through the combination of `pytest-xdist` for CPU parallelisation, Flama's ASGI `Client` bypassing the network layer, and our magic `autouse` transactional rollback fixture, integration tests that typically take minutes to run can now execute in seconds.
- Painless Developer Experience (DX): All the complex, async infrastructure orchestration is hidden away in `conftest.py`. When a developer sits down to write a new feature, they can focus entirely on the Arrange-Act-Assert flow, writing clean, business-focused test cases without worrying about database state management.
With Flama's native SQLAlchemy integration and its asynchronous testing tools, you are equipped to build enterprise-grade, highly reliable APIs. You can merge pull requests and deploy to production with absolute confidence, knowing your integration suite has rigorously and rapidly verified every single route, model, and database interaction.