【橘子分布式】gRPC(番外篇-攔截器)

一、簡介

我們之前其實已經完成了關于grpc的一些基礎用法,實際上還有一些比較相對進階的使用方式。比如:

  • 攔截器:包括客戶端和服務端的攔截器,進而在每一端都可以劃分為流式的攔截器和非流式的攔截器。和以前我們在spring web中的攔截器思路是一樣的,都可以在請求來之前做一些統一的處理,進而減少代碼量,做一些鑒權 ,數據校驗 ,限流等等,和業務解耦。
    gRPC的攔截器
    1. 一元請求的 攔截器
      客戶端 【請求 響應】
      服務端 【請求 響應】
    2. 流式請求的 攔截器 (Stream Tracer)
      客戶端 【請求 響應】
      服務端 【請求 響應】
  • 客戶端重試:grpc的客戶端還可以發起重試請求,當我們有一些異常并非代碼異常的時候,可以通過重試來避免問題。
  • NameResolver :當用于微服務的時候,需要注冊中心對服務名的解析等等。
  • 負載均衡:包括(pick-first , 輪訓)等輪訓方式。
  • 可以在其他微服務框架中整合,比如dubbo中,spring cloud中,用protobuf來序列化數據,用grpc來發起rpc(比如可以替代open fegin)等場合。

下面我們就來從攔截器功能開始學習一下grpc。

二、項目構建

我們為進階篇搭建一個新的工程。結構還是客戶端,服務端,api模塊。
其中api模塊作為公共內容被其他模塊引入做公共的聲明使用。

api模塊: rpc-grpc-adv-api
服務端模塊:rpc-grpc-adv-server
客戶端模塊: rpc-grpc-adv-client

1、api模塊

<dependencies><dependency><groupId>io.grpc</groupId><artifactId>grpc-netty-shaded</artifactId><version>1.51.0</version><scope>runtime</scope></dependency><dependency><groupId>io.grpc</groupId><artifactId>grpc-protobuf</artifactId><version>1.51.0</version></dependency><dependency><groupId>io.grpc</groupId><artifactId>grpc-stub</artifactId><version>1.51.0</version></dependency><dependency> <!-- necessary for Java 9+ --><groupId>org.apache.tomcat</groupId><artifactId>annotations-api</artifactId><version>6.0.53</version><scope>provided</scope></dependency>
</dependencies><build><extensions><extension><groupId>kr.motd.maven</groupId><artifactId>os-maven-plugin</artifactId><version>1.7.1</version></extension></extensions><plugins><plugin><groupId>org.xolstice.maven.plugins</groupId><artifactId>protobuf-maven-plugin</artifactId><version>0.6.1</version><configuration><protocArtifact>com.google.protobuf:protoc:3.21.7:exe:${os.detected.classifier}</protocArtifact><pluginId>grpc-java</pluginId><pluginArtifact>io.grpc:protoc-gen-grpc-java:1.52.1:exe:${os.detected.classifier}</pluginArtifact><outputDirectory>${basedir}/src/main/java</outputDirectory><clearOutputDirectory>false</clearOutputDirectory></configuration><executions><execution><goals><goal>compile</goal><goal>compile-custom</goal></goals></execution></executions></plugin></plugins>
</build>

在main目錄下創建proto目錄,下建立Hello.proto文件,聲明內容為。

syntax = "proto3";package com.levi;option java_multiple_files = false;
option java_package = "com.levi";
option java_outer_classname = "HelloProto";message HelloRequest{string name = 1;
}message HelloRespnose{string result = 1;
}service HelloService{// 普通方法rpc hello(HelloRequest) returns (HelloRespnose);// 雙端流方法rpc hello1(stream HelloRequest) returns (stream HelloRespnose);
}

然后通過編譯器編譯生成對應的message類HelloProto.java和service類HelloServiceGrpc.java。
然后該模塊將會被其他模塊引用,使用這些定義的類。

2、server模塊

引入api模塊。

<dependencies><dependency><groupId>com.levi</groupId><artifactId>rpc-grpc-adv-api</artifactId><version>1.0-SNAPSHOT</version></dependency>
</dependencies>

服務端業務代碼為:

@Slf4j
public class HelloServiceImpl extends HelloServiceGrpc.HelloServiceImplBase {@Overridepublic void hello(HelloProto.HelloRequest request, StreamObserver<HelloProto.HelloRespnose> responseObserver) {String name = request.getName();System.out.println("接收到客戶端的參數name = " + name);responseObserver.onNext(HelloProto.HelloRespnose.newBuilder().setResult("this is server result").build());responseObserver.onCompleted();}
}

服務端啟動代碼為:

package com.levi;import com.levi.service.HelloServiceImpl;
import io.grpc.Server;
import io.grpc.ServerBuilder;import java.io.IOException;public class GrpcServer {public static void main(String[] args) throws InterruptedException, IOException {ServerBuilder<?> serverBuilder = ServerBuilder.forPort(9000);serverBuilder.addService(new HelloServiceImpl());Server server = serverBuilder.build();server.start();server.awaitTermination();}
}

3、client模塊

引入api模塊。

