文章 56
评论 99
浏览 272355
hadoop平台wordcount程序的python实现

hadoop平台wordcount程序的python实现

摘要: ​尽管 Hadoop 框架是用 Java 写的,但是 Hadoop 程序不限于 Java,可以用 python、C++、Ruby 等。本例子中直接用 python 写一个 MapReduce 实例,而不是用 Jython 把 python 代码转化成 jar 文件。

例子的目的是统计输入文件的单词的词频。

  • 输入:文本文件
  • 输出:文本(每行包括单词和单词的词频,两者之间用'\t'隔开)

Python MapReduce 代码

使用 python 写 MapReduce 的“诀窍”是利用 Hadoop 流的 API,通过 STDIN(标准输入)、STDOUT(标准输出)在 Map 函数和 Reduce 函数之间传递数据。
我们唯一需要做的是利用 Python 的 sys.stdin 读取输入数据,并把我们的输出传送给 sys.stdout。Hadoop 流将会帮助我们处理别的任何事情。

Map 阶段:mapper.py

在这里,我们假设把文件保存到 hadoop-0.20.2/test/code/mapper.py

#!/usr/bin/env python
import sys
for line in sys.stdin:
    line = line.strip()
    words = line.split()
    for word in words:
        print "%s\t%s" % (word, 1)

文件从 STDIN 读取文件。把单词切开,并把单词和词频输出 STDOUT。Map 脚本不会计算单词的总数,而是输出 1。在我们的例子中,我们让随后的 Reduce 阶段做统计工作。

为了是脚本可执行,增加 mapper.py 的可执行权限

chmod +x hadoop-0.20.2/test/code/mapper.py

Reduce 阶段:reducer.py

在这里,我们假设把文件保存到 hadoop-0.20.2/test/code/reducer.py

#!/usr/bin/env python
from operator import itemgetter
import sys

current_word = None
current_count = 0
word = None

for line in sys.stdin:
    line = line.strip()
    word, count = line.split('\t', 1)
    try:
        count = int(count)
    except ValueError:  #count如果不是数字的话,直接忽略掉
        continue
    if current_word == word:
        current_count += count
    else:
        if current_word:
            print "%s\t%s" % (current_word, current_count)
        current_count = count
        current_word = word

if word == current_word:  #不要忘记最后的输出
    print "%s\t%s" % (current_word, current_count)

文件会读取 mapper.py 的结果作为 reducer.py 的输入,并统计每个单词出现的总的次数,把最终的结果输出到 STDOUT。

为了是脚本可执行,增加 reducer.py 的可执行权限

chmod +x hadoop-0.20.2/test/code/reducer.py

细节:split(chara, m),第二个参数的作用,下面的例子很给力:

str = 'server=mpilgrim&ip=10.10.10.10&port=8080'
print str.split('=', 1)[0]  #1表示=只截一次
print str.split('=', 1)[1]
print str.split('=')[0]
print str.split('=')[1]

输出:

server  
mpilgrim&ip=10.10.10.10&port=8080  
server  
mpilgrim&ip  

测试代码(cat data | map | sort | reduce)

这里建议大家在提交给 MapReduce job 之前在本地测试 mapper.py 和 reducer.py 脚本。否则 jobs 可能会成功执行,但是结果并非自己想要的。

功能性测试 mapper.py 和 reducer.py

[rte@hadoop-0.20.2]$cd test/code
[rte@code]$echo "foo foo quux labs foo bar quux" | ./mapper.py
foo 1
foo 1
quux    1
labs    1
foo 1
bar 1
quux    1
[rte@code]$echo "foo foo quux labs foo bar quux" | ./mapper.py | sort -k1,1 | ./reducer.py
bar 1
foo 3
labs    1
quux    2

细节:sort -k1,1 参数何意?

-k, -key=POS1[,POS2] 键以 pos1 开始,以 pos2 结束

有时候经常使用 sort 来排序,需要预处理把需要排序的 field 语言在最前面。实际上这是

完全没有必要的,利用-k 参数就足够了。

比如 sort all

1 4
2 3
3 2
4 1
5 0

如果 sort -k 2 的话,那么执行结果就是

5 0
4 1
3 2
2 3
1 4

在 Hadoop 上运行 python 代码

