單機鎖
服務器只有一個,JVM只有一個。
用synchronized加鎖,對lock對象加鎖,只有線程1結束,線程2,3才會開始。
再用uid避免一個線程多次進來。
分布式鎖
真正上線時:
【注:這些服務器連接的是一個Redis集群(就是同一個Redis)】
在一個JVM1只能鎖這個服務器里的lock對象
所以用分布式鎖,一個共享的鎖
在開發應用的時候,如果需要對某一個共享變量進行多線程同步訪問的時候,可以使用我們學到的鎖進行處理。
為了保證一個方法或屬性在高并發情況下的同一時間只能被同一個線程執行, 為了解決這個問題就需要一種跨機器的互斥機制來控制共享資源的訪問,這就是分布式鎖要解決的問題!
分布式鎖應該具備哪些條件:
- 1、在分布式系統環境下,一個方法在同一時間只能被一個機器的一個線程執行;
- 2、高可用的獲取鎖與釋放鎖;
- 3、高性能的獲取鎖與釋放鎖;
- 4、具備可重入特性;
- 5、具備鎖失效機制,防止死鎖;
- 6、具備非阻塞鎖特性,即沒有獲取到鎖將直接返回獲取鎖失敗。
三種方式:
- 基于數據庫實現分布式鎖;(很少)
- 基于緩存(Redis等)實現分布式鎖; (較多)
- 基于Zookeeper實現分布式鎖;
1.基于數據庫的實現方式
于數據庫的實現方式的核心思想是:在數據庫中創建一個表,表中包含方法名等字段,并在方法名字段上創建唯一索引,想要執行某個方法,就使用這個方法名向表中插入數據,成功插入則獲取鎖,執行完成后刪除對應的行數據釋放鎖。
缺點:
- 1、因為是基于數據庫實現的,數據庫的可用性和性能將直接影響分布式鎖的可用性及性能
- 2、不具備可重入的特性,因為同一個線程在釋放鎖之前,行數據一直存在,無法再次成功插入數據,所以,需要在表中新增一列,用于記錄當前獲取到鎖的機器和線程信息,在再次獲取鎖的時候,先查詢表中機器和線程信息是否和當前機器和線程相同,若相同則直接獲取鎖;(沒明白)
- 3、沒有鎖失效機制,因為有可能出現成功插入數據后,服務器宕機了,對應的數據沒有被刪除,當服務恢復后一直獲取不到鎖,所以,需要在表中新增一列,用于記錄失效時間,并且需要有定時任務清除這些失效的數據;
- 4、不具備阻塞鎖特性,獲取不到鎖直接返回失敗,所以需要優化獲取邏輯,循環多次去獲取。
- 5、依賴數據庫需要一定的資源開銷,性能問題需要考慮。
2.基于Redis的實現方式
1、選用Redis實現分布式鎖原因:
(1)Redis有很高的性能;
(2)Redis命令對此支持較好,實現起來比較方便
2、使用命令介紹:
(1)SETNX
SETNX key val
:當且僅當key不存在時,set一個key為val的字符串,返回1;若key存在,則什么都不做,返回0。
(2)expire
expire key timeout
:為key設置一個超時時間,單位為second,超過這個時間鎖會自動釋放,避免死鎖。
(3)delete
delete key
:刪除key
在使用Redis實現分布式鎖的時候,主要就會使用到這三個命令。
3、實現思想:
(1)獲取鎖的時候,使用setnx加鎖,并使用expire命令為鎖添加一個超時時間,超過該時間則自動釋放鎖,鎖的value值為一個隨機生成的UUID,通過此在釋放鎖的時候進行判斷。
(2)獲取鎖的時候還設置一個獲取的超時時間,若超過這個時間則放棄獲取鎖。
(3)釋放鎖的時候,通過UUID判斷是不是該鎖,若是該鎖,則執行delete進行鎖釋放。
#連接redis
redis_client = redis.Redis(host="localhost",port=6379,password=password,db=10)#獲取一個鎖
lock_name:鎖定名稱
acquire_time: 客戶端等待獲取鎖的時間
time_out: 鎖的超時時間
def acquire_lock(lock_name, acquire_time=10, time_out=10):"""獲取一個分布式鎖"""identifier = str(uuid.uuid4())end = time.time() + acquire_time//當前時間+獲取時間=結束時間lock = "string:lock:" + lock_namewhile time.time() < end:if redis_client.setnx(lock, identifier):#成功設置一個lock_name的鎖# 給鎖設置超時時間, 防止進程崩潰導致其他進程無法獲取鎖redis_client.expire(lock, time_out)#設超時間時間return identifierelif not redis_client.ttl(lock):#獲取鍵的生存時間redis_client.expire(lock, time_out)time.sleep(0.001)return False#釋放一個鎖
def release_lock(lock_name, identifier):"""通用的鎖釋放函數"""lock = "string:lock:" + lock_namepip = redis_client.pipeline(True)while True:try:pip.watch(lock)lock_value = redis_client.get(lock)if not lock_value:return Trueif lock_value.decode() == identifier:pip.multi()pip.delete(lock)#刪除鎖pip.execute()return Truepip.unwatch()breakexcept redis.excetions.WacthcError:passreturn False
為什么不用單體鎖
單體應用難以滿足實際高并發訪問需求,會將單體應用部署到多個tomcat實例上,由負載均衡將請求分發到不同實例上。
單體鎖(synchronized、ReentrantLock)是JVM層面的鎖,只能控制單個實例上的并發訪問安全,多實例下依然存在數據一致性問題。
分布式鎖
分布式鎖指的是,所有服務中的所有線程都去獲取同一把鎖,但只有一個線程可以成功的獲得鎖,其他沒有獲得鎖的線程必須全部等待,直到持有鎖的線程釋放鎖。
存在問題:當前線程處理完釋放其他線程的鎖
問題描述:假設有多個線程,鎖的過期時間10s,線程1上鎖后執行業務邏輯的時長超過十秒,鎖到期釋放鎖,線程2就可以獲得鎖執行,此時線程1執行完刪除鎖,刪除的就是線程2持有的鎖,線程3又可以獲取鎖,線程2執行完刪除鎖,刪除的是線程3的鎖,如此往后,這樣就會出問題。
解決辦法就是讓線程只能刪除自己的鎖,即給每個線程上的鎖添加唯一標識(UUID實現),刪除鎖時判斷這個標識。
要讓刪除鎖具有原子性,可以利用redis事務或lua腳本實現原子操作判斷+刪除。(lua腳本的執行是原子的)
//使用Lua腳本實現@RequestMapping(" /deduct_stock")public String deductStock() {String REDIS_LOCK = "good_lock";// 每個人進來先要進行加鎖,key值為"good_lock"String value = UUID.randomUUID().toString().replace("-","");try{// 為key加一個過期時間Boolean flag = template.opsForValue().setIfAbsent(REDIS_LOCK, value,10L,TimeUnit.SECONDS);// 加鎖失敗if(!flag){return "搶鎖失敗!";}System.out.println( value+ " 搶鎖成功");String result = template.opsForValue().get("goods:001");int total = result == null ? 0 : Integer.parseInt(result);if (total > 0) {// 如果在此處需要調用其他微服務,處理時間較長。。。int realTotal = total - 1;template.opsForValue().set("goods:001", String.valueOf(realTotal));System.out.println("購買商品成功,庫存還剩:" + realTotal + "件, 服務端口為8002");return "購買商品成功,庫存還剩:" + realTotal + "件, 服務端口為8002";} else {System.out.println("購買商品失敗,服務端口為8002");}return "購買商品失敗,服務端口為8002";}finally {// 誰加的鎖,誰才能刪除// 也可以使用redis事務// https://redis.io/commands/set// 使用Lua腳本,進行鎖的刪除Jedis jedis = null;try{jedis = RedisUtils.getJedis();//lua腳本命令:String script = "if redis.call('get',KEYS[1]) == ARGV[1] " +"then " +"return redis.call('del',KEYS[1]) " +"else " +" return 0 " +"end";Object eval = jedis.eval(script, Collections.singletonList(REDIS_LOCK), Collections.singletonList(value));if("1".equals(eval.toString())){System.out.println("-----del redis lock ok....");}else{System.out.println("-----del redis lock error ....");}}catch (Exception e){}finally {if(null != jedis){jedis.close();}}}}
}
使用Redisson簡化操作
Redis提供的Redisson組件實現Redis鎖。
Redisson原理:
這里UUID+線程ID應該是VALUE, KEY是商品ID
線程去獲取鎖,鎖的VALUE是UUID+線程ID來保證鎖和當前形成綁定怒會釋放其他線程的鎖
當前線程獲取鎖成功開始處理業務時,內部會有watch dog看門狗,每隔10s看當前線程是否還持有鎖,如果持有則給鎖延長生存時間。
Redission執行流程如下:(只要線程一加鎖成功,就會啟動一個watch dog看門狗,它是一個后臺線程,會每隔10秒檢查一下(鎖續命周期就是設置的超時時間的三分之一),如果線程還持有鎖,就會不斷的延長鎖key的生存時間。因此,Redis就是使用Redisson解決了鎖過期釋放,業務沒執行完問題。當業務執行完,釋放鎖后,再關閉守護線程,
RedLock解決Redis集群主從不同步數據丟失問題
Redisson使用主從集群模式,主節點掛掉,從節點沒有同步到鎖的情況:
使用RedLock,針對Redis中所有節點來進行同步,能夠保證超過半數的Redis加鎖了才算加鎖成功,從而保證并發安全。
用戶限流,防止同一用戶多次秒殺
使用布隆過濾器記錄用戶和商品ID來解決。
當用戶參與秒殺時,判斷是否ID是否記錄存在布隆過濾器中,不存在證明該用戶是第一次參與秒殺改商品,放行繼續后續業務;
過濾器中存在,則禁止繼續秒殺。
import cn.hutool.bloomfilter.BitMapBloomFilter;
import com.alibaba.fastjson.JSON;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.web.bind.annotation.CrossOrigin;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;import javax.servlet.http.HttpServletResponse;
import java.util.Map;
import java.util.UUID;@RestController
@CrossOrigin //開啟跨域支持
public class SpikeController {@Autowiredprivate RabbitTemplate rabbitTemplate;@Autowiredprivate StringRedisTemplate redisTemplate;@Autowiredprivate BitMapBloomFilter bitMapBloomFilter;@GetMapping("spike/{userId}/{goodsId}")public Object doSpike(@PathVariable Integer userId, @PathVariable Integer goodsId, HttpServletResponse response) {//設置商品的唯一標識String spikeId = userId + "-" + goodsId;//判斷是否已經參與過if (bitMapBloomFilter.contains(spikeId)) {return "你已參加過秒殺活動,請選擇其他秒殺商品";}//判斷庫存Long stoke = Long.parseLong(redisTemplate.opsForValue().get(goodsId + "stock"));if (stoke < 1) {return "該商品已被搶購一空,請選擇其他商品";}//參與過秒殺,添加到過濾器中bitMapBloomFilter.add(spikeId);//封裝發送信息Map<String, Integer> spikeMessage = Map.of("userId", userId, "goodsId", goodsId);//發送消息rabbitTemplate.convertAndSend("", "spike-web", JSON.toJSONString(spikeMessage), message -> {MessageProperties messageProperties = message.getMessageProperties();String messageId = UUID.randomUUID().toString().replaceAll("-", "");messageProperties.setMessageId(messageId);return message;});return "正在拼命搶購中,請稍后查看訂單詳情...";}
}
3.基于ZooKeeper的實現方式
ZooKeeper是一個為分布式應用提供一致性服務的開源組件,它內部是一個分層的文件系統目錄樹結構,規定同一個目錄下只能有一個唯一文件名。
基于ZooKeeper實現分布式鎖的步驟如下:
(1)創建一個目錄mylock;
(2)線程A想獲取鎖就在mylock目錄下創建臨時順序節點;
(3)獲取mylock目錄下所有的子節點,然后獲取比自己小的兄弟節點,如果不存在,則說明當前線程順序號最小,獲得鎖;
(4)線程B獲取所有節點,判斷自己不是最小節點,設置監聽比自己次小的節點;
(5)線程A處理完,刪除自己的節點,線程B監聽到變更事件,判斷自己是不是最小的節點,如果是則獲得鎖。
優點:具備高可用、可重入、阻塞鎖特性,可解決失效死鎖問題。
缺點:因為需要頻繁的創建和刪除節點,性能上不如Redis方式。