?
1. 項目結構
install-elk/
├── start-elastic.sh
├── es-data/ # Elasticsearch 持久化目錄(自動創建)
├── logstash/├── logstash.yml├── pipeline/│ ├── user.conf│ ├── articles.conf│ └── ... #可以有更多的表├── config/│ └── pipelines.yml├── ext/│ └── mysql-connector-j-9.2.0.jar # 通過 https://downloads.mysql.com/archives/c-j/ 獲取對應的版本。
1.1 本案例項目結構
1.2 start-elastic.sh
#!/bin/bash# ================== 🎨 Logo 開場 ==================
RED='\033[1;31m'
CYAN='\033[1;36m'
YELLOW='\033[1;33m'
RESET='\033[0m'echo -e "${CYAN}"
cat << "EOF"_____ _ _ _/ ____| | (_) || (___ ___ ___ _ __ _ __| |_| |_ ___ _ __\___ \ / _ \/ __| '__| '__| | | __/ _ \ '__|____) | __/ (__| | | | | | | || __/ ||_____/ \___|\___|_| |_| |_|_|\__\___|_|EOF
echo -e "${YELLOW} 🚀 Welcome to the someliber Elastic Stack 🚀${RESET}"
echo# ================== 獲取版本號 ==================
read -p "請輸入要使用的版本號(默認: 8.17.3): " VERSION_INPUT
VERSION=${VERSION_INPUT:-8.17.3}ES_IMAGE="elasticsearch:$VERSION"
KIBANA_IMAGE="kibana:$VERSION"
LOGSTASH_IMAGE="logstash:$VERSION"# ================== 獲取密碼 ==================
read -s -p "請輸入 Elasticsearch 登錄密碼(默認: 123456): " PASSWORD_INPUT
echo
ES_PASSWORD=${PASSWORD_INPUT:-123456}
KIBANA_USER=kibana_user
KIBANA_PASS=someliber# ================== 是否掛載 ES 數據目錄 ==================
read -p "是否掛載 Elasticsearch 數據目錄?(y/n 默認 y): " USE_VOLUME
USE_VOLUME=${USE_VOLUME:-y}# ================== 基本變量 ==================
ES_CONTAINER_NAME=es11
KIBANA_CONTAINER_NAME=kibana11
LOGSTASH_CONTAINER_NAME=logstash11
NETWORK_NAME=elastic11
LOGSTASH_DIR="$PWD/logstash"
# ================== 創建網絡 ==================
docker network create $NETWORK_NAME >/dev/null 2>&1 || echo "🔗 網絡已存在:$NETWORK_NAME"# ================== 啟動 Elasticsearch ==================
echo " 啟動 Elasticsearch..."
if [[ "$USE_VOLUME" == "y" || "$USE_VOLUME" == "Y" ]]; thenmkdir -p ./es-datadocker run -d --name $ES_CONTAINER_NAME \--network $NETWORK_NAME \-p 9200:9200 -p 9300:9300 \-e "discovery.type=single-node" \-e "xpack.security.enabled=true" \-e "ELASTIC_PASSWORD=$ES_PASSWORD" \-e "ES_JAVA_OPTS=-Xms512m -Xmx512m" \-v "$PWD/es-data:/usr/share/elasticsearch/data" \$ES_IMAGE
elsedocker run -d --name $ES_CONTAINER_NAME \--network $NETWORK_NAME \-p 9200:9200 -p 9300:9300 \-e "discovery.type=single-node" \-e "xpack.security.enabled=true" \-e "ELASTIC_PASSWORD=$ES_PASSWORD" \-e "ES_JAVA_OPTS=-Xms512m -Xmx512m" \$ES_IMAGE
fi# ================== 等待 ES 啟動 ==================
echo " 正在等待 Elasticsearch 啟動..."
for i in {1..60}; doSTATUS=$(curl -s -u elastic:$ES_PASSWORD http://localhost:9200/_cluster/health | grep -o '"status":"[^"]\+"' || true)if [[ $STATUS == *"green"* || $STATUS == *"yellow"* ]]; thenecho " Elasticsearch 啟動成功:$STATUS"breakfisleep 2
done# ================== 創建 Kibana 用戶 ==================
echo " 創建 Kibana 用戶($KIBANA_USER)..."
docker exec $ES_CONTAINER_NAME bash -c "bin/elasticsearch-users useradd $KIBANA_USER -p $KIBANA_PASS -r kibana_system" || \echo " 用戶可能已存在,忽略錯誤"# ================== 啟動 Kibana ==================
echo " 啟動 Kibana..."
docker run -d --name $KIBANA_CONTAINER_NAME \--network $NETWORK_NAME \-p 5601:5601 \-e "ELASTICSEARCH_HOSTS=http://$ES_CONTAINER_NAME:9200" \-e "ELASTICSEARCH_USERNAME=$KIBANA_USER" \-e "ELASTICSEARCH_PASSWORD=$KIBANA_PASS" \-e "XPACK_ENCRYPTEDSAVEDOBJECTS_ENCRYPTIONKEY=2kR9HmNaesytcVDwEAK3uTQ1obCrvP7B" \-e "XPACK_REPORTING_ENCRYPTIONKEY=aSTr3J7sLgt2BCKbIyw0DE6OjZGMY1kX" \-e "XPACK_SECURITY_ENCRYPTIONKEY=WO6Xetyubr45ZonlLd32DfNmRTkcAhvp" \-e "I18N_LOCALE=zh-CN" \$KIBANA_IMAGE# ================== 啟動 Logstash(全目錄掛載) ==================
if [[ -d "$LOGSTASH_DIR" ]]; thenecho " 啟動 Logstash(全目錄掛載)..."docker run -d --name $LOGSTASH_CONTAINER_NAME \--network $NETWORK_NAME \-p 5044:5044 -p 9600:9600 -p 5000:5000 \-e "xpack.monitoring.elasticsearch.hosts=http://$ES_CONTAINER_NAME:9200" \-e "xpack.monitoring.elasticsearch.username=elastic" \-e "xpack.monitoring.elasticsearch.password=$ES_PASSWORD" \-v "$PWD/logstash/pipeline:/usr/share/logstash/pipeline" \-v "$PWD/logstash/ext:/usr/share/logstash/ext" \-v "$PWD/logstash/config:/usr/share/logstash/config" \$LOGSTASH_IMAGEecho " Logstash 啟動完成(使用 logstash/ 目錄)"
elseecho " 未找到 logstash/ 目錄,Logstash 未啟動"
fi# ================== 提示 ==================
echo
echo -e "${CYAN} 所有容器啟動完成!${RESET}"
echo -e "${YELLOW} Elasticsearch: http://localhost:9200${RESET}"
echo -e "${YELLOW} Kibana: http://localhost:5601${RESET}"
1.3 logstash/pipeline/user.conf
# ====================== 輸入階段:從 MySQL 中讀取數據 ======================
input {jdbc {# MySQL JDBC 連接串jdbc_connection_string => "jdbc:mysql://192.168.167.175:3306/test"# 數據庫賬號和密碼jdbc_user => "root"jdbc_password => "123456"# MySQL JDBC 驅動的路徑(需要確保已經掛載進容器)jdbc_driver_library => "/usr/share/logstash/ext/mysql-connector-j-9.2.0.jar"# JDBC 驅動類名稱jdbc_driver_class => "com.mysql.cj.jdbc.Driver"jdbc_paging_enabled => truejdbc_page_size => 10000# ========== 增量同步設置 ==========# 啟用基于字段的增量同步use_column_value => true# 增量字段:用來判斷是否有更新tracking_column => "updated_at"tracking_column_type => "timestamp"record_last_run => true# 記錄上次同步時間的文件(容器中可寫路徑)last_run_metadata_path => "/usr/share/logstash/last_run_metadata/users_last_run.yml"# 執行的 SQL 語句:查所有新增、修改、邏輯刪除的數據statement => "SELECTid,username,gender,email,age,created_at,updated_at,is_deletedFROM usersWHERE updated_at > :sql_last_valueORDER BY updated_at ASC, id ASC"# 每分鐘執行一次同步任務(Cron 格式)schedule => "* * * * *"}
}# ====================== 過濾器階段:處理數據邏輯 ======================
filter {# 如果 is_deleted = 1,則為“邏輯刪除”,我們在元數據中標記 action 為 deleteif [is_deleted] and [is_deleted] in [1, "1", true, "true"] {mutate {add_field => { "[@metadata][action]" => "delete" }}} else {# 否則為“新增或更新”操作,標記 action 為 indexmutate {add_field => { "[@metadata][action]" => "index" }}}# 移除不希望寫入 ES 的字段(如刪除標記)mutate {remove_field => ["is_deleted"]}
}# ====================== 輸出階段:寫入到 Elasticsearch ======================
output {elasticsearch {# Elasticsearch 地址hosts => ["http://es11:9200"]# 索引名稱為 usersindex => "users"# ES 用戶名和密碼(需開啟身份驗證)user => "elastic"password => "123456"# 文檔 ID 使用 MySQL 的主鍵 ID,避免重復寫入document_id => "%{id}"# 根據前面 filter 設置的 action 決定是 index 還是 delete 操作action => "%{[@metadata][action]}"}# 控制臺輸出(調試用)stdout {codec => json_lines}
}
1.3 logstash/pipeline/articles.conf
# ====================== 輸入階段:從 MySQL 中讀取文章數據 ======================
input {jdbc {# MySQL JDBC 連接串jdbc_connection_string => "jdbc:mysql://192.168.167.175:3306/test"# 數據庫賬號和密碼jdbc_user => "root"jdbc_password => "123456"# MySQL JDBC 驅動的路徑(需確保掛載到容器)jdbc_driver_library => "/usr/share/logstash/ext/mysql-connector-j-9.2.0.jar"# JDBC 驅動類名稱jdbc_driver_class => "com.mysql.cj.jdbc.Driver"# 啟用分頁拉取數據,提高大數據量時性能jdbc_paging_enabled => truejdbc_page_size => 10000# ========== 增量同步設置 ==========use_column_value => true # 啟用字段值來判斷是否更新tracking_column => "updated_at" # 使用 updated_at 字段做增量對比tracking_column_type => "timestamp"record_last_run => truelast_run_metadata_path => "/usr/share/logstash/last_run_metadata/articles_last_run.yml"# SQL 查詢語句,排除敏感字段,按更新時間 & ID 順序拉取statement => "SELECTid,title,content,author_id,category,created_at,updated_at,is_deletedFROM articlesWHERE updated_at > :sql_last_valueORDER BY updated_at ASC, id ASC"# 同步頻率:每分鐘執行一次schedule => "* * * * *"}
}# ====================== 過濾器階段:邏輯刪除與字段處理 ======================
filter {# 如果 is_deleted = 1/true,標記 action 為 delete(邏輯刪除)if [is_deleted] and [is_deleted] in [1, "1", true, "true"] {mutate {add_field => { "[@metadata][action]" => "delete" }}} else {# 否則表示新增或更新mutate {add_field => { "[@metadata][action]" => "index" }}}# 移除不希望寫入 ES 的字段(如刪除標記)mutate {remove_field => ["is_deleted"]}
}# ====================== 輸出階段:寫入 Elasticsearch ======================
output {elasticsearch {hosts => ["http://es11:9200"]index => "articles"user => "elastic"password => "123456"document_id => "%{id}" # 使用文章主鍵作為文檔 IDaction => "%{[@metadata][action]}" # 動態執行 index 或 delete}# 控制臺調試輸出(開發期使用)stdout {codec => json_lines}
}
1.4 logstash/config/pipelines.yml
# 定義第一個 pipeline(數據同步管道)
- pipeline.id: users # 唯一標識這個 pipeline 的 ID,日志中會看到 users 相關的信息path.config: "/usr/share/logstash/pipeline/user.conf"# 指定該 pipeline 使用的配置文件路徑,里面寫的是 input/filter/output# 定義第二個 pipeline
- pipeline.id: articles # 另一個獨立的 pipeline ID,處理 articles 表的同步path.config: "/usr/share/logstash/pipeline/articles.conf"# 指定第二個 pipeline 的配置文件路徑
1.5 logstash/config/logstash.yml
# ================================
# Logstash HTTP API 接口配置
# ================================api.http.host: 0.0.0.0 # Logstash API 綁定的主機地址,0.0.0.0 表示所有網卡
api.http.port: 9600 # Logstash HTTP API 的端口號,默認是 9600# ================================
# 配置自動加載(熱加載)
# ================================config.reload.automatic: true # 開啟配置文件自動熱加載,無需重啟即可更新 pipeline
config.reload.interval: 5s # 配置檢查的時間間隔(單位:秒)# ================================
# Pipeline 性能調優參數
# ================================pipeline.batch.delay: 50 # 每個批次處理之間的最大等待時間(毫秒),默認 50ms
pipeline.batch.size: 125 # 每個批次處理的最大事件數量
pipeline.workers: 2 # 每個 pipeline 使用的 worker 線程數,建議和 CPU 核心數保持一致# ================================
# X-Pack 監控配置
# ================================xpack.monitoring.elasticsearch.hosts: ${xpack.monitoring.elasticsearch.hosts} # Elasticsearch 監控地址,從環境變量讀取
xpack.monitoring.elasticsearch.username: ${xpack.monitoring.elasticsearch.username} # ES 認證用戶名,從環境變量讀取
xpack.monitoring.elasticsearch.password: ${xpack.monitoring.elasticsearch.password} # ES 認證密碼,從環境變量讀取
1.6 啟動 start-elastic.sh
chmod +x start-elastic.sh && ./start-elastic.sh
1.7 數據庫相關
1.7.1 用戶信息表
CREATE TABLE users (id INT PRIMARY KEY AUTO_INCREMENT COMMENT '用戶唯一標識',username VARCHAR(100) NOT NULL COMMENT '用戶名',password VARCHAR(255) NOT NULL COMMENT '加密后的用戶密碼',gender ENUM('male', 'female', 'other') DEFAULT 'other' COMMENT '性別',email VARCHAR(255) COMMENT '用戶郵箱',age INT COMMENT '年齡',created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '用戶創建時間',updated_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '最后更新時間',is_deleted BOOLEAN DEFAULT FALSE COMMENT '邏輯刪除標記,TRUE 表示已刪除'
) COMMENT='用戶信息表';
1.7.2 文章信息表
CREATE TABLE articles (id INT PRIMARY KEY AUTO_INCREMENT COMMENT '文章唯一標識',title VARCHAR(255) NOT NULL COMMENT '文章標題',content TEXT COMMENT '文章內容正文',author_id INT NOT NULL COMMENT '作者ID,對應 users 表 id',category VARCHAR(100) COMMENT '文章分類',published_at DATETIME COMMENT '發布時間',created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '文章創建時間',updated_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '最后更新時間',is_deleted BOOLEAN DEFAULT FALSE COMMENT '邏輯刪除標記,TRUE 表示已刪除'
) COMMENT='文章信息表';
1.7.3 測試數據
INSERT INTO users (username, password, gender, email, age, is_deleted)
VALUES
('Alice', 'hashed_password_1', 'female', 'alice@example.com', 28, FALSE),
('Bob', 'hashed_password_2', 'male', 'bob@example.com', 35, FALSE),
('Charlie', 'hashed_password_3', 'male', 'charlie@example.com', 22, FALSE),
('Diana', 'hashed_password_4', 'female', 'diana@example.com', 40, TRUE),
('Eve', 'hashed_password_5', 'other', 'eve@example.com', 30, FALSE);INSERT INTO articles (title, content, author_id, category, published_at, is_deleted)
VALUES
('Elasticsearch 入門', '這是關于 Elasticsearch 的基礎教程內容。', 1, '技術', NOW(), FALSE),
('MySQL 優化實踐', '介紹常見的 MySQL 優化技巧。', 2, '數據庫', NOW(), FALSE),
('Logstash 配置指南', '如何配置 Logstash 管道同步數據。', 1, '日志系統', NOW(), FALSE),
('前端與后端的區別', '講解 Web 開發中的前后端職責。', 3, 'Web開發', NULL, FALSE),
('已刪除的文章示例', '這篇文章已被邏輯刪除。', 4, '歷史', NOW(), TRUE);
1.8 測試驗證過程
1.8.1 啟動start-elastic.sh
1.8.2 驗證索引是否創建成功
1.8.3 進行刪除測試
1.8.3.1 刪除前
1.8.3.2 邏輯刪除Alice用戶
1.8.3.3 查看結果
說明:等待一分鐘,可以看到users索引下的文檔數量由4->3,即文檔從ES中刪除。
1.8.4 進行修改測試
1.8.4.1 修改前
1.8.4.2 修改Bob為Bob123
1.8.4.3 查看修改結果
說明:等待一分鐘,數據更新同步到ES文檔。
2 總結
通過 Logstash
從 MySQL
中同步多個業務表(如 users
、articles
),并實現以下能力:
- 增量同步:基于
updated_at
字段,避免全量拉取。 - 邏輯刪除同步:使用
is_deleted
字段自動觸發 ES 刪除操作。 - 定時同步:每分鐘調度,適用于數據近實時場景。
- 多表配置隔離:每個表對應一個獨立的 pipeline,實現清晰可維護結構。
- 全目錄掛載:Logstash 配置、驅動、Pipeline 統一掛載,便于統一管理和部署。