-
遇見的問題:
測試用例使用thrift資源和redis資源,單獨運行case沒有問題,但是使用并發pytest-xdist(-n 10 和 --dist=loadscope)運行失敗 -
原因:
測試用例間存在共享資源競爭(如 Redis、Thrift 連接)和測試類狀態未隔離 -
解決辦法:
原來的測試前置是通過傳統的setup來實現初始化,會導致資源共享
def setup(self):self.check = common_check.CommonCheck()self.req = common_req.CommonReq()self.driverId = int(''.join(str(random.randint(0, 9)) for _ in range(10)))self.passengerId_one = int(''.join(str(random.randint(0, 9)) for _ in range(10)))self.passengerId_two = int(''.join(str(random.randint(0, 9)) for _ in range(10)))self.passengerId_three = int(''.join(str(random.randint(0, 9)) for _ in range(10)))self.orderId = str(int(''.join(str(random.randint(0, 9)) for _ in range(10))))self.orderId_two = str(int(self.orderId) + 1)self.orderId_three = str(int(self.orderId)+ 2)self.travel_id = int(''.join(str(random.randint(0, 9)) for _ in range(10)))# self.redish = redis.StrictRedis(host=globalVar.g_fusion_ip, port=globalVar.g_fusion_port,password=globalVar.g_fusion_password, username=globalVar.g_fusion_username, db=0)self.redish = redis.StrictRedis(host=globalVar.g_redis_ip, port=globalVar.g_redis_port, db=0)try:transport = TSocket.TSocket(globalVar.crm_ip, globalVar.crm_port)transport.setTimeout(10000)transport = TTransport.TFramedTransport(transport)protocol = TBinaryProtocol.TBinaryProtocol(transport)self.client = Client(protocol)transport.open()self.trans = transportexcept Thrift.TException as tx:print('%s' % (tx.message))except Exception as ex:print('%s' % (ex.message))
現在通過fixture,為每個用例創建獨立資源 + 自動清理」,實現了用例間的完全隔離,從根本上避免了并發沖突
@pytest.fixture(autouse=True)def setup_isolated(self):self.test_uuid = str(uuid.uuid4()) # 測試用例唯一IDself.driverId = self._generate_unique_id()self.passengerId_one = self._generate_unique_id()self.passengerId_two = self._generate_unique_id()self.passengerId_three = self._generate_unique_id()self.orderId = str(self._generate_unique_id())self.orderId_two = str(int(self.orderId) + 1)self.orderId_three = str(int(self.orderId) + 2)self.travel_id = self._generate_unique_id()# 2. 初始化工具類(無狀態,可安全復用)self.check = common_check.CommonCheck()self.req = common_req.CommonReq()# 3. 初始化 Redis 連接(每個用例獨立連接,避免共享)self.redish = redis.StrictRedis(host=globalVar.g_redis_ip,port=globalVar.g_redis_port,db=0,decode_responses=True # 避免 bytes/str 類型混亂)# 4. 初始化 Thrift 客戶端(每個用例獨立連接,避免共享)self.transport = Noneself.client = Nonetry:self.transport = TSocket.TSocket(globalVar.crm_ip, globalVar.crm_port)self.transport.setTimeout(10000)self.transport = TTransport.TFramedTransport(self.transport)protocol = TBinaryProtocol.TBinaryProtocol(self.transport)self.client = Client(protocol)self.transport.open()except Thrift.TException as tx:pytest.fail("Thrift 連接初始化失敗:",tx.message)except Exception as ex:pytest.fail("未知錯誤: ",ex.message)# 5. 用例執行前的鉤子(yield 前為 setup,后為 teardown)yield# 6. 用例結束后清理資源(避免連接泄漏)if self.transport and self.transport.isOpen():self.transport.close()self.redish.close() # 關閉 Redis 連接@contextmanagerdef redis_lock(self, key, timeout=5):"""Redis 分布式鎖(解決多進程共享資源競爭)"""lock_key = "lock:{key}"lock_acquired = Falsetry:# 嘗試獲取鎖(NX=不存在才設置,PX=過期時間毫秒)lock_acquired = self.redish.set(lock_key, self.test_uuid, nx=True, px=timeout * 1000)if not lock_acquired:pytest.fail("獲取 Redis 鎖失敗(key: )" + lock_key + ",可能存在并發競爭")yield # 鎖內邏輯執行區finally:# 釋放鎖(僅刪除自己持有的鎖,避免誤刪其他進程的鎖)if lock_acquired:current_lock_val = self.redish.get(lock_key)if current_lock_val == self.test_uuid:self.redish.delete(lock_key)
生成安全的key:UUID前10位 + 時間戳,避免碰撞
def _generate_unique_id(self):"""生成并發安全的唯一ID(UUID前10位 + 時間戳,避免碰撞)"""timestamp = int(time.time() * 1000) # 毫秒級時間戳(確保時序唯一)uuid_part = int(str(uuid.uuid4()).replace('-', '')[:8], 16) # UUID前8位(16進制轉10進制)return int("{}{}".format(timestamp, uuid_part)[:10]) # 截取10位,符合原ID長度
@contextmanager 是 Python 標準庫 contextlib 模塊中的一個裝飾器,用于快速定義上下文管理器(Context Manager)。它的核心作用是簡化「資源獲取 - 使用 - 釋放」的流程,確保資源(如文件、數據庫連接、鎖等)在使用后被正確釋放,即使過程中發生異常
- redis枷鎖
@contextmanagerdef redis_lock(self, key, timeout=5):"""Redis 分布式鎖(解決多進程共享資源競爭)"""lock_key = "lock:{key}"lock_acquired = Falsetry:# 嘗試獲取鎖(NX=不存在才設置,PX=過期時間毫秒)lock_acquired = self.redish.set(lock_key, self.test_uuid, nx=True, px=timeout * 1000)if not lock_acquired:pytest.fail("獲取 Redis 鎖失敗(key: )" + lock_key + ",可能存在并發競爭")yield # 鎖內邏輯執行區finally:# 釋放鎖(僅刪除自己持有的鎖,避免誤刪其他進程的鎖)if lock_acquired:current_lock_val = self.redish.get(lock_key)if current_lock_val == self.test_uuid:self.redish.delete(lock_key)
在用到redis非刪除操作的地方,先判斷redis鎖是否釋放
- 需要導入的模塊:
import uuid
from contextlib import contextmanager