Hadoop 实现倒排索引


最近在给给女朋友做大数据选修课作业,顺便把之前学的东西往回捡一捡 2333。

作业要求

1. 程序设计伪代码

不太清楚为什么会要求作业报告中包含伪代码,可以跳过直接看第2、3部分~

  • Mapper

    将每一行的内容分词,输出 key 为“单词,文件名,单词总数”,输出 value 为“出现次数”

    // 获取文件名
    fileName = value.getFileName()
    // 从输入中获取所有单词
    words = value.item
    for(word : words){
        // key = 单词,文件名,单词总数
        key = word + "," + fileName + "," + words.length()
        value = 1
        context.write(key, value)
    }
    
  • Combiner

    针对每一个输入 key,将 value 值累加,并将 key 中的文章放入 value,输出 key 为“单词”,输出 value 为“文件名,单词总数,出现次数 ….. ”

    // 先统计同一文件下相同单词的个数
    for(value: values){
        sum += value
    }
    // 从 key 中取出单词作为下一阶段的 key,把 value 设置为:文件名,单词总数,出现次数
    key = key.subString(0, key.indexOf(","))
    value = key.subString(key.indexOf(",") + "," sum)
    context.write(key, value)
    
  • Reducer

    针对每一个输入 key,以冒号分割,将 value 值中的出现次数取出来累加,输出 key 为“单词”,输出value为“总出现次数:{(文件名,单词总数,出现次数)…}”

    for(value: values){
        // 重设 value
      data[3] = value.split(",")
      fileList = "(" + data[0] + "," + data[2] + "," + data[1] + ")"
      // 统计出现总次数
      sum += data[2];
    }
    //总出现次数:{(文件名,单词总数,出现次数)...}
    result = sum + ":{" + fileList.toString() + "}";
    context.write(key, result)
    

2. 设计思路

map 阶段主要是将文件中的单词根据哈希取出(参考 hadoop 官方提供的 wordcount 例子)

在 reduce 阶段,以单词作为 key ,分两步:

  1. 第一步先计算出每个单词在每个文件中出现的次数
  2. 第二步计算出该单词在所有文件中出现的次数,并格式化输出内容

难点:

由于 Map 阶段与 wordcount 例子不同,在该阶段计算出的 key 是“单词,文件名,单词总数”,因此在 reduce 阶段需要提取出我们最终的想要的单词作为 key。

再根据单词作为 key 进行混洗,并对 value 进行格式化最终输出。

另外,最终输出时需要自定义 map 输出的分隔符

示意图如下:

graph TD
file1[文件1]
file2[文件2]
file3[文件3]
file4[文件4]
map(Map阶段 => key=单词,文件名,单词总数,value=1)
combine(Reduce阶段1 => key=单词, value=文件名,单词总数,出现次数)
reduce(最终结果)
file1 --> |提取|map
file2 --> |提取|map
file3 --> |提取|map
file4 --> |提取|map
map --> |重设 Key, 计算文件内出现次数|combine
combine --> |计算总出现次数,格式化输出|reduce

3. 代码实现

3.1 环境说明

  • Hadoop :2.7.7
  • Java : 1.8

在数据量不大的情况下,使用单机模式即可,maven 项目的 pom.xml 文件配置如下:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.myhadoop</groupId>
    <artifactId>hadoop</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-core</artifactId>
            <version>1.2.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.7.7</version>
        </dependency>
    </dependencies>
</project>

3.2 源代码

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount &#123;
    public static class myMapper extends Mapper<Object, Text, Text, Text>
    &#123;
        private Text one = new Text();
        private Text word = new Text();
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException
        &#123;
            FileSplit fileSplit = (FileSplit) context.getInputSplit();
            StringTokenizer itr = new StringTokenizer(value.toString());
            String len = String.valueOf(itr.countTokens());
            while (itr.hasMoreElements()) &#123;
                // 文件名
                String fileName = fileSplit.getPath().getName();
                // 单词,文件名,总单词数
                word.set(itr.nextToken() + "," + fileName + "," + len);
                one.set("1");
                context.write(word, one);
            &#125;
        &#125;
    &#125;

    public static class myCombiner extends Reducer<Text,Text,Text,Text> &#123;
        Text info = new Text();
        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException &#123;
            int sum = 0;
            for (Text value : values) &#123;
                sum += Integer.parseInt(value.toString());
            &#125;
            int splitIndex = key.toString().indexOf(",");
            info.set(key.toString().substring(splitIndex + 1) + "," + sum);
            //取出单词作为 key
            key.set(key.toString().substring(0, splitIndex));
            context.write(key, info);
        &#125;
    &#125;

    public static class myReducer extends Reducer<Text, Text, Text, Text>
    &#123;
        private Text result = new Text();
        public void reduce(Text key, Iterable<Text> values,Context context) throws IOException, InterruptedException
        &#123;
            StringBuilder fileList = new StringBuilder();
            String[] data;
            int sum =0;
            for (Text value : values)
            &#123;
                data = value.toString().split(",");
                //格式化输出:(文件名,出现次数, 单词总数)
                fileList.append("(").append(data[0]).append(",").append(data[2]).append(",").append(data[1]).append(")");
                // 统计出现总次数
                sum += Integer.parseInt(data[2]);
            &#125;
            //总出现次数:&#123;(文件名,单词总数,出现次数)...&#125;
            result.set(sum + ":&#123;" + fileList.toString() + "&#125;");
            key.set(key.toString());
            context.write(key,result);
        &#125;
    &#125;

    public static void main(String[] args) throws Exception
    &#123;
        Configuration conf = new Configuration();
        // 设置分隔符
        conf.set("mapred.textoutputformat.separator", "->");
        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(WordCount.class);

        job.setMapperClass(myMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        job.setCombinerClass(myCombiner.class);

        job.setReducerClass(myReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    &#125;
&#125;

3.3 输出结果

bear->1:{(d2.txt,1,5)}
cat->6:{(d2.txt,3,5)(d1.txt,2,4)(d4.txt,1,5)}
dog->2:{(d1.txt,1,4)(d3.txt,1,3)}
fox->3:{(d3.txt,1,3)(d2.txt,1,5)(d1.txt,1,4)}
hen->1:{(d4.txt,1,5)}
rabbit->1:{(d4.txt,1,5)}
sheep->1:{(d4.txt,1,5)}
wolf->2:{(d4.txt,1,5)(d3.txt,1,3)}

文章作者: SuperChen
版权声明: 本博客所有文章除特別声明外,均采用 CC BY-NC 4.0 许可协议。转载请注明来源 SuperChen !
评论
  目录