load balance

当我们有多个服务端时,就需要负载均衡进行选择。

策略

负载均衡的策略有很多,比如随机选择,权重选择,最小负载等等。

实现思路

直接将所有可以选择的服务端列举出来,通过实现对应的策略,选择一个即可。

代码实现

接口

为了便于拓展,我们定义一个接口。

/*
 * Copyright (c)  2019. houbinbin Inc.
 * rpc All rights reserved.
 */

package com.github.houbb.rpc.common.rpc.filter;

/**
 * <p> 调用上下文 </p>
 *
 * <pre> Created: 2019/10/26 9:30 上午  </pre>
 * <pre> Project: rpc  </pre>
 *
 * 核心目的:
 * (1)用于定义 filter 相关信息
 * (2)用于 load-balance 相关信息处理
 * (3)后期的路由-分区 都可以视为这样的一个抽象实现而已。
 *
 * 插件式实现:
 * (1)远程调用也认为是一次 filter,上下文中构建 filter-chain
 * (2)filter-chain 可以使用 {@link com.github.houbb.heaven.support.pipeline.Pipeline} 管理
 *
 *
 * 后期拓展:
 * (1)类似于 aop,用户可以自行定义 interceptor 拦截器
 *
 * @author houbinbin
 * @since 0.0.9
 */
public interface RpcFilter {

    /**
     * filter 处理
     * @param rpcFilterContext 调用上下文
     * @since 0.0.9
     */
    void filter(final RpcFilterContext rpcFilterContext);

}

随机选择

这里实现了负载均衡中最简单的实现,随机选择。

package com.github.houbb.rpc.client.filter.balance;

import com.github.houbb.heaven.annotation.ThreadSafe;
import com.github.houbb.rpc.common.rpc.domain.RpcChannelFuture;
import com.github.houbb.rpc.common.rpc.filter.RpcFilter;
import com.github.houbb.rpc.common.rpc.filter.RpcFilterContext;

import java.util.List;
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;

/**
 * 随机负载均衡过滤器
 * 参考:https://www.cnblogs.com/xwdreamer/archive/2012/06/13/2547426.html
 *
 * @author binbin.hou
 * @since 0.0.9
 */
@ThreadSafe
public class RandomBalanceFilter implements RpcFilter {

    @Override
    public void filter(RpcFilterContext rpcFilterContext) {
        List<RpcChannelFuture> channelFutures = rpcFilterContext.channelFutures();
        final int size = channelFutures.size();

        Random random = ThreadLocalRandom.current();
        int index = random.nextInt(size);

        rpcFilterContext.channelFuture(channelFutures.get(index));
    }

}

DefaultReferenceProxy 代理实现的调整

调用服务端的时候,统一进行 filter 处理即可。

package com.github.houbb.rpc.client.proxy.impl;

import com.github.houbb.heaven.util.lang.reflect.ReflectMethodUtil;
import com.github.houbb.log.integration.core.Log;
import com.github.houbb.log.integration.core.LogFactory;
import com.github.houbb.rpc.client.filter.balance.RandomBalanceFilter;
import com.github.houbb.rpc.client.invoke.InvokeService;
import com.github.houbb.rpc.client.proxy.ProxyContext;
import com.github.houbb.rpc.client.proxy.ReferenceProxy;
import com.github.houbb.rpc.common.rpc.domain.RpcRequest;
import com.github.houbb.rpc.common.rpc.domain.RpcResponse;
import com.github.houbb.rpc.common.rpc.domain.impl.DefaultRpcRequest;
import com.github.houbb.rpc.common.rpc.domain.impl.RpcResponses;
import com.github.houbb.rpc.common.rpc.filter.RpcFilter;
import com.github.houbb.rpc.common.rpc.filter.RpcFilterContext;
import com.github.houbb.rpc.common.rpc.filter.impl.DefaultRpcFilterContext;
import com.github.houbb.rpc.common.support.id.impl.Ids;
import com.github.houbb.rpc.common.support.time.impl.Times;
import io.netty.channel.Channel;

import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

/**
 * 参考:https://blog.csdn.net/u012240455/article/details/79210250
 *
 * (1)方法执行并不需要一定要有实现类。
 * (2)直接根据反射即可处理相关信息。
 * (3)rpc 是一种强制根据接口进行编程的实现方式。
 * @author binbin.hou
 * @since 0.0.6
 */
public class DefaultReferenceProxy<T> implements ReferenceProxy<T> {

    private static final Log LOG = LogFactory.getLog(DefaultReferenceProxy.class);

    /**
     * 服务标识
     * @since 0.0.6
     */
    private final ProxyContext<T> proxyContext;

