Preface:

I have been analyzing Hadoop's execution flow recently. After reading a great deal of material I had gained an intuitive feel for the process, but I still felt I lacked a complete picture of how MapReduce actually runs, so I decided to analyze the MapReduce execution flow at the source-code level.

Prelude:

Let's start from job submission. If we are using the Job class, the statement that triggers submission is

job.waitForCompletion(true), where true means progress information is printed while the job runs.
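For context, a minimal driver built on this API might look like the sketch below. To keep it self-contained it uses Hadoop's stock TokenCounterMapper and IntSumReducer rather than custom classes; the input and output paths come from the command line and are placeholders.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.map.TokenCounterMapper;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.reduce.IntSumReducer;

public class WordCountDriver {
  public static void main(String[] args) throws Exception {
    Job job = new Job();                          // default Configuration
    job.setJarByClass(WordCountDriver.class);     // this jar is what gets shipped as job.jar
    job.setMapperClass(TokenCounterMapper.class); // stock word-count mapper
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));   // input dir
    FileOutputFormat.setOutputPath(job, new Path(args[1])); // output dir
    // true => print progress; internally this runs submit() -> submitJobInternal(conf)
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}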

Pressing F3 on this method in Eclipse takes us to its code. It actually calls Job's submit method, and submit in turn calls submitJobInternal(conf) to submit the job; that method then uploads the job's three files -- job.jar, job.split, and job.xml -- to the HDFS filesystem:

 ugi.doAs(new PrivilegedExceptionAction<RunningJob>() {

       public RunningJob run() throws FileNotFoundException,

       ClassNotFoundException,

       InterruptedException,

       IOException{

         JobConf jobCopy = job;

         Path jobStagingArea = JobSubmissionFiles.getStagingDir(JobClient.this,

             jobCopy);

         //get a new job ID from the JobTracker

         JobID jobId = jobSubmitClient.getNewJobId();

       //the staging path the job is uploaded to

         Path submitJobDir = new Path(jobStagingArea, jobId.toString());

         jobCopy.set("mapreduce.job.dir", submitJobDir.toString());

         JobStatus status = null;

         try {

           populateTokenCache(jobCopy, jobCopy.getCredentials());

           copyAndConfigureFiles(jobCopy, submitJobDir);


           // get delegation token for the dir

           TokenCache.obtainTokensForNamenodes(jobCopy.getCredentials(),

                                               new Path [] {submitJobDir},

                                               jobCopy);

        //the path of the job conf file inside the submit dir

           Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);

        //read the number of reduce tasks (defaults to 1)

           int reduces = jobCopy.getNumReduceTasks();

        //record the submitting host's IP and name

           InetAddress ip = InetAddress.getLocalHost();

           if (ip != null) {

             job.setJobSubmitHostAddress(ip.getHostAddress());

             job.setJobSubmitHostName(ip.getHostName());

           }

           JobContext context = new JobContext(jobCopy, jobId);

           jobCopy = (JobConf)context.getConfiguration();

           // Check the output specification

           if (reduces == 0 ? jobCopy.getUseNewMapper() :

             jobCopy.getUseNewReducer()) {

             org.apache.hadoop.mapreduce.OutputFormat<?,?> output =

               ReflectionUtils.newInstance(context.getOutputFormatClass(),

                   jobCopy);

             output.checkOutputSpecs(context);

           } else {

             jobCopy.getOutputFormat().checkOutputSpecs(fs, jobCopy);

           }

          //compute and write the input splits

           FileSystem fs = submitJobDir.getFileSystem(jobCopy);

           LOG.debug("Creating splits at " + fs.makeQualified(submitJobDir));

           int maps = writeSplits(context, submitJobDir);

           jobCopy.setNumMapTasks(maps);

           // write "queue admins of the queue to which job is being submitted"

           // to job file.

           String queue = jobCopy.getQueueName();

           AccessControlList acl = jobSubmitClient.getQueueAdmins(queue);

           jobCopy.set(QueueManager.toFullPropertyName(queue,

               QueueACL.ADMINISTER_JOBS.getAclName()), acl.getACLString());

           // Write job file to JobTracker's fs

           FSDataOutputStream out =

             FileSystem.create(fs, submitJobFile,

                 new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION));

           try {

              //write the job configuration out as job.xml

             jobCopy.writeXml(out);

           } finally {

             out.close();

           }

           //

           // Now, actually submit the job (using the submit name)

           //

           printTokens(jobId, jobCopy.getCredentials());

        //submit the job to the JobTracker through the RPC proxy

           status = jobSubmitClient.submitJob(

               jobId, submitJobDir.toString(), jobCopy.getCredentials());

           if (status != null) {

             return new NetworkedJob(status);

           } else {

             throw new IOException("Could not launch job");

           }

         } finally {

           if (status == null) {

             LOG.info("Cleaning up the staging area " + submitJobDir);

             if (fs != null && submitJobDir != null)

               fs.delete(submitJobDir, true);

           }

         }

       }

     });

Looking at this method we can see where the job's staging path and job ID are obtained, and that the job is finally submitted through the proxy mechanism.

After the job is submitted, the JobTracker creates a JobInProgress instance, which represents all of the job's state and the actions the job needs to perform. Once the JobTracker has received the submitted data, it assigns tasks to TaskTrackers according to the job's configuration, and it notifies the JobClient only after every TaskTracker has finished. The JobTracker adds the job to a queue called jobInitQueue; inside the JobTracker an object named JobQueueTaskScheduler polls this queue, and whenever a new job arrives it takes it out and initializes it. Each task in turn gets its own corresponding TaskInProgress object. Once a TaskTracker has started, it communicates with the JobTracker through a heartbeat mechanism.
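Conceptually, the job queue behaves like the simplified model below. This is only a sketch: the real JobQueueJobInProgressListener keeps a map ordered by a scheduling-info comparator (priority, then start time, then job id) rather than using a PriorityBlockingQueue, and QueuedJob is a made-up stand-in for JobInProgress.

import java.util.Comparator;
import java.util.concurrent.PriorityBlockingQueue;

class QueuedJob {
  final int priority;      // smaller value = higher priority (assumption for the sketch)
  final long startTime;    // submission time, gives FIFO order within a priority
  final String jobId;
  QueuedJob(int priority, long startTime, String jobId) {
    this.priority = priority; this.startTime = startTime; this.jobId = jobId;
  }
}

class FifoJobQueue {
  private final PriorityBlockingQueue<QueuedJob> queue =
    new PriorityBlockingQueue<QueuedJob>(11, new Comparator<QueuedJob>() {
      public int compare(QueuedJob a, QueuedJob b) {
        if (a.priority != b.priority) return a.priority - b.priority;
        if (a.startTime != b.startTime) return a.startTime < b.startTime ? -1 : 1;
        return a.jobId.compareTo(b.jobId);
      }
    });

  // mirrors the JobInProgressListener.jobAdded callback
  void jobAdded(QueuedJob job) { queue.put(job); }

  // the scheduler drains jobs in FIFO-within-priority order
  QueuedJob nextJob() throws InterruptedException { return queue.take(); }
}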

Every three seconds each TaskTracker sends a heartbeat to the JobTracker; the heartbeat carries a great deal of information about the TaskTracker. On receiving it, the JobTracker inspects the information it contains and, if it finds errors, starts the corresponding error handling. If the TaskTracker asked for tasks in its heartbeat, the JobTracker adds the corresponding instructions to the heartbeat reply. In the Hadoop source, an instruction from the JobTracker to a TaskTracker is called an action, and the JobTracker's reply to a TaskTracker's heartbeat is called a HeartbeatResponse.

Inside the TaskTracker there is a queue called the TaskQueue, which holds all newly assigned tasks. Whenever the TaskTracker receives a HeartbeatResponse it checks it and, if the response contains new tasks, adds them to the TaskQueue. Two threads keep polling this queue: MapLauncher and ReduceLauncher. When a new map task shows up, MapLauncher takes it out and runs it; when it is a reduce task, ReduceLauncher takes it out and runs it.
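A stripped-down model of that launcher pattern follows; it is an illustration only, since the real TaskLauncher also waits for free map/reduce slots and coordinates via wait/notify over a plain list rather than a BlockingQueue.

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// One instance is created for map tasks (mapLauncher) and one for reduce
// tasks (reduceLauncher); each polls its own queue forever.
class LauncherThread extends Thread {
  private final BlockingQueue<Runnable> tasksToLaunch =
      new LinkedBlockingQueue<Runnable>();

  // called from the heartbeat-handling path when a LaunchTaskAction arrives
  void addToTaskQueue(Runnable task) {
    tasksToLaunch.add(task);
  }

  @Override
  public void run() {
    while (!isInterrupted()) {
      try {
        Runnable task = tasksToLaunch.take(); // blocks until a task arrives
        task.run();                           // the real code localizes and launches a JVM
      } catch (InterruptedException e) {
        return;
      }
    }
  }
}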

Whether it is a Map Task or a Reduce Task, once taken off the queue it must be localized. Localization means copying everything the task needs -- the jar to run, configuration files, input data, and so on -- to the local filesystem, so that the task can execute independently on that machine. After localization, the TaskTracker creates a separate JVM for each task and runs the task in it. When the task finishes, the TaskTracker notifies the JobTracker so that the next step can proceed.
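In essence, "a separate JVM per task" means building a java command line and forking a child process, roughly as sketched below. ChildMain, the heap size, the classpath, and the directories are all placeholders for illustration; the real wiring lives in TaskRunner/JvmManager and differs in detail.

import java.io.File;
import java.util.ArrayList;
import java.util.List;

public class ChildJvmSketch {
  public static void main(String[] args) throws Exception {
    List<String> cmd = new ArrayList<String>();
    cmd.add(new File(System.getProperty("java.home"), "bin/java").getPath());
    cmd.add("-Xmx200m");                  // per-task heap size (assumption)
    cmd.add("-classpath");
    cmd.add("job.jar");                   // the localized job jar (placeholder path)
    cmd.add("ChildMain");                 // placeholder for the real child main class
    cmd.add("attempt_201104140001_0001_m_000000_0"); // example task attempt id
    ProcessBuilder pb = new ProcessBuilder(cmd);
    pb.directory(new File("/tmp/taskTracker/work")); // localized work dir (placeholder)
    pb.redirectErrorStream(true);
    Process child = pb.start();
    int exitCode = child.waitFor();       // the TaskTracker likewise waits on the exit code
    System.out.println("child exited with " + exitCode);
  }
}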

Once all the tasks are complete, the job is complete, and at that point the JobTracker notifies the JobClient that the work is done.

Code walkthrough:

When we run bin/start-all.sh in Hadoop and inspect the script, we find that it invokes three other scripts: hadoop-config.sh, start-dfs.sh, and start-mapred.sh. Hadoop starts the JobTracker and TaskTrackers according to a series of configurations; the master logs in to each slave machine over SSH to start its tasktracker and datanode.

Below, the flow is analyzed alongside the Hadoop source code.

First, an introduction to the JobTracker and the TaskTracker.

Each JobTracker corresponds to the org.apache.hadoop.mapred.JobTracker class, which is mainly responsible for accepting jobs, scheduling them, and monitoring the TaskTrackers; each JobTracker runs in its own separate JVM. A JobTracker is started through the startTracker() method. Source code:

 /**

   * Start the JobTracker with given configuration.

   *

   * The conf will be modified to reflect the actual ports on which

   * the JobTracker is up and running if the user passes the port as

   * <code>zero</code>.

   *

   * @param conf configuration for the JobTracker.

   * @throws IOException

   */

 public static JobTracker startTracker(JobConf conf, String identifier)

  throws IOException, InterruptedException {

    DefaultMetricsSystem.initialize("JobTracker");

    JobTracker result = null;

    while (true) {

      try {

       //construct the JobTracker instance, named result

        result = new JobTracker(conf, identifier);

        result.taskScheduler.setTaskTrackerManager(result);

        break;

      } catch (VersionMismatch e) {

        throw e;

      } catch (BindException e) {

        throw e;

      } catch (UnknownHostException e) {

        throw e;

      } catch (AccessControlException ace) {

        // in case of jobtracker not having right access

        // bail out

        throw ace;

      } catch (IOException e) {

        LOG.warn("Error starting tracker: " +

                 StringUtils.stringifyException(e));

      }

      Thread.sleep(1000);

    }

    if (result != null) {

      JobEndNotifier.startNotifier();

      MBeans.register("JobTracker", "JobTrackerInfo", result);

    }

    return result;

  }

startTracker creates a JobTracker object from the given conf and then performs a series of initialization steps, including starting the RPC server, starting the built-in Jetty server, and checking whether the JobTracker needs to be restarted.
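For reference, JobTracker's own main method drives this in essentially two calls; the sketch below is simplified (the real main also parses command-line flags and logs fatal errors before exiting):

// Simplified sketch of JobTracker.main: start the tracker, then serve forever.
public static void main(String argv[]) throws Exception {
  JobTracker tracker = startTracker(new JobConf()); // a one-arg overload supplies the identifier
  tracker.offerService();                           // blocks, serving heartbeats and scheduling
}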

Another important method is offerService:

/**

   * Run forever

   */

  public void offerService() throws InterruptedException, IOException {

    // Prepare for recovery. This is done irrespective of the status of restart

    // flag.

    while (true) {

      try {

        recoveryManager.updateRestartCount();

        break;

      } catch (IOException ioe) {

        LOG.warn("Failed to initialize recovery manager. ", ioe);

        // wait for some time

        Thread.sleep(FS_ACCESS_RETRY_PERIOD);

        LOG.warn("Retrying...");

      }

    }

    taskScheduler.start();

    //  Start the recovery after starting the scheduler

    try {

    //recover jobs from before the restart

      recoveryManager.recover();

    } catch (Throwable t) {

      LOG.warn("Recovery manager crashed! Ignoring.", t);

    }

    // refresh the node list as the recovery manager might have added

    // disallowed trackers

    refreshHosts();

    this.expireTrackersThread = new Thread(this.expireTrackers,

                                          "expireTrackers");

    this.expireTrackersThread.start();

    this.retireJobsThread = new Thread(this.retireJobs, "retireJobs");

    this.retireJobsThread.start();

    expireLaunchingTaskThread.start();

    if (completedJobStatusStore.isActive()) {

      completedJobsStoreThread = new Thread(completedJobStatusStore,

                                            "completedjobsStore-housekeeper");

      completedJobsStoreThread.start();

    }

    // start the inter-tracker server once the jt is ready

    this.interTrackerServer.start();

    synchronized (this) {

      state = State.RUNNING;

    }

    LOG.info("Starting RUNNING");

    this.interTrackerServer.join();

    LOG.info("Stopped interTrackerServer");

  }

This method runs for as long as the JobTracker lives. The loop at its top looks infinite, but it only retries until the recovery manager is initialized; after that the method runs recovery, starts the housekeeping threads and the inter-tracker server, and in this way brings scheduling online.

This method calls start() on the taskScheduler object, a data member of JobTracker. Its type, TaskScheduler, provides a set of interfaces through which the JobTracker can initialize and schedule every submitted job. TaskScheduler is an abstract class, however, and its concrete implementation here is the JobQueueTaskScheduler class. So taskScheduler.start() actually executes JobQueueTaskScheduler's start method:

/**

   * Lifecycle method to allow the scheduler to start any work in separate

   * threads.

   * @throws IOException

   */

  public void start() throws IOException {

    // do nothing

  }

    //JobQueueTaskScheduler's start method:

     public synchronized void start() throws IOException {

           super.start(); // a no-op, as shown above

    //register the JobInProgressListener

    taskTrackerManager.addJobInProgressListener(jobQueueJobInProgressListener);

    //register the eager task-initialization listener

    eagerTaskInitializationListener.setTaskTrackerManager(taskTrackerManager);

    eagerTaskInitializationListener.start(); // start its init thread

    taskTrackerManager.addJobInProgressListener(

        eagerTaskInitializationListener);

  }

JobQueueTaskScheduler's start method registers two very important listeners, jobQueueJobInProgressListener and eagerTaskInitializationListener. The former is an instance of JobQueueJobInProgressListener, which maintains a FIFO queue of JobInProgress objects and watches each JobInProgress instance for changes over its lifecycle. The latter is an instance of EagerTaskInitializationListener, which keeps watching jobInitQueue; as soon as a new job is submitted (that is, a new JobInProgress instance is added), it immediately calls that instance's initTasks method to initialize the job.
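The eager-initialization pattern condenses to the sketch below: a listener callback appends to jobInitQueue and wakes an init thread, which drains the queue and calls initTasks. This is a simplification -- the real EagerTaskInitializationListener also re-sorts the queue by job priority and hands the actual initialization to a thread pool -- and Object stands in for JobInProgress here.

import java.util.LinkedList;
import java.util.List;

class EagerInitSketch extends Thread {
  private final List<Object> jobInitQueue = new LinkedList<Object>();

  // JobInProgressListener callback: a new job was submitted
  void jobAdded(Object job) {
    synchronized (jobInitQueue) {
      jobInitQueue.add(job);
      jobInitQueue.notifyAll();   // wake the init thread below
    }
  }

  @Override
  public void run() {
    while (true) {
      Object job;
      synchronized (jobInitQueue) {
        while (jobInitQueue.isEmpty()) {
          try { jobInitQueue.wait(); }
          catch (InterruptedException e) { return; }
        }
        job = jobInitQueue.remove(0);
      }
      initTasks(job);               // stands in for JobInProgress.initTasks()
    }
  }

  private void initTasks(Object job) { /* create the TaskInProgress objects */ }
}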

Now let's look at the initTasks method of the JobInProgress class:

/**

   * Construct the splits, etc.  This is invoked from an async

   * thread so that split-computation doesn't block anyone.

   */

  public synchronized void initTasks()

  throws IOException, KillInterruptedException {

    if (tasksInited || isComplete()) {

      return;

    }

    synchronized(jobInitKillStatus){

      if(jobInitKillStatus.killed || jobInitKillStatus.initStarted) {

        return; // init already started, or the job was killed: bail out

      }

      jobInitKillStatus.initStarted = true;

    }

    LOG.info("Initializing " + jobId);

    final long startTimeFinal = this.startTime;

    // log job info as the user running the job

try {

Here the job-history submission event is logged as the submitting user:

    userUGI.doAs(new PrivilegedExceptionAction<Object>() {

      @Override

      public Object run() throws Exception {

        JobHistory.JobInfo.logSubmitted(getJobID(), conf, jobFile,

            startTimeFinal, hasRestarted());

        return null;

      }

    });

    } catch(InterruptedException ie) {

      throw new IOException(ie);

    }

    // log the job priority

    setPriority(this.priority);

    //

    // generate security keys needed by Tasks

    //

    generateAndStoreTokens();

    //

    // read input splits and create a map per a split

    //

    //read the metadata of each split

    TaskSplitMetaInfo[] splits = createSplits(jobId);

    if (numMapTasks != splits.length) {

      throw new IOException("Number of maps in JobConf doesn't match number of " +

           "recieved splits for job " + jobId + "! " +

           "numMapTasks=" + numMapTasks + ", #splits=" + splits.length);

    }

    numMapTasks = splits.length;

    //map and reduce tasks wait until slots are available before they run

    jobtracker.getInstrumentation().addWaitingMaps(getJobID(), numMapTasks);

    jobtracker.getInstrumentation().addWaitingReduces(getJobID(), numReduceTasks);

Next the map tasks are initialized:

maps = new TaskInProgress[numMapTasks];

    for(int i=0; i < numMapTasks; ++i) {

      inputLength += splits[i].getInputDataLength();

    //create one map TaskInProgress per split

      maps[i] = new TaskInProgress(jobId, jobFile,

                                   splits[i],

                                   jobtracker, conf, this, i, numSlotsPerMap);

    }

    LOG.info("Input size for job " + jobId + " = " + inputLength

        + ". Number of splits = " + splits.length);

    // Set localityWaitFactor before creating cache

    localityWaitFactor =

      conf.getFloat(LOCALITY_WAIT_FACTOR, DEFAULT_LOCALITY_WAIT_FACTOR);

    if (numMapTasks > 0) {

    //build the locality cache of non-running map tasks

      nonRunningMapCache = createCache(splits, maxLevel);

    }

    // set the launch time

    this.launchTime = jobtracker.getClock().getTime();

The reduce tasks are created here:

  //

    // Create reduce tasks

    //

    this.reduces = new TaskInProgress[numReduceTasks];

    for (int i = 0; i < numReduceTasks; i++) {

      reduces[i] = new TaskInProgress(jobId, jobFile,

                                      numMapTasks, i,

                                      jobtracker, conf, this, numSlotsPerReduce);

      nonRunningReduces.add(reduces[i]);

    }

Here the minimum number of completed map tasks required before reduce tasks may start is computed:

 completedMapsForReduceSlowstart =

      (int)Math.ceil(

          (conf.getFloat("mapred.reduce.slowstart.completed.maps",

                         DEFAULT_COMPLETED_MAPS_PERCENT_FOR_REDUCE_SLOWSTART) *

           numMapTasks));

The resource estimator's threshold (used when estimating total map output) is set:

 resourceEstimator.setThreshhold(completedMapsForReduceSlowstart);

    // create cleanup two cleanup tips, one map and one reduce.

    cleanup = new TaskInProgress[2];

    // cleanup map tip. This map doesn't use any splits. Just assign an empty

    // split.

  TaskSplitMetaInfo emptySplit = JobSplit.EMPTY_TASK_SPLIT;

    cleanup[0] = new TaskInProgress(jobId, jobFile, emptySplit,

            jobtracker, conf, this, numMapTasks, 1);

    cleanup[0].setJobCleanupTask();

    // cleanup reduce tip.

    cleanup[1] = new TaskInProgress(jobId, jobFile, numMapTasks,

                       numReduceTasks, jobtracker, conf, this, 1);

    cleanup[1].setJobCleanupTask();

    // create two setup tips, one map and one reduce.

    setup = new TaskInProgress[2];

    // setup map tip. This map doesn't use any split. Just assign an empty

    // split.

    setup[0] = new TaskInProgress(jobId, jobFile, emptySplit,

            jobtracker, conf, this, numMapTasks + 1, 1);

    setup[0].setJobSetupTask();

    // setup reduce tip.

    setup[1] = new TaskInProgress(jobId, jobFile, numMapTasks,

                       numReduceTasks + 1, jobtracker, conf, this, 1);

    setup[1].setJobSetupTask();

    synchronized(jobInitKillStatus){

      jobInitKillStatus.initDone = true;

      if(jobInitKillStatus.killed) {

        throw new KillInterruptedException("Job " + jobId + " killed in init");

      }

    }

    tasksInited = true;

    JobHistory.JobInfo.logInited(profile.getJobID(), this.launchTime,

                                 numMapTasks, numReduceTasks);

   // Log the number of map and reduce tasks

   LOG.info("Job " + jobId + " initialized successfully with " + numMapTasks

            + " map tasks and " + numReduceTasks + " reduce tasks.");

  }

From the code we can see that map tasks and reduce tasks are both TaskInProgress instances, and that the number of map tasks is kept equal to the number of splits.
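For intuition about where that map count comes from, consider a toy calculation with hypothetical numbers (the real FileInputFormat additionally lets the last split overflow by a small slop factor):

public class SplitCountExample {
  public static void main(String[] args) {
    long fileSize = 300L * 1024 * 1024;   // 300 MB of input (hypothetical)
    long splitSize = 128L * 1024 * 1024;  // 128 MB per split (hypothetical)
    int numSplits = (int) ((fileSize + splitSize - 1) / splitSize);
    System.out.println(numSplits + " splits -> " + numSplits + " map tasks"); // prints 3
  }
}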

Next comes the TaskTracker that runs on each slave (datanode) machine.

In Hadoop, each tasktracker corresponds to the org.apache.hadoop.mapred.TaskTracker class, which implements all of the tasktracker's functionality. Each TaskTracker likewise runs in its own separate JVM; in the Hadoop scripts it is started by bin/hadoop-daemon.sh start tasktracker.

First, a look at TaskTracker's main function:

 /**

   * Start the TaskTracker, point toward the indicated JobTracker

   */

  public static void main(String argv[]) throws Exception {

    StringUtils.startupShutdownMessage(TaskTracker.class, argv, LOG);

    if (argv.length != 0) {

      System.out.println("usage: TaskTracker");

      System.exit(-1);

    }

    try {

      JobConf conf=new JobConf();

      // enable the server to track time spent waiting on locks


      ReflectionUtils.setContentionTracing

        (conf.getBoolean("tasktracker.contention.tracking", false));

      DefaultMetricsSystem.initialize("TaskTracker");

       TaskTracker tt = new TaskTracker(conf);

      MBeans.register("TaskTracker", "TaskTrackerInfo", tt);

    //the key statement: start the TaskTracker

      tt.run();

    } catch (Throwable e) {

      LOG.error("Can not start task tracker because "+

                StringUtils.stringifyException(e));

      System.exit(-1);

    }

  }

The run method in turn invokes the offerService() method:

public void run() {

    try {

      getUserLogManager().start();

      startCleanupThreads();

      boolean denied = false;

      while (running && !shuttingDown && !denied) {

        boolean staleState = false;

        try {

          // This while-loop attempts reconnects if we get network errors

          while (running && !staleState && !shuttingDown && !denied) {

            try {

              State osState = offerService();

              if (osState == State.STALE) {

                staleState = true;

              } else if (osState == State.DENIED) {

                denied = true;

              }

            } catch (Exception ex) {

              if (!shuttingDown) {

                LOG.info("Lost connection to JobTracker [" +

                         jobTrackAddr + "].  Retrying...", ex);

                try {

                  Thread.sleep(5000);

                } catch (InterruptedException ie) {

                }

              }

            }

          }

        } finally {

          close();

        }

        if (shuttingDown) { return; }

        LOG.warn("Reinitializing local state");

        initialize();

      }

      if (denied) {

        shutdown();

      }

    } catch (IOException iex) {

      LOG.error("Got fatal exception while reinitializing TaskTracker: " +

                StringUtils.stringifyException(iex));

      return;

    }

    catch (InterruptedException i) {

      LOG.error("Got interrupted while reinitializing TaskTracker: " +

          i.getMessage());

      return;

    }

  }

Now let's look at the offerService() method:

 State offerService() throws Exception {

    //initialize the last-heartbeat timestamp to now

    long lastHeartbeat = System.currentTimeMillis();

    while (running && !shuttingDown) {

      try {

        long now = System.currentTimeMillis();

        // accelerate to account for multiple finished tasks up-front

        synchronized (finishedCount) {

          long remaining =

            (lastHeartbeat + getHeartbeatInterval(finishedCount.get())) - now;

          while (remaining > 0) {

            // sleeps for the wait time or

            // until there are *enough* empty slots to schedule tasks

            finishedCount.wait(remaining);


            // Recompute

            now = System.currentTimeMillis();

            remaining =

              (lastHeartbeat + getHeartbeatInterval(finishedCount.get())) - now;

          }

          // Reset count

          finishedCount.set(0);

        }

        // If the TaskTracker is just starting up:

        // 1. Verify the buildVersion

        // 2. Get the system directory & filesystem

        if(justInited) {

          String jobTrackerBV = jobClient.getBuildVersion();

          if(!VersionInfo.getBuildVersion().equals(jobTrackerBV)) {

            String msg = "Shutting down. Incompatible buildVersion." +

            "\nJobTracker's: " + jobTrackerBV +

            "\nTaskTracker's: "+ VersionInfo.getBuildVersion();

            LOG.error(msg);

            try {

              jobClient.reportTaskTrackerError(taskTrackerName, null, msg);

            } catch(Exception e ) {

              LOG.info("Problem reporting to jobtracker: " + e);

            }

            return State.DENIED;

          }

          String dir = jobClient.getSystemDir();

          if (dir == null) {

            throw new IOException("Failed to get system directory");

          }

          systemDirectory = new Path(dir);

          systemFS = systemDirectory.getFileSystem(fConf);

        }

        // Send the heartbeat and process the jobtracker's directives

        HeartbeatResponse heartbeatResponse = transmitHeartBeat(now);

        // Note the time when the heartbeat returned, use this to decide when to send the

        // next heartbeat

        lastHeartbeat = System.currentTimeMillis();

        // Check if the map-event list needs purging

        Set<JobID> jobs = heartbeatResponse.getRecoveredJobs();

        if (jobs.size() > 0) {

          synchronized (this) {

            // purge the local map events list

            for (JobID job : jobs) {

              RunningJob rjob;

              synchronized (runningJobs) {

                rjob = runningJobs.get(job);

                if (rjob != null) {

                  synchronized (rjob) {

                    FetchStatus f = rjob.getFetchStatus();

                    if (f != null) {

                      f.reset();

                    }

                  }

                }

              }

            }

            // Mark the reducers in shuffle for rollback

            synchronized (shouldReset) {

              for (Map.Entry<TaskAttemptID, TaskInProgress> entry

                   : runningTasks.entrySet()) {

                if (entry.getValue().getStatus().getPhase() == Phase.SHUFFLE) {

                  this.shouldReset.add(entry.getKey());

                }

              }

            }

          }

        }

The returned heartbeatResponse carries the JobTracker's directives:

  TaskTrackerAction[] actions = heartbeatResponse.getActions();

        if(LOG.isDebugEnabled()) {

          LOG.debug("Got heartbeatResponse from JobTracker with responseId: " +

                    heartbeatResponse.getResponseId() + " and " +

                    ((actions != null) ? actions.length : 0) + " actions");

        }

        if (reinitTaskTracker(actions)) {

          return State.STALE;

        }

        // resetting heartbeat interval from the response.

        heartbeatInterval = heartbeatResponse.getHeartbeatInterval();

        justStarted = false;

        justInited = false;

Next, the actions are processed:

   if (actions != null){

          for(TaskTrackerAction action: actions) {

            if (action instanceof LaunchTaskAction) {

              // queue the task for launching

              addToTaskQueue((LaunchTaskAction)action);

            } else if (action instanceof CommitTaskAction) {

              CommitTaskAction commitAction = (CommitTaskAction)action;

              if (!commitResponses.contains(commitAction.getTaskID())) {

                LOG.info("Received commit task action for " +

                          commitAction.getTaskID());

                commitResponses.add(commitAction.getTaskID());

              }

            } else {

              tasksToCleanup.put(action);

            }

          }

        }

        markUnresponsiveTasks();

        killOverflowingTasks();

        //we've cleaned up, resume normal operation

        if (!acceptNewTasks && isIdle()) {

          acceptNewTasks=true;

        }

        //The check below may not be required every iteration but we are

        //erring on the side of caution here. We have seen many cases where

        //the call to jetty's getLocalPort() returns different values at

        //different times. Being a real paranoid here.

        checkJettyPort(server.getPort());

      } catch (InterruptedException ie) {

        LOG.info("Interrupted. Closing down.");

        return State.INTERRUPTED;

      } catch (DiskErrorException de) {

        String msg = "Exiting task tracker for disk error:\n" +

          StringUtils.stringifyException(de);

        LOG.error(msg);

        synchronized (this) {

          // report the disk error to the JobTracker

          jobClient.reportTaskTrackerError(taskTrackerName,

                                           "DiskErrorException", msg);

        }

        return State.STALE;

      } catch (RemoteException re) {

        String reClass = re.getClassName();

        if (DisallowedTaskTrackerException.class.getName().equals(reClass)) {

          LOG.info("Tasktracker disallowed by JobTracker.");

          return State.DENIED;

        }

      } catch (Exception except) {

        String msg = "Caught exception: " +

          StringUtils.stringifyException(except);

        LOG.error(msg);

      }

    }

    return State.NORMAL;

  }

The TaskTracker sends a heartbeat to the JobTracker every three seconds. The heartbeat is carried over Hadoop's RPC proxy mechanism, which is essentially a remote method invocation facility; see other references for the details of how it works.
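Concretely, the "proxy" is a client-side stub for the InterTrackerProtocol interface obtained from Hadoop's RPC layer, and each heartbeat is just a method call on that stub. The fragment below is a sketch: the variable names mirror TaskTracker's fields, and error handling is omitted.

// Create the RPC proxy for the JobTracker (done once during TaskTracker setup);
// jobTrackAddr is the JobTracker's InetSocketAddress, fConf the configuration.
InterTrackerProtocol jobClient = (InterTrackerProtocol)
    RPC.waitForProxy(InterTrackerProtocol.class,
                     InterTrackerProtocol.versionID,
                     jobTrackAddr, fConf);

// Each heartbeat is then an ordinary-looking call that actually travels
// over the network to JobTracker.heartbeat(...):
HeartbeatResponse heartbeatResponse =
    jobClient.heartbeat(status, justStarted, justInited,
                        acceptNewTasks, heartbeatResponseId);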

Next the JobTracker receives the heartbeat and assigns tasks to the TaskTracker. When a heartbeat arrives, the JobTracker calls the method heartbeat(TaskTrackerStatus status, boolean restarted, boolean initialContact, boolean acceptNewTasks, short responseId), whose return value is a HeartbeatResponse object:


  public synchronized HeartbeatResponse heartbeat(TaskTrackerStatus status,

                                                  boolean restarted,

                                                  boolean initialContact,

                                                  boolean acceptNewTasks,

                                                  short responseId)

    throws IOException {

    if (LOG.isDebugEnabled()) {

      LOG.debug("Got heartbeat from: " + status.getTrackerName() +

                " (restarted: " + restarted +

                " initialContact: " + initialContact +

                " acceptNewTasks: " + acceptNewTasks + ")" +

                " with responseId: " + responseId);

    }

    // Make sure heartbeat is from a tasktracker allowed by the jobtracker.

    if (!acceptTaskTracker(status)) {

      throw new DisallowedTaskTrackerException(status);

    }

    // First check if the last heartbeat response got through

    String trackerName = status.getTrackerName();

    long now = clock.getTime();

    if (restarted) {

      faultyTrackers.markTrackerHealthy(status.getHost());

    } else {

      faultyTrackers.checkTrackerFaultTimeout(status.getHost(), now);

    }

    HeartbeatResponse prevHeartbeatResponse =

      trackerToHeartbeatResponseMap.get(trackerName);

    boolean addRestartInfo = false;

    if (initialContact != true) {

      // If this isn't the 'initial contact' from the tasktracker,

      // there is something seriously wrong if the JobTracker has

      // no record of the 'previous heartbeat'; if so, ask the

      // tasktracker to re-initialize itself.

      if (prevHeartbeatResponse == null) {

        // This is the first heartbeat from the old tracker to the newly

        // started JobTracker

        if (hasRestarted()) {

          addRestartInfo = true;

          // inform the recovery manager about this tracker joining back

          recoveryManager.unMarkTracker(trackerName);

        } else {

          // Jobtracker might have restarted but no recovery is needed

          // otherwise this code should not be reached

          LOG.warn("Serious problem, cannot find record of 'previous' " +

                   "heartbeat for '" + trackerName +

                   "'; reinitializing the tasktracker");

          return new HeartbeatResponse(responseId,

              new TaskTrackerAction[] {new ReinitTrackerAction()});

        }

      } else {

        // It is completely safe to not process a 'duplicate' heartbeat from a

        // {@link TaskTracker} since it resends the heartbeat when rpcs are

        // lost see {@link TaskTracker.transmitHeartbeat()};

        // acknowledge it by re-sending the previous response to let the

        // {@link TaskTracker} go forward.

        if (prevHeartbeatResponse.getResponseId() != responseId) {

          LOG.info("Ignoring 'duplicate' heartbeat from '" +

              trackerName + "'; resending the previous 'lost' response");

          return prevHeartbeatResponse;

        }

      }

    }

    // Process this heartbeat

    short newResponseId = (short)(responseId + 1);

    status.setLastSeen(now);

    if (!processHeartbeat(status, initialContact, now)) {

      if (prevHeartbeatResponse != null) {

        trackerToHeartbeatResponseMap.remove(trackerName);

      }

      return new HeartbeatResponse(newResponseId,

                   new TaskTrackerAction[] {new ReinitTrackerAction()});

    }

    // Initialize the response to be sent for the heartbeat

    HeartbeatResponse response = new HeartbeatResponse(newResponseId, null);

    List<TaskTrackerAction> actions = new ArrayList<TaskTrackerAction>();

    boolean isBlacklisted = faultyTrackers.isBlacklisted(status.getHost());

    // Check for new tasks to be executed on the tasktracker

    if (recoveryManager.shouldSchedule() && acceptNewTasks && !isBlacklisted) {

      TaskTrackerStatus taskTrackerStatus = getTaskTrackerStatus(trackerName);

      if (taskTrackerStatus == null) {

        LOG.warn("Unknown task tracker polling; ignoring: " + trackerName);

      } else {

        List<Task> tasks = getSetupAndCleanupTasks(taskTrackerStatus);

        if (tasks == null ) {

          tasks = taskScheduler.assignTasks(taskTrackers.get(trackerName));

        }

        if (tasks != null) {

          for (Task task : tasks) {

            expireLaunchingTasks.addNewTask(task.getTaskID());

            if(LOG.isDebugEnabled()) {

              LOG.debug(trackerName + " -> LaunchTask: " + task.getTaskID());

            }

            actions.add(new LaunchTaskAction(task));

          }

        }

      }

    }

    // the list of tasks to be killed

    List<TaskTrackerAction> killTasksList = getTasksToKill(trackerName);

    if (killTasksList != null) {

      actions.addAll(killTasksList);

    }

    // Check for jobs to be killed/cleanedup

    List<TaskTrackerAction> killJobsList = getJobsForCleanup(trackerName);

    if (killJobsList != null) {

      actions.addAll(killJobsList);

    }

    // Check for tasks whose outputs can be saved

    List<TaskTrackerAction> commitTasksList = getTasksToSave(status);

    if (commitTasksList != null) {

      actions.addAll(commitTasksList);

    }

    // calculate next heartbeat interval and put in heartbeat response

    int nextInterval = getNextHeartbeatInterval();

    response.setHeartbeatInterval(nextInterval);

    response.setActions(actions.toArray(new TaskTrackerAction[actions.size()]));

    // check if the restart info is req

    if (addRestartInfo) {

      response.setRecoveredJobs(recoveryManager.getJobsToRecover());

    }

    // Update the trackerToHeartbeatResponseMap

    trackerToHeartbeatResponseMap.put(trackerName, response);

    // Done processing the hearbeat, now remove 'marked' tasks

    removeMarkedTasks(trackerName);

    return response;

  }

The scheduler is then brought into play. The default scheduler is JobQueueTaskScheduler, whose assignTasks method is as follows:

public synchronized List<Task> assignTasks(TaskTracker taskTracker)

      throws IOException {

    TaskTrackerStatus taskTrackerStatus = taskTracker.getStatus();

    //get the cluster status

    ClusterStatus clusterStatus = taskTrackerManager.getClusterStatus();

    final int numTaskTrackers = clusterStatus.getTaskTrackers();

    final int clusterMapCapacity = clusterStatus.getMaxMapTasks();

    final int clusterReduceCapacity = clusterStatus.getMaxReduceTasks();

    Collection<JobInProgress> jobQueue =

      jobQueueJobInProgressListener.getJobQueue();

    //

    // Get map + reduce counts for the current tracker.

    //

    //this TaskTracker's slot capacities and current task counts

    final int trackerMapCapacity = taskTrackerStatus.getMaxMapSlots();

    final int trackerReduceCapacity = taskTrackerStatus.getMaxReduceSlots();

    final int trackerRunningMaps = taskTrackerStatus.countMapTasks();

    final int trackerRunningReduces = taskTrackerStatus.countReduceTasks();

    // the tasks assigned in this heartbeat

    List<Task> assignedTasks = new ArrayList<Task>();

    //

    // Compute (running + pending) map and reduce task numbers across pool

    //

    int remainingReduceLoad = 0;

    int remainingMapLoad = 0;

    synchronized (jobQueue) {

      for (JobInProgress job : jobQueue) {

        if (job.getStatus().getRunState() == JobStatus.RUNNING) {

          remainingMapLoad += (job.desiredMaps() - job.finishedMaps());

          if (job.scheduleReduces()) {

            remainingReduceLoad +=

              (job.desiredReduces() - job.finishedReduces());

          }

        }

      }

    }

    // Compute the 'load factor' for maps and reduces

    double mapLoadFactor = 0.0;

    if (clusterMapCapacity > 0) {

      mapLoadFactor = (double)remainingMapLoad / clusterMapCapacity;

    }

    double reduceLoadFactor = 0.0;

    if (clusterReduceCapacity > 0) {

      reduceLoadFactor = (double)remainingReduceLoad / clusterReduceCapacity;

    }

    final int trackerCurrentMapCapacity =

      Math.min((int)Math.ceil(mapLoadFactor * trackerMapCapacity),

                              trackerMapCapacity);

    int availableMapSlots = trackerCurrentMapCapacity - trackerRunningMaps;

    boolean exceededMapPadding = false;

    if (availableMapSlots > 0) {

      exceededMapPadding =

        exceededPadding(true, clusterStatus, trackerMapCapacity);

    }

    int numLocalMaps = 0;

    int numNonLocalMaps = 0;

    scheduleMaps:

    for (int i=0; i < availableMapSlots; ++i) {

      synchronized (jobQueue) {

        for (JobInProgress job : jobQueue) {

          if (job.getStatus().getRunState() != JobStatus.RUNNING) {

            continue;

          }

          Task t = null;

          // Try to schedule a node-local or rack-local Map task

          t =

            job.obtainNewLocalMapTask(taskTrackerStatus, numTaskTrackers,

                                      taskTrackerManager.getNumberOfUniqueHosts());

          if (t != null) {

            assignedTasks.add(t);

            ++numLocalMaps;

            // Don't assign map tasks to the hilt!

            // Leave some free slots in the cluster for future task-failures,

            // speculative tasks etc. beyond the highest priority job

            if (exceededMapPadding) {

              break scheduleMaps;

            }

            // Try all jobs again for the next Map task

            break;

          }

          // Try to schedule a non-local Map task

          t =

            job.obtainNewNonLocalMapTask(taskTrackerStatus, numTaskTrackers,

                                   taskTrackerManager.getNumberOfUniqueHosts());

          if (t != null) {

            assignedTasks.add(t);

            ++numNonLocalMaps;

            // We assign at most 1 off-switch or speculative task

            // This is to prevent TaskTrackers from stealing local-tasks

            // from other TaskTrackers.

            break scheduleMaps;

          }

        }

      }

    }

    int assignedMaps = assignedTasks.size();

    //

    // Same thing, but for reduce tasks

    // However we _never_ assign more than 1 reduce task per heartbeat

    //

    final int trackerCurrentReduceCapacity =

      Math.min((int)Math.ceil(reduceLoadFactor * trackerReduceCapacity),

               trackerReduceCapacity);

    final int availableReduceSlots =

      Math.min((trackerCurrentReduceCapacity - trackerRunningReduces), 1);

    boolean exceededReducePadding = false;

    if (availableReduceSlots > 0) {

      exceededReducePadding = exceededPadding(false, clusterStatus,

                                              trackerReduceCapacity);

      synchronized (jobQueue) {

        for (JobInProgress job : jobQueue) {

          if (job.getStatus().getRunState() != JobStatus.RUNNING ||

              job.numReduceTasks == 0) {

            continue;

          }

          Task t =

            job.obtainNewReduceTask(taskTrackerStatus, numTaskTrackers,

                                    taskTrackerManager.getNumberOfUniqueHosts()

                                    );

          if (t != null) {

            assignedTasks.add(t);

            break;

          }

          // Don't assign reduce tasks to the hilt!

          // Leave some free slots in the cluster for future task-failures,

          // speculative tasks etc. beyond the highest priority job

          if (exceededReducePadding) {

            break;

          }

        }

      }

    }

    if (LOG.isDebugEnabled()) {

      LOG.debug("Task assignments for " + taskTrackerStatus.getTrackerName() + " --> " +

                "[" + mapLoadFactor + ", " + trackerMapCapacity + ", " +

                trackerCurrentMapCapacity + ", " + trackerRunningMaps + "] -> [" +

                (trackerCurrentMapCapacity - trackerRunningMaps) + ", " +

                assignedMaps + " (" + numLocalMaps + ", " + numNonLocalMaps +

                ")] [" + reduceLoadFactor + ", " + trackerReduceCapacity + ", " +

                trackerCurrentReduceCapacity + "," + trackerRunningReduces +

                "] -> [" + (trackerCurrentReduceCapacity - trackerRunningReduces) +

                ", " + (assignedTasks.size()-assignedMaps) + "]");

    }

    return assignedTasks;

  }
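To make the capacity math concrete, take hypothetical numbers: 10 TaskTrackers with 4 map slots each give clusterMapCapacity = 40; with 30 pending map tasks, mapLoadFactor = 30/40 = 0.75, so a tracker with 4 slots gets min(ceil(0.75 * 4), 4) = 3 as its current capacity, and with 1 map already running it has 2 available slots. The throwaway program below just replays that arithmetic:

public class LoadFactorExample {
  public static void main(String[] args) {
    int clusterMapCapacity = 40;   // 10 trackers x 4 map slots (hypothetical)
    int remainingMapLoad = 30;     // pending map tasks across running jobs
    int trackerMapCapacity = 4;    // map slots on this tracker
    int trackerRunningMaps = 1;    // maps already running here

    double mapLoadFactor = (double) remainingMapLoad / clusterMapCapacity; // 0.75
    int trackerCurrentMapCapacity =
        Math.min((int) Math.ceil(mapLoadFactor * trackerMapCapacity),
                 trackerMapCapacity);                                      // 3
    int availableMapSlots = trackerCurrentMapCapacity - trackerRunningMaps; // 2
    System.out.println("loadFactor=" + mapLoadFactor
        + " currentCapacity=" + trackerCurrentMapCapacity
        + " availableSlots=" + availableMapSlots);
  }
}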


After the JobTracker has processed the heartbeat, if the response it returns contains an assigned LaunchTaskAction, the TaskTracker calls the addToTaskQueue method, which hands the action to the MapLauncher or ReduceLauncher object inside the TaskTracker class. Both objects are instances of TaskLauncher, an inner class of TaskTracker whose data member is a queue of TaskTracker.TaskInProgress objects: if the task in the response is a map task it goes into mapLauncher's taskToLaunch queue, and if it is a reduce task it goes into reduceLauncher's:

private void addToTaskQueue(LaunchTaskAction action) {

    if (action.getTask().isMapTask()) {

      mapLauncher.addToTaskQueue(action);

    } else {

      reduceLauncher.addToTaskQueue(action);

    }

  }

The task is then registered via registerTask:

private TaskInProgress registerTask(LaunchTaskAction action,

      TaskLauncher launcher) {

    Task t = action.getTask();

    LOG.info("LaunchTaskAction (registerTask): " + t.getTaskID() +

             " task's state:" + t.getState());

    TaskInProgress tip = new TaskInProgress(t, this.fConf, launcher);

    synchronized (this) {

      tasks.put(t.getTaskID(), tip);

      runningTasks.put(t.getTaskID(), tip);

      boolean isMap = t.isMapTask();

      if (isMap) {

        mapTotal++;

      } else {

        reduceTotal++;

      }

    }

    return tip;

  }

As part of localization, the job configuration file is downloaded from HDFS; localizeJobConfFile handles that step:

 private Path localizeJobConfFile(Path jobFile, String user,

      FileSystem userFs, JobID jobId)

  throws IOException {

    // Get sizes of JobFile and JarFile

    // sizes are -1 if they are not present.

    FileStatus status = null;

    long jobFileSize = -1;

    try {

      status = userFs.getFileStatus(jobFile);

      jobFileSize = status.getLen();

    } catch(FileNotFoundException fe) {

      jobFileSize = -1;

    }

    Path localJobFile =

      lDirAlloc.getLocalPathForWrite(getPrivateDirJobConfFile(user,

          jobId.toString()), jobFileSize, fConf);

    // Download job.xml

    userFs.copyToLocalFile(jobFile, localJobFile);

    return localJobFile;

  }

This method is one of a series of localization steps that copy the jar, the split file, and the xml file to the local machine. Once all the resources a task needs to run have been copied locally, TaskTracker's launchTaskForJob method is called, which in turn calls the launchTask function of TaskTracker.TaskInProgress:

/**

     * Kick off the task execution

     */

    public synchronized void launchTask(RunningJob rjob) throws IOException {

      if (this.taskStatus.getRunState() == TaskStatus.State.UNASSIGNED ||

          this.taskStatus.getRunState() == TaskStatus.State.FAILED_UNCLEAN ||

          this.taskStatus.getRunState() == TaskStatus.State.KILLED_UNCLEAN) {

        localizeTask(task);

        if (this.taskStatus.getRunState() == TaskStatus.State.UNASSIGNED) {

          this.taskStatus.setRunState(TaskStatus.State.RUNNING);

        }

        setTaskRunner(task.createRunner(TaskTracker.this, this, rjob));

        this.runner.start();

        long now = System.currentTimeMillis();

        this.taskStatus.setStartTime(now);

        this.lastProgressReport = now;

      } else {

        LOG.info("Not launching task: " + task.getTaskID() +

            " since it's state is " + this.taskStatus.getRunState());

      }

}

After that, a TaskRunner object is created to run the task; its run method:

public final void run() {

    String errorInfo = "Child Error";

    try {

      //before preparing the job localize

      //all the archives

      TaskAttemptID taskid = t.getTaskID();

      final LocalDirAllocator lDirAlloc = new LocalDirAllocator("mapred.local.dir");

      //simply get the location of the workDir and pass it to the child. The

      //child will do the actual dir creation

      final File workDir =

      new File(new Path(localdirs[rand.nextInt(localdirs.length)],

          TaskTracker.getTaskWorkDir(t.getUser(), taskid.getJobID().toString(),

          taskid.toString(),

          t.isTaskCleanupTask())).toString());

      String user = tip.getUGI().getUserName();

      if (!prepare()) {

        return;

      }

      // Accumulates class paths for child.

      List<String> classPaths = getClassPaths(conf, workDir,

                                              taskDistributedCacheManager);

      long logSize = TaskLog.getTaskLogLength(conf);

      //  Build exec child JVM args.

      Vector<String> vargs = getVMArgs(taskid, workDir, classPaths, logSize);

      tracker.addToMemoryManager(t.getTaskID(), t.isMapTask(), conf);

      // set memory limit using ulimit if feasible and necessary ...

      String setup = getVMSetupCmd();

      // Set up the redirection of the task's stdout and stderr streams

      File[] logFiles = prepareLogFiles(taskid, t.isTaskCleanupTask());

      File stdout = logFiles[0];

      File stderr = logFiles[1];

      tracker.getTaskTrackerInstrumentation().reportTaskLaunch(taskid, stdout,

                 stderr);

      Map<String, String> env = new HashMap<String, String>();

      errorInfo = getVMEnvironment(errorInfo, user, workDir, conf, env, taskid,

                                   logSize);

      // flatten the env as a set of export commands

      List <String> setupCmds = new ArrayList<String>();

      for(Entry<String, String> entry : env.entrySet()) {

        StringBuffer sb = new StringBuffer();

        sb.append("export ");

        sb.append(entry.getKey());

        sb.append("=\"");

        sb.append(entry.getValue());

        sb.append("\"");

        setupCmds.add(sb.toString());

      }

      setupCmds.add(setup);

      launchJvmAndWait(setupCmds, vargs, stdout, stderr, logSize, workDir);

      tracker.getTaskTrackerInstrumentation().reportTaskEnd(t.getTaskID());

      if (exitCodeSet) {

        if (!killed && exitCode != 0) {

          if (exitCode == 65) {

            tracker.getTaskTrackerInstrumentation().taskFailedPing(t.getTaskID());

          }

          throw new IOException("Task process exit with nonzero status of " +

              exitCode + ".");

        }

      }

    } catch (FSError e) {

      LOG.fatal("FSError", e);

      try {

        tracker.fsErrorInternal(t.getTaskID(), e.getMessage());

      } catch (IOException ie) {

        LOG.fatal(t.getTaskID()+" reporting FSError", ie);

      }

    } catch (Throwable throwable) {

      LOG.warn(t.getTaskID() + " : " + errorInfo, throwable);

      Throwable causeThrowable = new Throwable(errorInfo, throwable);

      ByteArrayOutputStream baos = new ByteArrayOutputStream();

      causeThrowable.printStackTrace(new PrintStream(baos));

      try {

        tracker.reportDiagnosticInfoInternal(t.getTaskID(), baos.toString());

      } catch (IOException e) {

        LOG.warn(t.getTaskID()+" Reporting Diagnostics", e);

      }

    } finally {

      // It is safe to call TaskTracker.TaskInProgress.reportTaskFinished with

      // *false* since the task has either

      // a) SUCCEEDED - which means commit has been done

      // b) FAILED - which means we do not need to commit

      tip.reportTaskFinished(false);

    }

  }