- 浏览: 571903 次
- 性别:
- 来自: 广州杭州
文章分类
最新评论
-
bohc:
谢谢,搞了两天了,现在才算是找到问题所在,解决了。
文件在使用FileChannel.map后不能被删除(Windows上) -
zhang0000jun:
在jdk1.8中执行正好和楼主的结果相反,请指教
从Java视角理解CPU缓存(CPU Cache) -
在世界的中心呼喚愛:
forenroll 写道请问楼主的那个分析工具cachemis ...
从Java视角理解CPU缓存(CPU Cache) -
xgj1988:
我这里打出的结果是: 0 L1-dcache-load-mis ...
从Java视角理解CPU缓存(CPU Cache) -
thebye85:
请教下大神,为什么频繁的park会导致大量context sw ...
从Java视角理解CPU上下文切换(Context Switch)
任务的申请、派发与执行
TaskTracker.run() 连接JobTracker
TaskTracker 的启动过程会初始化一系列参数和服务(另有单独的一节介绍),然后尝试连接JobTracker 服务(即必须实现InterTrackerProtocol 接口),如果连接断开,则会循环尝试连接JobTracker ,并重新初始化所有成员和参数,此过程参见run() 方法。
TaskTracker.offerService() 主循环
如果连接JobTracker 服务成功,TaskTracker 就会调用offerService() 函数进入主执行循环中。这个循环会每隔10 秒与JobTracker 通讯一次,调用transmitHeartBeat() 获得HeartbeatResponse 信息。然后调用HeartbeatResponse 的getActions() 函数获得JobTracker 传过来的所有指令即一个TaskTrackerAction 数组。再遍历这个数组,如果是一个新任务指令即LaunchTaskAction 则调用startNewTask() 函数执行新任务,
如果是 CommitTaskAction
否则加入到tasksToCleanup 队列,交给一个taskCleanupThread 线程来处理,如执行KillJobAction 或者KillTaskAction 等。
TaskTracker.transmitHeartBeat() 获取JobTracker 指令
在transmitHeartBeat() 函数处理中,TaskTracker 会创建一个新的TaskTrackerStatus 对象记录目前任务的执行状况,然后通过IPC 接口调用JobTracker 的heartbeat() 方法发送过去,并接受新的指令,即返回值TaskTrackerAction 数组。在这个调用之前,TaskTracker 会先检查目前执行的Task 数目以及本地磁盘的空间使用情况等,如果可以接收新的Task 则设置heartbeat() 的askForNewTask 参数为true 。操作成功后再更新相关的统计信息等。
JobTracker 调度作业第二步:派发任务
JobTracker 接到TaskTracker 的heartbeat() 调用后,首先会检查上一个心跳响应是否完成,是没要求启动或重启任务,如果一切正常,则会处理心跳。JobTracker 会使用它的调度器taskScheduler 来组装任务到一个任务列表tasks 中。具体实现在taskScheduler 的assignTasks() 方法。得到tasks 的数据后,把这些任务封装在一些LanuchTaskAction 中,发回给TaskTracker ,让它去执行任务。此时JobTracker 的hearbeat() 结束派发任务。
下面简单分析下 hadoop 默认的作业调度器 JobQueueTaskScheduler 怎么实现以上所说的 assignTasks() 方法。首先它会检查 TaskTracker 端还可以做多少个 map 和 reduce 任务,将要派发的任务数是否超出这个数,是否超出集群的任务平均剩余可负载数。如果都没超出,则为此 TaskTracker 分配一个 MapTask 或 ReduceTask 。产生 Map 任务使用 JobInProgress 的 obtainNewMapTask() 方法,实质上最后调用了 JobInProgress 的 findNewMapTask() 访问 nonRunningMapCache 得到 Map 任务的 TaskInProgress ;而产生 Reduce 任务使用 JobInProgress.obtainNewReduceTask() 方法,实质上最后调用了 JobInProgress 的 findNewReduceTask() 访问 nonRuningReduceCache 得到 Reduce 任务的 TaskInProgress 。
评论
JobTracker处理心跳采用JobQueueTaskScheduler调度机制给TT分配map任务时,会执行很多次obtainNewLocalMapTask和一次obtainNewNonLocalMapTask。最终使用的都是JobInProgress的findNewMapTask方法,只是传递的level不一样。
可是我不知道为什么obtainNewLocalMapTask会传递maxlevel这个值,而且maxlevel=NetworkTopology.DEFAULT_HOST_LEVEL=2。我很是不理解。
1·难道说obtainNewLocalMapTask只是会从近到远寻找任务,而不见得一定要是本地嘛?
2·maxlevel这个值是代表集群的级别数吗?为什么是2?
coderplay 写道sorry,理解能力太差, 我不是很明白你的意思.如果你是需要控制单个节点同时在执行的的mapper/reducer数.你可以通过修改slave节点配置中的mapred.tasktracker.map.tasks.maximum和mapred.tasktracker.reduce.tasks.maximum.至于总的mapper任务数,是由splits决定的,当然可以通过JobConf.setNumMapTasks(n)来增大它,但没多大意义.是我没有写清楚,我想控制总的mapper任务数,比如生成5个任务,每台机器跑一个这样的需求。我同意这是由splits决定的,所以这个问题就转换为怎样生成splits的问题,我看了代码,发现splits的生成个数是和文件的个数有关的 - 如果每个文件小于block size,那就应该splits的个数等于文件个数。这样的话,如果我想控制总的任务数,就需要有那么多个文件,而我想只用一个文件。不知道现在的hadoop里面是不是支持。还有,我看到你的分析,在mapred中,TaskTracker通过heartbeat得到任务指令,然后去startNewTask(),但这里面我就迷失了.从后面往前推,maptask的启动是在Child.run()中启动的,而Child.run()的启动似乎是在JVMManager中启动的,但我就找不到startNewTask()是怎样启动JVMManager的?期待您的指点!谢谢!
JobConf.setNumMapTasks(n)是有意义的,结合block size会具体影响到map任务的个数,详见FileInputFormat.getSplits源码。假设没有设置mapred.min.split.size,缺省为1的情况下,针对每个文件会按照min (totalsize[所有文件总大小]/mapnum[jobconf设置的mapnum], blocksize)为大小来拆分,并不是说文件小于block size就不去拆分。
你好,我想请问一下有没有什么方法可以调试hadoop呢?现在我可以使用eclipse或者
jdb调试一个mapreduce任务,比如wordcount。但问题是我现在还不能调试JobTracker任务,比如说是我在运行wordcount例子的时候想要调试JobTracker,
看看它具体的执行情况,有没有什么好的建议呢?非常感谢!
请见
http://wiki.apache.org/hadoop/HowToDebugMapReducePrograms
jdb调试一个mapreduce任务,比如wordcount。但问题是我现在还不能调试JobTracker任务,比如说是我在运行wordcount例子的时候想要调试JobTracker,
看看它具体的执行情况,有没有什么好的建议呢?非常感谢!
jiwenke 写道maptask的启动是在Child.run()中启动的,而Child.run()的启动似乎是在JVMManager中启动的,但我就找不到startNewTask()是怎样启动JVMManager的?TaskRunner.java中的280行上下.
看到了,谢谢!
看到TaskLauncher的run调用了startNewTasks,但在哪里调用TaskLauncher的run?
在TaskTracker中:
if (actions != null){ for(TaskTrackerAction action: actions) { if (action instanceof LaunchTaskAction) { addToTaskQueue((LaunchTaskAction)action); } else if (action instanceof CommitTaskAction) { CommitTaskAction commitAction = (CommitTaskAction)action; if (!commitResponses.contains(commitAction.getTaskID())) { LOG.info("Received commit task action for " + commitAction.getTaskID()); commitResponses.add(commitAction.getTaskID()); }
但到addToTaskQueue没有看到调用TaskLauncher的run啊?
maptask的启动是在Child.run()中启动的,而Child.run()的启动似乎是在JVMManager中启动的,但我就找不到startNewTask()是怎样启动JVMManager的?
TaskRunner.java中的280行上下.
sorry,理解能力太差, 我不是很明白你的意思.如果你是需要控制单个节点同时在执行的的mapper/reducer数.你可以通过修改slave节点配置中的mapred.tasktracker.map.tasks.maximum和mapred.tasktracker.reduce.tasks.maximum.至于总的mapper任务数,是由splits决定的,当然可以通过JobConf.setNumMapTasks(n)来增大它,但没多大意义.
是我没有写清楚,我想控制总的mapper任务数,比如生成5个任务,每台机器跑一个这样的需求。我同意这是由splits决定的,所以这个问题就转换为怎样生成splits的问题,我看了代码,发现splits的生成个数是和文件的个数有关的 - 如果每个文件小于block size,那就应该splits的个数等于文件个数。这样的话,如果我想控制总的任务数,就需要有那么多个文件,而我想只用一个文件。不知道现在的hadoop里面是不是支持。
还有,我看到你的分析,在mapred中,TaskTracker通过heartbeat得到任务指令,然后去startNewTask(),但这里面我就迷失了.
从后面往前推,maptask的启动是在Child.run()中启动的,而Child.run()的启动似乎是在JVMManager中启动的,但我就找不到startNewTask()是怎样启动JVMManager的?
期待您的指点!谢谢!
太感谢你的分析了!我在使用的时候遇到一个关于任务切分的问题,应为我的需求比较简单,只需要控制并行的任务数(处理的数据和控制是分离的),所以我使用了NLineInputFormat,然后写了一个通知文件来给mapper配参数。按我的理解,应该是在该文件中的一行输入对应一个可以在一个node里执行的任务(我参考的是源码的注释)* In many "pleasantly" parallel applications, each process/mapper * processes the same input file (s), but with computations are * controlled by different parameters.(Referred to as "parameter sweeps"). * One way to achieve this, is to specify a set of parameters * (one set per line) as input in a control file * (which is the input path to the map-reduce application, * where as the input dataset is specified * via a config variable in JobConf.). * * The NLineInputFormat can be used in such applications, that splits * the input file such that by default, one line is fed as * a value to one map task, and key is the offset. * i.e. (k,v) is (LongWritable, Text). * The location hints will span the whole mapred cluster.但很奇怪的是只生成了一个任务,这个任务里有几个mapper - 而不是几个任务。而后来我发现,生成的任务和splits有关,而splits的生成个数在现在的hadoop里似乎只和input里面的文件个数有关。我想用一个简单的输入文件就能控制任务的生成,不知道按现在的hadoop实现能不能完成?非常感谢你的指教!!谢谢!
sorry,理解能力太差, 我不是很明白你的意思.如果你是需要控制单个节点同时在执行的的mapper/reducer数.你可以通过修改slave节点配置中的mapred.tasktracker.map.tasks.maximum和mapred.tasktracker.reduce.tasks.maximum.
至于总的mapper任务数,是由splits决定的,当然可以通过JobConf.setNumMapTasks(n)来增大它,但没多大意义.
我在使用的时候遇到一个关于任务切分的问题,应为我的需求比较简单,只需要控制并行的任务数(处理的数据和控制是分离的),所以我使用了NLineInputFormat,然后写了一个通知文件来给mapper配参数。
按我的理解,应该是在该文件中的一行输入对应一个可以在一个node里执行的任务(我参考的是源码的注释)
* In many "pleasantly" parallel applications, each process/mapper
* processes the same input file (s), but with computations are
* controlled by different parameters.(Referred to as "parameter sweeps").
* One way to achieve this, is to specify a set of parameters
* (one set per line) as input in a control file
* (which is the input path to the map-reduce application,
* where as the input dataset is specified
* via a config variable in JobConf.).
*
* The NLineInputFormat can be used in such applications, that splits
* the input file such that by default, one line is fed as
* a value to one map task, and key is the offset.
* i.e. (k,v) is (LongWritable, Text).
* The location hints will span the whole mapred cluster.
但很奇怪的是只生成了一个任务,这个任务里有几个mapper - 而不是几个任务。而后来我发现,生成的任务和splits有关,而splits的生成个数在现在的hadoop里似乎只和input里面的文件个数有关。我想用一个简单的输入文件就能控制任务的生成,不知道按现在的hadoop实现能不能完成?非常感谢你的指教!!谢谢!
高人,问你一个问题啊,现在0.17的wiki里面说:通常,JobConf会指明Mapper、Combiner(如果有的话)、 Partitioner、Reducer、InputFormat和 OutputFormat的具体实现。JobConf还能指定一组输入文件 (setInputPaths(JobConf, Path...) /addInputPath(JobConf, Path)) 和(setInputPaths(JobConf, String) /addInputPaths(JobConf, String)) 以及输出文件应该写在哪儿 (setOutputPath(Path))。我的问题是0.17里面这个setInputPaths,addInputPath怎么调用啊,老版本的0.12里面是直接通过jobConf.addInputPaths,但是0.17里面这个API没有了~谢啦
FileInputFormat.addInputPath(jobconf, new Path(args[0]));
FileOutputFormat.setOutputPath(jobconf, new Path(args[1]));
另外提醒你JobConf将要被弃用了, Mapper, Reducer等再也不是接口了,而是一个抽象类。OutputCollector和Reporter等都要合进Context里面了。变化挺大的。
我的问题是0.17里面这个setInputPaths,addInputPath怎么调用啊,老版本的0.12里面是直接通过jobConf.addInputPaths,但是0.17里面这个API没有了~谢啦
发表评论
-
抛砖引玉, 淘宝统一离线数据分析平台设计
2011-11-03 22:58 8152把这个拿出来的目的, 是想得到更多的反馈意见, 请邮件至zho ... -
NameNode优化笔记 (二)
2011-01-13 15:03 0事情发生在11月初至11月中旬, 云梯用户不断反映作业运行得慢 ... -
NameNode优化笔记 (一)
2011-01-12 10:32 6140很久没有发博客了, 最 ... -
我在Hadoop云计算会议的演讲
2010-10-26 14:59 9022点击下载演讲稿 由中科院计算所主办的“Hadoop ... -
分布式online与offline设计 slides
2010-08-25 00:24 4159花了两个小时简单了做了一个ppt,给兄弟公司相关人员讲解off ... -
演讲: Hadoop与数据分析
2010-05-29 20:35 7406前些天受金山软件公司西山居朋友的邀请, 去了趟珠海与金山的朋友 ... -
Hadoop的Mapper是怎么从HDFS上读取TextInputFormat数据的
2010-05-29 11:46 7928LineRecordReader.next(LongWri ... -
Anthill: 一种基于MapReduce的分布式DBMS
2010-05-11 22:47 3732MapReduce is a parallel computi ... -
HDFS的追加/刷新/读设计
2010-01-26 00:26 3231hdfs将在0.21版(尚未发布),把DFSOutputStr ... -
TFile, SequenceFile与gz,lzo压缩的测试
2010-01-07 22:47 4985先记一记,以后解释 :) $hadoop jar tf ... -
hive权限控制
2009-09-07 14:35 5053对hive的元数据表结构要作以下调整: hive用户不与表 ... -
avro编译
2009-07-04 00:36 3731avro是doug cutting主持的rpc ... -
Hive的一些问题
2009-06-01 16:51 3804偏激了一点. 总体来说H ... -
hive的编译模块设计
2009-05-22 15:39 3700很少在博客里写翻译的东西, 这次例外. 原文在这儿 . 译文 ... -
HIVE问答, 某天的hadoop群聊天记录
2009-05-07 17:10 10853某天晚上在hadoop群里一时兴起, 回答了一些hive相关的 ... -
暨南大学并行计算实验室MapReduce研究现状
2009-05-04 21:20 50684月份在学校花了半小时做的一个ppt, 内容是我们在应用ha ... -
hadoop上最多到底能放多少个文件?
2009-02-11 18:25 4245这主要取决于NameNode的内存。因为DFS集群运行时,文件 ... -
hadoop改进方面的胡思乱想
2009-02-04 10:57 43911. 我做数据挖掘的时候, 经常需要只对key分组,不必排序。 ... -
hadoop源码分析
2008-12-26 15:37 4902blog挺难贴图的, 我已经建了一个开源的项目, 用来存放文 ... -
hadoop源码分析之MapReduce(一)
2008-12-16 13:08 19279hadoop的源码已 ...
相关推荐
hadoop源码分析-mapreduce部分.doc
caibinbupt的Hadoop源码分析完整版,包括 HDFS 和 MapReduce。 HDFS: 41章 MapReduce: 14章
hadoop平台下hdfs和mapreduce的源码分析。
全书分为10章,系统地介绍了HDFS存储系统,Hadoop的文件I/O系统,MapReduce2.0的框架结构和源码分析,MapReduce2.0的配置与测试,MapReduce2.0运行流程,MapReduce2.0高级程序设计以及相关特性等内容。《MapReduce...
目前它们都属亍Apache的Hadoop项目,对应的分删是: Chubby-->ZooKeeper GFS-->HDFS BigTable-->HBase MapReduce-->Hadoop 目前,基亍类似思想的Open Source项目迓径多,如Facebook用亍用户分析的Hive。 HDFS作为一...
hadoop源码分析hadoop_MapReduce源代码解析
全书分为10章,系统地介绍了HDFS存储系统,Hadoop的文件I/O系统,MapReduce 2.0的框架结构和源码分析,MapReduce 2.0的配置与测试,MapReduce 2.0运行流程,MapReduce 2.0高级程序设计以及相关特性等内容。...
Hadoop分析气象数据完整版源代码(含Hadoop的MapReduce代码和SSM框架) 《分布式》布置了一道小作业,这是作业的所有代码,里面包含了Hadoop的MapReduce代码、和SSM框架显示数据的代码
Hadoop的源代码分析总共55章,包括HDFS: 41章、MapReduce: 14章。
Hadoop源码分析.rar 有助于hadoop学习者进一步学习!! 非常好的资源!!
Java操作Hadoop Mapreduce基本实践源码.
hdfs上传文件过程源码分析
MapReduce源码分析(主要四大模块,其他表示父目录下的.java文件的总称):1.org.apache.hadoop.mapred(旧版MapReduceAPI):( 1).jobcontrol(job作业直接控制类)(2 ).join :(作业作业中用于模仿数据连接处理...
java操作hadoop之mapreduce分析年气象数据最低温度实战源码,附带所需jar包,欢迎学习。
基于Hadoop Mapreduce 实现酒店评价文本情感分析(python源码+项目说明).zip基于Hadoop Mapreduce 实现酒店评价文本情感分析(python源码+项目说明).zip基于Hadoop Mapreduce 实现酒店评价文本情感分析(python...
基于Hadoop Mapreduce 实现酒店评价文本情感分析(python开发源码+项目说明).zip基于Hadoop Mapreduce 实现酒店评价文本情感分析(python开发源码+项目说明).zip基于Hadoop Mapreduce 实现酒店评价文本情感分析...
Hadoop架构分析之集群结构分析,Hadoop架构分析之HDFS架构分析,Hadoop架构分析之NN和DN原生文档解读,Hadoop MapReduce原理之流程图.Hadoop MapReduce原理之核心类Job和ResourceManager解读.Hadoop MapReduce原理之...
MapReduce2.0源码分析与实战编程 文字注释版,带有文字注释。