常见消息模型及基本消息队列(basicQueue)

一、常见消息模型二、HelloWorld案例1.初识2.下载资料3.导入工程4.详解代码5.发送者执行过程分析6.消费者执行过程分析

三、总结

一、常见消息模型

  RabbitMQ的官网提供了几个入门案例,对应了几种不同的消息模型:

  1.基本消息队列(BasicQueue)   2.工作消息队列(WorkQueue)

  发布订阅(Publish、Subscribe),又根据交换机类型不同分为三种:   3.Fanout Exchange:广播   4.Direct Exchange:路由   5.Topic Exchange:主题

二、HelloWorld案例

1.初识

  HelloWorld是基于最基础的消息队列模型来实现的,只包括三个角色:

1.publisher:消息发布者,将消息发送到队列 queue 2.queue:消息队列,负责接受并缓存消息 3.consumer:订阅队列,处理队列中的消息

2.下载资料

  将最上方的 mq-demo 项目下载到本地

3.导入工程

  通过 IDEA 打开刚刚下载的 mq-demo 项目,其中父工程中主要是用来做依赖管理的,consumer 是消费者, publisher 是发布者。

4.详解代码

  父工程中引入的依赖有:

org.springframework.boot

spring-boot-starter-amqp

org.springframework.boot

spring-boot-starter-test

  发布者代码:

package cn.itcast.mq.helloworld;

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.ConnectionFactory;

import org.junit.Test;

import java.io.IOException;

import java.util.concurrent.TimeoutException;

public class PublisherTest {

@Test

public void testSendMessage() throws IOException, TimeoutException {

// 1.建立连接

ConnectionFactory factory = new ConnectionFactory();

// 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码

factory.setHost("192.168.3.128");

factory.setPort(5672);

factory.setVirtualHost("/");

factory.setUsername("gentlebrother");

factory.setPassword("gentlebrother");

// 1.2.建立连接

Connection connection = factory.newConnection();

// 2.创建通道Channel

Channel channel = connection.createChannel();

// 3.创建队列

String queueName = "simple.queue";

channel.queueDeclare(queueName, false, false, false, null);

// 4.发送消息

String message = "hello, rabbitmq!";

channel.basicPublish("", queueName, null, message.getBytes());

System.out.println("发送消息成功:【" + message + "】");

// 5.关闭通道和连接

channel.close();

connection.close();

}

}

  消费者代码:

package cn.itcast.mq.helloworld;

import com.rabbitmq.client.*;

import java.io.IOException;

import java.util.concurrent.TimeoutException;

public class ConsumerTest {

public static void main(String[] args) throws IOException, TimeoutException {

// 1.建立连接

ConnectionFactory factory = new ConnectionFactory();

// 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码

factory.setHost("192.168.3.128");

factory.setPort(5672);

factory.setVirtualHost("/");

factory.setUsername("gentlebrother");

factory.setPassword("gentlebrother");

// 1.2.建立连接

Connection connection = factory.newConnection();

// 2.创建通道Channel

Channel channel = connection.createChannel();

// 3.创建队列

String queueName = "simple.queue";

channel.queueDeclare(queueName, false, false, false, null);

// 4.订阅消息

channel.basicConsume(queueName, true, new DefaultConsumer(channel){

@Override

public void handleDelivery(String consumerTag, Envelope envelope,

AMQP.BasicProperties properties, byte[] body) throws IOException {

// 5.处理消息

String message = new String(body);

System.out.println("接收到消息:【" + message + "】");

}

});

System.out.println("等待接收消息。。。。");

}

}

5.发送者执行过程分析

  1.与 mq 建立连接时,需要改为你自己主机地址、用户名、密码等   2.当消息发送者执行代码完 1.建立连接 后,去 RabbitMQ 客户端查看,建立成功   3.当消息发送者执行代码完 2.建立通道 后,去 RabbitMQ 客户端查看,建立成功   4.当消息发送者执行代码完 3.创建队列 后,去 RabbitMQ 客户端查看,创建成功   5.当消息发送者执行代码完 4.发送消息 后,去 RabbitMQ 客户端查看,已经收到刚刚发送的一条消息   点击队列名称,可以查看消息内容

6.消费者执行过程分析

  可以看到,目前有两条消息,内容都是 【hello, rabbitmq!】   由于前面步骤和发送者一样,我们从 3.创建队列 开始分析,这里是否会有疑问,刚刚发布者已经创建好了队列,这里再创建一个一样的队列不会冲突吗?其实不会,这是一种保险措施,因为你也不知道是消费者先启动还是发布者先启动,所以我们都去创建,如果已经存在那么就不创建了,如果没有则创建。   从代码中可以看出,4.订阅消息 使用的是回调函数的机制,先把 5.处理消息 的行为挂到队列上,然后等有消息的时候它再执行,这也就解释了为什么控制台会这样输出

  回到 mq 客户端查看,两条消息都没有了

三、总结

基本消息队列的消息发送流程:

建立connection创建channel利用channel声明队列利用channel向队列发送消息

基本消息队列的消息接收流程:

建立connection创建channel利用channel声明队列定义consumer的消费行为handleDelivery()利用channel将消费者与队列绑定

相关文章

评论可见,请评论后查看内容,谢谢!!!评论后请刷新页面。