Domain driven designWorker
Domain driven design~ 11 min read

Worker

In the previous page, we saw how the repository pattern encapsulates data access behind a clean interface. However, the example required us to manage the database connection manually. In a production application, this kind of manual wiring would be error-prone and tedious, especially when multiple repositories need to participate in the same atomic transaction.

This is where the Worker comes in. The worker is Flama's implementation of the Unit of Work pattern, and it is the piece that ties repositories together with connection and transactional guarantees. It manages the connection lifecycle (whether that is a database connection or an HTTP client), ensures that all operations within a single business action succeed together or fail together (where the backend supports it), and creates the repository instances automatically.

What is a Worker?

A Worker (unit of work) is an object that encapsulates a single, atomic unit of business work. When you enter a worker's context, it opens a database connection, begins a transaction, and creates all the repositories it manages. When you exit the context, it either commits the transaction (if everything succeeded) or rolls it back (if an exception occurred), and then closes the connection.

Why is the worker pattern important?

  • Atomicity: All the operations performed within a single async with worker: block are grouped into a single transaction. If any operation fails, no partial changes are persisted.
  • Connection management: The worker handles opening and closing database connections automatically, eliminating the risk of leaked connections or unclosed sessions.
  • Repository orchestration: The worker instantiates all its repositories with the correct connection instance, ensuring they all operate within the same transactional context.
  • Clean business logic: Resource methods can focus entirely on business rules, without worrying about connection handling, transaction management, or error recovery.

The main virtue workers bring is providing a structured, atomic boundary for business operations, ensuring data consistency while keeping the business logic decoupled from infrastructure plumbing.

Worker hierarchy

Flama provides a layered class hierarchy for workers:

ClassPurpose
AbstractWorkerAbstract base. Defines the lifecycle interface (begin, end, commit, rollback).
BaseWorkerAdds the metaclass machinery that discovers repositories declared as class attributes.
WorkerA no-op implementation for cases that do not require transactions.
SQLAlchemyWorkerManages an async SQLAlchemy connection and transaction.
HTTPWorkerManages a Flama HTTP Client for remote repository patterns.

For database-backed applications, SQLAlchemyWorker is the class you will use. For applications that consume a remote API over HTTP, HTTPWorker provides the same worker interface with an HTTP Client in place of a database connection.

SQLAlchemy Worker

The SQLAlchemyWorker manages an async SQLAlchemy connection and transaction lifecycle. It is the worker you will use whenever your repositories access a local database.

Lifecycle

Understanding the worker's lifecycle is key to using it effectively. The lifecycle follows a precise sequence:

  1. Acquire lock: The worker acquires an internal async lock to prevent concurrent usage.
  2. Set up (set_up()): Opens a database connection and begins a transaction. For SQLAlchemyWorker, this means calling app.sqlalchemy.open_connection() and app.sqlalchemy.begin_transaction().
  3. Create repositories: The worker inspects its class-level repository declarations (type annotations) and instantiates each repository with the current connection.
  4. Business logic: Your code runs inside the async with worker: block, calling repository methods as needed.
  5. Commit or rollback: If the block exits normally, changes are committed. If an exception occurs, the transaction is rolled back.
  6. Tear down (tear_down()): The transaction is ended and the connection is closed.
  7. Destroy repositories: All repository instances are removed from the worker.
  8. Release lock: The async lock is released.

In code, this entire lifecycle is triggered by the async with worker: context manager:

async with worker:    # Steps 1-3 happen automatically before this line    await worker.user.create({...})  # Step 4: business logic    user = await worker.user.retrieve(email="[email protected]")# Steps 5-8 happen automatically after this line

Building a SQLAlchemy Worker

Defining a worker is as simple as declaring which repositories it manages. Each repository is declared as a class-level type annotation:

from flama.ddd.workers.sqlalchemy import SQLAlchemyWorker
class RegisterWorker(SQLAlchemyWorker): user: UserRepository

That is all that is needed. The SQLAlchemyWorker metaclass automatically discovers the user annotation, recognises that UserRepository is a subclass of BaseRepository, and registers it. When you enter the async with block, the worker creates an instance of UserRepository with the active connection and assigns it to self.user.

