RocketMQ 消息消費 輪詢機制 PullRequestHoldService

1. 概述

先來看看 RocketMQ 消費過程中的輪詢機制是啥。首先需要補充一點消費相關的前置知識。

1.1 消息消費方式

RocketMQ 支持多種消費方式,包括 Push 模式和 Pull 模式

  • Pull 模式:用戶自己進行消息的拉取和消費進度的更新
  • Push 模式:Broker 將新的消息自動發送給用戶進行消費

1.2 Push 消費模式

我們一般使用 RocketMQ 時用的是 Push 模式,因為比較方便,不需要手動拉取消息和更新消費進度。

那么你有沒有想過 Push 模式是如何做到能夠立即消費新的消息?

1.2.1 Push 模式原理

實際上,在 Push 消費時,消費者是在不斷輪詢 Broker,詢問是否有新消息可供消費。一旦有新消息到達,馬上拉取該消息。也就是說 Push 模式內部也用了 Pull 消息的模式,這樣就可以立即消費到最新的消息。

1.3 如何進行輪詢?

那么 Push 模式或 Pull 模式如何進行消息的查詢?

能夠想到的比較笨的方法是,每隔一定的時間(如1ms)就向 Broker 發送一個查詢請求,如果沒有新消息則立刻返回。可想而知這種方法非常浪費網絡資源。

RocketMQ 為了提高網絡性能,在拉取消息時如果沒有新消息,不會馬上返回,而是會將該查詢請求掛起一段時間,然后再重試查詢。如果一直沒有新消息,直到輪詢時間超過設定的閾值才會返回。

根據輪詢設定的超時閾值大小的不同,RocketMQ 有兩種輪詢方式,分別為長輪詢(默認)和短輪詢。

1.4 長輪詢和短輪詢

RocketMQ 的 Broker 端參數 longPollingEnable 可以配置輪詢方式,默認為 true

  • 短輪詢:longPollingEnable=false,輪詢時間為 shortPollingTimeMills ,默認為 1s
  • 長輪詢:longPollingEnable=true,輪詢時間為 5s。拉取請求掛起時間:受 DefaultMQPullConsumerbrokerSuspendMaxTimeMillis 控制,默認push模式固定15s,pull模式固定20s。

2. 概要流程

[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-b6pQzSWr-1646145661686)(https://raw.githubusercontent.com/HScarb/drawio-diagrams/main/rocketmq/consume/long_polling_activity.drawio.svg)]

根據上面的活動圖來看一下 RocketMQ 消費時的輪詢機制流程

  1. Consumer 發送拉取消息請求
  2. Broker 收到請求后交給請求處理模塊處理
  3. 嘗試從存儲的消息中拉取消息
  4. 如果能夠拉取消息,那么將拉取到的消息直接返回
  5. 如果沒有拉取到消息,那么根據 Broker 是否支持掛起和是否開啟長輪詢來判斷是否要進行輪詢以及進行哪種輪詢。
    1. 如果支持掛起,那么會將該拉取請求掛起
    2. 長輪詢等待 5s
    3. 短輪詢等待 1s
  6. 檢查消費隊列中是否有新消息到達,如果沒有則繼續等待,以此循環。如果有新消息,處理掛起的拉取消息請求并返回消費者。
  7. 如果沒有新消息到達,輪詢后會檢查每個掛起的拉取請求的掛起時間是否超過掛起時間閾值,如果超過那么也會直接返回消費者,否則繼續循環進行輪詢操作。


那么按照上述流程,開啟長輪詢的情況下,如果一次輪詢沒有找到消息,要等待 5s 才能進行下一次查詢。如果這 5s 當中有新的消息存入,如何保證能夠立刻消費到?

解決方案不難想到,就是新的消息寫入后,主動進行通知,讓掛起的拉取請求立刻進行拉取操作。

RocketMQ 就是這么做的,在消息存入 CommitLog 后的 doReput 方法中,會判斷是否是長輪詢,如果是則會發送一個通知,讓掛起的拉取請求立刻進行處理。

3. 詳細流程

3.1 涉及到的類

3.1.1 PullMessageProcessor

該類是 Broker 處理 Consumer 拉取清求的入口類。當 Broker 收到 Consumer 發送的拉取請求時,調用該類的 processRequest 方法

3.1.2 PullRequestHoldService

長輪詢請求管理線程,掛起的拉取請求會在這里進行保存。每等待一段時間(長輪詢/短輪詢等待時間)會檢查掛起的請求中是否有可以進行拉取的數據。

3.1.3 DefaultMessageStore#ReputMessageService

該線程負責將存儲到 CommitLog 的消息重新轉發,用以生成 ConsumeQueue 和 IndexFile 索引。在生成索引之后,會向長輪詢線程發送提醒,立刻喚醒相應隊列的拉取請求,執行消息拉取。

3.2 時序圖

[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-brvsznhE-1646145661687)(https://raw.githubusercontent.com/HScarb/drawio-diagrams/main/rocketmq/consume/long_polling_sequence.drawio.svg)]

著重體現了長輪詢邏輯,其他邏輯有所省略

  1. 消費者調用 pullKernelImpl() 發送拉取請求,調用時用 brokerSuspendMaxTimeMillis 指定了 Broker 掛起的最長時間,默認為 20s
  2. Broker 中 PullMessageProcess 處理拉取請求,從 ConsumeQueue 中查詢消息
  3. 如果沒有查詢到消息,判斷是否啟用長輪詢,調用 PullRequestHoldService#suspendPullRequest() 方法將該請求掛起
  4. PullRequestHoldService 線程 run() 方法循環等待輪詢時間,然后周期性調用 checkHoldRequest() 方法檢查掛起的請求是否有消息可以拉取
  5. 如果檢查到有新消息可以拉取,調用 notifyMessageArriving() 方法
  6. ReputMessageService 的 doReput() 如果被調用,說明也有新消息到達,需要喚醒掛起的拉取請求。這里也會發送一個 notify,進而調用 notifyMessageArriving() 方法
  7. notifyMessageArriving() 方法中也會查詢 ConsumeQueue 的最大 offset,如果確實有新消息,那么將喚醒對應的拉取請求,具體的方法是調用 executeRequestWhenWakeup() 方法
  8. executeRequestWhenWakeup() 方法喚醒拉取請求,調用 processRequest() 方法處理該請求

3.3 每個類的具體邏輯

3.3.1 PullMessageProcessor

Broker 處理 Consumer 拉取清求的入口類

  • RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request):處理 Consumer 拉取請求的入口方法,收到 Consumer 拉取請求時調用。該方法主要完成如下操作

    1. 校驗
    2. 消息過濾
    3. 從存儲中查詢消息
    4. 返回響應給 Consumer

    如果從存儲中沒有查詢到消息,會將響應碼設置為 ResponseCode.PULL_NOT_FOUND,并且啟動長輪詢

  • void executeRequestWhenWakeup(Channel channel, final RemotingCommand request):將 Hold 的拉取請求喚醒,再次拉取消息

    • 該方法在長輪詢收到新消息時調用,立即喚醒掛起的拉取請求,然后對這些請求調用 processRequest 方法
    • 何時需要提醒長輪詢新消息已經到達?上面說到,在長輪詢等待時如果有新消息到達,CommitLogdoReput 方法中會進行提醒,最終會調用 executeRequestWhenWakeup 方法

3.3.2 PullRequestHoldService

該服務線程會從 pullRequestTable 本地緩存變量中取PullRequest請求,檢查輪詢條件“待拉取消息的偏移量是否小于消費隊列最大偏移量”是否成立,如果條件成立則說明有新消息達到Broker端,則通過PullMessageProcessor的executeRequestWhenWakeup()方法重新嘗試發起Pull消息的RPC請求

  • pullRequestTable

    private ConcurrentMap<String/* topic@queueId */, ManyPullRequest/* 同一隊列積累的拉取請求 */> pullRequestTable = new ConcurrentHashMap<>(1024)
    

    上面是掛起的消息拉取請求容器,它是一個 ConcurrentHashMap,key 是拉取請求的隊列,value 是該隊列掛起的所有拉取請求。其中 ManyPullRequest 底層是一個 ArrayList,它的 add 方法加了鎖。

  • suspendPullRequest(String topic, int queueId, PullRequest pullRequest):將 Consumer 拉取請求暫時掛起,會將請求加入到 pullRequestTable

  • checkHoldRequest():檢查所有掛起的拉取請求,如果有數據滿足要求,就喚醒該請求,對其執行 PullMessageProcessor#processRequest 方法

  • run():線程主循環,每等待一段時間就調用 checkHoldRequest() 方法檢查是否有請求需要喚醒。等待的時間根據長輪詢/短輪詢的配置決定,長輪詢等待 5s,短輪詢默認等待 1s

  • notifyMessageArriving():被 checkHoldRequest()ReputMessageService#doReput() 調用,表示新消息到達,喚醒對應隊列掛起的拉取請求

3.3.3 DefaultMessageStore#ReputMessageService

該服務線程 doReput() 方法會在 Broker 端不斷地從數據存儲對象 CommitLog 中解析數據并分發請求,隨后構建出 ConsumeQueue(邏輯消費隊列)和 IndexFile(消息索引文件)兩種類型的數據。

同時從本地緩存變量 PullRequestHoldService#pullRequestTable 中,取出掛起的拉起請求并執行。

4. 源碼解析

4.1 PullMessageProcessor

4.1.1 processRequest

如果從存儲中沒有查詢到消息,會將響應碼設置為 ResponseCode.PULL_NOT_FOUND,并且啟動長輪詢

以下三種情況會將響應碼設置為ResponseCode.PULL_NOT_FOUND

  1. NO_MESSAGE_IN_QUEUE:消費隊列中沒有任何消息
  2. OFFSET_FOUND_NULL:offset未找到任何數據
  3. OFFSET_OVERFLOW_ONE:待拉取偏移量等于隊列最大偏移量

/*** 處理客戶端請求入口** @param channel 網絡通道,通過該通道向消息拉取客戶端發送響應結果* @param request 消息拉取請求* @param brokerAllowSuspend Broker端是否允許掛起,默認true。true:如果未找到消息則掛起。false:未找到消息直接返回消息未找到* @return 響應* @throws RemotingCommandException 當解析請求發生異常時*/
private RemotingCommand processRequest(final Channel channel, RemotingCommand request, boolean brokerAllowSuspend)throws RemotingCommandException {// ...switch (response.getCode()) {// ...// 如果從消費隊列中未找到新的可以拉取的消息,判斷并掛起該拉取請求case ResponseCode.PULL_NOT_FOUND:// 長輪詢if (brokerAllowSuspend && hasSuspendFlag) {long pollingTimeMills = suspendTimeoutMillisLong;if (!this.brokerController.getBrokerConfig().isLongPollingEnable()) {pollingTimeMills = this.brokerController.getBrokerConfig().getShortPollingTimeMills();}String topic = requestHeader.getTopic();long offset = requestHeader.getQueueOffset();int queueId = requestHeader.getQueueId();PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills,this.brokerController.getMessageStore().now(), offset, subscriptionData, messageFilter);this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest);response = null;break;}// ...
}

4.1.2 executeRequestWhenWakeup

在PullMessageProcessor的executeRequestWhenWakeup()方法中,通過業務線程池pullMessageExecutor,異步提交重新Pull消息的請求任務,即為重新調了一次PullMessageProcessor業務處理器的processRequest()方法,來實現Pull消息請求的二次處理)。

