redis分布式鎖小試

一、場景

  項目A監聽mq中的其他項目的部署消息(包括push_seq, status, environment,timestamp等),然后將部署消息同步到數據庫中(項目X在對應環境[environment]上部署的push_seq[項目X的版本])。那么問題來了,mq中加入包含了兩個部署消息 dm1 和 dm2,dm2的push_seq > dm1的push_seq,在分布式的情況下,dm1 和 dm2可能會分別被消費(也就是并行),那么在同步數據庫的時候可能會發生 dm1 的數據保存 后于 dm2的數據保存,導致保存項目的部署信息發生異常。

二、解決思路

  將mq消息的并行消費變成串行消費,這里借助redis分布式鎖來完成。同一個服務在分布式的狀態下,監聽到mq消息后,觸發方法的執行,執行之前(通過spring aop around來做的)首先獲得redis的一個分布式鎖,獲取鎖成功之后才能執行相關的邏輯以及數據庫的保存,最后釋放鎖。

三、主要代碼

import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;/**
* @author: hujunzheng
* @create: 17/9/29 下午2:49
*/
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
public @interface RedisLock {/*** redis的key* @return*/String value();/*** 持鎖時間,單位毫秒,默認一分鐘*/long keepMills() default 60000;/*** 當獲取失敗時候動作*/LockFailAction action() default LockFailAction.GIVEUP;public enum LockFailAction{/*** 放棄*/GIVEUP,/*** 繼續*/CONTINUE;}/*** 睡眠時間,設置GIVEUP忽略此項* @return*/long sleepMills() default 500;
}

?

import java.lang.reflect.Method;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;/**
* @author: hujunzheng
* @create: 17/9/29 下午2:49
*/
@Component
@Aspect
public class RedisLockAspect {private static final Log log = LogFactory.getLog(RedisLockAspect.class);@Autowiredprivate RedisCacheTemplate.RedisLockOperation redisLockOperation;@Pointcut("execution(* com.hjzgg..StargateDeployMessageConsumer.consumeStargateDeployMessage(..))" +"&& @annotation(me.ele.api.portal.service.redis.RedisLock)")private void lockPoint(){}@Around("lockPoint()")public Object arround(ProceedingJoinPoint pjp) throws Throwable{MethodSignature methodSignature = (MethodSignature) pjp.getSignature();Method method = methodSignature.getMethod();RedisLock lockInfo = method.getAnnotation(RedisLock.class);/*String lockKey = lockInfo.value();if (method.getParameters().length == 1 && pjp.getArgs()[0] instanceof DeployMessage) {DeployMessage deployMessage = (DeployMessage) pjp.getArgs()[0];lockKey += deployMessage.getEnv();System.out.println(lockKey);}*/boolean lock = false;Object obj = null;while(!lock){long timestamp = System.currentTimeMillis()+lockInfo.keepMills();lock = setNX(lockInfo.value(), timestamp);//得到鎖,已過期并且成功設置后舊的時間戳依然是過期的,可以認為獲取到了鎖(成功設置防止鎖競爭)long now = System.currentTimeMillis();if(lock || ((now > getLock(lockInfo.value())) && (now > getSet(lockInfo.value(), timestamp)))){log.info("得到redis分布式鎖...");obj = pjp.proceed();if(lockInfo.action().equals(RedisLock.LockFailAction.CONTINUE)){releaseLock(lockInfo.value());}}else{if(lockInfo.action().equals(RedisLock.LockFailAction.CONTINUE)){log.info("稍后重新請求redis分布式鎖...");Thread.currentThread().sleep(lockInfo.sleepMills());}else{log.info("放棄redis分布式鎖...");break;}}}return obj;}private boolean setNX(String key,Long value){return redisLockOperation.setNX(key, value);}private long getLock(String key){return redisLockOperation.get(key);}private Long getSet(String key,Long value){return redisLockOperation.getSet(key, value);}private void releaseLock(String key){redisLockOperation.delete(key);}@Pointcut(value = "execution(* me.ele..StargateBuildMessageConsumer.consumeStargateBuildMessage(me.ele.api.portal.service.mq.dto.BuildMessage)) && args(buildMessage)" +"&& @annotation(me.ele.api.portal.service.redis.RedisLock)", argNames = "buildMessage")private void buildMessageLockPoint(BuildMessage buildMessage){}@Around(value = "buildMessageLockPoint(buildMessage)", argNames = "pjp,buildMessage")public Object buildMessageAround(ProceedingJoinPoint pjp, BuildMessage buildMessage) throws Throwable {final String LOCK = buildMessage.getAppId() + buildMessage.getPushSequence();Lock lock = redisLockRegistry.obtain(LOCK);try {lock.lock();return pjp.proceed();} finally {try {lock.unlock();} catch (Exception e) {log.error("buildMessage={}, Lock {} unlock failed. {}", buildMessage, lock, e);}}}}

四、遇到的問題

  

?

  

  開始是將鎖加到deploy的方法上的,但是一直aop一直沒有作用,換到consumeStargateDeployMessage方法上就可以了。考慮了一下是因為 @Transactional的原因。這里注意下。

?  在一篇文章中找到了原因:SpringBoot CGLIB AOP解決Spring事務,對象調用自己方法事務失效.

