14_基于Flink將pulsar數據寫入到HBase

3.7.基于Flink將數據寫入到HBase

3.7.1.編寫Flink完成數據寫入到Hbase操作, 完成數據備份, 便于后續進行即席查詢和離線分析

3.7.1.1.HBase基本介紹

hbase是基于Google發布bigTable論文產生一款軟件, 是一款noSQL型數據, 不支持SQL. 不支持join的操作, 沒有表關系, 不支持事務(多行事務),hbase是基于 HDFS的采用java 語言編寫

查詢hbase數據一般有三種方案(主鍵(row key)查詢, 主鍵的范圍檢索,查詢全部數據)

都是以字節類型存儲,存儲結構化和半結構化數據。

hbase表的特點: 大 面向列的存儲方案 稀疏性

2.7.1.2.應用場景

1)需要進行隨機讀寫的操作。
2)數據量比較大。
3)數據比較稀疏。

2.7.1.3.HBase安裝操作

本次安裝的HBase為2.2.7,詳細的安裝手冊大家可以參考資料, 還需要大家注意,HBase的啟動需要依賴于zookeeper
和HDFS的, 顧需要先安裝 HADOOP與zookeeper
在這里插入圖片描述

  • 1-在Hbase中創建目標表
create 'itcast_h_ems, {NAME=>'f1',COMPRESSION=>'GZ'},{NUMREGIONS=>6, SPLITALGO=>'HexStringSplit'}
  • 2- 編寫Flink代碼完成寫入Hbase操作
