在《用最少的代碼模擬gRPC四種消息交換模式》中,我使用很簡單的代碼模擬了gRPC四種消息交換模式(Unary、Client Streaming、Server Streaming和Duplex Streaming),現在我們更近一步,試著使用極簡的方式打造一個gRPC框架(https://github.com/jiangjinnan/grpc-mini)。這個gRPC是對ASP.NET Core gRPC實現原理的模擬,并不是想重新造一個輪子。
一、“標準”的gRPC定義、承載和調用
二、將gRPC方法抽象成委托
三、將委托轉換成RequestDelegate
?? UnaryCallHandler
?? ClientStreamingCallHandler
?? ServerStreamingCallHandler
?? DuplexStreamingCallHandler
四、路由注冊
五、為gRPC服務定義一個接口
六、重新定義和承載服務
一、“標準”的gRPC定義、承載和調用
可能有些讀者朋友們對ASP.NET Core gRPC還不是太熟悉,所以我們先來演示一下如何在一個ASP.NET Core應用中如何定義和承載一個簡單的gRPC服務,并使用自動生成的客戶端代碼進行調用。我們新建一個空的解決方案,并在其中添加如下所示的三個項目。
我們在類庫項目Proto中定義了如下所示Greeter服務,并利用其中定義的四個操作分別模擬四種消息交換模式。HelloRequest 和HelloReply 是它們涉及的兩個ProtoBuf消息。
syntax?=?"proto3";
import?"google/protobuf/empty.proto";service?Greeter?{rpc?SayHelloUnary?(HelloRequest)?returns?(?HelloReply);rpc?SayHelloServerStreaming?(google.protobuf.Empty)?returns?(stream?HelloReply);rpc?SayHelloClientStreaming?(stream?HelloRequest)?returns?(HelloReply);rpc?SayHelloDuplexStreaming?(stream?HelloRequest)?returns?(stream?HelloReply);
}message?HelloRequest?{string?name?=?1;
}message?HelloReply?{string?message?=?1;
}
ASP.NET Core項目中定義了如下的GreeterServce服務實現了定義的四個操作,基類GreeterBase是針對上面這個.proto文件生成的類型。
public?class?GreeterService:?GreeterBase
{public?override?Task<HelloReply>?SayHelloUnary(HelloRequest?request,?ServerCallContext?context)=>?Task.FromResult(new?HelloReply?{?Message?=?$"Hello,?{request.Name}"?});public?override?async?Task<HelloReply>?SayHelloClientStreaming(IAsyncStreamReader<HelloRequest>?reader,?ServerCallContext?context){var?list?=?new?List<string>();while?(await?reader.MoveNext(CancellationToken.None)){list.Add(reader.Current.Name);}return?new?HelloReply?{?Message?=?$"Hello,?{string.Join(",",?list)}"?};}public??override?async?Task?SayHelloServerStreaming(Empty?request,?IServerStreamWriter<HelloReply>?responseStream,?ServerCallContext?context){await?responseStream.WriteAsync(new?HelloReply?{?Message?=?"Hello,?Foo!"?});await?Task.Delay(1000);await?responseStream.WriteAsync(new?HelloReply?{?Message?=?"Hello,?Bar!"?});await?Task.Delay(1000);await?responseStream.WriteAsync(new?HelloReply?{?Message?=?"Hello,?Baz!"?});}public?override?async?Task?SayHelloDuplexStreaming(IAsyncStreamReader<HelloRequest>?reader,?IServerStreamWriter<HelloReply>?writer,?ServerCallContext?context){while?(await?reader.MoveNext()){await?writer.WriteAsync(new?HelloReply?{?Message?=?$"Hello?{reader.Current.Name}"?});}}
}
具體的服務承載代碼如下。我們采用Minimal API的形式,通過調用IServiceCollection接口的AddGrpc擴展方法注冊相關服務,并調用MapGrpcService<TService>將定義在GreeterServce中的四個方法映射我對應的路由終結點。
var?builder?=?WebApplication.CreateBuilder(args);
builder.Services.AddGrpc();
builder.WebHost.ConfigureKestrel(kestrel?=>?kestrel.ConfigureEndpointDefaults(options?=>?options.Protocols?=?HttpProtocols.Http2));
var?app?=?builder.Build();
app.MapGrpcService<GreeterService>();
app.Run();
在控制臺項目Client中,我們利用生成出來的客戶端類型GreeterClient分別一對應的服務交換模式調用了四個gRPC方法。
var?channel?=?GrpcChannel.ForAddress("http://localhost:5000");
var?client?=?new?GreeterClient(channel);Console.WriteLine("Unary");
await?UnaryCallAsync();Console.WriteLine("\nServer?Streaming");
await?ServerStreamingCallAsync();Console.WriteLine("\nClient?Streaming");
await?ClientStreamingCallAsync();Console.WriteLine("\nDuplex?Streaming");
await?DuplexStreamingCallAsync();
Console.ReadLine();async?Task?UnaryCallAsync()
{var?request?=?new?HelloRequest?{?Name?=?"foobar"?};var?reply?=?await?client.SayHelloUnaryAsync(request);Console.WriteLine(reply.Message);
}
async?Task?ServerStreamingCallAsync()
{var?streamingCall?=?client.SayHelloServerStreaming(new?Empty());var?reader?=?streamingCall.ResponseStream;while?(await?reader.MoveNext(CancellationToken.None)){Console.WriteLine(reader.Current.Message);}
}
async?Task?ClientStreamingCallAsync()
{var?streamingCall?=?client.SayHelloClientStreaming();var?writer?=?streamingCall.RequestStream;await?writer.WriteAsync(new?HelloRequest?{?Name?=?"Foo"?});await?Task.Delay(1000);await?writer.WriteAsync(new?HelloRequest?{?Name?=?"Bar"?});await?Task.Delay(1000);await?writer.WriteAsync(new?HelloRequest?{?Name?=?"Baz"?});await?writer.CompleteAsync();var?reply?=?await?streamingCall.ResponseAsync;Console.WriteLine(reply.Message);
}
async?Task?DuplexStreamingCallAsync()
{var?streamingCall?=?client.SayHelloDuplexStreaming();var?writer?=?streamingCall.RequestStream;var?reader?=?streamingCall.ResponseStream;_?=?Task.Run(async?()?=>{await?writer.WriteAsync(new?HelloRequest?{?Name?=?"Foo"?});await?Task.Delay(1000);await?writer.WriteAsync(new?HelloRequest?{?Name?=?"Bar"?});await?Task.Delay(1000);await?writer.WriteAsync(new?HelloRequest?{?Name?=?"Baz"?});await?writer.CompleteAsync();});await?foreach?(var?reply?in?reader.ReadAllAsync()){Console.WriteLine(reply.Message);}
}
如下所示的是客戶端控制臺上的輸出結果。
二、將gRPC方法抽象成委托
通過上面的演示我們也知道,承載的gRPC類型最終會將其實現的方法注冊成路由終結點,這一點其實和MVC是一樣的。但是gRPC的方法和定義在Controller類型中的Action方法不同之處在于,前者的簽名其實是固定的。如果我們將請求和響應消息類型使用Request和Reply來表示,四種消息交換模式的方法簽名就可以寫成如下的形式。
Task<Reply>?Unary(Request?request,?ServerCallContext?context);
Task<Reply>?ClientStreaming(IAsyncStreamReader<Request>?reader,?ServerCallContext?context);
Task?ServerStreaming(Empty?request,?IServerStreamWriter<Reply>?responseStream,?ServerCallContext?context);
Task?DuplexStreaming(IAsyncStreamReader<Request>?reader,?IServerStreamWriter<Reply>?writer,?ServerCallContext?context);
“流式”方法中用來讀取請求和寫入響應的IAsyncStreamReader<T>和IServerStreamWriter<T>定義如下。
public?interface?IAsyncStreamReader<out?T>
{T?Current?{?get;?}Task<bool>?MoveNext(CancellationToken?cancellationToken?=?default);
}
public?interface?IAsyncStreamWriter<in?T>
{Task?WriteAsync(T?message,?CancellationToken?cancellationToken?=?default);
}
public?interface?IServerStreamWriter<in?T>?:?IAsyncStreamWriter<T>
{
}
public?interface?IClientStreamWriter<in?T>?:?IAsyncStreamWriter<T>
{Task?CompleteAsync();
}
表示服務端調用上下文的ServerCallContext?類型具有豐富的成員,但是它的本質就是對HttpContext上下文的封裝,所以我們對它進行了簡化。如下面的代碼片段所示,我們給予這個上下文類型兩個屬性成員,一個是表示請求上下文的HttpContext,另一個則是用來設置響應狀態StatusCode,后者對應的枚舉定義了完整的gRPC狀態碼。
public?class?ServerCallContext
{public?StatusCode?StatusCode?{?get;?set;?}?=?StatusCode.OK;public?HttpContext?HttpContext?{?get;?}public?ServerCallContext(HttpContext?httpContext)=>?HttpContext?=?httpContext;
}public?enum?StatusCode
{OK?=?0,Cancelled?=?1,Unknown?=?2,InvalidArgument?=?3,DeadlineExceeded?=?4,NotFound?=?5,AlreadyExists?=?6,PermissionDenied?=?7,Unauthenticated?=?0x10,ResourceExhausted?=?8,FailedPrecondition?=?9,Aborted?=?10,OutOfRange?=?11,Unimplemented?=?12,Internal?=?13,Unavailable?=?14,DataLoss?=?0xF
}
既然方法簽名固定,意味著我們可以將四種gRPC方法定義成如下四個對應的委托,泛型參數TService、TRequest和TResponse分別表示服務、請求和響應類型。
public?delegate?Task<TResponse>?UnaryMethod<TService,?TRequest,?TResponse>(TService?service,?TRequest?request,?ServerCallContext?context)where?TService?:?classwhere?TRequest?:?IMessage<TRequest>where?TResponse?:?IMessage<TResponse>;public?delegate?Task<TResponse>?ClientStreamingMethod<TService,?TRequest,?TResponse>(TService?service,?IAsyncStreamReader<TRequest>?reader,?ServerCallContext?context)where?TService?:?classwhere?TRequest?:?IMessage<TRequest>where?TResponse?:?IMessage<TResponse>;public?delegate?Task?ServerStreamingMethod<TService,?TRequest,?TResponse>(TService?service,?TRequest?request,?IServerStreamWriter<TResponse>?writer,?ServerCallContext?context)where?TService?:?classwhere?TRequest?:?IMessage<TRequest>where?TResponse?:?IMessage<TResponse>;public?delegate?Task?DuplexStreamingMethod<TService,?TRequest,?TResponse>(TService?service,?IAsyncStreamReader<TRequest>?reader,?IServerStreamWriter<TResponse>?writer,?ServerCallContext?context)where?TService?:?classwhere?TRequest?:?IMessage<TRequest>where?TResponse?:?IMessage<TResponse>;
我們知道路由的本質就是創建一組路由模式(Pattern)和對應處理器之間的映射關系。路由模式很簡單,對應的路由模板為“{ServiceName}/{MethodName}”,并且采用Post請求方法。對應的處理器最終體現為一個RequestDelegate。那么只要我們能夠將上述四種委托類型都轉換成RequestDelegate委托,一切都迎刃而解了。
三、將委托轉換成RequestDelegate
為了將四種委托類型轉化成RequestDelegate,我們將后者實現為一個ServiceCallHandler類型,并為其定義了如下兩個基類。ServerCallHandlerBase的HandleCallAsync方法正好與RequestDelegate委托的簽名一致,所以這個方法最終會用來處理gRPC請求。不同的消息交換模式采用不同的請求處理方式,只需實現抽象方法HandleCallAsyncCore就可以了。HandleCallAsync方法在調用此抽象方法之前將響應的ContentType設置成gRPC標準的響應類型“application/grpc”。在此之后將狀態碼設置為“grpc-status”首部,它將在HTTP2的DATA幀發送完畢后,以HEADERS幀發送到客戶端。這兩項操作都是gRPC協議的一部分。
public?abstract?class?ServerCallHandlerBase
{public?async?Task?HandleCallAsync(HttpContext?httpContext){try{var?serverCallContext?=?new?ServerCallContext(httpContext);var?response?=?httpContext.Response;response.ContentType?=?"application/grpc";await?HandleCallAsyncCore(serverCallContext);SetStatus(serverCallContext.StatusCode);}catch{SetStatus(StatusCode.Unknown);}void?SetStatus(StatusCode?statusCode){httpContext.Response.AppendTrailer("grpc-status",?((int)statusCode).ToString());}}protected?abstract?Task?HandleCallAsyncCore(ServerCallContext?serverCallContext);
}public?abstract?class?ServerCallHandler<TService,?TRequest,?TResponse>?:?ServerCallHandlerBasewhere?TService?:?classwhere?TRequest?:?IMessage<TRequest>where?TResponse?:?IMessage<TResponse>
{protected?ServerCallHandler(MessageParser<TRequest>?requestParser)=>?RequestParser?=?requestParser;public?MessageParser<TRequest>?RequestParser?{?get;?}
}
ServerCallHandler<TService, TRequest, TResponse>派生自ServerCallHandlerBase,并利用三個泛型參數TService、TRequest、TResponse來表示服務、請求和響應類型,RequestParser用來提供發序列化請求消息的MessageParser<TRequest>對象。針對四種消息交換模式的ServiceCallHandler類型均繼承這個泛型基類。
UnaryCallHandler
基于Unary消息交換模式的ServerCallHandler的具體類型為UnaryCallHandler<TService, TRequest, TResponse>,它由上述的UnaryMethod<TService, TRequest, TResponse>委托構建而成。在重寫的HandleCallAsyncCore方法中,我們利用HttpContext提供的IServiceProvider對象將服務實例創建出來后,從請求主體中將請求消息讀取出來,然后交給指定的委托對象進行處理并得到響應消息,該響應消息最終用來對當前請求予以回復。
internal?class?UnaryCallHandler<TService,?TRequest,?TResponse>?:?ServerCallHandler<TService,?TRequest,?TResponse>where?TService?:?classwhere?TRequest?:?IMessage<TRequest>where?TResponse?:?IMessage<TResponse>
{private?readonly?UnaryMethod<TService,?TRequest,?TResponse>?_handler;public?UnaryCallHandler(UnaryMethod<TService,?TRequest,?TResponse>?handler,?MessageParser<TRequest>?requestParser):base(requestParser)=>?_handler?=?handler;protected?override?async?Task?HandleCallAsyncCore(ServerCallContext?serverCallContext){using?var?scope?=?serverCallContext.HttpContext.RequestServices.CreateScope();var?service?=?ActivatorUtilities.CreateInstance<TService>(scope.ServiceProvider);var?httpContext?=?serverCallContext.HttpContext;var?request?=?await?httpContext.Request.BodyReader.ReadSingleMessageAsync<TRequest>(RequestParser);var?reply?=?await?_handler(service,?request!,?serverCallContext);await?httpContext.Response.BodyWriter.WriteMessageAsync(reply);}
}
請求消息是通過如下這個ReadSingleMessageAsync<TMessage>方法讀取出來的。按照gRPC協議,通過網絡傳輸的請求和響應消息都會在前面追加5個字節,第一個字節表示消息是否經過加密,后面四個字節是一個以大端序表示的整數,表示消息的長度。對于其他消息交換模式,也是調用Buffers的TryReadMessage<TRequest>方法從緩沖區中讀取請求消息。
public?static?async?Task<TMessage>?ReadSingleMessageAsync<TMessage>(this?PipeReader?reader,?MessageParser<TMessage>?parser)?where?TMessage:IMessage<TMessage>
{while?(true){var?result?=?await?reader.ReadAsync();var?buffer?=?result.Buffer;if?(Buffers.TryReadMessage(parser,?ref?buffer,?out?var?message)){return?message!;}reader.AdvanceTo(buffer.Start,?buffer.End);if?(result.IsCompleted){break;}}throw?new?IOException("Fails?to?read?message.");
}internal?static?class?Buffers
{public?static?readonly?int?HeaderLength?=?5;public?static?bool?TryReadMessage<TRequest>(MessageParser<TRequest>?parser,?ref?ReadOnlySequence<byte>?buffer,?out?TRequest??message)?where?TRequest:?IMessage<TRequest>{if?(buffer.Length?<?HeaderLength){message?=?default;return?false;}Span<byte>?lengthBytes?=?stackalloc?byte[4];buffer.Slice(1,?4).CopyTo(lengthBytes);var?length?=?BinaryPrimitives.ReadInt32BigEndian(lengthBytes);if?(buffer.Length?<?length?+?HeaderLength){message?=?default;return?false;}message?=?parser.ParseFrom(buffer.Slice(HeaderLength,?length));buffer?=?buffer.Slice(length?+?HeaderLength);return?true;}
}
如下這個WriteMessageAsync擴展方法負責輸出響應消息。
public?static?ValueTask<FlushResult>?WriteMessageAsync(this?PipeWriter?writer,?IMessage?message)
{var?length?=?message.CalculateSize();var?span?=?writer.GetSpan(5?+?length);span[0]?=?0;BinaryPrimitives.WriteInt32BigEndian(span.Slice(1,?4),?length);message.WriteTo(span.Slice(5,?length));writer.Advance(5?+?length);return?writer.FlushAsync();
}
ClientStreamingCallHandler
ClientStreamingCallHandler<TService, TRequest, TResponse>代表Client Streaming模式下的ServerCallHandler,它由對應的ClientStreamingMethod<TService, TRequest, TResponse>委托創建而成。在重寫的HandleCallAsyncCore方法中,除了服務實例,它還需要一個用來以“流”的方式讀取請求的IAsyncStreamReader<TRequest>對象,它們都將作為參數傳遞給指定的委托,后者執行后會返回最終的響應消息。此消息同樣通過上面這個WriteMessageAsync擴展方法予以回復。
internal?class?ClientStreamingCallHandler<TService,?TRequest,?TResponse>?:?ServerCallHandler<TService,?TRequest,?TResponse>where?TService?:?classwhere?TRequest?:?IMessage<TRequest>where?TResponse?:?IMessage<TResponse>
{private?readonly?ClientStreamingMethod<TService,?TRequest,?TResponse>?_handler;public?ClientStreamingCallHandler(ClientStreamingMethod<TService,?TRequest,?TResponse>?handler,?MessageParser<TRequest>?requestParser):base(requestParser){_handler?=?handler;}protected?override?async?Task?HandleCallAsyncCore(ServerCallContext?serverCallContext){using?var?scope?=?serverCallContext.HttpContext.RequestServices.CreateScope();var?service?=?ActivatorUtilities.CreateInstance<TService>(scope.ServiceProvider);var?reader?=?serverCallContext.HttpContext.Request.BodyReader;var?writer?=?serverCallContext.HttpContext.Response.BodyWriter;var?streamReader?=?new?HttpContextStreamReader<TRequest>(serverCallContext.HttpContext,?RequestParser);var?response?=?await?_handler(service,?streamReader,?serverCallContext);await?writer.WriteMessageAsync(response);}
}
IAsyncStreamReader<T>接口的實現類型為如下這個HttpContextStreamReader<T>。在了解了請求消息在網絡中的結構之后,對于實現在該類型中針對請求的讀取操作,應該不難理解。
public?class?HttpContextStreamReader<T>?:?IAsyncStreamReader<T>?where?T?:?IMessage<T>
{private?readonly?PipeReader?_reader;private?readonly?MessageParser<T>?_parser;private?ReadOnlySequence<byte>?_buffer;public?HttpContextStreamReader(HttpContext?httpContext,?MessageParser<T>?parser){_reader?=?httpContext.Request.BodyReader;_parser?=?parser;}public?T?Current?{?get;?private?set;?}?=?default!;public?async?Task<bool>?MoveNext(CancellationToken?cancellationToken){var?completed?=?false;if?(_buffer.IsEmpty){var?result?=?await?_reader.ReadAsync(cancellationToken);_buffer?=?result.Buffer;completed?=?result.IsCompleted;}if?(Buffers.TryReadMessage(_parser,?ref?_buffer,?out?var?mssage)){Current?=?mssage!;_reader.AdvanceTo(_buffer.Start,?_buffer.End);return?true;}_reader.AdvanceTo(_buffer.Start,?_buffer.End);_buffer?=?default;return?!completed?&&?await?MoveNext(cancellationToken);}
}
ServerStreamingCallHandler
ServerStreamingCallHandler<TService, TRequest, TResponse>代表Server Streaming模式下的ServerCallHandler,它由對應的ServerStreamingMethod<TService, TRequest, TResponse>委托創建而成。在重寫的HandleCallAsyncCore方法中,除了服務實例,它還需要一個用來以“流”的方式寫入響應的IAsyncStreamWriter<TResponse>對象,它們都將作為參數傳遞給指定的委托。
internal?class?ServerStreamingCallHandler<TService,?TRequest,?TResponse>?:?ServerCallHandler<TService,?TRequest,?TResponse>where?TService?:?classwhere?TRequest?:?IMessage<TRequest>where?TResponse?:?IMessage<TResponse>
{private?readonly?ServerStreamingMethod<TService,?TRequest,?TResponse>?_handler;public?ServerStreamingCallHandler(ServerStreamingMethod<TService,?TRequest,?TResponse>?handler,?MessageParser<TRequest>?requestParser):base(requestParser)=>?_handler?=?handler;protected?override?async?Task?HandleCallAsyncCore(ServerCallContext?serverCallContext){using?var?scope?=?serverCallContext.HttpContext.RequestServices.CreateScope();var?service?=?ActivatorUtilities.CreateInstance<TService>(scope.ServiceProvider);var?httpContext?=?serverCallContext.HttpContext;var?streamWriter?=?new?HttpContextStreamWriter<TResponse>(httpContext);var?request?=?await?httpContext.Request.BodyReader.ReadSingleMessageAsync(RequestParser);await?_handler(service,?request,?streamWriter,?serverCallContext);}
}
IAsyncStreamWriter<T>接口的實現類型為如下這個HttpContextStreamWriter<T>,它直接調用上面定義的WriteMessageAsync擴展方法將指定的消息寫入響應主體的輸出流。
public?class?HttpContextStreamWriter<T>?:?IServerStreamWriter<T>?where?T?:?IMessage<T>
{private?readonly?PipeWriter?_writer;public?HttpContextStreamWriter(HttpContext?httpContext)?=>?_writer?=?httpContext.Response.BodyWriter;public?Task?WriteAsync(T?message,?CancellationToken?cancellationToken?=?default){cancellationToken.ThrowIfCancellationRequested();return?_writer.WriteMessageAsync(message).AsTask();}
}
DuplexStreamingCallHandler
DuplexStreamingCallHandler<TService, TRequest, TResponse>代表Duplex Streaming模式下的ServerCallHandler,它由對應的DuplexStreamingMethod<TService, TRequest, TResponse>委托創建而成。在重寫的HandleCallAsyncCore方法中,除了服務實例,它還需要分別創建以“流”的方式讀/寫請求/響應的IAsyncStreamReader<TRequest>和IAsyncStreamWriter<TResponse>對象,對應的類型分別為上面定義的HttpContextStreamReader<TRequest>和HttpContextStreamWriter<TResponse>。
internal?class?DuplexStreamingCallHandler<TService,?TRequest,?TResponse>?:?ServerCallHandler<TService,?TRequest,?TResponse>where?TService?:?classwhere?TRequest?:?IMessage<TRequest>where?TResponse?:?IMessage<TResponse>
{private?readonly?DuplexStreamingMethod<TService,?TRequest,?TResponse>?_handler;public?DuplexStreamingCallHandler(DuplexStreamingMethod<TService,?TRequest,?TResponse>?handler,?MessageParser<TRequest>?requestParser)?:base(requestParser)=>?_handler?=?handler;protected?override?async?Task?HandleCallAsyncCore(ServerCallContext?serverCallContext){using?var?scope?=?serverCallContext.HttpContext.RequestServices.CreateScope();var?service?=?ActivatorUtilities.CreateInstance<TService>(scope.ServiceProvider);???????var?streamReader?=?new?HttpContextStreamReader<TRequest>(serverCallContext.HttpContext,?RequestParser);var?streamWriter?=?new?HttpContextStreamWriter<TResponse>(serverCallContext.HttpContext);await?_handler(service,?streamReader,?streamWriter,?serverCallContext);}
}
四、路由注冊
目前我們將針對四種消息交換模式的gRPC方法抽象成對應的泛型委托,并且可以利用它們創建ServerCallHandler,后者可以提供作為路由終結點處理器的RequestDelegate委托。枚舉和對應ServerCallHandler之間的映射關系如下所示:
UnaryMethod<TService, TRequest, TResponse>:UnaryCallHandler<TService, TRequest, TResponse>
ClientStreamingMethod<TService, TRequest, TResponse>:ClientStreamingCallHandler<TService, TRequest, TResponse>
ServerStreamingMethod<TService, TRequest, TResponse>:ServerStreamingCallHandler<TService, TRequest, TResponse>
DuplexStreamingMethod<TService, TRequest, TResponse>:DuplexStreamingCallHandler<TService, TRequest, TResponse>
現在我們將整個路由注冊的流程串起來,為此我們定義了如下這個IServiceBinder<TService>接口,它提供了兩種方式將定義在服務類型TService中的gRPC方法注冊成對應的路由終結點。
public?interface?IServiceBinder<TService>?where?TService?:?class
{IServiceBinder<TService>?AddUnaryMethod<TRequest,?TResponse>(string?methodName,?Func<TService,?Func<TRequest,?ServerCallContext,?Task<TResponse>>>?methodAccessor,?MessageParser<TRequest>?parser)where?TRequest?:?IMessage<TRequest>where?TResponse?:?IMessage<TResponse>;IServiceBinder<TService>?AddClientStreamingMethod<TRequest,?TResponse>(string?methodName,?Func<TService,?Func<IAsyncStreamReader<TRequest>,?ServerCallContext,?Task<TResponse>>>?methodAccessor,?MessageParser<TRequest>?parser)where?TRequest?:?IMessage<TRequest>where?TResponse?:?IMessage<TResponse>;IServiceBinder<TService>?AddServerStreamingMethod<TRequest,?TResponse>(string?methodName,?Func<TService,?Func<TRequest,?IServerStreamWriter<TResponse>,?ServerCallContext,?Task>>?methodAccessor,?MessageParser<TRequest>?parser)where?TRequest?:?IMessage<TRequest>where?TResponse?:?IMessage<TResponse>;IServiceBinder<TService>?AddDuplexStreamingMethod<TRequest,?TResponse>(string?methodName,?Func<TService,?Func<IAsyncStreamReader<TRequest>,?IServerStreamWriter<TResponse>,?ServerCallContext,?Task>>?methodAccessor,?MessageParser<TRequest>?parser)where?TRequest?:?IMessage<TRequest>where?TResponse?:?IMessage<TResponse>;IServiceBinder<TService>?AddUnaryMethod<TRequest,?TResponse>(Expression<Func<TService,?Task<TResponse>>>?methodAccessor,?MessageParser<TRequest>?parser)where?TRequest?:?IMessage<TRequest>where?TResponse?:?IMessage<TResponse>;IServiceBinder<TService>?AddClientStreamingMethod<TRequest,?TResponse>(?Expression<Func<TService,?Task<TResponse>>>?methodAccessor,?MessageParser<TRequest>?parser)where?TRequest?:?IMessage<TRequest>where?TResponse?:?IMessage<TResponse>;IServiceBinder<TService>?AddServerStreamingMethod<TRequest,?TResponse>(?Expression<Func<TService,?Task>>?methodAccessor,?MessageParser<TRequest>?parser)where?TRequest?:?IMessage<TRequest>where?TResponse?:?IMessage<TResponse>;IServiceBinder<TService>?AddDuplexStreamingMethod<TRequest,?TResponse>(?Expression<Func<TService,?Task>>?methodAccessor,?MessageParser<TRequest>?parser)where?TRequest?:?IMessage<TRequest>where?TResponse?:?IMessage<TResponse>;
}
路由終結點由路由模式和處理器兩個元素組成,路由模式主要體現在由gRPC服務和操作名稱組成的路由模板,我們默認使用服務類型的名稱和方法名稱(提出Async后綴)。為了能夠對這兩個名稱進行定制,我們定義了如下兩個特性GrpcServiceAttribute和GrpcMethodAttribute,它們可以分別標注在服務類型和操作方法上來指定一個任意的名稱。
[AttributeUsage(AttributeTargets.Class)]
public?class?GrpcServiceAttribute:?Attribute
{public?string??ServiceName?{?get;?set;?}
}[AttributeUsage(AttributeTargets.Method)]
public?class?GrpcMethodAttribute?:?Attribute
{public?string??MethodName?{?get;?set;?}
}
如下所示的ServiceBinder<TService> 是對IServiceBinder<TService> 接口的實現,它是對一個IEndpointRouteBuilder 對象的封裝。對于實現的第一組方法,我們利用提供的方法名稱與解析TService類型得到的服務名稱合并,進而得到路由終結點的URL模板。這些方法還提供了一個針對gRPC方法簽名的Func<TService,Func<…>>委托,我們利用它來將提供用于構建對應ServiceCallHandler的委托。我們最終利用IEndpointRouteBuilder 對象完成針對路由終結點的注冊。
public?class?ServiceBinder<TService>?:?IServiceBinder<TService>?where?TService?:?class
{private?readonly?IEndpointRouteBuilder?_routeBuilder;public?ServiceBinder(IEndpointRouteBuilder?routeBuilder)?=>?_routeBuilder?=?routeBuilder;public?IServiceBinder<TService>?AddUnaryMethod<TRequest,?TResponse>(string?methodName,?Func<TService,?Func<TRequest,?ServerCallContext,?Task<TResponse>>>?methodAccessor,?MessageParser<TRequest>?parser)where?TRequest?:?IMessage<TRequest>where?TResponse?:?IMessage<TResponse>{Task<TResponse>?GetMethod(TService?service,?TRequest?request,?ServerCallContext?context)?=>?methodAccessor(service)(request,?context);var?callHandler?=?new?UnaryCallHandler<TService,?TRequest,?TResponse>(GetMethod,?parser);_routeBuilder.MapPost(ServiceBinder<TService>.GetPath(methodName),?callHandler.HandleCallAsync);return?this;}public?IServiceBinder<TService>?AddClientStreamingMethod<TRequest,?TResponse>(string?methodName,?Func<TService,?Func<IAsyncStreamReader<TRequest>,?ServerCallContext,?Task<TResponse>>>?methodAccessor,?MessageParser<TRequest>?parser)where?TRequest?:?IMessage<TRequest>where?TResponse?:?IMessage<TResponse>{Task<TResponse>?GetMethod(TService?service,?IAsyncStreamReader<TRequest>?reader,?ServerCallContext?context)?=>?methodAccessor(service)(reader,?context);var?callHandler?=?new?ClientStreamingCallHandler<TService,?TRequest,?TResponse>(GetMethod,?parser);_routeBuilder.MapPost(ServiceBinder<TService>.GetPath(methodName),?callHandler.HandleCallAsync);return?this;}public?IServiceBinder<TService>?AddServerStreamingMethod<TRequest,?TResponse>(string?methodName,?Func<TService,?Func<TRequest,?IServerStreamWriter<TResponse>,?ServerCallContext,?Task>>?methodAccessor,?MessageParser<TRequest>?parser)where?TRequest?:?IMessage<TRequest>where?TResponse?:?IMessage<TResponse>{ServerStreamingMethod<TService,?TRequest,?TResponse>?handler?=?(service,?request,?writer,?context)?=>?methodAccessor(service)(request,?writer,?context);var?callHandler?=?new?ServerStreamingCallHandler<TService,?TRequest,?TResponse>(handler,?parser);_routeBuilder.MapPost(ServiceBinder<TService>.GetPath(methodName),?callHandler.HandleCallAsync);return?this;}public?IServiceBinder<TService>?AddDuplexStreamingMethod<TRequest,?TResponse>(string?methodName,?Func<TService,?Func<IAsyncStreamReader<TRequest>,?IServerStreamWriter<TResponse>,?ServerCallContext,?Task>>?methodAccessor,?MessageParser<TRequest>?parser)where?TRequest?:?IMessage<TRequest>where?TResponse?:?IMessage<TResponse>{DuplexStreamingMethod<TService,?TRequest,?TResponse>?handler?=?(service,?reader,?writer,?context)?=>?methodAccessor(service)(reader,?writer,?context);var?callHandler?=?new?DuplexStreamingCallHandler<TService,?TRequest,?TResponse>(handler,?parser);_routeBuilder.MapPost(ServiceBinder<TService>.GetPath(methodName),?callHandler.HandleCallAsync);return?this;}private?static?string?GetPath(string?methodName){var?serviceName?=?typeof(TService).GetCustomAttribute<GrpcServiceAttribute>()?.ServiceName????typeof(TService).Name;if?(methodName.EndsWith("Async")){methodName?=?methodName.Substring(0,?methodName.Length?-?5);}return?$"{serviceName}/{methodName}";}public?IServiceBinder<TService>?AddUnaryMethod<TRequest,?TResponse>(Expression<Func<TService,?Task<TResponse>>>?methodAccessor,?MessageParser<TRequest>?parser)where?TRequest?:?IMessage<TRequest>where?TResponse?:?IMessage<TResponse>{var?method?=?CreateDelegate<UnaryMethod<TService,?TRequest,TResponse>>(methodAccessor,?out?var?methodName);var?serviceName?=?typeof(TService).GetCustomAttribute<GrpcServiceAttribute>()?.ServiceName????typeof(TService).Name;var?callHandler?=?new?UnaryCallHandler<TService,?TRequest,?TResponse>(method,?parser);_routeBuilder.MapPost(ServiceBinder<TService>.GetPath(methodName),?callHandler.HandleCallAsync);return?this;}public?IServiceBinder<TService>?AddClientStreamingMethod<TRequest,?TResponse>(?Expression<Func<TService,?Task<TResponse>>>?methodAccessor,?MessageParser<TRequest>?parser)where?TRequest?:?IMessage<TRequest>where?TResponse?:?IMessage<TResponse>{var?method?=?CreateDelegate<ClientStreamingMethod<TService,?TRequest,?TResponse>>(methodAccessor,?out?var?methodName);var?serviceName?=?typeof(TService).GetCustomAttribute<GrpcServiceAttribute>()?.ServiceName????typeof(TService).Name;var?callHandler?=?new?ClientStreamingCallHandler<TService,?TRequest,?TResponse>(method,?parser);_routeBuilder.MapPost(ServiceBinder<TService>.GetPath(methodName),?callHandler.HandleCallAsync);return?this;}public?IServiceBinder<TService>?AddServerStreamingMethod<TRequest,?TResponse>(Expression<Func<TService,?Task>>?methodAccessor,?MessageParser<TRequest>?parser)where?TRequest?:?IMessage<TRequest>where?TResponse?:?IMessage<TResponse>{var?method?=?CreateDelegate<ServerStreamingMethod<TService,?TRequest,?TResponse>>(methodAccessor,?out?var?methodName);var?serviceName?=?typeof(TService).GetCustomAttribute<GrpcServiceAttribute>()?.ServiceName????typeof(TService).Name;var?callHandler?=?new?ServerStreamingCallHandler<TService,?TRequest,?TResponse>(method,?parser);_routeBuilder.MapPost(ServiceBinder<TService>.GetPath(methodName),?callHandler.HandleCallAsync);return?this;}public?IServiceBinder<TService>?AddDuplexStreamingMethod<TRequest,?TResponse>(Expression<Func<TService,?Task>>?methodAccessor,?MessageParser<TRequest>?parser)where?TRequest?:?IMessage<TRequest>where?TResponse?:?IMessage<TResponse>{var?method?=?CreateDelegate<DuplexStreamingMethod<TService,?TRequest,?TResponse>>(methodAccessor,?out?var?methodName);var?serviceName?=?typeof(TService).GetCustomAttribute<GrpcServiceAttribute>()?.ServiceName????typeof(TService).Name;var?callHandler?=?new?DuplexStreamingCallHandler<TService,?TRequest,?TResponse>(method,?parser);_routeBuilder.MapPost(ServiceBinder<TService>.GetPath(methodName),?callHandler.HandleCallAsync);return?this;}private?TDelegate?CreateDelegate<TDelegate>(LambdaExpression?expression,?out?string?methodName)?where?TDelegate?:?Delegate{var?method?=?((MethodCallExpression)expression.Body).Method;methodName?=?method.GetCustomAttribute<GrpcMethodAttribute>()?.MethodName????method.Name;return?(TDelegate)Delegate.CreateDelegate(typeof(TDelegate),?method);}
}
由于第二組方法提供的針對gRPC方法調用的表達式,所以我們可以得到描述方法的MethodInfo對象,該對象不但解決了委托對象的創建問題,還可以提供方法的名稱,所以這組方法無需提供gRPC方法的名稱。但是提供的表達式并不能嚴格匹配方法的簽名,所以無法提供編譯時的錯誤檢驗,所以各有優缺點。
五、為gRPC服務定義一個接口
由于路由終結點的注冊是針對服務類型進行的,所以我們決定讓服務類型自身來完成所有的路由注冊工作。在這里我們使用C# 11中一個叫做“靜態接口方法”的特性,為服務類型定義如下這個IGrpcService<TService>接口,服務類型TService定義的所有gRPC方法的路由注冊全部在靜態方法Bind中完成,該方法將上述的IServiceBinder<TService>作為參數。
public?interface??IGrpcService<TService>?where?TService:class
{static?abstract?void?Bind(IServiceBinder<TService>?binder);
}
我們定義了如下這個針對IEndpointRouteBuilder 接口的擴展方法完成針對指定服務類型的路由注冊。為了與現有的方法區別開來,我特意將其命名為MapGrpcService2。該方法根據指定的IEndpointRouteBuilder 對象將ServiceBinder<TService>對象創建出來,并作為參數調用服務類型的靜態Bind方法。到此為止,整個Mini版的gRPC服務端框架就構建完成了,接下來我們看看它能否工作。
public?static?class?EndpointRouteBuilderExtensions
{public?static?IEndpointRouteBuilder?MapGrpcService2<TService>(this?IEndpointRouteBuilder?routeBuilder)?where?TService?:?class,?IGrpcService<TService>{var?binder?=?new?ServiceBinder<TService>(routeBuilder);TService.Bind(binder);return?routeBuilder;}
}
六、重新定義和承載服務
我們開篇演示了ASP.NET Core gRPC的服務定義、承載和調用。如果我們上面構建的Mini版gRPC框架能夠正常工作,意味著客戶端代碼可以保持不變,我們現在就來試試看。我們在Server項目中將GreeterService服務類型改成如下的形式,它不再繼承任何基類,只實現IGrpcService<GreeterService>接口。針對四種消息交換模式的四個方法的實現方法保持不變,在實現的靜態Bind方法中,我們采用兩種形式完成了針對這四個方法的路由注冊。
[GrpcService(ServiceName?=?"Greeter")]
public?class?GreeterService:?IGrpcService<GreeterService>
{public?Task<HelloReply>?SayHelloUnaryAsync(HelloRequest?request,?ServerCallContext?context)=>?Task.FromResult(new?HelloReply?{?Message?=?$"Hello,?{request.Name}"?});public?async?Task<HelloReply>?SayHelloClientStreamingAsync(IAsyncStreamReader<HelloRequest>?reader,?ServerCallContext?context){var?list?=?new?List<string>();while?(await?reader.MoveNext(CancellationToken.None)){list.Add(reader.Current.Name);}return?new?HelloReply?{?Message?=?$"Hello,?{string.Join(",",?list)}"?};}public??async?Task?SayHelloServerStreamingAsync(Empty?request,?IServerStreamWriter<HelloReply>?responseStream,?ServerCallContext?context){await?responseStream.WriteAsync(new?HelloReply?{?Message?=?"Hello,?Foo!"?});await?Task.Delay(1000);await?responseStream.WriteAsync(new?HelloReply?{?Message?=?"Hello,?Bar!"?});await?Task.Delay(1000);await?responseStream.WriteAsync(new?HelloReply?{?Message?=?"Hello,?Baz!"?});}public?async?Task?SayHelloDuplexStreamingAsync(IAsyncStreamReader<HelloRequest>?reader,?IServerStreamWriter<HelloReply>?writer,?ServerCallContext?context){while?(await?reader.MoveNext()){await?writer.WriteAsync(new?HelloReply?{?Message?=?$"Hello?{reader.Current.Name}"?});}}public?static?void?Bind(IServiceBinder<GreeterService>?binder){binder.AddUnaryMethod<HelloRequest,?HelloReply>(it?=>it.SayHelloUnaryAsync(default!,default!),?HelloRequest.Parser).AddClientStreamingMethod<HelloRequest,?HelloReply>(it?=>?it.SayHelloClientStreamingAsync(default!,?default!),?HelloRequest.Parser).AddServerStreamingMethod<Empty,?HelloReply>(nameof(SayHelloServerStreamingAsync),?it?=>?it.SayHelloServerStreamingAsync,?Empty.Parser).AddDuplexStreamingMethod<HelloRequest,?HelloReply>(nameof(SayHelloDuplexStreamingAsync),?it?=>?it.SayHelloDuplexStreamingAsync,?HelloRequest.Parser);}}
}
服務承載程序直接將針對MapGrpcService<GreeterService>方法的調用換成MapGrpcService2<GreeterService>。由于整個框架根本不需要預先注冊任何的服務,所以針對AddGrpc擴展方法的調用也可以刪除。
using?GrpcMini;
using?Microsoft.AspNetCore.Server.Kestrel.Core;var?builder?=?WebApplication.CreateBuilder(args);
builder.WebHost.ConfigureKestrel(kestrel?=>?kestrel.ConfigureEndpointDefaults(options?=>?options.Protocols?=?HttpProtocols.Http2));
var?app?=?builder.Build();
app.MapGrpcService2<Server.Greeter>();
app.Run();
再次運行我們的程序,客戶端依然可以得到相同的輸出。