1. 請寫一個 Python 邏輯,計算一個文件中的大寫字母數量
答:讀取‘A.txt’中的大寫字母數量
with open('A.txt') as f:"""計算一個文件中的大寫字母數量"""count = 0for i in f.read():if i.isupper():count += 1
print(count)
運行結果為4。
2.了解數據庫的三范式么?
答: 經過研究和對使用中問題的總結,對于設計數據庫提出了一些規范,這些規范被稱為范式,一般需要遵守下面3范式即可::
第一范式(1NF):強調的是列的原子性,即列不能夠再分成其他幾列。
第二范式(2NF):首先是 1NF,另外包含兩部分內容,一是表必須有一個主鍵;二是沒有包含在主鍵中的列必須完全依賴于主鍵,而不能只依賴于主鍵的一部分。
第三范式(3NF):首先是 2NF,另外非主鍵列必須直接依賴于主鍵,不能存在傳遞依賴。即不能存在:非主鍵列 A 依賴于非主鍵列 B,非主鍵列 B 依賴于主鍵的情況。
數據庫三大范式和五大約束
3.了解分布式鎖么
答: 分布式鎖是控制分布式系統之間的同步訪問共享資源的一種方式。 對于分布式鎖的目標,我們必須首先明確三點:
任何一個時間點必須只能夠有一個客戶端擁有鎖。
不能夠有死鎖,也就是最終客戶端都能夠獲得鎖,盡管可能會經歷失敗。
錯誤容忍性要好,只要有大部分的Redis實例存活,客戶端就應該能夠獲得鎖。
分布式鎖的條件:
互斥性:分布式鎖需要保證在不同節點的不同線程的互斥
可重入性:同一個節點上的同一個線程如果獲取了鎖之后,能夠再次獲取這個鎖。
鎖超時:支持超時釋放鎖,防止死鎖 高效,
高可用:加鎖和解鎖需要高效,同時也需要保證高可用防止分布式鎖失效,可以增加降級。
支持阻塞和非阻塞:可以實現超時獲取失敗,tryLock(long timeOut) 支持公平鎖和非公平鎖
分布式鎖的實現方案 1、數據庫實現(樂觀鎖) 2、基于zookeeper的實現 3、基于Redis的實現(推薦)
添加鏈接描述
添加鏈接描述
4.用 Python 實現一個 Reids 的分布式鎖的功能
答:REDIS分布式鎖實現的方式:SETNX + GETSET,NX是Not eXists的縮寫,如SETNX命令就應該理解為:SET if Not eXists。 多個進程執行以下Redis命令:
SETNX lock.foo <current Unix time + lock timeout + 1>
如果 SETNX 返回1,說明該進程獲得鎖,SETNX將鍵 lock.foo 的值設置為鎖的超時時間(當前時間 + 鎖的有效時間)。 如果 SETNX 返回0,說明其他進程已經獲得了鎖,進程不能進入臨界區。進程可以在一個循環中不斷地嘗試 SETNX 操作,以獲得鎖。
//鎖定的方法-偽代碼
publicbooleanlock(){connection.setAutoCommit(false)for(){result =select* from user where id = 100 for update;if(result){//結果不為空,//則說明獲取到了鎖return true; }//沒有獲取到鎖,繼續獲取sleep(1000); }return false;}//釋放鎖-偽代碼connection.commit();
import time
import redis
from conf.config import REDIS_HOST, REDIS_PORT, REDIS_PASSWORDclass RedisLock:def __init__(self):self.conn = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, password=REDIS_PASSWORD, db=1)self._lock = 0self.lock_key = ""@staticmethoddef my_float(timestamp):"""Args:timestamp:Returns:float或者0如果取出的是None,說明原本鎖并沒人用,getset已經寫入,返回0,可以繼續操作。"""if timestamp:return float(timestamp)else:#防止取出的值為None,轉換float報錯return 0@staticmethoddef get_lock(cls, key, timeout=10):cls.lock_key = f"{key}_dynamic_lock"while cls._lock != 1:timestamp = time.time() + timeout + 1cls._lock = cls.conn.setnx(cls.lock_key, timestamp)# if 條件中,可能在運行到or之后被釋放,也可能在and之后被釋放# 將導致 get到一個None,float失敗。if cls._lock == 1 or (time.time() > cls.my_float(cls.conn.get(cls.lock_key)) andtime.time() > cls.my_float(cls.conn.getset(cls.lock_key, timestamp))):breakelse:time.sleep(0.3)@staticmethoddef release(cls):if cls.conn.get(cls.lock_key) and time.time() < cls.conn.get(cls.lock_key):cls.conn.delete(cls.lock_key)def redis_lock_deco(cls):def _deco(func):def __deco(*args, **kwargs):cls.get_lock(cls, args[1])try:return func(*args, **kwargs)finally:cls.release(cls)return __decoreturn _deco@redis_lock_deco(RedisLock())
def my_func():print("myfunc() called.")time.sleep(20)if __name__ == "__main__":my_func()
5. 請寫一段 Python連接Mongo數據庫,并查詢代碼。
答:
import pymongo
db_configs = {'type': 'mongo','host': '地址','port': '端口','user': 'spider_data','passwd': '密碼','db_name': 'spider_data'
}class Mongo():def __init__(self, db=db_configs["db_name"], username=db_configs["user"],password=db_configs["passwd"]):self.client = pymongo.MongoClient(f'mongodb://{db_configs["host"]}:db_configs["port"]')self.username = usernameself.password = passwordif self.username and self.password:self.db1 = self.client[db].authenticate(self.username, self.password)self.db1 = self.client[db]def find_data(self):# 獲取狀態為0的數據data = self.db1.test.find({"status": 0})gen = (item for item in data)return genif __name__ == '__main__':m = Mongo()print(m.find_data())
6.寫一段 Python 使用 mongo 數據庫創建索引的代碼:
答:
import pymongo
db_configs = {'type': 'mongo','host': '地址','port': '端口','user': 'spider_data','passwd': '密碼','db_name': 'spider_data'
}class Mongo():def __init__(self, db=db_configs["db_name"], username=db_configs["user"],password=db_configs["passwd"]):self.client = pymongo.MongoClient(f'mongodb://{db_configs["host"]}:{db_configs["port"]}')self.username = usernameself.password = passwordif self.username and self.password:self.db1 = self.client[db].authenticate(self.username, self.password)self.db1 = self.client[db]def add_index(self):"""通過create_index添加索引"""self.db1.test.create_index([('name', pymongo.ASCENDING)], unique=True)def get_index(self,):"""查看索引列表"""indexlist=self.db1.test.list_indexes()for index in indexlist:print(index)if __name__ == '__main__':m = Mongo()m.add_index()print(m.get_index())
7.說一說Redis的基本類型
答: Redis 支持五種數據類型: string(字符串) 、 hash(哈希)、list(列表) 、 set(集合) 及 zset(sorted set: 有序集合)。
8.了解Redis的事務么
答: 簡單理解,可以認為 redis 事務是一些列 redis 命令的集合,并且有如下兩個特點: 1.事務是一個單獨的隔離操作:事務中的所有命令都會序列化、按順序地執行。事務在執行的過程中,不會被其他客戶端發送來的命令請求所打斷。 2.事務是一個原子操作:事務中的命令要么全部被執行,要么全部都不執行。 一般來說,事務有四個性質稱為ACID,分別是原子性,一致性,隔離性和持久性。 一個事務從開始到執行會經歷以下三個階段:
開始事務
命令入隊
執行事務 代碼示例:
import redis
import sys
def run(): try:conn=redis.StrictRedis('192.168.80.41')# Python中redis事務是通過pipeline的封裝實現的pipe=conn.pipeline()pipe.sadd('s001','a')sys.exit()#在事務還沒有提交前退出,所以事務不會被執行。pipe.sadd('s001','b')pipe.execute()passexcept Exception as err:print(err)pass
if __name__=="__main__":run()
9. 請寫一段 Python連接Redis數據庫的代碼。
答:
from redis import StrictRedis, ConnectionPool
redis_url="redis://:xxxx@112.27.10.168:6379/15"
pool = ConnectionPool.from_url(redis_url, decode_responses=True)
r= StrictRedis(connection_pool=pool)
10. 請寫一段 Python連接Mysql數據庫的代碼。
答:
# 導入pymysql模塊
import pymysql
# 連接database
conn = pymysql.connect(host=“你的數據庫地址”, user=“用戶名”,password=“密碼”,database=“數據庫名”,charset=“utf8”)
# 得到一個可以執行SQL語句的光標對象
cursor = conn.cursor()
# 定義要執行的SQL語句
sql = """
CREATE TABLE USER1 (
id INT auto_increment PRIMARY KEY ,
name CHAR(10) NOT NULL UNIQUE,
age TINYINT NOT NULL
)ENGINE=innodb DEFAULT CHARSET=utf8;
"""
# 執行SQL語句
cursor.execute(sql)
# 關閉光標對象
cursor.close()
# 關閉數據庫連接
conn.close()
推薦文獻:Python連接MySQL數據庫之pymysql模塊使用
謝謝作者分享!