Kafka Integration with Flume
Flume Producer
Install Flume: upload the apache-flume .tar.gz archive to the software directory on the Linux system, extract it to /opt/module, and rename the extracted directory to flume.
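The notes do not include the agent configuration itself, so here is a minimal sketch of Flume acting as a Kafka producer: a TAILDIR source tails application logs and a Kafka sink writes the events to the first topic. The file paths, agent name (a1), and batch settings are illustrative assumptions, not values from the original notes.

# file_to_kafka.conf -- Flume as a Kafka producer (sketch)
a1.sources = r1
a1.channels = c1
a1.sinks = k1

# TAILDIR source watching an application log (paths are assumptions)
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/module/applog/app.*
a1.sources.r1.positionFile = /opt/module/flume/taildir_position.json

# memory channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Kafka sink writing to the first topic
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092
a1.sinks.k1.kafka.topic = first
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1

# bind source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1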
Flume Consumer
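This section is empty in the notes; as a hedged sketch, the agent below uses Flume's Kafka source to consume the first topic and a logger sink to print each event. The group id, agent name, and batch settings are assumptions.

# kafka_to_log.conf -- Flume as a Kafka consumer (sketch)
a1.sources = r1
a1.channels = c1
a1.sinks = k1

# Kafka source reading the first topic
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 50
a1.sources.r1.batchDurationMillis = 200
a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092
a1.sources.r1.kafka.topics = first
a1.sources.r1.kafka.consumer.group.id = flume

# memory channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# logger sink prints the consumed events
a1.sinks.k1.type = logger

# bind source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1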
Kafka Integration with Spark
Producer
import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer

object SparkKafkaProducer {
  def main(args: Array[String]): Unit = {
    // producer configuration
    val properties = new Properties()
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092,hadoop103:9092")
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])

    // create a producer
    val producer = new KafkaProducer[String, String](properties)

    // send data to the first topic
    for (i <- 1 to 5) {
      producer.send(new ProducerRecord[String, String]("first", "atguigu" + i))
    }

    // release resources
    producer.close()
  }
}
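To check that the messages arrived, you can watch the topic with the console consumer shipped with Kafka:

bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first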
Consumer
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkKafkaConsumer {
  def main(args: Array[String]): Unit = {
    // initialize the streaming context
    val conf = new SparkConf().setMaster("local[*]").setAppName("spark-kafka")
    val ssc = new StreamingContext(conf, Seconds(3))

    // Kafka consumer parameters
    val kafkaPara = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop102:9092,hadoop103:9092",
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.GROUP_ID_CONFIG -> "test"
    )

    // consume data from the first topic
    val kafkaDStream = KafkaUtils.createDirectStream(
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Set("first"), kafkaPara)
    )
    val valueDStream = kafkaDStream.map(record => record.value())
    valueDStream.print()

    // start the computation and block
    ssc.start()
    ssc.awaitTermination()
  }
}
Kafka Integration with Flink
Create a Maven project and import the following dependencies.
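The dependency list itself did not survive in these notes. As a sketch, the following is typical for the FlinkKafkaProducer/FlinkKafkaConsumer API used below; the Flink version (1.13.6) and the _2.12 Scala suffix are assumptions, so adjust them to your environment.

<!-- pom.xml dependencies (sketch; versions are assumptions) -->
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.13.6</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>1.13.6</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.12</artifactId>
        <version>1.13.6</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_2.12</artifactId>
        <version>1.13.6</version>
    </dependency>
</dependencies>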
Add a log4j.properties file under resources; there you can change the log level to error.
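A minimal version of that file might look like the following; only the error root level comes from the notes, and the console appender and pattern are assumptions.

# log4j.properties (sketch)
log4j.rootLogger=error, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n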
Flink Producer
import java.util.ArrayList;
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;

public class FlinkKafkaProducer1 {
    public static void main(String[] args) throws Exception {
        // get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);

        // prepare the data source
        ArrayList<String> wordList = new ArrayList<>();
        wordList.add("hello");
        wordList.add("atguigu");
        DataStreamSource<String> stream = env.fromCollection(wordList);

        // create a Kafka producer
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092,hadoop103:9092");
        FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<>(
                "first", new SimpleStringSchema(), properties);

        // attach the Kafka producer as a sink
        stream.addSink(kafkaProducer);

        // execute
        env.execute();
    }
}
Flink Consumer
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;

public class FlinkKafkaConsumer1 {
    public static void main(String[] args) throws Exception {
        // get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);

        // create a Kafka consumer
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092,hadoop103:9092");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
        FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(
                "first", new SimpleStringSchema(), properties);

        // wire the consumer into the Flink stream and print each record
        env.addSource(kafkaConsumer).print();

        // execute
        env.execute();
    }
}
Kafka Integration with Spring Boot
Producer
Send messages via the browser.
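The notes give no code for this step, so the following is a sketch of a REST controller that forwards a browser request to Kafka through KafkaTemplate. It assumes the spring-kafka dependency is on the classpath and that spring.kafka.bootstrap-servers (and serializers) are set in application.properties; the class, endpoint, and parameter names are illustrative.

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class ProducerController {

    // KafkaTemplate is auto-configured by Spring Boot from application.properties
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    // open http://localhost:8080/atguigu?msg=hello in a browser to send a message
    @RequestMapping("/atguigu")
    public String sendMessage(String msg) {
        kafkaTemplate.send("first", msg);
        return "ok";
    }
}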
Consumer
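Again as a sketch: a @KafkaListener method that consumes the first topic and prints each record. The class name is illustrative, the group id test mirrors the one used in the Spark section, and the same spring-kafka setup as the producer is assumed.

import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaListener;

@Configuration
public class KafkaConsumerConfig {

    // invoked for every record arriving on the first topic
    @KafkaListener(topics = "first", groupId = "test")
    public void consume(String message) {
        System.out.println("received: " + message);
    }
}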