更多
当前位置: 首页 > 综合

MapReduce Shuffle源码解读-世界短讯

发布时间:2023-03-26 11:32:58 来源:博客园
MapReduce Shuffle源码解读

相信很多小伙伴都背过shuffle的八股文,但一直不是很理解shuffle的过程,这次我通过源码来解读下shuffle过程,加深对shuffle的理解,但是我自己还是个菜鸟,这篇博客也是参考了很多资料,如果有不对的地方,请指正。


(资料图)

shuffle是Map Task和 Reduce Task之间的一个阶段,本质上是一个跨节点跨进程间的数据传输,网上的资料也把MapReduce的过程细分为六个阶段:

Collect 2. Spill 3.Merge 4.Copy 5.Merge 6. Sort

看过源码之后,这几个阶段划分的还是很有道理的,首先看看官网上对shuffle的描述图,有个印象

Map

首先,我们先来看看Map阶段的代码,先找到Map Task的入口(org/apache/hadoop/mapred/MapTask.java)的run方法,当map task启动时都会执行这个方法。

@Overridepublic void run(final JobConf job, final TaskUmbilicalProtocol umbilical)  throws IOException, ClassNotFoundException, InterruptedException {  this.umbilical = umbilical;   // 一个taskAttempt的代理,后面比较多的地方使用  if (isMapTask()) {    // If there are no reducers then there won"t be any sort. Hence the map     // phase will govern the entire attempt"s progress.    if (conf.getNumReduceTasks() == 0) {      mapPhase = getProgress().addPhase("map", 1.0f);    } else {      // If there are reducers then the entire attempt"s progress will be       // split between the map phase (67%) and the sort phase (33%).      mapPhase = getProgress().addPhase("map", 0.667f);      sortPhase  = getProgress().addPhase("sort", 0.333f);    }  }  // 启动任务状态汇报器,其内部有周期性的汇报线程(状态汇报和心跳)  TaskReporter reporter = startReporter(umbilical);  boolean useNewApi = job.getUseNewMapper();  initialize(job, getJobID(), reporter, useNewApi);  // 重要方法,可以认为初始化task启动的一切资源了  // check if it is a cleanupJobTask  if (jobCleanup) {    runJobCleanupTask(umbilical, reporter);    return;  }  if (jobSetup) {    runJobSetupTask(umbilical, reporter);    return;  }  if (taskCleanup) {    runTaskCleanupTask(umbilical, reporter);    return;  }  if (useNewApi) {    runNewMapper(job, splitMetaInfo, umbilical, reporter); // 核心代码,点进去  } else {    runOldMapper(job, splitMetaInfo, umbilical, reporter);  }  done(umbilical, reporter);}

这里umbilical比较难理解,我其实也没怎么搞懂,看名字是个协议,这里贴出它的注释

任务子进程用于联系其父进程的协议。父进程是一个守护进程,它轮询中央主进程以获取新的map或reduce Task,并将其作为子进程(Child)运行。孩子和父母之间的所有通信都是通过此协议进行的

看起来是个RPC,这个父进程我不是很清楚,我理解是在v1版本的话,这个可能是taskTracker,如果在v2版本(yarn)可能是ApplicationMaster,如果不对,请大神解答我的疑问。

进入runNewMapper方法

@SuppressWarnings("unchecked")private void runNewMapper(final JobConf job,                  final TaskSplitIndex splitIndex,                  final TaskUmbilicalProtocol umbilical,                  TaskReporter reporter                  ) throws IOException, ClassNotFoundException,                           InterruptedException {  // make a task context so we can get the classes  创建Task的上下文环境  org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =    new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job,                                                                 getTaskID(),                                                                reporter);  // make a mapper  通过反射创建mapper  org.apache.hadoop.mapreduce.Mapper mapper =    (org.apache.hadoop.mapreduce.Mapper)      ReflectionUtils.newInstance(taskContext.getMapperClass(), job);  // make the input format   通过反射创建inputFormat,来读取数据  org.apache.hadoop.mapreduce.InputFormat inputFormat =    (org.apache.hadoop.mapreduce.InputFormat)      ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job);  // rebuild the input split // 获取切片信息  org.apache.hadoop.mapreduce.InputSplit split = null;  split = getSplitDetails(new Path(splitIndex.getSplitLocation()),      splitIndex.getStartOffset());  LOG.info("Processing split: " + split);  org.apache.hadoop.mapreduce.RecordReader input =    new NewTrackingRecordReader   //通过反射创建RecordReader。InputFormat是通过RecordReader来读取数据,这个也是大学问,在job submit时很关键      (split, inputFormat, reporter, taskContext);    job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());  org.apache.hadoop.mapreduce.RecordWriter output = null;    // get an output object  if (job.getNumReduceTasks() == 0) { // 如果没有reduce任务,则直接写入磁盘    output =       new NewDirectOutputCollector(taskContext, job, umbilical, reporter);  } else { //  核心代码,创建collector收集器  ,点进去    output = new NewOutputCollector(taskContext, job, umbilical, reporter);  }  org.apache.hadoop.mapreduce.MapContext   mapContext =     new MapContextImpl(job, getTaskID(),         input, output,         committer,         reporter, split);  org.apache.hadoop.mapreduce.Mapper.Context       mapperContext =         new WrappedMapper().getMapContext(            mapContext);  try {    input.initialize(split, mapperContext);    mapper.run(mapperContext);  // 调用我们自己实现的mapper类    mapPhase.complete();    setPhase(TaskStatus.Phase.SORT);    statusUpdate(umbilical);    input.close();    input = null;    output.close(mapperContext);    output = null;  } finally {    closeQuietly(input);    closeQuietly(output, mapperContext);  }}

马上进入collect阶段了,点进 NewOutputCollector,看看如何创建Collector

private class NewOutputCollector    extends org.apache.hadoop.mapreduce.RecordWriter {    private final MapOutputCollector collector;    private final org.apache.hadoop.mapreduce.Partitioner partitioner;    private final int partitions;    @SuppressWarnings("unchecked")    NewOutputCollector(org.apache.hadoop.mapreduce.JobContext jobContext,                       JobConf job,                       TaskUmbilicalProtocol umbilical,                       TaskReporter reporter                       ) throws IOException, ClassNotFoundException {      collector = createSortingCollector(job, reporter);      partitions = jobContext.getNumReduceTasks();  // partitions数等于reduce任务数      if (partitions > 1) {        partitioner = (org.apache.hadoop.mapreduce.Partitioner)          ReflectionUtils.newInstance(jobContext.getPartitionerClass(), job);      } else {        partitioner = new org.apache.hadoop.mapreduce.Partitioner() {          @Override          public int getPartition(K key, V value, int numPartitions) {            return partitions - 1;          }        };      }    }    @Override    public void write(K key, V value) throws IOException, InterruptedException {      collector.collect(key, value, // 向对应分区的环形缓冲区写入(k,v)                        partitioner.getPartition(key, value, partitions));    }    @Override    public void close(TaskAttemptContext context                      ) throws IOException,InterruptedException {      try {        collector.flush();//核心方法,将数据刷出去。      } catch (ClassNotFoundException cnf) {        throw new IOException("can"t find class ", cnf);      }      collector.close();    }  }

点进 creareSortingCollector

@SuppressWarnings("unchecked")private  MapOutputCollector  // collector是map 类型        createSortingCollector(JobConf job, TaskReporter reporter)  throws IOException, ClassNotFoundException {  MapOutputCollector.Context context =    new MapOutputCollector.Context(this, job, reporter);  Class[] collectorClasses = job.getClasses(  // 获取Map Collector的类型    JobContext.MAP_OUTPUT_COLLECTOR_CLASS_ATTR, MapOutputBuffer.class);  // 说到底还是MapOutputBuffer类型  int remainingCollectors = collectorClasses.length;  Exception lastException = null;  for (Class clazz : collectorClasses) {    try {      if (!MapOutputCollector.class.isAssignableFrom(clazz)) {  // MapOutputCollector是不是clazz或者其父类        throw new IOException("Invalid output collector class: " + clazz.getName() +          " (does not implement MapOutputCollector)");      }      Class subclazz =        clazz.asSubclass(MapOutputCollector.class);      LOG.debug("Trying map output collector class: " + subclazz.getName());      MapOutputCollector collector =        ReflectionUtils.newInstance(subclazz, job); //  创建collector      collector.init(context);   // 初始化 点进去      LOG.info("Map output collector class = " + collector.getClass().getName());      return collector;    } catch (Exception e) {      String msg = "Unable to initialize MapOutputCollector " + clazz.getName();      if (--remainingCollectors > 0) {        msg += " (" + remainingCollectors + " more collector(s) to try)";      }      lastException = e;      LOG.warn(msg, e);    }  }}

这个init方法十分的关键,不仅涉及了环形缓冲区,还涉及了Spill

public void init(MapOutputCollector.Context context                     // 这个方法中,主要就是对收集器对象进行一些初始化                ) throws IOException, ClassNotFoundException {  job = context.getJobConf();  reporter = context.getReporter();  mapTask = context.getMapTask();  mapOutputFile = mapTask.getMapOutputFile();  sortPhase = mapTask.getSortPhase();  spilledRecordsCounter = reporter.getCounter(TaskCounter.SPILLED_RECORDS);  partitions = job.getNumReduceTasks();  rfs = ((LocalFileSystem)FileSystem.getLocal(job)).getRaw();  //sanity checks  final float spillper =    job.getFloat(JobContext.MAP_SORT_SPILL_PERCENT, (float)0.8);  // 设置环形缓冲区溢写比例为0.8  final int sortmb = job.getInt(MRJobConfig.IO_SORT_MB,      MRJobConfig.DEFAULT_IO_SORT_MB);  //  默认环形缓冲区大小为100M  indexCacheMemoryLimit = job.getInt(JobContext.INDEX_CACHE_MEMORY_LIMIT,                                     INDEX_CACHE_MEMORY_LIMIT_DEFAULT);  if (spillper > (float)1.0 || spillper <= (float)0.0) {    throw new IOException("Invalid \"" + JobContext.MAP_SORT_SPILL_PERCENT +        "\": " + spillper);  }  if ((sortmb & 0x7FF) != sortmb) {    throw new IOException(        "Invalid \"" + JobContext.IO_SORT_MB + "\": " + sortmb);  }  // 排序,默认使用的快排  // 获取到排序对象,在数据由环形缓冲区溢写到磁盘中前  // 并且排序是针对索引的,并非对数据进行排序。  sorter = ReflectionUtils.newInstance(job.getClass(               MRJobConfig.MAP_SORT_CLASS, QuickSort.class,               IndexedSorter.class), job);  // buffers and accounting  // 对环形缓冲区初始化,大名鼎鼎的环形缓冲区本质上是个byte数组  int maxMemUsage = sortmb << 20;  // 将MB转换为Bytes  // 一对kv数据有四个元数据MATE,分别是valstart,keystart,partitions,vallen,都是int类型  // METASIZE 就是4个int转换成byte就是4*4  maxMemUsage -= maxMemUsage % METASIZE;  // 计算METE数据存储的大小  kvbuffer = new byte[maxMemUsage]; // 元数据数组  以byte为单位  bufvoid = kvbuffer.length;  kvmeta = ByteBuffer.wrap(kvbuffer)     .order(ByteOrder.nativeOrder())     .asIntBuffer();  // 将byte单位的kvbuffer转换成int单位的kvmeta  setEquator(0);  bufstart = bufend = bufindex = equator;  kvstart = kvend = kvindex;  // kvmeta中存放元数据实体的最大个数  maxRec = kvmeta.capacity() / NMETA;  softLimit = (int)(kvbuffer.length * spillper); // buffer 溢写的阈值  bufferRemaining = softLimit;  LOG.info(JobContext.IO_SORT_MB + ": " + sortmb);  LOG.info("soft limit at " + softLimit);  LOG.info("bufstart = " + bufstart + "; bufvoid = " + bufvoid);  LOG.info("kvstart = " + kvstart + "; length = " + maxRec);  // k/v serialization  comparator = job.getOutputKeyComparator();  keyClass = (Class)job.getMapOutputKeyClass();  valClass = (Class)job.getMapOutputValueClass();  serializationFactory = new SerializationFactory(job);  keySerializer = serializationFactory.getSerializer(keyClass);  keySerializer.open(bb);  // 将key写入bb中 blockingbuffer  valSerializer = serializationFactory.getSerializer(valClass);  valSerializer.open(bb); // 将value写入bb中  // output counters  mapOutputByteCounter = reporter.getCounter(TaskCounter.MAP_OUTPUT_BYTES);  mapOutputRecordCounter =    reporter.getCounter(TaskCounter.MAP_OUTPUT_RECORDS);  fileOutputByteCounter = reporter      .getCounter(TaskCounter.MAP_OUTPUT_MATERIALIZED_BYTES);  // compression  压缩器,减少shuffle数据量  if (job.getCompressMapOutput()) {    Class codecClass =      job.getMapOutputCompressorClass(DefaultCodec.class);    codec = ReflectionUtils.newInstance(codecClass, job);  } else {    codec = null;  }  // combiner  // combiner  map端的reduce  final Counters.Counter combineInputCounter =    reporter.getCounter(TaskCounter.COMBINE_INPUT_RECORDS);  combinerRunner = CombinerRunner.create(job, getTaskID(),                                          combineInputCounter,                                         reporter, null);  if (combinerRunner != null) {    final Counters.Counter combineOutputCounter =      reporter.getCounter(TaskCounter.COMBINE_OUTPUT_RECORDS);    combineCollector= new CombineOutputCollector(combineOutputCounter, reporter, job);  } else {    combineCollector = null;  }  // 溢写线程  spillInProgress = false;  minSpillsForCombine = job.getInt(JobContext.MAP_COMBINE_MIN_SPILLS, 3);  spillThread.setDaemon(true); //  是个守护线程  spillThread.setName("SpillThread"); //  spillLock.lock();  try {    spillThread.start();  // 启动一个spill线程    while (!spillThreadRunning) {      spillDone.await();    }  } catch (InterruptedException e) {    throw new IOException("Spill thread failed to initialize", e);  } finally {    spillLock.unlock();  }  if (sortSpillException != null) {    throw new IOException("Spill thread failed to initialize",        sortSpillException);  }}

从这个类,我们可以看到环形缓冲区的一些初始化过程,如大小为100M,开始溢写的比例是0.8,实际上,Collector是一个宏观的概念,本质上就是一个MapOutputBuffer对象。

后面还启动了Spill线程,不过如果是第一次进去会被阻塞这里我们先按下不表。

至此,一些map开始之前的工作已经准备好了,至于它是怎么工作的我们可以从我们写的mapper中write方法debug进去,发现其实还是NewOutputCollector中定义的write方法,点进去是MapOutputBuffer的collect方法

public synchronized void collect(K key, V value, final int partition                                 ) throws IOException {  reporter.progress();  if (key.getClass() != keyClass) {    throw new IOException("Type mismatch in key from map: expected "                          + keyClass.getName() + ", received "                          + key.getClass().getName());  }  if (value.getClass() != valClass) {    throw new IOException("Type mismatch in value from map: expected "                          + valClass.getName() + ", received "                          + value.getClass().getName());  }  if (partition < 0 || partition >= partitions) {    throw new IOException("Illegal partition for " + key + " (" +        partition + ")");  }  checkSpillException();  bufferRemaining -= METASIZE;  // 新数据collect时,先将元数据长度前去,之后判断  if (bufferRemaining <= 0) { // 说明已经超过阈值了    // start spill if the thread is not running and the soft limit has been    // reached    spillLock.lock();    try {      do {        // 首次spill时,spillInProgress是false        if (!spillInProgress) {          final int kvbidx = 4 * kvindex; // 单位是byte          final int kvbend = 4 * kvend;  // 单位是byte          // serialized, unspilled bytes always lie between kvindex and          // bufindex, crossing the equator. Note that any void space          // created by a reset must be included in "used" bytes          final int bUsed = distanceTo(kvbidx, bufindex);  // 剩下可以写入的空间大小          final boolean bufsoftlimit = bUsed >= softLimit;  // true说明已经超过softLimit了          if ((kvbend + METASIZE) % kvbuffer.length !=              equator - (equator % METASIZE)) {            // spill finished, reclaim space            resetSpill();            bufferRemaining = Math.min(                distanceTo(bufindex, kvbidx) - 2 * METASIZE,                softLimit - bUsed) - METASIZE;  // 这里是重新选择equator吧,但是计算方式不了解            continue;          } else if (bufsoftlimit && kvindex != kvend) {            // spill records, if any collected; check latter, as it may            // be possible for metadata alignment to hit spill pcnt            startSpill();  //开始溢写,里面唤醒spill线程              final int avgRec = (int)              (mapOutputByteCounter.getCounter() /              mapOutputRecordCounter.getCounter());            // leave at least half the split buffer for serialization data            // ensure that kvindex >= bufindex            final int distkvi = distanceTo(bufindex, kvbidx);            final int newPos = (bufindex +              Math.max(2 * METASIZE - 1,                      Math.min(distkvi / 2,                               distkvi / (METASIZE + avgRec) * METASIZE)))              % kvbuffer.length;            setEquator(newPos);            bufmark = bufindex = newPos;            final int serBound = 4 * kvend;            // bytes remaining before the lock must be held and limits            // checked is the minimum of three arcs: the metadata space, the            // serialization space, and the soft limit            bufferRemaining = Math.min(                // metadata max                distanceTo(bufend, newPos),                Math.min(                  // serialization max                  distanceTo(newPos, serBound),                  // soft limit                  softLimit)) - 2 * METASIZE;          }        }      } while (false);   // 这是什么写法?????    } finally {      spillLock.unlock();    }  }  // 直接写入buffer,不涉及spill  try {    // serialize key bytes into buffer    int keystart = bufindex;    keySerializer.serialize(key);    // key所占空间被bufvoid分隔,则移动key,    // 将其值放在连续的空间中便于sort时key的对比    if (bufindex < keystart) {      // wrapped the key; must make contiguous      bb.shiftBufferedKey();      keystart = 0;    }    // serialize value bytes into buffer    final int valstart = bufindex;    valSerializer.serialize(value);    // It"s possible for records to have zero length, i.e. the serializer    // will perform no writes. To ensure that the boundary conditions are    // checked and that the kvindex invariant is maintained, perform a    // zero-length write into the buffer. The logic monitoring this could be    // moved into collect, but this is cleaner and inexpensive. For now, it    // is acceptable.    bb.write(b0, 0, 0);    // the record must be marked after the preceding write, as the metadata    // for this record are not yet written    int valend = bb.markRecord();    mapOutputRecordCounter.increment(1);    mapOutputByteCounter.increment(        distanceTo(keystart, valend, bufvoid)); //计数器+1    // write accounting info    kvmeta.put(kvindex + PARTITION,               );    kvmeta.put(kvindex + KEYSTART, keystart);    kvmeta.put(kvindex + VALSTART, valstart);    kvmeta.put(kvindex + VALLEN, distanceTo(valstart, valend));    // advance kvindex    kvindex = (kvindex - NMETA + kvmeta.capacity()) % kvmeta.capacity();  } catch (MapBufferTooSmallException e) {    LOG.info("Record too large for in-memory buffer: " + e.getMessage());    spillSingleRecord(key, value, partition);  // 长record就直接写入磁盘    mapOutputRecordCounter.increment(1);    return;  }}

这里首先最重要的方法就是第46行的startSpill()方法,这里点进去会发现一个spillReady.signal(),这就是唤醒之前因spillReady.await()方法阻塞的spill线程,这里的spillReady就是可重入锁,这里spill开始正式工作,这里涉及了环形缓冲区如何写和如何读,会比较抽象,我之后再写一篇关于环形缓冲区的文章。

这里代码就是Collect,本质上就是map端将输出的(k,v)数据和它的元数据写入MapOutputBuffer中。

此外,这个代码里也有唤醒spill线程的代码,找到SpillThread的run方法,很明显里面有个很重要的方法sortAndSpill

private void sortAndSpill() throws IOException, ClassNotFoundException,                                   InterruptedException {  //approximate the length of the output file to be the length of the  //buffer + header lengths for the partitions  final long size = distanceTo(bufstart, bufend, bufvoid) +              partitions * APPROX_HEADER_LENGTH;  // 写出长度  FSDataOutputStream out = null;  FSDataOutputStream partitionOut = null;  try {    // create spill file    final SpillRecord spillRec = new SpillRecord(partitions);    final Path filename =        mapOutputFile.getSpillFileForWrite(numSpills, size);// 默认是output/spillx.out    out = rfs.create(filename);// 创建分区文件    final int mstart = kvend / NMETA;    final int mend = 1 + // kvend is a valid record      (kvstart >= kvend      ? kvstart      : kvmeta.capacity() + kvstart) / NMETA;    // 对元数据进行排序,先按照partition进行排序,再按照key值进行排序    // 二次排序,排的是元数据部分    sorter.sort(MapOutputBuffer.this, mstart, mend, reporter);    int spindex = mstart;    final IndexRecord rec = new IndexRecord();    final InMemValBytes value = new InMemValBytes();    for (int i = 0; i < partitions; ++i) {//循环分区      // 溢写时的临时文件 类型是IFile      IFile.Writer writer = null;      try {        long segmentStart = out.getPos();        partitionOut = CryptoUtils.wrapIfNecessary(job, out, false);        writer = new Writer(job, partitionOut, keyClass, valClass, codec,                                  spilledRecordsCounter);        if (combinerRunner == null) {          // spill directly          DataInputBuffer key = new DataInputBuffer();          // 写入相同的partition数据          while (spindex < mend &&              kvmeta.get(offsetFor(spindex % maxRec) + PARTITION) == i) {            final int kvoff = offsetFor(spindex % maxRec);            int keystart = kvmeta.get(kvoff + KEYSTART);            int valstart = kvmeta.get(kvoff + VALSTART);            key.reset(kvbuffer, keystart, valstart - keystart);            getVBytesForOffset(kvoff, value);            writer.append(key, value);            ++spindex;          }        } else {    // 进行combiner,避免小文件问题          int spstart = spindex;          while (spindex < mend &&              kvmeta.get(offsetFor(spindex % maxRec)                        + PARTITION) == i) {            ++spindex;          }          // Note: we would like to avoid the combiner if we"ve fewer          // than some threshold of records for a partition          if (spstart != spindex) {            combineCollector.setWriter(writer);            RawKeyValueIterator kvIter =              new MRResultIterator(spstart, spindex);            combinerRunner.combine(kvIter, combineCollector);          }        }        // close the writer        writer.close();  ///  将文件写入本地磁盘中,不是HDFS上        if (partitionOut != out) {          partitionOut.close();          partitionOut = null;        }        // record offsets        // 记录当前partition i的信息写入索文件rec中        rec.startOffset = segmentStart;        rec.rawLength = writer.getRawLength() + CryptoUtils.cryptoPadding(job);        rec.partLength = writer.getCompressedLength() + CryptoUtils.cryptoPadding(job);        //spillRec中存放了spill中partition的信息        spillRec.putIndex(rec, i);        writer = null;      } finally {        if (null != writer) writer.close();      }    }    if (totalIndexCacheMemory >= indexCacheMemoryLimit) {      // create spill index file      Path indexFilename =          mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions              * MAP_OUTPUT_INDEX_RECORD_LENGTH);      spillRec.writeToFile(indexFilename, job);  // 将内存中的index文件写入磁盘    } else {      indexCacheList.add(spillRec);      totalIndexCacheMemory +=        spillRec.size() * MAP_OUTPUT_INDEX_RECORD_LENGTH;    }    LOG.info("Finished spill " + numSpills);    ++numSpills;  } finally {    if (out != null) out.close();    if (partitionOut != null) {      partitionOut.close();    }  }}

很明显,spill有两个临时文件生成,一个是(k,v)文件,它保存在默认路径是output/spill{x}.out文件中,注意,这段代码里并没有明显的将(k,v)文件写入磁盘的代码,这些代码在writer.close()中实现。而另一个明显写入磁盘的是spillRec.writeToFile(indexFilename, job),这个存放的每个partition的index。

在SpillThread在辛辛苦苦进行sortAndSpill工作时,map Task 也不断地产生新(k,v)写入MapOutputBuffer中,环形缓冲区的读线程和写线程同时工作!!怎么避免冲突呢?答案是反向写。

红色箭头是写(k,v)数据,蓝色箭头是写元数据,紫色是预留的百分之20的空间不能写,绿色是已经写入的数据部分,正在被spill线程读取操作。

至此,spillsort阶段算是大功告成,那么还有个疑问,如果MapOutPutBuffer还有部分数据,但这部分数据并没有达到spill的标准,怎么办呢?还是回到NewOutputCollector部分中close方法,里面有MapOutputBuffer的flush方法会解决这个问题。

最后就是Map Task中Shuffle过程的最后一个阶段Merge,这部分有点多就不贴代码了,感兴趣的同学可以查看MapOutputBuffer中mergeParts方法,这个方法在上面的flush方法里调用,该作用是合并spill阶段产生出来的out文件和index文件。

Merge过程目的很简单,但是过程确实很复杂。首先,Merge过程会扫描目录获取out文件的地址,存放一个数组中,同时也会获得index文件,存放到另一个数组中。好奇的同学可能再想既然又要读入到内存中,当初为啥要刷进磁盘里呢,这不是闲着没事干嘛,确实,这是MapReduce的缺陷,太过于批处理了,磁盘IO也限制了它的其他可能性,比如机器学习需要反复迭代,MapReduce就做不了这个,但是这一步确实很有必要的,因为早期内存很贵,不是每个人都是土豪的,考虑到OOM的风险,把所有的(K,V)数据和index数据刷进磁盘是非常有必要的,但是后面又可以全读入内存,那是因为缓存缓冲区这个大东西已经不再使用,内存就富裕起来了。

同时,Merge过程还涉及到归并算法,这个并不是简单的归并过程,而是一个很复杂的过程,因为考虑到一个partition并不只存在一种key,所以源码里有着相当复杂的过程同时注释也很迷惑人,注释里有优先队列和Heap的字样,看代码的时候可能以为采用了堆排序,有兴趣的同学可以看看,并不是太重要(ps我也看得一知半解)。

Reduce

Reduce部分我就长话短说,只看重点了。

同样,第一步就是查看 Reduce Task的run方法,这是启动redduce逻辑的自动过程

public void run(JobConf job, final TaskUmbilicalProtocol umbilical)   throws IOException, InterruptedException, ClassNotFoundException {   job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());   if (isMapOrReduce()) { // reduce的三个阶段     copyPhase = getProgress().addPhase("copy");     sortPhase  = getProgress().addPhase("sort");     reducePhase = getProgress().addPhase("reduce");   }   // start thread that will handle communication with parent   // 启动任务状态汇报器,其内部有周期性的汇报线程(状态汇报和心跳)   TaskReporter reporter = startReporter(umbilical);      boolean useNewApi = job.getUseNewReducer();   initialize(job, getJobID(), reporter, useNewApi);//核心代码,初始化任务   // check if it is a cleanupJobTask   if (jobCleanup) {     runJobCleanupTask(umbilical, reporter);     return;   }   if (jobSetup) {     runJobSetupTask(umbilical, reporter);     return;   }   if (taskCleanup) {     runTaskCleanupTask(umbilical, reporter);     return;   }      // Initialize the codec   codec = initCodec();   RawKeyValueIterator rIter = null;   ShuffleConsumerPlugin shuffleConsumerPlugin = null;      Class combinerClass = conf.getCombinerClass();   CombineOutputCollector combineCollector =      (null != combinerClass) ?     new CombineOutputCollector(reduceCombineOutputCounter, reporter, conf) : null;   Class clazz =         job.getClass(MRConfig.SHUFFLE_CONSUMER_PLUGIN, Shuffle.class, ShuffleConsumerPlugin.class);// 设置shuffle插件   shuffleConsumerPlugin = ReflectionUtils.newInstance(clazz, job);   LOG.info("Using ShuffleConsumerPlugin: " + shuffleConsumerPlugin);   ShuffleConsumerPlugin.Context shuffleContext =      new ShuffleConsumerPlugin.Context(getTaskID(), job, FileSystem.getLocal(job), umbilical,                  super.lDirAlloc, reporter, codec,                  combinerClass, combineCollector,                  spilledRecordsCounter, reduceCombineInputCounter,                 shuffledMapsCounter,                 reduceShuffleBytes, failedShuffleCounter,                 mergedMapOutputsCounter,                 taskStatus, copyPhase, sortPhase, this,                 mapOutputFile, localMapFiles);   shuffleConsumerPlugin.init(shuffleContext);   // 执行shuffle过程中的远程数据拉取,在拉取的过程中   // 内部 启动 map-completion event fetch线程 获取map端完成的event信息   // 在开启默认5个的fetch 线程 拉取数据,里面核心函数就是一直点进去是doShuffle,有两种一种是in-memory另一种就是on-disk   // 超出shuffle内存就merge到disk   // shuffle插件内部有个mergeMangager 会在合适的时候就是快超过shuffle内存缓存的时候,启动merge线程   // 这个表面是一次网络IO,本质上是一个RPC,通过umbilical代理获取已经完成的MapTask任务的taskAttempt的ID,存入schedule中,为后面shuffle做准备   rIter = shuffleConsumerPlugin.run();   // free up the data structures   // 一个sort set,是TreeSet数据结构·   mapOutputFilesOnDisk.clear();      sortPhase.complete();                         // sort is complete   setPhase(TaskStatus.Phase.REDUCE);    statusUpdate(umbilical);   Class keyClass = job.getMapOutputKeyClass();   Class valueClass = job.getMapOutputValueClass();   RawComparator comparator = job.getOutputValueGroupingComparator();   if (useNewApi) {     runNewReducer(job, umbilical, reporter, rIter, comparator,                    keyClass, valueClass); // 执行reduce操作,(用户定义的逻辑)   } else {     runOldReducer(job, umbilical, reporter, rIter, comparator,                    keyClass, valueClass);   }   shuffleConsumerPlugin.close();   done(umbilical, reporter); }

Reduce Task的重点比较清晰,就是57行的初始化shuffleConsumerPlugin这个Shuffle插件,以及66行运行这个插件,让他拉取数据。

初始化shuffle插件过程中,有两个组件一个是schedule调度器,另一个就是MergeManager,这个MergeManger有大用处。

接下来查看run方法

public RawKeyValueIterator run() throws IOException, InterruptedException {  // Scale the maximum events we fetch per RPC call to mitigate OOM issues  // on the ApplicationMaster when a thundering herd of reducers fetch events  // TODO: This should not be necessary after HADOOP-8942  int eventsPerReducer = Math.max(MIN_EVENTS_TO_FETCH,      MAX_RPC_OUTSTANDING_EVENTS / jobConf.getNumReduceTasks());  int maxEventsToFetch = Math.min(MAX_EVENTS_TO_FETCH, eventsPerReducer);  // Start the map-completion events fetcher thread  // 启动 一个 event fetcher线程 获取map端完成的event信息  final EventFetcher eventFetcher =     new EventFetcher(reduceId, umbilical, scheduler, this,        maxEventsToFetch);  eventFetcher.start();    // Start the map-output fetcher threads  启动fetch线程  // fetch 线程 远程从map端拉取对应partition的数据  boolean isLocal = localMapFiles != null;  final int numFetchers = isLocal ? 1 :    jobConf.getInt(MRJobConfig.SHUFFLE_PARALLEL_COPIES, 5);  Fetcher[] fetchers = new Fetcher[numFetchers];  if (isLocal) {    fetchers[0] = new LocalFetcher(jobConf, reduceId, scheduler,        merger, reporter, metrics, this, reduceTask.getShuffleSecret(),        localMapFiles);    fetchers[0].start();  } else {    for (int i=0; i < numFetchers; ++i) {      fetchers[i] = new Fetcher(jobConf, reduceId, scheduler, merger,                                      reporter, metrics, this,                                      reduceTask.getShuffleSecret());      fetchers[i].start();    }  }    // Wait for shuffle to complete successfully  while (!scheduler.waitUntilDone(PROGRESS_FREQUENCY)) {    reporter.progress();        synchronized (this) {      if (throwable != null) {        throw new ShuffleError("error in shuffle in " + throwingThreadName,                               throwable);      }    }  }  // Stop the event-fetcher thread  eventFetcher.shutDown();    // Stop the map-output fetcher threads  for (Fetcher fetcher : fetchers) {    fetcher.shutDown();  }    // stop the scheduler  scheduler.close();  copyPhase.complete(); // copy is already complete  taskStatus.setPhase(TaskStatus.Phase.SORT);  reduceTask.statusUpdate(umbilical);  // Finish the on-going merges...  RawKeyValueIterator kvIter = null;  try {    kvIter = merger.close();  } catch (Throwable e) {    throw new ShuffleError("Error while doing final merge " , e);  }  // Sanity check  synchronized (this) {    if (throwable != null) {      throw new ShuffleError("error in shuffle in " + throwingThreadName,                             throwable);    }  }    return kvIter;}

重点就是两线程,一种是Event fetch,另一种是fetch线程

首先,event fetch线程的作用是获取TaskAttempt的ID等信息,存入schedule中,方面以后Shuffle尤其是sort时使用,本质上这是个RPC,注意看event fetch初始化时的参数里有个umbilical代理对象。

而fetch线程的工作原理是通过HTTP向各个Map任务拖取它所需要的数据(至于HTTP和RPC的区别有兴趣的同学可以查查),里面最核心的方法是doShuffle(一直点进去才能找到这个),在Copy的同时还会MergeSort。doShuffle它有两个实现,一个是In-memory,另一个是On-disk有两个实现(同样的,Merge也分为这两种)。是基于考虑到拉取相同的key值可能有很大的数据量,那么有必要写入磁盘中了,但为了减少这种情况,在达到缓存区(默认是64K)阈值的时候会将数据merge(如果太大的话就在磁盘中merge),Merge的工作就是交给Shuffle插件的MergeManager管理。

所以,copy和Merge和Sort是重叠过程的。

至此,Shuffle部分的源码基本讲解完成。

参考资料

MapReduce ReduceTask源码解析

MapReduce中的shuffle详解

环形缓冲区

上一篇:【短讯】剑南春的十年沉浮:74岁董事长被罚4亿,营收百亿早已掉队

下一篇:暖了!河南下周多地最高气温重回20℃+