博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Hadoop_22_MapReduce map端join实现方式解决数据倾斜(DistributedCache)
阅读量:6097 次
发布时间:2019-06-20

本文共 4818 字,大约阅读时间需要 16 分钟。

1.Map端Join解决数据倾斜  

  1.Mapreduce中会将map输出的kv对,按照相同key分组(调用getPartition),然后分发给不同的reducetask

  2.Map输出结果的时候调用了Partitioner组件(返回分区号),由它决定将数据放到哪个区中,默认的分组规

则为:根据key的hashcode%reducetask数来分发,源代码如下:

public class HashPartitioner
extends Partitioner
{ /** Use { @link Object#hashCode()} to partition. */ public int getPartition(K key, V value,int numReduceTasks) { return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks; } }

  3.所以:如果要按照我们自己的需求进行分组,则需要改写数据分发(分组)组件Partitioner自定义一个

CustomPartitioner继承抽象类:Partitioner,来返回一个分区编号

  4.然后在job对象中,设置自定义partitioner: job.setPartitionerClass(CustomPartitioner.class)

  5.自定义partition后,要根据自定义partitioner的逻辑设置相应数量的ReduceTask


   存在的问题:如若Mapper输出的一些Key特别多,另一些Key特别少就会产生数据倾斜,造成一些Reducer特别忙

,一些则比较闲,我们说Mapper端相同key的输出数据会发到同一个Redurce端,需要把key相同的放在一起才能进行

拼接,所以才需要Reducer。如果我们不需要Reducer就能做拼接,就不存在数据倾斜了。

  解决方案:Map端Join解决数据倾斜,我们为每一个MapTask准备一个表的全表。这种机制叫做Map Side Join。

当然这个表的全表不能很大 

2.map端join算法实现原理:

  原理阐述:1.适用于关联表中有小表的情形;2.可以将小表分发到所有的map节点,这样,map节点就可以在本地

对自己所读到的大表数据进行join并输出最终结果,可以大大提高join操作的并发度,加快处理速度;3.Hadoop提供

了一个Distributed Cache机制,能把文件在合适的时候发给MapTask,MapTask就可以从本地进行加载小表数据;

3.map端join代码实现示例: 

  :先在mapper类中预先定义好小表,进行join

  :并用distributedcache机制将小表的数据分发到每一个maptask执行节点,从而每一个maptask节点可以从本地加

载到小表的数据,进而在本地即可实现join

  :引入实际场景中的解决方案:一次加载数据库或者用distributedcache

代码:

package cn.gigdata.hdfs.mr;import java.io.BufferedReader;import java.io.FileInputStream;import java.io.IOException;import java.io.InputStreamReader;import java.net.URI;import java.util.HashMap;import java.util.Map;import org.apache.commons.lang.StringUtils;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;/** * 解决在Redue端的数据倾斜问题 * @author Administrator */public class MapSideJoin {    public static class MapSideJoinMapper extends Mapper
{ // 用一个hashmap来加载保存产品信息表 Map
pdInfoMap = new HashMap
(); Text k = new Text(); /** * 通过阅读父类Mapper的源码,发现 setup方法是在maptask处理数据之前调用一次 可以用来做一些初始化工作 */ @Override protected void setup(Mapper
.Context context) throws IOException, InterruptedException { //读取MapTask工作目录(已经存在,直接根据文件名进行读取)的文件,将数据放入HashMap中 BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream("pdts.txt"))); String line; while(StringUtils.isNotEmpty(line = br.readLine())){ String[] fields = line.split(","); pdInfoMap.put(fields[0], fields[1]); } br.close(); } //由于已经持有完整的产品信息表,所以在map方法中就能实现join逻辑了 @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String orderLine = value.toString(); String fields[] = orderLine.split(","); //根据订单ID,在商品表中get到ID对应的商品名称 String pdName = pdInfoMap.get(fields[2]); k.set(orderLine + "," + pdName); context.write(k, NullWritable.get()); } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(MapSideJoin.class); job.setMapperClass(MapSideJoinMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(NullWritable.class); FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); // 指定需要缓存一个文件到所有的maptask运行节点工作目录 /* job.addArchiveToClassPath(archive); */// 缓存jar包到task运行节点的classpath中 /* job.addFileToClassPath(file); */// 缓存普通文件到task运行节点的classpath中 /* job.addCacheArchive(uri); */// 缓存压缩包文件到task运行节点的工作目录 /* job.addCacheFile(uri) */// 缓存普通文件到task运行节点的工作目录 // 将产品表文件缓存到Maptask工作节点的工作目录中去(Map运行时将会得到改文件) job.addCacheFile(new URI("file:/F:/pduct/pdts.txt")); //map端join的逻辑不需要reduce阶段,设置reducetask数量为0 job.setNumReduceTasks(0); boolean res = job.waitForCompletion(true); System.exit(res ? 0 : 1); }}

 产品数据文件:pdts.txt

P0001,sss,1001,2P0002,111,1000,3P0003,www,1002,4

订单数据文件:order.txt

1001,20150710,P0001,21002,20150710,P0001,31002,20150710,P0002,31003,20150710,P0003,3

运行结果文件:part-m-00000

1001,20150710,P0001,2,sss1002,20150710,P0001,3,sss1002,20150710,P0002,3,1111003,20150710,P0003,3,www

 

 

 

 

 

  

 

转载于:https://www.cnblogs.com/yaboya/p/9246131.html

你可能感兴趣的文章
Introduction to Algorithm
查看>>
TYPESCRIPT指南(译文)
查看>>
一个用于轮播的ViewPagerIndicator
查看>>
前端模块化
查看>>
Java并发基础06. 线程范围内共享数据
查看>>
分享,第三方登录,统计,多渠道打包
查看>>
OS Client 与WebSocket 通信(一)(转)
查看>>
java8 Date/Time API
查看>>
传统加密技术续-Hill Vigenere Vernam
查看>>
Zookeeper系列一:Zookeeper基础命令操作
查看>>
程序员与笛卡尔积
查看>>
常用正则表达式
查看>>
摸着石头过河的区块链游戏真是一门好生意吗?
查看>>
智慧能源工厂监控门禁管理平台集成系统
查看>>
KVOController代码分析和踩坑
查看>>
由一条微博引发的 — Xcode LLDB 调试断点总结
查看>>
JavaScript 正则入门到掌握
查看>>
选择排序的Python实现
查看>>
CAAnimation 系列动画
查看>>
【Redis】redis各类型数据存储分析
查看>>