Recent Project Progress and a Custom Hadoop InputFormat


    An aside: please check out http://code.google.com/p/redpoll

    It would be great if someone could lend me about 10 ordinary machines for testing; the school lab won't give me that many. I've already written a paper and the code is all done. The only thing missing is the data. Frustrating.

 

      Progress has been slow and I took some detours, but things are finally sorted out. Once I started thinking about document clustering, I realized I need to locate a document's content by its id, and that id should ideally be an int or a long, whereas the docno that Sohu news provides is a 32-byte GUID. Ignore this, and Hadoop will inevitably generate enormous intermediate files when computing term frequencies, along the lines of:

 

term1    docId1:tf, docId2:tf, ....

term2    docId1:tf, docId2:tf, ....

...
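To put rough numbers on it: each docId in those postings would occupy 32 bytes as a GUID versus 8 bytes as a long, so switching the key type shrinks the docId portion of every posting to a quarter of its size before compression even enters the picture.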

 

     For simplicity I first considered a database: select content from table where documentid = x implements this directly. However, MySQL integration with Hadoop is still in its early days, and I did no sharding, so the Mappers would not read the data in a distributed fashion. I then fiddled with HBase for a few days and wrote some code, only to find that after storing 170,000 records it would not accept any more, for reasons unknown. This BigTable imitation, I found, is still just getting started and has plenty of instability.

     With no alternative, I had to rely on myself. Locating a document is actually simple: if you know the document's byte offset within the file, a single seek with that offset gets you there. In Java a long occupies 8 bytes, and Hadoop's HDFS generally favors files of 64 MB and up, so it is quite reasonable to cat the entire 3.4 GB+ Sogou corpus (http://www.sogou.com/labs/dl/cs.html) into one file and use offsets as document IDs. To have the mapper accept a key-value pair other than <LongWritable, Text>, you have to define a custom InputFormat. I wrote a SogouInputFormat for the Sogou corpus, along with the corresponding RecordReader and Writable implementations. Unfortunately the school network is acting up and I cannot commit to the googlecode svn.
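To make the offset-as-ID idea concrete, here is a minimal sketch of seeking to a document by its offset on HDFS; the path and offset below are placeholders, not real project values:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class SeekDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Path corpus = new Path("/corpus/sogou_all.txt"); // hypothetical path
    FileSystem fs = corpus.getFileSystem(conf);
    FSDataInputStream in = fs.open(corpus);
    long docId = 123456789L; // the document ID is simply its byte offset
    in.seek(docId);          // jump straight to the document
    // ... parse one document from here, e.g. with SogouCorpusReader ...
    in.close();
  }
}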

     The Sogou corpus is stored as text files in an xml-like format. Processing data at this scale demands speed, so DOM is out of the question. I first tried SAX parsing, but it ran into trouble because some records are malformed. So I spent two evenings hand-writing two state machines for the parsing; they finally read the data correctly, and quickly. A single machine reads the corpus at an average of 51 MB/s, which means one machine can get through the 3.4 GB Sogou corpus in a little over a minute. Better yet, this approach can now run on the MapReduce model.
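For reference, a single record looks roughly like this; the tag names are taken from the defaults in the reader's constructor below, while the field values are invented:

<doc>
<url>http://news.sohu.com/...</url>
<docno>0123456789abcdef...</docno>
<contenttitle>some headline</contenttitle>
<content>the article body ...</content>
</doc>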

     Next up: word segmentation, tf, df, and computing tf-idf to build the VSM (vector space model).
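For the record, the weighting I have in mind is the textbook tf-idf with no smoothing; a tiny sketch of the formula as a fragment:

/** Classic tf-idf: term frequency scaled by inverse document frequency. */
static double tfIdf(long tf, long df, long numDocs) {
  return tf * Math.log((double) numDocs / df);
}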

 

  Here are some code snippets:

package redpoll.examples;

import java.io.IOException;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobConfigurable;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;

/**
 * Input format for sogou corpus.
 * @author Jeremy Chow(coderplay@gmail.com)
 */
public class SogouInputFormat extends FileInputFormat<LongWritable, DocumentWritable>
  implements JobConfigurable {

  private CompressionCodecFactory compressionCodecs = null;
  
  public void configure(JobConf conf) {
    compressionCodecs = new CompressionCodecFactory(conf);
  }
  
  /** Compressed files are not splittable, because we cannot seek into the middle of a codec stream. */
  protected boolean isSplitable(FileSystem fs, Path file) {
    return compressionCodecs.getCodec(file) == null;
  }

  public RecordReader<LongWritable, DocumentWritable> getRecordReader(
                                          InputSplit genericSplit, JobConf job,
                                          Reporter reporter)
    throws IOException {
    
    reporter.setStatus(genericSplit.toString());
    return new SogouRecordReader(job, (FileSplit) genericSplit);
  }
}
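For context, wiring this InputFormat into a job with the old mapred API would look roughly like this; MyDocumentMapper and the input path are hypothetical stand-ins:

JobConf conf = new JobConf(SogouInputFormat.class);
conf.setJobName("sogou-tf");
conf.setInputFormat(SogouInputFormat.class); // keys are file offsets, values are documents
FileInputFormat.setInputPaths(conf, new Path("/corpus/sogou_all.txt")); // hypothetical path
conf.setMapperClass(MyDocumentMapper.class); // hypothetical Mapper over <LongWritable, DocumentWritable>
JobClient.runJob(conf);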
 
package redpoll.examples;

import java.io.EOFException;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.WritableComparator;

/**
 * A class that provides a sogou document reader from an input stream.
 * @author Jeremy Chow(coderplay@gmail.com)
 */
public class SogouCorpusReader {

  private static final int DEFAULT_BUFFER_SIZE = 64 * 1024;
  private int bufferSize = DEFAULT_BUFFER_SIZE;
  
  /* input stream which we will get documents from */
  private InputStream in;
  /* a buffer holding bytes read from the input stream */
  private byte[] buffer;
  /* the number of bytes of real data in the buffer */
  private int bufferLength = 0;
  /* the current position in the buffer */
  private int bufferPosn = 0;
  /* the buffer position in input stream */
  private long bufferCurrentPosn = 0;
  /* the position of the current document in the input stream */
  private long currentDocPosn = 0;

  /* xml-like mark tags used in sogou corpus */
  private byte[] docTag;
  private byte[] urlTag;
  private byte[] docnoTag;
  private byte[] titleTag;
  private byte[] contentTag;

  /* parser status */
  enum STATUS {
    PREPARE, START_ELEMENT, END_ELEMENT, TEXT
  };

  /* used for defining current parsing node */
  enum NODE {
    NULL, DOC, URL, DOC_NO, TITLE, CONTENT, FAILED, SUCCEED
  };

  private STATUS currentStatus;
  private NODE currentNode;

  public SogouCorpusReader(InputStream in) throws IOException {
    this(in, DEFAULT_BUFFER_SIZE);
  }

  public SogouCorpusReader(InputStream in, int bufferSize) throws IOException {
    this(in, bufferSize, "doc", "url", "docno", "contenttitle", "content");
  }

  public SogouCorpusReader(InputStream in, int bufferSize, String doc,
      String url, String docno, String title, String content)
      throws IOException {
    this.in = in;
    this.bufferSize = bufferSize;
    this.buffer = new byte[this.bufferSize];
    docTag = doc.getBytes("UTF-8");
    urlTag = url.getBytes("UTF-8");
    docnoTag = docno.getBytes("UTF-8");
    titleTag = title.getBytes("UTF-8");
    contentTag = content.getBytes("UTF-8");
  }

  public SogouCorpusReader(InputStream in, Configuration conf)
      throws IOException {
    this(in, conf.getInt("redpoll.sogou.doc.buffersize", DEFAULT_BUFFER_SIZE),
    conf.get("redpoll.sogou.doc", "doc"), 
    conf.get("redpoll.sogou.doc.url","url"), 
    conf.get("redpoll.sogou.doc.docno", "docno"), 
    conf.get("redpoll.sogou.doc.contenttitle", "contenttitle"), 
    conf.get("redpoll.sogou.doc.content", "content"));
  }

  /**
   * Gets the next {@link redpoll.examples.Document} from the sogou text file.
   * @param doc the document instance to be filled with the parsed fields.
   * @return the position of this document in the stream, or -1 if EOF was reached.
   * @throws IOException
   */
  public long nextDoc(SogouDocument doc) throws IOException {
    currentStatus = STATUS.PREPARE;
    currentNode = NODE.NULL;
    try {
      while (currentNode != NODE.SUCCEED) {
        adjustBuffer();
        if (currentStatus == STATUS.PREPARE) {
          if (buffer[bufferPosn] == '<')
            currentStatus = STATUS.START_ELEMENT;
        } else if (currentStatus == STATUS.START_ELEMENT) {
          if (buffer[bufferPosn] == '/') { // e.g. </node>
            currentStatus = STATUS.END_ELEMENT;
          } else {
            int start = bufferPosn; byte[] name = null;
            while (buffer[bufferPosn] != '>' && buffer[bufferPosn] != '\n') {
              bufferPosn++;
              if(bufferPosn >= bufferLength) {
                name = new byte[bufferLength - start];
                System.arraycopy(buffer, start, name, 0, bufferLength - start);
                start = 0;
              }
              adjustBuffer();
            }
            // if an element ends with '\n', we consider it malformed
            if (buffer[bufferPosn] == '\n') 
              failed(); // FAILED
            else if (buffer[bufferPosn] == '>') {
              int len = bufferPosn - start;
              if (len > 0) {
                if (name != null) {
                  byte[] newname = new byte[name.length + len];
                  System.arraycopy(name, 0, newname, 0, name.length);
                  System.arraycopy(buffer, start, newname, name.length, len);
                  name = newname;
                } else {
                  name = new byte[len];
                  System.arraycopy(buffer, start, name, 0, len);
                }
                startElement(name);
              }
              ignoreWhite();
              currentStatus = STATUS.TEXT;
            }
          }
        } else if (currentStatus == STATUS.TEXT) {
          int start = bufferPosn; byte[] text = null;
          while (buffer[bufferPosn] != '<' && buffer[bufferPosn] != '\n') {
            bufferPosn++;
            if(bufferPosn >= bufferLength) {
              // FIXME: if the content of a document spans more than two buffers, it will be read incorrectly!
              text = new byte[bufferLength - start];
              System.arraycopy(buffer, start, text, 0, bufferLength - start);
              start = 0;
            }
            adjustBuffer();
          }
          if (buffer[bufferPosn] == '<') {
            int len = bufferPosn - start;
            if (len > 0) {
              if (text != null) {
                byte[] newtext = new byte[text.length + len];
                System.arraycopy(text, 0, newtext, 0, text.length);
                System.arraycopy(buffer, start, newtext, text.length, len);
                text = newtext;
              } else {
                text = new byte[len];
                System.arraycopy(buffer, start, text, 0, len);
              }
              characters(text, doc);
            }
            currentStatus = STATUS.START_ELEMENT;
          } else if (buffer[bufferPosn] == '\n')
            failed(); // FAILED
        } else if (currentStatus == STATUS.END_ELEMENT) {
          int start = bufferPosn; byte[] name = null;
          while (buffer[bufferPosn] != '>' && buffer[bufferPosn] != '\n') {
            bufferPosn++;
            if(bufferPosn >= bufferLength) {
              name = new byte[bufferLength - start];
              System.arraycopy(buffer, start, name, 0, bufferLength - start);
              start = 0;
            }
            adjustBuffer();
          }
          if (buffer[bufferPosn] == '>') {
            int len = bufferPosn - start;
            if (len > 0) {
              if (name != null) {
                byte[] newname = new byte[name.length + len];
                System.arraycopy(name, 0, newname, 0, name.length);
                System.arraycopy(buffer, start, newname, name.length, len);
                name = newname;
              } else {
                name = new byte[len];
                System.arraycopy(buffer, start, name, 0, len);
              }
              endElement(name);
            }
            ignoreWhite();
            currentStatus = STATUS.PREPARE;
          } else if (buffer[bufferPosn] != '\n')
            failed(); // FAILED
        }
        bufferPosn++;
      }
    } catch (EOFException eofe) {
      return -1;
    }
    return currentDocPosn;
  }

  /**
   * Close the underlying stream.
   * @throws IOException
   */
  public void close() throws IOException {
    in.close();
  }
  
  private void ignoreWhite() throws IOException, EOFException {
    do {
      bufferPosn++;
      adjustBuffer();
    } while (buffer[bufferPosn] == '\n' || buffer[bufferPosn] == '\r'
      || buffer[bufferPosn] == '\t' || buffer[bufferPosn] == ' ');
    bufferPosn--;
  }

  private void adjustBuffer() throws IOException, EOFException {
    if (bufferPosn >= bufferLength) {
      bufferCurrentPosn += bufferLength;
      bufferPosn = 0;
      bufferLength = in.read(buffer);
      if (bufferLength <= 0)
        throw new EOFException();
    }
  }
  
  private void startElement(byte[] name) {
    if ((currentNode == NODE.NULL || currentNode == NODE.FAILED) && equals(docTag, name)) {
      currentDocPosn = bufferCurrentPosn + bufferPosn - docTag.length - 1;
      currentNode = NODE.DOC;
    } else if (currentNode == NODE.DOC && equals(urlTag, name)) {
      currentNode = NODE.URL;
    } else if (currentNode == NODE.URL && equals(docnoTag, name)) {
      currentNode = NODE.DOC_NO;
    } else if (currentNode == NODE.DOC_NO && equals(titleTag, name)) {
      currentNode = NODE.TITLE;
    } else if (currentNode == NODE.TITLE && equals(contentTag, name)) {
      currentNode = NODE.CONTENT;
    } else {
      currentNode = NODE.FAILED;
    }
  }

  private void endElement(byte[] name) {
    if (currentNode == NODE.CONTENT && equals(contentTag, name)) {
      currentNode = NODE.SUCCEED;
    }
  }
  
  private void characters(byte[] text, SogouDocument doc) {
    if (currentNode == NODE.URL) {
      doc.setPathBytes(text);
    } else if (currentNode == NODE.DOC_NO) {
      doc.setIdBytes(text);
    } else if (currentNode == NODE.TITLE) {
      doc.setTitleBytes(text);
    } else if (currentNode == NODE.CONTENT) {
      doc.setContentBytes(text);
    }

  }

  private void failed() {
    currentNode = NODE.FAILED;
  }
  
  private boolean equals(final byte[] left, final byte[] right) {
    if (left == right)
      return true;
    if (left == null || right == null || left.length != right.length)
      return false;
    return WritableComparator.compareBytes(left, 0, left.length,
        right, 0, right.length) == 0;
  }

}
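Since the reader is deliberately not tied to Hadoop, it can be driven from any InputStream. A minimal standalone sketch; the local file name is a placeholder:

import java.io.FileInputStream;
import java.io.InputStream;

public class ReaderDemo {
  public static void main(String[] args) throws Exception {
    InputStream in = new FileInputStream("sogou_all.txt"); // hypothetical local copy of the corpus
    SogouCorpusReader reader = new SogouCorpusReader(in);
    SogouDocument doc = new SogouDocument();
    long pos;
    // nextDoc() fills in doc and returns its offset, or -1 at EOF
    while ((pos = reader.nextDoc(doc)) >= 0) {
      System.out.println("document at offset " + pos);
    }
    reader.close();
  }
}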

 

Comments
#5 coderplay 2008-11-04
manbearpig1 wrote:

HBase is currently focused on fixing stability issues; the performance/space optimizations won't land until the next major release. How does what you're building differ from Apache Mahout? Let's talk sometime.

HBase's main contributors have all been hired away by M$, so I don't hold out much hope for it. Hive is the best option at the moment.
#4 manbearpig1 2008-11-04
HBase is currently focused on fixing stability issues; the performance/space optimizations won't land until the next major release. How does what you're building differ from Apache Mahout? Let's talk sometime.
#3 coderplay 2008-11-03
bjzhanghao wrote:

Doesn't SogouCorpusReader need to implement RecordReader?

It does; I just didn't paste it. My reasoning is that this reader can be reused elsewhere, and implementing RecordReader would tie it to the Hadoop framework. I've recently restructured the code a bit:
package redpoll.examples.sogou;

import java.io.IOException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.RecordReader;

import redpoll.text.Document;

/**
 * Treats keys as offsets in the file and values as documents.
 * @author Jeremy Chow(coderplay@gmail.com)
 */
public class SogouRecordReader implements RecordReader<LongWritable, Document>{

  private static final Log LOG 
    = LogFactory.getLog(SogouRecordReader.class.getName());
  
  private CompressionCodecFactory compressionCodecs = null;
  private long start;
  private long end;
  private long pos;
  
  private SogouCorpusReader in;
    
  public SogouRecordReader(Configuration job, 
      FileSplit split) throws IOException {  
    start = split.getStart();
    end = start + split.getLength();
    final Path file = split.getPath();
    compressionCodecs = new CompressionCodecFactory(job);
    final CompressionCodec codec = compressionCodecs.getCodec(file);

    // open the file and seek to the start of the split
    FileSystem fs = file.getFileSystem(job);
    FSDataInputStream fileIn = fs.open(split.getPath());
    if (codec != null) {
      in = new SogouCorpusReader(codec.createInputStream(fileIn), job);
      end = Long.MAX_VALUE;
    } else {
      if (start != 0) 
        fileIn.seek(start);
      in = new SogouCorpusReader(fileIn, job);
    }
    this.pos = start;
  }
  
  public LongWritable createKey() {
    return new LongWritable();
  }

  public Document createValue() {
    return new SogouDocument();
  }

  
  public long getPos() throws IOException {
    return pos;
  }

  /**
   * Get the progress within the split
   */
  public float getProgress() throws IOException {
    if (start == end) {
      return 0.0f;
    } else {
      return Math.min(1.0f, (pos - start) / (float)(end - start));
    }
  }
  
  /**
   * Close the input stream
   */
  public void close() throws IOException {
    if (in != null) {
      in.close(); 
    }
  }

  public synchronized boolean next(LongWritable key, Document value)
      throws IOException {
    if(pos < end) {
      long docPos = in.nextDoc((SogouDocument)value);
      if(docPos < 0)
        return false;
      key.set(start + docPos);
      pos = start + in.getPosition();
      return true;
    }
    return false;
  }


}


My network access has been spotty lately and I can't reach Google's svn, so I still haven't been able to commit.
#2 bjzhanghao 2008-11-03
Doesn't SogouCorpusReader need to implement RecordReader?
#1 yawl 2008-10-27
How long would you need the 10 machines for? If it's not too long, I can spin up 10 EC2 instances for you (http://xruby.iteye.com/blog/189316) for a while, and just shut them down when you're done.
