Kafka.net使用編程入門(一)

最近研究分布式消息隊列,分享下!

首先zookeeper ?和 kafka 壓縮包 解壓 并配置好!

我本機zookeeper環境配置如下:

D:\Worksoftware\ApacheZookeeper3\conf\zoo.cfg

以下是kafka的配置

D:\Worksoftware\Apachekafka2.11\config\server.properties

我已經加了path環境變量,沒加的話需要到zookeeper對應bin目錄下執行zkServer

然后執行cmd命令:

?

結果:

?然后打開第二個dos窗口,我沒加環境變量path,執行kafka命令如下:

?

?

重頭戲來了,開始kafka C#客戶端處理:

首先引用kafka-net.dll,可以用vs2013的nuget下載,

以下是Prorame.cs:

?

[csharp]?view plaincopy
  1. class?Program??
  2. ?????{??
  3. ?????????static?void?Main(string[]?args)??
  4. ?????????{??
  5. ?????????????const?string?topicName?=?"test";??
  6. ?????????????var?options?=?new?KafkaOptions(new?Uri("http://localhost:9092"))??
  7. ?????????????{??
  8. ?????????????????Log?=?new?ConsoleLog()??
  9. ?????????????};??
  10. ???????????????
  11. ?????????????Task.Run(()?=>??
  12. ?????????????{??
  13. ?????????????????var?consumer?=?new?Consumer(new?ConsumerOptions(topicName,?new?BrokerRouter(options))?{?Log?=?new?ConsoleLog()?});??
  14. ?????????????????foreach?(var?data?in?consumer.Consume())??
  15. ?????????????????{??
  16. ?????????????????????Console.WriteLine("Response:?PartitionId={0},Offset={1}?:Value={2}",?data.Meta.PartitionId,?data.Meta.Offset,?data.Value.ToUtf8String());??
  17. ?????????????????}??
  18. ?????????????});??
  19. ???
  20. ?????????????//創建一個生產者發消息??
  21. ?????????????var?producer?=?new?Producer(new?BrokerRouter(options))??
  22. ?????????????{??
  23. ?????????????????BatchSize?=?100,??
  24. ?????????????????BatchDelayTime?=?TimeSpan.FromMilliseconds(2000)??
  25. ?????????????};??
  26. ???
  27. ?????????????Console.WriteLine("打出一條消息按?enter...");??
  28. ?????????????while?(true)??
  29. ?????????????{??
  30. ?????????????????var?message?=?Console.ReadLine();??
  31. ?????????????????if?(message?==?"quit")?break;??
  32. ???
  33. ?????????????????if?(string.IsNullOrEmpty(message))??
  34. ?????????????????{??
  35. ?????????????????????//??
  36. ?????????????????????SendRandomBatch(producer,?topicName,?200);??
  37. ?????????????????}??
  38. ?????????????????else??
  39. ?????????????????{??
  40. ?????????????????????producer.SendMessageAsync(topicName,?new[]?{?new?Message(message)?});??
  41. ?????????????????}??
  42. ?????????????}??
  43. ???
  44. ?????????????//釋放資源??
  45. ?????????????using?(producer)??
  46. ?????????????{??
  47. ???
  48. ?????????????}??
  49. ?????????}??
  50. ?????????private?static?async?void?SendRandomBatch(Producer?producer,?string?topicName,?int?count)??
  51. ?????????{??
  52. ?????????????//發送多個消息??
  53. ?????????????var?sendTask?=?producer.SendMessageAsync(topicName,?Enumerable.Range(0,?count).Select(x?=>?new?Message(x.ToString())));??
  54. ???
  55. ?????????????Console.WriteLine("傳送了?#{0}?messages.??Buffered:{1}?AsyncCount:{2}",?count,?producer.BufferCount,?producer.AsyncCount);??
  56. ???
  57. ?????????????var?response?=?await?sendTask;??
  58. ???
  59. ?????????????Console.WriteLine("已完成批量發送:?{0}.?Buffered:{1}?AsyncCount:{2}",?count,?producer.BufferCount,?producer.AsyncCount);??
  60. ?????????????foreach?(var?result?in?response.OrderBy(x?=>?x.PartitionId))??
  61. ?????????????{??
  62. ?????????????????Console.WriteLine("主題:{0}?PartitionId:{1}?Offset:{2}",?result.Topic,?result.PartitionId,?result.Offset);??
  63. ?????????????}??
  64. ???
  65. ?????????}??
  66. ?????}??
