unclean.leader.election.enable引起的outOfRanger
前段时间采集平台的数据量发生异常,对数据进行排查发现hdfs上存在历史数据重复消费的问题。
采集平台是由TailDirSource+KafkaChannel将数据写入kafka,然后通过kafkaChannel+HDFSSink将数据写入hdfs
整个采集平台可能出现的故障的地方如下:
- taildir重复采集了log
- taildir调用kafkachannel向kafka写数据时进行了回滚
- hdfsSink调用kafkachannel消费数据时,进行了重复消费
- hdfsSink写入hdfs时发生了回滚
故障分析
- 首先检查了下hdfs上的数据量异常的现象,是多余的数据是最新的数据还是历史数据,经数据校验确认数据量暴增是由于历史数据造成的。
- 查看业务方服务器上历史log是否发生变化,查证后log的历史文件没有发生变化。
- 采集端的flume log无异常,在kafka到hdfs的环节中的flume log中发现info级别的offset重置信息。初步怀疑是offset发生了out of ranger,然后被重置了offset,而这个offset又是比较早的一个offset,导致重新读取了大量的历史数据。
log 内容如下:
1 | 15 五月 2017 17:56:59,396 INFO [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.kafka.clients.consumer.internals.Fetcher.handleFetchResponse:595) - Fetch offset 2261734 is out of range, resetting offset |
实践验证
在KafkaOffsetMonitor的log中发现offset重置的记录,
1 | 2017-05-18 13:25:17 INFO KafkaOffsetGetter$:68 - Processed commit message: [consumer-group,topic-xx,8] => OffsetAndMetadata[119088,0e78ce6b-4dca-4cd6-b914-22b78ebbaaff,1495085117090] |
从log中可以看出topic topic-xx的8分区的offset从119088置成了418。
然后从代码中查找out of ranger相关的代码并结合当前集群的状态进行分析,发现当某个topic的leader和followers的状态不一致时,发生leader切换时,会发送out of range,此时consumer进行消费时发现offset非法,就会被重置为earliest,进行重复消费。
故障重现
在测试环境中创建一个topic test,3个分区,2个副本,broker 1为leader,broker 2为follower。
写入一些数据flume进行正常消费,消费至最新状态A处之后,将test的follower的broker2停掉,继续向test写入数据,并让consumer再次消费至最新状态B处。
此时停掉的follower broker 2的状态已和leader broker 1的状态不一样,已滞后leader的状态。
现在将follower broker 2启动,此时follower和leader的状态不一样,follower需要和leader进行同步,但当follower与leader未同步成功之前将leader broker 1停掉,然后follower broker 2经过leader的选举机制被迫选为leader(unclean.leader.election.enable为true时,选择第一个启动的副本为leader),在被选举为leader之前broker 2的状态并没有和broker 1的状态一致,也就是说broker 2上的LEO并没有同步到B处,而broker 2被选举为了leader,此时producer继续向topic中写数据,写入之后consumer会进行消费,consumer需要消费的offset的B,而broker 2的LEO并没有同步到B处,此时就发生了out of ranger,offset被重置为了earliest。
优化方案
针对以上情况我们进行了一下修改:
- 将topic的副本数设置为3(原先为2),减少ISR列表只有一个leader的几率
- 调整
min.insync.replicas
为2(默认是1),此参数的意思是当ISR中的个数小于此值时,producer无法写入数据,会抛出异常。此参数还需要结合acks来使用,需要将acks设置为all或者-1(flume中kafkaChannel默认是all)。 - 调整
unclean.leader.election.enable
参数为false(默认为true),此参数标识当ISR中没有副本时,选举最早启动的broker为leader。 - 调整操作磁盘的线程数num.io.threads为24(原来为12)
- 对flume taildirSource进行二次开发,在body中添加log所在主机名、log 路径和采集时间。添加这些元数据信息为日后故障排除提供方便。
- kafkachannel中增加producer端重试的metrics统计(第2点中写入失败的次数会在此处记录)该功能提交,patch
- 集群因为维护需要重启时,先停掉一台broker,然后重启该broker,等到该broker已加入到ISR中之后,再对其它broker进行如上操作,切勿单个broker依次重启。