register 注册中心

需求

服务的注册与发现,是一个很常见也很有用的需求。

可以让我们不关心服务端的 ip 信息,只关心服务本身。

思路

实现的方式

SimpleRpcRegister 为 rpc 提供的默认实现方案。

实际可以结合 redis,zk 等常见的成熟框架实现。

其实可以把 register 当做是服务端,此时的 server/client 都是客户端。

实现的策略时类似的。

当然也可以直接使用 zk 等成熟的框架,只是个人觉得这样不利于学习,而且 zk 太重了。

流程

  • 启动注册中心

首先启动注册中心

  • 启动服务端

服务端启动时,将注册信息注册到注册中心。

  • 启动客户端

客户端启动的时候,去注册中心读取配置。

Register-注册中心

引导类

package com.github.houbb.rpc.register.api.config.impl;

import com.github.houbb.heaven.util.common.ArgUtil;
import com.github.houbb.rpc.common.remote.netty.handler.ChannelHandlers;
import com.github.houbb.rpc.common.remote.netty.impl.DefaultNettyServer;
import com.github.houbb.rpc.register.api.config.RegisterConfig;
import com.github.houbb.rpc.register.simple.handler.RegisterCenterServerHandler;
import io.netty.channel.ChannelHandler;

/**
 * 默认注册中心配置
 * @author binbin.hou
 * @since 0.0.8
 */
public class RegisterBs implements RegisterConfig {

    /**
     * 服务启动端口信息
     * @since 0.0.8
     */
    private int port;

    private RegisterBs(){}

    public static RegisterBs newInstance() {
        RegisterBs registerBs = new RegisterBs();
        registerBs.port(8527);
        return registerBs;
    }

    @Override
    public RegisterBs port(int port) {
        ArgUtil.notNegative(port, "port");

        this.port = port;
        return this;
    }

    @Override
    public RegisterBs start() {
        ChannelHandler channelHandler = ChannelHandlers.objectCodecHandler(new RegisterCenterServerHandler());
        DefaultNettyServer.newInstance(port, channelHandler).asyncRun();

        return this;
    }

}

指定一下 register 的 port,就可以启动了。

RegisterCenterServerHandler

这里是注册中心的核心实现类。

/*
 * Copyright (c)  2019. houbinbin Inc.
 * rpc All rights reserved.
 */

package com.github.houbb.rpc.register.simple.handler;

import com.github.houbb.log.integration.core.Log;
import com.github.houbb.log.integration.core.LogFactory;
import com.github.houbb.rpc.register.domain.entry.ServiceEntry;
import com.github.houbb.rpc.register.domain.message.RegisterMessage;
import com.github.houbb.rpc.register.domain.message.impl.RegisterMessages;
import com.github.houbb.rpc.register.simple.SimpleRpcRegister;
import com.github.houbb.rpc.register.simple.client.ClientRegisterService;
import com.github.houbb.rpc.register.simple.client.impl.DefaultClientRegisterService;
import com.github.houbb.rpc.register.simple.constant.MessageTypeConst;
import com.github.houbb.rpc.register.simple.server.ServerRegisterService;
import com.github.houbb.rpc.register.simple.server.impl.DefaultServerRegisterService;
import com.github.houbb.rpc.register.spi.RpcRegister;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

/**
 * <p> 注册中心服务器处理类 </p>
 *
 * <pre> Created: 2019/10/23 10:29 下午  </pre>
 * <pre> Project: rpc  </pre>
 * <p>
 * 请求的标准化:
 * (1)对于 server 的服务注册,client 的配置拉取。
 * 二者都是将 register 作为服务端。所以需要统一请求信息。
 * (2)对于 server 的注册,不需要提供对应的反馈信息
 * (3)当配置发生变化时,需要及时通知所有的 client 端。
 * 这里就需要知道哪些是客户端??
 *
 * @author houbinbin
 * @since 0.0.8
 */
@ChannelHandler.Sharable
public class RegisterCenterServerHandler extends SimpleChannelInboundHandler {

    private static final Log LOG = LogFactory.getLog(RegisterCenterServerHandler.class);

    /**
     * 注册中心服务
     * @since 0.0.8
     */
    private final RpcRegister rpcRegister;

    public RegisterCenterServerHandler() {
        this.rpcRegister = this.buildSimpleRpcRegister();
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        final String id = ctx.channel().id().asLongText();
        LOG.info("[Register Server] channel {} connected " + id);
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        RegisterMessage registerMessage = (RegisterMessage) msg;
        Object body = registerMessage.body();
        int type = RegisterMessages.type(registerMessage);
        String seqId = registerMessage.seqId();
        LOG.info("[Register Server] received message type: {}, seqId: {} ", type,
                seqId);

        final Channel channel = ctx.channel();
        ServiceEntry serviceEntry = (ServiceEntry)body;

        switch (type) {
            case MessageTypeConst.SERVER_REGISTER:
                rpcRegister.register(serviceEntry);
                break;

            case MessageTypeConst.SERVER_UN_REGISTER:
                rpcRegister.unRegister(serviceEntry);
                break;

            case MessageTypeConst.CLIENT_SUBSCRIBE:
                rpcRegister.subscribe(serviceEntry, channel);
                break;

            case MessageTypeConst.CLIENT_UN_SUBSCRIBE:
                rpcRegister.unSubscribe(serviceEntry, channel);
                break;

            case MessageTypeConst.CLIENT_LOOK_UP:
                rpcRegister.lookUp(seqId, serviceEntry, channel);
                break;

            default:
                LOG.warn("[Register Center] not support type: {} and seqId: {}",
                        type, seqId);
        }

    }

    /**
     * 构建简单注册实现类
     * @return 注册实现
     * @since 0.0.8
     */
    private RpcRegister buildSimpleRpcRegister() {
        final ServerRegisterService serverRegisterService = new DefaultServerRegisterService();
        final ClientRegisterService clientRegisterService = new DefaultClientRegisterService();
        return new SimpleRpcRegister(serverRegisterService, clientRegisterService);
    }


}

这里主要区分几种常见的实现,分别调用 RpcRegister 中对应的方法。

RpcRegister

接口

常见的方法定义。

/*
 * Copyright (c)  2019. houbinbin Inc.
 * rpc All rights reserved.
 */

package com.github.houbb.rpc.register.spi;


import com.github.houbb.rpc.register.domain.entry.ServiceEntry;
import io.netty.channel.Channel;

/**
 * <p> 注册中心接口 </p>
 *
 * <pre> Created: 2019/10/23 8:01 下午  </pre>
 * <pre> Project: rpc  </pre>
 *
 * @author houbinbin
 * @since 0.0.8
 */
public interface RpcRegister {

    /**
     * 注册当前服务信息
     * 订阅了这个 serviceId 的所有客户端
     * @param serviceEntry 注册当前服务信息
     * @since 0.0.8
     */
    void register(final ServiceEntry serviceEntry);

    /**
     * 注销当前服务信息
     * @param serviceEntry 注册当前服务信息
     * @since 0.0.8
     */
    void unRegister(final ServiceEntry serviceEntry);

    /**
     * 监听服务信息
     * (1)监听之后,如果有任何相关的机器信息发生变化,则进行推送。
     * (2)内置的信息,需要传送 ip 信息到注册中心。
     *
     * @param serviceEntry 客户端明细信息
     * @param channel 频道信息
     * @since 0.0.8
     */
    void subscribe(final ServiceEntry serviceEntry, final Channel channel);

    /**
     * 取消监听服务信息
     *
     * (1)将改服务从客户端的监听列表中移除即可。
     *
     * @param server 客户端明细信息
     * @param channel 频道信息
     * @since 0.0.8
     */
    void unSubscribe(final ServiceEntry server, final Channel channel);

    /**
     * 启动时查询 serviceId 对应的所有服务端信息
     * @param seqId 请求标识
     * @param clientEntry 客户端查询明细
     * @param channel 频道信息
     * @since 0.0.8
     */
    void lookUp(String seqId, ServiceEntry clientEntry, final Channel channel);

}

SimpleRpcRegister

简单的实现。

主要分为两大块:

(1)客户端指定监听的服务

(2)服务端变更时,通知监听的客户端

/*
 * Copyright (c)  2019. houbinbin Inc.
 * rpc All rights reserved.
 */

package com.github.houbb.rpc.register.simple;

import com.github.houbb.log.integration.core.Log;
import com.github.houbb.log.integration.core.LogFactory;
import com.github.houbb.rpc.common.rpc.domain.RpcResponse;
import com.github.houbb.rpc.common.rpc.domain.impl.DefaultRpcResponse;
import com.github.houbb.rpc.register.domain.entry.ServiceEntry;
import com.github.houbb.rpc.register.simple.client.ClientRegisterService;
import com.github.houbb.rpc.register.simple.server.ServerRegisterService;
import com.github.houbb.rpc.register.simple.server.impl.DefaultServerRegisterService;
import com.github.houbb.rpc.register.spi.RpcRegister;
import io.netty.channel.Channel;

import java.util.List;

/**
 * <p> 简单的 rpc 注册 </p>
 *
 * <pre> Created: 2019/10/23 8:59 下午  </pre>
 * <pre> Project: rpc  </pre>
 *
 * (1)各种关系的关系服务类
 * (2)各种关系之间的通讯类
 * (3)domain 层
 *
 * @author houbinbin
 * @since 0.0.8
 */
public class SimpleRpcRegister implements RpcRegister {

    private static final Log LOG = LogFactory.getLog(DefaultServerRegisterService.class);

    /**
     * 服务端信息管理
     * @since 0.0.8
     */
    private final ServerRegisterService serverRegisterService;

    /**
     * 客户端信息管理
     * @since 0.0.8
     */
    private final ClientRegisterService clientRegisterService;

    public SimpleRpcRegister(ServerRegisterService serverRegisterService, ClientRegisterService clientRegisterService) {
        this.serverRegisterService = serverRegisterService;
        this.clientRegisterService = clientRegisterService;
    }

    @Override
    public void register(ServiceEntry serviceEntry) {
        List<ServiceEntry> serviceEntryList = serverRegisterService.register(serviceEntry);

        // 通知监听者
        clientRegisterService.notify(serviceEntry.serviceId(), serviceEntryList);
    }

    @Override
    public void unRegister(ServiceEntry serviceEntry) {
        List<ServiceEntry> serviceEntryList = serverRegisterService.unRegister(serviceEntry);

        // 通知监听者
        clientRegisterService.notify(serviceEntry.serviceId(), serviceEntryList);
    }

    @Override
    public void subscribe(ServiceEntry clientEntry, final Channel channel) {
        clientRegisterService.subscribe(clientEntry, channel);
    }

    @Override
    public void unSubscribe(ServiceEntry clientEntry, Channel channel) {
        clientRegisterService.unSubscribe(clientEntry, channel);
    }

    @Override
    public void lookUp(String seqId, ServiceEntry clientEntry, Channel channel) {
        final String serviceId = clientEntry.serviceId();
        List<ServiceEntry> serviceEntryList = serverRegisterService.lookUp(serviceId);

        // 回写
        // 为了复用原先的相应结果,此处直接使用 rpc response
        RpcResponse rpcResponse = DefaultRpcResponse.newInstance().seqId(seqId)
                .result(serviceEntryList);
        channel.writeAndFlush(rpcResponse);
    }


}

ServerRegisterService

服务端的注册管理实现其实比较简单:

/*
 * Copyright (c)  2019. houbinbin Inc.
 * rpc All rights reserved.
 */

package com.github.houbb.rpc.register.simple.server.impl;

import com.github.houbb.heaven.util.common.ArgUtil;
import com.github.houbb.heaven.util.guava.Guavas;
import com.github.houbb.heaven.util.lang.ObjectUtil;
import com.github.houbb.heaven.util.util.CollectionUtil;
import com.github.houbb.log.integration.core.Log;
import com.github.houbb.log.integration.core.LogFactory;
import com.github.houbb.rpc.register.domain.entry.ServiceEntry;
import com.github.houbb.rpc.register.simple.server.ServerRegisterService;

import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/**
 * <p> 默认服务注册类 </p>
 *
 * <pre> Created: 2019/10/23 9:16 下午  </pre>
 * <pre> Project: rpc  </pre>
 *
 * @author houbinbin
 * @since 0.0.8
 */
public class DefaultServerRegisterService implements ServerRegisterService {

    private static final Log LOG = LogFactory.getLog(DefaultServerRegisterService.class);

    /**
     * 存放对应的 map 信息
     * @since 0.0.8
     */
    private final Map<String, Set<ServiceEntry>> map;

    public DefaultServerRegisterService(){
        map = new ConcurrentHashMap<>();
    }

    @Override
    public List<ServiceEntry> register(ServiceEntry serviceEntry) {
        paramCheck(serviceEntry);

        final String serviceId = serviceEntry.serviceId();
        Set<ServiceEntry> serviceEntrySet = map.get(serviceId);
        if(ObjectUtil.isNull(serviceEntrySet)) {
            serviceEntrySet = Guavas.newHashSet();
        }

        LOG.info("[Register Server] add service: {}", serviceEntry);
        serviceEntrySet.add(serviceEntry);
        map.put(serviceId, serviceEntrySet);

        // 返回更新后的结果
        return Guavas.newArrayList(serviceEntrySet);
    }

    @Override
    public List<ServiceEntry> unRegister(ServiceEntry serviceEntry) {
        paramCheck(serviceEntry);

        final String serviceId = serviceEntry.serviceId();
        Set<ServiceEntry> serviceEntrySet = map.get(serviceId);

        if(CollectionUtil.isEmpty(serviceEntrySet)) {
            // 服务列表为空
            LOG.info("[Register Server] remove service set is empty. entry: {}", serviceEntry);
            return Guavas.newArrayList();
        }

        serviceEntrySet.remove(serviceEntry);
        LOG.info("[Register Server] remove service: {}", serviceEntry);
        map.put(serviceId, serviceEntrySet);

        // 返回更新后的结果
        return Guavas.newArrayList(serviceEntrySet);
    }

    @Override
    public List<ServiceEntry> lookUp(String serviceId) {
        ArgUtil.notEmpty(serviceId, "serviceId");

        LOG.info("[Register Server] start lookUp serviceId: {}", serviceId);
        Set<ServiceEntry> serviceEntrySet = map.get(serviceId);
        return Guavas.newArrayList(serviceEntrySet);
    }

    /**
     * 参数校验
     * @param serviceEntry 服务明细
     * @since 0.0.8
     */
    private void paramCheck(final ServiceEntry serviceEntry) {
        ArgUtil.notNull(serviceEntry, "serviceEntry");
        final String serviceId = serviceEntry.serviceId();
        ArgUtil.notEmpty(serviceId, "serviceId");
    }

}

DefaultClientRegisterService

客户端对应的注册实现类,也是类似的。

/*
 * Copyright (c)  2019. houbinbin Inc.
 * rpc All rights reserved.
 */

package com.github.houbb.rpc.register.simple.client.impl;

import com.github.houbb.heaven.util.common.ArgUtil;
import com.github.houbb.heaven.util.guava.Guavas;
import com.github.houbb.heaven.util.lang.ObjectUtil;
import com.github.houbb.heaven.util.util.CollectionUtil;
import com.github.houbb.log.integration.core.Log;
import com.github.houbb.log.integration.core.LogFactory;
import com.github.houbb.rpc.register.domain.entry.ServiceEntry;
import com.github.houbb.rpc.register.domain.message.RegisterMessage;
import com.github.houbb.rpc.register.domain.message.impl.RegisterMessages;
import com.github.houbb.rpc.register.simple.client.ClientRegisterService;
import com.github.houbb.rpc.register.simple.constant.MessageTypeConst;
import io.netty.channel.Channel;

import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/**
 * <p> 默认客户端注册服务实现类 </p>
 *
 * <pre> Created: 2019/10/23 9:42 下午  </pre>
 * <pre> Project: rpc  </pre>
 *
 * @author houbinbin
 * @since 0.0.8
 */
public class DefaultClientRegisterService implements ClientRegisterService {

    private static final Log LOG = LogFactory.getLog(DefaultClientRegisterService.class);

    /**
     * 服务信息-客户端列表 map
     * key: serviceId
     * value: 对应的客户端列表信息。
     *
     * 客户端使用定期拉取的方式:
     * (1)传入 host 信息,返回对应的 service 列表。
     * (2)根据 service 列表,变化时定期推送给客户端。
     *
     * 只是在第一次采用拉取的方式,后面全部采用推送的方式。
     * (1)只有变更的时候,才会进行推送,保证实时性。
     * (2)客户端启动时拉取,作为保底措施。避免客户端不在线等情况。
     *
     * @since 0.0.8
     */
    private final Map<String, Set<Channel>> serviceClientChannelMap;

    public DefaultClientRegisterService() {
        this.serviceClientChannelMap = new ConcurrentHashMap<>();
    }

    @Override
    public void subscribe(ServiceEntry clientEntry, Channel clientChannel) {
        paramCheck(clientEntry);

        final String serviceId = clientEntry.serviceId();
        Set<Channel> channelSet = serviceClientChannelMap.get(serviceId);
        if (ObjectUtil.isNull(channelSet)) {
            channelSet = Guavas.newHashSet();
        }
        channelSet.add(clientChannel);
        serviceClientChannelMap.put(serviceId, channelSet);
    }

    @Override
    public void unSubscribe(ServiceEntry clientEntry, Channel clientChannel) {
        paramCheck(clientEntry);

        final String serviceId = clientEntry.serviceId();
        Set<Channel> channelSet = serviceClientChannelMap.get(serviceId);

        if (CollectionUtil.isEmpty(channelSet)) {
            // 服务列表为空
            LOG.info("[Register Client] remove host set is empty. entry: {}", clientEntry);
            return;
        }

        channelSet.remove(clientChannel);
        serviceClientChannelMap.put(serviceId, channelSet);
    }

    @Override
    public void notify(String serviceId, List<ServiceEntry> serviceEntryList) {
        ArgUtil.notEmpty(serviceId, "serviceId");

        List<Channel> clientChannelList = clientChannelList(serviceId);
        if (CollectionUtil.isEmpty(clientChannelList)) {
            LOG.info("[Register] notify clients is empty for service: {}",
                    serviceId);
            return;
        }

        // 循环通知
        for(Channel channel : clientChannelList) {
            RegisterMessage registerMessage = RegisterMessages.of(MessageTypeConst.REGISTER_NOTIFY, serviceEntryList);
            channel.writeAndFlush(registerMessage);
        }
    }

    /**
     * 参数校验
     *
     * @param serviceEntry 入参信息
     * @since 0.0.8
     */
    private void paramCheck(final ServiceEntry serviceEntry) {
        ArgUtil.notNull(serviceEntry, "serverEntry");
        ArgUtil.notEmpty(serviceEntry.serviceId(), "serverEntry.serviceId");
        ArgUtil.notEmpty(serviceEntry.ip(), "serverEntry.ip");
    }

    /**
     * 获取所有的客户端列表
     * @param serviceId 服务标识
     * @return 客户端列表标识
     * @since 0.0.8
     */
    private List<Channel> clientChannelList(String serviceId) {
        ArgUtil.notEmpty(serviceId, "serviceId");

        Set<Channel> clientSet = serviceClientChannelMap.get(serviceId);
        return Guavas.newArrayList(clientSet);
    }

}

内容较多,客户端与服务端调整放在下一期。

小结

为了便于大家学习,以上源码已经开源:

https://github.com/houbb/rpc

我是老马,期待与你的下次重逢。