?結果:

閑的蛋疼,隨便研究一些好東西,.net環境太封閉,每個.net程序員都要擴展視野,技術交流,本人QQ827937686

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/538610.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/538610.shtml
英文地址,請注明出處:http://en.pswp.cn/news/538610.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

下拉推廣系統立擇火星推薦_下拉詞刪除都擇火星下拉

會員注冊發帖做推廣的人都離不開搜索引擎,就像魚離不開水,很多時候我們做SEO的朋友都在研究各大搜索引擎的機制,收錄,排名規則或者是黑帽技術,不管如何,只是希望把自己的企業,產品,服…

python包路徑有幾個_python的搜索路徑與包(package)

python的搜索路徑其實是一個列表,它是指導入模塊時,python會自動去找搜索這個列表當中的路徑,如果路徑中存在要導入的模塊文件則導入成功,否則導入失敗: >>> importsys>>>sys.path [, C:\\Python33\…

配置文件管理服務器,06-配置文件管理

1配置文件管理設備運行于FIPS模式時,本特性部分配置相對于非FIPS模式有所變化,具體差異請見本文相關描述。有關FIPS模式的詳細介紹請參見“安全配置指導”中的“FIPS”。1.1 配置文件簡介配置文件是用來保存配置的文件。配置文件主要用于: …

虛擬機安裝spark配置推薦

如果虛擬機配置的內存太少,spark運行計算的時候會報: WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster uito ensure that workers are registered and have sufficient memory 資源不足的問題,導…

c++輸入一個整數判斷是否為完全平方數_matlab判斷一個整數是完全平方數

(C語言c)判斷一個數是否是 完全平方數的 整數倍#include#includeint ispow(int x){ int&用c編判斷一個數是否是完全數#include#includeintmain(){intn;printf("請輸入一個數,然后按回車鍵:\n");scanf("%d",&n);inti,p1;for(i2;i*i編…

Linux常用命令——chattr、lsattr

chattr 改變文件的擴展屬性 語法格式:chattr 【option】【mode】【files】chattr [選項] [模式] [文件或目錄]注意:chattr 命令及后面的選項和文件里,每個元素之間都要至少要有一個空格參數選項: 參數選項解釋說明-a只能向文件中…

兩個相鄰盒子的邊框怎么只顯示一個_【前端小課堂】0044 盒子

這是一個面向零基礎的前端教程,很簡單,用零散時間就可以學習。 推薦早上讀一下,晚上復習一下,如果可以奢侈一點,白天稍微練習一下下,總共花費 5~15 分鐘。就醬!已經好幾次提到塊(block)元素了&a…

scala apply方法 筆記

原文出處:http://blog.csdn.net/pzw_0612/article/details/48576569 ----------------------------------------------------- Scala比Java更面向對象的一個方面是Scala沒有靜態成員。替代品是,Scala有單例對象:singleton object。 當單例對…

轉:6.1海量數據處理

本文轉自看云,原文地址請移步:https://www.kancloud.cn/kancloud/the-art-of-programming/41608 偶然閑游,偶遇某一站點,發現這里寫的關于海量數據處理相關的思路還挺不錯,所以在這里采摘收藏,如有侵權之處還請評論區或…

python爬去中國天氣網_python爬取中國天氣網并展示最低溫度

import requestsfrom bs4 import BeautifulSoupimport lxmlimport jsonfrom echarts import Echart,Bar,Axisimport time#城市與溫度的集合weather_list []#城市集合city_list []#溫蒂集合lowest_list []#獲取溫度def getTemperature(url):result requests.get(url)#print r…

flash一個按鈕控制動畫_flutter閃屏過渡動畫,閃光占位動畫

在程序設計的理念中,講究一切都來源于物理世界,在現實世界中,人們在每接觸到一個新的事物或者說在手指觸碰到一個事物時,總是心里默許期望有一個反饋效果,這就是來源于心底深處常常被人忽略的一個潛在期望。在程序的世…

scala-wordcount

/* * Mapreduce步驟 * 1、map&#xff1a;獲取一行 * 2、按空格分隔 * 3、每個單詞統計&#xff0c;<key,1> 輸出到 reducer * 4、reducer <key,{1,1,1,1,1}> 匯總 * 5、輸出結果 */ 1、聲明一個集合&#xff0c;模仿要統計的…

動態改變_Excel中如何動態改變可編輯區域?

有這樣一個工作場景&#xff1a;我們制作一個工作報表模板給同事填寫數據&#xff0c;這個工作表格只可以在預設的可編輯區域編輯&#xff0c;其它都是受密碼保護的&#xff0c;而且這個可編輯區域是隨著日期——工作周的變化而發生變化的。也就是說在不同的工作周可編輯的區域…

java集合框架之ArrayList與LinkedList的區別

參考http://how2j.cn/k/collection/collection-arraylist-vs-linkedlist/690.html#nowhere ArrayList和LinkedList的區別 ArrayList 插入&#xff0c;刪除數據慢LinkedList&#xff0c; 插入&#xff0c;刪除數據快ArrayList是順序結構&#xff0c;所以定位很快&#xff0c;指哪…

python語言是由誰設計并領導開發的_python語言概述 - osc_lt3ocv4d的個人空間 - OSCHINA - 中文開源技術交流社區...

python語言的發展 python語言誕生于1990年&#xff0c;由Guide van Rossum設計并領導開發。 python語言是開源項目的優秀代表&#xff0c;其解釋器的全部代碼都是開源的。 編寫Hello程序 學習編程語言有一個慣例&#xff0c;即運行最簡單的Hello程序&#xff0c;該程序功能是在…

Spark集群安裝

Spark是獨立的&#xff0c;所以集群安裝的時候&#xff0c;不像hive&#xff0c;hbase等需要先安裝hadoop&#xff0c;除非文件保存在hadoop上&#xff0c;才需要安裝hadoop集群。 如果虛擬機安裝&#xff0c;點擊閱讀推薦配置 前提環境&#xff1a; 1、安裝了JDK1.7及以上版…

列表逆序排序_【Python自學筆記】集合——列表

list列表類型是一個與元組tuple類似的有序序列。構造函數是list()切片# 切片 fruit ["Apple", "Hawthorn", "Loquat", "Medlar", "Pear", "Quince"] print(fruit[:2]) print(fruit[-1])語法與字符串和元組中的一…

esp8266 阿里云 arduino_NUCLEO-G071RB通過WiFi與NB連接阿里云

開箱體驗試用背景去年年初&#xff0c;有新項目要讓移動式容器設備的監控數據上云&#xff0c;選型時主要考慮三個系列STM32L0、STM32G0和STM8。最初有意向選用STM32L052RB&#xff0c;主要是為了滿足低功耗需求。恰逢G0系列上市&#xff0c;價格親民&#xff0c;性能卻要高很多…

“云上金融,智創未來” 騰訊“云+未來”峰會金融專場在廣州舉行

5月24日&#xff0c;騰訊“云未來“峰會金融專場在廣州舉行。來自央行、騰訊公司以及銀行、證券、保險、互金公司等騰訊金融云的合作伙伴代表以及行業專家&#xff0c;共同分享了智慧金融、企業數字化轉型、騰訊金融云業務布局以及與合作伙伴取得的最新成績等話題。活動現場&am…

Spark算子reduceByKey深度解析

原文地址&#xff1a;http://blog.csdn.net/qq_23660243/article/details/51435257 -------------------------------------------- 最近經常使用到reduceByKey這個算子&#xff0c;懵逼的時間占據多數&#xff0c;所以沉下心來翻墻上國外的帖子仔細過了一遍&#xff0c;發現一…