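Implementing a Thread-Safe MySQL Connection Pool for a Multithreaded Crawler
In day-to-day development, repeatedly opening and closing database connections costs performance. In multithreaded code it also invites problems such as one connection being shared unsafely between threads, or threads blocking while they wait for a connection. This article shows how to wrap a thread-safe MySQL connection pool in Python and uses threading to simulate concurrent database access from multiple threads.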
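1. Project Background
- Goal: wrap a general-purpose, reusable, thread-safe MySQL connection pool class
- Implementation: use PooledDB from DBUtils for the underlying pooling logic, with parameters such as the maximum number of connections and the minimum number of cached connections
- Features:
  - Thread-safe
  - Connections are released automatically
  - Common CRUD operations are wrapped
  - Built-in exception handling and log output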
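To make "thread-safe" and "released automatically" concrete, here is a minimal sketch of the borrow-and-return idea behind a pool, using only the standard library. It is not how PooledDB is implemented. `TinyPool` and `borrow` are hypothetical names, and plain `object()` instances stand in for real database connections.

```python
import queue
import threading
from contextlib import contextmanager


class TinyPool:
    """Toy pool: hands out pre-built objects and takes them back."""

    def __init__(self, factory, size):
        self._idle = queue.Queue(maxsize=size)  # queue.Queue is thread-safe
        for _ in range(size):
            self._idle.put(factory())

    @contextmanager
    def borrow(self, timeout=None):
        # Blocks while the pool is empty; raises queue.Empty after the timeout
        conn = self._idle.get(timeout=timeout)
        try:
            yield conn
        finally:
            self._idle.put(conn)  # always returned, even if the caller raised


if __name__ == "__main__":
    pool = TinyPool(factory=object, size=2)  # object() stands in for a connection
    seen = set()
    lock = threading.Lock()

    def worker():
        with pool.borrow() as conn:
            with lock:
                seen.add(id(conn))

    threads = [threading.Thread(target=worker) for _ in range(8)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    print(f"8 workers shared {len(seen)} pooled object(s)")
```

The `finally` clause is what gives the "automatic release" behaviour: the object goes back to the queue whether or not the caller's code raised.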
2. Dependencies
pip install pymysql DBUtils
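One thing to watch for: DBUtils 2.0 renamed its modules to lowercase, so the import path depends on which version pip installs. The sketch below tries both layouts and reports which one is present. `load_pooled_db` is a hypothetical helper for illustration, not part of DBUtils.

```python
import importlib


def load_pooled_db():
    """Return the PooledDB class from whichever DBUtils layout is installed, or None."""
    # Newest layout first: DBUtils 2.0+ uses "dbutils.pooled_db",
    # DBUtils 1.x used "DBUtils.PooledDB".
    for module_name in ("dbutils.pooled_db", "DBUtils.PooledDB"):
        try:
            module = importlib.import_module(module_name)
        except ImportError:
            continue
        return module.PooledDB
    return None


if __name__ == "__main__":
    pooled_db_class = load_pooled_db()
    if pooled_db_class is None:
        print("DBUtils is not installed")
    else:
        print(f"PooledDB loaded from {pooled_db_class.__module__}")
```

In a project that pins a single DBUtils version, a plain import of the matching path is simpler than a fallback like this.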
3. Connection Pool Wrapper (ConnectionPool)
# CoreUtils/Sql.py
import logging
import traceback

import pymysql
# DBUtils 2.0+ uses lowercase module names; on DBUtils 1.x the import was
# "from DBUtils.PooledDB import PooledDB".
from dbutils.pooled_db import PooledDB


class ConnectionPool:
    """Synchronous connection pool for multithreaded use."""

    def __init__(self, host, database, user=None, password=None,
                 port=3306, charset="utf8mb4", max_connections=10,
                 min_cached=2, max_cached=5, blocking=True):
        """Initialize the connection pool."""
        self._pool = PooledDB(
            creator=pymysql,
            maxconnections=max_connections,  # upper bound on open connections
            mincached=min_cached,            # idle connections opened at startup
            maxcached=max_cached,            # upper bound on idle connections kept
            blocking=blocking,               # wait instead of raising when exhausted
            host=host,
            port=port,
            user=user,
            password=password,
            database=database,
            charset=charset,
            use_unicode=True,
            cursorclass=pymysql.cursors.DictCursor,  # rows come back as dicts
            autocommit=True,
        )

    def _get_conn(self):
        return self._pool.connection()

    def query(self, sql, params=None):
        """Run a SELECT and return all rows."""
        conn = self._get_conn()
        cursor = conn.cursor()
        try:
            cursor.execute(sql, params)
            return cursor.fetchall()
        finally:
            cursor.close()
            conn.close()  # returns the connection to the pool

    def get(self, sql, params=None):
        """Run a SELECT and return the first row, or None."""
        conn = self._get_conn()
        cursor = conn.cursor()
        try:
            cursor.execute(sql, params)
            return cursor.fetchone()
        finally:
            cursor.close()
            conn.close()

    def execute(self, sql, params=None):
        """Run a statement and return the affected row count."""
        conn = self._get_conn()
        cursor = conn.cursor()
        try:
            return cursor.execute(sql, params)
        except Exception as e:
            traceback.print_exc()
            logging.error(f"SQL execution error: {e}\nSQL: {sql}\nParams: {params}")
            raise
        finally:
            cursor.close()
            conn.close()

    def insert(self, sql, params=None):
        """Run an INSERT and return the new row's auto-increment id."""
        conn = self._get_conn()
        cursor = conn.cursor()
        try:
            cursor.execute(sql, params)
            return cursor.lastrowid
        except Exception as e:
            logging.error(f"Insert failed: {e}\nSQL: {sql}\nParams: {params}")
            raise
        finally:
            cursor.close()
            conn.close()

    # Note: the table_* helpers build SQL from table and column names with
    # f-strings, so those names must come from trusted code, never user input.

    def table_has(self, table_name, field, value):
        sql = f"SELECT {field} FROM {table_name} WHERE {field}=%s LIMIT 1"
        return self.get(sql, (value,))

    def table_insert(self, table_name, item: dict):
        fields = list(item.keys())
        values = list(item.values())
        placeholders = ','.join(['%s'] * len(fields))
        field_list = ','.join(fields)
        sql = f"INSERT INTO {table_name} ({field_list}) VALUES ({placeholders})"
        try:
            return self.execute(sql, values)
        except pymysql.MySQLError as e:
            if e.args[0] == 1062:  # MySQL error code for a duplicate key
                logging.warning("Duplicate insert skipped")
            else:
                logging.error("Insert failed: %s\nData: %s", e, item)
                raise

    def table_update(self, table_name, updates: dict, field_where: str, value_where):
        set_clause = ', '.join([f"{k}=%s" for k in updates])
        values = list(updates.values()) + [value_where]
        sql = f"UPDATE {table_name} SET {set_clause} WHERE {field_where}=%s"
        self.execute(sql, values)
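The `table_has`, `table_insert`, and `table_update` helpers interpolate table and column names directly into the SQL string, because `%s` placeholders only cover values. If those names could ever come from outside the program, one option is to validate them first. The sketch below is an assumption about how that might look, not part of the original class. `quote_identifier` and `build_insert` are hypothetical helpers, and the allowed pattern (letters, digits, underscore) is deliberately stricter than what MySQL accepts.

```python
import re

# Conservative whitelist: a letter or underscore, then letters, digits, underscores
_IDENTIFIER = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")


def quote_identifier(name):
    """Validate a table/column name and wrap it in MySQL backticks."""
    if not isinstance(name, str) or not _IDENTIFIER.fullmatch(name):
        raise ValueError(f"unsafe SQL identifier: {name!r}")
    return f"`{name}`"


def build_insert(table_name, item):
    """Build an INSERT statement and its parameter list from a dict of column values."""
    columns = ", ".join(quote_identifier(key) for key in item)
    placeholders = ", ".join(["%s"] * len(item))
    sql = f"INSERT INTO {quote_identifier(table_name)} ({columns}) VALUES ({placeholders})"
    return sql, list(item.values())


if __name__ == "__main__":
    sql, values = build_insert("test_table", {"name": "User-1", "age": 21})
    print(sql)
    print(values)
```

The values still travel as parameters, so the driver escapes them. Only the identifiers need this extra check.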
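4. Multithreaded Test Code
The following code runs 10 threads that concurrently insert, query, update, and delete rows, to exercise the pool's stability and check that the log output stays readable.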
import threading
import logging

from CoreUtils.Sql import ConnectionPool

# Log format configuration
logging.basicConfig(
    level=logging.INFO,
    format='[%(asctime)s][%(levelname)s][%(threadName)s] %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)

# Initialize the connection pool
pool = ConnectionPool(
    host='localhost',
    database='test_db',
    user='root',
    password='your_password',
    port=3306
)


def test_multithread():
    def thread_task(i):
        name = f"User-{i}"
        age = 20 + i
        try:
            # Raw SQL helpers: insert, read back, update, read back, delete
            pool.insert("INSERT INTO test_table (name, age) VALUES (%s, %s)", (name, age))
            logging.info(f"Insert succeeded: {name}")
            data = pool.get("SELECT * FROM test_table WHERE name=%s", (name,))
            logging.info(f"Query result: {data}")
            pool.execute("UPDATE test_table SET age = age + 1 WHERE name = %s", (name,))
            logging.info("Age updated")
            updated = pool.get("SELECT * FROM test_table WHERE name=%s", (name,))
            logging.info(f"Row after update: {updated}")
            pool.execute("DELETE FROM test_table WHERE name=%s", (name,))
            logging.info("Delete done")

            # Table-level helpers
            pool.table_insert("test_table", {"name": f"{name}_new", "age": age})
            logging.info("table_insert succeeded")
            exists = pool.table_has("test_table", "name", f"{name}_new")
            logging.info(f"table_has check: {exists is not None}")
            pool.table_update("test_table", {"age": 99}, "name", f"{name}_new")
            logging.info("table_update done")
        except Exception:
            logging.exception("Thread raised an exception")

    threads = []
    for i in range(10):
        t = threading.Thread(target=thread_task, args=(i,), name=f"Thread-{i}")
        threads.append(t)
        t.start()
    for t in threads:
        t.join()
    logging.info("Multithreaded test finished")


if __name__ == "__main__":
    test_multithread()
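The worker in the test catches every exception and only logs it, so the script cannot tell at the end whether any thread failed. As a possible variation, the standard library's `concurrent.futures` can run the same kind of task and hand failures back to the caller. This is a sketch under assumptions: `run_concurrently` and `fake_task` are hypothetical names, and `fake_task` stands in for a task that would use the connection pool.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def run_concurrently(task, args_list, max_workers=10):
    """Run task(arg) for each arg in a thread pool; return (results, errors) dicts."""
    results, errors = {}, {}
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {executor.submit(task, arg): arg for arg in args_list}
        for future in as_completed(futures):
            arg = futures[future]
            try:
                results[arg] = future.result()
            except Exception as exc:  # keep the failure instead of only logging it
                errors[arg] = exc
    return results, errors


if __name__ == "__main__":
    def fake_task(i):
        # Stand-in for a database task; fails on purpose for one input
        if i == 3:
            raise RuntimeError("simulated database error")
        return f"User-{i}"

    results, errors = run_concurrently(fake_task, range(5))
    print("succeeded:", sorted(results))
    print("failed:", sorted(errors))
```

Keeping `max_workers` at or below the pool's `max_connections` means no worker has to wait for a connection.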
5. Prepare the Test Table
CREATE TABLE test_table (
    id INT AUTO_INCREMENT PRIMARY KEY,
    name VARCHAR(100) UNIQUE NOT NULL,
    age INT
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
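6. Sample Output
When the test script runs, every log line is prefixed with its timestamp, level, and thread name, as set by the format string passed to logging.basicConfig(), so output from different threads stays easy to tell apart.
If your project needs it, you can write the logs to a file by passing filename='xxx.log' to logging.basicConfig().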
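Passing `filename` to `logging.basicConfig()` sends the root logger's output to the file instead of the console. If you would rather attach a file to one logger only, a dedicated `FileHandler` is another option. The sketch below reuses the same format string. The logger name `crawler.db`, the file name `crawler_db.log`, and the helper `build_file_logger` are assumptions made for illustration.

```python
import logging
import threading


def build_file_logger(path):
    """Create a logger that writes thread-tagged lines to the given file."""
    logger = logging.getLogger("crawler.db")  # hypothetical logger name
    logger.setLevel(logging.INFO)
    handler = logging.FileHandler(path, encoding="utf-8")
    handler.setFormatter(logging.Formatter(
        "[%(asctime)s][%(levelname)s][%(threadName)s] %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
    ))
    logger.addHandler(handler)
    return logger, handler


if __name__ == "__main__":
    logger, handler = build_file_logger("crawler_db.log")  # hypothetical file name
    worker = threading.Thread(
        target=lambda: logger.info("insert ok"), name="Thread-0"
    )
    worker.start()
    worker.join()
    handler.close()  # flush and release the file
```

The handlers in the `logging` module serialize writes with an internal lock, so lines from different threads are not interleaved mid-line.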
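7. Summary
This article presented one way to wrap a thread-safe MySQL connection pool and exercised it under concurrent access with a 10-thread test. The wrapper covers concurrent reads and writes, connection reuse, and readable per-thread logging, and it is suited to serve as the database access layer in small and medium-sized Python crawler projects.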