说明:最底层是消息服务,中间层是绑定层,绑定层和底层的消息服务进行绑定,顶层是消息生产者和消息消费者,顶层可以向绑定层生产消息和和获取消息消费。

Barista接口:Barista接口是定义来作为后面类的参数,这一接口定义通道类型和通道名称,通道名称是作为配置用,通道类型则决定了app会使用这一通道进行发送消息还是从中接收消息。

@Output: 输出注解,用于定义发送消息接口

@Input: 输入注解,用于定义消息的消费者接口

@StreamListener: 用于定义监听方法的注解

使用Spring Cloud Stream 非常简单,只需要使用好3个注解即可,在实现高性能消息的生成和消费场景非常合适,但是使用Spring Cloud Stream框架有一个非常大的问题就是不能实现可靠性投递,也就是没法保证消息的100%可靠性,会存在少量消息丢失问题。

这个原因是因为SpringCloudStream框架为了和Kafka兼顾所有在实际工作中使用它的目的就是针对高性能的消息通信的!

 整合代码:

1、消息生产者:

pom.xml:

        <dependency>

<groupId>org.springframework.cloud</groupId>

<artifactId>spring-cloud-starter-stream-rabbit</artifactId>

<version>2.1.2.RELEASE</version>

</dependency>

Barista.java 定义输出,发送消息接口

public interface Barista {

    String OUTPUT_CHANNEL = "output_channel";

    /**

     * 注解@Output声明了它是一个输出类型的通道,名字是output_channel

     * 这一名字与app1中通道名一致,表明注入了一个名字为output_channel的通道,

     * 类型是output,发布的主题名为mydest。

     */

    @Output(Barista.OUTPUT_CHANNEL)

    MessageChannel logout();

}

 application.properties:

server.servlet.context-path=/producer

server.port=8002

spring.application.name=producer

spring.cloud.stream.bindings.output_channel.destination=exchange-3

spring.cloud.stream.bindings.output_channel.group=queue-3

spring.cloud.stream.bindings.output_channel.binder=rabbit_cluster

spring.cloud.stream.binders.rabbit_cluster.type=rabbit

spring.cloud.stream.binders.rabbit_cluster.environment.address=192.168.2.203:5672

spring.cloud.stream.binders.rabbit_cluster.environment.username=guest

spring.cloud.stream.binders.rabbit_cluster.environment.password=guest

spring.cloud.stream.binders.rabbit_cluster.environment.virtual-host=/

发送消息: 

@EnableBinding(Barista.class)

@Service

public class RabbitMqSender {

    @Autowired

    private Barista barista;

    public void SendMessage(Object message,Map<String,Object> properties){

        try{

            MessageHeaders headers = new MessageHeaders(properties);

            Message message1 = MessageBuilder.createMessage(message,headers);

            boolean sendStatus = barista.logout().send(message1);

            System.err.println("---------------------");

            System.err.println("发送数据:" + message + "sendStatus = "+ sendStatus);

        }catch (Exception e){

            System.err.println("----------error------------");

            e.printStackTrace();

            throw  new RuntimeException(e.getMessage());

        }

    }

}

2、消息消费者

pom.xml同生产者

Barista.java

public interface Barista {

    String INPUT_CHANNEL = "input_channel";

    /**

     * 注解@Input声明了它是一个输入类型的通道,名字是Barista.INPUT_CHANNEL

     * 也就是position3的input_channel。这一名字与上述配置app2的配置文件中position1应该一致,

     * 表明注入了一个名字叫做input_channel的通道,它的类型是input,订阅的主题是position2处声明的mydest这个主题

     */

    @Input(Barista.INPUT_CHANNEL)

    SubscribableChannel loginput();

}

application properties:

server.servlet.context-path=/consumer

server.port=8002

spring.application.name=consumer

spring.cloud.stream.bindings.input_channel.destination=exchange-3

spring.cloud.stream.bindings.input_channel.group=queue-3

spring.cloud.stream.bindings.input_channel.binder=rabbit_cluster

spring.cloud.stream.bindings.input_channel.consumer.concurrency=1

spring.cloud.stream.rabbit.bindings.input_channel.consumer.requeueRejected=false

spring.cloud.stream.rabbit.bindings.input_channel.consumer.acknowledgeMode=manual

spring.cloud.stream.rabbit.bindings.input_channel.consumer.recoveryInterval=3000

spring.cloud.stream.rabbit.bindings.input_channel.consumer.durableSubscription=true

spring.cloud.stream.rabbit.bindings.input_channel.consumer.maxConcurrency=5

spring.cloud.stream.binders.rabbit_cluster.type=rabbit

spring.cloud.stream.binders.rabbit_cluster.environment.address=192.168.2.203:5672

spring.cloud.stream.binders.rabbit_cluster.environment.username=guest

spring.cloud.stream.binders.rabbit_cluster.environment.password=guest

spring.cloud.stream.binders.rabbit_cluster.environment.virtual-host=/

消息接收端:

@EnableBinding(Barista.class)

@Service

public class RabbitMqReceiver {

    @StreamListener(Barista.INPUT_CHANNEL)

    public void receiver(Message message){

        Channel channel = (Channel) message.getHeaders().get(AmqpHeaders.CHANNEL);

        Long deliverTag = (Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);

        System.err.println("----------接收数据--------"+ message);

        System.err.println("消费完毕");

        try {

            channel.basicAck(deliverTag,false);

        } catch (IOException e) {

            e.printStackTrace();

        }

    }

}

原文链接:https://blog.csdn.net/LuuvyJune/article/details/93038057