アーカイブ
はじめに
前回の記事 では、FastAPI の認証機能を使って API エンドポイントに認証処理を入れていきました。
アクセストークンはユーザー名をそのまま返すような仮実装となっていたため、今回は JWT 形式でアクセストークンを発行し、トークンの偽造や改ざんを防げる形にアップデートしていきます。
※本記事は FastAPI の学習・理解を目的としたものであり、本番利用を前提としたものではない点にご留意ください。
おさらい
前回作成したコードを再掲します。
# main.py
from typing import Annotated
from fastapi import Depends, FastAPI, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from pydantic import BaseModel
app = FastAPI()
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
class User(BaseModel):
username: str
email: str | None = None
full_name: str | None = None
disabled: bool | None = None
class UserInDB(User):
hashed_password: str
fake_users_db = {
"johndoe": {
"username": "johndoe",
"full_name": "John Doe",
"email": "johndoe@example.com",
"hashed_password": "fakehashedsecret",
"disabled": False,
},
}
def verify_password(plain_password: str, hashed_password: str) -> bool:
# 本来は bcrypt や Passlib を使ってハッシュと照合する
return "fakehashed" + plain_password == hashed_password
def get_user(db, username: str) -> UserInDB | None:
if username in db:
return UserInDB(**db[username])
return None
def authenticate_user(username: str, password: str) -> User | None:
user = get_user(fake_users_db, username)
if not user:
return None
if not verify_password(password, user.hashed_password):
return None
return user
def decode_token(token):
# 本来は JWT の検証やデコードを行う
user = get_user(fake_users_db, token)
if not user:
return None
return user
async def get_current_user(token: str = Depends(oauth2_scheme)):
user = decode_token(token)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid authentication credentials",
)
return User(**user.model_dump(exclude={"hashed_password"}))
@app.post("/token")
async def login(form_data: Annotated[OAuth2PasswordRequestForm, Depends()]):
user = authenticate_user(form_data.username, form_data.password)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
# 便宜上、ユーザー名をアクセストークンとして返す
return {"access_token": user.username, "token_type": "bearer"}
@app.get("/users/me")
async def read_users_me(current_user: Annotated[User, Depends(get_current_user)]):
return current_user
エンドポイントに認証処理を追加する際のポイントは以下でした。
・OAuth2PasswordBearer を使い、アクセストークンが必要なエンドポイントの Depends に指定する
・OAuth2PasswordRequestForm を使い、ログインおよびアクセストークン発行用のエンドポイントを実装する
準備
これまで main.py に全ての処理を書いていましたが、コードが複雑になってきたため、役割でファイルを分割しておきます。
新しいファイル構成:
/
├── main.py # アプリケーションのエントリーポイント
├── models.py # Pydanticデータモデル(Token, User, UserInDB)
├── database.py # データベース操作(ユーザー取得、フェイクDB)
├── auth.py # 認証機能(パスワード検証、ユーザー認証、トークンデコード)
├── dependencies.py # FastAPI依存関数(get_current_user)
└── routers/
├── __init__.py # routersパッケージ初期化
└── auth.py # 認証関連エンドポイント(/auth/token, /auth/me)
# main.py
# アプリケーションのエントリーポイント
from fastapi import FastAPI
from routers import auth
app = FastAPI()
app.include_router(auth.router)
# models.py
# Pydanticデータモデル(Token, User, UserInDB)
from pydantic import BaseModel
class Token(BaseModel):
access_token: str
token_type: str
class User(BaseModel):
username: str
email: str | None = None
full_name: str | None = None
disabled: bool | None = None
class UserInDB(User):
hashed_password: str
# database.py
# データベース操作(ユーザー取得、フェイクDB)
from models import UserInDB
# テスト用のフェイクユーザーデータベース
fake_users_db = {
"johndoe": {
"username": "johndoe",
"full_name": "John Doe",
"email": "johndoe@example.com",
"hashed_password": "fakehashedsecret",
"disabled": False,
},
}
def get_user(db: dict, username: str) -> UserInDB | None:
"""データベースからユーザーを取得する"""
if username in db:
return UserInDB(**db[username])
return None
# auth.py
# 認証機能(パスワード検証、ユーザー認証、トークンデコード)
from fastapi.security import OAuth2PasswordBearer
from database import fake_users_db, get_user
from models import User, UserInDB
# 認証スキームの定義
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="auth/token")
def verify_password(plain_password: str, hashed_password: str) -> bool:
"""パスワードを検証する(本来は bcrypt や Passlib を使ってハッシュと照合する)"""
return "fakehashed" + plain_password == hashed_password
def authenticate_user(username: str, password: str) -> User | None:
"""ユーザー名とパスワードでユーザーを認証する"""
user = get_user(fake_users_db, username)
if not user:
return None
if not verify_password(password, user.hashed_password):
return None
return user
def decode_token(token: str) -> UserInDB | None:
"""トークンをデコードしてユーザー情報を取得する(本来は JWT の検証やデコードを行う)"""
user = get_user(fake_users_db, token)
if not user:
return None
return user
# dependencies.py
# FastAPI依存関数(get_current_user)
from fastapi import Depends, HTTPException, status
from auth import decode_token, oauth2_scheme
from models import User
async def get_current_user(token: str = Depends(oauth2_scheme)) -> User:
"""現在のユーザーを取得する依存関数"""
user = decode_token(token)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid authentication credentials",
)
return User(**user.model_dump(exclude={"hashed_password"}))
# routers/auth.py
# 認証関連エンドポイント(/auth/token, /auth/me)
from typing import Annotated
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm
from auth import authenticate_user
from dependencies import get_current_user
from models import Token, User
router = APIRouter(prefix="/auth", tags=["authentication"])
@router.post("/token")
async def login(form_data: Annotated[OAuth2PasswordRequestForm, Depends()]) -> Token:
"""ログインしてトークンを取得する"""
user = authenticate_user(form_data.username, form_data.password)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
# 便宜上、ユーザー名をアクセストークンとして返す
return Token(access_token=user.username, token_type="bearer")
@router.get("/me")
async def get_current_user_info(
current_user: Annotated[User, Depends(get_current_user)],
) -> User:
"""現在のユーザー情報を取得する"""
return current_user
認証系のエンドポイントを /auth/xxx にしたり、アクセストークンのレスポンスをモデル化したりといった細かい違いはありますが、動きは前回から変えていません。
実装と動作確認
以下の手順で、JWT 形式のアクセストークンによる認証を実装していきます。
1. ハッシュ化ライブラリの導入
2. JWT 形式のアクセストークンの発行
3. アクセストークンの検証
ステップ1. ハッシュ化ライブラリの導入
ログイン時のパスワードを検証する際、"fakehashed" を付けることでハッシュ化とみなしている部分を、外部ライブラリを使用して適切にハッシュ化します。
今回は、ハッシュ化ライブラリとして bcrypt を使用します。
※FastAPI の公式ドキュメントでは pathlib(ハッシュ化ライブラリのラッパー) を使用していますが、最新の bcrypt(4.3.0) に対応していないようだったため直接 bcrypt を使いました。
uv add bcrypt
パスワードの検証を bcrypt を使った実装に書き換えます。
# auth.py
import bcrypt
from fastapi.security import OAuth2PasswordBearer
from database import fake_users_db, get_user
from models import User, UserInDB
# 認証スキームの定義
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="auth/token")
def verify_password(plain_password: str, hashed_password: str) -> bool:
"""パスワードを検証する"""
return bcrypt.checkpw(
plain_password.encode("ascii"), hashed_password.encode("ascii")
)
...
また、テスト用ユーザーのパスワード "secret" をハッシュ化した値を確認し、DB に登録するハッシュ化パスワードも変更しておきます。
$ uv run python
>>> import bcrypt
>>> bcrypt.hashpw(b"secret", bcrypt.gensalt())
b'$2b$12$1vteTkYqeMlREO992pXA8urwLuWibGFeENaPa/qB2/PtUvEpuKCZq'
# database.py
...
# テスト用のフェイクユーザーデータベース
fake_users_db = {
"johndoe": {
"username": "johndoe",
"full_name": "John Doe",
"email": "johndoe@example.com",
"hashed_password": "$2b$12$1vteTkYqeMlREO992pXA8urwLuWibGFeENaPa/qB2/PtUvEpuKCZq",
"disabled": False,
},
}
...
認証を実行してみます。
認証が通り、パスワードの検証が正しく行えていることを確認できました。
ステップ2. JWT 形式のアクセストークンの発行
JWT の説明はここでは省略しますが、FastAPI 公式ドキュメントでは以下のように記載されています。
JWTとは「JSON Web Tokens」の略称です。
JSONオブジェクトをスペースのない長く密集した文字列で表現したトークンの仕様です。例えば次のようになります:
eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c
これらは暗号化されていないので、誰でもコンテンツから情報を復元できてしまいます。
しかし、トークンは署名されているため、あなたが発行したトークンを受け取った人は、あなたが実際に発行したということを検証できます。
詳しくは JSON Web Tokenの概要 - jwt.io などをご参照ください。
JWT アクセストークンの生成には PyJWT を使います。
※FastAPI 公式ドキュメントの日本語版(0.116.1)では python-jose を使用していましたが、最新の原文(0.117.1)では PyJWT を使用する記述になっていました。
今回は JWT を使用したシンプルな認証のみを試せればよかったため、原文を参照しました。
$ uv add PyJWT
ログイン成功時に JWT アクセストークンを生成し、レスポンスで返すようにします。
# routers/auth.py
from typing import Annotated
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm
from auth import create_access_token
from dependencies import get_current_user
from models import Token, User
router = APIRouter(prefix="/auth", tags=["authentication"])
@router.post("/token")
async def login(form_data: Annotated[OAuth2PasswordRequestForm, Depends()]) -> Token:
"""ログインしてトークンを取得する"""
token = create_access_token(form_data.username, form_data.password)
if not token:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
return token
...
# auth.py
from datetime import datetime, timedelta, timezone
import bcrypt
import jwt
from fastapi.security import OAuth2PasswordBearer
from database import fake_users_db, get_user
from models import Token, User, UserInDB
# to get a string like this run:
# openssl rand -hex 32
SECRET_KEY = "7a4bc60299c6a69af0aa853014a1a7cf74ac7aecd9f420dc7a41ea16fbd2c266"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
...
def create_jwt_token(data: dict, expires_delta: timedelta | None = None) -> str:
"""JWTトークンを作成する(汎用)"""
to_encode = data.copy()
if expires_delta:
expire = datetime.now(timezone.utc) + expires_delta
else:
expire = datetime.now(timezone.utc) + timedelta(minutes=15)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
def create_access_token(username: str, password: str) -> Token | None:
"""ユーザー認証してアクセストークンを作成する"""
user = authenticate_user(username, password)
if not user:
return None
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_jwt_token(
data={"sub": user.username}, expires_delta=access_token_expires
)
return Token(access_token=access_token, token_type="bearer")
...
SECRET_KEY は、以下のコマンドで生成しています。
$ openssl rand -hex 32
7a4bc60299c6a69af0aa853014a1a7cf74ac7aecd9f420dc7a41ea16fbd2c266
署名アルゴリズムは HS256、アクセストークンの有効期間は 30 分としています。
では、再度認証を試してみましょう。
ログインし、/auth/me を実行してみます。
リクエストヘッダーの Authorization を見ると、JWT 形式のアクセストークンが使用されていることが分かります。
トークンを検証する処理は未実装のため、レスポンスは 401(Unauthorized) で返っています。
ついでに、JSON Web Tokens - jwt.io でトークンをデコードしてみます。
署名アルゴリズム(alg)やトークンの種別(typ)、ユーザー名(sub)や有効期限(exp)の情報が含まれていることが確認できました。
ステップ3. アクセストークンの検証
最後に、アクセストークンを検証してユーザー情報を取得する処理を実装していきます。
# models.py
...
class TokenData(BaseModel):
username: str
...
# auth.py
from datetime import datetime, timedelta, timezone
import bcrypt
import jwt
from fastapi import HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from database import fake_users_db, get_user
from models import Token, TokenData, User, UserInDB
...
def decode_token(token: str) -> UserInDB:
"""トークンをデコードしてユーザー情報を取得する(認証失敗時は例外発生)"""
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username = payload.get("sub")
if username is None:
raise credentials_exception
token_data = TokenData(username=username)
except jwt.InvalidTokenError:
raise credentials_exception
user = get_user(fake_users_db, username=token_data.username)
if user is None:
raise credentials_exception
return user
# dependencies.py
from fastapi import Depends
from auth import decode_token, oauth2_scheme
from models import User
async def get_current_user(token: str = Depends(oauth2_scheme)) -> User:
"""現在のユーザーを取得する依存関数"""
user = decode_token(token)
return User(**user.model_dump(exclude={"hashed_password"}))
再度 /auth/me を実行してみます。
アクセストークンの検証に成功し、200 でレスポンスが返ってくることを確認できました。
まとめ
FastAPI の標準機能に加えてハッシュ化ライブラリと JWT ライブラリを使用することで、JWT 形式のアクセストークンを使った認証機能を実装することができました。
FastAPI に限った話ではないですが、サードパーティライブラリが豊富で、それらを活用することで素早く実装できるのは Python の良いところだとあらためて感じました。
今回のポイントは以下の2つです。
・ハッシュ化ライブラリを使用し、ユーザーが入力したパスワードを検証する
・JWT ライブラリを使用し、アクセストークンの発行・検証を行う
ここまでで認証機能は実装できましたが、アクセストークンの有効期限が切れるたびにログインしなければいけないのは不便です。
一方で、アクセストークンの有効期限を長くすると不正アクセスの危険性が高まります。
これらの解決方法として、次回はリフレッシュトークンの導入を試みます。
参考資料
・OAuth2 with Password (and hashing), Bearer with JWT tokens - FastAPI
積極採用中!尖ったPythonエンジニアへの第一歩はこちらから