如何实现一个MapReduce读取数据存入HBase

这篇文章给大家介绍如何实现一个MapReduce读取数据存入HBase,内容非常详细,感兴趣的小伙伴们可以参考借鉴,希望对大家能有所帮助。

成都创新互联公司专业为企业提供襄城网站建设、襄城做网站、襄城网站设计、襄城网站制作等企业网站建设、网页设计与制作、襄城企业网站模板建站服务,十多年襄城做网站经验,不只是建网站,更提供有价值的思路和整体网络服务。

车辆位置数据文件,格式:车辆id  速度:油耗:当前里程。

通过MapReduce算出每辆车的平均速度、油耗、里程

vid1 78:8:120
vid1 56:11:124
vid1 98:5:130
vid1 72:6:131
vid2 78:4:281
vid2 58:9:298
vid2 67:15:309

创建Map类和map函数

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class VehicleMapper extends Mapper {

	@Override
	public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
		String vehicle = value.toString();// 将输入的纯文本的数据转换成String
		// 将输入的数据先按行进行分割
		StringTokenizer tokenizerArticle = new StringTokenizer(vehicle, "\n");
		// 分别对每一行进行处理
		while (tokenizerArticle.hasMoreTokens()) {
			// 每行按空格划分
			StringTokenizer tokenizer = new StringTokenizer(tokenizerArticle.nextToken());
			String vehicleId = tokenizer.nextToken(); // vid
			String vehicleInfo = tokenizer.nextToken(); // 车辆信息
			Text vid = new Text(vehicleId);
			Text info = new Text(vehicleInfo);
			context.write(vid, info);
		}
	}

}

创建Reduce类

import java.io.IOException;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;

public class VehicleReduce extends TableReducer {

	@Override
	public void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
		int speed = 0;
		int oil = 0;
		int mile = 0;
		int count = 0;
		for (Text val : values) {
			String str = val.toString();
			String[] arr = str.split(":");
			speed += Integer.valueOf(arr[0]);
			oil += Integer.valueOf(arr[1]);
			mile += Integer.valueOf(arr[2]) - mile; // 累积里程
			count++;
		}
		speed = (int) speed / count; // 求平均值
		oil = (int) oil / count;
		mile = (int) mile / count;
		String result = speed + ":" + oil + ":" + mile;

		Put put = new Put(key.getBytes());
		put.add(Bytes.toBytes("info"), Bytes.toBytes("property"), Bytes.toBytes(result));
		ImmutableBytesWritable keys = new ImmutableBytesWritable(key.getBytes());
		context.write(keys, put);
	}

}

运行任务

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class VehicleMapReduceJob {

	public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
		Configuration conf = new Configuration();
		conf = HBaseConfiguration.create(conf);
		Job job = new Job(conf, "HBase_VehicleInfo");
		job.setJarByClass(VehicleMapReduceJob.class);
		job.setMapperClass(VehicleMapper.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(Text.class);
		FileInputFormat.addInputPath(job, new Path(args[0])); // 设置输入文件路径
		TableMapReduceUtil.initTableReducerJob("vehicle", VehicleReduce.class, job);
		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}

}

将代码导出成vehicle.jar,放在hadoop-1.2.1目录下,输入命令

./bin/hadoop jar vehicle.jar com/xh/vehicle/VehicleMapReduceJob input/vehicle.txt

如何实现一个MapReduce读取数据存入HBase

HBase结果查询:

如何实现一个MapReduce读取数据存入HBase

关于如何实现一个MapReduce读取数据存入HBase就分享到这里了,希望以上内容可以对大家有一定的帮助,可以学到更多知识。如果觉得文章不错,可以把它分享出去让更多的人看到。


本文标题:如何实现一个MapReduce读取数据存入HBase
文章分享:http://scyanting.com/article/gpjphi.html