PinkHello
做一个快乐的程序猿
RocketMQ源码阅读 通信组件
RocketMQ开箱源码详解-手握大哥大

RocketMQ 核心基石

前面已经介绍了 RocketMQ 的基本概念和组件。今天我们开启真正的源码阅读篇章。RocketMQ 消息系统的各个组件 Producer、Consumer、Broker、NameSrv 通通离不开交互,那它们之间是用什么方式交互的呢?答案是 TCP 长连接。而 RocketMQ 开源代码内部对通信相关的逻辑进行了一次封装,都在 rocketmq-remoting 模块下,这个模块被其他的 client、broker、namesrv 模块引用。

直接先说 remoting 的实现是基于 netty 做了封装、启动了服务端和客户端,支持三种消息的发送方式:

  • 同步发送
  • 单向发送 (不需要关注响应)
  • 异步发送

下图为异步通信流程 rocketmq_remoting

remoting 包下的核心接口体系

remoting uml

接口 RemotingService

/**
 * Base lifecycle contract of the remoting layer; {@code RemotingServer} extends it.
 */
public interface RemotingService {
    // Start the service.
    void start();
    // Shut the service down.
    void shutdown();
    // Register an RPCHook.
    void registerRPCHook(RPCHook rpcHook);
}

接口 RemotingServer

/**
 * Server-side remoting contract: processor registration plus the three send modes
 * (synchronous, asynchronous, one-way) over an already established {@code Channel}.
 */
public interface RemotingServer extends RemotingService {

    // Register the processor (and the executor it runs on) for one request code.
    // Request codes: org.apache.rocketmq.common.protocol.RequestCode in the common module.
    void registerProcessor(final int requestCode, final NettyRequestProcessor processor,
        final ExecutorService executor);
    // Register the default processor.
    void registerDefaultProcessor(final NettyRequestProcessor processor, final ExecutorService executor);

    // Local listening port.
    int localListenPort();

    // Look up the processor and its business thread pool by request code.
    Pair<NettyRequestProcessor, ExecutorService> getProcessorPair(final int requestCode);

    // Synchronous send: returns the response command.
    RemotingCommand invokeSync(final Channel channel, final RemotingCommand request,
        final long timeoutMillis) throws InterruptedException, RemotingSendRequestException,
        RemotingTimeoutException;
    // Asynchronous send: the outcome is delivered through invokeCallback.
    void invokeAsync(final Channel channel, final RemotingCommand request, final long timeoutMillis,
        final InvokeCallback invokeCallback) throws InterruptedException,
        RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException;
    // One-way send: the caller does not wait for a response.
    void invokeOneway(final Channel channel, final RemotingCommand request, final long timeoutMillis)
        throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException,
        RemotingSendRequestException;

}

实现 NettyRemotingServer

这边选择性地摘取部分代码进行记录和说明。

public class NettyRemotingServer extends NettyRemotingAbstract implements RemotingServer {
    // todo code omitted here...

