一、FastAPI概述
1.1 什么是FastAPI
FastAPI is a modern, high-performance Python web framework designed for building APIs. It’s rapidly gaining popularity due to its ease of use, speed, and powerful features. Built on top of Starlette, FastAPI leverages asynchronous programming and type hints to deliver exceptional performance while maintaining code readability.
FastAPI 是一個現代的、高性能的 Python 網絡框架,專為構建 API 而設計。由于其易用性、速度和強大的功能,它正迅速受到歡迎。FastAPI 構建于 Starlette 之上,利用異步編程和類型提示來實現卓越的性能,同時保持代碼的可讀性。
Key Features of FastAPI
- High Performance: FastAPI is incredibly fast, often matching the performance of Node.js and Go frameworks.
- Fast to Code: Type hints and automatic data validation significantly reduce development time.
- Easy to Use: The intuitive syntax and clear documentation make it accessible to developers of all levels.
- Rich Feature Set: Includes data validation, automatic documentation, dependency injection, and more.
- Data Validation: Ensures data integrity by automatically validating incoming data based on Python type hints.
- Automatic Documentation: Generates interactive API documentation using Swagger UI, making it easy to understand and test APIs.
- Dependency Injection: Manages dependencies efficiently, improving code organisation and testability.
- Asynchronous Support: Handles multiple requests concurrently, enhancing performance and scalability.
FastAPI 的主要特性
- 高性能: FastAPI 極其快速,通常能與 Node.js 和 Go 框架的性能相媲美。
- 快速編碼: 類型提示和自動數據驗證顯著減少了開發時間。
- 易于使用: 直觀的語法和清晰的文檔使其對各級開發者都易于上手。
- 豐富的功能集: 包括數據驗證、自動文檔生成、依賴注入等。
- 數據驗證: 通過基于 Python 類型提示自動驗證傳入數據來確保數據完整性。
- 自動文檔生成: 使用 Swagger UI 生成交互式 API 文檔,便于理解和測試 API。
- 依賴注入: 高效管理依賴項,改善代碼組織和可測試性。
- 異步支持: 并發處理多個請求,提高性能和可擴展性。
1.2 創建基本的FastAPI應用
from fastapi import FastAPIapp = FastAPI()@app.get("/")
async def root():return {"message": "Hello, World!"}
This code defines a basic FastAPI application with a single endpoint, /. When a GET request is made to this endpoint, the function root is called, and it returns a JSON response containing the message “Hello, World!”.
這段代碼定義了一個基本的 FastAPI 應用程序,其中包含一個單一的端點 / 。當向此端點發出 GET 請求時,會調用 root 函數,該函數會返回一個包含消息“Hello, World!”的 JSON 響應。
1.3 使用pydantic驗證數據
FastAPI seamlessly integrates with Pydantic for robust data validation. Let’s create a model to validate incoming data:
FastAPI 與 Pydantic 兼容性極強,能夠實現強大的數據驗證功能。接下來,我們來創建一個模型以驗證傳入的數據:
from fastapi import FastAPI
from pydantic import BaseModelapp = FastAPI()class Item(BaseModel):name: strprice: floatis_available: bool@app.post("/items/")
async def create_item(item: Item):return item
In this example, the Item model defines the expected data structure for incoming requests. The create_item function automatically validates the incoming data against the Item model. If the data is invalid, FastAPI will return an appropriate error response.
在這個示例中,Item 模型定義了接收請求時所期望的數據結構。create_item 函數會自動根據 Item 模型對傳入的數據進行驗證。如果數據無效,FastAPI 將返回相應的錯誤響應。
FastAPI is a powerful and efficient framework for building APIs with Python. Its focus on performance, ease of use, and data validation makes it an excellent choice for a wide range of projects. In the following sections, we will explore advanced topics like database integration, authentication, and deployment to build a complete web application using FastAPI, React, and MongoDB.
FastAPI 是一個基于 Python 語言的強大且高效的構建 API 的框架。它在性能、易用性和數據驗證方面的出色表現使其成為眾多項目的理想選擇。在接下來的章節中,我們將探討一些高級主題,如數據庫集成、認證以及部署,以使用 FastAPI、React 和 MongoDB 構建一個完整的網絡應用程序。
二、創建后端項目
2.1 目錄結構
fastapi_project/
├── app/
│ ├── __init__.py
│ ├── main.py
│ ├── models.py
│ └── routes.py
├── requirements.txt
2.2 模型
app/models.py
from pydantic import BaseModelclass Item(BaseModel):id: strname: strdescription: str | None = Noneprice: floattax: float = 10.5tags: list[str] = []
2.3 數據庫
app/database.py
import motor.motor_asyncioclient = motor.motor_asyncio.AsyncIOMotorClient("mongodb://localhost:27017/")async def get_database():db = client["fastapi_db"]return db
2.4 路由
app/routes.py
from fastapi import FastAPI
from app import routesapp = FastAPI()app.include_router(routes.router)if __name__ == '__main__':import uvicornuvicorn.run(app, host='0.0.0.0', port=8080)
三、請求處理
3.1 路徑參數
服務端:
from fastapi import FastAPI, HTTPException
from motor.motor_asyncio import AsyncIOMotorClient
from pydantic import BaseModel
from bson import ObjectId
from fastapi.middleware.cors import CORSMiddlewareapp = FastAPI()# MongoDB 連接
client = AsyncIOMotorClient("mongodb://zhangdapeng:zhangdapeng520@localhost:27017")
db = client["blogdb"]
collection = db["blogs"]# 允許所有來源訪問
app.add_middleware(CORSMiddleware,allow_origins=["*"],allow_credentials=True,allow_methods=["*"],allow_headers=["*"],
)# 定義博客模型
class Blog(BaseModel):title: strcontent: strauthor: strcreated_at: str# 創建一個博客
@app.post("/blogs/")
async def create_blog(blog: Blog):result = await collection.insert_one(blog.dict())return {"_id": str(result.inserted_id)}# 獲取所有博客
@app.get("/blogs/")
async def get_blogs():blogs = await collection.find().to_list(length=100)return [{"_id": str(blog["_id"]), **blog} for blog in blogs]# 通過路徑參數獲取單個博客
@app.get("/blogs/{blog_id}")
async def get_blog(blog_id: str):blog = await collection.find_one({"_id": ObjectId(blog_id)})if not blog:raise HTTPException(status_code=404, detail="Blog not found")return {"_id": str(blog["_id"]), **blog}if __name__ == '__main__':import uvicornuvicorn.run(app, host='0.0.0.0', port=8080)
React渲染:
import { useState, useEffect } from 'react';
import './App.css';function App() {const [blogs, setBlogs] = useState([]);const [selectedBlog, setSelectedBlog] = useState(null);const [newBlog, setNewBlog] = useState({ title: '', content: '', author: '', created_at: '' });useEffect(() => {// 請求所有博客fetch('http://127.0.0.1:8080/blogs/').then(response => response.json()).then(data => setBlogs(data)).catch(error => console.error('Error fetching blogs:', error));}, []);const handleBlogClick = (blogId) => {// 請求單個博客fetch(`http://127.0.0.1:8080/blogs/${blogId}`).then(response => response.json()).then(data => setSelectedBlog(data)).catch(error => console.error('Error fetching blog:', error));};const handleInputChange = (e) => {const { name, value } = e.target;setNewBlog(prev => ({ ...prev, [name]: value }));};const handleSubmit = (e) => {e.preventDefault();fetch('http://127.0.0.1:8080/blogs/', {method: 'POST',headers: {'Content-Type': 'application/json'},body: JSON.stringify(newBlog)}).then(response => response.json()).then(data => {setBlogs(prev => [...prev, { ...newBlog, _id: data._id }]);setNewBlog({ title: '', content: '', author: '', created_at: '' });}).catch(error => console.error('Error submitting blog:', error));};return (<div className="App"><header className="App-header"><h1>博客列表</h1><div>{blogs.map(blog => (<div key={blog._id} className="blog-card" onClick={() => handleBlogClick(blog._id)}><h2>{blog.title}</h2></div>))}</div>{selectedBlog && (<div className="blog-detail"><h2>{selectedBlog.title}</h2><p>{selectedBlog.content}</p><p>作者: {selectedBlog.author}</p><p>創建時間: {selectedBlog.created_at}</p></div>)}<h2>新增博客</h2><form onSubmit={handleSubmit}><inputtype="text"name="title"value={newBlog.title}onChange={handleInputChange}placeholder="標題"required/><textareaname="content"value={newBlog.content}onChange={handleInputChange}placeholder="內容"required/><inputtype="text"name="author"value={newBlog.author}onChange={handleInputChange}placeholder="作者"required/><inputtype="text"name="created_at"value={newBlog.created_at}onChange={handleInputChange}placeholder="創建時間"required/><button type="submit">提交</button></form></header></div>);
}export default App;
樣式App.css
* {margin: 0;padding: 0;box-sizing: border-box;
}.App {text-align: center;
}.App-header {background-color: #282c34;min-height: 100vh;display: flex;flex-direction: column;align-items: center;justify-content: center;font-size: calc(10px + 2vmin);color: white;
}.blog-card {background-color: #333;padding: 20px;margin: 10px 0;border-radius: 8px;box-shadow: 0 2px 4px rgba(0, 0, 0, 0.2);cursor: pointer;
}.blog-card h2 {margin-top: 0;
}.blog-card p {margin: 10px 0;
}.blog-detail {background-color: #444;padding: 20px;margin: 10px 0;border-radius: 8px;box-shadow: 0 2px 4px rgba(0, 0, 0, 0.2);
}.blog-detail h2 {margin-top: 0;
}.blog-detail p {margin: 10px 0;
}form {display: flex;flex-direction: column;align-items: center;
}input, textarea {margin: 10px 0;padding: 10px;border: 1px solid #ccc;border-radius: 4px;width: 80%;
}button {padding: 10px 20px;background-color: #61dafb;border: none;border-radius: 4px;cursor: pointer;
}button:hover {background-color: #007bff;
}
3.2 查詢參數
后端代碼:
from fastapi import FastAPI, HTTPException, Query
from motor.motor_asyncio import AsyncIOMotorClient
from pydantic import BaseModel
from bson import ObjectId
from fastapi.middleware.cors import CORSMiddlewareapp = FastAPI()# MongoDB 連接
client = AsyncIOMotorClient("mongodb://zhangdapeng:zhangdapeng520@localhost:27017")
db = client["blogdb"]
collection = db["blogs"]# 允許所有來源訪問
app.add_middleware(CORSMiddleware,allow_origins=["*"],allow_credentials=True,allow_methods=["*"],allow_headers=["*"],
)# 定義博客模型
class Blog(BaseModel):title: strcontent: strauthor: strcreated_at: str# 寫入100條測試數據
async def create_test_data():for i in range(100):blog = Blog(title=f"測試博客 {i + 1}",content=f"這是第 {i + 1} 篇博客的內容",author=f"作者 {i + 1}",created_at="2025-05-10 12:33:33")await collection.insert_one(blog.dict())# 初始化時創建測試數據
@app.on_event("startup")
async def startup_event():await create_test_data()# 分頁查詢博客
@app.get("/blogs/")
async def get_blogs(page: int = Query(1, ge=1),page_size: int = Query(10, ge=1, le=100)
):skip = (page - 1) * page_sizeblogs = await collection.find().skip(skip).limit(page_size).to_list(length=page_size)total = await collection.count_documents({})total_pages = (total + page_size - 1) // page_sizedata = []for blog in blogs:blog["_id"] = str(blog["_id"])data.append(blog)return {"blogs": data,"total": total,"page": page,"page_size": page_size,"total_pages": total_pages}if __name__ == '__main__':import uvicornuvicorn.run(app, host='0.0.0.0', port=8080)
前端代碼:
import { useState, useEffect } from 'react';
import './App.css';function App() {const [blogs, setBlogs] = useState([]);const [pagination, setPagination] = useState({page: 1,pageSize: 10,totalPages: 1,total: 0});useEffect(() => {fetchBlogs();}, [pagination.page, pagination.pageSize]);const fetchBlogs = () => {fetch(`http://127.0.0.1:8080/blogs/?page=${pagination.page}&page_size=${pagination.pageSize}`).then(response => response.json()).then(data => {setBlogs(data.blogs);setPagination(prev => ({...prev,totalPages: data.total_pages,total: data.total}));}).catch(error => console.error('Error fetching blogs:', error));};const handlePageChange = (newPage) => {if (newPage >= 1 && newPage <= pagination.totalPages) {setPagination(prev => ({ ...prev, page: newPage }));}};const handlePageSizeChange = (e) => {const newPageSize = parseInt(e.target.value);setPagination(prev => ({...prev,page: 1,pageSize: newPageSize}));};return (<div className="App"><header className="App-header"><h1>博客列表</h1><div>{blogs.map(blog => (<div key={blog._id} className="blog-card"><h2>{blog.title}</h2><p>{blog.content}</p><p>作者: {blog.author}</p><p>創建時間: {blog.created_at}</p></div>))}</div><div className="pagination"><button onClick={() => handlePageChange(pagination.page - 1)} disabled={pagination.page === 1}>上一頁</button><span>{pagination.page}</span><button onClick={() => handlePageChange(pagination.page + 1)} disabled={pagination.page === pagination.totalPages}>下一頁</button><select value={pagination.pageSize} onChange={handlePageSizeChange}><option value="10">10條/頁</option><option value="20">20條/頁</option><option value="50">50條/頁</option></select><p>總共 {pagination.total} 條,共 {pagination.totalPages} 頁</p></div></header></div>);
}export default App;
3.3 JSON參數
后端代碼:
from fastapi import FastAPI, Request, Query, HTTPException
from motor.motor_asyncio import AsyncIOMotorClient
from pydantic import BaseModel
from bson import ObjectId
from fastapi.middleware.cors import CORSMiddlewareapp = FastAPI()# MongoDB 連接
client = AsyncIOMotorClient("mongodb://zhangdapeng:zhangdapeng520@localhost:27017")
db = client["blogdb"]
collection = db["blogs"]# 允許所有來源訪問
app.add_middleware(CORSMiddleware,allow_origins=["*"],allow_credentials=True,allow_methods=["*"],allow_headers=["*"],
)# 定義博客模型
class Blog(BaseModel):title: strcontent: strauthor: strcreated_at: str# 寫入100條測試數據
async def create_test_data():await collection.delete_many({})for i in range(3):blog = Blog(title=f"測試博客 {i + 1}",content=f"這是第 {i + 1} 篇博客的內容",author=f"作者 {i + 1}",created_at="2025-05-10 12:33:33")await collection.insert_one(blog.dict())# 初始化時創建測試數據
@app.on_event("startup")
async def startup_event():await create_test_data()# 創建一個博客
@app.post("/blogs/")
async def create_blog(blog: Blog):result = await collection.insert_one(blog.dict())return {"_id": str(result.inserted_id), **blog.dict()}# 獲取所有博客
@app.get("/blogs/")
async def get_blogs():blogs = await collection.find().to_list(length=100)data = []for blog in blogs:blog["_id"] = str(blog["_id"])data.append(blog)return dataif __name__ == '__main__':import uvicornuvicorn.run(app, host='0.0.0.0', port=8080)
前端代碼:
import { useState, useEffect } from 'react';
import './App.css';function App() {const [blogs, setBlogs] = useState([]);const [newBlog, setNewBlog] = useState({ title: '', content: '', author: '', created_at: '' });useEffect(() => {fetchBlogs();}, []);const fetchBlogs = () => {fetch('http://127.0.0.1:8080/blogs/').then(response => response.json()).then(data => setBlogs(data)).catch(error => console.error('Error fetching blogs:', error));};const handleInputChange = (e) => {const { name, value } = e.target;setNewBlog(prev => ({ ...prev, [name]: value }));};const handleSubmit = (e) => {e.preventDefault();fetch('http://127.0.0.1:8080/blogs/', {method: 'POST',headers: {'Content-Type': 'application/json'},body: JSON.stringify(newBlog)}).then(response => response.json()).then(data => {setBlogs(prev => [...prev, data]);setNewBlog({ title: '', content: '', author: '', created_at: '' });fetchBlogs();}).catch(error => console.error('Error submitting blog:', error));};return (<div className="App"><header className="App-header"><h1>博客列表</h1><div>{blogs.map(blog => (<div key={blog._id} className="blog-card"><h2>{blog.title}</h2><p>{blog.content}</p><p>作者: {blog.author}</p><p>創建時間: {blog.created_at}</p></div>))}</div><h2>新增博客</h2><form onSubmit={handleSubmit}><inputtype="text"name="title"value={newBlog.title}onChange={handleInputChange}placeholder="標題"required/><textareaname="content"value={newBlog.content}onChange={handleInputChange}placeholder="內容"required/><inputtype="text"name="author"value={newBlog.author}onChange={handleInputChange}placeholder="作者"required/><inputtype="text"name="created_at"value={newBlog.created_at}onChange={handleInputChange}placeholder="創建時間"required/><button type="submit">提交</button></form></header></div>);
}export default App;
3.4 請求頭參數
后端代碼:
from fastapi import FastAPI, Request, Query, HTTPException
from motor.motor_asyncio import AsyncIOMotorClient
from pydantic import BaseModel
from bson import ObjectId
from fastapi.middleware.cors import CORSMiddlewareapp = FastAPI()# MongoDB 連接
client = AsyncIOMotorClient("mongodb://zhangdapeng:zhangdapeng520@localhost:27017")
db = client["blogdb"]
collection = db["blogs"]# 允許所有來源訪問
app.add_middleware(CORSMiddleware,allow_origins=["*"],allow_credentials=True,allow_methods=["*"],allow_headers=["*"],
)# 定義博客模型
class Blog(BaseModel):title: strcontent: strauthor: strcreated_at: str# 寫入100條測試數據
async def create_test_data():await collection.delete_many({})for i in range(100):blog = Blog(title=f"測試博客 {i + 1}",content=f"這是第 {i + 1} 篇博客的內容",author=f"作者 {i + 1}",created_at="2025-05-10 12:33:33")await collection.insert_one(blog.dict())# 初始化時創建測試數據
@app.on_event("startup")
async def startup_event():await create_test_data()# 分頁查詢博客
@app.get("/blogs/")
async def get_blogs(request: Request,page: int = Query(1, ge=1),page_size: int = Query(10, ge=1, le=100)
):# 從請求頭中獲取參數api_key = request.headers.get("X-API-Key")if not api_key or api_key != "your_api_key":raise HTTPException(status_code=401, detail="Invalid API Key")# 分頁查詢skip = (page - 1) * page_sizeblogs = await collection.find().skip(skip).limit(page_size).to_list(length=page_size)total = await collection.count_documents({})total_pages = (total + page_size - 1) // page_sizedata = []for blog in blogs:blog["_id"] = str(blog["_id"])data.append(blog)return {"blogs": data,"total": total,"page": page,"page_size": page_size,"total_pages": total_pages}if __name__ == '__main__':import uvicornuvicorn.run(app, host='0.0.0.0', port=8080)
前端代碼:
import {useState, useEffect} from 'react';
import './App.css';function App() {const [blogs, setBlogs] = useState([]);const [apiKey, setApiKey] = useState('your_api_key');const [pagination, setPagination] = useState({page: 1,pageSize: 10,totalPages: 1,total: 0});useEffect(() => {fetchBlogs();}, [pagination.page, pagination.pageSize]);const fetchBlogs = () => {fetch(`http://127.0.0.1:8080/blogs/?page=${pagination.page}&page_size=${pagination.pageSize}`, {headers: {'X-API-Key': apiKey}}).then(response => response.json()).then(data => {setBlogs(data.blogs);setPagination(prev => ({...prev,totalPages: data.total_pages,total: data.total}));}).catch(error => console.error('Error fetching blogs:', error));};const handlePageChange = (newPage) => {if (newPage >= 1 && newPage <= pagination.totalPages) {setPagination(prev => ({...prev, page: newPage}));}};const handlePageSizeChange = (e) => {const newPageSize = parseInt(e.target.value);setPagination(prev => ({...prev,page: 1,pageSize: newPageSize}));};return (<div className="App"><header className="App-header"><h1>博客列表</h1><div>{blogs.map(blog => (<div key={blog._id} className="blog-card"><h2>{blog.title}</h2><p>{blog.content}</p><p>作者: {blog.author}</p><p>創建時間: {blog.created_at}</p></div>))}</div><div className="pagination"><button onClick={() => handlePageChange(pagination.page - 1)} disabled={pagination.page === 1}>上一頁</button><span>{pagination.page}</span><button onClick={() => handlePageChange(pagination.page + 1)}disabled={pagination.page === pagination.totalPages}>下一頁</button><select value={pagination.pageSize} onChange={handlePageSizeChange}><option value="10">10條/頁</option><option value="20">20條/頁</option><option value="50">50條/頁</option></select><p>總共 {pagination.total} 條,共 {pagination.totalPages} 頁</p></div></header></div>);
}export default App;
總結
源滾滾編程提供全套的PDF文檔,配套源代碼,錄播課,私教課和直播課,關注并私信我咨詢獲取。