Multiple repositories

If your business operation spans multiple entities, simply declare additional repositories:

class OrderWorker(SQLAlchemyWorker):    user: UserRepository    product: ProductRepository    order: OrderRepository

All three repositories will be created with the same connection and participate in the same transaction. This is the power of the unit of work pattern, you can atomically create a user, a product, and an order in a single async with block, confident that either all three are persisted or none of them are.

Integrating with your app

To make a worker available to your route handlers and resource methods via dependency injection, Flama provides the WorkerComponent. This component bridges the worker and Flama's injector system.

The registration happens at application creation time:

from flama import Flamafrom flama.ddd import WorkerComponentfrom flama.sqlalchemy import SQLAlchemyModule
DATABASE_URL = "sqlite+aiosqlite:///ddd_worker_demo.db"
app = Flama( modules=[SQLAlchemyModule(DATABASE_URL)], components=[WorkerComponent(worker=RegisterWorker())],)

Two things are happening here:

  • SQLAlchemyModule(DATABASE_URL): A Flama module that manages the async SQLAlchemy engine. It provides the app.sqlalchemy interface that the worker uses to open connections and manage transactions.
  • WorkerComponent(worker=RegisterWorker()): A component that provides the RegisterWorker instance to any function that declares it as a parameter. It uses Flama's can_handle_parameter() mechanism to match by class identity, so the injector knows exactly which worker to provide when a route handler requests worker: RegisterWorker.

Once registered, any route handler or resource method can receive the worker by simply declaring it as a parameter:

@app.post("/demo/")async def demo_worker(worker: RegisterWorker):    async with worker:        await worker.user.create({...})        user = await worker.user.retrieve(id=user_id)    return {"created_user": user}

Example

Here is a complete, self-contained example that puts together the data model, repository, worker, and application wiring. It exposes a single endpoint that creates a user and retrieves it, all within one atomic unit of work.

import uuid
import flamaimport sqlalchemyfrom flama import Flamafrom flama.ddd import WorkerComponentfrom flama.ddd.repositories.sqlalchemy import SQLAlchemyTableRepositoryfrom flama.ddd.workers.sqlalchemy import SQLAlchemyWorkerfrom flama.sqlalchemy import SQLAlchemyModule, metadata
# Data modeluser_table = sqlalchemy.Table( "user", metadata, sqlalchemy.Column( "id", sqlalchemy.String, primary_key=True, nullable=False, default=lambda: str(uuid.uuid4()), ), sqlalchemy.Column("name", sqlalchemy.String, nullable=False), sqlalchemy.Column("surname", sqlalchemy.String, nullable=False), sqlalchemy.Column("email", sqlalchemy.String, nullable=False, unique=True), sqlalchemy.Column("password", sqlalchemy.String, nullable=False), sqlalchemy.Column("active", sqlalchemy.Boolean, nullable=False),)

# Repositoryclass UserRepository(SQLAlchemyTableRepository): _table = user_table

# Workerclass RegisterWorker(SQLAlchemyWorker): user: UserRepository

# ApplicationDATABASE_URL = "sqlite+aiosqlite:///ddd_worker_demo.db"
app = Flama( openapi={ "info": { "title": "Worker Demo API", "version": "1.0.0", "description": "Demonstrating the Worker pattern with Flama 🔥", }, }, docs="/docs/", modules=[SQLAlchemyModule(DATABASE_URL)], components=[WorkerComponent(worker=RegisterWorker())],)

@app.on_event("startup")async def on_startup(): async with app.sqlalchemy.engine.begin() as conn: await conn.run_sync(metadata.create_all) print("Database initialised.")

@app.post("/demo/")async def demo_worker(worker: RegisterWorker): """ tags: - Demo summary: Worker demo description: Creates a user and retrieves it within an atomic unit of work. responses: 200: description: Demonstration result with the created user. """ user_id = str(uuid.uuid4())
async with worker: await worker.user.create( { "id": user_id, "name": "Alice", "surname": "Smith", "email": f"alice-{user_id[:8]}@example.com", "password": "hashed_secret", "active": True, } ) user = await worker.user.retrieve(id=user_id)
return {"created_user": user}