    /**
     * Creates the server and its thread groups. Nothing is bound to a port until {@link #start()} is called.
     *
     * @param nettyServerConfig    source of the semaphore values and thread counts read below
     * @param channelEventListener optional listener; {@link #start()} starts the event executor only when it
     *                             is non-null
     */
    public NettyRemotingServer(final NettyServerConfig nettyServerConfig,
                               final ChannelEventListener channelEventListener) {
        // Pass the one-way and async semaphore values to the NettyRemotingAbstract constructor.
        super(nettyServerConfig.getServerOnewaySemaphoreValue(), nettyServerConfig.getServerAsyncSemaphoreValue());
        // The bootstrap is only created here; it is configured and bound in start().
        this.serverBootstrap = new ServerBootstrap();
        this.nettyServerConfig = nettyServerConfig;
        this.channelEventListener = channelEventListener;

        // Size of the shared public executor; fall back to 4 threads when the configured value is not positive.
        int publicThreadNums = nettyServerConfig.getServerCallbackExecutorThreads();
        if (publicThreadNums <= 0) {
            publicThreadNums = 4;
        }

        this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() {
            private final AtomicInteger threadIndex = new AtomicInteger(0);

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "NettyServerPublicExecutor_" + this.threadIndex.incrementAndGet());
            }
        });

        // Epoll or NIO, as decided by useEpoll() (presumably based on the OS and configuration; not shown here).
        if (useEpoll()) {
            // Boss event loop group with a single thread.
            this.eventLoopGroupBoss = new EpollEventLoopGroup(1, new ThreadFactory() {
                private final AtomicInteger threadIndex = new AtomicInteger(0);

                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, String.format("NettyEPOLLBoss_%d", this.threadIndex.incrementAndGet()));
                }
            });
            // Selector event loop group, sized by getServerSelectorThreads().
            this.eventLoopGroupSelector = new EpollEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
                private final AtomicInteger threadIndex = new AtomicInteger(0);
                private final int threadTotal = nettyServerConfig.getServerSelectorThreads();

                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, String.format("NettyServerEPOLLSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));
                }
            });
        } else {
            // todo code omitted here...
        }
        // Load the SSL context.
        loadSslContext();
    }

    //todo code omitted here...

    /**
     * Core start-up method: builds the handler executor group, configures and binds the Netty server,
     * records the bound port, optionally starts the channel-event executor and schedules the periodic
     * response-table scan.
     *
     * @throws RuntimeException if the calling thread is interrupted while waiting for the bind to complete;
     *                          the thread's interrupt status is restored before the exception is thrown
     */
    @Override
    public void start() {
        // Executor group the pipeline handlers below are registered on, sized by getServerWorkerThreads().
        this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
                nettyServerConfig.getServerWorkerThreads(),
                new ThreadFactory() {

                    private final AtomicInteger threadIndex = new AtomicInteger(0);

                    @Override
                    public Thread newThread(Runnable r) {
                        return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());
                    }
                });
        // Prepare the sharable handlers (SSL handshake, encoder, connection management, business handler).
        prepareSharableHandlers();

        // Configure the Netty server: boss group accepts, selector group serves the accepted channels.
        ServerBootstrap childHandler =
                this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
                        .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
                        .option(ChannelOption.SO_BACKLOG, 1024)
                        .option(ChannelOption.SO_REUSEADDR, true)
                        .option(ChannelOption.SO_KEEPALIVE, false)
                        .childOption(ChannelOption.TCP_NODELAY, true)
                        .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
                        .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
                        .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
                        .childHandler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            public void initChannel(SocketChannel ch) throws Exception {
                                // Every handler is bound to defaultEventExecutorGroup. The decoder and the
                                // idle-state handler are created per channel; the others are shared instances.
                                ch.pipeline()
                                        .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)
                                        .addLast(defaultEventExecutorGroup,
                                                encoder,
                                                new NettyDecoder(),
                                                new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
                                                connectionManageHandler,
                                                serverHandler
                                        );
                            }
                        });

        if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
            childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
        }

        try {
            // Bind and wait for completion, then remember the port that was actually bound.
            ChannelFuture sync = this.serverBootstrap.bind().sync();
            InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
            this.port = addr.getPort();
        } catch (InterruptedException e1) {
            // Restore the interrupt status: wrapping the exception alone would hide the interruption
            // from code further up the stack.
            Thread.currentThread().interrupt();
            throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
        }

        if (this.channelEventListener != null) {
            this.nettyEventExecutor.start();
        }

        // Periodically scan the response table: first run after 3000 ms, then every 1000 ms.
        // Any failure is logged and does not propagate out of the task.
        this.timer.scheduleAtFixedRate(new TimerTask() {

            @Override
            public void run() {
                try {
                    NettyRemotingServer.this.scanResponseTable();
                } catch (Throwable e) {
                    log.error("scanResponseTable exception", e);
                }
            }
        }, 1000 * 3, 1000);
    }

    //todo code omitted here...

    /**
     * Core inbound handler: forwards every decoded {@code RemotingCommand} to
     * {@code processMessageReceived}, which is defined in {@code NettyRemotingAbstract}.
     */
    @ChannelHandler.Sharable
    class NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> {

        @Override
        protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
            processMessageReceived(ctx, msg);
        }
    }
    //todo code omitted here...
}

public abstract class NettyRemotingAbstract {

