MapReduce编写实现wordcount词频统计

p>首先编写WordCountDriver:

创新互联公司服务项目包括鄂温克网站建设、鄂温克网站制作、鄂温克网页制作以及鄂温克网络营销策划等。多年来,我们专注于互联网行业,利用自身积累的技术优势、行业经验、深度合作伙伴关系等,向广大中小型企业、政府机构等提供互联网行业的解决方案,鄂温克网站推广取得了明显的社会效益与经济效益。目前,我们服务的客户以成都为中心已经辐射到鄂温克省份的部分城市,未来相信会继续扩大服务区域并继续获得客户的支持与信任!

package com.jym.hadoop.mr.demo;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**

* 这个程序相当于一个yarn集群的客户端,

* 需要在此封装我们的mr程序的相关运行参数,指定jar包,

* 最后提交给yarn

* */

public class WordcountDriver

{

public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException

{

Configuration conf=new Configuration();

/*其实如果在本地运行MR程序其实不用配置下面的代码程序,在MR默认下就是本地运行*/

/**下面这段代码配置的是在本地模式下运行MR程序*/

/**是否运行为本地模式,就是看这个参数值是否为local,默认就是local;*/

//conf.set("mapreduce.framework.name", "local"); //在本地运行MR程序

//本地模式运行MR程序时,输入输出的数据可以在本地,也可以在hdfs上

//到底在哪里,就看以下两行配置用哪一行了,默认是“file:///”

/**conf.set("fs.defaultFS", "hdfs://hadoop1:9000");*/ //使用的是HDFS系统

//conf.set("fs.defaultFS", "file:///"); //使用的是本地Windows磁盘

/**运行集群模式,就是把程序提交到yarn中去运行

* 要想运行为集群模式,以下3个参数要指定为集群上的值

* */

conf.set("mapreduce.framework.name", "yarn");

conf.set("yarn.resourcemanager.hostname", "hadoop1");

conf.set("fs.defaultFS", "hdfs://hadoop1:9000");

Job job = Job.getInstance(conf);

/**要想在Windows的Eclipse上运行程序,并提交到hadoop的YARN集群上需要指定jar包,如下:*/

/**job.setJar("c:/wc.jar");*/

//job.setJar("/home/hadoop/wc.jar"); //这种是将程序打包成jar包,放到指定的位置,缺乏灵活性,不建议使用;

//指定本程序的jar包所在的本地路径

job.setJarByClass(WordcountDriver.class);

//指定本业务job要使用的mapper/reducer业务类

job.setMapperClass(WordcountMapper.class);

job.setReducerClass(WordcountReducerr.class);

//指定mapper输出数据的kv类型;

job.setMapOutputKeyClass(Text.class);

job.setMapOutputValueClass(IntWritable.class);

//指定最终输出的数据的kv类型

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(IntWritable.class);

//指定需要使用的combiner,以及用哪一个类作为combiner的逻辑

/*job.setCombinerClass(WordcountCombiner.class);*/

job.setCombinerClass(WordcountReducerr.class);

/**因为combiner的工作原理通reducecer的作用是一样的,所以直接反射调用reducerr类其实作用是一样的*/

/**此处为之后为测试添加的*/

//如果不设置InputFormat,它默认使用的是TextInputFormat.class

/**job.setInputFormatClass(CombineTextInputFormatInputFormatInputFormat.class);

CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);

CombineTextInputFormat.setMinInputSplitSize(job, 2097152);

*/

//指定job的输入原始文件所在目录

//FileInputFormat.setInputPaths(job, new Path("/wordcount/input")); //此处添加的路径为HDFS文件系统的路径;

FileInputFormat.setInputPaths(job, new Path(args[0])); //传一个路径参数

//指定job的输出结果所在目录

FileOutputFormat.setOutputPath(job, new Path(args[1])); //传一个参数进来作为输出的路径参数

//将job中配置的相关参数,以及job所用的Java类所在的jar包,提交给yarn去运行;

/*job.submit(); */

boolean res = job.waitForCompletion(true);

System.exit(res?0:1);

}

}

其次编写WordCountMapper:

package com.jym.hadoop.mr.demo;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

//这是一个简单的MapReduce例子,进行单词数量的统计操作;

import org.apache.hadoop.mapreduce.Mapper;

/**

* KEYIN:默认情况下,是mr框架所读到的一行文本的起始偏移量,Long类型,但是在Hadoop中有更精简的序列化接口,因此采用LongWritable类型;

* VALUEIN:默认情况下,是mr框架所读到的一行文本的内容,String类型的,同上用Text(org.apache.hadoop.io.Text)类型;

* KEYOUT:是用户自定义逻辑处理完成之后输出数据中的key,在此处是单词,为String类型,同上用Text类型;

* VALUEOUT:是用户自定义逻辑处理完成之后输出数据中的value,在此处是单词数量,为Integer类型,同上用IntWritable类型;

* */

public class WordcountMapper extends Mapper

{

/**

* map阶段的业务逻辑就写在自定义的map()方法中,

* maptask会对每一行输入数据调用一次我们自定义的map()方法;

* */

@Override //覆写Mapper中的方法;

protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException

{

//将maptask传给我们的文本内容先转换成String类型

String line = value.toString();

//根据空格将这一行切分成单词;

String[] words = line.split(" ");

//将单词输出为<单词,1>

for(String word:words)

{

//将单词作为key,将次数1作为value,以便于后续的数据分发,可以根据单词分发,以便于相同单词会分到相同的reduce task中;

context.write(new Text(word),new IntWritable(1)); //进行类型转换一下;

}无锡×××医院 https://yyk.familydoctor.com.cn/20612/

}

最后编写WordCountReduceer:

package com.jym.hadoop.mr.demo;

import java.io.IOException;

import java.util.Iterator;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Reducer;

/**

* KEYIN,VALUEIN应该对应mapper中的输出的KEYOUT,VALUEOUT类型;

* KEYOUT是单词

* VALUEOUT是总次数*/

public class WordcountReducerr extends Reducer

{

/**

* 例如:

*

* 输入参数key,是一组相同单词kv对的key

* */

@Override

protected void reduce(Text key, Iterable values,Context context) throws IOException, InterruptedException

{

int count= 0;

/* //采用迭代器的方式进行统计单词的数量;

Iterator iterator = values.iterator();

while(iterator.hasNext())

{

count+=iterator.next().get(); //获取key对应的value值

}

*/

//下面的for循环和上面注释中的效果是一样的;

for(IntWritable value:values)

{

count+=value.get();

}

//输出统计结果

context.write(key, new IntWritable(count));

/**

* 默认情况下reduce task会将输出结果放到一个文件中(最好是HDFS文件系统上的一个文件)

* */

}

}

然而还可以编写一个Combiner类:

package com.jym.hadoop.mr.demo;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Reducer;

/*

* 此处的这个combiner其实不用自己编写,因为combiner的工作原理同reducer的原理是一样

* 的,故可以直接反射调用WordcountReducer类即可

* */

public class WordcountCombiner extends Reducer

{

@Override

protected void reduce(Text key, Iterable values,Context context) throws IOException, InterruptedException

{

}


分享文章:MapReduce编写实现wordcount词频统计
URL分享:http://scyanting.com/article/iphepc.html