MQTT 入門教程:三步從 Docker 部署到 Java 客戶端實現

在物聯網(IoT)與邊緣計算快速發展的今天,設備間的高效通信成為核心需求。MQTT 作為一種輕量級的發布 / 訂閱模式協議,憑借其低帶寬占用、強穩定性和靈活的消息路由能力,已成為物聯網通信的事實標準。無論是智能家居的設備聯動、工業傳感器的數據采集,還是車聯網的實時信息交互,MQTT 都在其中扮演著關鍵角色。
本文將從零開始搭建:從使用 Docker 部署輕量級 MQTT 服務器(Broker),到基于 Java 語言實現完整的消息發布與訂閱功能,通過清晰的步驟和可直接運行的代碼,最短時間內搭建起自己的 MQTT 通信系統。

什么是 MQTT?

MQTT(Message Queuing Telemetry Transport)是一種輕量級的發布 / 訂閱模式消息傳輸協議,專為低帶寬、不穩定網絡環境設計,廣泛應用于物聯網(IoT)、傳感器網絡和移動設備通信等場景。
核心概念

  • Broker:消息服務器,負責接收和轉發所有消息
  • Publisher:消息發布者,發送消息到 Broker
  • Subscriber:消息訂閱者,從 Broker 接收消息
  • Topic:消息主題,用于消息分類和路由
  • QoS (Quality of Service):服務質量等級,定義消息傳遞的可靠性
    -在這里插入圖片描述

第一步:使用 Docker 部署 MQTT Broker

我們將使用 Eclipse Mosquitto,一個流行的開源 MQTT Broker。

1. 拉取 Mosquitto 鏡像

docker pull eclipse-mosquitto

2. 創建配置文件

首先創建一個目錄用于存放配置文件和數據:

mkdir -p ~/mosquitto/config ~/mosquitto/data ~/mosquitto/log

創建配置文件 mosquitto.conf:

nano ~/mosquitto/config/mosquitto.conf

添加以下內容:

persistence true
persistence_location /mosquitto/data/
log_dest file /mosquitto/log/mosquitto.log
listener 1883
allow_anonymous true
listener 1883:MQTT 默認端口
allow_anonymous true:允許匿名連接(生產環境建議關閉)

3. 啟動 Mosquitto 容器

docker run -d \--name mosquitto \-p 1883:1883 \-v ~/mosquitto/config:/mosquitto/config \-v ~/mosquitto/data:/mosquitto/data \-v ~/mosquitto/log:/mosquitto/log \eclipse-mosquitto

4. 驗證 Broker 是否運行

docker ps | grep mosquitto

如果看到運行中的容器,說明 Broker 部署成功。

第二步:Java 客戶端實現

我們將使用 Eclipse Paho Java 客戶端庫來實現 MQTT 客戶端。

1. 添加依賴

如果使用 Maven,在pom.xml中添加:

<dependencies><dependency><groupId>org.eclipse.paho</groupId><artifactId>org.eclipse.paho.client.mqttv3</artifactId><version>1.2.5</version></dependency>
</dependencies>

2. MQTT 工具類

首先創建一個工具類封裝 MQTT 連接的通用功能:

//運行
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;public class MQTTUtils {// MQTT Broker地址private static final String BROKER = "tcp://localhost:1883";/*** 創建MQTT客戶端并連接到Broker* @param clientId 客戶端ID,應唯一* @return 已連接的MQTT客戶端* @throws MqttException 連接異常*/public static MqttClient connect(String clientId) throws MqttException {// 設置客戶端持久化方式為內存MemoryPersistence persistence = new MemoryPersistence();// 創建客戶端MqttClient client = new MqttClient(BROKER, clientId, persistence);// 配置連接選項MqttConnectOptions connOpts = new MqttConnectOptions();connOpts.setCleanSession(true); // 清除會話connOpts.setConnectionTimeout(10); // 連接超時時間connOpts.setKeepAliveInterval(20); // 心跳間隔// 連接到BrokerSystem.out.println("Connecting to broker: " + BROKER);client.connect(connOpts);System.out.println("Connected");return client;}/*** 發布消息* @param client MQTT客戶端* @param topic 消息主題* @param content 消息內容* @param qos 服務質量等級 (0, 1, 2)* @throws MqttException 發布異常*/public static void publish(MqttClient client, String topic, String content, int qos) throws MqttException {MqttMessage message = new MqttMessage(content.getBytes());message.setQos(qos);client.publish(topic, message);System.out.println("Published message: " + content + " to topic: " + topic);}/*** 訂閱主題* @param client MQTT客戶端* @param topic 要訂閱的主題* @param qos 服務質量等級* @throws MqttException 訂閱異常*/public static void subscribe(MqttClient client, String topic, int qos) throws MqttException {System.out.println("Subscribing to topic: " + topic);client.subscribe(topic, qos);}
}

