`
coderplay
  • 浏览: 571903 次
  • 性别: Icon_minigender_1
  • 来自: 广州杭州
社区版块
存档分类
最新评论

hadoop源码分析之MapReduce(二)

阅读更多

 

任务的申请、派发与执行

TaskTracker.run() 连接JobTracker

TaskTracker 的启动过程会初始化一系列参数和服务(另有单独的一节介绍),然后尝试连接JobTracker 服务(即必须实现InterTrackerProtocol 接口),如果连接断开,则会循环尝试连接JobTracker ,并重新初始化所有成员和参数,此过程参见run() 方法。

TaskTracker.offerService() 主循环

如果连接JobTracker 服务成功,TaskTracker 就会调用offerService() 函数进入主执行循环中。这个循环会每隔10 秒与JobTracker 通讯一次,调用transmitHeartBeat() 获得HeartbeatResponse 信息。然后调用HeartbeatResponsegetActions() 函数获得JobTracker 传过来的所有指令即一个TaskTrackerAction 数组。再遍历这个数组,如果是一个新任务指令即LaunchTaskAction 则调用startNewTask() 函数执行新任务,

如果是 CommitTaskAction

否则加入到tasksToCleanup 队列,交给一个taskCleanupThread 线程来处理,如执行KillJobAction 或者KillTaskAction 等。

TaskTracker.transmitHeartBeat() 获取JobTracker 指令

transmitHeartBeat() 函数处理中,TaskTracker 会创建一个新的TaskTrackerStatus 对象记录目前任务的执行状况,然后通过IPC 接口调用JobTrackerheartbeat() 方法发送过去,并接受新的指令,即返回值TaskTrackerAction 数组。在这个调用之前,TaskTracker 会先检查目前执行的Task 数目以及本地磁盘的空间使用情况等,如果可以接收新的Task 则设置heartbeat()askForNewTask 参数为true 。操作成功后再更新相关的统计信息等。

JobTracker 调度作业第二步:派发任务

JobTracker 接到TaskTrackerheartbeat() 调用后,首先会检查上一个心跳响应是否完成,是没要求启动或重启任务,如果一切正常,则会处理心跳。JobTracker 会使用它的调度器taskScheduler 来组装任务到一个任务列表tasks 中。具体实现在taskSchedulerassignTasks() 方法。得到tasks 的数据后,把这些任务封装在一些LanuchTaskAction 中,发回给TaskTracker ,让它去执行任务。此时JobTrackerhearbeat() 结束派发任务。

下面简单分析下 hadoop 默认的作业调度器 JobQueueTaskScheduler 怎么实现以上所说的 assignTasks() 方法。首先它会检查 TaskTracker 端还可以做多少个 map reduce 任务,将要派发的任务数是否超出这个数,是否超出集群的任务平均剩余可负载数。如果都没超出,则为此 TaskTracker 分配一个 MapTask ReduceTask 。产生 Map 任务使用 JobInProgress obtainNewMapTask() 方法,实质上最后调用了 JobInProgress findNewMapTask() 访问 nonRunningMapCache 得到 Map 任务的 TaskInProgress ;而产生 Reduce 任务使用 JobInProgress.obtainNewReduceTask() 方法,实质上最后调用了 JobInProgress findNewReduceTask() 访问 nonRuningReduceCache 得到 Reduce 任务的 TaskInProgress

 

6
0
分享到:
评论
13 楼 ski_jugg 2012-03-11  
你好,有一个问题很困扰我。
JobTracker处理心跳采用JobQueueTaskScheduler调度机制给TT分配map任务时,会执行很多次obtainNewLocalMapTask和一次obtainNewNonLocalMapTask。最终使用的都是JobInProgress的findNewMapTask方法,只是传递的level不一样。
可是我不知道为什么obtainNewLocalMapTask会传递maxlevel这个值,而且maxlevel=NetworkTopology.DEFAULT_HOST_LEVEL=2。我很是不理解。
1·难道说obtainNewLocalMapTask只是会从近到远寻找任务,而不见得一定要是本地嘛?
2·maxlevel这个值是代表集群的级别数吗?为什么是2?
12 楼 ytulgr 2011-03-21  
是每当TrakTracker发送heartbeat到TaskTracker才触发任务分配吗?这样是否可认为按照TaskTracker发送heartbeat的先后顺序得到任务执行?
11 楼 riddle_chen 2009-05-05  
jiwenke 写道

coderplay 写道sorry,理解能力太差, 我不是很明白你的意思.如果你是需要控制单个节点同时在执行的的mapper/reducer数.你可以通过修改slave节点配置中的mapred.tasktracker.map.tasks.maximum和mapred.tasktracker.reduce.tasks.maximum.至于总的mapper任务数,是由splits决定的,当然可以通过JobConf.setNumMapTasks(n)来增大它,但没多大意义.是我没有写清楚,我想控制总的mapper任务数,比如生成5个任务,每台机器跑一个这样的需求。我同意这是由splits决定的,所以这个问题就转换为怎样生成splits的问题,我看了代码,发现splits的生成个数是和文件的个数有关的 - 如果每个文件小于block size,那就应该splits的个数等于文件个数。这样的话,如果我想控制总的任务数,就需要有那么多个文件,而我想只用一个文件。不知道现在的hadoop里面是不是支持。还有,我看到你的分析,在mapred中,TaskTracker通过heartbeat得到任务指令,然后去startNewTask(),但这里面我就迷失了.从后面往前推,maptask的启动是在Child.run()中启动的,而Child.run()的启动似乎是在JVMManager中启动的,但我就找不到startNewTask()是怎样启动JVMManager的?期待您的指点!谢谢!

JobConf.setNumMapTasks(n)是有意义的,结合block size会具体影响到map任务的个数,详见FileInputFormat.getSplits源码。假设没有设置mapred.min.split.size,缺省为1的情况下,针对每个文件会按照min (totalsize[所有文件总大小]/mapnum[jobconf设置的mapnum], blocksize)为大小来拆分,并不是说文件小于block size就不去拆分。
10 楼 coderplay 2009-02-26  
gonggaosheng 写道

你好,我想请问一下有没有什么方法可以调试hadoop呢?现在我可以使用eclipse或者
jdb调试一个mapreduce任务,比如wordcount。但问题是我现在还不能调试JobTracker任务,比如说是我在运行wordcount例子的时候想要调试JobTracker,
看看它具体的执行情况,有没有什么好的建议呢?非常感谢!

请见
http://wiki.apache.org/hadoop/HowToDebugMapReducePrograms
9 楼 gonggaosheng 2009-02-25  
你好,我想请问一下有没有什么方法可以调试hadoop呢?现在我可以使用eclipse或者
jdb调试一个mapreduce任务,比如wordcount。但问题是我现在还不能调试JobTracker任务,比如说是我在运行wordcount例子的时候想要调试JobTracker,
看看它具体的执行情况,有没有什么好的建议呢?非常感谢!
8 楼 jiwenke 2009-02-17  
coderplay 写道

jiwenke 写道maptask的启动是在Child.run()中启动的,而Child.run()的启动似乎是在JVMManager中启动的,但我就找不到startNewTask()是怎样启动JVMManager的?TaskRunner.java中的280行上下.

看到了,谢谢!
看到TaskLauncher的run调用了startNewTasks,但在哪里调用TaskLauncher的run?
在TaskTracker中:
      
if (actions != null){ 
          for(TaskTrackerAction action: actions) {
            if (action instanceof LaunchTaskAction) {
              addToTaskQueue((LaunchTaskAction)action);
            } else if (action instanceof CommitTaskAction) {
              CommitTaskAction commitAction = (CommitTaskAction)action;
              if (!commitResponses.contains(commitAction.getTaskID())) {
                LOG.info("Received commit task action for " + 
                          commitAction.getTaskID());
                commitResponses.add(commitAction.getTaskID());
              }

但到addToTaskQueue没有看到调用TaskLauncher的run啊?
7 楼 coderplay 2009-02-17  
jiwenke 写道

maptask的启动是在Child.run()中启动的,而Child.run()的启动似乎是在JVMManager中启动的,但我就找不到startNewTask()是怎样启动JVMManager的?

TaskRunner.java中的280行上下.
6 楼 jiwenke 2009-02-17  
coderplay 写道

sorry,理解能力太差, 我不是很明白你的意思.如果你是需要控制单个节点同时在执行的的mapper/reducer数.你可以通过修改slave节点配置中的mapred.tasktracker.map.tasks.maximum和mapred.tasktracker.reduce.tasks.maximum.至于总的mapper任务数,是由splits决定的,当然可以通过JobConf.setNumMapTasks(n)来增大它,但没多大意义.

是我没有写清楚,我想控制总的mapper任务数,比如生成5个任务,每台机器跑一个这样的需求。我同意这是由splits决定的,所以这个问题就转换为怎样生成splits的问题,我看了代码,发现splits的生成个数是和文件的个数有关的 - 如果每个文件小于block size,那就应该splits的个数等于文件个数。这样的话,如果我想控制总的任务数,就需要有那么多个文件,而我想只用一个文件。不知道现在的hadoop里面是不是支持。
还有,我看到你的分析,在mapred中,TaskTracker通过heartbeat得到任务指令,然后去startNewTask(),但这里面我就迷失了.
从后面往前推,maptask的启动是在Child.run()中启动的,而Child.run()的启动似乎是在JVMManager中启动的,但我就找不到startNewTask()是怎样启动JVMManager的?
期待您的指点!谢谢!

5 楼 coderplay 2009-02-17  
jiwenke 写道

太感谢你的分析了!我在使用的时候遇到一个关于任务切分的问题,应为我的需求比较简单,只需要控制并行的任务数(处理的数据和控制是分离的),所以我使用了NLineInputFormat,然后写了一个通知文件来给mapper配参数。按我的理解,应该是在该文件中的一行输入对应一个可以在一个node里执行的任务(我参考的是源码的注释)* In many "pleasantly" parallel applications, each process/mapper  * processes the same input file (s), but with computations are  * controlled by different parameters.(Referred to as "parameter sweeps"). * One way to achieve this, is to specify a set of parameters  * (one set per line) as input in a control file  * (which is the input path to the map-reduce application, * where as the input dataset is specified  * via a config variable in JobConf.). *  * The NLineInputFormat can be used in such applications, that splits  * the input file such that by default, one line is fed as * a value to one map task, and key is the offset. * i.e. (k,v) is (LongWritable, Text). * The location hints will span the whole mapred cluster.但很奇怪的是只生成了一个任务,这个任务里有几个mapper - 而不是几个任务。而后来我发现,生成的任务和splits有关,而splits的生成个数在现在的hadoop里似乎只和input里面的文件个数有关。我想用一个简单的输入文件就能控制任务的生成,不知道按现在的hadoop实现能不能完成?非常感谢你的指教!!谢谢!

sorry,理解能力太差, 我不是很明白你的意思.如果你是需要控制单个节点同时在执行的的mapper/reducer数.你可以通过修改slave节点配置中的mapred.tasktracker.map.tasks.maximum和mapred.tasktracker.reduce.tasks.maximum.
至于总的mapper任务数,是由splits决定的,当然可以通过JobConf.setNumMapTasks(n)来增大它,但没多大意义.
4 楼 jiwenke 2009-02-16  
太感谢你的分析了!
我在使用的时候遇到一个关于任务切分的问题,应为我的需求比较简单,只需要控制并行的任务数(处理的数据和控制是分离的),所以我使用了NLineInputFormat,然后写了一个通知文件来给mapper配参数。
按我的理解,应该是在该文件中的一行输入对应一个可以在一个node里执行的任务(我参考的是源码的注释)
* In many "pleasantly" parallel applications, each process/mapper
* processes the same input file (s), but with computations are
* controlled by different parameters.(Referred to as "parameter sweeps").
* One way to achieve this, is to specify a set of parameters
* (one set per line) as input in a control file
* (which is the input path to the map-reduce application,
* where as the input dataset is specified
* via a config variable in JobConf.).
*
* The NLineInputFormat can be used in such applications, that splits
* the input file such that by default, one line is fed as
* a value to one map task, and key is the offset.
* i.e. (k,v) is (LongWritable, Text).
* The location hints will span the whole mapred cluster.
但很奇怪的是只生成了一个任务,这个任务里有几个mapper - 而不是几个任务。而后来我发现,生成的任务和splits有关,而splits的生成个数在现在的hadoop里似乎只和input里面的文件个数有关。我想用一个简单的输入文件就能控制任务的生成,不知道按现在的hadoop实现能不能完成?非常感谢你的指教!!谢谢!
3 楼 diddyrock 2009-01-21  
谢谢高人!我争取能把原来的一些程序升级到0.19哈,偷懒了,没去看源码,老版本里面好像连FileInputFormat都没有,希望早点出到1.0版本,那样以后维护就方便了
2 楼 coderplay 2009-01-21  
diddyrock 写道

高人,问你一个问题啊,现在0.17的wiki里面说:通常,JobConf会指明Mapper、Combiner(如果有的话)、 Partitioner、Reducer、InputFormat和 OutputFormat的具体实现。JobConf还能指定一组输入文件 (setInputPaths(JobConf, Path...) /addInputPath(JobConf, Path)) 和(setInputPaths(JobConf, String) /addInputPaths(JobConf, String)) 以及输出文件应该写在哪儿 (setOutputPath(Path))。我的问题是0.17里面这个setInputPaths,addInputPath怎么调用啊,老版本的0.12里面是直接通过jobConf.addInputPaths,但是0.17里面这个API没有了~谢啦

FileInputFormat.addInputPath(jobconf, new Path(args[0]));
FileOutputFormat.setOutputPath(jobconf, new Path(args[1]));

另外提醒你JobConf将要被弃用了, Mapper, Reducer等再也不是接口了,而是一个抽象类。OutputCollector和Reporter等都要合进Context里面了。变化挺大的。
1 楼 diddyrock 2009-01-21  
高人,问你一个问题啊,现在0.17的wiki里面说:通常,JobConf会指明Mapper、Combiner(如果有的话)、 Partitioner、Reducer、InputFormat和 OutputFormat的具体实现。JobConf还能指定一组输入文件 (setInputPaths(JobConf, Path...) /addInputPath(JobConf, Path)) 和(setInputPaths(JobConf, String) /addInputPaths(JobConf, String)) 以及输出文件应该写在哪儿 (setOutputPath(Path))。
我的问题是0.17里面这个setInputPaths,addInputPath怎么调用啊,老版本的0.12里面是直接通过jobConf.addInputPaths,但是0.17里面这个API没有了~谢啦

相关推荐

Global site tag (gtag.js) - Google Analytics