import com.itheima.pojo.PulsarTopicPojo;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.pulsar.FlinkPulsarSource;
import org.apache.flink.streaming.connectors.pulsar.internal.JsonDeser;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;import java.util.Properties;// 基于Flink消費Pulsar數據, 然后將數據灌入到HBase中, 完成數據備份, 以及后續即席查詢和離線分析
public class ItcastFlinkToHBase {public static void main(String[] args) throws Exception {//1. 創建Flinnk流式處理核心環境類對象 和 Table API 核心環境類對象StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);//2. 添加Source組件, 從Pulsar中讀取消息數據Properties props = new Properties();props.setProperty("topic","persistent://public/default/itcast_ems_tab");props.setProperty("partition.discovery.interval-millis","5000");FlinkPulsarSource<PulsarTopicPojo> pulsarSource = new FlinkPulsarSource<PulsarTopicPojo>("pulsar://node1:6650,node2:6650,node3:6650","http://node1:8080,node2:8080,node3:8080",JsonDeser.of(PulsarTopicPojo.class),props);//2.1 設置pulsarSource組件在消費數據的時候, 默認從什么位置開始消費pulsarSource.setStartFromLatest();DataStreamSource<PulsarTopicPojo> dataStreamSource = env.addSource(pulsarSource);//2.2 轉換為Flink TableSchema schema = Schema.newBuilder().column("id", DataTypes.INT()).column("sid", DataTypes.STRING()).column("ip", DataTypes.STRING()).column("session_id", DataTypes.STRING()).column("create_time", DataTypes.STRING()).column("yearInfo", DataTypes.STRING()).column("monthInfo", DataTypes.STRING()).column("dayInfo", DataTypes.STRING()).column("hourInfo", DataTypes.STRING()).column("seo_source", DataTypes.STRING()).column("area", DataTypes.STRING()).column("origin_channel", DataTypes.STRING()).column("msg_count", DataTypes.INT()).column("from_url", DataTypes.STRING()).build();tableEnv.createTemporaryView("itcast_ems",dataStreamSource,schema);//2.3: 定義HBase的目標表String hTable = "create table itcast_h_ems("+"rowkey int,"+"f1 ROW<sid STRING,ip STRING,session_id STRING,create_time STRING,yearInfo STRING,monthInfo STRING,dayInfo STRING,hourInfo STRING,seo_source STRING,area STRING,origin_channel STRING,msg_count INT,from_url STRING>,"+"primary key(rowkey) NOT ENFORCED" +") WITH ("+"'connector'='hbase-2.2',"+"'table-name'='itcast_h_ems',"+"'zookeeper.quorum'='node1:2181,node2:2181,node3:2181'"+")";//4. 執行操作tableEnv.executeSql(hTable);tableEnv.executeSql("insert into itcast_h_ems select id,ROW(sid,ip,session_id,create_time,yearInfo,monthInfo,dayInfo,hourInfo,seo_source,area,origin_channel,msg_count,from_url) from itcast_ems");}}

PulsarTopicPojo

public class PulsarTopicPojo {private Integer id;private String sid;private String ip;private String session_id;private String create_time;private String yearInfo;private String monthInfo;private String dayInfo;private String hourInfo;private String seo_source;private String area;private String origin_channel;private Integer msg_count;private  String from_url;public PulsarTopicPojo() {}public PulsarTopicPojo(Integer id, String sid, String ip, String session_id, String create_time, String yearInfo, String monthInfo, String dayInfo, String hourInfo, String seo_source, String area, String origin_channel, Integer msg_count, String from_url) {this.id = id;this.sid = sid;this.ip = ip;this.session_id = session_id;this.create_time = create_time;this.yearInfo = yearInfo;this.monthInfo = monthInfo;this.dayInfo = dayInfo;this.hourInfo = hourInfo;this.seo_source = seo_source;this.area = area;this.origin_channel = origin_channel;this.msg_count = msg_count;this.from_url = from_url;}public void setData(Integer id, String sid, String ip, String session_id, String create_time, String yearInfo, String monthInfo, String dayInfo, String hourInfo, String seo_source, String area, String origin_channel, Integer msg_count, String from_url) {this.id = id;this.sid = sid;this.ip = ip;this.session_id = session_id;this.create_time = create_time;this.yearInfo = yearInfo;this.monthInfo = monthInfo;this.dayInfo = dayInfo;this.hourInfo = hourInfo;this.seo_source = seo_source;this.area = area;this.origin_channel = origin_channel;this.msg_count = msg_count;this.from_url = from_url;}public Integer getId() {return id;}public void setId(Integer id) {this.id = id;}public String getSid() {return sid;}public void setSid(String sid) {this.sid = sid;}public String getIp() {return ip;}public void setIp(String ip) {this.ip = ip;}public String getSession_id() {return session_id;}public void setSession_id(String session_id) {this.session_id = session_id;}public String getCreate_time() {return create_time;}public void setCreate_time(String create_time) {this.create_time = create_time;}public String getYearInfo() {return yearInfo;}public void setYearInfo(String yearInfo) {this.yearInfo = yearInfo;}public String getMonthInfo() {return monthInfo;}public void setMonthInfo(String monthInfo) {this.monthInfo = monthInfo;}public String getDayInfo() {return dayInfo;}public void setDayInfo(String dayInfo) {this.dayInfo = dayInfo;}public String getHourInfo() {return hourInfo;}public void setHourInfo(String hourInfo) {this.hourInfo = hourInfo;}public String getSeo_source() {return seo_source;}public void setSeo_source(String seo_source) {this.seo_source = seo_source;}public String getArea() {return area;}public void setArea(String area) {this.area = area;}public String getOrigin_channel() {return origin_channel;}public void setOrigin_channel(String origin_channel) {this.origin_channel = origin_channel;}public Integer getMsg_count() {return msg_count;}public void setMsg_count(Integer msg_count) {this.msg_count = msg_count;}public String getFrom_url() {return from_url;}public void setFrom_url(String from_url) {this.from_url = from_url;}@Overridepublic String toString() {return "PulsarTopicPojo{" +"id=" + id +", sid='" + sid + '\'' +", ip='" + ip + '\'' +", session_id='" + session_id + '\'' +", create_time='" + create_time + '\'' +", yearInfo='" + yearInfo + '\'' +", monthInfo='" + monthInfo + '\'' +", dayInfo='" + dayInfo + '\'' +", hourInfo='" + hourInfo + '\'' +", seo_source='" + seo_source + '\'' +", area='" + area + '\'' +", origin_channel='" + origin_channel + '\'' +", msg_count=" + msg_count +", from_url='" + from_url + '\'' +'}';}
}

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/35403.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/35403.shtml
英文地址,請注明出處:http://en.pswp.cn/news/35403.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

Codeforces 757F. Team Rocket Rises Again 最短路 + 支配樹

題意&#xff1a; 給你 n 個點&#xff0c; m 條雙向邊&#xff0c;求爆了某個點后&#xff0c;從s出發的最短路距離&#xff0c;會改變最多的數量。 分析&#xff1a; 建出最短路樹&#xff08;DAG&#xff09;之后&#xff0c;在最短路樹上跑一下支配樹&#xff0c;找出支…

鏈表OJ詳解

&#x1f495;人生不滿百&#xff0c;常懷千歲憂&#x1f495; 作者&#xff1a;Mylvzi 文章主要內容&#xff1a;鏈表oj詳解 題目一&#xff1a;移除元素 題目要求&#xff1a; 畫圖分析&#xff1a; 代碼實現&#xff1a; struct ListNode* removeElements(struct List…

flutter項目 環境搭建

開發flutter項目 搭建工具環境 flutter項目本身 所需開發工具環境 flutter 谷歌公司開發 系統支持庫 鏡像庫 搭建流程&#xff1a; flutter 官網&#xff1a; https://flutter.dev/community/china //步驟1 .bash_profile touch .bash_profile pwd /Users/haijunyan open ~ e…

商品首頁(sass+git本地初始化)

目錄 安裝sass/sass-loader 首頁(vue-setup) 使用git本地提交 同步遠程git庫 安裝sass/sass-loader #安裝sass npm i sass -D#安裝sass-loader npm i sass-loader10.1.1 -D 首頁(vue-setup) <template><view class"u-wrap"><!-- 輪播圖 --><…

C++lambda表達式

先來說背景&#xff1a;當我們需要對一些的元素進行排序的時候&#xff0c;可以使用std::sort來進行排序&#xff0c;而當需要對一些自定義類型的元素來排序的時候&#xff0c;要去寫一個類&#xff0c;或者說是需要寫一個仿函數&#xff0c;而如果功能要求上需要根據不同的比較…

基于chatgpt動手實現一個ai_translator

動手實現一個ai翻譯 前言 最近在極客時間學習《AI 大模型應用開發實戰營》&#xff0c;自己一邊跟著學一邊開發了一個進階版本的 OpenAI-Translator&#xff0c;在這里簡單記錄下開發過程和心得體會&#xff0c;供有興趣的同學參考&#xff1b; ai翻譯程序 版本迭代 在學習…

VLC播放主要流程

前言 VLC 播放流程大概是先加載解封裝器,然后通過es_out控制所有的stream。然后會加載decoder。最終通過resource文件的方法交給輸出 模塊。下面簡要介紹。 正文 播放器主要分為三層。主要通過兩個接口實現了功能隔離。分別是es_out.c和decoder.c的實現了&#xff1a; //控…

算法練習-搜索 相關

文章目錄 迷宮問題 迷宮問題 定義一個二維數組 m行 * n列 &#xff0c;如 4 5 數組下所示&#xff1a; int arr[5][5] { 0, 1, 0, 0, 0, 0, 1, 1, 1, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 0, }; 它表示一個迷宮&#xff0c;1表示墻壁&#xff0c;0表示可以走的路&#xff0c;只…

Synchronized八鎖

/** * Description: 8 鎖 * 1 標準訪問&#xff0c;先打印短信還是郵件 ------sendSMS ------sendEmail 2 停 4 秒在短信方法內&#xff0c;先打印短信還是郵件 ------sendSMS ------sendEmail 3 新增普通的 hello 方法&#xff0c;是先打短信還是 hello ------getHello ------…

Idea中使用statement接口對象,顯示mysql版本號,所有庫和表名

使用statement 接口對象&#xff0c;進行以下操作&#xff1a; 顯示數據庫版本號顯示所有庫顯示所有庫中的table表 顯示數據庫版本號&#xff1a; public class StatementDemo {Testvoid showall(){try{Statement st conn.createStatement();ResultSet rs st.executeQuery(…

pytest fixture 常用參數

fixture 常用的參數 參數一&#xff1a;autouse&#xff0c;作用&#xff1a;自動運行&#xff0c;無需調用 舉例一&#xff1a;我們在類中定義一個function 范圍的fixture; 設置它自動執行autouseTrue&#xff0c;那么我們看下它執行結果 輸出&#xff1a; 說明&#xff1a;…

Leetcode-每日一題【劍指 Offer 12. 矩陣中的路徑】

題目 單詞必須按照字母順序&#xff0c;通過相鄰的單元格內的字母構成&#xff0c;其中“相鄰”單元格是那些水平相鄰或垂直相鄰的單元格。同一個單元格內的字母不允許被重復使用。 例如&#xff0c;在下面的 34 的矩陣中包含單詞 "ABCCED"&#xff08;單詞中的字母…

CUDA執行模型

一、CUDA執行模型概述 二、線程束執行 1. 線程束與線程塊 線程束是SM中基本的執行單元。 當一個線程塊的網格被啟動后&#xff0c;網格中的線程塊分布在SM中。 一旦線程塊被調度到一個SM中&#xff0c;線程塊中的線程會被進一步劃分成線程束。 一個線程束由32個連續的線程…

【Express.js】數據庫初始化

數據庫初始化 在軟件開發階段和測試階段&#xff0c;為了方便調試&#xff0c;我們通常會進行一系列的數據庫初始化操作&#xff0c;比如重置數據表&#xff0c;插入記錄等等&#xff0c;或者在部署階段進行數據初始化的操作 根據前面章節介紹過的 knex.js 和 sequelize.js&…

基于自適應曲線閾值和非局部稀疏正則化的壓縮感知圖像復原研究【自適應曲線閾值去除加性穩態白/有色高斯噪聲】(Matlab代碼實現)

&#x1f4a5;&#x1f4a5;&#x1f49e;&#x1f49e;歡迎來到本博客????&#x1f4a5;&#x1f4a5; &#x1f3c6;博主優勢&#xff1a;&#x1f31e;&#x1f31e;&#x1f31e;博客內容盡量做到思維縝密&#xff0c;邏輯清晰&#xff0c;為了方便讀者。 ??座右銘&a…

什么是媒體代發布?媒體代發布注意事項

傳媒如春雨&#xff0c;潤物細無聲&#xff0c;大家好&#xff0c;我是51媒體網胡老師。 媒體代發布是指將新聞稿或其他宣傳內容委托給專業的媒體代理機構或公司進行發布和推廣的活動。這些機構通常擁有豐富的媒體資源、人脈和經驗&#xff0c;能夠更好地將信息傳遞給目標受眾…

C語言 指針與內存之間的關系

一、內存與字節 一個內存單元一個字節一個地址 整型 int 類型中int類型的字節數是4 且一個字節表示八個bite位 一個二進制數位有著32個bite 所以又可以表示為&#xff1a;一個字節 8個比特位 32位數的二進制數位的八分之一 例如&#xff1a; int a 10&#xff1b; 該表達式…

項目實戰 — 消息隊列(9){編寫demo程序}

消息隊列服務器核心功能就是&#xff0c;提供了虛擬主機&#xff0c;交換機&#xff0c; 隊列&#xff0c;消息等概念的管理&#xff0c;實現三種典型的消息轉發方式&#xff0c;可以實現跨主機/服務器之間的生產者消費模型。 這里&#xff0c;就編寫一個demo&#xff0c;實現…

【實戰講解】數據血緣落地實施

?在復雜的社會分工協作體系中&#xff0c;我們需要明確個人定位&#xff0c;才能更好的發揮價值&#xff0c;數據也是一樣&#xff0c;于是&#xff0c;數據血緣應運而生。 今天這篇文章會全方位的講解數據血緣&#xff0c;并且給出具體的落地實施方案。 一、數據血緣是什么…

JAVA多線程和并發基礎面試問答(翻譯)

JAVA多線程和并發基礎面試問答(翻譯) java多線程面試問題 1. 進程和線程之間有什么不同&#xff1f; 一個進程是一個獨立(self contained)的運行環境&#xff0c;它可以被看作一個程序或者一個應用。而線程是在進程中執行的一個任務。Java運行環境是一個包含了不同的類和程序…