if __name__ == "__main__": flama.run(flama_app=app, server_host="0.0.0.0", server_port=8000)

When you send a POST request to /demo/, the worker opens a connection, begins a transaction, creates the user, retrieves it, commits the transaction, and closes the connection, all automatically. The response will look like:

{    "created_user": {        "id": "a91a5c6e-09fb-4a50-a157-339a8398426d",        "name": "Alice",        "surname": "Smith",        "email": "[email protected]",        "password": "hashed_secret",        "active": true    }}

With the repository and worker in place, we have a complete data access layer that isolates the database from everything else.

HTTP Worker

The HTTPWorker provides the same worker interface for applications that consume a remote Flama API over HTTP. Instead of managing a database connection and transaction, it manages an HTTP Client instance, and passes it to its repositories.

Key differences

AspectSQLAlchemyWorkerHTTPWorker
ConnectionAsync SQLAlchemy AsyncConnectionFlama Client (HTTP)
TransactionsReal database transactions (commit / rollback)No-op (HTTP is stateless)
Set upOpens DB connection + begins transactionCreates and opens an HTTP Client
Tear downEnds transaction + closes connectionCloses and disposes the HTTP Client
ConstructorNo extra parameters (uses app.sqlalchemy)url (string or callable) + optional app

Because HTTP is inherently stateless, the commit() and rollback() methods on HTTPWorker are intentional no-ops. The worker boundary still provides a useful structure: it guarantees that the HTTP client is properly initialised before any repository call, and properly closed afterwards.

Building an HTTP Worker

Defining an HTTP worker mirrors the SQLAlchemy pattern. You declare the repositories it manages as class-level type annotations. The constructor takes the url of the remote API (either as a static string or as a callable that returns one):

from flama.ddd.workers.http import HTTPWorkerfrom flama.ddd.repositories.http import HTTPResourceRepository

class UserRepository(HTTPResourceRepository): _resource = "/user"

class RemoteWorker(HTTPWorker): user: UserRepository

When entering the async with block, the worker creates an HTTP Client pointed at the given url, then instantiates each declared repository with that client.

Integrating with your app

Registration follows the same pattern as SQLAlchemyWorker, using a WorkerComponent:

from flama import Flamafrom flama.ddd import WorkerComponent
app = Flama( openapi={ "info": { "title": "Gateway API", "version": "1.0.0", "description": "A service that consumes a remote Flama API", }, }, docs="/docs/", components=[WorkerComponent(worker=RemoteWorker(url="http://backend:8000"))],)

Notice that no SQLAlchemyModule is needed, the HTTPWorker manages its own HTTP client. The url parameter can be a callable if the backend address is resolved dynamically (e.g., from a service registry or environment variable):

import os
RemoteWorker(url=lambda: os.environ["BACKEND_URL"])

Using the HTTP Worker

Once registered, the HTTP worker is consumed exactly like the SQLAlchemy variant:

@app.get("/users/{user_id}/")async def get_user(user_id: str, worker: RemoteWorker):    async with worker:        user = await worker.user.retrieve(user_id)    return user

The calling code is completely unaware of whether the data comes from a local database or a remote API. This backend-agnostic interface is the core value of the repository and worker patterns.

When to use

HTTP workers are the right choice when your application acts as a gateway, aggregator, or backend-for-frontend that consumes one or more remote Flama APIs. They allow you to treat remote services as if they were local data sources, with the same clean, domain-oriented interface.

Example

Here is a complete, self-contained example that mirrors the SQLAlchemy Worker example above. It builds a gateway application whose single endpoint creates a user and retrieves it, delegating all data operations to a backend Flama API via the HTTPWorker.

To keep the example self-contained, we define the backend in the same file and route the gateway's requests through it using httpx.ASGITransport. In production, the backend would be a separate service and you would pass its URL directly (e.g., GatewayWorker(url="http://backend:8000")).

