Linux命令汇总:
hadoop辅助yarn运行mapreduce程序命令:
hadoop jar 包名 主类名 读取文件名 输出目录
查看文件权限:
ls -la
修改文件权限:
chmod 600(755、777) 文件名
修该属主:(root权限下)
chown (-R、*) 用户名1:用户名2
(其中-R表示递归、*表示所有文件文件夹)
mapreduce工作原理:
map阶段:KEYIN: maptask这个程序读到的那一行在整个文件中的起始偏移量 VALUEIN: maptask这个程序读到的那一行的内容
KEYOUT:是我们的处理逻辑生成的结果(键值对)中的key VALUEOUT:是我们的处理逻辑生成的结果(键值对)中的value
分区:MapReduce提供Partitioner接口,它的作用就是根据key或value及reduce的数量来决定当前的这对输出数据最终应该交由哪个reduce task处理。
默认对key hash后再以reduce task数量取模。默认的取模方式只是为了平均reduce的处理能力,如果用户自己对Partitioner有需求,可以订制并设置到job上。
将数据写入内存缓冲区中:缓冲区的作用是批量收集map结果,减少磁盘IO的影响。我们的key/value对以及Partition的结果都会被写入缓冲区。
溢写:这个内存缓冲区是有大小限制的,默认是100MB。当map task的输出结果很多时,就可能会撑爆内存,所以需要在一定条件下将缓冲区中的数据临时写入磁盘,
然后重新利用这块缓冲区。这个从内存往磁盘写数据的过程被称为Spill,中文可译为溢写,字面意思很直观。这个溢写是由单独线程来完成,不影响往缓冲区写map结
果的线程。溢写线程启动时不应该阻止map的结果输出,所以整个缓冲区有个溢写的比例spill.percent。这个比例默认是0.8,也就是当缓冲区的数据已经达到阈值
(buffer size * spill percent = 100MB * 0.8 = 80MB),溢写线程启动,锁定这80MB的内存,执行溢写过程。Map task的输出结果还可以往剩下的20MB内存中
写,互不影响。内存缓冲区没有对将发送到相同reduce端的数据做合并,那么这种合并应该是体现是磁盘文件中的。从官方图上也可以看到写到磁盘中的溢写文件是对不
同的reduce端的数值做过合并。所以溢写过程一个很重要的细节在于,如果有很多个key/value对需要发送到某个reduce端去,那么需要将这些key/value值拼接到一
块,减少与partition相关的索引记录。
MapReduce的shuffle机制:见详细图解,重要!!!
mapreduce性能优化:
maptask数量决定机制:
底层是如何决定切片的?
return Math.max(minSize, Math.min(maxSize, blockSize));
FileInputFormat可以自定义切片:
FileInputFormat的切片大小是可以自定义的,只要设置minSize和maxSize大小即可,比如机器的cpu很差,一个maptask处理128M的切片吃不消,
那么就可以这样设置参数以调节切片的大小。
maptask进程数量决定机制:由于首先调用FileInputFormat对文件进行切片,每一个切片就起一个maptask进程去处理,FileInputFormat的默认切片机制是一个物理
block就是一个切片,但是若同一台机器上存在多个小文件,那么默认机制就会对应启动多个maptask进程去对应处理这多个小文件,影响了性能。所有当同一台机器有多
个小文件时,可以考虑使用CombineInputFormat将多个小文件看做一个切片去处理。当然最好的方法时在处理之前就将这多个小文件合并成一个大文件再做处理。
一个maptask进程。
reducetask数量决定机制:
默认是1个,可以通过代码设置其数量:wcJob.setNumReduceTasks(x);
但是,当reducetask数量较多时,就容易出现数据倾斜,出现这些reducetask负载不均衡的情况,影响并发效率。
注意reducetask数量不是可以任意设置,有些业务需要计算全局汇总结果,那就只能有一个reducetask,此时reduce阶段就没有办法并发了。
MapReduce-Demo
电信上行、下行、总流量信息按不同手机号汇总并且按总流量大小排序
自定义分组策略:
按手机号地区不同将不同手机号的流量信息写到不同文件中--此处用到自定义分组策略(自定义从map到reduce之间的数据分组策略)
只需要重写原来的分区类:
public class ProvincePartitioner extends Partitioner<Text, FlowBean>{
static HashMap<String, Integer> provinceMap = new HashMap<String, Integer>();
static {
provinceMap.put("134", 0);
provinceMap.put("135", 1);
provinceMap.put("136", 2);
provinceMap.put("137", 3);
provinceMap.put("138", 4);
provinceMap.put("139", 5);
}
@Override
public int getPartition(Text phone, FlowBean fb, int arg2) {
Integer code= provinceMap.get(phone.toString().substring(0, 3));
return code == null ? 6 : code;
}
}
并在Driver中声明该分区策略,并制定reduce的并发数,即可
fcJob.setPartitionerClass(ProvincePartitioner.class);
fcJob.setNumReduceTasks(Integer.parseInt(args[2]));
为了配合自定义分组策略中分组数,需要设置相应的reduce的并发数 如果“并发数”>分组数 ,就会出现空的结果文件,
如果"并发数"<分组数,就会报错:非法的分区号,
但是如果并发数=1,则不会调用分组逻辑,所有的数据进入唯一的一个reduceTask,也就不会报错
Partitioner和Combiner
为了减少reduce阶段的工作量,可以在map阶段就将多个<hello,1>先合并为<hello,n>,再输出给reduce。有两种方法可以做到这一点:
一是自己实现:在map方法中自己创建一个hashmap先缓存这多个<hello,1>,再在map方法中重写父类中的cleanup方法,
当多次调用完map方法后再调用这个cleanup方法将hashmap缓存中多个<hello,1>遍历为<hello,n>输出。
二是使用Combiner组件,Combiner组件继承自Reducer,Combiner是maptask本地的Reducer,
用法:自定义一个Combiner类继承Reducer,再在Driver组件中添加这个组件:
wcjob.setCombinerClass(WordCountCombiner.class);
或者根本不用自定义这个Combiner组件,直接就用Reducer组件:
wcjob.setReducerClass(WordCountReducer.class);
(因为其实Combiner的逻辑跟Reducer是完全一样的,只不过Combiner组件是在每一个maptask中运行的,接受的是每一个maptask的数据,
而Reducer是在reducetask中运行的,接收的是全局多个maptask输出的数据)
但是注意:Combiner应用的前提是不能影响最终的业务逻辑。而且Combiner输出的KV类型要和Reducer的输入类型相对应!!!
Hadoop有自己的一套序列化机制:Writeable
如果需要自定义的bean放在key中传输则除了需要实现Writeable外,还需要实现Compareable(实现WriteableComparable),
若只是单纯地放在value中进行传输,则可以只是实现Writeable接口
public class FlowBean implements WritableComparable<FlowBean>{
private long upFlow;
private long dFlow;
private long sumFlow;
public FlowBean() {}
public FlowBean(long upFlow, long dFlow) {
this.upFlow = upFlow;
this.dFlow = dFlow;
this.sumFlow = upFlow + dFlow;
}
public long getUpFlow() {
return upFlow;
}
public void setUpFlow(long upFlow) {
this.upFlow = upFlow;
}
public long getdFlow() {
return dFlow;
}
public void setdFlow(long dFlow) {
this.dFlow = dFlow;
}
public long getSumFlow() {
return sumFlow;
}
public void setSumFlow(long sumFlow) {
this.sumFlow = sumFlow;
}
@Override
public void readFields(DataInput in) throws IOException {
upFlow=in.readLong();
dFlow=in.readLong();
sumFlow=in.readLong();
}
@Override
public void write(DataOutput out) throws IOException {
out.writeLong(upFlow);
out.writeLong(dFlow);
out.writeLong(sumFlow);
}
@Override
public String toString() {
return upFlow + "\t" + dFlow + "\t" + sumFlow;
}
@Override
public int compareTo(FlowBean fb) {
return fb.getSumFlow() > this.sumFlow ? 1 : -1;
}
}