Kafka學習征途:.NET Core操作Kafka

4afb10bfeb4373ffb5c7d742384e9dcc.png

【Kafka】|?總結/Edison Zhou

1可用的Kafka .NET客戶端

作為一個.NET Developer,自然想要在.NET項目中集成Kafka實現發布訂閱功能。那么,目前可用的Kafka客戶端有哪些呢?

目前.NET圈子主流使用的是 Confluent.Kafka

confluent-kafka-dotnet : https://github.com/confluentinc/confluent-kafka-dotnet

其他主流的客戶端還有rdkafka-dotnet項目,但是其已經被并入confluent-kakfa-dotnet項目進行維護了。

因此,推薦使用confluent-kafka-dotnet,其配置友好,功能也更全面。

NCC千星項目CAP的Kafka擴展包(DotNetCore.CAP.Kafka)內部也是基于Confluent.Kafka來實現的:

efba0e142bd6509587b5ba732694541e.png

接下來,本文就來在.NET Core項目下通過Confluent.Kafka和CAP兩個主流開源項目來操作一下Kafka,實現一下發布訂閱的功能。

2基于Confluent.Kafka的Sample

要完成本文示例,首先得有一個啟動好的Kafka Broker服務。關于如何搭建Kafka,請參考上一篇:通過Docker部署Kafka集群。

安裝相關組件

在.NET Core項目中新建一個類庫,暫且命名為EDT.Kafka.Core,安裝Confluent.Kafka組件:

PM>Install-Package Confluent.Kafka

編寫KafkaService

編寫IKafkaService接口:

namespace EDT.Kafka.Core
{public interface IKafkaService{Task PublishAsync<T>(string topicName, T message) where T : class;Task SubscribeAsync<T>(IEnumerable<string> topics, Action<T> messageFunc, CancellationToken cancellationToken = default) where T : class;}
}

編寫KafkaService實現類:

