柚子快报激活码778899分享:微服务中间件之RocketMQ

http://yzkb.51969.com/

RocketMQ

RocketMQ 基础概念

1. 主题 Topic

主题是 Apache RocketMQ 中消息传输和存储的顶层容器,用于标识同一类业务逻辑的消息。 主题的作用主要如下:

定义数据的分类隔离: 在 Apache RocketMQ 的方案设计中,建议将不同业务类型的数据拆分到不同的主题中管理,通过主题实现存储的隔离性和订阅隔离性。定义数据的身份和权限: Apache RocketMQ 的消息本身是匿名无身份的,同一分类的消息使用相同的主题来做身份识别和权限管理。

2. 队列 Queue

队列是 Apache RocketMQ 中消息存储和传输的实际容器,也是 Apache RocketMQ 消息的最小存储单元。 Apache RocketMQ 的所有主题都是由多个队列组成,以此实现队列数量的水平拆分和队列内部的流式存储。

3. 消息 Message

消息是 Apache RocketMQ 中的最小数据传输单元。生产者将业务数据的负载和拓展属性包装成消息发送到 Apache RocketMQ 服务端,服务端按照相关语义将消息投递到消费端进行消费。

4. 生产者 Producer

发布消息的角色。Producer 通过 MQ 的负载均衡模块选择相应的 Broker 集群队列进行消息投递,投递的过程支持快速失败和重试。

5. 消费者 Consumer

消息消费的角色。

支持以推(push),拉(pull)两种模式对消息进行消费。同时也支持集群方式和广播方式的消费。提供实时消息订阅机制,可以满足大多数用户的需求。

RocketMQ 底层原理

名字服务器 NameServer

NameServer 是一个简单的 Topic 路由注册中心,支持 Topic、Broker 的动态注册与发现。

主要包括两个功能:

Broker管理,NameServer 接受 Broker 集群的注册信息并且保存下来作为路由信息的基本数据。然后提供心跳检测机制,检查 Broker 是否还存活;路由信息管理,每个 NameServer 都保存关于 Broker 集群的整个路由信息和用于客户端查询的队列信息。Producer 和 Consumer 通过 NameServer 就可以知道整个 Broker 集群的路由信息,从而进行消息的投递和消费。

NameServer 通常会有多个实例部署,各实例间相互不进行信息通讯。Broker 向每一台 NameServer 注册自己的路由信息,所以每一个 NameServer 实例上面都保存一份完整的路由信息。当某个 NameServer 因某种原因下线了,客户端仍然可以向其它 NameServer 获取路由信息。

代理服务器 Broker(提供topic服务,消息是存放在Broker中)

Broker主要负责消息的存储、投递和查询以及服务高可用保证。

NameServer 几乎无状态节点,因此可集群部署,节点之间无任何信息同步。Broker 部署相对复杂。

在 Master-Slave 架构中,Broker 分为 Master 与 Slave。一个 Master 可以对应多个 Slave,但是一个 Slave 只能对应一个 Master。Master 与 Slave 的对应关系通过指定相同的 BrokerName,不同的 BrokerId 来定义,BrokerId 为 0 表示 Master,非 0 表示 Slave。Master 也可以部署多个。

部署模型小结:

每个 Broker 与 NameServer 集群中的所有节点建立长连接,定时注册 Topic 信息到所有 NameServer。Producer 与 NameServer 集群中的其中一个节点建立长连接,定期从 NameServer 获取 Topic 路由信息,并向提供 Topic 服务的 Master 建立长连接,且定时向 Master 发送心跳。Producer 完全无状态。Consumer 与 NameServer 集群中的其中一个节点建立长连接,定期从 NameServer 获取 Topic 路由信息,并向提供 Topic 服务的 Master、Slave 建立长连接,且定时向 Master、Slave 发送心跳。Consumer 既可以从 Master 订阅消息,也可以从 Slave 订阅消息。

具体工作原理

1. 启动 NameServer

启动 NameServer。NameServer 启动后监听端口,等待 Broker、Producer、Consumer 连接,相当于一个路由控制中心。

2. 启动 Broker

启动 Broker。与所有 NameServer 保持长连接,定时发送心跳包。心跳包中包含当前 Broker 信息以及存储所有 Topic 信息。注册成功后,NameServer 集群中就有 Topic 跟 Broker 的映射关系。

3. 创建 Topic

创建 Topic 时需要指定该 Topic 要存储在哪些 Broker 上,也可以在发送消息时自动创建 Topic。

4. 生产者发送消息

生产者发送消息。启动时先跟 NameServer 集群中的其中一台建立长连接,并从 NameServer 中获取当前发送的 Topic 存在于哪些 Broker 上,轮询从队列列表中选择一个队列,然后与队列所在的 Broker 建立长连接从而向 Broker发消息。

5. 消费者接受消息

