NameServer的启动流程

核心类:org.apache.rocketmq.namesrv.NamesrvStartUp

public static NamesrvController main0(String[] args) {

    try {
        // 1. 解析配置文件
        NamesrvController controller = createNamesrvController(args);
        // 2. 初始化NamesrvController
        // 3. 注册JVM钩子函数,优雅停机
        start(controller);
        // ...
        return controller;
    } catch (Throwable e) {
        e.printStackTrace();
        System.exit(-1);
    }

    return null;
}

解析配置文件

主要解析NameServer本身的配置以及Netty的配置

NamesrvStartUp#createNamesrvController

final NamesrvConfig namesrvConfig = new NamesrvConfig();
final NettyServerConfig nettyServerConfig = new NettyServerConfig();
// 设置nettyServer的默认端口位9876
nettyServerConfig.setListenPort(9876);
// 检查命令行参数是否带有-c选项,-c选项用于指定配置文件路径
if (commandLine.hasOption('c')) {
    String file = commandLine.getOptionValue('c');
    if (file != null) {
        // 如果有-c选项并且其参数值不为null,则解析该文件并将生成对应的namesrvConfig和nettyServerConfig
        InputStream in = new BufferedInputStream(new FileInputStream(file));
        properties = new Properties();
        // 将文件加载到properties中
        properties.load(in);
        // 将文件解析成namsrvConfig
        MixAll.properties2Object(properties, namesrvConfig);
        // 将文件解析成nettyServerConfig
        MixAll.properties2Object(properties, nettyServerConfig);
        namesrvConfig.setConfigStorePath(file);
        // ...
    }
}

// 如果命令行参数带有-p选项,则打印配置项并退出
if (commandLine.hasOption('p')) {
    InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME);
    MixAll.printObjectProperties(console, namesrvConfig);
    MixAll.printObjectProperties(console, nettyServerConfig);
    System.exit(0);
}

// 打印配置
MixAll.printObjectProperties(log, namesrvConfig);
MixAll.printObjectProperties(log, nettyServerConfig);

// 创建NamesrvController
final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);

// 将配置注册到controller中
controller.getConfiguration().registerConfig(properties);
return controller;

NamesrvConfig

// rocketmq主目录,首先从配置中取,如果没有则从环境变量中取
private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV));
// NameServer存储kv配置的路径
private String kvConfigPath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "kvConfig.json";
// NameServer默认配置文件路径
private String configStorePath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "namesrv.properties";
// ...
// 是否支持顺序消息,默认不支持
private boolean orderMessageEnable = false;

NettyServerConfig

// nettyServer默认监听端口,会被初始化为9876
private int listenPort = 8888;
// netty业务线程池线程个数
private int serverWorkerThreads = 8;
// netty public任务线程池个数 
// netty会根据业务类型创建不同的线程池,比如消息发送,消息消费,心跳检测
// 如果该类型未定义,则默认由public线程池来执行
private int serverCallbackExecutorThreads = 0;
// IO线程个数,用于处理网络请求,解析请求包,然后转发至业务线程池处理请求,再返回结果
private int serverSelectorThreads = 3;
// oneway消息并发度
private int serverOnewaySemaphoreValue = 256;
// 异步发送消息最大并发度
private int serverAsyncSemaphoreValue = 64;
// 网络连接最大空闲时间,默认为120s,连接空闲时间超过120s,则连接关闭
private int serverChannelMaxIdleTimeSeconds = 120;

// socket发送缓冲区大小,默认64kb
private int serverSocketSndBufSize = NettySystemConfig.socketSndbufSize;
// socket接收缓冲区大小,默认64kb
private int serverSocketRcvBufSize = NettySystemConfig.socketRcvbufSize;
// ByteBuffer是否开启缓存,默认开启
private boolean serverPooledByteBufAllocatorEnable = true;
// 是否启用epoll IO模型,linux环境建议开启
private boolean useEpollNativeSelector = false;

初始化NamesrvController

NamesrvController#initialize

该方法用于执行NamesrvController的初始化过程,主要完成了以下几个任务

  • 加载KV配置
  • 创建nettyServer并创建业务线程池
  • 开启两个定时任务
    • 每隔10秒检查是否有broker宕机,如果有则从NameServer中移除
    • 每隔10秒打印一次KV配置
// 加载KV配置
this.kvConfigManager.load();

// 创建nettyServer并创建业务线程池
this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
this.remotingExecutor =
    Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));

// 创建检查broker的定时任务
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

    @Override
    public void run() {
        NamesrvController.this.routeInfoManager.scanNotActiveBroker();
    }
}, 5, 10, TimeUnit.SECONDS);
// 创建打印KV配置的定时任务
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

    @Override
    public void run() {
        NamesrvController.this.kvConfigManager.printAllPeriodically();
    }
}, 1, 10, TimeUnit.MINUTES);

注册钩子函数,优雅停机

注册钩子函数,在JVM关闭前将namesrvController关闭

Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {
    @Override
    public Void call() throws Exception {
        controller.shutdown();
        return null;
    }
}));

public void shutdown() {
    // 关闭nettyServer
    this.remotingServer.shutdown();
    // 关闭nettyServer业务线程池
    this.remotingExecutor.shutdown();
    // 关闭定时任务
    this.scheduledExecutorService.shutdown();

    if (this.fileWatchService != null) {
        // 如果文件监听服务存在,也将其关闭
        this.fileWatchService.shutdown();
    }
}