  只要脫離了Spring容器管理的所有對象,對于SpringAOP的注解都會失效,因為他們不是Spring容器的代理類,SpringAOP,就切入不了。也就是說是?@Transactional注解方法的代理對象并不是spring代理對象。

  參考:?關于proxy模式下,@Transactional標簽在創建代理對象時的應用

五、使用spring-redis中的RedisLockRegistry

import java.util.concurrent.locks.Lock;
import org.springframework.integration.redis.util.RedisLockRegistry;@Bean
public RedisLockRegistry redisLockRegistry(@Value("${xxx.xxxx.registry}") String redisRegistryKey,RedisTemplate redisTemplate) {return new RedisLockRegistry(redisTemplate.getConnectionFactory(), redisRegistryKey, 200000);
}Lock lock = redisLockRegistry.obtain(appId);lock.tryLock(180, TimeUnit.SECONDS);
....
lock.unlock();  

六、參考

  其他工具類,請參考這里。

七、springboot LockRegistry

  

? ? ??分布式鎖-RedisLockRegistry源碼分析[轉]

?

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Lock;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
import org.springframework.integration.redis.util.RedisLockRegistry;
import redis.clients.jedis.JedisShardInfo;@Ignore
public class RedisLockTest {private static final Logger LOGGER = LoggerFactory.getLogger(RedisLockTest.class);private static final String LOCK = "xxx.xxx";private RedisLockRegistry redisLockRegistry;@Beforepublic void setUp() {JedisShardInfo shardInfo = new JedisShardInfo("127.0.0.1");JedisConnectionFactory factory = new JedisConnectionFactory(shardInfo);redisLockRegistry = new RedisLockRegistry(factory, "test", 50L);}private class TaskA implements Runnable {@Overridepublic void run() {try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}Lock lock = redisLockRegistry.obtain(LOCK);try {lock.lock();LOGGER.info("Lock {} is obtained", lock);Thread.sleep(10);lock.unlock();LOGGER.info("Lock {} is unlocked", lock);} catch (Exception ex) {LOGGER.error("Lock {} unlock failed", lock, ex);}}}private class TimeoutTask implements Runnable {@Overridepublic void run() {Lock lock = redisLockRegistry.obtain(LOCK);try {lock.lock();LOGGER.info("Lock {} is obtained", lock);Thread.sleep(5000);lock.unlock();LOGGER.info("Lock {} is unlocked", lock);} catch (Exception ex) {LOGGER.error("Lock {} unlock failed", lock, ex);}}}@Testpublic void test() throws InterruptedException, TimeoutException {ExecutorService service = Executors.newFixedThreadPool(2);service.execute(new TimeoutTask());service.execute(new TaskA());service.shutdown();if (!service.awaitTermination(1, TimeUnit.MINUTES)) {throw new TimeoutException();}}
}

?

?

?

轉載于:https://www.cnblogs.com/hujunzheng/p/7612264.html

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/531238.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/531238.shtml
英文地址,請注明出處:http://en.pswp.cn/news/531238.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

Jackson ObjectMapper readValue過程

1.整體調用棧 2.看一下調用棧的兩個方法 resolve 方法中通過 Iterator i$ this._beanProperties.iterator() 遍歷屬性的所有子屬性,緩存對應的 deserializer。觀察調用棧的方法,可以發現是循環調用的。 3.比如尋找自定義的 LocalDateTime類的序列化實現…

java如何尋找main函數對應的類

參考springboot Class<?> deduceMainApplicationClass() {try {StackTraceElement[] stackTrace new RuntimeException().getStackTrace();for (StackTraceElement stackTraceElement : stackTrace) {if ("main".equals(stackTraceElement.getMethodName())…

jooq實踐

用法 sql語句 SELECT AUTHOR.FIRST_NAME, AUTHOR.LAST_NAME, COUNT(*)FROM AUTHORJOIN BOOK ON AUTHOR.ID BOOK.AUTHOR_IDWHERE BOOK.LANGUAGE DEAND BOOK.PUBLISHED > DATE 2008-01-01 GROUP BY AUTHOR.FIRST_NAME, AUTHOR.LAST_NAMEHAVING COUNT(*) > 5 ORDER BY AUT…

不同包下,相同數據結構的兩個類進行轉換

import com.alibaba.fastjson.JSON; JSON.parseObject(JSON.toJSONString(obj1), obj2.class) import com.fasterxml.jackson.databind.ObjectMapper; objectMapper.convertValue(obj1, obj2.class); 兩個工具類 JsonUtil JacksonHelper 轉載于:https://www.cnblogs.com/hujunz…

git根據用戶過濾提交記錄

使用SourceTree 使用gitk 轉載于:https://www.cnblogs.com/hujunzheng/p/8398203.html

springboot Autowired BeanNotOfRequiredTypeException

現象 org.springframework.beans.factory.BeanNotOfRequiredTypeException: Bean named xxxxImpl is expected to be of type com.xxx.xxxImpl but was actually of type com.sun.proxy.$Proxy62 直接Autowired一個實現類&#xff0c;而不是接口 Autowired private XxxServiceI…

cglib動態代理導致注解丟失問題及如何修改注解允許被繼承

現象 SOAService這個bean先后經過兩個BeanPostProcessor&#xff0c;會發現代理之后注解就丟失了。 開啟了cglib代理 SpringBootApplication EnableAspectJAutoProxy(proxyTargetClass true) public class Application {public static void main(String[] args) {SpringApplic…

spring AbstractBeanDefinition創建bean類型是動態代理類的方式

1.接口 Class<?> resourceClass 2.獲取builder BeanDefinitionBuilder builder BeanDefinitionBuilder.genericBeanDefinition(resourceClass); 3.獲取接口對應的動態代理class Class<?> targetProxyClass Proxy.getProxyClass(XXX.class.getClassLoader(), ne…

TypeReference -- 讓Jackson Json在List/Map中識別自己的Object

private Map<String, Object> buildHeaders(Object params) {ObjectMapper objectMapper JacksonHelper.getMapper();return objectMapper.convertValue(params, new TypeReference<Map<String, Object>>(){}); } 參考How to use Jackson to deserialis…

微信小程序:一起玩連線,一個算法來搞定

微信小程序&#xff1a;一起玩連線 游戲玩法 將相同顏色的結點連接在一起&#xff0c;連線之間不能交叉。 算法思想 轉換為多個源點到達對應終點的路徑問題&#xff0c;且路徑之間不相交。按照dfs方式尋找兩個結點路徑&#xff0c;一條路徑探索完之后&#xff0c;標記地圖并記錄…

IntelliJ IDEA關于logger的live template配置

1.安裝 log support2插件 2.配置log support2 由于項目中的日志框架是公司自己封裝的&#xff0c;所以還需要自己手動改一下 log support2插件生成的live template 當然也可以修改 Log support global的配置 包括 Logger Field、Logger class、Logger Factory class都可以修改。…

springboot項目接入配置中心,實現@ConfigurationProperties的bean屬性刷新方案

前言 配置中心&#xff0c;通過keyvalue的形式存儲環境變量。配置中心的屬性做了修改&#xff0c;項目中可以通過配置中心的依賴&#xff08;sdk&#xff09;立即感知到。需要做的就是如何在屬性發生變化時&#xff0c;改變帶有ConfigurationProperties的bean的相關屬性。 配置…

jackson實現java對象轉支付寶/微信模板消息

一、支付寶消息模板大致長這樣 {"to_user_id": "","telephone": "xxxxx","template": {"template_id": "xxxxxx","context": {"head_color": "#85be53","url"…

簡單封裝kafka相關的api

一、針對于kafka版本 <dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>0.8.2.2</version></dependency><dependency><groupId>org.apache.kafka</groupId>…

springmvc controller動態設置content-type

springmvc RequestMappingHandlerAdapter#invokeHandlerMethod 通過ServletInvocableHandlerMethod#invokeAndHandle調用目標方法&#xff0c;并處理返回值。 如果return value &#xff01; null&#xff0c;則通過returnvalueHandlers處理&#xff0c;內部會調用MessageConv…

springboot2.0 redis EnableCaching的配置和使用

一、前言 關于EnableCaching最簡單使用&#xff0c;個人感覺只需提供一個CacheManager的一個實例就好了。springboot為我們提供了cache相關的自動配置。引入cache模塊&#xff0c;如下。 二、maven依賴 <dependency><groupId>org.springframework.boot</groupId…

依賴配置中心實現注有@ConfigurationProperties的bean相關屬性刷新

配置中心是什么 配置中心&#xff0c;通過keyvalue的形式存儲環境變量。配置中心的屬性做了修改&#xff0c;項目中可以通過配置中心的依賴&#xff08;sdk&#xff09;立即感知到。需要做的就是如何在屬性發生變化時&#xff0c;改變帶有ConfigurationProperties的bean的相關屬…

java接口簽名(Signature)實現方案

預祝大家國慶節快樂&#xff0c;趕快迎接美麗而快樂的假期吧&#xff01;&#xff01;&#xff01; 前言 在為第三方系統提供接口的時候&#xff0c;肯定要考慮接口數據的安全問題&#xff0c;比如數據是否被篡改&#xff0c;數據是否已經過時&#xff0c;數據是否可以重復提交…

Git rebase命令實戰

一、前言 一句話&#xff0c;git rebase 可以幫助項目中的提交歷史干凈整潔&#xff01;&#xff01;&#xff01; 二、避免合并出現分叉現象 git merge操作 1、新建一個 develop 分支 2、在develop分支上新建兩個文件 3、然后分別執行 add、commit、push 4、接著切換到master分…

HttpServletRequestWrapper使用技巧(自定義session和緩存InputStream)

一、前言 javax.servlet.http.HttpServletRequestWrapper 是一個開發者可以繼承的類&#xff0c;我們可以重寫相應的方法來實現session的自定義以及緩存InputStream&#xff0c;在程序中可以多次獲取request body的內容。 二、自定義seesion import javax.servlet.http.*;publi…