日本综合一区二区|亚洲中文天堂综合|日韩欧美自拍一区|男女精品天堂一区|欧美自拍第6页亚洲成人精品一区|亚洲黄色天堂一区二区成人|超碰91偷拍第一页|日韩av夜夜嗨中文字幕|久久蜜综合视频官网|精美人妻一区二区三区

RELATEED CONSULTING
相關咨詢
選擇下列產(chǎn)品馬上在線溝通
服務時間:8:30-17:00
你可能遇到了下面的問題
關閉右側工具欄

新聞中心

這里有您想知道的互聯(lián)網(wǎng)營銷解決方案
MapReduce源碼解析--環(huán)形緩沖區(qū)

這篇文章把Map階段的環(huán)形緩沖區(qū)單獨拿出來進行分析,對環(huán)形緩沖區(qū)的數(shù)據(jù)結構和數(shù)據(jù)進入環(huán)形緩沖區(qū)然后溢寫到磁盤的流程進行分析。

創(chuàng)新互聯(lián)建站專注于成安企業(yè)網(wǎng)站建設,響應式網(wǎng)站建設,商城網(wǎng)站開發(fā)。成安網(wǎng)站建設公司,為成安等地區(qū)提供建站服務。全流程按需網(wǎng)站制作,專業(yè)設計,全程項目跟蹤,創(chuàng)新互聯(lián)建站專業(yè)和態(tài)度為您提供的服務

環(huán)形緩沖區(qū)數(shù)據(jù)結構

Map過程中環(huán)形緩沖區(qū)是指數(shù)據(jù)被map處理之后會先放入內存,內存中的這片區(qū)域就是環(huán)形緩沖區(qū)。

環(huán)形緩沖區(qū)是在MapTask.MapOutputBuffer中定義的,相關的屬性如下:

 
 
 
 
  1. // k/v accounting
  2. // 存放meta數(shù)據(jù)的IntBuffer,都是int entry,占4byte
  3. private IntBuffer kvmeta; // metadata overlay on backing store
  4. int kvstart; // marks origin of spill metadata
  5. int kvend; // marks end of spill metadata
  6. int kvindex; // marks end of fully serialized records
  7. // 分割meta和key value內容的標識
  8. // meta數(shù)據(jù)和key value內容都存放在同一個環(huán)形緩沖區(qū),所以需要分隔開
  9. int equator; // marks origin of meta/serialization
  10. int bufstart; // marks beginning of spill
  11. int bufend; // marks beginning of collectable
  12. int bufmark; // marks end of record
  13. int bufindex; // marks end of collected
  14. int bufvoid; // marks the point where we should stop
  15. // reading at the end of the buffer
  16. // 存放key value的byte數(shù)組,單位是byte,注意與kvmeta區(qū)分
  17. byte[] kvbuffer; // main output buffer
  18. private final byte[] b0 = new byte[0];
  19.  
  20. // key value在kvbuffer中的地址存放在偏移kvindex的距離
  21. private static final int VALSTART = 0; // val offset in acct
  22. private static final int KEYSTART = 1; // key offset in acct
  23. // partition信息存在kvmeta中偏移kvindex的距離
  24. private static final int PARTITION = 2; // partition offset in acct
  25. private static final int VALLEN = 3; // length of value
  26. // 一對key value的meta數(shù)據(jù)在kvmeta中占用的個數(shù)
  27. private static final int NMETA = 4; // num meta ints
  28. // 一對key value的meta數(shù)據(jù)在kvmeta中占用的byte數(shù)
  29. private static final int METASIZE = NMETA * 4; // size in bytes

環(huán)形緩沖區(qū)其實是一個數(shù)組,數(shù)組中存放著key、value的序列化數(shù)據(jù)和key、value的元數(shù)據(jù)信息,key/value的元數(shù)據(jù)存儲的格式是int類型,每個key/value對應一個元數(shù)據(jù),元數(shù)據(jù)由4個int組成,第一個int存放value的起始位置,第二個存放key的起始位置,第三個存放partition,最后一個存放value的長度。

key/value序列化的數(shù)據(jù)和元數(shù)據(jù)在環(huán)形緩沖區(qū)中的存儲是由equator分隔的,key/value按照索引遞增的方向存儲,meta則按照索引遞減的方向存儲,將其數(shù)組抽象為一個環(huán)形結構之后,以equator為界,key/value順時針存儲,meta逆時針存儲。

初始化

環(huán)形緩沖區(qū)的結構在MapOutputBuffer.init中創(chuàng)建。

 
 
 
 
  1. public void init(MapOutputCollector.Context context
  2. ) throws IOException, ClassNotFoundException {
  3. ...
  4. //MAP_SORT_SPILL_PERCENT = mapreduce.map.sort.spill.percent
  5. // map 端buffer所占的百分比
  6. //sanity checks
  7. final float spillper =
  8. job.getFloat(JobContext.MAP_SORT_SPILL_PERCENT, (float)0.8);
  9. //IO_SORT_MB = "mapreduce.task.io.sort.mb"
  10. // map 端buffer大小
  11. // mapreduce.task.io.sort.mb * mapreduce.map.sort.spill.percent 最好是16的整數(shù)倍
  12. final int sortmb = job.getInt(JobContext.IO_SORT_MB, 100);
  13. // 所有的spill index 在內存所占的大小的閾值
  14. indexCacheMemoryLimit = job.getInt(JobContext.INDEX_CACHE_MEMORY_LIMIT,
  15. INDEX_CACHE_MEMORY_LIMIT_DEFAULT);
  16. ...
  17. // 排序的實現(xiàn)類,可以自己實現(xiàn)。 這里用的是改寫的快排
  18. sorter = ReflectionUtils.newInstance(job.getClass("map.sort.class",
  19. QuickSort.class, IndexedSorter.class), job);
  20. // buffers and accounting
  21. // 上面IO_SORT_MB的單位是MB,左移20位將單位轉化為byte
  22. int maxMemUsage = sortmb << 20;
  23. // METASIZE是元數(shù)據(jù)的長度,元數(shù)據(jù)有4個int單元,分別為
  24. // VALSTART、KEYSTART、PARTITION、VALLEN,而int為4個byte,
  25. // 所以METASIZE長度為16。下面是計算buffer中最多有多少byte來存元數(shù)據(jù)
  26. maxMemUsage -= maxMemUsage % METASIZE;
  27. // 元數(shù)據(jù)數(shù)組 以byte為單位
  28. kvbuffer = new byte[maxMemUsage];
  29. bufvoid = kvbuffer.length;
  30. // 將kvbuffer轉化為int型的kvmeta 以int為單位,也就是4byte
  31. kvmeta = ByteBuffer.wrap(kvbuffer)
  32. .order(ByteOrder.nativeOrder())
  33. .asIntBuffer();
  34. // 設置buf和kvmeta的分界線
  35. setEquator(0);
  36. bufstart = bufend = bufindex = equator;
  37. kvstart = kvend = kvindex;
  38. // kvmeta中存放元數(shù)據(jù)實體的最大個數(shù)
  39. maxRec = kvmeta.capacity() / NMETA;
  40. // buffer spill時的閾值(不單單是sortmb*spillper)
  41. // 更加精確的是kvbuffer.length*spiller
  42. softLimit = (int)(kvbuffer.length * spillper);
  43. // 此變量較為重要,作為spill的動態(tài)衡量標準
  44. bufferRemaining = softLimit;
  45. ...
  46. // k/v serialization
  47. comparator = job.getOutputKeyComparator();
  48. keyClass = (Class)job.getMapOutputKeyClass();
  49. valClass = (Class)job.getMapOutputValueClass();
  50. serializationFactory = new SerializationFactory(job);
  51. keySerializer = serializationFactory.getSerializer(keyClass);
  52. // 將bb作為key序列化寫入的output
  53. keySerializer.open(bb);
  54. valSerializer = serializationFactory.getSerializer(valClass);
  55. // 將bb作為value序列化寫入的output
  56. valSerializer.open(bb);
  57. ...
  58. // combiner
  59. ...
  60. spillInProgress = false;
  61. // 最后一次merge時,在有combiner的情況下,超過此閾值才執(zhí)行combiner
  62. minSpillsForCombine = job.getInt(JobContext.MAP_COMBINE_MIN_SPILLS, 3);
  63. spillThread.setDaemon(true);
  64. spillThread.setName("SpillThread");
  65. spillLock.lock();
  66. try {
  67. spillThread.start();
  68. while (!spillThreadRunning) {
  69. spillDone.await();
  70. }
  71. } catch (InterruptedException e) {
  72. throw new IOException("Spill thread failed to initialize", e);
  73. } finally {
  74. spillLock.unlock();
  75. }
  76. if (sortSpillException != null) {
  77. throw new IOException("Spill thread failed to initialize",
  78. sortSpillException);
  79. }
  80. }

init是對環(huán)形緩沖區(qū)進行初始化構造,由mapreduce.task.io.sort.mb決定map中環(huán)形緩沖區(qū)的大小sortmb,默認是100M。

此緩沖區(qū)也用于存放meta,一個meta占用METASIZE(16byte),則其中用于存放數(shù)據(jù)的大小是maxMemUsage -= sortmb << 20 % METASIZE(由此可知最好設置sortmb轉換為byte之后是16的整數(shù)倍),然后用maxMemUsage初始化kvbuffer字節(jié)數(shù)組和kvmeta整形數(shù)組,最后設置數(shù)組的一些標識信息。利用setEquator(0)設置kvbuffer和kvmeta的分界線,初始化的時候以0為分界線,kvindex為aligned - METASIZE + kvbuffer.length,其位置在環(huán)形數(shù)組中相當于按照逆時針方向減去METASIZE,由kvindex設置kvstart = kvend = kvindex,由equator設置bufstart = bufend = bufindex = equator,還得設置bufvoid = kvbuffer.length,bufvoid用于標識用于存放數(shù)據(jù)的最大位置。

為了提高效率,當buffer占用達到閾值之后,會進行spill,這個閾值是由bufferRemaining進行檢查的,bufferRemaining由softLimit = (int)(kvbuffer.length * spillper); bufferRemaining = softLimit;進行初始化賦值,這里需要注意的是softLimit并不是sortmb*spillper,而是kvbuffer.length * spillper,當sortmb << 20是16的整數(shù)倍時,才可以認為softLimit是sortmb*spillper。

下面是setEquator的代碼

 
 
 
 
  1. // setEquator(0)的代碼如下
  2. private void setEquator(int pos) {
  3. equator = pos;
  4. // set index prior to first entry, aligned at meta boundary
  5. // 第一個 entry的末尾位置,即元數(shù)據(jù)和kv數(shù)據(jù)的分界線 單位是byte
  6. final int aligned = pos - (pos % METASIZE);
  7. // Cast one of the operands to long to avoid integer overflow
  8. // 元數(shù)據(jù)中存放數(shù)據(jù)的起始位置
  9. kvindex = (int)
  10. (((long)aligned - METASIZE + kvbuffer.length) % kvbuffer.length) / 4;
  11. LOG.info("(EQUATOR) " + pos + " kvi " + kvindex +
  12. "(" + (kvindex * 4) + ")");
  13. }

buffer初始化之后的抽象數(shù)據(jù)結構如下圖所示:

環(huán)形緩沖區(qū)數(shù)據(jù)結構圖

寫入buffer

