三 MapReduce框架原理
3.1 MapReduce工作流程
1)流程示意图
2.Submit()方法包含在这里面–
然后接着是切片处理数据(128M为一片)。很明显图例200M的文件需要切成两片处理。分配两个map进行计算操作
3.正式提交任务到yarn上,包含一些job的相关信息。
4.MrAppMaster进行资源调度。根据片块数分配相应数量的MapTask(这里分配两个MapTask)
5.然后MapTask根据InputFormat去读取文本数据。一行一行的经过Mapper程序的map()方法进行计算操作,最后输出到分区中,并有序的存储。
6.等到所有MapTask计算完毕后。启动MrAppMaster启动相对应分区数量的reduce数量进行统计操作。最后生成多个分区对应的统计文件。输出。
2)流程详解
上面的流程是整个mapreduce最全工作流程,但是shuffle过程只是从第7步开始到第16步结束,具体shuffle过程详解,如下:
1)maptask收集我们的map()方法输出的kv对,放到内存缓冲区中
2)从内存缓冲区不断溢出本地磁盘文件,可能会溢出多个文件
3)多个溢出文件会被合并成大的溢出文件
4)在溢出过程中,及合并的过程中,都要调用partitioner进行分区和针对key进行排序
5)reducetask根据自己的分区号,去各个maptask机器上取相应的结果分区数据
6)reducetask会取到同一个分区的来自不同maptask的结果文件,reducetask会将这些文件再进行合并(归并排序)
7)合并成大文件后,shuffle的过程也就结束了,后面进入reducetask的逻辑运算过程(从文件中取出一个一个的键值对group,调用用户自定义的reduce()方法)
3)注意
Shuffle中的缓冲区大小会影响到mapreduce程序的执行效率,原则上说,缓冲区越大,磁盘io的次数越少,执行速度就越快。
缓冲区的大小可以通过参数调整,参数:io.sort.mb 默认100M。
3.2 InputFormat数据输入
3.2.1 Job提交流程和切片源码详解
1)job提交流程源码详解
|
2)FileInputFormat源码解析(input.getSplits(job))
(1)找到你数据存储的目录。
(2)开始遍历处理(规划切片)目录下的每一个文件
(3)遍历第一个文件ss.txt
a)获取文件大小fs.sizeOf(ss.txt);
b)计算切片大小computeSliteSize(Math.max(minSize,Math.min(maxSize,blocksize)))=blocksize=128M
c)默认情况下,切片大小=blocksize
d)开始切,形成第1个切片:ss.txt—0:128M 第2个切片ss.txt—128:256M 第3个切片ss.txt—256M:300M(每次切片时,都要判断切完剩下的部分是否大于块的1.1倍,不大于1.1倍就划分一块切片)
e)将切片信息写到一个切片规划文件中
f)整个切片的核心过程在getSplit()方法中完成。
g)数据切片只是在逻辑上对输入数据进行分片,并不会再磁盘上将其切分成分片进行存储。InputSplit只记录了分片的元数据信息,比如起始位置、长度以及所在的节点列表等。
h)注意:block是HDFS物理上存储的数据,切片是对数据逻辑上的划分。
(4)提交切片规划文件到yarn上,yarn上的MrAppMaster就可以根据切片规划文件计算开启maptask个数。
23.2.2 FileInputFormat切片机制
1)FileInputFormat中默认的切片机制:
(1)简单地按照文件的内容长度进行切片
(2)切片大小,默认等于block大小
(3)切片时不考虑数据集整体,而是逐个针对每一个文件单独切片(他会遍历输入目录里面的文件,一个一个处理,debug查看FileInputFormat的getSplits方法可知)
比如待处理数据有两个文件:
file1.txt 320M file2.txt 10M
经过FileInputFormat的切片机制运算后,形成的切片信息如下:
|
2)FileInputFormat切片大小的参数配置
通过分析源码,在FileInputFormat中,计算切片大小的逻辑:Math.max(minSize, Math.min(maxSize, blockSize));
切片主要由这几个值来运算决定
mapreduce.input.fileinputformat.split.minsize=1 默认值为1
mapreduce.input.fileinputformat.split.maxsize= Long.MAXValue 默认值Long.MAXValue
因此,默认情况下,切片大小=blocksize。
maxsize(切片最大值):参数如果调得比blocksize小,则会让切片变小,而且就等于配置的这个参数的值。
minsize(切片最小值):参数调的比blockSize大,则可以让切片变得比blocksize还大。
3)获取切片信息API
|
3.2.3 CombineTextInputFormat切片机制
1)关于大量小文件的优化策略
1)默认情况下TextInputformat对任务的切片机制是按文件规划切片,不管文件多小,都会是一个单独的切片,都会交给一个maptask,这样如果有大量小文件,就会产生大量的maptask,处理效率极其低下。
2)优化策略
(1)最好的办法,在数据处理系统的最前端(预处理/采集),将小文件先合并成大文件,再上传到HDFS做后续分析。
(2)补救措施:如果已经是大量小文件在HDFS中了,可以使用另一种InputFormat来做切片(CombineTextInputFormat),它的切片逻辑跟TextFileInputFormat不同:它可以将多个小文件从逻辑上规划到一个切片中,这样,多个小文件就可以交给一个maptask。
(3)优先满足最小切片大小,不超过最大切片大小
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);// 4m
CombineTextInputFormat.setMinInputSplitSize(job, 2097152);// 2m
举例:0.5m+1m+0.3m+5m=2m + 4.8m=2m + 4m + 0.8m
0.5+1+0.3 = 1.8没有满足最小切片大小,所以向5借0.2M,最后合并成2+4.8,但是4.8大于最大切片数,所以拆成4+0.8 ,所以这个四个小文件最后合并成三个文件
3)具体实现步骤
注意CombineTextInputFormat的jar包是:
|
4)案例
大量小文件的切片优化(CombineTextInputFormat)。
4.1 数据准备
准备5个小文件(这里准备五个txt文本)
4.2 我们依旧使用我们上一个章节使用的统计文本中单词出现个数的代码
代码详见 《hadoop大数据(十)-Mapreduce基础 的 1.5 4) 章节案例》
先不进行任何的改造操作,直接用着五个小文件当做输入,运行后查看日志。
(1)不做任何处理,运行需求1中的wordcount程序,观察切片个数为5
(2)在WordcountDriver中增加如下代码,运行程序,并观察运行的切片个数为1
|
3.2.4 InputFormat接口实现类
MapReduce任务的输入文件一般是存储在HDFS里面。输入的文件格式包括:基于行的日志文件、二进制格式文件等。这些文件一般会很大,达到数十GB,甚至更大。那么MapReduce是如何读取这些数据的呢?下面我们首先学习InputFormat接口。
InputFormat常见的接口实现类包括:TextInputFormat、KeyValueTextInputFormat、NLineInputFormat、CombineTextInputFormat和自定义InputFormat等。
1)TextInputFormat
TextInputFormat是默认的InputFormat。每条记录是一行输入。键是LongWritable类型,存储该行在整个文件中的字节偏移量。值是这行的内容,不包括任何行终止符(换行符和回车符)。
以下是一个示例,比如,一个分片包含了如下4条文本记录。
|
每条记录表示为以下键/值对:
|
很明显,键并不是行号。一般情况下,很难取得行号,因为文件按字节而不是按行切分为分片。
2)KeyValueTextInputFormat
每一行均为一条记录,被分隔符分割为key,value。可以通过在驱动类中设置conf.set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR, “ “);来设定分隔符。默认分隔符是tab(\t)。
以下是一个示例,输入是一个包含4条记录的分片。其中——>表示一个(水平方向的)制表符。
|
每条记录表示为以下键/值对:
|
此时的键是每行排在制表符之前的Text序列。
3)NLineInputFormat
如果使用NlineInputFormat,代表每个map进程处理的InputSplit不再按block块去划分,而是按NlineInputFormat指定的行数N来划分。即输入文件的总行数/N=切片数,如果不整除,切片数=商+1。
以下是一个示例,仍然以上面的4行输入为例。
|
例如,如果N是2,则每个输入分片包含两行。开启2个maptask。
(0,Rich learning form)
(19,Intelligent learning engine)
另一个 mapper 则收到后两行:
(47,Learning more convenient)
(72,From the real demand for more close to the enterprise)
这里的键和值与TextInputFormat生成的一样。
3.2.5 自定义InputFormat
1)概述
(1)自定义一个类继承FileInputFormat。
(2)改写RecordReader,实现一次读取一个完整文件封装为KV。
(3)在输出时使用SequenceFileOutPutFormat输出合并文件。
2)案例
无论hdfs还是mapreduce,对于小文件都有损效率,实践中,又难免面临处理大量小文件的场景,此时,就需要有相应解决方案。将多个小文件合并成一个文件SequenceFile,SequenceFile里面存储着多个文件,存储的形式为文件路径+名称为key,文件内容为value。
小文件的优化无非以下几种方式:
(1)在数据采集的时候,就将小文件或小批数据合成大文件再上传HDFS
(2)在业务处理之前,在HDFS上使用mapreduce程序对小文件进行合并
(3)在mapreduce处理时,可采用CombineTextInputFormat提高效率
2.1 数据准备
准备三个文本文件。
|
最终预期文件格式:
part-r-00000
2.2 代码实现
使用自定义InputFormat的方式,处理输入小文件的问题。
(1)自定义一个类继承FileInputFormat
(2)改写RecordReader,实现一次读取一个完整文件封装为KV
(3)在输出时使用SequenceFileOutPutFormat输出合并文件
|
(2)自定义RecordReader
|
(3)SequenceFileMapper处理流程
|
(4)SequenceFileReducer处理流程
|
(5)SequenceFileDriver处理流程
|
3.3 MapTask工作机制
3.3.1 并行度决定机制
1)问题引出
maptask的并行度决定map阶段的任务处理并发度,进而影响到整个job的处理速度。那么,mapTask并行任务是否越多越好呢?
2)MapTask并行度决定机制
一个job的map阶段MapTask并行度(个数),由客户端提交job时的切片个数决定。
3.3.2 MapTask工作机制
(1)Read阶段:Map Task通过用户编写的RecordReader,从输入InputSplit中解析出一个个key/value。
(2)Map阶段:该节点主要是将解析出的key/value交给用户编写map()函数处理,并产生一系列新的key/value。
(3)Collect收集阶段:在用户编写map()函数中,当数据处理完成后,一般会调用OutputCollector.collect()输出结果。在该函数内部,它会将生成的key/value分区(调用Partitioner—调用用户自定义getPartition方法),并写入一个环形内存缓冲区中。
(4)Spill阶段:即“溢写”,当环形缓冲区满后,MapReduce会将数据写到本地磁盘上,生成一个临时文件。需要注意的是,将数据写入本地磁盘之前,先要对数据进行一次本地排序,并在必要时对数据进行合并、压缩等操作。
溢写阶段详情:
步骤1:利用快速排序算法对缓存区内的数据进行排序,排序方式是,先按照分区编号partition进行排序,然后按照key进行排序。这样,经过排序后,数据以分区为单位聚集在一起,且同一分区内所有数据按照key有序。
步骤2:按照分区编号由小到大依次将每个分区中的数据写入任务工作目录下的临时文件output/spillN.out(N表示当前溢写次数)中。如果用户设置了Combiner,则写入文件之前,对每个分区中的数据进行一次聚集操作。
步骤3:将分区数据的元信息写到内存索引数据结构SpillRecord中,其中每个分区的元信息包括在临时文件中的偏移量、压缩前数据大小和压缩后数据大小。如果当前内存索引大小超过1MB,则将内存索引写到文件output/spillN.out.index中。
(5)Combine阶段:当所有数据处理完成后,MapTask对所有临时文件进行一次合并,以确保最终只会生成一个数据文件。
当所有数据处理完后,MapTask会将所有临时文件合并成一个大文件,并保存到文件output/file.out中,同时生成相应的索引文件output/file.out.index。
在进行文件合并过程中,MapTask以分区为单位进行合并。对于某个分区,它将采用多轮递归合并的方式。每轮合并io.sort.factor(默认100)个文件,并将产生的文件重新加入待合并列表中,对文件排序后,重复以上过程,直到最终得到一个大文件。
让每个MapTask最终只生成一个数据文件,可避免同时打开大量文件和同时读取大量小文件产生的随机读取带来的开销。
https://blog.csdn.net/qq_41455420/article/details/79288764
好的总结:
3.4 Shuffle机制
3.4.1 Shuffle机制
Mapreduce确保每个reducer的输入都是按键排序的。系统执行排序的过程(即将map输出作为输入传给reducer)称为shuffle。
3.4.2 Partition分区
分区的行为在每一次的map操作都会调用一或者多次
0)问题引出:要求将统计结果按照条件输出到不同文件中(分区)。比如:将统计结果按照手机归属地不同省份输出到不同文件中(分区)
默认只输出到一个分区,也就是结果输出到一个文件
1)默认partition分区
|
默认分区是根据key的hashCode对reduceTasks个数取模得到的。用户没法控制哪个key存储到哪个分区。(numReduceTasks默认是1,也就是说,默认返回0,也就是只创建一个分区,所以是part-r-00000)
2)自定义Partitioner步骤
(1)自定义类继承Partitioner,重写getPartition()方法
|
(2)在job驱动中,设置自定义partitioner:
job.setPartitionerClass(CustomPartitioner.class);
(3)自定义partition后,要根据自定义partitioner的逻辑设置相应数量的reduce task
job.setNumReduceTasks(5);
3)注意:
如果reduceTask的数量> getPartition的结果数,则会多产生几个空的输出文件part-r-000xx;
如果1<reduceTask的数量<getPartition的结果数,则有一部分分区数据无处安放,会Exception;
如果reduceTask的数量=1,则不管mapTask端输出多少个分区文件,最终结果都交给这一个reduceTask,最终也就只会产生一个结果文件 part-r-00000;
例如:假设自定义分区数为5,则
(1)job.setNumReduceTasks(1);会正常运行,只不过会产生一个输出文件
(2)job.setNumReduceTasks(2);会报错
(3)job.setNumReduceTasks(6);大于5,程序会正常运行,会产生空文件
4)案例
4.1 案例1
将统计结果按照手机归属地不同省份输出到不同文件中(分区)
1)数据准备
phone.txt
|
2)分析
(1)Mapreduce中会将map输出的kv对,按照相同key分组,然后分发给不同的reducetask。默认的分发规则为:根据key的hashcode%reducetask数来分发
(2)如果要按照我们自己的需求进行分组,则需要改写数据分发(分组)组件Partitioner
自定义一个CustomPartitioner继承抽象类:Partitioner
(3)在job驱动中,设置自定义partitioner: job.setPartitionerClass(CustomPartitioner.class)
3)在<hadoop大数据(十)-Mapreduce基础 章节的2.6.2 案例>的基础上,增加一个分区类
|
在驱动函数中增加自定义数据分区设置和reduce task设置
|
4.2 案例2
把单词按照ASCII码奇偶分区(Partitioner),结合<hadoop大数据(十)-Mapreduce基础 的 1.5 4) 章节–统计一堆文件中单词出现的个数>
只需要在此代码的基础上,添加自定义分区
|
在驱动类中配置加载分区,设置reducetask个数
|
5)总结
l 结果输出文件,跟分区数量和reduce数量有关系
l getPartition方法是在map调用之后才会进入,而且是每一次map可能会调用多次getPartition。为什么说是多次调用分区方法呢?我们知道每一次进入map方法都是一行数据(例如 hello.txt的第一行hello kingge),那么经过分割后生成两个单词,调用两次**context.write()所以为了确定这两个单词所属那个分区,那么就需要调用两次getPartition。也就说在这个例子中,一次map调用处理完后需要调用两次getPartition。(即:context.write()内部会进行分区)
l 如果job.setNumReduceTasks(1)(也就是保持默认值),那么就是生成一个分区,不会进入自定义的分区方法。Redeucetask必须大于1,自定义分区方法才会生效。
3.4.3 WritableComparable排序
排序是MapReduce框架中最重要的操作之一。Map Task和Reduce Task均会对数据(按照key)进行排序。该操作属于Hadoop的默认行为。任何应用程序中的数据均会被排序,而不管逻辑上是否需要。默认排序是按照字典顺序排序,且实现该排序的方法是快速排序。
对于Map Task,它会将处理的结果暂时放到一个缓冲区中,当缓冲区使用率达到一定阈值后,再对缓冲区中的数据进行一次排序,并将这些有序数据写到磁盘上,而当数据处理完毕后,它会对磁盘上所有文件进行一次合并,以将这些文件合并成一个大的有序文件。
对于Reduce Task,它从每个Map Task上远程拷贝相应的数据文件,如果文件大小超过一定阈值,则放到磁盘上,否则放到内存中。如果磁盘上文件数目达到一定阈值,则进行一次合并以生成一个更大文件;如果内存中文件大小或者数目超过一定阈值,则进行一次合并后将数据写到磁盘上。当所有数据拷贝完毕后,Reduce Task统一对内存和磁盘上的所有数据进行一次合并。
每个阶段的默认排序
1)排序的分类:
(1)部分排序:
MapReduce根据输入记录的键对数据集排序。保证输出的每个文件内部排序。例如输出文件到五个分区,那么部分排序能够保证各个五个分区的数据都是有序的。
(2)全排序:
如何用Hadoop产生一个全局排序的文件?最简单的方法是使用一个分区,那么这个分区里面的数据全局都是排序的。但该方法在处理大型文件时效率极低,因为一台机器必须处理所有输出文件,从而完全丧失了MapReduce所提供的并行架构。
替代方案:首先创建一系列排好序的文件;其次,串联这些文件;最后,生成一个全局排序的文件。主要思路是使用一个分区来描述输出的全局排序。例如:可以为上述文件创建3个分区,在第一分区中,记录的单词首字母a-g,第二分区记录单词首字母h-n, 第三分区记录单词首字母o-z。这种方式可以达到全排序的功能
(3)辅助排序:(GroupingComparator分组)
Mapreduce框架在记录到达reducer之前按键对记录排序,但键所对应的值并没有被排序。甚至在不同的执行轮次中,这些值的排序也不固定,因为它们来自不同的map任务且这些map任务在不同轮次中完成时间各不相同。一般来说,大多数MapReduce程序会避免让reduce函数依赖于值的排序。但是,有时也需要通过特定的方法对键进行排序和分组等以实现对值的排序。
(4)二次排序:
在自定义排序过程中,如果compareTo中的判断条件为两个即为二次排序。
2)自定义排序WritableComparable
(1)原理分析
bean对象实现WritableComparable接口重写compareTo方法,就可以实现排序
|
3)案例
3.1 案例1
在<hadoop大数据(十)-Mapreduce基础 章节的2.6.2 案例>输出结果的基础上增加一个新的需求
根据2.6.2 案例输出的结果:再次对总流量进行排序
1)数据准备 phone.txt
|
2)分析
(1)把程序分两步走,第一步正常统计总流量,第二步再把结果进行排序
(2)context.write(总流量,手机号)
(3)FlowBean实现WritableComparable接口重写compareTo方法
|
3)代码实现
(1)FlowBean对象在在需求2.6.2基础上增加了比较功能(compareTo)
|
(2)编写mapper
|
(3)编写reducer
|
(4)编写driver
|
3.2 案例2
改造案例1的需求
要求每个省份手机号输出的文件中按照总流量内部排序。(部分排序)
2)做法
在案例1的基础上增加自定义分区类即可。
|
(2)在驱动类中添加分区类
|
3.4.4 GroupingComparator分组(辅助排序)
1)对reduce阶段的数据根据某一个或几个字段进行分组。
2)案例
求出每一个订单中最贵的商品(GroupingComparator)
1)需求
有如下订单数据
订单id | 商品id | 成交金额 |
---|---|---|
0000001 | Pdt_01 | 222.8 |
0000001 | Pdt_06 | 25.8 |
0000002 | Pdt_03 | 522.8 |
0000002 | Pdt_04 | 122.4 |
0000002 | Pdt_05 | 722.4 |
0000003 | Pdt_01 | 222.8 |
0000003 | Pdt_02 | 33.8 |
现在需要求出每一个订单中最贵的商品。
2)输入数据
goods.txt
|
输出数据预期:
3 222.8 2 722.4 1 222.8
3)分析
(1)利用“订单id和成交金额”作为key,可以将map阶段读取到的所有订单数据按照id分区,按照金额排序,发送到reduce。
(2)在reduce端利用groupingcomparator将订单id相同的kv聚合成组,然后取第一个即是最大值。
4)代码实现
(1)定义订单信息OrderBean
|
(2)编写OrderSortMapper
|
(3)编写OrderSortPartitioner
|
(4)编写OrderSortGroupingComparator
|
(5)编写OrderSortReducer
|
(6)编写OrderSortDriver
|
3.4.5 Combiner合并
1)combiner是MR程序中Mapper和Reducer之外的一种组件。
2)combiner组件的父类就是Reducer。
3)combiner和reducer的区别在于运行的位置:
Combiner是在每一个maptask所在的节点运行;
Reducer是接收全局所有Mapper的输出结果;
4)combiner的意义就是对每一个maptask的输出进行局部汇总,以减小网络传输量。
5)combiner能够应用的前提是不能影响最终的业务逻辑,而且,combiner的输出kv应该跟reducer的输入kv类型要对应起来。
|
很明显,combiner不适合做求平均值这样的操作。他适合做汇总这样的业务场景。
6)自定义Combiner实现步骤:
(1)自定义一个combiner继承Reducer,重写reduce方法
|
(2)在job驱动类中设置:
job.setCombinerClass(WordcountCombiner.class);
7)案例
前提:结合<hadoop大数据(十)-Mapreduce基础 的 1.5 4) 章节–统计一堆文件中单词出现的个数> 代码
数据输入也是同上
需求:统计过程中对每一个maptask的输出进行局部汇总,以减小网络传输量即采用Combiner功能。
方案一
1)增加一个WordcountCombiner类继承Reducer
|
// 9 指定需要使用combiner,以及用哪个类作为combiner的逻辑 job.setCombinerClass(WordcountCombiner.class);
|
方案二
1)将WordcountReducer作为combiner在WordcountDriver驱动类中指定
|
运行程序
总结
自定义Combiner的调用时机:是在MapTask阶段的split溢写阶段,需要写入到磁盘的之前进行。将有相同 key 的 key/value 对的 value 加起来,减少溢写到磁盘的数据量。调用完后进入**reduce**方法
3.5 ReduceTask工作机制
1)设置ReduceTask并行度(个数)
reducetask的并行度同样影响整个job的执行并发度和执行效率,但与maptask的并发数由切片数决定不同,Reducetask数量的决定是可以直接手动设置:
//默认值是1,手动设置为4 job.setNumReduceTasks(4);
2)注意
(1)reducetask=0 ,表示没有reduce阶段,输出文件个数和map个数一致。
例子7.1.1 job.setNumReduceTasks(0); 输出
生成一个分区,但是分区内的单词没有汇总
(2)reducetask默认值就是1,所以输出文件个数为一个。
(3)如果数据分布不均匀,就有可能在reduce阶段产生数据倾斜(也就是说,相同key被partition分配到一个分区里,造成了’一个人累死,其他人闲死’的情况)
(4)reducetask数量并不是任意设置,还要考虑业务逻辑需求,有些情况下,需要计算全局汇总结果,就只能有1个reducetask。
(5)具体多少个reducetask,需要根据集群性能而定。
(6)如果分区数不是1,但是reducetask为1,是否执行分区过程。答案是:不执行分区过程。因为在maptask的源码中,执行分区的前提是先判断reduceNum个数是否大于1。不大于1肯定不执行。
3)实验:测试reducetask多少合适。
(1)实验环境:1个master节点,16个slave节点:CPU:8GHZ,内存: 2G
(2)实验结论:
表1 改变reduce task (数据量为1GB)
Map task =16 | ||||||||||
---|---|---|---|---|---|---|---|---|---|---|
Reduce task | 1 | 5 | 10 | 15 | 16 | 20 | 25 | 30 | 45 | 60 |
总时间 | 892 | 146 | 110 | 92 | 88 | 100 | 128 | 101 | 145 | 104 |
4)ReduceTask工作机制
(1)Copy阶段:ReduceTask从各个MapTask上远程拷贝一片数据,并针对某一片数据,如果其大小超过一定阈值,则写到磁盘上,否则直接放到内存中。
(2)Merge阶段:在远程拷贝数据的同时,ReduceTask启动了两个后台线程对内存和磁盘上的文件进行合并,以防止内存使用过多或磁盘上文件过多。
(3)Sort阶段:按照MapReduce语义,用户编写reduce()函数输入数据是按key进行聚集的一组数据。为了将key相同的数据聚在一起,Hadoop采用了基于排序的策略。由于各个MapTask已经实现对自己的处理结果进行了局部排序,因此,ReduceTask只需对所有数据进行一次归并排序即可。
(4)Reduce阶段:reduce()函数将计算结果写到HDFS上。
3.6 OutputFormat数据输出
3.6.1 OutputFormat接口实现类
OutputFormat是MapReduce输出的基类,所有实现MapReduce输出都实现了 OutputFormat接口。下面我们介绍几种常见的OutputFormat实现类。
1)文本输出TextOutputFormat
默认的输出格式是TextOutputFormat,它把每条记录写为文本行。它的键和值可以是任意类型,因为TextOutputFormat调用toString()方法把它们转换为字符串。
2)SequenceFileOutputFormat
SequenceFileOutputFormat将它的输出写为一个顺序文件。如果输出需要作为后续 MapReduce任务的输入,这便是一种好的输出格式,因为它的格式紧凑,很容易被压缩。
3)自定义OutputFormat
根据用户需求,自定义实现输出。
3.6.2 自定义OutputFormat
为了实现控制最终文件的输出路径,可以自定义OutputFormat。
要在一个mapreduce程序中根据数据的不同输出两类结果到不同目录,这类灵活的输出需求可以通过自定义outputformat来实现。
1)自定义OutputFormat步骤
(1)自定义一个类继承FileOutputFormat。
(2)改写recordwriter,具体改写输出数据的方法write()。
2)案例
修改日志内容及自定义日志输出路径(自定义OutputFormat)。
1)需求
过滤输入的log日志中是否包含kingge
(1)包含kingge的网站输出到e:/kingge.log
(2)不包含kingge的网站输出到e:/other.log
2)输入数据(pp.txt)
|
输出预期:
kingge.log文件包含: http://www.kingge.com
other.log文件包含:
http://cn.bing.com
http://www.baidu.com
http://www.google.com
http://www.sin2a.com
http://www.sin2desa.com
http://www.sina.com
http://www.sindsafa.com
http://www.sohu.com
3)代码实现:
(1)自定义一个outputformat
|
(2)具体的写数据RecordWriter
|
(3)编写FilterMapper
|
(4)编写FilterReducer
|
(5)编写FilterDriver
|
3.7 Join多种应用
3.7.1 Reduce join
1)原理:
Map端的主要工作:为来自不同表(文件)的key/value对打标签以区别不同来源的记录。然后用连接字段作为key,其余部分和新加的标志作为value,最后进行输出。
Reduce端的主要工作:在reduce端以连接字段作为key的分组已经完成,我们只需要在每一个分组当中将那些来源于不同文件的记录(在map阶段已经打标志)分开,最后进行合并就ok了。
2)该方法的缺点
这种方式的缺点很明显就是会造成map和reduce端也就是shuffle阶段出现大量的数据传输,效率很低。
3)案例
reduce端表合并(数据倾斜)
通过将关联条件作为map输出的key,将两表满足join条件的数据并携带数据所来源的文件信息,发往同一个reduce
task,在reduce中进行数据的串联。
1)代码实现
1.1 创建商品和订合并后的bean类
|
2)编写TableMapper程序
|
3)编写TableReducer程序
|
4)编写TableDriver程序
|
3)运行程序查看结果
|
缺点:这种方式中,合并的操作是在reduce阶段完成,reduce端的处理压力太大,map节点的运算负载则很低,资源利用率不高,且在reduce阶段极易产生数据倾斜
解决方案: map端实现数据合并
3.7.2 Map join(Distributedcache分布式缓存)
1)使用场景:一张表十分小、一张表很大。
2)解决方案
在map端缓存多张表,提前处理业务逻辑,这样增加map端业务,减少reduce端数据的压力,尽可能的减少数据倾斜。
3)具体办法:采用distributedcache
(1)在mapper的setup阶段,将文件读取到缓存集合中。
(2)在驱动函数中加载缓存。
job.addCacheFile(new URI(“file:/e:/mapjoincache/pd.txt”));// 缓存普通文件到task运行节点
4)案例:
map端表合并(Distributedcache) - 结合上个案例代码(3.7.1 3 案例)
1)分析
适用于关联表中有小表的情形;
可以将小表分发到所有的map节点,这样,map节点就可以在本地对自己所读到的大表数据进行合并并输出最终结果,可以大大提高合并操作的并发度,加快处理速度。
2)实操案例
(1)先在驱动模块中添加缓存文件
|
(2)读取缓存的文件数据
|
3.8 数据清洗(ETL)
1)概述
在运行核心业务Mapreduce程序之前,往往要先对数据进行清洗,清理掉不符合用户要求的数据。清理的过程往往只需要运行mapper程序,不需要运行reduce程序。
2)案例
日志清洗(数据清洗)。
简单解析版
1)需求:
去除日志中字段长度小于等于11的日志。
2)输入数据
里面的内容就是我们平时网站输出的日志。例如:
|
3)实现代码:
(1)编写LogMapper
|
(2)编写LogDriver
|
复杂解析版
1)需求:
对web访问日志中的各字段识别切分
去除日志中不合法的记录
根据统计需求,生成各类访问请求过滤数据
2)输入数据
输入同上一个案例
3)实现代码:
(1)定义一个bean,用来记录日志数据中的各数据字段
|
(2)编写LogMapper程序
|
(3)编写LogDriver程序
|
3.9 计数器应用
Hadoop为每个作业维护若干内置计数器,以描述多项指标。例如,某些计数器记录已处理的字节数和记录数,使用户可监控已处理的输入数据量和已产生的输出数据量。
1)API
(1)采用枚举的方式统计计数
enum MyCounter{MALFORORMED,NORMAL}
//对枚举定义的自定义计数器加1
context.getCounter(MyCounter.MALFORORMED).increment(1);
(2)采用计数器组、计数器名称的方式统计
context.getCounter(“counterGroup”, “countera”).increment(1);
组名和计数器名称随便起,但最好有意义。
(3)计数结果在程序运行后的控制台上查看。
2)案例
数据清洗的两个案例
3.10 MapReduce开发总结
在编写mapreduce程序时,需要考虑的几个方面:
1)输入数据接口:InputFormat
默认使用的实现类是:TextInputFormat
TextInputFormat的功能逻辑是:一次读一行文本,然后将该行的起始偏移量作为key,行内容作为value返回。
KeyValueTextInputFormat每一行均为一条记录,被分隔符分割为key,value。默认分隔符是tab(\t)。
NlineInputFormat按照指定的行数N来划分切片。
CombineTextInputFormat可以把多个小文件合并成一个切片处理,提高处理效率。
用户还可以自定义InputFormat。
2)逻辑处理接口:Mapper
用户根据业务需求实现其中三个方法:map() setup() cleanup ()
3)Partitioner分区
有默认实现 HashPartitioner,逻辑是根据key的哈希值和numReduces来返回一个分区号;key.hashCode()&Integer.MAXVALUE % numReduces
如果业务上有特别的需求,可以自定义分区。
4)Comparable排序
当我们用自定义的对象作为key来输出时,就必须要实现WritableComparable接口,重写其中的compareTo()方法。
部分排序:对最终输出的每一个文件进行内部排序。
全排序:对所有数据进行排序,通常只有一个Reduce。
二次排序:排序的条件有两个。
5)Combiner合并
Combiner合并可以提高程序执行效率,减少io传输。但是使用时必须不能影响原有的业务处理结果。
6)reduce端分组:Groupingcomparator
reduceTask拿到输入数据(一个partition的所有数据)后,首先需要对数据进行分组,其分组的默认原则是key相同,然后对每一组kv数据调用一次reduce()方法,并且将这一组kv中的第一个kv的key作为参数传给reduce的key,将这一组数据的value的迭代器传给reduce()的values参数。
利用上述这个机制,我们可以实现一个高效的分组取最大值的逻辑。
自定义一个bean对象用来封装我们的数据,然后改写其compareTo方法产生倒序排序的效果。然后自定义一个Groupingcomparator,将bean对象的分组逻辑改成按照我们的业务分组id来分组(比如订单号)。这样,我们要取的最大值就是reduce()方法中传进来key。
7)逻辑处理接口:Reducer
用户根据业务需求实现其中三个方法:reduce() setup() cleanup ()
8)输出数据接口:OutputFormat
默认实现类是TextOutputFormat,功能逻辑是:将每一个KV对向目标文本文件中输出为一行。
SequenceFileOutputFormat将它的输出写为一个顺序文件。如果输出需要作为后续 MapReduce任务的输入,这便是一种好的输出格式,因为它的格式紧凑,很容易被压缩。
用户还可以自定义OutputFormat。