    public DefaultReferenceProxy(ProxyContext<T> proxyContext) {
        this.proxyContext = proxyContext;
    }

    /**
     * 反射调用
     * @param proxy 代理
     * @param method 方法
     * @param args 参数
     * @return 结果
     * @throws Throwable 异常
     * @since 0.0.6
     */
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        // 反射信息处理成为 rpcRequest
        // 和以前一致

        // 这里使用 load-balance 进行选择 channel 写入。
        // 构建 filter 相关信息,结合 pipeline 进行整合
        final RpcFilterContext rpcFilterContext = buildRpcFilterContext(rpcRequest);
        this.doFilter(rpcFilterContext);
        final Channel channel = rpcFilterContext.channel();
        LOG.info("[Client] start call channel id: {}", channel.id().asLongText());

        // 对于信息的写入,实际上有着严格的要求。
        // writeAndFlush 实际是一个异步的操作,直接使用 sync() 可以看到异常信息。
        // 支持的必须是 ByteBuf
        channel.writeAndFlush(rpcRequest).syncUninterruptibly();
        LOG.info("[Client] start call remote with request: {}", rpcRequest);
        final InvokeService invokeService = proxyContext.invokeService();
        invokeService.addRequest(seqId, proxyContext.timeout());

        // 获取结果
        RpcResponse rpcResponse = invokeService.getResponse(seqId);
        return RpcResponses.getResult(rpcResponse);
    }

    @Override
    @SuppressWarnings("unchecked")
    public T proxy() {
        final Class<T> interfaceClass = proxyContext.serviceInterface();
        ClassLoader classLoader = interfaceClass.getClassLoader();
        Class<?>[] interfaces = new Class[]{interfaceClass};
        return (T) Proxy.newProxyInstance(classLoader, interfaces, this);
    }

    /**
     * 执行过滤
     * @param context 上下文
     * @since 0.0.9
     */
    private void doFilter(final RpcFilterContext context) {
        RpcFilter rpcFilter = new RandomBalanceFilter();
        rpcFilter.filter(context);
    }

    /**
     * 构建 rpc 过滤上下文
     * @return 上下文信息
     * @since 0.0.9
     */
    private RpcFilterContext buildRpcFilterContext(final RpcRequest rpcRequest) {
        DefaultRpcFilterContext context = new DefaultRpcFilterContext();
        context.request(rpcRequest);
        context.timeout(proxyContext.timeout());
        context.channelFutures(proxyContext.channelFutures());
        return context;
    }

}

测试代码

注册中心

启动注册中心。

[INFO] [2021-10-05 17:27:31.172] [pool-1-thread-1] [c.g.h.r.c.r.n.i.DefaultNettyServer.run] - [Netty Server] 启动完成,监听【8527】端口

服务端

启动服务端,为了演示启动 2 个服务端。

(1)服务端 1

[INFO] [2021-10-05 17:28:04.610] [pool-1-thread-1] [c.g.h.r.c.r.n.i.DefaultNettyServer.run] - [Netty Server] 启动完成,监听【9527】端口
[INFO] [2021-10-05 17:28:04.612] [main] [c.g.h.r.c.r.n.i.DefaultNettyClient.call] - [Netty Client] 启动客户端完成,监听地址 127.0.0.1:8527

(2)服务端 2

[INFO] [2021-10-05 17:28:43.796] [pool-1-thread-1] [c.g.h.r.c.r.n.i.DefaultNettyServer.run] - [Netty Server] 启动完成,监听【9526】端口
[INFO] [2021-10-05 17:28:43.798] [main] [c.g.h.r.c.r.n.i.DefaultNettyClient.call] - [Netty Client] 启动客户端完成,监听地址 127.0.0.1:8527

客户端

客户端进行 10 次调用:

public static void main(String[] args) {
    // 服务配置信息
    ReferenceConfig<CalculatorService> config = ClientBs.newInstance();
    config.serviceId(ServiceIdConst.CALC);
    config.serviceInterface(CalculatorService.class);
    // 自动发现服务
    config.subscribe(true);
    config.registerCenter(ServiceIdConst.REGISTER_CENTER);

    CalculatorService calculatorService = config.reference();
    CalculateRequest request = new CalculateRequest();
    request.setOne(10);
    request.setTwo(20);

    // 循环 10 次,验证负载均衡。
    for(int i = 0; i < 10; i++) {
        CalculateResponse response = calculatorService.sum(request);
        System.out.println(response);
    }
}

日志

直接通过服务端日志,因为是随机调用,二者收到的请求也基本相同。

  • server1
[INFO] [2019-11-01 21:44:15.088] [nioEventLoopGroup-2-1] [c.g.h.r.s.h.RpcServerHandler.channelActive] - [Server] channel {} connected 502b73fffec4485c-00001e18-00000002-3d6e1c4a0fd59bed-6a5605d2
[INFO] [2019-11-01 21:44:15.149] [nioEventLoopGroup-2-1] [c.g.h.r.s.h.RpcServerHandler.channelRead0] - [Server] channel read start: 502b73fffec4485c-00001e18-00000002-3d6e1c4a0fd59bed-6a5605d2
[INFO] [2019-11-01 21:44:15.150] [nioEventLoopGroup-2-1] [c.g.h.r.s.h.RpcServerHandler.channelRead0] - [Server] receive channel 502b73fffec4485c-00001e18-00000002-3d6e1c4a0fd59bed-6a5605d2 request: DefaultRpcRequest{seqId='4999d107fd39497691a8427871fa9af1', createTime=1572615855133, serviceId='calc', methodName='sum', paramTypeNames=[com.github.houbb.rpc.server.facade.model.CalculateRequest], paramValues=[CalculateRequest{one=10, two=20}]}
[INFO] [2019-11-01 21:44:15.153] [nioEventLoopGroup-2-1] [c.g.h.r.s.h.RpcServerHandler.channelRead0] - [Server] channel 502b73fffec4485c-00001e18-00000002-3d6e1c4a0fd59bed-6a5605d2 response DefaultRpcResponse{seqId='4999d107fd39497691a8427871fa9af1', error=null, result=CalculateResponse{success=true, sum=30}}
[INFO] [2019-11-01 21:44:15.172] [nioEventLoopGroup-2-1] [c.g.h.r.s.h.RpcServerHandler.channelRead0] - [Server] channel read start: 502b73fffec4485c-00001e18-00000002-3d6e1c4a0fd59bed-6a5605d2
[INFO] [2019-11-01 21:44:15.173] [nioEventLoopGroup-2-1] [c.g.h.r.s.h.RpcServerHandler.channelRead0] - [Server] receive channel 502b73fffec4485c-00001e18-00000002-3d6e1c4a0fd59bed-6a5605d2 request: DefaultRpcRequest{seqId='1dcaffd156e444d29d1568bb93325c66', createTime=1572615855169, serviceId='calc', methodName='sum', paramTypeNames=[com.github.houbb.rpc.server.facade.model.CalculateRequest], paramValues=[CalculateRequest{one=10, two=20}]}
[INFO] [2019-11-01 21:44:15.174] [nioEventLoopGroup-2-1] [c.g.h.r.s.h.RpcServerHandler.channelRead0] - [Server] channel 502b73fffec4485c-00001e18-00000002-3d6e1c4a0fd59bed-6a5605d2 response DefaultRpcResponse{seqId='1dcaffd156e444d29d1568bb93325c66', error=null, result=CalculateResponse{success=true, sum=30}}
[INFO] [2019-11-01 21:44:15.178] [nioEventLoopGroup-2-1] [c.g.h.r.s.h.RpcServerHandler.channelRead0] - [Server] channel read start: 502b73fffec4485c-00001e18-00000002-3d6e1c4a0fd59bed-6a5605d2
[INFO] [2019-11-01 21:44:15.179] [nioEventLoopGroup-2-1] [c.g.h.r.s.h.RpcServerHandler.channelRead0] - [Server] receive channel 502b73fffec4485c-00001e18-00000002-3d6e1c4a0fd59bed-6a5605d2 request: DefaultRpcRequest{seqId='88f9fd2c9b8f4ce29cc02246529fe475', createTime=1572615855176, serviceId='calc', methodName='sum', paramTypeNames=[com.github.houbb.rpc.server.facade.model.CalculateRequest], paramValues=[CalculateRequest{one=10, two=20}]}
[INFO] [2019-11-01 21:44:15.180] [nioEventLoopGroup-2-1] [c.g.h.r.s.h.RpcServerHandler.channelRead0] - [Server] channel 502b73fffec4485c-00001e18-00000002-3d6e1c4a0fd59bed-6a5605d2 response DefaultRpcResponse{seqId='88f9fd2c9b8f4ce29cc02246529fe475', error=null, result=CalculateResponse{success=true, sum=30}}
[INFO] [2019-11-01 21:44:15.184] [nioEventLoopGroup-2-1] [c.g.h.r.s.h.RpcServerHandler.channelRead0] - [Server] channel read start: 502b73fffec4485c-00001e18-00000002-3d6e1c4a0fd59bed-6a5605d2
[INFO] [2019-11-01 21:44:15.184] [nioEventLoopGroup-2-1] [c.g.h.r.s.h.RpcServerHandler.channelRead0] - [Server] receive channel 502b73fffec4485c-00001e18-00000002-3d6e1c4a0fd59bed-6a5605d2 request: DefaultRpcRequest{seqId='51e2a2171e4b4e9987c6d23081c48a0e', createTime=1572615855181, serviceId='calc', methodName='sum', paramTypeNames=[com.github.houbb.rpc.server.facade.model.CalculateRequest], paramValues=[CalculateRequest{one=10, two=20}]}
[INFO] [2019-11-01 21:44:15.185] [nioEventLoopGroup-2-1] [c.g.h.r.s.h.RpcServerHandler.channelRead0] - [Server] channel 502b73fffec4485c-00001e18-00000002-3d6e1c4a0fd59bed-6a5605d2 response DefaultRpcResponse{seqId='51e2a2171e4b4e9987c6d23081c48a0e', error=null, result=CalculateResponse{success=true, sum=30}}
  • server2
