mysql binlog監聽器
前置操作
1.查看mysql是否開啟binlog
show variables like 'log_bin';
2.查看是否使用row格式的binlog
show variables like 'binlog_format';
3.如果以上都不是請修改mysql的配置文件添加或者修改如下內容
#配置binlog存放路徑
log-bin=E://mysql//binlog//mysql-bin
#bin日志的格式 Mixed/row
binlog-format=row
4.重啟mysql再次執行1.2步查看是否生效
使用
1.執行 mvn install
2.引入下面包
cn.bucheng
binlog-core
0.0.1-SNAPSHOT
3.添加下面配置
mysql.binlog.host=127.0.0.1
mysql.binlog.port=3306
mysql.binlog.username=root
mysql.binlog.password=123456
4.創建需要監聽的實體對象
(目前默認將java字段轉數庫字段規則:將大寫字母小寫并在前面添加_ 比如 userName -轉變為-> user_name)
@Data
@NoArgsConstructor
@AllArgsConstructor
@ToString
@TableName(schema = "ad_test", table = "ad_book")
public class BookEntity implements Serializable {
//如果數據庫中列表和表名相同可以不用添加Column注解
private Long id;
private String name;
private String title;
private String writer;
@ColumnName(sqlColumn = "create_time")
private Date createTime;
@ColumnName(sqlColumn = "update_time")
private Date updateTime;
private String content;
}
5.創建監聽并添加到springboot容器中
@Component
@Slf4j
public class BookListener implements IListener {
@Override
public Class getClassType() {
return BookEntity.class;
}
@Override
public void saveEvent(BookEntity data) {
log.info("save event, " + data);
}
@Override
public void updateEvent(BookEntity data) {
log.info("update event," + data);
}
@Override
public void deleteEvent(Serializable id) {
log.info("delete event," + id);
}
}
6.默認會將數據回設到對象上面當時存在有些字段回設不成功此時可以利用下面來實現高級用法
@Component
@Slf4j
public class BookHandle implements FieldValueHandle {
@Override
public Class getClassType() {
return BookEntity.class;
}
@Override
public BookEntity handle(Map values) {
log.info("decode column value to object");
BookEntity entity = BinLogUtils.decode(BookEntity.class, values);
Serializable content = values.get("content");
if(content!=null){
byte[] datas = (byte[])content;
entity.setContent(new String(datas));
}
return entity;
}
}
高級用法
1.記錄binlog加載文件并初始化偏移量為0
@Component
@Slf4j
public class GlobalBinLogFileHandle implements IBinLogFileListener {
@Autowired
private RedisTemplate redisTemplate;
@Override
public void handleBinLogFile(String fileName, long position) {
redisTemplate.opsForHash().put("es-boot-binLog", "filename", fileName);
redisTemplate.opsForHash().put("es-boot-binLog", "position", position + "");
log.info("save binLogFile:{} position:{}",fileName,position);
}
}
2.記錄binlog加載的偏移量
@Component
@Slf4j
public class GlobalCommitPositionHandle implements BinLogCommitPosition {
@Autowired
private RedisTemplate redisTemplate;
@Override
public void commitBinLogPosition(long position) {
log.info("update position to redis position:{}", position);
redisTemplate.opsForHash().put("es-boot-binLog", "position", position + "");
}
}
3.服務異常重啟時恢復上次加載位置
@Slf4j
@Component
public class GlobalConfigHandle implements BinLogConfigHook {
@Autowired
private RedisTemplate redisTemplate;
@Override
public void config(BinLogConfig config) {
Object filename = redisTemplate.opsForHash().get("es-boot-binLog", "filename");
Object position = redisTemplate.opsForHash().get("es-boot-binLog", "position");
if (filename != null && !filename.equals("") && position != null && !"".equals(position + "")) {
log.info("begin load filename:{}, position:{}", filename, position);
config.setFile(filename + "");
config.setPosition(Long.parseLong(position + ""));
}
}
}
4.注意點,
>>1:不要使用監聽的庫存放上面記錄,不然會出現死循環,也就binlog不斷在變化
>>2:不要在監聽器中進行耗時操作,這樣會阻塞其他監聽器及時接收消息能力。如果需要耗時的操作請采用開啟線程進行操作
示例代碼
請查看example模塊中的readme