3. 訂閱者客戶端實現

//運行
import org.eclipse.paho.client.mqttv3.IMqttMessageListener;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;public class MQTTSubscriber {// 訂閱的主題private static final String TOPIC = "test/topic";// 客戶端IDprivate static final String CLIENT_ID = "subscriber-client";// QoS等級private static final int QOS = 1;public static void main(String[] args) {MqttClient client = null;try {// 連接到Brokerclient = MQTTUtils.connect(CLIENT_ID);// 設置消息監聽器client.setCallback(new MqttCallback() {@Overridepublic void connectionLost(Throwable cause) {System.out.println("Connection lost: " + cause.getMessage());}@Overridepublic void messageArrived(String topic, MqttMessage message) throws Exception {System.out.println("Received message on topic: " + topic);System.out.println("Message content: " + new String(message.getPayload()));System.out.println("QoS: " + message.getQos());}@Overridepublic void deliveryComplete(IMqttDeliveryToken token) {// 對于訂閱者來說,這個方法通常不需要實現}});// 訂閱主題MQTTUtils.subscribe(client, TOPIC, QOS);// 保持客戶端運行以接收消息System.out.println("Waiting for messages...");while (true) {Thread.sleep(1000);}} catch (MqttException | InterruptedException e) {System.err.println("Error: " + e.getMessage());} finally {if (client != null && client.isConnected()) {try {client.disconnect();System.out.println("Disconnected");} catch (MqttException e) {e.printStackTrace();}}}}
}

4. 發布者客戶端實現

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;public class MQTTPublisher {// 發布的主題private static final String TOPIC = "test/topic";// 客戶端IDprivate static final String CLIENT_ID = "publisher-client";// QoS等級private static final int QOS = 1;public static void main(String[] args) {MqttClient client = null;try {// 連接到Brokerclient = MQTTUtils.connect(CLIENT_ID);// 發布幾條測試消息for (int i = 1; i <= 5; i++) {String message = "Hello, MQTT! This is message " + i;MQTTUtils.publish(client, TOPIC, message, QOS);Thread.sleep(2000); // 間隔2秒發送一條}} catch (MqttException | InterruptedException e) {System.err.println("Error: " + e.getMessage());} finally {if (client != null && client.isConnected()) {try {client.disconnect();System.out.println("Disconnected");} catch (MqttException e) {e.printStackTrace();}}}}
}

第三步:運行和測試

1. 啟動訂閱者

首先運行MQTTSubscriber類,它會連接到 Broker 并開始等待接收消息:

Connecting to broker: tcp://localhost:1883
Connected
Subscribing to topic: test/topic
Waiting for messages...

2. 啟動發布者

然后運行MQTTPublisher類,它會發送 5 條消息到指定主題:

Connecting to broker: tcp://localhost:1883
Connected
Published message: Hello, MQTT! This is message 1 to topic: test/topic
Published message: Hello, MQTT! This is message 2 to topic: test/topic

3. 查看結果

在訂閱者的控制臺,你應該能看到接收到的消息:

Received message on topic: test/topic
Message content: Hello, MQTT! This is message 1
QoS: 1
Received message on topic: test/topic
Message content: Hello, MQTT! This is message 2
QoS: 1

總結
本文介紹了 MQTT 的基本概念,展示了如何使用 Docker 快速部署 Mosquitto Broker,并通過 Java 代碼實現了 MQTT 客戶端的發布和訂閱功能。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/bicheng/91906.shtml
繁體地址,請注明出處:http://hk.pswp.cn/bicheng/91906.shtml
英文地址,請注明出處:http://en.pswp.cn/bicheng/91906.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