Map通過NewOutputCollector.write方法調用collector.collect向buffer中寫入數(shù)據(jù),數(shù)據(jù)寫入之前已在NewOutputCollector.write中對要寫入的數(shù)據(jù)進行逐條分區(qū),下面看下collect

 
 
 
 
  1. // MapOutputBuffer.collect
  2. public synchronized void collect(K key, V value, final int partition
  3. ) throws IOException {
  4. ...
  5. // 新數(shù)據(jù)collect時,先將剩余的空間減去元數(shù)據(jù)的長度,之后進行判斷
  6. bufferRemaining -= METASIZE;
  7. if (bufferRemaining <= 0) {
  8. // start spill if the thread is not running and the soft limit has been
  9. // reached
  10. spillLock.lock();
  11. try {
  12. do {
  13. // 首次spill時,spillInProgress是false
  14. if (!spillInProgress) {
  15. // 得到kvindex的byte位置
  16. final int kvbidx = 4 * kvindex;
  17. // 得到kvend的byte位置
  18. final int kvbend = 4 * kvend;
  19. // serialized, unspilled bytes always lie between kvindex and
  20. // bufindex, crossing the equator. Note that any void space
  21. // created by a reset must be included in "used" bytes
  22. final int bUsed = distanceTo(kvbidx, bufindex);
  23. final boolean bufsoftlimit = bUsed >= softLimit;
  24. if ((kvbend + METASIZE) % kvbuffer.length !=
  25. equator - (equator % METASIZE)) {
  26. // spill finished, reclaim space
  27. resetSpill();
  28. bufferRemaining = Math.min(
  29. distanceTo(bufindex, kvbidx) - 2 * METASIZE,
  30. softLimit - bUsed) - METASIZE;
  31. continue;
  32. } else if (bufsoftlimit && kvindex != kvend) {
  33. // spill records, if any collected; check latter, as it may
  34. // be possible for metadata alignment to hit spill pcnt
  35. startSpill();
  36. final int avgRec = (int)
  37. (mapOutputByteCounter.getCounter() /
  38. mapOutputRecordCounter.getCounter());
  39. // leave at least half the split buffer for serialization data
  40. // ensure that kvindex >= bufindex
  41. final int distkvi = distanceTo(bufindex, kvbidx);
  42. final int newPos = (bufindex +
  43. Math.max(2 * METASIZE - 1,
  44. Math.min(distkvi / 2,
  45. distkvi / (METASIZE + avgRec) * METASIZE)))
  46. % kvbuffer.length;
  47. setEquator(newPos);
  48. bufmark = bufindex = newPos;
  49. final int serBound = 4 * kvend;
  50. // bytes remaining before the lock must be held and limits
  51. // checked is the minimum of three arcs: the metadata space, the
  52. // serialization space, and the soft limit
  53. bufferRemaining = Math.min(
  54. // metadata max
  55. distanceTo(bufend, newPos),
  56. Math.min(
  57. // serialization max
  58. distanceTo(newPos, serBound),
  59. // soft limit
  60. softLimit)) - 2 * METASIZE;
  61. }
  62. }
  63. } while (false);
  64. } finally {
  65. spillLock.unlock();
  66. }
  67. }
  68. // 將key value 及元數(shù)據(jù)信息寫入緩沖區(qū)
  69. try {
  70. // serialize key bytes into buffer
  71. int keystart = bufindex;
  72. // 將key序列化寫入kvbuffer中,并移動bufindex
  73. keySerializer.serialize(key);
  74. // key所占空間被bufvoid分隔,則移動key,
  75. // 將其值放在連續(xù)的空間中便于sort時key的對比
  76. if (bufindex < keystart) {
  77. // wrapped the key; must make contiguous
  78. bb.shiftBufferedKey();
  79. keystart = 0;
  80. }
  81. // serialize value bytes into buffer
  82. final int valstart = bufindex;
  83. valSerializer.serialize(value);
  84. // It's possible for records to have zero length, i.e. the serializer
  85. // will perform no writes. To ensure that the boundary conditions are
  86. // checked and that the kvindex invariant is maintained, perform a
  87. // zero-length write into the buffer. The logic monitoring this could be
  88. // moved into collect, but this is cleaner and inexpensive. For now, it
  89. // is acceptable.
  90. bb.write(b0, 0, 0);
  91.  
  92. // the record must be marked after the preceding write, as the metadata
  93. // for this record are not yet written
  94. int valend = bb.markRecord();
  95.  
  96. mapOutputRecordCounter.increment(1);
  97. mapOutputByteCounter.increment(
  98. distanceTo(keystart, valend, bufvoid));
  99.  
  100. // write accounting info
  101. kvmeta.put(kvindex + PARTITION, partition);
  102. kvmeta.put(kvindex + KEYSTART, keystart);
  103. kvmeta.put(kvindex + VALSTART, valstart);
  104. kvmeta.put(kvindex + VALLEN, distanceTo(valstart, valend));
  105. // advance kvindex
  106. kvindex = (kvindex - NMETA + kvmeta.capacity()) % kvmeta.capacity();
  107. } catch (MapBufferTooSmallException e) {
  108. LOG.info("Record too large for in-memory buffer: " + e.getMessage());
  109. spillSingleRecord(key, value, partition);
  110. mapOutputRecordCounter.increment(1);
  111. return;
  112. }
  113. }

每次寫入數(shù)據(jù)時,執(zhí)行bufferRemaining -= METASIZE之后,檢查bufferRemaining,

如果大于0,直接將key/value序列化對和對應的meta寫入buffer中,key/value是序列化之后寫入的,key/value經(jīng)過一些列的方法調用Serializer.serialize(key/value) -> WritableSerializer.serialize(key/value) -> BytesWritable.write(dataOut) -> DataOutputStream.write(bytes, 0, size) -> MapOutputBuffer.Buffer.write(b, off, len),最后由MapOutputBuffer.Buffer.write(b, off, len)將數(shù)據(jù)寫入kvbuffer中,write方法如下:

 
 
 
 
  1. public void write(byte b[], int off, int len)
  2. throws IOException {
  3. // must always verify the invariant that at least METASIZE bytes are
  4. // available beyond kvindex, even when len == 0
  5. bufferRemaining -= len;
  6. if (bufferRemaining <= 0) {
  7. // writing these bytes could exhaust available buffer space or fill
  8. // the buffer to soft limit. check if spill or blocking are necessary
  9. boolean blockwrite = false;
  10. spillLock.lock();
  11. try {
  12. do {
  13. checkSpillException();
  14.  
  15. final int kvbidx = 4 * kvindex;
  16. final int kvbend = 4 * kvend;
  17. // ser distance to key index
  18. final int distkvi = distanceTo(bufindex, kvbidx);
  19. // ser distance to spill end index
  20. final int distkve = distanceTo(bufindex, kvbend);
  21.  
  22. // if kvindex is closer than kvend, then a spill is neither in
  23. // progress nor complete and reset since the lock was held. The
  24. // write should block only if there is insufficient space to
  25. // complete the current write, write the metadata for this record,
  26. // and write the metadata for the next record. If kvend is closer,
  27. // then the write should block if there is too little space for
  28. // either the metadata or the current write. Note that collect
  29. // ensures its metadata requirement with a zero-length write
  30. blockwrite = distkvi <= distkve
  31. ? distkvi <= len + 2 * METASIZE
  32. : distkve <= len || distanceTo(bufend, kvbidx) < 2 * METASIZE;
  33.  
  34. if (!spillInProgress) {
  35. if (blockwrite) {
  36. if ((kvbend + METASIZE) % kvbuffer.length !=
  37. equator - (equator % METASIZE)) {
  38. // spill finished, reclaim space
  39. // need to use meta exclusively; zero-len rec & 100% spill
  40. // pcnt would fail
  41. resetSpill(); // resetSpill doesn't move bufindex, kvindex
  42. bufferRemaining = Math.min(
  43. distkvi - 2 * METASIZE,
  44. softLimit - distanceTo(kvbidx, bufindex)) - len;
  45. continue;
  46. }
  47. // we have records we can spill; only spill if blocked
  48. if (kvindex != kvend) {
  49. startSpill();
  50. // Blocked on this write, waiting for the spill just
  51. // initiated to finish. Instead of repositioning the marker
  52. // and copying the partial record, we set the record start
  53. // to be the new equator
  54. setEquator(bufmark);
  55. } else {
  56. // We have no buffered records, and this record is too large
  57. // to write into kvbuffer. We must spill it directly from
  58. // collect
  59. final int size = distanceTo(bufstart, bufindex) + len;
  60. setEquator(0);
  61. bufstart = bufend = bufindex = equator;
  62. kvstart = kvend = kvindex;
  63. bufvoid = kvbuffer.length;
  64. throw new MapBufferTooSmallException(size + " bytes");
  65. }
  66. }
  67. }
  68.  
  69. if (blockwrite) {
  70. // wait for spill
  71. try {
  72. while (spillInProgress) {
  73. reporter.progress();
  74. spillDone.await();
  75. }
  76. } catch (InterruptedException e) {
  77. throw new IOException(
  78. "Buffer interrupted while waiting for the writer", e);
  79. }
  80. }
  81. } while (blockwrite);
  82. } finally {
  83. spillLock.unlock();
  84. }
  85. }
  86. // here, we know that we have sufficient space to write
  87. if (bufindex + len > bufvoid) {
  88. final int gaplen = bufvoid - bufindex;
  89. System.arraycopy(b, off, kvbuffer, bufindex, gaplen);
  90. len -= gaplen;
  91. off += gaplen;
  92. bufindex = 0;
  93. }
  94. System.arraycopy(b, off, kvbuffer, bufindex, len);
  95. bufindex += len;
  96. }

write方法將key/value寫入kvbuffer中,如果bufindex+len超過了bufvoid,則將寫入的內容分開存儲,將一部分寫入bufindex和bufvoid之間,然后重置bufindex,將剩余的部分寫入,這里不區(qū)分key和value,寫入key之后會在collect中判斷bufindex < keystart,當bufindex小時,則key被分開存儲,執(zhí)行bb.shiftBufferedKey(),value則直接寫入,不用判斷是否被分開存儲,key不能分開存儲是因為要對key進行排序。

這里需要注意的是要寫入的數(shù)據(jù)太長,并且kvinde==kvend,則拋出MapBufferTooSmallException異常,在collect中捕獲,將此數(shù)據(jù)直接spill到磁盤spillSingleRecord,也就是當單條記錄過長時,不寫buffer,直接寫入磁盤。

下面看下bb.shiftBufferedKey()代碼

 
 
 
 
  1. // BlockingBuffer.shiftBufferedKey
  2. protected void shiftBufferedKey() throws IOException {
  3. // spillLock unnecessary; both kvend and kvindex are current
  4. int headbytelen = bufvoid - bufmark;
  5. bufvoid = bufmark;
  6. final int kvbidx = 4 * kvindex;
  7. final int kvbend = 4 * kvend;
  8. final int avail =
  9. Math.min(distanceTo(0, kvbidx), distanceTo(0, kvbend));
  10. if (bufindex + headbytelen < avail) {
  11. System.arraycopy(kvbuffer, 0, kvbuffer, headbytelen, bufindex);
  12. System.arraycopy(kvbuffer, bufvoid, kvbuffer, 0, headbytelen);
  13. bufindex += headbytelen;
  14. bufferRemaining -= kvbuffer.length - bufvoid;
  15. } else {
  16. byte[] keytmp = new byte[bufindex];
  17. System.arraycopy(kvbuffer, 0, keytmp, 0, bufindex);
  18. bufindex = 0;
  19. out.write(kvbuffer, bufmark, headbytelen);
  20. out.write(keytmp);
  21. }
  22. }

