Clickhouse源碼分析-副本數據同步

1 總體流程

上圖說明了一條insert語句最后如何被副本同步到的流程(圖中ck集群為單shard,雙副本)。

(1)從客戶端發出,寫入ck

(2)ck提交LogEntry到Keeper

(3)另外一個副本從Keeper拉取LogEntry到本地執行

2 參數說明

此部分介紹以下整個鏈路涉及的一些參數。

mergetree settings:

1.zookeeper_session_expiration_check_period

檢查keeper session是否到期,每個以上參數的時間檢查一次,默認為60S:

每個引擎為Replicated的MergeTree表在啟動的時候,會運行以下任務來檢查與keeper 之間的session是否過期。

創建復制表時,內核會啟動這個復制表的引擎,

之后在ReplicatedMergeTreeRestartingThread::runImpl()中啟動各種后臺調度任務:

(1)background_operations_assignee:執行merge,fetch操作

(2)merge_selecting_task:主要功能為選擇合并的part

(3)cleanup_thread:線程,清理過期part等

這些任務的調度有點任務內遞歸的感覺:

都是任務執行的最后在繼續重復上個任務(只是任務的內容不一樣)。

2.max_insert_delayed_streams_for_parallel_write

當part所在的存儲系統支持并行寫入時,這個參數默認為100,否則為0。

3.distributed_ddl_task_timeout

設置來自集群中所有主機的 DDL 查詢響應的超時時間。如果某個 DDL 請求未能在所有主機上執行完成,響應中將包含一個 timeout 錯誤,并且該請求將以異步模式執行。負值表示無限超時時間。

3 示例表結構

db:

CREATE DATABASE replicated_db
ENGINE = Replicated('/clickhouse/databases/replicated_db', '{shard}', '{replica}')

table:

CREATE TABLE replicated_db.replicated_table
(
`id` UInt64,
`event_time` DateTime,
`event_type` String
)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/replicated_table', '{replica}')
PARTITION BY toYYYYMM(event_time)
ORDER BY (event_time, id)
SETTINGS index_granularity = 8192

1 單節點生成LogEntry

這里我們忽略掉詞語法解析,優化器,計劃生成層以及執行層的部分算子,直接來到寫數據到磁盤以及提交LogEntry的算子 -?ReplicatedMergeTreeSinkImpl。

這里的輸入參數chunk就是插入的數據在內存中的組織結構。

在ReplicatedMergeTreeSinkImpl<async_insert>::consume(Chunk & chunk)中,主要有以下步驟:

1.將插入的數據通過分區鍵拆成part,此過程通過MergeTreeDataWriter::splitBlockIntoParts完成

2.遍歷每一個拆分出來的part

(1)通過writeNewTempPart將這個拆分出來的part寫到臨時目錄中。

(2)在這個分支,提交寫入的part到keeper中:

如果開啟了并行寫入,part會攢夠一定的數量后,整體提交到Keeper上,這個默認數量為100。

2 提交LogEntry到Keeper

2.1 提交重試的參數控制

1.insert_keeper_max_retries?

insert_keeper_max_retries?參數控制向復制表(Replicated MergeTree)插入數據時,對 ClickHouse Keeper(或 ZooKeeper)請求的最大重試次數。默認值為20。

只有以下三種錯誤會觸發重試:

(1)network error

(2)Keeper session timeout

(3)request timeout

2.insert_keeper_retry_initial_backoff_ms?

insert_keeper_retry_initial_backoff_ms?參數定義了在INSERT執行期間,對失敗的Keeper(ZooKeeper)請求進行重試時的初始退避等待時間(毫秒)。默認值為100ms。

3.insert_keeper_retry_max_backoff_ms?

insert_keeper_retry_max_backoff_ms?參數設定了在 INSERT 查詢執行期間,對失敗的 Keeper/ZooKeeper 請求進行重試時的最大退避等待時間上限(毫秒)。默認值為10s。

2.2 提交流程

注意這里提交的并不是寫入的數據,而是寫入part的元信息。

提交主要通過ReplicatedMergeTreeSinkImpl<async_insert>::commitPart完成。

block_id_path

/clickhouse/tables/s1/replicated_table/blocks/202507_12141418273484448060_16681511374997932159

retries_ctl.retryLoop為提交的驅動:

提交的狀態轉化通過stage_switcher這個匿名函數完成:

初始時retry_context.stage為LOCK_AND_COMMIT,所以進入commit_new_part_stage:

commit_new_part_stage主要做了以下幾件事:

(1)設置要提交part的基本信息,例如block_number,part 名。對于New Part來說,block_number在一個復制表引擎中是全局遞增的。

(2)構造要在Keeper上執行的請求,例如

構造在Keeper上創建的LogEntry的請求,通過get_logs_ops完成。對于一個New Part來說,這個LogEntry的type為GET_PART,還包括其他的一些信息,例如:

  • create_time:創建時間
  • source_replica:哪個副本創建的這個part

  • new_part_name:part名

等等。最后將這個LogEntry封裝為一個CreateRequest。

一次Part的提交會帶著很多其他的請求:

RemoveRequest有:

其他的CreateRequest有:

get_quorum_ops只有在副本大于2時,會有攜帶請求。

getCommitPartOps中的CreateRequest:

(3)開啟事務,在提交LogEntry到Keeper失敗時,回滾,進行狀態的恢復

(4)將LogEntry發送到Keeper上

由于是多個請求,所以會調用ZooKeeper::multiImpl

此處流程,可用下圖表示(如果是寫請求達到了follower,follower會轉發給leader):?

非阻塞等待異步操作結果,最大等待時間為args.operation_timeout_ms毫秒

操作超時時間的參數,Coordination/CoordinationSettings.cpp

默認值為10S,Common/ZooKeeper/ZooKeeperConstants.h

3 副本拉取LogEntry

3.1 問題記錄

問題1:創建表報Session was killed

這個問題可以跳過,暫時采用另一種方法解決,在此保留,以后有時間了繼續追。

創建表時報錯:Coordination::Exception: Session was killed

原因時,長時間未操作,ch-client與Keeper之間的session斷開。

但是這有一個問題是:雖然創建表失敗,但是建表的元信息可能會提交到Keeper上。

此時你會發現,雖然這個庫并沒有這個表,但是無法創建:

再次創建表報錯如下:

此時可以使用以下語句剔除在keeper上的元信息:

SYSTEM DROP REPLICA 'r1' FROM ZKPATH '/clickhouse/tables/s1/replicated_table';

剔除在keeper上的元信息后,再次創建表,會發現此時會卡在創建這里:

之后翻看副本2的日志,發現副本2之前已經拉到了replicated_table這張表,并為它創建了數據目錄。

解決:去副本2上刪除對應得表目錄

此時,會發現replicated_table表已經成功創建。

刪除表同樣有這個問題:


最終解決需要調整session超時時間。根因不是這個參數。以下繼續分析:

其中code為:

下一步調試Keeper看為什么會有這個錯誤碼。

這個錯誤碼的設置位置:

(1)KeeperStateMachine<Storage>::processReconfiguration

(2)各個預處理不同請求的地方,preprocess

TODO:比較懷疑是不是我的兩個ck使用的是不同版本的問題

這個問題最后沒追下去,暫時只知道報錯大概位置。

問題2:關于副本同步part失敗的問題記錄

此時在副本r1上的replicated_table有一個part為202507_0_9_3。

在副本2在同步這個part的過程中,雖然它從keeper上取到了這個LogEnetry:

但是一直報錯,并且從num_tries可以得知,這個任務已經重試了22次了。

日志中的報錯提示:

沒有配置這個參數interserver_http_host

keeper上存副本1的replicated_table這個表的part的地方:/clickhouse/tables/s1/replicated_table/replicas/r1/parts

調整完之后,兩個副本的parts目錄下內容一致:

3.2 拉取LogEntry任務啟動

一段核心注釋:(Storages\StorageReplicatedMergeTree.h)

/** The replicated tables have a common log (/log/log-...).

? * Log - a sequence of entries (LogEntry) about what to do.

? * Each entry is one of:

? * - normal data insertion (GET),

? * - data insertion with a possible attach from local data (ATTACH),

? * - merge (MERGE),

? * - delete the partition (DROP).

? *

? * Each replica copies (queueUpdatingTask, pullLogsToQueue) entries from the log to its queue (/replicas/replica_name/queue/queue-...)

? * ?and then executes them (queueTask).

? * Despite the name of the "queue", execution can be reordered, if necessary (shouldExecuteLogEntry, executeLogEntry).

? * In addition, the records in the queue can be generated independently (not from the log), in the following cases:

? * - when creating a new replica, actions are put on GET from other replicas (createReplica);

? * - if the part is corrupt (removePartAndEnqueueFetch) or absent during the check

? * ? (at start - checkParts, while running - searchForMissingPart), actions are put on GET from other replicas;

? *

? * The replica to which INSERT was made in the queue will also have an entry of the GET of this data.

? * Such an entry is considered to be executed as soon as the queue handler sees it.

? *

? * The log entry has a creation time. This time is generated by the clock of server that created entry

? * - the one on which the corresponding INSERT or ALTER query came.

? *

? * For the entries in the queue that the replica made for itself,

? * as the time will take the time of creation the appropriate part on any of the replicas.

? */

解釋如下:

所有副本共享一個日志目錄 /log/log-...,每個日志項(LogEntry)描述一項操作。

  • 這個“日志”是指 ZooKeeper 中的節點 /log/log-0000001, /log/log-0000002 等。

  • 所有的副本會從這個共享日志中拉取操作(如插入、合并、刪除等)。

日志項類型包括:(定義在Storages\MergeTree\ReplicatedMergeTreeLogEntry.h)

  • GET:常規插入數據;

  • ATTACH:插入數據時可能會使用本地已有的數據;

  • MERGE:后臺合并多個 part;

  • DROP:刪除某個分區的數據。

