Repository
With the data model (i.e., the SQLAlchemy table and Pydantic schemas) in place, the next step is to build the abstraction layer that connects the business logic to the database. This is the role of the Repository pattern, one of the most important building blocks of Domain-Driven Design. The repository encapsulates all data access logic for a specific entity, providing a clean, well-defined interface that the rest of the application can use without knowing anything about the underlying persistence mechanism.
Flama offers complete, production-ready implementations of the repository pattern for both SQLAlchemy-backed and HTTP-backed applications, so you can focus on your domain logic rather than on writing boilerplate data access code.
What is a repository?
A Repository is an object that mediates between the domain model and the data source. Think of it as a specialised gatekeeper: it knows how to read entities from the database and write them back, but it exposes this capability through a high-level, domain-oriented interface rather than raw SQL queries or ORM calls.
Why is the repository pattern important?
- Decoupling: The business logic never interacts with the database directly. It communicates exclusively through the repository interface, making the two layers fully independent.
- Testability: Because the repository is an abstraction, you can replace the real implementation with a mock or an in-memory alternative during testing, without changing a single line of business logic.
- Centralised data access: All operations on a given entity live in one place. If you need to change how users are queried or stored, you change the repository, nothing else.
- Consistency: The repository enforces a uniform interface for data manipulation, preventing ad-hoc, scattered database calls across the codebase.
The main virtue repositories bring is isolating the data access logic so that it can be swapped, mocked, or evolved independently of the rest of the application.
Repository hierarchy
Flama provides a layered class hierarchy for repositories, each level adding more concrete functionality. There are two parallel branches: one for direct database access (SQLAlchemy) and one for remote access over HTTP.
SQL repositories
| Class | Purpose |
|---|---|
AbstractRepository | Abstract base. Defines the interface contract. |
BaseRepository | Concrete base with minimal shared logic. |
SQLAlchemyRepository | Adds an async SQLAlchemy connection. |
SQLAlchemyTableRepository | Full CRUD operations for a specific table. |
HTTP repositories
| Class | Purpose |
|---|---|
AbstractRepository | Abstract base. Defines the interface contract. |
BaseRepository | Concrete base with minimal shared logic. |
HTTPRepository | Adds an HTTP Client for remote API calls. |
HTTPResourceRepository | Full CRUD operations for a specific remote resource. |
For database-backed use cases, you will subclass SQLAlchemyTableRepository. For use cases that consume a remote Flama API over HTTP, you will subclass HTTPResourceRepository.
SQL repositories in depth
The remainder of this page uses the SQLAlchemy branch to build a concrete repository for the user entity from the Data Model page. The HTTP branch is covered in the HTTP repositories section further down.
Built-in CRUD methods
The SQLAlchemyTableRepository class provides the following methods, each of which operates on the table defined by the _table class attribute:
create(*data): Inserts one or more new rows into the table. Returns the created elements. RaisesIntegrityErrorif a constraint is violated (e.g., a duplicate email).retrieve(*clauses, **filters): Fetches a single row matching the given clauses or filters. RaisesNotFoundErrorif no row matches, orMultipleRecordsErrorif more than one row matches.update(data, *clauses, **filters): Updates rows matching the clauses or filters with the provided data. Returns the updated elements. RaisesIntegrityErrorif the update violates a constraint.delete(*clauses, **filters): Deletes a single row matching the clauses or filters. RaisesNotFoundErrorif the row does not exist.list(*clauses, order_by=None, order_direction="asc", **filters): Returns an async iterable of all rows matching the clauses or filters. If no clauses or filters are provided, it returns all rows in the table.drop(*clauses, **filters): Deletes all rows matching the clauses or filters. Returns the number of rows deleted.
All of these methods support two styles of filtering:
- Clauses: SQLAlchemy expressions, e.g.,
user_table.c["id"].in_((1, 2, 3)). - Keyword filters: Simple equality checks, e.g.,
email="[email protected]".
Both styles can be combined in a single call.
Building a SQL repository
Defining a repository for our user_table is remarkably straightforward. We subclass SQLAlchemyTableRepository and set the _table attribute:
from flama.ddd.repositories.sqlalchemy import SQLAlchemyTableRepository
class UserRepository(SQLAlchemyTableRepository): _table = user_tableThat is all that is needed. With these three lines, UserRepository inherits the full suite of CRUD methods described above, all operating on the user table. There is no SQL to write, no queries to define, no data access boilerplate, Flama provides everything out of the box.
Adding custom methods
While the built-in methods cover the vast majority of use cases, there are situations where you need domain-specific data access logic that goes beyond standard CRUD. In such cases, you can add custom methods to your repository class.
For example, suppose we need to count the number of active users:
class ExtendedUserRepository(SQLAlchemyTableRepository): _table = user_table
async def count_active_users(self) -> int: result = await self._connection.execute( self._table.select().where(self._table.c.active == True) ) return len(result.all())Custom methods have access to self._connection, which is the async SQLAlchemy connection managed by the worker. This gives you full flexibility to write any query you need, while keeping the data access logic encapsulated within the repository.
SQL repository example
Let us put the repository to work with a self-contained example that demonstrates the full CRUD lifecycle. In a real application, the database connection would be managed by a Worker (covered in the next page), but here we create one manually to show the repository in isolation.
import uuid
import sqlalchemyfrom flama.ddd.repositories.sqlalchemy import SQLAlchemyTableRepositoryfrom flama.sqlalchemy import metadatafrom sqlalchemy import create_enginefrom sqlalchemy.ext.asyncio import create_async_engine
# Data modeluser_table = sqlalchemy.Table( "user", metadata, sqlalchemy.Column( "id", sqlalchemy.String, primary_key=True, nullable=False, default=lambda: str(uuid.uuid4()), ), sqlalchemy.Column("name", sqlalchemy.String, nullable=False), sqlalchemy.Column("surname", sqlalchemy.String, nullable=False), sqlalchemy.Column("email", sqlalchemy.String, nullable=False, unique=True), sqlalchemy.Column("password", sqlalchemy.String, nullable=False), sqlalchemy.Column("active", sqlalchemy.Boolean, nullable=False),)
# Repositoryclass UserRepository(SQLAlchemyTableRepository): _table = user_table
# Demoasync def main(): # Create the database sync_engine = create_engine("sqlite:///ddd_repo_demo.db", echo=False) metadata.create_all(sync_engine) sync_engine.dispose()
# Run repository operations engine = create_async_engine("sqlite+aiosqlite:///ddd_repo_demo.db") async with engine.connect() as connection: repo = UserRepository(connection)
# Create user_id = str(uuid.uuid4()) created = await repo.create( { "id": user_id, "name": "Alice", "surname": "Smith", "email": "[email protected]", "password": "hashed_password", "active": False, } ) await connection.commit() print(f"Created: {created}")
# Retrieve user = await repo.retrieve(id=user_id) print(f"Retrieved: {user}")
# Update updated = await repo.update({"active": True}, id=user_id) await connection.commit() print(f"Updated: {updated}")
# List users = [u async for u in repo.list()] print(f"All users: {users}")
# Delete await repo.delete(id=user_id) await connection.commit() print("User deleted.")
await engine.dispose()Running this produces the following output:
Created: [{'id': '...', 'name': 'Alice', 'surname': 'Smith', 'email': '[email protected]', 'password': 'hashed_password', 'active': False}]Retrieved: {'id': '...', 'name': 'Alice', 'surname': 'Smith', 'email': '[email protected]', 'password': 'hashed_password', 'active': False}Updated: [{'id': '...', 'name': 'Alice', 'surname': 'Smith', 'email': '[email protected]', 'password': 'hashed_password', 'active': True}]All users: [{'id': '...', 'name': 'Alice', 'surname': 'Smith', 'email': '[email protected]', 'password': 'hashed_password', 'active': True}]User deleted.Notice how every data operation goes through the repository's methods, never through raw SQL. The calling code simply invokes repo.create(...), repo.retrieve(...), and so on. If we ever needed to switch from SQLite to PostgreSQL (or even to an entirely different storage backend) only the repository would need to change; the code that uses it would remain untouched.
HTTP repositories in depth
Not every data source is a local database. In microservice architectures, your application may need to consume another service's API as if it were a local data store. Flama provides the HTTP repository branch for exactly this purpose. An HTTPResourceRepository exposes the same style of CRUD interface as its SQLAlchemy counterpart, but each method translates into an HTTP request to a remote Flama API instead of a SQL query.
Built-in CRUD methods
The HTTPResourceRepository delegates all operations to an internal HTTPResourceManager, which maps each method to the appropriate HTTP verb:
| Method | HTTP Verb | Endpoint | Description |
|---|---|---|---|
create(data) | POST | /{resource}/ | Create a new element. |
retrieve(id) | GET | /{resource}/{id}/ | Retrieve a single element. |
update(id, data) | PUT | /{resource}/{id}/ | Full update of an element. |
partial_update(id, data) | PATCH | /{resource}/{id}/ | Partial update of an element. |
delete(id) | DELETE | /{resource}/{id}/ | Delete an element. |
list() | GET | /{resource}/ | List elements (with pagination). |
replace(data) | PUT | /{resource}/ | Replace the full collection. |
partial_replace(data) | PATCH | /{resource}/ | Partial replace of the collection. |
drop() | DELETE | /{resource}/ | Drop the entire collection. |
Note that the method signatures differ slightly from the SQL variant. Because HTTP repositories operate on a remote resource identified by URL, they use a positional id parameter rather than SQLAlchemy-style clauses and keyword filters.
The list method supports automatic pagination. By default, it uses page-number pagination, but you can switch to limit-offset pagination by passing pagination="limit_offset".
Building an HTTP repository
Defining an HTTP repository mirrors the SQLAlchemy pattern. Instead of a _table attribute, you declare a _resource attribute that specifies the URL path prefix of the remote resource:
from flama.ddd.repositories.http import HTTPResourceRepository
class UserRepository(HTTPResourceRepository): _resource = "/user"With these three lines, UserRepository inherits the full suite of HTTP-backed CRUD methods. When the repository calls create(data), for example, it sends a POST request to /user/ on the remote API.
Error handling
HTTP repositories translate HTTP status codes into the same domain exceptions used by the SQL repository branch:
- 400 Bad Request →
IntegrityError - 404 Not Found →
NotFoundError
This means that business logic built on top of either repository variant can use the same exception-handling patterns.
When to use HTTP repositories
HTTP repositories are the right choice when your application acts as a client of another Flama service. A typical scenario is a gateway or aggregation service that composes data from multiple backend APIs. Because the repository interface is consistent across backends, you can mix SQL and HTTP repositories within the same worker if needed (though in practice, each worker tends to use a single backend type).
HTTP repository example
Let us put the HTTP repository to work with a self-contained example that mirrors the SQL repository example above. For the HTTP repository to have something to talk to, we first need a backend: a minimal Flama API that exposes user CRUD endpoints. The backend uses the SQL-backed patterns we have already covered, so we keep its definition brief:
import typing as timport uuid
import pydanticimport sqlalchemyfrom flama import Flama, typesfrom flama.client import Clientfrom flama.ddd import WorkerComponentfrom flama.ddd.repositories.http import HTTPResourceRepositoryfrom flama.ddd.repositories.sqlalchemy import SQLAlchemyTableRepositoryfrom flama.ddd.workers.sqlalchemy import SQLAlchemyWorkerfrom flama.sqlalchemy import SQLAlchemyModule, metadata
# Backend: a SQL-backed Flama app that exposes user CRUD over HTTPuser_table = sqlalchemy.Table( "user", metadata, sqlalchemy.Column( "id", sqlalchemy.String, primary_key=True, nullable=False, default=lambda: str(uuid.uuid4()), ), sqlalchemy.Column("name", sqlalchemy.String, nullable=False), sqlalchemy.Column("surname", sqlalchemy.String, nullable=False), sqlalchemy.Column("email", sqlalchemy.String, nullable=False, unique=True), sqlalchemy.Column("password", sqlalchemy.String, nullable=False), sqlalchemy.Column("active", sqlalchemy.Boolean, nullable=False),)
class User(pydantic.BaseModel): id: t.Optional[str] = None name: str surname: str email: str password: str active: bool = False
class UserSQLRepository(SQLAlchemyTableRepository): _table = user_table
class BackendWorker(SQLAlchemyWorker): user: UserSQLRepository
backend = Flama( modules=[SQLAlchemyModule("sqlite+aiosqlite:///")], components=[WorkerComponent(worker=BackendWorker())],)
@backend.on_event("startup")async def on_startup(): async with backend.sqlalchemy.engine.begin() as conn: await conn.run_sync(metadata.create_all)
@backend.post("/user/")async def create_user( worker: BackendWorker, data: t.Annotated[types.Schema, types.SchemaMetadata(User)],): async with worker: created = await worker.user.create(dict(data)) return created[0]
@backend.get("/user/{user_id}/")async def get_user(worker: BackendWorker, user_id: str): async with worker: return await worker.user.retrieve(id=user_id)
@backend.put("/user/{user_id}/")async def update_user( worker: BackendWorker, user_id: str, data: t.Annotated[types.Schema, types.SchemaMetadata(User)],): async with worker: updated = await worker.user.update(dict(data), id=user_id) return updated[0]
@backend.delete("/user/{user_id}/")async def delete_user(worker: BackendWorker, user_id: str): async with worker: await worker.user.delete(id=user_id)
# HTTP Repository: consumes the backend over HTTPclass UserHTTPRepository(HTTPResourceRepository): _resource = "/user"
# Demoasync def main(): async with Client(app=backend) as client: repo = UserHTTPRepository(client)
# Create user_id = str(uuid.uuid4()) created = await repo.create( { "id": user_id, "name": "Alice", "surname": "Smith", "email": "[email protected]", "password": "hashed_password", "active": False, } ) print(f"Created: {created}")
# Retrieve user = await repo.retrieve(user_id) print(f"Retrieved: {user}")
# Update updated = await repo.update( user_id, { "id": user_id, "name": "Alice", "surname": "Smith", "email": "[email protected]", "password": "hashed_password", "active": True, }, ) print(f"Updated: {updated}")
# Delete await repo.delete(user_id) print("User deleted.")Two things are worth noting about this example:
Client(app=backend): Flama'sClient(which extendshttpx.AsyncClient) accepts a local Flama application as theappparameter. This routes all HTTP calls through the application's ASGI interface without opening a network socket, which is ideal for testing and self-contained demonstrations.- Method signatures: Unlike the SQL repository, where
retrieveanddeleteaccept keyword filters (e.g.,id=user_id), the HTTP repository methods take a positionalidparameter (e.g.,repo.retrieve(user_id)). Theupdatemethod similarly takesidas its first argument rather than a keyword filter. This reflects the URL-centric nature of HTTP: each entity is identified by its path.
Running this produces the following output:
Created: {'id': '...', 'name': 'Alice', 'surname': 'Smith', 'email': '[email protected]', 'password': 'hashed_password', 'active': False}Retrieved: {'id': '...', 'name': 'Alice', 'surname': 'Smith', 'email': '[email protected]', 'password': 'hashed_password', 'active': False}Updated: {'id': '...', 'name': 'Alice', 'surname': 'Smith', 'email': '[email protected]', 'password': 'hashed_password', 'active': True}User deleted.The calling code is structurally identical to the SQL repository example: repo.create(...), repo.retrieve(...), repo.update(...), repo.delete(...). The only difference is that each call now translates to an HTTP request rather than a SQL query. If the backend were changed from SQLite to PostgreSQL, or from a local app to a remote service, the HTTP repository's calling code would remain untouched.
With the repository handling data access, the remaining challenge is managing the connection lifecycle. In the next page, we introduce the Worker pattern, which takes care of this automatically, whether the backend is a database or an HTTP API.