Python異步爬蟲編程技巧:從入門到高級實戰指南

Python異步爬蟲編程技巧:從入門到高級實戰指南 🚀

📚 目錄

  1. 前言:為什么要學異步爬蟲
  2. 異步編程基礎概念
  3. 異步爬蟲核心技術棧
  4. 入門實戰:第一個異步爬蟲
  5. 進階技巧:并發控制與資源管理
  6. 高級實戰:分布式異步爬蟲架構
  7. 反爬蟲對抗策略
  8. 性能優化與監控
  9. 常見問題與解決方案
  10. 最佳實踐總結

前言:為什么要學異步爬蟲? 🤔

在我近幾年的爬蟲開發經驗中,見證了從最初的單線程順序爬取,到多線程并發,再到如今的異步編程范式的演進。記得在做新聞數據采集的時候,傳統的同步爬蟲需要2小時才能完成10萬個新聞的數據采集,而改用異步方案后,同樣的任務僅需15分鐘就能完成。

異步爬蟲的核心優勢在于:

  • 高并發能力:單進程可處理數千個并發請求
  • 資源利用率高:避免了線程切換開銷
  • 響應速度快:非阻塞IO讓程序更高效
  • 擴展性強:更容易構建大規模爬蟲系統

異步編程基礎概念 📖

什么是異步編程?

異步編程是一種編程范式,允許程序在等待某個操作完成時繼續執行其他任務,而不是阻塞等待。在爬蟲場景中,當我們發送HTTP請求時,不需要傻等服務器響應,而是可以繼續發送其他請求。

核心概念解析

協程(Coroutine):可以暫停和恢復執行的函數,是異步編程的基本單元。

事件循環(Event Loop):負責調度和執行協程的核心機制。

awaitable對象:可以被await關鍵字等待的對象,包括協程、Task和Future。

異步爬蟲核心技術棧 🛠?

在實際項目中,我通常會使用以下技術組合:

組件推薦庫用途
HTTP客戶端aiohttp異步HTTP請求
HTML解析BeautifulSoup4頁面內容提取
并發控制asyncio.Semaphore限制并發數量
數據存儲aiofiles異步文件操作
代理管理aiohttp-proxy代理池管理

入門實戰:第一個異步爬蟲 🎯

讓我們從一個簡單但實用的例子開始,爬取多個網頁的標題:

import asyncio
import aiohttp
from bs4 import BeautifulSoup
import timeclass AsyncSpider:def __init__(self, max_concurrent=10):"""初始化異步爬蟲:param max_concurrent: 最大并發數"""self.max_concurrent = max_concurrentself.session = Noneself.semaphore = asyncio.Semaphore(max_concurrent)async def __aenter__(self):"""異步上下文管理器入口"""# 創建aiohttp會話,設置連接池和超時參數connector = aiohttp.TCPConnector(limit=100,  # 總連接池大小limit_per_host=20,  # 單個host的連接數限制ttl_dns_cache=300,  # DNS緩存時間)timeout = aiohttp.ClientTimeout(total=30, connect=10)self.session = aiohttp.ClientSession(connector=connector,timeout=timeout,headers={'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'})return selfasync def __aexit__(self, exc_type, exc_val, exc_tb):"""異步上下文管理器退出"""if self.session:await self.session.close()async def fetch_page(self, url):"""獲取單個頁面內容:param url: 目標URL:return: 頁面標題和URL的元組"""async with self.semaphore:  # 控制并發數try:async with self.session.get(url) as response:if response.status == 200:html = await response.text()soup = BeautifulSoup(html, 'html.parser')title = soup.find('title')title_text = title.get_text().strip() if title else "無標題"print(f"? 成功獲取: {url} - {title_text}")return url, title_textelse:print(f"? HTTP錯誤 {response.status}: {url}")return url, Noneexcept asyncio.TimeoutError:print(f"? 超時: {url}")return url, Noneexcept Exception as e:print(f"🚫 異常: {url} - {str(e)}")return url, Noneasync def crawl_urls(self, urls):"""批量爬取URL列表:param urls: URL列表:return: 結果列表"""print(f"🚀 開始爬取 {len(urls)} 個URL,最大并發數: {self.max_concurrent}")start_time = time.time()# 創建所有任務tasks = [self.fetch_page(url) for url in urls]# 并發執行所有任務results = await asyncio.gather(*tasks, return_exceptions=True)end_time = time.time()print(f"🎉 爬取完成,耗時: {end_time - start_time:.2f}秒")# 過濾掉異常結果valid_results = [r for r in results if isinstance(r, tuple) and r[1] is not None]print(f"📊 成功率: {len(valid_results)}/{len(urls)} ({len(valid_results)/len(urls)*100:.1f}%)")return valid_results# 使用示例
async def main():# 測試URL列表urls = ['https://httpbin.org/delay/1','https://httpbin.org/delay/2', 'https://httpbin.org/delay/1','https://httpbin.org/status/200','https://httpbin.org/status/404','https://httpbin.org/json',]# 使用異步上下文管理器確保資源正確釋放async with AsyncSpider(max_concurrent=5) as spider:results = await spider.crawl_urls(urls)# 輸出結果print("\n📋 爬取結果:")for url, title in results:print(f"  {url} -> {title}")if __name__ == "__main__":asyncio.run(main())

這個基礎版本展示了異步爬蟲的核心要素:

  • 使用aiohttp進行異步HTTP請求
  • 通過Semaphore控制并發數量
  • 使用異步上下文管理器管理資源
  • 異常處理和超時控制

進階技巧:并發控制與資源管理 ?

在實際項目中,合理的并發控制和資源管理至關重要。以下是一個更高級的實現:

import asyncio
import aiohttp
import aiofiles
import json
import random
from dataclasses import dataclass, asdict
from typing import List, Optional, Dict, Any
from urllib.parse import urljoin, urlparse
import logging# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)@dataclass
class CrawlResult:"""爬取結果數據類"""url: strstatus_code: inttitle: Optional[str] = Nonecontent_length: Optional[int] = Noneresponse_time: Optional[float] = Noneerror: Optional[str] = Noneclass AdvancedAsyncSpider:def __init__(self, config: Dict[str, Any]):"""高級異步爬蟲初始化:param config: 配置字典,包含各種爬蟲參數"""self.max_concurrent = config.get('max_concurrent', 10)self.retry_times = config.get('retry_times', 3)self.retry_delay = config.get('retry_delay', 1)self.request_delay = config.get('request_delay', 0)self.timeout = config.get('timeout', 30)self.session = Noneself.semaphore = asyncio.Semaphore(self.max_concurrent)self.results: List[CrawlResult] = []self.failed_urls: List[str] = []# 用戶代理池self.user_agents = ['Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36','Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36','Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36',]async def __aenter__(self):"""創建會話"""connector = aiohttp.TCPConnector(limit=200,limit_per_host=50,ttl_dns_cache=300,use_dns_cache=True,keepalive_timeout=60,enable_cleanup_closed=True)timeout = aiohttp.ClientTimeout(total=self.timeout)self.session = aiohttp.ClientSession(connector=connector,timeout=timeout,trust_env=True  # 支持代理環境變量)return selfasync def __aexit__(self, exc_type, exc_val, exc_tb):"""關閉會話"""if self.session:await self.session.close()# 等待連接完全關閉await asyncio.sleep(0.1)def get_random_headers(self) -> Dict[str, str]:"""生成隨機請求頭"""return {'User-Agent': random.choice(self.user_agents),'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8','Accept-Language': 'en-US,en;q=0.5','Accept-Encoding': 'gzip, deflate','Connection': 'keep-alive','Upgrade-Insecure-Requests': '1',}async def fetch_with_retry(self, url: str) -> CrawlResult:"""帶重試機制的頁面獲取:param url: 目標URL:return: 爬取結果對象"""async with self.semaphore:for attempt in range(self.retry_times + 1):start_time = asyncio.get_event_loop().time()try:# 添加隨機延遲,避免被反爬蟲檢測if self.request_delay > 0:await asyncio.sleep(random.uniform(0, self.request_delay))headers = self.get_random_headers()async with self.session.get(url, headers=headers) as response:response_time = asyncio.get_event_loop().time() - start_timecontent = await response.text()# 解析標題title = Noneif 'text/html' in response.headers.get('content-type', ''):from bs4 import BeautifulSoupsoup = BeautifulSoup(content, 'html.parser')title_tag = soup.find('title')if title_tag:title = title_tag.get_text().strip()result = CrawlResult(url=url,status_code=response.status,title=title,content_length=len(content),response_time=response_time)logger.info(f"? 成功: {url} (狀態碼: {response.status}, 耗時: {response_time:.2f}s)")return resultexcept asyncio.TimeoutError:error_msg = f"超時 (嘗試 {attempt + 1}/{self.retry_times + 1})"logger.warning(f"? {url} - {error_msg}")except aiohttp.ClientError as e:error_msg = f"客戶端錯誤: {str(e)} (嘗試 {attempt + 1}/{self.retry_times + 1})"logger.warning(f"🚫 {url} - {error_msg}")except Exception as e:error_msg = f"未知錯誤: {str(e)} (嘗試 {attempt + 1}/{self.retry_times + 1})"logger.error(f"💥 {url} - {error_msg}")# 如果不是最后一次嘗試,等待后重試if attempt < self.retry_times:await asyncio.sleep(self.retry_delay * (2 ** attempt))  # 指數退避# 所有重試都失敗了result = CrawlResult(url=url,status_code=0,error=f"重試 {self.retry_times} 次后仍然失敗")self.failed_urls.append(url)logger.error(f"? 最終失敗: {url}")return resultasync def crawl_batch(self, urls: List[str], callback=None) -> List[CrawlResult]:"""批量爬取URL:param urls: URL列表:param callback: 可選的回調函數,處理每個結果:return: 爬取結果列表"""logger.info(f"🚀 開始批量爬取 {len(urls)} 個URL")start_time = asyncio.get_event_loop().time()# 創建所有任務tasks = [self.fetch_with_retry(url) for url in urls]# 使用as_completed獲取完成的任務,可以實時處理結果results = []completed = 0for coro in asyncio.as_completed(tasks):result = await cororesults.append(result)completed += 1# 調用回調函數if callback:await callback(result, completed, len(urls))# 顯示進度if completed % 10 == 0 or completed == len(urls):logger.info(f"📊 進度: {completed}/{len(urls)} ({completed/len(urls)*100:.1f}%)")total_time = asyncio.get_event_loop().time() - start_timesuccess_count = len([r for r in results if r.status_code > 0])logger.info(f"🎉 批量爬取完成")logger.info(f"📈 總耗時: {total_time:.2f}秒")logger.info(f"📊 成功率: {success_count}/{len(urls)} ({success_count/len(urls)*100:.1f}%)")self.results.extend(results)return resultsasync def save_results(self, filename: str):"""異步保存結果到JSON文件"""data = {'total_urls': len(self.results),'successful': len([r for r in self.results if r.status_code > 0]),'failed': len(self.failed_urls),'results': [asdict(result) for result in self.results],'failed_urls': self.failed_urls}async with aiofiles.open(filename, 'w', encoding='utf-8') as f:await f.write(json.dumps(data, ensure_ascii=False, indent=2))logger.info(f"💾 結果已保存到: {filename}")# 使用示例
async def progress_callback(result: CrawlResult, completed: int, total: int):"""進度回調函數"""if result.status_code > 0:print(f"? [{completed}/{total}] {result.url} - {result.title}")else:print(f"? [{completed}/{total}] {result.url} - {result.error}")async def advanced_demo():"""高級爬蟲演示"""config = {'max_concurrent': 20,  # 并發數'retry_times': 2,      # 重試次數'retry_delay': 1,      # 重試延遲'request_delay': 0.5,  # 請求間隔'timeout': 15,         # 超時時間}# 測試URL列表urls = ['https://httpbin.org/delay/1','https://httpbin.org/delay/2','https://httpbin.org/status/200','https://httpbin.org/status/404','https://httpbin.org/json','https://httpbin.org/xml','https://httpbin.org/html','https://httpbin.org/robots.txt',] * 3  # 重復3次,模擬更多URLasync with AdvancedAsyncSpider(config) as spider:# 批量爬取results = await spider.crawl_batch(urls, callback=progress_callback)# 保存結果await spider.save_results('crawl_results.json')# 統計信息successful = [r for r in results if r.status_code > 0]avg_response_time = sum(r.response_time or 0 for r in successful) / len(successful)print(f"\n📊 統計信息:")print(f"  成功: {len(successful)}")print(f"  失敗: {len(spider.failed_urls)}")print(f"  平均響應時間: {avg_response_time:.2f}秒")if __name__ == "__main__":asyncio.run(advanced_demo())

這個進階版本包含了:

  • 完整的重試機制與指數退避
  • 隨機請求頭和延遲,避免反爬蟲檢測
  • 實時進度回調和詳細日志
  • 結構化的結果存儲
  • 更好的資源管理和異常處理

高級實戰:分布式異步爬蟲架構 🏗?

對于大規模爬蟲項目,我們需要考慮分布式架構。以下是一個基于Redis的分布式爬蟲實現:

import asyncio
import aiohttp
import aioredis
import json
import hashlib
import time
from typing import Dict, List, Optional, Any
from dataclasses import dataclass, asdict
from urllib.parse import urljoin, urlparse
import logginglogger = logging.getLogger(__name__)@dataclass
class Task:"""爬取任務數據結構"""url: strpriority: int = 0retry_count: int = 0max_retries: int = 3metadata: Dict[str, Any] = Nonedef __post_init__(self):if self.metadata is None:self.metadata = {}class DistributedSpider:"""分布式異步爬蟲"""def __init__(self, worker_id: str, redis_config: Dict[str, Any], spider_config: Dict[str, Any]):self.worker_id = worker_idself.redis_config = redis_configself.spider_config = spider_config# Redis連接self.redis = None# 隊列名稱self.task_queue = "spider:tasks"self.result_queue = "spider:results"self.failed_queue = "spider:failed"self.duplicate_set = "spider:duplicates"# HTTP會話self.session = Noneself.semaphore = asyncio.Semaphore(spider_config.get('max_concurrent', 10))# 統計信息self.stats = {'processed': 0,'success': 0,'failed': 0,'start_time': time.time()}async def __aenter__(self):"""初始化連接"""# 連接Redisself.redis = aioredis.from_url(f"redis://{self.redis_config['host']}:{self.redis_config['port']}",password=self.redis_config.get('password'),db=self.redis_config.get('db', 0),encoding='utf-8',decode_responses=True)# 創建HTTP會話connector = aiohttp.TCPConnector(limit=100, limit_per_host=20)timeout = aiohttp.ClientTimeout(total=30)self.session = aiohttp.ClientSession(connector=connector, timeout=timeout)logger.info(f"🚀 Worker {self.worker_id} 已啟動")return selfasync def __aexit__(self, exc_type, exc_val, exc_tb):"""清理資源"""if self.session:await self.session.close()if self.redis:await self.redis.close()logger.info(f"🛑 Worker {self.worker_id} 已停止")def generate_task_id(self, url: str) -> str:"""生成任務唯一ID"""return hashlib.md5(url.encode()).hexdigest()async def add_task(self, task: Task) -> bool:"""添加任務到隊列"""task_id = self.generate_task_id(task.url)# 檢查是否重復is_duplicate = await self.redis.sismember(self.duplicate_set, task_id)if is_duplicate:logger.debug(f"?? 重復任務: {task.url}")return False# 添加到去重集合await self.redis.sadd(self.duplicate_set, task_id)# 添加到任務隊列(使用優先級隊列)task_data = json.dumps(asdict(task))await self.redis.zadd(self.task_queue, {task_data: task.priority})logger.info(f"? 任務已添加: {task.url} (優先級: {task.priority})")return Trueasync def get_task(self) -> Optional[Task]:"""從隊列獲取任務"""# 使用BZPOPMAX獲取最高優先級任務(阻塞式)result = await self.redis.bzpopmax(self.task_queue, timeout=5)if not result:return Nonetask_data = json.loads(result[1])return Task(**task_data)async def process_task(self, task: Task) -> Dict[str, Any]:"""處理單個任務"""async with self.semaphore:start_time = time.time()try:# 發送HTTP請求headers = {'User-Agent': 'DistributedSpider/1.0','Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8'}async with self.session.get(task.url, headers=headers) as response:content = await response.text()response_time = time.time() - start_timeresult = {'task_id': self.generate_task_id(task.url),'url': task.url,'status_code': response.status,'content_length': len(content),'response_time': response_time,'worker_id': self.worker_id,'timestamp': time.time(),'content': content[:1000],  # 只保存前1000字符'metadata': task.metadata}# 這里可以添加內容解析邏輯if response.status == 200:result['success'] = True# 解析頁面,提取新的URL(示例)new_urls = await self.extract_urls(content, task.url)result['extracted_urls'] = new_urlselse:result['success'] = Falseresult['error'] = f"HTTP {response.status}"return resultexcept Exception as e:response_time = time.time() - start_timereturn {'task_id': self.generate_task_id(task.url),'url': task.url,'status_code': 0,'response_time': response_time,'worker_id': self.worker_id,'timestamp': time.time(),'success': False,'error': str(e),'metadata': task.metadata}async def extract_urls(self, content: str, base_url: str) -> List[str]:"""從頁面內容中提取URL(示例實現)"""try:from bs4 import BeautifulSoupsoup = BeautifulSoup(content, 'html.parser')urls = []for link in soup.find_all('a', href=True):url = urljoin(base_url, link['href'])# 簡單過濾if url.startswith('http') and len(urls) < 10:urls.append(url)return urlsexcept Exception:return []async def save_result(self, result: Dict[str, Any]):"""保存處理結果"""if result['success']:# 保存成功結果await self.redis.lpush(self.result_queue, json.dumps(result))self.stats['success'] += 1# 如果有提取的URL,添加為新任務if 'extracted_urls' in result:for url in result['extracted_urls']:new_task = Task(url=url, priority=0, metadata={'parent_url': result['url']})await self.add_task(new_task)else:# 處理失敗的任務task_id = result['task_id']# 重試邏輯original_task = Task(url=result['url'],retry_count=result.get('retry_count', 0) + 1,metadata=result.get('metadata', {}))if original_task.retry_count <= original_task.max_retries:# 重新加入隊列,降低優先級original_task.priority = -original_task.retry_countawait self.add_task(original_task)logger.info(f"🔄 重試任務: {original_task.url} (第{original_task.retry_count}次)")else:# 超過重試次數,保存到失敗隊列await self.redis.lpush(self.failed_queue, json.dumps(result))logger.error(f"💀 任務最終失敗: {original_task.url}")self.stats['failed'] += 1async def run_worker(self, max_tasks: Optional[int] = None):"""運行工作進程"""logger.info(f"🏃 Worker {self.worker_id} 開始工作")processed = 0while True:if max_tasks and processed >= max_tasks:logger.info(f"🎯 達到最大任務數量: {max_tasks}")break# 獲取任務task = await self.get_task()if not task:logger.debug("? 暫無任務,等待中...")continue# 處理任務logger.info(f"🔧 處理任務: {task.url}")result = await self.process_task(task)# 保存結果await self.save_result(result)# 更新統計processed += 1self.stats['processed'] = processed# 定期輸出統計信息if processed % 10 == 0:await self.print_stats()async def print_stats(self):"""打印統計信息"""elapsed = time.time() - self.stats['start_time']rate = self.stats['processed'] / elapsed if elapsed > 0 else 0logger.info(f"📊 Worker {self.worker_id} 統計:")logger.info(f"  已處理: {self.stats['processed']}")logger.info(f"  成功: {self.stats['success']}")logger.info(f"  失敗: {self.stats['failed']}")logger.info(f"  速率: {rate:.2f} tasks/sec")logger.info(f"  運行時間: {elapsed:.2f}秒")# 使用示例
async def distributed_demo():"""分布式爬蟲演示"""redis_config = {'host': 'localhost','port': 6379,'password': None,'db': 0}spider_config = {'max_concurrent': 5,'request_delay': 1,}worker_id = f"worker-{int(time.time())}"async with DistributedSpider(worker_id, redis_config, spider_config) as spider:# 添加一些初始任務initial_urls = ['https://httpbin.org/delay/1','https://httpbin.org/delay/2','https://httpbin.org/json','https://httpbin.org/html',]for url in initial_urls:task = Task(url=url, priority=10)  # 高優先級await spider.add_task(task)# 運行工作進程await spider.run_worker(max_tasks=20)if __name__ == "__main__":logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')asyncio.run(distributed_demo())

這個分布式版本實現了:

  • 基于Redis的任務隊列和結果存儲
  • 優先級任務調度
  • 自動去重機制
  • 失敗重試和降級處理
  • 多工作進程協作
  • 實時統計和監控

反爬蟲對抗策略 🛡?

在實際爬蟲開發中,反爬蟲對抗是必須面對的挑戰。以下是一些常用的策略:

import asyncio
import aiohttp
import random
import time
from typing import List, Dict, Optional
from urllib.parse import urlparse
import jsonclass AntiAntiSpider:"""反反爬蟲工具類"""def __init__(self):# 用戶代理池self.user_agents = ['Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36','Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36','Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36','Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:89.0) Gecko/20100101 Firefox/89.0','Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:89.0) Gecko/20100101 Firefox/89.0',]# 代理池self.proxy_pool = [# 'http://proxy1:port',# 'http://proxy2:port',# 可以從代理服務商API動態獲取]# 請求頻率控制self.domain_delays = {}  # 每個域名的延遲配置self.last_request_times = {}  # 每個域名的最后請求時間def get_random_user_agent(self) -> str:"""獲取隨機User-Agent"""return random.choice(self.user_agents)def get_proxy(self) -> Optional[str]:"""獲取代理(如果有的話)"""if self.proxy_pool:return random.choice(self.proxy_pool)return Noneasync def respect_robots_txt(self, session: aiohttp.ClientSession, url: str) -> bool:"""檢查robots.txt(可選實現)"""try:parsed_url = urlparse(url)robots_url = f"{parsed_url.scheme}://{parsed_url.netloc}/robots.txt"async with session.get(robots_url) as response:if response.status == 200:robots_content = await response.text()# 這里可以實現robots.txt解析邏輯# 簡化版本:檢查是否包含Disallowreturn 'Disallow: /' not in robots_contentexcept:passreturn True  # 默認允許async def rate_limit(self, url: str):"""頻率限制"""domain = urlparse(url).netloc# 獲取該域名的延遲配置(默認1-3秒)if domain not in self.domain_delays:self.domain_delays[domain] = random.uniform(1, 3)# 檢查上次請求時間if domain in self.last_request_times:elapsed = time.time() - self.last_request_times[domain]required_delay = self.domain_delays[domain]if elapsed < required_delay:sleep_time = required_delay - elapsedawait asyncio.sleep(sleep_time)# 更新最后請求時間self.last_request_times[domain] = time.time()def get_headers(self, url: str, referer: Optional[str] = None) -> Dict[str, str]:"""生成請求頭"""headers = {'User-Agent': self.get_random_user_agent(),'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8','Accept-Language': 'en-US,en;q=0.5','Accept-Encoding': 'gzip, deflate','Connection': 'keep-alive','Upgrade-Insecure-Requests': '1','Sec-Fetch-Dest': 'document','Sec-Fetch-Mode': 'navigate','Sec-Fetch-Site': 'none','Cache-Control': 'max-age=0',}# 添加Referer(如果提供)if referer:headers['Referer'] = referer# 根據域名添加特定頭部domain = urlparse(url).netlocif 'github' in domain:headers['Accept'] = 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8'return headersclass StealthSpider:"""隱蔽爬蟲實現"""def __init__(self, max_concurrent: int = 3):self.max_concurrent = max_concurrentself.session = Noneself.semaphore = asyncio.Semaphore(max_concurrent)self.anti_anti = AntiAntiSpider()# 會話狀態管理self.cookies = {}self.session_data = {}async def __aenter__(self):# 創建更真實的連接器配置connector = aiohttp.TCPConnector(limit=50,limit_per_host=10,ttl_dns_cache=300,use_dns_cache=True,keepalive_timeout=30,enable_cleanup_closed=True,# 模擬真實瀏覽器的連接行為family=0,  # 支持IPv4和IPv6)# 設置合理的超時timeout = aiohttp.ClientTimeout(total=30,connect=10,sock_read=20)# 創建會話,支持自動重定向和cookieself.session = aiohttp.ClientSession(connector=connector,timeout=timeout,cookie_jar=aiohttp.CookieJar(),# 支持自動解壓auto_decompress=True,# 信任環境變量中的代理設置trust_env=True)return selfasync def __aexit__(self, exc_type, exc_val, exc_tb):if self.session:await self.session.close()await asyncio.sleep(0.1)  # 確保連接完全關閉async def fetch_with_stealth(self, url: str, referer: Optional[str] = None) -> Dict:"""隱蔽模式獲取頁面"""async with self.semaphore:# 頻率限制await self.anti_anti.rate_limit(url)# 生成請求頭headers = self.anti_anti.get_headers(url, referer)# 獲取代理proxy = self.anti_anti.get_proxy()try:# 發送請求async with self.session.get(url, headers=headers, proxy=proxy,allow_redirects=True,max_redirects=5) as response:content = await response.text()return {'url': str(response.url),'status': response.status,'headers': dict(response.headers),'content': content,'cookies': {cookie.key: cookie.value for cookie in response.cookies.values()},'final_url': str(response.url),'redirects': len(response.history),'success': True}except Exception as e:return {'url': url,'status': 0,'error': str(e),'success': False}async def crawl_with_session_management(self, urls: List[str]) -> List[Dict]:"""帶會話管理的爬取"""results = []for i, url in enumerate(urls):# 使用前一個URL作為referer(模擬用戶行為)referer = urls[i-1] if i > 0 else Noneresult = await self.fetch_with_stealth(url, referer)results.append(result)# 模擬用戶閱讀時間if result['success']:reading_time = random.uniform(2, 8)print(f"📖 模擬閱讀時間: {reading_time:.1f}秒")await asyncio.sleep(reading_time)return results# 驗證碼處理示例(需要OCR服務)
class CaptchaHandler:"""驗證碼處理器"""async def solve_captcha(self, captcha_image_url: str, session: aiohttp.ClientSession) -> Optional[str]:"""解決驗證碼(示例實現)實際項目中可能需要:1. 第三方OCR服務2. 機器學習模型3. 人工打碼平臺"""try:# 下載驗證碼圖片async with session.get(captcha_image_url) as response:if response.status == 200:image_data = await response.read()# 這里應該調用OCR服務# 示例:使用第三方服務# result = await self.call_ocr_service(image_data)# return result# 臨時返回None,表示無法處理return Noneexcept Exception as e:print(f"驗證碼處理失敗: {e}")return Noneasync def handle_captcha_page(self, session: aiohttp.ClientSession, response_text: str, current_url: str):"""處理包含驗證碼的頁面"""# 檢測是否包含驗證碼if 'captcha' in response_text.lower() or '驗證碼' in response_text:print("🤖 檢測到驗證碼頁面")# 提取驗證碼圖片URL(需要根據具體網站實現)# 這里只是示例from bs4 import BeautifulSoupsoup = BeautifulSoup(response_text, 'html.parser')captcha_img = soup.find('img', {'id': 'captcha'})if captcha_img:captcha_url = captcha_img.get('src')if captcha_url:# 解決驗證碼captcha_result = await self.solve_captcha(captcha_url, session)if captcha_result:print(f"? 驗證碼識別結果: {captcha_result}")return captcha_resultprint("? 驗證碼處理失敗")return None# 使用示例
async def stealth_demo():"""隱蔽爬蟲演示"""urls = ['https://httpbin.org/user-agent','https://httpbin.org/headers','https://httpbin.org/cookies','https://httpbin.org/redirect/2',]async with StealthSpider(max_concurrent=2) as spider:results = await spider.crawl_with_session_management(urls)for result in results:if result['success']:print(f"? {result['url']} (狀態: {result['status']})")if result['redirects'] > 0:print(f"   重定向次數: {result['redirects']}")else:print(f"? {result['url']} - {result['error']}")if __name__ == "__main__":asyncio.run(stealth_demo())

性能優化與監控 📊

性能優化是異步爬蟲的關鍵環節,以下是一個完整的監控和優化方案:

import asyncio
import aiohttp
import time
import psutil
import gc
from dataclasses import dataclass
from typing import Dict, List, Optional
import logging
from collections import deque, defaultdict
import json@dataclass
class PerformanceMetrics:"""性能指標數據類"""timestamp: floatrequests_per_second: floataverage_response_time: floatmemory_usage_mb: floatcpu_usage_percent: floatactive_connections: intqueue_size: intsuccess_rate: floatclass PerformanceMonitor:"""性能監控器"""def __init__(self, window_size: int = 60):self.window_size = window_size  # 監控窗口大小(秒)self.metrics_history = deque(maxlen=window_size)self.request_times = deque()self.response_times = deque()self.success_count = 0self.total_count = 0self.start_time = time.time()# 連接池監控self.active_connections = 0self.queue_size = 0def record_request(self, response_time: float, success: bool):"""記錄請求指標"""current_time = time.time()self.request_times.append(current_time)self.response_times.append(response_time)if success:self.success_count += 1self.total_count += 1# 清理過期數據cutoff_time = current_time - self.window_sizewhile self.request_times and self.request_times[0] < cutoff_time:self.request_times.popleft()self.response_times.popleft()def get_current_metrics(self) -> PerformanceMetrics:"""獲取當前性能指標"""current_time = time.time()# 計算RPSrps = len(self.request_times) / self.window_size if self.request_times else 0# 計算平均響應時間avg_response_time = sum(self.response_times) / len(self.response_times) if self.response_times else 0# 系統資源使用memory_usage = psutil.Process().memory_info().rss / 1024 / 1024  # MBcpu_usage = psutil.Process().cpu_percent()# 成功率success_rate = self.success_count / self.total_count if self.total_count > 0 else 0metrics = PerformanceMetrics(timestamp=current_time,requests_per_second=rps,average_response_time=avg_response_time,memory_usage_mb=memory_usage,cpu_usage_percent=cpu_usage,active_connections=self.active_connections,queue_size=self.queue_size,success_rate=success_rate)self.metrics_history.append(metrics)return metricsdef print_stats(self):"""打印統計信息"""metrics = self.get_current_metrics()uptime = time.time() - self.start_timeprint(f"\n📊 性能監控報告 (運行時間: {uptime:.1f}s)")print(f"  RPS: {metrics.requests_per_second:.2f}")print(f"  平均響應時間: {metrics.average_response_time:.2f}s")print(f"  內存使用: {metrics.memory_usage_mb:.1f}MB")print(f"  CPU使用: {metrics.cpu_usage_percent:.1f}%")print(f"  活躍連接: {metrics.active_connections}")print(f"  隊列大小: {metrics.queue_size}")print(f"  成功率: {metrics.success_rate:.1%}")print(f"  總請求數: {self.total_count}")class OptimizedAsyncSpider:"""優化版異步爬蟲"""def __init__(self, config: Dict):self.config = configself.session = Noneself.semaphore = Noneself.monitor = PerformanceMonitor()# 連接池優化配置self.connector_config = {'limit': config.get('max_connections', 100),'limit_per_host': config.get('max_connections_per_host', 30),'ttl_dns_cache': config.get('dns_cache_ttl', 300),'use_dns_cache': True,'keepalive_timeout': config.get('keepalive_timeout', 60),'enable_cleanup_closed': True,# 優化TCP socket選項'socket_options': [(1, 6, 1),  # TCP_NODELAY] if hasattr(1, '__index__') else None}# 自適應并發控制self.adaptive_concurrency = config.get('adaptive_concurrency', True)self.min_concurrent = config.get('min_concurrent', 5)self.max_concurrent = config.get('max_concurrent', 50)self.current_concurrent = self.min_concurrent# 請求池self.request_pool = asyncio.Queue(maxsize=config.get('request_queue_size', 1000))# 響應時間統計(用于自適應調整)self.response_time_window = deque(maxlen=100)async def __aenter__(self):# 創建優化的連接器connector = aiohttp.TCPConnector(**self.connector_config)# 設置超時timeout = aiohttp.ClientTimeout(total=self.config.get('total_timeout', 30),connect=self.config.get('connect_timeout', 10),sock_read=self.config.get('read_timeout', 20))# 創建會話self.session = aiohttp.ClientSession(connector=connector,timeout=timeout,# 啟用壓縮auto_decompress=True,# 設置最大響應大小read_bufsize=self.config.get('read_bufsize', 64 * 1024),)# 初始化信號量self.semaphore = asyncio.Semaphore(self.current_concurrent)return selfasync def __aexit__(self, exc_type, exc_val, exc_tb):if self.session:await self.session.close()# 強制垃圾回收gc.collect()async def adjust_concurrency(self):"""自適應并發調整"""if not self.adaptive_concurrency or len(self.response_time_window) < 10:return# 計算最近的平均響應時間recent_avg = sum(list(self.response_time_window)[-10:]) / 10overall_avg = sum(self.response_time_window) / len(self.response_time_window)# 如果最近響應時間明顯增加,降低并發if recent_avg > overall_avg * 1.5 and self.current_concurrent > self.min_concurrent:self.current_concurrent = max(self.min_concurrent, self.current_concurrent - 2)print(f"📉 降低并發數至: {self.current_concurrent}")# 如果響應時間穩定且較快,增加并發elif recent_avg < overall_avg * 0.8 and self.current_concurrent < self.max_concurrent:self.current_concurrent = min(self.max_concurrent, self.current_concurrent + 1)print(f"📈 提高并發數至: {self.current_concurrent}")# 更新信號量self.semaphore = asyncio.Semaphore(self.current_concurrent)async def fetch_optimized(self, url: str) -> Dict:"""優化的請求方法"""async with self.semaphore:start_time = time.time()try:# 更新連接數self.monitor.active_connections += 1async with self.session.get(url) as response:content = await response.text()response_time = time.time() - start_time# 記錄響應時間self.response_time_window.append(response_time)self.monitor.record_request(response_time, response.status == 200)return {'url': url,'status': response.status,'content': content,'response_time': response_time,'content_length': len(content),'success': response.status == 200}except Exception as e:response_time = time.time() - start_timeself.monitor.record_request(response_time, False)return {'url': url,'status': 0,'error': str(e),'response_time': response_time,'success': False}finally:self.monitor.active_connections -= 1async def batch_crawl_optimized(self, urls: List[str]) -> List[Dict]:"""優化的批量爬取"""print(f"🚀 開始優化爬取 {len(urls)} 個URL")# 分批處理,避免內存過載batch_size = self.config.get('batch_size', 100)all_results = []for i in range(0, len(urls), batch_size):batch_urls = urls[i:i + batch_size]print(f"📦 處理批次 {i//batch_size + 1}/{(len(urls)-1)//batch_size + 1}")# 創建任務tasks = [self.fetch_optimized(url) for url in batch_urls]# 執行任務batch_results = await asyncio.gather(*tasks, return_exceptions=True)# 過濾異常valid_results = [r for r in batch_results if isinstance(r, dict)]all_results.extend(valid_results)# 自適應調整并發await self.adjust_concurrency()# 打印監控信息if i % (batch_size * 2) == 0:  # 每2個批次打印一次self.monitor.print_stats()# 批次間短暫休息,避免過載if i + batch_size < len(urls):await asyncio.sleep(0.1)return all_resultsasync def memory_cleanup(self):"""內存清理"""# 手動觸發垃圾回收gc.collect()# 清理過期的監控數據current_time = time.time()cutoff_time = current_time - 300  # 保留5分鐘的數據while (self.monitor.metrics_history and self.monitor.metrics_history[0].timestamp < cutoff_time):self.monitor.metrics_history.popleft()# 使用示例
async def performance_demo():"""性能優化演示"""config = {'max_connections': 50,'max_connections_per_host': 15,'max_concurrent': 20,'min_concurrent': 5,'adaptive_concurrency': True,'batch_size': 50,'total_timeout': 15,'connect_timeout': 5,}# 生成大量測試URLtest_urls = []for i in range(200):delay = i % 5 + 1  # 1-5秒延遲test_urls.append(f'https://httpbin.org/delay/{delay}')async with OptimizedAsyncSpider(config) as spider:# 啟動監控任務monitor_task = asyncio.create_task(periodic_monitoring(spider.monitor))try:# 執行爬取results = await spider.batch_crawl_optimized(test_urls)# 最終統計print(f"\n🎉 爬取完成!")print(f"📊 總URL數: {len(test_urls)}")print(f"📊 成功數: {len([r for r in results if r.get('success')])}")print(f"📊 失敗數: {len([r for r in results if not r.get('success')])}")spider.monitor.print_stats()finally:monitor_task.cancel()async def periodic_monitoring(monitor: PerformanceMonitor):"""定期監控任務"""while True:await asyncio.sleep(10)  # 每10秒監控一次try:metrics = monitor.get_current_metrics()# 檢查異常情況if metrics.memory_usage_mb > 500:  # 內存超過500MBprint("?? 內存使用過高!")if metrics.requests_per_second < 1 and monitor.total_count > 0:print("?? RPS過低,可能存在瓶頸!")if metrics.success_rate < 0.8 and monitor.total_count > 10:print("?? 成功率過低!")except Exception as e:print(f"監控異常: {e}")if __name__ == "__main__":asyncio.run(performance_demo())

常見問題與解決方案 ?

1. 內存泄露問題

問題:長時間運行后內存持續增長
解決方案

# 正確的資源管理
async with aiohttp.ClientSession() as session:# 使用完自動關閉pass# 定期垃圾回收
import gc
gc.collect()# 限制并發數量
semaphore = asyncio.Semaphore(10)

2. 連接池耗盡

問題:大量并發請求導致連接池耗盡
解決方案

connector = aiohttp.TCPConnector(limit=100,  # 增加連接池大小limit_per_host=20,  # 單host連接數限制ttl_dns_cache=300,  # DNS緩存enable_cleanup_closed=True  # 自動清理關閉的連接
)

3. 超時處理不當

問題:請求超時導致任務堆積
解決方案

timeout = aiohttp.ClientTimeout(total=30,      # 總超時connect=10,    # 連接超時sock_read=20   # 讀取超時
)

4. 異常傳播

問題:單個任務異常影響整體爬取
解決方案

# 使用return_exceptions=True
results = await asyncio.gather(*tasks, return_exceptions=True)# 過濾異常結果
valid_results = [r for r in results if not isinstance(r, Exception)]

最佳實踐總結 🏆

經過近幾年的異步爬蟲開發經驗,我總結出以下最佳實踐:

1. 架構設計原則

  • 單一職責:每個組件只負責特定功能
  • 可擴展性:支持水平擴展和配置調整
  • 容錯性:優雅處理各種異常情況
  • 監控性:完整的性能監控和日志記錄

2. 性能優化要點

  • 合理的并發數:根據目標網站和網絡環境調整
  • 連接復用:使用連接池減少握手開銷
  • 內存管理:及時清理資源,避免內存泄露
  • 批處理:分批處理大量URL,避免過載

3. 反爬蟲對抗策略

  • 請求頭偽裝:模擬真實瀏覽器行為
  • 頻率控制:合理的請求間隔
  • IP輪換:使用代理池分散請求
  • 會話管理:維護登錄狀態和cookie

4. 錯誤處理機制

  • 重試策略:指數退避重試
  • 降級處理:失敗任務的備用方案
  • 監控告警:及時發現和處理問題
  • 日志記錄:詳細的操作日志

5. 數據質量保證

  • 去重機制:避免重復抓取
  • 數據驗證:確保抓取數據的完整性
  • 增量更新:只抓取變化的數據
  • 備份恢復:重要數據的備份策略

通過遵循這些最佳實踐,你可以構建出高效、穩定、可維護的異步爬蟲系統。記住,爬蟲開發不僅僅是技術問題,還需要考慮法律合規、網站負載、數據質量等多個方面。

異步爬蟲是一門實踐性很強的技術,建議在實際項目中不斷優化和完善。隨著Python異步生態的不斷發展,相信會有更多優秀的工具和庫出現,讓我們的爬蟲開發更加高效和便捷。

希望這份指南能夠幫助你在異步爬蟲的道路上走得更遠!🚀

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/diannao/88144.shtml
繁體地址,請注明出處:http://hk.pswp.cn/diannao/88144.shtml
英文地址,請注明出處:http://en.pswp.cn/diannao/88144.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

JMeter-SSE響應數據自動化3.0

背景 此次因為多了一些需要過濾排除的錯誤(數量很少)&#xff0c;還需要修改下JMeter的jtl文件輸出數據&#xff08;后續統計數據需要&#xff09; 所以只涉及到JSR腳本的一些改動(此部分改動并不會影響到JMeter的HTML報告) 改動 主要通過設置JMeter中prev輸出數據變量threadN…

012 進程狀態和優先級

&#x1f984; 個人主頁: 小米里的大麥-CSDN博客 &#x1f38f; 所屬專欄: Linux_小米里的大麥的博客-CSDN博客 &#x1f381; GitHub主頁: 小米里的大麥的 GitHub ?? 操作環境: Visual Studio 2022 文章目錄 進程狀態和優先級一、進程狀態分類特殊狀態說明 二、如何查看進程…

React JSX原理

JSX本質 實質上是React.createElement()的語法糖

Java-51 深入淺出 Tomcat 手寫 Tomcat 類加載機制 雙親委派機制 生命周期 插件化

點一下關注吧&#xff01;&#xff01;&#xff01;非常感謝&#xff01;&#xff01;持續更新&#xff01;&#xff01;&#xff01; &#x1f680; AI篇持續更新中&#xff01;&#xff08;長期更新&#xff09; 目前2025年06月13日更新到&#xff1a; AI煉丹日志-28 - Aud…

從C++編程入手設計模式——責任鏈模式

從C編程入手設計模式——責任鏈模式 ? 當我們的一個請求需要多個對象去處理&#xff0c;但具體由誰來處理&#xff0c;是根據情況動態決定的。例如&#xff0c;一個日志系統中&#xff0c;可能希望把錯誤信息寫入文件&#xff0c;把提示信息輸出到控制臺&#xff0c;而不是每…

泛型方法調用需要顯示指定泛型類型的場景

泛型類型的推斷確定 一般來說&#xff0c;泛型類型的推斷可以由以下幾個場景確定&#xff1a; 變量定義指定類型 List<String> strList new ArrayList<>();ArrayList的泛型類型是依據變量的類型確定的。 方法返回值確定 Overridepublic Function<List<I…

Deep Research:開啟深度研究的智能新時代

在當今信息爆炸的時代&#xff0c;人們面臨著海量的信息&#xff0c;無論是專業人士還是普通消費者&#xff0c;都迫切需要一種高效、精準的方式來獲取和分析信息。OpenAI 推出的 Deep Research&#xff0c;宛如一顆璀璨的新星&#xff0c;在知識的海洋中為我們導航&#xff0c…

曼昆《經濟學原理》第九版 宏觀經濟學 第二十四章失業與自然失業率

以下是曼昆《經濟學原理》第九版宏觀經濟學第二十四章**“失業與自然失業率”**的詳細講解&#xff0c;從零基礎開始構建知識框架&#xff0c;結合中國實際案例與生活化比喻&#xff0c;幫助小白系統理解核心概念&#xff1a; 一、知識框架&#xff1a;失業的“全景圖” 1. 核…

【軟考高級系統架構論文】論軟件系統架構風格

論文真題 請以“軟件系統架構風格”為論題,依次從以下三個方面進行論述: 1、概要敘述你參與分析和開發的軟件系統開發項目以及你所擔任的主要工作。 2、分析軟件系統開發中常用的軟件系統架構風格有哪些?詳細闡述每種風格的具體含義。 3、詳細說明在你所參與的軟件系統開發項…

LeetCode--35.搜索插入位置

解題思路&#xff1a; 1.獲取信息&#xff1a; 給定一個升序排列的數組和一個整數&#xff0c;要求查找該整數應該在數組中插入的位置 限制條件是&#xff0c;要求時間復雜度為O(log N) 2.分析題目&#xff1a; 時間復雜度要求O(log N)&#xff0c;那么就使用二分查找法&#x…

Unix、Linux、POSIX、Minix 區別與聯系

一、Unix&#xff1a;現代操作系統的技術原型 誕生&#xff1a;1969年貝爾實驗室&#xff0c;用C語言重寫后實現跨平臺&#xff08;1973年&#xff09;。核心設計&#xff1a; 一切皆文件&#xff08;設備/進程均抽象為文件&#xff09;。管道&#xff08;|&#xff09;和文本…

python計算長方形的周長 2025年3月青少年電子學會等級考試 中小學生python編程等級考試一級真題答案解析

python計算長方形的周長 2025年3月 python編程等級考試一級編程題 博主推薦 所有考級比賽學習相關資料合集【推薦收藏】 1、Python比賽 信息素養大賽Python編程挑戰賽 藍橋杯python選拔賽真題詳解 藍橋杯python省賽真題詳解 藍橋杯python國賽真題詳解 2、Python考級 p…

使用 RedisVL 進行復雜查詢

一、前置條件 在開始之前&#xff0c;請確保&#xff1a; 已安裝 redisvl 并激活相應的 Python 環境。運行 Redis 實例&#xff0c;且 RediSearch 版本 > 2.4。 二、初始化與數據加載 我們將使用一個包含用戶信息的數據集&#xff0c;字段包括 user、age、job、credit_s…

「Linux文件及目錄管理」vi、vim編輯器

知識點解析 vi/vim編輯器簡介 vi:Linux默認的文本編輯器,基于命令行操作,功能強大。vim:vi的增強版,支持語法高亮、多窗口編輯、插件擴展等功能。vi/vim基本模式 命令模式:默認模式,用于移動光標、復制、粘貼、刪除等操作。插入模式:按i進入,用于輸入文本。末行模式:…

電容器保護測控裝置如何選型?

在電力系統的無功補償環節&#xff0c;?電容器保護測控裝置是保障并聯電容器組安全穩定運行的核心設備。其選型需綜合考量保護需求、系統環境及擴展功能。以下是關鍵選型要素分析&#xff1a; ?一、明確核心功能需求? 電容器保護測控裝置&#xff0c;選型時需匹配電容器組實…

最近小峰一直在忙國際化項目,確實有點分身乏術... [特殊字符] 不過! 我正緊鑼密鼓準備一系列干貨文章/深度解析

本人詳解 大家晚上好呀&#xff01;&#x1f319; 最近小峰一直在忙國際化項目&#xff0c;確實有點分身乏術... &#x1f605; 不過&#xff01; 我正緊鑼密鼓準備一系列干貨文章/深度解析&#xff08;選一個更符合你內容的詞&#xff09;&#xff0c;很快就會和大家見面啦&am…

OpenCV CUDA模塊設備層-----設備端(GPU)線程塊級別的一個內存拷貝工具函數blockCopy()

操作系統&#xff1a;ubuntu22.04 OpenCV版本&#xff1a;OpenCV4.9 IDE:Visual Studio Code 編程語言&#xff1a;C11 算法描述 在同一個線程塊&#xff08;thread block內&#xff0c;將 [beg, end) 范圍內的數據并行地復制到 out 開始的位置。 它使用了 CUDA 線程協作機制…

https沒有證書可以訪問嗎?外網怎么訪問內網?

沒有SSL證書的網站無法正常通過HTTPS協議訪問?。HTTPS的實現必須依賴有效的SSL證書完成加密握手&#xff0c;否則瀏覽器會直接阻斷連接或顯示嚴重的安全警告。?? 一、技術實現層面? ?HTTPS協議強制要求證書?。 HTTPS基于SSL/TLS協議實現加密通信&#xff0c;而SSL證書是…

Python pytesseract【OCR引擎庫】 簡介

想全面了解DeepSeek的看過來 【包郵】DeepSeek全攻略 人人需要的AI通識課 零基礎掌握DeepSeek的實用操作手冊指南【限量作者親筆簽名版售完即止】 玩轉DeepSeek這本就夠了 【自營包郵】DeepSeek實戰指南 deepseek從入門到精通實用操作指南現代科技科普讀物AI普及知識讀物人工智…

ubuntu安裝postman教程并中文漢化詳細教程

一、下載postman安裝包 通過網盤分享的文件:Postman-linux-x64-8.7.0.tar.gz 鏈接: https://pan.baidu.com/s/10WYeguDJlK85cKJ6ptX01w?pwd=xqkh 提取碼: xqkh 二、解壓到/opt目錄 tar -zxvf Postman-linux-x64-8.7.0.tar.gz如果子用戶沒有/opt權限,可以給子用戶賦予/opt的…