一、客戶端流式處理概述
- 客戶端流式處理方法在該方法沒有接收消息的情況下啟動。?
requestStream
?參數用于從客戶端讀取消息。 返回響應消息時,客戶端流式處理調用完成。 - 客戶端可以發送多個消息流到服務端,當所有客戶端消息流發送結束,調用請求流完結方法,則標記客戶端流消息推送結束,等待服務端執行完成。
- 等同于客戶端發送批量消息,服務端統一處理。
二、案例介紹
- 接下來 會提供三個案例,用于大家理解
- 第一個客戶端流的基礎用法
- 第二個客戶端流的優化版本
- 第三個客戶端的文件流式傳輸
三、服務端配置(注意:grpc相關配置參考我之前的文章)
// 1.提供公共的實體proto文件
// 2.服務引用對應的proto文件
// 3.定義三個客戶流方法// 公共messages.proto文件
syntax = "proto3";option csharp_namespace = "GrpcProject";package grpc.serviceing;// 請求體
message ServerRequest{string name = 1;double height = 2;int32 age = 3;bool flag = 4;float x = 5;float y = 6;float z= 7;repeated string departments = 8;
}message ServerFileRequest{bytes fileBytes = 1;
}// 響應體
message ServerResponse{bool result = 1;
}// clientstream.proto 定義service方法
syntax = "proto3";import "google/protobuf/empty.proto";
import "Protos/messages.proto";option csharp_namespace = "GrpcProject";package grpc.serviceing;service ClientStreamRpc{// 基礎客戶端流處理rpc StreamingFromClient (stream ServerRequest) returns (ServerResponse);// foreach 客戶端流式處理 前提使用C#8 或者更高版本rpc StreamingClientForeach(stream ServerRequest) returns (ServerResponse);// 文件流傳輸rpc FileStreamFromClient(stream ServerFileRequest) returns (ServerResponse);
}
服務接口實現:
/// <summary>/// 客戶端流式處理/// </summary>public class ClientStreamService : ClientStreamRpc.ClientStreamRpcBase{/// <summary>/// 基礎訪問流模式/// </summary>/// <param name="requestStream"></param>/// <param name="context"></param>/// <returns></returns>public override async Task<ServerResponse> StreamingFromClient(IAsyncStreamReader<ServerRequest> requestStream, ServerCallContext context){while (await requestStream.MoveNext()){await Console.Out.WriteLineAsync("\r\n-------------------------激光射線------------------------------\r\n");ServerRequest request = requestStream.Current;await Handle(request);}return new ServerResponse();}/// <summary>/// foreach訪問流模式/// </summary>/// <param name="requestStream"></param>/// <param name="context"></param>/// <returns></returns>public override async Task<ServerResponse> StreamingClientForeach(IAsyncStreamReader<ServerRequest> requestStream, ServerCallContext context){await foreach (var request in requestStream.ReadAllAsync()){await Console.Out.WriteLineAsync("\r\n-------------------------激光射線------------------------------\r\n");await Handle(request);}return new ServerResponse();}/// <summary>/// 讀取文件流 組合成完整的文件。/// </summary>/// <param name="requestStream"></param>/// <param name="context"></param>/// <returns></returns>public override async Task<ServerResponse> FileStreamFromClient(IAsyncStreamReader<ServerFileRequest> requestStream, ServerCallContext context){ServerResponse serverResponse = new ServerResponse();serverResponse.Result = false;// 存儲流MemoryStream ms = new();await foreach (var request in requestStream.ReadAllAsync()){ms.Write(request.FileBytes.Span);await Console.Out.WriteLineAsync($"記錄字節大小:{ms.Length} bytes");}string filePath = Path.Combine(Directory.GetCurrentDirectory(), "log.txt");using FileStream fileStream = new FileStream(filePath, FileMode.OpenOrCreate, FileAccess.ReadWrite);fileStream.Position = 0;await fileStream.WriteAsync(ms.ToArray());fileStream.Flush();fileStream.Close();serverResponse.Result = true;return serverResponse;}private async Task Handle(ServerRequest request){if (request != null){foreach (var prop in request.GetType().GetProperties()){if (prop.CanRead && prop.GetValue(request) is not null){await Console.Out.WriteLineAsync($"property name:{prop.Name};value:{prop.GetValue(request)}");}}}}}
?await foreach (var request in requestStream.ReadAllAsync()) 版本是C#8及以上版本才支持,這個需要注意!
四、客戶端配置
- 引用proto文件,配置為客戶端類型
- 根據編譯生成的函數進行傳參調用
public partial class ClientStreamForm : Form{private readonly string _url;public ClientStreamForm(IConfiguration configuration){InitializeComponent();_url = configuration.GetConnectionString("ConnectionString");}private async void button1_Click(object sender, EventArgs e){var channel = GrpcChannel.ForAddress(_url);var client = new ClientStreamRpc.ClientStreamRpcClient(channel);var streamingFromClient = client.StreamingFromClient();for (int i = 0; i < 3; i++){ServerRequest request = new();request.Age = i;request.X = new Random(30).Next(50);request.Y = new Random(60).Next(50);request.Z = new Random(90).Next(50);request.Flag = i % 2 == 0 ? true : false;request.Height = 12;request.Departments.Add($"{i}");await streamingFromClient.RequestStream.WriteAsync(request);}await streamingFromClient.RequestStream.CompleteAsync();}private async void btnGrpcOptimize_Click(object sender, EventArgs e){var channel = GrpcChannel.ForAddress(_url);var client = new ClientStreamRpc.ClientStreamRpcClient(channel);var streamingFromClient = client.StreamingClientForeach();var departments = new List<string>(){"one","two","three","four","five","six"};ServerRequest request = new();foreach (var department in departments){request.Age = new Random(20).Next(100);request.X = new Random(30).Next(50);request.Y = new Random(60).Next(50);request.Z = new Random(90).Next(50);request.Flag = false;request.Height = 12;request.Departments.Add(department);await streamingFromClient.RequestStream.WriteAsync(request);}await streamingFromClient.RequestStream.CompleteAsync();}private async void btnFile_Click(object sender, EventArgs e){var channel = GrpcChannel.ForAddress(_url);var client = new ClientStreamRpc.ClientStreamRpcClient(channel);var streamingFromClient = client.FileStreamFromClient();var strMessage = "偉大抗日戰爭的一周年紀念,七月七日,快要到了。全民族的力量團結起來,堅持抗戰,堅持統一戰線,同敵人作英勇的戰爭,快一年了。這個戰爭,在東方歷史上是空前的,在世界歷史上也將是偉大的,全世界人民都關心這個戰爭。身受戰爭災難、為著自己民族的生存而奮斗的每一個中國人,無日不在渴望戰爭的勝利。然而戰爭的過程究竟會要怎么樣?能勝利還是不能勝利?能速勝還是不能速勝?很多人都說持久戰,但是為什么是持久戰?怎樣進行持久戰?很多人都說最后勝利,但是為什么會有最后勝利?怎樣爭取最后勝利?這些問題,不是每個人都解決了的,甚至是大多數人至今沒有解決的。于是失敗主義的亡國論者跑出來向人們說:中國會亡,最后勝利不是中國的。某些性急的朋友們也跑出來向人們說:中國很快就能戰勝,無需乎費大氣力。這些議論究竟對不對呢?我們一向都說:這些議論是不對的。可是我們說的,還沒有為大多數人所了解。一半因為我們的宣傳解釋工作還不夠,一半也因為客觀事變的發展還沒有完全暴露其固有的性質,還沒有將其面貌鮮明地擺在人們之前,使人們無從看出其整個的趨勢和前途,因而無從決定自己的整套的方針和做法。現在好了,抗戰十個月的經驗,盡夠擊破毫無根據的亡國論,也盡夠說服急性朋友們的速勝論了。在這種情形下,很多人要求做個總結性的解釋。尤其是對持久戰,有亡國論和速勝論的反對意見,也有空洞無物的了解。“盧溝橋事變以來,四萬萬人一齊努力,最后勝利是中國的。”這樣一種公式,在廣大的人們中流行著。這個公式是對的,但有加以充實的必要。抗日戰爭和統一戰線之所以能夠堅持,是由于許多的因素:全國黨派,從共產黨到國民黨;全國人民,從工人農民到資產階級;全國軍隊,從主力軍到游擊隊;國際方面,從社會主義國家到各國愛好正義的人民;敵國方面,從某些國內反戰的人民到前線反戰的兵士。總而言之,所有這些因素,在我們的抗戰中都盡了他們各種程度的努力。每一個有良心的人,都應向他們表示敬意。我們共產黨人,同其他抗戰黨派和全國人民一道,唯一的方向,是努力團結一切力量,戰勝萬惡的日寇。今年七月一日,是中國共產黨建立的十七周年紀念日。為了使每個共產黨員在抗日戰爭中能夠盡其更好和更大的努力,也有著重地研究持久戰的必要。因此,我的講演就來研究持久戰。和持久戰這個題目有關的問題,我都準備說到;但是不能一切都說到,因為一切的東西,不是在一個講演中完全說得了的。";var spanLength = strMessage.Length / 10;for (int i = 0; i < 10; i++){var startIndex = spanLength * i;string spanMessage;if (i == 9){spanMessage = strMessage.Substring(startIndex);}else{spanMessage = strMessage.Substring(startIndex, spanLength - 1);}var strBytes = Encoding.UTF8.GetBytes(spanMessage);var byteString = ByteString.CopyFrom(strBytes, 0, strBytes.Length);await streamingFromClient.RequestStream.WriteAsync(new ServerFileRequest() { FileBytes = byteString });Task.Delay(1000).Wait();}await streamingFromClient.RequestStream.CompleteAsync();}}
- IConfiguration 接口在program類中進行注入,讀取appsettings.json文件
- 調用接口查看執行結果
- 客戶端流基礎模式
? ? ?2.客戶端流foreach模式
? ? ? ?3.文件流處理模式
?
- 客戶端流基礎模式
五、源碼地址
鏈接:https://pan.baidu.com/s/1PnLhysfGbVxC1ecpu7XReA?
提取碼:l6w0