RocketMQ网络通信原理分析(一)

作者阿里云代理 文章分类 分类:linux图文教程 阅读次数 已被围观 609

引言

我们都知道RocketMQ是高性能的消息中间件,其高性能不仅体现在其优秀的消息吞吐量上,也体现在其基于Netty实现的高性能通信能力上。接下来将通过几篇文章来阐述RocketMQ的通信模块。通过RocketMQ对于通信模块的设计分析,我们在日后需要设计中间件关于通信模块时,其实也可以参考以及借鉴已经成熟的中间件的设计,同时结合自身业务进行改进。

  • 通信架构说明
  • 以NameServer启动为例
  • 消息编解码
  • 总结

一、通信架构说明

RocketMQ的网络通信模块主要实现在remoting模块中,从模块中我们可以得知RocketMQ使用netty进行底层的通信实现,同时在protocol中自定义了通信协议。

image.png

最主要的类关系如下所示:

image.png

(1)RemotingService接口

RemotingService 作为顶层接口定义了三个主要的方法,主要包括启动netty服务、关闭netty服务以及注册RPC钩子处理请求前后的逻辑。

public interface RemotingService {  //开启服务  void start();   //停止服务  void shutdown();   //注册RPC钩子  void registerRPCHook(RPCHook rpcHook); }

(2)RPCHook 接口

其中RPCHook 接口定义了请求前后进行的逻辑处理,

public interface RPCHook {  void doBeforeRequest(final String remoteAddr, final RemotingCommand request);   void doAfterResponse(final String remoteAddr, final RemotingCommand request,  final RemotingCommand response); }

(3)服务端与客户端接口

RemotingServerRemotingClient 接口分别继承了RemotingService 接口,并进行了自己的业务扩展。

RemotingServer 接口

public interface RemotingServer extends RemotingService {   //注册处理请求的处理器, 根据requestCode, 获取处理器,处理请求  void registerProcessor(final int requestCode, final NettyRequestProcessor processor,  final ExecutorService executor);   void registerDefaultProcessor(final NettyRequestProcessor processor, final ExecutorService executor);   int localListenPort();   Pair<NettyRequestProcessor, ExecutorService> getProcessorPair(final int requestCode);   RemotingCommand invokeSync(final Channel channel, final RemotingCommand request,  final long timeoutMillis) throws InterruptedException, RemotingSendRequestException,  RemotingTimeoutException;   void invokeAsync(final Channel channel, final RemotingCommand request, final long timeoutMillis,  final InvokeCallback invokeCallback) throws InterruptedException,  RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException;   //单向发送消息,只管发送消息,不管消息发送的结果  void invokeOneway(final Channel channel, final RemotingCommand request, final long timeoutMillis)  throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException,  RemotingSendRequestException;  }

二、消息编解码

(1)通信协议设计

image.png

(2)编码

remoting模块对于消息进行了自定义协议,将发送的消息以及收到的消息封装为RemotingCommand对象。

 public ByteBuffer encode() {  // 1> header length size  int length = 4;   // 2> header data length  byte[] headerData = this.headerEncode();  length += headerData.length;   // 3> body data length  if (this.body != null) {  length += body.length;  }   ByteBuffer result = ByteBuffer.allocate(4 + length);   // length  result.putInt(length);   // header length  result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC));   // header data  result.put(headerData);   // body data;  if (this.body != null) {  result.put(this.body);  }   result.flip();   return result;  }

(3)解码

 public static RemotingCommand decode(final ByteBuffer byteBuffer) {  // 获取byteBuffer的总长度  int length = byteBuffer.limit();  int oriHeaderLen = byteBuffer.getInt();  int headerLength = getHeaderLength(oriHeaderLen);  // 保存header data  byte[] headerData = new byte[headerLength];  byteBuffer.get(headerData);   RemotingCommand cmd = headerDecode(headerData, getProtocolType(oriHeaderLen));   int bodyLength = length - 4 - headerLength;  byte[] bodyData = null;  if (bodyLength > 0) {  bodyData = new byte[bodyLength];  // 获取消息体的数据  byteBuffer.get(bodyData);  }  cmd.body = bodyData;   return cmd;  }

三、以NameServer启动为例

在了解remoting模块的核心接口之后,我们接下来看下具体的实现过程。其实在如NameServer启动过程中,它本身就会作为一个Netty的服务端进行启动。我们这里先忽略掉NameServer启动过程中的其他的配置操作,着重对Netty作为服务端启动的流程。大致的启动流程如下所示:

image.png

NameServer实际作为Netty服务端启动底层网络连接的,我们都知道它的作用是作为服务端提供给Broker进行注册以及客户端向其拉取路由信息。

NameServer启动过程中实际是创建了NettyRemotingServer,而NettyRemotingServer是RocketMQ自己开发的网络连接组件,当然它的底层实际是基于Netty的接口实现的ServerBootstrap。下列是start的方法,同样我们只关注Netty服务器的启动。

public static NamesrvController start(final NamesrvController controller) throws Exception {   if (null == controller) {  throw new IllegalArgumentException("NamesrvController is null");  }  //初始化  boolean initResult = controller.initialize();  if (!initResult) {  controller.shutdown();  System.exit(-3);  }  //通过Runtime类注册了一个JVM关闭时的shutdown的钩子  Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {  @Override  public Void call() throws Exception {  controller.shutdown();  return null;  }  }));   controller.start();   return controller;  }

其中初始化的方法如下所示:

public boolean initialize() {   //加载配置  this.kvConfigManager.load();  //构建Netty服务器  this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);  //Netty的分作线程池  this.remotingExecutor =  Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));  //将工作线程池分配给Netty服务器  this.registerProcessor();   this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {   @Override  public void run() {  NamesrvController.this.routeInfoManager.scanNotActiveBroker();  }  }, 5, 10, TimeUnit.SECONDS);   this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {   @Override  public void run() {  NamesrvController.this.kvConfigManager.printAllPeriodically();  }  }, 1, 10, TimeUnit.MINUTES);   if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {  // Register a listener to reload SslContext  try {  fileWatchService = new FileWatchService(  new String[] {  TlsSystemConfig.tlsServerCertPath,  TlsSystemConfig.tlsServerKeyPath,  TlsSystemConfig.tlsServerTrustCertPath  },  new FileWatchService.Listener() {  boolean certChanged, keyChanged = false;  @Override  public void onChanged(String path) {  if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) {  log.info("The trust certificate changed, reload the ssl context");  reloadServerSslContext();  }  if (path.equals(TlsSystemConfig.tlsServerCertPath)) {  certChanged = true;  }  if (path.equals(TlsSystemConfig.tlsServerKeyPath)) {  keyChanged = true;  }  if (certChanged && keyChanged) {  log.info("The certificate and private key changed, reload the ssl context");  certChanged = keyChanged = false;  reloadServerSslContext();  }  }  private void reloadServerSslContext() {  ((NettyRemotingServer) remotingServer).loadSslContext();  }  });  } catch (Exception e) {  log.warn("FileWatchService created error, can't load the certificate dynamically");  }  }   return true;  }

初始化完成之后进行启动,我们可以看到实际启动的是NettyRemotingServer

 public void start() throws Exception {  this.remotingServer.start();   if (this.fileWatchService != null) {  this.fileWatchService.start();  }  }

NettyRemotingServer启动过程如下代码所示:

@Override  public void start() {  this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(  nettyServerConfig.getServerWorkerThreads(),  new ThreadFactory() {   private AtomicInteger threadIndex = new AtomicInteger(0);   @Override  public Thread newThread(Runnable r) {  return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());  }  });  //配置启动Netty服务器  ServerBootstrap childHandler =  //各种网络配置  this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)  .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)  .option(ChannelOption.SO_BACKLOG, 1024)  .option(ChannelOption.SO_REUSEADDR, true)  .option(ChannelOption.SO_KEEPALIVE, false)  .childOption(ChannelOption.TCP_NODELAY, true)  .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())  .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())  .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))  //设置网络请求处理器,当Netty服务器收到网络请求后,就会有这些Handler进行处理  .childHandler(new ChannelInitializer<SocketChannel>() {  @Override  public void initChannel(SocketChannel ch) throws Exception {  ch.pipeline()  .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME,  new HandshakeHandler(TlsSystemConfig.tlsMode))  .addLast(defaultEventExecutorGroup,  //编解码  new NettyEncoder(),  new NettyDecoder(),  //空闲连接管理  new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),  //网络连接管理  new NettyConnectManageHandler(),  //网络请求处理  new NettyServerHandler()  );  }  });   if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {  childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);  }   try {  //启动Netty服务器,绑定对应的端口号  ChannelFuture sync = this.serverBootstrap.bind().sync();  InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();  this.port = addr.getPort();  } catch (InterruptedException e1) {  throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);  }   if (this.channelEventListener != null) {  this.nettyEventExecutor.start();  }   this.timer.scheduleAtFixedRate(new TimerTask() {   @Override  public void run() {  try {  NettyRemotingServer.this.scanResponseTable();  } catch (Throwable e) {  log.error("scanResponseTable exception", e);  }  }  }, 1000 * 3, 1000);  }

四、总结

通过以上分析可知,RocketMQ实际是在原生Netty之上进行了自己的封装。最后一张图来说明NameServer启动过程中关于Netty启动的部分。在后续的文章中我们再着重分析RocketMQ如何高效使用Netty框架。

image.png

本公司销售:阿里云、腾讯云、百度云、天翼云、金山大米云、金山企业云盘!可签订合同,开具发票。

我有话说: