一、簡介
我們之前其實已經完成了關于grpc的一些基礎用法,實際上還有一些比較相對進階的使用方式。比如:
- 攔截器:包括客戶端和服務端的攔截器,進而在每一端都可以劃分為流式的攔截器和非流式的攔截器。和以前我們在spring web中的攔截器思路是一樣的,都可以在請求來之前做一些統一的處理,進而減少代碼量,做一些鑒權 ,數據校驗 ,限流等等,和業務解耦。
gRPC的攔截器- 一元請求的 攔截器
客戶端 【請求 響應】
服務端 【請求 響應】 - 流式請求的 攔截器 (Stream Tracer)
客戶端 【請求 響應】
服務端 【請求 響應】
- 一元請求的 攔截器
- 客戶端重試:grpc的客戶端還可以發起重試請求,當我們有一些異常并非代碼異常的時候,可以通過重試來避免問題。
- NameResolver :當用于微服務的時候,需要注冊中心對服務名的解析等等。
- 負載均衡:包括(pick-first , 輪訓)等輪訓方式。
- 可以在其他微服務框架中整合,比如dubbo中,spring cloud中,用protobuf來序列化數據,用grpc來發起rpc(比如可以替代open fegin)等場合。
下面我們就來從攔截器功能開始學習一下grpc。
二、項目構建
我們為進階篇搭建一個新的工程。結構還是客戶端,服務端,api模塊。
其中api模塊作為公共內容被其他模塊引入做公共的聲明使用。
api模塊: rpc-grpc-adv-api
服務端模塊:rpc-grpc-adv-server
客戶端模塊: rpc-grpc-adv-client
1、api模塊
<dependencies><dependency><groupId>io.grpc</groupId><artifactId>grpc-netty-shaded</artifactId><version>1.51.0</version><scope>runtime</scope></dependency><dependency><groupId>io.grpc</groupId><artifactId>grpc-protobuf</artifactId><version>1.51.0</version></dependency><dependency><groupId>io.grpc</groupId><artifactId>grpc-stub</artifactId><version>1.51.0</version></dependency><dependency> <!-- necessary for Java 9+ --><groupId>org.apache.tomcat</groupId><artifactId>annotations-api</artifactId><version>6.0.53</version><scope>provided</scope></dependency>
</dependencies><build><extensions><extension><groupId>kr.motd.maven</groupId><artifactId>os-maven-plugin</artifactId><version>1.7.1</version></extension></extensions><plugins><plugin><groupId>org.xolstice.maven.plugins</groupId><artifactId>protobuf-maven-plugin</artifactId><version>0.6.1</version><configuration><protocArtifact>com.google.protobuf:protoc:3.21.7:exe:${os.detected.classifier}</protocArtifact><pluginId>grpc-java</pluginId><pluginArtifact>io.grpc:protoc-gen-grpc-java:1.52.1:exe:${os.detected.classifier}</pluginArtifact><outputDirectory>${basedir}/src/main/java</outputDirectory><clearOutputDirectory>false</clearOutputDirectory></configuration><executions><execution><goals><goal>compile</goal><goal>compile-custom</goal></goals></execution></executions></plugin></plugins>
</build>
在main目錄下創建proto目錄,下建立Hello.proto文件,聲明內容為。
syntax = "proto3";package com.levi;option java_multiple_files = false;
option java_package = "com.levi";
option java_outer_classname = "HelloProto";message HelloRequest{string name = 1;
}message HelloRespnose{string result = 1;
}service HelloService{// 普通方法rpc hello(HelloRequest) returns (HelloRespnose);// 雙端流方法rpc hello1(stream HelloRequest) returns (stream HelloRespnose);
}
然后通過編譯器編譯生成對應的message類HelloProto.java和service類HelloServiceGrpc.java。
然后該模塊將會被其他模塊引用,使用這些定義的類。
2、server模塊
引入api模塊。
<dependencies><dependency><groupId>com.levi</groupId><artifactId>rpc-grpc-adv-api</artifactId><version>1.0-SNAPSHOT</version></dependency>
</dependencies>
服務端業務代碼為:
@Slf4j
public class HelloServiceImpl extends HelloServiceGrpc.HelloServiceImplBase {@Overridepublic void hello(HelloProto.HelloRequest request, StreamObserver<HelloProto.HelloRespnose> responseObserver) {String name = request.getName();System.out.println("接收到客戶端的參數name = " + name);responseObserver.onNext(HelloProto.HelloRespnose.newBuilder().setResult("this is server result").build());responseObserver.onCompleted();}
}
服務端啟動代碼為:
package com.levi;import com.levi.service.HelloServiceImpl;
import io.grpc.Server;
import io.grpc.ServerBuilder;import java.io.IOException;public class GrpcServer {public static void main(String[] args) throws InterruptedException, IOException {ServerBuilder<?> serverBuilder = ServerBuilder.forPort(9000);serverBuilder.addService(new HelloServiceImpl());Server server = serverBuilder.build();server.start();server.awaitTermination();}
}
3、client模塊
引入api模塊。
<dependencies><dependency><groupId>com.levi</groupId><artifactId>rpc-grpc-adv-api</artifactId><version>1.0-SNAPSHOT</version></dependency>
</dependencies>
ok,至此就搭建完了項目結構。
三、攔截器
1、一元攔截器
其中一元攔截器就是在我們以前的一元通信模式使用的。也就是非流式的通信模式下。
而一元攔截器也分為兩種:客戶端攔截器和服務端攔截器
每一種下面又能分為兩種
1.簡單模式:只能攔截請求,不能攔截響應。
2.復雜模式:可以攔截請求和響應兩種。
下面我們先來研究客戶端攔截器。
1.1、客戶端攔截器
我們說客戶端攔截器又分為簡單模式和復雜模式。
1.1.1、簡單客戶端攔截器
我們來開發客戶端的代碼,首先我們來編寫一個簡單攔截器。
package com.levi.interceptor;import io.grpc.*;
import lombok.extern.slf4j.Slf4j;/**
* 自定義客戶端攔截器,需要實現grpc提供的攔截器接口ClientInterceptor
* 該攔截器在客戶端發起請求時被調用,
* 可以在該攔截器中對請求進行處理,比如添加請求頭、修改請求參數等
*/
@Slf4j
public class CustomClientInterceptor implements ClientInterceptor {
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {log.debug("模擬業務處理,這是一個攔截啟動的處理 ,統一的做了一些操作 ....");/** 攔截器在客戶端發起stub的rpc調用之前被調用。處理完之后往下傳,把本次調用的一些信息繼續往下傳* 把調用交給grpc,所以需要傳遞下去調用方法的元信息和一些選項* 其實就是攔截器方法的MethodDescriptor<ReqT, RespT> method, CallOptions callOptions* 然后往下傳是用來發起調用的,底層基于netty,所以需要傳遞Channel next(這是netty調用的基礎連接)* 所以需要返回一個ClientCall,封裝元信息,然后交給grpc,用來發起調用*/return next.newCall(method, callOptions);
}
我們定義好攔截器之后就要整合在客戶端調用的構建上。
package com.levi;import com.levi.interceptor.CustomClientInterceptor;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;import java.util.List;public class GrpcClient {public static void main(String[] args) {ManagedChannel managedChannel = ManagedChannelBuilder.forAddress("localhost", 9000)// .intercept(new CustomClientInterceptor())// 可以傳遞多個攔截器,按照傳遞順序執行攔截器.intercept(List.of(new CustomClientInterceptor())).usePlaintext().build();try {HelloServiceGrpc.HelloServiceBlockingStub helloServiceBlockingStub = HelloServiceGrpc.newBlockingStub(managedChannel);HelloProto.HelloRequest helloRequest = HelloProto.HelloRequest.newBuilder().setName("levi").build();HelloProto.HelloRespnose helloRespnose = helloServiceBlockingStub.hello(helloRequest);System.out.println("接收到的服務端響應為: " + helloRespnose.getResult());} catch (Exception e) {e.printStackTrace();} finally {managedChannel.shutdown();}}
}
啟動服務端,啟動客戶端,日志顯示正常輸出沒毛病。
但是此時我們也看出來這種簡單模式存在幾個問題。
# 客戶端簡單攔截器的問題
1. 只能攔截請求,不能攔截響應。我們只能在請求的時候發起攔截,但是接收響應的時候無法攔截,也就是類似spring mvc的時候沒有后置的攔截能力。
2. 即使攔截了請求操作,但是就這個請求攔截上,這個業務粒度也是過于寬泛,不精準。無法在請求的各個階段發起攔截(1. 開始階段 2. 設置消息數量 3.發送數據階段 4.半連接階段。),其實我們上面的代碼可以看出來我們的攔截器是在往下傳遞ClientCall給grpc,也就是這個調用最后是ClientCall完成的。這里的各個階段攔截其實就是在ClientCall的各個方法里面插入一些攔截操縱,其實就是在發起請求前,在ClientCall構建的各個階段攔截一下(這個的底層應該是netty那些階段性的事件感知實現的。)
裝飾者模式增強了一下。
1.1.2、復雜客戶端攔截器
我們前面操作的簡單客戶端請求攔截器粒度比較大,無法實現對請求過程的更加細力度的監聽和管理。所以我們需要一個更加強大的攔截器。我們說白了就是對原來能正常請求中間加一些增強方法,其實就是裝飾者模式,包裝一下原始類型。在原始類型的基礎上加了一堆方法分別在各個階段生效,從而來增強原始能力。但是真正的rpc調用實現還是原始類型發起的。
請求攔截
于是我們來寫一下代碼。
/*這個類型增強原始類型 適用于控制 攔截 請求發送各個環節*/
@Slf4j
class CustomForwardingClientClass<ReqT, RespT> extends ClientInterceptors.CheckedForwardingClientCall<ReqT, RespT> {/*** 構造器模式,需要實現構造函數,傳入原始類型,進行增強類型的包裝*/protected CustomForwardingClientClass(ClientCall<ReqT, RespT> delegate) {super(delegate);}/*** 開始調用,目的 看一個這個RPC請求是不是可以被發起。比如加一些鑒權等功能來判斷是不是可以調用,如果不可以直接* 返回responseListener.onClose(Status.INTERNAL, new Metadata());* 否則就發起請求delegate().start(responseListener, headers);*/protected void checkedStart(Listener<RespT> responseListener, Metadata headers) throws Exception {log.debug("發送請求數據之前的檢查.....");//真正的去發起grpc的請求// 是否真正發送grpc的請求,取決這個start方法的調用,delegate()就是原始類型,可以通過構造函數來看到// delegate()就是原始類型那個之前簡單調用的ClientCall,這就是裝飾器模式delegate().start(responseListener, headers);}
}
我們看到這就是一個增強的包裝類,他是對原始的簡單攔截器的那個ClientCall的包裝。我們看到它在后續的動作之前增強了一個檢查的實現。
然后你鑰匙要繼續就一定要delegate().start才會往下走。否則沒啟動ClientCall會報錯。
ok,我們已經完成了增強ClientCall的開發,現在要把原來的攔截器方法里面的簡單ClientCall替換為增強ClientCall。
我們來修改攔截器代碼。
/*** 自定義客戶端攔截器,需要實現grpc提供的攔截器接口ClientInterceptor* 該攔截器在客戶端發起請求時被調用,* 可以在該攔截器中對請求進行處理,比如添加請求頭、修改請求參數等*/
@Slf4j
public class CustomClientInterceptor implements ClientInterceptor {@Overridepublic <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {log.debug("模擬業務處理,這是一個攔截啟動的處理 ,統一的做了一些操作 ....");/** 攔截器在客戶端發起stub的rpc調用之前被調用。處理完之后往下傳,把本次調用的一些信息繼續往下傳* 把調用交給grpc,所以需要傳遞下去調用方法的元信息和一些選項* 其實就是攔截器方法的MethodDescriptor<ReqT, RespT> method, CallOptions callOptions* 然后往下傳是用來發起調用的,底層基于netty,所以需要傳遞Channel next(這是netty調用的基礎連接)* 所以需要返回一個ClientCall,封裝元信息,然后交給grpc,用來發起調用*/// return next.newCall(method, callOptions);/** 如果我們需要用復雜客戶端攔截器 ,就需要對原始的ClientCall進行包裝* 那么這個時候,就不能返回原始ClientCall對象,* 應該返回 包裝的ClientCall ---> CustomForwardingClientClass*/return new CustomForwardingClientClass<>(next.newCall(method, callOptions));}
}
于是,啟動服務端代碼,客戶端代碼觀察執行結果。
沒有問題。
此外這個增強攔截還有更加細粒度的方法增強,我們來實現一下。
package com.levi.interceptor;import io.grpc.*;
import lombok.extern.slf4j.Slf4j;import javax.annotation.Nullable;/*** 自定義客戶端攔截器,需要實現grpc提供的攔截器接口ClientInterceptor* 該攔截器在客戶端發起請求時被調用,* 可以在該攔截器中對請求進行處理,比如添加請求頭、修改請求參數等*/
@Slf4j
public class CustomClientInterceptor implements ClientInterceptor {@Overridepublic <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {log.debug("模擬業務處理,這是一個攔截啟動的處理 ,統一的做了一些操作 ....");/** 攔截器在客戶端發起stub的rpc調用之前被調用。處理完之后往下傳,把本次調用的一些信息繼續往下傳* 把調用交給grpc,所以需要傳遞下去調用方法的元信息和一些選項* 其實就是攔截器方法的MethodDescriptor<ReqT, RespT> method, CallOptions callOptions* 然后往下傳是用來發起調用的,底層基于netty,所以需要傳遞Channel next(這是netty調用的基礎連接)* 所以需要返回一個ClientCall,封裝元信息,然后交給grpc,用來發起調用*/// return next.newCall(method, callOptions);/** 如果我們需要用復雜客戶端攔截器 ,就需要對原始的ClientCall進行包裝* 那么這個時候,就不能返回原始ClientCall對象,* 應該返回 包裝的ClientCall ---> CustomForwardingClientClass*/return new CustomForwardingClientClass<>(next.newCall(method, callOptions));}
}/*這個類型增強原始類型 適用于控制 攔截 請求發送各個環節*/
@Slf4j
class CustomForwardingClientClass<ReqT, RespT> extends ClientInterceptors.CheckedForwardingClientCall<ReqT, RespT> {/*** 構造器模式,需要實現構造函數,傳入原始類型,進行增強類型的包裝*/protected CustomForwardingClientClass(ClientCall<ReqT, RespT> delegate) {super(delegate);}/*** 開始調用,目的 看一個這個RPC請求是不是可以被發起。比如加一些鑒權等功能來判斷是不是可以調用,如果不可以直接* 返回responseListener.onClose(Status.INTERNAL, new Metadata());* 否則就發起請求delegate().start(responseListener, headers);*/protected void checkedStart(Listener<RespT> responseListener, Metadata headers) throws Exception {log.debug("發送請求數據之前的檢查.....");//真正的去發起grpc的請求// 是否真正發送grpc的請求,取決這個start方法的調用,delegate()就是原始類型,可以通過構造函數來看到// delegate()就是原始類型那個之前簡單調用的ClientCall,這就是裝飾器模式delegate().start(responseListener, headers);}// 真正開始發送消息,netty的發送消息的方法,outBoundBuffer@Overridepublic void sendMessage(ReqT message) {log.info("發送請求數據: {}", message);super.sendMessage(message);}// 指定發送消息的數量,類似批量發送@Overridepublic void request(int numMessages) {log.info("指定發送消息的數量: {}", numMessages);super.request(numMessages);}// 取消請求的時候回調觸發@Overridepublic void cancel(@Nullable String message, @Nullable Throwable cause) {log.info("取消請求: {}", message);super.cancel(message, cause);}// 鏈接半關閉的時候回調觸發,請求消息無法發送,但是可以接受響應的消息@Overridepublic void halfClose() {log.info("鏈接半關閉");super.halfClose();}// 消息發送是否啟用壓縮@Overridepublic void setMessageCompression(boolean enabled) {log.info("消息發送是否啟用壓縮: {}", enabled);super.setMessageCompression(enabled);}// 是否可以發送消息,這個在流式里面會調用,一元的不會@Overridepublic boolean isReady() {log.info("是否可以發送消息: {}", super.isReady());return super.isReady();}
}
運行程序結果為:
至此我們看到我們在客戶端請求的各個階段都進行了監聽回調。這就是客戶端的請求增強了。
響應攔截
前面我們完成的是對于請求的攔截,其實我們可以在客戶端這里對服務端響應的攔截。我們可以攔截響應數據,這個能力可以讓我們在不同的客戶端定制自己的攔截需求。服務端不管你的需求,都返回,你不同的客戶端可能有不同的要求,自己去做攔截定制。
我們先來看一下我們之前的那個請求增強。
@Slf4j
class CustomForwardingClientClass<ReqT, RespT> extends ClientInterceptors.CheckedForwardingClientCall<ReqT, RespT> {protected CustomForwardingClientClass(ClientCall<ReqT, RespT> delegate) {super(delegate);}protected void checkedStart(Listener<RespT> responseListener, Metadata headers) throws Exception {log.debug("發送請求數據之前的檢查.....");delegate().start(responseListener, headers);}
}
你有請求攔截才有響應攔截,而且響應攔截我們一般都是通過監聽器來實現的,因為客戶端你也不知道你啥時候響應,所以就需要監聽回調的形式來監聽。我們看到在checkedStart這個方法這里。他的參數列表里面有一個responseListener,響應監聽器。其實就是這個東西,我們需要重新實現他,然后傳進去,他就會在checkedStart調用的時候傳遞給grpc,grpc就根據你的實現來攔截了。現在他是一個responseListener,啥也沒有,你要想攔截功能還是要增強包裝。所以我們來實現一下。
/*用于監聽響應,并對響應進行攔截,其中響應頭回來onHeaders被調用是服務端的responseObserver.onNext這個調用觸發的。而服務端調用responseObserver.onCompleted()才會回調onMessage這個。對應的其實就是netty的write和flush,responseObserver.onCompleted()才會真的flush,把數據寫回來。可以在服務端做修改測試一下。*/
@Slf4j
class CustomCallListener<RespT> extends ForwardingClientCallListener.SimpleForwardingClientCallListener<RespT> {// 構造器包裝原始的Listener,下面的回調實現包裝增強。protected CustomCallListener(ClientCall.Listener<RespT> delegate) {super(delegate);}@Overridepublic void onHeaders(Metadata headers) {log.info("響應頭信息 回來了......");super.onHeaders(headers);}@Overridepublic void onMessage(RespT message) {log.info("響應的數據 回來了.....{} ", message);super.onMessage(message);}// 這個在流式里面會調用,一元的不會,可以不實現@Overridepublic void onReady() {super.onReady();}//這個在流式里面會調用,一元的不會,可以不實現@Overridepublic void onClose(Status status, Metadata trailers) {super.onClose(status, trailers);}
}
通過構造函數執行包裝,然后再包裝里面增強。此時我們只需要替代delegate().start(responseListener, headers);中的參數responseListener為我們自己定義的就好了。
@Slf4j
class CustomForwardingClientClass<ReqT, RespT> extends ClientInterceptors.CheckedForwardingClientCall<ReqT, RespT> {/*** 構造器模式,需要實現構造函數,傳入原始類型,進行增強類型的包裝*/protected CustomForwardingClientClass(ClientCall<ReqT, RespT> delegate) {super(delegate);}/*** 開始調用,目的 看一個這個RPC請求是不是可以被發起。比如加一些鑒權等功能來判斷是不是可以調用,如果不可以直接* 返回responseListener.onClose(Status.INTERNAL, new Metadata());* 否則就發起請求delegate().start(responseListener, headers);*/protected void checkedStart(Listener<RespT> responseListener, Metadata headers) throws Exception {log.debug("發送請求數據之前的檢查.....");//真正的去發起grpc的請求// 是否真正發送grpc的請求,取決這個start方法的調用,delegate()就是原始類型,可以通過構造函數來看到// delegate()就是原始類型那個之前簡單調用的ClientCall,這就是裝飾器模式// delegate().start(responseListener, headers);delegate().start(new CustomCallListener<>(responseListener), headers);// 傳入增強響應攔截}...... 省略其余代碼
}
執行沒有問題。
1.2、服務端攔截器
1.2.1、服務端簡單攔截器
對應客戶端那邊的攔截器,服務端這里其實是對應的,該有的都有。我們來看下服務端的簡單攔截器。
/*** 自定義服務端攔截器*/
@Slf4j
public class CustomServerInterceptor implements ServerInterceptor {@Overridepublic <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) {//在服務器端 攔截請求操作的功能 寫在這個方法中log.debug("服務器端攔截器生效.....");// 返回req請求監聽器,用于監聽服務端req請求的事件,reqTListener就是原始的攔截監聽器ServerCall.Listener<ReqT> reqTListener = next.startCall(call, headers);return reqTListener;}
}
這里的概念我們都可以在客戶端那里找到對應的。我們看到這個方法返回了一個ServerCall.Listener 類型的,而且泛型是req,可見是對于請求的監聽器。作為服務端,他是被動連接的,所以她的攔截方式就是監聽,什么時候來我什么時候攔截,他不知道你啥時候來,就只能監聽著。而next.startCall(call, headers);返回的就是一個具有原始能力的攔截器,沒有封裝增強的。我們把這個攔截器整合到服務端發布代碼中使其生效。
package com.levi;import com.levi.interceptor.CustomServerInterceptor;
import com.levi.service.HelloServiceImpl;
import io.grpc.Server;
import io.grpc.ServerBuilder;import java.io.IOException;public class GrpcServer {public static void main(String[] args) throws InterruptedException, IOException {ServerBuilder<?> serverBuilder = ServerBuilder.forPort(9000);serverBuilder.addService(new HelloServiceImpl());// 注冊自定義攔截器serverBuilder.intercept(new CustomServerInterceptor());Server server = serverBuilder.build();server.start();server.awaitTermination();}
}
我們把自定義的攔截器注冊進去之后啟動服務端和客戶端看一下。沒有問題。
同樣的服務端的簡單攔截器也存在像客戶端那邊的問題,
攔截請求發送過來的數據,無法處理響應的數據。
攔截力度過于寬泛所以我么需要復雜攔截器,增強原始的攔截器,達到更加細力度的控制攔截。一切都和當初我們在客戶端做的一樣,重新定義一個增強的。
1.2.2、服務端復雜攔截器
攔截請求,攔截的是客戶端過來的消息
/*** 自定義服務端攔截器*/
@Slf4j
public class CustomServerInterceptor implements ServerInterceptor {@Overridepublic <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) {//在服務器端 攔截請求操作的功能 寫在這個方法中log.debug("服務器端攔截器生效.....");// 返回req請求監聽器,用于監聽服務端req請求的事件,reqTListener就是原始的攔截監聽器// ServerCall.Listener<ReqT> reqTListener = next.startCall(call, headers);// 包裝器設計模式,封裝原始的監聽器,增強原始監聽器的功能,實際的核心調用還是原始的在做// 只是加了一些額外的增強的方法return new CustomServerCallListener<>(next.startCall(call, headers));}
}/*** 復雜服務端攔截器,用于監聽服務端req請求的事件,reqTListener就是原始的攔截監聽器* 對于reqTListener的事件,我們可以在事件觸發時,做一些自定義的操作,* 本質是對于原始監聽器的一個包裝增強,包裝器模式*/
@Slf4j
class CustomServerCallListener<ReqT> extends ForwardingServerCallListener.SimpleForwardingServerCallListener<ReqT> {protected CustomServerCallListener(ServerCall.Listener<ReqT> delegate) {super(delegate);}@Override//準備接受請求數據public void onReady() {log.debug("onRead Method Invoke,準備好接收客戶端數據....");super.onReady();}@Overridepublic void onMessage(ReqT message) {log.debug("接受到了客戶端請求提交的數據,客戶端的請求數據是: {} ", message);super.onMessage(message);}@Overridepublic void onHalfClose() {log.debug("監聽到了 半連接觸發這個操作...");super.onHalfClose();}@Overridepublic void onComplete() {log.debug("服務端 調用onCompleted()觸發...");super.onComplete();}@Overridepublic void onCancel() {log.debug("出現異常后 會調用這個方法... 可以在這里做一些關閉資源的操作");super.onCancel();}
}
在經歷了客戶端的開發之后,我們這里其實就很好理解了。調用沒有問題。
攔截響應,攔截的是服務端發給客戶端的響應
我們能攔截請求,自然也就能攔截響應。我們先來看一下什么是服務端的響應,其實就是服務端回寫給客戶端的操作。也就是服務端調用客戶端的操作,我們上面攔截請求其實是客戶端發給服務端,也即是服務端的監聽ServerCall.Listener,服務端的監聽器做包裝增強。
現在你要增強服務端對客戶端的調用其實就是ServerCall(這里對應我們客戶端那里的ClientCall)。所以我們要對ServerCall做包裝。就是你誰干啥就增強啥就實現啥就行。
/*** 目的:通過自定義的ServerCall 包裝原始的ServerCall 增加對于響應攔截的功能*/
@Slf4j
class CustomServerCall<ReqT, RespT> extends ForwardingServerCall.SimpleForwardingServerCall<ReqT, RespT> {// 這里包裝的是ServerCallprotected CustomServerCall(ServerCall<ReqT, RespT> delegate) {super(delegate);}@Override//指定發送消息的數量 【響應消息】public void request(int numMessages) {log.debug("response 指定消息的數量 【request】");super.request(numMessages);}@Override//設置響應頭public void sendHeaders(Metadata headers) {log.debug("response 設置響應頭 【sendHeaders】");super.sendHeaders(headers);}@Override//響應數據public void sendMessage(RespT message) {log.debug("response 響應數據 【send Message 】 {} ", message);super.sendMessage(message);}@Override//關閉連接public void close(Status status, Metadata trailers) {log.debug("response 關閉連接 【close】");super.close(status, trailers);}
}
然后我們再把這個包裝增強整合到攔截器里面,交給grpc的體系中才能生效,在interceptCall中進行整合,我們不需要改動服務端發布那里的代碼,那里可以直接通過CustomServerInterceptor來處理我們這里整合到的兩個攔截器,以下為完整代碼。
package com.levi.interceptor;import io.grpc.*;
import lombok.extern.slf4j.Slf4j;/*** 自定義服務端攔截器*/
@Slf4j
public class CustomServerInterceptor implements ServerInterceptor {@Overridepublic <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) {//在服務器端 攔截請求操作的功能 寫在這個方法中log.debug("服務器端攔截器生效.....");//包裝ServerCall 處理服務端響應攔截CustomServerCall<ReqT,RespT> reqTRespTCustomServerCall = new CustomServerCall<>(call);// 包裝Listener 處理服務端請求攔截CustomServerCallListener<ReqT> reqTCustomServerCallListener = new CustomServerCallListener<>(next.startCall(reqTRespTCustomServerCall, headers));return reqTCustomServerCallListener;}
}/*** 復雜服務端攔截器,用于監聽服務端req請求的事件,reqTListener就是原始的攔截監聽器* 對于reqTListener的事件,我們可以在事件觸發時,做一些自定義的操作,* 本質是對于原始監聽器的一個包裝增強,包裝器模式*/
@Slf4j
class CustomServerCallListener<ReqT> extends ForwardingServerCallListener.SimpleForwardingServerCallListener<ReqT> {protected CustomServerCallListener(ServerCall.Listener<ReqT> delegate) {super(delegate);}@Override//準備接受請求數據public void onReady() {log.debug("onRead Method Invoke,準備好接收客戶端數據....");super.onReady();}@Overridepublic void onMessage(ReqT message) {log.debug("接受到了客戶端請求提交的數據,客戶端的請求數據是: {} ", message);super.onMessage(message);}@Overridepublic void onHalfClose() {log.debug("監聽到了 半連接觸發這個操作...");super.onHalfClose();}@Overridepublic void onComplete() {log.debug("服務端 調用onCompleted()觸發...");super.onComplete();}@Overridepublic void onCancel() {log.debug("出現異常后 會調用這個方法... 可以在這里做一些關閉資源的操作");super.onCancel();}
}/*** 通過自定義的ServerCall 包裝原始的ServerCall 增加對于響應攔截的功能*/
@Slf4j
class CustomServerCall<ReqT, RespT> extends ForwardingServerCall.SimpleForwardingServerCall<ReqT, RespT> {protected CustomServerCall(ServerCall<ReqT, RespT> delegate) {super(delegate);}@Override//指定發送消息的數量 【響應消息】public void request(int numMessages) {log.debug("response 指定消息的數量 【request】");super.request(numMessages);}@Override//設置響應頭public void sendHeaders(Metadata headers) {log.debug("response 設置響應頭 【sendHeaders】");super.sendHeaders(headers);}@Override//響應數據public void sendMessage(RespT message) {log.debug("response 響應數據 【send Message 】 {} ", message);super.sendMessage(message);}@Override//關閉連接public void close(Status status, Metadata trailers) {log.debug("response 關閉連接 【close】");super.close(status, trailers);}
}
而且我們看到增強類都是要實現構造的,因為要傳進去原始類,進行封裝,調用核心方法還是走super,走那個原始的操作。你的增強的操作可以加在這些新的方法里面。這些增強方法,你可以酌情看你要的業務方法,需要的就實現,不需要就可以不覆蓋實現。其實他就是服務端的響應的各個階段不同的觸發。
我們運行代碼沒有問題,各個階段都被觸發了。
對于你要是想只攔截響應,不攔截請求可以這么做。
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) {//在服務器端 攔截請求操作的功能 寫在這個方法中log.debug("服務器端攔截器生效.....");//包裝ServerCall 處理服務端響應攔截CustomServerCall<ReqT,RespT> reqTRespTCustomServerCall = new CustomServerCall<>(call);// 包裝Listener 處理服務端請求攔截CustomServerCallListener<ReqT> reqTCustomServerCallListener =new CustomServerCallListener<>(next.startCall(reqTRespTCustomServerCall, headers));// return reqTCustomServerCallListener;/*** 只攔截響應,我們就不需要包裝Listener,也就是返回原始的Listener即可。原始的Listener我們是通過* next.startCall(reqTRespTCustomServerCall, headers)獲取到的。所以繼續用next.startCall不操作包裝的* Listener即可,但是我們要包裝響應也就是serverCall,所以返回reqTRespTCustomServerCall。包在原始Listener中* 你要是包裝請求,那就是需要包裝的Listener,不需要就直接next.startCall返回startCall即可。*/return next.startCall(reqTRespTCustomServerCall, headers);}
明白請求是包裝的listener,響應是servercall,需要哪個就加強哪個就行,不需要增強攔截就用原始的就行。
四、總結
這就是grpc中比較常見的一元攔截器的使用,他是對于一元rpc的攔截。在各個攔截方法中我們可以定義一些自己的業務方法。進而靈活使用攔截器。而且你要是在某個點攔截之后不想繼續往下走,那你就不要調用每個攔截方法的super,不要做后續的調用,直接斷開鏈路即可。而且至于攔截請求還是響應就看你包裝啥就完了,他不是耦合在一起的。
后面我們再來分析監聽流也就是流式和雙向調用的攔截器。