Table of Contents
- 1. Chunked Receiving and Stream Processing
- 2. Asynchronous Processing
- 3. Memory Mapping and Temporary Files
- 4. Database Optimization
- 5. Caching Strategy
- 6. Compression and Format Optimization
- 7. Rate Limiting and Concurrency Control
- 8. Distributed Storage
- 9. Response Optimization
- 10. Monitoring and Error Handling
- 11. Database Connection Pool Optimization
1. Chunked Receiving and Stream Processing
Use stream processing to avoid loading every image into memory at once:
from flask import Flask, request
from werkzeug.utils import secure_filename
import os

app = Flask(__name__)

# Only accept common image extensions
ALLOWED_EXTENSIONS = {'png', 'jpg', 'jpeg', 'gif', 'webp'}

def allowed_file(filename):
    return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS

@app.route('/upload', methods=['POST'])
def upload_images():
    uploaded_files = request.files.getlist("images")
    # Stream each file to disk instead of loading them all into memory at once
    for file in uploaded_files:
        if file and allowed_file(file.filename):
            filename = secure_filename(file.filename)
            # Save straight to disk without reading the whole file into memory
            file.save(os.path.join(app.config['UPLOAD_FOLDER'], filename))
    return {'status': 'success', 'count': len(uploaded_files)}
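The route above still lets Werkzeug buffer the upload for it. If you want explicit chunked receiving, as the section title suggests, a minimal sketch is to copy the incoming stream to disk in fixed-size blocks; the /upload-chunked route, the 64 KB chunk size, and the UPLOAD_FOLDER value below are illustrative assumptions, not part of the original example:
import os
from flask import Flask, request
from werkzeug.utils import secure_filename

app = Flask(__name__)
app.config['UPLOAD_FOLDER'] = '/tmp/uploads'  # assumed location for this sketch
os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True)

CHUNK_SIZE = 64 * 1024  # read 64 KB at a time (illustrative value)

@app.route('/upload-chunked', methods=['POST'])
def upload_images_chunked():
    saved = 0
    for file in request.files.getlist("images"):
        filename = secure_filename(file.filename)
        dest = os.path.join(app.config['UPLOAD_FOLDER'], filename)
        # Copy the upload stream to disk in fixed-size chunks
        with open(dest, 'wb') as out:
            while True:
                chunk = file.stream.read(CHUNK_SIZE)
                if not chunk:
                    break
                out.write(chunk)
        saved += 1
    return {'status': 'success', 'count': saved}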
2. Asynchronous Processing
Use an asynchronous task queue to handle time-consuming work:
from celery import Celery
from flask import Flask, request
from werkzeug.utils import secure_filename
import os

app = Flask(__name__)
celery = Celery(app.name, broker='redis://localhost:6379')

@celery.task
def process_images_task(file_paths):
    # Process the images in the background (compression, format conversion, etc.)
    results = []
    for file_path in file_paths:
        # Processing logic
        result = process_single_image(file_path)
        results.append(result)
    return results

@app.route('/upload', methods=['POST'])
def upload_images():
    file_paths = []
    for file in request.files.getlist("images"):
        filename = secure_filename(file.filename)
        file_path = os.path.join(app.config['UPLOAD_FOLDER'], filename)
        file.save(file_path)
        file_paths.append(file_path)
    # Hand the saved files off to the worker asynchronously
    task = process_images_task.delay(file_paths)
    return {'status': 'success', 'task_id': task.id}
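Since the route returns a task_id, clients usually need a way to poll for completion. A minimal status endpoint might look like the sketch below; it assumes a Celery result backend has been configured (for example backend='redis://localhost:6379/1' when creating the Celery app), and the /status/<task_id> route name is illustrative:
from celery.result import AsyncResult

@app.route('/status/<task_id>', methods=['GET'])
def task_status(task_id):
    # Look the task up in the configured result backend
    result = AsyncResult(task_id, app=celery)
    payload = {'task_id': task_id, 'state': result.state}
    if result.ready():
        # result.result holds the return value on success, or the exception on failure
        payload['result'] = result.result if result.successful() else str(result.result)
    return payload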
3. Memory Mapping and Temporary Files
Use memory mapping and temporary files to reduce memory usage:
import os
import tempfile
import mmap

def process_large_image(file):
    # Spill the upload to a temporary file instead of holding it in memory
    with tempfile.NamedTemporaryFile(delete=False) as tmp_file:
        file.save(tmp_file.name)
    # Memory-map the large file for processing
    with open(tmp_file.name, 'r+b') as f:
        with mmap.mmap(f.fileno(), 0) as mmapped_file:
            # Work on the mapped file contents
            process_mapped_data(mmapped_file)
    # Clean up the temporary file
    os.unlink(tmp_file.name)
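A related trick from the standard library is tempfile.SpooledTemporaryFile, which keeps small uploads in memory and only spills to disk once they exceed a threshold. A minimal sketch follows; the 5 MB limit is an illustrative value, not something mandated by the example above:
import shutil
import tempfile

def buffer_upload(file, memory_limit=5 * 1024 * 1024):
    # Files under the limit stay in memory; larger ones are spooled to disk automatically
    spooled = tempfile.SpooledTemporaryFile(max_size=memory_limit)
    shutil.copyfileobj(file.stream, spooled)
    spooled.seek(0)
    return spooled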
4. Database Optimization
Batch inserts and connection pool management:
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

# Create an engine with a connection pool
engine = create_engine(
    'postgresql://user:password@localhost/db',
    pool_size=20,
    max_overflow=30
)
Session = sessionmaker(bind=engine)

def batch_insert_image_records(image_data_list):
    session = Session()
    try:
        # Insert all records in one bulk operation instead of row by row
        session.bulk_insert_mappings(ImageModel, image_data_list)
        session.commit()
    except Exception as e:
        session.rollback()
        raise e
    finally:
        session.close()
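The snippet refers to an ImageModel that is not defined anywhere in the article. Purely for illustration, a minimal declarative model it could map to might look like this (the table and column names are assumptions):
from sqlalchemy import Column, DateTime, Integer, String, func
from sqlalchemy.orm import declarative_base

Base = declarative_base()

class ImageModel(Base):
    __tablename__ = 'images'

    id = Column(Integer, primary_key=True)
    filename = Column(String(255), nullable=False)
    storage_url = Column(String(1024))
    size_bytes = Column(Integer)
    created_at = Column(DateTime, server_default=func.now())

# Each dict passed to batch_insert_image_records should use these column names as keys, e.g.
# batch_insert_image_records([{'filename': 'a.jpg', 'size_bytes': 12345}])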
5. Caching Strategy
Use a cache such as Redis to avoid reprocessing identical images:
import redis
import hashlib
import json

redis_client = redis.Redis(host='localhost', port=6379, db=0)

def get_cached_result(file_hash):
    cached = redis_client.get(f"image_result:{file_hash}")
    return json.loads(cached) if cached else None

def cache_result(file_hash, result):
    redis_client.setex(
        f"image_result:{file_hash}",
        3600,  # expire after one hour
        json.dumps(result)
    )

def process_image_with_cache(file):
    file_content = file.read()
    file_hash = hashlib.md5(file_content).hexdigest()
    # Check the cache first
    cached_result = get_cached_result(file_hash)
    if cached_result:
        return cached_result
    # Process the image
    result = process_image_logic(file_content)
    # Cache the result
    cache_result(file_hash, result)
    return result
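For completeness, wiring the cached path into an upload route could look like this minimal sketch (the /upload-cached route name is illustrative, and process_image_logic is the same placeholder used above):
from flask import request

@app.route('/upload-cached', methods=['POST'])
def upload_images_cached():
    results = []
    for file in request.files.getlist("images"):
        # Re-uploads of an identical file hit the cache and skip reprocessing
        results.append(process_image_with_cache(file))
    return {'status': 'success', 'results': results}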
6. Compression and Format Optimization
Further optimize images on the server side:
from PIL import Image
import io

def optimize_image(file, max_size=(1920, 1080), quality=85):
    image = Image.open(file)
    # JPEG cannot store an alpha channel, so convert such images to RGB first
    if image.mode in ('RGBA', 'P'):
        image = image.convert('RGB')
    # Resize in place, preserving the aspect ratio
    image.thumbnail(max_size, Image.LANCZOS)
    # Re-encode as an optimized JPEG
    output = io.BytesIO()
    image.save(output, format='JPEG', quality=quality, optimize=True)
    output.seek(0)
    return output
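As a usage sketch, the optimizer can be dropped into the upload loop like this; the /upload-optimized route and the forced .jpg extension are illustrative choices:
import os
from flask import request
from werkzeug.utils import secure_filename

@app.route('/upload-optimized', methods=['POST'])
def upload_images_optimized():
    saved = []
    for file in request.files.getlist("images"):
        optimized = optimize_image(file)  # returns an in-memory JPEG buffer
        # Every file is re-encoded as JPEG, so normalize the extension
        filename = os.path.splitext(secure_filename(file.filename))[0] + '.jpg'
        dest = os.path.join(app.config['UPLOAD_FOLDER'], filename)
        with open(dest, 'wb') as out:
            out.write(optimized.read())
        saved.append(filename)
    return {'status': 'success', 'files': saved}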
7. Rate Limiting and Concurrency Control
Control the number of concurrent requests:
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address

limiter = Limiter(
    app,
    key_func=get_remote_address,
    default_limits=["100 per hour"]
)

@app.route('/upload', methods=['POST'])
@limiter.limit("10 per minute")
def upload_images():
    # Upload handling logic
    pass
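Flask-Limiter covers the rate-limiting half of this section's title; for the concurrency-control half, one simple option is a process-wide semaphore that caps how many uploads are handled simultaneously. This is only a per-process sketch (it does not coordinate across workers), and the slot count of 8 is an arbitrary illustrative value:
import threading
from flask import abort

# Allow at most 8 uploads to be processed at the same time in this process
upload_slots = threading.BoundedSemaphore(value=8)

@app.route('/upload-guarded', methods=['POST'])
@limiter.limit("10 per minute")
def upload_images_guarded():
    # Fail fast instead of queueing when every slot is busy
    if not upload_slots.acquire(blocking=False):
        abort(503, description="Server busy, please retry later")
    try:
        # ... upload handling logic ...
        return {'status': 'success'}
    finally:
        upload_slots.release()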
8. Distributed Storage
Use a distributed storage system to hold large numbers of images:
import boto3
from botocore.exceptions import ClientError
from werkzeug.utils import secure_filename

s3_client = boto3.client('s3')

def upload_to_s3(file, bucket, key):
    try:
        s3_client.upload_fileobj(file, bucket, key)
        return f"https://{bucket}.s3.amazonaws.com/{key}"
    except ClientError as e:
        print(f"Error uploading to S3: {e}")
        return None

def batch_upload_to_s3(files, bucket):
    urls = []
    for file in files:
        key = f"images/{secure_filename(file.filename)}"
        url = upload_to_s3(file, bucket, key)
        if url:
            urls.append(url)
    return urls
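In a route, the helpers above can push uploads straight to S3 without touching local disk; the /upload-s3 route and the my-image-bucket name below are placeholders for this sketch:
from flask import request

@app.route('/upload-s3', methods=['POST'])
def upload_images_s3():
    files = request.files.getlist("images")
    # Uploads go directly from the request stream to S3, bypassing local disk
    urls = batch_upload_to_s3(files, bucket='my-image-bucket')
    return {'status': 'success', 'urls': urls}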
9. Response Optimization
Use streaming responses and compression:
from flask import Response, request
import json

@app.route('/upload', methods=['POST'])
def upload_images_stream():
    def generate():
        yield '{"status": "processing", "files": ['
        files = request.files.getlist("images")
        for i, file in enumerate(files):
            # Process each file and emit its result as soon as it is ready
            result = process_file(file)
            yield json.dumps(result)
            if i < len(files) - 1:
                yield ","
        yield ']}'
    return Response(generate(), mimetype='application/json')
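The snippet shows the streaming half; for the compression half, the third-party flask-compress extension can gzip responses for clients that advertise support. A minimal sketch, assuming the package is installed, is:
from flask import Flask
from flask_compress import Compress

app = Flask(__name__)
# Compress eligible responses (e.g. JSON) when the client sends Accept-Encoding
Compress(app)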
10. Monitoring and Error Handling
Integrate monitoring and error-handling mechanisms:
import logging
from prometheus_client import Counter, Histogram

# Define monitoring metrics
upload_counter = Counter('image_uploads_total', 'Total image uploads')
upload_duration = Histogram('image_upload_duration_seconds', 'Image upload duration')

@app.route('/upload', methods=['POST'])
@upload_duration.time()
def upload_images():
    try:
        files = request.files.getlist("images")
        upload_counter.inc(len(files))
        # Processing logic
        results = process_files(files)
        return {'status': 'success', 'count': len(results)}
    except Exception as e:
        logging.error(f"Upload error: {e}")
        return {'status': 'error', 'message': str(e)}, 500
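For Prometheus to actually scrape these metrics, the app needs to expose them somewhere; a common minimal approach (the /metrics path is the conventional default, but still an assumption here) is:
from flask import Response
from prometheus_client import generate_latest, CONTENT_TYPE_LATEST

@app.route('/metrics')
def metrics():
    # Serialize all registered counters/histograms in the Prometheus text format
    return Response(generate_latest(), mimetype=CONTENT_TYPE_LATEST)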
11. Database Connection Pool Optimization
Tune the database connection pool configuration:
from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool

engine = create_engine(
    'postgresql://user:password@localhost/db',
    poolclass=QueuePool,
    pool_size=20,        # number of connections kept in the pool
    max_overflow=30,     # extra connections allowed beyond pool_size
    pool_recycle=3600,   # recycle connections after this many seconds
    pool_pre_ping=True,  # test connections for liveness before use
    pool_timeout=30      # seconds to wait when the pool is exhausted
)
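To verify that these settings behave as expected under load, SQLAlchemy's pool can report its current state; a small helper for logging it (a sketch, not part of the original configuration) might be:
import logging

def log_pool_status(engine):
    # Pool.status() returns a short human-readable summary
    # (pool size, checked-out connections, current overflow)
    logging.info("DB pool status: %s", engine.pool.status())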