【如何實現分布式壓測中間件】

分布式壓測中間件的原理及其實現

    • 原理
    • 全鏈路追蹤框架(Trace)
    • MQ中間件
    • 數據庫
    • 分布式緩存中間件(Redis)
    • 分庫分表中間件

原理

通過大量閱讀中間件源碼,開源社區調研,得到設計原理:
(1)發起壓測鏈路http請求
(2)通過分布式追蹤框架獲取URL上影子標識,將其放入上下文Context中
(3)提供者應用發起PRC/MQ調用時,中間件會將測試標放入中間件的Context上下文中傳遞。
(4)消費者處理RPC/MQ消息,獲取中間件Context上下文。
(5)經過分庫分表/緩存數據庫中間件,獲取當前Context里的影子標識。

打成Maven包,在項目中直接引入

  1. 可插拔,業務代碼不感知。
  2. 支持復雜SQL處理,支持全鏈路測試,且支持全鏈路追蹤。
  3. 極大提高壓測工作效率。

全鏈路追蹤框架(Trace)

從HTTP請求鏈接上識別到特定的key,如:

URL添加壓測標識,test = true,將壓測標識添加到追蹤鏈路框架中的Context上下文中。

MQ中間件

例如RocketMQ: com.alibaba.rocketmq.client.hook.SendMessageHook
實現接口SendMessageHook進行日志追蹤鏈路埋點, 分布式鏈路組件SOFA
Trace也是基于這個接口去埋點,這是mq官方留給實現者的AOP。

public class MetaQSendMessageHookImpl implements SendMessageHook, MetaQTraceConstants {public MetaQSendMessageHookImpl() {}public String hookName() {return "EagleEyeSendMessageHook";}public void sendMessageBefore(SendMessageContext context) {if (context != null && context.getMessage() != null && MetaQTraceLogUtils.isTraceLogOn(context.getProducerGroup())) {MetaQTraceContext mqTraceContext = new MetaQTraceContext();context.setMqTraceContext(mqTraceContext);mqTraceContext.setMetaQType(MetaQType.METAQ);mqTraceContext.setGroup(context.getProducerGroup());mqTraceContext.setAsync(CommunicationMode.ASYNC.equals(context.getCommunicationMode()));Message msg = context.getMessage();if (msg != null) {MetaQTraceBean traceBean = new MetaQTraceBean();traceBean.setTopic(msg.getTopic());traceBean.setOriginMsgId(MessageAccessor.getOriginMessageId(msg));traceBean.setTags(msg.getTags());traceBean.setKeys(msg.getKeys());traceBean.setBuyerId(msg.getBuyerId());traceBean.setTransferFlag(MessageAccessor.getTransferFlag(msg));traceBean.setCorrectionFlag(MessageAccessor.getCorrectionFlag(msg));traceBean.setBodyLength(msg.getBody().length);traceBean.setBornHost(context.getBornHost());traceBean.setStoreHost(context.getBrokerAddr());traceBean.setBrokerName(context.getMq().getBrokerName());traceBean.setProps(context.getProps());traceBean.setMsgType(context.getMsgType());List<MetaQTraceBean> beans = new ArrayList();beans.add(traceBean);mqTraceContext.setTraceBeans(beans);if (StringUtils.isNotBlank(msg.getUserProperty("eagleTraceId"))) {traceBean.setTraceId(msg.getUserProperty("eagleTraceId"));traceBean.setRpcId(msg.getUserProperty("eagleRpcId"));traceBean.setEagleEyeUserData(msg.getUserProperty("eagleData"));}MetaQSendMessageTraceLog.sendMessageBefore(mqTraceContext);if (StringUtils.isBlank(msg.getProperty("eagleTraceId")) && StringUtils.isNotBlank(traceBean.getTraceId())) {msg.putUserProperty("eagleTraceId", traceBean.getTraceId());msg.putUserProperty("eagleRpcId", traceBean.getRpcId());msg.putUserProperty("eagleData", traceBean.getEagleEyeUserData());}}}}public void sendMessageAfter(SendMessageContext context) {if (context != null && context.getMessage() != null && context.getSendResult() != null && MetaQTraceLogUtils.isTraceLogOn(context.getProducerGroup())) {MetaQTraceContext mqTraceContext = (MetaQTraceContext)context.getMqTraceContext();mqTraceContext.setRegionId(context.getSendResult().getRegionId());MetaQTraceBean traceBean = (MetaQTraceBean)mqTraceContext.getTraceBeans().get(0);if (traceBean != null && context.getSendResult() != null) {traceBean.setQueueId(context.getMq().getQueueId());traceBean.setMsgId(context.getSendResult().getOffsetMsgId());traceBean.setOriginMsgId(context.getSendResult().getMsgId());traceBean.setOffset(context.getSendResult().getQueueOffset());mqTraceContext.setSuccess(true);mqTraceContext.setStatus(context.getSendResult().getSendStatus().toString());} else if (context.getException() != null) {String msg = context.getException().getMessage();mqTraceContext.setErrorMsg(StringUtils.substring(msg, 0, msg.indexOf("\n")));}MetaQSendMessageTraceLog.sendMessageAfter(mqTraceContext);}}
}

數據庫

參考數據庫Druid鏈接池方案:
https://github.com/alibaba/druid/wiki/SQL-Parser

import com.alibaba.druid.filter.FilterAdapter;
import com.alibaba.druid.filter.FilterChain;
import com.alibaba.druid.pool.DruidDataSource;
import com.alibaba.druid.proxy.jdbc.DataSourceProxy;
import com.alibaba.druid.proxy.jdbc.StatementProxy;
import com.alibaba.druid.sql.SQLUtils;
import com.alibaba.druid.sql.ast.SQLStatement;
import com.alibaba.druid.util.JdbcConstants;
import lombok.extern.slf4j.Slf4j;
import java.sql.SQLException;
import java.util.List;/*** 攔截druid數據鏈接池* @author doge* @date 2021/10/19*/
@Slf4j
public class DruidShadowTestFilter extends FilterAdapter {private DruidShadowTestVisitor visitor = new DruidShadowTestVisitor();@Overridepublic boolean statement_execute(FilterChain chain, StatementProxy statement, String sql) throws SQLException {try {List<SQLStatement> sqlStatements = SQLUtils.parseStatements(sql, JdbcConstants.MYSQL);sqlStatements.forEach(sqlStatement -> sqlStatement.accept(visitor));if (visitor.getRewriteStatus()) {// 改寫了SQL,需要替換String newSql = SQLUtils.toSQLString(sqlStatements,JdbcConstants.MYSQL);log.debug("rewrite sql, origin sql: [{}], new sql: [{}]", sql, newSql);return super.statement_execute(chain, statement, newSql);}return super.statement_execute(chain, statement, sql);} finally {visitor.removeRewriteStatus();}}@Overridepublic void init(DataSourceProxy dataSourceProxy){if (!(dataSourceProxy instanceof DruidDataSource)) {log.error("ConfigLoader only support DruidDataSource");}DruidDataSource dataSource = (DruidDataSource) dataSourceProxy;log.info("db configuration: url="+ dataSource.getUrl());}}
import com.alibaba.druid.sql.ast.statement.SQLExprTableSource;
import com.alibaba.druid.sql.dialect.mysql.visitor.MySqlASTVisitorAdapter;
import com.alibaba.ewtp.test.utils.ShadowTestUtil;
import org.apache.commons.lang3.StringUtils;import java.util.Optional;/*** @author doge* @date 2021/10/20*/
public class DruidShadowTestVisitor extends MySqlASTVisitorAdapter {private static final ThreadLocal<Boolean> REWRITE_STATUS_CACHE = new ThreadLocal<>();@Overridepublic boolean visit(SQLExprTableSource sqlExprTableSource) {// 別名,如果有別名,別名保持不變String alias = StringUtils.isEmpty(sqlExprTableSource.getAlias()) ? sqlExprTableSource.getExpr().toString() : sqlExprTableSource.getAlias();// 修改表名,不包含點才加 select c.id,d.name from c left join d on c.id = d.id 中的c 和 dif(!sqlExprTableSource.getExpr().toString().contains(".")) {sqlExprTableSource.setExpr(ShadowTestUtil.PREFIX  + sqlExprTableSource.getExpr());}sqlExprTableSource.setAlias(alias);REWRITE_STATUS_CACHE.set(true);return true;}/*** 返回重寫狀態* @return 重寫狀態,{@code true}表示已重寫,{@code false}表示未重寫*/public boolean getRewriteStatus() {// get reset rewrite statusreturn Optional.ofNullable(REWRITE_STATUS_CACHE.get()).orElse(Boolean.FALSE);}/*** 重置重寫狀態*/public void removeRewriteStatus() {REWRITE_STATUS_CACHE.remove();}
}

分布式緩存中間件(Redis)

可以參考SofaTrace做法
https://www.sofastack.tech/blog/sofa-channel-15-retrospect/