每個副本會把這些日志項復制到自己的執行隊列中/replicas/<replica_name>/queue/queue-00000...),通過:

  • queueUpdatingTask(周期性任務)

  • pullLogsToQueue()(從 /log/ 拉取 log 到 /queue/

副本隨后會啟動后臺線程執行隊列里的任務(queueTask())。

雖然叫“隊列”,但實際上執行順序可以根據依賴進行重排(由 shouldExecuteLogEntry() 控制依賴,決定某 entry 是否可執行)。

舉個例子,如果 MERGE 依賴的 part 還沒拉取完成,就會延后執行。

某些隊列任務并非從日志而來,而是副本本地生成的,比如:

  • 創建新副本時,會向隊列中加入從其他副本 GET 所有已有 part 的任務;

如果發現某個 part 損壞(removePartAndEnqueueFetch)或缺失(啟動時用 checkParts(),運行時用 searchForMissingPart()),
也會生成 GET 請求從其他副本拉取缺失的 part

即使某個副本自己是寫入的目標,它也會有一個 GET 類型的 entry 表示這次插入。
這類 entry 在隊列中會立即視為“已完成”,因為本地已經有數據,不需要再拉取。

日志項有創建時間戳,這個時間由“發起該寫入的server”產生(即 INSERT / ALTER 語句在哪個副本執行)。

對于某個副本自己給自己生成的隊列項(比如 GET 缺失 part),會使用已有副本上該 part 的創建時間作為時間戳。


正如前文提到的當創建一個引擎為Replicated族的表時,內核會啟動這個復制表引擎,之后在ReplicatedMergeTreeRestartingThread::runImpl()中啟動各種后臺任務,拉取LogEntry的任務也在這個地方調度:

這個任務的主要內容如下所示:(核心為queue.pullLogsToQueue)

void StorageReplicatedMergeTree::queueUpdatingTask()

{

? ? if (!queue_update_in_progress)

? ? {

? ? ? ? last_queue_update_start_time.store(time(nullptr));

? ? ? ? queue_update_in_progress = true;

? ? }

? ? try

? ? {

? ? ? ? auto zookeeper = getZooKeeperAndAssertNotStaticStorage();

? ? ? ? if (is_readonly)

? ? ? ? {

? ? ? ? ? ? /// Note that we need to check shutdown_prepared_called, not shutdown_called, since the table will be marked as readonly

? ? ? ? ? ? /// after calling StorageReplicatedMergeTree::flushAndPrepareForShutdown().

? ? ? ? ? ? if (shutdown_prepared_called)

? ? ? ? ? ? ? ? return;

? ? ? ? ? ? throw Exception(ErrorCodes::TABLE_IS_READ_ONLY, "Table is in readonly mode (replica path: {}), cannot update queue", replica_path);

? ? ? ? }

? ? ? ? queue.pullLogsToQueue(zookeeper, queue_updating_task->getWatchCallback(), ReplicatedMergeTreeQueue::UPDATE);

? ? ? ? last_queue_update_finish_time.store(time(nullptr));

? ? ? ? queue_update_in_progress = false;

? ? }

? ? ? ?......

}

3.3 日志同步位點(log-pointer)

首先創建一個復制表之后,它在Keeper上的元數據都有哪些呢?

例如:

CREATE TABLE my_db.my_table ( ... ) ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/my_table', '{replica}') ORDER BY ...

其中:

{shard} ? = s1
{replica} = r1

所以表的 ZooKeeper 路徑會解析為:/clickhouse/tables/s1/my_table

副本路徑為:/clickhouse/tables/s1/my_table/replicas/r1

ZooKeeper 路徑結構圖:

/clickhouse/
└── tables/
└── s1/ ? ? ? ? ? ? ? ? ? ? ?← shard = s1
└── my_table/ ? ? ? ? ? ← 表名
├── log/ ? ? ? ? ? ?← 主日志目錄(所有副本共享)
│ ? ├── log-0000000000
│ ? ├── log-0000000001
│ ? └── ...
├── replicas/
│ ? ├── r1/ ? ? ? ? ← 當前副本,replica = r1
│ ? │ ? ├── queue/ ? ? ? ? ? ? ?← 待處理的日志操作隊列
│ ? │ ? │ ? ├── queue-0000000000
│ ? │ ? │ ? └── ...
│ ? │ ? ├── log_pointer ? ? ? ? ← 當前副本已同步日志的游標
│ ? │ ? ├── host ? ? ? ? ? ? ? ?← 當前副本的主機地址信息
│ ? │ ? ├── is_active ? ? ? ? ? ← 當前副本是否存活
│ ? │ ? └── ...
│ ? ├── r2/ ? ? ? ? ← 其他副本(如果有)
│ ? └── ...
├── mutations/ ? ? ← 所有的 mutation 操作
├── block_numbers/ ← 每個分區的最大 block number
├── temp/ ? ? ? ? ?← 臨時節點
└── ...

在 ClickHouse Keeper中,log_pointer每個副本(replica)維護的一個游標(cursor),它的作用是在分布式表(如 ReplicatedMergeTree)中 記錄該副本已經處理到哪個日志 entry。

3.4 拉取LogEntry流程

明白了log-pointer(日志同步位點)之后,再看看Keeper是如何具體拉取LogEntry的。

流程如下:

1.主表路徑: /clickhouse/tables/{shard}/{table}/log/ 存放主日志(所有副本共享)。

2.每個副本路徑: /clickhouse/tables/{shard}/{table}/replicas/{replica}/log_pointer 存儲該副本處理進度。

3.副本執行拉取任務:

  • 獲取當前副本的 log_pointer

  • 讀取 /log 目錄下的所有日志節點

  • 過濾日志列表,刪除所有索引小于當前 log_pointer 指向的日志條目

  • 如果過濾后log_entries不為空,先sort

  • for循環邏輯:

    • 批次劃分,以 current_multi_batch_size(初始較小)為批大小,從 log_entries 中取一段連續日志作為本批處理目標。last 指向本批最后一個日志條目。

    • 更新循環索引和批大小。entry_idx 指向下批次起點,批大小指數級增長(最多增長到 MAX_MULTI_OPS),加速同步過程。

    • 生成 ZooKeeper 路徑列表,批量讀取日志數據

      for (auto it = begin; it != end; ++it)get_paths.emplace_back(fs::path(zookeeper_path) / "log" / *it);
      auto get_results = zookeeper->get(get_paths);
      
    • 構造 ZooKeeper 操作列表,準備批量寫入 queue 和更新指針

      for (size_t i = 0; i < get_num; ++i)
      {// 解析日志數據,構造 LogEntry 對象copied_entries.emplace_back(LogEntry::parse(res.data, res.stat, format_version));// 創建 queue 節點的請求(持久順序節點)ops.emplace_back(zkutil::makeCreateRequest(fs::path(replica_path) / "queue/queue-", res.data, zkutil::CreateMode::PersistentSequential));// 處理 create_time,更新 min_unprocessed_insert_time(用于后續處理優先級等)
      }
      
    • 更新 log_pointer 和 min_unprocessed_insert_time 的請求。更新本副本的日志處理進度指針,指向最后處理的日志后一個索引。如果有最早插入時間更新,同步寫入。

    • 使用 ZooKeeper multi() 提交以上所有操作

      auto responses = zookeeper->multi(ops, /* check_session_valid */ true);
    • 將LogEntry加到復制表queue中

      insertUnlocked(copied_entries[copied_entry_idx], unused, state_lock);
  • 喚醒表的后臺任務執行線程去執行LogEntry任務

    storage.background_operations_assignee.trigger();

注意點:

//這只是讀到所有的任務的名字,不讀具體的內容

Strings log_entries = zookeeper->getChildrenWatch(fs::path(zookeeper_path) / "log", nullptr, watch_callback);

//讀到log的具體內容

auto get_results = zookeeper->get(get_paths);

4 副本執行LogEntry

拉取到LogEntry到queue中后,最后會通過storage.background_operations_assignee.trigger()調度執行LogEntry的線程。

調度任務的內容為:

bool StorageReplicatedMergeTree::scheduleDataProcessingJob(BackgroundJobsAssignee & assignee)
{cleanup_thread.wakeupEarlierIfNeeded();/// If replication queue is stopped exit immediately as we successfully executed the taskif (queue.actions_blocker.isCancelled())return false;/// This object will mark the element of the queue as running.ReplicatedMergeTreeQueue::SelectedEntryPtr selected_entry = selectQueueEntry();if (!selected_entry)return false;auto job_type = selected_entry->log_entry->type;/// Depending on entry type execute in fetches (small) pool or big merge_mutate poolif (job_type == LogEntry::GET_PART || job_type == LogEntry::ATTACH_PART){assignee.scheduleFetchTask(std::make_shared<ExecutableLambdaAdapter>([this, selected_entry] () mutable{return processQueueEntry(selected_entry);}, common_assignee_trigger, getStorageID()));return true;}if (job_type == LogEntry::MERGE_PARTS){auto task = std::make_shared<MergeFromLogEntryTask>(selected_entry, *this, common_assignee_trigger);assignee.scheduleMergeMutateTask(task);return true;}if (job_type == LogEntry::MUTATE_PART){auto task = std::make_shared<MutateFromLogEntryTask>(selected_entry, *this, common_assignee_trigger);assignee.scheduleMergeMutateTask(task);return true;}assignee.scheduleCommonTask(std::make_shared<ExecutableLambdaAdapter>([this, selected_entry]() mutable { return processQueueEntry(selected_entry); }, common_assignee_trigger, getStorageID()),/* need_trigger */ true);return true;
}

這里主要說明任務的選擇和執行:

1.從隊列中選擇一個待處理任務

ReplicatedMergeTreeQueue::SelectedEntryPtr selected_entry = selectQueueEntry();
if (!selected_entry)return false;

2.根據任務類型選擇線程池調度

  • ?類型:GET_PART / ATTACH_PART
if (job_type == LogEntry::GET_PART || job_type == LogEntry::ATTACH_PART)
{assignee.scheduleFetchTask(...);return true;
}
  • 類型:MERGE_PARTS

if (job_type == LogEntry::MERGE_PARTS)
{auto task = std::make_shared<MergeFromLogEntryTask>(...);assignee.scheduleMergeMutateTask(task);return true;
}

等等。

以下我們聚焦于GET_PART任務的執行邏輯:

processQueueEntry? ? ? ? ->

????????executeLogEntry? ? ? ? ->

????????????????executeFetch

????????的核心流程為:

1.找到擁有 entry.new_part_name 或覆蓋它的 part 的 其它副本(replica)??????

    /// Looking for covering part. After that entry.actual_new_part_name may be filled.String replica = findReplicaHavingCoveringPart(entry, true);
  • 獲取所有副本名,并隨機打亂(防止偏好某個副本)
    • Strings replicas = zookeeper->getChildren(...);
      std::shuffle(replicas.begin(), replicas.end(), thread_local_rng);
      
  • 遍歷所有副本,跳過自身和不活躍副本
    • if (replica == replica_name) continue;
      if (active && !zookeeper->exists(.../replica/is_active)) continue;
      
  • 獲取該副本上的所有 part,并檢查是否包含所需 part 或其覆蓋 part
    • 如果找到完全一致的 part,直接接受;
    • 如果是覆蓋的 part,則選覆蓋面最大的那個(如 all_0_0_10 優于 all_0_0_3);
    • MergeTreePartInfo::contains 判斷某個 part 是否邏輯上包含另一個。
    • Strings parts = zookeeper->getChildren(.../replica/parts);for (const String & part_on_replica : parts)
      {if (part_on_replica == part_name || MergeTreePartInfo::contains(part_on_replica, part_name, format_version)){if (largest_part_found.empty() || MergeTreePartInfo::contains(part_on_replica, largest_part_found, format_version)){largest_part_found = part_on_replica;}}
      }
      
  • 如果找到了覆蓋的 part,還要做一個額外檢查-queue.addFuturePartIfNotCoveredByThem,這個函數暫未細看

2.確定 fetch 的 part 名稱

String part_name = entry.actual_new_part_name.empty() ? entry.new_part_name : entry.actual_new_part_name;if (!entry.actual_new_part_name.empty())LOG_DEBUG(log, "Will fetch part {} instead of {}", entry.actual_new_part_name, entry.new_part_name);
  • 如果 findReplicaHavingCoveringPart 選中的 replica 擁有 更大的覆蓋 part,比如:你需要的是 part_0_1_1, 它有的是 part_0_3_1,則 entry.actual_new_part_name 會被設置成那個覆蓋的部分。

  • 然后將其作為 fetch 的目標

3.拼接 source_replica 的 ZooKeeper 路徑

String source_replica_path = fs::path(zookeeper_path) / "replicas" / replica;

構造這個副本在 ZooKeeper 中的路徑,例如:

/clickhouse/tables/s1/my_table/replicas/r2

4.執行 fetchPart

該函數會嘗試將 part 拉取到本地,執行以下操作:

1. 前置檢查與準備工作

  • 如果是靜態只讀表,禁止 fetch 操作。
?if (isStaticStorage())throw Exception(ErrorCodes::TABLE_IS_READ_ONLY, "Table is in readonly mode due to static storage");?
  • 如果不是 fetch 到 detached 目錄,先檢查是否已有舊的同名 part(可能是上次拉取失敗的殘留),如有,則觸發后臺清理線程。
if (!to_detached) {if (auto part = getPartIfExists(...)) {cleanup_thread.wakeup();return false;}
}
  • 檢查是否有其它線程正在拉取這個 part。?????
std::lock_guard lock(currently_fetching_parts_mutex);
if (!currently_fetching_parts.insert(part_name).second) {return false; // 已在拉取中,避免重復工作
}

2. 日志記錄,可以看到副本拉取過來的part,對應的類型為DOWNLOAD_PART

    /// LoggingStopwatch stopwatch;MutableDataPartPtr part;DataPartsVector replaced_parts;ProfileEventsScope profile_events_scope;auto write_part_log = [&] (const ExecutionStatus & execution_status){writePartLog(PartLogElement::DOWNLOAD_PART, execution_status, stopwatch.elapsed(),part_name, part, replaced_parts, nullptr,profile_events_scope.getSnapshot());};

3.決定拉取方式:clone or fetch

如果目標 part 是一個 part mutation 的結果,嘗試查找其 source part,并將其 checksums 與目標 part 的 checksums 進行比較。如果兩者一致,則可以直接 clone 本地的 part。

    DataPartPtr part_to_clone;{/// If the desired part is a result of a part mutation, try to find the source part and compare/// its checksums to the checksums of the desired part. If they match, we can just clone the local part./// If we have the source part, its part_info will contain covered_part_info.auto covered_part_info = part_info;covered_part_info.mutation = 0;auto source_part = getActiveContainingPart(covered_part_info);/// Fetch for zero-copy replication is cheap and straightforward, so we don't use local clone hereif (source_part && !is_zero_copy_part(source_part)){auto source_part_header = ReplicatedMergeTreePartHeader::fromColumnsAndChecksums(source_part->getColumns(), source_part->checksums);String part_path = fs::path(source_replica_path) / "parts" / part_name;String part_znode = zookeeper->get(part_path);std::optional<ReplicatedMergeTreePartHeader> desired_part_header;if (!part_znode.empty()){desired_part_header = ReplicatedMergeTreePartHeader::fromString(part_znode);}else{String columns_str;String checksums_str;if (zookeeper->tryGet(fs::path(part_path) / "columns", columns_str) &&zookeeper->tryGet(fs::path(part_path) / "checksums", checksums_str)){desired_part_header = ReplicatedMergeTreePartHeader::fromColumnsAndChecksumsZNodes(columns_str, checksums_str);}else{LOG_INFO(log, "Not checking checksums of part {} with replica {}:{} because part was removed from ZooKeeper",part_name, source_zookeeper_name, source_replica_path);}}/// Checking both checksums and columns hash. For example we can have empty part/// with same checksums but different columns. And we attaching it exception will/// be thrown.if (desired_part_header&& source_part_header.getColumnsHash() == desired_part_header->getColumnsHash()&& source_part_header.getChecksums() == desired_part_header->getChecksums()){LOG_TRACE(log, "Found local part {} with the same checksums and columns hash as {}", source_part->name, part_name);part_to_clone = source_part;}}}

遠程 fetch:獲取源副本的 host 地址和端口信息,準備 HTTP 拉取所需的認證信息和參數,構造 get_part(),使用 fetcher.fetchSelectedPart()

接下來看一下遠程拉取,在fetchSelectedPart中:

數據在構造HttpReadBuffer中已經獲取到

主體流程如下:

1.準備臨時下載目錄(如 tmp-fetch_<part_name>),用于避免寫入中直接影響數據目錄,后續成功后才正式提交。

    static const String TMP_PREFIX = "tmp-fetch_";String tmp_prefix = tmp_prefix_.empty() ? TMP_PREFIX : tmp_prefix_;

2.傳統 Fetch 分支 -?downloadPartToDisk

downloadPartToDisk中調用downloadBaseOrProjectionPartToDisk來取Part或者是Projection的Part:

    try{for (size_t i = 0; i < projections; ++i){String projection_name;readStringBinary(projection_name, in);MergeTreeData::DataPart::Checksums projection_checksum;auto projection_part_storage = part_storage_for_loading->getProjection(projection_name + ".proj");projection_part_storage->createDirectories();downloadBaseOrProjectionPartToDisk(replica_path, projection_part_storage, in, output_buffer_getter, projection_checksum, throttler, sync);data_checksums.addFile(projection_name + ".proj", projection_checksum.getTotalSizeOnDisk(), projection_checksum.getTotalChecksumUInt128());}downloadBaseOrProjectionPartToDisk(replica_path, part_storage_for_loading, in, output_buffer_getter, data_checksums, throttler, sync);}

downloadBaseOrProjectionPartToDisk中,遍歷part中的每一個文件,例如.bin , .mrk等等

    for (size_t i = 0; i < files; ++i){String file_name;UInt64 file_size;readStringBinary(file_name, in);readBinary(file_size, in);/// File must be inside "absolute_part_path" directory./// Otherwise malicious ClickHouse replica may force us to write to arbitrary path.String absolute_file_path = fs::weakly_canonical(fs::path(data_part_storage->getRelativePath()) / file_name);if (!startsWith(absolute_file_path, fs::weakly_canonical(data_part_storage->getRelativePath()).string()))throw Exception(ErrorCodes::INSECURE_PATH,"File path ({}) doesn't appear to be inside part path ({}). ""This may happen if we are trying to download part from malicious replica or logical error.",absolute_file_path, data_part_storage->getRelativePath());written_files.emplace_back(output_buffer_getter(*data_part_storage, file_name, file_size));HashingWriteBuffer hashing_out(*written_files.back());copyDataWithThrottler(in, hashing_out, file_size, blocker.getCounter(), throttler);hashing_out.finalize();if (blocker.isCancelled()){/// NOTE The is_cancelled flag also makes sense to check every time you read over the network,/// performing a poll with a not very large timeout./// And now we check it only between read chunks (in the `copyData` function).throw Exception(ErrorCodes::ABORTED, "Fetching of part was cancelled");}MergeTreeDataPartChecksum::uint128 expected_hash;readPODBinary(expected_hash, in);if (expected_hash != hashing_out.getHash())throw Exception(ErrorCodes::CHECKSUM_DOESNT_MATCH,"Checksum mismatch for file {} transferred from {} (0x{} vs 0x{})",(fs::path(data_part_storage->getFullPath()) / file_name).string(),replica_path,getHexUIntLowercase(expected_hash),getHexUIntLowercase(hashing_out.getHash()));if (file_name != "checksums.txt" &&file_name != "columns.txt" &&file_name != IMergeTreeDataPart::DEFAULT_COMPRESSION_CODEC_FILE_NAME &&file_name != IMergeTreeDataPart::METADATA_VERSION_FILE_NAME)checksums.addFile(file_name, file_size, expected_hash);}

之后將Part涉及的文件寫到磁盤:

    /// Call fsync for all files at once in attempt to decrease the latencyfor (auto & file : written_files){file->finalize();if (sync)file->sync();}

5 擴展

如何判斷一個Part是否包含另一個Part通過這個函數完成:

    bool contains(const MergeTreePartInfo & rhs) const{/// Containing part may have equal level iff block numbers are equal (unless level is MAX_LEVEL)/// (e.g. all_0_5_2 does not contain all_0_4_2, but all_0_5_3 or all_0_4_2_9 do)bool strictly_contains_block_range = (min_block == rhs.min_block && max_block == rhs.max_block) || level > rhs.level|| level == MAX_LEVEL || level == LEGACY_MAX_LEVEL;return partition_id == rhs.partition_id        /// Parts for different partitions are not merged&& min_block <= rhs.min_block&& max_block >= rhs.max_block&& level >= rhs.level&& mutation >= rhs.mutation&& strictly_contains_block_range;}

同步刪除表:

DROP DATABASE IF EXISTS my_database SYNC;

刪database目錄的信息:

system drop database replica '分片名|副本名' from database db_name;

刪replica下信息:

system drop replica '副本名' from database db_name;

剔除表的元信息:

SYSTEM DROP REPLICA 'r1' FROM ZKPATH '/clickhouse/tables/s1/replicated_table5';

在集群中創建表:

CREATE TABLE replicated_db.replicated_table ON CLUSTER my_cluster
(
`id` UInt64,
`event_time` DateTime,
`event_type` String
)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/replicated_table', '{replica}')
PARTITION BY toYYYYMM(event_time)
ORDER BY (event_time, id)
SETTINGS index_granularity = 8192

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/916058.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/916058.shtml
英文地址,請注明出處:http://en.pswp.cn/news/916058.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

Spring AI 系列之二十四 - ModerationModel

之前做個幾個大模型的應用&#xff0c;都是使用Python語言&#xff0c;后來有一個項目使用了Java&#xff0c;并使用了Spring AI框架。隨著Spring AI不斷地完善&#xff0c;最近它發布了1.0正式版&#xff0c;意味著它已經能很好的作為企業級生產環境的使用。對于Java開發者來說…

在 macOS 上 安裝最新 Python 和 pip

文章目錄方法一&#xff1a;使用 Homebrew&#xff08;推薦&#xff09;方法二&#xff1a;使用 pyenv&#xff08;管理多個 Python 版本&#xff09;方法三&#xff1a;從官網下載安裝包升級 pip驗證安裝方法一&#xff1a;使用 Homebrew&#xff08;推薦&#xff09; 1. 安裝…

新能源電池廠自動化應用:Modbus TCP轉DeviceNet實踐

一、項目背景在新能源電池廠的生產過程中&#xff0c;提升自動化水平對提高生產效率和產品質量至關重要。我們的生產線上&#xff0c;施耐德PLC負責整體的生產流程控制&#xff0c;采用Modbus TCP協議進行數據傳輸&#xff0c;它基于以太網&#xff0c;傳輸速度快、穩定性高&am…

Java進階3:Java集合框架、ArrayList、LinkedList、HashSet、HashMap和他們的迭代器

Java集合框架 集合框架被設計成的目標&#xff1a;高性能、高效 允許不同類型的結合&#xff0c;以類似的方式進行工作&#xff0c;有高度的互操作性 對一個集合的擴展和適應必須是簡單的兩種容器&#xff1a;集合Collection、圖Map 集合接口被分為了三種子類型&#xff1a;Lis…

筆記/使用Excel進行財務預測

文章目錄金融預測的決策與數據收集決定財務問題收集財務數據清理與合并財務數據解釋與應用預測結果使用excel進行財務回歸分析回歸預測的步驟解釋回歸結果在 Excel 中執行預測財務分析指標財務分析常用指標一覽表財務指標的相關性對競爭對手進行基準測試財務指標的趨勢分析持續…

力扣1287:有序數組中出現次數超過25%的元素

力扣1287:有序數組中出現次數超過25%的元素題目思路代碼題目 給你一個非遞減的 有序 整數數組&#xff0c;已知這個數組中恰好有一個整數&#xff0c;它的出現次數超過數組元素總數的 25%。 請你找到并返回這個整數 思路 哈希表秒了 代碼 class Solution { public:int fi…

如何用 Z.ai 生成PPT,一句話生成整套演示文檔

大家好&#xff0c;這里是K姐。 一個幫你追蹤最新AI應用的女子。 最近朋友給我分享了一個好玩的頁面截圖。 一眼看過去&#xff0c;就感覺這PPT的文字排版很有人工味。 我立馬就去試了一下&#xff0c;才發現它根本不是傳統的 PPT&#xff0c;而是一種網頁式的 Slides 。 做…

C/C++ 編程:掌握靜態庫與動態庫的編譯

在 C/C 項目開發中&#xff0c;理解并掌握如何編譯和使用庫文件是至關重要的一環。庫允許你將常用的函數和代碼模塊化&#xff0c;從而提高代碼重用性、簡化項目管理并縮短編譯時間。最常見的兩種庫類型是靜態庫 (.a) 和動態庫 (.so)。它們各有優缺點&#xff0c;適用于不同的開…

汽車安全 | 汽車安全入門

引言 汽車安全不僅僅是對汽車/車輛進行物理入侵。這只是很小且簡單的一部分。當你以攻擊者/對手的思維去看待一輛聯網汽車時&#xff0c;你關注的是整個車輛生態系統。這不僅包括它如何與外部實體通信&#xff0c;也包括它在車內如何運作。 汽車是主要的交通工具&#xff0c;…

CLIP與SIGLIP對比淺析

CLIP 和 SIGLIP 的核心區別在于損失函數的設計&#xff1a;CLIP 使用基于 softmax 的對比損失&#xff08;InfoNCE&#xff09;&#xff0c;強制正樣本在全局對比中壓倒所有負樣本&#xff0c;計算成本高且受限于負樣本數量&#xff1b;SIGLIP 改用基于 sigmoid 的二元分類損失…

移動管家手機控車便捷性如何

移動管家4G手機控車-全面升級一鍵啟動、無鑰匙進入、手機啟動、手機開關鎖、手機開尾箱、手機尋車、車輛診斷、GPS北斗定位、電子圍欄、車輛授權、車輛防盜搶、胎壓檢測、預約啟動、車窗控制、車況提醒等功&#xff1b;移動管家手機控車系統&#xff08;以“移動管家控車APP”為…

MySQL 8.4.4詳細下載安裝配置

1、下載mysql8.4.4文件&#xff0c;取zip文件 mysql8.4.4下載路徑 MySQL 5.7.31詳細下載安裝配置 2、配置環境變量 1.系統—>高級系統設置—>環境變量—>系統變量 在系統變量中點擊新建&#xff0c;變量名為量名為&#xff1a;MYSQL_HOME&#xff0c;添加你的mys…

在 Linux 上安裝 `pgvector`(這是一個 PostgreSQL 的向量類型擴展,常用于處理嵌入向量,便于進行向量相似度搜索)

全文 4000 字&#xff0c;配圖配碼&#xff0c;已在多家企業落地驗證。閱讀完如有收獲&#xff0c;文末投票告訴我你最關注的方向&#xff0c;我會在下一篇文章里繼續深入。 0. pgvector 簡介 pgvector 是一款 PostgreSQL 原生向量數據類型擴展&#xff0c;核心能力&#xff1…

【項目實戰】——深度學習.全連接神經網絡

目錄 1.使用全連接網絡訓練和驗證MNIST數據集 2.使用全連接網絡訓練和驗證CIFAR10數據集 1.使用全連接網絡訓練和驗證MNIST數據集 import torch from torch import nn from torchvision import datasets, transforms from torch.utils.data import DataLoader from torch im…

嵌入式學習的第三十四天-進程間通信-TCP

一、TCPTCP : 傳輸控制協議 傳輸層1. TCP特點(1).面向連接,避免部分數據丟失 (2).安全、可靠 (3).面向字節流 (4).占用資源開銷大2.TCP安全可靠機制三次握手:指建立tcp連接時&#xff0c;需要客戶端和服務端總共發送三次報文確認連接。確保雙方均已做好 收發…

【爬蟲】06 - 自動化爬蟲selenium

自動化爬蟲selenium 文章目錄自動化爬蟲selenium一&#xff1a;Selenium簡介1&#xff1a;什么是selenium2&#xff1a;安裝準備二&#xff1a;元素定位1&#xff1a;id 定位2&#xff1a;name 定位3&#xff1a;class 定位4&#xff1a;tag 定位5&#xff1a;xpath 定位(最常用…

2025年中國移動鴻鵠大數據實訓營(大數據方向)kafka講解及實踐-第2次作業指導

書接上回&#xff0c;第二次作業比較容易解決&#xff0c;我問了ai&#xff0c;讓他對我進行指導&#xff0c;按照它提供的步驟&#xff0c;我完成了本次實驗&#xff0c;接下來我會標注出需要注意的細節&#xff0c;指導大家完成此次任務。 &#x1f3af; 一、作業目標 ??…

三十七、【高級特性篇】定時任務:基于 APScheduler 實現測試計劃的靈活調度

三十七、【高級特性篇】定時任務:基于 APScheduler 實現測試計劃的靈活調度 前言 準備工作 第一部分:后端實現 - `APScheduler` 集成與任務調度 1. 安裝 `django-apscheduler` 2. 配置 `django-apscheduler` 3. 數據庫遷移 4. 創建調度觸發函數 5. 啟動 APScheduler 調度器 6…

RabbitMQ--消息順序性

看本章之前強烈建議先去看博主的這篇博客 RabbitMQ--消費端單線程與多線程-CSDN博客 一、消息順序性概念 消息順序性是指消息在生產者發送的順序和消費者接收處理的順序保持一致。 二、RabbitMQ 順序性保證機制 情況順序保證情況備注單隊列&#xff0c;單消費者消息嚴格按發送順…

.net core接收對方傳遞的body體里的json并反序列化

1、首先我在通用程序里有一個可以接收對象型和數組型json串的反序列化方法public static async Task<Dictionary<string, string>> AllParameters(this HttpRequest request){Dictionary<string, string> parameters QueryParameters(request);request.Enab…