【Python】Python解決阿里云DataWorks導出數據1萬條限制的問題
- 一、前言
- 二、腳本功能概述
- 三、核心代碼解析
- **1. 環境配置與安全設置**
- **2. 用戶配置區**
- **3. 數據清洗函數**
- **4. 核心邏輯**
- 四、完整代碼演示
- 五、總結
一、前言
在日常數據分析工作中,團隊經常需要從阿里云DataWorks(原MaxCompute)中導出臨時表數據進行分析或匯報,但由于受限于阿里云的安全策略,每次只能導出1萬條,反復操作會很麻煩。
本文介紹如何用Python腳本解決方案,能夠安全高效地將DataWorks中的表數據導出為Excel文件,同時處理數據中的非法字符問題。
二、腳本功能概述
主要用Python腳本實現了以下核心功能:
-
安全連接阿里云DataWorks(MaxCompute)
-
驗證目標表及分區配置
-
讀取表數據并轉換為DataFrame
-
對特定字段進行非法字符清洗,確保不受上游臟數據影響
-
將清洗后的數據導出為Excel文件,可直接分析
三、核心代碼解析
1. 環境配置與安全設置
這里要注意,AccessKey ID 和AccessKey Secret 是每個RAM賬號獨有的,要絕對保密,避免風控問題,在實際使用時,建議通過環境變量或配置文件管理這些敏感信息。
# -*- coding: utf-8 -*-
# AccessKey ID 和AccessKey Secret
# python_version: 3.12
2. 用戶配置區
配置基本信息,因為dataworks是基于Hive引擎的,所以會有分區的概念,這里把分區概念也加入到了配置項里。
# === 用戶配置區 ===
ACCESS_ID = '填寫對應RAM賬號的ACCESS_ID'
ACCESS_KEY = '填寫對應RAM賬號的ACCESS_KEY'
PROJECT_NAME = 'dataoworks項目空間名'
ENDPOINT = 'http://service.cn-shanghai.maxcompute.aliyun.com/api' # 阿里云對應region區域
TABLE_NAME = 'chat_msg_output' # 我這里引用的是客戶聊天記錄表
OUTPUT_DIR = '/Users/admin/Downloads'
OUTPUT_FILENAME = 'export_data.xlsx'
IS_PARTITIONED = 0 # 是否分區表
PARTITION_NAME = 0 # 分區名稱
3. 數據清洗函數
數據清洗時,定義專門用于清洗表字段中的非法字符的函數,主要處理以下問題:
- HTML換行標簽<>轉換為系統換行符
- 刪除所有HTML標簽,只保留純文本內容
- 轉義雙引號,確保Excel文件兼容性
- 去除不可打印的控制字符
def clean_err_msg(text):"""前面說了是聊天記錄表,所以會存在表情包、特殊字符等非法字符,需要進行清洗。功能:處理HTML標簽、轉義雙引號、替換非法換行符"""if pd.isna(text):return texttry:# 替換HTML換行標簽為系統換行符text = re.sub(r'<br\s*/?>', '\n', str(text), flags=re.IGNORECASE)# 刪除所有HTML標簽text = re.sub(r'<.*?>', '', text)# 轉義雙引號為兩個雙引號,實現Excel兼容text = text.replace('"', '""')# 去除其他非打印字符text = re.sub(r'[\x00-\x1F\x7F-\x9F]', ' ', text)return text.strip()except Exception as e:print(f"回四叔: 清洗發現異常: {str(e)}")return text
4. 核心邏輯
核心包含以下幾個關鍵步驟:
- 路徑校驗與創建:確保輸出目錄存在,如果不存在則自動創建。
OUTPUT_PATH = os.path.join(OUTPUT_DIR, OUTPUT_FILENAME)
os.makedirs(OUTPUT_DIR, exist_ok=True)
- ODPS連接與表驗證
使用提供的憑據連接DataWorks,并獲取目標表對象。
odps = ODPS(ACCESS_ID, ACCESS_KEY, PROJECT_NAME, endpoint=ENDPOINT)
table = odps.get_table(TABLE_NAME)
- 分區驗證
腳本會根據IS_PARTITIONED的值檢查表是否為分區表,并驗證分區格式是否正確。 - 數據讀取與清洗
with table.open_reader(partition=PARTITION_NAME if IS_PARTITIONED else None) as reader:df = reader.to_pandas()if 'err_msg' in df.columns:df['err_msg'] = df['err_msg'].apply(clean_err_msg)
- 數據導出
df.to_excel(OUTPUT_PATH,index=False,engine='openpyxl',sheet_name='數據導出'
)
四、完整代碼演示
# -*- coding: utf-8 -*-
# AccessKey ID 和AccessKey Secret 絕對保密
# python_version: 3.12 from odps import ODPS
import pandas as pd
import os
import re# === 用戶配置區 ===
ACCESS_ID = '填寫對應RAM賬號的ACCESS_ID'
ACCESS_KEY = '填寫對應RAM賬號的ACCESS_KEY'
PROJECT_NAME = 'dataoworks項目空間名'
ENDPOINT = 'http://service.cn-shanghai.maxcompute.aliyun.com/api' # 阿里云對應region區域
TABLE_NAME = 'chat_msg_output' # 我這里引用的是客戶聊天記錄表
OUTPUT_DIR = '/Users/admin/Downloads'
OUTPUT_FILENAME = 'export_data.xlsx'
IS_PARTITIONED = 0 # 是否分區表
PARTITION_NAME = 0 # 分區名稱 # ====== 核心邏輯 ====== #
def clean_err_msg(text):if pd.isna(text):return texttry:# 替換HTML換行標簽為系統換行符text = re.sub(r'<br\s*/?>', '\n', str(text), flags=re.IGNORECASE)# 刪除所有HTML標簽text = re.sub(r'<.*?>', '', text)# 轉義雙引號為兩個雙引號(Excel兼容)text = text.replace('"', '""')# 去除其他非打印字符text = re.sub(r'[\x00-\x1F\x7F-\x9F]', ' ', text)return text.strip()except Exception as e:print(f"報告四叔: 清洗發現異常: {str(e)}")return text# 第1步: 路徑校驗
OUTPUT_PATH = os.path.join(OUTPUT_DIR, OUTPUT_FILENAME)
os.makedirs(OUTPUT_DIR, exist_ok=True) # 第2步: 連接ODPS
try:odps = ODPS(ACCESS_ID, ACCESS_KEY, PROJECT_NAME, endpoint=ENDPOINT)table = odps.get_table(TABLE_NAME)# 第3步: 分區驗證if IS_PARTITIONED == 1:if not table.table_schema.partitions:raise ValueError("報告四叔:?配置錯誤=>該表不是分區表")if not re.match(r"^\w+='[\w-]+'$", PARTITION_NAME):raise ValueError("報告四叔:?分區條件格式錯誤")table.get_partition(PARTITION_NAME)elif IS_PARTITIONED == 0 and table.table_schema.partitions:raise ValueError("報告四叔:?配置錯誤:該表是分區表")# 第4步: 數據導出(新增清洗邏輯)with table.open_reader(partition=PARTITION_NAME if IS_PARTITIONED else None) as reader:df = reader.to_pandas()# 新增清洗步驟if 'err_msg' in df.columns:df['err_msg'] = df['err_msg'].apply(clean_err_msg)# 導出Excel(增加編碼參數)df.to_excel(OUTPUT_PATH,index=False,engine='openpyxl',sheet_name='數據導出'# encoding='utf-8-sig' # 編碼問題處理 )print(f"? 數據已成功導出至:{OUTPUT_PATH}")except Exception as e:print(f"報告四叔:?導出失敗:{str(e)}") # 提示失敗原因,便于排查問題if os.path.exists(OUTPUT_PATH):os.remove(OUTPUT_PATH)
五、總結
以上腳本解決團隊經常從阿里云DataWorks導出表數據受限的問題,特別適合需要定期導出數據進行進一步分析的場景。
腳本中的數據清洗功能確保了導出的數據質量,特別是對包含HTML標簽和特殊字符的字段進行了適當處理,能夠涵蓋大部分數據清洗的場景。
數據開發同學只需要通過簡單的配置修改,便可適應不同的表結構和導出需求。