shiftBufferedKey時,判斷首部是否有足夠的空間存放key,有沒有足夠的空間,則先將首部的部分key寫入keytmp中,然后分兩次寫入,再次調用Buffer.write,如果有足夠的空間,分兩次copy,先將首部的部分key復制到headbytelen的位置,然后將末尾的部分key復制到首部,移動bufindex,重置bufferRemaining的值。

key/value寫入之后,繼續(xù)寫入元數(shù)據(jù)信息并重置kvindex的值。

spill

一次寫入buffer結束,當寫入數(shù)據(jù)比較多,bufferRemaining小于等于0時,準備進行spill,首次spill,spillInProgress為false,此時查看bUsed = distanceTo(kvbidx, bufindex),此時bUsed >= softLimit 并且 (kvbend + METASIZE) % kvbuffer.length == equator - (equator % METASIZE),則進行spill,調用startSpill

 
 
 
 
  1. private void startSpill() {
  2. // 元數(shù)據(jù)的邊界賦值
  3. kvend = (kvindex + NMETA) % kvmeta.capacity();
  4. // key/value的邊界賦值
  5. bufend = bufmark;
  6. // 設置spill運行標識
  7. spillInProgress = true;
  8. ...
  9. // 利用重入鎖,對spill線程進行喚醒
  10. spillReady.signal();
  11. }

startSpill喚醒spill線程之后,進程spill操作,但此時map向buffer的寫入操作并沒有阻塞,需要重新邊界equator和bufferRemaining的值,先來看下equator和bufferRemaining值的設定:

 
 
 
 
  1. // 根據(jù)已經(jīng)寫入的kv得出每個record的平均長度
  2. final int avgRec = (int) (mapOutputByteCounter.getCounter() /
  3. mapOutputRecordCounter.getCounter());
  4. // leave at least half the split buffer for serialization data
  5. // ensure that kvindex >= bufindex
  6. // 得到空余空間的大小
  7. final int distkvi = distanceTo(bufindex, kvbidx);
  8. // 得出新equator的位置
  9. final int newPos = (bufindex +
  10. Math.max(2 * METASIZE - 1,
  11. Math.min(distkvi / 2,
  12. distkvi / (METASIZE + avgRec) * METASIZE)))
  13. % kvbuffer.length;
  14. setEquator(newPos);
  15. bufmark = bufindex = newPos;
  16. final int serBound = 4 * kvend;
  17. // bytes remaining before the lock must be held and limits
  18. // checked is the minimum of three arcs: the metadata space, the
  19. // serialization space, and the soft limit
  20. bufferRemaining = Math.min(
  21. // metadata max
  22. distanceTo(bufend, newPos),
  23. Math.min(
  24. // serialization max
  25. distanceTo(newPos, serBound),
  26. // soft limit
  27. softLimit)) - 2 * METASIZE;

因為equator是kvbuffer和kvmeta的分界線,為了更多的空間存儲kv,則最多拿出distkvi的一半來存儲meta,并且利用avgRec估算distkvi能存放多少個record和meta對,根據(jù)record和meta對的個數(shù)估算meta所占空間的大小,從distkvi/2和meta所占空間的大小中取最小值,又因為distkvi中最少得存放一個meta,所占空間為METASIZE,在選取kvindex時需要求aligned,aligned最多為METASIZE-1,總和上述因素,最終選取equator為(bufindex + Math.max(2 * METASIZE - 1, Math.min(distkvi / 2, distkvi / (METASIZE + avgRec) * METASIZE)))。equator選取之后,設置bufmark = bufindex = newPos和kvindex,但此時并不設置bufstart、bufend和kvstart、kvend,因為這幾個值要用來表示spill數(shù)據(jù)的邊界。

spill之后,可用的空間減少了,則控制spill的bufferRemaining也應該重新設置,bufferRemaining取三個值的最小值減去2*METASIZE,三個值分別是meta可用占用的空間distanceTo(bufend, newPos),kv可用空間distanceTo(newPos, serBound)和softLimit。這里為什么要減去2*METASIZE,一個是spill之前kvend到kvindex的距離,另一個是當時的kvindex空間????此時,已有一個record要寫入buffer,需要從bufferRemaining中減去當前record的元數(shù)據(jù)占用的空間,即減去METASIZE,另一個METASIZE是在計算equator時,沒有包括kvindex到kvend(spill之前)的這段METASIZE,所以要減去這個METASIZE。

接下來解析下SpillThread線程,查看其run方法:

 
 
 
 
  1. public void run() {
  2. spillLock.lock();
  3. spillThreadRunning = true;
  4. try {
  5. while (true) {
  6. spillDone.signal();
  7. // 判斷是否在spill,false則掛起SpillThread線程,等待喚醒
  8. while (!spillInProgress) {
  9. spillReady.await();
  10. }
  11. try {
  12. spillLock.unlock();
  13. // 喚醒之后,進行排序和溢寫到磁盤
  14. sortAndSpill();
  15. } catch (Throwable t) {
  16. sortSpillException = t;
  17. } finally {
  18. spillLock.lock();
  19. if (bufend < bufstart) {
  20. bufvoid = kvbuffer.length;
  21. }
  22. kvstart = kvend;
  23. bufstart = bufend;
  24. spillInProgress = false;
  25. }
  26. }
  27. } catch (InterruptedException e) {
  28. Thread.currentThread().interrupt();
  29. } finally {
  30. spillLock.unlock();
  31. spillThreadRunning = false;
  32. }
  33. }

run中主要是sortAndSpill,

 
 
 
 
  1. private void sortAndSpill() throws IOException, ClassNotFoundException,
  2. InterruptedException {
  3. //approximate the length of the output file to be the length of the
  4. //buffer + header lengths for the partitions
  5. final long size = distanceTo(bufstart, bufend, bufvoid) +
  6. partitions * APPROX_HEADER_LENGTH;
  7. FSDataOutputStream out = null;
  8. try {
  9. // create spill file
  10. // 用來存儲index文件
  11. final SpillRecord spillRec = new SpillRecord(partitions);
  12. // 創(chuàng)建寫入磁盤的spill文件
  13. final Path filename =
  14. mapOutputFile.getSpillFileForWrite(numSpills, size);
  15. // 打開文件流
  16. out = rfs.create(filename);
  17. // kvend/4 是截止到當前位置能存放多少個元數(shù)據(jù)實體
  18. final int mstart = kvend / NMETA;
  19. // kvstart 處能存放多少個元數(shù)據(jù)實體
  20. // 元數(shù)據(jù)則在mstart和mend之間,(mstart - mend)則是元數(shù)據(jù)的個數(shù)
  21. final int mend = 1 + // kvend is a valid record
  22. (kvstart >= kvend
  23. ? kvstart
  24. : kvmeta.capacity() + kvstart) / NMETA;
  25. // 排序 只對元數(shù)據(jù)進行排序,只調整元數(shù)據(jù)在kvmeta中的順序
  26. // 排序規(guī)則是MapOutputBuffer.compare,
  27. // 先對partition進行排序其次對key值排序
  28. sorter.sort(MapOutputBuffer.this, mstart, mend, reporter);
  29. int spindex = mstart;
  30. // 創(chuàng)建rec,用于存放該分區(qū)在數(shù)據(jù)文件中的信息
  31. final IndexRecord rec = new IndexRecord();
  32. final InMemValBytes value = new InMemValBytes();
  33. for (int i = 0; i < partitions; ++i) {
  34. // 臨時文件是IFile格式的
  35. IFile.Writer writer = null;
  36. try {
  37. long segmentStart = out.getPos();
  38. FSDataOutputStream partitionOut = CryptoUtils.wrapIfNecessary(job, out);
  39. writer = new Writer(job, partitionOut, keyClass, valClass, codec,
  40. spilledRecordsCounter);
  41. // 往磁盤寫數(shù)據(jù)時先判斷是否有combiner
  42. if (combinerRunner == null) {
  43. // spill directly
  44. DataInputBuffer key = new DataInputBuffer();
  45. // 寫入相同partition的數(shù)據(jù)
  46. while (spindex < mend &&
  47. kvmeta.get(offsetFor(spindex % maxRec) + PARTITION) == i) {
  48. final int kvoff = offsetFor(spindex % maxRec);
  49. int keystart = kvmeta.get(kvoff + KEYSTART);
  50. int valstart = kvmeta.get(kvoff + VALSTART);
  51. key.reset(kvbuffer, keystart, valstart - keystart);
  52. getVBytesForOffset(kvoff, value);
  53. writer.append(key, value);
  54. ++spindex;
  55. }
  56. } else {
  57. int spstart = spindex;
  58. while (spindex < mend &&
  59. kvmeta.get(offsetFor(spindex % maxRec)
  60. + PARTITION) == i) {
  61. ++spindex;
  62. }
  63. // Note: we would like to avoid the combiner if we've fewer
  64. // than some threshold of records for a partition
  65. if (spstart != spindex) {
  66. combineCollector.setWriter(writer);
  67. RawKeyValueIterator kvIter =
  68. new MRResultIterator(spstart, spindex);
  69. combinerRunner.combine(kvIter, combineCollector);
  70. }
  71. }
  72.  
  73. // close the writer
  74. writer.close();
  75.  
  76. // record offsets
  77. // 記錄當前partition i的信息寫入索文件rec中
  78. rec.startOffset = segmentStart;
  79. rec.rawLength = writer.getRawLength() + CryptoUtils.cryptoPadding(job);
  80. rec.partLength = writer.getCompressedLength() + CryptoUtils.cryptoPadding(job);
  81. // spillRec中存放了spill中partition的信息,便于后續(xù)堆排序時,取出partition相關的數(shù)據(jù)進行排序
  82. spillRec.putIndex(rec, i);
  83.  
  84. writer = null;
  85. } finally {
  86. if (null != writer) writer.close();
  87. }
  88. }
  89. // 判斷內存中的index文件是否超出閾值,超出則將index文件寫入磁盤
  90. // 當超出閾值時只是把當前index和之后的index寫入磁盤
  91. if (totalIndexCacheMemory >= indexCacheMemoryLimit) {
  92. // create spill index file
  93. // 創(chuàng)建index文件
  94. Path indexFilename =
  95. mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions
  96. * MAP_OUTPUT_INDEX_RECORD_LENGTH);
  97. spillRec.writeToFile(indexFilename, job);
  98. } else {
  99. indexCacheList.add(spillRec);
  100. totalIndexCacheMemory +=
  101. spillRec.size() * MAP_OUTPUT_INDEX_RECORD_LENGTH;
  102. }
  103. LOG.info("Finished spill " + numSpills);
  104. ++numSpills;
  105. } finally {
  106. if (out != null) out.close();
  107. }
  108. }

ortAndSpill中,有mstart和mend得到一共有多少條record需要spill到磁盤,調用sorter.sort對meta進行排序,先對partition進行排序,然后按key排序,排序的結果只調整meta的順序。

排序之后,判斷是否有combiner,沒有則直接將record寫入磁盤,寫入時是一個partition一個IndexRecord,如果有combiner,則將該partition的record寫入kvIter,然后調用combinerRunner.combine執(zhí)行combiner。

寫入磁盤之后,將spillx.out對應的spillRec放入內存indexCacheList.add(spillRec),如果所占內存totalIndexCacheMemory超過了indexCacheMemoryLimit,則創(chuàng)建index文件,將此次及以后的spillRec寫入index文件存入磁盤。

最后spill次數(shù)遞增。sortAndSpill結束之后,回到run方法中,執(zhí)行finally中的代碼,對kvstart和bufstart賦值,kvstart = kvend,bufstart = bufend,設置spillInProgress的狀態(tài)為false。

在spill的同時,map往buffer的寫操作并沒有停止,依然在調用collect,再次回到collect方法中,

 
 
 
 
  1. // MapOutputBuffer.collect
  2. public synchronized void collect(K key, V value, final int partition
  3. ) throws IOException {
  4. ...
  5. // 新數(shù)據(jù)collect時,先將剩余的空間減去元數(shù)據(jù)的長度,之后進行判斷
  6. bufferRemaining -= METASIZE;
  7. if (bufferRemaining <= 0) {
  8. // start spill if the thread is not running and the soft limit has been
  9. // reached
  10. spillLock.lock();
  11. try {
  12. do {
  13. // 首次spill時,spillInProgress是false
  14. if (!spillInProgress) {
  15. // 得到kvindex的byte位置
  16. final int kvbidx = 4 * kvindex;
  17. // 得到kvend的byte位置
  18. final int kvbend = 4 * kvend;
  19. // serialized, unspilled bytes always lie between kvindex and
  20. // bufindex, crossing the equator. Note that any void space
  21. // created by a reset must be included in "used" bytes
  22. <
    本文標題:MapReduce源碼解析--環(huán)形緩沖區(qū)
    網(wǎng)頁網(wǎng)址:http://www.dlmjj.cn/article/dppcejo.html