Week 40 of 2025

Development log of Joke as a Service

20 items
  1. Write a test for password length validation
  2. Implement password length validation
  3. Narrow the password length test conditions
  4. Write a TODO for a blank password validation
  5. Write a test for blank password validation
  6. Use field_validator to check for blank passwords
  7. Test multiple blank passwords samples
  8. Implement first API test (GET /)
  9. Write a user registration test (POST /jokers/)
  10. Use dependency injection for DB connection
  11. Write a test for authentication
  12. Fix authentication (POST /token/)
  13. Fix a warning about lifespan
  14. Fix warnings about unused markers
  15. Write a comment about dependencies from flake.nix
  16. Remove unused variables from tests
  17. Remove a stale TODO comment
  18. Write some more ideas for REST tests
  19. Update Nix dependencies
  20. Make nix run tests before installing

Write a test for password length validation

On by Tad Lispy

The test does not pass yet.

index 7ddafc7..72366da 100644
--- a/test_joker_registration_validation.py
+++ b/test_joker_registration_validation.py
@@ -1,4 +1,6 @@
=from main import JokerRegistration
+from pydantic import ValidationError
+import pytest
=
=
=def test_valid_registration():
@@ -8,3 +10,14 @@ def test_valid_registration():
=    registration = JokerRegistration(**payload)
=    assert registration.name == "Funny Guy"
=    assert registration.password == "Jokes 4 you!"
+
+
+def test_password_too_short_registration():
+    """Password must be at least 8 characters long"""
+
+    payload = {"name": "Funny Guy", "password": "jokes"}
+
+    with pytest.raises(
+        ValidationError, match="Password must be at least 8 characters long"
+    ) as exception_info:
+        JokerRegistration(**payload)

Implement password length validation

On by Tad Lispy

I had to modify the test, as it is quite difficult to modify the built-in error message from Pydantic.

index 5874cff..3b9e704 100644
--- a/main.py
+++ b/main.py
@@ -16,7 +16,7 @@ import uvicorn
=from fastapi import FastAPI, HTTPException, Response, Depends
=from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
=from passlib.context import CryptContext
-from pydantic import BaseModel
+from pydantic import BaseModel, Field
=
=
=DB_PATH = Path(os.environ.get("jaas_db_path") or "./jokes.db").expanduser().resolve()
@@ -158,7 +158,7 @@ async def authenticate(form: Annotated[OAuth2PasswordRequestForm, Depends()]) ->
=# TODO: Validation: password and name length, not blank, etc.
=class JokerRegistration(BaseModel):
=    name: str
-    password: str
+    password: str = Field(min_length=8)
=
=
=@app.post("/jokers/", status_code=201)
index 72366da..cda8f0d 100644
--- a/test_joker_registration_validation.py
+++ b/test_joker_registration_validation.py
@@ -18,6 +18,6 @@ def test_password_too_short_registration():
=    payload = {"name": "Funny Guy", "password": "jokes"}
=
=    with pytest.raises(
-        ValidationError, match="Password must be at least 8 characters long"
+        ValidationError, match="at least 8 characters"
=    ) as exception_info:
=        JokerRegistration(**payload)

Narrow the password length test conditions

On by Tad Lispy

Make sure the offending field is "password" and that it's too short.

index cda8f0d..44f23b2 100644
--- a/test_joker_registration_validation.py
+++ b/test_joker_registration_validation.py
@@ -21,3 +21,6 @@ def test_password_too_short_registration():
=        ValidationError, match="at least 8 characters"
=    ) as exception_info:
=        JokerRegistration(**payload)
+
+    assert exception_info.value.errors()[0]["loc"] == ("password",)
+    assert exception_info.value.errors()[0]["type"] == "string_too_short"

Write a TODO for a blank password validation

On by Tad Lispy

index 44f23b2..aa4e387 100644
--- a/test_joker_registration_validation.py
+++ b/test_joker_registration_validation.py
@@ -24,3 +24,6 @@ def test_password_too_short_registration():
=
=    assert exception_info.value.errors()[0]["loc"] == ("password",)
=    assert exception_info.value.errors()[0]["type"] == "string_too_short"
+
+
+# TODO: Password cannot be blank

Write a test for blank password validation

On by Tad Lispy

index aa4e387..767112a 100644
--- a/test_joker_registration_validation.py
+++ b/test_joker_registration_validation.py
@@ -26,4 +26,15 @@ def test_password_too_short_registration():
=    assert exception_info.value.errors()[0]["type"] == "string_too_short"
=
=
-# TODO: Password cannot be blank
+def test_blank_password_registration():
+    """Password cannot be blank"""
+
+    payload = {"name": "Funny Guy", "password": "\t    \t\t    \n "}
+
+    with pytest.raises(
+        ValidationError, match="A password cannot be blank"
+    ) as exception_info:
+        JokerRegistration(**payload)
+
+    # assert exception_info.value.errors()[0]["loc"] == ("password",)
+    # assert exception_info.value.errors()[0]["type"] == "string_too_short"

Use field_validator to check for blank passwords

On by Tad Lispy

index 3b9e704..0e41417 100644
--- a/main.py
+++ b/main.py
@@ -16,7 +16,7 @@ import uvicorn
=from fastapi import FastAPI, HTTPException, Response, Depends
=from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
=from passlib.context import CryptContext
-from pydantic import BaseModel, Field
+from pydantic import BaseModel, Field, field_validator
=
=
=DB_PATH = Path(os.environ.get("jaas_db_path") or "./jokes.db").expanduser().resolve()
@@ -160,6 +160,12 @@ class JokerRegistration(BaseModel):
=    name: str
=    password: str = Field(min_length=8)
=
+    @field_validator("password")
+    @classmethod
+    def blank_password_validator(model, value: str):
+        assert not value.isspace(), "A password cannot be blank"
+        return value
+
=
=@app.post("/jokers/", status_code=201)
=def register_joker(registration: JokerRegistration) -> Joker:

Test multiple blank passwords samples

On by Tad Lispy

Re-use the test function with the parametrize mark. Both valid and invalid samples are provided, with the second argument indicating whether each sample is valid.

index 767112a..4236bd1 100644
--- a/test_joker_registration_validation.py
+++ b/test_joker_registration_validation.py
@@ -26,15 +26,25 @@ def test_password_too_short_registration():
=    assert exception_info.value.errors()[0]["type"] == "string_too_short"
=
=
-def test_blank_password_registration():
+@pytest.mark.parametrize(
+    "input, good",
+    [
+        ("\t    \t\t    \n ", False),
+        ("\t    \there\t    \n ", True),
+        ("        ", False),
+        ("    *   ", True),
+        ("a       ", True),
+    ],
+)
+def test_blank_password_registration(input, good):
=    """Password cannot be blank"""
=
-    payload = {"name": "Funny Guy", "password": "\t    \t\t    \n "}
+    payload = {"name": "Funny Guy", "password": input}
=
-    with pytest.raises(
-        ValidationError, match="A password cannot be blank"
-    ) as exception_info:
+    if good:
=        JokerRegistration(**payload)
-
-    # assert exception_info.value.errors()[0]["loc"] == ("password",)
-    # assert exception_info.value.errors()[0]["type"] == "string_too_short"
+    else:
+        with pytest.raises(
+            ValidationError, match="A password cannot be blank"
+        ) as exception_info:
+            JokerRegistration(**payload)

Implement first API test (GET /)

On by Tad Lispy

Using FastAPI TestClient.

Also write some TODOs for future tests.

index 9495fe0..9ab8b46 100644
--- a/flake.nix
+++ b/flake.nix
@@ -23,6 +23,7 @@
=          ps.bcrypt
=          ps.python-multipart
=          ps.pytest
+          ps.httpx
=        ]);
=      in
=
new file mode 100644
index 0000000..ebb1c61
--- /dev/null
+++ b/test_rest.py
@@ -0,0 +1,27 @@
+import pytest
+import main
+from fastapi.testclient import TestClient
+
+
+# TODO: Use dependency injection for DB connections so it can be mocked during tests
+#
+#       Currently the database file is created in the CWD, and the tests are not
+#       isolated. A unique temporary file should be created for every test.
+
+client = TestClient(main.app)
+
+
+def test_get_root():
+    response = client.get("/")
+    assert response.status_code == 200
+    body = response.json()
+    assert "id" in body
+    assert "text" in body
+    assert "author_id" in body
+    assert "author_name" in body
+    assert "laughs" in body
+
+
+# TODO: Test user registration
+
+# TODO: Test authentication. Use a fixture to register new users before authenticating.

Write a user registration test (POST /jokers/)

On by Tad Lispy

It does pass the first time, but on subsequent runs it fails, because the tests are not isolated. The same .db file is used every time (it also pollutes the filesystem with those files). Let's fix it!

index ebb1c61..42a9715 100644
--- a/test_rest.py
+++ b/test_rest.py
@@ -22,6 +22,14 @@ def test_get_root():
=    assert "laughs" in body
=
=
-# TODO: Test user registration
+def test_post_jokers():
+    response = client.post(
+        "/jokers/", json={"name": "Bob", "password": "Bob is funny!"}
+    )
+    assert response.status_code == 201
+    body = response.json()
+    assert "id" in body
+    assert "name" in body
+
=
=# TODO: Test authentication. Use a fixture to register new users before authenticating.

Use dependency injection for DB connection

On by Tad Lispy

This is a refactoring that touches all endpoints, and a bit of initialization code.

It enables the user registration test to pass by isolating the test case. Technically, the get_db_path dependency provider is being overridden with a unique path for each test run. That way inserting a user doesn't conflict with previous runs, as each run inserts into a different database.

index 0e41417..963af57 100644
--- a/main.py
+++ b/main.py
@@ -19,8 +19,6 @@ from passlib.context import CryptContext
=from pydantic import BaseModel, Field, field_validator
=
=
-DB_PATH = Path(os.environ.get("jaas_db_path") or "./jokes.db").expanduser().resolve()
-
=JWT_SECRET_KEY = os.environ.get("jaas_jwt_secret_key")
=JWT_ALGORITHM = "HS256"
=JWT_TTL_MINUTES = 30
@@ -35,20 +33,52 @@ security = OAuth2PasswordBearer(tokenUrl="token")
=password_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
=
=
+def get_db_path() -> Path:
+    """A dependable providing a path to the SQLite .db file
+
+    Can be overridden in tests to isolate different cases.
+    """
+    return Path(os.environ.get("jaas_db_path") or "./jokes.db").expanduser().resolve()
+
+
+DBPath = Annotated[Path, Depends(get_db_path)]
+
+
+def get_database(path: DBPath) -> sqlite3.Connection:
+    """A dependable providing a configured DB connection"""
+    with sqlite3.connect(path) as db:
+        db.row_factory = sqlite3.Row
+        db.execute("Pragma foreign_keys = on")
+        yield db
+
+
+Database = Annotated[sqlite3.Connection, Depends(get_database)]
+
+
=async def app_lifespan(app: FastAPI):
-    logger.info(f"Using the database at {DB_PATH}")
-    if not os.path.exists(DB_PATH):
+    # Below is a homegrown dependency injection
+    #
+    # We need this to initialize a mock database during tests, but the standard
+    # FastAPI dependency injection apparently doesn't work in lifecycle hook.
+    # This solution seems a bit smelly. E.g. transient dependencies would not be
+    # taken care of. Is there a more elegant way to inject dependencies into the
+    # lifespan function?
+    _get_db_path = app.dependency_overrides.get(get_db_path) or get_db_path
+    db_path = _get_db_path()
+
+    logger.info(f"Using the database at {db_path}")
+    if not os.path.exists(db_path):
=        init_sql_path = Path(__file__).joinpath("../init.sql").expanduser().resolve()
=        logger.info(
-            f"Database does not exist at {DB_PATH}. Initializing with {init_sql_path}."
+            f"Database does not exist at {db_path}. Initializing with {init_sql_path}."
=        )
=        init_sql = init_sql_path.read_text(encoding="utf-8")
=        try:
-            with sqlite3.connect(DB_PATH) as db:
+            with sqlite3.connect(db_path) as db:
=                db.executescript(init_sql)
=        except Exception as exception:
=            logger.warn("Initializing database failed. Rolling back")
-            os.remove(DB_PATH)
+            os.remove(db_path)
=            raise exception
=
=    yield
@@ -79,7 +109,12 @@ class Token(BaseModel):
=    token_type: str = "bearer"
=
=
-async def get_current_user(token: Annotated[str, Depends(security)]):
+# TODO: Use type aliases for nicer dependency injection
+#
+#       See <https://fastapi.tiangolo.com/tutorial/dependencies/#share-annotated-dependencies>
+
+
+async def get_current_user(token: Annotated[str, Depends(security)], db: Database):
=    """Provide the current user if logged in, otherwise err.
=
=    It's a dependency that can be used on endpoints that require
@@ -89,70 +124,65 @@ async def get_current_user(token: Annotated[str, Depends(security)]):
=    token_data = jwt.decode(token, JWT_SECRET_KEY, algorithms=[JWT_ALGORITHM])
=    username = token_data.get("sub")
=
-    with sqlite3.connect(DB_PATH) as db:
-        db.row_factory = sqlite3.Row
-        cursor = db.cursor()
-        joker = cursor.execute(
-            """
-            Select
-                joker.id,
-                joker.name
-            from
-                joker
-            where
-                joker.name = :username
-            ;
-            """,
-            {"username": username},
-        ).fetchone()
+    cursor = db.cursor()
+    joker = cursor.execute(
+        """
+        Select
+            joker.id,
+            joker.name
+        from
+            joker
+        where
+            joker.name = :username
+        ;
+        """,
+        {"username": username},
+    ).fetchone()
=
-        if joker is None:
-            raise HTTPException(
-                status_code=400,
-                detail="Invalid joker",
-                headers={"WWW-Authenticate": "Bearer"},
-            )
+    if joker is None:
+        raise HTTPException(
+            status_code=400,
+            detail="Invalid joker",
+            headers={"WWW-Authenticate": "Bearer"},
+        )
=
-        return Joker(**joker)
+    return Joker(**joker)
=
=
=@app.post("/token")
-async def authenticate(form: Annotated[OAuth2PasswordRequestForm, Depends()]) -> Token:
+async def authenticate(
+    form: Annotated[OAuth2PasswordRequestForm, Depends()], db: Database
+) -> Token:
=    """Get a token based on username and password."""
-    with sqlite3.connect(DB_PATH) as db:
-        db.row_factory = sqlite3.Row
-        cursor = db.cursor()
-        joker = cursor.execute(
-            """
-            Select
-                joker.id,
-                joker.name,
-                joker.password
-            from
-                joker
-            where
-                joker.name = :username
-            ;
-            """,
-            {"username": form.username},
-        ).fetchone()
+    cursor = db.cursor()
+    joker = cursor.execute(
+        """
+        Select
+            joker.id,
+            joker.name,
+            joker.password
+        from
+            joker
+        where
+            joker.name = :username
+        ;
+        """,
+        {"username": form.username},
+    ).fetchone()
=
-        if joker is None:
-            raise HTTPException(
-                status_code=400, detail="Incorrect username or password"
-            )
+    if joker is None:
+        raise HTTPException(status_code=400, detail="Incorrect username or password")
=
-        if not password_context.verify(form.password, joker["password"]):
-            raise HTTPException(
-                status_code=400, detail="Incorrect username or password"
-            )
+    if not password_context.verify(form.password, joker["password"]):
+        raise HTTPException(status_code=400, detail="Incorrect username or password")
=
-        ttl = timedelta(minutes=JWT_TTL_MINUTES)
-        exp = datetime.now(timezone.utc) + ttl
-        encoded = jwt.encode(
-            {"exp": exp, "sub": joker["name"]}, JWT_SECRET_KEY, algorithm=JWT_ALGORITHM
-        )
-        return Token(access_token=encoded)
+    # TODO: Separate the pure part below and write some unit tests
+    ttl = timedelta(minutes=JWT_TTL_MINUTES)
+    exp = datetime.now(timezone.utc) + ttl
+    encoded = jwt.encode(
+        {"exp": exp, "sub": joker["name"]}, JWT_SECRET_KEY, algorithm=JWT_ALGORITHM
+    )
+    return Token(access_token=encoded)
=
=
=# TODO: Validation: password and name length, not blank, etc.
@@ -168,36 +198,34 @@ class JokerRegistration(BaseModel):
=
=
=@app.post("/jokers/", status_code=201)
-def register_joker(registration: JokerRegistration) -> Joker:
+def register_joker(registration: JokerRegistration, db: Database) -> Joker:
=    password = password_context.hash(registration.password)
-    with sqlite3.connect(DB_PATH) as db:
-        db.row_factory = sqlite3.Row
-        cursor = db.cursor()
-        try:
-            joker = cursor.execute(
-                """
-                Insert into joker (
-                    name,
-                    password
-                ) values (
-                    :name,
-                    :password
-                )
-                returning id, name
-                ;
-                """,
-                {"name": registration.name, "password": password},
-            ).fetchone()
-
-            return Joker(**joker)
-
-        except sqlite3.IntegrityError as error:
-            if error.args[0] == "UNIQUE constraint failed: joker.name":
-                raise HTTPException(
-                    status_code=409, detail=f"Name already taken: {registration.name}"
-                )
-            else:
-                raise error
+    cursor = db.cursor()
+    try:
+        joker = cursor.execute(
+            """
+            Insert into joker (
+                name,
+                password
+            ) values (
+                :name,
+                :password
+            )
+            returning id, name
+            ;
+            """,
+            {"name": registration.name, "password": password},
+        ).fetchone()
+
+        return Joker(**joker)
+
+    except sqlite3.IntegrityError as error:
+        if error.args[0] == "UNIQUE constraint failed: joker.name":
+            raise HTTPException(
+                status_code=409, detail=f"Name already taken: {registration.name}"
+            )
+        else:
+            raise error
=
=
=@app.get("/me")
@@ -207,41 +235,37 @@ def get_current_joker(user: Annotated[Joker, Depends(get_current_user)]) -> Joke
=
=
=@app.get("/me/laughs")
-def get_laughs(user: Annotated[Joker, Depends(get_current_user)]) -> list[Joke]:
+def get_laughs(
+    user: Annotated[Joker, Depends(get_current_user)], db: Database
+) -> list[Joke]:
=    """List of jokes user laughed at."""
-    """List all jokes."""
-    with sqlite3.connect(DB_PATH) as db:
-        db.row_factory = sqlite3.Row
-        db.execute("Pragma foreign_keys = on")
-        cursor = db.cursor()
-        cursor.execute(
-            """
-            Select
-                joke.id,
-                joke.text,
-                joker.id as author_id,
-                joker.name as author_name,
-                (select count(*) from laugh where laugh.joke = joke.id) as laughs
-            from
-                joke
-                join joker on joke.author = joker.id
-                join laugh on joke.id = laugh.joke
-            where
-                laugh.joker = :joker
-            ;
-            """,
-            {"joker": user.id},
-        )
-        return [Joke(**row) for row in cursor.fetchall()]
+    cursor = db.cursor()
+    cursor.execute(
+        """
+        Select
+            joke.id,
+            joke.text,
+            joker.id as author_id,
+            joker.name as author_name,
+            (select count(*) from laugh where laugh.joke = joke.id) as laughs
+        from
+            joke
+            join joker on joke.author = joker.id
+            join laugh on joke.id = laugh.joke
+        where
+            laugh.joker = :joker
+        ;
+        """,
+        {"joker": user.id},
+    )
+    return [Joke(**row) for row in cursor.fetchall()]
=
=
=@app.get("/")
-def get_random_joke() -> Joke:
+def get_random_joke(db: Database) -> Joke:
=    """Get a single random joke."""
-    with sqlite3.connect(DB_PATH) as db:
-        db.row_factory = sqlite3.Row
-        cursor = db.cursor()
-        cursor.execute("""
+    cursor = db.cursor()
+    cursor.execute("""
=        Select
=            joke.id,
=            joke.text,
@@ -255,12 +279,14 @@ def get_random_joke() -> Joke:
=            random()
=        limit 1
=        ;
-        """)
-        return Joke(**cursor.fetchone())
+    """)
+    return Joke(**cursor.fetchone())
=
=
=@app.get("/jokes")
-def list_jokes(sort: str = "laughs", descending: bool = True) -> list[Joke]:
+def list_jokes(
+    db: Database, sort: str = "laughs", descending: bool = True
+) -> list[Joke]:
=    """List all jokes."""
=    logger.info(f"{sort} {descending}")
=
@@ -271,119 +297,112 @@ def list_jokes(sort: str = "laughs", descending: bool = True) -> list[Joke]:
=            detail=f"Invalid sort parameter: '{sort}'. Acceptable values are {sortable_columns}",
=        )
=
-    with sqlite3.connect(DB_PATH) as db:
-        db.row_factory = sqlite3.Row
-        db.execute("Pragma foreign_keys = on")
-        cursor = db.cursor()
-        direction = "desc" if descending else "asc"
-        cursor.execute(
-            f"""
-            Select
-                joke.id,
-                joke.text,
-                joker.id as author_id,
-                joker.name as author_name,
-                (select count(*) from laugh where laugh.joke = joke.id) as laughs
-            from
-                joke
-                join joker on joke.author = joker.id
-            order by {sort} collate nocase {direction}
-            ;
-            """
-        )
-        return [Joke(**row) for row in cursor.fetchall()]
+    cursor = db.cursor()
+    direction = "desc" if descending else "asc"
+    cursor.execute(
+        f"""
+        Select
+            joke.id,
+            joke.text,
+            joker.id as author_id,
+            joker.name as author_name,
+            (select count(*) from laugh where laugh.joke = joke.id) as laughs
+        from
+            joke
+            join joker on joke.author = joker.id
+        order by {sort} collate nocase {direction}
+        ;
+        """
+    )
+    return [Joke(**row) for row in cursor.fetchall()]
=
=
=@app.get("/jokes/{id}")
-def get_joke(id: int) -> Joke:
+def get_joke(db: Database, id: int) -> Joke:
=    """Get a single joke."""
-    with sqlite3.connect(DB_PATH) as db:
-        db.row_factory = sqlite3.Row
-        cursor = db.cursor()
-        joke = cursor.execute(
-            """
-            Select
-                joke.id,
-                joke.text,
-                joker.id as author_id,
-                joker.name as author_name,
-                (select count(*) from laugh where laugh.joke = joke.id) as laughs
-            from
-                joke
-                join joker on joke.author = joker.id
-            where
-                joke.id = :id
-            ;
-            """,
-            {"id": id},
-        ).fetchone()
+    cursor = db.cursor()
+    joke = cursor.execute(
+        """
+        Select
+            joke.id,
+            joke.text,
+            joker.id as author_id,
+            joker.name as author_name,
+            (select count(*) from laugh where laugh.joke = joke.id) as laughs
+        from
+            joke
+            join joker on joke.author = joker.id
+        where
+            joke.id = :id
+        ;
+        """,
+        {"id": id},
+    ).fetchone()
=
-        if joke is None:
-            raise HTTPException(status_code=404)
+    if joke is None:
+        raise HTTPException(status_code=404)
=
-        return Joke(**joke)
+    return Joke(**joke)
=
=
=@app.put("/jokes/{id}/laugh", status_code=201)
-def laugh(id: int, joker: Annotated[Joker, Depends(get_current_user)]) -> Response:
+def laugh(
+    db: Database, id: int, joker: Annotated[Joker, Depends(get_current_user)]
+) -> Response:
=    """Add a laugh to a funny joke."""
-    with sqlite3.connect(DB_PATH) as db:
-        db.row_factory = sqlite3.Row
-        db.execute("Pragma foreign_keys = on")
-        cursor = db.cursor()
+    cursor = db.cursor()
=
-        try:
-            cursor.execute(
-                """
-                Insert into laugh (
-                    joke,
-                    joker
-                ) values (
-                    :joke,
-                    :joker
-                )
-                ;
-                """,
-                {"joke": id, "joker": joker.id},
+    try:
+        cursor.execute(
+            """
+            Insert into laugh (
+                joke,
+                joker
+            ) values (
+                :joke,
+                :joker
=            )
+            ;
+            """,
+            {"joke": id, "joker": joker.id},
+        )
=
-            return Response(status_code=201)
+        return Response(status_code=201)
=
-        except sqlite3.IntegrityError as error:
-            if error.args[0] == "UNIQUE constraint failed: laugh.joke, laugh.joker":
-                raise HTTPException(
-                    status_code=200, detail="You've already laughed at this joke"
-                )
-            elif error.args[0] == "FOREIGN KEY constraint failed":
-                raise HTTPException(
-                    status_code=404,
-                    detail="Probably no such joke. Or maybe you don't exist. Who can tell?",
-                )
-            else:
-                raise error
+    except sqlite3.IntegrityError as error:
+        if error.args[0] == "UNIQUE constraint failed: laugh.joke, laugh.joker":
+            raise HTTPException(
+                status_code=200, detail="You've already laughed at this joke"
+            )
+        elif error.args[0] == "FOREIGN KEY constraint failed":
+            raise HTTPException(
+                status_code=404,
+                detail="Probably no such joke. Or maybe you don't exist. Who can tell?",
+            )
+        else:
+            raise error
=
=
=@app.delete("/jokes/{id}/laugh", status_code=204)
-def unlaugh(id: int, joker: Annotated[Joker, Depends(get_current_user)]) -> Response:
+def unlaugh(
+    db: Database, id: int, joker: Annotated[Joker, Depends(get_current_user)]
+) -> Response:
=    """Let it be known that the joke is not funny anymore."""
-    with sqlite3.connect(DB_PATH) as db:
-        db.row_factory = sqlite3.Row
-        db.execute("Pragma foreign_keys = on")
-        cursor = db.cursor()
-
-        # TODO: Handle a 404 type of situation
-        cursor.execute(
-            """
-            Delete from laugh
-            where
-                joke = :joke and
-                joker = :joker
-            ;
-            """,
-            {"joke": id, "joker": joker.id},
-        )
+    cursor = db.cursor()
+
+    # TODO: Handle a 404 type of situation
+    cursor.execute(
+        """
+        Delete from laugh
+        where
+            joke = :joke and
+            joker = :joker
+        ;
+        """,
+        {"joke": id, "joker": joker.id},
+    )
=
-        return Response(status_code=204)
+    return Response(status_code=204)
=
=
=class JokeSubmission(BaseModel):
@@ -399,26 +418,26 @@ class JokeSubmission(BaseModel):
=
=@app.post("/jokes", status_code=201)
=def new_joke(
-    joke: JokeSubmission, joker: Annotated[Joker, Depends(get_current_user)]
+    db: Database,
+    joke: JokeSubmission,
+    joker: Annotated[Joker, Depends(get_current_user)],
=) -> Response:
=    """Write a new joke."""
-    with sqlite3.connect(DB_PATH) as db:
-        db.row_factory = sqlite3.Row
-        cursor = db.cursor()
-        row = cursor.execute(
-            """
-            Insert into joke (
-                text,
-                author
-            )
-            values (:text, :author)
-            returning id
-            ;
-            """,
-            {"text": joke.text, "author": joker.id},
-        ).fetchone()
+    cursor = db.cursor()
+    row = cursor.execute(
+        """
+        Insert into joke (
+            text,
+            author
+        )
+        values (:text, :author)
+        returning id
+        ;
+        """,
+        {"text": joke.text, "author": joker.id},
+    ).fetchone()
=
-        return Response(headers={"location": f"/jokes/{row['id']}"}, status_code=201)
+    return Response(headers={"location": f"/jokes/{row['id']}"}, status_code=201)
=
=
=if __name__ == "__main__":
index 42a9715..67884b7 100644
--- a/test_rest.py
+++ b/test_rest.py
@@ -3,15 +3,21 @@ import main
=from fastapi.testclient import TestClient
=
=
-# TODO: Use dependency injection for DB connections so it can be mocked during tests
-#
-#       Currently the database file is created in the CWD, and the tests are not
-#       isolated. A unique temporary file should be created for every test.
+@pytest.fixture
+def client(tmp_path):
+    """Setup a test client with a unique DB"""
+    db_path = tmp_path / "test-jokes.db"
=
-client = TestClient(main.app)
+    def get_mock_db_path():
+        return db_path
=
+    main.app.dependency_overrides[main.get_db_path] = get_mock_db_path
=
-def test_get_root():
+    with TestClient(main.app) as client:
+        yield client
+
+
+def test_get_root(client):
=    response = client.get("/")
=    assert response.status_code == 200
=    body = response.json()
@@ -22,7 +28,7 @@ def test_get_root():
=    assert "laughs" in body
=
=
-def test_post_jokers():
+def test_post_jokers(client):
=    response = client.post(
=        "/jokers/", json={"name": "Bob", "password": "Bob is funny!"}
=    )

Write a test for authentication

On by Tad Lispy

It depends on a new fixture that registers the joker before they can authenticate. This fixture modifies the test client database. It relies on the fact that PyTest fixtures re-use results from other fixtures, i.e. the client fixture in the test will be the same as the client fixture in the registered_joker fixture. See https://docs.pytest.org/en/stable/how-to/fixtures.html#fixtures-can-be-requested-more-than-once-per-test-return-values-are-cached. It's a really nice feature!

This test doesn't currently pass. It uncovered a bug in the POST /token/ endpoint, which is unnecessarily asynchronous and thus can't use the SQLite connection injected into it from another thread.

index 67884b7..6362019 100644
--- a/test_rest.py
+++ b/test_rest.py
@@ -17,6 +17,18 @@ def client(tmp_path):
=        yield client
=
=
+@pytest.fixture
+def registered_joker(client, request):
+    """Registers the joker with a client and provides it's record"""
+
+    name = request.node.get_closest_marker("register_name").args[0]
+    password = request.node.get_closest_marker("register_password").args[0]
+
+    response = client.post("/jokers/", json={"name": name, "password": password})
+    assert response.status_code == 201, f"Failed to register user: {response.json()}"
+    yield response.json()
+
+
=def test_get_root(client):
=    response = client.get("/")
=    assert response.status_code == 200
@@ -28,7 +40,7 @@ def test_get_root(client):
=    assert "laughs" in body
=
=
-def test_post_jokers(client):
+def test_post_jokers(client, request):
=    response = client.post(
=        "/jokers/", json={"name": "Bob", "password": "Bob is funny!"}
=    )
@@ -39,3 +51,16 @@ def test_post_jokers(client):
=
=
=# TODO: Test authentication. Use a fixture to register new users before authenticating.
+@pytest.mark.register_name("Alice")
+@pytest.mark.register_password("Buhahaha!")
+def test_post_token(client, registered_joker):
+    name = registered_joker["name"]
+    id = registered_joker["id"]
+
+    response = client.post(
+        "/token", data={"username": "Alice", "password": "Buhahaha!"}
+    )
+
+    assert response.status_code == 200
+    assert "access_token" in response.json()
+    assert response.json().get("token_type") == "bearer"

Fix authentication (POST /token/)

On by Tad Lispy

The authentication endpoint was implemented as an async function (for no good reason), which prevented it from using the injected SQLite connection.

index 963af57..afc6850 100644
--- a/main.py
+++ b/main.py
@@ -150,7 +150,7 @@ async def get_current_user(token: Annotated[str, Depends(security)], db: Databas
=
=
=@app.post("/token")
-async def authenticate(
+def authenticate(
=    form: Annotated[OAuth2PasswordRequestForm, Depends()], db: Database
=) -> Token:
=    """Get a token based on username and password."""

Fix a warning about lifespan

On by Tad Lispy

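The lifespan function needs to be decorated with contextlib.asynccontextmanager: FastAPI expects the lifespan argument to be an async context manager rather than a bare async generator function.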

index afc6850..d85f87c 100644
--- a/main.py
+++ b/main.py
@@ -10,6 +10,7 @@ import sqlite3
=from datetime import datetime, timedelta, timezone
=from pathlib import Path
=from typing import Optional, Annotated
+from contextlib import asynccontextmanager
=
=import jwt
=import uvicorn
@@ -55,6 +56,7 @@ def get_database(path: DBPath) -> sqlite3.Connection:
=Database = Annotated[sqlite3.Connection, Depends(get_database)]
=
=
+@asynccontextmanager
=async def app_lifespan(app: FastAPI):
=    # Below is a homegrown dependency injection
=    #

Fix warnings about unused markers

On by Tad Lispy

I merged the name and password markers into a single credentials marker that takes both as keyword arguments. It's more elegant that way. Then, to fix the warning, the new marker is registered in the pytest.ini file.

new file mode 100644
index 0000000..0906a51
--- /dev/null
+++ b/pytest.ini
@@ -0,0 +1,3 @@
+[pytest]
+markers =
+    credentials: used for registering or authenticating a user
index 6362019..0abe3dc 100644
--- a/test_rest.py
+++ b/test_rest.py
@@ -21,10 +21,11 @@ def client(tmp_path):
=def registered_joker(client, request):
=    """Registers the joker with a client and provides it's record"""
=
-    name = request.node.get_closest_marker("register_name").args[0]
-    password = request.node.get_closest_marker("register_password").args[0]
+    credentials = request.node.get_closest_marker("credentials").kwargs
+    assert "name" in credentials
+    assert "password" in credentials
=
-    response = client.post("/jokers/", json={"name": name, "password": password})
+    response = client.post("/jokers/", json=credentials)
=    assert response.status_code == 201, f"Failed to register user: {response.json()}"
=    yield response.json()
=
@@ -51,8 +52,7 @@ def test_post_jokers(client, request):
=
=
=# TODO: Test authentication. Use a fixture to register new users before authenticating.
-@pytest.mark.register_name("Alice")
-@pytest.mark.register_password("Buhahaha!")
+@pytest.mark.credentials(name="Alice", password="Buhahaha!")
=def test_post_token(client, registered_joker):
=    name = registered_joker["name"]
=    id = registered_joker["id"]

Write a comment about dependencies from flake.nix

On by Tad Lispy

Initially I wanted (and eventually still want) this project to be compatible with standard, modern Python tooling (which I guess at this time is uv), but being a Nix user and running out of time to deploy it, I opted to manage Python dependencies using Nix. In the future I'd like to make it work both as a Nix flake and as a standalone Python project.

For context: https://discourse.nixos.org/t/uv2nix-build-develop-python-projects-using-uv-with-nix/58563/11?u=tad-lispy

index 6793d0a..e8bdd96 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -5,6 +5,9 @@ description = "Joke as a Service"
=readme = "README.md"
=license = "GPL-3.0-only"
=requires-python = ">=3.13"
+# FIXME: The dependencies are currently managed with flake.nix
+#
+#        The section below is redundant and most likely outdated
=dependencies = [
=    "fastapi[standard] (>=0.116.1,<0.117.0)",
=    "pyjwt (>=2.10.1,<3.0.0)",

Remove unused variables from tests

On by Tad Lispy

index 4236bd1..ec54422 100644
--- a/test_joker_registration_validation.py
+++ b/test_joker_registration_validation.py
@@ -44,7 +44,5 @@ def test_blank_password_registration(input, good):
=    if good:
=        JokerRegistration(**payload)
=    else:
-        with pytest.raises(
-            ValidationError, match="A password cannot be blank"
-        ) as exception_info:
+        with pytest.raises(ValidationError, match="A password cannot be blank"):
=            JokerRegistration(**payload)
index 0abe3dc..e643b03 100644
--- a/test_rest.py
+++ b/test_rest.py
@@ -54,11 +54,8 @@ def test_post_jokers(client, request):
=# TODO: Test authentication. Use a fixture to register new users before authenticating.
=@pytest.mark.credentials(name="Alice", password="Buhahaha!")
=def test_post_token(client, registered_joker):
-    name = registered_joker["name"]
-    id = registered_joker["id"]
-
=    response = client.post(
-        "/token", data={"username": "Alice", "password": "Buhahaha!"}
+        "/token", data={"username": registered_joker["name"], "password": "Buhahaha!"}
=    )
=
=    assert response.status_code == 200

Remove a stale TODO comment

On by Tad Lispy

index e643b03..7450a12 100644
--- a/test_rest.py
+++ b/test_rest.py
@@ -51,7 +51,6 @@ def test_post_jokers(client, request):
=    assert "name" in body
=
=
-# TODO: Test authentication. Use a fixture to register new users before authenticating.
=@pytest.mark.credentials(name="Alice", password="Buhahaha!")
=def test_post_token(client, registered_joker):
=    response = client.post(

Write some more ideas for REST tests

On by Tad Lispy

index 7450a12..fbd6dac 100644
--- a/test_rest.py
+++ b/test_rest.py
@@ -60,3 +60,11 @@ def test_post_token(client, registered_joker):
=    assert response.status_code == 200
=    assert "access_token" in response.json()
=    assert response.json().get("token_type") == "bearer"
+
+
+# TODO: Test posting a joke
+# TODO: Test deleting an own joke
+# TODO: Test deleting a joke from someone else (should fail)
+# TODO: Test laughing
+# TODO: Test un-laughing
+# TODO: Test un-laughing when wasn't laughing anyway (should give 404)

Update Nix dependencies

On by Tad Lispy

index 3bb45b3..a5e32a2 100644
--- a/flake.lock
+++ b/flake.lock
@@ -44,11 +44,11 @@
=        ]
=      },
=      "locked": {
-        "lastModified": 1758897340,
-        "narHash": "sha256-YCakDdXHLJ06xo48RN/w3B9MdS9VEmcBAlQYibocQLw=",
+        "lastModified": 1759497781,
+        "narHash": "sha256-LGguuO6j2Tb3oNOix+oqxU3WfhoaFIEBLuQynSjWmjU=",
=        "owner": "cachix",
=        "repo": "devenv",
-        "rev": "2d0941e76e7d9c3a03d47d431f38cc3dd5f90f98",
+        "rev": "84bb5b2b4e0bb09d21e7172517ee2088ec09ab48",
=        "type": "github"
=      },
=      "original": {
@@ -186,16 +186,16 @@
=        ]
=      },
=      "locked": {
-        "lastModified": 1755029779,
-        "narHash": "sha256-3+GHIYGg4U9XKUN4rg473frIVNn8YD06bjwxKS1IPrU=",
+        "lastModified": 1758763079,
+        "narHash": "sha256-Bx1A+lShhOWwMuy3uDzZQvYiBKBFcKwy6G6NEohhv6A=",
=        "owner": "cachix",
=        "repo": "nix",
-        "rev": "b0972b0eee6726081d10b1199f54de6d2917f861",
+        "rev": "6f0140527c2b0346df4afad7497baa08decb929f",
=        "type": "github"
=      },
=      "original": {
=        "owner": "cachix",
-        "ref": "devenv-2.30.4",
+        "ref": "devenv-2.30.5",
=        "repo": "nix",
=        "type": "github"
=      }

Make nix run tests before installing

On by Tad Lispy

index 9ab8b46..6dcdda6 100644
--- a/flake.nix
+++ b/flake.nix
@@ -25,6 +25,7 @@
=          ps.pytest
=          ps.httpx
=        ]);
+        test_jwt_secret_key="2ac1b726b973bf993096e63264772d859e70b60d05f17eca69e3f39cae2ccf71";
=      in
=
=    {
@@ -33,6 +34,7 @@
=          pname = "jaas";
=          version = "0.1.0";
=          src = ./.;
+          buildInputs = [ python ];
=
=          installPhase = ''
=            mkdir --parents $out/lib $out/bin
@@ -41,6 +43,9 @@
=            echo "${python}/bin/python $out/lib/main.py" >> $out/bin/jaas
=            chmod a+x $out/bin/jaas
=          '';
+
+          doCheck = true;
+          checkPhase = "jaas_jwt_secret_key=${test_jwt_secret_key} pytest";
=        };
=      };
=