Spark Streaming整合logstash + Kafka wordCount

1、安裝logstash,直接解壓即可

測試logstash是否可以正常運行

bin/logstash -e 'input { stdin { } } output { stdout {codec => rubydebug } }'

只獲取消息

bin/logstash -e 'input { stdin { } } output { stdout {codec => plain { format => "%{message}" } } }'

2、編寫logstash配置文件
2、1在logstash目錄下創建conf目錄
2、2在conf目錄下創建文件logstash.conf,內容如下

input {
file {
type => "logs"
path => "/home/hadoop/logs/*.log"
discover_interval => 10
start_position => "beginning" 
}
}output {
kafka {
codec => plain {
format => "%{message}"
}
topic_id => "spark"	
}
}

logstash input: https://www.elastic.co/guide/en/logstash/current/input-plugins.html
logstash output: https://www.elastic.co/guide/en/logstash/current/output-plugins.html

3、啟動logstash采集數據

bin/logstash -f conf/logstash.conf

4、代碼

?

package bigdata.sparkimport org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkContext, SparkConf}/*** Created by Administrator on 2017/4/28.*/
object SparkStreamDemo {def main(args: Array[String]) {val conf = new SparkConf()conf.setAppName("spark_streaming")conf.setMaster("local[*]")val sc = new SparkContext(conf)sc.setCheckpointDir("D:/checkpoints")sc.setLogLevel("ERROR")val ssc = new StreamingContext(sc, Seconds(5))val topics = Map("spark" -> 2)val lines = KafkaUtils.createStream(ssc, "m1:2181,m2:2181,m3:2181", "spark", topics).map(_._2)val ds1 = lines.flatMap(_.split(" ")).map((_, 1))val ds2 = ds1.updateStateByKey[Int]((x:Seq[Int], y:Option[Int]) => {Some(x.sum + y.getOrElse(0))})ds2.print()ssc.start()ssc.awaitTermination()}
}

  

?

轉載于:https://www.cnblogs.com/heml/p/6796131.html

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/372287.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/372287.shtml
英文地址,請注明出處:http://en.pswp.cn/news/372287.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

js 變量類型

變量類型分為:基礎類型和引用類型 基礎類型:boolean, string, number, null, undefined, symbol 引用類型: array, object typeof: 判斷變量的類型instanceof:判斷某個對象是否是另外一個對象的實例主要還是理解這兩個判斷的不同之處&#xf…

python 相對導入_python 相對導入與絕對導入

Python 相對導入與絕對導入Python | Jul 21, 2016 | pythonPython 相對導入與絕對導入,這兩個概念是相對于包內導入而言的。包內導入即是包內的模塊導入包內部的模塊。Python import 的搜索路徑1.在當前目錄下搜索該模塊2.在環境變量 sys.path 中指定的路徑列表中依…

具有Java Kickstart的MongoDB

NoSQL數據庫由于其可伸縮性而變得越來越流行。 適當使用時 NoSQL數據庫可以提供真正的好處。 MongoDB是使用C 編寫的高度可擴展的開源NoSQL數據庫。 1.安裝MongoDB 您可以根據所使用的操作系統,按照MongoDB官方網站上的說明安裝MongoDB,而不會遇到很多麻…

Linux Shell——函數的使用

文/一介書生&#xff0c;一枚碼農。 scripts are for lazy people. 函數是存在內存里的一組代碼的命名的元素。函數創建于腳本運行環境之中&#xff0c;并且可以執行。 函數的語法結構為&#xff1a; function <function-name> {<code to execute> } 創建函數不需要…

FFmpeg學習2:解碼數據結構及函數總結

在上一篇文章中&#xff0c;對FFmpeg的視頻解碼過程做了一個總結。由于才接觸FFmpeg&#xff0c;還是挺陌生的&#xff0c;這里就解碼過程再做一個總結。本文的總結分為以下兩個部分&#xff1a; 數據讀取&#xff0c;主要關注在解碼過程中所用到的FFmpeg中的結構體。解碼過程中…

python1~10階乘while_Python3基礎 while 階乘

?python : 3.7.0OS : Ubuntu 18.04.1 LTSIDE : PyCharm 2018.2.4conda : 4.5.11type setting : Markdown?code"""Author : 行初心Date : 18-9-24Blog : www.cnblogs.com/xingchuxinGitHub : github.com/GratefulHeartCoder"""def main():count…

JavaFX 2 GameTutorial第4部分

介紹 這是與JavaFX 2游戲教程相關的六個部分系列的第四部分。 如果您錯過了第1部分 &#xff0c; 第2部分或第3部分 &#xff0c;我建議您在開始本教程之前仔細閱讀它們。 回顧一下&#xff0c;在第3部分中&#xff0c;我為您提供了許多經典街機風格游戲和所使用的不同輸入設備…

關于ListView的作業

原生布局并未多做修改 <?xml version"1.0" encoding"utf-8"?> <LinearLayout xmlns:android"http://schemas.android.com/apk/res/android" xmlns:tools"http://schemas.android.com/tools" android:id"id/activity_m…

Java 7的類型推斷

每個優秀的程序員都喜歡編寫簡潔但有效且經過優化的代碼。 類型推斷是JDK 7中引入的一種方法&#xff0c;它肯定會為您帶來更少鍵入的好處。 您以以下方式使用Java代碼已有很長時間了。 但是&#xff0c;在初始化Collections的特定實現時&#xff0c;您是否曾經想到過代碼重復&…

python實現膠囊網絡_膠囊網絡 -- Capsule Networks

膠囊網絡是 vector in vector out的結構&#xff0c;最后對每個不同的類別&#xff0c;輸出不一個向量&#xff0c;向量的模長表示屬于該類別的概率。例如&#xff0c;在數字識別中&#xff0c;兩個數字雖然重疊在一起&#xff0c;Capsule中的兩個向量能完整表達兩個數字的特征…

基變換與過渡矩陣

取定線性空間的一組基&#xff0c;任何一組向量可以表示為基向量的線性組合&#xff0c;且是同構映射。兩個線性空間是同構。 不同的基向量&#xff0c;基向量之間的過渡矩陣 取線性空間的兩組基任一向量可以表示為這兩組向量的線性組合將一組基向量表示為另外基向量的線性組合…

bootstrap的滾動監聽

<!DOCTYPE html> <html lang"zh-cn"> <head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1,maximum-scale1, user-scalableno"><title>下拉菜單和滾動監…

java構造函數_JAVA的構造函數是怎么寫的。萬分感謝。路過的請不要嘲笑%_%

展開全部JAVA的構造函數是&#xff1a;SetLocal EnableDelayedExpansionset classpath.for %%c in (lib\*.jar) do set classpath!32313133353236313431303231363533e59b9ee7ad9431333431363030classpath!;%%cset classpath%classpath%;./classes;java com.ham.server.Server。…

在Spring中使用Redis

隨著NoSQL解決方案在許多問題上越來越受歡迎&#xff0c;現代項目越來越多地考慮使用一些&#xff08;或幾種&#xff09;NoSQL代替&#xff08;或并排&#xff09;傳統RDBMS。 我已經在本 &#xff0c; 本和本文章中介紹了我在MongoDB上的經驗。 在本文中&#xff0c;我想對Re…

C# 中winform的一些屬性設置

1 窗體的大小固定住&#xff0c;不能調整其大小 窗體FormBorderStyle 屬性設置為 FixedSingle; MaximizeBox 屬性設置為false; MinimizeBox 屬性設置為 false; 2. 在狀態欄中無圖標顯示 設置為fase即可。 3. 設置窗體的啟動位置 方法1&#xff0c; 用代碼控制 this.Location …

LiveBos---按鈕成下拉

轉載于:https://www.cnblogs.com/luhanzhen/p/6802779.html

Solr:創建拼寫檢查器

在上一篇文章中&#xff0c;我談到了Solr Spellchecker的工作原理&#xff0c;然后向您展示了其性能的一些測試結果。 現在&#xff0c;我們將看到另一種拼寫檢查方法。 與其他方法一樣&#xff0c;此方法使用兩步過程。 相當快速的“候選單詞”選擇&#xff0c;然后對這些單詞…

linux修改機器名稱

1 使用hostname命令&#xff1a;hostname 新機器名稱 2 修改vi /etc/sysconfig/network # cat /etc/sysconfig/network NETWORKINGyes HOSTNAMElocalhost.localdomain 注意&#xff1a;左側都必須大寫&#xff0c;等號附件沒有空格。 查看機器名稱使用hostname命令 轉載于:h…

java property_property在Java中的用法

展開全部在項目中經常用到各種配置文件62616964757a686964616fe78988e69d8331333337623561&#xff0c;有.properties的&#xff0c;也有.xml格式的都可以通過java.utils.Property類進行處理。1. 讀取.properties文件File pFile new File("test.properties");FileIn…

Django 和 html

下面是對應的形式&#xff0c;自定義的forms 轉載于:https://www.cnblogs.com/kilen/p/6804047.html