`
coderplay
  • 浏览: 571656 次
  • 性别: Icon_minigender_1
  • 来自: 广州杭州
社区版块
存档分类
最新评论

怎么在hadoop作map/reduce时输出N种不同类型的value

阅读更多

BTW:再次感叹下没有机器, 3.4G的语料,单机处理了10来个小时, 真是郁闷~~ 要是有N台机器多好啊.

 

在很多时候,特别是处理大数据的时候,我们希望一道MapReduce过程就可以解决几个问题。这样可以避免再次读取数据。比如:在做文本聚类/分类的时候,mapper读取语料,进行分词后,要同时算出每个词条(term)的term frequency以及它的document frequency. 前者对于每个词条来说其实是个向量, 它代表此词条在N篇文档各中的词频;而后者就是一个非负整数。 这时候就可以借助一种特殊的Writable类:GenericWritable.

 

用法是:继承这个类,然后把你要输出value的Writable类型加进它的CLASSES静态变量里,在后面的TermMapper和TermReducer中我的value使用了三种ArrayWritable,IntWritable和我自已定义的TFWritable,所以要把三者全加入TermWritable的CLASSES中。

package redpoll.examples;

import org.apache.hadoop.io.GenericWritable;
import org.apache.hadoop.io.Writable;

/**
 * Generic Writable class for terms.
 * @author Jeremy Chow(coderplay@gmail.com)
 */
public class TermWritable extends GenericWritable {
  private static Class<? extends Writable>[] CLASSES = null;

  static {
    CLASSES = (Class<? extends Writable>[]) new Class[] {
        org.apache.hadoop.io.ArrayWritable.class,
        org.apache.hadoop.io.IntWritable.class,
        redpoll.examples.TFWritable.class
        };
  }

  public TermWritable() {
  }

  public TermWritable(Writable instance) {
    set(instance);
  }

  @Override
  protected Class<? extends Writable>[] getTypes() {
    return CLASSES;
  }
}

 Mapper在collect数据时,用刚才定义的TermWritable来包装(wrap)要使用的Writable类。

package redpoll.examples;

import java.io.IOException;
import java.io.StringReader;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.Token;
import org.apache.lucene.analysis.TokenStream;
import org.apache.lucene.analysis.standard.StandardAnalyzer;

/**
 * A class provides for doing words segmenation and counting term TFs and DFs.<p>
 * in: key is document id, value is a document instance. <br>
 * output:
 * <li>key is term, value is a <documentId, tf> pair</li>
 * <li>key is term, value is document frequency corresponsing to the key</li>
 * @author Jeremy Chow(coderplay@gmail.com)
 */
public class TermMapper extends MapReduceBase implements
    Mapper<LongWritable, Document, Text, TermWritable> {
  private static final Log log = LogFactory.getLog(TermMapper.class
      .getName());
  
  /* analyzer for words segmentation */
  private Analyzer analyzer = null;
   
  /* frequency weight for document title */
  private IntWritable titleWeight = new IntWritable(2);
  /* frequency weight for document content */
  private IntWritable contentWeight = new IntWritable(1);

  
  public void map(LongWritable key, Document value,
      OutputCollector<Text, TermWritable> output, Reporter reporter)
      throws IOException {
    doMap(key, value.getTitle(), titleWeight, output, reporter);
    doMap(key, value.getContent(), contentWeight, output, reporter);
  }
  
  private void doMap(LongWritable key, String value, IntWritable weight,
      OutputCollector<Text, TermWritable> output, Reporter reporter)
      throws IOException {
    // do words segmentation
    TokenStream ts = analyzer.tokenStream("dummy", new StringReader(value));
    Token token = new Token();
    while ((token = ts.next(token)) != null) {
      String termString = new String(token.termBuffer(), 0, token.termLength());
      Text term = new Text(termString);
      // <term, <documentId,tf>>
      TFWritable tf = new TFWritable(key, weight);
      output.collect(term, new TermWritable(tf)); // wrap then collect
      // <term, weight>
      output.collect(term, new TermWritable(weight)); // wrap then collect
    }
  }
    
  @Override
  public void configure(JobConf job) {
    String analyzerName = job.get("redpoll.text.analyzer");
    try {
      if (analyzerName != null)
        analyzer = (Analyzer) Class.forName(analyzerName).newInstance();
    } catch (Exception excp) {
      excp.printStackTrace();
    }
    if (analyzer == null)
      analyzer = new StandardAnalyzer();
  }

}
 

Reduce如果想获取数据,则可以解包(unwrap)它:

package redpoll.examples;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

/**
 * Form a tf vector and caculate the df for terms.
 * @author Jeremy Chow(coderplay@gmail.com)
 */
public class TermReducer extends MapReduceBase implements Reducer<Text, TermWritable, Text, Writable> {
  
  private static final Log log = LogFactory.getLog(TermReducer.class.getName());
  
  public void reduce(Text key, Iterator<TermWritable> values,
      OutputCollector<Text, Writable> output, Reporter reporter)
      throws IOException {
    ArrayList<TFWritable> tfs = new ArrayList<TFWritable>();
    int sum = 0;
//    log.info("term:" + key.toString());
    while (values.hasNext()) {
      Writable value = values.next().get(); // unwrap
      if (value  instanceof TFWritable) {
        tfs.add((TFWritable) value ); 
      }else {
        sum += ((IntWritable) value).get();
      }
    }
    
    TFWritable writables[] = new TFWritable[tfs.size()];
    ArrayWritable aw = new ArrayWritable(TFWritable.class, tfs.toArray(writables));
    // wrap again
    output.collect(key, new TermWritable(aw)); 
    output.collect(key, new TermWritable(new IntWritable(sum)));
  }

}

 这儿collect的时候可以不再用TermWritable,只不过我在重新定义了OutputFormat,让它输出到两个不同的文件,而且输出的类型也是不一样的。

 

分享到:
评论
6 楼 qingzew 2014-05-22  
请问如果是map的输出中一个key有多个value值该怎么办
5 楼 javalive20120108 2012-06-20  
回答3楼的问题:
   在map里
    String pathName = ((FileSplit) context.getInputSplit()).getPath().toString();
可以得到所有的输入文件的全路径,可以在这里判断哪些作为输入文件
4 楼 riddle_chen 2009-05-27  
you_laner 写道

确切的说,这个不算是输出多个不同类型的value,只是把不同类型的value封装成同一class而已。我想通过不同的key来区分value,从而将value保存在多个文件中,而且在后续job中将前一job中的某些文件作为输入,只是不知道如何处理。


MultipleOutputFormat可以让你根据不同的key把汇总好的value保存不同的文件中,至于在后续任务中加入输入文件只要使用FileInputFormat.setInputPaths即可。
3 楼 you_laner 2009-05-20  
确切的说,这个不算是输出多个不同类型的value,只是把不同类型的value封装成同一class而已。

我想通过不同的key来区分value,从而将value保存在多个文件中,而且在后续job中将前一job中的某些文件作为输入,只是不知道如何处理。
2 楼 shuchaoo 2009-05-05  
不错,GenericWritable的应用!没看出来你这个词频是怎么计算的,有combiner?
1 楼 yawl 2008-10-30  
看来EC2这种scale好的平台还是最适合了,反正一台机器跑10个小时和10台机器跑1个小时都是$1 (small instance)

相关推荐

    论文研究-Hadoop平台中一种Reduce负载均衡贪心算法 .pdf

    Hadoop平台中一种Reduce负载均衡贪心算法,刘朵,曾锋,MapReduce是目前广泛应用的并行计算框架,是Hadoop平台的重要组成部分。主要包括Map函数和Reduce函数。Map函数输出key-value键值对作为Reduce的

    Hadoop集群安装

    這個實做會架設運作在叢集環境上的Hadoop,因此若是你的電腦還存在著 之前的實做一的環境,請先作step 0,以移除掉之前的設定。 • 確認您"主機一"的 hostname 與 "主機二" 的 hostname,並將下面指令 有 主機一與...

    hadoop 2.7.6 eclipse插件

    18/05/25 19:51:49 INFO mapreduce.Job: map 0% reduce 0% 18/05/25 19:52:20 INFO mapreduce.Job: map 100% reduce 0% 18/05/25 19:52:29 INFO mapreduce.Job: map 100% reduce 100% 18/05/25 19:52:31 INFO ...

    Hadoop平台中一种Reduce负载均衡贪心算法 (2016年)

    主要包括Map和Reduce函数,Map函数输出key-value键值对作为Reduce的输入。由于输入的动态性,不同主机上的Reduce处理的输入量存在不均衡性。如何解决Reduce的负载均衡是优化MapReduce的一个重要研究方向。对整体数据...

    Apress - Pro Hadoop

    Map是把输入Input分解成中间的Key/Value对,Reduce把Key/Value合成最终输出Output。这两个函数由程序员提供给系统,下层设施把Map和Reduce操作分布在集群上运行,并把结果存储在GFS上。  3、BigTable。一个大型的...

    Hadoop中MapReduce基本案例及代码(一)

    MapReduce意味着在计算过程中实际分为两大步,Map过程和Reduce过程。 下面以一个统计单词次数简单案例为例: 数据源 Map类 import java.io.IOException; import org.apache.hadoop.io.LongWritable; import org....

    hadoop倒排索引实现 完整代码+报告

    Map和 Reduce的设计思路(含 Map、Reduce阶段的 K、V类型) 基本要求与排序 因为两者代码具有关联性,故放在一起说。 首先在基本要求中,Map 我们对于输入的文件每句进行切割,将单词与文件名作为(text)key,...

    hadoop-example:hadoop入门例子实践

    MAP阶段:使用StringTokenizer 将一行String分离成不同的单词,输出, 例如 REDUCE阶段:&lt;KEY&gt; 例子如下&lt;WORD&gt;&gt; 将VALUE的值进行相加,输出结果 remove duplication MAP阶段: MAP阶段:将数据源的VALUE作为key输出,...

    hadoop集群

    用户指定一个map函数,通过这个map函数处理key/value(键/值)对,并且产生一系列的中间key/value对,并且使用reduce函数来合并所有的具有相同key值的中间键值对中的值部分。现实生活中的很多任务的实现都是基于这个...

    Hadoop中MapReduce基本案例及代码(五)

    下面详细介绍MapReduce中Map任务Reduce任务以及MapReduce的执行流程。 Map任务: 读取输入文件内容,解析成key,value对。对输入文件的每一行,解析成key,value对。每一个键值对调用一次map函数。 写自己的逻辑,对...

    Nutch相关框架视频教程讲义 (1-20)

    . Hadoop是大数据的核心技术之一,而Nutch集Hadoop之大成,是Hadoop的源头。学习Hadoop,没有数据怎么办?...Volume数据体量巨大,Variety数据类型繁多,Value价值密度低,商业价值高,Velocity处理速度快。

    hadoop5hadoop5

    对于输入中的每一个, value&gt; pair 进行Map操作,将中间结果Buffer在Memory里; 定期的(或者根据内存状态),将Buffer中的中间信息Dump到本地磁盘上,并且把文件信息传回给Master(Master需要把这些信息发送给...

    论文研究-基于开源Hadoop的矢量空间数据分布式处理研究.pdf

    为实现大规模矢量数据的高性能处理,在开源项目Hadoop基础上,设计与开发了一个基于MapReduce的矢量数据分布式计算系统。根据矢量空间数据的特点,通过分析Key/Value数据模型及GeoJSON地理数据编码格式,构建了可...

    hadoop_4hadoop_4

    对于输入中的每一个, value&gt; pair 进行Map操作,将中间结果Buffer在Memory里; 定期的(或者根据内存状态),将Buffer中的中间信息Dump到本地磁盘上,并且把文件信息传回给Master(Master需要把这些信息发送给...

    大数据投资,技能和企业价值-研究论文

    该分析使用了一个新的数据源(LinkedIn技能数据库),可以直接衡量公司对新兴技术技能(如Hadoop,Map / Reduce和Apache Pig)的投资。 生产率估算表明,从2006年到2011年,公司的Hadoop投资与生产率提高3%的速度...

    hadoop学习

    ◆MapReduce是一种分布式计算模型,由Google...◆MR由两个阶段组成:Map和Reduce,用户只需要实现map()和reduce()两个函数,即可实现分布式计算,非常简单。 ◆这两个函数的形参是key、value对,表示函数的输入信息。

    google 论文 mapreduce 中文版

    用户指定一个map函数处理一个key/value对,从而产生中间的key/value对集。然后再指定一个reduce函数合并所有的具有相同中间key的中间value。下面将列举许多可以用这个模型来表示的现实世界的工作

    Apache Nutch v1.15

    Volume数据体量巨大,Variety数据类型繁多,Value价值密度低,商业价值高,Velocity处理速度快。 Hadoop是大数据的核心技术之一,而Nutch集Hadoop之大成,是Hadoop的源头。学习Hadoop,没有数据怎么办?用Nutch抓!...

    Mapreduce#文档.docx

    个编程接口,大家可以看出 MapReduce 处理的数据类型是,value&gt;键值对。  统一构架,隐藏系统层细节 如何提供统一的计算框架,如果没有统一封装底层细节,那么程序员则需要 考虑诸如数据存储、划分、分发、结果...

    Apache Accumulo for Developers

    It’s assumed that you have an understanding of how Hadoop works, both HDFS and the Map/Reduce. No prior knowledge of ZooKeeper is assumed. Table of Contents Chapter 1: Building an Accumulo Cluster ...

Global site tag (gtag.js) - Google Analytics