前言碎語
當系統流量負載比較高時,業務日志的寫入操作也要納入系統性能考量之內,如若處理不當,將影響系統的正常業務操作,之前寫過一篇《spring boot通過MQ消費log4j2的日志》的博文,采用了RabbitMQ消息中間件來存儲抗高并發下的日志,因為引入了中間件,操作使用起來可能沒那么簡便,今天分享使用多線程消費阻塞隊列的方式來處理我們的海量日志
waht阻塞隊列?
阻塞隊列(BlockingQueue)是區別于普通隊列多了兩個附加操作的線程安全的隊列。這兩個附加的操作是:在隊列為空時,獲取元素的線程會等待隊列變為非空。當隊列滿時,存儲元素的線程會等待隊列可用。阻塞隊列常用于生產者和消費者的場景,生產者是往隊列里添加元素的線程,消費者是從隊列里拿元素的線程。阻塞隊列就是生產者存放元素的容器,而消費者也只從容器里拿元素。
1.聲明存儲固定消息的隊列
/*** Created by kl on 2017/3/20.* Content :銷售操作日志隊列*/ public class SalesLogQueue{//隊列大小public static final int QUEUE_MAX_SIZE = 1000;private static SalesLogQueue alarmMessageQueue = new SalesLogQueue();//阻塞隊列private BlockingQueueblockingQueue = new LinkedBlockingQueue<>(QUEUE_MAX_SIZE);private SalesLogQueue(){}public static SalesLogQueue getInstance() {return alarmMessageQueue;}/*** 消息入隊* @param salesLog* @return*/public boolean push(SalesLog salesLog) {return this.blockingQueue.add(salesLog);//隊列滿了就拋出異常,不阻塞}/*** 消息出隊* @return*/public SalesLog poll() {SalesLog result = null;try {result = this.blockingQueue.take();} catch (InterruptedException e) {e.printStackTrace();}return result;}/*** 獲取隊列大小* @return*/public int size() {return this.blockingQueue.size();} }
ps:因為業務原因,采用add的方式入隊,隊列滿了就拋異常,不阻塞
2.消息入隊
消息入隊可以在任何需要保存日志的地方操作,如aop統一攔截日志處理,filter過濾請求日志處理,或者耦合的業務日志,記住,不阻塞入隊操作,不然將影響正常的業務操作,如下為filter統一處理請求日志:
/*** Created by kl on 2017/3/20.* Content :訪問請求攔截,保存操作日志*/ public class SalesLogFilter implements Filter {private RoleResourceService resourceService;@Overridepublic void init(FilterConfig filterConfig) throws ServletException {ServletContext context = filterConfig.getServletContext();ApplicationContext ctx = WebApplicationContextUtils.getWebApplicationContext(context);resourceService = ctx.getBean(RoleResourceService.class);}@Overridepublic void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain) throws IOException, ServletException {try {HttpServletRequest request = (HttpServletRequest) servletRequest;String requestUrl = request.getRequestURI();String requestType=request.getMethod();String ipAddress = HttpClientUtil.getIpAddr(request);Map resource=resourceService.getResource();String context=resource.get(requestUrl);//動態url正則匹配if(StringUtil.isNull(context)){for(Map.Entry entry:resource.entrySet()){String resourceUrl= entry.getKey();if(requestUrl.matches(resourceUrl)){context=entry.getValue();break;}}}SalesLog log=new SalesLog();log.setCreateDate(new Timestamp(System.currentTimeMillis()));log.setContext(context);log.setOperateUser(UserTokenUtil.currentUser.get().get("realname"));log.setRequestIp(ipAddress);log.setRequestUrl(requestUrl);log.setRequestType(requestType);SalesLogQueue.getInstance().push(log);}catch (Exception e){e.printStackTrace();}filterChain.doFilter(servletRequest, servletResponse);}@Overridepublic void destroy() {} }
3.消息出隊被消費
BlockingQueue是線程安全的,所以可以放心的在多個線程中去處理隊列中的消息,如下代碼聲明了一個兩個大小的固定線程池,并添加了兩個線程去處理隊列中的消息
?
/*** Created by kl on 2017/3/20.* Content :啟動消費操作日志隊列的線程*/ @Component public class ConsumeSalesLogQueue {@AutowiredSalesLogService salesLogService;@PostConstructpublic void startrtThread() {ExecutorService e = Executors.newFixedThreadPool(2);//兩個大小的固定線程池e.submit(new PollSalesLog(salesLogService));e.submit(new PollSalesLog(salesLogService));}class PollSalesLog implements Runnable {SalesLogService salesLogService;public PollSalesLog(SalesLogService salesLogService) {this.salesLogService = salesLogService;}@Overridepublic void run() {while (true) {try {SalesLog salesLog = SalesLogQueue.getInstance().poll();if(salesLog!=null){salesLogService.saveSalesLog(salesLog);}} catch (Exception e) {e.printStackTrace();}}}} }
參考博文如下,對BlockingQueue隊列更多了解,可讀一讀如下的博文:
- ??http://blog.csdn.net/vernonzheng/article/details/8247564
- ? http://www.infoq.com/cn/articles/java-blocking-queue
- ??http://wsmajunfeng.iteye.com/blog/1629354