Commits: 1

WIP: Implement OAuth2 password authentication flow

The issued tokens are just the usernames themselves, so this flow is very insecure.

index 58c1ec2..9a65d97 100644
--- a/main.py
+++ b/main.py
@@ -4,9 +4,10 @@ A sample REST API service.
=You can use it to teach concepts of REST.
="""
=
-from fastapi import FastAPI, HTTPException, Response
+from fastapi import FastAPI, HTTPException, Response, Depends
+from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
=from pydantic import BaseModel
-from typing import Optional
+from typing import Optional, Annotated
=import sqlite3
=import logging
=
@@ -16,19 +17,87 @@ app = FastAPI(
=)
=
=logger = logging.getLogger("uvicorn.error")
-
-@app.on_event("startup")
-def init():
-    logger.info("Something something init. Now what?")
+# Dependency that pulls the bearer token out of incoming requests;
+# "token" is the relative URL of the login endpoint declared below.
+security = OAuth2PasswordBearer(tokenUrl="token")
=
=
=class Joke(BaseModel):
=    """A joke."""
+
=    id: Optional[int]
=    text: str
=    author: str
=
=
+class Joker(BaseModel):
+    """A user of JaaS, built from a row of the ``joker`` table."""
+
+    id: Optional[int]  # value of the joker.id column
+    name: str  # value of joker.name; the WIP auth flow also uses it as the token
+
+
+async def get_current_user(token: Annotated[str, Depends(security)]):
+    """Provide the current user if logged in, otherwise err.
+
+    It's a dependency that can be used on endpoints that require
+    authentication.
+
+    Args:
+        token: Bearer token extracted from the request by ``security``.
+            In this WIP flow the token is simply the joker's name.
+
+    Returns:
+        The ``Joker`` whose name equals the token.
+
+    Raises:
+        HTTPException: 401 with a ``WWW-Authenticate: Bearer`` challenge
+            when no joker matches the token.
+    """
+    # NOTE(review): sqlite3 calls are blocking and, inside ``async def``,
+    # run on the event loop. Fine for a teaching sample; revisit later.
+    db = sqlite3.connect("jokes.db")
+    try:
+        db.row_factory = sqlite3.Row
+        joker = db.execute("""
+        Select
+            joker.id,
+            joker.name
+        from
+            joker
+        where
+            joker.name = :username
+        ;
+        """, {"username": token}).fetchone()
+    finally:
+        # ``with sqlite3.connect(...)`` only scopes the transaction and never
+        # closes the connection, so close it explicitly on every path.
+        db.close()
+
+    if joker is None:
+        # A failed bearer authentication is 401 Unauthorized; the
+        # WWW-Authenticate challenge header belongs with that status.
+        raise HTTPException(
+            status_code=401,
+            detail="Invalid joker",
+            headers={"WWW-Authenticate": "Bearer"},
+        )
+
+    return Joker(**joker)
+
+
+@app.post("/token")
+async def authenticate(form: Annotated[OAuth2PasswordRequestForm, Depends()]):
+    """Get a token based on username and password.
+
+    Args:
+        form: The OAuth2 password-flow form carrying ``username`` and
+            ``password``.
+
+    Returns:
+        A dict with ``access_token`` (the joker's name) and
+        ``token_type`` set to ``"bearer"``.
+
+    Raises:
+        HTTPException: 400 when no joker matches the submitted
+            username/password pair.
+    """
+    # NOTE(review): the submitted password is compared verbatim against the
+    # ``joker.password`` column, which implies passwords are stored in
+    # plaintext. Store and compare salted hashes before real use.
+    db = sqlite3.connect("jokes.db")
+    try:
+        db.row_factory = sqlite3.Row
+        joker = db.execute("""
+        Select
+            joker.id,
+            joker.name
+        from
+            joker
+        where
+            joker.name = :username and joker.password = :password
+        ;
+        """, {"username": form.username, "password": form.password}).fetchone()
+    finally:
+        # ``with sqlite3.connect(...)`` only scopes the transaction and never
+        # closes the connection, so close it explicitly on every path.
+        db.close()
+
+    if joker is None:
+        raise HTTPException(
+            status_code=400,
+            detail="Incorrect username or password"
+        )
+
+    # The token is just the username (see commit message): insecure, WIP.
+    return {"access_token": joker["name"], "token_type": "bearer"}
+
+
+@app.get("/me")
+def get_current_joker(user: Annotated[Joker, Depends(get_current_user)]) -> Joker:
+    """Give the user information about them. Requires authentication.
+
+    Args:
+        user: The joker resolved by the ``get_current_user`` dependency.
+
+    Returns:
+        That same joker, unchanged.
+    """
+    return user
+
=
=@app.get("/")
=def get_random_joke() -> Joke: