一、 前置知識
1. 反射
獲取字節碼的三種方式
Class.forName("全類名")
(全類名,即包名+類名)類名.class
對象.getClass()
(任意對象都可調用,因為該方法來自Object
類)
獲取成員方法
Method getMethod(String name,Class<?>...parameterTypes)
參數為:方法名,參數類型<可變>
執行成員方法
Object invoke(Object obj, Object ... args)
參數 1:哪個對象來調用該方法
參數 2 :傳入的實參
2. 動態代理
具體代碼實現參考下方的"代碼實現"部分
動態代理實現流程:
- 創建一個類來實現
InvocationHandler
接口(重寫invoke
方法) - 調用
Proxy.newProxyInstance()
來創建代理類(需要將第一步創建的類作為參數傳進來) - 通過代理類來調用方法
二、上一個版本中的問題&解決思路
**問題 1:**服務端當前僅支持一個服務,當提供多個服務時,客戶端如何指定要調用哪個服務?(當前客戶端發送請求數據時,而只會傻乎乎地發送數據,而無法指定具體調用哪個接口)
當服務端提供多個服務時,客戶端需要指定調用的服務名稱。
創建一個請求對象類Request
,其中包含的成員屬性有:接口名稱、要調用的方法名稱、傳遞的參數數據,服務端利用這些信息,來使用反射調用相應的服務
**問題 2:**當前服務端中的方法返回類型是固定的,但是當有不同的方法時,返回數據類型可能不同,如果客戶端要處理這些不同類型的數據,就需要提前知道服務端返回的數據類型。但這顯然會違背解耦的原則,降低靈活性,而且不便于后續維護和擴展
引入統一的響應格式——將返回數據封裝到一個公共類型中
創建一個響應對象類Response
,其中包含的成員屬性有:狀態碼、狀態描述、響應數據(這種思想在 javaweb 開發也經常使用)
**問題 3:**在上個版本中,客戶端和目標主機建立連接時,采用了硬編碼,不夠優雅
**問題 4:**如果仍然采用上個版本的客戶端代碼,那么代碼耦合性較高(建立連接、發送請求的代碼、接收響應、處理響應結果都寫在了一起)
針對問題 3、4,將這些問題抽象了出來,建立一個 IOClient 類,專門建立連接、發送請求、接收響應。
三、本版本目標
- 將請求、響應的數據各自封裝到一個公共類中,這樣在請求和響應時就能進行統一,便于后續代碼的書寫和維護
- 服務端采用循環+BIO的形式,當接收到請求對象時,利用反射調用對應方法,并將執行結果發送給客戶端
- 將建立連接、發送數據、接收數據的過程封裝到專用組件中(會將請求參數封裝到一個請求對象中(包含方法、參數類型、參數列表等))
- 利用動態代理,攔截所有對接口方法的調用,將其轉換為 RPC 請求
四、代碼實現
服務端
server/Server
package com.chanlee.crpc.v1.server;import com.chanlee.crpc.v1.common.RpcRequestDTO;
import com.chanlee.crpc.v1.common.RpcResponseDTO;import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.Method;
import java.net.ServerSocket;
import java.net.Socket;
/*** 服務端代碼*/
public class Server {private static final int SERVER_PORT = 8005;public static void main(String[] args) {UserServiceImpl userService = new UserServiceImpl();try (ServerSocket serverSocket = new ServerSocket(SERVER_PORT)){System.out.println("服務器已啟動...");while(true){Socket socket = serverSocket.accept();//接收到連接請求后,啟動一個新線程去處理任務new Thread(() -> {try {// 獲取輸入、輸出流ObjectOutputStream objectOutput = new ObjectOutputStream(socket.getOutputStream());ObjectInputStream objectInput = new ObjectInputStream(socket.getInputStream());//讀取接收到的請求RpcRequestDTO request = (RpcRequestDTO) objectInput.readObject();//利用反射調用對應方法Method method = userService.getClass().getMethod(request.getMethod(), request.getParamsTypes());Object invoke = method.invoke(userService, request.getParams());//將調用結果進行封裝objectOutput.writeObject(RpcResponseDTO.success(invoke));//及時傳遞消息objectOutput.flush();} catch (Exception e) {e.printStackTrace();}}).start();}} catch (IOException e) {System.out.println("服務器啟動失敗...");}}
}
server/UserServiceImpl
package com.chanlee.crpc.v1.server;import com.chanlee.crpc.v1.common.User;
import com.chanlee.crpc.v1.service.UserService;import java.util.Random;
import java.util.UUID;/*** 服務端接口實現類*/
public class UserServiceImpl implements UserService {public User getUserById(int id) {System.out.println("客戶端調用id 為 " + id + " 的用戶");Random random = new Random();User user = User.builder().id(id).realName(UUID.randomUUID().toString()).age(random.nextInt(50)).build();return user;}public Integer insertUser(User user) {System.out.println("插入用戶成功:" + user);return 1;}
}
客戶端
client/Client
package com.chanlee.crpc.v1.client;import com.chanlee.crpc.v1.common.User;
import com.chanlee.crpc.v1.service.UserService;import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.Socket;/*** 客戶端主代碼*/
public class Client{public static void main(String[] args){ClientProxy clientProxy = new ClientProxy("127.0.0.1", 8005);UserService proxy = clientProxy.getProxy(UserService.class);//調用方法 1User user = proxy.getUserById(1);System.out.println("對應的user為:" + user);//調用方法 2User codingBoy = User.builder().age(25).id(32).realName("coding boy").build();Integer i = proxy.insertUser(codingBoy);System.out.println("向服務端插入的 user的Id為:" + i);}
}
client/Proxy
package com.chanlee.crpc.v1.client;import com.chanlee.crpc.v1.common.RpcRequestDTO;
import com.chanlee.crpc.v1.common.RpcResponseDTO;
import lombok.AllArgsConstructor;import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;import static com.chanlee.crpc.v1.client.IOClient.sendRequest;/*** 客戶端代理類*/
@AllArgsConstructor
public class ClientProxy implements InvocationHandler {/*** 服務端 IP*/private String host;/*** 服務端端口號*/private int port;public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {//構建request請求RpcRequestDTO request = RpcRequestDTO.builder().interfaceName(method.getDeclaringClass().getName()).method(method.getName()).paramsTypes(method.getParameterTypes()).params(args).build();//發送請求并獲取響應RpcResponseDTO<Object> response = sendRequest(host, port, request);//返回結果數據return response.getData();}public <T> T getProxy(Class<T> tClass){Object o = Proxy.newProxyInstance(tClass.getClassLoader(),new Class[]{tClass},this);return (T)o;}
}
client/IOClient
package com.chanlee.crpc.v1.client;import com.chanlee.crpc.v1.common.RpcRequestDTO;
import com.chanlee.crpc.v1.common.RpcResponseDTO;
import lombok.extern.slf4j.Slf4j;import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.net.Socket;/*** 客戶端 IO 組件*/
@Slf4j
public class IOClient implements Serializable {public static <T> RpcResponseDTO<T> sendRequest(String host, int port, RpcRequestDTO request){//和服務器建立連接try {Socket socket = new Socket(host, port);// 獲取輸入流和輸出流ObjectOutputStream objectOutput = new ObjectOutputStream(socket.getOutputStream());ObjectInputStream objectInput = new ObjectInputStream(socket.getInputStream());//發送請求objectOutput.writeObject(request);objectOutput.flush();//接收結果RpcResponseDTO<T> response = (RpcResponseDTO<T>) objectInput.readObject();//關閉連接socket.close();//返回結果return response;} catch (IOException e) {log.error("和服務器建立連接失敗: {}", e);throw new RuntimeException(e);} catch (ClassNotFoundException e) {log.error("接收結果失敗: {}", e);throw new RuntimeException(e);}}
}
服務層
service/UserService
package com.chanlee.crpc.v1.service;import com.chanlee.crpc.v1.common.User;/*** 服務端接口*/
public interface UserService {/*** 根據id獲取用戶信息* @param id* @return*/User getUserById(int id);/*** 插入用戶信息* @param user* @return*/Integer insertUser(User user);
}
公共類
convention/BaseErrorCode
package com.chanlee.crpc.v1.common.convention;/*** 基礎錯誤碼*/
public enum BaseErrorCode implements ErrorCode {SERVER_ERROR("A000001", "服務端內部錯誤");private final String code;private final String message;BaseErrorCode(String code, String message) {this.code = code;this.message = message;}@Overridepublic String code() {return code;}@Overridepublic String message() {return message;}
}
convention/ErrorCode
package com.chanlee.crpc.v1.common.convention;/*** 錯誤碼接口*/
public interface ErrorCode {/*** 錯誤碼*/String code();/*** 錯誤信息*/String message();
}
commom/RpcRequestDTO
package com.chanlee.crpc.v1.common;import lombok.Builder;
import lombok.Data;import java.io.Serializable;/*** 請求對象體*/
@Builder
@Data
public class RpcRequestDTO implements Serializable {/*** 接口名*/private String interfaceName;/*** 方法名*/private String method;/*** 參數*/private Object[] params;/*** 參數類型*/private Class<?>[] paramsTypes;
}
Common/RpcRespDTO
package com.chanlee.crpc.v1.common;import com.chanlee.crpc.v1.common.convention.BaseErrorCode;
import lombok.Data;
import lombok.experimental.Accessors;import java.io.Serializable;/*** 響應對象體*/
@Data
@Accessors(chain = true)
public class RpcResponseDTO<T> implements Serializable {/*** 正確返回碼*/public static final String SUCCESS_CODE = "200";/*** 返回碼*/private String code;/*** 返回消息*/private String message;/*** 響應數據*/private T data;public static RpcResponseDTO<Void> success(){return new RpcResponseDTO<Void>().setCode(SUCCESS_CODE);}public static <T> RpcResponseDTO<T> success(T data){return new RpcResponseDTO<T>().setCode(SUCCESS_CODE).setData(data);}public static RpcResponseDTO<Void> failure(){return new RpcResponseDTO<Void>().setMessage(BaseErrorCode.SERVER_ERROR.code()).setCode(BaseErrorCode.SERVER_ERROR.message());}
}
common/User
package com.chanlee.crpc.v1.common;import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;import java.io.Serializable;
/*** 用戶類*/
@Builder
@Data
@AllArgsConstructor
@NoArgsConstructor
public class User implements Serializable {/*** 用戶id*/Integer id;/*** 用戶真實姓名*/String realName;/*** 用戶年齡*/Integer age;
}