generic 泛化调用

说明

泛化接口调用方式主要用于客户端没有 API 接口及模型类元的情况,参数及返回值中的所有 POJO 均用 Map 表示,通常用于框架集成,比如:实现一个通用的服务测试框架,可通过 GenericService 调用所有服务实现。

  [java]
1
2
GenericService barService = (GenericService) applicationContext.getBean("barService"); Object result = barService.$invoke("sayHello", new String[] { "java.lang.String" }, new Object[] { "World" });

实现思路

客户端

泛化调用个人感受不是很深,但是有一点,当没有服务端接口的时候,也就无法通过反射获取对应的方法等原始信息。

所以需要额外提供一个接口,并且可以获取方法的相关属性。

服务端

本次基本没做处理。

个人理解是客户使用的时候自行定义实现类。

代码实现

接口定义

  [java]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
package com.github.houbb.rpc.common.support.generic; import com.github.houbb.rpc.common.exception.GenericException; /** * 泛化调用接口 * (1)接口直接使用 dubbo 的接口 * * * 【应用场景】 * 泛接口实现方式主要用于服务器端没有API接口及模型类元的情况,参数及返回值中的所有POJO均用Map表示,通常用于框架集成,比如:实现一个通用的远程服务Mock框架,可通过实现GenericService接口处理所有服务请求。 * * 【服务端】 * 服务端代码不需要做任何调整。 * 客户端泛化调用进行相关调整即可。 * * 【客户端】 * * @author binbin.hou * @since 0.1.2 */ public interface GenericService { /** * Generic invocation * * @param method Method name, e.g. findPerson. If there are overridden methods, parameter info is * required, e.g. findPerson(java.lang.String) * @param parameterTypes Parameter types * @param args Arguments * @return invocation return value * @throws GenericException potential exception thrown from the invocation */ Object $invoke(String method, String[] parameterTypes, Object[] args) throws GenericException; }

实现

  [java]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
