callType 调用方式
说明
不同的场景我们会希望有不同的调用方式。
常见的有三种调用方式:
(1)sync 同步调用
(2)async 异步调用
(3)oneWay 单向调用
个人感觉(1)(3)是最常见的需求,所以本次优先实现了这两种。
实现思路
不同的调用方式只是处理的行为不同而已。
可以将这个配置传递,分别在 client/server 的端进行相应的处理。
callType 调用方式
接口定义
package com.github.houbb.rpc.client.support.calltype;
import com.github.houbb.rpc.client.proxy.ProxyContext;
import com.github.houbb.rpc.common.rpc.domain.RpcRequest;
/**
* 调用方式上下文
* @author binbin.hou
* @since 0.1.0
*/
public interface CallTypeStrategy {
/**
* 获取结果
* @param proxyContext 代理上下文
* @param rpcRequest 请求信息
* @return 结果
* @since 0.1.0
*/
Object result(final ProxyContext proxyContext,
final RpcRequest rpcRequest);
}
oneway 实现
oneway 的实现非常简单,发起之后不关心响应结果。
一般是不重要的,追求性能的场景使用,比如日志通知。
package com.github.houbb.rpc.client.support.calltype.impl;
import com.github.houbb.heaven.annotation.ThreadSafe;
import com.github.houbb.log.integration.core.Log;
import com.github.houbb.log.integration.core.LogFactory;
import com.github.houbb.rpc.client.proxy.ProxyContext;
import com.github.houbb.rpc.client.support.calltype.CallTypeStrategy;
import com.github.houbb.rpc.common.rpc.domain.RpcRequest;
import com.github.houbb.rpc.common.rpc.domain.RpcResponse;
import com.github.houbb.rpc.common.rpc.domain.impl.RpcResponses;
/**
* one way 调用服务实现类
*
* @author binbin.hou
* @since 0.1.0
*/
@ThreadSafe
class OneWayCallTypeStrategy implements CallTypeStrategy {
private static final Log LOG = LogFactory.getLog(OneWayCallTypeStrategy.class);
/**
* 实例
*
* @since 0.1.0
*/
private static final CallTypeStrategy INSTANCE = new OneWayCallTypeStrategy();
/**
* 获取实例
*
* @since 0.1.0
*/
public static CallTypeStrategy getInstance() {
return INSTANCE;
}
@Override
public Object result(ProxyContext proxyContext, RpcRequest rpcRequest) {
final String seqId = rpcRequest.seqId();
// 结果可以不是简单的 null,而是根据 result 类型处理,避免基本类型NPE。
RpcResponse rpcResponse = RpcResponses.result(null, rpcRequest.returnType());
LOG.info("[Client] call type is one way, seqId: {} set response to {}", seqId, rpcResponse);
// 获取结果
return RpcResponses.getResult(rpcResponse);
}
}
sync 同步获取结果
这个也是默认的策略。
package com.github.houbb.rpc.client.support.calltype.impl;
import com.github.houbb.heaven.annotation.ThreadSafe;
import com.github.houbb.rpc.client.proxy.ProxyContext;
import com.github.houbb.rpc.client.support.calltype.CallTypeStrategy;
import com.github.houbb.rpc.common.rpc.domain.RpcRequest;
import com.github.houbb.rpc.common.rpc.domain.RpcResponse;
import com.github.houbb.rpc.common.rpc.domain.impl.RpcResponses;
/**
* 同步调用服务实现类
* @author binbin.hou
* @since 0.1.0
*/
@ThreadSafe
class SyncCallTypeStrategy implements CallTypeStrategy {
/**
* 实例
* @since 0.1.0
*/
private static final CallTypeStrategy INSTANCE = new SyncCallTypeStrategy();
/**
* 获取实例
* @since 0.1.0
*/
public static CallTypeStrategy getInstance(){
return INSTANCE;
}
@Override
public Object result(ProxyContext proxyContext, RpcRequest rpcRequest) {
final String seqId = rpcRequest.seqId();
RpcResponse rpcResponse = proxyContext.invokeService().getResponse(seqId);
return RpcResponses.getResult(rpcResponse);
}
}
策略工厂类 CallTypeStrategyFactory
为了让其使用更加方便,我们定义一下 CallTypeStrategyFactory。
实现
package com.github.houbb.rpc.client.support.calltype.impl;
import com.github.houbb.rpc.client.constant.enums.CallTypeEnum;
import com.github.houbb.rpc.client.support.calltype.CallTypeStrategy;
/**
* callType 策略工厂类
* @author binbin.hou
* @since 0.1.0
*/
public final class CallTypeStrategyFactory {
private CallTypeStrategyFactory(){}
/**
* 获取调用策略
* @param callTypeEnum 调用类型枚举
* @return 调用策略实现
* @since 0.1.0
*/
public static CallTypeStrategy callTypeStrategy(final CallTypeEnum callTypeEnum) {
switch (callTypeEnum) {
case SYNC:
return SyncCallTypeStrategy.getInstance();
case ONE_WAY:
return OneWayCallTypeStrategy.getInstance();
default:
throw new UnsupportedOperationException("Not support call type : " + callTypeEnum);
}
}
}
枚举
这里定义了我们内置的枚举类:
package com.github.houbb.rpc.client.constant.enums;
/**
* 调用方式枚举
* (1)调用方式,是一种非常固定的模式。所以使用枚举代替常量。
* (2)在 api 中使用常量,避免二者产生依赖。
* @author binbin.hou
* @since 0.1.0
*/
public enum CallTypeEnum {
/**
* 单向调用:不关心调用的结果
* @since 0.1.0
*/
ONE_WAY(1),
/**
* 同步调用:最常用的调用方式,关心结果
* @since 0.1.0
*/
SYNC(2),
/**
* 异步调用:性能更高的调用方式,异步获取结果
* @since 0.1.0
*/
ASYNC(3),
/**
* 回调方式:通过 callback 处理结果信息
* @since 0.1.0
*/
CALLBACK(4),
;
//toString
}
客户端
客户端的实现调整也比较简单:
/**
* @author binbin.hou
* @since 0.0.6
*/
public class DefaultReferenceProxy<T> implements ReferenceProxy<T> {
// 不变
/**
* 反射调用
* @param proxy 代理
* @param method 方法
* @param args 参数
* @return 结果
* @throws Throwable 异常
* @since 0.0.6
*/
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
// 不变
// 获取结果
CallTypeStrategy callTypeStrategy = CallTypeStrategyFactory.callTypeStrategy(callType);
return callTypeStrategy.result(proxyContext, rpcRequest);
}
// 不变
}
测试代码
注册中心
启动注册中心:
[INFO] [2021-10-05 17:47:55.090] [pool-1-thread-1] [c.g.h.r.c.r.n.i.DefaultNettyServer.run] - [Netty Server] 启动完成,监听【8527】端口
服务端
启动服务端:
[INFO] [2021-10-05 17:48:14.706] [pool-1-thread-1] [c.g.h.r.c.r.n.i.DefaultNettyServer.run] - [Netty Server] 启动完成,监听【9527】端口
[INFO] [2021-10-05 17:48:14.715] [main] [c.g.h.r.c.r.n.i.DefaultNettyClient.call] - [Netty Client] 启动客户端完成,监听地址 127.0.0.1:8527
客户端
sync
默认就是同步策略,此处不再赘述。
oneway
public static void main(String[] args) {
// 服务配置信息
ReferenceConfig<CalculatorService> config = ClientBs.newInstance();
config.serviceId(ServiceIdConst.CALC);
config.serviceInterface(CalculatorService.class);
// 自动发现服务
config.subscribe(true);
config.registerCenter(ServiceIdConst.REGISTER_CENTER);
config.callType(CallTypeEnum.ONE_WAY);
CalculatorService calculatorService = config.reference();
CalculateRequest request = new CalculateRequest();
request.setOne(10);
request.setTwo(20);
// 循环 10 次,验证负载均衡。
for(int i = 0; i < 10; i++) {
CalculateResponse response = calculatorService.sum(request);
System.out.println(response);
}
}
这里指定 callType 为 oneway,所有的结果都是 null。
小结
为了便于大家学习,以上源码已经开源:
我是老马,期待与你的下次重逢。