消费者接受消息。跟其中一台 NameServer 建立长连接,获取当前订阅 Topic 存在哪些 Broker 上,然后直接跟 Broker 建立连接通道,然后开始消费消息。

Example

启动 RocketMQ 安装 NameServer。 docker run -d -p 9876:9876 --name rmqnamesrv foxiswho/**rocketmq:server-4.5.1**

安装 Brocker。 docker run -d \

-p 10911:10911 \

-p 10909:10909 \

--name rmqbroker \

--link rmqnamesrv:namesrv \

-v ${HOME}/docker/software/rocketmq/conf/broker.conf:/etc/rocketmq/broker.conf \

-e "NAMESRV_ADDR=namesrv:9876" \

-e "JAVA_OPTS=-Duser.home=/opt" \

-e "JAVA_OPT_EXT=-server -Xms512m -Xmx512m" \

foxiswho/**rocketmq:broker-4.5.1**

安装 RocketMQ 控制台。 docker pull pangliang/rocketmq-console-ng

docker run -d \

--link rmqnamesrv:namesrv \

-e "JAVA_OPTS=-Drocketmq.config.namesrvAddr=namesrv:9876 -Drocketmq.config.isVIPChannel=false" \

--name rmqconsole \

-p 8088:8080 \

-t pangliang/**rocketmq-console-ng**

消息生产者 配置文件中引入 RocketMQ 相关配置定义,比如连接 NameServer 地址等。 server:

port: 6060

rocketmq:

name-server: 127.0.0.1:9876 # NameServer 地址

producer:

group: rocketmq-4x-service_common-message-execute_pg # 全局发送者组定义

定义消息生产者,通过 RocketMQTemplate 向 RocketMQ 发送普通常规消息。 /**

* 普通消息发送者

*

* @公众号:马丁玩编程,回复:加群,添加马哥微信(备注:ladder)获取更多项目资料

*/

@Slf4j

@Component

@RequiredArgsConstructor

public class GeneralMessageDemoProduce {

private final RocketMQTemplate rocketMQTemplate;

/**

* 发送普通消息

*

* @param topic 消息发送主题,用于标识同一类业务逻辑的消息

* @param tag 消息的过滤标签,消费者可通过Tag对消息进行过滤,仅接收指定标签的消息。

* @param keys 消息索引键,可根据关键字精确查找某条消息

* @param messageSendEvent 普通消息发送事件,自定义对象,最终都会序列化为字符串

* @return 消息发送 RocketMQ 返回结果

*/

public SendResult sendMessage(String topic, String tag, String keys, GeneralMessageEvent messageSendEvent) {

SendResult sendResult;

try {

StringBuilder **destinationBuilder** = **StrUtil.builder().append(topic)**;

if (StrUtil.isNotBlank(tag)) {

destinationBuilder.append(":").append(tag);

}

Message **message** = MessageBuilder

.withPayload(messageSendEvent)

.setHeader(MessageConst.PROPERTY_KEYS, keys)

.setHeader(MessageConst.PROPERTY_TAGS, tag)

.build();

// 向topic发送消息

sendResult = **rocketMQTemplate.syncSend(

destinationBuilder.toString(),

message,

2000L**

);

log.info("[普通消息] 消息发送结果:{},消息ID:{},消息Keys:{}", sendResult.getSendStatus(), sendResult.getMsgId(), keys);

} catch (Throwable ex) {

log.error("[普通消息] 消息发送失败,消息体:{}", JSON.toJSONString(messageSendEvent), ex);

throw ex;

}

return sendResult;

}

}

#

消息消费者 定义消息消费者,从 RocketMQ Broker 拉取对应 Topic Tag 的消息列表 /**

* 普通消息消费者

*

* @公众号:马丁玩编程,回复:加群,添加马哥微信(备注:ladder)获取更多项目资料

*/

@Slf4j

@Component

@RequiredArgsConstructor

**@RocketMQMessageListener(

topic = "rocketmq-demo_common-message_topic", // 从topic获取消息

selectorExpression = "general",

consumerGroup = "rocketmq-demo_general-message_cg"

)**

public class GeneralMessageDemoConsume implements RocketMQListener {

@Override

public void onMessage(GeneralMessageEvent message) {

log.info("接到到RocketMQ消息,消息体:{}", JSON.toJSONString(message));

}

}

发送消息 定义消息发送程序,这里为了避免类过多,直接写在 SpringBoot 的启动程序里。发送普通消息的方法返回值就是发送 RocketMQ Broker 返回的状态码,成功的话就是 SEND_OK。 @RestController

@RequiredArgsConstructor

@SpringBootApplication

@Tag(name = "RocketMQ发送示例", description = "RocketMQ发送示例启动器")

