RocketMQ
阅读数:129 评论数:0
跳转到新版页面分类
应用软件
正文
一、概述
Apache RocketMQ 是一款分布式消息中间件,最初由阿里巴巴开发,并于 2016 年捐赠给 Apache 软件基金会。RocketMQ 以其高性能、高可靠性以及丰富的特性集在消息队列和流处理领域得到了广泛应用。
二、基本概念
1、Producer
负责发送消息到 RocketMQ。
2、Consumer
负责从 RocketMQ 中消费消息。
消息代理(Broker)是负责存储和转发消息的服务器。一个 RocketMQ 集群可以包含多个 Broker,每个 Broker 可以管理多个 Topic。
(1)示例
假设我们有一个 Topic 叫做 "TopicTest",并且它有 4 个队列。我们有两个 Broker,分别叫做 BrokerA 和 BrokerB。队列可以分布在这两个 Broker 上,如下所示:
-
BrokerA:
- Queue 0 (TopicTest)
- Queue 1 (TopicTest)
-
BrokerB:
- Queue 2 (TopicTest)
- Queue 3 (TopicTest)
(2)Broker的架构
Broker 的架构可以分为以下几个主要组件:
- CommitLog
- 消息的物理存储文件,采用顺序写入方式,极大地提高了写入性能。
- 每条消息在 CommitLog 中都有一个唯一的偏移量(offset)。
- ConsumeQueue
- 消费队列,存储消息在 CommitLog 中的偏移量和其他元数据。
- 每个 Topic 的每个队列都有一个对应的 ConsumeQueue 文件。
- IndexFile
- 索引文件,通过消息的 key 或者时间戳快速查找消息。
- 消息过滤
- Broker 支持基于 Tag 的消息过滤,消费者可以通过指定 Tag 来过滤消息。
(3)broker配置
Broker 的配置文件通常为 broker.conf
,包含了 Broker 的各种配置项。以下是一些常见的配置项:
# Broker 的角色:0 表示 Master,1 表示 Slave
brokerRole=ASYNC_MASTER
# Broker 的名字
brokerName=broker-a
# Broker 的 ID,0 表示 Master,大于0 表示 Slave
brokerId=0
# NameServer 的地址
namesrvAddr=localhost:9876
# 存储路径
storePathRootDir=/usr/local/rocketmq/store
# CommitLog 存储路径
storePathCommitLog=/usr/local/rocketmq/store/commitlog
# 消费队列存储路径
storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue
# 索引文件存储路径
storePathIndex=/usr/local/rocketmq/store/index
# 消息发送的最大大小(默认 4MB)
maxMessageSize=4194304
# Broker 的监听端口
listenPort=10911
使用 RocketMQ 提供的脚本可以启动 Broker:
# 启动 Broker
sh bin/mqbroker -c conf/broker.conf
启动后,可以通过查看日志文件确认 Broker 是否成功启动。日志文件通常位于 logs/rocketmqlogs/broker.log
。
(4)主从架构
RocketMQ 支持 Broker 的主从架构,以实现高可用性。主从架构包括以下几种模式:
- 同步复制(SYNC_MASTER)
- Master Broker 将消息同步复制到 Slave Broker,确保消息的高可用性
- 如果 Master Broker 崩溃,Slave Broker 可以无缝接管。
- 异步复制(ASYNC_MASTER)
- Master Broker 异步地将消息复制到 Slave Broker,复制过程不会影响 Master 的性能。
- 这种模式下存在数据丢失的风险,因为消息在复制到 Slave 之前 Master 可能会崩溃。
- 单Master(NO_SLAVE)
- 只有一个 Master Broker,没有 Slave Broker。
- 这种模式下没有高可用性保障,一旦 Master 崩溃,消息服务将中断。
在 Apache RocketMQ 中,NameServer 是一个关键组件,负责管理和协调 Broker 的元数据,并为生产者和消费者提供路由信息。
(1)功能
- 路由信息管理:NameServer 负责存储和管理 Broker 的路由信息,包括 Broker 的地址、Topic 的队列信息等。它为生产者和消费者提供最新的路由信息,以便它们能够正确地发送和接收消息。
- 动态注册和注销:Broker 启动时会向 NameServer 注册自己的信息,包括 Broker 的地址、所属集群、服务的 Topic 等。当 Broker 关闭时,也会向 NameServer 注销其信息。
- 负载均衡和高可用性:NameServer 通常以集群方式部署,多个 NameServer 实例之间是对等的,没有主从关系。生产者和消费者可以从任意一个 NameServer 获取路由信息,这样即使某个 NameServer 实例不可用,系统仍然能够正常工作。
(2)工作流程
- broker注册
- Broker 启动时,会将自身的元数据(如 Broker ID、地址、服务的 Topic 等)发送到所有 NameServer。
- NameServer 收到注册请求后,更新其内部的路由表,记录 Broker 的信息。
- 生产者和消费者获取路由信息
- 生产者和消费者在启动时,会向 NameServer 请求特定 Topic 的路由信息。
- NameServer 根据内部的路由表,返回该 Topic 所有队列的分布信息(包括队列所在的 Broker 地址等)。
- 生产者和消费者根据这些路由信息与相应的 Broker 进行通信。
- 路由信息更新
- Broker 定期(默认30秒)向 NameServer 发送心跳包(包含路由数据),以保持其注册信息的最新状态。
- 如果 NameServer 在一定时间内没有收到某个 Broker 的心跳包,会认为该 Broker 已下线,并从路由表中删除相应的路由信息。
(3)路由剔除
-
正常情况下,如果Broker关闭,则会与NameServer断开长连接,Netty的通道关闭监听器会监听到连接断开事件,然后会将这个Broker信息剔除掉。
-
异常情况下,NameServer中有一个定时任务,每隔10秒扫描一下Broker表,如果某个Broker的心跳包最新时间戳距离当前时间超过120秒,也会判定Broker失效并将其移除。
消息的分类标识,用于区分不同类型的消息。
(1)queue与topic的关系
队列是topic下的逻辑分区。每个topic可以包含多个队列,消息被分布到不同的队列中进行存储和消费。
队列的数量可以在创建 Topic 时指定。消息被发送到 Topic 时,会根据一定的策略(如轮询或哈希)分布到不同的队列中。
sh mqadmin updateTopic -n localhost:9876 -c DefaultCluster -t TopicTest -q 4
这条命令会在 DefaultCluster 集群中创建一个名为 "TopicTest" 的 Topic,并为其分配 4 个队列。
消息的基本单元,包含消息体、标签和其他属性。
(1)messageId
由producer端生成,其生成规则 :producerIp+进程pid+hashCode+当前时是+AutomicInteger自增计数器。
(2)offsetMsgId
由broker端生成,其生成规则:brokerIp+物理分区的offset(queue中的偏移量)
(3)key
由用户指定的业务相关的唯一标识 。
三、下载和安装
1、下载和安装
从官网:https://rocketmq.apache.org/下载最新版本的RocketMQ
wget https://archive.apache.org/dist/rocketmq/4.9.2/rocketmq-all-4.9.2-bin-release.zip
unzip rocketmq-all-4.9.2-bin-release.zip
cd rocketmq-all-4.9.2-bin-release
2、启动NameServer
nohup sh bin/mqnamesrv &
3、启动Broker
nohup sh bin/mqbroker -n localhost:9876 &
检查 Broker 是否启动成功:
tail -f ~/logs/rocketmqlogs/broker.log
四、使用示例
以下是一个简单的生产者示例,使用 Java 编写:
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
public class Producer {
public static void main(String[] args) throws Exception {
// 创建生产者实例
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroup");
producer.setNamesrvAddr("localhost:9876");
// 启动生产者实例
producer.start();
// 创建消息实例
Message msg = new Message("TopicTest", "TagA", "Hello RocketMQ".getBytes());
// 发送消息
producer.send(msg);
// 关闭生产者实例
producer.shutdown();
}
}
以下是一个简单的消费者示例,使用 Java 编写:
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class Consumer {
public static void main(String[] args) throws Exception {
// 创建消费者实例
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroup");
consumer.setNamesrvAddr("localhost:9876");
// 订阅主题
consumer.subscribe("TopicTest", "*");
// 注册消息监听器
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动消费者实例
consumer.start();
}
}
对于一个消费者实例,你可以多次调用 subscribe
方法来订阅多个不同的主题。下面是一个示例,展示了如何订阅多个主题:
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class MultiTopicConsumer {
public static void main(String[] args) throws Exception {
// 创建消费者实例
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroup");
consumer.setNamesrvAddr("localhost:9876");
// 订阅第一个主题
consumer.subscribe("TopicTest1", "*");
// 订阅第二个主题
consumer.subscribe("TopicTest2", "*");
// 注册消息监听器
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动消费者实例
consumer.start();
}
}
在上面的示例中,消费者订阅了两个主题 TopicTest1
和 TopicTest2
。消息监听器会处理来自这两个主题的消息。
你还可以为每个主题指定过滤条件(例如标签),以便消费者只接收符合条件的消息。例如:
// 订阅第一个主题并指定标签过滤条件
consumer.subscribe("TopicTest1", "TagA || TagB");
// 订阅第二个主题并指定标签过滤条件
consumer.subscribe("TopicTest2", "TagC");
在这个示例中,消费者只会接收 TopicTest1
中带有 TagA
或 TagB
标签的消息,以及 TopicTest2
中带有 TagC
标签的消息。
注意事项:
(1)消费模型:RocketMQ 支持两种消费模型:集群消费(Clustering)和广播消费(Broadcasting)。在集群消费模式下,同一个消费者组内的消费者实例会均匀分配消息。而在广播消费模式下,每个消费者实例都会接收到所有的消息。在订阅多个主题时,需要确保消费模型的设置符合你的需求。
(2)消息监听器:在一个消费者实例中,所有订阅的主题共享同一个消息监听器。因此,你需要在消息监听器中根据消息的主题和标签来区分处理逻辑。
为了实现高可用性,通常会部署多个 NameServer 实例。生产者和消费者在启动时,可以配置多个 NameServer 地址,以便在某个 NameServer 实例不可用时,仍然能够从其他实例获取路由信息。
在生产者或消费者的配置中,可以通过逗号分隔的方式指定多个 NameServer 地址:
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
public class Producer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroup");
producer.setNamesrvAddr("localhost:9876;localhost:9877");
producer.start();
for (int i = 0; i < 10; i++) {
Message msg = new Message("TopicTest", "TagA", ("Hello RocketMQ " + i).getBytes());
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
producer.shutdown();
}
}
双主模式(Dual Master Mode)是一种高可用的部署方式,它通过设置两个主 Broker 来提高系统的可靠性和可用性。双主模式的主要特点是没有 Slave Broker,每个主 Broker 独立处理消息的存储和转发。即使一个主 Broker 发生故障,另一个主 Broker 仍然可以继续提供服务,从而实现高可用性。
(1)配置
为了配置双主模式,你需要设置两个主 Broker,并确保它们分别向 NameServer 注册。以下是一个示例配置:
broker-a.conf
brokerClusterName=DefaultCluster
brokerName=broker-a
brokerId=0
namesrvAddr=localhost:9876
listenPort=10911
storePathRootDir=/usr/local/rocketmq/store-a
storePathCommitLog=/usr/local/rocketmq/store-a/commitlog
storePathConsumeQueue=/usr/local/rocketmq/store-a/consumequeue
storePathIndex=/usr/local/rocketmq/store-a/index
broker-b.conf
brokerClusterName=DefaultCluster
brokerName=broker-b
brokerId=0
namesrvAddr=localhost:9876
listenPort=10912
storePathRootDir=/usr/local/rocketmq/store-b
storePathCommitLog=/usr/local/rocketmq/store-b/commitlog
storePathConsumeQueue=/usr/local/rocketmq/store-b/consumequeue
storePathIndex=/usr/local/rocketmq/store-b/index
(2)启动双主模式
# 启动 Broker A
sh bin/mqbroker -c conf/broker-a.conf
# 启动 Broker B
sh bin/mqbroker -c conf/broker-b.conf
(3)注意事项
- 监控和管理:定期监控两个 Broker 的状态,确保它们都在正常运行。可以使用 RocketMQ 提供的监控工具(如 RocketMQ Console)来管理和监控 Broker 的状态。
五、高级特性
RocketMQ 支持根据标签(Tag)进行消息过滤。生产者可以为每条消息设置标签,消费者可以订阅特定标签的消息。
// 生产者设置标签
Message msg = new Message("TopicTest", "TagA", "Hello RocketMQ".getBytes());
// 消费者订阅特定标签
consumer.subscribe("TopicTest", "TagA");
RocketMQ 支持顺序消息,保证在同一个队列中的消息按顺序消费。
// 生产者发送顺序消息
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Long id = (Long) arg;
int index = (int) (id % mqs.size());
return mqs.get(index);
}
}, orderId);
// 消费者消费顺序消息
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
for (MessageExt msg : msgs) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), new String(msg.getBody()));
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
RocketMQ 支持延时消息,可以设置消息在指定时间后再被消费。
// 设置延时等级
msg.setDelayTimeLevel(3); // 延时等级3,表示消息将在10秒后被消费
RocketMQ 支持事务消息,用于保证消息的可靠性和一致性。
TransactionMQProducer producer = new TransactionMQProducer("ProducerGroup");
producer.setNamesrvAddr("localhost:9876");
producer.start();
Message msg = new Message("TopicTest", "TagA", "Hello RocketMQ".getBytes());
TransactionSendResult sendResult = producer.sendMessageInTransaction(msg, new LocalTransactionExecuter() {
@Override
public LocalTransactionState executeLocalTransactionBranch(Message msg, Object arg) {
// 执行本地事务逻辑
return LocalTransactionState.COMMIT_MESSAGE;
}
}, null);