在閱讀這篇文章前,推薦先閱讀
- [netty5: ByteToMessageCodec & MessageToByteEncoder & ByteToMessageDecoder]-源碼分析
- [netty5: HttpObject]-源碼解析
100-continue
100-continue 是 HTTP/1.1 協議中的一種機制,用于客戶端在發送大體積請求體(如文件上傳)前,先向服務器發送一個帶有 Expect: 100-continue 頭的請求,詢問服務器是否準備好接收請求體;服務器如果準備好了,會返回 100 Continue 響應,客戶端才開始發送實際數據,從而避免不必要的大數據傳輸。
MessageAggregator
MessageAggregator<I, S, C, A> 是一個高度可復用的通用聚合器框架,適用于各種流式協議的“分段消息聚合”場景:
功能 | 說明 |
---|---|
啟動聚合 | tryStartMessage() 檢測到起始消息,初始化聚合 |
聚合中 | tryContentMessage() 檢測到內容消息,append 內容 |
完成聚合 | isLastContentMessage() 判斷是否是最后一塊,調用 finishAggregation |
超長控制 | maxContentLength + lengthForContent 控制體積 |
特殊控制 | 支持 100-continue、異常處理、復用 context listener |
public abstract class MessageAggregator<I, S, C extends AutoCloseable, A extends AutoCloseable> extends MessageToMessageDecoder<I> {// 當前正在聚合的完整消息(例如 FullHttpRequest 或 FullHttpResponse)private A currentMessage;// 聚合內容允許的最大字節數,超過則觸發 handleOversizedMessageprivate final int maxContentLength;// 標識當前是否正在處理超長消息,避免重復處理。private boolean handlingOversizedMessage;private ChannelHandlerContext ctx;// 用于監聽 100-Continue 響應寫入完成后的回調private FutureContextListener<ChannelHandlerContext, Void> continueResponseWriteListener;// 標識是否正在聚合過程中private boolean aggregating;// 是否在通道關閉時處理未完成的聚合private boolean handleIncompleteAggregateDuringClose = true;protected MessageAggregator(int maxContentLength) {this.maxContentLength = maxContentLength;}@Overridepublic boolean acceptInboundMessage(Object msg) throws Exception {if (!super.acceptInboundMessage(msg)) {return false;}if (isAggregated(msg)) {return false;}if (tryStartMessage(msg) != null) {return true;}return aggregating && tryContentMessage(msg) != null;}@Overrideprotected void decode(final ChannelHandlerContext ctx, I msg) throws Exception {// 1. 判斷是否為新消息起始(startMsg)// 判斷當前收到的 msg 是否為 HttpMessage,如果是,則開始處理聚合。final S startMsg = tryStartMessage(msg);if (startMsg != null) {aggregating = true;handlingOversizedMessage = false;// 如果已存在未完成的 currentMessage,說明消息異常,拋出 MessageAggregationExceptionif (currentMessage != null) {currentMessage.close();currentMessage = null;throw new MessageAggregationException();}// 2. 處理 100-continue 相關響應(continueResponse)// newContinueResponse 的核心邏輯:// - 如果請求頭中包含 Expect: 100-continue,并且請求體大小沒有超過 maxContentLength,則返回一個 100 Continue 響應;// - 如果請求體過大(Content-Length > maxContentLength),則返回一個 413 Request Entity Too Large 錯誤響應;// - 如果不符合任何條件,返回 null,表示不需要繼續響應。Object continueResponse = newContinueResponse(startMsg, maxContentLength, ctx.pipeline());if (continueResponse != null) {// 構造監聽器FutureContextListener<ChannelHandlerContext, Void> listener = continueResponseWriteListener;if (listener == null) {continueResponseWriteListener = listener = (context, future) -> {if (future.isFailed()) {context.fireChannelExceptionCaught(future.cause());}};}// 判斷在收到 100-continue 響應后是否關閉連接,條件是配置了關閉標志且響應表示應忽略后續內容。boolean closeAfterWrite = closeAfterContinueResponse(continueResponse);// // 判斷響應是否為客戶端錯誤(4xx),如果是,則說明應忽略后續內容。handlingOversizedMessage = ignoreContentAfterContinueResponse(continueResponse);// 寫出響應并監聽結果Future<Void> future = ctx.writeAndFlush(continueResponse).addListener(ctx, listener);if (closeAfterWrite) {handleIncompleteAggregateDuringClose = false;future.addListener(ctx, ChannelFutureListeners.CLOSE);return;}// 判斷是否忽略后續內容if (handlingOversizedMessage) {return;}} else if (isContentLengthInvalid(startMsg, maxContentLength)) {// 3. 檢查請求長度是否合法invokeHandleOversizedMessage(ctx, startMsg);return;}// 4. 處理起始消息中已有的解碼失敗情況if (startMsg instanceof DecoderResultProvider &&!((DecoderResultProvider) startMsg).decoderResult().isSuccess()) {final A aggregated = beginAggregation(ctx.bufferAllocator(), startMsg);finishAggregation(ctx.bufferAllocator(), aggregated);ctx.fireChannelRead(aggregated);return;}// 5. 初始化新消息聚合對象// 創建一個聚合消息實例(包含起始行和一個空的內容緩沖區),等待后續內容片段加入。currentMessage = beginAggregation(ctx.bufferAllocator(), startMsg);return;}// 6. 處理內容消息(contentMsg)// 先判斷 msg 是否是消息體片段,如果不是則拋異常。final C contentMsg = tryContentMessage(msg);if (contentMsg != null) {// 如果還沒有初始化的聚合消息(即沒有起始消息),忽略該內容。if (currentMessage == null) {return;}// 檢查聚合后長度是否超限,超限則調用超長消息處理。if (lengthForAggregation(currentMessage) > maxContentLength - lengthForContent(contentMsg)) {invokeHandleOversizedMessage(ctx, currentMessage);return;}// 調用 aggregate 將當前內容片段追加到聚合消息。aggregate(ctx.bufferAllocator(), currentMessage, contentMsg);final boolean last;// 檢查是否為消息最后一片(last)if (contentMsg instanceof DecoderResultProvider) {DecoderResult decoderResult = ((DecoderResultProvider) contentMsg).decoderResult();if (!decoderResult.isSuccess()) {if (currentMessage instanceof DecoderResultProvider) {((DecoderResultProvider) currentMessage).setDecoderResult(DecoderResult.failure(decoderResult.cause()));}last = true;} else {last = isLastContentMessage(contentMsg);}} else {last = isLastContentMessage(contentMsg);}// 如果是,完成聚合,清理狀態,向下游傳遞完整消息。if (last) {finishAggregation0(ctx.bufferAllocator(), currentMessage);// All doneA message = currentMessage;currentMessage = null;ctx.fireChannelRead(message);}} else {throw new MessageAggregationException();}}@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) throws Exception {if (currentMessage != null && !ctx.channel().getOption(ChannelOption.AUTO_READ)) {ctx.read();}ctx.fireChannelReadComplete();}@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {if (aggregating && handleIncompleteAggregateDuringClose) {ctx.fireChannelExceptionCaught(new PrematureChannelClosureException("Channel closed while still aggregating message"));}try {super.channelInactive(ctx);} finally {releaseCurrentMessage();}}@Overridepublic void handlerAdded(ChannelHandlerContext ctx) throws Exception {this.ctx = ctx;}@Overridepublic void handlerRemoved(ChannelHandlerContext ctx) throws Exception {try {super.handlerRemoved(ctx);} finally {releaseCurrentMessage();}}protected final void releaseCurrentMessage() throws Exception {if (currentMessage != null) {currentMessage.close();currentMessage = null;}handlingOversizedMessage = false;aggregating = false;}// 省略抽象方法,具體看 HttpObjectAggregator
}
HttpObjectAggregator
HttpObjectAggregator
是構建高層 HTTP 服務的基礎設施組件,它將分塊的 HTTP 請求或響應組裝為完整對象,從而簡化上層應用邏輯。其設計清晰、可擴展性強,并充分考慮了 Expect: 100-continue 與 Content-Length 異常等 HTTP 協議邊界情況,是非常值得借鑒的聚合處理器實現。
public class HttpObjectAggregator<C extends HttpContent<C>>extends MessageAggregator<HttpObject, HttpMessage, HttpContent<C>, FullHttpMessage<?>> {private static final Logger logger = LoggerFactory.getLogger(HttpObjectAggregator.class);// 當檢測到客戶端發送了 100-continue 期望但請求內容過大時,是否關閉連接;// 為 true 則直接關閉連接,防止浪費資源,// 為 false 則保持連接打開并繼續讀取和丟棄數據直到下一請求。private final boolean closeOnExpectationFailed;public HttpObjectAggregator(int maxContentLength) {this(maxContentLength, false);}public HttpObjectAggregator(int maxContentLength, boolean closeOnExpectationFailed) {super(maxContentLength);this.closeOnExpectationFailed = closeOnExpectationFailed;}@Overrideprotected HttpMessage tryStartMessage(Object msg) {return msg instanceof HttpMessage ? (HttpMessage) msg : null;}@SuppressWarnings("unchecked")@Overrideprotected HttpContent<C> tryContentMessage(Object msg) {return msg instanceof HttpContent ? (HttpContent<C>) msg : null;}@Overrideprotected boolean isAggregated(Object msg) throws Exception {return msg instanceof FullHttpMessage;}@Overrideprotected int lengthForContent(HttpContent<C> msg) {return msg.payload().readableBytes();}@Overrideprotected int lengthForAggregation(FullHttpMessage<?> msg) {return msg.payload().readableBytes();}@Overrideprotected boolean isLastContentMessage(HttpContent<C> msg) throws Exception {return msg instanceof LastHttpContent;}@Overrideprotected boolean isContentLengthInvalid(HttpMessage start, int maxContentLength) {try {return getContentLength(start, -1L) > maxContentLength;} catch (final NumberFormatException e) {return false;}}// 根據請求的 Expectation 頭判斷是否返回 100 Continue 或錯誤響應(如 417 或 413),并在不支持或內容過大時觸發相應事件private static FullHttpResponse continueResponse(HttpMessage start, int maxContentLength, ChannelPipeline pipeline) {// 根據請求的 Expectation 頭判斷是否支持期望,若不支持觸發失敗事件返回 417;if (HttpUtil.isUnsupportedExpectation(start)) {pipeline.fireChannelInboundEvent(HttpExpectationFailedEvent.INSTANCE);return newErrorResponse(EXPECTATION_FAILED, pipeline.channel().bufferAllocator(), true, false);}if (HttpUtil.is100ContinueExpected(start)) {// 若期望 100-continue 且內容長度未超限,返回 100 Continue 響應,if (getContentLength(start, -1L) <= maxContentLength) {return newErrorResponse(CONTINUE, pipeline.channel().bufferAllocator(), false, false);}// 否則觸發失敗事件并返回 413 請求體過大響應。pipeline.fireChannelInboundEvent(HttpExpectationFailedEvent.INSTANCE);return newErrorResponse(REQUEST_ENTITY_TOO_LARGE, pipeline.channel().bufferAllocator(), true, false);}return null;}// 根據請求創建一個 100-continue 響應,并在響應生成后移除請求中的 Expect 頭。@Overrideprotected Object newContinueResponse(HttpMessage start, int maxContentLength, ChannelPipeline pipeline) {FullHttpResponse response = continueResponse(start, maxContentLength, pipeline);if (response != null) {start.headers().remove(EXPECT);}return response;}// 判斷在收到 100-continue 響應后是否關閉連接,條件是配置了關閉標志且響應表示應忽略后續內容。@Overrideprotected boolean closeAfterContinueResponse(Object msg) {return closeOnExpectationFailed && ignoreContentAfterContinueResponse(msg);}// 判斷響應是否為客戶端錯誤(4xx),如果是,則說明應忽略后續內容。@Overrideprotected boolean ignoreContentAfterContinueResponse(Object msg) {if (msg instanceof HttpResponse) {final HttpResponse httpResponse = (HttpResponse) msg;return httpResponse.status().codeClass() == HttpStatusClass.CLIENT_ERROR;}return false;}// 開始對一個非完整的 HTTP 消息進行聚合,移除分塊傳輸編碼標記,并創建一個對應請求或響應類型的空聚合消息,準備接收后續內容。@Overrideprotected FullHttpMessage<?> beginAggregation(BufferAllocator allocator, HttpMessage start) throws Exception {assert !(start instanceof FullHttpMessage);// 移除 HTTP 消息頭中的 Transfer-Encoding: chunked,以便后續使用聚合后的 Content-LengthHttpUtil.setTransferEncodingChunked(start, false);final CompositeBuffer content = allocator.compose();FullHttpMessage<?> ret;// 根據消息類型創建對應的聚合消息并初始化其 payload 為一個可擴展的空 CompositeBuffer,用于后續追加內容塊。if (start instanceof HttpRequest) {ret = new AggregatedFullHttpRequest((HttpRequest) start, content, null);} else if (start instanceof HttpResponse) {ret = new AggregatedFullHttpResponse((HttpResponse) start, content, null);} else {throw new Error();}return ret;}// 將內容塊追加到聚合消息中,并在遇到最后一塊時設置尾部頭信息@Overrideprotected void aggregate(BufferAllocator allocator, FullHttpMessage<?> aggregated, HttpContent<C> content) throws Exception {final CompositeBuffer payload = (CompositeBuffer) aggregated.payload();payload.extendWith(content.payload().send());if (content instanceof LastHttpContent) {((AggregatedFullHttpMessage<?>) aggregated).setTrailingHeaders(((LastHttpContent<?>) content).trailingHeaders());}}// 完成聚合時,如果未設置 Content-Length,則自動設置為聚合內容的實際長度。@Overrideprotected void finishAggregation(BufferAllocator allocator, FullHttpMessage<?> aggregated) throws Exception {if (!HttpUtil.isContentLengthSet(aggregated)) {aggregated.headers().set(CONTENT_LENGTH, String.valueOf(aggregated.payload().readableBytes()));}}// 處理超大 HTTP 消息@Overrideprotected void handleOversizedMessage(final ChannelHandlerContext ctx, Object oversized) throws Exception {if (oversized instanceof HttpRequest) {HttpRequest request = (HttpRequest) oversized;// 條件1:如果是完整請求(FullHttpMessage)或者請求既不期待100-continue也不保持連接if (oversized instanceof FullHttpMessage || !HttpUtil.is100ContinueExpected(request) && !HttpUtil.isKeepAlive(request)) {// 發送帶關閉連接指示的413錯誤響應Future<Void> future = ctx.writeAndFlush(newErrorResponse(REQUEST_ENTITY_TOO_LARGE, ctx.bufferAllocator(), true, true));future.addListener(f -> {if (f.isFailed()) {// 日志打印發送失敗的原因logger.debug("Failed to send a 413 Request Entity Too Large.", f.cause());}// 響應發送后關閉連接ctx.close();});} else {// 條件2:請求期待100-continue或者保持連接時,發送不關閉連接的413響應ctx.writeAndFlush(newErrorResponse(REQUEST_ENTITY_TOO_LARGE, ctx.bufferAllocator(), true, false)).addListener(future -> {if (future.isFailed()) {// 發送失敗時日志記錄并關閉連接logger.debug("Failed to send a 413 Request Entity Too Large.", future.cause());ctx.close();}});}} else if (oversized instanceof HttpResponse) {// 如果是超大的響應,直接拋出異常,可能交由上層處理throw new ResponseTooLargeException("Response entity too large: " + oversized);} else {// 既不是請求也不是響應,視為非法狀態,拋異常throw new IllegalStateException();}}@Overridepublic void channelExceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {super.channelExceptionCaught(ctx, cause);if (cause instanceof ResponseTooLargeException) {ctx.close();}}// 該方法用于生成一個指定狀態碼的空響應,并根據參數決定是否關閉連接和設置內容長度private static FullHttpResponse newErrorResponse(HttpResponseStatus status, BufferAllocator allocator, boolean emptyContent, boolean closeConnection) {// 根據傳入的狀態碼 status,創建一個空內容的 FullHttpResponse,FullHttpResponse resp = new DefaultFullHttpResponse(HTTP_1_1, status, allocator.allocate(0));// 如果 emptyContent 為 true,則設置響應頭 Content-Length 為 0,if (emptyContent) {resp.headers().set(CONTENT_LENGTH, HttpHeaderValues.ZERO);}// 如果 closeConnection 為 true,則設置響應頭 Connection: close,表示連接關閉。if (closeConnection) {resp.headers().set(CONNECTION, HttpHeaderValues.CLOSE);}return resp;}
}
AggregatedFullHttpMessage
private abstract static class AggregatedFullHttpMessage<R extends FullHttpMessage<R>> implements FullHttpMessage<R> {protected final HttpMessage message;private final Buffer payload;private HttpHeaders trailingHeaders;AggregatedFullHttpMessage(HttpMessage message, Buffer payload, HttpHeaders trailingHeaders) {this.message = message;this.payload = payload;this.trailingHeaders = trailingHeaders;}@Overridepublic void close() {payload.close();}@Overridepublic boolean isAccessible() {return payload.isAccessible();}@Overridepublic Buffer payload() {return payload;}@Overridepublic HttpHeaders trailingHeaders() {HttpHeaders trailingHeaders = this.trailingHeaders;return requireNonNullElse(trailingHeaders, HttpHeaders.emptyHeaders());}void setTrailingHeaders(HttpHeaders trailingHeaders) {this.trailingHeaders = trailingHeaders;}@Overridepublic HttpVersion getProtocolVersion() {return message.protocolVersion();}@Overridepublic HttpVersion protocolVersion() {return message.protocolVersion();}@Overridepublic FullHttpMessage<R> setProtocolVersion(HttpVersion version) {message.setProtocolVersion(version);return this;}@Overridepublic HttpHeaders headers() {return message.headers();}@Overridepublic DecoderResult decoderResult() {return message.decoderResult();}@Overridepublic void setDecoderResult(DecoderResult result) {message.setDecoderResult(result);}
}
AggregatedFullHttpRequest
private static final class AggregatedFullHttpRequest extends AggregatedFullHttpMessage<FullHttpRequest> implements FullHttpRequest {AggregatedFullHttpRequest(HttpRequest request, Buffer content, HttpHeaders trailingHeaders) {super(request, content, trailingHeaders);}@Overridepublic Send<FullHttpRequest> send() {return payload().send().map(FullHttpRequest.class,p -> new AggregatedFullHttpRequest(this, p, trailingHeaders()));}@Overridepublic AggregatedFullHttpRequest copy() {return new AggregatedFullHttpRequest(this, payload().copy(), trailingHeaders().copy());}@Overridepublic FullHttpRequest touch(Object hint) {payload().touch(hint);return this;}@Overridepublic FullHttpRequest setMethod(HttpMethod method) {((HttpRequest) message).setMethod(method);return this;}@Overridepublic FullHttpRequest setUri(String uri) {((HttpRequest) message).setUri(uri);return this;}@Overridepublic HttpMethod method() {return ((HttpRequest) message).method();}@Overridepublic String uri() {return ((HttpRequest) message).uri();}@Overridepublic FullHttpRequest setProtocolVersion(HttpVersion version) {super.setProtocolVersion(version);return this;}@Overridepublic String toString() {return HttpMessageUtil.appendFullRequest(new StringBuilder(256), this).toString();}
}
AggregatedFullHttpResponse
private static final class AggregatedFullHttpResponse extends AggregatedFullHttpMessage<FullHttpResponse> implements FullHttpResponse {AggregatedFullHttpResponse(HttpResponse message, Buffer content, HttpHeaders trailingHeaders) {super(message, content, trailingHeaders);}@Overridepublic Send<FullHttpResponse> send() {return payload().send().map(FullHttpResponse.class,p -> new AggregatedFullHttpResponse(this, p, trailingHeaders()));}@Overridepublic AggregatedFullHttpResponse copy() {return new AggregatedFullHttpResponse(this, payload().copy(), trailingHeaders().copy());}@Overridepublic FullHttpResponse touch(Object hint) {payload().touch(hint);return this;}@Overridepublic FullHttpResponse setStatus(HttpResponseStatus status) {((HttpResponse) message).setStatus(status);return this;}@Overridepublic HttpResponseStatus status() {return ((HttpResponse) message).status();}@Overridepublic FullHttpResponse setProtocolVersion(HttpVersion version) {super.setProtocolVersion(version);return this;}@Overridepublic String toString() {return HttpMessageUtil.appendFullResponse(new StringBuilder(256), this).toString();}
}