Combiner怎么使用

本篇内容介绍了“Combiner怎么使用”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!

目前创新互联已为上千多家的企业提供了网站建设、域名、虚拟主机、网站托管、服务器租用、企业网站设计、临西网站维护等服务,公司将坚持客户导向、应用为本的策略,正道将秉承"和谐、参与、激情"的文化,与客户和合作伙伴齐心协力一起成长,共同发展。

1、Combiner的使用~合并

1、combiner的工作位置:
   kv从缓冲区中溢写到磁盘时可以使用combiner(只要设置,无条件使用)
   每个MapTask的所有数据都从缓冲区写到磁盘后,在进行归并时可以使用combiner(满足条件使用,溢写次数>=3)

2、Combiner:  合并
   目的就是在每个MapTask中将输出的kv提前进行局部合并。
   能够降低map到reduce传输的kv对数量及 reduce最终处理的数据量. 

3、Combiner使用限制:
   在不改变业务逻辑的情况下才能使用combiner.
   --例如:求平均值时,就不宜使用
   
4、Combiner组件父类就是Reducer
	Combiner是在每一个MapTask所在的节点运行;
	Reducer是接收全局所有Mappei的输出结果;
1、自定义Compiner类
/**
 * combiner作用:
 * 在mapTask进行溢写时,对每一个mapTask输出的数据提前进行局部汇总,减少写进reduceTask的整体数据量
 * 注意:自定义Combiner类,属于MapTask阶段(虽然它继承Reducer)
 */
public class WordCountCombiner extends Reducer {
    int count = 0;
    @Override
    protected void reduce(Text key, 
	Iterable values, Context context) throws Exception{
        for (IntWritable value : values) {
            count+=value.get();
        }
        context.write(key,new IntWritable(count));
    }
}
2、WordCountMapper
public class WordCountMapper extends Mapper {
    private Text outk = new Text();
    private IntWritable outv = new IntWritable(1);

    @Override
    protected void map(LongWritable key,Text value,Context context) throws Exception {
//        获取输入到的一行数据
        String lineData = value.toString();
//        提前分析知道,按照空格进行切割,得到每个单词
        String[] splitData = lineData.split(" ");
//        遍历数据,将切割得到的数据写出
        for (String str : splitData) {
//            注意,这里得到的数据类型是String,需要转为Text
            outk.set(str);
            context.write(outk,outv);
        }
    }
}
3、WordCountReduce
public class WordCountReduce extends Reducer {
    private IntWritable outv = new IntWritable();
    
    @Override
    protected void reduce(Text key, 
	Iterable values, Context context) throws Exception {
//        定义一个变量,用来接收遍历中次数汇总
        int count = 0;
//        直接读取values,获取到迭代器对象中记录的每个单词出现次数
        for (IntWritable value : values) {
//        因为得到的value对象是IntWritable对象,不可以直接进行加操作,所以要转换为int
            count += value.get();   //get()方法转为int
        }
//        写出计算之后的数据,对count类型进行转换
        outv.set(count);
        context.write(key,outv);
    }
}
4、WordCountDriver
public class WordCountDriver {
    public static void main(String[] args) throws Exception {
//        1、获取job对象
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
//        2、关联jar,配置执行程序时,使用的具体驱动类
        job.setJarByClass(WordCountDriver.class);
//        3、关联mapper 和 reducer
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReduce.class);
//        4、设置mapper的输出的key和value类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
//        5、设置程序最终输出的key和value类型,如果有reducer
//        就写reducer输出的kv类型,如果没有reducer,就写mapper输出的kv类型.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
		
//      设置自定义Combiner类
        job.setCombinerClass(WordCountCombiner.class);
// 	job.setCombinerClass(WordCountReduce.class);也能这样用

        job.setInputFormatClass(CombineTextInputFormat.class);
        CombineTextInputFormat.setMaxInputSplitSize(job,4194304);

//        6、设置文件的输入和输出路径
        FileInputFormat.setInputPaths(job,new Path("D:\\io\\hadooptest\\combineinput"));
         //要求该路径不能存在,交给mr程序创建
	FileOutputFormat.setOutputPath(job,new Path("D:\\io\\hadooptest\\Combineroutput2")); 
//        7、提交job
        job.waitForCompletion(true);
    }
}

