MaxCompute MaxFrame評測 | 分布式Python計算服務MaxFrame(完整操作版)
- 前言
- MaxCompute MaxFrame
- 服務開通
- 開通 MaxCompute 服務
- 開通 DataWorks 服務
- 資源準備
- 創建 DataWorks 工作空間
- 創建 MaxCompute 項目
- 創建MaxCompute數據源
- 綁定數據源或集群
- 創建MaxCompute節點
- 在DataWorks中使用MaxFrame
- 分布式Pandas 處理
- 總的來說
前言
在當今數字化迅猛發展的時代,數據信息的保存與數據分析對企業的決策和工作方向具有極為重要的指導價值。通過企業數據分析,企業能夠精準統計出自身的成本投入、經營收益以及利潤等重要數據。這些數據猶如企業運營的“晴雨表”,為企業后續的決策提供了堅實可靠的依據,助力企業在市場競爭中優化經營策略,從而實現更大的價值創造。
今天我們要講的正是可以幫助企業實現數據保存于數據分析的一款分布式計算框架MaxCompute MaxFrame,那么什么是MaxCompute MaxFrame?
MaxCompute MaxFrame
在開始測評之前,先了解一下什么是MaxCompute MaxFrame?
以下是來自官網的介紹:【MaxFrame是由阿里云自研的分布式計算框架,支持Python編程接口、兼容Pandas接口且自動進行分布式計算。您可利用MaxCompute的海量計算資源及數據進行大規模數據處理、可視化數據探索分析以及科學計算、ML/AI開發等工作。】關于MaxCompute MaxFrame 的更多內容你可以直接在官網中詳細了解,包括產品優勢、產品功能、應用場景等,這里我主要是測評 MaxCompute MaxFrame 的操作體驗,因此對于 MaxCompute MaxFrame 的詳細介紹大家可以移步官網:https://www.aliyun.com/product/bigdata/odps/maxframe
MaxCompute MaxFrame,那么下面就開始今天的操作吧。
服務開通
在開始測試 MaxCompute MaxFrame 的功能前,首先需要開通 MaxCompute和DataWorks 服務。
開通 MaxCompute 服務
對于全新的、此前未開通過MaxCompute的阿里云賬號,阿里云在部分地域提供了免費的MaxCompute資源包,您可以先申請免費資源包體驗試用,體驗地址:https://free.aliyun.com/ 在免費試用頁面輸入你想要試用的服務,比如輸入 MaxCompute
如果你的阿里云賬號沒有免費試用資格,那么你只能通過 阿里云MaxCompute產品首頁 :https://www.aliyun.com/product/maxcompute 單擊【立即購買】,選擇【按量付費標準版】開通MaxCompute服務
這里我的賬號在當前北京地域下已經開通過 MaxCompute 服務,因此這里才有這個提示,正常情況下的話你直接開通即可
開通完 MaxCompute 服務之后,我們還需要開通 DataWorks 服務。
開通 DataWorks 服務
同樣的,對于全新的、此前未開通過DataWorks 的阿里云賬號,阿里云在部分地域提供了免費的DataWorks資源包,您可以先申請免費資源包體驗試用。免費試用地址同上,在免費試用界面輸入 DataWorks 可以查看試用資格
如果你的阿里云賬號沒有試用資格的話,你可以到 DataWorks 官網,官網地址: https://www.aliyun.com/product/bigdata/ide
點擊【立即購買】選擇需要開通 DataWorks 服務的地域以及 【按量付費】計費方式后開通即可,這里我已經開通過按量付費了
資源準備完成之后,下面就開始創建資源用于后面的操作。
資源準備
在我們開通了MaxCompute和DataWorks 服務 之后,下面我們就可以創建資源了,下面按照步驟創建資源內容。
創建 DataWorks 工作空間
登錄DataWorks控制臺 ,在控制臺頂部菜單欄切換所需地域,單擊左側導航欄的【工作空間】,進入工作空間列表頁面,點擊【創建工作空間】,這里我已經創建好了工作空間 User_dataworks
輸入工作空間名稱,定義工作空間模式,即工作空間的生產環境和開發環境是否隔離等參數,根據實際情況選擇即可
創建 MaxCompute 項目
登錄MaxCompute控制臺,在左上角選擇地域,選擇左側菜單【項目管理】,點擊【新建項目】,這里為了區分后面的測試和生產環境,需要創建兩個 MaxCompute 項目空間,這里我已經創建好了
在 MaxCompute 新增項目頁面,需要輸入項目名稱,選擇 計算資源付費類型、默認Quota 等信息后,點擊確定即可完成 MaxCompute 創建。
創建MaxCompute數據源
完成上述操作之后,回到 DataWorks控制臺 ,在 DataWorks 控制臺 選擇查看 【工作空間】列表頁面,點擊工作空間名稱,進入工作空間詳情頁面,在工作空間詳情頁面點擊【數據源】-【數據源列表】可以看到這里我已經創建成功的數據源
點擊【新增數據源】,選擇 MaxCompute,根據界面指引創建數據源
選擇 MaxCompute 在新增 MaxCompute 數據源頁面,我們需要輸入 數據源名稱 ,所屬云賬號、地域等信息,可以選擇我們剛才創建好的 MaxCompute項目名稱
點擊【完成創建】之后,創建完成MaxCompute 數據源,就可以返回【數據源列表】查看已經創建好的數據源信息了。
綁定數據源或集群
等待MaxCompute 數據源創建成功之后, 登錄DataWorks控制臺,切換至目標地域后,單擊左側導航欄的【數據開發與治理】 - 【數據開發】,在下拉框中選擇對應工作空間后單擊【進入數據開發】
在左側導航欄單擊【數據源】,進入數據源或集群綁定頁面,您可通過名稱搜索找到目標數據源或集群進行綁定操作。綁定后,便可基于數據源的連接信息讀取該數據源的數據,進行相關開發操作。如果你找不到具體的 數據源 綁定操作入口,又不想去按照官方文檔說的那樣去個人設置里面找,這里可以直接點擊 綁定 進入到綁定頁面,在綁定頁面選擇資源綁定,這里我已經綁定過了
在開發 PyODPS 3 任務之前,先來簡單說一下PyODPS 3 任務。 DataWorks為我們提供PyODPS 3節點,我們可以在該節點中直接使用Python代碼編寫MaxCompute作業,并進行作業的周期性調度。開始之前需要先創建一個 PyODPS 3節點。
創建MaxCompute節點
登錄DataWorks控制臺,切換至目標地域后,單擊左側導航欄的【數據開發與治理】 > 【數據開發】,在下拉框中選擇對應工作空間后單擊【進入數據開發】,在 數據開發頁面選擇【新建】>【MaxCompute】 > 【PyODPS 3 】
在彈框中配置節點的名稱,選擇路徑,完成后單擊確認,完成MaxCompute節點創建操作,后續您即可在節點中進行對應MaxCompute任務開發與配置
節點創建成功之后,可以在數據開發頁面看到節點信息
以上的資源準備好了之后,我們就可以使用DataWorks的PyODPS 3節點開發和運行MaxFrame作業。
在DataWorks中使用MaxFrame
在開始使用MaxFrame之前,先來簡單介紹一下。 DataWorks為MaxCompute項目提供任務調度能力,且已在PyODPS 3節點內置了MaxFrame,我們可直接使用DataWorks的PyODPS 3節點開發和運行MaxFrame作業。PyODPS 3內置了MaxCompute用戶和項目信息,因此我們可以直接創建MaxFrame會話,復制代碼放入創建的 PyODPS 3節點 的 命令操作臺
代碼示例如下
import maxframe.dataframe as md
from maxframe import new_session
from maxframe.config import options
options.sql.enable_mcqa = Falsetable = o.create_table("test_source_table", "a string, b bigint", if_not_exists=True)
with table.open_writer() as writer:writer.write([["value1", 0],["value2", 1],])# 創建MaxFrame session
session = new_session(o)df = md.read_odps_table("test_source_table",index_col="b")
df["a"] = "prefix_" + df["a"]# 打印dataframe數據
print(df.execute().fetch())# MaxFrame DataFrame數據寫入MaxCompute表
md.to_odps_table(df, "test_prefix_source_table").execute()# 銷毀 maxframe session
session.destroy()
在數據開發頁面命令控制臺上側點擊【執行】按鈕,執行Python代碼可以看到如下的返回結果
此結果表示MaxFrame安裝成功,且已成功連接MaxCompute集群。在目標MaxCompute項目中運行如下SQL,查詢test_prefix_source_table表的數據,新建 ODPS SQL 節點
點擊【確認】完成 新建 ODPS SQL 節點 新建 成功之后,在我們新建的 User_sql2 節點輸入查詢語句
SELECT * FROM test_prefix_source_table;
點擊【執行】可以看到sql 查詢的結果數據
到這里就說明我們的 MaxFrame 以及所有需要的服務和資源都可以正常運行,下面來使用與Pandas相同的API來分析數據。
分布式Pandas 處理
在基于MaxFrame實現分布式Pandas處理 之前,首先需要準備一些調用過程中需要用到的ALIBABA_CLOUD_ACCESS_KEY_ID、ALIBABA_CLOUD_ACCESS_KEY_SECRET 、your-default-project、your-end-point。 這里 進入AccessKey管理頁面獲取AccessKey ID以及對應的AccessKey Secret
復制 后備用。登錄MaxCompute控制臺,在左側導航欄選擇【工作區】-【項目管理】,查看MaxCompute項目名稱
在 Endpoint 頁面找到當前地域對應的 Endpoint 并復制,
替換掉示例代碼中對應的上述獲取的賬號信息,這里給出的是示例代碼,替換后的代碼這里不方便給出哈
from odps import ODPS
from maxframe.session import new_session
import maxframe.dataframe as md
import pandas as pd
import oso = ODPS(# 確保 ALIBABA_CLOUD_ACCESS_KEY_ID 環境變量設置為用戶 Access Key ID,# ALIBABA_CLOUD_ACCESS_KEY_SECRET 環境變量設置為用戶 Access Key Secret,# 不建議直接使用AccessKey ID和 AccessKey Secret字符串。os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'),os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),project='your-default-project',endpoint='your-end-point',
)data_sets = [{"table_name": "product","table_schema" : "index bigint, product_id bigint, product_name string, current_price bigint","source_type": "records","records" : [[1, 100, 'Nokia', 1000],[2, 200, 'Apple', 5000],[3, 300, 'Samsung', 9000]],
},
{"table_name" : "sales","table_schema" : "index bigint, sale_id bigint, product_id bigint, user_id bigint, year bigint, quantity bigint, price bigint","source_type": "records","records" : [[1, 1, 100, 101, 2008, 10, 5000],[2, 2, 300, 101, 2009, 7, 4000],[3, 4, 100, 102, 2011, 9, 4000],[4, 5, 200, 102, 2013, 6, 6000],[5, 8, 300, 102, 2015, 10, 9000],[6, 9, 100, 102, 2015, 6, 2000]],"lifecycle": 5
}]def prepare_data(o: ODPS, data_sets, suffix="", drop_if_exists=False):for index, data in enumerate(data_sets):table_name = data.get("table_name")table_schema = data.get("table_schema")source_type = data.get("source_type")if not table_name or not table_schema or not source_type:raise ValueError(f"Dataset at index {index} is missing one or more required keys: 'table_name', 'table_schema', or 'source_type'.")lifecycle = data.get("lifecycle", 5)table_name += suffixprint(f"Processing {table_name}...")if drop_if_exists:print(f"Deleting {table_name}...")o.delete_table(table_name, if_exists=True)o.create_table(name=table_name, table_schema=table_schema, lifecycle=lifecycle, if_not_exists=True)if source_type == "local_file":file_path = data.get("file")if not file_path:raise ValueError(f"Dataset at index {index} with source_type 'local_file' is missing the 'file' key.")sep = data.get("sep", ",")pd_df = pd.read_csv(file_path, sep=sep)ODPSDataFrame(pd_df).persist(table_name, drop_table=True)elif source_type == 'records':records = data.get("records")if not records:raise ValueError(f"Dataset at index {index} with source_type 'records' is missing the 'records' key.")with o.get_table(table_name).open_writer() as writer:writer.write(records)else:raise ValueError(f"Unknown data set source_type: {source_type}")print(f"Processed {table_name} Done")prepare_data(o, data_sets, "_maxframe_demo", True)
這里我們新建 PyODPS 3節點 User_node2 來執行替換了密鑰信息后的上述示例代碼,等待運行成功
查詢sales_maxframe_demo表和product_maxframe_demo表的數據,SQL命令如下
--查詢sales_maxframe_demo表
SELECT * FROM sales_maxframe_demo;
--查詢product_maxframe_demo表數據
SELECT * FROM product_maxframe_demo;
這里需要說明一下,我沒有執行結束,在執行的過程中,一直執行超時,不知道什么原因
Executing user script with PyODPS 0.12.1 (wrapper version: 0.12.1spawn)Processing product_maxframe_demo...
Deleting product_maxframe_demo.../opt/taobao/tbdpapp/pyodps/pyodpswrapper.py:1191: UserWarning: Global variable __doc__ you are about to set conflicts with pyodpswrapper or builtin variables. It might not be runnable with multiprocessing."It might not be runnable with multiprocessing." % key
2025-01-07 20:34:40,348 WARNING:urllib3.connectionpool:Retrying (Retry(total=3, connect=None, read=None, redirect=None, status=None)) after connection broken by 'ConnectTimeoutError(<urllib3.connection.HTTPSConnection object at 0x7f48ac8fb198>, 'Connection to service.cn-beijing.maxcompute.aliyun.com timed out. (connect timeout=120)')': /api/tenants?curr_project=user_project_dev
總的來說
MaxFrame可以在分布式環境下使用與Pandas相同的API來分析數據,通過MaxFrame,您能夠以高于開源Pandas數十倍的性能在MaxCompute上快速完成數據分析和計算工作。MaxFrame兼容Pandas接口且自動進行分布式處理,在保證強大數據處理能力的同時,可以大幅度提高數據處理規模及計算效率。