数据准备

下载以下三个文件

我把上面三个文件放到 hadoop-0.20.2/test/datas/目录下

运行

把本地的数据文件拷贝到分布式文件系统 HDFS 中。

bin/hadoop dfs -copyFromLocal /test/datas  hdfs_in

查看

bin/hadoop dfs -ls

结果

drwxr-xr-x   - rte supergroup          0 2014-07-05 15:40 /user/rte/hdfs_in

##查看具体的文件

bin/hadoop dfs -ls /user/rte/hdfs_in

执行 MapReduce job

bin/hadoop jar contrib/streaming/hadoop-*streaming*.jar \
-file test/code/mapper.py     -mapper test/code/mapper.py \
-file test/code/reducer.py    -reducer test/code/reducer.py \
-input /user/rte/hdfs_in/*    -output /user/rte/hdfs_out

实例输出

image

查看输出结果是否在目标目录/user/rte/hdfs_out

bin/hadoop dfs -ls /user/rte/hdfs_out

输出:

Found 2 items
drwxr-xr-x   - rte supergroup          0 2014-07-05 20:51 /user/rte/hdfs_out2/_logs
-rw-r--r--   2 rte supergroup     880829 2014-07-05 20:51 /user/rte/hdfs_out2/part-00000

查看结果:

bin/hadoop dfs -cat /user/rte/hdfs_out2/part-00000

输出
image

以上已经达成目的了,但是可以利用 python 迭代器和生成器优化

利用 python 的迭代器和生成器优化 Mapper 和 Reducer 代码

python 中的迭代器和生成器

python迭代器和生成器

优化 Mapper 和 Reducer 代码

mapper.py

#!/usr/bin/env python
import sys
def read_input(file):
    for line in file:
        yield line.split()

def main(separator='\t'):
    data = read_input(sys.stdin)
    for words in data:
        for word in words:
            print "%s%s%d" % (word, separator, 1)

if __name__ == "__main__":
    main()

reducer.py

#!/usr/bin/env python
from operator import itemgetter
from itertools import groupby
import sys

def read_mapper_output(file, separator = '\t'):
    for line in file:
        yield line.rstrip().split(separator, 1)

def main(separator = '\t'):
    data = read_mapper_output(sys.stdin, separator = separator)
    for current_word, group in groupby(data, itemgetter(0)):
        try:
            total_count = sum(int(count) for current_word, count in group)
            print "%s%s%d" % (current_word, separator, total_count)
        except valueError:
            pass

if __name__ == "__main__":
    main()

细节:groupby

from itertools import groupby
from operator import itemgetter

things = [('2009-09-02', 11),
          ('2009-09-02', 3),
          ('2009-09-03', 10),
          ('2009-09-03', 4),
          ('2009-09-03', 22),
          ('2009-09-06', 33)]

sss = groupby(things, itemgetter(0))
for key, items in sss:
    print key
    for subitem in items:
        print subitem
    print '-' * 20

结果:

>>>
2009-09-02
('2009-09-02', 11)
('2009-09-02', 3)
--------------------
2009-09-03
('2009-09-03', 10)
('2009-09-03', 4)
('2009-09-03', 22)
--------------------
2009-09-06
('2009-09-06', 33)
--------------------

注:

groupby(things, itemgetter(0)) 以第 0 列为排序目标

groupby(things, itemgetter(1))以第 1 列为排序目标

groupby(things)以整行为排序目标


微信公众号

潘建锋

关于版权和转载

本文由 潘建锋 创作,采用 署名 4.0 国际 (CC BY 4.0) 国际许可协议进行授权。
本站文章除注明转载/出处外,均为本站原创或翻译,转载时请务必署名,否则,本人将保留一切追究责任的权利。
署名 4.0 国际 (CC BY 4.0)

转载规范

标题:hadoop平台wordcount程序的python实现
作者:潘建锋
原文:https://taohuawu.club/wordcout-in-hadoop-by-python

关于留言和评论

如果您对本文《hadoop平台wordcount程序的python实现》的内容有任何疑问、补充或纠错,欢迎在下面的评论系统中留言,与作者一起交流进步,谢谢!(~ ̄▽ ̄)~

鲜衣怒马提银枪,一日看尽长安花,此间少年。