2、OutPutFormat数据输出

2.1、OutputFormat介绍

①:Outputformat是一个接口,其内部定义两个抽象方法
--RecordWriter getRecordWriter(FileSystem ignored,
JobConf job,String name,Progressable progress):
该方法用来获取RecordWriter对象,主负责数据的写出操作.

--void checkOutputSpecs(FileSystem ignored, JobConf job):
该方法用来检测输出路径,当driver中的输出路径存在时,会由该方法的实现类抛出异常
//131行抛出异常("Output directory " + outDir + " already exists")

②:通过ctrl+h 查看当前接口的实现类如下图
--TextOutputFormat(hadoop默认使用的写出方式),按行写出,内部重写了getRecordWriter()方法
--SequenceFileOutputFormat(最终写出的文件是二进制格式)
--MultipleOutputFormat(子抽象类,其下还有具体实现方法)

![OutputFormat实现类](https://oscimg.oschina.net/oscnet/up- 777fe19a5bf6864396beac3aa83d8350e9e.png "OutputFormat实现类")

2.2、自定义输出类

//1、LogMapper
public class LogMapper extends Mapper {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws Exception {
        context.write(value,NullWritable.get());
    }
}
//2、LogReducer
public class LogReducer extends Reducer {
    @Override
    protected void reduce(Text key,
	Iterable values, Context context) throws Exception{
        for (NullWritable value : values) {
            context.write(key,NullWritable.get());
        }
    }
}
//3、MyOutPutFormat
public class MyOutPutFormat extends FileOutputFormat {
    /**
     * 重写getRecordWriter()方法,在内部自定义一个写出类
     * @param job
     * @return
     * @throws IOException
     * @throws InterruptedException
     */
    public RecordWriter 
		getRecordWriter(TaskAttemptContext job) throws Exception {
        LogRecordWriter rw = new LogRecordWriter(job.getConfiguration());
        return rw;
    }
}
//4、LogRecordWriter
/**
 * 自定义LogRecordWriter对象需要继承RecordWriter类
 *
 * 需求:
 *     将包含"luck"的日志数据写到   D:/bigtools/luck.log
 *     将不包含"luck"的日志数据写到 D:/bigtools/other.log
 */
public class LogRecordWriter extends RecordWriter {

//    文件输出路径
    private String luckPath = "D:/bigtools/luck.log";
    private String otherPath = "D:/bigtools/other.log";
    private FSDataOutputStream atguiguOut;
    private FSDataOutputStream otherOut;
    private FileSystem fs;
    /**
     * 初始化
     * @param conf
     */
    public LogRecordWriter(Configuration conf){
        try {
            fs = FileSystem.get(conf);
            luckOut = fs.create(new Path(luckPath));
            otherOut = fs.create(new Path(otherPath));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    /**
     * 重写write方法
     * @param key
     * @param value
     * @throws IOException
     * @throws InterruptedException
     */
    public void write(Object key, Object value) throws Exception {
        String log = key.toString();
        if(log.contains("luck")){
            luckOut.writeBytes(log + "\n");
        }else{
            otherOut.writeBytes(log + "\n");
        }
    }
    /**
     * 关闭流
     * @param context
     * @throws IOException
     * @throws InterruptedException
     */
    public void close(TaskAttemptContext context) throws IOException, InterruptedException {
        IOUtils.closeStream(luckOut);
        IOUtils.closeStream(otherOut);
    }
}
//5、LogDriver
public class LogDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(LogDriver.class);

        job.setMapperClass(LogMapper.class);
        job.setReducerClass(LogReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
//        设置自定义输出
        job.setOutputFormatClass(MyOutPutFormat.class);

        FileInputFormat.setInputPaths(job,new Path("D:\\io\\hadooptest\\loginput"));
        FileOutputFormat.setOutputPath(job,new Path("D:\\io\\hadooptest\\logoutput"));
        job.waitForCompletion(true);
    }
}

“Combiner怎么使用”的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注创新互联网站,小编将为大家输出更多高质量的实用文章!


本文名称:Combiner怎么使用
地址分享:http://scyanting.com/article/jicehh.html