在電商數據分析、競品監控和市場調研等場景中,高效采集淘寶商品數據是關鍵環節。本文將詳細介紹如何利用 Python 結合 API,構建一套自動化的商品數據采集系統,涵蓋從 API 申請到數據存儲的完整流程,并提供可直接運行的代碼實現。?
淘寶API 基礎?
淘寶提供了標準化的 API 接口,相比網頁爬蟲具有穩定性高、合規性強、數據結構統一等優勢。常用的商品數據相關 API 包括:?
- 商品詳情 API:獲取商品基本信息、價格、庫存等?
- 商品搜索 API:按關鍵詞、分類等條件搜索商品?
- 店鋪商品 API:獲取指定店鋪的所有商品?
- 商品評價 API:獲取商品的用戶評價數據?
使用淘寶 API 需遵守平臺規范,注意調用頻率限制和數據使用范圍,避免違規操作導致賬號封禁。?
開發前的準備工作?
1. 賬號注冊?
- 注冊開發者賬號?
- 完成認證(個人或企業認證)?
- 獲取 Api?Key 和 Api?Secret?
- 申請所需的 API 權限(如商品詳情、商品搜索等)?
2. 開發環境搭建?
推薦使用 Python 3.7 + 版本,需安裝以下依賴庫:
pip install top-api-sdk # 淘寶API官方Python SDK
pip install pandas # 數據處理
pip install pymysql # 數據庫連接
pip install python-dotenv # 環境變量管理
3. 核心參數配置?
創建.env文件存儲敏感信息:
APP_KEY=你的AppKey
APP_SECRET=你的AppSecret
REDIRECT_URI=你的回調地址
MYSQL_HOST=localhost
MYSQL_PORT=3306
MYSQL_USER=root
MYSQL_PASSWORD=你的密碼
MYSQL_DB=taobao_data
自動化采集系統設計?
系統架構?
本系統采用模塊化設計,主要包含以下組件:?
- 認證模塊:處理 API 授權與令牌管理?
- 采集模塊:調用 API 獲取商品數據?
- 存儲模塊:將數據保存到數據庫?
- 調度模塊:實現定時自動采集?
- 日志模塊:記錄系統運行狀態?
數據采集流程?
- 獲取 API 訪問令牌(AccessToken)?
- 構造 API 請求參數(關鍵詞、頁碼、排序方式等)?
- 調用 API 并處理返回結果?
- 數據清洗與格式轉換?
- 存儲到數據庫?
- 實現增量采集與去重機制?
完整代碼實現?
加載環境變量?
load_dotenv()?
配置日志?
logging.basicConfig(?
level=logging.INFO,?
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',?
filename='taobao_collector.log'?
)?
logger = logging.getLogger('taobao_collector')?
class TaobaoAPICollector:?
def init(self):?
"""初始化淘寶 API 采集器"""?
self.app_key = os.getenv ('APP_KEY')?
self.app_secret = os.getenv ('APP_SECRET')?
self.redirect_uri = os.getenv ('REDIRECT_URI')?
數據庫連接配置?
self.db_config = {?
'host': os.getenv('MYSQL_HOST'),?
'port': int(os.getenv('MYSQL_PORT')),?
'user': os.getenv('MYSQL_USER'),?
'password': os.getenv('MYSQL_PASSWORD'),?
'db': os.getenv('MYSQL_DB'),?
'charset': 'utf8mb4'?
}?
初始化數據庫連接?
self.db_conn = self._get_db_connection()?
self._create_tables()?
API 調用配置?
self.page_size = 20 # 每頁商品數量?
self.max_pages = 5 # 最大采集頁數?
self.request_interval = 2 # API 請求間隔 (秒)?
def _get_db_connection (self):?
"""獲取數據庫連接"""?
try:?
conn = pymysql.connect (?
host=self.db_config ['host'],?
port=self.db_config ['port'],?
user=self.db_config ['user'],?
password=self.db_config ['password'],?
db=self.db_config ['db'],?
charset=self.db_config ['charset'],?
cursorclass=DictCursor?
)?
logger.info("數據庫連接成功")?
return conn?
except Exception as e:?
logger.error (f"數據庫連接失敗: {str (e)}")?
raise?
def _create_tables (self):?
"""創建數據庫表"""?
try:?
with self.db_conn.cursor () as cursor:?
商品表?
cursor.execute ('''?
CREATE TABLE IF NOT EXISTS products (?
id BIGINT PRIMARY KEY COMMENT ' 商品 ID',?
title VARCHAR (255) COMMENT ' 商品標題 ',?
cat_name VARCHAR (100) COMMENT ' 商品分類 ',?
price DECIMAL (10,2) COMMENT ' 商品價格 ',?
sales INT COMMENT ' 銷量 ',?
stock INT COMMENT ' 庫存 ',?
nick VARCHAR (100) COMMENT ' 賣家昵稱 ',?
shop_title VARCHAR (100) COMMENT ' 店鋪名稱 ',?
pic_url VARCHAR (255) COMMENT ' 商品主圖 URL',?
detail_url VARCHAR (255) COMMENT ' 商品詳情頁 URL',?
created_time DATETIME COMMENT ' 商品創建時間 ',?
collected_time DATETIME COMMENT ' 采集時間 ',?
updated_time DATETIME COMMENT ' 更新時間 '?
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT=' 淘寶商品表 ';?
''')?
商品詳情表?
cursor.execute ('''?
CREATE TABLE IF NOT EXISTS product_details (?
product_id BIGINT PRIMARY KEY COMMENT ' 商品 ID',?
desc TEXT COMMENT ' 商品描述 ',?
props TEXT COMMENT ' 商品屬性 ',?
sku TEXT COMMENT 'SKU 信息 ',?
collected_time DATETIME COMMENT ' 采集時間 ',?
updated_time DATETIME COMMENT ' 更新時間 '?
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT=' 淘寶商品詳情表 ';?
''')?
采集任務表?
cursor.execute ('''?
CREATE TABLE IF NOT EXISTS collection_tasks (?
id INT AUTO_INCREMENT PRIMARY KEY,?
keyword VARCHAR (100) COMMENT ' 搜索關鍵詞 ',?
page INT COMMENT ' 采集頁數 ',?
status ENUM ('pending', 'running', 'completed', 'failed') DEFAULT 'pending' COMMENT ' 任務狀態 ',?
start_time DATETIME COMMENT ' 開始時間 ',?
end_time DATETIME COMMENT ' 結束時間 ',?
total_products INT COMMENT ' 采集商品總數 ',?
comment VARCHAR (255) COMMENT ' 備注 '?
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT=' 采集任務表 ';?
''')?
self.db_conn.commit()?
logger.info("數據庫表創建 / 檢查完成")?
except Exception as e:?
logger.error (f"創建數據庫表失敗: {str (e)}")?
self.db_conn.rollback ()?
def _search_products (self, keyword, page=1):?
"""?
搜索商品?
:param keyword: 搜索關鍵詞?
:param page: 頁碼?
:return: 商品列表?
"""?
try:?
request = TbkItemGetRequest ()?
request.set_app_info (appinfo (self.app_key, self.app_secret))?
設置請求參數?
request.keyword = keyword?
request.page_no = page?
request.page_size = self.page_size?
request.platform = 2 # 1:PC,2: 無線?
request.sort = "sale_desc" # 按銷量降序?
調用 API?
response = request.getResponse()?
處理返回結果?
if "tbk_item_get_response" in response and "results" in response ["tbk_item_get_response"]:?
return response ["tbk_item_get_response"]["results"]["n_tbk_item"]?
return []?
except Exception as e:?
logger.error (f"搜索商品失敗 (keyword: {keyword}, page: {page}): {str (e)}")?
return []?
def _get_product_detail (self, product_id):?
"""?
獲取商品詳情?
:param product_id: 商品 ID?
:return: 商品詳情?
"""?
try:?
request = TbkItemDetailGetRequest ()?
request.set_app_info (appinfo (self.app_key, self.app_secret))?
request.num_iid = product_id?
調用 API?
response = request.getResponse()?
if "tbk_item_detail_get_response" in response and "data" in response ["tbk_item_detail_get_response"]:?
return response ["tbk_item_detail_get_response"]["data"]?
return None?
except Exception as e:?
logger.error (f"獲取商品詳情失敗 (id: {product_id}): {str (e)}")?
return None?
def _save_products (self, products):?
"""保存商品數據到數據庫"""?
if not products:?
return 0?
try:?
with self.db_conn.cursor() as cursor:?
count = 0?
now = datetime.now()?
for item in products:?
檢查商品是否已存在?
cursor.execute("SELECT id FROM products WHERE id = %s", (item['num_iid'],))?
exists = cursor.fetchone() is not None?
if exists:?
更新現有商品?
sql = '''?
UPDATE products SET?
title = %s, price = %s, sales = %s, stock = %s,?
pic_url = %s, updated_time = %s?
WHERE id = %s?
'''?
cursor.execute(sql, (?
item['title'], item['zk_final_price'], item.get('sales', 0),?
item.get('stock', 0), item['pict_url'], now, item['num_iid']?
))?
else:?
插入新商品?
sql = '''?
INSERT INTO products (?
id, title, cat_name, price, sales, stock, nick, shop_title,?
pic_url, detail_url, created_time, collected_time, updated_time?
) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)?
'''?
cursor.execute(sql, (?
item['num_iid'], item['title'], item.get('cat_name', ''),?
item['zk_final_price'], item.get('sales', 0), item.get('stock', 0),?
item['nick'], item.get('shop_title', ''), item['pict_url'],?
item['item_url'], datetime.now(), now, now?
))?
count += 1?
self.db_conn.commit()?
logger.info(f"成功保存 / 更新 {count} 個商品")?
return count?
except Exception as e:?
logger.error (f"保存商品失敗: {str (e)}")?
self.db_conn.rollback ()?
return 0?
def _save_product_details (self, product_id, detail):?
"""保存商品詳情到數據庫"""?
if not detail:?
return False?
try:?
with self.db_conn.cursor() as cursor:?
now = datetime.now()?
提取 SKU 信息?
sku_info = []?
if "sku" in detail and "sku_map" in detail["sku"]:?
for key, value in detail["sku"]["sku_map"].items():?
sku_info.append({?
"key": key,?
"price": value.get("price", ""),?
"stock": value.get("stock", 0),?
"props": value.get("props", "")?
})?
檢查詳情是否已存在?
cursor.execute("SELECT product_id FROM product_details WHERE product_id = %s", (product_id,))?
exists = cursor.fetchone() is not None?
if exists:?
更新現有詳情?
sql = '''?
UPDATE product_details SET?
desc = %s, props = %s, sku = %s, updated_time = %s?
WHERE product_id = %s?
'''?
cursor.execute(sql, (?
str(detail.get("desc", "")), str(detail.get("props", "")),?
str(sku_info), now, product_id?
))?
else:?
插入新詳情?
sql = '''?
INSERT INTO product_details (?
product_id, desc, props, sku, collected_time, updated_time?
) VALUES (%s, %s, %s, %s, %s, %s)?
'''?
cursor.execute(sql, (?
product_id, str(detail.get("desc", "")), str(detail.get("props", "")),?
str(sku_info), now, now?
))?
self.db_conn.commit ()?
return True?
except Exception as e:?
logger.error (f"保存商品詳情失敗 (id: {product_id}): {str (e)}")?
self.db_conn.rollback ()?
return False?
def collect_by_keyword (self, keyword, collect_details=True):?
"""?
按關鍵詞采集商品數據?
:param keyword: 搜索關鍵詞?
:param collect_details: 是否采集商品詳情?
:return: 采集總數?
"""?
logger.info(f"開始按關鍵詞采集: {keyword}")?
創建采集任務記錄?
task_id = self._create_task (keyword)?
if not task_id:?
logger.error ("創建采集任務失敗")?
return 0?
total_count = 0?
try:?
分頁采集?
for page in range(1, self.max_pages + 1):?
logger.info(f"采集第 {page} 頁,關鍵詞: {keyword}")?
搜索商品?
products = self._search_products(keyword, page)?
if not products:?
logger.info(f"第 {page} 頁沒有獲取到商品數據,停止采集")?
break?
保存商品數據?
saved_count = self._save_products(products)?
total_count += saved_count?
采集商品詳情?
if collect_details:?
for product in products:?
product_id = product['num_iid']?
logger.info(f"采集商品詳情: {product_id}")?
detail = self._get_product_detail(product_id)?
if detail:?
self._save_product_details(product_id, detail)?
控制請求頻率?
time.sleep(self.request_interval)?
控制請求頻率?
time.sleep(self.request_interval)?
更新任務狀態?
self._update_task(task_id, "completed", total_count)?
logger.info(f"關鍵詞 {keyword} 采集完成,共采集 {total_count} 個商品")?
return total_count?
except Exception as e:?
logger.error (f"采集過程出錯: {str (e)}")?
self._update_task (task_id, "failed", total_count, str (e))?
return total_count?
def _create_task (self, keyword):?
"""創建采集任務記錄"""?
try:?
with self.db_conn.cursor () as cursor:?
sql = '''?
INSERT INTO collection_tasks (?
keyword, page, status, start_time?
) VALUES (% s, % s, % s, % s)?
'''?
cursor.execute (sql, (keyword, self.max_pages, 'running', datetime.now ()))?
self.db_conn.commit ()?
return cursor.lastrowid?
except Exception as e:?
logger.error (f"創建任務記錄失敗: {str (e)}")?
self.db_conn.rollback ()?
return None?
def _update_task (self, task_id, status, total_products=0, comment=""):?
"""更新任務狀態"""?
try:?
with self.db_conn.cursor () as cursor:?
sql = '''?
UPDATE collection_tasks SET?
status = % s, end_time = % s,?
total_products = % s, comment = % s?
WHERE id = % s?
'''?
cursor.execute (sql, (status, datetime.now (), total_products, comment, task_id))?
self.db_conn.commit ()?
except Exception as e:?
logger.error (f"更新任務狀態失敗: {str (e)}")?
self.db_conn.rollback ()?
def export_to_excel (self, keyword, filename=None):?
"""將采集的商品數據導出為 Excel"""?
try:?
with self.db_conn.cursor () as cursor:?
搜索包含關鍵詞的商品?
sql = "SELECT * FROM products WHERE title LIKE %s LIMIT 1000"?
cursor.execute(sql, (f'%{keyword}%',))?
products = cursor.fetchall()?
if not products:?
logger.info(f"沒有找到包含關鍵詞 {keyword} 的商品數據")?
return False?
轉換為 DataFrame?
df = pd.DataFrame(products)?
生成文件名?
if not filename:?
filename = f"taobao_products_{keyword}_{datetime.now().strftime