NameServer路由注册

NameServer能够管理Broker信息,所有的Broker在启动时都会将自己注册到所有的NameServer上去,同时NameServer还管理着Topic的路由信息

Topic路由信息

类:org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager

// Topic队列路由信息
private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
// Broker地址信息
private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
// 集群地址信息
private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
// Broker状态信息
private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
// 消息过滤信息
private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;

QueueData:队列信息

// Broker名字
private String brokerName;
// 读队列数量
private int readQueueNums;
// 写队列数量
private int writeQueueNums;
// 权限
private int perm;
// 同步标记
private int topicSynFlag;

BrokerData:Broker信息

// 集群名
private String cluster;
// Broker名
private String brokerName;
// BrokerId -> Broker地址,BrokerId = 0表示当前Broker为master,其他为slave
private HashMap<Long/* brokerId */, String/* broker address */> brokerAddrs;

BrokerLiveinfo:Broker状态信息

NameServer每次收到Broker的心跳包时会更新这个信息

// 上一次更新时间
private long lastUpdateTimestamp;
// 数据版本
private DataVersion dataVersion;
// netty中的channel
private Channel channel;
// master地址
private String haServerAddr;

路由信息如何注册到NameServer

实际上,就是Broker向NameServer发送心跳以及NameServer接收心跳的过程

Broker发送心跳

当Broker启动时,会将自己的信息注册到NameServer中去,会调用org.apache.rocketmq.broker.BrokerStartup#main方法

public static void main(String[] args) {
    // 创建BrokerController
    start(createBrokerController(args));
}

public static BrokerController start(BrokerController controller) {
    try {
        // 调用BrokerController中的start方法
        controller.start();
        // ...
            
        // 最后返回BrokerController
        return controller;
    } catch (Throwable e) {
        e.printStackTrace();
        System.exit(-1);
    }
    return null;
}

org.apache.rocketmq.broker.BrokerController#start

public void start() throws Exception {
    // ...
    
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            try {
                // 路由注册的实现
                // 该方法会调用doRegisterBrokerAll方法,它是真正实现Broker注册操作的
                BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
            } catch (Throwable e) {
                log.error("registerBrokerAll Exception", e);
            }
        }
    }, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
    
    // ...
}

org.apache.rocketmq.broker.out.BrokerOuterAPI#doRegisterBrokerAll

// Broker注册结果列表,即成功注册到NameServer上的Broker列表
final List<RegisterBrokerResult> registerBrokerResultList = Lists.newArrayList();
// NameServer列表
List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
if (nameServerAddressList != null && nameServerAddressList.size() > 0) {

    // 构造注册Broker的请求头及请求体
    // 主要是设置Broker的基本信息,比如master节点信息,集群节点信息,brokerId,0表示master,其他表示slave,broker地址,是否压缩,消息过滤列表等等
    final RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();
    requestHeader.setBrokerAddr(brokerAddr);
    requestHeader.setBrokerId(brokerId);
    requestHeader.setBrokerName(brokerName);
    requestHeader.setClusterName(clusterName);
    requestHeader.setHaServerAddr(haServerAddr);
    requestHeader.setCompressed(compressed);

    RegisterBrokerBody requestBody = new RegisterBrokerBody();
    // 设置请求体中的topic配置以及消息过滤服务器列表
    requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper);
    requestBody.setFilterServerList(filterServerList);
    final byte[] body = requestBody.encode(compressed);
    final int bodyCrc32 = UtilAll.crc32(body);
    requestHeader.setBodyCrc32(bodyCrc32);
    
    // 异步注册
    final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
    for (final String namesrvAddr : nameServerAddressList) {
        brokerOuterExecutor.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    // registerBroker为真正的注册方法
                    RegisterBrokerResult result = registerBroker(namesrvAddr,oneway, timeoutMills,requestHeader,body);
                    if (result != null) {
                        registerBrokerResultList.add(result);
                    }

                    log.info("register broker[{}]to name server {} OK", brokerId, namesrvAddr);
                } catch (Exception e) {
                    log.warn("registerBroker Exception, {}", namesrvAddr, e);
                } finally {
                    countDownLatch.countDown();
                }
            }
        });
    }

    try {
        countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
    }
}

// 将注册上的Broker列表返回
return registerBrokerResultList;

org.apache.rocketmq.broker.out.BrokerOuterAPI#registerBroker

// ...

// 判断消息发送方法是否是oneway,是则调用invokeOneway方法
// 否则调用invokeSync方法
if (oneway) {
    try {
        this.remotingClient.invokeOneway(namesrvAddr, request, timeoutMills);
    } catch (RemotingTooMuchRequestException e) {
        // Ignore
    }
    return null;
}

RemotingCommand response = this.remotingClient.invokeSync(namesrvAddr, request, timeoutMills);
assert response != null;
switch (response.getCode()) {
    case ResponseCode.SUCCESS: {
        // 注册成功,拼装注册结果并返回
        RegisterBrokerResponseHeader responseHeader =
            (RegisterBrokerResponseHeader) response.decodeCommandCustomHeader(RegisterBrokerResponseHeader.class);
        RegisterBrokerResult result = new RegisterBrokerResult();
        result.setMasterAddr(responseHeader.getMasterAddr());
        result.setHaServerAddr(responseHeader.getHaServerAddr());
        if (response.getBody() != null) {
            // 设置kv配置表
            result.setKvTable(KVTable.decode(response.getBody(), KVTable.class));
        }
        return result;
    }
    default:
        break;
}

throw new MQBrokerException(response.getCode(), response.getRemark());

这里就是Broker注册的整个过程了

NameServer接收并处理心跳包

通过对Broker注册过程的分析,可以得知,Broker通过给NameServer发送请求的方式来将自己的信息注册到NameServer中,因此NameServer必须得有一个处理Broker请求的类,即org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor

请求到来后,会由其方法processRequest来进行处理

@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx,
    RemotingCommand request) throws RemotingCommandException {
    // ...
    
    switch (request.getCode()) {
        // ...        
        // 当请求Code为REGISTER_BROKER时,将请求转发达到其对应的处理类中进行处理
        // 最后转发的路径为:org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#registerBroker
        case RequestCode.REGISTER_BROKER:
            Version brokerVersion = MQVersion.value2Version(request.getVersion());
            if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
                return this.registerBrokerWithFilterServer(ctx, request);
            } else {
                return this.registerBroker(ctx, request);
            }
        // ...        
        default:
            break;
    }
    return null;
}

org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#registerBroker

RegisterBrokerResult result = new RegisterBrokerResult();
try {
    try {
        // 这里加了写锁,防止多个线程并发写路由信息
        this.lock.writeLock().lockInterruptibly();

        // 维护clusterAddrTable
        Set<String> brokerNames = this.clusterAddrTable.get(clusterName);
        // 检查当前Broker所属集群是否存在,如果存在,则加入集群,否则,创建集群并加入
        if (null == brokerNames) {
            brokerNames = new HashSet<String>();
            this.clusterAddrTable.put(clusterName, brokerNames);
        }
        brokerNames.add(brokerName);

        boolean registerFirst = false;

        // 维护brokerAddrTable
        BrokerData brokerData = this.brokerAddrTable.get(brokerName);
        // 检查当前BrokerData是否已经存在,如果不存在,则新建并加如brokerAddrTable并设置registerFirst标志位为true,表示第一次注册
        // 如果已存在,则设置registerFirst标志位为false,表示非第一次注册,并替换原来的brokerData
        if (null == brokerData) {
            registerFirst = true;
            brokerData = new BrokerData(clusterName, brokerName, new HashMap<Long, String>());
            this.brokerAddrTable.put(brokerName, brokerData);
        }
        Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs();
        //Switch slave to master: first remove <1, IP:PORT> in namesrv, then add <0, IP:PORT>
        //The same IP:PORT must only have one record in brokerAddrTable
        Iterator<Entry<Long, String>> it = brokerAddrsMap.entrySet().iterator();
        while (it.hasNext()) {
            Entry<Long, String> item = it.next();
            if (null != brokerAddr && brokerAddr.equals(item.getValue()) && brokerId != item.getKey()) {
                it.remove();
            }
        }

        String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);
        registerFirst = registerFirst || (null == oldAddr);

        // 维护topicQueueTable
        // 如果当前Broker是master,并且它的Topic路由元数据发生了改变或者当前Broker是第一次注册
        // 则需要创建或者更新该Broker的Topic路由元数据到NameServer的topicQueueTable中
        if (null != topicConfigWrapper
            && MixAll.MASTER_ID == brokerId) {
            if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())
                || registerFirst) {
                ConcurrentMap<String, TopicConfig> tcTable =
                    topicConfigWrapper.getTopicConfigTable();
                if (tcTable != null) {
                    for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
                        this.createAndUpdateQueueData(brokerName, entry.getValue());
                    }
                }
            }
        }

        // 维护BrokerLiveInfo列表
        // BrokerLiveInfo维护了Broker的心跳更新时间
        // NameServer通过定时检查这个时间来确定Broker是否健康
        BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,
            new BrokerLiveInfo(
                System.currentTimeMillis(),
                topicConfigWrapper.getDataVersion(),
                channel,
                haServerAddr));
        if (null == prevBrokerLiveInfo) {
            log.info("new broker registered, {} HAServer: {}", brokerAddr, haServerAddr);
        }

        // 维护消息过滤服务器列表
        if (filterServerList != null) {
            if (filterServerList.isEmpty()) {
                this.filterServerTable.remove(brokerAddr);
            } else {
                this.filterServerTable.put(brokerAddr, filterServerList);
            }
        }

        if (MixAll.MASTER_ID != brokerId) {
            String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
            if (masterAddr != null) {
                BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr);
                if (brokerLiveInfo != null) {
                    result.setHaServerAddr(brokerLiveInfo.getHaServerAddr());
                    result.setMasterAddr(masterAddr);
                }
            }
        }
    } finally {
        this.lock.writeLock().unlock();
    }
} catch (Exception e) {
    log.error("registerBroker Exception", e);
}

return result;

org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#createAndUpdateQueueData

该方法用于维护topicQueueTable时对Topic路由信息进行创建或更新

// 从topicQueueTable中获取queueData列表
List<QueueData> queueDataList = this.topicQueueTable.get(topicConfig.getTopicName());
// 如果queueData为空,表示该topic没有任何的队列信息,新建一个队列信息并将当前注册的队列信息加入其中
// 然后放入到topicQueueTable中
if (null == queueDataList) {
    queueDataList = new LinkedList<QueueData>();
    queueDataList.add(queueData);
    this.topicQueueTable.put(topicConfig.getTopicName(), queueDataList);
    log.info("new topic registered, {} {}", topicConfig.getTopicName(), queueData);
} else {
    // 如果queueDataList存在,设置addNewOne标志位为true,表示新增一个queueData
    boolean addNewOne = true;

    // 接着遍历queueDataList,检查是否有相同的queueData,如果有,设置addNewOne为false,表示queueData已存在
    Iterator<QueueData> it = queueDataList.iterator();
    while (it.hasNext()) {
        QueueData qd = it.next();
        if (qd.getBrokerName().equals(brokerName)) {
            if (qd.equals(queueData)) {
                addNewOne = false;
            } else {
                log.info("topic changed, {} OLD: {} NEW: {}", topicConfig.getTopicName(), qd,
                    queueData);
                it.remove();
            }
        }
    }

    // 如果queueData不存在,则将其加入到queueDataList中
    if (addNewOne) {
        queueDataList.add(queueData);
    }
}

总结

NameServer和Broker之间通过Tcp建立长连接,Broker的状态存储在NameServer#brokerLiveTable中,每次NameServer接收到Broker的心跳包,就会更新brokerLiveTable中对应Broker的状态(心跳时间等)以及路由表(brokerAddrTable, filterServerList, topicQueueTable, brokerLiveTable)

更新时采用写锁,保证同一时间只会有一个线程对路由表进行更新