[INFO] [2019-11-01 21:44:15.126] [nioEventLoopGroup-2-1] [c.g.h.r.s.h.RpcServerHandler.channelRead0] - [Server] channel read start: 00e04cfffeab09cf-00001d94-00000002-b86e77aa0fd59bd3-dd549b73
[INFO] [2019-11-01 21:44:15.127] [nioEventLoopGroup-2-1] [c.g.h.r.s.h.RpcServerHandler.channelRead0] - [Server] receive channel 00e04cfffeab09cf-00001d94-00000002-b86e77aa0fd59bd3-dd549b73 request: DefaultRpcRequest{seqId='14ee5788e1754575a90ef7ba64340455', createTime=1572615855087, serviceId='calc', methodName='sum', paramTypeNames=[com.github.houbb.rpc.server.facade.model.CalculateRequest], paramValues=[CalculateRequest{one=10, two=20}]}
[INFO] [2019-11-01 21:44:15.130] [nioEventLoopGroup-2-1] [c.g.h.r.s.h.RpcServerHandler.channelRead0] - [Server] channel 00e04cfffeab09cf-00001d94-00000002-b86e77aa0fd59bd3-dd549b73 response DefaultRpcResponse{seqId='14ee5788e1754575a90ef7ba64340455', error=null, result=CalculateResponse{success=true, sum=30}}
[INFO] [2019-11-01 21:44:15.159] [nioEventLoopGroup-2-1] [c.g.h.r.s.h.RpcServerHandler.channelRead0] - [Server] channel read start: 00e04cfffeab09cf-00001d94-00000002-b86e77aa0fd59bd3-dd549b73
[INFO] [2019-11-01 21:44:15.160] [nioEventLoopGroup-2-1] [c.g.h.r.s.h.RpcServerHandler.channelRead0] - [Server] receive channel 00e04cfffeab09cf-00001d94-00000002-b86e77aa0fd59bd3-dd549b73 request: DefaultRpcRequest{seqId='a935c347059a477a96b132e41b4b78c1', createTime=1572615855156, serviceId='calc', methodName='sum', paramTypeNames=[com.github.houbb.rpc.server.facade.model.CalculateRequest], paramValues=[CalculateRequest{one=10, two=20}]}
[INFO] [2019-11-01 21:44:15.161] [nioEventLoopGroup-2-1] [c.g.h.r.s.h.RpcServerHandler.channelRead0] - [Server] channel 00e04cfffeab09cf-00001d94-00000002-b86e77aa0fd59bd3-dd549b73 response DefaultRpcResponse{seqId='a935c347059a477a96b132e41b4b78c1', error=null, result=CalculateResponse{success=true, sum=30}}
[INFO] [2019-11-01 21:44:15.166] [nioEventLoopGroup-2-1] [c.g.h.r.s.h.RpcServerHandler.channelRead0] - [Server] channel read start: 00e04cfffeab09cf-00001d94-00000002-b86e77aa0fd59bd3-dd549b73
[INFO] [2019-11-01 21:44:15.166] [nioEventLoopGroup-2-1] [c.g.h.r.s.h.RpcServerHandler.channelRead0] - [Server] receive channel 00e04cfffeab09cf-00001d94-00000002-b86e77aa0fd59bd3-dd549b73 request: DefaultRpcRequest{seqId='82778d7d616547e6917f77e6da6e11df', createTime=1572615855163, serviceId='calc', methodName='sum', paramTypeNames=[com.github.houbb.rpc.server.facade.model.CalculateRequest], paramValues=[CalculateRequest{one=10, two=20}]}
[INFO] [2019-11-01 21:44:15.167] [nioEventLoopGroup-2-1] [c.g.h.r.s.h.RpcServerHandler.channelRead0] - [Server] channel 00e04cfffeab09cf-00001d94-00000002-b86e77aa0fd59bd3-dd549b73 response DefaultRpcResponse{seqId='82778d7d616547e6917f77e6da6e11df', error=null, result=CalculateResponse{success=true, sum=30}}
[INFO] [2019-11-01 21:44:15.192] [nioEventLoopGroup-2-1] [c.g.h.r.s.h.RpcServerHandler.channelRead0] - [Server] channel read start: 00e04cfffeab09cf-00001d94-00000002-b86e77aa0fd59bd3-dd549b73
[INFO] [2019-11-01 21:44:15.192] [nioEventLoopGroup-2-1] [c.g.h.r.s.h.RpcServerHandler.channelRead0] - [Server] receive channel 00e04cfffeab09cf-00001d94-00000002-b86e77aa0fd59bd3-dd549b73 request: DefaultRpcRequest{seqId='c4e380597fe6419ab12b8928278e4cdb', createTime=1572615855188, serviceId='calc', methodName='sum', paramTypeNames=[com.github.houbb.rpc.server.facade.model.CalculateRequest], paramValues=[CalculateRequest{one=10, two=20}]}
[INFO] [2019-11-01 21:44:15.193] [nioEventLoopGroup-2-1] [c.g.h.r.s.h.RpcServerHandler.channelRead0] - [Server] channel 00e04cfffeab09cf-00001d94-00000002-b86e77aa0fd59bd3-dd549b73 response DefaultRpcResponse{seqId='c4e380597fe6419ab12b8928278e4cdb', error=null, result=CalculateResponse{success=true, sum=30}}
[INFO] [2019-11-01 21:44:15.198] [nioEventLoopGroup-2-1] [c.g.h.r.s.h.RpcServerHandler.channelRead0] - [Server] channel read start: 00e04cfffeab09cf-00001d94-00000002-b86e77aa0fd59bd3-dd549b73
[INFO] [2019-11-01 21:44:15.198] [nioEventLoopGroup-2-1] [c.g.h.r.s.h.RpcServerHandler.channelRead0] - [Server] receive channel 00e04cfffeab09cf-00001d94-00000002-b86e77aa0fd59bd3-dd549b73 request: DefaultRpcRequest{seqId='399a2fd2d393413497172510e0651ce9', createTime=1572615855195, serviceId='calc', methodName='sum', paramTypeNames=[com.github.houbb.rpc.server.facade.model.CalculateRequest], paramValues=[CalculateRequest{one=10, two=20}]}
[INFO] [2019-11-01 21:44:15.199] [nioEventLoopGroup-2-1] [c.g.h.r.s.h.RpcServerHandler.channelRead0] - [Server] channel 00e04cfffeab09cf-00001d94-00000002-b86e77aa0fd59bd3-dd549b73 response DefaultRpcResponse{seqId='399a2fd2d393413497172510e0651ce9', error=null, result=CalculateResponse{success=true, sum=30}}
[INFO] [2019-11-01 21:44:15.203] [nioEventLoopGroup-2-1] [c.g.h.r.s.h.RpcServerHandler.channelRead0] - [Server] channel read start: 00e04cfffeab09cf-00001d94-00000002-b86e77aa0fd59bd3-dd549b73
[INFO] [2019-11-01 21:44:15.204] [nioEventLoopGroup-2-1] [c.g.h.r.s.h.RpcServerHandler.channelRead0] - [Server] receive channel 00e04cfffeab09cf-00001d94-00000002-b86e77aa0fd59bd3-dd549b73 request: DefaultRpcRequest{seqId='cc1f64281beb4c87bf11eac565609883', createTime=1572615855201, serviceId='calc', methodName='sum', paramTypeNames=[com.github.houbb.rpc.server.facade.model.CalculateRequest], paramValues=[CalculateRequest{one=10, two=20}]}
[INFO] [2019-11-01 21:44:15.205] [nioEventLoopGroup-2-1] [c.g.h.r.s.h.RpcServerHandler.channelRead0] - [Server] channel 00e04cfffeab09cf-00001d94-00000002-b86e77aa0fd59bd3-dd549b73 response DefaultRpcResponse{seqId='cc1f64281beb4c87bf11eac565609883', error=null, result=CalculateResponse{success=true, sum=30}}

小结

为了便于大家学习,以上源码已经开源:

https://github.com/houbb/rpc

我是老马,期待与你的下次重逢。