赞
踩
为了提高查找消息的性能,kafka在0.8 后为每个日志文件添加了对应的索引文件,OffsetIndex对象对应管理磁盘上的一个索引文件,预FileMessageSet共同构成了一个LogSegment对象。
OffsetIndex字段有:
- class OffsetIndex(@volatile private[this]
- //指向磁盘上的索引文件。
- var _file: File,
- // 日志文件中的第一个消息的offset
- val baseOffset: Long,
- val maxIndexSize: Int = -1) extends Logging {
- //mmap用来操作索引文件
- private[this] var mmap: MappedByteBuffer = {
- //如果索引文件不存在,则创建新文件并返回true
- val newlyCreated = _file.createNewFile()
- val raf = new RandomAccessFile(_file, "rw")
- try {
- /* 对新创建的索引文件进行扩容 */
- if (newlyCreated) {
- if (maxIndexSize < 8)
- throw new IllegalArgumentException("Invalid max index size: " + maxIndexSize)
- //根据maxIndexSize的值对索引文件进行扩容,扩容结果是小于maxIndexSize的最大的8的倍数
- raf.setLength(roundToExactMultiple(maxIndexSize, 8))
- }
-
- /* 内存映射 */
- val len = raf.length()
- val idx = raf.getChannel.map(FileChannel.MapMode.READ_WRITE, 0, len)
-
- /* set the position in the index for the next entry */
- if (newlyCreated)
- idx.position(0)
- else
- // if this is a pre-existing index, assume it is all valid and set position to last entry
- idx.position(roundToExactMultiple(idx.limit, 8))
- idx
- } finally {
- CoreUtils.swallow(raf.close())
- }
- }
-
- /* 索引文件中索引项的个数 */
- @volatile
- private[this] var _entries = mmap.position / 8
-
- /* 最大索引个数 */
- @volatile
- private[this] var _maxEntries = mmap.limit / 8
- //最后一个索引的offset
- @volatile
- private[this] var _lastOffset = readLastEntry.offset
- }

offsetIndex中最常用的是查找方法lockup
- def lookup(targetOffset: Long): OffsetPosition = {
- maybeLock(lock) {
- val idx = mmap.duplicate
- //二分查找
- val slot = indexSlotFor(idx, targetOffset)
- if(slot == -1)
- OffsetPosition(baseOffset, 0)
- else
- OffsetPosition(baseOffset + relativeOffset(idx, slot), physical(idx, slot))
- }
- }
-
赞
踩
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。