一、前言
最近做web網站的測試,遇到很多需要批量造數據的功能;比如某個頁面展示數據條數需要達到10000條進行測試,此時手動構造數據肯定是不可能的,此時只能通過python腳本進行自動構造數據;本次構造數據主要涉及到在某個表里面批量添加數據、在關聯的幾個表中同步批量添加數據、批量查詢某個表中符合條件的數據、批量更新某個表中符合條件的數據等。
二、數據添加
即批量添加數據到某個表中。
insert_data.py
import pymysql
import random
import time
from get_userinfo import get_userinfo
from get_info import get_info
from get_tags import get_tags
from get_tuser_id import get_utag
class DatabaseAccess():
def __init__(self):
self.__db_host = "xxxxx"
self.__db_port = 3307
self.__db_user = "root"
self.__db_password = "123456"
self.__db_database = "xxxxxx"
# 連接數據庫
def isConnectionOpen(self):
self.__db = pymysql.connect(
host=self.__db_host,
port=self.__db_port,
user=self.__db_user,
password=self.__db_password,
database=self.__db_database,
charset='utf8'
)
# 插入數據
def linesinsert(self,n,user_id,tags_id,created_at):
self.isConnectionOpen()
# 創建游標
global cursor
conn = self.__db.cursor()
try:
sql1 = '''
INSERT INTO `codeforge_new`.`cf_user_tag`(`id`, `user_id`,
`tag_id`, `created_at`, `updated_at`) VALUES ({}, {},
{}, '{}', '{}');
'''.format(n,user_id,tags_id,created_at,created_at)
# 執行SQL
conn.execute(sql1,)
except Exception as e:
print(e)
finally:
# 關閉游標
conn.close()
self.__db.commit()
self.__db.close()
def get_data(self):
# 生成對應數據 1000條
for i in range(0,1001):
created_at = time.strftime('%Y-%m-%d %H:%M:%S',time.localtime())
# print(create_at)
# 用戶id
tuserids = []
tuserid_list = get_utag()
for tuserid in tuserid_list:
tuserids.append(tuserid[0])
# print(tuserids)
userid_list = get_userinfo()
user_id = random.choice(userid_list)[0]
if user_id not in tuserids:
user_id=user_id
# 標簽id
tagsid_list = get_tags()
tags_id = random.choice(tagsid_list)[0]
self.linesinsert(i,user_id,tags_id,created_at)
if __name__ == "__main__":
# 實例化對象
db=DatabaseAccess()
db.get_data()
二、數據批量查詢
select_data.py
import pymysql
import pandas as pd
import numpy as np
def get_tags():
# 連接數據庫,地址,端口,用戶名,密碼,數據庫名稱,數據格式
conn = pymysql.connect(host='xxx.xxx.xxx.xxx',port=3307,user='root',passwd='123456',db='xxxx',charset='utf8')
cur = conn.cursor()
# 表cf_users中獲取所有用戶id
sql = 'select id from cf_tags where id between 204 and 298'
# 將user_id列轉成列表輸出
df = pd.read_sql(sql,con=conn)
# 先使用array()將DataFrame轉換一下
df1 = np.array(df)
# 再將轉換后的數據用tolist()轉成列表
df2 = df1.tolist()
# cur.execute(sql)
# data = cur.fetchone()
# print(df)
# print(df1)
# print(df2)
return df2
conn.close()
三、批量更新數據
select_data.py
import pymysql
import pandas as pd
import numpy as np
def get_tags():
# 連接數據庫,地址,端口,用戶名,密碼,數據庫名稱,數據格式
conn = pymysql.connect(host='xxx.xxx.xxx.xxx',port=3307,user='root',passwd='123456',db='xxxx',charset='utf8')
cur = conn.cursor()
# 表cf_users中獲取所有用戶id
sql = 'select id from cf_tags where id between 204 and 298'
# 將user_id列轉成列表輸出
df = pd.read_sql(sql,con=conn)
# 先使用array()將DataFrame轉換一下
df1 = np.array(df)
# 再將轉換后的數據用tolist()轉成列表
df2 = df1.tolist()
# cur.execute(sql)
# data = cur.fetchone()
# print(df)
# print(df1)
# print(df2)
return df2
conn.close()
以上就是python 實現數據庫中數據添加、查詢與更新的示例代碼的詳細內容,更多關于python 數據庫添加、查詢與更新的資料請關注腳本之家其它相關文章!