    /**
     * Dispatches an inbound command according to its type: request commands go to
     * {@code processRequestCommand}, response commands to {@code processResponseCommand}.
     * A {@code null} message and any other type are ignored.
     *
     * @param ctx channel context the command arrived on
     * @param msg decoded command, may be {@code null}
     * @throws Exception whatever the selected processing method throws
     */
    public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
        if (msg == null) {
            return;
        }

        switch (msg.getType()) {
            case REQUEST_COMMAND:
                // Inbound request.
                processRequestCommand(ctx, msg);
                break;
            case RESPONSE_COMMAND:
                // Response to a request this side sent earlier.
                processResponseCommand(ctx, msg);
                break;
            default:
                break;
        }
    }
}

好了,到现在 remoting server 端的入口和核心处理方法大体介绍得差不多了,后面补充一下 RocketMQ Netty Reactor 多线程的设计。

rocketmq_reactor_thread

简单概括: 1-N-M1-M2 模型

线程数 线程名 线程具体说明 代码默认值
1 NettyBoss_%d Reactor主线程 1
N NettyServerEPOLLSelector_%d_%d Reactor线程池 3
M1 NettyServerCodecThread_%d Worker线程池 8
M2 RemotingExecutorThread_%d 业务processor处理线程池 8
  • [Reactor主线程] 1, 一个 Reactor主线程(eventLoopGroupBoss) 负责监听 TCP网络链接请求,建立好链接,创建SocketChannel、并注册到selector上。
  • [Reactor线程池] N=3, RocketMQ 根据OS或者配置选择NIO还是Epoll,然后监听真正的网络数据。后面拿到数据后,丢给 Reactor线程池 (eventLoopGroupSelector),
  • [Worker线程池] M1=8, 在执行业务逻辑之前的 SSL验证、编解码、空闲检查、网络连接管理等等,这些工作交给了 defaultEventExecutorGroup
  • [业务processor处理线程池] M2=8, 处理业务操作的放在了业务线程池来执行,根据 RemotingCommand 的业务码 code 从 processorTable 缓存中获取到对应的 processor, 然后封装成Task任务,提交给业务processor处理线程池来执行 (eg: sendMessageExecutor,消息发送为例)

备注: 特别要说明 业务processor处理线程池 不一定是8,具体看代码:

public class NettyRemotingServer extends NettyRemotingAbstract implements RemotingServer{
    //todo .....

    /**
     * Registers the processor for one request code. When {@code executor} is {@code null} the
     * processor is paired with the shared {@code publicExecutor} instead (callers normally pass
     * a non-null executor).
     */
    @Override
    public void registerProcessor(int requestCode, NettyRequestProcessor processor, ExecutorService executor) {
      final ExecutorService effectiveExecutor = (executor != null) ? executor : this.publicExecutor;
      this.processorTable.put(requestCode,
          new Pair<NettyRequestProcessor, ExecutorService>(processor, effectiveExecutor));
    }

    /**
     * Registers the default processor. The executor is stored exactly as given; there is no
     * fallback to {@code publicExecutor} here (callers normally pass a non-null executor).
     */
    @Override
    public void registerDefaultProcessor(NettyRequestProcessor processor, ExecutorService executor) {
      Pair<NettyRequestProcessor, ExecutorService> defaultPair =
          new Pair<NettyRequestProcessor, ExecutorService>(processor, executor);
      this.defaultRequestProcessor = defaultPair;
    }
    //todo .....
}

这边结合理解差不多了。

RemotingCommand

上面看见了在处理消息的时候一个很核心的类 下面来看一下这个类 org.apache.rocketmq.remoting.protocol.RemotingCommand

它其实是 rocketmq的指令协议

Header字段 类型 Request说明 Response说明
code int 请求操作码,应答方根据不同的请求码进行不同的业务处理 应答响应码。0表示成功,非0则表示各种错误
language LanguageCode 请求方实现的语言 应答方实现的语言
version int 请求方程序的版本 应答方程序的版本
opaque int 相当于requestId,在同一个连接上的不同请求标识码,与响应消息中的相对应 应答不做修改直接返回
flag int 区分是普通RPC还是onewayRPC的标志 区分是普通RPC还是onewayRPC的标志
remark String 传输自定义文本信息 传输自定义文本信息
extFields HashMap<String, String> 请求自定义扩展信息 响应自定义扩展信息

协议编码后的样子 rocketmq_protocol_length

