// 緩存已創建的綁定,避免重復聲明private final Map<String, Date> createdBindings = new ConcurrentHashMap<>();
public void createAndBindQueueToExchange(String type,String clinetId, String routingKey) {String queueName = routingKey;log.info("初始化類型:{}",type);QueueInformation queueInformation = rabbitAdmin.getQueueInfo(queueName);if(queueInformation == null) {//隊列不存在則創建Map<String, Object> args = new HashMap<>();args.put("x-max-priority", maxPriority); // 設置優先級范圍Queue queue = new Queue(queueName, true, false, false, args);rabbitAdmin.declareQueue(queue);log.info("創建隊列: {} ", queueName);}else{log.info("隊列已存在: {} ", queueName);}String containerKey = queueName + ":" + mainDirectExchange + ":" + routingKey + ":"+clinetId;//還要判斷監聽容器是否存在if (createdBindings.containsKey(containerKey) && registry.getListenerContainerIds().contains(containerKey)) {log.info("綁定已存在緩存中,容器中也存在 queue: {} to exchange: {} with routing key: {},time={}",queueName, mainDirectExchange, routingKey,createdBindings.get(containerKey));createdBindings.put(containerKey,new Date());}else{//stopContainerListenerAndCleanCash(containerKey,"緩存 無Key(有無監聽容器)或(有緩存Key無監聽容器)");// 2. 聲明綁定到已存在的交換機Binding binding = new Binding(queueName,Binding.DestinationType.QUEUE,mainDirectExchange,routingKey,null);rabbitAdmin.declareBinding(binding);// 添加到緩存createdBindings.put(containerKey,new Date());log.info("成功創建綁定 for queue: {} to exchange: {} with routing key: {}",queueName, mainDirectExchange, routingKey);}// 3. 
注冊監聽器if (!registry.getListenerContainerIds().contains(containerKey)) {SimpleRabbitListenerEndpoint endpoint = new SimpleRabbitListenerEndpoint();endpoint.setId(containerKey);endpoint.setQueueNames(queueName);//endpoint.setAutoStartup(true);// 使用手動ACK的消息監聽器endpoint.setMessageListener(new ChannelAwareMessageListener() {@Overridepublic void onMessage(Message message, Channel channel) throws Exception {//處理消息完成后,不退回為trueboolean okMessage = false;String consumerTag = "";Exception exception = null;FlowInstanceNode flowInstanceNode = null;try {consumerTag = message.getMessageProperties().getConsumerTag();String clientId = consumerTag.split(":")[3];String messageBody = new String(message.getBody());flowInstanceNode = processMessage(clientId,messageBody);if("0".equalsIgnoreCase(flowInstanceNode.getExceptionType())) {//消費正常channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);okMessage = true;}} catch (Exception e) {log.error("指令包消息處理失敗: {}", truncateString(new String(message.getBody())), e);exception = e;}finally {// 根據異常類型決定是否重新入隊// 網絡異常的情況都應返回隊列重新消費boolean exceptionFlag = shouldRequeue(exception,flowInstanceNode);log.info("指令執行包異常標志exceptionFlag: {}",exceptionFlag);if (exceptionFlag) {okMessage = false;// 消息重新入隊channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);log.warn("指令包消息處理失敗,已重新入隊: {}", truncateString(new String(message.getBody())));} else {// 消息丟棄(不重新入隊)if(!okMessage){channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);log.error("指令包消息處理失敗,已丟棄: {}", truncateString(new String(message.getBody())));}okMessage = true;}if (okMessage) {// 消費消息序列ID寫入緩存Long sequence = flowInstanceNode.getSequence();String professionSoftwareCode = flowInstanceNode.getProfessionalSoftwareType();String sequenceKey = professionSoftwareCode+":message:finishedSequence";cacheService.set(sequenceKey, sequence);}String exceptionType = 
flowInstanceNode.getExceptionType();//網絡異常if(exceptionFlag){//調用專業軟件API網絡異常//停止監聽stopContainerListenerAndCleanCash(consumerTag,"網絡異常");//數據返回隊列}//判斷心跳是否超時stopContainer(consumerTag,"每次檢查心跳超時");log.info("本次消費執行完畢");}}});registry.registerListenerContainer(endpoint, factory, false);SimpleMessageListenerContainer simpleMessageListenerContainer = (SimpleMessageListenerContainer)registry.getListenerContainer(containerKey);simpleMessageListenerContainer.setConsumerTagStrategy(new ConsumerTagStrategy() {@Overridepublic String createConsumerTag(String queue) {return containerKey;}});simpleMessageListenerContainer.start();// 設置監聽器容器log.info("設置監聽器容器并啟動監聽 queue: {},containerKey :{}" , queueName,containerKey);}else {// 啟動已存在的監聽器if (!registry.getListenerContainer(containerKey).isRunning()) {//啟動監聽registry.getListenerContainer(containerKey).start();log.info("啟動監聽 queue: {},containerKey:{}" , queueName,containerKey);} else {log.info("監聽已運行 queue: {},containerKey:{}" , queueName,containerKey);}}}
// 動態銷毀 (dynamic teardown):
public void stopContainerListenerAndCleanCash(String containerKey,String tip) {try {log.info(tip+",清除緩存,停止監聽容器:{}", containerKey);if (registry.getListenerContainerIds().contains(containerKey)) {registry.getListenerContainer(containerKey).stop();registry.unregisterListenerContainer(containerKey);}// 刪除隊列// rabbitAdmin.deleteQueue(queueName);// 清理該隊列相關的綁定緩存createdBindings.remove(containerKey);}catch (Exception e){log.error("清除緩存,停止監聽容器異常",e);}}
// 容器事件監聽 (container event listener):
//容器異常
@Component
@Slf4j
public class RabbitListenerContainerExceptionHandler implements ApplicationListener<ListenerContainerConsumerFailedEvent> {@AutowiredRabbitMQService rabbitMQService;@Overridepublic void onApplicationEvent(ListenerContainerConsumerFailedEvent event) {Throwable t = event.getThrowable();Object object = event.getSource();if (object instanceof SimpleMessageListenerContainer){SimpleMessageListenerContainer container = (SimpleMessageListenerContainer) object;String queueName = container.getQueueNames()[0];String listenerId = container.getListenerId();rabbitMQService.stopContainerListenerAndCleanCash(listenerId,"容器異常");}log.error("RabbitMQ監聽容器異常", t);// 這里可以判斷異常類型,比如隊列不存在、連接斷開等if (t instanceof ShutdownSignalException) {// 處理隊列被刪除、服務失聯等}}
}
//
@PostConstructpublic void init() {//factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);//factory.setReceiveTimeout();//factory.setReceiveTimeout(1L);factory.setContainerCustomizer(container -> {// 設置消費者超時時間(需RabbitMQ服務端支持)container.setConsumerArguments(Collections.singletonMap("consumer_timeout", 60000L));});factory.setErrorHandler(t -> {// 這里可以捕獲到消息處理時的異常log.error("RabbitMQ消息處理異常", t);// 可以根據異常類型做不同處理});connectionFactory.addConnectionListener(new ConnectionListener() {@Overridepublic void onClose(Connection connection) {stopAllContainerListeners();log.warn("RabbitMQ連接關閉");}@Overridepublic void onCreate(Connection connection) {stopAllContainerListeners();log.info("RabbitMQ連接創建");}@Overridepublic void onShutDown(ShutdownSignalException signal) {stopAllContainerListeners();log.error("RabbitMQ連接異常關閉", signal);}});}