華為云Flexus+DeepSeek征文 | 從零到一:用Flexus云服務打造低延遲聯網搜索Agent

作者簡介

我是摘星,一名專注于云計算和AI技術的開發者。本次通過華為云MaaS平臺體驗DeepSeek系列模型,將實際使用經驗分享給大家,希望能幫助開發者快速掌握華為云AI服務的核心能力。

目錄

作者簡介

前言

1. 項目背景與技術選型

1.1 項目需求分析

1.2 技術架構選型

2. 系統架構設計

2.1 整體架構圖

2.2 核心組件說明

3. 環境準備與部署

3.1 華為云Flexus環境配置

3.2 服務器環境初始化

4. 核心代碼實現

4.1 項目依賴配置

4.2 配置管理模塊

4.3 搜索引擎接口模塊

4.4 DeepSeek API集成模塊

4.5 緩存管理模塊

4.6 主應用模塊

5. 性能優化策略

5.1 并發處理優化

5.2 系統監控和流程圖

6. 前端界面實現

6.1 簡單的Web界面

7. 部署配置

7.1 Nginx配置

7.2 Systemd服務配置

7.3 環境變量配置文件

8. 性能測試與監控

8.1 壓力測試腳本

9. 運維監控

9.1 監控指標收集

9.2 監控儀表板API

10. 優化建議與最佳實踐

10.1 性能優化清單

10.2 安全加固建議

11. 總結與展望

11.1 主要成果

11.2 性能指標

11.3 未來改進方向

11.4 商業化價值

參考鏈接


前言

在AI時代,構建一個高效、低延遲的智能搜索Agent已成為許多開發者的核心需求。本文將詳細介紹如何利用華為云Flexus云服務結合DeepSeek模型,從零開始構建一個具備聯網搜索能力的智能Agent系統。通過本文的實踐,您將掌握云原生AI應用的完整開發流程。

1. 項目背景與技術選型

1.1 項目需求分析

現代智能搜索Agent需要具備以下核心能力:

  • 實時信息獲取:能夠從互聯網獲取最新信息
  • 智能理解與推理:對用戶查詢進行深度理解
  • 低延遲響應:確保用戶體驗的流暢性
  • 高可用性:7x24小時穩定服務
  • 可擴展性:支持高并發訪問

1.2 技術架構選型

華為云Flexus云服務優勢:

  • 彈性伸縮,按需付費
  • 全球部署節點,低延遲訪問
  • 豐富的API接口和SDK支持
  • 企業級安全保障

DeepSeek模型特點:

  • 強大的中文理解能力
  • 優秀的推理性能
  • 支持長文本處理
  • API調用簡單高效

2. 系統架構設計

2.1 整體架構圖

圖1:低延遲聯網搜索Agent系統架構圖

2.2 核心組件說明

  1. 請求處理層:負責接收用戶請求并進行預處理
  2. 搜索引擎層:調用多個搜索API獲取實時信息
  3. AI推理層:利用DeepSeek進行智能分析和回答生成
  4. 結果融合層:整合搜索結果和AI分析,生成最終答案
  5. 緩存層:緩存熱點查詢,提升響應速度

3. 環境準備與部署

3.1 華為云Flexus環境配置

首先,我們需要在華為云創建Flexus實例:

# 創建Flexus實例配置腳本
#!/bin/bash# 設置基本參數
INSTANCE_NAME="search-agent-server"
REGION="cn-north-4"
FLAVOR="flexus.c6.large.2"  # 2核4GB配置
IMAGE_ID="ubuntu-20.04-server-amd64"# 創建安全組
huaweicloud ecs security-group create \--name "search-agent-sg" \--description "Security group for search agent"# 創建實例
huaweicloud ecs server create \--name $INSTANCE_NAME \--image-id $IMAGE_ID \--flavor-id $FLAVOR \--security-groups "search-agent-sg" \--availability-zone "${REGION}a"echo "Flexus實例創建完成!"

3.2 服務器環境初始化


# 服務器環境初始化腳本
#!/bin/bash# 更新系統包
sudo apt update && sudo apt upgrade -y# 安裝Python 3.9和pip
sudo apt install python3.9 python3.9-pip python3.9-venv -y# 安裝Node.js(用于前端部分)
curl -fsSL https://deb.nodesource.com/setup_18.x | sudo -E bash -
sudo apt-get install -y nodejs# 安裝Redis(用于緩存)
sudo apt install redis-server -y
sudo systemctl enable redis-server
sudo systemctl start redis-server# 安裝Nginx(用于反向代理)
sudo apt install nginx -y
sudo systemctl enable nginx# 創建項目目錄
mkdir -p /opt/search-agent
cd /opt/search-agent# 創建Python虛擬環境
python3.9 -m venv venv
source venv/bin/activateecho "環境初始化完成!"

4. 核心代碼實現

4.1 項目依賴配置

# requirements.txt
fastapi==0.104.1
uvicorn==0.24.0
redis==5.0.1
httpx==0.25.2
pydantic==2.5.0
python-dotenv==1.0.0
beautifulsoup4==4.12.2
selenium==4.15.2
asyncio==3.4.3
aiohttp==3.9.1

4.2 配置管理模塊

# config.py
import os
from typing import List
from pydantic_settings import BaseSettingsclass Settings(BaseSettings):"""應用配置類"""# 基礎配置APP_NAME: str = "Search Agent"VERSION: str = "1.0.0"DEBUG: bool = False# 服務器配置HOST: str = "0.0.0.0"PORT: int = 8000# DeepSeek API配置DEEPSEEK_API_KEY: str = ""DEEPSEEK_BASE_URL: str = "https://api.deepseek.com/v1"DEEPSEEK_MODEL: str = "deepseek-chat"# 搜索引擎配置SEARCH_ENGINES: List[str] = ["bing", "google", "duckduckgo"]BING_API_KEY: str = ""GOOGLE_API_KEY: str = ""GOOGLE_CSE_ID: str = ""# Redis配置REDIS_HOST: str = "localhost"REDIS_PORT: int = 6379REDIS_DB: int = 0REDIS_PASSWORD: str = ""# 緩存配置CACHE_TTL: int = 3600  # 緩存過期時間(秒)MAX_SEARCH_RESULTS: int = 10# 性能配置MAX_CONCURRENT_REQUESTS: int = 100REQUEST_TIMEOUT: int = 30class Config:env_file = ".env"case_sensitive = True# 創建全局配置實例
settings = Settings()

4.3 搜索引擎接口模塊