4个部分:

  • 消息长度: 总长度, 4 个字节存储, int 类型
  • 序列化类型&消息头长度: int 类型, 第一个字节表示序列化类型, 后面三个字节表示消息头长度
  • 消息头数据: 经过序列化后的消息头
  • 消息主体数据: 消息主体的二进制字节数据
/**
 * The RocketMQ remoting command: the header fields (code, language, version, opaque, flag,
 * remark, extFields) plus an optional binary body.
 *
 * <p>Frame produced by {@link #encode()}:
 * <pre>
 *   4 bytes  total length (= 4 + header length + body length)
 *   4 bytes  1 byte serialize type + 3 bytes header length
 *   N bytes  serialized header
 *   M bytes  body (may be absent)
 * </pre>
 */
public class RemotingCommand {
  public static final String SERIALIZE_TYPE_PROPERTY = "rocketmq.serialize.type";
  public static final String SERIALIZE_TYPE_ENV = "ROCKETMQ_SERIALIZE_TYPE";
  public static final String REMOTING_VERSION_KEY = "rocketmq.remoting.version";
  private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
  // Bit index inside flag: bit clear = REQUEST_COMMAND, bit set = RESPONSE_COMMAND.
  private static final int RPC_TYPE = 0;
  // Bit index inside flag: bit clear = normal RPC, bit set = one-way RPC.
  private static final int RPC_ONEWAY = 1;
  // Reflection caches. They are concurrent maps so that a lookup is safe while another thread is
  // inserting; a plain HashMap must not be read while it is being structurally modified.
  private static final Map<Class<? extends CommandCustomHeader>, Field[]> CLASS_HASH_MAP =
          new java.util.concurrent.ConcurrentHashMap<Class<? extends CommandCustomHeader>, Field[]>();
  private static final Map<Class<?>, String> CANONICAL_NAME_CACHE =
          new java.util.concurrent.ConcurrentHashMap<Class<?>, String>();
  private static final Map<Field, Boolean> NULLABLE_FIELD_CACHE =
          new java.util.concurrent.ConcurrentHashMap<Field, Boolean>();
  private static final String STRING_CANONICAL_NAME = String.class.getCanonicalName();
  private static final String DOUBLE_CANONICAL_NAME_1 = Double.class.getCanonicalName();
  private static final String DOUBLE_CANONICAL_NAME_2 = double.class.getCanonicalName();
  private static final String INTEGER_CANONICAL_NAME_1 = Integer.class.getCanonicalName();
  private static final String INTEGER_CANONICAL_NAME_2 = int.class.getCanonicalName();
  private static final String LONG_CANONICAL_NAME_1 = Long.class.getCanonicalName();
  private static final String LONG_CANONICAL_NAME_2 = long.class.getCanonicalName();
  private static final String BOOLEAN_CANONICAL_NAME_1 = Boolean.class.getCanonicalName();
  private static final String BOOLEAN_CANONICAL_NAME_2 = boolean.class.getCanonicalName();
  // Cached value of the REMOTING_VERSION_KEY system property; -1 until it has been read successfully.
  private static volatile int configVersion = -1;
  // Source of the opaque (request id) values.
  private static final AtomicInteger requestId = new AtomicInteger(0);

  // Serialization used by this process; JSON unless overridden by the system property or environment variable.
  private static SerializeType serializeTypeConfigInThisServer = SerializeType.JSON;

  static {
    // The system property wins; the environment variable is only its default.
    final String protocol = System.getProperty(SERIALIZE_TYPE_PROPERTY, System.getenv(SERIALIZE_TYPE_ENV));
    if (!isBlank(protocol)) {
      try {
        serializeTypeConfigInThisServer = SerializeType.valueOf(protocol);
      } catch (IllegalArgumentException e) {
        throw new RuntimeException("parser specified protocol error. protocol=" + protocol, e);
      }
    }
  }

  // Request code (in a request) or response code (in a response).
  private int code;
  // Implementation language of the sender.
  private LanguageCode language = LanguageCode.JAVA;
  // Program version of the sender.
  private int version = 0;
  // Request id used to match a response to its request.
  private int opaque = requestId.getAndIncrement();
  // Bit flags, see RPC_TYPE and RPC_ONEWAY.
  private int flag = 0;
  // Free-form text.
  private String remark;
  // Custom extension fields; also carries the flattened custom header on the wire.
  private HashMap<String, String> extFields;
  private transient CommandCustomHeader customHeader;

