【Kafka】|?總結/Edison Zhou
1可用的Kafka .NET客戶端
作為一個.NET Developer,自然想要在.NET項目中集成Kafka實現發布訂閱功能。那么,目前可用的Kafka客戶端有哪些呢?
目前.NET圈子主流使用的是 Confluent.Kafka
confluent-kafka-dotnet : https://github.com/confluentinc/confluent-kafka-dotnet
其他主流的客戶端還有rdkafka-dotnet項目,但是其已經被并入confluent-kakfa-dotnet項目進行維護了。
因此,推薦使用confluent-kafka-dotnet,其配置友好,功能也更全面。
NCC千星項目CAP的Kafka擴展包(DotNetCore.CAP.Kafka)內部也是基于Confluent.Kafka來實現的:
接下來,本文就來在.NET Core項目下通過Confluent.Kafka和CAP兩個主流開源項目來操作一下Kafka,實現一下發布訂閱的功能。
2基于Confluent.Kafka的Sample
要完成本文示例,首先得有一個啟動好的Kafka Broker服務。關于如何搭建Kafka,請參考上一篇:通過Docker部署Kafka集群。
安裝相關組件
在.NET Core項目中新建一個類庫,暫且命名為EDT.Kafka.Core,安裝Confluent.Kafka組件:
PM>Install-Package Confluent.Kafka
編寫KafkaService
編寫IKafkaService接口:
namespace EDT.Kafka.Core
{public interface IKafkaService{Task PublishAsync<T>(string topicName, T message) where T : class;Task SubscribeAsync<T>(IEnumerable<string> topics, Action<T> messageFunc, CancellationToken cancellationToken = default) where T : class;}
}
編寫KafkaService實現類:
namespace EDT.Kafka.Core
{public class KafkaService : IKafkaService{public static string KAFKA_SERVERS = "127.0.0.1:9091";public async Task PublishAsync<T>(string topicName, T message) where T : class{var config = new ProducerConfig { BootstrapServers = KAFKA_SERVERS,BatchSize = 16384, // 修改批次大小為16KLingerMs = 20 // 修改等待時間為20ms};using (var producer = new ProducerBuilder<string, string>(config).Build()){await producer.ProduceAsync(topicName, new Message<string, string>{Key = Guid.NewGuid().ToString(),Value = JsonConvert.SerializeObject(message)}); ;}}public async Task SubscribeAsync<T>(IEnumerable<string> topics, Action<T> messageFunc, CancellationToken cancellationToken = default) where T : class{var config = new ConsumerConfig{BootstrapServers = KAFKA_SERVERS,GroupId = "Consumer",EnableAutoCommit = false, // 禁止AutoCommitAcks = Acks.Leader, // 假設只需要Leader響應即可AutoOffsetReset = AutoOffsetReset.Earliest // 從最早的開始消費起};using?(var?consumer?=?new?ConsumerBuilder<Ignore,?string>(config).Build()){consumer.Subscribe(topics);try{while (true){try{var consumeResult = consumer.Consume(cancellationToken);Console.WriteLine($"Consumed message '{consumeResult.Message?.Value}' at: '{consumeResult?.TopicPartitionOffset}'.");if (consumeResult.IsPartitionEOF){Console.WriteLine($" - {DateTime.Now:yyyy-MM-dd HH:mm:ss} 已經到底了:{consumeResult.Topic}, partition {consumeResult.Partition}, offset {consumeResult.Offset}.");continue;}T messageResult = null;try{messageResult = JsonConvert.DeserializeObject<T>(consumeResult.Message.Value);}catch (Exception ex){var errorMessage = $" - {DateTime.Now:yyyy-MM-dd HH:mm:ss}【Exception 消息反序列化失敗,Value:{consumeResult.Message.Value}】 :{ex.StackTrace?.ToString()}";Console.WriteLine(errorMessage);messageResult = null;}if (messageResult != null/* && consumeResult.Offset % commitPeriod == 0*/){messageFunc(messageResult);try{consumer.Commit(consumeResult);}catch (KafkaException e){Console.WriteLine(e.Message);}}}catch (ConsumeException e){Console.WriteLine($"Consume error: {e.Error.Reason}");}}}catch (OperationCanceledException){Console.WriteLine("Closing consumer.");consumer.Close();}}await Task.CompletedTask;}}
}
為了方便后續的演示,在此項目中再創建一個類 EventData:
public class EventData
{public string TopicName { get; set; }public string Message { get; set; }public DateTime EventTime { get; set; }
}
編寫Producer
新建一個Console項目,暫且命名為:EDT.Kafka.Demo.Producer,其主體內容如下:
namespace EDT.Kafka.Demo.Producer
{public class Program{static async Task Main(string[] args){KafkaService.KAFKA_SERVERS = "kafka1:9091,kafka2:9092,kafka3:9093";var kafkaService = new KafkaService();for (int i = 0; i < 50; i++){var eventData = new EventData{TopicName = "testtopic",Message = $"This is a message from Producer, Index : {i + 1}",EventTime = DateTime.Now};await kafkaService.PublishAsync(eventData.TopicName, eventData);}Console.WriteLine("Publish Done!");Console.ReadKey();}}
}
編寫Consumer
新建一個Console項目,暫且命名為:EDT.Kafka.Demo.Consumer,其主體內容如下:
namespace EDT.Kafka.Demo.Consumer
{public class Program{static async Task Main(string[] args){KafkaService.KAFKA_SERVERS = "kafka1:9091,kafka2:9092,kafka3:9093";var kafkaService = new KafkaService();var topics = new List<string> { "testtopic" };await kafkaService.SubscribeAsync<EventData>(topics, (eventData) => {Console.WriteLine($" - {eventData.EventTime: yyyy-MM-dd HH:mm:ss} 【{eventData.TopicName}】- > 已處理");});}}
}
測試Pub/Sub效果
將Producer和Consumer兩個項目都啟動起來,可以看到當Consumer消費完50條消息并一一確認之后,Producer這邊就算發布結束。
3基于CAP項目的Sample
模擬場景說明
假設我們有兩個微服務,一個是Catalog微服務,一個是Basket微服務,當Catalog微服務產生了Product價格更新的事件,就會將其發布到Kafka,Basket微服務作為消費者就會訂閱這個消息然后更新購物車中對應商品的最新價格。
Catalog API
新建一個ASP.NET Core WebAPI項目,然后分別安裝以下組件:
PM>Install Package DotNetCore.CAP
PM>Install Package DotNetCore.CAP.MongoDB
PM>Install Package DotNetCore.CAP.Kafka
在Startup中的ConfigureServices方法中注入CAP:
public void ConfigureServices(IServiceCollection services)
{......services.AddCap(x =>{x.UseMongoDB("mongodb://account:password@mongodb-server:27017/products?authSource=admin");x.UseKafka("kafka1:9091,kafka2:9092,kafka3:9093");});
}
新建一個ProductController,實現一個Update產品價格的接口,在其中通過CapPublisher完成發布消息到Kafka:
namespace EDT.Demo.Catalog.API.Controllers
{[ApiController][Route("[controller]")]public class ProductController : ControllerBase{private static readonly IList<Product> Products = new List<Product>{new Product { Id = "0001", Name = "電動牙刷A", Price = 99.90M, Introduction = "暫無介紹" },new Product { Id = "0002", Name = "電動牙刷B", Price = 199.90M, Introduction = "暫無介紹" },new Product { Id = "0003", Name = "洗衣機A", Price = 2999.90M, Introduction = "暫無介紹" },new Product { Id = "0004", Name = "洗衣機B", Price = 3999.90M, Introduction = "暫無介紹" },new Product { Id = "0005", Name = "電視機A", Price = 1899.90M, Introduction = "暫無介紹" },};private readonly ICapPublisher _publisher;private readonly IMapper _mapper;public ProductController(ICapPublisher publisher, IMapper mapper){_publisher = publisher;_mapper = mapper;}[HttpGet]public IList<ProductDTO> Get(){return _mapper.Map<IList<ProductDTO>>(Products); ;}[HttpPut]public async Task<IActionResult> UpdatePrice(string id, decimal newPrice){// 業務代碼var product = Products.FirstOrDefault(p => p.Id == id);product.Price = newPrice;// 發布消息await _publisher.PublishAsync("ProductPriceChanged", new ProductDTO { Id = product.Id, Name = product.Name, Price = product.Price});return NoContent();}}
}
Basket API
參照Catalog API項目創建ASP.NET Core WebAPI項目,并安裝對應組件,在ConfigureServices方法中注入CAP。
新建一個BasketController,用于訂閱Kafka對應Topic:ProductPriceChanged 的消息。
namespace EDT.Demo.Basket.API.Controllers
{[ApiController][Route("[controller]")]public class BasketController : ControllerBase{private static readonly IList<MyBasketDTO> Baskets = new List<MyBasketDTO>{new MyBasketDTO { UserId = "U001", Catalogs = new List<Catalog>{new Catalog { Product = new ProductDTO { Id = "0001", Name = "電動牙刷A", Price = 99.90M }, Count = 2 },new Catalog { Product = new ProductDTO { Id = "0005", Name = "電視機A", Price = 1899.90M }, Count = 1 },} },new MyBasketDTO { UserId = "U002", Catalogs = new List<Catalog>{new Catalog { Product = new ProductDTO { Id = "0002", Name = "電動牙刷B", Price = 199.90M }, Count = 2 },new Catalog { Product = new ProductDTO { Id = "0004", Name = "洗衣機B", Price = 3999.90M }, Count = 1 },}}};[HttpGet]public IList<MyBasketDTO> Get(){return Baskets;}[NonAction][CapSubscribe("ProductPriceChanged")]public async Task RefreshBasketProductPrice(ProductDTO productDTO){if (productDTO == null)return;foreach (var basket in Baskets){foreach (var catalog in basket.Catalogs){if (catalog.Product.Id == productDTO.Id){catalog.Product.Price = productDTO.Price;break;}}}await Task.CompletedTask;}}
}
測試效果
同時啟動Catalog API 和 Basket API兩個項目。
首先,通過Swagger在Basket API中查看所有用戶購物車中的商品的價格,可以看到,0002的商品是199.9元。
然后,通過Swagger在Catalog API中更新Id為0002的商品的價格至499.9元。
最后,通過Swagger在Basket API中查看所有用戶購物車中的商品的價格,可以看到,0002的商品已更新至499.9元。
End總結
本文總結了.NET Core如何通過對應客戶端操作Kafka,基于Confluent.Kafka項目和CAP項目可以方便的實現發布訂閱的效果。
參考資料
阿星Plus,《.NET Core下使用Kafka》:https://blog.csdn.net/meowv/article/details/108675741
麥比烏斯皇,《.NET使用Kafka小結》:https://www.cnblogs.com/hsxian/p/12907542.html
Tony,《.NET Core事件總線解決方案:CAP基于Kafka》:https://www.cnblogs.com/Tony100/archive/2019/01/29/10333440.html
極客時間,胡夕《Kafka核心技術與實戰》
B站,尚硅谷《Kafka 3.x入門到精通教程》
年終總結:Edison的2020年終總結
數字化轉型:我在傳統企業做數字化轉型
C#刷題:C#刷劍指Offer算法題系列文章目錄
.NET面試:.NET開發面試知識體系
.NET大會:2020年中國.NET開發者大會PDF資料