29 加餐:HTTP 协议 + JSON-RPC,Dubbo 跨语言就是如此简单 在前面课时介绍 Protocol 和 Invoker 实现时,我们重点介绍了 AbstractProtocol 以及 DubboInvoker 实现。其实,Protocol 还有一个实现分支是 AbstractProxyProtocol,如下图所示:

Lark20201103-162545.png

AbstractProxyProtocol 继承关系图

从图中我们可以看到:gRPC、HTTP、WebService、Hessian、Thrift 等协议对应的 Protocol 实现,都是继承自 AbstractProxyProtocol 抽象类。

目前互联网的技术栈百花齐放,很多公司会使用 Node.js、Python、Rails、Go 等语言来开发 一些 Web 端应用,同时又有很多服务会使用 Java 技术栈实现,这就出现了大量的跨语言调用的需求。Dubbo 作为一个 RPC 框架,自然也希望能实现这种跨语言的调用,目前 Dubbo 中使用“HTTP 协议 + JSON-RPC”的方式来达到这一目的,其中 HTTP 协议和 JSON 都是天然跨语言的标准,在各种语言中都有成熟的类库。

下面我们就重点来分析 Dubbo 对 HTTP 协议的支持。首先,我会介绍 JSON-RPC 的基础,并通过一个示例,帮助你快速入门,然后介绍 Dubbo 中 HttpProtocol 的具体实现,也就是如何将 HTTP 协议与 JSON-RPC 结合使用,实现跨语言调用的效果。

JSON-RPC

Dubbo 中支持的 HTTP 协议实际上使用的是 JSON-RPC 协议。

JSON-RPC 是基于 JSON 的跨语言远程调用协议。Dubbo 中的 dubbo-rpc-xml、dubbo-rpc-webservice 等模块支持的 XML-RPC、WebService 等协议与 JSON-RPC 一样,都是基于文本的协议,只不过 JSON 的格式比 XML、WebService 等格式更加简洁、紧凑。与 Dubbo 协议、Hessian 协议等二进制协议相比,JSON-RPC 更便于调试和实现,可见 JSON-RPC 协议还是一款非常优秀的远程调用协议。

在 Java 体系中,有很多成熟的 JSON-RPC 框架,例如 jsonrpc4j、jpoxy 等,其中,jsonrpc4j 本身体积小巧,使用方便,既可以独立使用,也可以与 Spring 无缝集合,非常适合基于 Spring 的项目。

下面我们先来看看 JSON-RPC 协议中请求的基本格式: { “id”:1, “method”:”sayHello”, “params”:[ “Dubbo json-rpc” ] }

JSON-RPC请求中各个字段的含义如下:

  • id 字段,用于唯一标识一次远程调用。
  • method 字段,指定了调用的方法名。
  • params 数组,表示方法传入的参数,如果方法无参数传入,则传入空数组。

在 JSON-RPC 的服务端收到调用请求之后,会查找到相应的方法并进行调用,然后将方法的返回值整理成如下格式,返回给客户端: { “id”:1, “result”:”Hello Dubbo json-rpc”, “error”:null }

JSON-RPC响应中各个字段的含义如下:

  • id 字段,用于唯一标识一次远程调用,该值与请求中的 id 字段值保持一致。
  • result 字段,记录了方法的返回值,若无返回值,则返回空;若调用错误,返回 null。
  • error 字段,表示调用发生异常时的异常信息,方法执行无异常时该字段为 null。

jsonrpc4j 基础使用

Dubbo 使用 jsonrpc4j 库来实现 JSON-RPC 协议,下面我们使用 jsonrpc4j 编写一个简单的 JSON-RPC 服务端示例程序和客户端示例程序,并通过这两个示例程序说明 jsonrpc4j 最基本的使用方式。