<dependencies><dependency><groupId>com.levi</groupId><artifactId>rpc-grpc-adv-api</artifactId><version>1.0-SNAPSHOT</version></dependency>
</dependencies>

ok,至此就搭建完了項目結構。

三、攔截器

1、一元攔截器

其中一元攔截器就是在我們以前的一元通信模式使用的。也就是非流式的通信模式下。
而一元攔截器也分為兩種:客戶端攔截器和服務端攔截器
每一種下面又能分為兩種
1.簡單模式:只能攔截請求,不能攔截響應。
2.復雜模式:可以攔截請求和響應兩種。
下面我們先來研究客戶端攔截器。

1.1、客戶端攔截器

我們說客戶端攔截器又分為簡單模式和復雜模式。

1.1.1、簡單客戶端攔截器

我們來開發客戶端的代碼,首先我們來編寫一個簡單攔截器。

package com.levi.interceptor;import io.grpc.*;
import lombok.extern.slf4j.Slf4j;/**
* 自定義客戶端攔截器,需要實現grpc提供的攔截器接口ClientInterceptor
* 該攔截器在客戶端發起請求時被調用,
* 可以在該攔截器中對請求進行處理,比如添加請求頭、修改請求參數等
*/
@Slf4j
public class CustomClientInterceptor implements ClientInterceptor {
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {log.debug("模擬業務處理,這是一個攔截啟動的處理 ,統一的做了一些操作 ....");/** 攔截器在客戶端發起stub的rpc調用之前被調用。處理完之后往下傳,把本次調用的一些信息繼續往下傳* 把調用交給grpc,所以需要傳遞下去調用方法的元信息和一些選項* 其實就是攔截器方法的MethodDescriptor<ReqT, RespT> method, CallOptions callOptions* 然后往下傳是用來發起調用的,底層基于netty,所以需要傳遞Channel next(這是netty調用的基礎連接)* 所以需要返回一個ClientCall,封裝元信息,然后交給grpc,用來發起調用*/return next.newCall(method, callOptions);
}

我們定義好攔截器之后就要整合在客戶端調用的構建上。

package com.levi;import com.levi.interceptor.CustomClientInterceptor;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;import java.util.List;public class GrpcClient {public static void main(String[] args) {ManagedChannel managedChannel = ManagedChannelBuilder.forAddress("localhost", 9000)// .intercept(new CustomClientInterceptor())// 可以傳遞多個攔截器,按照傳遞順序執行攔截器.intercept(List.of(new CustomClientInterceptor())).usePlaintext().build();try {HelloServiceGrpc.HelloServiceBlockingStub helloServiceBlockingStub = HelloServiceGrpc.newBlockingStub(managedChannel);HelloProto.HelloRequest helloRequest = HelloProto.HelloRequest.newBuilder().setName("levi").build();HelloProto.HelloRespnose helloRespnose = helloServiceBlockingStub.hello(helloRequest);System.out.println("接收到的服務端響應為: " + helloRespnose.getResult());} catch (Exception e) {e.printStackTrace();} finally {managedChannel.shutdown();}}
}

啟動服務端,啟動客戶端,日志顯示正常輸出沒毛病。
但是此時我們也看出來這種簡單模式存在幾個問題。

# 客戶端簡單攔截器的問題
1. 只能攔截請求,不能攔截響應。我們只能在請求的時候發起攔截,但是接收響應的時候無法攔截,也就是類似spring mvc的時候沒有后置的攔截能力。
2. 即使攔截了請求操作,但是就這個請求攔截上,這個業務粒度也是過于寬泛,不精準。無法在請求的各個階段發起攔截(1. 開始階段 2. 設置消息數量 3.發送數據階段 4.半連接階段。),其實我們上面的代碼可以看出來我們的攔截器是在往下傳遞ClientCall給grpc,也就是這個調用最后是ClientCall完成的。這里的各個階段攔截其實就是在ClientCall的各個方法里面插入一些攔截操縱,其實就是在發起請求前,在ClientCall構建的各個階段攔截一下(這個的底層應該是netty那些階段性的事件感知實現的。)
裝飾者模式增強了一下。

在這里插入圖片描述

1.1.2、復雜客戶端攔截器

我們前面操作的簡單客戶端請求攔截器粒度比較大,無法實現對請求過程的更加細力度的監聽和管理。所以我們需要一個更加強大的攔截器。我們說白了就是對原來能正常請求中間加一些增強方法,其實就是裝飾者模式,包裝一下原始類型。在原始類型的基礎上加了一堆方法分別在各個階段生效,從而來增強原始能力。但是真正的rpc調用實現還是原始類型發起的。

請求攔截

于是我們來寫一下代碼。

/*這個類型增強原始類型 適用于控制 攔截 請求發送各個環節*/
@Slf4j
class CustomForwardingClientClass<ReqT, RespT> extends ClientInterceptors.CheckedForwardingClientCall<ReqT, RespT> {/*** 構造器模式,需要實現構造函數,傳入原始類型,進行增強類型的包裝*/protected CustomForwardingClientClass(ClientCall<ReqT, RespT> delegate) {super(delegate);}/*** 開始調用,目的 看一個這個RPC請求是不是可以被發起。比如加一些鑒權等功能來判斷是不是可以調用,如果不可以直接* 返回responseListener.onClose(Status.INTERNAL, new Metadata());* 否則就發起請求delegate().start(responseListener, headers);*/protected void checkedStart(Listener<RespT> responseListener, Metadata headers) throws Exception {log.debug("發送請求數據之前的檢查.....");//真正的去發起grpc的請求// 是否真正發送grpc的請求,取決這個start方法的調用,delegate()就是原始類型,可以通過構造函數來看到// delegate()就是原始類型那個之前簡單調用的ClientCall,這就是裝飾器模式delegate().start(responseListener, headers);}
}

我們看到這就是一個增強的包裝類,他是對原始的簡單攔截器的那個ClientCall的包裝。我們看到它在后續的動作之前增強了一個檢查的實現。
然后你鑰匙要繼續就一定要delegate().start才會往下走。否則沒啟動ClientCall會報錯。
ok,我們已經完成了增強ClientCall的開發,現在要把原來的攔截器方法里面的簡單ClientCall替換為增強ClientCall。
我們來修改攔截器代碼。

/*** 自定義客戶端攔截器,需要實現grpc提供的攔截器接口ClientInterceptor* 該攔截器在客戶端發起請求時被調用,* 可以在該攔截器中對請求進行處理,比如添加請求頭、修改請求參數等*/
@Slf4j
public class CustomClientInterceptor implements ClientInterceptor {@Overridepublic <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {log.debug("模擬業務處理,這是一個攔截啟動的處理 ,統一的做了一些操作 ....");/** 攔截器在客戶端發起stub的rpc調用之前被調用。處理完之后往下傳,把本次調用的一些信息繼續往下傳* 把調用交給grpc,所以需要傳遞下去調用方法的元信息和一些選項* 其實就是攔截器方法的MethodDescriptor<ReqT, RespT> method, CallOptions callOptions* 然后往下傳是用來發起調用的,底層基于netty,所以需要傳遞Channel next(這是netty調用的基礎連接)* 所以需要返回一個ClientCall,封裝元信息,然后交給grpc,用來發起調用*/// return next.newCall(method, callOptions);/**  如果我們需要用復雜客戶端攔截器 ,就需要對原始的ClientCall進行包裝*  那么這個時候,就不能返回原始ClientCall對象,*  應該返回 包裝的ClientCall ---> CustomForwardingClientClass*/return new CustomForwardingClientClass<>(next.newCall(method, callOptions));}
}

于是,啟動服務端代碼,客戶端代碼觀察執行結果。
在這里插入圖片描述
沒有問題。
此外這個增強攔截還有更加細粒度的方法增強,我們來實現一下。

package com.levi.interceptor;import io.grpc.*;
import lombok.extern.slf4j.Slf4j;import javax.annotation.Nullable;/*** 自定義客戶端攔截器,需要實現grpc提供的攔截器接口ClientInterceptor* 該攔截器在客戶端發起請求時被調用,* 可以在該攔截器中對請求進行處理,比如添加請求頭、修改請求參數等*/
@Slf4j
public class CustomClientInterceptor implements ClientInterceptor {@Overridepublic <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {log.debug("模擬業務處理,這是一個攔截啟動的處理 ,統一的做了一些操作 ....");/** 攔截器在客戶端發起stub的rpc調用之前被調用。處理完之后往下傳,把本次調用的一些信息繼續往下傳* 把調用交給grpc,所以需要傳遞下去調用方法的元信息和一些選項* 其實就是攔截器方法的MethodDescriptor<ReqT, RespT> method, CallOptions callOptions* 然后往下傳是用來發起調用的,底層基于netty,所以需要傳遞Channel next(這是netty調用的基礎連接)* 所以需要返回一個ClientCall,封裝元信息,然后交給grpc,用來發起調用*/// return next.newCall(method, callOptions);/**  如果我們需要用復雜客戶端攔截器 ,就需要對原始的ClientCall進行包裝*  那么這個時候,就不能返回原始ClientCall對象,*  應該返回 包裝的ClientCall ---> CustomForwardingClientClass*/return new CustomForwardingClientClass<>(next.newCall(method, callOptions));}
}/*這個類型增強原始類型 適用于控制 攔截 請求發送各個環節*/
@Slf4j
class CustomForwardingClientClass<ReqT, RespT> extends ClientInterceptors.CheckedForwardingClientCall<ReqT, RespT> {/*** 構造器模式,需要實現構造函數,傳入原始類型,進行增強類型的包裝*/protected CustomForwardingClientClass(ClientCall<ReqT, RespT> delegate) {super(delegate);}/*** 開始調用,目的 看一個這個RPC請求是不是可以被發起。比如加一些鑒權等功能來判斷是不是可以調用,如果不可以直接* 返回responseListener.onClose(Status.INTERNAL, new Metadata());* 否則就發起請求delegate().start(responseListener, headers);*/protected void checkedStart(Listener<RespT> responseListener, Metadata headers) throws Exception {log.debug("發送請求數據之前的檢查.....");//真正的去發起grpc的請求// 是否真正發送grpc的請求,取決這個start方法的調用,delegate()就是原始類型,可以通過構造函數來看到// delegate()就是原始類型那個之前簡單調用的ClientCall,這就是裝飾器模式delegate().start(responseListener, headers);}// 真正開始發送消息,netty的發送消息的方法,outBoundBuffer@Overridepublic void sendMessage(ReqT message) {log.info("發送請求數據: {}", message);super.sendMessage(message);}// 指定發送消息的數量,類似批量發送@Overridepublic void request(int numMessages) {log.info("指定發送消息的數量: {}", numMessages);super.request(numMessages);}// 取消請求的時候回調觸發@Overridepublic void cancel(@Nullable String message, @Nullable Throwable cause) {log.info("取消請求: {}", message);super.cancel(message, cause);}// 鏈接半關閉的時候回調觸發,請求消息無法發送,但是可以接受響應的消息@Overridepublic void halfClose() {log.info("鏈接半關閉");super.halfClose();}// 消息發送是否啟用壓縮@Overridepublic void setMessageCompression(boolean enabled) {log.info("消息發送是否啟用壓縮: {}", enabled);super.setMessageCompression(enabled);}// 是否可以發送消息,這個在流式里面會調用,一元的不會@Overridepublic boolean isReady() {log.info("是否可以發送消息: {}", super.isReady());return super.isReady();}
}

運行程序結果為:
在這里插入圖片描述
至此我們看到我們在客戶端請求的各個階段都進行了監聽回調。這就是客戶端的請求增強了。

響應攔截

前面我們完成的是對于請求的攔截,其實我們可以在客戶端這里對服務端響應的攔截。我們可以攔截響應數據,這個能力可以讓我們在不同的客戶端定制自己的攔截需求。服務端不管你的需求,都返回,你不同的客戶端可能有不同的要求,自己去做攔截定制。
我們先來看一下我們之前的那個請求增強。

@Slf4j
class CustomForwardingClientClass<ReqT, RespT> extends ClientInterceptors.CheckedForwardingClientCall<ReqT, RespT> {protected CustomForwardingClientClass(ClientCall<ReqT, RespT> delegate) {super(delegate);}protected void checkedStart(Listener<RespT> responseListener, Metadata headers) throws Exception {log.debug("發送請求數據之前的檢查.....");delegate().start(responseListener, headers);}
}

你有請求攔截才有響應攔截,而且響應攔截我們一般都是通過監聽器來實現的,因為客戶端你也不知道你啥時候響應,所以就需要監聽回調的形式來監聽。我們看到在checkedStart這個方法這里。他的參數列表里面有一個responseListener,響應監聽器。其實就是這個東西,我們需要重新實現他,然后傳進去,他就會在checkedStart調用的時候傳遞給grpc,grpc就根據你的實現來攔截了。現在他是一個responseListener,啥也沒有,你要想攔截功能還是要增強包裝。所以我們來實現一下。

/*用于監聽響應,并對響應進行攔截,其中響應頭回來onHeaders被調用是服務端的responseObserver.onNext這個調用觸發的。而服務端調用responseObserver.onCompleted()才會回調onMessage這個。對應的其實就是netty的write和flush,responseObserver.onCompleted()才會真的flush,把數據寫回來。可以在服務端做修改測試一下。*/
@Slf4j
class CustomCallListener<RespT> extends ForwardingClientCallListener.SimpleForwardingClientCallListener<RespT> {// 構造器包裝原始的Listener,下面的回調實現包裝增強。protected CustomCallListener(ClientCall.Listener<RespT> delegate) {super(delegate);}@Overridepublic void onHeaders(Metadata headers) {log.info("響應頭信息 回來了......");super.onHeaders(headers);}@Overridepublic void onMessage(RespT message) {log.info("響應的數據 回來了.....{} ", message);super.onMessage(message);}// 這個在流式里面會調用,一元的不會,可以不實現@Overridepublic void onReady() {super.onReady();}//這個在流式里面會調用,一元的不會,可以不實現@Overridepublic void onClose(Status status, Metadata trailers) {super.onClose(status, trailers);}
}

通過構造函數執行包裝,然后再包裝里面增強。此時我們只需要替代delegate().start(responseListener, headers);中的參數responseListener為我們自己定義的就好了。

@Slf4j
class CustomForwardingClientClass<ReqT, RespT> extends ClientInterceptors.CheckedForwardingClientCall<ReqT, RespT> {/*** 構造器模式,需要實現構造函數,傳入原始類型,進行增強類型的包裝*/protected CustomForwardingClientClass(ClientCall<ReqT, RespT> delegate) {super(delegate);}/*** 開始調用,目的 看一個這個RPC請求是不是可以被發起。比如加一些鑒權等功能來判斷是不是可以調用,如果不可以直接* 返回responseListener.onClose(Status.INTERNAL, new Metadata());* 否則就發起請求delegate().start(responseListener, headers);*/protected void checkedStart(Listener<RespT> responseListener, Metadata headers) throws Exception {log.debug("發送請求數據之前的檢查.....");//真正的去發起grpc的請求// 是否真正發送grpc的請求,取決這個start方法的調用,delegate()就是原始類型,可以通過構造函數來看到// delegate()就是原始類型那個之前簡單調用的ClientCall,這就是裝飾器模式// delegate().start(responseListener, headers);delegate().start(new CustomCallListener<>(responseListener), headers);// 傳入增強響應攔截}...... 省略其余代碼
}

執行沒有問題。
在這里插入圖片描述

1.2、服務端攔截器

1.2.1、服務端簡單攔截器

對應客戶端那邊的攔截器,服務端這里其實是對應的,該有的都有。我們來看下服務端的簡單攔截器。

/*** 自定義服務端攔截器*/
@Slf4j
public class CustomServerInterceptor implements ServerInterceptor {@Overridepublic <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) {//在服務器端 攔截請求操作的功能 寫在這個方法中log.debug("服務器端攔截器生效.....");// 返回req請求監聽器,用于監聽服務端req請求的事件,reqTListener就是原始的攔截監聽器ServerCall.Listener<ReqT> reqTListener = next.startCall(call, headers);return reqTListener;}
}

這里的概念我們都可以在客戶端那里找到對應的。我們看到這個方法返回了一個ServerCall.Listener 類型的,而且泛型是req,可見是對于請求的監聽器。作為服務端,他是被動連接的,所以她的攔截方式就是監聽,什么時候來我什么時候攔截,他不知道你啥時候來,就只能監聽著。而next.startCall(call, headers);返回的就是一個具有原始能力的攔截器,沒有封裝增強的。我們把這個攔截器整合到服務端發布代碼中使其生效。

package com.levi;import com.levi.interceptor.CustomServerInterceptor;
import com.levi.service.HelloServiceImpl;
import io.grpc.Server;
import io.grpc.ServerBuilder;import java.io.IOException;public class GrpcServer {public static void main(String[] args) throws InterruptedException, IOException {ServerBuilder<?> serverBuilder = ServerBuilder.forPort(9000);serverBuilder.addService(new HelloServiceImpl());// 注冊自定義攔截器serverBuilder.intercept(new CustomServerInterceptor());Server server = serverBuilder.build();server.start();server.awaitTermination();}
}

我們把自定義的攔截器注冊進去之后啟動服務端和客戶端看一下。沒有問題。
在這里插入圖片描述
同樣的服務端的簡單攔截器也存在像客戶端那邊的問題,
攔截請求發送過來的數據,無法處理響應的數據。
攔截力度過于寬泛
所以我么需要復雜攔截器,增強原始的攔截器,達到更加細力度的控制攔截。一切都和當初我們在客戶端做的一樣,重新定義一個增強的。

1.2.2、服務端復雜攔截器

攔截請求,攔截的是客戶端過來的消息

/*** 自定義服務端攔截器*/
@Slf4j
public class CustomServerInterceptor implements ServerInterceptor {@Overridepublic <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) {//在服務器端 攔截請求操作的功能 寫在這個方法中log.debug("服務器端攔截器生效.....");// 返回req請求監聽器,用于監聽服務端req請求的事件,reqTListener就是原始的攔截監聽器// ServerCall.Listener<ReqT> reqTListener = next.startCall(call, headers);// 包裝器設計模式,封裝原始的監聽器,增強原始監聽器的功能,實際的核心調用還是原始的在做// 只是加了一些額外的增強的方法return new CustomServerCallListener<>(next.startCall(call, headers));}
}/*** 復雜服務端攔截器,用于監聽服務端req請求的事件,reqTListener就是原始的攔截監聽器* 對于reqTListener的事件,我們可以在事件觸發時,做一些自定義的操作,* 本質是對于原始監聽器的一個包裝增強,包裝器模式*/
@Slf4j
class CustomServerCallListener<ReqT> extends ForwardingServerCallListener.SimpleForwardingServerCallListener<ReqT> {protected CustomServerCallListener(ServerCall.Listener<ReqT> delegate) {super(delegate);}@Override//準備接受請求數據public void onReady() {log.debug("onRead Method Invoke,準備好接收客戶端數據....");super.onReady();}@Overridepublic void onMessage(ReqT message) {log.debug("接受到了客戶端請求提交的數據,客戶端的請求數據是:  {} ", message);super.onMessage(message);}@Overridepublic void onHalfClose() {log.debug("監聽到了 半連接觸發這個操作...");super.onHalfClose();}@Overridepublic void onComplete() {log.debug("服務端 調用onCompleted()觸發...");super.onComplete();}@Overridepublic void onCancel() {log.debug("出現異常后 會調用這個方法... 可以在這里做一些關閉資源的操作");super.onCancel();}
}

在經歷了客戶端的開發之后,我們這里其實就很好理解了。調用沒有問題。
在這里插入圖片描述

攔截響應,攔截的是服務端發給客戶端的響應

我們能攔截請求,自然也就能攔截響應。我們先來看一下什么是服務端的響應,其實就是服務端回寫給客戶端的操作。也就是服務端調用客戶端的操作,我們上面攔截請求其實是客戶端發給服務端,也即是服務端的監聽ServerCall.Listener,服務端的監聽器做包裝增強。
現在你要增強服務端對客戶端的調用其實就是ServerCall(這里對應我們客戶端那里的ClientCall)。所以我們要對ServerCall做包裝。就是你誰干啥就增強啥就實現啥就行。

/*** 目的:通過自定義的ServerCall 包裝原始的ServerCall 增加對于響應攔截的功能*/
@Slf4j
class CustomServerCall<ReqT, RespT> extends ForwardingServerCall.SimpleForwardingServerCall<ReqT, RespT> {// 這里包裝的是ServerCallprotected CustomServerCall(ServerCall<ReqT, RespT> delegate) {super(delegate);}@Override//指定發送消息的數量 【響應消息】public void request(int numMessages) {log.debug("response 指定消息的數量 【request】");super.request(numMessages);}@Override//設置響應頭public void sendHeaders(Metadata headers) {log.debug("response 設置響應頭 【sendHeaders】");super.sendHeaders(headers);}@Override//響應數據public void sendMessage(RespT message) {log.debug("response 響應數據  【send Message 】 {} ", message);super.sendMessage(message);}@Override//關閉連接public void close(Status status, Metadata trailers) {log.debug("response 關閉連接 【close】");super.close(status, trailers);}
}

然后我們再把這個包裝增強整合到攔截器里面,交給grpc的體系中才能生效,在interceptCall中進行整合,我們不需要改動服務端發布那里的代碼,那里可以直接通過CustomServerInterceptor來處理我們這里整合到的兩個攔截器,以下為完整代碼。

package com.levi.interceptor;import io.grpc.*;
import lombok.extern.slf4j.Slf4j;/*** 自定義服務端攔截器*/
@Slf4j
public class CustomServerInterceptor implements ServerInterceptor {@Overridepublic <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) {//在服務器端 攔截請求操作的功能 寫在這個方法中log.debug("服務器端攔截器生效.....");//包裝ServerCall 處理服務端響應攔截CustomServerCall<ReqT,RespT> reqTRespTCustomServerCall = new CustomServerCall<>(call);// 包裝Listener   處理服務端請求攔截CustomServerCallListener<ReqT> reqTCustomServerCallListener = new CustomServerCallListener<>(next.startCall(reqTRespTCustomServerCall, headers));return reqTCustomServerCallListener;}
}/*** 復雜服務端攔截器,用于監聽服務端req請求的事件,reqTListener就是原始的攔截監聽器* 對于reqTListener的事件,我們可以在事件觸發時,做一些自定義的操作,* 本質是對于原始監聽器的一個包裝增強,包裝器模式*/
@Slf4j
class CustomServerCallListener<ReqT> extends ForwardingServerCallListener.SimpleForwardingServerCallListener<ReqT> {protected CustomServerCallListener(ServerCall.Listener<ReqT> delegate) {super(delegate);}@Override//準備接受請求數據public void onReady() {log.debug("onRead Method Invoke,準備好接收客戶端數據....");super.onReady();}@Overridepublic void onMessage(ReqT message) {log.debug("接受到了客戶端請求提交的數據,客戶端的請求數據是:  {} ", message);super.onMessage(message);}@Overridepublic void onHalfClose() {log.debug("監聽到了 半連接觸發這個操作...");super.onHalfClose();}@Overridepublic void onComplete() {log.debug("服務端 調用onCompleted()觸發...");super.onComplete();}@Overridepublic void onCancel() {log.debug("出現異常后 會調用這個方法... 可以在這里做一些關閉資源的操作");super.onCancel();}
}/*** 通過自定義的ServerCall 包裝原始的ServerCall 增加對于響應攔截的功能*/
@Slf4j
class CustomServerCall<ReqT, RespT> extends ForwardingServerCall.SimpleForwardingServerCall<ReqT, RespT> {protected CustomServerCall(ServerCall<ReqT, RespT> delegate) {super(delegate);}@Override//指定發送消息的數量 【響應消息】public void request(int numMessages) {log.debug("response 指定消息的數量 【request】");super.request(numMessages);}@Override//設置響應頭public void sendHeaders(Metadata headers) {log.debug("response 設置響應頭 【sendHeaders】");super.sendHeaders(headers);}@Override//響應數據public void sendMessage(RespT message) {log.debug("response 響應數據  【send Message 】 {} ", message);super.sendMessage(message);}@Override//關閉連接public void close(Status status, Metadata trailers) {log.debug("response 關閉連接 【close】");super.close(status, trailers);}
}

而且我們看到增強類都是要實現構造的,因為要傳進去原始類,進行封裝,調用核心方法還是走super,走那個原始的操作。你的增強的操作可以加在這些新的方法里面。這些增強方法,你可以酌情看你要的業務方法,需要的就實現,不需要就可以不覆蓋實現。其實他就是服務端的響應的各個階段不同的觸發。
我們運行代碼沒有問題,各個階段都被觸發了。
在這里插入圖片描述
對于你要是想只攔截響應,不攔截請求可以這么做。

public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) {//在服務器端 攔截請求操作的功能 寫在這個方法中log.debug("服務器端攔截器生效.....");//包裝ServerCall 處理服務端響應攔截CustomServerCall<ReqT,RespT> reqTRespTCustomServerCall = new CustomServerCall<>(call);// 包裝Listener   處理服務端請求攔截CustomServerCallListener<ReqT> reqTCustomServerCallListener =new CustomServerCallListener<>(next.startCall(reqTRespTCustomServerCall, headers));// return reqTCustomServerCallListener;/*** 只攔截響應,我們就不需要包裝Listener,也就是返回原始的Listener即可。原始的Listener我們是通過* next.startCall(reqTRespTCustomServerCall, headers)獲取到的。所以繼續用next.startCall不操作包裝的* Listener即可,但是我們要包裝響應也就是serverCall,所以返回reqTRespTCustomServerCall。包在原始Listener中* 你要是包裝請求,那就是需要包裝的Listener,不需要就直接next.startCall返回startCall即可。*/return next.startCall(reqTRespTCustomServerCall, headers);}

明白請求是包裝的listener,響應是servercall,需要哪個就加強哪個就行,不需要增強攔截就用原始的就行。

四、總結

這就是grpc中比較常見的一元攔截器的使用,他是對于一元rpc的攔截。在各個攔截方法中我們可以定義一些自己的業務方法。進而靈活使用攔截器。而且你要是在某個點攔截之后不想繼續往下走,那你就不要調用每個攔截方法的super,不要做后續的調用,直接斷開鏈路即可。而且至于攔截請求還是響應就看你包裝啥就完了,他不是耦合在一起的。
后面我們再來分析監聽流也就是流式和雙向調用的攔截器。

在這里插入圖片描述

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/90595.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/90595.shtml
英文地址,請注明出處:http://en.pswp.cn/web/90595.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

深入探索嵌入式仿真教學:以酒精測試儀實驗為例的高效學習實踐

引言&#xff1a;嵌入式技術普及下的教學革新 嵌入式系統作為現代科技的核心驅動力&#xff0c;其教學重要性日益凸顯。然而&#xff0c;傳統硬件實驗面臨設備成本高、維護難、時空受限等挑戰。如何突破這些瓶頸&#xff0c;實現高效、靈活、專業的嵌入式教學&#xff1f;本文將…

三種深度學習模型(GRU、CNN-GRU、貝葉斯優化的CNN-GRU/BO-CNN-GRU)對北半球光伏數據進行時間序列預測

代碼功能 該代碼實現了一個光伏發電量預測系統&#xff0c;采用三種深度學習模型&#xff08;GRU、CNN-GRU、貝葉斯優化的CNN-GRU/BO-CNN-GRU&#xff09;對北半球光伏數據進行時間序列預測對北半球光伏數據進行時間序列預測&#xff0c;并通過多維度評估指標和可視化對比模型性…

PostgreSQL對象權限管理

本文記述在postgreSQL中對用戶/角色操作庫、模式、表、序列、函數、存儲過程的權限管理針對數據庫的授權 授權&#xff1a;grant 權限 on database 數據庫 to 用戶/角色; 撤權&#xff1a;revoke 權限 on database 數據庫 from 用戶/角色; 針對模式的授權 授權&#xff1a;gran…

Wordpress主題配置

一、下載主題 主題下載地址&#xff1a;https://www.iztwp.com/tag/blog-theme 二、主題安裝 三、上傳主題安裝即可 四、安裝完成啟動主題

lock 和 synchronized 區別

1. 引言 在多線程編程中&#xff0c;我們經常需要確保某些代碼在同一時刻只由一個線程執行。這種機制通常叫做“互斥鎖”或“同步”。Java 提供了兩種主要的同步機制&#xff1a;synchronized 關鍵字和 Lock 接口。盡管它們的作用相似&#xff0c;都用于實現線程的同步&#xf…

Tkinter - Python圖形界面開發指南

作者&#xff1a;唐叔在學習 專欄&#xff1a;唐叔學python 標簽&#xff1a;Python GUI編程 Tkinter教程 圖形界面開發 Python實戰 界面設計 事件監聽 Python入門 唐叔Python 編程學習 軟件開發 文章目錄一、Tkinter是什么&#xff1f;為什么選擇它&#xff1f;二、Tkinter基礎…

Java基礎day15

目錄 一、Java集合簡介 1.什么是集合&#xff1f; 2.集合接口 3.小結 二、List集合 1.List集合簡介 三、ArrayList容器類 1.初始化 1.1無參初始化 1.2有參初始化 2.數據結構 3.常用方法 3.1增加元素 3.2查找元素 3.3 修改元素 3.4 刪除元素 3.5 其他方法 4.擴…

React Three Fiber 實現晝夜循環:從光照過渡到日月聯動的技術拆解

在 3D 場景中用 React Three Fiber 實現自然的晝夜循環&#xff0c;核心難點在于光照的平滑過渡、日月運動的聯動邏輯、晝夜狀態下的光影差異處理&#xff0c;以及性能與視覺效果的平衡。本文以一個 ReactThree.js 的實現為例&#xff0c;詳細解析如何通過三角函數計算日月位置…

進階向:基于Python的簡易屏幕畫筆工具

用Python打造你的專屬屏幕畫筆工具&#xff1a;零基礎也能輕松實現你是否曾在觀看網課或參加遠程會議時&#xff0c;想要直接在屏幕上標注重點&#xff1f;或者作為設計師&#xff0c;需要快速繪制創意草圖&#xff1f;現在&#xff0c;只需幾行Python代碼&#xff0c;你就能輕…

Elasticsearch-ik分析器

CLI 安裝步驟 1、停止 Elasticsearch&#xff08;如果正在運行&#xff09;&#xff1a; 在安裝插件之前&#xff0c;確保 Elasticsearch 沒有在運行。 命令&#xff1a; systemctl stop elasticsearch2、安裝插件&#xff1a; 使用 elasticsearch-plugin 命令安裝 IK 插件。進…

MySQL八股篇

查詢關鍵字執行先后順序FROM&#xff08;及 JOIN)WHEREGROUP BYHAVINGSELECTDISTINCTORDER BYLIMIT / OFFSETCHAR 和 VARCHAR 的區別&#xff1f;使用場景&#xff1f;特性CHARVARCHAR?存儲方式??定長&#xff0c;存儲時填充空格至定義長度變長&#xff0c;存儲實際數據 長…

QT RCC 文件

RCC (Qt Resource Compiler) 是 Qt 框架中的一個工具&#xff0c;用于將資源文件&#xff08;如圖像、音頻、翻譯文件等&#xff09;編譯成二進制格式&#xff0c;并嵌入到應用程序可執行文件中。RCC 文件基本概念作用&#xff1a;將應用程序所需的資源文件編譯成 C 代碼&#…

數據湖典型架構解析:2025 年湖倉一體化解決方案

數據湖架構概述&#xff1a;從傳統模型到 2025 年新范式數據湖作為存儲海量異構數據的中央倉庫&#xff0c;其架構設計直接影響企業數據價值的釋放效率。傳統數據湖架構主要關注數據的存儲和管理&#xff0c;而 2025 年的數據湖架構已經演變為更加智能化、自動化的綜合性數據平…

繪圖庫 Matplotlib Search

關于Pathon的繪圖庫的認識和基本操作的學習 這里學習了兩款常用便捷的繪圖庫去學習使用Matplotlib介紹是最受歡迎的一種數據可視化包 是常用的2D繪圖庫 一般常于Numpy和Pandas使用 是數據分析中非常重要的工具可以自定義XY軸 繪制線形圖 柱狀圖 直方圖 密度圖 散點圖 更清晰的展…

Docker詳解及實戰

&#x1f389; Docker 簡介和安裝 - Docker 快速入門 Docker 簡介 Docker是一個開源的平臺&#xff0c;用于開發、交付和運行應用程序。它能夠在Windows&#xff0c;macOS&#xff0c;Linux計算機上運行&#xff0c;并將某一應用程序及其依賴項打包至一個容器中&#xff0c;這…

嵌入式學習的第三十三天-進程間通信-UDP

一、網絡1.定義不同主機間進程通信主機間在硬件層面互聯互通主機在軟件層面互聯互通2.國際網絡體系結構OSI模型&#xff08;7層&#xff09;: open system interconnect -------理論模型------定義了網絡通信中不同層的協議1977 國際標準化組織各種不同體系結構的計算機能在世…

4、Spring AI_DeepSeek模型_結構化輸出

一、前言 Spring AI 提供跨 AI 供應商&#xff08;如 OpenAI、Hugging Face 等&#xff09;的一致性 API, 通過分裝的ChatModel或ChatClient即可輕松調動LLM進行流式或非流式對話。 本專欄主要圍繞著通過OpenAI兼容接口調用各種大語言模型展開學習&#xff08;因為大部分模型…

Spring Data Redis 從入門到精通:原理與實戰指南

一、Redis 基礎概念 Redis&#xff08;Remote Dictionary Server&#xff09;是開源的內存鍵值對數據庫&#xff0c;以高性能著稱。它支持多種數據結構&#xff08;String、Hash、List、Set、ZSet&#xff09;&#xff0c;并提供持久化機制&#xff08;RDB、AOF&#xff09;。 …

免費版酒店押金原路退回系統——仙盟創夢IDE

項目介紹?東方仙盟開源酒店押金管理系統是一款面向中小型酒店、民宿、客棧的輕量級前臺管理工具&#xff0c;專注于簡化房態管理、訂單處理和押金跟蹤流程。作為完全開源的解決方案&#xff0c;它無需依賴任何第三方服務&#xff0c;所有數據存儲在本地瀏覽器中&#xff0c;確…

10. isaacsim4.2教程-RTX Lidar 傳感器

1. 前言RTX Lidar 傳感器Isaac Sim的RTX或光線追蹤Lidar支持通過JSON配置文件設置固態和旋轉Lidar配置。每個RTX傳感器必須附加到自己的視口或渲染產品&#xff0c;以確保正確模擬。重要提示&#xff1a; 在運行RTX Lidar仿真時&#xff0c;如果你在Isaac Sim UI中停靠窗口&…