User Recommendation: the Slope One Algorithm and Its MapReduce & Hive Implementation


Download the code for this article

Recommendation systems are getting hotter and hotter. Google uses algorithms such as MinHash, PLSI, LDA, SVD and SVM to analyze users' preferences and automatically categorize news; Sina uses Slope One plus some item-based algorithms to recommend music; Taobao periodically runs MapReduce jobs over the items users favorited during the previous day or month, in order to serve recommendations to buyers with similar tastes.

The Slope One algorithm described in this article predicts ratings. Compared with model-based algorithms such as SVD, PLSI and LDA, it has the following characteristics (the basic formulas are given right after this list):

1. It is simple and easy to implement.

2. The trained model can be updated incrementally.

3. Prediction is very fast.

4. A user needs only one or two ratings to start getting recommendations.

5. Its accuracy is quite respectable.
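
For reference (the articles linked below derive it in detail), the weighted Slope One model keeps, for every pair of items $(j, i)$, the number of users who rated both and their average rating difference:

$$\mathrm{dev}(j,i) = \frac{1}{|S_{j,i}|}\sum_{u\in S_{j,i}} (u_j - u_i), \qquad c(j,i) = |S_{j,i}|,$$

where $S_{j,i}$ is the set of users who rated both $j$ and $i$. It then predicts user $u$'s rating of an unseen item $j$ from the items $i$ that $u$ has already rated:

$$P(u)_j = \frac{\sum_i \bigl(\mathrm{dev}(j,i) + u_i\bigr)\, c(j,i)}{\sum_i c(j,i)}.$$

This is what the Python code below and the Hive SQL later in the article compute.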

 

Okay, here is an article (in Chinese) that introduces the algorithm: http://www.fuchaoqun.com/2008/09/slope_one/

It explains things well, so I won't repeat it here.

The English Wikipedia also has an entry: http://en.wikipedia.org/wiki/Slope_One

The Python implementation there is fairly concise:

# Copyright 2006 Bryan O'Sullivan <bos@serpentine.com>.
#
# This software may be used and distributed according to the terms
# of the GNU General Public License, version 2 or later, which is
# incorporated herein by reference.

class SlopeOne(object):
    def __init__(self):
        # diffs[i][j]: average rating difference (item i minus item j)
        # freqs[i][j]: number of users who rated both i and j
        self.diffs = {}
        self.freqs = {}

    def predict(self, userprefs):
        # userprefs: {item: rating} for one user; returns weighted Slope One
        # predictions for the items this user has not rated yet.
        preds, freqs = {}, {}
        for item, rating in userprefs.iteritems():
            for diffitem, diffratings in self.diffs.iteritems():
                try:
                    freq = self.freqs[diffitem][item]
                except KeyError:
                    continue
                preds.setdefault(diffitem, 0.0)
                freqs.setdefault(diffitem, 0)
                preds[diffitem] += freq * (diffratings[item] + rating)
                freqs[diffitem] += freq
        return dict([(item, value / freqs[item])
                     for item, value in preds.iteritems()
                     if item not in userprefs and freqs[item] > 0])

    def update(self, userdata):
        # userdata: {user: {item: rating}}; accumulates pairwise rating
        # differences and co-rating counts, then averages the differences.
        for ratings in userdata.itervalues():
            for item1, rating1 in ratings.iteritems():
                self.freqs.setdefault(item1, {})
                self.diffs.setdefault(item1, {})
                for item2, rating2 in ratings.iteritems():
                    self.freqs[item1].setdefault(item2, 0)
                    self.diffs[item1].setdefault(item2, 0.0)
                    self.freqs[item1][item2] += 1
                    self.diffs[item1][item2] += rating1 - rating2
        for item1, ratings in self.diffs.iteritems():
            for item2 in ratings:
                ratings[item2] /= self.freqs[item1][item2]

if __name__ == '__main__':
    userdata = dict(
        alice=dict(squid=1.0,
                   cuttlefish=0.5,
                   octopus=0.2),
        bob=dict(squid=1.0,
                 octopus=0.5,
                 nautilus=0.2),
        carole=dict(squid=0.2,
                    octopus=1.0,
                    cuttlefish=0.4,
                    nautilus=0.4),
        dave=dict(cuttlefish=0.9,
                  octopus=0.4,
                  nautilus=0.5),
        )
    s = SlopeOne()
    s.update(userdata)
    print s.predict(dict(octopus=0.4))
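
A remark on point 2 above (incremental updates): update() divides the accumulated differences by the counts at the end, so calling it a second time on new data would mix averages with raw sums. To keep the model incremental, one would store the raw sums and divide only when needed; a minimal sketch of that variant (my own, not part of the quoted code):

class IncrementalSlopeOne(object):
    def __init__(self):
        self.diff_sums = {}  # item1 -> item2 -> sum of (rating1 - rating2)
        self.freqs = {}      # item1 -> item2 -> number of co-ratings

    def add_user(self, ratings):
        # Fold one user's {item: rating} dict into the model at any time.
        for item1, rating1 in ratings.items():
            d = self.diff_sums.setdefault(item1, {})
            f = self.freqs.setdefault(item1, {})
            for item2, rating2 in ratings.items():
                d[item2] = d.get(item2, 0.0) + (rating1 - rating2)
                f[item2] = f.get(item2, 0) + 1

    def dev(self, item1, item2):
        # Average difference, computed on demand from the raw sums.
        return self.diff_sums[item1][item2] / self.freqs[item1][item2]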
   

 

Now let's look at the space and time complexity of Slope One training.

Suppose there are m users, each of whom has rated n items. Each user requires on the order of n² computations and produces about n(n-1)/2 entries (since the diff matrix is symmetric, only the lower triangle needs to be kept). So for m users the CPU cost is m·n², the intermediate data amounts to m·n(n-1)/2 entries, and merging the data of all m users leaves a final result of n(n-1)/2 entries.

The amount of computation grows quadratically with the number of items and only linearly with the number of users. What is really scary is the intermediate data: if one user's rating data is about 1MB, and each rating is a double taking 8 bytes, that is 1MB / 8B = 128K ratings, and this single user will produce roughly 1MB × (128K − 1) / 2 ≈ 64GB of intermediate data. There is no way this fits in memory; it has to go through disk, and disk I/O is nowhere near main-memory speed, so it becomes another bottleneck.
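
As a quick sanity check on that 64GB figure (the 1MB-per-user input is just the illustrative assumption from the paragraph above):

MB = 1 << 20
ratings = MB // 8                     # 8-byte doubles -> 131072 ratings
pairs = ratings * (ratings - 1) // 2  # lower triangle of the diff matrix
inter_bytes = pairs * 8               # one double per (item, item) pair
print("%d ratings -> %d pairs -> %.1f GiB of intermediate data"
      % (ratings, pairs, inter_bytes / float(1 << 30)))
# prints roughly 64 GiB for this single user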

 

Of course, it is not all doom and gloom: Slope One can be trained incrementally. Suppose the model has already been trained on y items; then the cost of the current round of training will not exceed n² + m·y². Incremental training aside, we can also use MapReduce to train in a distributed fashion (and the two can be combined). Take the Netflix Prize data as an example: it contains ratings from 480,189 users on 17,770 movies. The training set consists of 17,770 files, one per movie; the first line of each file is the movie id, and the remaining lines are the individual users' ratings of that movie:

MovieID:

CustomerID,Rating,Date

 

These files are all quite small: the largest is only 4,793,673 bytes and the smallest a mere 70 bytes, while a MapReduce file block is 64MB. Small files are bad for MapReduce jobs because they spawn far too many mappers. One way around this is to convert the tarball into a SequenceFile.

 

I skip that step here and simply put the unpacked files into HDFS, then run one MapReduce job to convert the data into the format we need:

hadoop dfs -put $NETFLIX_HOME/training_set /user/zhoumin/netflix-source
# Build the attached code into slopeone-0.00.1-dev.jar, then run:
hadoop jar build/slopeone-0.00.1-dev.jar redpoll.cf.slopeone.SlopeOnePreproccessor /user/zhoumin/netflix-source /user/zhoumin/netflix
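
SlopeOnePreproccessor in the attached jar is a Java MapReduce job; purely as an illustration of what this conversion does, a Hadoop Streaming mapper along the following lines would turn the per-movie files into movie_id,user_id,rate,date rows (everything in this sketch is hypothetical, only the input format is real):

#!/usr/bin/env python
# Illustrative Streaming mapper, NOT the attached Java preprocessor:
# turns "MovieID:" headers plus "CustomerID,Rating,Date" lines into
# flat "movie_id,user_id,rate,date" records.
import sys

movie_id = None
for line in sys.stdin:
    line = line.strip()
    if not line:
        continue
    if line.endswith(':'):        # header line of a per-movie file, e.g. "1000:"
        movie_id = line[:-1]
    elif movie_id is not None:
        user_id, rating, date = line.split(',')
        print('%s,%s,%s,%s' % (movie_id, user_id, rating, date))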

Then train with SlopeOneTrainer.

The idea behind SlopeOneTrainer is that each mapper computes the diff matrix over one user's items. Anyone familiar with how Hadoop mappers work will see the problem: a user with a lot of ratings can easily produce the tens of gigabytes of intermediate data described above, far exceeding io.sort.mb. The mapper then keeps merging spill files, which makes it slow. Running this part of the training on the Netflix data with a cluster of 36 slaves took more than 4 hours, and the vast majority of that time went into the mappers, especially their merge phase.
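
To make the shape of that job concrete, here is a rough Streaming-style sketch (not the actual Java SlopeOneTrainer, and the per-user input format is an assumption): the mapper emits one record per item pair of a user, and the reducer sums the counts and differences and averages at the end.

#!/usr/bin/env python
# Rough sketch of the per-user trainer, NOT the attached SlopeOneTrainer.
# Assumed mapper input: one user per line, "user_id<TAB>item:rate,item:rate,..."
import sys

def mapper(stdin=sys.stdin):
    for line in stdin:
        _, ratings = line.rstrip('\n').split('\t')
        prefs = [(i, float(r)) for i, r in
                 (field.split(':') for field in ratings.split(','))]
        for item1, rate1 in prefs:
            for item2, rate2 in prefs:
                if item1 > item2:                    # lower triangle only
                    print('%s,%s\t1,%f' % (item1, item2, rate1 - rate2))

def reducer(stdin=sys.stdin):
    key, freq, diff = None, 0, 0.0
    for line in stdin:
        k, v = line.rstrip('\n').split('\t')
        c, d = v.split(',')
        if k != key and key is not None:
            print('%s\t%d\t%f' % (key, freq, diff / freq))
            freq, diff = 0, 0.0
        key = k
        freq += int(c)
        diff += float(d)
    if key is not None:
        print('%s\t%d\t%f' % (key, freq, diff / freq))

if __name__ == '__main__':
    {'map': mapper, 'reduce': reducer}[sys.argv[1]]()

This is exactly the pattern that suffers from the spill problem described above: a user with many ratings makes the mapper emit a quadratic number of records.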

 

So it would be better to hand the intermediate data to the reducers instead. This training step is really just a join, which makes Hive a natural fit. First convert the raw data into the format Hive needs:

hadoop jar build/slopeone-0.00.1-dev.jar redpoll.cf.slopeone.SlopeOneHive /user/zhoumin/netflix-source /user/zhoumin/netflix-hive
 

Then create two tables: netflix holds the preprocessed Netflix training data, and freq_diff holds the trained model matrix.

CREATE EXTERNAL TABLE netflix(
  movie_id STRING,
  user_id STRING,
  rate DOUBLE,
  rate_date STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n' 
STORED AS TEXTFILE 
LOCATION '/user/zhoumin/netflix-hive'; 

CREATE TABLE freq_diff (
    movie_id1 STRING,
    movie_id2 STRING,
    freq     DOUBLE,
    diff     DOUBLE
);

Okay, run the training SQL:

INSERT OVERWRITE TABLE freq_diff
SELECT
  nf1.movie_id, nf2.movie_id, count(1), sum(nf1.rate - nf2.rate)/count(1)
FROM
  netflix nf1 
JOIN 
  netflix nf2 ON nf1.user_id = nf2.user_id 
WHERE nf1.movie_id > nf2.movie_id
GROUP BY nf1.movie_id, nf2.movie_id;
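
In terms of the formulas near the top of the article, this query materializes, for every movie pair with movie_id1 > movie_id2 (the lower triangle),

$$\mathrm{freq} = c(j,i) = |S_{j,i}|, \qquad \mathrm{diff} = \mathrm{dev}(j,i) = \frac{1}{|S_{j,i}|}\sum_{u\in S_{j,i}} (u_j - u_i),$$

where $S_{j,i}$ is the set of users who rated both movies.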

This SQL generates two MapReduce jobs, as the EXPLAIN command shows (see the appendix). The first one does the join and emits all of the intermediate data on the reduce side. Hive chooses the number of reducers automatically, but here it picked only 3, which made the job crawl (more than 9 hours). Setting the reducer count explicitly to something larger helps; I set it to 160 and reran the training SQL:

set mapred.reduce.tasks=160;

Of the two jobs, the first took 33 min 35 s and the second 1 h 29 min 29 s, so training finished in roughly 2 hours in total, which is acceptable.

 

 

With training done, we can try out the prediction. Suppose a user rated movie 1000 with a 2: what ratings would he give the other movies? Which movies would he like?

 

Okay, a bit of preparation first:

CREATE TABLE predict(
  movie_id STRING,
  rate FLOAT)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n'
STORED AS TEXTFILE;

echo "1000,2" > predict_data

LOAD DATA LOCAL INPATH './predict_data' OVERWRITE INTO TABLE predict;
 

Then run the prediction itself:

 

CREATE TABLE slopeone_result(
    movie_id STRING,
    freq     DOUBLE,
    pref     DOUBLE,
    rate     DOUBLE
);

INSERT OVERWRITE TABLE slopeone_result
SELECT
    /*+ MAPJOIN(p) */
    movie_id1                          as movie_id,
    sum(freq)                          as freq,
    sum(freq*(diff + rate))            as pref,
    sum(freq*(diff + rate))/sum(freq)  as rate
FROM
    predict p 
JOIN freq_diff fd ON fd.movie_id2 = p.movie_id
GROUP BY movie_id1;

Note the Map-Side Join hint above: the predict table is tiny, so a map-only job can complete the join with no need to shuffle data to the reducers. One flaw is that the user's own movie_id also takes part in the computation, and since Hive does not support IN the result is slightly off; a dedicated MapReduce job could be used for this prediction step instead.
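
A minimal sketch of what that dedicated prediction step would compute, in plain Python (freq_diff rows are (movie_id1, movie_id2, freq, diff) as in the Hive table; the only addition is the filter that drops the user's own movies):

def predict(freq_diff, userprefs):
    # userprefs: {movie_id: rating}, e.g. {'1000': 2.0}
    pref, freq = {}, {}
    for movie_id1, movie_id2, f, d in freq_diff:
        if movie_id2 not in userprefs:
            continue                  # same join condition as the Hive query
        if movie_id1 in userprefs:
            continue                  # the filter Hive's missing IN would give us
        rate = userprefs[movie_id2]
        pref[movie_id1] = pref.get(movie_id1, 0.0) + f * (d + rate)
        freq[movie_id1] = freq.get(movie_id1, 0.0) + f
    return dict((m, pref[m] / freq[m]) for m in pref if freq[m] > 0)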

Finally, a SELECT ... ORDER BY tells us which movies this user is likely to enjoy.

 

Conclusions:

1. With MapReduce, moving the heavy computation to the reduce side and avoiding the map-side merge speeds up training considerably.

2. Slope One is a simple, easy-to-implement recommendation algorithm, and it can be trained incrementally.

3. Combining the two points above with a distributed key-value store such as BigTable, Hypertable, Voldemort or Cassandra, real-time recommendation is entirely feasible (not to mention HBase).

 

 

 

 

-----------------------------------------------------------------------------------------------------

Appendix: the MapReduce job plan generated by Hive.

hive> explain
    > INSERT OVERWRITE TABLE freq_diff
    > SELECT
    >   nf1.movie_id, nf2.movie_id, count(1), sum(nf1.rate - nf2.rate)/count(1)
    > FROM
    >   netflix nf1
    > JOIN
    >   netflix nf2 ON nf1.user_id = nf2.user_id
    > WHERE nf1.movie_id > nf2.movie_id
    > GROUP BY nf1.movie_id, nf2.movie_id;
OK
ABSTRACT SYNTAX TREE:
  (TOK_QUERY (TOK_FROM (TOK_JOIN (TOK_TABREF netflix nf1) (TOK_TABREF netflix nf2) (= (. (TOK_TABLE_OR_COL nf1) user_id) (. (TOK_TABLE_OR_COL nf2) user_id)))) (TOK_INSERT (TOK_DESTINATION (TOK_TAB freq_diff)) (TOK_SELECT (TOK_SELEXPR (. (TOK_TABLE_OR_COL nf1) movie_id)) (TOK_SELEXPR (. (TOK_TABLE_OR_COL nf2) movie_id)) (TOK_SELEXPR (TOK_FUNCTION count 1)) (TOK_SELEXPR (/ (TOK_FUNCTION sum (- (. (TOK_TABLE_OR_COL nf1) rate) (. (TOK_TABLE_OR_COL nf2) rate))) (TOK_FUNCTION count 1)))) (TOK_WHERE (> (. (TOK_TABLE_OR_COL nf1) movie_id) (. (TOK_TABLE_OR_COL nf2) movie_id))) (TOK_GROUPBY (. (TOK_TABLE_OR_COL nf1) movie_id) (. (TOK_TABLE_OR_COL nf2) movie_id))))

STAGE DEPENDENCIES:
  Stage-1 is a root stage
  Stage-2 depends on stages: Stage-1
  Stage-0 depends on stages: Stage-2

STAGE PLANS:
  Stage: Stage-1
    Map Reduce
      Alias -> Map Operator Tree:
        nf2
          TableScan
            alias: nf2
            Reduce Output Operator
              key expressions:
                    expr: user_id
                    type: string
              sort order: +
              Map-reduce partition columns:
                    expr: user_id
                    type: string
              tag: 1
              value expressions:
                    expr: movie_id
                    type: string
                    expr: rate
                    type: double
        nf1
          TableScan
            alias: nf1
            Reduce Output Operator
              key expressions:
                    expr: user_id
                    type: string
              sort order: +
              Map-reduce partition columns:
                    expr: user_id
                    type: string
              tag: 0
              value expressions:
                    expr: movie_id
                    type: string
                    expr: rate
                    type: double
      Reduce Operator Tree:
        Join Operator
          condition map:
               Inner Join 0 to 1
          condition expressions:
            0 {VALUE._col0} {VALUE._col2}
            1 {VALUE._col0} {VALUE._col2}
          outputColumnNames: _col0, _col2, _col4, _col6
          Filter Operator
            predicate:
                expr: (_col0 > _col4)
                type: boolean
            Select Operator
              expressions:
                    expr: _col0
                    type: string
                    expr: _col4
                    type: string
                    expr: _col2
                    type: double
                    expr: _col6
                    type: double
              outputColumnNames: _col0, _col4, _col2, _col6
              Group By Operator
                aggregations:
                      expr: count(1)
                      expr: sum((_col2 - _col6))
                keys:
                      expr: _col0
                      type: string
                      expr: _col4
                      type: string
                mode: hash
                outputColumnNames: _col0, _col1, _col2, _col3
                File Output Operator
                  compressed: false
                  GlobalTableId: 0
                  table:
                      input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                      output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat

  Stage: Stage-2
    Map Reduce
      Alias -> Map Operator Tree:
        hdfs://xxx:9000/user/zhoumin/hive-tmp/22895032/10002
            Reduce Output Operator
              key expressions:
                    expr: _col0
                    type: string
                    expr: _col1
                    type: string
              sort order: ++
              Map-reduce partition columns:
                    expr: _col0
                    type: string
                    expr: _col1
                    type: string
              tag: -1
              value expressions:
                    expr: _col2
                    type: bigint
                    expr: _col3
                    type: double
      Reduce Operator Tree:
        Group By Operator
          aggregations:
                expr: count(VALUE._col0)
                expr: sum(VALUE._col1)
          keys:
                expr: KEY._col0
                type: string
                expr: KEY._col1
                type: string
          mode: mergepartial
          outputColumnNames: _col0, _col1, _col2, _col3
          Select Operator
            expressions:
                  expr: _col0
                  type: string
                  expr: _col1
                  type: string
                  expr: _col2
                  type: bigint
                  expr: (_col3 / _col2)
                  type: double
            outputColumnNames: _col0, _col1, _col2, _col3
            Select Operator
              expressions:
                    expr: _col0
                    type: string
                    expr: _col1
                    type: string
                    expr: UDFToDouble(_col2)
                    type: double
                    expr: _col3
                    type: double
              outputColumnNames: _col0, _col1, _col2, _col3
              File Output Operator
                compressed: true
                GlobalTableId: 1
                table:
                    input format: org.apache.hadoop.mapred.TextInputFormat
                    output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                    serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                    name: freq_diff

  Stage: Stage-0
    Move Operator
      tables:
          replace: true
          table:
              input format: org.apache.hadoop.mapred.TextInputFormat
              output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
              serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
              name: freq_diff

 

 

Comments
#6 egoegmdslls 2012-05-24
Hi, may I ask what per-node configuration you used for the Hive approach?
I have 19 nodes here, 8 cores, 16G RAM, 160G disk,
mapred.tasktracker.map.tasks.maximum = 7
mapred.tasktracker.reduce.tasks.maximum = 7
mapred.reduce.tasks = 126
mapred.child.java.opts = -Xmx1024m
The training SQL got to 74%, then slid back to around 68% and died...
Did you do any other tuning?
It's urgent~
#5 coderplay 2012-05-11
cuijiwei1989 wrote:
Hi, how can I get the raw data files (the Netflix Prize data)?

Google it, you'll find it.
#4 cuijiwei1989 2012-05-08
Hi, how can I get the raw data files (the Netflix Prize data)?
#3 iamicebergs 2010-11-08
I'm studying distributed computing and data mining, and I keep finding your posts in my searches. This one is really good!
#2 asdfsx 2010-10-13
I've been reading "Programming Collective Intelligence" these past few days, studying the related algorithms and thinking about how to use them in a distributed environment. This article is worth bookmarking as a tutorial.
#1 itstarting 2009-11-18
A bit of a tough read, but I got through it. Kudos.