  private SerializeType serializeTypeCurrentRPC = serializeTypeConfigInThisServer;

  private transient byte[] body;

  protected RemotingCommand() {
  }

  /**
   * Creates a request command.
   *
   * @param code         request code
   * @param customHeader custom header, may be {@code null}
   * @return the new command with its version filled in by {@code setCmdVersion}
   */
  public static RemotingCommand createRequestCommand(int code, CommandCustomHeader customHeader) {
    RemotingCommand cmd = new RemotingCommand();
    cmd.setCode(code);
    cmd.customHeader = customHeader;
    setCmdVersion(cmd);
    return cmd;
  }

  /**
   * Sets the program version on {@code cmd}: the cached value when one exists, otherwise the
   * {@link #REMOTING_VERSION_KEY} system property (which is then cached). When neither is
   * available the command keeps its current version.
   */
  private static void setCmdVersion(RemotingCommand cmd) {
    if (configVersion >= 0) {
      cmd.setVersion(configVersion);
    } else {
      String v = System.getProperty(REMOTING_VERSION_KEY);
      if (v != null) {
        int value = Integer.parseInt(v);
        cmd.setVersion(value);
        configVersion = value;
      }
    }
  }

  /**
   * Creates a response command pre-filled with {@code SYSTEM_ERROR} and the remark
   * "not set any response code"; the caller is expected to overwrite both.
   *
   * @return the command, or {@code null} if {@code classHeader} cannot be instantiated
   */
  public static RemotingCommand createResponseCommand(Class<? extends CommandCustomHeader> classHeader) {
    return createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR, "not set any response code", classHeader);
  }

  /**
   * Creates a response command.
   *
   * @param code        response code
   * @param remark      free-form text
   * @param classHeader custom header class to instantiate, or {@code null} for no custom header
   * @return the command, or {@code null} if {@code classHeader} cannot be instantiated
   */
  public static RemotingCommand createResponseCommand(int code, String remark,
                                                      Class<? extends CommandCustomHeader> classHeader) {
    RemotingCommand cmd = new RemotingCommand();
    cmd.markResponseType();
    cmd.setCode(code);
    cmd.setRemark(remark);
    setCmdVersion(cmd);

    if (classHeader != null) {
      try {
        CommandCustomHeader objectHeader = classHeader.newInstance();
        cmd.customHeader = objectHeader;
      } catch (InstantiationException e) {
        return null;
      } catch (IllegalAccessException e) {
        return null;
      }
    }

    return cmd;
  }

  /** Creates a response command without a custom header. */
  public static RemotingCommand createResponseCommand(int code, String remark) {
    return createResponseCommand(code, remark, null);
  }

  /** Decodes a command from a byte array; see {@link #decode(ByteBuffer)}. */
  public static RemotingCommand decode(final byte[] array) {
    ByteBuffer byteBuffer = ByteBuffer.wrap(array);
    return decode(byteBuffer);
  }

  /**
   * Decodes a command from a buffer that starts at the "serialize type + header length" word.
   *
   * <p>NOTE(review): the arithmetic below only matches {@link #encode()} if the leading 4-byte
   * total length has already been stripped (presumably by the frame decoder) and the buffer's
   * limit equals the remaining frame size; confirm against the caller.
   */
  public static RemotingCommand decode(final ByteBuffer byteBuffer) {
    int length = byteBuffer.limit();
    // High byte: serialize type; low three bytes: header length.
    int oriHeaderLen = byteBuffer.getInt();
    int headerLength = getHeaderLength(oriHeaderLen);

    byte[] headerData = new byte[headerLength];
    byteBuffer.get(headerData);

    RemotingCommand cmd = headerDecode(headerData, getProtocolType(oriHeaderLen));

    // Whatever follows the 4-byte word and the header is the body.
    int bodyLength = length - 4 - headerLength;
    byte[] bodyData = null;
    if (bodyLength > 0) {
      bodyData = new byte[bodyLength];
      byteBuffer.get(bodyData);
    }
    cmd.body = bodyData;

    return cmd;
  }

  /** Extracts the header length: the low 24 bits of the combined word. */
  public static int getHeaderLength(int length) {
    return length & 0xFFFFFF;
  }

  /**
   * Decodes the header with the given serialization.
   *
   * @return the decoded command, or {@code null} for a serialize type other than JSON or ROCKETMQ
   */
  private static RemotingCommand headerDecode(byte[] headerData, SerializeType type) {
    switch (type) {
      case JSON:
        RemotingCommand resultJson = RemotingSerializable.decode(headerData, RemotingCommand.class);
        resultJson.setSerializeTypeCurrentRPC(type);
        return resultJson;
      case ROCKETMQ:
        RemotingCommand resultRMQ = RocketMQSerializable.rocketMQProtocolDecode(headerData);
        resultRMQ.setSerializeTypeCurrentRPC(type);
        return resultRMQ;
      default:
        break;
    }

    return null;
  }

  /** Extracts the serialize type: the high byte of the combined word. */
  public static SerializeType getProtocolType(int source) {
    return SerializeType.valueOf((byte) ((source >> 24) & 0xFF));
  }

  public static int createNewRequestId() {
    return requestId.getAndIncrement();
  }

  public static SerializeType getSerializeTypeConfigInThisServer() {
    return serializeTypeConfigInThisServer;
  }

  /** Returns {@code true} when {@code str} is {@code null}, empty, or whitespace only. */
  private static boolean isBlank(String str) {
    int strLen;
    if (str == null || (strLen = str.length()) == 0) {
      return true;
    }
    for (int i = 0; i < strLen; i++) {
      if (!Character.isWhitespace(str.charAt(i))) {
        return false;
      }
    }
    return true;
  }

  /**
   * Builds the 4-byte combined word: byte 0 is the serialize type code, bytes 1-3 are the low
   * 24 bits of {@code source} (the header length) in big-endian order.
   */
  public static byte[] markProtocolType(int source, SerializeType type) {
    byte[] result = new byte[4];

    result[0] = type.getCode();
    result[1] = (byte) ((source >> 16) & 0xFF);
    result[2] = (byte) ((source >> 8) & 0xFF);
    result[3] = (byte) (source & 0xFF);
    return result;
  }

  /** Marks this command as a response by setting the RPC_TYPE bit in the flag. */
  public void markResponseType() {
    int bits = 1 << RPC_TYPE;
    this.flag |= bits;
  }

  public CommandCustomHeader readCustomHeader() {
    return customHeader;
  }

  public void writeCustomHeader(CommandCustomHeader customHeader) {
    this.customHeader = customHeader;
  }

  /**
   * Instantiates {@code classHeader} and populates its non-static fields from {@code extFields}.
   * Supported field types: String, int/Integer, long/Long, boolean/Boolean, double/Double.
   *
   * <p>NOTE(review): the {@code RemotingCommandException}s thrown inside the loop (missing
   * non-nullable field, unsupported type) are caught by the {@code catch (Throwable)} of the
   * same iteration and only logged, so they never leave this method. Presumably
   * {@code checkFields()} is what actually surfaces validation failures; confirm before
   * relying on either.
   *
   * @return the populated header, or {@code null} if {@code classHeader} cannot be instantiated
   * @throws RemotingCommandException declared by this method; see the note above
   */
  public CommandCustomHeader decodeCommandCustomHeader(
          Class<? extends CommandCustomHeader> classHeader) throws RemotingCommandException {
    CommandCustomHeader objectHeader;
    try {
      objectHeader = classHeader.newInstance();
    } catch (InstantiationException e) {
      return null;
    } catch (IllegalAccessException e) {
      return null;
    }

    // Decode the custom extension fields into the header object.
    if (this.extFields != null) {

      Field[] fields = getClazzFields(classHeader);
      for (Field field : fields) {
        if (!Modifier.isStatic(field.getModifiers())) {
          String fieldName = field.getName();
          // Skip fields whose name starts with "this" (e.g. the synthetic outer-instance reference).
          if (!fieldName.startsWith("this")) {
            try {
              String value = this.extFields.get(fieldName);
              if (null == value) {
                if (!isFieldNullable(field)) {
                  throw new RemotingCommandException("the custom field <" + fieldName + "> is null");
                }
                continue;
              }

              field.setAccessible(true);
              String type = getCanonicalName(field.getType());
              Object valueParsed;

              if (type.equals(STRING_CANONICAL_NAME)) {
                valueParsed = value;
              } else if (type.equals(INTEGER_CANONICAL_NAME_1) || type.equals(INTEGER_CANONICAL_NAME_2)) {
                valueParsed = Integer.parseInt(value);
              } else if (type.equals(LONG_CANONICAL_NAME_1) || type.equals(LONG_CANONICAL_NAME_2)) {
                valueParsed = Long.parseLong(value);
              } else if (type.equals(BOOLEAN_CANONICAL_NAME_1) || type.equals(BOOLEAN_CANONICAL_NAME_2)) {
                valueParsed = Boolean.parseBoolean(value);
              } else if (type.equals(DOUBLE_CANONICAL_NAME_1) || type.equals(DOUBLE_CANONICAL_NAME_2)) {
                valueParsed = Double.parseDouble(value);
              } else {
                throw new RemotingCommandException("the custom field <" + fieldName + "> type is not supported");
              }

              field.set(objectHeader, valueParsed);

            } catch (Throwable e) {
              log.error("Failed field [{}] decoding", fieldName, e);
            }
          }
        }
      }

      objectHeader.checkFields();
    }

    return objectHeader;
  }

  /** Returns the declared fields of {@code classHeader}, cached per class. */
  private Field[] getClazzFields(Class<? extends CommandCustomHeader> classHeader) {
    Field[] fields = CLASS_HASH_MAP.get(classHeader);

    if (fields == null) {
      // Two threads may both miss and both store; the arrays are equivalent, so that is harmless.
      fields = classHeader.getDeclaredFields();
      CLASS_HASH_MAP.put(classHeader, fields);
    }
    return fields;
  }

  /** Returns {@code true} unless the field carries {@code @CFNotNull}; cached per field. */
  private boolean isFieldNullable(Field field) {
    Boolean nullable = NULLABLE_FIELD_CACHE.get(field);
    if (nullable == null) {
      nullable = field.getAnnotation(CFNotNull.class) == null;
      NULLABLE_FIELD_CACHE.put(field, nullable);
    }
    return nullable;
  }

  /** Returns the canonical name of {@code clazz}, cached per class. */
  private String getCanonicalName(Class<?> clazz) {
    String name = CANONICAL_NAME_CACHE.get(clazz);

    if (name == null) {
      name = clazz.getCanonicalName();
      // getCanonicalName() is null for classes without a canonical name (anonymous, local), and
      // ConcurrentHashMap rejects null values, so such results are simply not cached.
      if (name != null) {
        CANONICAL_NAME_CACHE.put(clazz, name);
      }
    }
    return name;
  }

  /**
   * Encodes the whole command (length word, combined type/header-length word, header, body).
   *
   * @return a buffer flipped for reading
   */
  public ByteBuffer encode() {
    // 1> header length size
    int length = 4;

    // 2> header data length
    byte[] headerData = this.headerEncode();
    length += headerData.length;

    // 3> body data length
    if (this.body != null) {
      length += body.length;
    }

    // The extra 4 bytes hold the total-length word itself, which is not counted in length.
    ByteBuffer result = ByteBuffer.allocate(4 + length);

    // length
    result.putInt(length);

    // header length
    result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC));

    // header data
    result.put(headerData);

    // body data;
    if (this.body != null) {
      result.put(this.body);
    }

    result.flip();

    return result;
  }

  /** Flattens the custom header into extFields, then serializes the header with the current serialize type. */
  private byte[] headerEncode() {
    this.makeCustomHeaderToNet();
    if (SerializeType.ROCKETMQ == serializeTypeCurrentRPC) {
      return RocketMQSerializable.rocketMQProtocolEncode(this);
    } else {
      return RemotingSerializable.encode(this);
    }
  }

  /**
   * Copies every non-static, non-null field of the custom header into {@code extFields} as
   * {@code name -> value.toString()}. Fields that cannot be read are logged and skipped.
   */
  public void makeCustomHeaderToNet() {
    if (this.customHeader != null) {
      Field[] fields = getClazzFields(customHeader.getClass());
      if (null == this.extFields) {
        this.extFields = new HashMap<String, String>();
      }

      for (Field field : fields) {
        if (!Modifier.isStatic(field.getModifiers())) {
          String name = field.getName();
          if (!name.startsWith("this")) {
            Object value = null;
            try {
              field.setAccessible(true);
              value = field.get(this.customHeader);
            } catch (Exception e) {
              log.error("Failed to access field [{}]", name, e);
            }

            if (value != null) {
              this.extFields.put(name, value.toString());
            }
          }
        }
      }
    }
  }

  /** Encodes everything except the body, using this command's own body length for the length word. */
  public ByteBuffer encodeHeader() {
    return encodeHeader(this.body != null ? this.body.length : 0);
  }

  /**
   * Encodes the length word, the combined type/header-length word and the header, but not the body.
   * The length word still accounts for {@code bodyLength}, so the body can be written separately.
   *
   * @param bodyLength number of body bytes that will follow the returned buffer on the wire
   * @return a buffer flipped for reading
   */
  public ByteBuffer encodeHeader(final int bodyLength) {
    // 1> header length size
    int length = 4;

    // 2> header data length
    byte[] headerData;
    headerData = this.headerEncode();

    length += headerData.length;

    // 3> body data length
    length += bodyLength;

    // Only the two 4-byte words and the header are stored here, hence "- bodyLength".
    ByteBuffer result = ByteBuffer.allocate(4 + length - bodyLength);

    // length
    result.putInt(length);

    // header length
    result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC));

    // header data
    result.put(headerData);

    result.flip();

    return result;
  }

  /** Marks this command as one-way by setting the RPC_ONEWAY bit in the flag. */
  public void markOnewayRPC() {
    int bits = 1 << RPC_ONEWAY;
    this.flag |= bits;
  }

  @JSONField(serialize = false)
  public boolean isOnewayRPC() {
    int bits = 1 << RPC_ONEWAY;
    return (this.flag & bits) == bits;
  }

  public int getCode() {
    return code;
  }

  public void setCode(int code) {
    this.code = code;
  }

  /** Returns RESPONSE_COMMAND when the RPC_TYPE bit is set, REQUEST_COMMAND otherwise. */
  @JSONField(serialize = false)
  public RemotingCommandType getType() {
    if (this.isResponseType()) {
      return RemotingCommandType.RESPONSE_COMMAND;
    }

    return RemotingCommandType.REQUEST_COMMAND;
  }

  @JSONField(serialize = false)
  public boolean isResponseType() {
    int bits = 1 << RPC_TYPE;
    return (this.flag & bits) == bits;
  }

  public LanguageCode getLanguage() {
    return language;
  }

  public void setLanguage(LanguageCode language) {
    this.language = language;
  }

  public int getVersion() {
    return version;
  }

  public void setVersion(int version) {
    this.version = version;
  }

  public int getOpaque() {
    return opaque;
  }

  public void setOpaque(int opaque) {
    this.opaque = opaque;
  }

  public int getFlag() {
    return flag;
  }

  public void setFlag(int flag) {
    this.flag = flag;
  }

  public String getRemark() {
    return remark;
  }

  public void setRemark(String remark) {
    this.remark = remark;
  }

  public byte[] getBody() {
    return body;
  }

  public void setBody(byte[] body) {
    this.body = body;
  }

  public HashMap<String, String> getExtFields() {
    return extFields;
  }

  public void setExtFields(HashMap<String, String> extFields) {
    this.extFields = extFields;
  }

  /** Adds one extension field, creating the map on first use. */
  public void addExtField(String key, String value) {
    if (null == extFields) {
      extFields = new HashMap<String, String>();
    }
    extFields.put(key, value);
  }

  @Override
  public String toString() {
    return "RemotingCommand [code=" + code + ", language=" + language + ", version=" + version + ", opaque=" + opaque + ", flag(B)="
            + Integer.toBinaryString(flag) + ", remark=" + remark + ", extFields=" + extFields + ", serializeTypeCurrentRPC="
            + serializeTypeCurrentRPC + "]";
  }

  public SerializeType getSerializeTypeCurrentRPC() {
    return serializeTypeCurrentRPC;
  }

  public void setSerializeTypeCurrentRPC(SerializeType serializeTypeCurrentRPC) {
    this.serializeTypeCurrentRPC = serializeTypeCurrentRPC;
  }
}

感谢 !


最后修改于 2021-05-22