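Contents
The difference between Channel and Connection
Auto-configuration: RabbitAutoConfiguration
Message sending flow
Obtaining the Connection object
Obtaining the Channel object
AMQConnection reads frames and triggers the publisher confirm and publisher return callbacks
The MainLoop listener thread
Executing the callbacks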
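The difference between Channel and Connection
Spring AMQP is Spring's support for AMQP (Advanced Message Queuing Protocol). It provides a high-level abstraction layer that makes message queues easier to use in Spring projects.
The concepts of Channel and Connection come up throughout the source code, so let me explain them first.
TCP connection: a TCP connection is a transport-layer connection, normally established with an IP address and a port number. RabbitMQ uses TCP for network communication, and all message transfer happens over a TCP connection.
RabbitMQ connection: a RabbitMQ connection is a connection established over TCP and usually refers to the Connection object. On a single TCP connection RabbitMQ can create multiple logical connections (that is, Channels).
RabbitMQ is designed to keep the number of TCP connections as low as possible, and the recommendation is to carry multiple Channels over one TCP connection. This reduces network overhead, improves performance, and simplifies connection management.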
Auto-configuration: RabbitAutoConfiguration
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass({RabbitTemplate.class, Channel.class})
@EnableConfigurationProperties({RabbitProperties.class})
@Import({RabbitAnnotationDrivenConfiguration.class})
public class RabbitAutoConfiguration {

    @Bean
    @ConditionalOnSingleCandidate(ConnectionFactory.class)
    @ConditionalOnMissingBean({RabbitOperations.class})
    public RabbitTemplate rabbitTemplate(RabbitProperties properties,
            ObjectProvider<MessageConverter> messageConverter,
            ObjectProvider<RabbitRetryTemplateCustomizer> retryTemplateCustomizers,
            ConnectionFactory connectionFactory) {
        PropertyMapper map = PropertyMapper.get();
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        messageConverter.ifUnique(template::setMessageConverter);
        template.setMandatory(this.determineMandatoryFlag(properties));
        RabbitProperties.Template templateProperties = properties.getTemplate();
        // A RetryTemplate is installed only when retry is enabled in the template properties
        if (templateProperties.getRetry().isEnabled()) {
            template.setRetryTemplate(
                    new RetryTemplateFactory(retryTemplateCustomizers.orderedStream().collect(Collectors.toList()))
                            .createRetryTemplate(templateProperties.getRetry(), Target.SENDER));
        }
        map.from(templateProperties::getReceiveTimeout).whenNonNull().as(Duration::toMillis).to(template::setReceiveTimeout);
        map.from(templateProperties::getReplyTimeout).whenNonNull().as(Duration::toMillis).to(template::setReplyTimeout);
        map.from(templateProperties::getExchange).to(template::setExchange);
        map.from(templateProperties::getRoutingKey).to(template::setRoutingKey);
        map.from(templateProperties::getDefaultReceiveQueue).whenNonNull().to(template::setDefaultReceiveQueue);
        return template;
    }

    @Bean
    public CachingConnectionFactory rabbitConnectionFactory(RabbitProperties properties,
            ObjectProvider<ConnectionNameStrategy> connectionNameStrategy) throws Exception {
        PropertyMapper map = PropertyMapper.get();
        CachingConnectionFactory factory = new CachingConnectionFactory(
                (com.rabbitmq.client.ConnectionFactory) this.getRabbitConnectionFactoryBean(properties).getObject());
        map.from(properties::determineAddresses).to(factory::setAddresses);
        map.from(properties::isPublisherReturns).to(factory::setPublisherReturns);
        map.from(properties::getPublisherConfirmType).whenNonNull().to(factory::setPublisherConfirmType);
        RabbitProperties.Cache.Channel channel = properties.getCache().getChannel();
        map.from(channel::getSize).whenNonNull().to(factory::setChannelCacheSize);
        map.from(channel::getCheckoutTimeout).whenNonNull().as(Duration::toMillis).to(factory::setChannelCheckoutTimeout);
        RabbitProperties.Cache.Connection connection = properties.getCache().getConnection();
        map.from(connection::getMode).whenNonNull().to(factory::setCacheMode);
        map.from(connection::getSize).whenNonNull().to(factory::setConnectionCacheSize);
        map.from(connectionNameStrategy::getIfUnique).whenNonNull().to(factory::setConnectionNameStrategy);
        return factory;
    }
}
@ConfigurationProperties(prefix = "spring.rabbitmq")
public class RabbitProperties {
    private String host = "localhost";
    private int port = 5672;
    private String username = "guest";
    private String password = "guest";
    private final Ssl ssl = new Ssl();
    private String virtualHost;
    private String addresses;
    @DurationUnit(ChronoUnit.SECONDS)
    private Duration requestedHeartbeat;
    private boolean publisherReturns;
    private CachingConnectionFactory.ConfirmType publisherConfirmType;
    private Duration connectionTimeout;
    private final Cache cache = new Cache();
    private final Listener listener = new Listener();
    private final Template template = new Template();
    private List<Address> parsedAddresses;
}
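This is Spring Boot's auto-configuration flow. The complete sequence is: Spring Boot starts -> @SpringBootApplication -> @EnableAutoConfiguration -> AutoConfigurationImportSelector scans META-INF/spring.factories -> RabbitAutoConfiguration is loaded -> the RabbitMQ-related beans are created.
RabbitProperties is annotated with @ConfigurationProperties, so it reads its values from the configuration file under the spring.rabbitmq prefix; any property that is not set keeps the default declared on the class field. RabbitAutoConfiguration uses @Bean methods to register both RabbitTemplate and CachingConnectionFactory as beans, and the RabbitProperties bean is injected into those methods to set their parameters.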
Message sending flow
protected void sendToRabbit(Channel channel, String exchange, String routingKey, boolean mandatory,
        Message message) throws IOException {
    AMQP.BasicProperties convertedMessageProperties = this.messagePropertiesConverter
            .fromMessageProperties(message.getMessageProperties(), this.encoding);
    channel.basicPublish(exchange, routingKey, mandatory, convertedMessageProperties, message.getBody());
}

public class Message implements Serializable {
    private static final long serialVersionUID = -7177590352110605597L;
    private static final String DEFAULT_ENCODING = Charset.defaultCharset().name();
    private static final Set<String> whiteListPatterns = new LinkedHashSet<>(Arrays.asList("java.util.*", "java.lang.*"));
    private static String bodyEncoding;
    private final MessageProperties messageProperties;
    private final byte[] body;

    public Message(byte[] body, MessageProperties messageProperties) {
        this.body = body;
        this.messageProperties = messageProperties;
    }
}
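When a message is sent through RabbitTemplate's convertAndSend() method, the template first checks whether a RetryTemplate has been defined; the RetryTemplate holds the number of attempts and the interval between them. Whether the RetryTemplate is null decides whether to retry: if retry is enabled, RabbitTemplate calls the RetryTemplate's execute method and passes a RecoveryCallback that handles the case where every retry has failed; if retry is not enabled, it calls doExecute directly.
doExecute obtains a channel through CachingConnectionFactory and finally sends the message through doSend(). doSend first calls setupConfirm() and then calls sendToRabbit to publish. Internally RabbitTemplate wraps the payload in a Message object and sends it with Channel.basicPublish(). A Message holds a byte array containing the body to send, plus a MessageProperties object with attributes such as the message ID, which consumers can use to guard against processing the same message twice.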
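The retry-or-direct branch described above can be sketched in plain Java. This is a simplified model and not Spring Retry's RetryTemplate: the class RetrySketch, its methods, and the fixed 10 ms pause are hypothetical names and values chosen for illustration.

```java
import java.util.concurrent.Callable;
import java.util.function.Function;

/** Hypothetical illustration only; this is not Spring Retry's RetryTemplate. */
final class RetrySketch {

    /**
     * Runs the action up to maxAttempts times, waiting backoffMillis between
     * attempts, and hands the last failure to the recovery function if all fail.
     */
    static <T> T executeWithRetry(Callable<T> action, int maxAttempts,
                                  long backoffMillis, Function<Exception, T> recovery) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.call();
            } catch (Exception e) {
                last = e;
                if (attempt < maxAttempts && !pause(backoffMillis)) {
                    break; // interrupted while waiting: stop retrying
                }
            }
        }
        return recovery.apply(last);
    }

    /** Mirrors the branch described above: retry only when a retry policy exists. */
    static <T> T execute(Callable<T> action, Integer maxAttemptsOrNull,
                         Function<Exception, T> recovery) throws Exception {
        if (maxAttemptsOrNull != null) {
            return executeWithRetry(action, maxAttemptsOrNull, 10L, recovery);
        }
        return action.call(); // no retry configured: run once, let failures propagate
    }

    private static boolean pause(long millis) {
        try {
            Thread.sleep(millis);
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        String result = execute(() -> {
            if (++calls[0] < 3) {
                throw new IllegalStateException("broker unavailable");
            }
            return "sent";
        }, 3, e -> "recovered: " + e.getMessage());
        System.out.println(result + " after " + calls[0] + " attempts");
    }
}
```

The recovery function plays the role of the RecoveryCallback: it runs only after the last attempt has failed, so the caller gets a fallback value instead of an exception.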
Obtaining the Connection object
public final Connection createConnection() throws AmqpException {
    if (this.stopped) {
        throw new AmqpApplicationContextClosedException(
                "The ApplicationContext is closed and the ConnectionFactory can no longer create connections.");
    } else {
        synchronized (this.connectionMonitor) {
            if (this.cacheMode == CachingConnectionFactory.CacheMode.CHANNEL) {
                if (this.connection.target == null) {
                    this.connection.target = super.createBareConnection();
                    if (!this.checkoutPermits.containsKey(this.connection)) {
                        this.checkoutPermits.put(this.connection, new Semaphore(this.channelCacheSize));
                    }
                    this.connection.closeNotified.set(false);
                    this.getConnectionListener().onCreate(this.connection);
                }
                return this.connection;
            } else {
                return this.cacheMode == CachingConnectionFactory.CacheMode.CONNECTION ? this.connectionFromCache() : null;
            }
        }
    }
}
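CachingConnectionFactory's createConnection method creates the connection. CachingConnectionFactory defines an Object named connectionMonitor that serves as a lock; obtaining a connection takes this lock and then checks which cache mode is in use. In CHANNEL mode (the default) a single shared connection is created lazily and reused; in CONNECTION mode a connection is taken from the connection cache.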
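As a rough illustration of the locking described above, the following sketch creates one shared connection lazily under a monitor object. SharedConnectionSketch and FakeConnection are hypothetical names; the real factory also registers permits and notifies listeners, which is left out here.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical model of a factory that shares one connection; not Spring AMQP code. */
final class SharedConnectionSketch {

    /** Placeholder for a physical connection. */
    static final class FakeConnection {
        final int id;
        FakeConnection(int id) { this.id = id; }
    }

    private final Object connectionMonitor = new Object();
    private final AtomicInteger created = new AtomicInteger();
    private FakeConnection target; // guarded by connectionMonitor

    /** Creates the connection on first use; later callers get the same instance. */
    FakeConnection createConnection() {
        synchronized (connectionMonitor) {
            if (target == null) {
                target = new FakeConnection(created.incrementAndGet());
            }
            return target;
        }
    }

    int createdCount() {
        return created.get();
    }

    public static void main(String[] args) throws InterruptedException {
        SharedConnectionSketch factory = new SharedConnectionSketch();
        Thread[] workers = new Thread[8];
        for (int i = 0; i < workers.length; i++) {
            workers[i] = new Thread(factory::createConnection);
            workers[i].start();
        }
        for (Thread worker : workers) {
            worker.join();
        }
        System.out.println("connections created: " + factory.createdCount());
    }
}
```

Because the null check and the assignment happen inside the same synchronized block, concurrent callers cannot each create their own connection.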
Obtaining the Channel object
private Channel getChannel(ChannelCachingConnectionProxy connection, boolean transactional) {
    Semaphore permits = null;
    if (this.channelCheckoutTimeout > 0L) {
        permits = this.obtainPermits(connection);
    }
    LinkedList<ChannelProxy> channelList = this.determineChannelList(connection, transactional);
    ChannelProxy channel = null;
    if (connection.isOpen()) {
        channel = this.findOpenChannel(channelList, channel);
        if (channel != null && this.logger.isTraceEnabled()) {
            this.logger.trace("Found cached Rabbit Channel: " + channel.toString());
        }
    }
    if (channel == null) {
        try {
            channel = this.getCachedChannelProxy(connection, channelList, transactional);
        } catch (RuntimeException var7) {
            if (permits != null) {
                permits.release();
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug("Could not get channel; released permit for " + connection
                            + ", remaining:" + permits.availablePermits());
                }
            }
            throw var7;
        }
    }
    return channel;
}

private ChannelProxy findOpenChannel(LinkedList<ChannelProxy> channelList, ChannelProxy channelArg) {
    ChannelProxy channel = channelArg;
    synchronized (channelList) {
        while (!channelList.isEmpty()) {
            channel = (ChannelProxy) channelList.removeFirst();
            if (this.logger.isTraceEnabled()) {
                this.logger.trace(channel + " retrieved from cache");
            }
            if (channel.isOpen()) {
                break;
            }
            this.cleanUpClosedChannel(channel);
            channel = null;
        }
        return channel;
    }
}

private ChannelProxy getCachedChannelProxy(ChannelCachingConnectionProxy connection,
        LinkedList<ChannelProxy> channelList, boolean transactional) {
    Channel targetChannel = this.createBareChannel(connection, transactional);
    if (this.logger.isDebugEnabled()) {
        this.logger.debug("Creating cached Rabbit Channel from " + targetChannel);
    }
    this.getChannelListener().onCreate(targetChannel, transactional);
    Class[] interfaces;
    if (!CachingConnectionFactory.ConfirmType.CORRELATED.equals(this.confirmType) && !this.publisherReturns) {
        interfaces = new Class[]{ChannelProxy.class};
    } else {
        interfaces = new Class[]{ChannelProxy.class, PublisherCallbackChannel.class};
    }
    return (ChannelProxy) Proxy.newProxyInstance(ChannelProxy.class.getClassLoader(), interfaces,
            new CachedChannelInvocationHandler(connection, targetChannel, channelList, transactional));
}
private Channel createBareChannel(ChannelCachingConnectionProxy connection, boolean transactional) {
    if (this.cacheMode == CachingConnectionFactory.CacheMode.CHANNEL) {
        // Check whether the connection has been dropped
        if (!this.connection.isOpen()) {
            synchronized (this.connectionMonitor) {
                // Double check: make sure the connection is still closed after taking the lock
                if (!this.connection.isOpen()) {
                    // Publish the connection-closed event
                    this.connection.notifyCloseIfNecessary();
                }
                // Check again and rebuild the connection
                if (!this.connection.isOpen()) {
                    // Clear the old connection
                    this.connection.target = null;
                    // Create a new connection
                    this.createConnection();
                }
            }
        }
        return this.doCreateBareChannel(this.connection, transactional);
    } else if (this.cacheMode == CachingConnectionFactory.CacheMode.CONNECTION) {
        if (!connection.isOpen()) {
            synchronized (this.connectionMonitor) {
                ((LinkedList) this.allocatedConnectionNonTransactionalChannels.get(connection)).clear();
                ((LinkedList) this.allocatedConnectionTransactionalChannels.get(connection)).clear();
                connection.notifyCloseIfNecessary();
                this.refreshProxyConnection(connection);
            }
        }
        return this.doCreateBareChannel(connection, transactional);
    } else {
        return null;
    }
}
private Channel doCreateBareChannel(ChannelCachingConnectionProxy conn, boolean transactional) {
    Channel channel = conn.createBareChannel(transactional);
    if (!CachingConnectionFactory.ConfirmType.NONE.equals(this.confirmType)) {
        try {
            channel.confirmSelect();
        } catch (IOException var5) {
            this.logger.error("Could not configure the channel to receive publisher confirms", var5);
        }
    }
    if ((CachingConnectionFactory.ConfirmType.CORRELATED.equals(this.confirmType) || this.publisherReturns)
            && !(channel instanceof PublisherCallbackChannelImpl)) {
        channel = this.publisherChannelFactory.createChannel(channel, this.getChannelsExecutor());
    }
    if (channel != null) {
        channel.addShutdownListener(this);
    }
    return channel;
}

public class ChannelN extends AMQChannel implements Channel {
    private static final String UNSPECIFIED_OUT_OF_BAND = "";
    private static final Logger LOGGER = LoggerFactory.getLogger(ChannelN.class);
    private final Map<String, Consumer> _consumers;
    private final Collection<ReturnListener> returnListeners;
    private final Collection<ConfirmListener> confirmListeners;
}
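CachingConnectionFactory calls getChannel() to obtain a channel. It keeps a map keyed by Connection whose values are Semaphores that cap the number of channels checked out per connection (used when channelCheckoutTimeout is greater than 0), and then fetches the LinkedList that stores the cached channels, which is how channels are reused.
Taking a channel from the LinkedList is done under a lock on the list, to avoid problems under concurrent access.
If no open channel is found in the LinkedList (the result is null), getCachedChannelProxy() is called to obtain a proxy object for a new channel.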
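The combination of a permit per checked-out channel and a locked list of idle channels can be modeled in a few lines of plain Java. This is a simplified sketch under assumptions: ChannelCacheSketch, FakeChannel, checkout and release are hypothetical names, and unlike the real factory this model always uses the semaphore.

```java
import java.util.LinkedList;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/** Hypothetical channel cache; a simplified model, not CachingConnectionFactory itself. */
final class ChannelCacheSketch {

    /** Placeholder for a channel. */
    static final class FakeChannel {
        final int number;
        volatile boolean open = true;
        FakeChannel(int number) { this.number = number; }
    }

    private final LinkedList<FakeChannel> idle = new LinkedList<>();
    private final Semaphore permits;
    private final long checkoutTimeoutMillis;
    private int nextNumber = 1; // guarded by the lock on idle

    ChannelCacheSketch(int maxChannels, long checkoutTimeoutMillis) {
        this.permits = new Semaphore(maxChannels);
        this.checkoutTimeoutMillis = checkoutTimeoutMillis;
    }

    /** Takes a permit, then reuses the first open idle channel or creates a new one. */
    FakeChannel checkout() {
        acquirePermit();
        synchronized (idle) {
            while (!idle.isEmpty()) {
                FakeChannel candidate = idle.removeFirst();
                if (candidate.open) {
                    return candidate;
                }
                // The channel was closed while cached: discard it and keep looking
            }
            return new FakeChannel(nextNumber++);
        }
    }

    /** Returns an open channel to the idle list and gives the permit back. */
    void release(FakeChannel channel) {
        synchronized (idle) {
            if (channel.open) {
                idle.addLast(channel);
            }
        }
        permits.release();
    }

    private void acquirePermit() {
        try {
            if (!permits.tryAcquire(checkoutTimeoutMillis, TimeUnit.MILLISECONDS)) {
                throw new IllegalStateException("No channel available within the timeout");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for a channel", e);
        }
    }

    public static void main(String[] args) {
        ChannelCacheSketch cache = new ChannelCacheSketch(2, 50L);
        FakeChannel first = cache.checkout();
        cache.release(first);
        FakeChannel again = cache.checkout();
        System.out.println("reused: " + (first == again));
    }
}
```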
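getCachedChannelProxy() first obtains a target channel through createBareChannel and then generates a ChannelProxy object with a JDK dynamic proxy. It checks whether CORRELATED publisher confirms or publisher returns are enabled; if so, the proxy also implements the PublisherCallbackChannel interface. CachingConnectionFactory defines an inner class, CachedChannelInvocationHandler, that serves as the proxy's invocation handler.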
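To make the proxy step concrete, here is a minimal sketch of a JDK dynamic proxy whose close() returns the proxy to a cache instead of closing the target, and whose interface list grows when callbacks are enabled. All names (ChannelProxySketch, SimpleChannel, CallbackCapable, RealChannel, proxyFor) are hypothetical; the real handler also decides between a logical and a physical close, which this model skips.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;

/** Hypothetical model of a caching channel proxy; not Spring AMQP code. */
final class ChannelProxySketch {

    interface SimpleChannel {
        void publish(String body);
        void close();
    }

    /** Marker standing in for an extra callback interface added to the proxy. */
    interface CallbackCapable { }

    static final class RealChannel implements SimpleChannel {
        final List<String> published = new ArrayList<>();
        boolean physicallyClosed;
        public void publish(String body) { published.add(body); }
        public void close() { physicallyClosed = true; }
    }

    static SimpleChannel proxyFor(RealChannel target, LinkedList<SimpleChannel> cache,
                                  boolean callbacksEnabled) {
        // The interface list depends on whether callbacks are enabled
        Class<?>[] interfaces = callbacksEnabled
                ? new Class<?>[] {SimpleChannel.class, CallbackCapable.class}
                : new Class<?>[] {SimpleChannel.class};
        InvocationHandler handler = (proxy, method, args) -> {
            if (method.getName().equals("close") && method.getParameterCount() == 0) {
                // Logical close: put the proxy back in the cache, keep the target open
                synchronized (cache) {
                    cache.addLast((SimpleChannel) proxy);
                }
                return null;
            }
            return method.invoke(target, args); // everything else goes to the target
        };
        return (SimpleChannel) Proxy.newProxyInstance(
                SimpleChannel.class.getClassLoader(), interfaces, handler);
    }

    public static void main(String[] args) {
        LinkedList<SimpleChannel> cache = new LinkedList<>();
        RealChannel real = new RealChannel();
        SimpleChannel channel = proxyFor(real, cache, true);
        channel.publish("hello");
        channel.close();
        System.out.println("cached: " + cache.size() + ", target closed: " + real.physicallyClosed);
    }
}
```

Callers keep calling close() as usual; the invocation handler is what turns that call into a return to the cache.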
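createBareChannel first checks whether the connection is still open. If it is not, it uses double-checked locking (check, take the lock, check again) so that multiple threads do not rebuild the connection at the same time, and then calls doCreateBareChannel to create the channel; if the connection is open, it calls doCreateBareChannel directly.
doCreateBareChannel creates the channel from the connection and then checks confirmType and publisherReturns. The Channel implementation class (ChannelN) stores the registered listeners in its returnListeners and confirmListeners collections.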
AMQConnection reads frames and triggers the publisher confirm and publisher return callbacks
The MainLoop listener thread
public class AMQConnection {

    private final MainLoop mainLoop;
    private volatile ChannelManager _channelManager;

    // The MainLoop thread is started when the connection starts
    public void startMainLoop() {
        MainLoop loop = new MainLoop();
        String name = "AMQP Connection " + this.getHostAddress() + ":" + this.getPort();
        this.mainLoopThread = Environment.newThread(this.threadFactory, loop, name);
        this.mainLoopThread.start();
    }

    public Channel createChannel(int channelNumber) throws IOException {
        this.ensureIsOpen();
        ChannelManager cm = this._channelManager;
        if (cm == null) {
            return null;
        } else {
            Channel channel = cm.createChannel(this, channelNumber);
            this.metricsCollector.newChannel(channel);
            return channel;
        }
    }

    // MainLoop runs on its own thread and reads AMQP frames from the socket
    private class MainLoop implements Runnable {
        private MainLoop() {
        }

        public void run() {
            boolean shouldDoFinalShutdown = true;
            try {
                while (AMQConnection.this._running) {
                    Frame frame = AMQConnection.this._frameHandler.readFrame();
                    AMQConnection.this.readFrame(frame);
                }
            } catch (Throwable var6) {
                if (var6 instanceof InterruptedException) {
                    shouldDoFinalShutdown = false;
                } else {
                    AMQConnection.this.handleFailure(var6);
                }
            } finally {
                if (shouldDoFinalShutdown) {
                    AMQConnection.this.doFinalShutdown();
                }
            }
        }
    }

    private void readFrame(Frame frame) throws IOException {
        if (frame != null) {
            this._missedHeartbeats = 0;
            // Frame type 8 is a heartbeat; heartbeat frames are not dispatched to a channel
            if (frame.type != 8) {
                if (frame.channel == 0) {
                    this._channel0.handleFrame(frame);
                } else if (this.isOpen()) {
                    ChannelManager cm = this._channelManager;
                    if (cm != null) {
                        ChannelN channel;
                        try {
                            channel = cm.getChannel(frame.channel);
                        } catch (UnknownChannelException var5) {
                            LOGGER.info("Received a frame on an unknown channel, ignoring it");
                            return;
                        }
                        channel.handleFrame(frame);
                    }
                }
            }
        } else {
            this.handleSocketTimeout();
        }
    }
}
public class ChannelManager {

    private final Map<Integer, ChannelN> _channelMap;

    private ChannelN addNewChannel(AMQConnection connection, int channelNumber) {
        if (this._channelMap.containsKey(channelNumber)) {
            throw new IllegalStateException("We have attempted to create a channel with a number that is already in use. This should never happen. Please report this as a bug.");
        } else {
            ChannelN ch = this.instantiateChannel(connection, channelNumber, this.workService);
            this._channelMap.put(ch.getChannelNumber(), ch);
            return ch;
        }
    }
}
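AMQConnection handles the frames of a connection. When the connection starts it launches the MainLoop thread, a dedicated thread that reads AMQP frames from the socket, looks up the channel that matches each frame's channel number, and hands the frame to that channel. AMQConnection holds a ChannelManager, which manages the channels that have been created and stores them in a map whose key is the channel number and whose value is the channel.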
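The dispatch step, one reader handing frames to channels by number, can be sketched without any network code. The names FrameDispatchSketch, Frame, and ChannelStub are hypothetical, and frames here carry a plain string instead of real AMQP payloads; only the heartbeat frame type value (8) comes from the protocol.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical model of per-channel frame dispatch; not the RabbitMQ client itself. */
final class FrameDispatchSketch {

    static final int FRAME_HEARTBEAT = 8; // AMQP heartbeat frame type

    static final class Frame {
        final int type;
        final int channel;
        final String payload;
        Frame(int type, int channel, String payload) {
            this.type = type;
            this.channel = channel;
            this.payload = payload;
        }
    }

    static final class ChannelStub {
        final int number;
        final List<String> received = new ArrayList<>();
        ChannelStub(int number) { this.number = number; }
        void handleFrame(Frame frame) { received.add(frame.payload); }
    }

    private final Map<Integer, ChannelStub> channelMap = new ConcurrentHashMap<>();
    private final ChannelStub channel0 = new ChannelStub(0); // connection-level channel

    ChannelStub createChannel(int number) {
        ChannelStub channel = new ChannelStub(number);
        if (channelMap.putIfAbsent(number, channel) != null) {
            throw new IllegalStateException("Channel number already in use: " + number);
        }
        return channel;
    }

    /** Routes one frame; returns true if some channel handled it. */
    boolean dispatch(Frame frame) {
        if (frame.type == FRAME_HEARTBEAT) {
            return false; // heartbeats only prove liveness
        }
        if (frame.channel == 0) {
            channel0.handleFrame(frame);
            return true;
        }
        ChannelStub channel = channelMap.get(frame.channel);
        if (channel == null) {
            return false; // unknown channel: ignore the frame
        }
        channel.handleFrame(frame);
        return true;
    }

    public static void main(String[] args) {
        FrameDispatchSketch connection = new FrameDispatchSketch();
        ChannelStub one = connection.createChannel(1);
        connection.createChannel(2);
        connection.dispatch(new Frame(1, 1, "basic.ack"));
        System.out.println("channel 1 received: " + one.received);
    }
}
```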
Executing the callbacks
public class PublisherCallbackChannelImpl implements PublisherCallbackChannel, ConfirmListener, ReturnListener, ShutdownListener {

    private static final MessagePropertiesConverter CONVERTER = new DefaultMessagePropertiesConverter();
    private static final long RETURN_CALLBACK_TIMEOUT = 60L;
    private final Log logger = LogFactory.getLog(this.getClass());
    private final Channel delegate;
    private final ConcurrentMap<String, PublisherCallbackChannel.Listener> listeners = new ConcurrentHashMap<>();
    // Listener -> (publish sequence number -> PendingConfirm)
    private final Map<PublisherCallbackChannel.Listener, SortedMap<Long, PendingConfirm>> pendingConfirms = new ConcurrentHashMap<>();
    private final Map<String, PendingConfirm> pendingReturns = new ConcurrentHashMap<>();
    private final SortedMap<Long, PublisherCallbackChannel.Listener> listenerForSeq = new ConcurrentSkipListMap<>();

    public void handleAck(long seq, boolean multiple) {
        if (this.logger.isDebugEnabled()) {
            this.logger.debug(this.toString() + " PC:Ack:" + seq + ":" + multiple);
        }
        this.processAck(seq, true, multiple, true);
    }

    public void handleNack(long seq, boolean multiple) {
        if (this.logger.isDebugEnabled()) {
            this.logger.debug(this.toString() + " PC:Nack:" + seq + ":" + multiple);
        }
        this.processAck(seq, false, multiple, true);
    }

    public synchronized void addPendingConfirm(PublisherCallbackChannel.Listener listener, long seq,
            PendingConfirm pendingConfirm) {
        SortedMap<Long, PendingConfirm> pendingConfirmsForListener = (SortedMap) this.pendingConfirms.get(listener);
        Assert.notNull(pendingConfirmsForListener,
                "Listener not registered: " + listener + " " + this.pendingConfirms.keySet());
        pendingConfirmsForListener.put(seq, pendingConfirm);
        this.listenerForSeq.put(seq, listener);
        if (pendingConfirm.getCorrelationData() != null) {
            String returnCorrelation = pendingConfirm.getCorrelationData().getId();
            if (StringUtils.hasText(returnCorrelation)) {
                this.pendingReturns.put(returnCorrelation, pendingConfirm);
            }
        }
    }
}
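PublisherCallbackChannelImpl is an implementation of Channel. The CorrelationData passed with a message is wrapped in a PendingConfirm and stored in the pendingConfirms map: the Listener is the outer key, and the value is a sorted map from publish sequence number to PendingConfirm. When the broker confirms the message, the channel's confirm listener is triggered, the matching CorrelationData is found, and the callback is executed.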
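The bookkeeping behind confirms can be illustrated with a sorted map keyed by sequence number. This is a minimal sketch with hypothetical names (ConfirmTrackerSketch, addPending, handle), and it tracks plain correlation IDs for a single listener; it shows in particular how a confirm with the multiple flag set settles every pending sequence number up to and including the one received.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

/** Hypothetical model of pending-confirm tracking; not PublisherCallbackChannelImpl. */
final class ConfirmTrackerSketch {

    // Publish sequence number -> correlation id of the message awaiting a confirm
    private final ConcurrentSkipListMap<Long, String> pending = new ConcurrentSkipListMap<>();
    final List<String> acked = new ArrayList<>();
    final List<String> nacked = new ArrayList<>();

    synchronized void addPending(long seq, String correlationId) {
        pending.put(seq, correlationId);
    }

    /** Settles one sequence number, or every number up to seq when multiple is true. */
    synchronized void handle(long seq, boolean ack, boolean multiple) {
        List<String> resolved = new ArrayList<>();
        if (multiple) {
            // View of all entries with a key <= seq, in ascending order
            ConcurrentNavigableMap<Long, String> covered = pending.headMap(seq, true);
            resolved.addAll(covered.values());
            covered.clear(); // removes those entries from the backing map
        } else {
            String correlationId = pending.remove(seq);
            if (correlationId != null) {
                resolved.add(correlationId);
            }
        }
        (ack ? acked : nacked).addAll(resolved);
    }

    synchronized int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        ConfirmTrackerSketch tracker = new ConfirmTrackerSketch();
        tracker.addPending(1L, "order-1");
        tracker.addPending(2L, "order-2");
        tracker.addPending(3L, "order-3");
        tracker.handle(2L, true, true);   // ack with multiple: settles 1 and 2
        tracker.handle(3L, false, false); // nack for 3 only
        System.out.println("acked=" + tracker.acked + " nacked=" + tracker.nacked);
    }
}
```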