package com.github.houbb.rpc.client.proxy.impl; import com.github.houbb.heaven.util.guava.Guavas; import com.github.houbb.heaven.util.id.impl.Ids; import com.github.houbb.heaven.util.time.impl.Times; import com.github.houbb.log.integration.core.Log; import com.github.houbb.log.integration.core.LogFactory; import com.github.houbb.rpc.client.proxy.RemoteInvokeService; import com.github.houbb.rpc.client.proxy.ServiceContext; import com.github.houbb.rpc.common.exception.GenericException; import com.github.houbb.rpc.common.rpc.domain.impl.DefaultRpcRequest; import com.github.houbb.rpc.common.support.generic.GenericService; import java.util.List; /** * 泛化调用 * @author binbin.hou * @since 0.1.2 */ public class GenericReferenceProxy implements GenericService { private static final Log LOG = LogFactory.getLog(GenericReferenceProxy.class); /** * 代理上下文 * (1)这个信息不应该被修改,应该和指定的 service 紧密关联。 * @since 0.1.3 */ private final ServiceContext proxyContext; /** * 远程调用接口 * @since 0.1.3 */ private final RemoteInvokeService remoteInvokeService; public GenericReferenceProxy(ServiceContext proxyContext, RemoteInvokeService remoteInvokeService) { this.proxyContext = proxyContext; this.remoteInvokeService = remoteInvokeService; } @Override @SuppressWarnings("unchecked") public Object $invoke(String method, String[] parameterTypes, Object[] args) throws GenericException { // 构建基本调用参数 final long createTime = Times.systemTime(); Object[] actualArgs = new Object[]{method, parameterTypes, args}; DefaultRpcRequest rpcRequest = new DefaultRpcRequest(); rpcRequest.serviceId(proxyContext.serviceId()); rpcRequest.createTime(createTime); rpcRequest.paramValues(actualArgs); List<String> paramTypeNames = Guavas.newArrayList(); paramTypeNames.add("java.lang.String"); paramTypeNames.add("[Ljava.lang.String;"); paramTypeNames.add("[Ljava.lang.Object;"); rpcRequest.paramTypeNames(paramTypeNames); rpcRequest.methodName("$invoke"); rpcRequest.returnType(Object.class); //proxyContext 中应该是属于当前 service 的对应信息。 // 每一次调用,对应的 invoke 信息应该是不通的,需要创建新的对象去传递信息 // rpcRequest 因为要涉及到网络间传输,尽可能保证其简洁性。 DefaultRemoteInvokeContext context = new DefaultRemoteInvokeContext(); context.request(rpcRequest); context.traceId(Ids.uuid32()); context.retryTimes(2); context.serviceProxyContext(proxyContext); context.remoteInvokeService(remoteInvokeService); //3. 执行远程调用 return remoteInvokeService.remoteInvoke(context); } }

客户端调整

实现了上面的泛化调用之后,我们把这个特性可以让客户端指定配置。

  [java]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
/** * 引用配置类 * @param <T> 接口泛型 * @author binbin.hou * @since 0.0.6 */ public class ClientBs<T> implements ReferenceConfig<T> { // 保持不变 /** * 是否进行泛化调用 * @since 0.1.2 */ private boolean generic; // 不变 /** * 获取对应的引用实现 * (1)处理所有的反射代理信息-方法可以抽离,启动各自独立即可。 * (2)启动对应的长连接 * * @return 引用代理类 * @since 0.0.6 */ @Override @SuppressWarnings("unchecked") public T reference() { // 1. 启动 client 端到 server 端的连接信息 // 1.1 为了提升性能,可以将所有的 client=>server 的连接都调整为一个 thread。 // 1.2 初期为了简单,直接使用同步循环的方式。 // 获取地址列表信息 List<RpcAddress> rpcAddressList = this.getRpcAddresses(); //2. 循环链接 List<RpcChannelFuture> channelFutureList = ChannelHandlers.channelFutureList(rpcAddressList, new ChannelHandlerFactory() { @Override public ChannelHandler handler() { final ChannelHandler channelHandler = new RpcClientHandler(invokeService); return ChannelHandlers.objectCodecHandler(channelHandler); } }); //3. 接口动态代理 ServiceContext<T> proxyContext = buildServiceProxyContext(channelFutureList); //3.1 动态代理 //3.2 为了提升性能,可以使用 javaassit 等基于字节码的技术 if(!this.generic) { ReferenceProxy<T> referenceProxy = new DefaultReferenceProxy<>(proxyContext, remoteInvokeService); return referenceProxy.proxy(); } else { LOG.info("[Client] generic reference proxy created."); return (T) new GenericReferenceProxy(proxyContext, remoteInvokeService); } } // 不变 @Override public ClientBs<T> generic(boolean generic) { this.generic = generic; return this; } // 不变 /** * 构建调用上下文 * * @param channelFutureList 信息列表 * @return 引用代理上下文 * @since 0.0.6 */ private ServiceContext<T> buildServiceProxyContext(final List<RpcChannelFuture> channelFutureList) { DefaultServiceContext<T> proxyContext = new DefaultServiceContext<>(); proxyContext.serviceId(this.serviceId); proxyContext.serviceInterface(this.serviceInterface); proxyContext.channelFutures(channelFutureList); proxyContext.invokeService(this.invokeService); proxyContext.timeout(this.timeout); proxyContext.callType(this.callType); proxyContext.failType(this.failType); proxyContext.generic(this.generic); return proxyContext; } }

当用户指定 generic 调用的时候,我们就是用定义好的 GenericReferenceProxy。

测试

注册中心

启动

  [plaintext]
1
[INFO] [2021-10-06 07:49:54.559] [pool-1-thread-1] [c.g.h.r.c.r.n.i.DefaultNettyServer.run] - [Netty Server] 启动完成,监听【8527】端口

服务端

  [java]
1
2
3
4
5
// 启动服务 ServiceBs.getInstance() .register(ServiceIdConst.GENERIC, new FooGenericService()) .registerCenter(ServiceIdConst.REGISTER_CENTER) .expose();

其中 FooGenericService 实现如下:

  [java]
1
2
3
4
5
6
7
8
9
10
11
12
13
public final class FooGenericService implements GenericService { private static final Log LOG = LogFactory.getLog(FooGenericService.class); @Override public Object $invoke(String method, String[] parameterTypes, Object[] args) throws GenericException { LOG.info("[Generic] method: {}", method); LOG.info("[Generic] parameterTypes: {}", Arrays.toString(parameterTypes)); LOG.info("[Generic] args: {}", args); return null; } }

日志:

  [plaintext]
1
2
[INFO] [2021-10-06 07:50:19.718] [pool-1-thread-1] [c.g.h.r.c.r.n.i.DefaultNettyServer.run] - [Netty Server] 启动完成,监听【9527】端口 [INFO] [2021-10-06 07:50:19.721] [main] [c.g.h.r.c.r.n.i.DefaultNettyClient.call] - [Netty Client] 启动客户端完成,监听地址 127.0.0.1:8527

客户端

  [java]
1
2
3
4
5
6
7
8
9
10
11
12
public static void main(String[] args) { // 服务配置信息 ReferenceConfig<GenericService> config = ClientBs.newInstance(); config.serviceId(ServiceIdConst.GENERIC); config.serviceInterface(GenericService.class); config.subscribe(true); config.registerCenter(ServiceIdConst.REGISTER_CENTER); config.generic(true); GenericService genericService = config.reference(); genericService.$invoke("hello", new String[]{"name"}, new Object[]{"123"}); }

此时服务端对应的日志:

  [plaintext]
1
2
3
4
5
6
7
[INFO] [2021-10-06 07:52:41.467] [nioEventLoopGroup-2-1] [c.g.h.r.s.h.RpcServerHandler.channelActive] - [Server] channel {} connected 00e04cfffe360988-00000ec8-00000002-12c0fc9a5e1fa6f7-c2e7b9a7 [INFO] [2021-10-06 07:52:41.506] [nioEventLoopGroup-2-1] [c.g.h.r.s.h.RpcServerHandler.channelRead0] - [Server] channel read start: 00e04cfffe360988-00000ec8-00000002-12c0fc9a5e1fa6f7-c2e7b9a7 [INFO] [2021-10-06 07:52:41.507] [nioEventLoopGroup-2-1] [c.g.h.r.s.h.RpcServerHandler.channelRead0] - [Server] receive channel 00e04cfffe360988-00000ec8-00000002-12c0fc9a5e1fa6f7-c2e7b9a7 request: DefaultRpcRequest{seqId='11a2832af424407abc2fe8463743ed06', createTime=1633477961464, serviceId='generic', methodName='$invoke', paramTypeNames=[java.lang.String, [Ljava.lang.String;, [Ljava.lang.Object;], paramValues=[hello, [Ljava.lang.String;@2ae9eb75, [Ljava.lang.Object;@48a03a3], returnType=class java.lang.Object} [INFO] [2021-10-06 07:52:41.509] [nioEventLoopGroup-2-1] [c.g.h.r.c.s.g.i.FooGenericService.$invoke] - [Generic] method: hello [INFO] [2021-10-06 07:52:41.510] [nioEventLoopGroup-2-1] [c.g.h.r.c.s.g.i.FooGenericService.$invoke] - [Generic] parameterTypes: [name] [INFO] [2021-10-06 07:52:41.510] [nioEventLoopGroup-2-1] [c.g.h.r.c.s.g.i.FooGenericService.$invoke] - [Generic] args: 123 [INFO] [2021-10-06 07:52:41.512] [nioEventLoopGroup-2-1] [c.g.h.r.s.h.RpcServerHandler.channelRead0] - [Server] channel 00e04cfffe360988-00000ec8-00000002-12c0fc9a5e1fa6f7-c2e7b9a7 response DefaultRpcResponse{seqId='11a2832af424407abc2fe8463743ed06', error=null, result=null}

小结

为了便于大家学习,以上源码已经开源:

https://github.com/houbb/rpc

我是老马,期待与你的下次重逢。