rocketmq

1. NameServer

Two features:

2. Broker Server

Broker server is responsible for message store and delivery, message query, HA guarantee, and so on.

As shown in image below, Broker server has several important sub modules:

2.1 Broker Role

Deploy strategy:

3. Storage

3.1 CommitLog

3.2 ConsumeQueue

3.3 Index File

key_hash = hash(<topic> + "#" + key)
slot_num = key_hash % 500w
slot_offset = 40 + 4 * slot_num
index_sequence = Int4BytesAt(slot_offset)
index_offset = 40 + 4 * 500w + 20 * index_sequence
index_key_hash = Int4BytesAt(index_offset)
index_commit_offset = Long8BytesAt(index_offset + 4)
index_timestamp = Int4BytesAt(index_offset + 4 + 8) * 1000 + header_timestamp
index_next_sequence = Int4BytesAt(index_offset + 4 + 8 + 4)

3.4 write/read data

Uses PageCache to flush data to disk to enhence write performance.

Uses MappedByteBuffer(mmap) to enhence read performance.

4. Message

4.1 Topic

Topic, internally, is logically partitioned into one or more sub-topics. We call these sub-topics message queues. This concept plays a major role in implementing valuable features, including fail-over, maximum concurrency, etc.

4.2 Tag

Consume tag filter is processed on broker side, only filter by tag hash code. So consumer SHOULD check whether the tag is matched for received messages.

4.3 Message Format

Fields in a message:

Field Bytes Size Desc
msgSize 4 message size
MAGICCODE 4 fixed daa320a7
BODY CRC 4 check when broker restarting to recover
queueId 4 queue id
flag 4 flag
QUEUEOFFSET 8 sequence in consumeQueue or tranStateTable; For none-transaction message or commit message,QUEUEOFFSET * 20 is the offset in consume queue; For Prepared or Rollback Message,used to find data in tranStateTable.
PHYSICALOFFSET 8 offset in commit log
SYSFLAG 4 4 bytes from lower to high;1st byte=1 means compressed; 2nd byte = 1 means MultiTags; 3rd byte = 1 means prepared message; 4th byte = 1 means commit message; 3rd&4th bytes = 1 means rollback message; 3rd&4th bytes = 0 means none-transaction message;
BORNTIMESTAMP 8 producer timestamp
BORNHOST 8 producer address:port
STORETIMESTAMP 8 store timestamp at broker
STOREHOSTADDRESS 8 broker address:port
RECONSUMETIMES 8 re-consumed times for a consume group (count separatedly), for retry-messages being send to topic %retry%groupName(queueId=0)
PreparedTransaction Offset 8 transaction prepared message offset
messagebodyLength 4 length of body
messagebody bodyLength body
topicLength 1 topic length
topic topicLength topic
propertiesLength 2 properties length
properties propertiesLength properties

4.4 Message Id

16 bytes: <broker ip> + <broker port> + offset

4.5 transaction message

Transaction message is based on 2PC and compensation check.

Two inner topics:

 --------------    offset    -----------
 | half topic | <---------- | op topic |
 -------------              -----------
                               |
  --------------     commit    |
 | user topic | <---------------
 -------------         

If not receive op message, try to check the state of local transaction. Retry maximum 15 times, otherwise rollback message.

4.6 delay message

RocketMQ an inner topic SCHEDULE_TOPIC_XXXX and 18 queues for different delay time level. 1st level queue delays 1s, 18th level queue delays 2h.

messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

Set delay time level:

Message msg = new Message();
msg.setTopic("TopicA");
msg.setTags("Tag");
msg.setBody("this is a delay message".getBytes());
// set delay level to 5, means delay 1m
msg.setDelayTimeLevel(5);
producer.send(msg);

Delay message process:

  1. change the Topic&Queue to SCHEDULE_TOPIC_XXXX and delay time level queue, original Topic&Queue will be saved in message properties. calculate the delivery time, which equals storeTimestamp + delay_level_time, as the tag hash code.
  2. send message to delay ConsumeQueue.
  3. ScheduleMessageService consume topic SCHEDULE_TOPIC_XXXX. It will check the delivery time of the first message, and continue process next message if matches.
  4. ScheduleMessageService change Topic&Queue of the message to the original, then save message to CommitLog again.
  5. send message to original Topic&Queue.
  6. consumers consume from original Topic.

4.7 reconsume message

RocketMQ allows reconsuming a message if consuming failed and returning status ConsumeConcurrentlyStatus.RECONSUME_LATER, then the message will be send to delay message topic as a delay message.

The delay time level is default 3 (means delay 10s), and will be increased 1 if requesting to reconsume again. The message will be discarded if the delay time level exceeds the max. So the max consume retry time is 16.

