组成

消费者

消费者消费消息方式主要有2种:

  1. pull方式,客户端主动拉取数据,由客户端控制消息offset。

  2. push长轮训方式,服务端收到请求,如果队列里面没有消息,broker会hold住请求一段时间。比较适合客户端数量可控的环境。

生产者

  1. 消息发送有2种方式,同步和异步。

  2. 可支持固定时间间隔的延迟消息。

  3. 事务的支持。例如:a用户增加100元,b用户减少100。流程是先发b用户减少100元消息,然后执行本地a用户增加100元事务,根据本地事务的结果,来决定b用户减少100元的消息是回退还是提交。同时mq会定期回检消息是否提交。只有消息提交后,才会真正被消费。需要注意的是,修改已经提交的消息,会影响性能。因此,rocket4.x去掉了这个功能。

队列位置信息

Offset是指某个topic下某条消息在某个message quene中的位置。根据消费模式的不同,有2种方式。Offset结构如下:

  1. 当集群消费模式时,使用RemoteBrokerOffsetStore,由broker存储和控制位置信息。

  2. 当广播消费模式时,使用LocalfileOffsetStore,Offset保存在本地。

消息存储内容

需要注意的是,如果使用PullConsumer来拉取消息,位置信息就需要使用者自己维护,持久化位置信息来防止丢失。

NameServer

分布式信息的统一维护者。如果是多个NameServer,则相互之间是独立的。Brokers定期上报自身信息,nameServer将信息保存在内存中。

// key是topic名称,List每个item对应一个队列,QueueData存储着master的 Broker的名称、 读写queue的数量、 同步标识等。
private final HashMap<String topic, List<QueueData>> topicQueueTable topicQueueTable
//key是broker名称,BrokerData存储着所属集群,broker地址信息等,
private final HashMap<String BrokerName,BrokerData>Broker-AddrTable
//key是集群名称,value是集群中包含的节点名称集合。
private final HashMap<String ClusterName, Set<String BrokerName>> ClusterAddrTable
//key是broker地址,value是当前broker的实时状态,更新时间戳。nameserver会定期检查更新时间戳,超时则会剔除该broker
private final HashMap<String BrokerAddr , BrokerLivelnfo> Broker- LiveTable
//key是broker地址,value是需要过滤的broker列表
private final HashMap<String BrokerAddr  , List<String>  Filter Server > filterServerTable

定期检查broker

为什么没有使用zookeeper?

zk功能很强大,并且比较完善,比如master选举,但是rocketmq的设计却用不到这些功能。rocketmq只需要一个轻量的元数据存储即可,这样可以在满足要求的基础上减少维护成本。

底层通信

  1. RemotingService为上层接口,提供开启,关闭和注册。

  2. RemotingClient和RemotingServer继承RemotingService接口。

  3. 提供NettyRemotingClient和NettyRemotingServer实现。

协议设计和编码

RocketMQ 自己定义了一个通信协议。

  1. header data为json数据。

  2. body data为二进制数据。

Last updated

Was this helpful?