目錄
代碼示例
接口
代理
接口實現
限流工廠
限流處理器接口
直接交換處理器
限流處理器
限流配置
滑動窗口限流
?
通過代理模式+滑動窗口,限流請求第三方平臺,避免出現第三方平臺拋出限流異常,影響正常業務流程,從出口出發進行限流請求。
代碼示例
接口
/*** 第三方請求*/
public interface ThirdApi {/*** 發送消息** @param userId 用戶id* @param message 消息* @return 發送是否成功*/boolean sendMessage(String userId, String message);
}
代理
/*** 第三方請求代理*/
@Component
public class ProxyThirdApi implements ThirdApi {@Resourceprivate ThirdApiServiceClient thirdApiServiceClient;@Resourceprivate LimitProcessorFactory limitProcessorFactory;@Resourceprivate YmlConstant ymlConstant;private ThirdApi thirdApi;@PostConstructpublic void initThirdApi() {thirdApi = new ThirdApiImpl(thirdApiServiceClient, ymlConstant);}@Override@SneakyThrowspublic boolean sendMessage(String userId, String message) {// 限流String bizLimit = "MSG_SEND_LIMIT";Object result = limitProcessorFactory.getProcessor(bizLimit).process(() -> thirdApi.sendMessage(userId, message));if (result instanceof Boolean) {return (Boolean) result;} else {return false;}}
}
接口實現
/*** 第三方請求實現**/
@Slf4j
@AllArgsConstructor
public class ThirdApiImpl implements ThirdApi {private final ThirdApiServiceClient thirdApiServiceClient;private final YmlConstant ymlConstant;@Overridepublic boolean sendMessage(String userId, String message) {MessageReq messageReq = new MessageReq();messageReq.setContent(message);messageReq.setReceiveId(userId);log.info("[ThirdApiImpl][sendMessage] {}", JSON.toJSONString(messageReq));HttpResponse<SendMessagesResp> sendResp = thirdApiServiceClient.sendMessage(messageReq);if (sendResp.isOk()) {return true;} else {log.error("[ThirdApiImpl][sendMessage] 消息發送失敗,返回信息:{}", JSON.toJSONString(sendResp));return false;}}
}
限流工廠
/*** 限流工廠**/
@Component
public class LimitProcessorFactory {@Resourceprivate LimitProperties properties;@Getterprivate Map<String, LimitProperties.LimitData> propertiesMap;private final Map<String, LimiterProcessor> processorMap = new ConcurrentHashMap<>(10);@PostConstructpublic void initPropertiesMap() {List<LimitProperties.LimitData> props = properties.getProps();if (CollectionUtils.isEmpty(props)) {propertiesMap = Collections.emptyMap();} else {propertiesMap = props.stream().collect(Collectors.toMap(LimitProperties.LimitData::getName, Function.identity()));}}/*** 獲取限流處理器** @param name 業務名稱* @return 限流處理器*/public LimiterProcessor getProcessor(String name) {LimitProperties.LimitData props = propertiesMap.get(name);if (Objects.isNull(props)) {throw new BusinessException(String.format("無法找到[%s]的處理器配置", name));}if (props.getEnabled()) {return processorMap.computeIfAbsent(props.getName(), name -> {TimeUnit timeUnit = props.getTimeUnit();// 使用窗口滑動算法進行限流RateLimiter limiter = new SlidingWindowRateLimiter(props.getInterval(), props.getLimit(), timeUnit);return new LimiterProcessor(name, timeUnit.toMillis(props.getWaitTime()), limiter);});} else {return new SynchronousProcessor();}}
}
限流處理器接口
/*** 限流處理器接口*/
public interface LimiterProcessor {/*** 限流** @param callback 回調* @return 執行結果* @throws Throwable Throwable*/Object process(LimiterCallback callback) throws Throwable;
}
直接交換處理器
/*** 直接交換處理器** @author zhimajiang*/
@Slf4j
public class SynchronousProcessor implements LimiterProcessor {@Overridepublic Object process(LimiterCallback callback) throws Throwable {return callback.process();}
}
限流處理器
/*** 限流處理器**/
@Slf4j
@AllArgsConstructor
public class Processor implements LimiterProcessor {private final String name;private final long waitTime;private final RateLimiter rateLimiter;@Overridepublic Object process(LimiterCallback callback) throws Throwable {while (true) {if (rateLimiter.tryAcquire()) {// 未被限流,則嘗試喚醒其他被限流的任務Object proceed = callback.process();synchronized (this) {this.notifyAll();}return proceed;} else {// 已被限流則進入阻塞log.info("LimiterProcessor][process] {}-限流", name);synchronized (this) {try {this.wait(waitTime);} catch (InterruptedException ignored) {}}}}}
}
限流配置
/*** 限流配置**/
@Data
@Configuration
@ConfigurationProperties("limit")
public class LimitProperties {/*** 限流配置*/private List<LimitProperties.LimitData> props;@Datapublic static class LimitData {/*** 名稱*/private String name;/*** 是否啟用*/private Boolean enabled = false;/*** 時間間隔*/private int interval;/*** 限制閾值*/private int limit;/*** 阻塞等待時間*/private int waitTime = 1000;/*** 時間單位*/private TimeUnit timeUnit = TimeUnit.MILLISECONDS;}
}
滑動窗口限流
/*** 滑動窗口限流**/
public class SlidingWindowRateLimiter implements RateLimiter {/*** 子窗口數量*/private final int slotNum;/*** 子窗口大小*/private final long slotSize;/*** 限流閾值*/private final int limit;/*** 上一次的窗口結束時間*/private long lastTime;/*** 子窗口流量計數*/private final AtomicInteger[] counters;/*** 滑動窗口限流** @param windowSize 時間窗口大小* @param slotNum 子窗口數量* @param limit 限流閾值* @param timeUnit 時間單位*/public SlidingWindowRateLimiter(int windowSize, int slotNum, int limit, TimeUnit timeUnit) {long windowSizeMills = timeUnit.toMillis(windowSize);this.slotNum = slotNum;this.slotSize = windowSizeMills / slotNum;this.limit = limit;this.lastTime = System.currentTimeMillis();this.counters = new AtomicInteger[slotNum];resetCounters();}/*** 滑動窗口限流** @param windowSize 時間窗口大小* @param limit 限流閾值* @param timeUnit 時間單位*/public SlidingWindowRateLimiter(int windowSize, int limit, TimeUnit timeUnit) {this(windowSize, 5, limit, timeUnit);}/*** 滑動窗口限流** @param windowSize 時間窗口大小(毫秒)* @param limit 限流閾值*/public SlidingWindowRateLimiter(int windowSize, int limit) {this(windowSize, 5, limit, TimeUnit.MILLISECONDS);}/*** 重置子窗口流量計數*/private void resetCounters() {for (int i = 0; i < this.slotNum; i++) {this.counters[i] = new AtomicInteger(0);}}/*** 限流請求** @return true-允許執行 false-觸發限流*/@Overridepublic synchronized boolean tryAcquire() {long currentTime = System.currentTimeMillis();// 小窗口移動格數int slideNum = (int) Math.floor((double) (currentTime - this.lastTime) / this.slotSize);slideWindow(slideNum);// 窗口時間內的請求總數int sum = Arrays.stream(this.counters).mapToInt(AtomicInteger::get).sum();this.lastTime = this.lastTime + slideNum * slotSize;if (sum >= limit) {return false;} else {this.counters[this.slotNum - 1].incrementAndGet();return true;}}/*** 將計數器內全部元素向左移動num個位置** @param num 移動位置個數*/private void slideWindow(int num) {if (num == 0) {return;}if (num >= this.slotNum) {// 如果移動步數大于子窗口個數,則計數全部清零resetCounters();return;}// 對于a[0]~a[num-1]來說,移動元素則代表刪除元素,所以直接從a[num]開始移動for (int index = num; index < this.slotNum; index++) {// 移動元素int newIndex = index - num;this.counters[newIndex] = this.counters[index];this.counters[index].getAndSet(0);}}
}
?
?