Redis --- 秒殺優化方案(阻塞隊列+基于Stream流的消息隊列)

下面是我們的秒殺流程:

對于正常的秒殺處理,我們需要多次查詢數據庫,會給數據庫造成相當大的壓力,這個時候我們需要加入緩存,進而緩解數據庫壓力。

在上面的圖示中,我們可以將一條流水線的任務拆成兩條流水線來做,如果我們直接將判斷秒殺庫存與校驗一人一單放在流水線A上,剩下的放在另一條流水線B,那么如果流水線A就可以相當于服務員直接判斷是否符合資格,如果符合資格那么直接生成信息給另一條流水線B去處理業務,這里的流水線就是咱們的線程,而流水線A也是基于數據庫進行查詢,也會壓力數據庫,那么這種情況我們就可以將待查詢信息保存在Redis緩存中。

但是我們不能再流水線A判斷完成后去直接調用流水線B,這樣的效率是大打折扣的,這種情況我們需要開啟獨立線程去執行流水線B的操作,如何知道給哪個用戶創建訂單呢?這個時候就要流水線A在判斷成功后去生成信息給獨立線程

最后的業務就變成,用戶直接訪問流水線A,通過流水線A去判斷,如果通過則生成信息給流水線B去創建訂單,過程如下圖:

那么什么樣的數據結構滿足下面條件:① 一個key能夠保存很多值? ?②唯一性:一人一單需要保證用戶id不能重復。

所以我們需要使用set:

那么如何判斷校驗用戶的購買資格呢?

?而上述判斷需要保證原子性,所以我們需要使用Lua腳本進行編寫:

local voucherId = ARGV[1]; -- 優惠劵id
local userId = ARGV[2]; -- 用戶id-- 庫存key
local stockKey = 'seckill:stock' .. voucherId; -- 拼接
-- 訂單key
local stockKey = 'seckill:stock' .. voucherId; -- 拼接
-- 判斷庫存是否充足
if(tonumber(redis.call('get',stockKey) <= 0)) then-- 庫存不足,返回1return 1;
end;
-- 判斷用戶是否下單
if(redis.call('sismember',orderKey,userId)) then-- 存在,說明重復下單,返回2return 2;
end
-- 扣減庫存 incrby stockKey -1
redis.call('incrby',stockKey,-1);
-- 下單(保存用戶) sadd orderKey userId
redis.call('sadd',orderKey,userId);
return 0;

之后我們按照下面步驟來實現代碼:

在方法體內執行Lua腳本來原子性判斷,然后判斷是否能夠處理并傳入阻塞隊列:

@Slf4j
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {@Autowiredprivate ISeckillVoucherService seckillVoucherService;@Autowiredprivate RedisIdWorker redisIdWorker;@Resourceprivate StringRedisTemplate stringRedisTemplate;@Resourceprivate RedissonClient redissonClient;private static final DefaultRedisScript<Long> SECKILL_SCRIPT; // 泛型內填入返回值類型static { // 靜態屬性要使用靜態代碼塊進行初始化SECKILL_SCRIPT = new DefaultRedisScript<>();SECKILL_SCRIPT.setResultType(Long.class);SECKILL_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));}public Result seckillVoucherMax(Long voucherId) {// 獲取用戶信息Long userId = UserHolder.getUser().getId();// 1.執行Lua腳本來判斷用戶資格Long result = stringRedisTemplate.execute(SECKILL_SCRIPT,Collections.emptyList(), // Lua無需接受keyvoucherId.toString(),userId.toString());// 2.判斷結果是否為0int r = result.intValue();if(r != 0) {// 不為0代表無資格購買return Result.fail(r == 1 ? "庫存不足" : "不能重復下單");}// 3.有購買資格則將下單信息保存到阻塞隊列中// ... return Result.ok();}}

?接下來我們創建阻塞隊列,線程池以及線程方法,隨后使用Springboot提供的注解在@PostConstruct去給線程池傳入線程方法:

@Slf4j
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {@Autowiredprivate ISeckillVoucherService seckillVoucherService;@Autowiredprivate RedisIdWorker redisIdWorker;@Resourceprivate StringRedisTemplate stringRedisTemplate;@Resourceprivate RedissonClient redissonClient;private static final DefaultRedisScript<Long> SECKILL_SCRIPT; // 泛型內填入返回值類型static { // 靜態屬性要使用靜態代碼塊進行初始化SECKILL_SCRIPT = new DefaultRedisScript<>();SECKILL_SCRIPT.setResultType(Long.class);SECKILL_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));}private BlockingQueue<VoucherOrder> orderTasks = new ArrayBlockingQueue<>(1024 * 1024); // 創建阻塞隊列private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();  // 創建線程池// 讓大類在開始初始化時就能夠執行線程任務@PostConstructprivate void init() {SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderTask());}// 創建線程任務private class VoucherOrderTask implements Runnable {@Overridepublic void run() {while(true){try {// 獲取隊列中的訂單信息VoucherOrder voucherOrder = orderTasks.take();// 取出頭部信息// 創建訂單handleVoucherOrder(voucherOrder);} catch (Exception e) {log.error("處理訂單異常",e);}}}}// 創建訂單private void handleVoucherOrder(VoucherOrder voucherOrder) {RLock lock = redissonClient.getLock("lock:order:" + voucherOrder.getUserId().toString());boolean isLock = lock.tryLock();// 判斷是否獲取鎖成功if (!isLock) {// 獲取鎖失敗,返回錯誤或重試log.error("不允許重復下單");return ;}try {proxy.createVoucherOrderMax(voucherOrder);} finally {lock.unlock();}}@Overridepublic void createVoucherOrderMax(VoucherOrder voucherOrder) {// 一人一單Long userId = voucherOrder.getUserId();// 查詢訂單int count = query().eq("user_id",userId).eq("voucher_id", voucherOrder.getVoucherId()).count();// 判斷是否存在if(count > 0){// 用戶已經購買過log.error("用戶已經購買過");return ;}// CAS改進:將庫存判斷改成stock > 0以此來提高性能boolean success = seckillVoucherService.update().setSql("stock= stock -1") // set stock = stock - 1.eq("voucher_id", voucherOrder.getVoucherId()).eq("stock",0) // where id = ? and stock > 0.update();if (!success) {//扣減庫存log.error("庫存不足!");return ;}//6.創建訂單save(voucherOrder);}private IVoucherOrderService proxy; // 代理對象public Result seckillVoucherMax(Long voucherId) {// 獲取用戶信息Long userId = UserHolder.getUser().getId();// 1.執行Lua腳本來判斷用戶資格Long result = stringRedisTemplate.execute(SECKILL_SCRIPT,Collections.emptyList(), // Lua無需接受keyvoucherId.toString(),userId.toString());// 2.判斷結果是否為0int r = result.intValue();if(r != 0) {// 不為0代表無資格購買return Result.fail(r == 1 ? "庫存不足" : "不能重復下單");}// 3.有購買資格則將下單信息保存到阻塞隊列中Long orderId = redisIdWorker.nextId("order");// 創建訂單VoucherOrder voucherOrder = new VoucherOrder();voucherOrder.setId(orderId);voucherOrder.setUserId(userId);voucherOrder.setVoucherId(voucherId);// 放入阻塞隊列orderTasks.add(voucherOrder);// 4.獲取代理對象(線程異步執行,需要手動在方法內獲取)proxy = (IVoucherOrderService)AopContext.currentProxy(); // 獲取當前類的代理對象  (需要引入aspectjweaver依賴,并且在實現類加入@EnableAspectJAutoProxy(exposeProxy = true)以此來暴露代理對象)return Result.ok();}}

在上面代碼中,我們使用下面代碼創建了一個單線程的線程池。它保證所有提交的任務都按照提交的順序執行,每次只有一個線程在工作。

private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();

下面代碼是一個常見的阻塞隊列實現,具有固定大小(在這里是 1024 * 1024),它的作用是緩沖和排隊任務。ArrayBlockingQueue 是一個線程安全的隊列,它會自動處理線程之間的同步問題。當隊列滿時,調用 put() 方法的線程會被阻塞,直到隊列有空間;當隊列為空時,調用 take() 方法的線程會被阻塞,直到隊列中有數據。

private BlockingQueue<VoucherOrder> orderTasks = new ArrayBlockingQueue<>(1024 * 1024);

在下面代碼中,orderTasks 阻塞隊列用于存放需要處理的訂單對象,每個訂單的處理邏輯都由 VoucherOrderTask 線程池中的線程異步執行:

VoucherOrder voucherOrder = orderTasks.take();
handleVoucherOrder(voucherOrder);

之后我們需要調用 Runnable 接口去實現VoucherOrderTask類以此來創建線程方法

private class VoucherOrderTask implements Runnable {@Overridepublic void run() {while (true) {try {// 獲取隊列中的訂單信息VoucherOrder voucherOrder = orderTasks.take(); // 獲取訂單// 創建訂單handleVoucherOrder(voucherOrder);} catch (Exception e) {log.error("處理訂單異常", e);}}}
}

隨后將線程方法通過 submit() 方法將 VoucherOrderTask 提交到線程池中,這個任務是一個無限循環的任務,它會不斷從阻塞隊列中取出訂單并處理,直到線程池關閉。這種方式使得訂單處理任務可以異步執行,而不阻塞主線程,提高了系統的響應能力:

@PostConstruct
private void init() {SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderTask());
}

但是在高并發的情況下就會產生大量訂單,就會超出JVM阻塞隊列的上線,并且每當服務重啟或者宕機的情況發生,阻塞隊列的所有訂單任務就都會丟失。

所以為了解決這種情況,我們就要使用消息隊列去解決這個問題:


什么是消息隊列?


消息隊列(Message Queue, MQ)是一種用于在應用程序之間傳遞消息的通信方式。它允許應用程序通過發送和接收消息來解耦,從而提高系統的可擴展性、可靠性和靈活性。消息隊列通常用于異步通信、任務隊列、事件驅動架構等場景。

消息隊列的核心概念?:

  1. 生產者(Producer):發送消息到消息隊列的應用程序。

  2. 消費者(Consumer):從消息隊列中接收并處理消息的應用程序。

  3. 隊列(Queue):消息的存儲區域,生產者將消息發送到隊列,消費者從隊列中獲取消息。

  4. 消息(Message):在生產者與消費者之間傳遞的數據單元。

  5. Broker:消息隊列的服務器,負責接收、存儲和轉發消息。

消息隊列是在JVM以外的一個獨立的服務,能夠不受JVM內存的限制,并且存入MQ的信息都可以做持久化存儲。

詳細教學可以查詢下面鏈接:微服務架構 --- 使用RabbitMQ進行異步處理?


但是這樣的方式是需要額外提供服務的,所以我們可以使用Redis提供的三種不同的方式來實現消息隊列

  1. List 結構實現消息隊列

  2. Pub/Sub(發布/訂閱)模式

  3. Stream 結構(Redis 5.0 及以上版本)(推薦使用)(詳細介紹)


使用 List 結構實現消息隊列:

Redis 的 List 數據結構是一個雙向鏈表,支持從頭部或尾部插入和彈出元素。我們可以利用?LPUSH?和?BRPOP?命令實現一個簡單的消息隊列。

實現步驟:

  • 生產者:使用?LPUSH?將消息推入隊列。

  • 消費者:使用?BRPOP?阻塞地從隊列中獲取消息。

生產者代碼:

import redis.clients.jedis.Jedis;public class ListProducer {public static void main(String[] args) {Jedis jedis = new Jedis("localhost", 6379); // 連接 RedisString queueName = "myQueue";// 發送消息for (int i = 1; i <= 5; i++) {String message = "Message " + i;jedis.lpush(queueName, message); // 將消息推入隊列System.out.println("Sent: " + message);}jedis.close(); // 關閉連接}
}

消費者代碼:

import redis.clients.jedis.Jedis;public class ListConsumer {public static void main(String[] args) {Jedis jedis = new Jedis("localhost", 6379); // 連接 RedisString queueName = "myQueue";while (true) {// 阻塞獲取消息,超時時間為 0(無限等待)var result = jedis.brpop(0, queueName);String message = result.get(1); // 獲取消息內容System.out.println("Received: " + message);}}
}
  • 優點:簡單易用,適合輕量級場景。

  • 缺點不支持消息確認機制,消息一旦被消費(從隊列內取出)就會從隊列中刪除。并且只支持單消費者(一個消息只能拿出一次)


使用 Pub/Sub 模式實現消息隊列:?

Redis 的 Pub/Sub 模式是一種發布-訂閱模型,生產者將消息發布到頻道,消費者訂閱頻道以接收消息。

實現步驟:

  • 生產者:使用?PUBLISH?命令向頻道發布消息。

  • 消費者:使用?SUBSCRIBE?命令訂閱頻道。

生產者代碼:

import redis.clients.jedis.Jedis;public class PubSubProducer {public static void main(String[] args) {Jedis jedis = new Jedis("localhost", 6379); // 連接 RedisString channelName = "myChannel";// 發布消息for (int i = 1; i <= 5; i++) {String message = "Message " + i;jedis.publish(channelName, message); // 發布消息到頻道System.out.println("Published: " + message);}jedis.close(); // 關閉連接}
}

?消費者代碼:

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPubSub;public class PubSubConsumer {public static void main(String[] args) {Jedis jedis = new Jedis("localhost", 6379); // 連接 RedisString channelName = "myChannel";// 創建訂閱者JedisPubSub subscriber = new JedisPubSub() {@Overridepublic void onMessage(String channel, String message) {System.out.println("Received: " + message);}};// 訂閱頻道jedis.subscribe(subscriber, channelName);}
}
  • 優點:支持一對多的消息廣播。

  • 缺點:消息是即時的,如果消費者不在線,消息會丟失。


但是上面兩方式都是有缺點的:

  1. 不支持消息確認機制,消息一旦被消費(從隊列內取出)就會從隊列中刪除。并且只支持單消費者(一個消息只能拿出一次)
  2. 消息是即時的,如果消費者不在線,消息會丟失。

所以根據上面的兩種方式,我們推出一款全新的方式 ->

使用 Stream 結構實現消息隊列:

Redis Stream 是一種強大的數據結構,用于管理消息流。它將消息存儲在 Redis 中,并允許消費者按順序獲取消息。Stream 具有以下特點:

  • 有序消息:消息按插入順序排列。
  • 消費者組:一個消費者組可以有多個消費者,每個消費者可以獨立消費不同的消息。
  • 消息 ID:每條消息都有唯一的 ID(如:1588890470850-0),ID 按時間戳生成。
  • 自動分配消息:多個消費者可以從 Stream 中并行消費消息,保證消息不會重復消費。

在 Redis Stream 中,一個隊列可以有多個消費者組,每個消費者組可以獨立地消費隊列中的消息。每個消費者組內有多個消費者,而消費者是基于 消費者名稱 進行識別的。?

消費者組的工作方式:

  • 每個消費者組擁有自己的 消費進度,也就是每個消費者組會從 自己獨立的消息 ID 開始消費
  • 多個消費者組之間是相互獨立的,即使它們消費的是同一個隊列,它們也可以從不同的位置開始消費隊列中的消息。
  • 每個消費者組都可以有多個 消費者(在同一個組內,多個消費者可以并行消費同一個隊列的消息,但每個消息在消費者組內只能被一個消費者處理一次)。

假設有一個隊列(Stream)mystream,可以為它創建多個消費者組:

XGROUP CREATE mystream group1 $ MKSTREAM
XGROUP CREATE mystream group2 $ MKSTREAM

這樣,mystream 隊列上就有了兩個消費者組:group1group2。每個消費者組可以有自己的消費者并從該隊列中讀取消息。此時,group1group2 都在消費同一個隊列 mystream,但它們的消費進度是獨立的,它們各自有自己的消息 ID 記錄。

每個消費者組可以有多個消費者,而每個消費者通過一個 唯一的消費者名稱 來標識。


每個消費者組有獨立的消費進度


每個消費者組會記錄自己的消費進度,也就是它消費到隊列中的 哪個消息 ID。即使多個消費者組在消費同一個消息隊列,它們每個組都會從 不同的消費位置(消息 ID)開始讀取消息。

例如,假設有一個隊列 mystream,同時有兩個消費者組 group1group2,它們都從 mystream 隊列中讀取消息:

  • group1mystream 隊列中的消息 id1 開始消費,group1 的進度會記錄在 Redis 中。
  • group2mystream 隊列中的消息 id2 開始消費,group2 的進度也會記錄在 Redis 中。

消費進度互不干擾,即便 group1group2 都在消費 mystream 隊列,它們的消費位置是獨立的。


消費者組內部的消息消費


一個消費者組內的消費者會 共享 組內的消息。即使有多個消費者,每條消息 在消費者組內部只會被 一個消費者 消費。消費者之間會并行處理消息,但每條消息只會被一個消費者處理。

舉個例子:假設 group1 中有三個消費者 consumer1consumer2consumer3,如果隊列 mystream 有 6 條消息,那么它們會如下消費:

  • consumer1 處理消息 12
  • consumer2 處理消息 34
  • consumer3 處理消息 56

但對于消費者組 group2,如果它有自己的消費者,group2 內的消費者也會并行消費 mystream 中的消息,而 group1group2 之間沒有直接關系。

首先初始化一個消息隊列:

在項目啟動時,確保 Redis 中存在對應的 Stream 和消費者組。可以通過程序在啟動時檢查并創建(如果不存在的話)。

@Configuration
public class RedisStreamConfig {@Autowiredprivate StringRedisTemplate redisTemplate;private static final String STREAM_KEY = "mystream";private static final String GROUP_NAME = "mygroup";@PostConstructpublic void init() {// 檢查消費者組是否存在,若不存在則創建try {// 如果消費者組不存在則會拋出異常,我們捕獲異常進行創建redisTemplate.opsForStream().groups(STREAM_KEY);} catch (Exception e) {// 創建消費者組,起始位置為 $ 表示從末尾開始消費新消息redisTemplate.opsForStream().createGroup(STREAM_KEY, GROUP_NAME);}}
}

注意:

  • opsForStream().groups(STREAM_KEY):查詢消費者組是否已存在。
  • opsForStream().createGroup(STREAM_KEY, GROUP_NAME):如果沒有消費者組,則創建一個新的組。

隨后我們生產者發送消息示例:

@Service  
public class RedisStreamProducerService {  // 定義生產者服務類 RedisStreamProducerServiceprivate static final String STREAM_KEY = "mystream";  // 定義 Redis Stream 的名稱,這里指定隊列名為 "mystream"@Autowired  private StringRedisTemplate redisTemplate;public void sendMessage(String content) {  // 定義一個方法,發送消息到 Redis Stream,參數 content 是消息的內容Map<String, String> map = new HashMap<>();  // 創建一個 Map 用來存儲消息內容map.put("content", content);  // 將消息內容添加到 Map 中,鍵是 "content",值是傳入的內容// 在消息隊列中添加消息,調用 StringRedisTemplate 的 opsForStream 方法RecordId recordId = redisTemplate.opsForStream()  // 獲取操作 Redis Stream 的操作對象.add(StreamRecords.objectBacked(map)  // 創建一個 Stream 記錄,將 Map 轉化為對象記錄.withStreamKey(STREAM_KEY));  // 設置該記錄屬于的 Stream(消息隊列)的名稱// 輸出記錄的 ID,表示消息已經成功發送System.out.println("消息發送成功,id: " + recordId.getValue());  // 打印消息的 ID,表明該消息已經被成功加入到 Stream 中}
}

RecordId 是 Spring Data Redis 中的一個類,用來表示 消息的唯一標識符。它對應 Redis Stream 中的 消息 ID,該 ID 是 Redis Stream 中每條消息的唯一標識。Redis 中的消息 ID 通常是由時間戳和序號組成的(如 1588890470850-0)。

主要功能:
  • 表示消息 IDRecordId 是一個封裝類,表示 Redis Stream 中消息的 ID。
  • 用于識別和操作消息:在消費和確認消息時,RecordId 用來標識每條消息的唯一性,并幫助 Redis 確定消息是否已經被消費
使用場景:

RecordId 用來標識從 Stream 中讀取到的消息,我們可以通過 RecordId 來進行消息的確認、刪除或其他操作。

RecordId recordId = redisTemplate.opsForStream().add(StreamRecords.objectBacked(map).withStreamKey("mystream"));

通過 StreamRecords.objectBacked(map)map 對象作為消息內容,并用 add 方法將其寫入 Stream。

在然后編寫消費者服務:

使用 RedisTemplate 的 read 方法(底層執行的是 XREADGROUP 命令)從消費者組中拉取消息,并進行處理。消費者可以采用定時任務或后臺線程不斷輪詢

@Slf4j  
@Service  
public class RedisStreamConsumerService { private static final String STREAM_KEY = "mystream";  // Redis Stream 的名稱,這里指定隊列名為 "mystream"private static final String GROUP_NAME = "mygroup";  // 消費者組的名稱,多個消費者可以通過組名共享消費隊列private static final String CONSUMER_NAME = "consumer-1";  // 消費者的名稱,消費者名稱在同一消費者組內必須唯一@Autowired  private StringRedisTemplate redisTemplate;@PostConstruct  // 使用該注解能讓方法在 Spring 完成依賴注入后自動調用,用于初始化任務@Async  // 將該方法標記為異步執行,允許它在單獨的線程中運行,不會阻塞主線程,@EnableAsync 需要在配置類中啟用public void start() {  // 啟動方法,在應用啟動時執行// 無限循環,不斷從 Redis Stream 中讀取消息(可以改為定時任務等方式)while (true) {try {// 設置 Stream 讀取的阻塞超時,設置最多等待 2 秒StreamReadOptions options = StreamReadOptions.empty().block(Duration.ofSeconds(2));// 從指定的消費者組中讀取消息,">" 表示只消費未被消費過的消息List<MapRecord<String, Object, Object>> messages = redisTemplate.opsForStream().read(Consumer.from(GROUP_NAME, CONSUMER_NAME),  // 指定消費者組和消費者名稱options,  // 設置讀取選項,包含阻塞時間StreamOffset.create(STREAM_KEY, ReadOffset.lastConsumed())  // 從最后消費的消息開始讀取);// 如果沒有消息,繼續循環讀取if (messages == null || messages.isEmpty()) {continue;  }// 處理每一條讀取到的消息for (MapRecord<String, Object, Object> message : messages) {String messageId = message.getId();  // 獲取消息的唯一標識符(ID)Map<Object, Object> value = message.getValue();  // 獲取消息內容(以 Map 形式存儲)log.info("接收到消息,id={},內容={}", messageId, value);  // 打印日志,記錄消息 ID 和內容// 在這里加入業務邏輯處理// 例如處理消息并執行相應的操作// ...// 消息處理成功后,需要確認消息已經被消費(通過 XACK 命令)redisTemplate.opsForStream().acknowledge(STREAM_KEY, GROUP_NAME, messageId);  // 確認消費的消息}} catch (Exception e) {log.error("讀取 Redis Stream 消息異常", e);  // 異常捕獲,記錄錯誤日志}}}
}

MapRecord<String, Object, Object> 是 Spring Data Redis 用來表示 Redis Stream 中的 消息記錄 的類。它不僅包含了消息的 ID,還包含了消息的內容(即消息數據)。在 Redis 中,每條消息都存儲為一個 key-value 對。

主要功能:
  • 封裝消息 ID 和消息內容MapRecord 用來封裝消息的 ID 和消息的內容。
  • 消息的內容:消息的內容通常是一個 鍵值對Map<String, Object>),可以是任意對象的數據結構(例如,JSON、Map 或其他序列化對象)。
字段:
  • getId():返回消息的 ID(RecordId 類型)。
  • getValue():返回消息的內容,以 Map<Object, Object> 的形式。
使用場景:

MapRecord 是用來表示從 Stream 中讀取到的消息,它將消息的 ID 和內容(鍵值對)封裝在一起。你可以使用 MapRecord 來獲取消息的 ID 和內容并處理。

MapRecord<String, Object, Object> message = redisTemplate.opsForStream().read(Consumer.from("mygroup", "consumer1"), options, StreamOffset.create("mystream", ReadOffset.lastConsumed()));

在這個例子中,message 是一個 MapRecord 實例,它封裝了從 mystream 隊列中讀取到的消息。我們可以通過 message.getId() 獲取消息 ID,通過 message.getValue() 獲取消息內容。

在消費者中,我們使用 MapRecord<String, Object, Object> 來封裝消息,獲取 message.getId() 來獲取消息的 ID(RecordId),以及通過 message.getValue() 獲取消息的內容。 隨后在處理完消息后,調用 acknowledge() 來確認消息已經被消費。

最后啟動異步支持:

@SpringBootApplication
@EnableAsync // 啟動異步支持
public class MyApplication {public static void main(String[] args) {SpringApplication.run(MyApplication.class, args);}
}

通過這種方式,Spring Data Redis 提供了高效且類型安全的接口來操作 Redis Stream,幫助我們在分布式系統中實現高效的消息隊列。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/67967.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/67967.shtml
英文地址,請注明出處:http://en.pswp.cn/web/67967.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

使用 Ollama 和 Kibana 在本地為 RAG 測試 DeepSeek R1

作者&#xff1a;來自 Elastic Dave Erickson 及 Jakob Reiter 每個人都在談論 DeepSeek R1&#xff0c;這是中國對沖基金 High-Flyer 的新大型語言模型。現在他們推出了一款功能強大、具有開放權重的思想鏈推理 LLM&#xff0c;這則新聞充滿了對行業意味著什么的猜測。對于那些…

2025年大年初一篇,C#調用GPU并行計算推薦

C#調用GPU庫的主要目的是利用GPU的并行計算能力&#xff0c;加速計算密集型任務&#xff0c;提高程序性能&#xff0c;支持大規模數據處理&#xff0c;優化資源利用&#xff0c;滿足特定應用場景的需求&#xff0c;并提升用戶體驗。在需要處理大量并行數據或進行復雜計算的場景…

Unity 2D實戰小游戲開發跳跳鳥 - 計分邏輯開發

上文對障礙物的碰撞邏輯進行了開發,接下來就是進行跳跳鳥成功穿越過障礙物進行計分的邏輯開發,同時將對應的分數以UI的形式顯示告訴玩家。 計分邏輯 在跳跳鳥通過障礙物的一瞬間就進行一次計分,計分后會同步更新分數的UI顯示來告知玩家當前獲得的分數。 首先我們創建一個用…

langchain基礎(二)

一、輸出解析器&#xff08;Output Parser&#xff09; 作用&#xff1a;&#xff08;1&#xff09;讓模型按照指定的格式輸出&#xff1b; &#xff08;2&#xff09;解析模型輸出&#xff0c;提取所需的信息 1、逗號分隔列表 CommaSeparatedListOutputParser&#xff1a;…

游戲AI,讓AI 玩游戲有什么作用?

讓 AI 玩游戲這件事遠比我們想象的要早得多。追溯到 1948 年&#xff0c;圖靈和同事錢伯恩共同設計了國際象棋程序 Turochamp。之所以設計這么個程序&#xff0c;圖靈是想說明&#xff0c;機器理論上能模擬人腦能做的任何事情&#xff0c;包括下棋這樣復雜的智力活動。 可惜的是…

鴻蒙物流項目之基礎結構

目錄&#xff1a; 1、項目結構2、三種包的區別和使用場景3、靜態資源的導入4、顏色樣式設置5、修改項目名稱和圖標6、靜態包基礎目錄7、組件的抽離8、在功能模塊包里面引用靜態資源包的組件 1、項目結構 2、三種包的區別和使用場景 3、靜態資源的導入 放在har包中&#xff0c;那…

Cursor 與多語言開發:全棧開發的利器

引言 全棧開發要求開發者跨越前端、后端、數據庫甚至數據科學等多個技術領域&#xff0c;而不同技術棧往往需要切換工具和思維方式。Cursor 作為一款 AI 驅動的智能編程助手&#xff0c;憑借其對 20 編程語言 和主流框架的深度支持&#xff0c;正在成為全棧開發的“瑞士軍刀”…

算法設計-0-1背包動態規劃(C++)

一、問題闡述 0-1 背包問題的目標是在給定背包容量 W 的情況下&#xff0c;從 n 個物品中選擇一些物品放入背包&#xff0c;使得背包中物品的總價值最大。每個物品只能選擇一次&#xff08;即要么放入背包&#xff0c;要么不放入&#xff09;。 二、代碼 #include <iostr…

51c視覺~CV~合集10

我自己的原文哦~ https://blog.51cto.com/whaosoft/13241694 一、CV創建自定義圖像濾鏡 熱圖濾鏡 這組濾鏡提供了各種不同的藝術和風格化光學圖像捕捉方法。例如&#xff0c;熱濾鏡會將圖像轉換為“熱圖”&#xff0c;而卡通濾鏡則提供生動的圖像&#xff0c;這些圖像看起來…

全棧開發:使用.NET Core WebAPI構建前后端分離的核心技巧(二)

目錄 配置系統集成 分層項目使用 篩選器的使用 中間件的使用 配置系統集成 在.net core WebAPI前后端分離開發中&#xff0c;配置系統的設計和集成是至關重要的一部分&#xff0c;尤其是在管理不同環境下的配置數據時&#xff0c;配置系統需要能夠靈活、可擴展&#xff0c…

上海路網道路 水系鐵路綠色住宅地工業用地面圖層shp格式arcgis無偏移坐標2023年

標題和描述中提到的資源是關于2023年上海市地理信息數據的集合&#xff0c;主要包含道路、水系、鐵路、綠色住宅區以及工業用地的圖層數據&#xff0c;這些數據以Shapefile&#xff08;shp&#xff09;格式存儲&#xff0c;并且是適用于ArcGIS軟件的無偏移坐標系統。這個壓縮包…

Rust HashMap :當儲物袋遇上物品清單

開場白&#xff1a;哈希映射的魔法本質 在Rust的奇幻世界里&#xff0c;HashMap就像魔法師的儲物袋&#xff1a; 鍵值對存儲 → 每個物品都有專屬咒語&#xff08;鍵&#xff09;和實體&#xff08;值&#xff09;快速查找 → 念咒瞬間召喚物品動態擴容 → 自動伸展的魔法空間…

Spring Boot統一異常攔截實踐指南

Spring Boot統一異常攔截實踐指南 一、為什么需要統一異常處理 在Web應用開發中&#xff0c;異常處理是保證系統健壯性和用戶體驗的重要環節。傳統開發模式中常見的痛點包括&#xff1a; 異常處理邏輯分散在各個Controller中錯誤響應格式不統一敏感異常信息直接暴露給客戶端…

使用 Elastic Cloud Hosted 優化長期數據保留:確保政府合規性和效率

作者&#xff1a;來自 Elastic Jennie Davidowitz 在數字時代&#xff0c;州和地方政府越來越多地承擔著管理大量數據的任務&#xff0c;同時確保遵守嚴格的監管要求。這些法規可能因司法管轄區而異&#xff0c;通常要求將數據保留較長時間 —— 有時從一年到七年不等。遵守刑事…

Oracle Primavera P6 最新版 v24.12 更新 2/2

目錄 一. 引言 二. P6 EPPM 更新內容 1. 用戶管理改進 2. 更輕松地標準化用戶設置 3. 摘要欄標簽匯總數據字段 4. 將里程碑和剩余最早開始日期拖到甘特圖上 5. 輕松訪問審計數據 6. 粘貼數據時排除安全代碼 7. 改進了狀態更新卡片視圖中的篩選功能 8. 直接從活動電子…

linux本地部署deepseek-R1模型

國產開源大模型追平甚至超越了CloseAI的o1模型&#xff0c;大國崛起時刻&#xff01;&#xff01;&#xff01; DeepSeek R1 本地部署指南 ??在人工智能技術飛速發展的今天&#xff0c;本地部署AI模型成為越來越多開發者和企業關注的焦點。本文將詳細介紹如何在本地部署DeepS…

C基礎寒假練習(2)

一、輸出3-100以內的完美數&#xff0c;(完美數&#xff1a;因子和(因子不包含自身)數本身 #include <stdio.h>// 函數聲明 int isPerfectNumber(int num);int main() {printf("3-100以內的完美數有:\n");for (int i 3; i < 100; i){if (isPerfectNumber…

有限元分析學習——Anasys Workbanch第一階段筆記梳理

第一階段筆記主要源自于嗶哩嗶哩《ANSYS-workbench 有限元分析應用基礎教程》 張曄 主要內容導圖&#xff1a; 筆記導航如下&#xff1a; Anasys Workbanch第一階段筆記(1)基本信息與結果解讀_有限元分析變形比例-CSDN博客 Anasys Workbanch第一階段筆記(2)網格單元與應力奇…

html基本結構和常見元素

html5文檔基本結構 <!DOCTYPE html> <html lang"zh-CN"> <head><meta charset"UTF-8"><title>文檔標題</title> </head> <body>文檔正文部分 </body> </html> html文檔可分為文檔頭和文檔體…

Cursor如何使用Google Gemini以及碰到的坑

Cursor如何使用Google Gemini以及碰到的坑 Cursor介紹下載安裝Google Gemini介紹Google Gemini 官網申請Google Gemini API網址 配置Cursor使用Google Gemini打開Corsur設置 Cursor介紹 ?Cursor是一款基于人工智能的代碼編輯器&#xff0c;旨在幫助開發者更高效地編寫代碼。?…