证书准备:Spring Boot 集成 Kafka 消费者时,如果以 SSL 方式连接,需要先对证书进行格式转换。原始证书通常是 pem(或 crt)格式的证书加上 key 格式的私钥,而 Kafka 客户端这里使用的是 JKS 格式的密钥库,因此需要通过 openssl 和 keytool 进行转换。

证书处理:

KeyStore 用于存储客户端的证书和私钥,用于客户端身份验证。TrustStore 用于存储受信任的根证书或证书链,用于验证服务器的身份。

合并一下证书(与下文生成 combined.pem 的 cat 命令相同,仅输出文件名不同;后续步骤使用的是 combined.pem):

cat your_cert.pem your_key.key > test.pem

1,合并证书和私钥,再导出为一个 PKCS12 文件:

cat your_cert.pem your_key.key > combined.pem

openssl pkcs12 -export -in combined.pem -out client.p12 -name your_alias

2,将 PKCS12 文件导入到 Java KeyStore 中:

keytool -importkeystore -srckeystore client.p12 -srcstoretype PKCS12 -destkeystore client.jks -deststoretype JKS

要生成 truststore.jks 文件,您需要导入服务器的根证书或者服务器的证书链。这样,您的客户端应用程序就可以验证与服务器建立的 SSL 连接。

下面是生成 truststore.jks 的步骤:

1,获取服务器的根证书或证书链。您可以使用 openssl s_client 命令来获取证书链:

openssl s_client -connect 你的连接域名:端口 -showcerts

2,将输出中的根证书或证书链(-----BEGIN CERTIFICATE----- 到 -----END CERTIFICATE----- 之间的内容,包含这两行)保存为 .pem 文件。

3,使用 keytool 命令将根证书或证书链导入到 truststore.jks 文件中:

keytool -importcert -file your_root_cert.pem -alias root_alias -keystore truststore.jks

 

项目集成:

maven集成:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.5.5.RELEASE</version>
</dependency>

nacos配置:

spring:
  kafka:
    bootstrap-servers: SSL://connectedca.com:443 ## 换成你自己的连接
    ssl:
      protocol: TLS
      ### 这三个密码是你生成证书(密钥库)时设置的密码
      trust-store-password: a123456
      key-store-password: a123456
      key-password: a123456
    consumer:
      group-id:
    producer:
      topic: "*.event" ## 换成你自己的topic(以 * 开头的值在 YAML 中必须加引号)

核心配置:

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.Resource;
import org.springframework.core.io.ResourceLoader;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
import org.springframework.kafka.support.serializer.JsonDeserializer;

import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.HashMap;
import java.util.Map;

/**
 * Spring configuration that declares the Kafka admin bean, the SSL-enabled consumer factory,
 * the listener container factory and the {@code KafkaC3MsgListener} bean.
 *
 * <p>The key store and trust store are looked up on the classpath under
 * {@code file/<environment>/kafka/}, where the environment comes from
 * {@code C3ConfigProperties#getEnvironment()} and must be one of {@code uat}, {@code pre}
 * or {@code prod}.
 */
@Slf4j
@Configuration
public class KafkaConfiguration {

    /** Value used for both the consumer group id and the client id. */
    private static final String CONSUMER_ID = "newbie-car-owner-data-sync";

    /** File name of the client key store inside the per-environment directory. */
    private static final String KEY_STORE_FILE = "client.jks";

    /** File name of the trust store inside the per-environment directory. */
    private static final String TRUST_STORE_FILE = "truststore.jks";

    @Autowired
    C3ConfigProperties c3ConfigProperties;

    @Autowired
    private KafkaConfig kafkaProperties;

    @Autowired
    private ResourceLoader resourceLoader;

