柚子快报邀请码778899分享:大数据技术之
第6章 Hadoop企业优化(重中之重)6.1 MapReduce 跑的慢的原因6.2 MapReduce优化方法6.2.1 数据输入6.2.2 Map阶段6.2.3 Reduce阶段6.2.4 I/O传输6.2.5 数据倾斜问题6.2.6 常用的调优参数6.3 HDFS小文件优化方法6.3.1 HDFS小文件弊端6.3.2 HDFS小文件解决方案第7章 MapReduce扩展案例7.1 倒排索引案例(多job串联)7.2 TopN案例7.3 找博客共同粉丝案例第8章 常见错误及解决方案
第6章 Hadoop企业优化(重中之重)
6.1 MapReduce 跑的慢的原因
6.2 MapReduce优化方法
MapReduce优化方法主要从六个方面考虑:数据输入、Map阶段、Reduce阶段、IO传输、数据倾斜问题和常用的调优参数。
6.2.1 数据输入
6.2.2 Map阶段
6.2.3 Reduce阶段
1、
2、
6.2.4 I/O传输
6.2.5 数据倾斜问题
1、
2、
6.2.6 常用的调优参数
1、资源相关参数(1)以下参数是在用户自己的MR应用程序中配置就可以生效(mapred-default.xml)
(2)应该在YARN启动之前就配置在服务器的配置文件中才能生效(yarn-default.xml)(3)Shuffle性能优化的关键参数,应在YARN启动之前就配置好(mapred-default.xml)2、容错相关参数(MapReduce性能优化)
6.3 HDFS小文件优化方法
6.3.1 HDFS小文件弊端
HDFS上每个文件都要在NameNode上建立一个索引,这个索引的大小约为150byte,这样当小文件比较多的时候,就会产生很多的索引文件,一方面会大量占用NameNode的内存空间,另一方面就是索引文件过大使得索引速度变慢。
6.3.2 HDFS小文件解决方案
小文件的优化无非以下几种方式: (1)在数据采集的时候,就将小文件或小批数据合成大文件再上传HDFS。 (2)在业务处理之前,在HDFS上使用MapReduce程序对小文件进行合并。 (3)在MapReduce处理时,可采用CombineTextInputFormat提高效率。1、
2、
第7章 MapReduce扩展案例
7.1 倒排索引案例(多job串联)
1、需求 有大量的文本(文档、网页),需要建立搜索索引,如下图所示。(1)数据输入
(2)期望输出数据
atguigu c.txt-->2 b.txt-->2 a.txt-->3 pingping c.txt-->1 b.txt-->3 a.txt-->1 ss c.txt-->1 b.txt-->1 a.txt-->2
2、需求分析
3、第一次处理(1)第一次处理,编写OneIndexMapper类
package com.atguigu.mr.index;import java.io.IOException;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.lib.input.FileSplit;public class OneIndexMapper extends Mapper
(2)第一次处理,编写OneIndexReducer类
package com.atguigu.mr.index;import java.io.IOException;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Reducer;public class OneIndexReducer extends Reducer
(3)第一次处理,编写OneIndexDriver类
package com.atguigu.mr.index;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class OneIndexDriver { public static void main(String[] args) throws Exception { // 输入输出路径需要根据自己电脑上实际的输入输出路径设置 args = new String[] { "d:/temp/atguigu/0529/input/inputoneindex", "d:/temp/atguigu/0529/output17" }; Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(OneIndexDriver.class); job.setMapperClass(OneIndexMapper.class); job.setReducerClass(OneIndexReducer.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); job.waitForCompletion(true); }}
(4)查看第一次输出结果
atguigu---a.txt 3atguigu---b.txt 2atguigu---c.txt 2pingping---a.txt 1pingping---b.txt 3pingping---c.txt 1ss---a.txt 2ss---b.txt 1ss---c.txt 1
4、第二次处理(1)第二次处理,编写TwoIndexMapper类
package com.atguigu.mr.index;import java.io.IOException;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;public class TwoIndexMapper extends Mapper
(2)第二次处理,编写TwoIndexReducer类
package com.atguigu.mr.index;import java.io.IOException;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Reducer;public class TwoIndexReducer extends Reducer
(3)第二次处理,编写TwoIndexDriver类
package com.atguigu.mr.index;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class TwoIndexDriver { public static void main(String[] args) throws Exception { // 输入输出路径需要根据自己电脑上实际的输入输出路径设置 args = new String[] { "d:/temp/atguigu/0529/input/inputtowindex", "d:/temp/atguigu/0529/output18" }; Configuration config = new Configuration(); Job job = Job.getInstance(config); job.setJarByClass(TwoIndexDriver.class); job.setMapperClass(TwoIndexMapper.class); job.setReducerClass(TwoIndexReducer.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); boolean result = job.waitForCompletion(true); System.exit(result ? 0 : 1); }}
(4)第二次查看最终结果
atguigu c.txt-->2 b.txt-->2 a.txt-->3pingping c.txt-->1 b.txt-->3 a.txt-->1ss c.txt-->1 b.txt-->1 a.txt-->2
7.2 TopN案例
1、需求 对需求2.3输出结果进行加工,输出流量使用量在前10的用户信息。(1)输入数据 (2)输出数据
2、需求分析 同上图。3、实现代码(1)编写FlowBean类
package com.atguigu.mr.topn;import java.io.DataInput;import java.io.DataOutput;import java.io.IOException;import org.apache.hadoop.io.WritableComparable;public class FlowBean implements WritableComparable
(2)编写TopNMapper类
package com.atguigu.mr.topn;import java.io.IOException;import java.util.Iterator;import java.util.TreeMap;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;public class TopNMapper extends Mapper
(3)编写TopNReducer类
package com.atguigu.mr.topn;import java.io.IOException;import java.util.Iterator;import java.util.TreeMap;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Reducer;public class TopNReducer extends Reducer
(4)编写TopNDriver类
package com.atguigu.mr.topn;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class TopNDriver { public static void main(String[] args) throws Exception { args = new String[] { "d:/temp/atguigu/0529/input/inputtopn", "d:/temp/atguigu/0529/output20" }; // 1、获取配置信息,或者job对象实例 Configuration configuration = new Configuration(); Job job = Job.getInstance(configuration); // 6、指定本程序的jar包所在的本地路径 job.setJarByClass(TopNDriver.class); // 2、指定本业务job要使用的mapper/reducer业务类 job.setMapperClass(TopNMapper.class); job.setReducerClass(TopNReducer.class); // 3、指定mapper输出数据的kv类型 job.setMapOutputKeyClass(FlowBean.class); job.setMapOutputValueClass(Text.class); // 4、指定最终输出的数据的kv类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(FlowBean.class); // 5、指定job的输入原始文件所在目录 FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); // 7、将job中配置的相关参数,以及job所用的java类所在的jar包, 提交给yarn去运行 boolean result = job.waitForCompletion(true); System.exit(result ? 0 : 1); }}
7.3 找博客共同粉丝案例
1、需求 以下是博客的粉丝列表数据,冒号前是一个用户,冒号后是该用户的所有粉丝(数据中的粉丝关系是单向的) 求出哪些人两两之间有共同粉丝,及他俩的共同粉丝都有谁?(1)数据输入
A:B,C,D,F,E,OB:A,C,E,KC:F,A,D,ID:A,E,F,LE:B,C,D,M,LF:A,B,C,D,E,O,MG:A,C,D,E,FH:A,C,D,E,OI:A,OJ:B,OK:A,C,DL:D,E,FM:E,F,GO:A,H,I,J
2、需求分析先求出A、B、C、……等是谁的粉丝第一次输出结果
A I,K,C,B,G,F,H,O,D,B A,F,J,E,C A,E,B,H,F,G,K,D G,C,K,A,L,F,E,H,E G,M,L,H,A,F,B,D,F L,M,D,C,G,A,G M,H O,I O,C,J O,K B,L D,E,M E,F,O A,H,I,J,F,
第二次输出结果
A-B E C A-C D F A-D E F A-E D B C A-F O B C D E A-G F E C D A-H E C D O A-I O A-J O B A-K D C A-L F E D A-M E F B-C A B-D A E B-E C B-F E A C B-G C E A B-H A E C B-I A B-K C A B-L E B-M E B-O A C-D A F C-E D C-F D A C-G D F A C-H D A C-I A C-K A D C-L D F C-M F C-O I A D-E L D-F A E D-G E A F D-H A E D-I A D-K A D-L E F D-M F E D-O A E-F D M C B E-G C D E-H C D E-J B E-K C D E-L D F-G D C A E F-H A D O E C F-I O A F-J B O F-K D C A F-L E D F-M E F-O A G-H D C E A G-I A G-K D A C G-L D F E G-M E F G-O A H-I O A H-J O H-K A C D H-L D E H-M E H-O A I-J O I-K A I-O A K-L D K-O A L-M E F
3、代码实现(1)第一次Mapper类
package com.atguigu.mr.friends;import java.io.IOException;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;public class OneShareFriendsMapper extends Mapper
(2)第一次Reducer类
package com.atguigu.mr.friends;import java.io.IOException;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Reducer;public class OneShareFriendsReducer extends Reducer
(3)第一次Driver类
package com.atguigu.mr.friends;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class OneShareFriendsDriver { public static void main(String[] args) throws Exception { // 0、根据自己电脑路径重新配置 args = new String[] { "d:/temp/atguigu/0529/input/inputfriend", "d:/temp/atguigu/0529/output21" }; // 1、获取job对象 Configuration configuration = new Configuration(); Job job = Job.getInstance(configuration); // 2、指定jar包运行的路径 job.setJarByClass(OneShareFriendsDriver.class); // 3、指定map/reduce使用的类 job.setMapperClass(OneShareFriendsMapper.class); job.setReducerClass(OneShareFriendsReducer.class); // 4、指定map输出的数据类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); // 5、指定最终输出的数据类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); // 6、指定job的输入原始所在目录 FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); // 7、提交 boolean result = job.waitForCompletion(true); System.exit(result ? 0 : 1); }}
(4)第二次Mapper类
package com.atguigu.mr.friends;import java.io.IOException;import java.util.Arrays;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;public class TwoShareFriendsMapper extends Mapper
(5)第二次Reducer类
package com.atguigu.mr.friends;import java.io.IOException;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Reducer;public class TwoShareFriendsReducer extends Reducer
(6)第二次Driver类
package com.atguigu.mr.friends;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class TwoShareFriendsDriver { public static void main(String[] args) throws Exception { // 0、根据自己电脑路径重新配置 args = new String[] { "d:/temp/atguigu/0529/input/inputfriends", "d:/temp/atguigu/0529/output22" }; // 1、获取job对象 Configuration configuration = new Configuration(); Job job = Job.getInstance(configuration); // 2、指定jar包运行的路径 job.setJarByClass(TwoShareFriendsDriver.class); // 3、指定map/reduce使用的类 job.setMapperClass(TwoShareFriendsMapper.class); job.setReducerClass(TwoShareFriendsReducer.class); // 4、指定map输出的数据类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); // // 5、指定最终输出的数据类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); // 6、指定job的输入原始所在目录 FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); // 7、提交 boolean result = job.waitForCompletion(true); System.exit(result ? 0 : 1); }}
第8章 常见错误及解决方案
1)导包容易出错。尤其Text和CombineTextInputFormat。2)Mapper中第一个输入的参数必须是LongWritable或者NullWritable,不可以是IntWritable,报的错误是类型转换异常。3)java.lang.Exception: java.io.IOException: Illegal partition for 13926435656(4),说明Partition和ReduceTask个数没对上,调整ReduceTask个数。4)如果分区数不是1,但是reducetask为1,是否执行分区过程。 答案是:不执行分区过程。因为在MapTask的源码中,执行分区的前提是先判断ReduceNum个数是否大于1。不大于1肯定不执行。5)在Windows环境编译的jar包导入到Linux环境中运行:
hadoop jar wc.jar com.atguigu.mapreduce.wordcount.WordCountDriver /user/atguigu/ /user/atguigu/output
报如下错误:
Exception in thread "main" java.lang.UnsupportedClassVersionError: com/atguigu/mapreduce/wordcount/WordCountDriver : Unsupported major.minor version 52.0
原因是Windows环境用的jdk1.7,Linux环境用的jdk1.8。 解决方案:统一jdk版本。6)缓存pd.txt小文件案例中,报找不到pd.txt文件 原因:大部分为路径书写错误。还有就是要检查pd.txt.txt的问题。还有个别电脑写相对路径找不到pd.txt,可以修改为绝对路径。7)报类型转换异常。 通常都是在驱动函数中设置Map输出和最终输出时编写错误。 Map输出的key如果没有排序,也会报类型转换异常。8)集群中运行wc.jar时出现了无法获得输入文件。 原因:WordCount案例的输入文件不能放在 HDFS 集群的根目录。9)出现了如下相关异常
Exception in thread "main" java.lang.UnsatisfiedLinkError: org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Ljava/lang/String;I)Z at org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Native Method) at org.apache.hadoop.io.nativeio.NativeIO$Windows.access(NativeIO.java:609) at org.apache.hadoop.fs.FileUtil.canRead(FileUtil.java:977)java.io.IOException: Could not locate executable null\bin\winutils.exe in the Hadoop binaries. at org.apache.hadoop.util.Shell.getQualifiedBinPath(Shell.java:356) at org.apache.hadoop.util.Shell.getWinUtilsPath(Shell.java:371) at org.apache.hadoop.util.Shell.
解决方案一:拷贝hadoop.dll文件(文件位置:D:\work\Hadoop\hadoop-2.7.2\bin)到Windows目录C:\Windows\System32。个别同学电脑还需要修改Hadoop源码。解决方案二:创建如下包名,并将NativeIO.java拷贝到该包名下
10)自定义Outputformat时,注意在RecordWirter中的close()方法必须关闭流资源。否则输出的文件内容中数据为空。
@Override public void close(TaskAttemptContext context) throws IOException, InterruptedException { if (atguigufos != null) { atguigufos.close(); } if (otherfos != null) { otherfos.close(); } }
柚子快报邀请码778899分享:大数据技术之
发表评论