公網服務器上Nginx或者Openresty如何屏蔽IP直接掃描

0x01 背景云服務器很多時候為了通信需要設置公網訪問&#xff0c;但是網絡當中存在很多的掃描器&#xff0c;無時無刻在掃描&#xff0c;當80,443端口暴露時&#xff0c;成了這些掃描IP的攻擊對象&#xff0c;無時無刻收到威脅。0x02 掃描攻擊方式1.直接通過公網IP地址進行一些…

C語言(長期更新)第8講 函數遞歸

C語言&#xff08;長期更新&#xff09; 第8講:函數遞歸 跟著潼心走&#xff0c;輕松拿捏C語言&#xff0c;困惑通通走&#xff0c;一去不回頭~歡迎開始今天的學習內容&#xff0c;你的支持就是博主最大的動力。 目錄 C語言&#xff08;長期更新&#xff09; 第8講 函數遞歸…

[硬件電路-129]:模擬電路 - 繼電器的工作原理、關鍵指標、常用芯片與管腳定義

一、工作原理繼電器是一種基于電磁感應原理的自動開關裝置&#xff0c;通過控制小電流電路實現大電流電路的通斷。其核心結構包括&#xff1a;電磁鐵&#xff08;線圈鐵芯&#xff09;&#xff1a;通電時產生磁場&#xff0c;吸引銜鐵動作。觸點系統&#xff1a;包含常開觸點&a…

Haproxy調度算法 - 靜態算法介紹與使用

文章目錄一、概述二、socat工具三、static-rr四、firstHAProxy通過固定參數 balance 指明對后端服務器的調度算法&#xff0c;該參數可以配置在listen或backend選項中。HAProxy的調度算法分為靜態和動態調度算法&#xff0c;但是有些算法可以根據參數在靜態和動態算法中相互轉換…

模擬激光相機工作站版本6.0 5.2.32 6.0.44 6.031 5.2.20

模擬激光相機工作站版本6.0 5.2.32 6.0.44 6.031 5.2.20

AWS Blockchain Templates:快速部署企業級區塊鏈網絡的終極解決方案

無需精通底層架構&#xff0c;一鍵搭建Hyperledger Fabric或以太坊網絡&#xff01;AWS Blockchain Templates 可幫助您快速基于不同的區塊鏈框架在 AWS 上創建和部署區塊鏈網絡。區塊鏈是一種分布式數據庫技術&#xff0c;用于維護不斷增長的交易記錄和智能合約集合&#xff0…

Vue 服務端渲染 Nuxt 使用詳解

Nuxt 是基于 Vue 的高層框架&#xff0c;專注于服務器端渲染應用開發。它封裝了繁瑣的配置和通用模式&#xff0c;提供了開箱即用的 SSR 功能&#xff0c;使開發者能夠專注于編寫業務邏輯。 1. Nuxt 的核心特性 SSR 支持&#xff1a;默認支持服務端渲染&#xff0c;提高應用性…

使用ACK Serverless容器化部署大語言模型FastChat

核心概念 阿里云ACK Serverless&#xff1a;是一種基于 Kubernetes 的無服務器容器服務。用戶無需管理底層節點和服務器&#xff0c;即可快速部署容器化應用&#xff0c;并根據實際使用的 CPU 和內存資源按需付費&#xff0c;只專注于應用本身而非基礎設施管理。 FastChat&…

最新Android Studio漢化教程--兼容插件包

[ ] 軟件版本&#xff1a;Android Studio Meerkat Feature Drop | 2024.3.2 Build #AI-243.25659.59.2432.13423653, built on April 30, 2025 Runtime version: 21.0.613368085-b895.109 amd64 VM: OpenJDK 64-Bit Server VM by JetBrains s.r.o. Toolkit: sun.awt.windows.WT…

Unity_數據持久化_IXmlSerializable接口

Unity數據持久化 三、XML數據持久化 3.5 IXmlSerializable接口 3.5.1 IXmlSerializable接口基礎概念 什么是IXmlSerializable接口&#xff1a; IXmlSerializable 是.NET框架提供的一個接口&#xff0c;允許類自定義XML序列化和反序列化的過程。當默認的XML序列化行為無法滿足需…

如何快速解決PDF解密新方法?

