最近研究分布式消息隊列,分享下!
首先zookeeper ?和 kafka 壓縮包 解壓 并配置好!
我本機zookeeper環境配置如下:
D:\Worksoftware\ApacheZookeeper3\conf\zoo.cfg
以下是kafka的配置
D:\Worksoftware\Apachekafka2.11\config\server.properties
我已經加了path環境變量,沒加的話需要到zookeeper對應bin目錄下執行zkServer
然后執行cmd命令:
?
結果:
?然后打開第二個dos窗口,我沒加環境變量path,執行kafka命令如下:
?
?
重頭戲來了,開始kafka C#客戶端處理:
首先引用kafka-net.dll,可以用vs2013的nuget下載,
以下是Prorame.cs:
?
- class?Program??
- ?????{??
- ?????????static?void?Main(string[]?args)??
- ?????????{??
- ?????????????const?string?topicName?=?"test";??
- ?????????????var?options?=?new?KafkaOptions(new?Uri("http://localhost:9092"))??
- ?????????????{??
- ?????????????????Log?=?new?ConsoleLog()??
- ?????????????};??
- ???????????????
- ?????????????Task.Run(()?=>??
- ?????????????{??
- ?????????????????var?consumer?=?new?Consumer(new?ConsumerOptions(topicName,?new?BrokerRouter(options))?{?Log?=?new?ConsoleLog()?});??
- ?????????????????foreach?(var?data?in?consumer.Consume())??
- ?????????????????{??
- ?????????????????????Console.WriteLine("Response:?PartitionId={0},Offset={1}?:Value={2}",?data.Meta.PartitionId,?data.Meta.Offset,?data.Value.ToUtf8String());??
- ?????????????????}??
- ?????????????});??
- ???
- ?????????????//創建一個生產者發消息??
- ?????????????var?producer?=?new?Producer(new?BrokerRouter(options))??
- ?????????????{??
- ?????????????????BatchSize?=?100,??
- ?????????????????BatchDelayTime?=?TimeSpan.FromMilliseconds(2000)??
- ?????????????};??
- ???
- ?????????????Console.WriteLine("打出一條消息按?enter...");??
- ?????????????while?(true)??
- ?????????????{??
- ?????????????????var?message?=?Console.ReadLine();??
- ?????????????????if?(message?==?"quit")?break;??
- ???
- ?????????????????if?(string.IsNullOrEmpty(message))??
- ?????????????????{??
- ?????????????????????//??
- ?????????????????????SendRandomBatch(producer,?topicName,?200);??
- ?????????????????}??
- ?????????????????else??
- ?????????????????{??
- ?????????????????????producer.SendMessageAsync(topicName,?new[]?{?new?Message(message)?});??
- ?????????????????}??
- ?????????????}??
- ???
- ?????????????//釋放資源??
- ?????????????using?(producer)??
- ?????????????{??
- ???
- ?????????????}??
- ?????????}??
- ?????????private?static?async?void?SendRandomBatch(Producer?producer,?string?topicName,?int?count)??
- ?????????{??
- ?????????????//發送多個消息??
- ?????????????var?sendTask?=?producer.SendMessageAsync(topicName,?Enumerable.Range(0,?count).Select(x?=>?new?Message(x.ToString())));??
- ???
- ?????????????Console.WriteLine("傳送了?#{0}?messages.??Buffered:{1}?AsyncCount:{2}",?count,?producer.BufferCount,?producer.AsyncCount);??
- ???
- ?????????????var?response?=?await?sendTask;??
- ???
- ?????????????Console.WriteLine("已完成批量發送:?{0}.?Buffered:{1}?AsyncCount:{2}",?count,?producer.BufferCount,?producer.AsyncCount);??
- ?????????????foreach?(var?result?in?response.OrderBy(x?=>?x.PartitionId))??
- ?????????????{??
- ?????????????????Console.WriteLine("主題:{0}?PartitionId:{1}?Offset:{2}",?result.Topic,?result.PartitionId,?result.Offset);??
- ?????????????}??
- ???
- ?????????}??
- ?????}??
閑的蛋疼,隨便研究一些好東西,.net環境太封閉,每個.net程序員都要擴展視野,技術交流,本人QQ827937686