Kafka 报 InvalidPidMappingException 问题解决

分享到:

有个开发组不时地在测试环境中遇到下面的问题:

1Caused by: org.apache.kafka.common.errors.InvalidPidMappingException: The producer attempted to use a producer id which is not currently assigned to its transactional id

经过排查发现,这个开发组在代码中使用了 Kafka 的事务。这个错误的抛出与使用了事务有关系。

Spring kafka 支持一个叫做“僵尸围栏”的机制,用于处理在事务中参与的生产者或消费者与协调器之间的连接丢失或发生故障的情况。这个机制的目的是防止由于参与者变成无法通信的 "僵尸" 状态而导致事务状态不一致。具体来说,当一个生产者或消费者在参与 Kafka 事务期间与协调器失去联系时,它可能处于一种无法确认其事务状态的状态。"Zombie fencing" 的任务是在检测到这种情况时,将这个参与者标记为 "僵尸" 并且不再考虑其在事务中的状态。

为了识别和解决僵尸实例的问题,Kafka 为每个事务生产者分配一个称为 transaction id 的惟一标识符。这用于跨流程重新启动标识相同的生产者实例。事务生产者的第一个操作必须是在 kafka 集群中是显式地注册其 transaction id,接着 Kafka 代理会用给定的 transaction id 来检查已开启的事务并结束他们。它还增加与 transaction id 关联的 epoch 值。这个 epoch 是存储在每个 transaction id 中的内部元数据。一旦 epoch 发生碰撞,任何具有相同 transaction id 的生产者,只要带有老的 epoch 值都被认为是僵尸并且被隔离,同时来自这些生产者的未来事务写将被拒绝。

kafka 中有个 transactional.id.expiration.ms 配置,当生产者与集群超过这个时间长度没有通讯的话,transaction id 就会被设置成过期,这个就意味着被标记成“僵尸”状态。如果该生产者后面再次与 Kafka 代理通讯,就会接收到本文开头的那个异常。

解决这个问题的也比较简单,在 producer factory 中设置一下 max age 这个属性值,比 transactional.id.expiration.ms 属性值小就可以了。

Starting with version 2.5.8, you can now configure the maxAge property on the producer factory. This is useful when using transactional producers that might lay idle for the broker’s transactional.id.expiration.ms. With current kafka-clients, this can cause a ProducerFencedException without a rebalance. By setting the maxAge to less than transactional.id.expiration.ms, the factory will refresh the producer if it is past it’s max age.


参考链接: