Skip to content

RocketMQ的集群架构是怎样的

RocketMQ的集群架构设计旨在提高系统的可用性、可靠性和可扩展性。它通过多种组件协同工作,实现消息的生产、存储、分发和消费。以下是关于RocketMQ集群架构及其使用场景的详细说明。

RocketMQ集群架构

  1. NameServer
    • 它是一个几乎无状态的节点,可以集群部署用于服务发现。
    • NameServer为Producer和Consumer提供路由信息。
  2. Broker
    • Broker负责接收、存储和转发消息。
    • 它可以分为Master和Slave,支持多对Master-Slave配置以实现高可用。
    • Master和Slave之间通过同步/异步机制进行数据复制。
  3. Producer
    • 消息生产者负责产生消息,并发送到Broker。
    • 支持同步和异步发送消息。
  4. Consumer
    • 消息消费者负责从Broker接收消息。
    • 支持Push和Pull两种消费方式,可以是集群消费或广播消费。

使用场景

  1. 高可用性
    • 通过Master-Slave配置和Broker高可用机制,保证系统在部分节点失效的情况下仍能正常运行。
  2. 消息持久化
    • 控制消息的存储策略(同步/异步),从而达成更高的可靠性需求。
  3. 负载均衡
    • 使用多NameServer和Broker集群,支持负载均衡,从而支持大规模消息流量。
  4. 弹性扩展
    • 可以按需添加NameServer和Broker节点,进行水平扩展。

代码示例

创建NameServer集群

NameServer无状态,可以简单地在不同的服务器上启动多个实例:

java
nohup sh mqnamesrv &

多台服务器上重复上述操作。

配置和启动Broker集群

配置文件(broker-a.properties)示例:

java
brokerClusterName=DefaultCluster  
brokerName=broker-a  
brokerId=0  // 0表示Master,>0表示Slave  
namesrvAddr=localhost:9876;localhost:9877  // 多个NameServer地址

启动Broker:

java
nohup sh mqbroker -c broker-a.properties &

生产者与消费者代码示例

生产者:

java
import org.apache.rocketmq.client.producer.DefaultMQProducer;  
import org.apache.rocketmq.common.message.Message;  

public class ClusterProducerExample {  
    public static void main(String[] args) throws Exception {  
        DefaultMQProducer producer = new DefaultMQProducer("ClusterProducerGroup");  
        producer.setNamesrvAddr("localhost:9876;localhost:9877");  
        producer.start();  

        for (int i = 0; i < 10; i++) {  
            Message msg = new Message("ClusterTopicTest", "TagA", ("Hello RocketMQ " + i).getBytes());  
            producer.send(msg);  
        }  

        producer.shutdown();  
    }  
}

消费者:

java
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;  
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;  
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;  
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;  
import org.apache.rocketmq.common.message.MessageExt;  

import java.util.List;  

public class ClusterConsumerExample {  
    public static void main(String[] args) throws Exception {  
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ClusterConsumerGroup");  
        consumer.setNamesrvAddr("localhost:9876;localhost:9877");  
        consumer.subscribe("ClusterTopicTest", "*");  

        consumer.registerMessageListener(new MessageListenerConcurrently() {  
            @Override  
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {  
                for (MessageExt msg : msgs) {  
                    System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), new String(msg.getBody()));  
                }  
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;  
            }  
        });  

        consumer.start();  
        System.out.printf("Consumer started.%n");  
    }  
}

总结

RocketMQ的集群架构通过NameServer、Broker的灵活组合,实现了高可用、弹性伸缩和负载均衡,适用于各种规模的分布式系统。通过合理配置和使用,RocketMQ可以支持广泛的应用场景,如在线业务系统、异步处理和事件驱动架构。

更新: 2024-08-18 20:26:01
原文: https://www.yuque.com/tulingzhouyu/db22bv/whblnk2iu56i25po