目錄
- 項目環境
- 一、db_config.py
- 二、mysql_executor.py
- 三、test/main.py
在使用mysql-connector-python連接MySQL數據庫的時候,如同Java中的jdbc一般,每條sql需要創建和刪除連接,很自然就想到寫一個抽象方法,但是找了找沒有官方標準的,或者使用SQLAlchemy等類似的orm框架,于是調試deepseek寫了一個
項目背景:【保姆級喂飯教程】uv教程一文講透:安裝,創建,配置,工具,命令
項目環境
安裝庫,
【沉浸式解決問題】mysql-connector-python連接數據庫:RuntimeError: Failed raising error.
uv add mysql-connector-python==8.0.33
目錄,init文件為空
一、db_config.py
class DBConfig:"""數據庫配置基類"""@classmethoddef get_config(cls, name: str) -> dict:"""獲取指定數據庫配置"""return getattr(cls, name).valueclass MySQLConfig(DBConfig):"""MySQL數據庫配置"""# 基礎配置模板BASE = {"pool_name": "mysql_pool","pool_size": 10,"pool_reset_session": True,"charset": "utf8mb4","autocommit": False}# 具體數據庫實例配置PYTHON_DB = {**BASE,"host": "127.0.0.1","port": 3306,"user": "python","password": "python","database": "test-python"}LOG_DB = {**BASE,"host": "log-db.example.com","port": 3307,"user": "log_user","password": "log_password","database": "app_logs"}
二、mysql_executor.py
"""
MySQL 數據庫連接池執行器
提供安全、高效的數據庫操作接口,支持連接池、事務管理和批量操作
"""from mysql.connector import pooling, Error
from typing import Union, List, Tuple, Dict, Anyclass MySQLExecutor:"""MySQL 數據庫操作通用封裝類(連接池版)特性:1. 基于連接池實現高效連接管理2. 支持自動和手動事務控制3. 提供查詢、執行、批量操作等多種方法4. 完善的異常處理和資源管理5. 詳細的日志記錄使用示例:with MySQLExecutor(config) as executor:# 執行查詢results = executor.query("SELECT * FROM users WHERE age > %s", (25,))# 執行寫入操作executor.execute("UPDATE users SET status = 'active' WHERE id = %s", (1,))# 批量操作executor.batch_execute("INSERT INTO logs (message) VALUES (%s)", [('log1',), ('log2',)])"""def __init__(self, config: Dict[str, Any]):"""初始化數據庫執行器參數:config: 數據庫配置字典,必須包含連接池相關參數示例:{"pool_name": "mysql_pool","pool_size": 10,"host": "127.0.0.1","port": 3306,"user": "root","password": "password","database": "test_db"}異常:ConnectionError: 如果連接池創建失敗"""self.config = configself.pool = self._create_pool()self.active_connection = None # 當前活動連接(用于事務)def _create_pool(self) -> pooling.MySQLConnectionPool:"""創建數據庫連接池返回:pooling.MySQLConnectionPool: MySQL連接池實例異常:ConnectionError: 如果連接池創建失敗"""try:print(f"正在創建MySQL連接池: {self.config.get('pool_name', 'unnamed_pool')}")return pooling.MySQLConnectionPool(**self.config)except Error as e:error_msg = f"創建MySQL連接池失敗: {e}"print(error_msg)raise ConnectionError(error_msg) from edef __enter__(self):"""支持with上下文管理返回執行器實例本身"""return selfdef __exit__(self, exc_type, exc_val, exc_tb):"""退出上下文時自動關閉活動連接"""self.close_active_connection()def connection(self):"""返回一個連接上下文管理器使用示例:with executor.connection() as conn:with conn.cursor() as cursor:cursor.execute("SELECT * FROM table")results = cursor.fetchall()"""return self._ConnectionContext(self)class _ConnectionContext:"""連接上下文管理器內部類用于自動管理連接的獲取和釋放"""def __init__(self, executor):self.executor = executorself.conn = Nonedef __enter__(self) -> pooling.PooledMySQLConnection:"""進入上下文時從連接池獲取連接"""try:self.conn = self.executor.get_connection()return self.connexcept Error as e:error_msg = f"獲取數據庫連接失敗: {e}"print(error_msg)raise ConnectionError(error_msg) from edef __exit__(self, exc_type, exc_val, exc_tb):"""退出上下文時關閉連接"""if self.conn and self.conn.is_connected():try:self.conn.close()except Error as e:print(f"關閉數據庫連接時出錯: {e}")def get_connection(self) -> pooling.PooledMySQLConnection:"""從連接池獲取一個新的數據庫連接返回:pooling.PooledMySQLConnection: 數據庫連接對象注意:使用后需要手動關閉連接,推薦使用connection()上下文管理器"""return self.pool.get_connection()def close_active_connection(self):"""關閉當前活動連接(如果存在)主要用于清理事務連接"""if self.active_connection and self.active_connection.is_connected():try:self.active_connection.close()self.active_connection = Noneexcept Error as e:print(f"關閉活動連接時出錯: {e}")def start_transaction(self):"""開始一個新的事務異常:ConnectionError: 如果獲取連接失敗"""try:if not self.active_connection or not self.active_connection.is_connected():self.active_connection = self.get_connection()self.active_connection.start_transaction()print("事務已開始")except Error as e:error_msg = f"開始事務失敗: {e}"print(error_msg)raise ConnectionError(error_msg) from edef commit(self):"""提交當前事務并關閉活動連接異常:ConnectionError: 如果提交失敗或沒有活動事務"""if not self.active_connection:print("警告: 嘗試提交但無活動事務")returntry:self.active_connection.commit()print("事務已提交")except Error as e:error_msg = f"提交事務失敗: {e}"print(error_msg)raise ConnectionError(error_msg) from efinally:self.close_active_connection()def rollback(self):"""回滾當前事務并關閉活動連接異常:ConnectionError: 如果回滾失敗"""if not self.active_connection:print("警告: 嘗試回滾但無活動事務")returntry:self.active_connection.rollback()print("事務已回滾")except Error as e:error_msg = f"回滾事務失敗: {e}"print(error_msg)raise ConnectionError(error_msg) from efinally:self.close_active_connection()def query(self,sql: str,params: Union[Tuple, List, None] = None,dictionary: bool = False) -> Union[List[Tuple], List[Dict]]:"""執行查詢語句(SELECT)參數:sql: SQL查詢語句params: 查詢參數(可選)dictionary: 是否返回字典格式結果(默認為元組)返回:查詢結果列表(元組或字典格式)異常:DatabaseError: 如果查詢執行失敗"""try:with self.connection() as conn:with conn.cursor(dictionary=dictionary) as cursor:cursor.execute(sql, params)return cursor.fetchall()except Error as e:error_msg = f"查詢執行失敗: {e}\nSQL: {sql}\n參數: {params}"print(error_msg)raise DatabaseError(error_msg) from edef execute(self,sql: str,params: Union[Tuple, List, None] = None,commit: bool = True) -> int:"""執行非查詢語句(INSERT/UPDATE/DELETE)參數:sql: SQL操作語句params: 操作參數(可選)commit: 是否自動提交事務(默認為True)返回:受影響的行數異常:DatabaseError: 如果操作執行失敗"""try:with self.connection() as conn:with conn.cursor() as cursor:cursor.execute(sql, params)affected_rows = cursor.rowcountif commit:conn.commit()return affected_rowsexcept Error as e:error_msg = f"操作執行失敗: {e}\nSQL: {sql}\n參數: {params}"print(error_msg)raise DatabaseError(error_msg) from edef batch_execute(self,sql: str,params_list: List[Union[Tuple, List]],commit: bool = True) -> int:"""批量執行操作(高效)參數:sql: SQL操作語句params_list: 參數列表commit: 是否提交事務(默認為True)返回:受影響的總行數異常:DatabaseError: 如果批量操作失敗ValueError: 如果參數列表為空"""if not params_list:raise ValueError("參數列表不能為空")try:with self.connection() as conn:with conn.cursor() as cursor:cursor.executemany(sql, params_list)affected_rows = cursor.rowcountif commit:conn.commit()return affected_rowsexcept Error as e:error_msg = f"批量操作失敗: {e}\nSQL: {sql}\n參數數量: {len(params_list)}"print(error_msg)# 嘗試回滾事務try:if conn and conn.is_connected():conn.rollback()except Error as rollback_err:print(f"回滾批量操作失敗: {rollback_err}")raise DatabaseError(error_msg) from edef execute_many(self,sql_commands: List[str],params_list: List[Union[Tuple, List, None]] = None,commit: bool = True) -> int:"""執行多個SQL命令(可包含不同操作)參數:sql_commands: SQL命令列表params_list: 參數列表(可選,默認為None)commit: 是否提交事務(默認為True)返回:受影響的總行數異常:DatabaseError: 如果執行失敗ValueError: 如果命令和參數長度不匹配"""if params_list is None:params_list = [None] * len(sql_commands)if len(sql_commands) != len(params_list):raise ValueError(f"SQL命令({len(sql_commands)})和參數列表({len(params_list)})長度不一致")try:with self.connection() as conn:with conn.cursor() as cursor:total_affected = 0for i, (sql, params) in enumerate(zip(sql_commands, params_list)):try:cursor.execute(sql, params)total_affected += cursor.rowcountexcept Error as e:# 記錄具體哪個命令失敗error_msg = f"執行第 {i + 1} 條命令失敗: {e}\nSQL: {sql}\n參數: {params}"print(error_msg)raise DatabaseError(error_msg) from eif commit:conn.commit()return total_affectedexcept Error as e:# 回滾整個事務try:if conn and conn.is_connected():conn.rollback()except Error as rollback_err:print(f"回滾操作失敗: {rollback_err}")raise DatabaseError(f"多命令執行失敗: {e}") from e# 自定義異常類,提供更清晰的錯誤分類
class DatabaseError(Exception):"""數據庫操作異常基類"""passclass ConnectionError(DatabaseError):"""數據庫連接相關異常"""pass
三、test/main.py
from mysql.db_config import MySQLConfig
from mysql.mysql_executor import MySQLExecutor, ConnectionError, DatabaseError# 獲取數據庫配置
db_config = MySQLConfig.get_config("PYTHON_DB")try:# 創建執行器(使用with上下文確保資源清理)with MySQLExecutor(db_config) as executor:print("=" * 50)print("示例1: 基本查詢")print("=" * 50)try:# 執行查詢 - 返回字典格式結果users = executor.query("SELECT * FROM users WHERE age > %s",(25,),dictionary=True)print(f"查詢成功,獲取到 {len(users)} 條記錄")if users:print(f"第一條記錄: {users[0]}")except DatabaseError as e:print(f"查詢操作失敗: {e}")# 在實際應用中,這里可以記錄日志或執行恢復操作print("\n" + "=" * 50)print("示例2: 插入數據")print("=" * 50)try:# 插入新用戶new_user = ("Alice", "alice@example.com", 30)insert_sql = "INSERT INTO users (name, email, age) VALUES (%s, %s, %s)"affected = executor.execute(insert_sql, new_user)print(f"插入成功,影響行數: {affected}")except DatabaseError as e:print(f"插入操作失敗: {e}")print("\n" + "=" * 50)print("示例3: 批量插入")print("=" * 50)try:# 批量插入用戶users_data = [("Bob", "bob@example.com", 28),("Charlie", "charlie@example.com", 35),("David", "david@example.com", 42)]insert_sql = "INSERT INTO users (name, email, age) VALUES (%s, %s, %s)"# 批量執行batch_affected = executor.batch_execute(insert_sql, users_data)print(f"批量插入成功,總影響行數: {batch_affected}")except (DatabaseError, ValueError) as e:print(f"批量插入失敗: {e}")print("\n" + "=" * 50)print("示例4: 事務處理")print("=" * 50)try:# 開始事務executor.start_transaction()print("事務已開始")try:# 事務內操作1:更新用戶update_sql = "UPDATE users SET age = %s WHERE id = %s"executor.execute(update_sql, (31, 1), commit=False)print("用戶更新成功")# 事務內操作2:插入日志log_sql = "INSERT INTO activity_log (user_id, action) VALUES (%s, %s)"executor.execute(log_sql, (1, "age_update"), commit=False)print("日志插入成功")# 查詢事務內數據user_data = executor.query("SELECT * FROM users WHERE id = %s",(1,),dictionary=True)print(f"事務內用戶數據: {user_data}")# 提交事務executor.commit()print("事務操作成功完成")except DatabaseError as e:# 事務內部操作失敗,回滾事務print(f"事務內操作失敗: {e}")executor.rollback()print("已回滾事務")raise # 繼續向上拋出異常except (DatabaseError, ConnectionError) as e:print(f"事務處理失敗: {e}")print("\n" + "=" * 50)print("示例5: 復雜操作(多個SQL命令)")print("=" * 50)try:# 轉賬操作(多個SQL命令)sql_commands = ["UPDATE accounts SET balance = balance - 100 WHERE id = 1","UPDATE accounts SET balance = balance + 100 WHERE id = 2","INSERT INTO transactions (from_acc, to_acc, amount) VALUES (1, 2, 100)"]# 執行多個命令total_affected = executor.execute_many(sql_commands)print(f"轉賬操作完成,總影響行數: {total_affected}")except (DatabaseError, ValueError) as e:print(f"多命令執行失敗: {e}")print("\n" + "=" * 50)print("示例6: 直接使用連接上下文")print("=" * 50)try:# 直接使用連接上下文with executor.connection() as conn:with conn.cursor(dictionary=True) as cursor:cursor.execute("SELECT * FROM users WHERE status = 'active'")active_users = cursor.fetchall()print(f"活躍用戶數量: {len(active_users)}")# 同一個連接中執行另一個操作with conn.cursor() as cursor:cursor.execute("UPDATE users SET last_login = NOW() WHERE status = 'active'")conn.commit()print(f"更新了 {cursor.rowcount} 個用戶的登錄時間")except (ConnectionError, DatabaseError) as e:print(f"連接上下文操作失敗: {e}")except ConnectionError as e:print(f"數據庫連接初始化失敗: {e}")# 在實際應用中,這里應該進行更嚴格的錯誤處理
finally:print("所有數據庫操作完成")