ERROR There was an error in one of the threads during logs loading: java.lang.IllegalArgumentException: requirement failed: Corrupt index found, index file (${log.dirs}/${topicName}-0/00000000000001964914.index) has non-zero size but the last offset is 1964914 and the base offset is 1964914 (kafka.log.LogManager) [2016-02-22 18:01:01,213] FATAL [Kafka Server 0], Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer) java.lang.IllegalArgumentException: requirement failed: Corrupt index found, index file (${log.dirs}/${topicName}-0/00000000000001964914.index) has non-zero size but the last offset is 1964914 and the base offset is 1964914
// Decide whether this data directory needs recovery, based on the marker
// file that an orderly broker shutdown leaves behind.
if (cleanShutdownFile.exists) {
  // Last shutdown was clean — every segment was flushed, nothing to repair.
  debug("Found clean shutdown file. " + "Skipping recovery for all logs in data directory: " + dir.getAbsolutePath)
} else {
  // No marker file means the broker crashed. The actual log recovery is
  // performed by the `Log` class during initialization; here we only
  // record the broker state transition.
  brokerState.newState(RecoveringFromUncleanShutdown)
}
/**
 * Scan the log directory and populate the `segments` map.
 *
 * Pass 1 removes temporary files and completes any swap operation that was
 * interrupted by a crash. Pass 2 loads every .log file as a LogSegment,
 * rebuilding its .index when the index is missing and deleting orphaned
 * index files. Finally every segment's index is sanity-checked — note that
 * sanityCheck throws IllegalArgumentException on a corrupt index, which is
 * exactly what aborts broker startup in the error shown above.
 */
private def loadSegments() {
  // Make sure the log directory exists before scanning it.
  dir.mkdirs()

  // First pass: drop temp files and finish interrupted swaps.
  for (file <- dir.listFiles if file.isFile) {
    if (!file.canRead)
      throw new IOException("Could not read file " + file)
    val filename = file.getName
    if (filename.endsWith(DeletedFileSuffix) || filename.endsWith(CleanedFileSuffix)) {
      // Leftover from a delete or a log-cleaner run (.deleted / .cleaned) — discard.
      file.delete()
    } else if (filename.endsWith(SwapFileSuffix)) {
      // We crashed mid-swap. Recovery rule:
      //   - a swapped index is simply deleted (it can be rebuilt),
      //   - a swapped log is renamed into place after its stale index is removed.
      val baseName = new File(Utils.replaceSuffix(file.getPath, SwapFileSuffix, ""))
      if (baseName.getPath.endsWith(IndexFileSuffix)) {
        // The .swap wraps an index file: throw it away, it will be rebuilt.
        file.delete()
      } else if (baseName.getPath.endsWith(LogFileSuffix)) {
        // The .swap wraps a log file: delete its companion index first...
        val index = new File(Utils.replaceSuffix(baseName.getPath, LogFileSuffix, IndexFileSuffix))
        index.delete()
        // ...then complete the swap by renaming the .swap file into place.
        val renamed = file.renameTo(baseName)
        if (renamed)
          info("Found log file %s from interrupted swap operation, repairing.".format(file.getPath))
        else
          throw new KafkaException("Failed to rename file %s.".format(file.getPath))
      }
    }
  }

  // Second pass: load every .log / .index pair that survived pass 1.
  for (file <- dir.listFiles if file.isFile) {
    val filename = file.getName
    if (filename.endsWith(IndexFileSuffix)) {
      // An index must have a matching .log file; an orphaned index is useless.
      val logFile = new File(file.getAbsolutePath.replace(IndexFileSuffix, LogFileSuffix))
      if (!logFile.exists) {
        warn("Found an orphaned index file, %s, with no corresponding log file.".format(file.getAbsolutePath))
        file.delete()
      }
    } else if (filename.endsWith(LogFileSuffix)) {
      // The file name encodes the segment's base offset, e.g. 00000000000001964914.log.
      val start = filename.substring(0, filename.length - LogFileSuffix.length).toLong
      val hasIndex = Log.indexFilename(dir, start).exists
      val segment = new LogSegment(dir = dir,
                                   startOffset = start,
                                   indexIntervalBytes = config.indexInterval,
                                   maxIndexSize = config.maxIndexSize,
                                   rollJitterMs = config.randomSegmentJitter,
                                   time = time)
      if (!hasIndex) {
        // The log has no index at all — rebuild it from the log data.
        error("Could not find index file corresponding to log file %s, rebuilding index...".format(segment.log.file.getAbsolutePath))
        segment.recover(config.maxMessageSize)
      }
      segments.put(start, segment)
    }
  }

  if (logSegments.size == 0) {
    // Empty directory: start with a single mutable segment at offset 0.
    segments.put(0L, new LogSegment(dir = dir,
                                    startOffset = 0,
                                    indexIntervalBytes = config.indexInterval,
                                    maxIndexSize = config.maxIndexSize,
                                    rollJitterMs = config.randomSegmentJitter,
                                    time = time))
  } else {
    recoverLog()
    // Re-open the active segment's index at full size so it can take new entries.
    activeSegment.index.resize(config.maxIndexSize)
  }

  // Sanity-check every segment's index before serving traffic.
  // NOTE(review): this is the line that raised the IllegalArgumentException
  // in the log excerpt above — a single corrupt index aborts startup here.
  for (s <- logSegments)
    s.index.sanityCheck()
}
// OffsetIndex.scala
/**
 * Do a basic sanity check on this index to detect obvious problems.
 *
 * Invariant: a non-empty index must have lastOffset strictly greater than
 * baseOffset (a freshly-rolled, empty index has entries == 0). The file
 * length must also be a whole number of 8-byte entries.
 *
 * @throws IllegalArgumentException if any problems are found — the exact
 *         exception seen in the broker log above.
 */
def sanityCheck() {
  // entries == 0 is the normal state for the newest (empty) index file;
  // otherwise the last indexed offset has to lie beyond the base offset.
  require(entries == 0 || lastOffset > baseOffset,
          "Corrupt index found, index file (%s) has non-zero size but the last offset is %d and the base offset is %d"
            .format(file.getAbsolutePath, lastOffset, baseOffset))
  // Each index entry is 8 bytes, so the file size must divide evenly.
  val fileLength = file.length()
  require(fileLength % 8 == 0,
          "Index file " + file.getName + " is corrupt, found " + fileLength +
            " bytes which is not positive or not a multiple of 8.")
}
/**
 * Append the given messages starting with the given offset, adding an
 * index entry first when enough bytes have accumulated since the last one.
 * Empty message sets are ignored.
 */
def append(offset: Long, messages: ByteBufferMessageSet) {
  val incomingBytes = messages.sizeInBytes
  if (incomingBytes > 0) {
    // Sparse index: only write an entry once indexIntervalBytes of log data
    // have been appended since the previous entry, then reset the counter.
    if (bytesSinceLastIndexEntry > indexIntervalBytes) {
      index.append(offset, log.sizeInBytes())
      this.bytesSinceLastIndexEntry = 0
    }
    // Append the actual message data, then account for it so a future
    // call knows when the next index entry is due.
    log.append(messages)
    this.bytesSinceLastIndexEntry += incomingBytes
  }
}
// Sanity check the index file of every segment so we never proceed with a
// corrupt segment. Instead of letting the IllegalArgumentException abort
// broker startup (the failure shown in the log above), delete the corrupt
// index file — it will be rebuilt from the log data.
logSegments.foreach { segment =>
  try {
    segment.index.sanityCheck()
  } catch {
    case e: IllegalArgumentException =>
      // Corrupt index detected: log it and remove the file for rebuild.
      warn("Found a corrupt index file %s. Deleting it, will be rebuilt".format(segment.index.file.getAbsolutePath))
      segment.index.delete()
  }
}