RPC框架稱為遠(yuǎn)程調(diào)用框架,其實(shí)現(xiàn)的核心原理就是消費(fèi)者端使用動態(tài)代理來代理一個接口的方法(基于JDK的動態(tài)代理,當(dāng)然如果使用CGLib可以直接使用無接口類的方法),通過加入網(wǎng)絡(luò)傳輸編程,傳輸調(diào)用接口方法名稱,方法參數(shù)來給提供者獲取,再通過反射,來執(zhí)行該接口的方法,再將反射執(zhí)行的結(jié)果通過網(wǎng)絡(luò)編程傳回消費(fèi)者端。
現(xiàn)在我們來依次實(shí)現(xiàn)這些概念。這里我們做最簡單的實(shí)現(xiàn),網(wǎng)絡(luò)編程使用的是BIO,大家可以使用Reactor模式的Netty來改寫性能更好的方式。而網(wǎng)絡(luò)傳輸中使用的序列化和反序列化也是Java自帶的,當(dāng)然這樣的傳輸字節(jié)比較大,可以使用google的protoBuffer或者kryo來處理。這里只為了方便說明原理。
pom
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.guanjian</groupId> <artifactId>rpc-framework</artifactId> <version>1.0-SNAPSHOT</version> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.7.0</version> <configuration> <source>1.8</source> <target>1.8</target> <encoding>UTF-8</encoding> </configuration> </plugin> </plugins> </build> </project>
首先當(dāng)然是我們要進(jìn)行遠(yuǎn)程調(diào)用的接口以及接口的方法。
public interface HelloService { String sayHello(String content);}
接口實(shí)現(xiàn)類
public class HelloServiceImpl implements HelloService { public String sayHello(String content) { return "hello," + content; } }
消費(fèi)者端的動態(tài)代理,如果你是把提供者和消費(fèi)者寫在兩個工程中,則提供者端需要上面的接口和實(shí)現(xiàn)類,而消費(fèi)者端只需要上面的接口。
public class ConsumerProxy { /** * 消費(fèi)者端的動態(tài)代理 * @param interfaceClass 代理的接口類 * @param host 遠(yuǎn)程主機(jī)IP * @param port 遠(yuǎn)程主機(jī)端口 * @param <T> * @return */ @SuppressWarnings("unchecked") public static <T> T consume(final Class<T> interfaceClass,final String host,final int port) { return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class[]{interfaceClass}, (proxy,method,args) -> { //創(chuàng)建一個客戶端套接字 Socket socket = new Socket(host, port); try { //創(chuàng)建一個對外傳輸?shù)膶ο罅?,綁定套接? ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream()); try { //將動態(tài)代理的方法名寫入對外傳輸?shù)膶ο罅髦? output.writeUTF(method.getName()); //將動態(tài)代理的方法的參數(shù)寫入對外傳輸?shù)膶ο罅髦? output.writeObject(args); //創(chuàng)建一個對內(nèi)傳輸?shù)膶ο罅?,綁定套接? //這里是為了獲取提供者端傳回的結(jié)果 ObjectInputStream input = new ObjectInputStream(socket.getInputStream()); try { //從對內(nèi)傳輸?shù)膶ο罅髦蝎@取結(jié)果 Object result = input.readObject(); if (result instanceof Throwable) { throw (Throwable) result; } return result; } finally { input.close(); } } finally { output.close(); } } finally { socket.close(); } } ); } }
有關(guān)JDK動態(tài)代理的內(nèi)容可以參考AOP原理與自實(shí)現(xiàn) ,BIO的部分可以參考傳統(tǒng)IO與NIO比較
提供者端的網(wǎng)絡(luò)傳輸和遠(yuǎn)程方式調(diào)用服務(wù)
public class ProviderReflect { private static final ExecutorService executorService = Executors.newCachedThreadPool(); /** * RPC監(jiān)聽和遠(yuǎn)程方法調(diào)用 * @param service RPC遠(yuǎn)程方法調(diào)用的接口實(shí)例 * @param port 監(jiān)聽的端口 * @throws Exception */ public static void provider(final Object service,int port) throws Exception { //創(chuàng)建服務(wù)端的套接字,綁定端口port ServerSocket serverSocket = new ServerSocket(port); while (true) { //開始接收客戶端的消息,并以此創(chuàng)建套接字 final Socket socket = serverSocket.accept(); //多線程執(zhí)行,這里的問題是連接數(shù)過大,線程池的線程數(shù)會耗盡 executorService.execute(() -> { try { //創(chuàng)建呢一個對內(nèi)傳輸?shù)膶ο罅?,并綁定套接? ObjectInputStream input = new ObjectInputStream(socket.getInputStream()); try { try { //從對象流中讀取接口方法的方法名 String methodName = input.readUTF(); //從對象流中讀取接口方法的所有參數(shù) Object[] args = (Object[]) input.readObject(); Class[] argsTypes = new Class[args.length]; for (int i = 0;i < args.length;i++) { argsTypes[i] = args[i].getClass(); } //創(chuàng)建一個對外傳輸?shù)膶ο罅?,并綁定套接? //這里是為了將反射執(zhí)行結(jié)果傳遞回消費(fèi)者端 ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream()); try { Class<?>[] interfaces = service.getClass().getInterfaces(); Method method = null; for (int i = 0;i < interfaces.length;i++) { method = interfaces[i].getDeclaredMethod(methodName,argsTypes); if (method != null) { break; } } Object result = method.invoke(service, args); //將反射執(zhí)行結(jié)果寫入對外傳輸?shù)膶ο罅髦? output.writeObject(result); } catch (Throwable t) { output.writeObject(t); } finally { output.close(); } } catch (Exception e) { e.printStackTrace(); } finally { input.close(); } } finally { socket.close(); } } catch (Exception e) { e.printStackTrace(); } }); } } }
啟動提供者端的網(wǎng)絡(luò)偵聽和遠(yuǎn)程調(diào)用
public class RPCProviderMain { public static void main(String[] args) throws Exception { HelloService service = new HelloServiceImpl(); ProviderReflect.provider(service,8083); } }
啟動消費(fèi)者的動態(tài)代理調(diào)用
public class RPCConsumerMain { public static void main(String[] args) throws InterruptedException { HelloService service = ConsumerProxy.consume(HelloService.class,"127.0.0.1",8083); for (int i = 0;i < 1000;i++) { String hello = service.sayHello("你好_" + i); System.out.println(hello); Thread.sleep(1000); } } }
運(yùn)行結(jié)果
hello,你好_0
hello,你好_1
hello,你好_2
hello,你好_3
hello,你好_4
hello,你好_5
…..
如果你要擴(kuò)展成一個Netty+ProtoBuffer的高性能RPC框架可以參考Netty整合Protobuffer 的相關(guān)寫法。
推薦教程:《PHP》