When the error occurs

After force-killing the Kafka process with `kill -9 pid`, the broker fails to restart with the following error:

ERROR There was an error in one of the threads during logs loading: java.lang.IllegalArgumentException: requirement failed: Corrupt index found, index file (${log.dirs}/${topicName}-0/00000000000001964914.index) has non-zero size but the last offset is 1964914 and the base offset is 1964914 (kafka.log.LogManager)
[2016-02-22 18:01:01,213] FATAL [Kafka Server 0], Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
java.lang.IllegalArgumentException: requirement failed: Corrupt index found, index file (${log.dirs}/${topicName}-0/00000000000001964914.index) has non-zero size but the last offset is 1964914 and the base offset is 1964914

Fix

The message shows that the failure happens while loading an .index file. Deleting 00000000000001964914.index and restarting brings the broker back up normally. If you want to locate every index file that would fail in the same way before restarting, see the sketch below.
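The sketch below is a minimal, offline re-implementation of the broker's startup check (it does not call any Kafka class; the data-directory path and object name are placeholders). Any file it flags can be deleted, and the broker will rebuild the index from the .log file on the next start.

import java.io.{File, RandomAccessFile}

object FindCorruptIndexes {
  // Re-implements the startup predicate: an index file is suspect if it is non-empty
  // but its last 8-byte entry has a relative offset of 0 (i.e. lastOffset == baseOffset),
  // or if its length is not a multiple of 8.
  def isSuspect(index: File): Boolean = {
    val len = index.length()
    if (len == 0) return false        // an empty index is valid
    if (len % 8 != 0) return true     // entries are exactly 8 bytes each
    val raf = new RandomAccessFile(index, "r")
    try {
      raf.seek(len - 8)               // jump to the last entry
      raf.readInt() <= 0              // first 4 bytes: offset relative to the base offset
    } finally raf.close()
  }

  def main(args: Array[String]): Unit = {
    // replace with your actual log.dirs value
    val dataDir = new File(args.headOption.getOrElse("/data/kafka-logs"))
    for {
      partitionDir <- Option(dataDir.listFiles).getOrElse(Array.empty[File]) if partitionDir.isDirectory
      indexFile    <- partitionDir.listFiles if indexFile.getName.endsWith(".index")
    } if (isSuspect(indexFile)) println("suspect index: " + indexFile.getAbsolutePath)
  }
}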

Analysis of the cause and the fix

  • Kafka's file storage layout

In Kafka, messages are organized by topic. A topic is physically split into many partitions, and each partition physically consists of many segments; the segment is the actual container that stores messages.

The segment files in more detail:
(1) Each partition (a directory) behaves like one huge file that is divided into multiple equal-sized segment data files. The number of messages per segment is not necessarily the same, which makes it cheap to delete old segment files quickly.
(2) Each partition only has to support sequential reads and writes; the lifecycle of a segment file is controlled by broker-side configuration.
(3) A segment consists of two files that always come in pairs: an index file and a data file, with the suffixes ".index" and ".log" respectively.
(4) Segment naming: the first segment of a partition starts at 0, and each subsequent segment file is named after its base offset, i.e. the offset following the last message of the previous segment. The offset is a 64-bit long, and the file name is the offset left-padded with zeros to a fixed width (20 digits in the listings later in this post), as in the sketch below.
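A quick sketch of that naming rule (the padding width matches the file names seen on disk, e.g. 00000000000001964914.log; the helper name is illustrative):

import java.text.NumberFormat

// Build a segment file name by zero-padding the base offset to 20 digits.
def segmentFileName(baseOffset: Long, suffix: String): String = {
  val nf = NumberFormat.getIntegerInstance
  nf.setMinimumIntegerDigits(20)
  nf.setGroupingUsed(false)
  nf.format(baseOffset) + suffix
}

// segmentFileName(0L, ".log")         => "00000000000000000000.log"
// segmentFileName(1964914L, ".index") => "00000000000001964914.index"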
The physical correspondence between a segment's index file and data file looks like this:
![Mapping between index and log](/blogimgs/kafka corrupt index found/kafka-fs-index-correspond-data.png "Mapping between index and log")
The .index file stores the mapping from a message's relative offset (relative offset = absolute offset - base offset) to its physical position in the corresponding .log file. It does not record a physical position for every message; it is written entry by entry, and each entry can cover a run of n consecutive messages. For example, given 10 messages with offsets 20000~20009 and an index configured to write one entry per 10 messages, the index would record the physical position of the message at offset 20000; once that message has been located, offsets 20001~20009 can be found quickly by scanning forward in the .log file. Each entry is 8 bytes: the first 4 bytes are the message's offset relative to the first message of the log segment (the base offset), and the last 4 bytes are the message's physical position in the log file.
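A minimal illustration of that 8-byte entry layout, using made-up numbers (base offset 1964914, one message at offset 1964920 stored at byte position 4096 in the .log file):

import java.nio.ByteBuffer

val baseOffset       = 1964914L
val absoluteOffset   = 1964920L   // hypothetical message offset
val physicalPosition = 4096       // hypothetical byte position inside the .log file

// Write one index entry: 4 bytes of relative offset + 4 bytes of physical position.
val entry = ByteBuffer.allocate(8)
entry.putInt((absoluteOffset - baseOffset).toInt)  // relative offset = 6
entry.putInt(physicalPosition)
entry.flip()

// Reading it back the way a lookup would:
val relativeOffset = entry.getInt()               // 6
val restoredOffset = baseOffset + relativeOffset  // 1964920
val position       = entry.getInt()               // 4096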

  • How Kafka loads logs at startup

Kafka's data logs are managed by LogManager.scala; the key source is:

class LogManager(val logDirs: Array[File],
                 val topicConfigs: Map[String, LogConfig],
                 val defaultConfig: LogConfig,
                 val cleanerConfig: CleanerConfig,
                 ioThreads: Int,
                 val flushCheckMs: Long,
                 val flushCheckpointMs: Long,
                 val retentionCheckMs: Long,
                 scheduler: Scheduler,
                 val brokerState: BrokerState,
                 private val time: Time) extends Logging {
  val RecoveryPointCheckpointFile = "recovery-point-offset-checkpoint"
  val LockFile = ".lock"
  val InitialTaskDelayMs = 30*1000
  private val logCreationOrDeletionLock = new Object
  private val logs = new Pool[TopicAndPartition, Log]()

  createAndValidateLogDirs(logDirs)
  private val dirLocks = lockLogDirs(logDirs)
  private val recoveryPointCheckpoints = logDirs.map(dir => (dir, new OffsetCheckpoint(new File(dir, RecoveryPointCheckpointFile)))).toMap
  loadLogs() // the key method
  ...
}

loadLogs() first checks whether the previous shutdown was a clean shutdown, by testing whether a .kafka_cleanshutdown file exists in the ${log.dirs} directory:

if (cleanShutdownFile.exists) {
  debug(
    "Found clean shutdown file. " +
    "Skipping recovery for all logs in data directory: " +
    dir.getAbsolutePath)
} else {
  // log recovery itself is being performed by `Log` class during initialization
  brokerState.newState(RecoveringFromUncleanShutdown)
}

It then starts loading the logs; the new Log(...) call is what initializes the log and index files:

val jobsForDir = for {
  dirContent <- Option(dir.listFiles).toList
  logDir <- dirContent if logDir.isDirectory
} yield {
  Utils.runnable {
    debug("Loading log '" + logDir.getName + "'")
    // derive the topic and partition from the directory name
    val topicPartition = Log.parseTopicPartitionName(logDir.getName)
    val config = topicConfigs.getOrElse(topicPartition.topic, defaultConfig)
    val logRecoveryPoint = recoveryPoints.getOrElse(topicPartition, 0L)

    val current = new Log(logDir, config, logRecoveryPoint, scheduler, time)
    val previous = this.logs.put(topicPartition, current)

    if (previous != null) {
      throw new IllegalArgumentException(
        "Duplicate log directories found: %s, %s!".format(
          current.dir.getAbsolutePath, previous.dir.getAbsolutePath))
    }
  }
}

The key method in Log is loadSegments():

private def loadSegments() {
  // create the log directory if it doesn't exist
  dir.mkdirs()

  // first do a pass through the files in the log directory and remove any temporary files
  // and complete any interrupted swap operations
  for(file <- dir.listFiles if file.isFile) {
    if(!file.canRead)
      throw new IOException("Could not read file " + file)
    val filename = file.getName
    // DeletedFileSuffix = ".deleted"  CleanedFileSuffix = ".cleaned"
    // delete every file whose name ends in .cleaned or .deleted
    if(filename.endsWith(DeletedFileSuffix) || filename.endsWith(CleanedFileSuffix)) {
      // if the file ends in .deleted or .cleaned, delete it
      file.delete()
    } else if(filename.endsWith(SwapFileSuffix)) { // SwapFileSuffix = ".swap"
      // we crashed in the middle of a swap operation, to recover:
      // if a log, swap it in and delete the .index file
      // if an index just delete it, it will be rebuilt
      val baseName = new File(Utils.replaceSuffix(file.getPath, SwapFileSuffix, ""))
      if(baseName.getPath.endsWith(IndexFileSuffix)) {
        // a .swap index file is simply deleted
        file.delete()
      } else if(baseName.getPath.endsWith(LogFileSuffix)){
        // a .swap log file: delete its index file, then rename the log file to complete the swap
        // delete the index
        val index = new File(Utils.replaceSuffix(baseName.getPath, LogFileSuffix, IndexFileSuffix))
        index.delete()
        // complete the swap operation
        val renamed = file.renameTo(baseName)
        if(renamed)
          info("Found log file %s from interrupted swap operation, repairing.".format(file.getPath))
        else
          throw new KafkaException("Failed to rename file %s.".format(file.getPath))
      }
    }
  }

  // now do a second pass and load all the .log and .index files
  for(file <- dir.listFiles if file.isFile) {
    val filename = file.getName
    // IndexFileSuffix = ".index"
    if(filename.endsWith(IndexFileSuffix)) {
      // if it is an index file, make sure it has a corresponding .log file
      val logFile = new File(file.getAbsolutePath.replace(IndexFileSuffix, LogFileSuffix))
      if(!logFile.exists) {
        warn("Found an orphaned index file, %s, with no corresponding log file.".format(file.getAbsolutePath))
        // an index file with no corresponding log file is deleted outright
        file.delete()
      }
    } else if(filename.endsWith(LogFileSuffix)) { // LogFileSuffix = ".log"
      // if its a log file, load the corresponding log segment
      val start = filename.substring(0, filename.length - LogFileSuffix.length).toLong
      val hasIndex = Log.indexFilename(dir, start).exists
      val segment = new LogSegment(dir = dir,
                                   startOffset = start,
                                   indexIntervalBytes = config.indexInterval,
                                   maxIndexSize = config.maxIndexSize,
                                   rollJitterMs = config.randomSegmentJitter,
                                   time = time)
      // if the log file has no index file, rebuild the index
      if(!hasIndex) {
        error("Could not find index file corresponding to log file %s, rebuilding index...".format(segment.log.file.getAbsolutePath))
        segment.recover(config.maxMessageSize)
      }
      segments.put(start, segment)
    }
  }

  if(logSegments.size == 0) {
    // no existing segments, create a new mutable segment beginning at offset 0
    segments.put(0L, new LogSegment(dir = dir,
                                    startOffset = 0,
                                    indexIntervalBytes = config.indexInterval,
                                    maxIndexSize = config.maxIndexSize,
                                    rollJitterMs = config.randomSegmentJitter,
                                    time = time))
  } else {
    recoverLog()
    // reset the index size of the currently active log segment to allow more entries
    activeSegment.index.resize(config.maxIndexSize)
  }

  // sanity check the index file of every segment to ensure we don't proceed with a corrupt segment
  // this is where the error above is thrown
  for (s <- logSegments)
    s.index.sanityCheck()
}

// OffsetIndex.scala
/**
 * Do a basic sanity check on this index to detect obvious problems
 * @throws IllegalArgumentException if any problems are found
 */
def sanityCheck() {
  // this produces the error message seen at startup:
  // a healthy index either has no entries or has lastOffset > baseOffset
  require(entries == 0 || lastOffset > baseOffset,
          "Corrupt index found, index file (%s) has non-zero size but the last offset is %d and the base offset is %d"
          .format(file.getAbsolutePath, lastOffset, baseOffset))
  val len = file.length()
  require(len % 8 == 0,
          "Index file " + file.getName + " is corrupt, found " + len +
          " bytes which is not positive or not a multiple of 8.")
}

From the code above, the rough flow Kafka follows when loading logs is:
1. Check whether the previous shutdown was a clean shutdown.
2. Delete every file whose name ends in .cleaned or .deleted.
3. For .swap files: if the underlying file is a log file, complete the swap (delete its index file and rename the .swap file back to .log); if it is an index file, delete it (the index will be rebuilt later).
4. For .index files with no corresponding .log file (a segment's index and log share the same base file name), delete the index file.
5. For each .log file, load it as a segment; if its .index file is missing (it may have been deleted in step 3), rebuild the index.
6. If segments were loaded, recover the log segments (recoverLog()).
7. Finally, run sanityCheck() on every log segment's index file so that a corrupt log segment is never loaded.

  • Verifying the reasoning

Looking at the .log and .index files that were backed up when the error occurred, and checking their sizes with ll (a quick note on ll vs du: du reports the disk blocks a file actually occupies, while ll reports the file's length):

-rw-rw-r-- 1 username username    1678200 Feb 22 17:43 00000000000000163891.index
-rw-rw-r-- 1 username username 1073741823 Feb 22 16:16 00000000000000163891.log
-rw-rw-r-- 1 username username 1666008 Feb 22 17:43 00000000000001131903.index
-rw-rw-r-- 1 username username 1073741375 Feb 22 16:42 00000000000001131903.log
-rw-rw-r-- 1 username username 10485760 Feb 22 18:01 00000000000001964914.index
-rw-rw-r-- 1 username username 834044222 Feb 22 17:31 00000000000001964914.log

00000000000001964914.index is 10485760 bytes, i.e. exactly 10 MB, which is the default value of log.index.size.max.bytes; deleting it lets the broker start normally.

Why does the index get corrupted?

The .index file is an index mapping; it does not index every message, so it is a sparse file. At runtime Kafka creates an .index file of size log.index.size.max.bytes and writes sparse index entries into it; when the segment reaches its size threshold it is rolled. An index entry is not written for every message appended to the .log file; a new entry is only added after indexIntervalBytes worth of messages have been appended since the previous entry:

// Append the given messages starting with the given offset. Add an entry to the index if needed.
def append(offset: Long, messages: ByteBufferMessageSet) {
  if (messages.sizeInBytes > 0) {
    if(bytesSinceLastIndexEntry > indexIntervalBytes) {
      index.append(offset, log.sizeInBytes()) // append an entry to the index (if needed)
      this.bytesSinceLastIndexEntry = 0 // reset to 0 after an index entry is written
    }
    log.append(messages) // append the messages
    this.bytesSinceLastIndexEntry += messages.sizeInBytes // accumulate bytes to decide when the next index entry is due
  }
}

Because the .index file is pre-allocated at its maximum size and filled in sparsely, it has to be trimmed back to the size of the entries actually written when the segment is rolled or the log is closed cleanly. A kill -9 skips that trim, so the file keeps its full pre-allocated length with a tail of zero-filled entries, and the next startup then sees a non-empty index whose last offset equals its base offset.
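A simplified model of why the check then fails (the values mirror this incident; the variable names are illustrative, not Kafka's):

// The index was pre-allocated to log.index.size.max.bytes and never trimmed.
val maxIndexSize = 10485760                  // 10 MB, the default log.index.size.max.bytes
val baseOffset   = 1964914L

val fileLength = maxIndexSize                // after kill -9 the file keeps its full size
val entries    = fileLength / 8              // 1310720 -- every 8-byte slot counts as an entry
val lastRelativeOffset = 0                   // the last slot was never written, so it reads back as 0
val lastOffset = baseOffset + lastRelativeOffset

// The same condition as OffsetIndex.sanityCheck():
require(entries == 0 || lastOffset > baseOffset,
  "Corrupt index found, ... last offset is %d and the base offset is %d".format(lastOffset, baseOffset))
// => throws IllegalArgumentException, which is exactly the startup error shown at the top of this post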

Related patches

https://issues.apache.org/jira/browse/KAFKA-1791
https://issues.apache.org/jira/browse/KAFKA-1554
The KAFKA-1554 patch fixes this, but its logic is simply to delete the offending .index file (which will then be rebuilt); the code is shown below:

// sanity check the index file of every segment to ensure we don't proceed with a corrupt segment
// delete any corrupt index file. It will be rebuilt
for (s <- logSegments) {
  try {
    s.index.sanityCheck()
  }
  catch {
    case e: IllegalArgumentException =>
      warn("Found a corrupt index file %s. Deleting it, will be rebuilt".format(s.index.file.getAbsolutePath))
      // delete the corrupt index file
      s.index.delete()
  }
}

Note that the position of the sanity check also changes: it is now performed before the .log and .index files are loaded.

References

http://tech.meituan.com/kafka-fs-design-theory.html
http://blog.csdn.net/jsky_studio/article/details/42012561
http://zqhxuyuan.github.io/2016/01/10/2016-01-10-Kafka_LogAppend/#handleProducerRequest