- 浏览: 571656 次
- 性别:
- 来自: 广州杭州
文章分类
最新评论
-
bohc:
谢谢,搞了两天了,现在才算是找到问题所在,解决了。
文件在使用FileChannel.map后不能被删除(Windows上) -
zhang0000jun:
在jdk1.8中执行正好和楼主的结果相反,请指教
从Java视角理解CPU缓存(CPU Cache) -
在世界的中心呼喚愛:
forenroll 写道请问楼主的那个分析工具cachemis ...
从Java视角理解CPU缓存(CPU Cache) -
xgj1988:
我这里打出的结果是: 0 L1-dcache-load-mis ...
从Java视角理解CPU缓存(CPU Cache) -
thebye85:
请教下大神,为什么频繁的park会导致大量context sw ...
从Java视角理解CPU上下文切换(Context Switch)
BTW:再次感叹下没有机器, 3.4G的语料,单机处理了10来个小时, 真是郁闷~~ 要是有N台机器多好啊.
在很多时候,特别是处理大数据的时候,我们希望一道MapReduce过程就可以解决几个问题。这样可以避免再次读取数据。比如:在做文本聚类/分类的时候,mapper读取语料,进行分词后,要同时算出每个词条(term)的term frequency以及它的document frequency. 前者对于每个词条来说其实是个向量, 它代表此词条在N篇文档各中的词频;而后者就是一个非负整数。 这时候就可以借助一种特殊的Writable类:GenericWritable.
用法是:继承这个类,然后把你要输出value的Writable类型加进它的CLASSES静态变量里,在后面的TermMapper和TermReducer中我的value使用了三种ArrayWritable,IntWritable和我自已定义的TFWritable,所以要把三者全加入TermWritable的CLASSES中。
package redpoll.examples; import org.apache.hadoop.io.GenericWritable; import org.apache.hadoop.io.Writable; /** * Generic Writable class for terms. * @author Jeremy Chow(coderplay@gmail.com) */ public class TermWritable extends GenericWritable { private static Class<? extends Writable>[] CLASSES = null; static { CLASSES = (Class<? extends Writable>[]) new Class[] { org.apache.hadoop.io.ArrayWritable.class, org.apache.hadoop.io.IntWritable.class, redpoll.examples.TFWritable.class }; } public TermWritable() { } public TermWritable(Writable instance) { set(instance); } @Override protected Class<? extends Writable>[] getTypes() { return CLASSES; } }
Mapper在collect数据时,用刚才定义的TermWritable来包装(wrap)要使用的Writable类。
package redpoll.examples; import java.io.IOException; import java.io.StringReader; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.MapReduceBase; import org.apache.hadoop.mapred.Mapper; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.Reporter; import org.apache.lucene.analysis.Analyzer; import org.apache.lucene.analysis.Token; import org.apache.lucene.analysis.TokenStream; import org.apache.lucene.analysis.standard.StandardAnalyzer; /** * A class provides for doing words segmenation and counting term TFs and DFs.<p> * in: key is document id, value is a document instance. <br> * output: * <li>key is term, value is a <documentId, tf> pair</li> * <li>key is term, value is document frequency corresponsing to the key</li> * @author Jeremy Chow(coderplay@gmail.com) */ public class TermMapper extends MapReduceBase implements Mapper<LongWritable, Document, Text, TermWritable> { private static final Log log = LogFactory.getLog(TermMapper.class .getName()); /* analyzer for words segmentation */ private Analyzer analyzer = null; /* frequency weight for document title */ private IntWritable titleWeight = new IntWritable(2); /* frequency weight for document content */ private IntWritable contentWeight = new IntWritable(1); public void map(LongWritable key, Document value, OutputCollector<Text, TermWritable> output, Reporter reporter) throws IOException { doMap(key, value.getTitle(), titleWeight, output, reporter); doMap(key, value.getContent(), contentWeight, output, reporter); } private void doMap(LongWritable key, String value, IntWritable weight, OutputCollector<Text, TermWritable> output, Reporter reporter) throws IOException { // do words segmentation TokenStream ts = analyzer.tokenStream("dummy", new StringReader(value)); Token token = new Token(); while ((token = ts.next(token)) != null) { String termString = new String(token.termBuffer(), 0, token.termLength()); Text term = new Text(termString); // <term, <documentId,tf>> TFWritable tf = new TFWritable(key, weight); output.collect(term, new TermWritable(tf)); // wrap then collect // <term, weight> output.collect(term, new TermWritable(weight)); // wrap then collect } } @Override public void configure(JobConf job) { String analyzerName = job.get("redpoll.text.analyzer"); try { if (analyzerName != null) analyzer = (Analyzer) Class.forName(analyzerName).newInstance(); } catch (Exception excp) { excp.printStackTrace(); } if (analyzer == null) analyzer = new StandardAnalyzer(); } }
Reduce如果想获取数据,则可以解包(unwrap)它:
package redpoll.examples; import java.io.IOException; import java.util.ArrayList; import java.util.Iterator; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.io.ArrayWritable; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapred.MapReduceBase; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.Reducer; import org.apache.hadoop.mapred.Reporter; /** * Form a tf vector and caculate the df for terms. * @author Jeremy Chow(coderplay@gmail.com) */ public class TermReducer extends MapReduceBase implements Reducer<Text, TermWritable, Text, Writable> { private static final Log log = LogFactory.getLog(TermReducer.class.getName()); public void reduce(Text key, Iterator<TermWritable> values, OutputCollector<Text, Writable> output, Reporter reporter) throws IOException { ArrayList<TFWritable> tfs = new ArrayList<TFWritable>(); int sum = 0; // log.info("term:" + key.toString()); while (values.hasNext()) { Writable value = values.next().get(); // unwrap if (value instanceof TFWritable) { tfs.add((TFWritable) value ); }else { sum += ((IntWritable) value).get(); } } TFWritable writables[] = new TFWritable[tfs.size()]; ArrayWritable aw = new ArrayWritable(TFWritable.class, tfs.toArray(writables)); // wrap again output.collect(key, new TermWritable(aw)); output.collect(key, new TermWritable(new IntWritable(sum))); } }
这儿collect的时候可以不再用TermWritable,只不过我在重新定义了OutputFormat,让它输出到两个不同的文件,而且输出的类型也是不一样的。
评论
6 楼
qingzew
2014-05-22
请问如果是map的输出中一个key有多个value值该怎么办
5 楼
javalive20120108
2012-06-20
回答3楼的问题:
在map里
String pathName = ((FileSplit) context.getInputSplit()).getPath().toString();
可以得到所有的输入文件的全路径,可以在这里判断哪些作为输入文件
在map里
String pathName = ((FileSplit) context.getInputSplit()).getPath().toString();
可以得到所有的输入文件的全路径,可以在这里判断哪些作为输入文件
4 楼
riddle_chen
2009-05-27
you_laner 写道
确切的说,这个不算是输出多个不同类型的value,只是把不同类型的value封装成同一class而已。我想通过不同的key来区分value,从而将value保存在多个文件中,而且在后续job中将前一job中的某些文件作为输入,只是不知道如何处理。
MultipleOutputFormat可以让你根据不同的key把汇总好的value保存不同的文件中,至于在后续任务中加入输入文件只要使用FileInputFormat.setInputPaths即可。
3 楼
you_laner
2009-05-20
确切的说,这个不算是输出多个不同类型的value,只是把不同类型的value封装成同一class而已。
我想通过不同的key来区分value,从而将value保存在多个文件中,而且在后续job中将前一job中的某些文件作为输入,只是不知道如何处理。
我想通过不同的key来区分value,从而将value保存在多个文件中,而且在后续job中将前一job中的某些文件作为输入,只是不知道如何处理。
2 楼
shuchaoo
2009-05-05
不错,GenericWritable的应用!没看出来你这个词频是怎么计算的,有combiner?
1 楼
yawl
2008-10-30
看来EC2这种scale好的平台还是最适合了,反正一台机器跑10个小时和10台机器跑1个小时都是$1 (small instance)
发表评论
-
抛砖引玉, 淘宝统一离线数据分析平台设计
2011-11-03 22:58 8151把这个拿出来的目的, 是想得到更多的反馈意见, 请邮件至zho ... -
NameNode优化笔记 (二)
2011-01-13 15:03 0事情发生在11月初至11月中旬, 云梯用户不断反映作业运行得慢 ... -
NameNode优化笔记 (一)
2011-01-12 10:32 6137很久没有发博客了, 最 ... -
我在Hadoop云计算会议的演讲
2010-10-26 14:59 9021点击下载演讲稿 由中科院计算所主办的“Hadoop ... -
分布式online与offline设计 slides
2010-08-25 00:24 4156花了两个小时简单了做了一个ppt,给兄弟公司相关人员讲解off ... -
演讲: Hadoop与数据分析
2010-05-29 20:35 7404前些天受金山软件公司西山居朋友的邀请, 去了趟珠海与金山的朋友 ... -
Hadoop的Mapper是怎么从HDFS上读取TextInputFormat数据的
2010-05-29 11:46 7927LineRecordReader.next(LongWri ... -
Anthill: 一种基于MapReduce的分布式DBMS
2010-05-11 22:47 3731MapReduce is a parallel computi ... -
HDFS的追加/刷新/读设计
2010-01-26 00:26 3229hdfs将在0.21版(尚未发布),把DFSOutputStr ... -
TFile, SequenceFile与gz,lzo压缩的测试
2010-01-07 22:47 4983先记一记,以后解释 :) $hadoop jar tf ... -
hive权限控制
2009-09-07 14:35 5052对hive的元数据表结构要作以下调整: hive用户不与表 ... -
avro编译
2009-07-04 00:36 3730avro是doug cutting主持的rpc ... -
Hive的一些问题
2009-06-01 16:51 3803偏激了一点. 总体来说H ... -
hive的编译模块设计
2009-05-22 15:39 3699很少在博客里写翻译的东西, 这次例外. 原文在这儿 . 译文 ... -
HIVE问答, 某天的hadoop群聊天记录
2009-05-07 17:10 10850某天晚上在hadoop群里一时兴起, 回答了一些hive相关的 ... -
暨南大学并行计算实验室MapReduce研究现状
2009-05-04 21:20 50674月份在学校花了半小时做的一个ppt, 内容是我们在应用ha ... -
hadoop上最多到底能放多少个文件?
2009-02-11 18:25 4243这主要取决于NameNode的内存。因为DFS集群运行时,文件 ... -
hadoop改进方面的胡思乱想
2009-02-04 10:57 43871. 我做数据挖掘的时候, 经常需要只对key分组,不必排序。 ... -
hadoop源码分析之MapReduce(二)
2009-01-18 22:14 8558任务的申请、派发与执行 TaskTracker.run() ... -
hadoop源码分析
2008-12-26 15:37 4897blog挺难贴图的, 我已经建了一个开源的项目, 用来存放文 ...
相关推荐
Hadoop平台中一种Reduce负载均衡贪心算法,刘朵,曾锋,MapReduce是目前广泛应用的并行计算框架,是Hadoop平台的重要组成部分。主要包括Map函数和Reduce函数。Map函数输出key-value键值对作为Reduce的
這個實做會架設運作在叢集環境上的Hadoop,因此若是你的電腦還存在著 之前的實做一的環境,請先作step 0,以移除掉之前的設定。 • 確認您"主機一"的 hostname 與 "主機二" 的 hostname,並將下面指令 有 主機一與...
18/05/25 19:51:49 INFO mapreduce.Job: map 0% reduce 0% 18/05/25 19:52:20 INFO mapreduce.Job: map 100% reduce 0% 18/05/25 19:52:29 INFO mapreduce.Job: map 100% reduce 100% 18/05/25 19:52:31 INFO ...
主要包括Map和Reduce函数,Map函数输出key-value键值对作为Reduce的输入。由于输入的动态性,不同主机上的Reduce处理的输入量存在不均衡性。如何解决Reduce的负载均衡是优化MapReduce的一个重要研究方向。对整体数据...
Map是把输入Input分解成中间的Key/Value对,Reduce把Key/Value合成最终输出Output。这两个函数由程序员提供给系统,下层设施把Map和Reduce操作分布在集群上运行,并把结果存储在GFS上。 3、BigTable。一个大型的...
MapReduce意味着在计算过程中实际分为两大步,Map过程和Reduce过程。 下面以一个统计单词次数简单案例为例: 数据源 Map类 import java.io.IOException; import org.apache.hadoop.io.LongWritable; import org....
Map和 Reduce的设计思路(含 Map、Reduce阶段的 K、V类型) 基本要求与排序 因为两者代码具有关联性,故放在一起说。 首先在基本要求中,Map 我们对于输入的文件每句进行切割,将单词与文件名作为(text)key,...
MAP阶段:使用StringTokenizer 将一行String分离成不同的单词,输出, 例如 REDUCE阶段:<KEY> 例子如下<WORD>> 将VALUE的值进行相加,输出结果 remove duplication MAP阶段: MAP阶段:将数据源的VALUE作为key输出,...
用户指定一个map函数,通过这个map函数处理key/value(键/值)对,并且产生一系列的中间key/value对,并且使用reduce函数来合并所有的具有相同key值的中间键值对中的值部分。现实生活中的很多任务的实现都是基于这个...
下面详细介绍MapReduce中Map任务Reduce任务以及MapReduce的执行流程。 Map任务: 读取输入文件内容,解析成key,value对。对输入文件的每一行,解析成key,value对。每一个键值对调用一次map函数。 写自己的逻辑,对...
. Hadoop是大数据的核心技术之一,而Nutch集Hadoop之大成,是Hadoop的源头。学习Hadoop,没有数据怎么办?...Volume数据体量巨大,Variety数据类型繁多,Value价值密度低,商业价值高,Velocity处理速度快。
对于输入中的每一个, value> pair 进行Map操作,将中间结果Buffer在Memory里; 定期的(或者根据内存状态),将Buffer中的中间信息Dump到本地磁盘上,并且把文件信息传回给Master(Master需要把这些信息发送给...
为实现大规模矢量数据的高性能处理,在开源项目Hadoop基础上,设计与开发了一个基于MapReduce的矢量数据分布式计算系统。根据矢量空间数据的特点,通过分析Key/Value数据模型及GeoJSON地理数据编码格式,构建了可...
对于输入中的每一个, value> pair 进行Map操作,将中间结果Buffer在Memory里; 定期的(或者根据内存状态),将Buffer中的中间信息Dump到本地磁盘上,并且把文件信息传回给Master(Master需要把这些信息发送给...
该分析使用了一个新的数据源(LinkedIn技能数据库),可以直接衡量公司对新兴技术技能(如Hadoop,Map / Reduce和Apache Pig)的投资。 生产率估算表明,从2006年到2011年,公司的Hadoop投资与生产率提高3%的速度...
◆MapReduce是一种分布式计算模型,由Google...◆MR由两个阶段组成:Map和Reduce,用户只需要实现map()和reduce()两个函数,即可实现分布式计算,非常简单。 ◆这两个函数的形参是key、value对,表示函数的输入信息。
用户指定一个map函数处理一个key/value对,从而产生中间的key/value对集。然后再指定一个reduce函数合并所有的具有相同中间key的中间value。下面将列举许多可以用这个模型来表示的现实世界的工作
Volume数据体量巨大,Variety数据类型繁多,Value价值密度低,商业价值高,Velocity处理速度快。 Hadoop是大数据的核心技术之一,而Nutch集Hadoop之大成,是Hadoop的源头。学习Hadoop,没有数据怎么办?用Nutch抓!...
个编程接口,大家可以看出 MapReduce 处理的数据类型是,value>键值对。 统一构架,隐藏系统层细节 如何提供统一的计算框架,如果没有统一封装底层细节,那么程序员则需要 考虑诸如数据存储、划分、分发、结果...
It’s assumed that you have an understanding of how Hadoop works, both HDFS and the Map/Reduce. No prior knowledge of ZooKeeper is assumed. Table of Contents Chapter 1: Building an Accumulo Cluster ...