首先,我们需要创建服务端和客户端都需要的 domain 类以及服务接口。我们先来创建一个 User 类,作为最基础的数据对象: public class User implements Serializable { private int userId; private String name; private int age; // 省略上述字段的getter/setter方法以及toString()方法 }

接下来创建一个 UserService 接口作为服务接口,其中定义了 5 个方法,分别用来创建 User、查询 User 以及相关信息、删除 User:

public interface UserService { User createUser(int userId, String name, int age); User getUser(int userId); String getUserName(int userId); int getUserId(String name); void deleteAll(); }

UserServiceImpl 是 UserService 接口的实现类,其中使用一个 ArrayList 集合管理 User 对象,具体实现如下:

public class UserServiceImpl implements UserService { // 管理所有User对象 private List users = new ArrayList<>(); @Override public User createUser(int userId, String name, int age) { System.out.println("createUser method"); User user = new User(); user.setUserId(userId); user.setName(name); user.setAge(age); users.add(user); // 创建User对象并添加到users集合中 return user; } @Override public User getUser(int userId) { System.out.println("getUser method"); // 根据userId从users集合中查询对应的User对象 return users.stream().filter(u -> u.getUserId() == userId).findAny().get(); } @Override public String getUserName(int userId) { System.out.println("getUserName method"); // 根据userId从users集合中查询对应的User对象之后,获取该User的name return getUser(userId).getName(); } @Override public int getUserId(String name) { System.out.println("getUserId method"); // 根据name从users集合中查询对应的User对象,然后获取该User的id return users.stream().filter(u -> u.getName().equals(name)).findAny().get().getUserId(); } @Override public void deleteAll() { System.out.println("deleteAll"); users.clear(); // 清空users集合 } }

整个用户管理业务的核心大致如此。下面我们来看服务端如何将 UserService 与 JSON-RPC 关联起来。

首先,我们创建 RpcServlet 类,它是 HttpServlet 的子类,并覆盖了 HttpServlet 的 service() 方法。我们知道,HttpServlet 在收到 GET 和 POST 请求的时候,最终会调用其 service() 方法进行处理;HttpServlet 还会将 HTTP 请求和响应封装成 HttpServletRequest 和 HttpServletResponse 传入 service() 方法之中。这里的 RpcServlet 实现之中会创建一个 JsonRpcServer,并在 service() 方法中将 HTTP 请求委托给 JsonRpcServer 进行处理: public class RpcServlet extends HttpServlet { private JsonRpcServer rpcServer = null; public RpcServlet() { super(); // JsonRpcServer会按照json-rpc请求,调用UserServiceImpl中的方法 rpcServer = new JsonRpcServer(new UserServiceImpl(), UserService.class); } @Override protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { rpcServer.handle(request, response); } }

最后,我们创建一个 JsonRpcServer 作为服务端的入口类,在其 main() 方法中会启动 Jetty 作为 Web 容器,具体实现如下:

public class JsonRpcServer { public static void main(String[] args) throws Throwable { // 服务器的监听端口 Server server = new Server(9999); // 关联一个已经存在的上下文 WebAppContext context = new WebAppContext(); // 设置描述符位置 context.setDescriptor(“/dubbo-demo/json-rpc-demo/src/main/webapp/WEB-INF/web.xml”); // 设置Web内容上下文路径 context.setResourceBase(“/dubbo-demo/json-rpc-demo/src/main/webapp”); // 设置上下文路径 context.setContextPath(“/”); context.setParentLoaderPriority(true); server.setHandler(context); server.start(); server.join(); } }

这里使用到的 web.xml 配置文件如下:

<?xml version=”1.0” encoding=”UTF-8”?> RpcServlet com.demo.RpcServlet RpcServlet /rpc

完成服务端的编写之后,下面我们再继续编写 JSON-RPC 的客户端。在 JsonRpcClient 中会创建 JsonRpcHttpClient,并通过 JsonRpcHttpClient 请求服务端:

public class JsonRpcClient { private static JsonRpcHttpClient rpcHttpClient; public static void main(String[] args) throws Throwable { // 创建JsonRpcHttpClient rpcHttpClient = new JsonRpcHttpClient(new URL(“http://127.0.0.1:9999/rpc”)); JsonRpcClient jsonRpcClient = new JsonRpcClient(); jsonRpcClient.deleteAll(); // 调用deleteAll()方法删除全部User // 调用createUser()方法创建User System.out.println(jsonRpcClient.createUser(1, “testName”, 30)); // 调用getUser()、getUserName()、getUserId()方法进行查询 System.out.println(jsonRpcClient.getUser(1)); System.out.println(jsonRpcClient.getUserName(1)); System.out.println(jsonRpcClient.getUserId(“testName”)); } public void deleteAll() throws Throwable { // 调用服务端的deleteAll()方法 rpcHttpClient.invoke(“deleteAll”, null); } public User createUser(int userId, String name, int age) throws Throwable { Object[] params = new Object[]{userId, name, age}; // 调用服务端的createUser()方法 return rpcHttpClient.invoke(“createUser”, params, User.class); } public User getUser(int userId) throws Throwable { Integer[] params = new Integer[]{userId}; // 调用服务端的getUser()方法 return rpcHttpClient.invoke(“getUser”, params, User.class); } public String getUserName(int userId) throws Throwable { Integer[] params = new Integer[]{userId}; // 调用服务端的getUserName()方法 return rpcHttpClient.invoke(“getUserName”, params, String.class); } public int getUserId(String name) throws Throwable { String[] params = new String[]{name}; // 调用服务端的getUserId()方法 return rpcHttpClient.invoke(“getUserId”, params, Integer.class); } } // 输出: // User{userId=1, name=’testName’, age=30} // User{userId=1, name=’testName’, age=30} // testName // 1

AbstractProxyProtocol

在 AbstractProxyProtocol 的 export() 方法中,首先会根据 URL 检查 exporterMap 缓存,如果查询失败,则会调用 ProxyFactory.getProxy() 方法将 Invoker 封装成业务接口的代理类,然后通过子类实现的 doExport() 方法启动底层的 ProxyProtocolServer,并初始化 serverMap 集合。具体实现如下: public Exporter export(final Invoker invoker) throws RpcException { // 首先查询exporterMap集合 final String uri = serviceKey(invoker.getUrl()); Exporter exporter = (Exporter) exporterMap.get(uri); if (exporter != null) { if (Objects.equals(exporter.getInvoker().getUrl(), invoker.getUrl())) { return exporter; } } // 通过ProxyFactory创建代理类,将Invoker封装成业务接口的代理类 final Runnable runnable = doExport(proxyFactory.getProxy(invoker, true), invoker.getInterface(), invoker.getUrl()); // doExport()方法返回的Runnable是一个回调,其中会销毁底层的Server,将会在unexport()方法中调用该Runnable exporter = new AbstractExporter(invoker) { public void unexport() { super.unexport(); exporterMap.remove(uri); if (runnable != null) { runnable.run(); } } }; exporterMap.put(uri, exporter); return exporter; }

在 HttpProtocol 的 doExport() 方法中,与前面介绍的 DubboProtocol 的实现类似,也要启动一个 RemotingServer。为了适配各种 HTTP 服务器,例如,Tomcat、Jetty 等,Dubbo 在 Transporter 层抽象出了一个 HttpServer 的接口。

Drawing 1.png

dubbo-remoting-http 模块位置

dubbo-remoting-http 模块的入口是 HttpBinder 接口,它被 @SPI 注解修饰,是一个扩展接口,有三个扩展实现,默认使用的是 JettyHttpBinder 实现,如下图所示:

Drawing 2.png

JettyHttpBinder 继承关系图

HttpBinder 接口中的 bind() 方法被 @Adaptive 注解修饰,会根据 URL 的 server 参数选择相应的 HttpBinder 扩展实现,不同 HttpBinder 实现返回相应的 HttpServer 实现。HttpServer 的继承关系如下图所示:

Drawing 3.png

HttpServer 继承关系图

这里我们以 JettyHttpServer 为例简单介绍 HttpServer 的实现,在 JettyHttpServer 中会初始化 Jetty Server,其中会配置 Jetty Server 使用到的线程池以及处理请求 Handler: public JettyHttpServer(URL url, final HttpHandler handler) { // 初始化AbstractHttpServer中的url字段和handler字段 super(url, handler); this.url = url; DispatcherServlet.addHttpHandler( // 添加HttpHandler url.getParameter(Constants.BIND_PORT_KEY, url.getPort()), handler); // 创建线程池 int threads = url.getParameter(THREADS_KEY, DEFAULT_THREADS); QueuedThreadPool threadPool = new QueuedThreadPool(); threadPool.setDaemon(true); threadPool.setMaxThreads(threads); threadPool.setMinThreads(threads); // 创建Jetty Server server = new Server(threadPool); // 创建ServerConnector,并指定绑定的ip和port ServerConnector connector = new ServerConnector(server); String bindIp = url.getParameter(Constants.BIND_IP_KEY, url.getHost()); if (!url.isAnyHost() && NetUtils.isValidLocalHost(bindIp)) { connector.setHost(bindIp); } connector.setPort(url.getParameter(Constants.BIND_PORT_KEY, url.getPort())); server.addConnector(connector); // 创建ServletHandler并与Jetty Server关联,由DispatcherServlet处理全部的请求 ServletHandler servletHandler = new ServletHandler(); ServletHolder servletHolder = servletHandler.addServletWithMapping(DispatcherServlet.class, “//*”); servletHolder.setInitOrder(2); // 创建ServletContextHandler并与Jetty Server关联 ServletContextHandler context = new ServletContextHandler(server, “/”, ServletContextHandler.SESSIONS); context.setServletHandler(servletHandler); ServletManager.getInstance().addServletContext(url.getParameter(Constants.BIND_PORT_KEY, url.getPort()), context.getServletContext()); server.start(); }

我们可以看到 JettyHttpServer 收到的全部请求将委托给 DispatcherServlet 这个 HttpServlet 实现,而 DispatcherServlet 的 service() 方法会把请求委托给对应接端口的 HttpHandler 处理:

protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { // 从HANDLERS集合中查询端口对应的HttpHandler对象 HttpHandler handler = HANDLERS.get(request.getLocalPort()); if (handler == null) { // 端口没有对应的HttpHandler实现 response.sendError(HttpServletResponse.SC_NOT_FOUND, “Service not found.”); } else { // 将请求委托给HttpHandler对象处理 handler.handle(request, response); } }

了解了 Dubbo 对 HttpServer 的抽象以及 JettyHttpServer 的核心之后,我们回到 HttpProtocol 中的 doExport() 方法继续分析。

在 HttpProtocol.doExport() 方法中会通过 HttpBinder 创建前面介绍的 HttpServer 对象,并记录到 serverMap 中用来接收 HTTP 请求。这里初始化 HttpServer 以及处理请求用到的 HttpHandler 是 HttpProtocol 中的内部类,在其他使用 HTTP 协议作为基础的 RPC 协议实现中也有类似的 HttpHandler 实现类,如下图所示:

Drawing 4.png

HttpHandler 继承关系图

在 HttpProtocol.InternalHandler 中的 handle() 实现中,会将请求委托给 skeletonMap 集合中记录的 JsonRpcServer 对象进行处理: public void handle(HttpServletRequest request, HttpServletResponse response) throws ServletException { String uri = request.getRequestURI(); JsonRpcServer skeleton = skeletonMap.get(uri); if (cors) { … // 处理跨域问题 } if (request.getMethod().equalsIgnoreCase(“OPTIONS”)) { response.setStatus(200); // 处理OPTIONS请求 } else if (request.getMethod().equalsIgnoreCase(“POST”)) { // 只处理POST请求 RpcContext.getContext().setRemoteAddress( request.getRemoteAddr(), request.getRemotePort()); skeleton.handle(request.getInputStream(), response.getOutputStream()); } else {// 其他Method类型的请求,例如,GET请求,直接返回500 response.setStatus(500); } }

skeletonMap 集合中的 JsonRpcServer 是与 HttpServer 对象一同在 doExport() 方法中初始化的。最后,我们来看 HttpProtocol.doExport() 方法的实现:

protected Runnable doExport(final T impl, Class type, URL url) throws RpcException { String addr = getAddr(url); // 先查询serverMap缓存 ProtocolServer protocolServer = serverMap.get(addr); if (protocolServer == null) { // 查询缓存失败 // 创建HttpServer,注意,传入的HttpHandler实现是InternalHandler RemotingServer remotingServer = httpBinder.bind(url, new InternalHandler(url.getParameter("cors", false))); serverMap.put(addr, new ProxyProtocolServer(remotingServer)); } // 创建JsonRpcServer对象,并将URL与JsonRpcServer的映射关系记录到skeletonMap集合中 final String path = url.getAbsolutePath(); final String genericPath = path + "/" + GENERIC_KEY; JsonRpcServer skeleton = new JsonRpcServer(impl, type); JsonRpcServer genericServer = new JsonRpcServer(impl, GenericService.class); skeletonMap.put(path, skeleton); skeletonMap.put(genericPath, genericServer); return () -> {// 返回Runnable回调,在Exporter中的unexport()方法中执行 skeletonMap.remove(path); skeletonMap.remove(genericPath); }; }

介绍完 HttpProtocol 暴露服务的相关实现之后,下面我们再来看 HttpProtocol 中引用服务相关的方法实现,即 protocolBindinRefer() 方法实现。该方法首先通过 doRefer() 方法创建业务接口的代理,这里会使用到 jsonrpc4j 库中的 JsonProxyFactoryBean 与 Spring 进行集成,在其 afterPropertiesSet() 方法中会创建 JsonRpcHttpClient 对象:

public void afterPropertiesSet() { … … // 省略ObjectMapper等对象 try { // 创建JsonRpcHttpClient,用于后续发送json-rpc请求 jsonRpcHttpClient = new JsonRpcHttpClient(objectMapper, new URL(getServiceUrl()), extraHttpHeaders); jsonRpcHttpClient.setRequestListener(requestListener); jsonRpcHttpClient.setSslContext(sslContext); jsonRpcHttpClient.setHostNameVerifier(hostNameVerifier); } catch (MalformedURLException mue) { throw new RuntimeException(mue); } }

下面来看 doRefer() 方法的具体实现:

protected T doRefer(final Class serviceType, URL url) throws RpcException { final String generic = url.getParameter(GENERIC_KEY); final boolean isGeneric = ProtocolUtils.isGeneric(generic) || serviceType.equals(GenericService.class); JsonProxyFactoryBean jsonProxyFactoryBean = new JsonProxyFactoryBean(); ... // 省略其他初始化逻辑 jsonProxyFactoryBean.afterPropertiesSet(); // 返回的是serviceType类型的代理对象 return (T) jsonProxyFactoryBean.getObject(); }

在 AbstractProxyProtocol.protocolBindingRefer() 方法中,会通过 ProxyFactory.getInvoker() 方法将 doRefer() 方法返回的代理对象转换成 Invoker 对象,并记录到 Invokers 集合中,具体实现如下:

protected Invoker protocolBindingRefer(final Class type, final URL url) throws RpcException { final Invoker target = proxyFactory.getInvoker(doRefer(type, url), type, url); Invoker invoker = new AbstractInvoker(type, url) { @Override protected Result doInvoke(Invocation invocation) throws Throwable { Result result = target.invoke(invocation); // 省略处理异常的逻辑 return result; } }; invokers.add(invoker); // 将Invoker添加到invokers集合中 return invoker; }

总结

本课时重点介绍了在 Dubbo 中如何通过“HTTP 协议 + JSON-RPC”的方案实现跨语言调用。首先我们介绍了 JSON-RPC 中请求和响应的基本格式,以及其实现库 jsonrpc4j 的基本使用;接下来我们还详细介绍了 Dubbo 中 AbstractProxyProtocol、HttpProtocol 等核心类,剖析了 Dubbo 中“HTTP 协议 + JSON-RPC”方案的落地实现。

参考资料

https://learn.lianglianglee.com/%e4%b8%93%e6%a0%8f/Dubbo%e6%ba%90%e7%a0%81%e8%a7%a3%e8%af%bb%e4%b8%8e%e5%ae%9e%e6%88%98-%e5%ae%8c/29%20%20%e5%8a%a0%e9%a4%90%ef%bc%9aHTTP%20%e5%8d%8f%e8%ae%ae%20+%20JSON-RPC%ef%bc%8cDubbo%20%e8%b7%a8%e8%af%ad%e8%a8%80%e5%b0%b1%e6%98%af%e5%a6%82%e6%ad%a4%e7%ae%80%e5%8d%95.md