要在Spring Boot项目中实现一个通用的消息消费服务,可以将前面的概念整合并利用Spring的依赖注入特性来创建一个更灵活、可配置的服务。下面是如何创建这样的服务,包括通过application.properties来配置连接信息,以及使用@Service注解定义消费服务。

步骤 1: 配置application.properties

首先,在application.properties文件中添加RocketMQ的配置信息。

# RocketMQ 配置

rocketmq.endpoint=你的RocketMQ接入点

rocketmq.username=你的RocketMQ用户名

rocketmq.password=你的RocketMQ密码

步骤 2: 创建RocketMQConsumerService

接下来,定义RocketMQConsumerService服务类。这个类将读取application.properties中的配置,并提供一个方法来启动消息消费者。

package com.aliyun.openservices;

import org.apache.rocketmq.client.apis.ClientConfiguration;

import org.apache.rocketmq.client.apis.ClientConfigurationBuilder;

import org.apache.rocketmq.client.apis.ClientException;

import org.apache.rocketmq.client.apis.ClientServiceProvider;

import org.apache.rocketmq.client.apis.consumer.FilterExpression;

import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;

import org.apache.rocketmq.client.apis.consumer.PushConsumer;

import org.apache.rocketmq.client.apis.StaticSessionCredentialsProvider;

import org.springframework.beans.factory.annotation.Value;

import org.springframework.stereotype.Service;

import java.util.Collections;

@Service

public class RocketMQConsumerService {

@Value("${rocketmq.endpoint}")

private String endpoint;

@Value("${rocketmq.username}")

private String username;

@Value("${rocketmq.password}")

private String password;

public void startConsumer(String topicName, String filterExpression, String consumerGroupId) throws ClientException {

ClientServiceProvider provider = ClientServiceProvider.loadService();

ClientConfiguration configuration = ClientConfiguration.newBuilder()

.setEndpoints(endpoint)

.setCredentialProvider(new StaticSessionCredentialsProvider(username, password))

.build();

FilterExpression expression = new FilterExpression(filterExpression, FilterExpressionType.TAG);

PushConsumer consumer = provider.newPushConsumerBuilder()

.setClientConfiguration(configuration)

.setConsumerGroup(consumerGroupId)

.setSubscriptionExpressions(Collections.singletonMap(topicName, expression))

.setMessageListener(messageView -> {

// 实现你的消息处理逻辑

System.out.println("Received message: " + messageView.toString());

return ConsumeResult.SUCCESS;

})

.build();

// 注意: 实际应用中你可能需要更优雅的方式来启动和关闭Consumer

}

}

步骤 3: 使用服务

最后,你可以在Spring Boot应用的任何地方注入并使用RocketMQConsumerService服务。例如,在一个配置类或启动监听器中启动消费者:

package com.aliyun.openservices;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.boot.CommandLineRunner;

import org.springframework.stereotype.Component;

@Component

public class RocketMQConsumerRunner implements CommandLineRunner {

@Autowired

private RocketMQConsumerService rocketMQConsumerService;

@Override

public void run(String... args) throws Exception {

// 启动消费者

rocketMQConsumerService.startConsumer("topicName", "*", "consumerGroupId");

}

}

这个CommandLineRunner实现确保了当Spring Boot应用启动时,会自动启动消息消费服务。你需要根据实际情况替换topicName、filterExpression(这里用*表示接收所有消息)和consumerGroupId的值。

通过这种方式,你可以轻松地在Spring Boot应用中集成和使用RocketMQ的消费服务,同时保持高度的灵活性和配置能力。

参考链接

评论可见,请评论后查看内容,谢谢!!!评论后请刷新页面。
大家都在看: