centos7.9使用docker-compose安裝kafka

docker-compose配置文件

services:zookeeper:image: confluentinc/cp-zookeeper:7.0.1hostname: zookeepercontainer_name: zookeeperports:- "2181:2181"environment:ZOOKEEPER_CLIENT_PORT: 2181ZOOKEEPER_TICK_TIME: 2000kafka:image: confluentinc/cp-kafka:7.0.1hostname: kafkacontainer_name: kafkadepends_on:- zookeeperports:- "9092:9092"environment:KAFKA_BROKER_ID: 1KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXTKAFKA_LISTENERS: PLAINTEXT://:9092,PLAINTEXT_INTERNAL://:29092KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.1.85:9092,PLAINTEXT_INTERNAL://kafka:29092KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT_INTERNALKAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1kafka-manager:image: hlebalbau/kafka-manager:stablecontainer_name: kafka-managerdepends_on:- zookeeperports:- "9002:9000"environment:ZK_HOSTS: "zookeeper:2181"kAFKA_BROKERS: 192.168.1.85:9092KAFKA_MANAGER_AUTH_ENABLED: "false"

application.properties文件

spring.application.name=kafka_demo
# application.properties
spring.kafka.bootstrap-servers=192.168.1.85:9092
spring.kafka.consumer.group-id=my-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

生產者controller

import com.yykj.kafka_demo.service.KafkaProducerService;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;@RestController
public class KafkaTestController {private final KafkaProducerService kafkaProducerService;public KafkaTestController(KafkaProducerService kafkaProducerService) {this.kafkaProducerService = kafkaProducerService;}@GetMapping("/send")public String sendMessageToKafka(@RequestParam(value = "topic", defaultValue = "test-topic") String topic,@RequestParam(value = "message", defaultValue = "Hello Kafka!") String message) {kafkaProducerService.sendMessage(topic, message);return "消息已發送: " + message + " 到主題: " + topic;}
}

生產者service

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;@Service
public class KafkaProducerService {private final KafkaTemplate<String, String> kafkaTemplate;public KafkaProducerService(KafkaTemplate<String, String> kafkaTemplate) {this.kafkaTemplate = kafkaTemplate;}/*** 發送消息到指定主題* @param topic 主題名稱* @param message 消息內容*/public void sendMessage(String topic, String message) {kafkaTemplate.send(topic, message).whenComplete((result, ex) -> {if (ex == null) {System.out.println("消息發送成功: " + message +", 分區: " + result.getRecordMetadata().partition() +", 偏移量: " + result.getRecordMetadata().offset());} else {System.err.println("消息發送失敗: " + ex.getMessage());}});}
}

消費者監聽

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;@Service
public class KafkaConsumerService {// 監聽指定的主題,groupId用于區分不同的消費者組@KafkaListener(topics = "${kafka.topic:test-topic}", groupId = "${kafka.group-id:test-group}")public void consumeMessage(ConsumerRecord<String, String> record) {System.out.printf("收到消息 -> 主題: %s, 分區: %d, 偏移量: %d, 鍵: %s, 值: %s%n",record.topic(),record.partition(),record.offset(),record.key(),record.value());// 這里可以添加你的業務邏輯處理}
}

pom文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.5.0</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>com.yykj</groupId><artifactId>kafka_demo</artifactId><version>0.0.1-SNAPSHOT</version><name>kafka_demo</name><description>kafka_demo</description><url/><licenses><license/></licenses><developers><developer/></developers><scm><connection/><developerConnection/><tag/><url/></scm><properties><java.version>17</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><!-- Spring Boot Starter for Kafka --><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>3.3.6</version></dependency></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><configuration><annotationProcessorPaths><path><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></path></annotationProcessorPaths></configuration></plugin><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId><configuration><excludes><exclude><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></exclude></excludes></configuration></plugin></plugins></build></project>

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/diannao/84544.shtml
繁體地址,請注明出處:http://hk.pswp.cn/diannao/84544.shtml
英文地址,請注明出處:http://en.pswp.cn/diannao/84544.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

STM32:Modbus通信協議核心解析:關鍵通信技術