namespace EDT.Kafka.Core
{public class KafkaService : IKafkaService{public static string KAFKA_SERVERS = "127.0.0.1:9091";public async Task PublishAsync<T>(string topicName, T message) where T : class{var config = new ProducerConfig { BootstrapServers = KAFKA_SERVERS,BatchSize = 16384, // 修改批次大小為16KLingerMs = 20 // 修改等待時間為20ms};using (var producer = new ProducerBuilder<string, string>(config).Build()){await producer.ProduceAsync(topicName, new Message<string, string>{Key = Guid.NewGuid().ToString(),Value = JsonConvert.SerializeObject(message)}); ;}}public async Task SubscribeAsync<T>(IEnumerable<string> topics, Action<T> messageFunc, CancellationToken cancellationToken = default) where T : class{var config = new ConsumerConfig{BootstrapServers = KAFKA_SERVERS,GroupId = "Consumer",EnableAutoCommit = false, // 禁止AutoCommitAcks = Acks.Leader, // 假設只需要Leader響應即可AutoOffsetReset = AutoOffsetReset.Earliest // 從最早的開始消費起};using?(var?consumer?=?new?ConsumerBuilder<Ignore,?string>(config).Build()){consumer.Subscribe(topics);try{while (true){try{var consumeResult = consumer.Consume(cancellationToken);Console.WriteLine($"Consumed message '{consumeResult.Message?.Value}' at: '{consumeResult?.TopicPartitionOffset}'.");if (consumeResult.IsPartitionEOF){Console.WriteLine($" - {DateTime.Now:yyyy-MM-dd HH:mm:ss} 已經到底了:{consumeResult.Topic}, partition {consumeResult.Partition}, offset {consumeResult.Offset}.");continue;}T messageResult = null;try{messageResult = JsonConvert.DeserializeObject<T>(consumeResult.Message.Value);}catch (Exception ex){var errorMessage = $" - {DateTime.Now:yyyy-MM-dd HH:mm:ss}【Exception 消息反序列化失敗,Value:{consumeResult.Message.Value}】 :{ex.StackTrace?.ToString()}";Console.WriteLine(errorMessage);messageResult = null;}if (messageResult != null/* && consumeResult.Offset % commitPeriod == 0*/){messageFunc(messageResult);try{consumer.Commit(consumeResult);}catch (KafkaException e){Console.WriteLine(e.Message);}}}catch (ConsumeException e){Console.WriteLine($"Consume error: {e.Error.Reason}");}}}catch (OperationCanceledException){Console.WriteLine("Closing consumer.");consumer.Close();}}await Task.CompletedTask;}}
}

為了方便后續的演示,在此項目中再創建一個類 EventData:

public class EventData
{public string TopicName { get; set; }public string Message { get; set; }public DateTime EventTime { get; set; }
}

編寫Producer

新建一個Console項目,暫且命名為:EDT.Kafka.Demo.Producer,其主體內容如下:

namespace EDT.Kafka.Demo.Producer
{public class Program{static async Task Main(string[] args){KafkaService.KAFKA_SERVERS = "kafka1:9091,kafka2:9092,kafka3:9093";var kafkaService = new KafkaService();for (int i = 0; i < 50; i++){var eventData = new EventData{TopicName = "testtopic",Message = $"This is a message from Producer, Index : {i + 1}",EventTime = DateTime.Now};await kafkaService.PublishAsync(eventData.TopicName, eventData);}Console.WriteLine("Publish Done!");Console.ReadKey();}}
}

編寫Consumer

新建一個Console項目,暫且命名為:EDT.Kafka.Demo.Consumer,其主體內容如下:

namespace EDT.Kafka.Demo.Consumer
{public class Program{static async Task Main(string[] args){KafkaService.KAFKA_SERVERS = "kafka1:9091,kafka2:9092,kafka3:9093";var kafkaService = new KafkaService();var topics = new List<string> { "testtopic" };await kafkaService.SubscribeAsync<EventData>(topics, (eventData) => {Console.WriteLine($" - {eventData.EventTime: yyyy-MM-dd HH:mm:ss} 【{eventData.TopicName}】- > 已處理");});}}
}

測試Pub/Sub效果

將Producer和Consumer兩個項目都啟動起來,可以看到當Consumer消費完50條消息并一一確認之后,Producer這邊就算發布結束。

0eb57c565ee5d83e8431b90d680012a7.png

3基于CAP項目的Sample

模擬場景說明

假設我們有兩個微服務,一個是Catalog微服務,一個是Basket微服務,當Catalog微服務產生了Product價格更新的事件,就會將其發布到Kafka,Basket微服務作為消費者就會訂閱這個消息然后更新購物車中對應商品的最新價格。

5e67dffa1e91a1b651d43cf91bf6760a.png

Catalog API

新建一個ASP.NET Core WebAPI項目,然后分別安裝以下組件:

PM>Install Package DotNetCore.CAP
PM>Install Package DotNetCore.CAP.MongoDB
PM>Install Package DotNetCore.CAP.Kafka

在Startup中的ConfigureServices方法中注入CAP:

public void ConfigureServices(IServiceCollection services)
{......services.AddCap(x =>{x.UseMongoDB("mongodb://account:password@mongodb-server:27017/products?authSource=admin");x.UseKafka("kafka1:9091,kafka2:9092,kafka3:9093");});
}

新建一個ProductController,實現一個Update產品價格的接口,在其中通過CapPublisher完成發布消息到Kafka:

namespace EDT.Demo.Catalog.API.Controllers
{[ApiController][Route("[controller]")]public class ProductController : ControllerBase{private static readonly IList<Product> Products = new List<Product>{new Product { Id = "0001", Name = "電動牙刷A", Price = 99.90M,  Introduction = "暫無介紹" },new Product { Id = "0002", Name = "電動牙刷B", Price = 199.90M,  Introduction = "暫無介紹" },new Product { Id = "0003", Name = "洗衣機A", Price = 2999.90M,  Introduction = "暫無介紹" },new Product { Id = "0004", Name = "洗衣機B", Price = 3999.90M,  Introduction = "暫無介紹" },new Product { Id = "0005", Name = "電視機A", Price = 1899.90M,  Introduction = "暫無介紹" },};private readonly ICapPublisher _publisher;private readonly IMapper _mapper;public ProductController(ICapPublisher publisher, IMapper mapper){_publisher = publisher;_mapper = mapper;}[HttpGet]public IList<ProductDTO> Get(){return _mapper.Map<IList<ProductDTO>>(Products); ;}[HttpPut]public async Task<IActionResult> UpdatePrice(string id, decimal newPrice){// 業務代碼var product = Products.FirstOrDefault(p => p.Id == id);product.Price = newPrice;// 發布消息await _publisher.PublishAsync("ProductPriceChanged", new ProductDTO { Id = product.Id, Name = product.Name, Price = product.Price});return NoContent();}}
}

Basket API

參照Catalog API項目創建ASP.NET Core WebAPI項目,并安裝對應組件,在ConfigureServices方法中注入CAP。

新建一個BasketController,用于訂閱Kafka對應Topic:ProductPriceChanged 的消息。

namespace EDT.Demo.Basket.API.Controllers
{[ApiController][Route("[controller]")]public class BasketController : ControllerBase{private static readonly IList<MyBasketDTO> Baskets = new List<MyBasketDTO>{new MyBasketDTO { UserId = "U001", Catalogs = new List<Catalog>{new Catalog { Product = new ProductDTO { Id = "0001", Name = "電動牙刷A", Price = 99.90M }, Count = 2 },new Catalog { Product = new ProductDTO { Id = "0005", Name = "電視機A", Price = 1899.90M }, Count = 1 },}    },new MyBasketDTO { UserId = "U002", Catalogs = new List<Catalog>{new Catalog { Product = new ProductDTO { Id = "0002", Name = "電動牙刷B", Price = 199.90M }, Count = 2 },new Catalog { Product = new ProductDTO { Id = "0004", Name = "洗衣機B", Price = 3999.90M }, Count = 1 },}}};[HttpGet]public IList<MyBasketDTO> Get(){return Baskets;}[NonAction][CapSubscribe("ProductPriceChanged")]public async Task RefreshBasketProductPrice(ProductDTO productDTO){if (productDTO == null)return;foreach (var basket in Baskets){foreach (var catalog in basket.Catalogs){if (catalog.Product.Id == productDTO.Id){catalog.Product.Price = productDTO.Price;break;}}}await Task.CompletedTask;}}
}

測試效果

同時啟動Catalog API 和 Basket API兩個項目。

首先,通過Swagger在Basket API中查看所有用戶購物車中的商品的價格,可以看到,0002的商品是199.9元。

2e675b691deb7d5697ca8bfc944aaa4b.png

然后,通過Swagger在Catalog API中更新Id為0002的商品的價格至499.9元。

18f6e41e8cc48e13b8a4ec3824ebe107.png

最后,通過Swagger在Basket API中查看所有用戶購物車中的商品的價格,可以看到,0002的商品已更新至499.9元。

922f592196747ba84c8318fccc03abf5.png

End總結

本文總結了.NET Core如何通過對應客戶端操作Kafka,基于Confluent.Kafka項目和CAP項目可以方便的實現發布訂閱的效果。

參考資料

阿星Plus,《.NET Core下使用Kafka》:https://blog.csdn.net/meowv/article/details/108675741

麥比烏斯皇,《.NET使用Kafka小結》:https://www.cnblogs.com/hsxian/p/12907542.html

Tony,《.NET Core事件總線解決方案:CAP基于Kafka》:https://www.cnblogs.com/Tony100/archive/2019/01/29/10333440.html

極客時間,胡夕《Kafka核心技術與實戰》

B站,尚硅谷《Kafka 3.x入門到精通教程》

3b5db3496503b68f3f6146c45eb2b8bb.gif

年終總結:Edison的2020年終總結

數字化轉型:我在傳統企業做數字化轉型

C#刷題:C#刷劍指Offer算法題系列文章目錄

.NET面試:.NET開發面試知識體系

.NET大會:2020年中國.NET開發者大會PDF資料

91cc2e3d310148633b7d09b52fc5af49.png

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/285539.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/285539.shtml
英文地址,請注明出處:http://en.pswp.cn/news/285539.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

C語言試題107之判斷 101至200 之間有多少個素數,并輸出所有素數。

?作者簡介:大家好我是碼莎拉蒂,CSDN博客專家?????? ??個人主頁:個人主頁 ??系列專欄:C語言試題200例 ??推薦一款模擬面試、刷題神器?? 點擊跳轉進入網站 1、題目 題目:判斷 101至200 之間有多少個素數,并輸出所有素數 分析:判斷素數的方法:用一個數分…

C語言將十進制輸出二進制、八進制、十六進制的方法總結

文章目錄 方法一:直接輸出方法二:itoa函數方法一:直接輸出 C語言中,控制printf函數輸出格式的是格式字符,printf沒有直接打出2進制數的格式符,直接打出16進制的格式符是x格式符,即%x。 printf函數中輸出的格式為printf("<格式化字符串>", <參量表&…

[Android] 修改ImageView的圖片顏色

有兩種方法&#xff1a; 方法1&#xff1a; ImageView imageView (ImageView) findViewById(R.id.arrow_image); Drawable tipsArrow imageView.getDrawable(); tipsArrow.setColorFilter(mContext.getResources().getColor(R.color.red_bg1), PorterDuf…

四叉樹算法

2019獨角獸企業重金招聘Python工程師標準>>> title: 四叉樹算法 date: 2016-1-11 15:10 categories: IOS tags: 算法 小小程序猿我的博客&#xff1a;http://daycoding.com 轉載&#xff1a;http://blog.csdn.net/zhanxinhang/article/details/6706217 高德iOS聚合…

2019年中國教育信息化行業研究報告

2019年中國教育信息化行業研究報告 教育行業丨研究報告 本文轉自&#xff1a;艾瑞咨詢 核心摘要&#xff1a; 教育信息化2.0時代&#xff0c;教育相關政府/學校以更開放的姿態對待社會各類業態的進入&#xff0c;共建共享優質教育資源&#xff0c;提升教育公平與教育質量。同…

C語言試題109之將一個正整數分解質因數。例如:輸入 90,打印出 90=2乘3乘3乘5

?作者簡介:大家好我是碼莎拉蒂,CSDN博客專家?????? ??個人主頁:個人主頁 ??系列專欄:C語言試題200例 ??推薦一款模擬面試、刷題神器?? 點擊跳轉進入網站 1、題目 題目:將一個正整數分解質因數。例如:輸入 90,打印出 90=233*5。 分析:對 n 進行分解質因…

【ArcGIS遇上Python】使用add-in向導開發ArcGIS插件(1):add-in工具介紹及安裝

文章目錄 addin介紹addin開發方式Python Add-In開發addin下載addin安裝基于ArcObject/ArcGIS Engine的Add-In開發addin介紹 ArcGIS從10.0開始支持addin(ArcGIS軟件中又叫作加載項)的方式進行插件制作。相對于以往9.x系列,addin的無論是從使用或者編寫都更加方便快捷。通過開…

dotnet 使用 Crossgen2 對 DLL 進行 ReadyToRun 提升啟動性能

我對幾個應用進行嚴格的啟動性能評估&#xff0c;對比了在 .NET Framework 和 dotnet 6 下的應用啟動性能&#xff0c;非常符合預期的可以看到&#xff0c;在用戶的設備上&#xff0c;經過了 NGen 之后的 .NET Framework 可以提供非常優越的啟動性能&#xff0c;再加上 .NET Fr…

使用myeclipse建立maven項目(重要)

maven是管理項目的&#xff0c;myeclipse是編寫代碼的。第一次寫項目都要配置好多東西&#xff0c;很麻煩&#xff0c;now 來看看怎樣新建一個maven項目。 工具/原料 myeclipsemaven方法/步驟 因為教程使用的maven是自己下載配置的&#xff0c;并沒有使用myeclipse自帶的&#…

LeetCode 每日一題 Day 22 || 枚舉(數學方法)/二分

1954. 收集足夠蘋果的最小花園周長 給你一個用無限二維網格表示的花園&#xff0c;每一個 整數坐標處都有一棵蘋果樹。整數坐標 (i, j) 處的蘋果樹有 |i| |j| 個蘋果。 你將會買下正中心坐標是 (0, 0) 的一塊 正方形土地 &#xff0c;且每條邊都與兩條坐標軸之一平行。 給你…

不用@微信官網了,用python給自己的微信頭像加個小國旗

國旗LOGO&#xff08;png透明格式&#xff09;&#xff1a; 微信頭像 合成結果&#xff1a; import base64 import os import re from io import BytesIO from PIL import Image import tkinter as tk from tkinter import filedialog# 水印圖片 可以自己指定 #markImageImage…

getContentResolver().query()方法selection參數使用詳解(轉)

如何在managedQuery()和getContentResolver().query()方法中實現結果去重 有時候&#xff0c;我們需要對查詢的數據庫結果進行去重。在SQL中我們可以通過distinct關鍵字實現&#xff0c;但是當我們使用android提供的managedQuery()或getContentResolver().query()方法對數據庫進…

C語言試題106之有一對兔子問題

?作者簡介:大家好我是碼莎拉蒂,CSDN博客專家?????? ??個人主頁:個人主頁 ??系列專欄:C語言試題200例 ??推薦一款模擬面試、刷題神器?? 點擊跳轉進入網站 1、題目 題目:有一對兔子,從出生后第 3 個月起每個月都生一對兔子,小兔子長到第三個月 后每個月又…

【C#程序設計】教學講義——第二章:簡單C#程序設計

教學目錄 2.1 面向對象的概念2.2 建立簡單的應用程序2.3 窗體和Label控件2.4 文本框-屬性2.5 按鈕控件本章小結2.1 面向對象的概念 2.1.1 對象和類 1.對象 對象是客觀世界中對象的模型化。對象是有著特殊數據(屬性)與操作(行為)的實體,對象的操作(行為)稱為方法。 程…

Blazor University (34)表單 —— 獲得表單狀態

原文鏈接&#xff1a;https://blazor-university.com/forms/accessing-form-state/獲得表單狀態源代碼[1]有時&#xff0c;我們需要獲得 <EditForm> 子內容中的表單狀態。最常見的用途是當我們需要訪問輸入的 CSS 類時&#xff0c;指示輸入是否被修改或有效/無效。例如&a…

[轉]c# 中間件 的擴展模型(.net webapi/.net Core 的 MiddleWare 處理模型)

在學習 asp.net WebApi 或者asp.net Core 的時候&#xff0c;它們管道的處理模型跟 asp.net MVC/WebForm 的管道模型是不一樣的。 asp.net WebApi 或者asp.net Core 他們使用了一種叫做“中間件”的處理模型&#xff0c;相對于傳統管道模型&#xff0c;剔除了很多非必要的處理…

AIX 環境下遇到Device Busy問題

IBM AIX v5.3操作系統環境下在對網絡或網卡進行操作過程中經常遇到"Device Busy"而終止操作例如:#rmdev -l ent1遇到如下返回信息Method error (/etc/methods/ucfgdevice):0514-062 Cant perform the requested function because the speciafield.device is busy. 解…

mykernel編譯過程中問題解決

fatal error: linux/compiler-gcc5.h: No such file or directorycompilation terminated.解決方法:https://git.kernel.org/cgit/linux/kernel/git/stable/linux-stable.git/plain/include/linux/compiler-gcc5.h?id2c07053b8e1e0c22bb54dfbdf8e86a70f8bf00fc復制內容保存為c…

C#中的 Attribute 與 Python/TypeScript 中的裝飾器是同個東西嗎

前言最近成功把「前端帶師」帶入C#的坑&#xff08;實際是前端帶師開始從cocos轉unity游戲開發了&#xff09;某天&#xff0c;「前端帶師」看到這段代碼后問了個問題&#xff1a;[這個是裝飾器]&#xff1f;[HttpGet] public Response Get() {return ... }我第一反應覺得不是&…

【ArcGIS Engine二次開發】入門基礎(1):ArcGIS Engine簡介及開發環境搭建

文章目錄ArcGIS Engine概述ArcGIS Engine與ArcObjects的關系ArcGIS Engine下載及安裝ArcGIS Engine概述 ArcGIS Engine簡介 ArcGIS Engine是ESRI公司在2004年推出的用于開發C/S架構GIS應用軟件的工具包&#xff0c;是將用于構建ArcGIS整套產品的組件庫——ArcObjects的比分功…