说明:最底层是消息服务,中间层是绑定层,绑定层和底层的消息服务进行绑定,顶层是消息生产者和消息消费者,顶层可以向绑定层生产消息和和获取消息消费。
Barista接口:Barista接口是定义来作为后面类的参数,这一接口定义通道类型和通道名称,通道名称是作为配置用,通道类型则决定了app会使用这一通道进行发送消息还是从中接收消息。
@Output: 输出注解,用于定义发送消息接口
@Input: 输入注解,用于定义消息的消费者接口
@StreamListener: 用于定义监听方法的注解
使用Spring Cloud Stream 非常简单,只需要使用好3个注解即可,在实现高性能消息的生成和消费场景非常合适,但是使用Spring Cloud Stream框架有一个非常大的问题就是不能实现可靠性投递,也就是没法保证消息的100%可靠性,会存在少量消息丢失问题。
这个原因是因为SpringCloudStream框架为了和Kafka兼顾所有在实际工作中使用它的目的就是针对高性能的消息通信的!
整合代码:
1、消息生产者:
pom.xml:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
<version>2.1.2.RELEASE</version>
</dependency>
Barista.java 定义输出,发送消息接口
public interface Barista {
String OUTPUT_CHANNEL = "output_channel";
/**
* 注解@Output声明了它是一个输出类型的通道,名字是output_channel
* 这一名字与app1中通道名一致,表明注入了一个名字为output_channel的通道,
* 类型是output,发布的主题名为mydest。
*/
@Output(Barista.OUTPUT_CHANNEL)
MessageChannel logout();
}
application.properties:
server.servlet.context-path=/producer
server.port=8002
spring.application.name=producer
spring.cloud.stream.bindings.output_channel.destination=exchange-3
spring.cloud.stream.bindings.output_channel.group=queue-3
spring.cloud.stream.bindings.output_channel.binder=rabbit_cluster
spring.cloud.stream.binders.rabbit_cluster.type=rabbit
spring.cloud.stream.binders.rabbit_cluster.environment.address=192.168.2.203:5672
spring.cloud.stream.binders.rabbit_cluster.environment.username=guest
spring.cloud.stream.binders.rabbit_cluster.environment.password=guest
spring.cloud.stream.binders.rabbit_cluster.environment.virtual-host=/
发送消息:
@EnableBinding(Barista.class)
@Service
public class RabbitMqSender {
@Autowired
private Barista barista;
public void SendMessage(Object message,Map<String,Object> properties){
try{
MessageHeaders headers = new MessageHeaders(properties);
Message message1 = MessageBuilder.createMessage(message,headers);
boolean sendStatus = barista.logout().send(message1);
System.err.println("---------------------");
System.err.println("发送数据:" + message + "sendStatus = "+ sendStatus);
}catch (Exception e){
System.err.println("----------error------------");
e.printStackTrace();
throw new RuntimeException(e.getMessage());
}
}
}
2、消息消费者
pom.xml同生产者
Barista.java
public interface Barista {
String INPUT_CHANNEL = "input_channel";
/**
* 注解@Input声明了它是一个输入类型的通道,名字是Barista.INPUT_CHANNEL
* 也就是position3的input_channel。这一名字与上述配置app2的配置文件中position1应该一致,
* 表明注入了一个名字叫做input_channel的通道,它的类型是input,订阅的主题是position2处声明的mydest这个主题
*/
@Input(Barista.INPUT_CHANNEL)
SubscribableChannel loginput();
}
application properties:
server.servlet.context-path=/consumer
server.port=8002
spring.application.name=consumer
spring.cloud.stream.bindings.input_channel.destination=exchange-3
spring.cloud.stream.bindings.input_channel.group=queue-3
spring.cloud.stream.bindings.input_channel.binder=rabbit_cluster
spring.cloud.stream.bindings.input_channel.consumer.concurrency=1
spring.cloud.stream.rabbit.bindings.input_channel.consumer.requeueRejected=false
spring.cloud.stream.rabbit.bindings.input_channel.consumer.acknowledgeMode=manual
spring.cloud.stream.rabbit.bindings.input_channel.consumer.recoveryInterval=3000
spring.cloud.stream.rabbit.bindings.input_channel.consumer.durableSubscription=true
spring.cloud.stream.rabbit.bindings.input_channel.consumer.maxConcurrency=5
spring.cloud.stream.binders.rabbit_cluster.type=rabbit
spring.cloud.stream.binders.rabbit_cluster.environment.address=192.168.2.203:5672
spring.cloud.stream.binders.rabbit_cluster.environment.username=guest
spring.cloud.stream.binders.rabbit_cluster.environment.password=guest
spring.cloud.stream.binders.rabbit_cluster.environment.virtual-host=/
消息接收端:
@EnableBinding(Barista.class)
@Service
public class RabbitMqReceiver {
@StreamListener(Barista.INPUT_CHANNEL)
public void receiver(Message message){
Channel channel = (Channel) message.getHeaders().get(AmqpHeaders.CHANNEL);
Long deliverTag = (Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
System.err.println("----------接收数据--------"+ message);
System.err.println("消费完毕");
try {
channel.basicAck(deliverTag,false);
} catch (IOException e) {
e.printStackTrace();
}
}
}
原文链接:https://blog.csdn.net/LuuvyJune/article/details/93038057
发表评论