consumer.registerMessageListener(new MessageListenerConcurrently() {
   @Override
   public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                   ConsumeConcurrentlyContext context) {
       // handle message failed

       // return RECONSUME_LATER to reconsume later
       return ConsumeConcurrentlyStatus.RECONSUME_LATER;
   }
});

5. communication

communication between clients, brokers, nameservers.

5.1 communication message

Format: <message length>(4 bytes) + <Serialization type>(1 byte) + <Header length>(3 bytes) + <Data Header> + <Message Body>

Header Field Type Request Desc Response Desc
code int Request Operation Code Response code, 0:success, none-zero: fail
language LanguageCode request language response language
version int request version response version
opaque int like requestId,different for requests on same connection response the original value
flag int normal rpc or oneway normal rpc or oneway
remark String custom info custom info
extFields HashMap<String, String> custom info custom info

5.2 communication method

5.3 balance

Balance is done on client side both for provider and consumer.

6. Cluster

Cluster Model Performance Available Message lose
Mutiple Masters High Performance Not available when going down No message lose
Multple Masters, Multiple Slaves
Async Replication
High Performance (small replication latency for slaves) Available when master going down may lose some message when master going down
Multple Masters, Multiple Slaves
Sync Replication
RT is higher, thoughput 10% lower than others Available when master going down No message lose

6.1 broker configuration

# cluster name
brokerClusterName=rocketmq-cluster

# must different for different broker, slaves have the same name.
brokerName=broker-a

# 0:Master,>0:Slave
brokerId=0

# Broker IP
brokerIP1=192.168.31.186

# Broker listen port
listenPort=10911

# time to delete file, default 04:00
deleteWhen=04

# reserved time for files, default 48 hours
fileReservedTime=120

# Broker role,ASYNC_MASTER,SYNC_MASTER,SLAVE
brokerRole=ASYNC_MASTER

# flush disk type,ASYNC_FLUSH,SYNC_FLUSH
flushDiskType=SYNC_FLUSH

# nameServer addresses,format is ip1:port1;ip2:port2;ip3:port3
namesrvAddr=192.168.31.186:9876;192.168.31.231:9876

# queue numbers for topic, default 4. For load balance.
defaultTopicQueueNums=8

# whether create Topic automatically
autoCreateTopicEnable=false

# whether create subscription group automatically
autoCreateSubscriptionGroup=false

# store root path
storePathRootDir=/data/rocketmq-all-4.9.1-bin-release/data/store-a

# commitLog
storePathCommitLog=/data/rocketmq-all-4.9.1-bin-release/data/store-a/commitlog

# store path for consumer queue
storePathConsumerQueue=/data/rocketmq-all-4.9.1-bin-release/data/store-a/consumequeue

# store path for index
storePathIndex=/data/rocketmq-all-4.9.1-bin-release/data/store-a/index

# store path for checkpoints
storeCheckpoint=/data/rocketmq-all-4.9.1-bin-release/data/store-a/checkpoint

# store path for abort file
abortFile=/data/rocketmq-all-4.9.1-bin-release/data/store-a/abort

# size of one commitLog file, default 1G.
mapedFileSizeCommitLog=1073741824

# number of messages in one ConsumeQueue file, default 30W
mapedFileSizeConsumeQueue=300000

6.2 replication

 _________                                  __________
 |        | <---- report slave offset ----- |        |
 | master |                                 |  slave |
 |        | ---- send batch messages  ----> |        |
 ----------                                  ---------

NOTE: master-slave cluster is not guarantee strong consistence between master and slaves.

// CommitLog.java
// public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
// public CompletableFuture<PutMessageResult> asyncPutMessages(final MessageExtBatch messageExtBatch) {

// 1. async flush to db 
CompletableFuture<PutMessageStatus> flushResultFuture = submitFlushRequest(result, msg);
// 2. async replicate to slave
CompletableFuture<PutMessageStatus> replicaResultFuture = submitReplicaRequest(result, msg);
return flushResultFuture.thenCombine(replicaResultFuture, (flushStatus, replicaStatus) -> {
    if (flushStatus != PutMessageStatus.PUT_OK) {
        putMessageResult.setPutMessageStatus(flushStatus);
    }
    if (replicaStatus != PutMessageStatus.PUT_OK) {
        putMessageResult.setPutMessageStatus(replicaStatus);
        if (replicaStatus == PutMessageStatus.FLUSH_SLAVE_TIMEOUT) {
            log.error("do sync transfer other node, wait return, but failed, topic: {} tags: {} client address: {}",
                    msg.getTopic(), msg.getTags(), msg.getBornHostNameString());
        }
    }
    return putMessageResult;
});

6.3 Dledger cluster

dledger is a raft-based java library for building high-available, high-durable, strong-consistent commitlog, which could act as the persistent layer for distributed storage system, i.e. messaging, streaming, kv, db, etc.

rocketmq support to change storage to dledger, to provide consistence between master and slaves.

A. Reference

History