fastapi-langgraph
https://github.com/fanqingsong/fastapi-langgraph
A production-ready FastAPI template for building AI agent applications with LangGraph integration. It provides a robust foundation for scalable, secure, and maintainable AI agent services.
Production-Ready Architecture
- FastAPI for high-performance async API endpoints
- LangGraph integration for AI agent workflows
- Langfuse for LLM observability and monitoring
- Structured logging with environment-specific formatting
- Rate limiting with configurable per-endpoint rules (see the limiter sketch after this list)
- PostgreSQL for data persistence
- Docker and Docker Compose support
- Prometheus metrics and Grafana dashboards for monitoring
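The endpoint code later in this section applies rate limits with `@limiter.limit(settings.RATE_LIMIT_ENDPOINTS["chat"][0])`, i.e. each route looks up its own rule string from the settings. Below is a minimal sketch of what `app/core/limiter.py` could look like, assuming the limiter is backed by slowapi; the library choice and the concrete limit values are assumptions, not the template's confirmed implementation.

```python
# app/core/limiter.py -- hypothetical sketch assuming slowapi; the template's actual setup may differ.
from slowapi import Limiter
from slowapi.util import get_remote_address

# Key requests by client address; each endpoint applies its own rule string
# taken from settings.RATE_LIMIT_ENDPOINTS (e.g. "30/minute", shown in the
# configuration sketch later in this section).
limiter = Limiter(key_func=get_remote_address)
```

Note that slowapi requires the decorated endpoint to accept a `request: Request` argument, which matches the docstrings in the endpoint module below ("The FastAPI request object for rate limiting").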
Security
- JWT-based authentication (a session-dependency sketch follows this list)
- Session management
- Input sanitization
- CORS configuration
- Rate limiting protection
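The chat endpoints resolve the caller with `Depends(get_current_session)` from `app.api.v1.auth`, yielding a `Session` with `id` and `user_id`. A minimal sketch of such a dependency follows, assuming PyJWT for token verification and a bearer token carrying the session and user identifiers; the setting names, claim names, and the way the `Session` is built are illustrative assumptions.

```python
# app/api/v1/auth.py -- hypothetical sketch of the session dependency used by the chat endpoints.
import jwt  # PyJWT
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer

from app.core.config import settings
from app.models.session import Session

bearer_scheme = HTTPBearer()


async def get_current_session(
    credentials: HTTPAuthorizationCredentials = Depends(bearer_scheme),
) -> Session:
    """Resolve the authenticated session from a JWT bearer token (sketch)."""
    try:
        # Verify signature and expiry; JWT_SECRET_KEY / JWT_ALGORITHM are assumed setting names.
        payload = jwt.decode(
            credentials.credentials,
            settings.JWT_SECRET_KEY,
            algorithms=[settings.JWT_ALGORITHM],
        )
    except jwt.PyJWTError:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid or expired token")

    session_id = payload.get("sub")
    if session_id is None:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Malformed token")

    # The real template may load the session from the database; building it
    # from token claims here is a simplification for the sketch.
    return Session(id=session_id, user_id=payload.get("user_id"))
```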
Developer Experience
- Environment-specific configuration (see the settings sketch after this list)
- Comprehensive logging system
- Clear project structure
- Type hints throughout
- Easy local development setup
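The endpoint code imports a shared `settings` object from `app.core.config`. One common way to implement environment-specific configuration in FastAPI projects is pydantic-settings; the sketch below is an assumption about how the template's settings could be organized, and every field name other than `RATE_LIMIT_ENDPOINTS` (which the endpoint code references) is illustrative.

```python
# app/core/config.py -- hypothetical sketch; field names beyond RATE_LIMIT_ENDPOINTS are assumptions.
from pydantic_settings import BaseSettings, SettingsConfigDict


class Settings(BaseSettings):
    # Selects environment-specific behaviour (e.g. log formatting, CORS origins).
    ENVIRONMENT: str = "development"

    # Integration settings, typically supplied via environment variables or a .env file.
    POSTGRES_URL: str = "postgresql://postgres:postgres@localhost:5432/app"
    LANGFUSE_PUBLIC_KEY: str = ""
    LANGFUSE_SECRET_KEY: str = ""

    # Per-endpoint rate-limit rules consumed by the chat endpoints below;
    # the concrete values are placeholders.
    RATE_LIMIT_ENDPOINTS: dict[str, list[str]] = {
        "chat": ["30/minute"],
        "chat_stream": ["20/minute"],
        "messages": ["50/minute"],
    }

    model_config = SettingsConfigDict(env_file=".env", env_file_encoding="utf-8")


settings = Settings()
```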
Model Evaluation Framework
- Automated metric-based evaluation of model outputs
- Integration with Langfuse for trace analysis
- Detailed JSON reports with success/failure metrics
- Interactive command-line interface
- Customizable evaluation metrics
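The chatbot endpoint module below shows how these pieces fit together in practice: per-endpoint rate limiting and the JWT session dependency guard each route, the LangGraph agent produces the responses, and structured logging records every request and failure.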
"""Chatbot API endpoints for handling chat interactions.This module provides endpoints for chat interactions, including regular chat, streaming chat, message history management, and chat history clearing. """import json from typing import Listfrom fastapi import (APIRouter,Depends,HTTPException,Request, ) from fastapi.responses import StreamingResponsefrom app.api.v1.auth import get_current_session from app.core.config import settings from app.core.langgraph.graph import LangGraphAgent from app.core.limiter import limiter from app.core.logging import logger from app.models.session import Session from app.schemas.chat import (ChatRequest,ChatResponse,Message,StreamResponse, )router = APIRouter() agent = LangGraphAgent()@router.post("/chat", response_model=ChatResponse) @limiter.limit(settings.RATE_LIMIT_ENDPOINTS["chat"][0]) async def chat(request: Request,chat_request: ChatRequest,session: Session = Depends(get_current_session), ):"""Process a chat request using LangGraph.Args:request: The FastAPI request object for rate limiting.chat_request: The chat request containing messages.session: The current session from the auth token.Returns:ChatResponse: The processed chat response.Raises:HTTPException: If there's an error processing the request."""try:logger.info("chat_request_received",session_id=session.id,message_count=len(chat_request.messages),)# Process the request through the LangGraphresult = await agent.get_response(chat_request.messages, session.id, user_id=session.user_id)logger.info("chat_request_processed", session_id=session.id)return ChatResponse(messages=result)except Exception as e:logger.error("chat_request_failed", session_id=session.id, error=str(e), exc_info=True)raise HTTPException(status_code=500, detail=str(e))@router.post("/chat/stream") @limiter.limit(settings.RATE_LIMIT_ENDPOINTS["chat_stream"][0]) async def chat_stream(request: Request,chat_request: ChatRequest,session: Session = Depends(get_current_session), ):"""Process a chat request using LangGraph with streaming response.Args:request: The FastAPI request object for rate limiting.chat_request: The chat request containing messages.session: The current session from the auth token.Returns:StreamingResponse: A streaming response of the chat completion.Raises:HTTPException: If there's an error processing the request."""try:logger.info("stream_chat_request_received",session_id=session.id,message_count=len(chat_request.messages),)async def event_generator():"""Generate streaming events.Yields:str: Server-sent events in JSON format.Raises:Exception: If there's an error during streaming."""try:full_response = ""async for chunk in agent.get_stream_response(chat_request.messages, session.id, user_id=session.user_id):full_response += chunkresponse = StreamResponse(content=chunk, done=False)yield f"data: {json.dumps(response.model_dump())}\n\n"# Send final message indicating completionfinal_response = StreamResponse(content="", done=True)yield f"data: {json.dumps(final_response.model_dump())}\n\n"except Exception as e:logger.error("stream_chat_request_failed",session_id=session.id,error=str(e),exc_info=True,)error_response = StreamResponse(content=str(e), done=True)yield f"data: {json.dumps(error_response.model_dump())}\n\n"return StreamingResponse(event_generator(), media_type="text/event-stream")except Exception as e:logger.error("stream_chat_request_failed",session_id=session.id,error=str(e),exc_info=True,)raise HTTPException(status_code=500, detail=str(e))@router.get("/messages", response_model=ChatResponse) 
@limiter.limit(settings.RATE_LIMIT_ENDPOINTS["messages"][0]) async def get_session_messages(request: Request,session: Session = Depends(get_current_session), ):"""Get all messages for a session.Args:request: The FastAPI request object for rate limiting.session: The current session from the auth token.Returns:ChatResponse: All messages in the session.Raises:HTTPException: If there's an error retrieving the messages."""try:messages = await agent.get_chat_history(session.id)return ChatResponse(messages=messages)except Exception as e:logger.error("get_messages_failed", session_id=session.id, error=str(e), exc_info=True)raise HTTPException(status_code=500, detail=str(e))@router.delete("/messages") @limiter.limit(settings.RATE_LIMIT_ENDPOINTS["messages"][0]) async def clear_chat_history(request: Request,session: Session = Depends(get_current_session), ):"""Clear all messages for a session.Args:request: The FastAPI request object for rate limiting.session: The current session from the auth token.Returns:dict: A message indicating the chat history was cleared."""try:await agent.clear_chat_history(session.id)return {"message": "Chat history cleared successfully"}except Exception as e:logger.error("clear_chat_history_failed", session_id=session.id, error=str(e), exc_info=True)raise HTTPException(status_code=500, detail=str(e))