/*** 將Hold的拉取請求喚醒,再次拉取消息* 該方法調用線程池,因此,不會阻塞** @param channel 通道* @param request Consumer拉取請求* @throws RemotingCommandException 當遠程調用發生異常*/
public void executeRequestWhenWakeup(final Channel channel,final RemotingCommand request) throws RemotingCommandException {Runnable run = new Runnable() {@Overridepublic void run() {try {// 處理Consumer拉取請求,獲取返回體final RemotingCommand response = PullMessageProcessor.this.processRequest(channel, request, false);if (response != null) {response.setOpaque(request.getOpaque());response.markResponseType();try {// 將返回體寫入channel,返回給Consumerchannel.writeAndFlush(response).addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {if (!future.isSuccess()) {log.error("processRequestWrapper response to {} failed",future.channel().remoteAddress(), future.cause());log.error(request.toString());log.error(response.toString());}}});} catch (Throwable e) {log.error("processRequestWrapper process request over, but response failed", e);log.error(request.toString());log.error(response.toString());}}} catch (RemotingCommandException e1) {log.error("excuteRequestWhenWakeup run", e1);}}};// 異步執行請求處理和返回this.brokerController.getPullMessageExecutor().submit(new RequestTask(run, channel, request));
}

4.2 PullRequestHoldService

4.2.1 suspendPullRequest

/*** 掛起(保存)客戶端請求,當有數據的時候觸發請求** @param topic 主題* @param queueId 隊列編號* @param pullRequest 拉取消息請求*/
public void suspendPullRequest(final String topic, final int queueId, final PullRequest pullRequest) {// 根據topic和queueId構造map的keyString key = this.buildKey(topic, queueId);// map的key如果為空,創建一個空的request隊列,填充key和valueManyPullRequest mpr = this.pullRequestTable.get(key);if (null == mpr) {mpr = new ManyPullRequest();ManyPullRequest prev = this.pullRequestTable.putIfAbsent(key, mpr);if (prev != null) {mpr = prev;}}// 保存該次Consumer拉取請求mpr.addPullRequest(pullRequest);
}

4.2.2 checkHoldRequest

/*** 檢查所有已經掛起的長輪詢請求* 如果有數據滿足要求,就觸發請求再次執行*/
private void checkHoldRequest() {// 遍歷拉取請求容器中的每個隊列for (String key : this.pullRequestTable.keySet()) {String[] kArray = key.split(TOPIC_QUEUEID_SEPARATOR);if (2 == kArray.length) {String topic = kArray[0];int queueId = Integer.parseInt(kArray[1]);// 從store中獲取隊列的最大偏移量final long offset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId);try {// 根據store中獲取的最大偏移量,判斷是否有新消息到達,如果有則執行拉取請求操作this.notifyMessageArriving(topic, queueId, offset);} catch (Throwable e) {log.error("check hold request failed. topic={}, queueId={}", topic, queueId, e);}}}
}

4.2.3 run

@Override
public void run() {log.info("{} service started", this.getServiceName());while (!this.isStopped()) {try {// 等待一定時間if (this.brokerController.getBrokerConfig().isLongPollingEnable()) {// 開啟長輪詢,每5s判斷一次消息是否到達this.waitForRunning(5 * 1000);} else {// 未開啟長輪詢,每1s判斷一次消息是否到達this.waitForRunning(this.brokerController.getBrokerConfig().getShortPollingTimeMills());}long beginLockTimestamp = this.systemClock.now();// 檢查是否有消息到達,可以喚醒掛起的請求this.checkHoldRequest();long costTime = this.systemClock.now() - beginLockTimestamp;if (costTime > 5 * 1000) {log.info("[NOTIFYME] check hold request cost {} ms.", costTime);}} catch (Throwable e) {log.warn(this.getServiceName() + " service has exception. ", e);}}log.info("{} service end", this.getServiceName());
}

4.2.4 notifyMessageArriving

這個方法在兩個地方被調用,如下圖所示

Untitled

這個方法是重新喚醒拉取請求的核心方法。調用這個方法,提醒 PullRequestHoldService 線程有新消息到達

我們來看看這個方法具體做了什么

  1. 根據 topic 和 queueId 獲取掛起的拉取請求列表
  2. 從 store 中獲取該隊列消息的最大offset
  3. 遍歷該隊列的所有拉取請求,符合以下兩種條件之一的拉取請求會被處理并返回
    1. 消費隊列最大offset比消費者拉取請求的offset大,說明有新的消息可以被拉取,處理該拉取請求
    2. 拉取請求掛起時間超過閾值,直接返回消息未找到
  4. 如果不滿足以上兩個條件,那么該拉取請求會重新放回 pullRequestTable,等待下次檢查

/*** 當有新消息到達的時候,喚醒長輪詢的消費端請求** @param topic     消息Topic* @param queueId   消息隊列ID* @param maxOffset 消費隊列的最大Offset*/
public void notifyMessageArriving(final String topic, final int queueId, final long maxOffset, final Long tagsCode,long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) {// 根據topic和queueId從容器中取出掛起的拉取請求列表String key = this.buildKey(topic, queueId);ManyPullRequest mpr = this.pullRequestTable.get(key);if (mpr != null) {// 獲取掛起的拉取請求列表List<PullRequest> requestList = mpr.cloneListAndClear();if (requestList != null) {// 預先定義需要繼續掛起的拉取請求列表List<PullRequest> replayList = new ArrayList<PullRequest>();for (PullRequest request : requestList) {long newestOffset = maxOffset;// 從store中獲取該隊列消息的最大offsetif (newestOffset <= request.getPullFromThisOffset()) {newestOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId);}// 消費隊列最大offset比消費者拉取請求的offset大,說明有新的消息可以被拉取if (newestOffset > request.getPullFromThisOffset()) {// 消息過濾匹配boolean match = request.getMessageFilter().isMatchedByConsumeQueue(tagsCode,new ConsumeQueueExt.CqExtUnit(tagsCode, msgStoreTime, filterBitMap));// match by bit map, need eval again when properties is not null.if (match && properties != null) {match = request.getMessageFilter().isMatchedByCommitLog(null, properties);}if (match) {try {// 會調用PullMessageProcessor#processRequest方法拉取消息,然后將結果返回給消費者this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(),request.getRequestCommand());} catch (Throwable e) {log.error("execute request when wakeup failed.", e);}continue;}}// 查看是否超時,如果Consumer請求達到了超時時間,也觸發響應,直接返回消息未找到if (System.currentTimeMillis() >= (request.getSuspendTimestamp() + request.getTimeoutMillis())) {try {this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(),request.getRequestCommand());} catch (Throwable e) {log.error("execute request when wakeup failed.", e);}continue;}// 當前不滿足要求,重新放回Hold列表中replayList.add(request);}if (!replayList.isEmpty()) {mpr.addPullRequest(replayList);}}}
}

4.3 DefaultMessageStore#ReputMessageService

4.3.1 doReput

private void doReput() {// ...DefaultMessageStore.this.doDispatch(dispatchRequest);// 通知消息消費長輪詢線程,有新的消息落盤,立即喚醒掛起的消息拉取請求if (BrokerRole.SLAVE != DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole()&& DefaultMessageStore.this.brokerConfig.isLongPollingEnable()&& DefaultMessageStore.this.messageArrivingListener != null) {DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(),dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());
}

這里調用了 NotifyMessageArrivingListener#arriving() 方法,進而調用 PullRequestHoldService.notifyMessageArriving()。

為什么不直接調用 pullRequestHoldService.notifyMessageArriving() ?因為 doReput 所處的類所在的包是 store,存儲包,而 PullRequestHoldService 在 broker 包中

所以需要一個橋梁,就是 NotifyMessageArrivingListener。它在 Broker 初始化 DefaultMessageStore 時被寫入 DefaultMessageStore

4.3.2 NotifyMessageArrivingListener#arriving

public class NotifyMessageArrivingListener implements MessageArrivingListener {@Overridepublic void arriving(String topic, int queueId, long logicOffset, long tagsCode,long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) {// 提醒長輪詢請求管理容器,新的消息到達,立刻拉取最新消息this.pullRequestHoldService.notifyMessageArriving(topic, queueId, logicOffset, tagsCode,msgStoreTime, filterBitMap, properties);}
}

參考資料

  • 源碼分析RocketMQ消息PULL-長輪詢模式
  • 消息中間件—RocketMQ 消息消費(二)(push 模式實現)

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:
http://www.pswp.cn/news/43357.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/43357.shtml
英文地址,請注明出處:http://en.pswp.cn/news/43357.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

探索心律失常:病因、診斷與治療以及與腸道菌群的關聯

谷禾健康 你是否有時會感到心悸、心慌、胸悶、氣短、頭暈、乏力&#xff1f;你是否有時感覺自己的心跳過快或過慢&#xff1f; 如果有上述情況&#xff0c;就要引起重視了&#xff0c;你可能存在心律失常。心律失常是最常見的心臟疾病之一&#xff0c;涉及到心臟的電活動節奏異…

麻辣燙數據可視化,麻辣燙市場將持續蓬勃發展

麻辣燙&#xff0c;這道源自中國的美食&#xff0c;早已成為人們生活中不可或缺的一部分。它獨特的香辣口味&#xff0c;讓人忍不住每每流連忘返。與人們的關系&#xff0c;簡直如同摯友一般。每當寒冷的冬日或疲憊的時刻&#xff0c;麻辣燙總是悄然走進人們的心房&#xff0c;…

i.MX6ULL開發板無法進入NFS掛載文件系統的解決辦法

問題 使用NFS網絡掛載文件系統后卡住無法進入系統。 解決辦法 此處不詳細講述NFS安裝流程 查看板卡掛載在/home/etc/rc.init下的自啟動程序 進入到../../home/etc目錄下&#xff0c;查看rc.init文件&#xff0c;首先從第一行排查&#xff0c;查看/home/etc/netcfg文件代碼內容&…

Flask-SQLAlchemy

認識Flask-SQLAlchemy Flask-SQLAlchemy 是一個為 Flask 應用增加 SQLAlchemy 支持的擴展。它致力于簡化在 Flask 中 SQLAlchemy 的使用。SQLAlchemy 是目前python中最強大的 ORM框架, 功能全面, 使用簡單。 ORM優缺點 優點 有語法提示, 省去自己拼寫SQL&#xff0c;保證SQL…

Python爬蟲——scrapy_crawlspider讀書網

創建crawlspider爬蟲文件&#xff1a; scrapy genspider -t crawl 爬蟲文件名 爬取的域名scrapy genspider -t crawl read https://www.dushu.com/book/1206.htmlLinkExtractor 鏈接提取器通過它&#xff0c;Spider可以知道從爬取的頁面中提取出哪些鏈接&#xff0c;提取出的鏈…

spring的核心技術---bean的生命周期加案例分析詳細易懂

目錄 一.spring管理JavaBean的初始化過程&#xff08;生命周期&#xff09; Spring Bean的生命周期&#xff1a; 二.spring的JavaBean管理中單例模式及原型&#xff08;多例&#xff09;模式 2.1 . 默認為單例&#xff0c;但是可以配置多例 2.2.舉例論證 2.2.1 默認單例 2.2…

前端常用算法(一):防抖+節流

目錄 第一章 防抖 1.1 防抖&#xff08;debounce&#xff09;簡介 1.2 應用場景 1.3 實現思路 1.4 手撕防抖代碼 第二章 節流 2.1 節流&#xff08;throttle&#xff09;簡介 2.2 應用場景 2.3 實現思路 2.4 手撕節流代碼&#xff08;方法&#xff1a;時間戳和計時器…

MR300C工業無線WiFi圖傳模塊 內窺鏡機器人圖像傳輸有線無線的兩種方式

MR300C無線WiFi圖傳模使用方法工業機器人圖像高清傳輸 ? MR300C圖傳模塊基于MIPS處理器實現&#xff0c;電腦/手機連接模塊的WIFI熱點或網口即可查看視頻流 ? 模塊的USB 2.0 Host接口&#xff0c;可接入USB uvc攝像頭/內窺鏡默認輸出的視頻格式必須是MJPG ? 模塊支持接入攝…

計算機競賽 圖像識別-人臉識別與疲勞檢測 - python opencv

文章目錄 0 前言1 課題背景2 Dlib人臉識別2.1 簡介2.2 Dlib優點2.3 相關代碼2.4 人臉數據庫2.5 人臉錄入加識別效果 3 疲勞檢測算法3.1 眼睛檢測算法3.3 點頭檢測算法 4 PyQt54.1 簡介4.2相關界面代碼 5 最后 0 前言 &#x1f525; 優質競賽項目系列&#xff0c;今天要分享的是…

在 PyTorch 中使用關鍵點 RCNN 進行人體姿勢估計--附源碼

人體姿態估計是計算機視覺領域的一個重要研究領域。它涉及估計人體上的獨特點,也稱為關鍵點。在這篇博文中,我們將討論一種在包含人類的圖像上查找關鍵點的算法,稱為Keypoint-RCNN。該代碼是使用 Pytorch 使用Torchvision庫編寫的。 假設您想要建立一名私人健身教練,可以通…

MongoDB升級經歷(4.0.23至5.0.19)

MongoDB從4.0.23至5.0.19升級經歷 引子&#xff1a;為了解決MongoDB的兩個漏洞決定把MongoDB升級至最新版本&#xff0c;期間也踩了不少坑&#xff0c;在這里分享出來供大家學習與避坑~ 1、MongoDB的兩個漏洞 漏洞1&#xff1a;MongoDB Server 安全漏洞(CVE-2021-20330) 漏洞2…

SpringBoot + Vue 微人事(十二)

職位批量刪除實現 編寫后端接口 PositionController DeleteMapping("/")public RespBean deletePositionByIds(Integer[] ids){if(positionsService.deletePositionsByIds(ids)ids.length){return RespBean.ok("刪除成功");}return RespBean.err("刪…

工業視覺相機鏡頭選型方法

一、相機選型 1、首先&#xff0c;根據檢測需求確定選用黑白/彩色、面陣/線陣相機&#xff0c;接口類型一般選擇GigE 2、確定檢測精度要求&#xff08;最小特征尺寸mm&#xff09;、視野范圍&#xff0c;一個測量精度對應幾個像素數&#xff08;一般取3-5&#xff09; 3、計…

GPT法律領域

法律領域 LaWGPT Github: https://github.com/pengxiao-song/LaWGPT 簡介&#xff1a;基于中文法律知識的大語言模型。 數據&#xff1a;基于中文裁判文書網公開法律文書數據、司法考試數據等數據集展開&#xff0c;利用Stanford_alpaca、self-instruct方式生成對話問答數據…

esp32c3 micropython oled實時天氣信息

目錄 簡介 效果展示 代碼 main.py ssd1306.py font.py 實現思路 簡介 合宙esp32c3 micropython框架&#xff0c;只支持128*64 I2C oled ssd1306驅動我優化過的&#xff0c;與其他的不一樣&#xff0c;為避免出錯&#xff0c;使用我的驅動 把下面兩個py文件放入單片機內…

SqlServer的with(nolock)關鍵字的用法介紹

舉個例子 下面就來演示這個情況。 為了演示兩個事務死鎖的情況&#xff0c;我們下面的測試都需要在SQL Server Management Studio中打開兩個查詢窗口。保證事務不被干擾。 --1、 沒有提交的事務&#xff0c;NOLOCK 和 READPAST處理的策略&#xff1a; --查詢窗口一請執行如下…

【馬蹄集】第二十三周——進位制專題

進位制專題 目錄 MT2186 二進制&#xff1f;不同&#xff01;MT2187 excel的煩惱MT2188 單條件和MT2189 三進制計算機1MT2190 三進制計算機2 MT2186 二進制&#xff1f;不同&#xff01; 難度&#xff1a;黃金 ?? 時間限制&#xff1a;1秒 ?? 占用內存&#xff1a;128M 題目…

Kotlin的Map

在 Kotlin 中&#xff0c;Map 是一種鍵值對的集合數據結構&#xff0c;用于存儲一組關聯的鍵和值。Kotlin 標準庫提供了 Map 接口和多種實現類&#xff0c;使得操作和處理鍵值對數據更加方便。下面詳細描述 Kotlin 的 Map 的用法&#xff1a; 創建 Map Kotlin 提供了幾種方式…

SQL力扣練習(十一)

目錄 1.樹節點(608) 示例 1 解法一(case when) 解法二(not in) 2.判斷三角形(610) 示例 1 解法一(case when) 解法二(if) 解法三(嵌套if) 3.只出現一次的最大數字(619) 示例 1 解法一(count limit) 解法二(max) 4.有趣的電影(620) 解法一 5.換座位(626) 示例 …

同步jenkinsfile流水線(sync-job)

環境 變量&#xff1a;env&#xff08;環境變量&#xff1a;sit/dev/simulation/prod/all&#xff09;&#xff0c;job&#xff08;job-name/all&#xff09;目錄&#xff1a;/var/lib/jenkins/jenkinsfile environment.json&#xff1a; [roottest-01 jenkinsfile]# cat env…