    /**
     * Creates the {@link KafkaAdmin} bean, configured with the bootstrap servers only.
     *
     * <p>NOTE(review): no SSL settings are passed here, unlike {@link #consumerFactory()};
     * confirm whether the admin client ever needs to reach the SSL listener.
     *
     * @return the admin bean
     */
    @Bean
    public KafkaAdmin kafkaAdmin() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
        return new KafkaAdmin(configs);
    }

    /**
     * Builds the consumer factory: String key/value deserializers, offset reset to
     * {@code earliest}, and an SSL connection using the environment-specific JKS stores.
     *
     * <p>The return type is intentionally left raw so that the bean's declared type is
     * unchanged for existing injection points.
     *
     * @return the consumer factory
     * @throws IllegalStateException if the environment is not supported or a store cannot be read
     */
    @Bean
    @SuppressWarnings("rawtypes")
    public DefaultKafkaConsumerFactory consumerFactory() {
        Map<String, Object> consumerConfig = new HashMap<>();
        consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
        consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_ID);
        consumerConfig.put(ConsumerConfig.CLIENT_ID_CONFIG, CONSUMER_ID);
        consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        // NOTE(review): the value deserializer above is StringDeserializer, so the delegate and
        // JSON settings below appear to have no effect. They are kept unchanged; confirm whether
        // ErrorHandlingDeserializer was meant to be the value deserializer.
        consumerConfig.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
        consumerConfig.put(JsonDeserializer.USE_TYPE_INFO_HEADERS, false); // type info headers are NOT used
        consumerConfig.put(JsonDeserializer.VALUE_DEFAULT_TYPE, "*.KafkaC3MsgListener"); // placeholder: default target type
        consumerConfig.put(JsonDeserializer.TRUSTED_PACKAGES, "*.KafkaC3MsgListener"); // placeholder: replace with your package

        // Fail fast on a bad environment or unreadable store: continuing with SSL passwords
        // but no store locations only defers the failure to consumer creation.
        String storeDirectory = storeDirectory(c3ConfigProperties.getEnvironment());
        String keyStorePath = resolveStorePath(storeDirectory + KEY_STORE_FILE);
        String trustStorePath = resolveStorePath(storeDirectory + TRUST_STORE_FILE);
        log.info("Kafka SSL key store: {}, trust store: {}", keyStorePath, trustStorePath);

        consumerConfig.put("ssl.keystore.location", keyStorePath);
        consumerConfig.put("ssl.truststore.location", trustStorePath);
        consumerConfig.put("security.protocol", "SSL");
        consumerConfig.put("ssl.truststore.password", kafkaProperties.getTrustStorePassword());
        consumerConfig.put("ssl.keystore.password", kafkaProperties.getKeyStorePassword());
        consumerConfig.put("ssl.key.password", kafkaProperties.getKeyPassword());
        return new DefaultKafkaConsumerFactory<>(consumerConfig);
    }

    /**
     * Creates the listener container factory backed by {@link #consumerFactory()}, with a
     * concurrency of 3 and a {@link SeekToCurrentErrorHandler}.
     *
     * <p>The return type is intentionally left raw, as in {@link #consumerFactory()}.
     *
     * @return the listener container factory
     */
    @Bean
    @SuppressWarnings({"rawtypes", "unchecked"})
    public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3); // number of concurrent consumers
        factory.setErrorHandler(new SeekToCurrentErrorHandler());
        return factory;
    }

    /**
     * Registers the message listener as a bean.
     *
     * @return a new {@code KafkaC3MsgListener}
     */
    @Bean
    public KafkaC3MsgListener kafkaC3MsgListener() {
        return new KafkaC3MsgListener();
    }

    /**
     * Maps the deployment environment to the classpath directory holding its JKS stores.
     *
     * @param environment the configured environment name
     * @return the directory, e.g. {@code file/uat/kafka/}
     * @throws IllegalStateException if the environment is null or not uat/pre/prod
     */
    private static String storeDirectory(String environment) {
        if (environment == null) {
            throw new IllegalStateException("Kafka SSL stores: environment is not configured");
        }
        switch (environment) {
            case "uat":
            case "pre":
            case "prod":
                return "file/" + environment + "/kafka/";
            default:
                throw new IllegalStateException(
                        "Kafka SSL stores: unsupported environment '" + environment + "'");
        }
    }

    /**
     * Resolves a classpath store to a path on the file system, as required by the
     * {@code ssl.*.location} settings.
     *
     * <p>When the resource is a plain file its own path is returned. Otherwise (for example
     * when it is packaged inside a jar, where {@code getFile()} cannot work) its content is
     * copied to a temporary file that is deleted on JVM exit.
     *
     * @param classpathLocation the location relative to the classpath root
     * @return the absolute file system path of the store
     * @throws IllegalStateException if the resource is missing or cannot be read
     */
    private String resolveStorePath(String classpathLocation) {
        Resource resource = resourceLoader.getResource("classpath:" + classpathLocation);
        if (!resource.exists()) {
            throw new IllegalStateException("Kafka SSL store not found on classpath: " + classpathLocation);
        }
        try {
            if (resource.isFile()) {
                return resource.getFile().getAbsolutePath();
            }
            try (InputStream in = resource.getInputStream()) {
                Path copy = Files.createTempFile("kafka-ssl-", ".jks");
                copy.toFile().deleteOnExit();
                Files.copy(in, copy, StandardCopyOption.REPLACE_EXISTING);
                return copy.toAbsolutePath().toString();
            }
        } catch (IOException e) {
            throw new IllegalStateException("Failed to read Kafka SSL store: " + classpathLocation, e);
        }
    }
}

注入配置:

import lombok.Data;

import org.springframework.beans.factory.annotation.Value;

import org.springframework.context.annotation.Configuration;

@Data

@Configuration

public class KafkaConfig {

@Value("${spring.kafka.bootstrap-servers}")

private String bootstrapServers;

@Value("${spring.kafka.consumer.group-id}")

private String groupId;

@Value("${spring.kafka.producer.topic}")

private String topic;

@Value("${spring.kafka.ssl.trust-store-password}")

private String trustStorePassword;

@Value("${spring.kafka.ssl.key-store-password}")

private String keyStorePassword;

@Value("${spring.kafka.ssl.key-password}")

private String keyPassword;

}

能够看到如下配置输出,就表示配置成功了:

然后在监听器中处理消息即可。

 ————没有与生俱来的天赋,都是后天的努力拼搏(我是小杨,谢谢你的关注和支持)

文章链接

评论可见,请评论后查看内容,谢谢!!!评论后请刷新页面。