The code below is based on Spring Kafka 2.3.13.RELEASE and Spring Boot 2.2.9.RELEASE.
Producer message reliability
Approach: adjust the ack mode + enable retries + mitigate the problems that retries introduce
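spring:
  kafka:
    producer:
      client-id: ${spring.application.name}
      bootstrap-servers: 127.0.0.1:8080
      # Wait for all in-sync replicas to acknowledge each write
      acks: all
      retries: 2
      compression-type: none
      batch-size: 16384
      buffer-memory: 33554432
      properties:
        # Entries here are passed to the Kafka client as-is, so the key must be Kafka's dotted name
        retry.backoff.ms: 1000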
Producer:
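import javax.annotation.Resource;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Scope;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.FailureCallback;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.SuccessCallback;

// Prototype scope: uniqueId is per-message state, so each send needs its own instance.
@Component
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public class ProducerFuture implements FailureCallback, SuccessCallback<SendResult<String, Object>> {

    private static final Logger logger = LoggerFactory.getLogger(ProducerFuture.class);
    // MDC key that carries the request's trace id
    private static final String UNIQUE_ID = "UNIQUE_ID";

    @Resource
    private KafkaTemplate<String, Object> kafkaTemplate;

    private String uniqueId;

    public void async(String topicLcs, String value) {
        // Capture the caller's trace id now; the callbacks run on a different thread.
        uniqueId = MDC.get(UNIQUE_ID);
        logger.info("send {} data:{}", topicLcs, value);
        ListenableFuture<SendResult<String, Object>> listenableFuture = kafkaTemplate.send(topicLcs, value);
        listenableFuture.addCallback(this, this);
    }

    @Override
    public void onFailure(Throwable ex) {
        MDC.put(UNIQUE_ID, uniqueId);
        logger.error("sendFailure:", ex);
        MDC.remove(UNIQUE_ID);
    }

    @Override
    public void onSuccess(SendResult<String, Object> result) {
        MDC.put(UNIQUE_ID, uniqueId);
        logger.info("sendSuccess topic:{} offset:{}",
                result.getRecordMetadata().topic(), result.getRecordMetadata().offset());
        MDC.remove(UNIQUE_ID);
    }
}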
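The producer above copies the MDC value into a field because Kafka invokes send callbacks on the producer's I/O thread, where the caller's thread-local context is not present. The sketch below illustrates that pattern with plain JDK classes only. `TraceContextDemo`, `TRACE_ID`, and `sendAndReport` are hypothetical names: a `ThreadLocal` stands in for MDC and a `CompletableFuture` stands in for the send future.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in for MDC: a per-thread trace id.
class TraceContextDemo {
    static final ThreadLocal<String> TRACE_ID = new ThreadLocal<>();

    // Simulates an async send whose callback runs on another thread.
    // Returns the trace id that the callback thread observed.
    static String sendAndReport(ExecutorService ioThread, boolean capture) {
        String captured = TRACE_ID.get(); // read on the caller's thread
        return CompletableFuture.supplyAsync(() -> {
            if (capture) {
                TRACE_ID.set(captured); // restore the id for logging in the callback
            }
            try {
                return String.valueOf(TRACE_ID.get());
            } finally {
                TRACE_ID.remove(); // do not leak the id into the next callback
            }
        }, ioThread).join();
    }

    public static void main(String[] args) {
        ExecutorService ioThread = Executors.newSingleThreadExecutor();
        try {
            TRACE_ID.set("req-42");
            System.out.println(sendAndReport(ioThread, false)); // prints "null"
            System.out.println(sendAndReport(ioThread, true));  // prints "req-42"
        } finally {
            ioThread.shutdown();
            TRACE_ID.remove();
        }
    }
}
```

Without the capture step the callback thread sees no trace id at all, which is why the log lines written in `onSuccess` and `onFailure` would otherwise be impossible to correlate with the original request.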
Problems introduced by enabling retries
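During retries, a message is always resent to the same partition. That alone does not prevent duplicates: if the broker stored the record but its acknowledgment was lost, the retry appends the record a second time. To have the broker discard such duplicates, set enable.idempotence=true on the producer (it is off by default in the Kafka client used by Spring Kafka 2.3). The guarantee then holds per partition and per producer session, not globally.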
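To make the duplicate scenario concrete, here is a deliberately simplified model of one partition's log, written with plain JDK classes. It is not Kafka's broker code: `PartitionLogModel` and its methods are hypothetical, and the real broker tracks more state than a single last sequence number. The idea it illustrates is the one behind `enable.idempotence=true`: each record carries a producer id and a sequence number, and a retry that reuses a sequence number already stored is not appended again.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified, hypothetical model of one partition's log on a broker.
class PartitionLogModel {
    private final List<String> log = new ArrayList<>();
    private final Map<Long, Integer> lastSeqByProducer = new HashMap<>();
    private final boolean idempotent;

    PartitionLogModel(boolean idempotent) {
        this.idempotent = idempotent;
    }

    // Appends the record unless it is a retry that the log has already stored.
    void append(long producerId, int sequence, String value) {
        if (idempotent) {
            int last = lastSeqByProducer.getOrDefault(producerId, -1);
            if (sequence <= last) {
                return; // duplicate caused by a retry: nothing is appended
            }
            lastSeqByProducer.put(producerId, sequence);
        }
        log.add(value);
    }

    List<String> records() {
        return log;
    }

    // Sends "a", then "b" twice: the first ack for "b" was lost, so the producer retried.
    static List<String> replayWithLostAck(boolean idempotent) {
        PartitionLogModel partition = new PartitionLogModel(idempotent);
        partition.append(1L, 0, "a");
        partition.append(1L, 1, "b");
        partition.append(1L, 1, "b"); // retry of the same record, same sequence number
        return partition.records();
    }

    public static void main(String[] args) {
        System.out.println(replayWithLostAck(false)); // prints [a, b, b]
        System.out.println(replayWithLostAck(true));  // prints [a, b]
    }
}
```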
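Retries can also cause messages to be stored on a node in a different order from the order in which the producer sent them. Setting max.in.flight.requests.per.connection (the maximum number of requests that have been sent but not yet acknowledged on a single connection between the client and a node; the default is 5) to 1 preserves message order under retries.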
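The reordering effect can be reproduced with a small model that uses only JDK classes. This is a sketch under simplifying assumptions, not the Kafka client's actual sender logic: `InFlightOrderingModel` and `deliver` are hypothetical names, requests are processed in fixed windows of `maxInFlight`, and exactly one batch fails once before succeeding on retry.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

class InFlightOrderingModel {
    // Delivers the batches in order, keeping at most maxInFlight requests outstanding.
    // The batch named failOnce is rejected on its first attempt and queued for retry.
    static List<String> deliver(List<String> batches, int maxInFlight, String failOnce) {
        Deque<String> pending = new ArrayDeque<>(batches);
        List<String> brokerLog = new ArrayList<>();
        boolean alreadyFailed = false;
        while (!pending.isEmpty()) {
            // Take the next window of requests that are in flight together.
            List<String> inFlight = new ArrayList<>();
            while (inFlight.size() < maxInFlight && !pending.isEmpty()) {
                inFlight.add(pending.pollFirst());
            }
            List<String> toRetry = new ArrayList<>();
            for (String batch : inFlight) {
                if (!alreadyFailed && batch.equals(failOnce)) {
                    alreadyFailed = true;
                    toRetry.add(batch);   // transient failure: retry later
                } else {
                    brokerLog.add(batch); // the broker appended this batch
                }
            }
            // Retried batches go back to the head of the queue.
            for (int i = toRetry.size() - 1; i >= 0; i--) {
                pending.addFirst(toRetry.get(i));
            }
        }
        return brokerLog;
    }

    public static void main(String[] args) {
        List<String> batches = Arrays.asList("m1", "m2", "m3");
        System.out.println(deliver(batches, 5, "m1")); // prints [m2, m3, m1]
        System.out.println(deliver(batches, 1, "m1")); // prints [m1, m2, m3]
    }
}
```

With five requests in flight, m2 and m3 are stored before the retried m1. With one request in flight, nothing else is sent until m1 succeeds, so the order is preserved at the cost of throughput.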
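Consumer message reliability
Approach: manual offset commits + retries + a dead-letter queue (with alerting) + a separate strategy for handling the messages that end up in the dead-letter queue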
Kafka configuration:
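spring:
  kafka:
    consumer:
      bootstrap-servers: 127.0.0.1:8080
      auto-offset-reset: latest
      # Offsets are committed manually by the listener
      enable-auto-commit: false
      # Only takes effect when enable-auto-commit is true
      auto-commit-interval: 2000
      fetch-min-size: 1
      fetch-max-wait: 500
      max-poll-records: 500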
Configuration class:
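import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

@Configuration
public class Config {

    private static final Logger logger = LoggerFactory.getLogger(Config.class);
    // Dead-letter topic for records that still fail after all retries
    private static final String ERROR_TOPIC = "error_topic";

    @Bean
    public ConcurrentKafkaListenerContainerFactory<Object, Object> listenerContainerFactory(
            ConsumerFactory<Object, Object> consumerFactory, KafkaTemplate<String, Object> template) {
        ConcurrentKafkaListenerContainerFactory<Object, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        // Redeliver a failed record every 5 seconds, up to 5 retries; then run the recoverer.
        SeekToCurrentErrorHandler seekToCurrentErrorHandler = new SeekToCurrentErrorHandler(
                (consumerRecord, e) -> {
                    logger.error("Still failing after retries, consumerRecord:{}", consumerRecord, e);
                    template.send(ERROR_TOPIC, consumerRecord.toString());
                },
                new FixedBackOff(5000, 5));
        factory.setErrorHandler(seekToCurrentErrorHandler);
        // The listener commits each offset itself by calling Acknowledgment.acknowledge().
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }
}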
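The retry-then-dead-letter flow that this configuration asks the container to perform can be sketched without Spring or Kafka. The model below uses only JDK classes, and `RetryThenDeadLetterModel`, `process`, and `deadLetters` are hypothetical names. It assumes that the second argument of `FixedBackOff` counts retries after the first delivery, so `FixedBackOff(5000, 5)` would allow up to six deliveries in total. The in-memory list stands in for the error topic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class RetryThenDeadLetterModel {
    final List<String> deadLetters = new ArrayList<>(); // stands in for the error topic
    int deliveries = 0;

    // Delivers one record: the first attempt plus up to maxRetries redeliveries.
    // If every attempt throws, the record is added to the dead-letter list instead.
    boolean process(String record, int maxRetries, long backOffMillis, Consumer<String> listener) {
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            if (attempt > 0) {
                try {
                    Thread.sleep(backOffMillis); // fixed pause between attempts
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return false; // interrupted: stop without dead-lettering
                }
            }
            deliveries++;
            try {
                listener.accept(record);
                return true; // success: this is where the offset would be committed
            } catch (RuntimeException e) {
                // processing failed: fall through to the next attempt
            }
        }
        deadLetters.add(record); // retries exhausted: hand the record to the dead-letter list
        return false;
    }

    public static void main(String[] args) {
        RetryThenDeadLetterModel model = new RetryThenDeadLetterModel();
        boolean ok = model.process("bad-record", 5, 1L, r -> {
            throw new IllegalStateException("cannot process " + r);
        });
        System.out.println(ok);                // prints false
        System.out.println(model.deliveries);  // prints 6
        System.out.println(model.deadLetters); // prints [bad-record]
    }
}
```

A record that succeeds on a later attempt never reaches the dead-letter list, so only records that fail every attempt need the alerting and alternative handling described above.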
Consumer:
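import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

@Component
public class ConsumerKafka {

    private final Logger logger = LoggerFactory.getLogger(getClass());
    private static final String CONSUMER_GROUP_PREFIX = "MOCK-A-GROUP";

    @KafkaListener(topics = {"${kafka.topic.topic4Test}"}, groupId = CONSUMER_GROUP_PREFIX,
            containerFactory = "listenerContainerFactory")
    public void onMessage(ConsumerRecord<String, Object> consumerRecord, Acknowledgment acknowledgment) {
        // Business processing goes here. An exception thrown before acknowledge()
        // leaves the offset uncommitted and hands the record to the error handler.
        acknowledgment.acknowledge();
    }
}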