  1. 新增一個Redis的后置增強器(部分代碼)
  2. 實現redis的連接工廠(部分代碼)
  3. 實現redis的連接器(會在所有redis key前加上前綴 test

import com.alibaba.ewtp.test.factory.TracingRedisConnectionFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.data.redis.connection.RedisConnectionFactory;
```java/*** @author doge* @date 2021/10/14* redis 后置增強處理*/
public class TracingRedisBeanPostProcessor implements BeanPostProcessor {public TracingRedisBeanPostProcessor(){}@Overridepublic Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {if (bean instanceof RedisConnectionFactory) {bean = new TracingRedisConnectionFactory((RedisConnectionFactory) bean);}return bean;}
}import org.springframework.dao.DataAccessException;
import org.springframework.data.redis.connection.*;/*** @author doge* @date 2021/10/14*/
public class TracingRedisConnectionFactory implements RedisConnectionFactory {private final RedisConnectionFactory delegate;public TracingRedisConnectionFactory(RedisConnectionFactory delegate) {this.delegate = delegate;}@Overridepublic RedisConnection getConnection() {// support cluster connectionRedisConnection connection = this.delegate.getConnection();return new TracingRedisConnection(connection);}@Overridepublic RedisClusterConnection getClusterConnection() {return delegate.getClusterConnection();}@Overridepublic boolean getConvertPipelineAndTxResults() {return delegate.getConvertPipelineAndTxResults();}@Overridepublic RedisSentinelConnection getSentinelConnection() {return delegate.getSentinelConnection();}@Overridepublic DataAccessException translateExceptionIfPossible(RuntimeException e) {return delegate.translateExceptionIfPossible(e);}
}import org.springframework.dao.DataAccessException;
import org.springframework.data.geo.Distance;
import org.springframework.data.redis.connection.*;
import org.springframework.data.redis.connection.stream.*;
import org.springframework.data.redis.core.Cursor;
import org.springframework.data.redis.core.ScanOptions;
import org.springframework.data.geo.Circle;
import org.springframework.data.geo.GeoResults;
import org.springframework.data.geo.Metric;
import org.springframework.data.geo.Point;
import org.springframework.data.redis.core.types.Expiration;
import org.springframework.data.redis.core.types.RedisClientInfo;import java.time.Duration;
import java.util.*;
import java.util.concurrent.TimeUnit;/*** @author doge* @date 2021/10/14*/
public class TracingRedisConnection implements RedisConnection {private final RedisConnection connection;public TracingRedisConnection(RedisConnection connection) {this.connection = connection;}@Overridepublic Boolean expire(byte[] key, long seconds) {handleByte(key);return connection.expire(key, seconds);}@Overridepublic Boolean set(byte[] key, byte[] value) {handleByte(key);return connection.set(key, value);}@Overridepublic Boolean mSet(Map<byte[], byte[]> tuple) {handleByteMap(tuple);return connection.mSet(tuple);}public void handleByte(byte[] key){if (ShadowTestUtil.isShadowTesLink()){key = (ShadowTestUtil.prefix + new String(key)).getBytes();}}public void handleBytes(byte[]... keys){if (ShadowTestUtil.isShadowTesLink()){for (byte[] bytes : keys){handleByte(bytes);}}}public void handleByteMap(Map<byte[], byte[]> tuple){if (ShadowTestUtil.isShadowTesLink()){for (Map.Entry<byte[], byte[]> entry : tuple.entrySet()){handleByte(entry.getKey());}}}}

分庫分表中間件

開源框架的解決方案:
https://shardingsphere.apache.org/document/current/cn/features/shadow

方案&思路: 當獲取壓測標后,若開啟影子鏈路,將打開Sharding影子庫的開關,串通起整個分庫分表鏈路。當然也可以直接用數據庫連接池來解決。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/bicheng/87376.shtml
繁體地址,請注明出處:http://hk.pswp.cn/bicheng/87376.shtml
英文地址,請注明出處:http://en.pswp.cn/bicheng/87376.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

Qt進程間保活方案:詳解如何實現進程間通信與自動保活機制

目錄 摘要 一、進程間保活的基本原理 二、具體步驟及代碼示例 三、常見問題與優化 四、總體方案 摘要 在一些需要長時間運行的應用程序中&#xff0c;確保進程在意外退出時能夠自動重啟是一項非常重要的任務。尤其是在嵌入式開發、后臺服務以及需要高可用性的場景下&#x…

Python-內置數據結構-list-tuple-bubble-字符串-bytes-bytesarray-切片-學習筆記

欠4年前自己的一份筆記&#xff0c;獻給今后的自己。 分類 數值型 int、float、complex、bool 序列對象 字符串 str 列表 list tuple 鍵值對 集合set 字典dict 數值型 int、float、complex、bool都是class&#x…

利用事務鉤子函數解決業務異步發送問題

利用事務鉤子函數解決業務異步發送問題 一、問題背景二、實現方案1、生產者代碼2、消費者代碼 三、測試與驗證1、未開啟事務場景2、開啟事務場景 四、項目結構及源碼 一、問題背景 在某項業務中&#xff0c;需要在事務完成后&#xff0c;寫入日志到某數據庫中。需要要么都成功&…

uniapp選擇相冊

概述 一款針對Android平臺下的圖片選擇器&#xff0c;支持從相冊獲取圖片、視頻、音頻&拍照&#xff0c;支持裁剪(單圖or多圖裁剪)、壓縮、主題自定義配置等功能&#xff0c;支持動態獲取權限&適配Android 5.0系統的開源圖片選擇框架。 支持Uniapp和Uniapp X下的Vue2、…

MAC 多應用切換技巧,單應用切換技巧

在 Mac 上&#xff0c;有幾種快捷鍵可以幫助你快速切換應用程序窗口&#xff1a; 1. Command (?) Tab - 這是最常用的快捷鍵&#xff0c;用于在打開的應用程序之間進行循環切換。按住 Command 鍵不放&#xff0c;然后反復按下 Tab 鍵可以選擇下一個應用程序。當你松開 Comm…

SpringBoot+本地部署大模型實現知識庫功能

SpringBoot本地部署大模型實現RAG知識庫功能 1、Linux系統部署本地大模型1.1 安裝ollama1.2 啟動ollama1.3 下載deepseek模型 2、Springboot代碼調用本地模型實現基礎問答功能3、集成向量數據庫4、知識庫數據喂取5、最終實現RAG知識庫功能 1、Linux系統部署本地大模型 1.1 安裝…

嵌入式原理與應用篇---ARM

ARM 架構的 STM32 系列微控制器廣泛應用于嵌入式系統開發&#xff0c;理解其匯編語言指令對于優化性能、訪問硬件底層非常重要。下面詳細解釋常見的 ARM 匯編指令及其使用實例。 數據處理指令 1. MOV&#xff08;移動&#xff09; 功能&#xff1a;將立即數或寄存器值復制到…

【RHCSA-Linux考試題目筆記(自用)】servera的題目

一、開始 1、啟動rhcsa環境 2、點擊題目&#xff0c;看題 3、通過控制器來啟動所有虛擬機 控制器 打開后點start&#xff0c;然后ok 之后進入一個有classroom、servera、serverb&#xff08;考試不一定叫這些名&#xff0c;但大差不差&#xff09;什么之類的界面&#xff0c;…

SpringBoot項目使用arthas-tunnel-server

參考官網Arthas Spring Boot Starter | arthas Spring Boot系列之使用Arthas Tunnel Server 進行遠程調試實踐-騰訊云開發者社區-騰訊云 springBoot項目, 增加maven依賴 <dependency><groupId>com.taobao.arthas</groupId><artifactId>arthas-sprin…

Modbus TCP 進階:基于以太網的遠程設備控制(二)

基于 Modbus TCP 的遠程設備控制實戰 &#xff08;一&#xff09;硬件與網絡搭建實操 1. 設備選型與連接 在工業現場&#xff0c;根據遠程控制需求進行設備選型至關重要 。對于傳感器&#xff0c;若要監測溫度&#xff0c;可選擇高精度的熱電偶傳感器&#xff0c;如 K 型熱電…

分庫分表之實戰-sharding-JDBC

大家好&#xff0c;我是工藤學編程 &#x1f989;一個正在努力學習的小博主&#xff0c;期待你的關注實戰代碼系列最新文章&#x1f609;C實現圖書管理系統&#xff08;Qt C GUI界面版&#xff09;SpringBoot實戰系列&#x1f437;【SpringBoot實戰系列】Sharding-Jdbc實現分庫…

httpcore-nio引起的線程、fd泄露問題

依賴來源&#xff1a;httpasyncclient-4.1.4.jar 現象 程序報錯too many open files 線程數飆升、句柄數飆升 thread dump顯示大量 "I/O dispatcher 7215" #9102 prio5 os_prio0 tid0x00002b7ba036a800 nid0x6f24 runnable [0x00002b7d98d41000]java.lang.Thread.…

多線程生產者消費者模型實戰案例

多線程生產者消費者模型實戰案例 前言業務場景術前準備無鎖無事務有事務 synchronized事務在鎖外事務在鎖內 數據庫行鎖什么是數據庫行鎖有事務沒有事務 樂觀鎖ReentrantLock分布式鎖 前言 曾經一直有一個疑惑&#xff0c;就是關于多線程生產者消費者模型的學習過程中&#xf…

青少年編程與數學 02-022 專業應用軟件簡介 03 三維建模及動畫軟件:Autodesk Maya

青少年編程與數學 02-022 專業應用軟件簡介 03 三維建模及動畫軟件&#xff1a;Autodesk Maya 一、什么是三維建模二、什么是計算機動畫三、三維建模及動畫設計軟件的發展歷程&#xff08;一&#xff09;早期探索階段&#xff08;20世紀60年代 - 80年代&#xff09;&#xff08…

獲得 OCM 大師證書學習歷練

當我站在山城重慶的洪崖洞前&#xff0c;看著璀璨的夜景倒映在嘉陵江上&#xff0c;手中緊握著 OCM 大師證書&#xff0c;那一刻&#xff0c;備考時的艱辛與考試時的緊張都化作了滿滿的成就感。這段在重慶獲得 OCM 大師證書的經歷&#xff0c;就像一場充滿挑戰與驚喜的冒險&…

srs-gb28181 與 SRS 5.0 對 GB28181 國標支持

srs-gb28181 是基于 SRS 4.0/5.0 的國標&#xff08;GB28181&#xff09;擴展分支&#xff0c;而 SRS 5.0 官方版本也逐步增強了對 GB28181 的支持。以下是兩者的主要區別&#xff1a; 1. 功能支持對比 功能srs-gb28181&#xff08;擴展分支&#xff09;SRS 5.0&#xff08;官…

算法第18天|繼續二叉樹:修剪二叉搜索樹、將有序數組轉化為二叉搜索樹、把二叉搜索樹轉換為累加樹

今日總結&#xff1a; 1、修剪二叉搜索樹&#xff08;重點思考如何修剪&#xff09; &#xff08;1&#xff09;遞歸的返回值是什么&#xff1f;&#xff08;與插入、刪除一樣&#xff09; &#xff08;2&#xff09;遞歸的單層邏輯一定要縷清&#xff08;3中情況討論&#xff…

C# 多線程(三)線程池

目錄 1.通過TPL使用線程池 2.不使用TPL進入線程池的辦法 異步委托 3.線程池優化技術 最小線程數的工作原理 每當啟動一個新線程時&#xff0c;系統都需要花費數百微秒來分配資源&#xff0c;例如創建獨立的局部變量棧空間。默認情況下&#xff0c;每個線程還會占用約1…

學習筆記(29):訓練集與測試集劃分詳解:train_test_split 函數深度解析

學習筆記(29):訓練集與測試集劃分詳解&#xff1a;train_test_split 函數深度解析 一、為什么需要劃分訓練集和測試集&#xff1f; 在機器學習中&#xff0c;模型需要經歷兩個核心階段&#xff1a; 訓練階段&#xff1a;用訓練集數據學習特征與目標值的映射關系&#xff08;…

【全網唯一】自動化編輯器 Windows版純本地離線文字識別插件

目的 自動化編輯器超輕量級RPA工具&#xff0c;零代碼制作RPA自動化任務&#xff0c;解放雙手&#xff0c;釋放雙眼&#xff0c;輕松玩游戲&#xff0c;刷任務。本篇文章主要講解下自動化編輯器的TomatoOCR純本地離線文字識別Windows版插件如何使用和集成。 準備工作 1、下載自…