Hadoop 程序中常用到的计算就如下几种:汇总(分组汇总)、JoinLeftAnti)和排序(或是根据排序的结果进行计算)。

下面就说一下这几种计算如果开发。

  • 汇总计算的开发相对简单,并且汇总计算也最能体现出 Hadoop 计算的特别,好比以前一个人做的事现在可以同时分给好几个人做。

    Mapper:在进行汇总类计算时 Mapper 程序主要需要完成的任务是将分组字段读取后作为第一字段输出,这样作的目的是在Map->Reduce 分发时,Hadoop 将依据此字段进行分发将同一组数据排在一起并发送到同一个 Reducer 中。

    另外在 Mapper 中完成汇总程序时通常还会完成另一个目的,以提搞程序的计算性能。就是将读入的数据先进行一次分组计算后再将,计算的结果输出。

    Partitioner:汇总计算不用配置 Partitioner,因为汇总计算不需要进行二次排序。运行参数据中只需要设置第一个字段为Key 字段就可以了。

    Reducer:经过以上处理后在 Reducer 收到数据时,数据已按组排列好。所以程序只用一条条数据读入进行计算就可以了。

    例:现在有一份某年级学生 X 课的成绩表,分别有以下字段,学号 ID、班级 ID、成绩。用 Hadoop 程序完成计算此年级X 课在各班的平均分计算。

    Mapper 部分代码(Python 实现):

    import sys  
    def mapperprocess():
     combine = {}  #Python中字典的初始化,字典与Java中Map对应,KV的数据结构
     for line in sys.stdin:  #以循环的方式一行行从标准输入中读入数据
      fields = line[:-1].split(“\t”)   #line[:-1]的作用是将一行中的最后一个字符也就是\n,python中可以以数组的方式直接操作字符串
      stdNO= fields[0]
      clsNO=fields[1]
      score=fields[2]
      aClsData= combine.get(clsNO,[ clsNO,0,0]) #从字典中拿数据可以这样data[key],get的作用是当Key没有对应的Value时返回后面给定的值,在这里也就是[ clsNO,0,0]
      clsStdAmount= aClsData[1]+1
      clsStdSumScore= aClsData[2]+int(score)
      combine[clsNO]=( clsNO, clsStdAmount, clsStdSumScore)
     for aClsNO in combine:
      ( clsNO, clsStdAmount, clsStdSumScore) = combine[aClsNO] #这在Python中是分解组元的方法,在这里是把[ clsNO,0,0]这样一个数组中的每个值分别赋给了clsNO, clsStdAmount, clsStdSumScore这三个变量(按顺序)
      print “\t”.join((clsNO, ”%s”% clsStdAmount, ”%s”% clsStdSumScore)) #Print是将字符串输出到标准输出中。join是字符函数作用是将 “,”.join([1,2,3])变为 “1,2,3”.”%s”%XXXX是将XXXX变量进行字符格式化类似JAVA中的sprintf
    
    if __name__ == '__main__':   #这个是入口判断,和JAVA中Main的作用一样
     mapperprocess()

    Reducer 部分代码:

    import sys
    def reducerprocess():
    lastClsNO=””
    allClsStdAmount=0
    allClsStdSumScore=0
    for line in sys.stdin:
      fields = line[:-1].split(“\t”)
      clsNO = fields[0]
      clsStdAmount =fields[1]
      clsStdSumScore =fields[2]
      
      if clsNO != lastClsNO:    #判断:当分组的Key变化时则表示上一组数据已读完,可以对上一组数据进行最后的计算处理并输出
       if lastClsNO != “”:
        print “\t”.join(lastClsNO,”%s”%( allClsStdSumScore/ allClsStdAmount))
    allClsStdAmount=0
    allClsStdSumScore=0
      lastClsNO= clsNO
     
    allClsStdAmount += clsStdAmount
    allClsStdSumScore += clsStdSumScore
    print “\t”.join(lastClsNO,”%s”%( allClsStdSumScore/ allClsStdAmount)) #最后一组数据不会经过数据变组的判断所以需要在读入的循环之后进行计算处理并输出。

    执行脚本 :

    hadoop streaming $HADOOP_RUNNING_CONF \
    -D mapred.map.tasks=300 \  指定Map的任务个数。运行时不一定为指定数目,还是以输入的文件数由Hadoop本身来确定
    -D mapred.reduce.tasks=233 \  指定Reduce的任务个数
    -D mapred.job.map.capacity=300 \ 指定运行Map的个数,代表执行时同时会有多个个Map运行
    -D mapred.job.reduce.capacity=233 \指定运行Reduce的个数,代表执行时同时会有多个个Reduce运行
    -D mapred.job.name="std_avg" \
    -D stream.num.map.output.key.fields=1 \ 指定以数据中第一列为Key
    -input "/input/data" \
    -output "/output/data" \
    -mapper "python26/bin/python26.sh mapper.py" \
    -reducer "python26/bin/python26.sh reducer.py" \
    -file "mapper.py" \
    -file "reducer.py" \
    -cacheArchive "/share/python26.tar.gz#python26" 
    

  • Hadoop 常常也用来进行数据 Join 的计算,常用的 Join 操作有 Inner joinLeft joinAnti join 等等。这些 Join 的计算逻辑基本相似,下面以 Left Join 来讲如何编写 Join 计算。

    Mapper:通过环境变量 map_input_file 得到读入的数据文件名,并给不同来源的数据加上数据标识列作为第 2 个字段,将关联的 Key 作为第一个字段进行输出。数据标识列中,小数据量的数据指定标识为较小的数如 1 ,大数据量的数据指定为较大的数如 2

    Partitioner:指定 Partitioner 依据的 Key 为第一个字段,数据记录的 Key 为前二个字段。

    Reducer:根据一组 Key 相同的方式来进行 Join 计算,因为经过 Hadoop 排序,所以一组数据中会先读标识为 1 的数据。先将标识为 1 的数据存放到内存中,再读入数据标识为 2 的的记录时与内存中的数据进行合并后再输出,这样就完成了 Join 操作。

    例:现在有某课的学生成绩数据(学号、成绩)与学生信息数据(学号、姓名、…),现在需要得到学生成绩表(姓名、成绩)。

    学生成绩数据:


    学生信息数据:


    Map 完成执行后输出的数据如下:


    Hadoop 完成排序后输入到 Reduce 时的数据如下:



    执行脚本 :

    hadoop streaming $HADOOP_RUNNING_CONF \
    -D mapred.map.tasks=300 \  
    -D mapred.reduce.tasks=233 \  
    -D mapred.job.map.capacity=300 \ 
    -D mapred.job.reduce.capacity=233 \
    -D mapred.job.name="std_avg" \
    -D num.key.fields.for.partition=1 \  指定以第一个字段进行分发,第一个字段相同的一定会放在一起作为一组并分发到同一个Reduce中
    -D stream.num.map.output.key.fields=2 \   指定以前二个字段作为数据的Key进行排序,这样数据标识列为1的数据一定会被Reduce先读到
    -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \  指定分发的执行类,当需要Join计算时一定要加入此行
    -input "/input/data" \
    -output "/output/data" \
    -mapper "python26/bin/python26.sh mapper.py" \
    -reducer "python26/bin/python26.sh reducer.py" \
    -file "mapper.py" \
    -file "reducer.py" \
    -cacheArchive "/share/python26.tar.gz#python26" 
  • 排序计算相对于汇总和 Join 来说用得要少一些,从逻辑上来说排序计算的原理和 Join 是一样的——“二次排序”,Hadoop 中配置对应的 Partition 后就可以做到以第一列分组,第二列排序的效果。在排序计算中需要注意的是 Hadoop 是以字符进行排序,所以需要以数字排序时需要注意以下问题:如需要对以下数列进行排序:

    1,2,3,10,20

    需要前补 0 后才可进行排序否则 Hadoop 的排序结果如下:

    1,10,2,10,3

    0 后排序的结果如下:

    01,02,03,10,20