说在前面
本次开始进行rocketmq源码解析,比较喜欢rocketmq的架构设计,rocketmq内嵌了namesrv注册中心保存了元数据,进行负载均衡、容错的一些处理,4.3以上支持消息事务,有管理控制台、命令行工具,底层namesrv与broker、client与server交互netty实现。更多精彩文章请关注“天河聊架构”微信公众号。
源码解析
创建NamesrvController,进入这个方法org.apache.rocketmq.namesrv.NamesrvStartup#main,再进入这个方法org.apache.rocketmq.namesrv.NamesrvStartup#main0
public static NamesrvController main0(String[] args) { try { // 源码解析之创建namesrv控制器 =》 NamesrvController controller = createNamesrvController(args); // 源码解析之启动namesrv控制器 =》 start(controller); String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer(); log.info(tip); System.out.printf("%s%n", tip); return controller; } catch (Throwable e) { e.printStackTrace(); System.exit(-1); } return null; }
进入这个方法org.apache.rocketmq.namesrv.NamesrvStartup#createNamesrvController
public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException { // 从系统文件中查询rocketmq版本,默认4.3.0 System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION)); // PackageConflictDetect.detectFastjson(); // 构建命令行操作的指令 =》 Options options = ServerUtil.buildCommandlineOptions(new Options()); // mqnamesrv 启动namesrv命令 =》 commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser()); if (null == commandLine) { // 系统非正常退出,流程结束 System.exit(-1); return null; } // 解析配置文件 =》 final NamesrvConfig namesrvConfig = new NamesrvConfig(); // =》 final NettyServerConfig nettyServerConfig = new NettyServerConfig(); // 设置 namesrv的服务端口 nettyServerConfig.setListenPort(9876); // c 指定启动的时候加载配置文件 if (commandLine.hasOption('c')) { // 命令行启动指定配置文件,前面用c开头 String file = commandLine.getOptionValue('c'); if (file != null) { InputStream in = new BufferedInputStream(new FileInputStream(file)); properties = new Properties(); properties.load(in); MixAll.properties2Object(properties, namesrvConfig); MixAll.properties2Object(properties, nettyServerConfig); // 设置命令行启动namesrv指定的配置文件路径 namesrvConfig.setConfigStorePath(file); System.out.printf("load config properties file OK, %s%n", file); in.close(); } } // 打印namesrv的配置信息,命令行上面加p if (commandLine.hasOption('p')) { MixAll.printObjectProperties(null, namesrvConfig); MixAll.printObjectProperties(null, nettyServerConfig); // 正常程序退出 System.exit(0); } // 把命令行属性解析成properties MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig); // 解析 ROCKETMQ_HOME 环境变量 if (null == namesrvConfig.getRocketmqHome()) { System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV); // 系统非正常退出 System.exit(-2); } LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory(); JoranConfigurator configurator = new JoranConfigurator(); configurator.setContext(lc); lc.reset(); // logback日志文件路径 configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml"); log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME); MixAll.printObjectProperties(log, namesrvConfig); MixAll.printObjectProperties(log, nettyServerConfig); // 创建namesrv控制器 =》 final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig); // remember all configs to prevent discard 把配置文件配置值的属性值注册到namesrv控制器 controller.getConfiguration().registerConfig(properties); return controller; }
进入这个方法org.apache.rocketmq.srvutil.ServerUtil#buildCommandlineOptions 这里可以看到启动rocketmq的时候namesrv参数怎么制定
public static Options buildCommandlineOptions(final Options options) { Option opt = new Option("h", "help", false, "Print help"); opt.setRequired(false); options.addOption(opt); // 这里可以看到启动的时候有一个n参数指定namesrv的地址,可以是单机可以是集群,启动地址之间用;分开 opt = new Option("n", "namesrvAddr", true, "Name server address list, eg: 192.168.0.1:9876;192.168.0.2:9876"); opt.setRequired(false); options.addOption(opt); return options; }
进入这个方法org.apache.rocketmq.namesrv.NamesrvStartup#buildCommandlineOptions 从这里看出来启动rocketmq的时候怎么指定配置文件,是否自动创建topic、消费组这些参数都可以在配置文件中配置
// 指定namesrv启动的时候加载的配置文件 public static Options buildCommandlineOptions(final Options options) { Option opt = new Option("c", "configFile", true, "Name server config properties file"); opt.setRequired(false); options.addOption(opt); // 控制台输出配置项 opt = new Option("p", "printConfigItem", false, "Print all config item"); opt.setRequired(false); options.addOption(opt); return options; }
从这里可以看到一些配置的存储地址
public class NamesrvConfig { private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME); // 解析ROCKETMQ_HOME private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV)); // kvConfig存储地址 private String kvConfigPath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "kvConfig.json"; // namesrv存储地址 private String configStorePath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "namesrv.properties";
nettyserver的一些配置
public class NettyServerConfig implements Cloneable { // netty server监听端口 private int listenPort = 8888; // 服务端工作线程数 private int serverWorkerThreads = 8; // 服务端回调执行线程 private int serverCallbackExecutorThreads = 0; // 服务端选择器线程 private int serverSelectorThreads = 3; // oneway方式信号量的值 private int serverOnewaySemaphoreValue = 256; // 服务端异步信号量值 private int serverAsyncSemaphoreValue = 64; // 服务端channel最大空闲时间 private int serverChannelMaxIdleTimeSeconds = 120; // 发送消息最大大小从系统属性中获取,默认值65535 private int serverSocketSndBufSize = NettySystemConfig.socketSndbufSize; // 接受消息最大大小从系统属性中获取,默认值65535 private int serverSocketRcvBufSize = NettySystemConfig.socketRcvbufSize; private boolean serverPooledByteBufAllocatorEnable = true;
nettyserver默认监听端口9876
nettyServerConfig.setListenPort(9876);
从这里可以看到是解析启动的时候指定的配置文件属性
// c 指定启动的时候加载配置文件 if (commandLine.hasOption('c')) { // 命令行启动指定配置文件,前面用c开头 String file = commandLine.getOptionValue('c'); if (file != null) { InputStream in = new BufferedInputStream(new FileInputStream(file)); properties = new Properties(); properties.load(in); MixAll.properties2Object(properties, namesrvConfig); MixAll.properties2Object(properties, nettyServerConfig);
配置文件中可以配置这个类org.apache.rocketmq.common.namesrv.NamesrvConfig namesrv的一些配置
// 解析ROCKETMQ_HOME private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV)); // kvConfig存储地址 private String kvConfigPath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "kvConfig.json"; // namesrv存储地址 private String configStorePath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "namesrv.properties"; private String productEnvName = "center"; private boolean clusterTest = false; private boolean orderMessageEnable = false;
也可以配置这个类org.apache.rocketmq.remoting.netty.NettyServerConfig nettyserver的一些配置
public class NettyServerConfig implements Cloneable { // netty server监听端口 private int listenPort = 8888; // 服务端工作线程数 private int serverWorkerThreads = 8; // 服务端回调执行线程 private int serverCallbackExecutorThreads = 0; // 服务端选择器线程 private int serverSelectorThreads = 3; // oneway方式信号量的值 private int serverOnewaySemaphoreValue = 256; // 服务端异步信号量值 private int serverAsyncSemaphoreValue = 64; // 服务端channel最大空闲时间 private int serverChannelMaxIdleTimeSeconds = 120; // 发送消息最大大小从系统属性中获取,默认值65535 private int serverSocketSndBufSize = NettySystemConfig.socketSndbufSize; // 接受消息最大大小从系统属性中获取,默认值65535 private int serverSocketRcvBufSize = NettySystemConfig.socketRcvbufSize; private boolean serverPooledByteBufAllocatorEnable = true;
namesrv配置的存储地址就是启动指定的配置文件
namesrvConfig.setConfigStorePath(file);
也可以在命令行上指定namesrv属性配置
MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);
logback的配置文件
configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml");
根据namesrv配置和nettyserver配置创建NamesrvController
final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);
进入这个方法org.apache.rocketmq.namesrv.NamesrvController#NamesrvController
public NamesrvController(NamesrvConfig namesrvConfig, NettyServerConfig nettyServerConfig) { this.namesrvConfig = namesrvConfig; this.nettyServerConfig = nettyServerConfig; this.kvConfigManager = new KVConfigManager(this); this.routeInfoManager = new RouteInfoManager(); this.brokerHousekeepingService = new BrokerHousekeepingService(this); // =》 this.configuration = new Configuration( log, this.namesrvConfig, this.nettyServerConfig ); // 指定保存namesrv配置的配置文件 this.configuration.setStorePathFromConfig(this.namesrvConfig, "configStorePath"); }
进入这里org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#RouteInfoManager
public RouteInfoManager() { // 指定长度,减少扩容提高性能 this.topicQueueTable = new HashMap<String, List<QueueData>>(1024);//topicQueue缓存信息 this.brokerAddrTable = new HashMap<String, BrokerData>(128);//broker地址缓存 this.clusterAddrTable = new HashMap<String, Set<String>>(32);//集群地址缓存 this.brokerLiveTable = new HashMap<String, BrokerLiveInfo>(256);//可用的broker缓存 this.filterServerTable = new HashMap<String, List<String>>(256);//过滤的server地址 }
设置namrsrv配置存储地址System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "namesrv.properties"
this.configuration.setStorePathFromConfig(this.namesrvConfig, "configStorePath");
// remember all configs to prevent discard 把配置文件配置值的属性值注册到namesrv控制器 controller.getConfiguration().registerConfig(properties);
进入这个方法org.apache.rocketmq.common.Configuration#registerConfig(java.util.Properties)
public Configuration registerConfig(Object configObject) { try { readWriteLock.writeLock().lockInterruptibly(); try { Properties registerProps = MixAll.object2Properties(configObject); merge(registerProps, this.allConfigs); configObjectList.add(configObject); } finally { readWriteLock.writeLock().unlock(); } } catch (InterruptedException e) { log.error("registerConfig lock error"); } return this; }
/** * All properties include configs in object and extend properties. * 所有的启动配置都在这里 */ private Properties allConfigs = new Properties();
说在最后
本次解析仅代表个人观点,仅供参考。
加入技术微信群