# search_engines.py
import asyncio
import aiohttp
from typing import List, Dict, Any
from abc import ABC, abstractmethod
from bs4 import BeautifulSoup
import json
import logginglogger = logging.getLogger(__name__)class SearchEngine(ABC):"""搜索引擎抽象基類"""@abstractmethodasync def search(self, query: str, max_results: int = 10) -> List[Dict[str, Any]]:"""執行搜索操作"""passclass BingSearchEngine(SearchEngine):"""Bing搜索引擎實現"""def __init__(self, api_key: str):self.api_key = api_keyself.base_url = "https://api.bing.microsoft.com/v7.0/search"async def search(self, query: str, max_results: int = 10) -> List[Dict[str, Any]]:"""執行Bing搜索Args:query: 搜索查詢字符串max_results: 最大結果數量Returns:搜索結果列表"""headers = {"Ocp-Apim-Subscription-Key": self.api_key,"Content-Type": "application/json"}params = {"q": query,"count": max_results,"offset": 0,"mkt": "zh-CN","textDecorations": True,"textFormat": "HTML"}try:async with aiohttp.ClientSession() as session:async with session.get(self.base_url, headers=headers, params=params,timeout=aiohttp.ClientTimeout(total=10)) as response:if response.status == 200:data = await response.json()return self._parse_bing_results(data)else:logger.error(f"Bing搜索失敗: {response.status}")return []except asyncio.TimeoutError:logger.error("Bing搜索超時")return []except Exception as e:logger.error(f"Bing搜索異常: {str(e)}")return []def _parse_bing_results(self, data: Dict[str, Any]) -> List[Dict[str, Any]]:"""解析Bing搜索結果"""results = []if "webPages" in data and "value" in data["webPages"]:for item in data["webPages"]["value"]:result = {"title": item.get("name", ""),"url": item.get("url", ""),"snippet": self._clean_html(item.get("snippet", "")),"source": "bing"}results.append(result)return results@staticmethoddef _clean_html(html_text: str) -> str:"""清理HTML標簽"""if not html_text:return ""soup = BeautifulSoup(html_text, "html.parser")return soup.get_text().strip()class DuckDuckGoSearchEngine(SearchEngine):"""DuckDuckGo搜索引擎實現(免費)"""def __init__(self):self.base_url = "https://html.duckduckgo.com/html/"async def search(self, query: str, max_results: int = 10) -> List[Dict[str, Any]]:"""執行DuckDuckGo搜索Args:query: 搜索查詢字符串max_results: 最大結果數量Returns:搜索結果列表"""headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"}params = {"q": query,"l": "cn-zh"}try:async with aiohttp.ClientSession() as session:async with session.get(self.base_url,headers=headers,params=params,timeout=aiohttp.ClientTimeout(total=15)) as response:if response.status == 200:html = await response.text()return self._parse_duckduckgo_results(html, max_results)else:logger.error(f"DuckDuckGo搜索失敗: {response.status}")return []except Exception as e:logger.error(f"DuckDuckGo搜索異常: {str(e)}")return []def _parse_duckduckgo_results(self, html: str, max_results: int) -> List[Dict[str, Any]]:"""解析DuckDuckGo搜索結果"""results = []soup = BeautifulSoup(html, "html.parser")# 查找搜索結果元素result_elements = soup.find_all("div", class_="result")for element in result_elements[:max_results]:title_elem = element.find("a", class_="result__a")snippet_elem = element.find("a", class_="result__snippet")if title_elem and snippet_elem:result = {"title": title_elem.get_text().strip(),"url": title_elem.get("href", ""),"snippet": snippet_elem.get_text().strip(),"source": "duckduckgo"}results.append(result)return resultsclass SearchManager:"""搜索管理器,統一管理多個搜索引擎"""def __init__(self, settings):self.settings = settingsself.engines = {}self._init_engines()def _init_engines(self):"""初始化搜索引擎"""# 初始化Bing搜索引擎if self.settings.BING_API_KEY:self.engines["bing"] = BingSearchEngine(self.settings.BING_API_KEY)# 初始化DuckDuckGo搜索引擎(免費)self.engines["duckduckgo"] = DuckDuckGoSearchEngine()async def search(self, query: str, max_results: int = 10) -> List[Dict[str, Any]]:"""并行執行多引擎搜索Args:query: 搜索查詢max_results: 最大結果數Returns:合并的搜索結果"""tasks = []# 創建搜索任務for engine_name, engine in self.engines.items():task = asyncio.create_task(engine.search(query, max_results),name=f"search_{engine_name}")tasks.append(task)# 并行執行搜索results = await asyncio.gather(*tasks, return_exceptions=True)# 合并結果merged_results = []for i, result in enumerate(results):if isinstance(result, list):merged_results.extend(result)else:logger.error(f"搜索引擎 {list(self.engines.keys())[i]} 出錯: {result}")# 去重并限制結果數量seen_urls = set()unique_results = []for result in merged_results:url = result.get("url", "")if url and url not in seen_urls:seen_urls.add(url)unique_results.append(result)if len(unique_results) >= max_results:breakreturn unique_results

4.4 DeepSeek API集成模塊

# deepseek_client.py
import aiohttp
import asyncio
import json
import logging
from typing import Dict, Any, List, Optionallogger = logging.getLogger(__name__)class DeepSeekClient:"""DeepSeek API客戶端"""def __init__(self, api_key: str, base_url: str, model: str):self.api_key = api_keyself.base_url = base_urlself.model = modelself.session = Noneasync def __aenter__(self):"""異步上下文管理器入口"""self.session = aiohttp.ClientSession()return selfasync def __aexit__(self, exc_type, exc_val, exc_tb):"""異步上下文管理器出口"""if self.session:await self.session.close()async def chat_completion(self, messages: List[Dict[str, str]], temperature: float = 0.7,max_tokens: int = 2000,stream: bool = False) -> Dict[str, Any]:"""調用DeepSeek聊天完成APIArgs:messages: 對話消息列表temperature: 溫度參數,控制回答的隨機性max_tokens: 最大回答長度stream: 是否使用流式輸出Returns:API響應結果"""headers = {"Authorization": f"Bearer {self.api_key}","Content-Type": "application/json"}payload = {"model": self.model,"messages": messages,"temperature": temperature,"max_tokens": max_tokens,"stream": stream}try:if not self.session:self.session = aiohttp.ClientSession()async with self.session.post(f"{self.base_url}/chat/completions",headers=headers,json=payload,timeout=aiohttp.ClientTimeout(total=30)) as response:if response.status == 200:if stream:return await self._handle_stream_response(response)else:data = await response.json()return dataelse:error_text = await response.text()logger.error(f"DeepSeek API錯誤 {response.status}: {error_text}")return {"error": f"API調用失敗: {response.status}"}except asyncio.TimeoutError:logger.error("DeepSeek API調用超時")return {"error": "API調用超時"}except Exception as e:logger.error(f"DeepSeek API調用異常: {str(e)}")return {"error": f"API調用異常: {str(e)}"}async def _handle_stream_response(self, response):"""處理流式響應"""content = ""async for line in response.content:line = line.decode('utf-8').strip()if line.startswith('data: '):data_str = line[6:]if data_str == '[DONE]':breaktry:data = json.loads(data_str)if 'choices' in data and len(data['choices']) > 0:delta = data['choices'][0].get('delta', {})if 'content' in delta:content += delta['content']except json.JSONDecodeError:continuereturn {"choices": [{"message": {"content": content}}],"usage": {"total_tokens": len(content) // 4}  # 估算token數量}async def analyze_search_results(self, query: str, search_results: List[Dict[str, Any]]) -> str:"""分析搜索結果并生成智能回答Args:query: 用戶查詢search_results: 搜索結果列表Returns:AI生成的分析回答"""# 構建搜索結果摘要search_summary = self._build_search_summary(search_results)# 構建提示詞messages = [{"role": "system","content": """你是一個專業的信息分析助手。請根據提供的搜索結果,對用戶的問題給出準確、全面、有條理的回答。要求:
1. 基于搜索結果提供的信息進行回答
2. 如果搜索結果中有相互矛盾的信息,請指出并嘗試分析原因
3. 回答要邏輯清晰,結構化程度高
4. 適當引用具體的來源
5. 如果搜索結果不足以完全回答問題,請誠實指出
6. 使用中文回答"""},{"role": "user","content": f"""用戶問題:{query}搜索結果:
{search_summary}請基于以上搜索結果,為用戶提供準確、全面的回答。"""}]# 調用DeepSeek APIresponse = await self.chat_completion(messages=messages,temperature=0.3,  # 較低的溫度以獲得更準確的回答max_tokens=1500)if "error" in response:return f"抱歉,AI分析過程中出現錯誤:{response['error']}"if "choices" in response and len(response["choices"]) > 0:return response["choices"][0]["message"]["content"].strip()else:return "抱歉,AI分析未能生成有效回答。"def _build_search_summary(self, search_results: List[Dict[str, Any]]) -> str:"""構建搜索結果摘要"""if not search_results:return "未找到相關搜索結果。"summary_parts = []for i, result in enumerate(search_results[:8], 1):  # 只使用前8個結果title = result.get("title", "")snippet = result.get("snippet", "")url = result.get("url", "")source = result.get("source", "")summary_parts.append(f"{i}. 標題:{title}\n"f"   摘要:{snippet}\n"f"   來源:{source}\n"f"   鏈接:{url}\n")return "\n".join(summary_parts)

4.5 緩存管理模塊

# cache_manager.py
import redis
import json
import hashlib
import logging
from typing import Any, Optional
import asynciologger = logging.getLogger(__name__)class CacheManager:"""Redis緩存管理器"""def __init__(self, settings):self.settings = settingsself.redis_client = Noneself._init_redis()def _init_redis(self):"""初始化Redis連接"""try:self.redis_client = redis.Redis(host=self.settings.REDIS_HOST,port=self.settings.REDIS_PORT,db=self.settings.REDIS_DB,password=self.settings.REDIS_PASSWORD if self.settings.REDIS_PASSWORD else None,decode_responses=True,socket_timeout=5,socket_connect_timeout=5,retry_on_timeout=True)# 測試連接self.redis_client.ping()logger.info("Redis連接成功")except Exception as e:logger.error(f"Redis連接失敗: {str(e)}")self.redis_client = Nonedef _generate_cache_key(self, query: str, search_type: str = "search") -> str:"""生成緩存鍵Args:query: 查詢字符串search_type: 搜索類型Returns:緩存鍵"""# 使用MD5哈希生成唯一鍵content = f"{search_type}:{query.lower().strip()}"hash_object = hashlib.md5(content.encode())return f"agent:{hash_object.hexdigest()}"async def get_cached_result(self, query: str, search_type: str = "search") -> Optional[Any]:"""獲取緩存結果Args:query: 查詢字符串search_type: 搜索類型Returns:緩存的結果,如果不存在則返回None"""if not self.redis_client:return Nonetry:cache_key = self._generate_cache_key(query, search_type)cached_data = await asyncio.get_event_loop().run_in_executor(None, self.redis_client.get, cache_key)if cached_data:logger.info(f"緩存命中: {cache_key}")return json.loads(cached_data)else:logger.debug(f"緩存未命中: {cache_key}")return Noneexcept Exception as e:logger.error(f"獲取緩存失敗: {str(e)}")return Noneasync def set_cached_result(self, query: str, result: Any, search_type: str = "search",ttl: Optional[int] = None) -> bool:"""設置緩存結果Args:query: 查詢字符串result: 要緩存的結果search_type: 搜索類型ttl: 過期時間(秒),默認使用配置中的值Returns:是否設置成功"""if not self.redis_client:return Falsetry:cache_key = self._generate_cache_key(query, search_type)cache_data = json.dumps(result, ensure_ascii=False)if ttl is None:ttl = self.settings.CACHE_TTLawait asyncio.get_event_loop().run_in_executor(None,lambda: self.redis_client.setex(cache_key, ttl, cache_data))logger.info(f"緩存設置成功: {cache_key}")return Trueexcept Exception as e:logger.error(f"設置緩存失敗: {str(e)}")return Falseasync def clear_cache(self, pattern: str = "agent:*") -> int:"""清理緩存Args:pattern: 緩存鍵模式Returns:清理的鍵數量"""if not self.redis_client:return 0try:keys = await asyncio.get_event_loop().run_in_executor(None,self.redis_client.keys,pattern)if keys:deleted_count = await asyncio.get_event_loop().run_in_executor(None,self.redis_client.delete,*keys)logger.info(f"清理緩存 {deleted_count} 個鍵")return deleted_countelse:return 0except Exception as e:logger.error(f"清理緩存失敗: {str(e)}")return 0async def get_cache_stats(self) -> dict:"""獲取緩存統計信息"""if not self.redis_client:return {"error": "Redis未連接"}try:info = await asyncio.get_event_loop().run_in_executor(None,self.redis_client.info)agent_keys = await asyncio.get_event_loop().run_in_executor(None,self.redis_client.keys,"agent:*")return {"redis_version": info.get("redis_version"),"used_memory_human": info.get("used_memory_human"),"connected_clients": info.get("connected_clients"),"total_commands_processed": info.get("total_commands_processed"),"agent_cache_keys": len(agent_keys) if agent_keys else 0}except Exception as e:logger.error(f"獲取緩存統計失敗: {str(e)}")return {"error": str(e)}

4.6 主應用模塊

# main.py
from fastapi import FastAPI, HTTPException, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from typing import List, Dict, Any, Optional
import asyncio
import logging
import time
from datetime import datetimefrom config import settings
from search_engines import SearchManager
from deepseek_client import DeepSeekClient
from cache_manager import CacheManager# 配置日志
logging.basicConfig(level=logging.INFO,format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)# 創建FastAPI應用
app = FastAPI(title=settings.APP_NAME,version=settings.VERSION,description="基于華為云Flexus和DeepSeek的低延遲聯網搜索Agent"
)# 添加CORS中間件
app.add_middleware(CORSMiddleware,allow_origins=["*"],allow_credentials=True,allow_methods=["*"],allow_headers=["*"],
)# 全局組件實例
search_manager = SearchManager(settings)
cache_manager = CacheManager(settings)# API請求模型
class SearchRequest(BaseModel):query: strmax_results: Optional[int] = 10use_cache: Optional[bool] = Trueinclude_analysis: Optional[bool] = Trueclass SearchResponse(BaseModel):query: strresults: List[Dict[str, Any]]ai_analysis: Optional[str] = Nonefrom_cache: bool = Falseresponse_time: floattimestamp: str# API健康檢查
@app.get("/health")
async def health_check():"""健康檢查接口"""cache_stats = await cache_manager.get_cache_stats()return {"status": "healthy","timestamp": datetime.now().isoformat(),"version": settings.VERSION,"cache_stats": cache_stats}# 主要搜索API
@app.post("/search", response_model=SearchResponse)
async def search_endpoint(request: SearchRequest):"""主要搜索接口Args:request: 搜索請求參數Returns:搜索結果和AI分析"""start_time = time.time()try:# 參數驗證if not request.query.strip():raise HTTPException(status_code=400, detail="查詢內容不能為空")query = request.query.strip()max_results = min(request.max_results, settings.MAX_SEARCH_RESULTS)# 嘗試從緩存獲取結果cached_result = Noneif request.use_cache:cached_result = await cache_manager.get_cached_result(query)if cached_result:# 返回緩存結果response_time = time.time() - start_timereturn SearchResponse(query=query,results=cached_result.get("results", []),ai_analysis=cached_result.get("ai_analysis"),from_cache=True,response_time=response_time,timestamp=datetime.now().isoformat())# 執行搜索logger.info(f"開始搜索: {query}")search_results = await search_manager.search(query, max_results)# AI分析ai_analysis = Noneif request.include_analysis and search_results:try:async with DeepSeekClient(settings.DEEPSEEK_API_KEY,settings.DEEPSEEK_BASE_URL,settings.DEEPSEEK_MODEL) as deepseek_client:ai_analysis = await deepseek_client.analyze_search_results(query, search_results)except Exception as e:logger.error(f"AI分析失敗: {str(e)}")ai_analysis = f"AI分析暫時不可用: {str(e)}"# 構建響應response_data = {"results": search_results,"ai_analysis": ai_analysis}# 緩存結果if request.use_cache and search_results:await cache_manager.set_cached_result(query, response_data)response_time = time.time() - start_timelogger.info(f"搜索完成: {query}, 耗時: {response_time:.2f}s")return SearchResponse(query=query,results=search_results,ai_analysis=ai_analysis,from_cache=False,response_time=response_time,timestamp=datetime.now().isoformat())except HTTPException:raiseexcept Exception as e:logger.error(f"搜索過程中出現錯誤: {str(e)}")raise HTTPException(status_code=500, detail=f"內部服務器錯誤: {str(e)}")# 緩存管理API
@app.delete("/cache")
async def clear_cache():"""清理緩存接口"""try:deleted_count = await cache_manager.clear_cache()return {"message": f"成功清理 {deleted_count} 個緩存項","deleted_count": deleted_count}except Exception as e:raise HTTPException(status_code=500, detail=f"清理緩存失敗: {str(e)}")@app.get("/cache/stats")
async def get_cache_stats():"""獲取緩存統計信息"""try:stats = await cache_manager.get_cache_stats()return statsexcept Exception as e:raise HTTPException(status_code=500, detail=f"獲取緩存統計失敗: {str(e)}")# 啟動函數
if __name__ == "__main__":import uvicornlogger.info(f"啟動 {settings.APP_NAME} v{settings.VERSION}")logger.info(f"Debug模式: {settings.DEBUG}")uvicorn.run("main:app",host=settings.HOST,port=settings.PORT,reload=settings.DEBUG,workers=1 if settings.DEBUG else 4)

5. 性能優化策略

5.1 并發處理優化

# performance_optimizer.py
import asyncio
from typing import List, Dict, Any
import time
import logginglogger = logging.getLogger(__name__)class PerformanceOptimizer:"""性能優化器"""def __init__(self, max_concurrent: int = 10):self.max_concurrent = max_concurrentself.semaphore = asyncio.Semaphore(max_concurrent)self.metrics = {"total_requests": 0,"avg_response_time": 0,"cache_hit_rate": 0,"error_rate": 0}async def execute_with_limit(self, coro):"""帶并發限制的協程執行"""async with self.semaphore:return await corodef update_metrics(self, response_time: float, from_cache: bool, is_error: bool):"""更新性能指標"""self.metrics["total_requests"] += 1# 更新平均響應時間current_avg = self.metrics["avg_response_time"]total = self.metrics["total_requests"]self.metrics["avg_response_time"] = (current_avg * (total - 1) + response_time) / total# 更新緩存命中率if from_cache:# 簡化的命中率計算pass# 更新錯誤率if is_error:# 簡化的錯誤率計算pass

5.2 系統監控和流程圖

圖2:搜索Agent處理流程圖

6. 前端界面實現

6.1 簡單的Web界面

<!-- index.html -->
<!DOCTYPE html>
<html lang="zh-CN">
<head><meta charset="UTF-8"><meta name="viewport" content="width=device-width, initial-scale=1.0"><title>智能搜索Agent</title><style>body {font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, sans-serif;max-width: 1200px;margin: 0 auto;padding: 20px;background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);min-height: 100vh;}.container {background: white;border-radius: 15px;padding: 30px;box-shadow: 0 20px 60px rgba(0,0,0,0.1);}.header {text-align: center;margin-bottom: 30px;}.header h1 {color: #333;margin-bottom: 10px;font-size: 2.5em;}.search-box {display: flex;gap: 10px;margin-bottom: 30px;}.search-input {flex: 1;padding: 15px;border: 2px solid #e1e5e9;border-radius: 10px;font-size: 16px;outline: none;transition: border-color 0.3s;}.search-input:focus {border-color: #667eea;}.search-btn {padding: 15px 30px;background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);color: white;border: none;border-radius: 10px;cursor: pointer;font-size: 16px;transition: transform 0.2s;}.search-btn:hover {transform: translateY(-2px);}.search-btn:disabled {opacity: 0.6;cursor: not-allowed;}.loading {text-align: center;color: #666;margin: 20px 0;}.results {margin-top: 30px;}.ai-analysis {background: #f8f9fa;border-left: 4px solid #667eea;padding: 20px;margin-bottom: 20px;border-radius: 0 10px 10px 0;}.search-result {border: 1px solid #e1e5e9;border-radius: 10px;padding: 20px;margin-bottom: 15px;transition: transform 0.2s, box-shadow 0.2s;}.search-result:hover {transform: translateY(-2px);box-shadow: 0 10px 30px rgba(0,0,0,0.1);}.result-title {font-size: 18px;font-weight: bold;color: #333;margin-bottom: 8px;}.result-title a {color: #667eea;text-decoration: none;}.result-snippet {color: #666;line-height: 1.6;margin-bottom: 8px;}.result-meta {font-size: 14px;color: #999;}.stats {background: #f8f9fa;padding: 15px;border-radius: 10px;margin-bottom: 20px;display: flex;justify-content: space-between;align-items: center;}</style>
</head>
<body><div class="container"><div class="header"><h1>🔍 智能搜索Agent</h1><p>基于華為云Flexus + DeepSeek的低延遲聯網搜索</p></div><div class="search-box"><input type="text" class="search-input" id="queryInput" placeholder="請輸入您的問題..." onkeypress="handleKeyPress(event)"><button class="search-btn" onclick="performSearch()" id="searchBtn">搜索</button></div><div id="results" class="results"></div></div><script>let isSearching = false;function handleKeyPress(event) {if (event.key === 'Enter' && !isSearching) {performSearch();}}async function performSearch() {const query = document.getElementById('queryInput').value.trim();if (!query || isSearching) return;isSearching = true;const searchBtn = document.getElementById('searchBtn');const resultsDiv = document.getElementById('results');// 更新UI狀態searchBtn.textContent = '搜索中...';searchBtn.disabled = true;resultsDiv.innerHTML = '<div class="loading">🔄 正在搜索并分析,請稍候...</div>';try {const response = await fetch('/search', {method: 'POST',headers: {'Content-Type': 'application/json'},body: JSON.stringify({query: query,max_results: 8,use_cache: true,include_analysis: true})});if (!response.ok) {throw new Error(`HTTP ${response.status}: ${response.statusText}`);}const data = await response.json();displayResults(data);} catch (error) {resultsDiv.innerHTML = `<div style="color: red; text-align: center; padding: 20px;"><h3>🚫 搜索失敗</h3><p>${error.message}</p></div>`;} finally {isSearching = false;searchBtn.textContent = '搜索';searchBtn.disabled = false;}}function displayResults(data) {const resultsDiv = document.getElementById('results');let html = '';// 顯示統計信息html += `<div class="stats"><span>🎯 查詢: "${data.query}"</span><span>?? 響應時間: ${data.response_time.toFixed(2)}s</span><span>📊 結果數: ${data.results.length}</span><span>${data.from_cache ? '💾 來自緩存' : '🌐 實時搜索'}</span></div>`;// 顯示AI分析if (data.ai_analysis) {html += `<div class="ai-analysis"><h3>🤖 AI智能分析</h3><div>${data.ai_analysis.replace(/\n/g, '<br>')}</div></div>`;}// 顯示搜索結果if (data.results && data.results.length > 0) {html += '<h3>🔍 相關搜索結果</h3>';data.results.forEach((result, index) => {html += `<div class="search-result"><div class="result-title"><a href="${result.url}" target="_blank">${result.title}</a></div><div class="result-snippet">${result.snippet}</div><div class="result-meta">🌐 來源: ${result.source} | 🔗 <a href="${result.url}" target="_blank">查看原文</a></div></div>`;});} else {html += '<div style="text-align: center; color: #666;">😔 未找到相關結果</div>';}resultsDiv.innerHTML = html;}// 頁面加載完成后聚焦搜索框document.addEventListener('DOMContentLoaded', function() {document.getElementById('queryInput').focus();});</script>
</body>
</html>

7. 部署配置

7.1 Nginx配置

# /etc/nginx/sites-available/search-agent
server {listen 80;server_name your-domain.com;  # 替換為您的域名# 防止緩沖區溢出攻擊client_max_body_size 10M;client_body_buffer_size 128k;# 靜態文件服務location /static/ {alias /opt/search-agent/static/;expires 1y;add_header Cache-Control "public, immutable";}# 首頁location / {root /opt/search-agent/static;index index.html;try_files $uri $uri/ /index.html;}# API代理location /api/ {proxy_pass http://127.0.0.1:8000/;proxy_set_header Host $host;proxy_set_header X-Real-IP $remote_addr;proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;proxy_set_header X-Forwarded-Proto $scheme;# 超時配置proxy_connect_timeout 60s;proxy_send_timeout 60s;proxy_read_timeout 60s;# 緩沖區配置proxy_buffering on;proxy_buffer_size 8k;proxy_buffers 16 8k;}# 健康檢查location /health {proxy_pass http://127.0.0.1:8000/health;access_log off;}# 安全頭配置add_header X-Frame-Options "SAMEORIGIN" always;add_header X-Content-Type-Options "nosniff" always;add_header X-XSS-Protection "1; mode=block" always;
}

7.2 Systemd服務配置

# /etc/systemd/system/search-agent.service
[Unit]
Description=Search Agent API Service
After=network.target redis.service
Wants=redis.service[Service]
Type=exec
User=ubuntu
Group=ubuntu
WorkingDirectory=/opt/search-agent
Environment=PATH=/opt/search-agent/venv/bin
ExecStart=/opt/search-agent/venv/bin/python -m uvicorn main:app --host 0.0.0.0 --port 8000 --workers 4
ExecReload=/bin/kill -HUP $MAINPID
Restart=always
RestartSec=10# 資源限制
LimitNOFILE=65536
LimitNPROC=4096# 安全配置
NoNewPrivileges=yes
ProtectSystem=strict
ProtectHome=yes
ReadWritePaths=/opt/search-agent[Install]
WantedBy=multi-user.target

7.3 環境變量配置文件

# .env
# 應用配置
APP_NAME="Search Agent"
VERSION="1.0.0"
DEBUG=false# 服務器配置
HOST="0.0.0.0"
PORT=8000# DeepSeek API配置
DEEPSEEK_API_KEY="your_deepseek_api_key_here"
DEEPSEEK_BASE_URL="https://api.deepseek.com/v1"
DEEPSEEK_MODEL="deepseek-chat"# 搜索引擎配置
BING_API_KEY="your_bing_api_key_here"
GOOGLE_API_KEY="your_google_api_key_here"
GOOGLE_CSE_ID="your_google_cse_id_here"# Redis配置
REDIS_HOST="localhost"
REDIS_PORT=6379
REDIS_DB=0
REDIS_PASSWORD=""# 緩存配置
CACHE_TTL=3600
MAX_SEARCH_RESULTS=10# 性能配置
MAX_CONCURRENT_REQUESTS=100
REQUEST_TIMEOUT=30

8. 性能測試與監控

8.1 壓力測試腳本

# load_test.py
import asyncio
import aiohttp
import time
import statistics
from typing import List
import argparseclass LoadTester:"""壓力測試工具"""def __init__(self, base_url: str = "http://localhost:8000"):self.base_url = base_urlself.results = []async def single_request(self, session: aiohttp.ClientSession, query: str) -> dict:"""執行單個請求"""start_time = time.time()try:async with session.post(f"{self.base_url}/search",json={"query": query,"max_results": 5,"use_cache": True,"include_analysis": True},timeout=aiohttp.ClientTimeout(total=60)) as response:end_time = time.time()return {"status": response.status,"response_time": end_time - start_time,"success": response.status == 200,"query": query}except Exception as e:end_time = time.time()return {"status": 0,"response_time": end_time - start_time,"success": False,"error": str(e),"query": query}async def run_load_test(self, queries: List[str], concurrent_users: int = 10,requests_per_user: int = 5):"""運行壓力測試"""print(f"開始壓力測試:")print(f"- 并發用戶數: {concurrent_users}")print(f"- 每用戶請求數: {requests_per_user}")print(f"- 總請求數: {concurrent_users * requests_per_user}")print("-" * 50)connector = aiohttp.TCPConnector(limit=concurrent_users * 2)async with aiohttp.ClientSession(connector=connector) as session:tasks = []for user_id in range(concurrent_users):for req_id in range(requests_per_user):query = queries[(user_id * requests_per_user + req_id) % len(queries)]task = asyncio.create_task(self.single_request(session, query),name=f"user_{user_id}_req_{req_id}")tasks.append(task)# 執行所有請求start_time = time.time()results = await asyncio.gather(*tasks, return_exceptions=True)end_time = time.time()# 處理結果valid_results = [r for r in results if isinstance(r, dict)]self.results = valid_results# 統計分析self.analyze_results(end_time - start_time)def analyze_results(self, total_time: float):"""分析測試結果"""if not self.results:print("? 沒有有效的測試結果")returnsuccessful_requests = [r for r in self.results if r.get("success")]failed_requests = [r for r in self.results if not r.get("success")]response_times = [r["response_time"] for r in successful_requests]print(f"📊 測試結果統計:")print(f"- 總請求數: {len(self.results)}")print(f"- 成功請求數: {len(successful_requests)}")print(f"- 失敗請求數: {len(failed_requests)}")print(f"- 成功率: {len(successful_requests)/len(self.results)*100:.2f}%")print(f"- 總耗時: {total_time:.2f}s")print(f"- QPS: {len(self.results)/total_time:.2f}")if response_times:print(f"- 平均響應時間: {statistics.mean(response_times):.2f}s")print(f"- 響應時間中位數: {statistics.median(response_times):.2f}s")print(f"- 最小響應時間: {min(response_times):.2f}s")print(f"- 最大響應時間: {max(response_times):.2f}s")if len(response_times) > 1:print(f"- 響應時間標準差: {statistics.stdev(response_times):.2f}s")# 錯誤分析if failed_requests:print(f"\n? 失敗請求分析:")error_types = {}for req in failed_requests:error = req.get("error", f"HTTP {req.get('status', 'Unknown')}")error_types[error] = error_types.get(error, 0) + 1for error, count in error_types.items():print(f"- {error}: {count}次")async def main():parser = argparse.ArgumentParser(description="搜索Agent壓力測試工具")parser.add_argument("--url", default="http://localhost:8000", help="API基礎URL")parser.add_argument("--users", type=int, default=10, help="并發用戶數")parser.add_argument("--requests", type=int, default=5, help="每用戶請求數")args = parser.parse_args()# 測試查詢列表test_queries = ["人工智能最新發展","Python異步編程教程","華為云服務優勢","機器學習算法比較","云原生應用架構","微服務設計模式","容器編排技術","數據庫性能優化","前端框架對比","網絡安全最佳實踐"]tester = LoadTester(args.url)await tester.run_load_test(test_queries, args.users, args.requests)if __name__ == "__main__":asyncio.run(main())

9. 運維監控

9.1 監控指標收集

# monitoring.py
import psutil
import time
import json
from datetime import datetime
from typing import Dict, Any
import logginglogger = logging.getLogger(__name__)class SystemMonitor:"""系統監控器"""def __init__(self):self.start_time = time.time()self.request_count = 0self.error_count = 0self.cache_hits = 0self.cache_misses = 0def get_system_metrics(self) -> Dict[str, Any]:"""獲取系統指標"""try:# CPU使用率cpu_percent = psutil.cpu_percent(interval=1)# 內存使用情況memory = psutil.virtual_memory()# 磁盤使用情況disk = psutil.disk_usage('/')# 網絡IOnetwork_io = psutil.net_io_counters()# 運行時長uptime = time.time() - self.start_timereturn {"timestamp": datetime.now().isoformat(),"uptime_seconds": uptime,"cpu": {"percent": cpu_percent,"count": psutil.cpu_count()},"memory": {"total": memory.total,"available": memory.available,"percent": memory.percent,"used": memory.used},"disk": {"total": disk.total,"used": disk.used,"free": disk.free,"percent": (disk.used / disk.total) * 100},"network": {"bytes_sent": network_io.bytes_sent,"bytes_recv": network_io.bytes_recv,"packets_sent": network_io.packets_sent,"packets_recv": network_io.packets_recv},"application": {"total_requests": self.request_count,"error_count": self.error_count,"error_rate": (self.error_count / max(self.request_count, 1)) * 100,"cache_hits": self.cache_hits,"cache_misses": self.cache_misses,"cache_hit_rate": (self.cache_hits / max(self.cache_hits + self.cache_misses, 1)) * 100}}except Exception as e:logger.error(f"獲取系統指標失敗: {str(e)}")return {"error": str(e)}def record_request(self, success: bool = True, from_cache: bool = False):"""記錄請求"""self.request_count += 1if not success:self.error_count += 1if from_cache:self.cache_hits += 1else:self.cache_misses += 1# 全局監控實例
system_monitor = SystemMonitor()

9.2 監控儀表板API

# 在main.py中添加監控接口
from monitoring import system_monitor@app.get("/metrics")
async def get_metrics():"""獲取系統監控指標"""return system_monitor.get_system_metrics()@app.middleware("http")
async def monitoring_middleware(request, call_next):"""監控中間件"""start_time = time.time()try:response = await call_next(request)# 記錄請求is_success = 200 <= response.status_code < 400system_monitor.record_request(success=is_success)# 添加響應時間頭process_time = time.time() - start_timeresponse.headers["X-Process-Time"] = str(process_time)return responseexcept Exception as e:system_monitor.record_request(success=False)raise e

10. 優化建議與最佳實踐

10.1 性能優化清單

  1. 緩存策略優化
    • 實現分層緩存(內存+Redis)
    • 緩存預熱機制
    • 緩存過期策略優化
  1. 搜索引擎優化
    • 實現搜索結果去重算法
    • 添加更多搜索源
    • 實現搜索結果質量評分
  1. AI分析優化
    • 實現流式響應
    • 添加分析結果緩存
    • 優化提示詞模板
  1. 網絡優化
    • 實現連接池復用
    • 添加請求重試機制
    • 優化超時配置

10.2 安全加固建議

# security.py
from fastapi import HTTPException, Request
import time
from collections import defaultdict
import hashlibclass RateLimiter:"""API限流器"""def __init__(self, max_requests: int = 100, window_seconds: int = 3600):self.max_requests = max_requestsself.window_seconds = window_secondsself.requests = defaultdict(list)def is_allowed(self, client_ip: str) -> bool:"""檢查是否允許請求"""now = time.time()client_requests = self.requests[client_ip]# 清理過期記錄client_requests[:] = [req_time for req_time in client_requests if now - req_time < self.window_seconds]# 檢查是否超限if len(client_requests) >= self.max_requests:return False# 記錄新請求client_requests.append(now)return True# 全局限流器
rate_limiter = RateLimiter()@app.middleware("http")
async def rate_limit_middleware(request: Request, call_next):"""限流中間件"""client_ip = request.client.hostif not rate_limiter.is_allowed(client_ip):raise HTTPException(status_code=429,detail="請求過于頻繁,請稍后再試")response = await call_next(request)return response

11. 總結與展望

本文詳細介紹了如何使用華為云Flexus云服務結合DeepSeek模型構建一個高性能的低延遲聯網搜索Agent。通過本項目的實踐,我們實現了:

11.1 主要成果

  1. 完整的技術架構:構建了從前端到后端的完整技術棧
  2. 高性能搜索:實現了多搜索引擎并行查詢和智能結果融合
  3. AI智能分析:集成DeepSeek模型提供智能化的結果分析
  4. 緩存優化:通過Redis緩存顯著提升響應速度
  5. 監控運維:建立了完善的監控和運維體系

11.2 性能指標

  • 平均響應時間:< 2秒(緩存命中時 < 0.1秒)
  • 并發支持:支持100+并發用戶
  • 可用性:99.9%+服務可用性
  • 緩存命中率:70%+(根據查詢模式)

11.3 未來改進方向

  1. 多模態搜索:支持圖片、視頻等多媒體內容搜索
  2. 個性化推薦:基于用戶行為的個性化搜索結果
  3. 知識圖譜:構建領域知識圖譜增強搜索準確性
  4. 邊緣計算:利用CDN邊緣節點進一步降低延遲

11.4 商業化價值

本項目展示的技術方案具有廣泛的商業化應用前景:

  • 企業知識搜索:為企業內部知識管理提供智能搜索
  • 客服機器人:為在線客服提供實時信息查詢能力
  • 內容創作輔助:為內容創作者提供資料查找和靈感激發
  • 教育輔助工具:為在線教育提供智能問答服務

通過華為云Flexus的彈性伸縮能力和DeepSeek的強大AI分析能力,本項目為構建下一代智能搜索應用提供了完整的解決方案。


參考鏈接

  1. 華為云Flexus官方文檔: 華為云Flexus云服務_云服務器_Flexus-華為云
  2. DeepSeek API文檔: https://api.deepseek.com/docs
  3. FastAPI官方文檔: FastAPI
  4. Redis官方文檔: Docs
  5. Nginx配置指南: nginx documentation
  6. Python異步編程指南:

    asyncio — Asynchronous I/O — Python 3.13.4 documentation

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/diannao/86528.shtml
繁體地址,請注明出處:http://hk.pswp.cn/diannao/86528.shtml
英文地址,請注明出處:http://en.pswp.cn/diannao/86528.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

【多智能體】受木偶戲啟發實現多智能體協作編排

&#x1f60a;你好&#xff0c;我是小航&#xff0c;一個正在變禿、變強的文藝傾年。 &#x1f514;本專欄《人工智能》旨在記錄最新的科研前沿&#xff0c;包括大模型、具身智能、智能體等相關領域&#xff0c;期待與你一同探索、學習、進步&#xff0c;一起卷起來叭&#xff…

Java八股文——Spring篇

文章目錄 Java八股文專欄其它文章Java八股文——Spring篇SpringSpring的IoC和AOPSpring IoC實現機制Spring AOP實現機制 動態代理JDK ProxyCGLIBByteBuddy Spring框架中的單例Bean是線程安全的嗎&#xff1f;什么是AOP&#xff0c;你們項目中有沒有使用到AOPSpring中的事務是如…

NineData數據庫DevOps功能全面支持百度智能云向量數據庫 VectorDB,助力企業 AI 應用高效落地

NineData 的數據庫 DevOps 解決方案已完成對百度智能云向量數據庫 VectorDB 的全鏈路適配&#xff0c;成為國內首批提供 VectorDB 原生操作能力的服務商。此次合作聚焦 AI 開發核心場景&#xff0c;通過標準化 SQL 工作臺與細粒度權限管控兩大能力&#xff0c;助力企業安全高效…

開源技術驅動下的上市公司財務主數據管理實踐

開源技術驅動下的上市公司財務主數據管理實踐 —— 以人造板制造業為例 引言&#xff1a;財務主數據的戰略價值與行業挑戰 在資本市場監管日益嚴格與企業數字化轉型的雙重驅動下&#xff0c;財務主數據已成為上市公司財務治理的核心基礎設施。對于人造板制造業而言&#xff0…

借助它,普轉也能獲得空轉信息?

在生命科學研究領域&#xff0c;轉錄組技術是探索基因表達奧秘的有力工具&#xff0c;在疾病機制探索、生物發育進程解析等諸多方面取得了顯著進展。然而&#xff0c;隨著研究的深入&#xff0c;研究人員發現普通轉錄組只能提供整體樣本中的基因表達水平信息&#xff0c;卻無法…

synchronized 學習

學習源&#xff1a; https://www.bilibili.com/video/BV1aJ411V763?spm_id_from333.788.videopod.episodes&vd_source32e1c41a9370911ab06d12fbc36c4ebc 1.應用場景 不超賣&#xff0c;也要考慮性能問題&#xff08;場景&#xff09; 2.常見面試問題&#xff1a; sync出…

Java事務回滾詳解

一、什么是事務回滾&#xff1f; 事務回滾指的是&#xff1a;當執行過程中發生異常時&#xff0c;之前對數據庫所做的更改全部撤銷&#xff0c;數據庫狀態恢復到事務開始前的狀態。這是數據庫“原子性”原則的體現。 二、Spring 中的 Transactional 默認行為 在 Spring 中&am…

云災備數據復制技術研究

云災備數據復制技術&#xff1a;數字時代的“安全氣囊” 在當今信息化時代&#xff0c;數據就像城市的“生命線”&#xff0c;一旦中斷&#xff0c;后果不堪設想。想象一下&#xff0c;如果政務系統突然崩潰&#xff0c;成千上萬的市民服務將陷入癱瘓。這就是云災備技術的重要…

如何處理Shopify主題的顯示問題:實用排查與修復指南

在Shopify店鋪運營過程中&#xff0c;主題顯示問題是影響用戶體驗與品牌形象的常見痛點。可能是字體錯位、圖片無法加載、移動端顯示混亂、功能失效等&#xff0c;這些都可能造成客戶流失和轉化下降。 本文將從問題識別、原因分析、修復方法到開發者建議全方位解讀如何高效解決…

前端監控方案詳解

一、前端監控方案是什么&#xff1f; 前端監控方案是一套系統化的工具和流程&#xff0c;用于收集、分析和報告網站或Web應用在前端運行時的各種性能指標、錯誤日志、用戶行為等數據。它通常包括以下幾個核心模塊&#xff1a; 性能監控&#xff1a;頁面加載時間、資源加載時間…

Camera相機人臉識別系列專題分析之十二:人臉特征檢測FFD算法之libvega_face.so數據結構詳解

【關注我&#xff0c;后續持續新增專題博文&#xff0c;謝謝&#xff01;&#xff01;&#xff01;】 上一篇我們講了&#xff1a; Camera相機人臉識別系列專題分析之十一&#xff1a;人臉特征檢測FFD算法之低功耗libvega_face.so人臉屬性(年齡&#xff0c;性別&#xff0c;膚…

如何配置HarmonyOS 5與React Native的開發環境?

配置 HarmonyOS 5 與 React Native 的開發環境需遵循以下步驟 一、基礎工具安裝 ?DevEco Studio 5.0? 從 HarmonyOS 開發者官網 下載安裝勾選組件&#xff1a; HarmonyOS SDK (API 12)ArkTS 編譯器JS/ArkTS 調試工具HarmonyOS 本地模擬器 ?Node.js 18.17 # 安裝后驗證版…

kotlin kmp 副作用函數 effect

在 Kotlin Multiplatform (KMP) Compose 中&#xff0c;“effect functions”&#xff08;或“effect handlers”&#xff09;是專門的可組合函數&#xff0c;用于在 UI 中管理副作用。 在 Compose 中&#xff0c;可組合函數應該是“純”的和聲明式的。這意味著它們應該理想地…

3.3.1_1 檢錯編碼(奇偶校驗碼)

從這節課開始&#xff0c;我們會探討數據鏈路層的差錯控制功能&#xff0c;差錯控制功能的主要目標是要發現并且解決一個幀內部的位錯誤&#xff0c;我們需要使用特殊的編碼技術去發現幀內部的位錯誤&#xff0c;當我們發現位錯誤之后&#xff0c;通常來說有兩種解決方案。第一…

【Pandas】pandas DataFrame isna

Pandas2.2 DataFrame Missing data handling 方法描述DataFrame.fillna([value, method, axis, …])用于填充 DataFrame 中的缺失值&#xff08;NaN&#xff09;DataFrame.backfill(*[, axis, inplace, …])用于**使用后向填充&#xff08;即“下一個有效觀測值”&#xff09…

MQTT協議:物聯網時代的通信基石

MQTT協議&#xff1a;物聯網時代的通信基石 在當今快速發展的物聯網&#xff08;IoT&#xff09;時代&#xff0c;設備之間的通信變得尤為重要。MQTT&#xff08;Message Queuing Telemetry Transport&#xff09;協議作為一種輕量級的消息傳輸協議&#xff0c;正逐漸成為物聯…

Excel 表格內批量添加前綴與后綴的實用方法

我們經常需要為 Excel 表格中的內容統一添加前綴或后綴&#xff0c;例如給編號加“NO.”、給姓名加“會員_”等。手動操作效率低&#xff0c;本文將介紹幾種實用的方法&#xff0c;幫助你快速完成批量添加前綴和后綴的操作。 使用“&”運算符添加前綴或后綴&#xff08;推…

uniapp 實現騰訊云IM群文件上傳下載功能

UniApp 集成騰訊云IM實現群文件上傳下載功能全攻略 一、功能背景與技術選型 在團隊協作場景中&#xff0c;群文件共享是核心需求之一。本文將介紹如何基于騰訊云IMCOS&#xff0c;在uniapp中實現&#xff1a; 群內文件上傳/下載文件元數據管理下載進度追蹤跨平臺文件預覽 二…

GO協程(Goroutine)問題總結

在使用Go語言來編寫代碼時&#xff0c;遇到的一些問題總結一下 [參考文檔]&#xff1a;https://www.topgoer.com/%E5%B9%B6%E5%8F%91%E7%BC%96%E7%A8%8B/goroutine.html 1. main()函數默認的Goroutine 場景再現&#xff1a; 今天在看到這個教程的時候&#xff0c;在自己的電…

uniapp微信小程序視頻實時流+pc端預覽方案

方案類型技術實現是否免費優點缺點適用場景延遲范圍開發復雜度?WebSocket圖片幀?定時拍照Base64傳輸? 完全免費無需服務器 純前端實現高延遲高流量 幀率極低個人demo測試 超低頻監控500ms-2s???RTMP推流?TRTC/即構SDK推流? 付費方案 &#xff08;部分有免費額度&#x…