load balance

当我们有多个服务端时,就需要负载均衡进行选择。

策略

负载均衡的策略有很多,比如随机选择,权重选择,最小负载等等。

实现思路

直接将所有可以选择的服务端列举出来,通过实现对应的策略,选择一个即可。

代码实现

接口

为了便于拓展,我们定义一个接口。

  [java]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
/* * Copyright (c) 2019. houbinbin Inc. * rpc All rights reserved. */ package com.github.houbb.rpc.common.rpc.filter; /** * <p> 调用上下文 </p> * * <pre> Created: 2019/10/26 9:30 上午 </pre> * <pre> Project: rpc </pre> * * 核心目的: * (1)用于定义 filter 相关信息 * (2)用于 load-balance 相关信息处理 * (3)后期的路由-分区 都可以视为这样的一个抽象实现而已。 * * 插件式实现: * (1)远程调用也认为是一次 filter,上下文中构建 filter-chain * (2)filter-chain 可以使用 {@link com.github.houbb.heaven.support.pipeline.Pipeline} 管理 * * * 后期拓展: * (1)类似于 aop,用户可以自行定义 interceptor 拦截器 * * @author houbinbin * @since 0.0.9 */ public interface RpcFilter { /** * filter 处理 * @param rpcFilterContext 调用上下文 * @since 0.0.9 */ void filter(final RpcFilterContext rpcFilterContext); }

随机选择

这里实现了负载均衡中最简单的实现,随机选择。

  [java]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package com.github.houbb.rpc.client.filter.balance; import com.github.houbb.heaven.annotation.ThreadSafe; import com.github.houbb.rpc.common.rpc.domain.RpcChannelFuture; import com.github.houbb.rpc.common.rpc.filter.RpcFilter; import com.github.houbb.rpc.common.rpc.filter.RpcFilterContext; import java.util.List; import java.util.Random; import java.util.concurrent.ThreadLocalRandom; /** * 随机负载均衡过滤器 * 参考:https://www.cnblogs.com/xwdreamer/archive/2012/06/13/2547426.html * * @author binbin.hou * @since 0.0.9 */ @ThreadSafe public class RandomBalanceFilter implements RpcFilter { @Override public void filter(RpcFilterContext rpcFilterContext) { List<RpcChannelFuture> channelFutures = rpcFilterContext.channelFutures(); final int size = channelFutures.size(); Random random = ThreadLocalRandom.current(); int index = random.nextInt(size); rpcFilterContext.channelFuture(channelFutures.get(index)); } }

DefaultReferenceProxy 代理实现的调整

调用服务端的时候,统一进行 filter 处理即可。

  [java]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
package com.github.houbb.rpc.client.proxy.impl; import com.github.houbb.heaven.util.lang.reflect.ReflectMethodUtil; import com.github.houbb.log.integration.core.Log; import com.github.houbb.log.integration.core.LogFactory; import com.github.houbb.rpc.client.filter.balance.RandomBalanceFilter; import com.github.houbb.rpc.client.invoke.InvokeService; import com.github.houbb.rpc.client.proxy.ProxyContext; import com.github.houbb.rpc.client.proxy.ReferenceProxy; import com.github.houbb.rpc.common.rpc.domain.RpcRequest; import com.github.houbb.rpc.common.rpc.domain.RpcResponse; import com.github.houbb.rpc.common.rpc.domain.impl.DefaultRpcRequest; import com.github.houbb.rpc.common.rpc.domain.impl.RpcResponses; import com.github.houbb.rpc.common.rpc.filter.RpcFilter; import com.github.houbb.rpc.common.rpc.filter.RpcFilterContext; import com.github.houbb.rpc.common.rpc.filter.impl.DefaultRpcFilterContext; import com.github.houbb.rpc.common.support.id.impl.Ids; import com.github.houbb.rpc.common.support.time.impl.Times; import io.netty.channel.Channel; import java.lang.reflect.Method; import java.lang.reflect.Proxy; /** * 参考:https://blog.csdn.net/u012240455/article/details/79210250 * * (1)方法执行并不需要一定要有实现类。 * (2)直接根据反射即可处理相关信息。 * (3)rpc 是一种强制根据接口进行编程的实现方式。 * @author binbin.hou * @since 0.0.6 */ public class DefaultReferenceProxy<T> implements ReferenceProxy<T> { private static final Log LOG = LogFactory.getLog(DefaultReferenceProxy.class); /** * 服务标识 * @since 0.0.6 */ private final ProxyContext<T> proxyContext; public DefaultReferenceProxy(ProxyContext<T> proxyContext) { this.proxyContext = proxyContext; } /** * 反射调用 * @param proxy 代理 * @param method 方法 * @param args 参数 * @return 结果 * @throws Throwable 异常 * @since 0.0.6 */ @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { // 反射信息处理成为 rpcRequest // 和以前一致 // 这里使用 load-balance 进行选择 channel 写入。 // 构建 filter 相关信息,结合 pipeline 进行整合 final RpcFilterContext rpcFilterContext = buildRpcFilterContext(rpcRequest); this.doFilter(rpcFilterContext); final Channel channel = rpcFilterContext.channel(); LOG.info("[Client] start call channel id: {}", channel.id().asLongText()); // 对于信息的写入,实际上有着严格的要求。 // writeAndFlush 实际是一个异步的操作,直接使用 sync() 可以看到异常信息。 // 支持的必须是 ByteBuf channel.writeAndFlush(rpcRequest).syncUninterruptibly(); LOG.info("[Client] start call remote with request: {}", rpcRequest); final InvokeService invokeService = proxyContext.invokeService(); invokeService.addRequest(seqId, proxyContext.timeout()); // 获取结果 RpcResponse rpcResponse = invokeService.getResponse(seqId); return RpcResponses.getResult(rpcResponse); } @Override @SuppressWarnings("unchecked") public T proxy() { final Class<T> interfaceClass = proxyContext.serviceInterface(); ClassLoader classLoader = interfaceClass.getClassLoader(); Class<?>[] interfaces = new Class[]{interfaceClass}; return (T) Proxy.newProxyInstance(classLoader, interfaces, this); } /** * 执行过滤 * @param context 上下文 * @since 0.0.9 */ private void doFilter(final RpcFilterContext context) { RpcFilter rpcFilter = new RandomBalanceFilter(); rpcFilter.filter(context); } /** * 构建 rpc 过滤上下文 * @return 上下文信息 * @since 0.0.9 */ private RpcFilterContext buildRpcFilterContext(final RpcRequest rpcRequest) { DefaultRpcFilterContext context = new DefaultRpcFilterContext(); context.request(rpcRequest); context.timeout(proxyContext.timeout()); context.channelFutures(proxyContext.channelFutures()); return context; } }

测试代码

注册中心

启动注册中心。

  [plaintext]
1
[INFO] [2021-10-05 17:27:31.172] [pool-1-thread-1] [c.g.h.r.c.r.n.i.DefaultNettyServer.run] - [Netty Server] 启动完成,监听【8527】端口

服务端

启动服务端,为了演示启动 2 个服务端。

(1)服务端 1

  [plaintext]
1
2
[INFO] [2021-10-05 17:28:04.610] [pool-1-thread-1] [c.g.h.r.c.r.n.i.DefaultNettyServer.run] - [Netty Server] 启动完成,监听【9527】端口 [INFO] [2021-10-05 17:28:04.612] [main] [c.g.h.r.c.r.n.i.DefaultNettyClient.call] - [Netty Client] 启动客户端完成,监听地址 127.0.0.1:8527

(2)服务端 2

  [plaintext]
1
2
[INFO] [2021-10-05 17:28:43.796] [pool-1-thread-1] [c.g.h.r.c.r.n.i.DefaultNettyServer.run] - [Netty Server] 启动完成,监听【9526】端口 [INFO] [2021-10-05 17:28:43.798] [main] [c.g.h.r.c.r.n.i.DefaultNettyClient.call] - [Netty Client] 启动客户端完成,监听地址 127.0.0.1:8527

客户端

客户端进行 10 次调用:

  [java]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public static void main(String[] args) { // 服务配置信息 ReferenceConfig<CalculatorService> config = ClientBs.newInstance(); config.serviceId(ServiceIdConst.CALC); config.serviceInterface(CalculatorService.class); // 自动发现服务 config.subscribe(true); config.registerCenter(ServiceIdConst.REGISTER_CENTER); CalculatorService calculatorService = config.reference(); CalculateRequest request = new CalculateRequest(); request.setOne(10); request.setTwo(20); // 循环 10 次,验证负载均衡。 for(int i = 0; i < 10; i++) { CalculateResponse response = calculatorService.sum(request); System.out.println(response); } }

日志

直接通过服务端日志,因为是随机调用,二者收到的请求也基本相同。

  • server1
  [plaintext]
1
2
3
4
5
6
7
8
9
10
11
12
13
[INFO] [2019-11-01 21:44:15.088] [nioEventLoopGroup-2-1] [c.g.h.r.s.h.RpcServerHandler.channelActive] - [Server] channel {} connected 502b73fffec4485c-00001e18-00000002-3d6e1c4a0fd59bed-6a5605d2 [INFO] [2019-11-01 21:44:15.149] [nioEventLoopGroup-2-1] [c.g.h.r.s.h.RpcServerHandler.channelRead0] - [Server] channel read start: 502b73fffec4485c-00001e18-00000002-3d6e1c4a0fd59bed-6a5605d2 [INFO] [2019-11-01 21:44:15.150] [nioEventLoopGroup-2-1] [c.g.h.r.s.h.RpcServerHandler.channelRead0] - [Server] receive channel 502b73fffec4485c-00001e18-00000002-3d6e1c4a0fd59bed-6a5605d2 request: DefaultRpcRequest{seqId='4999d107fd39497691a8427871fa9af1', createTime=1572615855133, serviceId='calc', methodName='sum', paramTypeNames=[com.github.houbb.rpc.server.facade.model.CalculateRequest], paramValues=[CalculateRequest{one=10, two=20}]} [INFO] [2019-11-01 21:44:15.153] [nioEventLoopGroup-2-1] [c.g.h.r.s.h.RpcServerHandler.channelRead0] - [Server] channel 502b73fffec4485c-00001e18-00000002-3d6e1c4a0fd59bed-6a5605d2 response DefaultRpcResponse{seqId='4999d107fd39497691a8427871fa9af1', error=null, result=CalculateResponse{success=true, sum=30}} [INFO] [2019-11-01 21:44:15.172] [nioEventLoopGroup-2-1] [c.g.h.r.s.h.RpcServerHandler.channelRead0] - [Server] channel read start: 502b73fffec4485c-00001e18-00000002-3d6e1c4a0fd59bed-6a5605d2 [INFO] [2019-11-01 21:44:15.173] [nioEventLoopGroup-2-1] [c.g.h.r.s.h.RpcServerHandler.channelRead0] - [Server] receive channel 502b73fffec4485c-00001e18-00000002-3d6e1c4a0fd59bed-6a5605d2 request: DefaultRpcRequest{seqId='1dcaffd156e444d29d1568bb93325c66', createTime=1572615855169, serviceId='calc', methodName='sum', paramTypeNames=[com.github.houbb.rpc.server.facade.model.CalculateRequest], paramValues=[CalculateRequest{one=10, two=20}]} [INFO] [2019-11-01 21:44:15.174] [nioEventLoopGroup-2-1] [c.g.h.r.s.h.RpcServerHandler.channelRead0] - [Server] channel 502b73fffec4485c-00001e18-00000002-3d6e1c4a0fd59bed-6a5605d2 response DefaultRpcResponse{seqId='1dcaffd156e444d29d1568bb93325c66', error=null, result=CalculateResponse{success=true, sum=30}} [INFO] [2019-11-01 21:44:15.178] [nioEventLoopGroup-2-1] [c.g.h.r.s.h.RpcServerHandler.channelRead0] - [Server] channel read start: 502b73fffec4485c-00001e18-00000002-3d6e1c4a0fd59bed-6a5605d2 [INFO] [2019-11-01 21:44:15.179] [nioEventLoopGroup-2-1] [c.g.h.r.s.h.RpcServerHandler.channelRead0] - [Server] receive channel 502b73fffec4485c-00001e18-00000002-3d6e1c4a0fd59bed-6a5605d2 request: DefaultRpcRequest{seqId='88f9fd2c9b8f4ce29cc02246529fe475', createTime=1572615855176, serviceId='calc', methodName='sum', paramTypeNames=[com.github.houbb.rpc.server.facade.model.CalculateRequest], paramValues=[CalculateRequest{one=10, two=20}]} [INFO] [2019-11-01 21:44:15.180] [nioEventLoopGroup-2-1] [c.g.h.r.s.h.RpcServerHandler.channelRead0] - [Server] channel 502b73fffec4485c-00001e18-00000002-3d6e1c4a0fd59bed-6a5605d2 response DefaultRpcResponse{seqId='88f9fd2c9b8f4ce29cc02246529fe475', error=null, result=CalculateResponse{success=true, sum=30}} [INFO] [2019-11-01 21:44:15.184] [nioEventLoopGroup-2-1] [c.g.h.r.s.h.RpcServerHandler.channelRead0] - [Server] channel read start: 502b73fffec4485c-00001e18-00000002-3d6e1c4a0fd59bed-6a5605d2 [INFO] [2019-11-01 21:44:15.184] [nioEventLoopGroup-2-1] [c.g.h.r.s.h.RpcServerHandler.channelRead0] - [Server] receive channel 502b73fffec4485c-00001e18-00000002-3d6e1c4a0fd59bed-6a5605d2 request: DefaultRpcRequest{seqId='51e2a2171e4b4e9987c6d23081c48a0e', createTime=1572615855181, serviceId='calc', methodName='sum', paramTypeNames=[com.github.houbb.rpc.server.facade.model.CalculateRequest], paramValues=[CalculateRequest{one=10, two=20}]} [INFO] [2019-11-01 21:44:15.185] [nioEventLoopGroup-2-1] [c.g.h.r.s.h.RpcServerHandler.channelRead0] - [Server] channel 502b73fffec4485c-00001e18-00000002-3d6e1c4a0fd59bed-6a5605d2 response DefaultRpcResponse{seqId='51e2a2171e4b4e9987c6d23081c48a0e', error=null, result=CalculateResponse{success=true, sum=30}}
  • server2
  [plaintext]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
[INFO] [2019-11-01 21:44:15.126] [nioEventLoopGroup-2-1] [c.g.h.r.s.h.RpcServerHandler.channelRead0] - [Server] channel read start: 00e04cfffeab09cf-00001d94-00000002-b86e77aa0fd59bd3-dd549b73 [INFO] [2019-11-01 21:44:15.127] [nioEventLoopGroup-2-1] [c.g.h.r.s.h.RpcServerHandler.channelRead0] - [Server] receive channel 00e04cfffeab09cf-00001d94-00000002-b86e77aa0fd59bd3-dd549b73 request: DefaultRpcRequest{seqId='14ee5788e1754575a90ef7ba64340455', createTime=1572615855087, serviceId='calc', methodName='sum', paramTypeNames=[com.github.houbb.rpc.server.facade.model.CalculateRequest], paramValues=[CalculateRequest{one=10, two=20}]} [INFO] [2019-11-01 21:44:15.130] [nioEventLoopGroup-2-1] [c.g.h.r.s.h.RpcServerHandler.channelRead0] - [Server] channel 00e04cfffeab09cf-00001d94-00000002-b86e77aa0fd59bd3-dd549b73 response DefaultRpcResponse{seqId='14ee5788e1754575a90ef7ba64340455', error=null, result=CalculateResponse{success=true, sum=30}} [INFO] [2019-11-01 21:44:15.159] [nioEventLoopGroup-2-1] [c.g.h.r.s.h.RpcServerHandler.channelRead0] - [Server] channel read start: 00e04cfffeab09cf-00001d94-00000002-b86e77aa0fd59bd3-dd549b73 [INFO] [2019-11-01 21:44:15.160] [nioEventLoopGroup-2-1] [c.g.h.r.s.h.RpcServerHandler.channelRead0] - [Server] receive channel 00e04cfffeab09cf-00001d94-00000002-b86e77aa0fd59bd3-dd549b73 request: DefaultRpcRequest{seqId='a935c347059a477a96b132e41b4b78c1', createTime=1572615855156, serviceId='calc', methodName='sum', paramTypeNames=[com.github.houbb.rpc.server.facade.model.CalculateRequest], paramValues=[CalculateRequest{one=10, two=20}]} [INFO] [2019-11-01 21:44:15.161] [nioEventLoopGroup-2-1] [c.g.h.r.s.h.RpcServerHandler.channelRead0] - [Server] channel 00e04cfffeab09cf-00001d94-00000002-b86e77aa0fd59bd3-dd549b73 response DefaultRpcResponse{seqId='a935c347059a477a96b132e41b4b78c1', error=null, result=CalculateResponse{success=true, sum=30}} [INFO] [2019-11-01 21:44:15.166] [nioEventLoopGroup-2-1] [c.g.h.r.s.h.RpcServerHandler.channelRead0] - [Server] channel read start: 00e04cfffeab09cf-00001d94-00000002-b86e77aa0fd59bd3-dd549b73 [INFO] [2019-11-01 21:44:15.166] [nioEventLoopGroup-2-1] [c.g.h.r.s.h.RpcServerHandler.channelRead0] - [Server] receive channel 00e04cfffeab09cf-00001d94-00000002-b86e77aa0fd59bd3-dd549b73 request: DefaultRpcRequest{seqId='82778d7d616547e6917f77e6da6e11df', createTime=1572615855163, serviceId='calc', methodName='sum', paramTypeNames=[com.github.houbb.rpc.server.facade.model.CalculateRequest], paramValues=[CalculateRequest{one=10, two=20}]} [INFO] [2019-11-01 21:44:15.167] [nioEventLoopGroup-2-1] [c.g.h.r.s.h.RpcServerHandler.channelRead0] - [Server] channel 00e04cfffeab09cf-00001d94-00000002-b86e77aa0fd59bd3-dd549b73 response DefaultRpcResponse{seqId='82778d7d616547e6917f77e6da6e11df', error=null, result=CalculateResponse{success=true, sum=30}} [INFO] [2019-11-01 21:44:15.192] [nioEventLoopGroup-2-1] [c.g.h.r.s.h.RpcServerHandler.channelRead0] - [Server] channel read start: 00e04cfffeab09cf-00001d94-00000002-b86e77aa0fd59bd3-dd549b73 [INFO] [2019-11-01 21:44:15.192] [nioEventLoopGroup-2-1] [c.g.h.r.s.h.RpcServerHandler.channelRead0] - [Server] receive channel 00e04cfffeab09cf-00001d94-00000002-b86e77aa0fd59bd3-dd549b73 request: DefaultRpcRequest{seqId='c4e380597fe6419ab12b8928278e4cdb', createTime=1572615855188, serviceId='calc', methodName='sum', paramTypeNames=[com.github.houbb.rpc.server.facade.model.CalculateRequest], paramValues=[CalculateRequest{one=10, two=20}]} [INFO] [2019-11-01 21:44:15.193] [nioEventLoopGroup-2-1] [c.g.h.r.s.h.RpcServerHandler.channelRead0] - [Server] channel 00e04cfffeab09cf-00001d94-00000002-b86e77aa0fd59bd3-dd549b73 response DefaultRpcResponse{seqId='c4e380597fe6419ab12b8928278e4cdb', error=null, result=CalculateResponse{success=true, sum=30}} [INFO] [2019-11-01 21:44:15.198] [nioEventLoopGroup-2-1] [c.g.h.r.s.h.RpcServerHandler.channelRead0] - [Server] channel read start: 00e04cfffeab09cf-00001d94-00000002-b86e77aa0fd59bd3-dd549b73 [INFO] [2019-11-01 21:44:15.198] [nioEventLoopGroup-2-1] [c.g.h.r.s.h.RpcServerHandler.channelRead0] - [Server] receive channel 00e04cfffeab09cf-00001d94-00000002-b86e77aa0fd59bd3-dd549b73 request: DefaultRpcRequest{seqId='399a2fd2d393413497172510e0651ce9', createTime=1572615855195, serviceId='calc', methodName='sum', paramTypeNames=[com.github.houbb.rpc.server.facade.model.CalculateRequest], paramValues=[CalculateRequest{one=10, two=20}]} [INFO] [2019-11-01 21:44:15.199] [nioEventLoopGroup-2-1] [c.g.h.r.s.h.RpcServerHandler.channelRead0] - [Server] channel 00e04cfffeab09cf-00001d94-00000002-b86e77aa0fd59bd3-dd549b73 response DefaultRpcResponse{seqId='399a2fd2d393413497172510e0651ce9', error=null, result=CalculateResponse{success=true, sum=30}} [INFO] [2019-11-01 21:44:15.203] [nioEventLoopGroup-2-1] [c.g.h.r.s.h.RpcServerHandler.channelRead0] - [Server] channel read start: 00e04cfffeab09cf-00001d94-00000002-b86e77aa0fd59bd3-dd549b73 [INFO] [2019-11-01 21:44:15.204] [nioEventLoopGroup-2-1] [c.g.h.r.s.h.RpcServerHandler.channelRead0] - [Server] receive channel 00e04cfffeab09cf-00001d94-00000002-b86e77aa0fd59bd3-dd549b73 request: DefaultRpcRequest{seqId='cc1f64281beb4c87bf11eac565609883', createTime=1572615855201, serviceId='calc', methodName='sum', paramTypeNames=[com.github.houbb.rpc.server.facade.model.CalculateRequest], paramValues=[CalculateRequest{one=10, two=20}]} [INFO] [2019-11-01 21:44:15.205] [nioEventLoopGroup-2-1] [c.g.h.r.s.h.RpcServerHandler.channelRead0] - [Server] channel 00e04cfffeab09cf-00001d94-00000002-b86e77aa0fd59bd3-dd549b73 response DefaultRpcResponse{seqId='cc1f64281beb4c87bf11eac565609883', error=null, result=CalculateResponse{success=true, sum=30}}

小结

为了便于大家学习,以上源码已经开源:

https://github.com/houbb/rpc

我是老马,期待与你的下次重逢。