// k/v accounting // 存放meta数据的IntBuffer,都是int entry,占4byte private IntBuffer kvmeta; // metadata overlay on backing store int kvstart; // marks origin of spill metadata int kvend; // marks end of spill metadata int kvindex; // marks end of fully serialized records // 分割meta和key value内容的标识 // meta数据和key value内容都存放在同一个环形缓冲区,所以需要分隔开 int equator; // marks origin of meta/serialization int bufstart; // marks beginning of spill int bufend; // marks beginning of collectable int bufmark; // marks end of record int bufindex; // marks end of collected int bufvoid; // marks the point where we should stop // reading at the end of the buffer // 存放key value的byte数组,单位是byte,注意与kvmeta区分 byte[] kvbuffer; // main output buffer privatefinalbyte[] b0 = newbyte[0];
// key value在kvbuffer中的地址存放在偏移kvindex的距离 privatestaticfinalint VALSTART = 0; // val offset in acct privatestaticfinalint KEYSTART = 1; // key offset in acct // partition信息存在kvmeta中偏移kvindex的距离 privatestaticfinalint PARTITION = 2; // partition offset in acct privatestaticfinalint VALLEN = 3; // length of value // 一对key value的meta数据在kvmeta中占用的个数 privatestaticfinalint NMETA = 4; // num meta ints // 一对key value的meta数据在kvmeta中占用的byte数 privatestaticfinalint METASIZE = NMETA * 4; // size in bytes
// MapOutputBuffer.collect publicsynchronizedvoidcollect(K key, V value, finalint partition )throws IOException { ... // 新数据collect时,先将剩余的空间减去元数据的长度,之后进行判断 bufferRemaining -= METASIZE; if (bufferRemaining <= 0) { // start spill if the thread is not running and the soft limit has been // reached spillLock.lock(); try { do { // 首次spill时,spillInProgress是false if (!spillInProgress) { // 得到kvindex的byte位置 finalint kvbidx = 4 * kvindex; // 得到kvend的byte位置 finalint kvbend = 4 * kvend; // serialized, unspilled bytes always lie between kvindex and // bufindex, crossing the equator. Note that any void space // created by a reset must be included in "used" bytes finalint bUsed = distanceTo(kvbidx, bufindex); finalboolean bufsoftlimit = bUsed >= softLimit; if ((kvbend + METASIZE) % kvbuffer.length != equator - (equator % METASIZE)) { // spill finished, reclaim space resetSpill(); bufferRemaining = Math.min( distanceTo(bufindex, kvbidx) - 2 * METASIZE, softLimit - bUsed) - METASIZE; continue; } elseif (bufsoftlimit && kvindex != kvend) { // spill records, if any collected; check latter, as it may // be possible for metadata alignment to hit spill pcnt startSpill(); finalint avgRec = (int) (mapOutputByteCounter.getCounter() / mapOutputRecordCounter.getCounter()); // leave at least half the split buffer for serialization data // ensure that kvindex >= bufindex finalint distkvi = distanceTo(bufindex, kvbidx); finalint newPos = (bufindex + Math.max(2 * METASIZE - 1, Math.min(distkvi / 2, distkvi / (METASIZE + avgRec) * METASIZE))) % kvbuffer.length; setEquator(newPos); bufmark = bufindex = newPos; finalint serBound = 4 * kvend; // bytes remaining before the lock must be held and limits // checked is the minimum of three arcs: the metadata space, the // serialization space, and the soft limit bufferRemaining = Math.min( // metadata max distanceTo(bufend, newPos), Math.min( // serialization max distanceTo(newPos, serBound), // soft limit softLimit)) - 2 * METASIZE; } } } while (false); } finally { spillLock.unlock(); } } // 将key value 及元数据信息写入缓冲区 try { // serialize key bytes into buffer int keystart = bufindex; // 将key序列化写入kvbuffer中,并移动bufindex keySerializer.serialize(key); // key所占空间被bufvoid分隔,则移动key, // 将其值放在连续的空间中便于sort时key的对比 if (bufindex < keystart) { // wrapped the key; must make contiguous bb.shiftBufferedKey(); keystart = 0; } // serialize value bytes into buffer finalint valstart = bufindex; valSerializer.serialize(value); // It's possible for records to have zero length, i.e. the serializer // will perform no writes. To ensure that the boundary conditions are // checked and that the kvindex invariant is maintained, perform a // zero-length write into the buffer. The logic monitoring this could be // moved into collect, but this is cleaner and inexpensive. For now, it // is acceptable. bb.write(b0, 0, 0);
// the record must be marked after the preceding write, as the metadata // for this record are not yet written int valend = bb.markRecord();
publicvoidwrite(byte b[], int off, int len) throws IOException { // must always verify the invariant that at least METASIZE bytes are // available beyond kvindex, even when len == 0 bufferRemaining -= len; if (bufferRemaining <= 0) { // writing these bytes could exhaust available buffer space or fill // the buffer to soft limit. check if spill or blocking are necessary boolean blockwrite = false; spillLock.lock(); try { do { checkSpillException();
finalint kvbidx = 4 * kvindex; finalint kvbend = 4 * kvend; // ser distance to key index finalint distkvi = distanceTo(bufindex, kvbidx); // ser distance to spill end index finalint distkve = distanceTo(bufindex, kvbend);
// if kvindex is closer than kvend, then a spill is neither in // progress nor complete and reset since the lock was held. The // write should block only if there is insufficient space to // complete the current write, write the metadata for this record, // and write the metadata for the next record. If kvend is closer, // then the write should block if there is too little space for // either the metadata or the current write. Note that collect // ensures its metadata requirement with a zero-length write blockwrite = distkvi <= distkve ? distkvi <= len + 2 * METASIZE : distkve <= len || distanceTo(bufend, kvbidx) < 2 * METASIZE;
if (!spillInProgress) { if (blockwrite) { if ((kvbend + METASIZE) % kvbuffer.length != equator - (equator % METASIZE)) { // spill finished, reclaim space // need to use meta exclusively; zero-len rec & 100% spill // pcnt would fail resetSpill(); // resetSpill doesn't move bufindex, kvindex bufferRemaining = Math.min( distkvi - 2 * METASIZE, softLimit - distanceTo(kvbidx, bufindex)) - len; continue; } // we have records we can spill; only spill if blocked if (kvindex != kvend) { startSpill(); // Blocked on this write, waiting for the spill just // initiated to finish. Instead of repositioning the marker // and copying the partial record, we set the record start // to be the new equator setEquator(bufmark); } else { // We have no buffered records, and this record is too large // to write into kvbuffer. We must spill it directly from // collect finalint size = distanceTo(bufstart, bufindex) + len; setEquator(0); bufstart = bufend = bufindex = equator; kvstart = kvend = kvindex; bufvoid = kvbuffer.length; thrownew MapBufferTooSmallException(size + " bytes"); } } }
if (blockwrite) { // wait for spill try { while (spillInProgress) { reporter.progress(); spillDone.await(); } } catch (InterruptedException e) { thrownew IOException( "Buffer interrupted while waiting for the writer", e); } } } while (blockwrite); } finally { spillLock.unlock(); } } // here, we know that we have sufficient space to write if (bufindex + len > bufvoid) { finalint gaplen = bufvoid - bufindex; System.arraycopy(b, off, kvbuffer, bufindex, gaplen); len -= gaplen; off += gaplen; bufindex = 0; } System.arraycopy(b, off, kvbuffer, bufindex, len); bufindex += len; }