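🙋‍♀️ This keyword-search scraper for TikTok video and comment information is split into two parts. I hope you find it useful.
Part 1: see below.
Part 2: scraping comment information from video URLs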
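1. Node.js environment setup
First set up a JavaScript runtime (such as Node.js), which is used to run the encryption-signature code.
Node.js download: https://nodejs.org/en
Node.js installation guide (getting the environment right is critical, because it determines whether the later code can run at all): https://blog.csdn.net/liufeifeihuawei/article/details/132425239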
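Before moving on, it can help to confirm that Node.js is actually reachable from Python. The following is a minimal sketch using only the standard library; the helper name `node_version` is hypothetical and not part of the original project, and it assumes the executable is called `node` and is on PATH.

```python
import shutil
import subprocess
from typing import Optional


def node_version(executable: str = "node") -> Optional[str]:
    """Return the Node.js version string, or None if Node.js is unavailable."""
    path = shutil.which(executable)
    if path is None:
        return None  # executable is not on PATH
    try:
        result = subprocess.run(
            [path, "--version"], capture_output=True, text=True, timeout=10
        )
    except (OSError, subprocess.SubprocessError):
        return None  # found, but could not be started or timed out
    if result.returncode != 0:
        return None
    return result.stdout.strip() or None


if __name__ == "__main__":
    version = node_version()
    print(f"Node.js found: {version}" if version else "Node.js not found on PATH")
```

If this reports that Node.js is missing, the signing step later in the article is likely to fail as well, so it is worth fixing the installation first.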
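2. Python environment setup
The script uses the imports below. requests, execjs (installed as the PyExecJS package) and loguru are third-party packages; the rest come from the standard library.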
import time
import requests
import execjs
import os
from datetime import datetime
from urllib.parse import urlencode
from loguru import logger
import json
import random
from typing import Optional, Dict, List, Any
from concurrent.futures import ThreadPoolExecutor, as_completed
import threading
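3. Scraping video information by keyword search
1. Main program: setting the keywords to scrape
The keywords you want to scrape are loaded from the file topics.csv.
The results are stored in the file videosInfo.json as a JSON list of dictionaries, one per video.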
if __name__ == '__main__':
    os.makedirs('../results', exist_ok=True)
    keywords, fields = read_csv(file_path='topics.csv')  # keywords to scrape
    output_file = '../results/videosInfo.json'  # file the results are saved to
    cookie_str = read_cookie()
    # Scrape through a thread pool (max_workers=1 processes one keyword at a time)
    with ThreadPoolExecutor(max_workers=1) as executor:
        futures = []
        for i in range(len(keywords)):
            futures.append(executor.submit(crawl_keyword, keywords[i], output_file, cookie_str, fields[i], 20))
        for future in as_completed(futures):
            try:
                future.result()
            except Exception as e:
                logger.error(f"Error during scraping: {str(e)}")
    logger.info("Video scraping finished for all topics")
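The main program calls `read_csv`, which the post does not show. One possible shape for it is sketched below. The layout of topics.csv is an assumption: a header row with the columns `keyword` and `field`. Both column names are hypothetical and should be adapted to the actual file.

```python
import csv
from typing import List, Tuple


def read_csv(file_path: str = "topics.csv") -> Tuple[List[str], List[str]]:
    """Read (keywords, fields) from a CSV with 'keyword' and 'field' columns.

    The column names are assumptions; the original file layout is not shown.
    """
    keywords: List[str] = []
    fields: List[str] = []
    # utf-8-sig tolerates the BOM that spreadsheet tools often prepend
    with open(file_path, newline="", encoding="utf-8-sig") as f:
        for row in csv.DictReader(f):
            keyword = (row.get("keyword") or "").strip()
            if not keyword:
                continue  # skip rows without a keyword
            keywords.append(keyword)
            fields.append((row.get("field") or "").strip())
    return keywords, fields
```

Returning two parallel lists matches how the main program indexes `keywords[i]` and `fields[i]` together.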
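2. Scraping a single keyword in a worker thread, with a cap on the number of requests
The max_requests parameter caps the number of requests per keyword; request_count counts the requests made so far.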
def crawl_keyword(keyword: str, output_file: str, cookie_str: str, field: str, max_requests: int = 10):
    tiktok = TiktokUserSearch(output_file=output_file)
    has_more = 1
    cursor = '0'
    search_id = None
    request_count = 0  # request counter
    while has_more and request_count < max_requests:
        data = tiktok.main(keyword, field, cookie_str, cursor, search_id)
        logger.info(f"Request {request_count + 1}: {data}")
        if data and isinstance(data, dict):
            # has_more = data.get('has_more', 0)
            cursor = data.get('cursor', '0')
            search_id = data.get('log_pb', {}).get('impr_id')
            if 'data' in data:
                data = data['data']
                request_count += 1  # update the request count
            else:
                logger.error("No data found in response")
                break
        else:
            logger.error("Invalid response format")
            break
        time.sleep(random.randint(0, 5))  # random delay to avoid sending requests too quickly
    write_csv(keyword, request_count, file_path='../results/records.csv')
    logger.info(f"Finished scraping videos for {keyword}, {request_count} requests in total")
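The capped, cursor-driven loop can be tried offline against a stub before pointing it at the real API. The sketch below is illustrative only: `paginate` and `fake_fetch` are hypothetical names, and the response keys (`data`, `cursor`, `has_more`, `log_pb.impr_id`) are taken from the listing above rather than from any API documentation. Unlike the listing, which leaves the `has_more` update commented out, this version honors it.

```python
from typing import Any, Callable, Dict, List, Optional


def paginate(fetch: Callable[[str, Optional[str]], Dict[str, Any]],
             max_requests: int = 10) -> List[Any]:
    """Collect items page by page until the source is exhausted or the cap is hit."""
    items: List[Any] = []
    cursor, search_id = "0", None
    has_more, request_count = 1, 0
    while has_more and request_count < max_requests:
        page = fetch(cursor, search_id)
        if not isinstance(page, dict) or "data" not in page:
            break  # malformed response: stop instead of retrying forever
        items.extend(page["data"])
        has_more = page.get("has_more", 0)
        cursor = str(page.get("cursor", cursor))
        search_id = page.get("log_pb", {}).get("impr_id")
        request_count += 1
    return items


def fake_fetch(cursor: str, search_id: Optional[str]) -> Dict[str, Any]:
    """Stand-in for the real request: 3 items per page, 5 pages in total."""
    start = int(cursor)
    return {
        "data": list(range(start, start + 3)),
        "cursor": start + 3,
        "has_more": 1 if start + 3 < 15 else 0,
        "log_pb": {"impr_id": "demo"},
    }
```

Calling `paginate(fake_fetch, max_requests=2)` stops after two pages even though the stub has more to give, which is the behavior the request cap is meant to guarantee.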
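3. Defining the TiktokUserSearch class
The class extracts 24 kinds of fields, including:
🖥️ the video's URL, duration, title, etc.;
👨 the publisher's bio, likes received, video statistics, etc.;
👍 the video's like count, share count, comment count, play count, favorite count, etc.;
🎶 the video's background music ID, music source, and more.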
class TiktokUserSearch:
    def __init__(self, output_file: Optional[str] = None):
        self.config = read_config()
        self.headers = self.config.get("headers", {})
        self.cookies = None
        self.output_file = output_file if output_file else f'tiktok_videos_{datetime.now().strftime("%Y%m%d_%H%M%S")}.csv'
        self.proxies = self.config.get("proxies", None)  # proxy settings
        self.lock = threading.Lock()  # thread lock

    def cookie_str_to_dict(self, cookie_str: str) -> Dict[str, str]:
        """Convert a cookie string into a dictionary."""
        cookie_dict = {}
        try:
            cookies = [i.strip() for i in cookie_str.split('; ') if i.strip() != ""]
            for cookie in cookies:
                key, value = cookie.split('=', 1)
                cookie_dict[key] = value
        except Exception as e:
            logger.error(f"Error converting cookie: {str(e)}")
            raise
        return cookie_dict

    def get(self, keyword: str, cursor: str, search_id: Optional[str], cookie_str: str) -> Dict[str, Any]:
        """Send the request and return the data."""
        self.cookies = self.cookie_str_to_dict(cookie_str)
        url = "https://www.tiktok.com/api/search/general/full/"
        focus_state = "true" if cursor == "0" else "false"
        params = {
            "WebIdLastTime": f"{int(time.time())}",
            "aid": "1988",
            "app_language": "zh-Hans",
            "app_name": "tiktok_web",
            "browser_language": "zh-CN",
            # ... (omitted)
            "webcast_language": "zh-Hans",
            "msToken": self.cookies["msToken"],
        }
        if cursor != "0":
            params.update({"search_id": search_id})
        try:
            # Sign the query string with the JavaScript code in encrypt.js
            with open('../configs/encrypt.js', encoding='utf-8') as js_file:
                x_b = execjs.compile(js_file.read()).call("sign", urlencode(params), self.headers["user-agent"])
            params.update({"X-Bogus": x_b})
        except Exception as e:
            logger.error(f"Error generating X-Bogus: {str(e)}")
            return {"error": str(e)}
        headers = self.headers.copy()
        headers.update({"referer": "https://www.tiktok.com/search?q=" + keyword})
        max_retries = 3
        for attempt in range(max_retries):
            try:
                response = requests.get(url, headers=headers, cookies=self.cookies, params=params,
                                        timeout=(3, 10), proxies=self.proxies)
                response.raise_for_status()
                return response.json()
            # ex1, ex2, ex3 are placeholders left in the original for the network exception types to retry on
            except (ex1, ex2, ex3) as e:
                logger.warning(f"Attempt {attempt + 1}/{max_retries} hit a network error: {e}")
                if attempt < max_retries - 1:
                    time.sleep(2)
                else:
                    return {"error": f"Network error after {max_retries} attempts: {str(e)}"}
            except Exception as e:
                logger.error(f"Unexpected error: {e}")
                return {"error": str(e)}

    def parse_data(self, data_list: List[Dict[str, Any]], keyword: str, field: str) -> List[str]:
        """Parse the data and save it to the JSON file."""
        resultList = []
        video_data = []
        for u in data_list:
            try:
                item = u['item']
                author = item['author']
                stats = item['stats']
                author_stats = item['authorStats']
                video_id = str(item['id'])  # unique video identifier (TikTok video ID)
                author_name = str(author['uniqueId'])  # author's TikTok account
                video_url = f'https://www.tiktok.com/@{author_name}/video/{video_id}'
                video_info = {
                    'search_keyword': keyword,
                    'video_field': field,
                    'video_id': video_id,  # unique video identifier (TikTok video ID)
                    'desc': item['desc'],  # text description of the video (caption/title)
                    'create_time': datetime.fromtimestamp(item['createTime']).strftime('%Y-%m-%d %H:%M:%S'),  # publication time
                    'duration': item['video']['duration'],  # video duration (in seconds)
                    'video_url': video_url,  # video playback URL
                    'author_id': author['id'],  # author's unique ID
                    'author_name': author_name,  # author's TikTok account (uniqueId, i.e. the username)
                    # ... (omitted)
                    'author_following_count': author_stats['followingCount'],  # number of accounts the author follows
                    'digg_count': stats['diggCount'],  # number of likes
                    'share_count': stats['shareCount'],  # number of shares
                    'comment_count': stats['commentCount'],  # number of comments
                    'play_count': stats['playCount'],  # number of plays
                    'collect_count': stats.get('collectCount', 0),  # number of favorites
                }
                # video_info['comments'] = self.get_comments(video_url)
                if 'challenges' in item:
                    video_info['hashtags'] = ','.join([tag['title'] for tag in item['challenges']])
                else:
                    video_info['hashtags'] = ''
                # Background music
                if 'music' in item:
                    music = item['music']
                    video_info.update({
                        'music_id': music['id'],
                        'music_title': music['title'],
                        'music_author': music['authorName'],
                        'music_original': music['original'],
                    })
                video_data.append(video_info)
                resultList.append(f"https://www.tiktok.com/@{author['uniqueId']}")
            except Exception as e:
                logger.error(f"Error parsing video data: {str(e)}")
                continue
        # Append to the JSON file
        try:
            # If the file exists, read the data already in it
            if os.path.exists(self.output_file):
                with open(self.output_file, 'r', encoding='utf-8') as f:
                    try:
                        existing_data = json.load(f)
                    except json.JSONDecodeError:
                        existing_data = []  # reset to an empty list if the JSON cannot be parsed
            else:
                existing_data = []
            # Append the new data
            existing_data.extend(video_data)
            # Write everything back to the JSON file
            with open(self.output_file, 'w', encoding='utf-8') as f:
                json.dump(existing_data, f, ensure_ascii=False, indent=4)
            logger.info(f"Data {'appended' if existing_data else 'saved'} to file: {self.output_file}")
        except Exception as e:
            logger.error(f"Error saving JSON file: {str(e)}")
        return resultList

    def main(self, keyword: str, field: str, cookie_str: str, cursor: str = "0", search_id: Optional[str] = None) -> Dict[str, Any]:
        """Entry point: run the search and parse the data."""
        dataJson = self.get(keyword, cursor, search_id, cookie_str)
        if dataJson:
            if "error" in dataJson:
                return {"cursor": cursor, "search_id": search_id, "data": [], "status": "-2",
                        "error": dataJson["error"]}
            elif "verify_event" in str(dataJson):
                return {"cursor": cursor, "search_id": search_id, "data": [], "status": "-1"}
            else:
                if 'data' in dataJson:
                    self.parse_data(dataJson['data'], keyword, field)
                return dataJson
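One detail worth noting: the class creates `self.lock`, but the listing never acquires it, and every `crawl_keyword` call builds its own instance, so a per-instance lock would not protect the shared output file in any case. With `max_workers=1` this is harmless. If the worker count were raised, the read-modify-write on the JSON file could lose records. A minimal sketch of one way to guard it is shown below; the module-level lock and the function name `append_records` are assumptions for illustration, not part of the original code.

```python
import json
import os
import threading
from typing import Any, Dict, List

_file_lock = threading.Lock()  # shared by every thread that writes the file


def append_records(path: str, records: List[Dict[str, Any]]) -> int:
    """Append records to a JSON array file and return the new total."""
    with _file_lock:  # keep the read-modify-write atomic across threads
        existing: List[Dict[str, Any]] = []
        if os.path.exists(path):
            try:
                with open(path, "r", encoding="utf-8") as f:
                    existing = json.load(f)
            except json.JSONDecodeError:
                existing = []  # unreadable or empty file: start over
        existing.extend(records)
        with open(path, "w", encoding="utf-8") as f:
            json.dump(existing, f, ensure_ascii=False, indent=4)
        return len(existing)
```

A single lock shared across threads serializes only the file update, so the network requests themselves can still run concurrently.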