場景:
使用分庫分表的業務有時分庫數量幾百甚至上千,當主管需要查詢每個庫中的數據,掌握數據分布情況。要你查看哪些庫中的表數量大于某個量級的給找出來 ,你會怎么做。
例子 :
mysql庫數量:db_xx_deviceinfo0-999 共1000個庫每個庫中 28個表。
一、查系統表(缺點:數據不是很精確,優點:快速。)
root@localhost 14:17: [information_schema]>select TABLE_SCHEMA,TABLE_NAME,TABLE_ROWS from tables where TABLE_SCHEMA like 'db_xx_deviceinfo%' and TABLE_ROWS>200000;
+-----------------------+------------------------+------------+
| TABLE_SCHEMA | TABLE_NAME | TABLE_ROWS |
+-----------------------+------------------------+------------+
| db_xx_deviceinfo104 | electric_meter_reading | 1578844 |
| db_xx_deviceinfo696 | electric_meter_reading | 3579983 |
| db_xx_deviceinfo696 | push_data_record | 975528 |
+-----------------------+------------------------+------------+
二、采用查詢業務表的方式(缺點:寫腳本去完成,有點麻煩,優點:快速與準確)
1、腳本
#! _*_ coding:utf-8 _*_import pymysql
import sys
from concurrent.futures import ThreadPoolExecutor, as_completedm_host = sys.argv[1]
m_user='tmp_select'
m_port = sys.argv[2] # 這里是字符串類型
m_db = sys.argv[3]
t_count = sys.argv[4]def get_mysql_connection():"""獲取MySQL數據庫連接"""# 從密碼文件中讀取密碼try:with open('/root/.ssh/.password.txt', 'r') as f:password = f.read().strip()except Exception as e:print(f"無法讀取密碼文件: {e}")sys.exit(1)# 連接MySQLtry:conn = pymysql.connect(host=m_host, # MySQL服務器IPport=int(m_port), # 關鍵修復:將字符串轉換為整數user=m_user, # 用戶名password=password, # 密碼charset='utf8mb4',cursorclass=pymysql.cursors.DictCursor,connect_timeout=30)return connexcept Exception as e:print(f"數據庫連接失敗: {e}")sys.exit(1)def check_database_exists(conn, db_name):"""檢查數據庫是否存在"""try:with conn.cursor() as cursor:cursor.execute("SHOW DATABASES LIKE %s", (db_name,))return cursor.fetchone() is not Noneexcept Exception as e:print(f"檢查數據庫 {db_name} 是否存在時出錯: {e}")return Falsedef check_table_data_count(db_name):"""檢查單個庫中所有表的數據量"""results = []try:# 為每個線程創建獨立的連接with open('/root/.ssh/.pwd.txt', 'r') as f:password = f.read().strip()conn = pymysql.connect(host=m_host,port=int(m_port),user=m_user,password=password,database=db_name,charset='utf8mb4',cursorclass=pymysql.cursors.DictCursor,connect_timeout=10)with conn.cursor() as cursor:# 獲取當前數據庫中的所有表cursor.execute("SHOW TABLES")tables = cursor.fetchall()for table in tables:table_name = list(table.values())[0]# 查詢表的數據量try:cursor.execute(f"SELECT COUNT(*) as count FROM `{table_name}`")count_result = cursor.fetchone()data_count = count_result['count']# 如果數據量大于t_count,記錄結果if data_count > int(t_count):result_str = f"庫名: {db_name}, 表名: {table_name}, 數據量: {data_count}"results.append(result_str)except Exception as e:print(f"查詢表 {db_name}.{table_name} 數據量時出錯: {e}")continueconn.close()except Exception as e:print(f"檢查數據庫 {db_name} 時出錯: {e}")return resultsdef main():"""主函數"""print("開始檢查各庫表數據量...")print(f"連接MySQL服務器: %s:%s, 用戶: %s" %(m_host,m_port,m_user))# 生成所有數據庫名database_names = [f"{m_db}{str(i)}" for i in range(1000)]# 先檢查哪些數據庫存在conn = get_mysql_connection()existing_dbs = []print("正在檢查存在的數據庫...")for db_name in database_names:if check_database_exists(conn, db_name):existing_dbs.append(db_name)conn.close()print(f"發現 {len(existing_dbs)} 個數據庫存在")# 使用多線程并行檢查每個數據庫all_results = []with ThreadPoolExecutor(max_workers=20) as executor:# 提交所有任務future_to_db = {executor.submit(check_table_data_count, db_name): db_name for db_name in existing_dbs}# 處理完成的任務for i, future in enumerate(as_completed(future_to_db)):db_name = future_to_db[future]try:results = future.result()all_results.extend(results)# 實時輸出結果for result in results:print(result)# 顯示進度if (i + 1) % 10 == 0:print(f"已完成 {i + 1}/{len(existing_dbs)} 個數據庫的檢查")except Exception as e:print(f"處理數據庫 {db_name} 時發生錯誤: {e}")# 保存結果到文件if all_results:with open('table_data_count_results.txt', 'w', encoding='utf-8') as f:for result in all_results:f.write(result + '\n')print(f"\n檢查完成,共找到 {len(all_results)} 個表的數據量大于{t_count}")print("結果已保存到 table_data_count_results.txt 文件中")else:print(f"未找到數據量大于{t_count}的表")if __name__ == "__main__":main()
2、運行查詢
python3 910.py 1x2.1x.5.x1 3305 db_xx_deviceinfo 200000