知識點1【 Modbus通信】 1、Modbus的概述 Modbus是OSI模型第七層的應用層報文傳輸協議 協議&#xff1a;說明有組包和解包的過程 2、通信機制 Modelbus是一個請求/應答協議 通信機制&#xff1a;主機輪詢&#xff0c;從機應答的機制。每個從設備有唯一的地址&#xff0c;主…

LeetCode 3362.零數組變換 III:貪心+優先隊列+差分數組——清晰題解

【LetMeFly】3362.零數組變換 III&#xff1a;貪心優先隊列差分數組——清晰題解 力扣題目鏈接&#xff1a;https://leetcode.cn/problems/zero-array-transformation-iii/ 給你一個長度為 n 的整數數組 nums 和一個二維數組 queries &#xff0c;其中 queries[i] [li, ri] …

ORM++ 封裝實戰指南:安全高效的 C++ MySQL 數據庫操作

ORM 封裝實戰指南&#xff1a;安全高效的 C MySQL 數據庫操作 一、環境準備 1.1 依賴安裝 # Ubuntu/Debian sudo apt-get install libmysqlclient-dev # CentOS sudo yum install mysql-devel# 編譯時鏈接庫 (-I 指定頭文件路徑 -L 指定庫路徑) g main.cpp -stdc17 -I/usr/i…

JESD204B 協議介紹

一、協議概述 JESD204B是由JEDEC&#xff08;固態技術協會&#xff09;制定的高速串行接口標準&#xff0c;專為模數轉換器&#xff08;ADC&#xff09;、數模轉換器&#xff08;DAC&#xff09;與邏輯器件&#xff08;如FPGA、ASIC&#xff09;之間的數據傳輸設計。其核心目標…

yolov8,c++案例匯總

文章目錄 引言多目標追蹤案例人體姿態估計算法手勢姿態估計算法目標分割算法 引言 以下案例,基于c,ncnn,yolov8既可以在windows10/11上部署, 也可以在安卓端部署, 也可以在嵌入式端部署, 服務器端可支持部署封裝為DLL,支持c/c#/java端調用 多目標追蹤案例 基于yolov8, ncnn,…

運動規劃實戰案例 | 圖解基于狀態晶格(State Lattice)的路徑規劃(附ROS C++/Python仿真)

目錄 1 控制采樣 vs 狀態采樣2 State Lattice路徑規劃2.1 算法流程2.2 Lattice運動基元生成2.3 幾何代價函數2.4 運動學約束啟發式 3 算法仿真3.1 ROS C仿真3.2 Python仿真 1 控制采樣 vs 狀態采樣 控制采樣的技術路線源自經典的運動學建模思想。這種方法將機器人的控制指令空…

BERT框架:自然語言處理的革命性突破

引言 在自然語言處理&#xff08;NLP&#xff09;領域&#xff0c;2018年Google推出的BERT&#xff08;Bidirectional Encoder Representations from Transformers&#xff09;框架無疑是一場革命。作為基于Transformer架構的雙向編碼器表示模型&#xff0c;BERT通過預訓練學習…

【Fifty Project - D31】

結束了一個超級消耗周末&#xff0c;滿安排之健身梅溪湖游泳做飯喝酒羽毛球賽 完全力竭了&#xff0c;久久不能恢復過來&#xff0c;暫停健身安排了 端午后再繼續 今日完成記錄 TimePlan完成情況7&#xff1a;30 - 8&#xff1a;10有氧爬坡√9&#xff1a;00 - 11&#xff1a;…

信息學奧賽一本通 1547:【 例 1】區間和

【題目鏈接】 ybt 1547&#xff1a;【 例 1】區間和 【題目考點】 1. 線段樹 2. 樹狀數組 【解題思路】 本題要求維護區間和&#xff0c;實現單點修改、區間查詢。 解法1&#xff1a;線段樹 線段樹原理&#xff0c;及實現方法見&#xff1a;洛谷 P3374 【模板】樹狀數組…

力扣面試150題--求根節點到葉節點數字之和

Day 48 題目描述 思路 我們利用sum這個全局變量來保存總和值&#xff0c;遞歸函數sum來計算每個根到葉子節點路徑所代表的數&#xff0c;由于我們需要遍歷到每條根到葉子節點的路徑&#xff0c;所有我采取了前序遍歷&#xff0c;如果不是葉子節點&#xff0c;就計算到該節點代…

DJI上云API官方demo學習

1、websocket&#xff0c;所在位置如下圖&#xff0c;調用的可以用//websocket搜索 2、用到的http客戶端&#xff0c;axios 3、很多和后端交互都是走的http請求

uniapp開發小程序,如何根據權限動態配置按鈕或頁面內容

前言 寫了好幾個項目&#xff0c;發現小程序對權限控制非常麻煩&#xff0c;于是有了這個想法&#xff0c;但是網上找了一圈沒有一個比較完善的講解&#xff0c;因為小程序不支持自定義指令&#xff0c;所以不能像后臺那樣方便&#xff0c;于是就將幾個博主的想法結合。 思路就…

LSTM+Transformer混合模型架構文檔

LSTMTransformer混合模型架構文檔 模型概述 本項目實現了一個LSTMTransformer混合模型&#xff0c;用于超臨界機組協調控制系統的數據驅動建模。該模型結合了LSTM的時序建模能力和Transformer的自注意力機制&#xff0c;能夠有效捕捉時間序列數據中的長期依賴關系和變量間的復…

測量尺子:多功能測量工具,科技改變生活

測量尺子是一款專業的測距儀測量萬能工具箱類型手機APP&#xff0c;旨在為用戶提供最貼心的測量助手。它擁有和現實測量儀器一樣的測量標準&#xff0c;更簡單便捷且精準的測量方式&#xff0c;最新AR科技測量更是大大拓寬了可以被測量的高度和深度。無論是日常使用、學習還是工…

結課作業01. 用戶空間 MPU6050 體感鼠標驅動程序

目錄 一. qt界面實現 二. 虛擬設備模擬模擬鼠標實現體感鼠標 2.1 函數聲明 2.2 虛擬鼠標實現 2.2.1 虛擬鼠標創建函數 2.2.2 鼠標移動函數 2.2.3 鼠標點擊函數 2.3 mpu6050相關函數實現 2.3.1 i2c設備初始化 2.3.2 mpu6050寄存器寫入 2.3.3 mpu6050寄存器讀取 2.3.…

深入淺出 Python Testcontainers:用容器優雅地編寫集成測試

在現代軟件開發中&#xff0c;自動化測試已成為敏捷開發與持續集成中的關鍵環節。單元測試可以快速驗證函數或類的行為是否符合預期&#xff0c;而集成測試則確保多個模塊協同工作時依然正確。問題是&#xff1a;如何讓集成測試可靠、可重復且易于維護&#xff1f; 這時&#…

JVM 的垃圾回收器

新生代回收器 通性 會觸發StW&#xff0c;暫停所有應用線程復制算法 Serial 單線程回收適合單線程系統 ParNew 多線程回收優先保證響應速度&#xff0c;降低 STW&#xff08;STW 越大&#xff0c;執行垃圾回收的時間越長&#xff0c;回收的垃圾越多&#xff0c;減少垃圾回…

【筆記】排查并解決Error in LLM call after 3 attempts: (status code: 502)

#工作記錄 一、問題描述 在部署運行部署對沖基金分析工具 ai-hedge-fund 時&#xff0c;不斷出現以下報錯&#xff0c;導致項目運行異常&#xff1a; Error in LLM call after 3 attempts: (status code: 502) Error in LLM call after 3 attempts: [WinError 10054] 遠程主…

GO 語言進階之 Template 模板使用

更多個人筆記見&#xff1a; github個人筆記倉庫 gitee 個人筆記倉庫 個人學習&#xff0c;學習過程中還會不斷補充&#xff5e; &#xff08;后續會更新在github上&#xff09; 文章目錄 Template 模板基本示例語法1. 基本輸出語法2. 控制結構3. 空白字符控制4. Must函數 Temp…

origin繪圖之【如何將多條重疊、高度重疊的點線圖、折線圖分開】

在日常的數據可視化工作中&#xff0c;Origin 作為一款功能強大的科研繪圖軟件&#xff0c;廣泛應用于實驗數據處理、結果展示與論文圖表制作等領域。然而&#xff0c;在處理多組數據、特別是繪制多條曲線的折線圖或點線圖時&#xff0c;常常會遇到這樣一個困擾&#xff1a;多條…