有時從網絡下載的PDF文檔會帶有加密限制&#xff0c;導致無法編輯、復制或打印。它的體積僅約10MB&#xff0c;無需安裝&#xff0c;解壓即用。遇到受限制的文件時&#xff0c;只需將其拖入界面&#xff0c;選擇是否覆蓋原文件&#xff0c;點擊執行&#xff0c;瞬間完成解密。「…

譯|數據驅動智慧供應鏈的構成要素與關聯思考

數據質量&#xff0c;通過識別關鍵決策和瓶頸構建信息供應鏈。該模型適用于優化庫存管理、自動化物流、預測需求、實現產品全生命周期追溯及應對突發風險。例如&#xff0c;通過AI機器人自動管理倉庫&#xff0c;或利用數字孿生模擬和優化全球采購網絡。 匯總來自三篇文章&…

OS21.【Linux】環境變量

目錄 1.與環境變量有關的實驗 A.對比命令和自制程序的運行 為什么.像ls、pwd這樣的命令運行是不需要加路徑? 執行自制程序而不加路徑的方法,看看PATH環境變量 方法1:將自制程序移動到系統的搜索路徑下 方法2:臨時修改PATH環境變量 B.查看系統中所有環境變量 解釋幾個常…

加密流量論文復現:《Detecting DNS over HTTPS based data exfiltration》(上)

本文將以我個人的理解去閱讀該篇流量加密論文&#xff0c;并在下一篇盡力對其中的實驗部分進行復現。話不多說&#xff0c;先從論文開始著手。 內容介紹 傳統的DNS(Domain Name System)協議是以明文傳輸的。DNS作為互聯網的基礎設施&#xff0c;最初設計時主要考慮的是功能和效…

Apache RocketMQ 中Message (消息)的核心概念

好的&#xff0c;我們來深入理解一下 Apache RocketMQ 中 Message (消息) 這個核心概念。這份文檔詳細闡述了消息的定義、在模型中的位置、內部屬性、約束和使用建議。 你可以將 Message 看作是 RocketMQ 系統中數據傳輸和處理的最小原子單位。它承載了業務數據&#xff0c;并附…

C 語言問題

1. C語言中 union 與 struct 的區別類型structunion內存分配機制編譯器為每個成員?獨立分配內存空間&#xff0c;總內存大小 所有成員大小之和&#xff08;考慮內存對齊&#xff09;所有成員?共享同一段內存空間&#xff0c;總內存大小 ?最大成員的大小?數據存儲特性1. 所…

[ LeetCode優選算法專題一雙指針-----盛最多的水]

1.題目鏈接 LeetCode盛最多的水 2.題目描述 3.題目解析 問題本質分析 "盛最多水的容器" 問題可以抽象為&#xff1a;在坐標軸上有 n 條垂直線段&#xff0c;第 i 條線段的兩個端點分別是 (i, 0) 和 (i, height [i])。找到兩條線段&#xff0c;使得它們與 x 軸共同…

舊筆記本電腦如何安裝飛牛OS

01引言隨著電子產品的更新換代&#xff0c;我們有很多的電子產品已經滿足不了現在的工作需求和日常娛樂了&#xff0c;比如&#xff1a;用了很久厚重筆記本電腦放在現在辦公也是有點吃力了&#xff0c;我們現在換新了舊的還不想放在那里吃灰&#xff0c;怎么辦呢&#xff1f;我…

某金服Java面試終極指南:25題完整解析與場景化方案

涵蓋分布式鎖、緩存、事務、高并發等金融系統核心考點&#xff0c;附解決方案與抗風險設計一、分布式鎖深度解決方案 1. Redis分布式鎖完整實現 // 原子加鎖 防死鎖 String uuid UUID.randomUUID().toString(); Boolean locked redisTemplate.opsForValue().setIfAbsent(&qu…

MATLAB 2025a的下載以及安裝,安裝X310的測試附加功能(附加安裝包)

首先將安裝包下載到本地中之后解壓該文件夾&#xff0c;打開文件發現有兩個文件&#xff0c;其中crach文件夾中是破解matlab所用到的文件。而另一個壓縮包就是需要安裝的文件&#xff0c;要先解壓在安裝。在安裝之前將網絡斷開&#xff0c;不然可能破解不成功&#xff0c;先進入…