提示:文章寫完后,目錄可以自動生成,如何生成可參考右邊的幫助文檔
文章目錄
- 前言
- 一、mysql連接池?
- 二、使用步驟
- 1.引入庫
前言
提示:這里可以添加本文要記錄的大概內容:
例如:
提示:以下是本篇文章正文內容,下面案例可供參考
一、mysql連接池?
安裝包 DBUtils
pip install DBUtils==1.3
二、使用步驟
1.引入庫
代碼如下(示例):
# -*- coding:utf-8 -*-
# author: cai bao jun
# datetime: 2024/3/1 11:38
# @File: 4數據庫操作2.pyimport pymysql
from DBUtils.PooledDB import PooledDB
import datetimefrom logger import logger#### DBUtils 1.3
#### DBUtils 1.3
#### DBUtils 1.3class MysqlConfig(object):database = "test2022" # 測試 trainerNhost = "127.0.0.1"user = "root"port = 3306password = "root"# Mysql數據庫相關操作
# @Singleton
class DMLMysql(object):_pool = None_isinstance = None_flag = Truedef __new__(cls, *args, **kwargs):if not cls._isinstance:print('new')cls._pool = PooledDB(creator=pymysql, # 使用鏈接數據庫的模塊mincached=10, # 初始化時,鏈接池中至少創建的鏈接,0表示不創建maxconnections=200, # 連接池允許的最大連接數,0和None表示不限制連接數blocking=True, # 連接池中如果沒有可用連接后,是否阻塞等待。True,等待;False,不等待然后報錯host=MysqlConfig.host,port=MysqlConfig.port,user=MysqlConfig.user,password=MysqlConfig.password,database=MysqlConfig.database,)cls._isinstance = super().__new__(cls)return cls._isinstancedef __init__(self, host=MysqlConfig.host, database=MysqlConfig.database, user=MysqlConfig.user, password=MysqlConfig.password, port=MysqlConfig.port):try:# print('開始鏈接mysql22332')self.database = databaseself.pool = DMLMysql._poolexcept Exception as e:logger.error(f"database connect error message is {str(e)}")passpassdef open(self):self.conn = self.pool.connection()self.cursor = self.conn.cursor() # 表示讀取的數據為字典類型return self.conn, self.cursordef close(self, cursor, conn):cursor.close()conn.close()def execute_sql(self, sqlQuery, value):""":param sqlQuery: 拼接好的sql語句:param value: 需要拼接的值:return:"""try:conn, cursor = self.open()conn.ping(reconnect=True) # 超時斷開重連cursor.execute(sqlQuery, value)# logger.info('數據執行成功!')except Exception as e:logger.error(f"database name is {self.database} error info is:{str(e)},sql is : {sqlQuery}")conn.rollback()else:conn.commit()finally:self.close(cursor, conn)def select_sql(self,sqlQuery, value):ret = Nonetry:conn, cursor = self.open()conn.ping(reconnect=True) # 超時斷開重連cursor.execute(sqlQuery, value)ret = cursor.fetchall()# logger.info('查詢數據執行成功!')except Exception as e:logger.error(f"database name is {self.database} error info is:{str(e)},sql is : {sqlQuery}")# self.conn.rollback()finally:self.close(cursor, conn)return retdef __del__(self):# self.cursor.close()# self.conn.close()# print('關閉mysql22332')passif __name__ == '__main__':dml = DMLMysql()select_sql = 'select author_id,category_id,views from article where id=%s'value = (1,)ret1 = dml.select_sql(sqlQuery=select_sql,value=value)print(ret1)dml1 = DMLMysql()dml2 = DMLMysql()print(id(dml1))print(id(dml2))print(id(dml1)==id(dml2))pass