import typing as timport uuid
import httpximport pydanticimport sqlalchemyfrom flama import Flama, typesfrom flama.client import Clientfrom flama.ddd import WorkerComponentfrom flama.ddd.repositories.http import HTTPResourceRepositoryfrom flama.ddd.repositories.sqlalchemy import SQLAlchemyTableRepositoryfrom flama.ddd.workers.http import HTTPWorkerfrom flama.ddd.workers.sqlalchemy import SQLAlchemyWorkerfrom flama.sqlalchemy import SQLAlchemyModule, metadata

# Backend (a SQL-backed Flama app, as built in the earlier pages)user_table = sqlalchemy.Table( "user", metadata, sqlalchemy.Column( "id", sqlalchemy.String, primary_key=True, nullable=False, default=lambda: str(uuid.uuid4()), ), sqlalchemy.Column("name", sqlalchemy.String, nullable=False), sqlalchemy.Column("surname", sqlalchemy.String, nullable=False), sqlalchemy.Column("email", sqlalchemy.String, nullable=False, unique=True), sqlalchemy.Column("password", sqlalchemy.String, nullable=False), sqlalchemy.Column("active", sqlalchemy.Boolean, nullable=False),)

class User(pydantic.BaseModel): id: t.Optional[str] = None name: str surname: str email: str password: str active: bool = False

class UserSQLRepository(SQLAlchemyTableRepository): _table = user_table

class BackendWorker(SQLAlchemyWorker): user: UserSQLRepository

backend = Flama( modules=[SQLAlchemyModule("sqlite+aiosqlite:///")], components=[WorkerComponent(worker=BackendWorker())],)

@backend.on_event("startup")async def on_startup(): async with backend.sqlalchemy.engine.begin() as conn: await conn.run_sync(metadata.create_all)

@backend.post("/user/")async def create_user( worker: BackendWorker, data: t.Annotated[types.Schema, types.SchemaMetadata(User)],): async with worker: created = await worker.user.create(dict(data)) return created[0]

@backend.get("/user/{user_id}/")async def get_user(worker: BackendWorker, user_id: str): async with worker: return await worker.user.retrieve(id=user_id)

# Gateway: an HTTPWorker-based app that consumes the backendclass UserHTTPRepository(HTTPResourceRepository): _resource = "/user"

class GatewayWorker(HTTPWorker): user: UserHTTPRepository

gateway = Flama( openapi={ "info": { "title": "Gateway API", "version": "1.0.0", "description": "A gateway that consumes a remote Flama API", }, }, docs="/docs/", components=[ WorkerComponent( worker=GatewayWorker( url="http://localapp", transport=httpx.ASGITransport(app=backend), ) ) ],)

@gateway.post("/demo/")async def demo_http_worker(worker: GatewayWorker): """ tags: - Demo summary: HTTP Worker demo description: Creates a user and retrieves it via the HTTP worker. responses: 200: description: Demonstration result with the created user. """ user_id = str(uuid.uuid4())
async with worker: await worker.user.create( { "id": user_id, "name": "Alice", "surname": "Smith", "email": f"alice-{user_id[:8]}@example.com", "password": "hashed_secret", "active": True, } ) user = await worker.user.retrieve(user_id)
return {"created_user": user}

The gateway application exposes a single POST endpoint that exercises the HTTP worker. Notice how demo_http_worker is structurally identical to its SQLAlchemy counterpart from the earlier example: enter the worker context, call repository methods, return the result. The only difference is what happens behind the scenes, HTTP requests instead of SQL queries.

When you send a POST request to /demo/, the HTTP worker opens an HTTP Client, creates the UserHTTPRepository with that client, and the repository translates each method call into the corresponding HTTP request against the backend. The response will look like:

{    "created_user": {        "id": "2d1dbc22-2c1d-4366-a424-90161e73466f",        "name": "Alice",        "surname": "Smith",        "email": "[email protected]",        "password": "hashed_secret",        "active": true    }}

In the next page, we build the Resource layer, the place where the actual business logic lives, expressed as clean, readable endpoint methods that delegate all data operations to the worker.