原始數據相關內容鏈接
處理excel數據加工成SQL的腳本
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Excel行政區域數據轉SQL腳本
- 支持特殊行政單位處理(如省直轄縣級行政單位)
- 支持批量處理
"""import pandas as pd
import os
import redef process_excel_to_sql(excel_path, output_sql_path=None):"""將Excel行政區域數據轉換為SQL插入語句Args:excel_path: Excel文件路徑output_sql_path: 輸出SQL文件路徑,如果為None則打印到控制臺"""try:# 讀取Excel文件df = pd.read_excel(excel_path)print(f"讀取Excel文件: {excel_path}")print(f"數據行數: {len(df)}")print(f"列名: {list(df.columns)}")# 檢查必要的列是否存在required_columns = ['省份', '地級市', '縣區', '級別']if not all(col in df.columns for col in required_columns):raise ValueError(f"Excel文件缺少必要的列: {required_columns}")# 獲取省份名稱province_name = df['省份'].iloc[0]print(f"正在處理: {province_name}")# 生成SQL語句sql_lines = []sql_lines.append("-- " + "=" * 60)sql_lines.append(f"-- {province_name}行政區域數據插入SQL")sql_lines.append("-- 適用表結構:sys_area (id, area_name, parent_id, area_level, sort_order)")sql_lines.append("-- " + "=" * 60)sql_lines.append("")# 設置省級變量province_clean = re.sub(r'[省市區縣州盟地]', '', province_name)province_var = f"@{province_clean}_province_id"sql_lines.append("-- 設置變量")sql_lines.append(f"SET {province_var} = NULL;")# 處理市級數據 - 分離正常地級市和省直轄縣級行政單位normal_cities = df[(df['級別'] == '地級市') & (df['地級市'] != '省直轄縣級行政單位')]special_admin_units = df[(df['地級市'] == '省直轄縣級行政單位') &(df['縣區'].notna()) & (df['縣區'] != '')]# 收集所有城市名稱,為每個城市創建變量all_city_names = []if not normal_cities.empty:all_city_names.extend(normal_cities['地級市'].tolist())if not special_admin_units.empty:all_city_names.extend(special_admin_units['縣區'].tolist())# 為每個城市生成變量名city_vars = {}for city_name in all_city_names:# 生成變量名,去除特殊字符var_clean = re.sub(r'[市區縣州盟地族苗土家自治]', '', city_name)var_name = f"@{var_clean}_city_id"city_vars[city_name] = var_namesql_lines.append(f"SET {var_name} = NULL;")# 直轄市的情況is_municipality = len(all_city_names) == 1 and special_admin_units.emptyif is_municipality:city_var = f"@{province_clean}_city_id"sql_lines.append(f"SET {city_var} = NULL;")sql_lines.append("")# 1. 插入省級sql_lines.append("-- 1. 插入省級")sql_lines.append(f"INSERT INTO sys_area (area_name, parent_id, area_level, sort_order) VALUES ('{province_name}', 0, 1, 1);")sql_lines.append(f"SET {province_var} = LAST_INSERT_ID();")sql_lines.append("")# 2. 處理地級市all_cities_count = len(all_city_names)if all_cities_count > 0:sql_lines.append("-- 2. 插入地級市")city_sort = 1# 插入正常地級市if not normal_cities.empty:sql_lines.append("-- 2.1 正常地級市")for _, city_row in normal_cities.iterrows():city_name = city_row['地級市']city_var_name = city_vars[city_name]sql_lines.append(f"INSERT INTO sys_area (area_name, parent_id, area_level, sort_order) VALUES ('{city_name}', {province_var}, 2, {city_sort});")sql_lines.append(f"SET {city_var_name} = LAST_INSERT_ID();")# 如果是直轄市,同時設置通用變量if is_municipality:sql_lines.append(f"SET {city_var} = LAST_INSERT_ID();")city_sort += 1sql_lines.append("")# 處理省直轄縣級行政單位,將其作為地級市if not special_admin_units.empty:sql_lines.append("-- 2.2 省直轄縣級行政單位(作為地級市處理)")for _, special_row in special_admin_units.iterrows():special_city_name = special_row['縣區'] # 縣區列作為市名city_var_name = city_vars[special_city_name]sql_lines.append(f"INSERT INTO sys_area (area_name, parent_id, area_level, sort_order) VALUES ('{special_city_name}', {province_var}, 2, {city_sort});")sql_lines.append(f"SET {city_var_name} = LAST_INSERT_ID();")city_sort += 1sql_lines.append("")print(f"發現省直轄縣級行政單位 {len(special_admin_units)} 個,已作為地級市處理")sql_lines.append(f"-- 總計地級市數量: {all_cities_count}個")sql_lines.append("")# 3. 處理區縣# 正常區縣(不包括省直轄縣級行政單位下的)districts = df[(df['級別'] == '區縣') & (df['地級市'] != '省直轄縣級行政單位')]if not districts.empty:sql_lines.append("-- 3. 插入區縣")if is_municipality:# 直轄市情況sql_lines.append("-- 直轄市區縣")district_sort = 1for _, district_row in districts.iterrows():district_name = district_row['縣區']sql_lines.append(f"INSERT INTO sys_area (area_name, parent_id, area_level, sort_order) VALUES ('{district_name}', {city_var}, 3, {district_sort});")district_sort += 1else:# 普通省份,需要按城市分組處理區縣,使用變量避免子查詢問題grouped = districts.groupby('地級市')for city_name, city_districts in grouped:if city_name != '省直轄縣級行政單位' and city_name in city_vars:sql_lines.append(f"-- {city_name}下的區縣")city_var_name = city_vars[city_name]district_sort = 1for _, district_row in city_districts.iterrows():district_name = district_row['縣區']sql_lines.append(f"INSERT INTO sys_area (area_name, parent_id, area_level, sort_order) VALUES ('{district_name}', {city_var_name}, 3, {district_sort});")district_sort += 1sql_lines.append("")sql_lines.append("")# 統計信息province_count = 1city_count = all_cities_countdistrict_count = len(districts)sql_lines.append("")sql_lines.append("-- " + "=" * 60)sql_lines.append(f"-- 統計:省級{province_count}個,地級市{city_count}個,區縣{district_count}個")if not special_admin_units.empty:sql_lines.append(f"-- 其中省直轄縣級行政單位{len(special_admin_units)}個已作為地級市處理")sql_lines.append("-- 修復:使用變量替代子查詢,避免MySQL錯誤")sql_lines.append("-- " + "=" * 60)# 輸出結果sql_content = "\n".join(sql_lines)if output_sql_path:try:# 確保目錄存在output_dir = os.path.dirname(output_sql_path)if output_dir and not os.path.exists(output_dir):os.makedirs(output_dir, exist_ok=True)print(f"創建目錄: {output_dir}")# 寫入文件with open(output_sql_path, 'w', encoding='utf-8') as f:f.write(sql_content)print(f"? SQL文件已保存到: {output_sql_path}")except PermissionError:print(f"? 權限錯誤:無法寫入文件 {output_sql_path}")print("可能的解決方案:")print("1. 檢查目錄是否存在寫權限")print("2. 嘗試以管理員身份運行")print("3. 選擇其他目錄保存文件")print("4. 直接復制下面的SQL內容:")print("-" * 40)print(sql_content)print("-" * 40)except Exception as e:print(f"? 保存文件時出錯: {e}")print("將在控制臺顯示SQL內容:")print("-" * 40)print(sql_content)print("-" * 40)else:print("\n生成的SQL語句:")print("-" * 60)print(sql_content)print("-" * 60)return sql_contentexcept Exception as e:print(f"? 處理Excel文件時出錯: {e}")return Nonedef batch_process_excel_files(folder_path):"""批量處理文件夾中的所有Excel文件Args:folder_path: 包含Excel文件的文件夾路徑"""if not os.path.exists(folder_path):print(f"? 錯誤:文件夾 {folder_path} 不存在")returnif not os.path.isdir(folder_path):print(f"? 錯誤:{folder_path} 不是一個目錄")returnexcel_files = [f for f in os.listdir(folder_path) if f.endswith(('.xlsx', '.xls'))]if not excel_files:print("? 文件夾中沒有找到Excel文件")returnprint(f"? 找到{len(excel_files)}個Excel文件:")for file in excel_files:print(f" - {file}")# 創建sql子目錄sql_folder = os.path.join(folder_path, 'sql')try:if not os.path.exists(sql_folder):os.makedirs(sql_folder, exist_ok=True)print(f"? 創建SQL輸出目錄: {sql_folder}")except PermissionError:print(f"?? 無法創建SQL目錄 {sql_folder},將保存到原目錄")sql_folder = folder_pathexcept Exception as e:print(f"?? 創建目錄時出錯: {e},將保存到原目錄")sql_folder = folder_pathsuccess_count = 0fail_count = 0for excel_file in excel_files:excel_path = os.path.join(folder_path, excel_file)sql_file = excel_file.replace('.xlsx', '.sql').replace('.xls', '.sql')sql_path = os.path.join(sql_folder, sql_file)print(f"\n{'=' * 50}")print(f"處理文件: {excel_file}")print(f"{'=' * 50}")try:result = process_excel_to_sql(excel_path, sql_path)if result:print(f"? 成功處理: {excel_file}")success_count += 1else:print(f"? 處理失敗: {excel_file}")fail_count += 1except Exception as e:print(f"? 處理失敗: {excel_file} - {e}")fail_count += 1# 嘗試只輸出到控制臺try:print("嘗試只在控制臺顯示結果:")result = process_excel_to_sql(excel_path, None)if result:print("? 控制臺顯示成功")except Exception as e2:print(f"? 完全失敗: {e2}")print(f"\n{'=' * 50}")print(f"批量處理完成")print(f"成功: {success_count} 個")print(f"失敗: {fail_count} 個")print(f"{'=' * 50}")def validate_file_path(file_path):"""驗證文件路徑"""if not file_path:return False, "文件路徑不能為空"# 去除可能的引號file_path = file_path.strip('"').strip("'")if not os.path.exists(file_path):return False, f"文件不存在: {file_path}"if not file_path.endswith(('.xlsx', '.xls')):return False, f"文件格式不正確,需要.xlsx或.xls文件: {file_path}"return True, file_pathdef validate_folder_path(folder_path):"""驗證文件夾路徑"""if not folder_path:folder_path = "./"# 去除可能的引號folder_path = folder_path.strip('"').strip("'")if not os.path.exists(folder_path):return False, f"目錄不存在: {folder_path}"if not os.path.isdir(folder_path):return False, f"路徑不是目錄: {folder_path}"return True, folder_pathif __name__ == "__main__":print("=" * 60)print("Excel行政區域數據轉SQL工具 v2.0")print("? 支持特殊行政單位處理(如省直轄縣級行政單位)")print("? 修復MySQL子查詢錯誤,使用變量替代")print("? 完善的權限錯誤處理")print("=" * 60)print()while True:print("選擇處理方式:")print("1. 處理單個Excel文件")print("2. 批量處理文件夾中的所有Excel文件")print("3. 查看使用說明")print("4. 退出")choice = input("\n請選擇 (1/2/3/4): ").strip()if choice == "1":print("\n" + "-" * 40)print("單文件處理模式")print("-" * 40)while True:file_path = input("請輸入Excel文件路徑: ").strip()is_valid, result = validate_file_path(file_path)if is_valid:file_path = resultbreakelse:print(f"? {result}")retry = input("是否重新輸入? (y/n): ").strip().lower()if retry != 'y':breakif 'file_path' in locals() and os.path.exists(file_path):save_to_file = input("是否保存到SQL文件? (y/n): ").strip().lower()if save_to_file == 'y':sql_path = input("請輸入SQL文件保存路徑 (直接回車使用默認路徑): ").strip()if not sql_path:base_name = os.path.splitext(file_path)[0]sql_path = base_name + '.sql'print(f"使用默認路徑: {sql_path}")try:process_excel_to_sql(file_path, sql_path)except Exception as e:print(f"? 處理失敗: {e}")show_console = input("是否在控制臺顯示結果? (y/n): ").strip().lower()if show_console == 'y':try:process_excel_to_sql(file_path, None)except Exception as e2:print(f"? 完全失敗: {e2}")else:try:process_excel_to_sql(file_path, None)except Exception as e:print(f"? 處理失敗: {e}")elif choice == "2":print("\n" + "-" * 40)print("批量處理模式")print("-" * 40)while True:folder_path = input("請輸入文件夾路徑 (直接回車使用當前目錄): ").strip()is_valid, result = validate_folder_path(folder_path)if is_valid:folder_path = resultprint(f"使用目錄: {os.path.abspath(folder_path)}")breakelse:print(f"? {result}")retry = input("是否重新輸入? (y/n): ").strip().lower()if retry != 'y':breakif 'folder_path' in locals() and os.path.exists(folder_path):batch_process_excel_files(folder_path)elif choice == "3":print("\n" + "=" * 60)print("使用說明")print("=" * 60)print("1. Excel文件格式要求:")print(" - 必須包含列:省份、地級市、縣區、級別")print(" - 支持.xlsx和.xls格式")print()print("2. 特殊處理說明:")print(" - 省直轄縣級行政單位會被自動識別")print(" - 仙桃市、潛江市等會作為地級市處理")print(" - 直轄市會自動識別并正確處理")print()print("3. 生成的SQL特點:")print(" - 使用MySQL變量,避免子查詢錯誤")print(" - 支持完整的三級行政區域結構")print(" - 包含詳細的注釋和統計信息")print()print("4. 常見問題:")print(" - 權限錯誤:嘗試以管理員身份運行或選擇其他目錄")print(" - 文件格式錯誤:確保Excel文件包含必要的列")print(" - MySQL錯誤:新版本已修復子查詢問題")print("=" * 60)elif choice == "4":print("\n感謝使用Excel轉SQL工具!")breakelse:print("? 無效的選擇,請輸入 1、2、3 或 4")print() # 添加空行,便于下次選擇