在設計 MMORPG(大規模多人在線角色扮演游戲)時,數據庫系統是游戲架構中至關重要的一部分。數據庫不僅承擔了游戲中各種數據(如玩家數據、物品數據、游戲世界狀態等)的存儲和管理任務,還必須高效地支持并發訪問、事務處理和復雜的查詢。為了確保系統的可擴展性和維護性,我們需要對數據庫操作進行封裝和模塊化設計。
為了實現上述目標,本文設計了一個基于 Twisted 的數據庫封裝系統。Twisted 是一個異步框架,適用于處理大量并發任務。結合 Twisted 和數據庫連接池(adbapi.ConnectionPool),我們可以高效地執行異步數據庫操作。
1. DatabaseError
作用:自定義異常類,用于在數據庫操作發生錯誤時拋出詳細的錯誤信息,包含錯誤消息和錯誤碼(默認為500)。2. DatabaseOperation
作用:抽象基類,定義了數據庫操作的統一接口。所有具體的數據庫操作類(如 SelectOperation, InsertOperation 等)都繼承自此類,必須實現 excute 方法來執行數據庫事務。
關鍵方法:
executeQuery:執行實際的 SQL 查詢。
handleSuccess:操作成功時的回調函數。
handleFailure:操作失敗時的回調函數。3. SelectOperation
作用:繼承自 DatabaseOperation,封裝了 SELECT 查詢操作。提供了 executeQuery 方法來執行 SQL 查詢,并返回查詢結果。
關鍵方法:
executeQuery:執行 SELECT 查詢,并根據提供的表名、列名、查詢條件等生成 SQL 語句。
excute:實現 DatabaseOperation 中的抽象方法,執行查詢操作。4. InsertOperation
作用:繼承自 DatabaseOperation,封裝了 INSERT 插入操作。通過 executeQuery 方法生成插入的 SQL 語句并執行。
關鍵方法:
executeQuery:構建并執行 INSERT SQL 語句,將數據插入指定的表。
excute:實現 DatabaseOperation 中的抽象方法,執行插入操作。5. UpdateOperation
作用:繼承自 DatabaseOperation,封裝了 UPDATE 更新操作。通過 executeQuery 方法生成更新的 SQL 語句并執行。
關鍵方法:
executeQuery:構建并執行 UPDATE SQL 語句,用新值更新指定的行。
excute:實現 DatabaseOperation 中的抽象方法,執行更新操作。6. DeleteOperation
作用:繼承自 DatabaseOperation,封裝了 DELETE 刪除操作。通過 executeQuery 方法生成刪除的 SQL 語句并執行。
關鍵方法:
executeQuery:構建并執行 DELETE SQL 語句,根據指定條件刪除記錄。
excute:實現 DatabaseOperation 中的抽象方法,執行刪除操作。
7. DatabaseManager
作用:負責數據庫連接池的管理和數據庫操作的執行。它使用 adbapi.ConnectionPool 創建數據庫連接池,執行操作并處理事務。
關鍵方法:
getConnection:返回數據庫連接池的實例。
executeOperation:接受一個數據庫操作對象,調用 runInteraction 方法來執行異步數據庫事務,并處理操作成功或失敗的回調。
8. GameQueryPlayerId
作用:繼承自 SelectOperation,封裝了查詢玩家信息的操作。它指定查詢條件為玩家的名稱,并通過 excute 方法執行查詢。
關鍵方法:
excute:執行 SelectOperation 中的 executeQuery 方法,查詢玩家 ID。
set_query_name:設置查詢的玩家名稱。
代碼
from twisted.enterprise import adbapi
from twisted.internet.defer import Deferred
import pymysql
import traceback
from twisted.internet import reactor
from functools import partial
from abc import ABC, abstractmethod
# 異常類定義
class DatabaseError(Exception):def __init__(self, message, code=500):self.message = messageself.code = code# 抽象的數據庫操作類
class IDatabaseOperation(ABC):@abstractmethoddef executeQuery(self, txn, table: str, columns: list, values: dict = {}, condition: dict = None) -> any:passdef handleSuccess(self, result: any):print("Operation succeeded with result:", result)def handleFailure(self, error: Exception):# 這里做一些額外的錯誤處理,比如記錄日志或者返回友好的錯誤信息print("Operation failed:", error)@abstractmethoddef excute(self, txn):#這里封裝代碼passclass ABC_SelectOperation(IDatabaseOperation):def executeQuery(self, txn, table: str, columns: list = None, values: dict = None, condition: dict = None) -> any:try:column_str = ", ".join(columns) if columns else "*"query = f"SELECT {column_str} FROM {table}"if condition:condition_str = " AND ".join([f"{key} = %s" for key in condition.keys()])query += f" WHERE {condition_str}"print(f"Executing query: {query}, with values: {tuple(condition.values()) if condition else ()}")txn.execute(query, tuple(condition.values()) if condition else ()) # use condition values if anyresult = txn.fetchall()return resultexcept Exception as e:print(f"Error executing query: {e}")traceback.print_exc()# 返回一個失敗的結果以便繼續后續操作return {"error": str(e)}@abstractmethoddef excute(self, txn):passclass ABC_InsertOperation(IDatabaseOperation):def executeQuery(self, txn, table: str, columns: list = None, values: dict = None, condition: dict = None) -> any:try:column_str = ", ".join(columns)placeholders = ", ".join(["%s"] * len(columns)) # Create placeholders based on column length# Extract values from the dictionary for each columnvalue_tuple = tuple(values[col] for col in columns)query = f"INSERT INTO {table} ({column_str}) VALUES ({placeholders})"print(f"Executing insert query: {query}, with values: {value_tuple}")txn.execute(query, value_tuple) # Use parameterized queryreturn txn.lastrowid # Return the inserted record IDexcept Exception as e:print(f"Error executing insert query: {e}")traceback.print_exc()# 返回一個失敗的結果以便繼續后續操作return {"error": str(e)}@abstractmethoddef excute(self, txn):passclass ABC_UpdateOperation(IDatabaseOperation):def executeQuery(self, txn, table: str, columns: list = None, values: dict = None, condition: dict = None) -> any:try:if condition and not isinstance(condition, dict):raise TypeError("Condition must be a dictionary")set_str = ", ".join([f"{col} = %s" for col in columns])query = f"UPDATE {table} SET {set_str}"# Ensure that values is passed as a tuple for updatevalue_tuple = tuple(values[col] for col in columns)if condition:condition_str = " AND ".join([f"{key} = %s" for key in condition.keys()])query += f" WHERE {condition_str}"value_tuple += tuple(condition.values()) # Append condition valuesprint(f"Executing update query: {query}, with values: {value_tuple}")txn.execute(query, value_tuple) # Use parameterized querytxn.connection.commit()return txn.rowcount # Return the number of updated rowsexcept Exception as e:print(f"Error executing update query: {e}")traceback.print_exc()# 返回一個失敗的結果以便繼續后續操作return {"error": str(e)}@abstractmethoddef excute(self, txn):passclass ABC_DeleteOperation(IDatabaseOperation):def executeQuery(self, txn, table: str, columns: list = None, values: dict = None, condition: dict = None) -> any:try:if not condition:raise ValueError("Condition for deletion cannot be empty.")condition_str = " AND ".join([f"{key} = %s" for key in condition.keys()])query = f"DELETE FROM {table} WHERE {condition_str}"print(f"Executing delete query: {query}, with values: {tuple(condition.values())}")txn.execute(query, tuple(condition.values())) # Use condition values for parameterized queryreturn txn.rowcountexcept Exception as e:print(f"Error executing delete query: {e}")traceback.print_exc()# 返回一個失敗的結果以便繼續后續操作return {"error": str(e)}@abstractmethoddef excute(self):pass# 數據庫管理類,負責數據庫連接池和事務
class DatabaseManager:def __init__(self, db_config):# 初始化數據庫連接池self.dbConnectionPool = adbapi.ConnectionPool("pymysql", **db_config)def getConnection(self):return self.dbConnectionPooldef executeOperation(self, operation: DatabaseOperation) -> Deferred:try:# 使用 partial 創建一個指定了參數的函數deferred = self.dbConnectionPool.runInteraction(operation.excute)deferred.addCallback(operation.handleSuccess)deferred.addErrback(operation.handleFailure)return deferredexcept Exception as e:error = DatabaseError(str(e), 500)operation.handleFailure(error)return None# 示例數據庫配置
db_config = {'host': 'localhost','user': 'root','password': 'root','database': 'test',
}class GameQueryPlayerId(ABC_SelectOperation):def __init__(self):self.select_columns = ["name", "id"]self.elect_condition = {"name": "new_name"} # 在此給出查詢條件def excute(self, txn):# 執行查詢操作return self.executeQuery(txn,"test", columns=self.select_columns, condition=self.elect_condition )def set_query_name(self, name):self.elect_condition["name"] = name# 示例操作
def main():# 創建DatabaseManager實例db_manager = DatabaseManager(db_config)ccGameQueryPlayerId = GameQueryPlayerId()ccGameQueryPlayerId.set_query_name("new_name")deferred = db_manager.executeOperation(ccGameQueryPlayerId)reactor.run()if __name__ == "__main__":main()