目錄
一、環境
二、MySQL的連接和使用
2.1方式一:sql為主
2.1.1創建連接
2.1.2 表結構
2.1.3?新增數據
?編輯
2.1.4 查看數據
?編輯
2.1.5?修改數據
2.1.6?刪除數據
2.2方式二:orm對象關系映射
2.2.1 mysql連接
2.2.2 創建表
2.2.3 新增數據
?編輯
2.2.4 查詢數據
2.2.5 修改數據
?編輯
2.2.6 刪除數據
一、環境
工作中需要用到python和mysql數據庫,本次文檔記錄相關操作。
環境:windows10、python 3.11.7
mysql版本:5.7
二、MySQL的連接和使用
本人使用過的兩種方式
2.1方式一:sql為主
2.1.1創建連接
import sqlalchemy
from sqlalchemy.orm import scoped_session, sessionmakerUSERNAME = "root" # 用戶名
PASSWORD = "123456" # 密碼
ADDR = "localhost" # 連接地址 本地localhost 服務器就是服務器的地址XXX
PORT = "3306" # mysql端口號
DATABASE = "test" # 連接的數據庫名class MysqlSession:def __init__(self):self.create_connection()def create_connection(self):engine = sqlalchemy.create_engine(f"mysql+pymysql://{USERNAME}:{PASSWORD}@{ADDR}:{PORT}/{DATABASE}?charset=utf8",max_overflow=50, # 超過連接池大小外最多創建的連接pool_size=50, # 連接池大小pool_recycle=30 # 多久之后對線程池中的線程進行一次連接的回收(重置),, echo=False)self.connection = scoped_session(sessionmaker(bind=engine))def get_connection(self):return self.connectiondef ins(self, sql): # 新增數據return self.connection.execute(sql).lastrowiddef arr(self, sql): # 查詢多條數據return self.connection.execute(sql).fetchall()def obj(self, sql): # 查詢單個數據return self.connection.execute(sql).fetchone()def upd(self, sql): # 修改單個數據return self.connection.execute(sql)def dlt(self, sql): # 刪除數據return self.connection.execute(sql)def commit(self):self.connection.commit() # 提交self.connection.remove() # 結束會話
2.1.2 表結構
2.1.3?新增數據
mysql_session = MysqlSession()
connection = mysql_session.get_connection()
table_name = 'test.user' # 表名(這里是數據庫名+表名)
# 新增數據,返回的該條數據的id
name_remark = [{'name': 'Alice', 'remark': '可能是個女生'},{'name': 'Bob', 'remark': "可能是個男生"},{'name': 'Tammi'}
]
new_ids = []
for p in name_remark:name = p.get('name')remark = p.get('remark')sql = f"""insert into {table_name} (name,remark) values ('{name}','{remark}')"""user_id = mysql_session.ins(sql)new_ids.append(user_id)
print(new_ids)
mysql_session.commit()
2.1.4 查看數據
# 查看所有數據
sql = f"""select * from {table_name}"""
users = mysql_session.arr(sql)
for u in users:print(u)
# 查看指定數據
sql = f"""select * from {table_name} where name='Alice'"""
user = mysql_session.obj(sql)
print(user)
2.1.5?修改數據
# 修改指定數據
sql = f"""update {table_name} set name='Alice_changed' where id=16"""
mysql_session.upd(sql)
mysql_session.commit()
# 查看剛才修改的數據
sql = f"""select * from {table_name} where id=16 """
user = mysql_session.obj(sql)
print(user)
2.1.6?刪除數據
# 刪除指定數據
sql = f"""delete from {table_name} where id=16"""
mysql_session.dlt(sql)
mysql_session.commit()
# 查看所有數據
users = show_test(mysql_session, table_name)
print(users)
2.2方式二:orm對象關系映射
因為現目前工作中沒有用到這個(以前用django的時候有用到過orm),這里就簡單記錄一下測試情況。
2.2.1 mysql連接
mysql_session.py
import sqlalchemy
from sqlalchemy.orm import sessionmakerUSERNAME = "root" # 用戶名
PASSWORD = "123456" # 密碼
ADDR = "localhost" # 連接地址 本地localhost 服務器就是服務器的地址XXX
PORT = "3306" # mysql端口號
DATABASE = "test" # 連接的數據庫名class MysqlSession:def __init__(self):self.create_connection()def create_connection(self):self.engine = sqlalchemy.create_engine(f"mysql+pymysql://{USERNAME}:{PASSWORD}@{ADDR}:{PORT}/{DATABASE}?charset=utf8",max_overflow=50, # 超過連接池大小外最多創建的連接pool_size=50, # 連接池大小pool_recycle=30, # 多久之后對線程池中的線程進行一次連接的回收(重置),echo=False)Session = sessionmaker(bind=self.engine)self.session = Session()def get_session(self):return self.sessiondef get_engine(self):return self.engine
2.2.2 創建表
models.py
from sqlalchemy import Column, Integer, String
from sqlalchemy.ext.declarative import declarative_basefrom mysql_session import MysqlSessionBase = declarative_base()class User(Base):__tablename__ = 'person'id = Column(Integer, primary_key=True, autoincrement=True) # 主鍵 自增name = Column(String(255))age = Column(Integer)engine = MysqlSession().get_engine()
Base.metadata.create_all(engine) #創建上面的表,運行一次即可
2.2.3 新增數據
from models import Person
from mysql_session import MysqlSessionsession = MysqlSession().get_session()# 新增幾條數據
new_user = Person(name='張三', age=30)
session.add(new_user)
new_user = Person(name='李四', age=40)
session.add(new_user)
new_user = Person(name='王五', age=50)
session.add(new_user)
session.commit()
2.2.4 查詢數據
# 查詢所有數據
users = session.query(Person).all()
for user in users:print(user.name, user.age)
print("**************************************")
# 指定信息
user = session.query(Person).filter_by(name='李四').first()
print(user.name, user.age)
2.2.5 修改數據
# 修改數據
user = session.query(Person).filter_by(name='李四').first()
if user:user.age = 400 # 將年齡修改為400session.commit()print("updated success.")print(user.name, user.age)
else:print("not found.")
2.2.6 刪除數據
# 刪除所有年齡大于100的用戶
users = session.query(Person).filter(Person.age > 100).all()
for u in users:print(u.name, u.age)
for user in users:session.delete(user)
session.commit()