public class RocketMQDemoApplication {

private final GeneralMessageDemoProduce generalMessageDemoProduce;

@PostMapping("/test/send/general-message")

@Operation(summary = "发送RocketMQ普通消息")

public String sendGeneralMessage() {

String keys = UUID.randomUUID().toString();

GeneralMessageEvent generalMessageEvent = GeneralMessageEvent.builder()

.body("消息具体内容,可以是自定义对象,最终都会序列化为字符串。如果是取消订单,这里应该是订单ID或者相关联的信息")

.keys(keys)

.build();

SendResult sendResult = **generalMessageDemoProduce.sendMessage**(

**"rocketmq-demo_common-message_topic", // topic**

"general",

keys,

generalMessageEvent

);

return sendResult.getSendStatus().name();

}

public static void main(String[] args) {

SpringApplication.run(RocketMQDemoApplication.class, args);

}

}

Rocket 架构分析刨析

NameServer 内核原理剖析

NameServer 是可以集群部署的,但是集群中的每台 NameServer 之间不会进行通信 ,这样的好处就是 NameServer集群中每个节点都是对等的,其中一台挂了之后,对集群不会有影响。 Broker 在启动之后,会想 NameServer 集 群中的每个 NameServer 中都会注册自己的信息。 Broker 每隔 30s 会想 NameServer 中发送心跳,来让 NameServer 感知到 Broker 的存活状态。 在 NameServer 中有一个后台线程,会每隔 10s 去检查是否有 Broker 在 120s 内都没有发送心跳,如果有,就将该Broker 从存活列表中剔除掉!

Broker 主从架构与集群模式原理分析

在 Broker 集群中,生产者需要向 Broker 中写数据的话,先从 NameServer 中进行一个 Broker 列表的查询,之后再通过 负载均衡 去选择一个 Broker 进行消息的存储。 Broker 的主从关系通过将 Broker 的 name 设置相同,brokerId 是 0 的话代表 Broker 是主节点的 ,brokerId 不是 0的话代表 Broker 从节点的,Broker 的主从架构如下图:

关于消息中 Topic 的概念: 在生产者向 Broker 中发送消息的话,是指定了一个 Topic 的,那么 Topic 下是有一个 队列 的概念的: Topic 会在每个 Broker 分组里创建 4 个 write queue 和 4 个 read queue 那么生产者写入消息时,先根据 Topic 找到需要写入的 write queue,找到该 queue 所在的 Broker 进行写入,如下图:

内核级 Producer 发送消息流程

消息生产者发送消息根据 Topic 进行发送:

根据 Topic 找到这个 Topic 的 Queue 在每台 Broker 上的分布,进行负载均衡;通过负载均衡选择一个队列,根据 topicQueueTable 可以知道该 Queue 是属于哪一个 Broker 的;那么接下来就查找到 Broker 主节点(根据 brokerId 判断),将数据发送到这个 Broker 主节点中,再写入对应的 Queue;

那么如果当前消息发送到当前 Broker 组失败的话,在一段时间内就不会选择当前出现故障的 Queue了,会重新选择其他的 Broker 组中的 Queue 进行发送,选择 Broker 以及发送失败流程图如下图黄色部分所示:

RokcetMQ 的 NameServer 中是有 故障的延迟感知机制 ,即当 Broker 出现故障时,对于生产者来说,并不会立即感知到该 Broker 故障。 NameServer 中虽然每隔 10s 中会去检查是否有故障 Broker,将故障 Broker 剔除掉,但是此时生产者的 Topic 缓存中还是有故障 Broker 的信息的,只有等 30s 之后刷新,才可以感知到这个 Broker 已经故障了。

通过这个 故障的延迟感知机制 可以避免去做许多麻烦的操作,如果 Broker 挂掉之后,如果要让生产者立马感知到,需要通过 NameServer 去通知许多 Producer,并且如果通知丢失,还是有向故障 Broker 发送消息的可能!

Broker 如何实现高并发消息写入

Broker 对消息进行写磁盘是采用的磁盘顺序写 ,写磁盘分为两种:顺序写和随机写,两种速度差别非常大!

Broker 通过顺序写磁盘,也就是在文件末尾不停追加内容,不需要进行寻址操作,大幅度提高消息持久化存储的性能。 这里消息写入的就是 Commitlog 文件!

在将消息写入 Commitlog 文件之后,会有一个后台线程去监听 Commitlog 是否有数据写入,如果有就将新写入的数据写入到 write queue 中,这里写到 queue 中数据的内容就是消息在 Commitlog 中的偏移量 offset,写入Commitlog 文件动作如下图黑色部分:

总结: 那么这里的高并发就是通过两点来保证:

通过 磁盘顺序写 来提升性能通过 异步 将 Commitlog 文件写入 write queue(也就是 consume queue)中,保证不影响主流程速度

柚子快报激活码778899分享:微服务中间件之RocketMQ

http://yzkb.51969.com/

推荐链接

评论可见,请评论后查看内容,谢谢!!!评论后请刷新页面。