分布式壓測中間件的原理及其實現
- 原理
- 全鏈路追蹤框架(Trace)
- MQ中間件
- 數據庫
- 分布式緩存中間件(Redis)
- 分庫分表中間件
原理
通過大量閱讀中間件源碼,開源社區調研,得到設計原理:
(1)發起壓測鏈路http請求
(2)通過分布式追蹤框架獲取URL上影子標識,將其放入上下文Context中
(3)提供者應用發起PRC/MQ調用時,中間件會將測試標放入中間件的Context上下文中傳遞。
(4)消費者處理RPC/MQ消息,獲取中間件Context上下文。
(5)經過分庫分表/緩存數據庫中間件,獲取當前Context里的影子標識。
打成Maven包,在項目中直接引入
- 可插拔,業務代碼不感知。
- 支持復雜SQL處理,支持全鏈路測試,且支持全鏈路追蹤。
- 極大提高壓測工作效率。
全鏈路追蹤框架(Trace)
從HTTP請求鏈接上識別到特定的key,如:
URL添加壓測標識,test = true,將壓測標識添加到追蹤鏈路框架中的Context上下文中。
MQ中間件
例如RocketMQ: com.alibaba.rocketmq.client.hook.SendMessageHook
實現接口SendMessageHook進行日志追蹤鏈路埋點, 分布式鏈路組件SOFA
Trace也是基于這個接口去埋點,這是mq官方留給實現者的AOP。
public class MetaQSendMessageHookImpl implements SendMessageHook, MetaQTraceConstants {public MetaQSendMessageHookImpl() {}public String hookName() {return "EagleEyeSendMessageHook";}public void sendMessageBefore(SendMessageContext context) {if (context != null && context.getMessage() != null && MetaQTraceLogUtils.isTraceLogOn(context.getProducerGroup())) {MetaQTraceContext mqTraceContext = new MetaQTraceContext();context.setMqTraceContext(mqTraceContext);mqTraceContext.setMetaQType(MetaQType.METAQ);mqTraceContext.setGroup(context.getProducerGroup());mqTraceContext.setAsync(CommunicationMode.ASYNC.equals(context.getCommunicationMode()));Message msg = context.getMessage();if (msg != null) {MetaQTraceBean traceBean = new MetaQTraceBean();traceBean.setTopic(msg.getTopic());traceBean.setOriginMsgId(MessageAccessor.getOriginMessageId(msg));traceBean.setTags(msg.getTags());traceBean.setKeys(msg.getKeys());traceBean.setBuyerId(msg.getBuyerId());traceBean.setTransferFlag(MessageAccessor.getTransferFlag(msg));traceBean.setCorrectionFlag(MessageAccessor.getCorrectionFlag(msg));traceBean.setBodyLength(msg.getBody().length);traceBean.setBornHost(context.getBornHost());traceBean.setStoreHost(context.getBrokerAddr());traceBean.setBrokerName(context.getMq().getBrokerName());traceBean.setProps(context.getProps());traceBean.setMsgType(context.getMsgType());List<MetaQTraceBean> beans = new ArrayList();beans.add(traceBean);mqTraceContext.setTraceBeans(beans);if (StringUtils.isNotBlank(msg.getUserProperty("eagleTraceId"))) {traceBean.setTraceId(msg.getUserProperty("eagleTraceId"));traceBean.setRpcId(msg.getUserProperty("eagleRpcId"));traceBean.setEagleEyeUserData(msg.getUserProperty("eagleData"));}MetaQSendMessageTraceLog.sendMessageBefore(mqTraceContext);if (StringUtils.isBlank(msg.getProperty("eagleTraceId")) && StringUtils.isNotBlank(traceBean.getTraceId())) {msg.putUserProperty("eagleTraceId", traceBean.getTraceId());msg.putUserProperty("eagleRpcId", traceBean.getRpcId());msg.putUserProperty("eagleData", traceBean.getEagleEyeUserData());}}}}public void sendMessageAfter(SendMessageContext context) {if (context != null && context.getMessage() != null && context.getSendResult() != null && MetaQTraceLogUtils.isTraceLogOn(context.getProducerGroup())) {MetaQTraceContext mqTraceContext = (MetaQTraceContext)context.getMqTraceContext();mqTraceContext.setRegionId(context.getSendResult().getRegionId());MetaQTraceBean traceBean = (MetaQTraceBean)mqTraceContext.getTraceBeans().get(0);if (traceBean != null && context.getSendResult() != null) {traceBean.setQueueId(context.getMq().getQueueId());traceBean.setMsgId(context.getSendResult().getOffsetMsgId());traceBean.setOriginMsgId(context.getSendResult().getMsgId());traceBean.setOffset(context.getSendResult().getQueueOffset());mqTraceContext.setSuccess(true);mqTraceContext.setStatus(context.getSendResult().getSendStatus().toString());} else if (context.getException() != null) {String msg = context.getException().getMessage();mqTraceContext.setErrorMsg(StringUtils.substring(msg, 0, msg.indexOf("\n")));}MetaQSendMessageTraceLog.sendMessageAfter(mqTraceContext);}}
}
數據庫
參考數據庫Druid鏈接池方案:
https://github.com/alibaba/druid/wiki/SQL-Parser
import com.alibaba.druid.filter.FilterAdapter;
import com.alibaba.druid.filter.FilterChain;
import com.alibaba.druid.pool.DruidDataSource;
import com.alibaba.druid.proxy.jdbc.DataSourceProxy;
import com.alibaba.druid.proxy.jdbc.StatementProxy;
import com.alibaba.druid.sql.SQLUtils;
import com.alibaba.druid.sql.ast.SQLStatement;
import com.alibaba.druid.util.JdbcConstants;
import lombok.extern.slf4j.Slf4j;
import java.sql.SQLException;
import java.util.List;/*** 攔截druid數據鏈接池* @author doge* @date 2021/10/19*/
@Slf4j
public class DruidShadowTestFilter extends FilterAdapter {private DruidShadowTestVisitor visitor = new DruidShadowTestVisitor();@Overridepublic boolean statement_execute(FilterChain chain, StatementProxy statement, String sql) throws SQLException {try {List<SQLStatement> sqlStatements = SQLUtils.parseStatements(sql, JdbcConstants.MYSQL);sqlStatements.forEach(sqlStatement -> sqlStatement.accept(visitor));if (visitor.getRewriteStatus()) {// 改寫了SQL,需要替換String newSql = SQLUtils.toSQLString(sqlStatements,JdbcConstants.MYSQL);log.debug("rewrite sql, origin sql: [{}], new sql: [{}]", sql, newSql);return super.statement_execute(chain, statement, newSql);}return super.statement_execute(chain, statement, sql);} finally {visitor.removeRewriteStatus();}}@Overridepublic void init(DataSourceProxy dataSourceProxy){if (!(dataSourceProxy instanceof DruidDataSource)) {log.error("ConfigLoader only support DruidDataSource");}DruidDataSource dataSource = (DruidDataSource) dataSourceProxy;log.info("db configuration: url="+ dataSource.getUrl());}}
import com.alibaba.druid.sql.ast.statement.SQLExprTableSource;
import com.alibaba.druid.sql.dialect.mysql.visitor.MySqlASTVisitorAdapter;
import com.alibaba.ewtp.test.utils.ShadowTestUtil;
import org.apache.commons.lang3.StringUtils;import java.util.Optional;/*** @author doge* @date 2021/10/20*/
public class DruidShadowTestVisitor extends MySqlASTVisitorAdapter {private static final ThreadLocal<Boolean> REWRITE_STATUS_CACHE = new ThreadLocal<>();@Overridepublic boolean visit(SQLExprTableSource sqlExprTableSource) {// 別名,如果有別名,別名保持不變String alias = StringUtils.isEmpty(sqlExprTableSource.getAlias()) ? sqlExprTableSource.getExpr().toString() : sqlExprTableSource.getAlias();// 修改表名,不包含點才加 select c.id,d.name from c left join d on c.id = d.id 中的c 和 dif(!sqlExprTableSource.getExpr().toString().contains(".")) {sqlExprTableSource.setExpr(ShadowTestUtil.PREFIX + sqlExprTableSource.getExpr());}sqlExprTableSource.setAlias(alias);REWRITE_STATUS_CACHE.set(true);return true;}/*** 返回重寫狀態* @return 重寫狀態,{@code true}表示已重寫,{@code false}表示未重寫*/public boolean getRewriteStatus() {// get reset rewrite statusreturn Optional.ofNullable(REWRITE_STATUS_CACHE.get()).orElse(Boolean.FALSE);}/*** 重置重寫狀態*/public void removeRewriteStatus() {REWRITE_STATUS_CACHE.remove();}
}
分布式緩存中間件(Redis)
可以參考SofaTrace做法
https://www.sofastack.tech/blog/sofa-channel-15-retrospect/
- 新增一個Redis的后置增強器(部分代碼)
- 實現redis的連接工廠(部分代碼)
- 實現redis的連接器(會在所有redis key前加上前綴 test )
import com.alibaba.ewtp.test.factory.TracingRedisConnectionFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.data.redis.connection.RedisConnectionFactory;
```java/*** @author doge* @date 2021/10/14* redis 后置增強處理*/
public class TracingRedisBeanPostProcessor implements BeanPostProcessor {public TracingRedisBeanPostProcessor(){}@Overridepublic Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {if (bean instanceof RedisConnectionFactory) {bean = new TracingRedisConnectionFactory((RedisConnectionFactory) bean);}return bean;}
}import org.springframework.dao.DataAccessException;
import org.springframework.data.redis.connection.*;/*** @author doge* @date 2021/10/14*/
public class TracingRedisConnectionFactory implements RedisConnectionFactory {private final RedisConnectionFactory delegate;public TracingRedisConnectionFactory(RedisConnectionFactory delegate) {this.delegate = delegate;}@Overridepublic RedisConnection getConnection() {// support cluster connectionRedisConnection connection = this.delegate.getConnection();return new TracingRedisConnection(connection);}@Overridepublic RedisClusterConnection getClusterConnection() {return delegate.getClusterConnection();}@Overridepublic boolean getConvertPipelineAndTxResults() {return delegate.getConvertPipelineAndTxResults();}@Overridepublic RedisSentinelConnection getSentinelConnection() {return delegate.getSentinelConnection();}@Overridepublic DataAccessException translateExceptionIfPossible(RuntimeException e) {return delegate.translateExceptionIfPossible(e);}
}import org.springframework.dao.DataAccessException;
import org.springframework.data.geo.Distance;
import org.springframework.data.redis.connection.*;
import org.springframework.data.redis.connection.stream.*;
import org.springframework.data.redis.core.Cursor;
import org.springframework.data.redis.core.ScanOptions;
import org.springframework.data.geo.Circle;
import org.springframework.data.geo.GeoResults;
import org.springframework.data.geo.Metric;
import org.springframework.data.geo.Point;
import org.springframework.data.redis.core.types.Expiration;
import org.springframework.data.redis.core.types.RedisClientInfo;import java.time.Duration;
import java.util.*;
import java.util.concurrent.TimeUnit;/*** @author doge* @date 2021/10/14*/
public class TracingRedisConnection implements RedisConnection {private final RedisConnection connection;public TracingRedisConnection(RedisConnection connection) {this.connection = connection;}@Overridepublic Boolean expire(byte[] key, long seconds) {handleByte(key);return connection.expire(key, seconds);}@Overridepublic Boolean set(byte[] key, byte[] value) {handleByte(key);return connection.set(key, value);}@Overridepublic Boolean mSet(Map<byte[], byte[]> tuple) {handleByteMap(tuple);return connection.mSet(tuple);}public void handleByte(byte[] key){if (ShadowTestUtil.isShadowTesLink()){key = (ShadowTestUtil.prefix + new String(key)).getBytes();}}public void handleBytes(byte[]... keys){if (ShadowTestUtil.isShadowTesLink()){for (byte[] bytes : keys){handleByte(bytes);}}}public void handleByteMap(Map<byte[], byte[]> tuple){if (ShadowTestUtil.isShadowTesLink()){for (Map.Entry<byte[], byte[]> entry : tuple.entrySet()){handleByte(entry.getKey());}}}}
分庫分表中間件
開源框架的解決方案:
https://shardingsphere.apache.org/document/current/cn/features/shadow
方案&思路: 當獲取壓測標后,若開啟影子鏈路,將打開Sharding影子庫的開關,串通起整個分庫分表鏈路。當然也可以直接用數據庫連接池來解決。