Event-Driven Communication with Kafka and RabbitMQ: Building a Reliable Messaging System
2025/8/31 · about a 13-minute read
Event-Driven Communication with Kafka and RabbitMQ
Kafka and RabbitMQ are two of the most widely used message queue systems and play a central role in event-driven architectures. Each has its own strengths and suits different scenarios; chosen and used appropriately, they enable efficient, reliable event-driven communication. This chapter looks at how to implement event-driven communication with Kafka and RabbitMQ, and at best practices for using them in a microservice architecture.
Message Queue Fundamentals
What Is a Message Queue
A message queue is middleware for asynchronous communication between services in a distributed system. By storing and forwarding messages it decouples producers from consumers and improves the system's scalability and reliability.
Core Components
Producer
The producer is the application that sends messages:
- Responsibility: create messages and send them to the message queue
- Implementation: uses the message queue's client API
- Characteristic: needs no knowledge of the consumers
- Example: the order service sends an order-created message
Consumer
The consumer is the application that receives and processes messages:
- Responsibility: receive messages from the queue and process them
- Implementation: implements the message-handling logic
- Characteristic: depending on the pattern, several consumers may process the same message
- Example: the inventory service handles the order-created message
Queue / Topic
A queue or topic is the logical container that stores messages:
- Responsibility: store and manage messages
- Implementation: the core component of the message queue system
- Characteristic: provides message persistence and routing
- Example: an order-created queue, a user-registration topic
Message
A message is the unit of transfer, carrying data plus metadata:
- Content: business data and related metadata
- Format: text, JSON, binary, and so on
- Characteristic: should be immutable
- Example: a JSON message carrying order information
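As a small illustration (the field names here are purely illustrative), an order-created message might carry the business payload plus identifying metadata:
{
"messageId": "f4c1a2e0-7d3b-4c2a-9f1e-000000000001",
"type": "order.created",
"source": "order-service",
"timestamp": 1696154400000,
"payload": {
"orderId": "ORD-1001",
"userId": "12345",
"amount": 59.90
}
}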
Messaging Patterns
Point-to-Point
Each message is processed by exactly one consumer:
- Characteristic: a message is delivered to only one consumer
- Advantage: load balancing; avoids duplicate processing
- Typical use: task distribution, work queues
- Example: order-processing tasks spread across several worker instances
Publish-Subscribe
Each message is broadcast to all subscribers:
- Characteristic: one-to-many communication
- Advantage: loose coupling; supports many consumers
- Typical use: event notification, broadcast messages
- Example: a user-registration event notifies several services
Kafka expresses both patterns through consumer groups (consumers in the same group share the work, while separate groups each receive every message); RabbitMQ expresses them through exchange types and bindings.
Apache Kafka in Depth
Kafka Core Concepts
Topic
The unit by which Kafka categorizes messages:
# Create a topic
kafka-topics.sh --create --topic user-events --partitions 3 --replication-factor 1 --bootstrap-server localhost:9092
# List topics
kafka-topics.sh --list --bootstrap-server localhost:9092
# Describe a topic
kafka-topics.sh --describe --topic user-events --bootstrap-server localhost:9092
Partition
The unit of parallelism within a topic:
- Parallelism: each partition can be processed in parallel
- Ordering: messages keep their order within a partition
- Scalability: adding partitions raises throughput
- Distribution: partitions can be spread across different broker nodes
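The partition count of an existing topic can later be raised to gain parallelism, for example with the standard CLI (a sketch; partitions can only be increased, and adding them changes which partition a given key maps to for new messages):
# Increase user-events from 3 to 6 partitions
kafka-topics.sh --alter --topic user-events --partitions 6 --bootstrap-server localhost:9092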
Replica
The replication mechanism that backs each partition:
- Leader: the replica that serves read and write requests
- Follower: replicas that replicate the leader's data
- ISR: the set of replicas that are in sync with the leader
- Fault tolerance: provides high availability and data protection
Offset
The unique position of a message within a partition:
- Monotonic: offsets increase monotonically
- Unique: an offset is unique within its partition
- Durable: committed consumer offsets are persisted
- Addressable: used to locate and consume messages
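Committed offsets and consumer lag can be inspected, and if necessary rewound, with the standard CLI; the group name below is simply the one used in the later examples:
# Show current offset, log-end offset and lag per partition for a group
kafka-consumer-groups.sh --describe --group user-event-group --bootstrap-server localhost:9092
# Rewind the group to the earliest offset (run while the group's consumers are stopped)
kafka-consumer-groups.sh --reset-offsets --to-earliest --group user-event-group --topic user-events --execute --bootstrap-server localhost:9092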
Kafka Architecture Components
Broker
A server node in the Kafka cluster:
- Storage: stores partition data
- Processing: serves producer and consumer requests
- Coordination: takes part in cluster coordination
- Scaling: supports horizontal scale-out
ZooKeeper
Kafka's coordination service (being replaced by KRaft in newer versions):
- Metadata management: manages cluster metadata
- Leader election: elects partition leaders
- Configuration management: manages cluster configuration
- State monitoring: monitors cluster state
Consumer Group
A consumer group is a logical grouping of consumers:
- Load balancing: consumers in a group share the consumption load
- Fault tolerance: partitions are reassigned when a consumer fails
- Scalability: consumers can be added dynamically, up to the number of partitions
- Exclusivity: within one group, each message is consumed by only one member
Kafka Producer Implementation
Java Producer Example
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.Future;
public class UserEventProducer {
private final KafkaProducer<String, String> producer;
private final String topic;
public UserEventProducer(String bootstrapServers, String topic) {
this.topic = topic;
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.ACKS_CONFIG, "all"); // wait for acknowledgement from all in-sync replicas
props.put(ProducerConfig.RETRIES_CONFIG, 3); // number of retries
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); // batch size in bytes
props.put(ProducerConfig.LINGER_MS_CONFIG, 1); // wait up to 1 ms to fill a batch
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432); // total producer buffer memory in bytes
this.producer = new KafkaProducer<>(props);
}
public Future<RecordMetadata> sendUserEvent(String userId, String eventType, String eventData) {
String key = userId;
String value = String.format("{\"userId\":\"%s\",\"eventType\":\"%s\",\"eventData\":\"%s\",\"timestamp\":%d}",
userId, eventType, eventData, System.currentTimeMillis());
ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);
// Send asynchronously with a completion callback
return producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null) {
System.err.println("Failed to send message: " + exception.getMessage());
exception.printStackTrace();
} else {
System.out.printf("Message sent to topic %s partition %d offset %d%n",
metadata.topic(), metadata.partition(), metadata.offset());
}
}
});
}
public void close() {
producer.close();
}
}
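An optional hardening step for the producer above (a sketch; these properties would be added alongside the others in the constructor): with acks=all, enabling idempotence lets broker-side retries happen without creating duplicates within a partition.
// Idempotent writes: retries cannot duplicate or reorder records within a partition
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5); // must be <= 5 when idempotence is enabled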
Spring Boot Integration
@Configuration
public class KafkaProducerConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.RETRIES_CONFIG, 3);
return new DefaultKafkaProducerFactory<>(props);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
@Service
public class UserEventService {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@Value("${app.kafka.topic.user-events}")
private String userEventsTopic;
public void publishUserEvent(String userId, String eventType, String eventData) {
String message = String.format("{\"userId\":\"%s\",\"eventType\":\"%s\",\"eventData\":\"%s\",\"timestamp\":%d}",
userId, eventType, eventData, System.currentTimeMillis());
kafkaTemplate.send(userEventsTopic, userId, message)
.addCallback(
result -> System.out.println("Message sent successfully"),
failure -> System.err.println("Failed to send message: " + failure.getMessage())
);
}
}
Kafka Consumer Implementation
Java Consumer Example
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class UserEventConsumer {
private final KafkaConsumer<String, String> consumer;
private final String topic;
private volatile boolean running = true;
public UserEventConsumer(String bootstrapServers, String topic, String groupId) {
this.topic = topic;
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // start from the earliest offset when the group has no committed offset
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // commit offsets manually
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100); // maximum number of records returned per poll
this.consumer = new KafkaConsumer<>(props);
}
public void startConsuming() {
consumer.subscribe(Collections.singletonList(topic));
while (running) {
try {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
try {
processMessage(record);
// Synchronously commit the offset of the record just processed
consumer.commitSync(Collections.singletonMap(
new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset() + 1)
));
} catch (Exception e) {
System.err.println("Error processing message: " + e.getMessage());
// Either skip the bad record or forward it to a dead-letter topic
}
}
} catch (Exception e) {
System.err.println("Error polling messages: " + e.getMessage());
}
}
}
private void processMessage(ConsumerRecord<String, String> record) {
System.out.printf("Received message - Topic: %s, Partition: %d, Offset: %d, Key: %s, Value: %s%n",
record.topic(), record.partition(), record.offset(), record.key(), record.value());
// Actual message-processing logic goes here
// e.g. parse the JSON payload and apply the business rules
}
public void stop() {
running = false;
consumer.close();
}
}
Spring Boot Integration
@Configuration
public class KafkaConsumerConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "user-event-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
return factory;
}
}
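Instead of handling every failure inside the listener, Spring Kafka (2.8+) can retry and dead-letter at the container level. A minimal sketch, assuming a user-events.DLT topic exists (or topic auto-creation is enabled); the bean would live in the configuration class above and be wired in with factory.setCommonErrorHandler(kafkaErrorHandler):
@Bean
public DefaultErrorHandler kafkaErrorHandler(KafkaTemplate<String, String> kafkaTemplate) {
// Retry up to 3 times, 1 second apart, then publish the failed record to <topic>.DLT
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(kafkaTemplate);
return new DefaultErrorHandler(recoverer, new FixedBackOff(1000L, 3));
}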
@Component
public class UserEventListener {
@KafkaListener(topics = "${app.kafka.topic.user-events}", groupId = "user-event-group")
public void handleUserEvent(ConsumerRecord<String, String> record, Acknowledgment ack) {
try {
System.out.printf("Received user event - Key: %s, Value: %s%n", record.key(), record.value());
// Handle the user event
processUserEvent(record.value());
// Acknowledge the message manually
ack.acknowledge();
} catch (Exception e) {
System.err.println("Error processing user event: " + e.getMessage());
// Forward to a dead-letter topic or at least log the failure
}
}
private void processUserEvent(String eventData) {
// Actual event-handling logic goes here
}
}
RabbitMQ in Depth
RabbitMQ Core Concepts
Exchange
An exchange receives messages from producers and routes them to queues:
# Declare a topic exchange (rabbitmqadmin ships with the management plugin; plain rabbitmqctl cannot declare exchanges)
rabbitmqadmin declare exchange name=user.events type=topic durable=true
# List exchanges
rabbitmqctl list_exchanges
Queue
A queue stores messages until they are consumed:
# Declare a durable queue
rabbitmqadmin declare queue name=user.registration durable=true
# List queues
rabbitmqctl list_queues
Binding
A binding is the rule that connects an exchange to a queue:
# Create a binding with a routing-key pattern
rabbitmqadmin declare binding source=user.events destination=user.registration routing_key="user.*"
# List bindings
rabbitmqctl list_bindings
Virtual Host
A virtual host is a logical namespace for isolating resources within RabbitMQ:
# Create a virtual host
rabbitmqctl add_vhost /myapp
# Grant permissions
rabbitmqctl set_permissions -p /myapp myuser ".*" ".*" ".*"
RabbitMQ Messaging Patterns
Simple Queue Pattern
// Producer
public class SimpleProducer {
private final Connection connection;
private final Channel channel;
public SimpleProducer() throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
connection = factory.newConnection();
channel = connection.createChannel();
// Declare a durable queue
channel.queueDeclare("simple.queue", true, false, false, null);
}
public void sendMessage(String message) throws Exception {
channel.basicPublish("", "simple.queue",
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes("UTF-8"));
System.out.println("Sent: " + message);
}
public void close() throws Exception {
channel.close();
connection.close();
}
}
// Consumer
public class SimpleConsumer {
private final Connection connection;
private final Channel channel;
public SimpleConsumer() throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
connection = factory.newConnection();
channel = connection.createChannel();
channel.queueDeclare("simple.queue", true, false, false, null);
}
public void startConsuming() throws Exception {
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("Received: " + message);
try {
// Process the message
processMessage(message);
// Acknowledge manually
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
} catch (Exception e) {
System.err.println("Error processing message: " + e.getMessage());
// Reject the message and requeue it
channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, true);
}
};
// Start consuming with manual acknowledgement
channel.basicConsume("simple.queue", false, deliverCallback, consumerTag -> {});
}
private void processMessage(String message) {
// Actual message-processing logic goes here
}
}
Publish-Subscribe Pattern
// Producer
public class FanoutProducer {
private final Connection connection;
private final Channel channel;
public FanoutProducer() throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
connection = factory.newConnection();
channel = connection.createChannel();
// Declare a fanout exchange
channel.exchangeDeclare("logs", BuiltinExchangeType.FANOUT);
}
public void sendMessage(String message) throws Exception {
channel.basicPublish("logs", "", null, message.getBytes("UTF-8"));
System.out.println("Sent: " + message);
}
}
// Consumer
public class FanoutConsumer {
private final Connection connection;
private final Channel channel;
private final String queueName;
public FanoutConsumer() throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
connection = factory.newConnection();
channel = connection.createChannel();
// Declare the fanout exchange
channel.exchangeDeclare("logs", BuiltinExchangeType.FANOUT);
// Declare a temporary, exclusive queue
queueName = channel.queueDeclare().getQueue();
// Bind the queue to the exchange
channel.queueBind(queueName, "logs", "");
}
public void startConsuming() throws Exception {
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("Received: " + message);
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});
}
}
Routing Pattern
// Producer
public class RoutingProducer {
private final Connection connection;
private final Channel channel;
public RoutingProducer() throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
connection = factory.newConnection();
channel = connection.createChannel();
// Declare a direct exchange
channel.exchangeDeclare("direct_logs", BuiltinExchangeType.DIRECT);
}
public void sendMessage(String severity, String message) throws Exception {
channel.basicPublish("direct_logs", severity, null, message.getBytes("UTF-8"));
System.out.println("Sent [" + severity + "]: " + message);
}
}
// Consumer
public class RoutingConsumer {
private final Connection connection;
private final Channel channel;
private final String queueName;
public RoutingConsumer(String... severities) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
connection = factory.newConnection();
channel = connection.createChannel();
// Declare the direct exchange
channel.exchangeDeclare("direct_logs", BuiltinExchangeType.DIRECT);
// Declare a temporary queue
queueName = channel.queueDeclare().getQueue();
// Bind the queue to the exchange once per routing key
for (String severity : severities) {
channel.queueBind(queueName, "direct_logs", severity);
}
}
public void startConsuming() throws Exception {
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
String routingKey = delivery.getEnvelope().getRoutingKey();
System.out.println("Received [" + routingKey + "]: " + message);
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});
}
}
Spring Boot Integration with RabbitMQ
Configuration
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
virtual-host: /
listener:
simple:
acknowledge-mode: manual
concurrency: 3
max-concurrency: 10
Configuration Class
@Configuration
@EnableRabbit
public class RabbitMQConfig {
@Bean
public TopicExchange userEventsExchange() {
return new TopicExchange("user.events");
}
@Bean
public Queue userRegistrationQueue() {
return QueueBuilder.durable("user.registration").build();
}
@Bean
public Queue userActivityQueue() {
return QueueBuilder.durable("user.activity").build();
}
@Bean
public Binding userRegistrationBinding() {
return BindingBuilder.bind(userRegistrationQueue())
.to(userEventsExchange())
.with("user.registered");
}
@Bean
public Binding userActivityBinding() {
return BindingBuilder.bind(userActivityQueue())
.to(userEventsExchange())
.with("user.*");
}
}
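One optional addition to the configuration class above (a sketch): a Jackson2JsonMessageConverter bean so that the Map payloads used below travel as JSON rather than Java serialization; Spring Boot applies a MessageConverter bean to both the RabbitTemplate and the @RabbitListener container factory.
@Bean
public MessageConverter jackson2JsonMessageConverter() {
// From org.springframework.amqp.support.converter; serializes payloads to JSON
return new Jackson2JsonMessageConverter();
}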
Producer Service
@Service
public class UserEventPublisher {
@Autowired
private RabbitTemplate rabbitTemplate;
public void publishUserRegistered(String userId, String email) {
Map<String, Object> eventData = new HashMap<>();
eventData.put("userId", userId);
eventData.put("email", email);
eventData.put("timestamp", System.currentTimeMillis());
rabbitTemplate.convertAndSend("user.events", "user.registered", eventData);
}
public void publishUserActivity(String userId, String activityType) {
Map<String, Object> eventData = new HashMap<>();
eventData.put("userId", userId);
eventData.put("activityType", activityType);
eventData.put("timestamp", System.currentTimeMillis());
rabbitTemplate.convertAndSend("user.events", "user.activity", eventData);
}
}
Consumer Service
@Component
public class UserEventConsumer {
private static final Logger logger = LoggerFactory.getLogger(UserEventConsumer.class);
@RabbitListener(queues = "user.registration")
public void handleUserRegistration(Map<String, Object> eventData, Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
try {
logger.info("Processing user registration: {}", eventData);
// Handle the user-registration event
processUserRegistration(eventData);
// Acknowledge the message manually
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
logger.error("Error processing user registration: {}", e.getMessage(), e);
try {
// Reject the message and requeue it
channel.basicNack(deliveryTag, false, true);
} catch (IOException ioException) {
logger.error("Error acknowledging message: {}", ioException.getMessage(), ioException);
}
}
}
@RabbitListener(queues = "user.activity")
public void handleUserActivity(Map<String, Object> eventData, Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
try {
logger.info("Processing user activity: {}", eventData);
// Handle the user-activity event
processUserActivity(eventData);
// Acknowledge the message manually
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
logger.error("Error processing user activity: {}", e.getMessage(), e);
try {
// Reject without requeueing so the message is routed to the dead-letter queue
channel.basicNack(deliveryTag, false, false);
} catch (IOException ioException) {
logger.error("Error acknowledging message: {}", ioException.getMessage(), ioException);
}
}
}
private void processUserRegistration(Map<String, Object> eventData) {
// Actual user-registration handling logic goes here
}
private void processUserActivity(Map<String, Object> eventData) {
// Actual user-activity handling logic goes here
}
}
Kafka and RabbitMQ Compared
Performance Comparison
Throughput
- Kafka: very high throughput; a cluster can handle on the order of millions of messages per second
- RabbitMQ: moderate throughput; typically tens of thousands of messages per second per node
Latency
- Kafka: typically a few milliseconds end to end, higher when batching aggressively
- RabbitMQ: low latency, often sub-millisecond to a few milliseconds under light load
Persistence
- Kafka: disk-based log persistence, designed for very large data volumes
- RabbitMQ: memory plus disk persistence, suited to moderate data volumes
Feature Comparison
Messaging patterns
- Kafka: primarily publish-subscribe; point-to-point is modeled with consumer groups
- RabbitMQ: supports many patterns (point-to-point, publish-subscribe, routing, and more)
Message ordering
- Kafka: ordering is guaranteed within a partition
- RabbitMQ: ordering is guaranteed within a queue
Message acknowledgement
- Kafka: manual or automatic offset commits
- RabbitMQ: manual or automatic message acknowledgements
Scalability
- Kafka: strong horizontal scalability
- RabbitMQ: scales mostly vertically; clustering and scale-out are comparatively complex
Typical Scenarios
Where Kafka fits
- Big data pipelines: log collection, stream processing
- Real-time analytics: real-time data processing and analysis
- Event sourcing: storing large volumes of event data
- Microservice communication: high-throughput service-to-service messaging
Where RabbitMQ fits
- Classic message queueing: task queues, work queues
- Complex routing: scenarios that need rich routing rules
- Low-latency requirements: scenarios that are highly latency-sensitive
- Enterprise applications: traditional enterprise application integration
Best Practices
Message Design
Message Format
// Use a standardized message envelope
public class EventMessage {
private String eventId;
private String eventType;
private String source;
private long timestamp;
private Map<String, Object> payload;
private Map<String, String> metadata;
// constructors, getters, and setters omitted
}
// The same event expressed with the CloudEvents standard
{
"specversion": "1.0",
"type": "com.example.user.registered",
"source": "/users",
"id": "A234-1234-1234",
"time": "2023-10-01T10:00:00Z",
"datacontenttype": "application/json",
"data": {
"userId": "12345",
"email": "user@example.com"
}
}
Message Size Optimization
// Reduce message size by compressing the payload
public class CompressedMessageProducer {
private final KafkaProducer<String, byte[]> producer;
public void sendCompressedMessage(String topic, String key, String value) {
try {
byte[] compressedValue = compress(value.getBytes("UTF-8"));
ProducerRecord<String, byte[]> record =
new ProducerRecord<>(topic, key, compressedValue);
producer.send(record);
} catch (Exception e) {
// Handle/log the exception
}
}
private byte[] compress(byte[] data) throws IOException {
ByteArrayOutputStream bos = new ByteArrayOutputStream(data.length);
try (GZIPOutputStream gzip = new GZIPOutputStream(bos)) {
gzip.write(data);
}
return bos.toByteArray();
}
}
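Hand-rolled GZIP as above is rarely necessary with Kafka: the producer can compress whole batches transparently through a single property, and consumers decompress automatically. A sketch of the alternative:
// Built-in batch compression on the producer
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip"); // alternatives: snappy, lz4, zstd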
Error Handling
Dead-Letter Queues
// RabbitMQ dead-letter queue configuration
@Configuration
public class DeadLetterConfig {
@Bean
public Queue deadLetterQueue() {
return QueueBuilder.durable("dead.letter.queue").build();
}
@Bean
public Queue mainQueue() {
return QueueBuilder.durable("main.queue")
.withArgument("x-dead-letter-exchange", "")
.withArgument("x-dead-letter-routing-key", "dead.letter.queue")
.withArgument("x-message-ttl", 60000) // 1分钟超时
.build();
}
}
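Messages that land in the dead-letter queue still need an owner. A minimal sketch of a consumer that drains dead.letter.queue for logging or alerting (the processing shown is only illustrative); with the manual acknowledge-mode configured earlier, the explicit ack is required:
@Component
public class DeadLetterConsumer {
private static final Logger logger = LoggerFactory.getLogger(DeadLetterConsumer.class);
@RabbitListener(queues = "dead.letter.queue")
public void handleDeadLetter(Message message, Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws IOException {
// Log (or persist / alert on) the failed message together with its original routing key
logger.error("Dead-lettered message from {}: {}",
message.getMessageProperties().getReceivedRoutingKey(),
new String(message.getBody()));
channel.basicAck(deliveryTag, false); // manual ack, matching the listener configuration above
}
}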
Retry Mechanism
@Component
public class RetryableMessageConsumer {
@RabbitListener(queues = "main.queue")
public void handleMessage(String message, Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag,
@Header("x-death", required = false) List<Map<String, ?>> deathHeader) {
try {
int retryCount = getRetryCount(deathHeader);
if (retryCount > 3) {
// Retry budget exhausted: reject without requeueing so the message is dead-lettered
channel.basicNack(deliveryTag, false, false);
return;
}
// Process the message
processMessage(message);
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
try {
// Requeue for another attempt. Note: a plain requeue does not add an x-death entry,
// so the retry count above only grows if rejected messages pass through a TTL'd retry
// queue that dead-letters them back into main.queue.
channel.basicNack(deliveryTag, false, true);
} catch (IOException ioException) {
// Handle the IO failure (log it at minimum)
}
}
}
private int getRetryCount(List<Map<String, ?>> deathHeader) {
if (deathHeader == null || deathHeader.isEmpty()) {
return 0;
}
Map<String, ?> death = deathHeader.get(0);
return ((Long) death.get("count")).intValue();
}
private void processMessage(String message) throws Exception {
// Actual message-processing logic goes here
}
}
Monitoring and Operations
Metrics Collection
@Component
public class MessagingMetrics {
private final MeterRegistry meterRegistry;
public MessagingMetrics(MeterRegistry meterRegistry) {
this.meterRegistry = meterRegistry;
}
// Micrometer meters cannot be incremented with ad-hoc tags after registration,
// so each method resolves (and caches) the meter for its tag combination via the registry.
public void recordProduced(String topic) {
Counter.builder("messaging.produced")
.description("Number of messages produced")
.tag("topic", topic)
.register(meterRegistry)
.increment();
}
public void recordConsumed(String queue) {
Counter.builder("messaging.consumed")
.description("Number of messages consumed")
.tag("queue", queue)
.register(meterRegistry)
.increment();
}
public void recordFailed(String destination, Exception e) {
Counter.builder("messaging.failed")
.description("Number of messages failed")
.tag("destination", destination)
.tag("error", e.getClass().getSimpleName())
.register(meterRegistry)
.increment();
}
public Timer.Sample startProcessingTimer() {
return Timer.start(meterRegistry);
}
public void stopProcessingTimer(Timer.Sample sample, String destination) {
sample.stop(Timer.builder("messaging.processing.time")
.description("Message processing time")
.tag("destination", destination)
.register(meterRegistry));
}
}
Health Checks
@Component
public class MessagingHealthIndicator implements HealthIndicator {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@Value("${app.kafka.topic.health-check}")
private String healthCheckTopic;
@Override
public Health health() {
try {
// Send a probe message and wait for the acknowledgement
kafkaTemplate.send(healthCheckTopic, "health-check", "ping")
.get(5, TimeUnit.SECONDS);
return Health.up().build();
} catch (Exception e) {
return Health.down()
.withDetail("error", e.getMessage())
.build();
}
}
}
Used well, event-driven communication over Kafka and RabbitMQ makes it possible to build efficient, reliable microservice systems. Choosing the right message queue depends on the concrete business requirements, performance targets, and technology stack; in practice, different scenarios may call for different systems, and it is perfectly viable to run more than one messaging technology side by side within the same system.
