
Writing a MapReduce example that counts word occurrences in text, running MapReduce locally, using a Combiner and related knowledge, flow-statistics, flow-sum and flow-sorting examples, and a custom Partitioner

Date: 2019-10-24 14:22:31


Project structure (the packages and classes used in this post):

cn.toto.bigdata.mr.wc: WordCountMapper, WordCountReducer, WordCountDriver
cn.toto.bigdata.mr.wc.flowsum: FlowBean, FlowSum, FlowSumSort, OneStepSumSort, ProvincePartitioner, FlowSumProvince

The code used throughout this example is as follows.

The code for WordCountMapper is as follows:
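(The original listing did not survive in this copy. Below is a minimal sketch consistent with the driver further down, which expects Text keys and IntWritable values as map output; the whitespace tokenization is an assumption.)

package cn.toto.bigdata.mr.wc;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    private final Text k = new Text();
    private final IntWritable v = new IntWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Split the line into words (assumed to be space-separated) and emit <word, 1> for each
        String[] words = value.toString().split(" ");
        for (String word : words) {
            k.set(word);
            context.write(k, v);
        }
    }
}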

The code for WordCountReducer is as follows:
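(Again a minimal sketch, since the original listing is missing here; it matches the Reducer<Text, IntWritable, Text, IntWritable> signature the driver expects, and it is also the class the driver reuses as the Combiner in section B.)

package cn.toto.bigdata.mr.wc;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    private final IntWritable v = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // Sum the 1s (or the partial sums, when this class also runs as the Combiner) for this word
        int count = 0;
        for (IntWritable value : values) {
            count += value.get();
        }
        v.set(count);
        context.write(key, v);
    }
}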

The code for WordCountDriver is as follows (the original listing is essentially the driver shown in section B below, without the job.setCombinerClass call).

Preparation before running (the concrete steps are repeated in the header comment of the driver listing below):

B: WordCount run locally, using a Combiner (the main change is in WordCountDriver). The code is as follows:

package cn.toto.bigdata.mr.wc;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

/**
 * This class is the client that specifies the many parameters the wordcount job needs at run time:
 * for example, which components to use for reading the data and writing the results,
 * which class holds the map-phase business logic and which class holds the reduce-phase business logic,
 * the path of the jar that contains the wordcount job,
 * ... and any other required parameters.
 *
 * Preparation before running:
 * 1. Export the current project as wordcount.jar
 * 2. Prepare /home/toto/software/wordcount/input/a.txt, whose content is similar to:
 *    The true nobility is in being superior to your previous self guess
 * 3. Upload /home/toto/software/wordcount to HDFS with: hadoop fs -put wordcount /wordcount
 *
 * Run it with:
 *    hadoop jar wordcount.jar cn.toto.bigdata.mr.wc.WordCountDriver
 * which is equivalent to:
 *    java -cp wordcount.jar cn.toto.bigdata.mr.wc.WordCountDriver
 * "hadoop jar" puts the jars on the hadoop classpath onto the application classpath and then runs
 * cn.toto.bigdata.mr.wc.WordCountDriver.
 *
 * Finally, view the result with: hadoop fs -cat /wordcount/output/part-r-00000
 */
public class WordCountDriver {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        //conf.set("fs.defaultFS", "hdfs://hadoop:9000");
        /*conf.set("mapreduce.framework.name", "yarn");
        conf.set("yarn.resourcemanager.hostname", "mini1");*/
        Job job = Job.getInstance(conf);

        // Tell the framework where the jar containing our program is
        // job.setJar("c:/wordcount.jar");
        job.setJarByClass(WordCountDriver.class);

        // Tell the framework which mapper and reducer classes our program uses
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);

        // Use a Combiner. The benefit of a Combiner is that each map task pre-aggregates its own
        // output and passes the partial sums to the reducer, which then only has to sum those
        // partial results. This reduces the reducer's work: every map task aggregates locally, so
        // the load is spread out and efficiency improves. Because a Combiner follows the same
        // programming contract as a Reducer (it extends Reducer), in this scenario the Combiner
        // can simply be set to WordCountReducer.
        job.setCombinerClass(WordCountReducer.class);

        // Tell the framework the output key/value types of our mapper and reducer
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // Tell the framework which format components to use for reading input and writing output.
        // TextInputFormat is the input component built into MapReduce for reading text files.
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        // Tell the framework where the files to process are (note: if /wordcount/input/ already exists in HDFS, ...)
        FileInputFormat.setInputPaths(job, new Path("e:/wordcount/input/"));

        // Tell the framework where to write the results
        FileOutputFormat.setOutputPath(job, new Path("e:/wordcount/output/"));

        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0 : 1);
    }
}

Preparation:

Prepare e:/wordcount/input/a.txt on drive E, with content like the following:

The true nobility is in being superior to your previous self guess
No great discovery was ever made without a bold Knowledge will give you power but character respect
The sun is just rising in the morning of another day I
I figure life is a gift and I don't intend on wasting

Right-click and run the code above, then open E:\wordcount\output\part-r-00000 to see the result, whose content is:

I    3
Knowledge    1
No    1
The    2
a    2
and    1
another    1
being    1
bold    1
but    1
character    1
day    1
discovery    1
don't    1
ever    1
figure    1
gift    1
give    1
great    1
guess    1
in    2
intend    1
is    3
just    1
life    1
made    1
morning    1
nobility    1
of    1
on    1
power    1
previous    1
respect    1
rising    1
self    1
sun    1
superior    1
the    1
to    1
true    1
was    1
wasting    1
will    1
without    1
you    1
your    1

After all of the steps above, the program is complete.

Summary:

3. The Combiner in MapReduce

(1) A Combiner is a component of an MR program in addition to the Mapper and Reducer.

(2) The parent class of a Combiner is the Reducer class.

(3) The difference between a Combiner and a Reducer is where they run:

the Combiner runs on the node of each map task;

the Reducer receives the output of all Mappers globally.

(4) The point of a Combiner is to do a local aggregation of each map task's output, so as to reduce the amount of data transferred over the network.

Concrete implementation steps:

1. Write a custom Combiner class that extends Reducer and override its reduce method.

2. Register it on the job: job.setCombinerClass(CustomCombiner.class)

(5) The precondition for using a Combiner is that it must not change the final business result.

Moreover, the Combiner's output kv types must match the Reducer's input kv types.


Use Combiners with great care,

because during a MapReduce job the Combiner may or may not be invoked, and it may be invoked once or several times.

So the rule for Combiners is: the result must be the same whether the Combiner runs or not. (Word counting is safe because summing is associative; computing an average inside a Combiner, by contrast, would change the final result.)

===============================================================================

Flow-statistics example with a custom class implementing Hadoop serialization:

package cn.toto.bigdata.mr.wc.flowsum;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

/**
 * For a custom class to be used by MapReduce it must be serializable, i.e. implement WritableComparable.
 */
public class FlowBean implements WritableComparable<FlowBean> {

    private String phoneNbr;
    private long upFlow;
    private long dFlow;
    private long sumFlow;

    /**
     * The serialization framework calls the no-arg constructor when it creates an instance during deserialization.
     */
    public FlowBean() {
    }

    @Override
    public void write(DataOutput out) throws IOException {
        // phoneNbr may not have been set (e.g. when only set(upFlow, dFlow) was called), so guard against null
        out.writeUTF(phoneNbr == null ? "" : phoneNbr);
        out.writeLong(upFlow);
        out.writeLong(dFlow);
        out.writeLong(sumFlow);
    }

    /**
     * Deserialization method. Note: fields must be read back in the same order in which they were written.
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.phoneNbr = in.readUTF();
        this.upFlow = in.readLong();
        this.dFlow = in.readLong();
        this.sumFlow = in.readLong();
    }

    @Override
    public int compareTo(FlowBean o) {
        return (int) (o.getSumFlow() - this.sumFlow);
    }

    public void set(long upFlow, long dFlow) {
        this.upFlow = upFlow;
        this.dFlow = dFlow;
        this.sumFlow = upFlow + dFlow;
    }

    public void set(String phoneNbr, long upFlow, long dFlow) {
        this.phoneNbr = phoneNbr;
        this.upFlow = upFlow;
        this.dFlow = dFlow;
        this.sumFlow = upFlow + dFlow;
    }

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getdFlow() {
        return dFlow;
    }

    public void setdFlow(long dFlow) {
        this.dFlow = dFlow;
    }

    public long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }

    public String getPhoneNbr() {
        return phoneNbr;
    }

    public void setPhoneNbr(String phoneNbr) {
        this.phoneNbr = phoneNbr;
    }

    @Override
    public String toString() {
        return "FlowBean [phoneNbr=" + phoneNbr + ", upFlow=" + upFlow + ", dFlow=" + dFlow + ", sumFlow=" + sumFlow + "]";
    }
}
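One caveat, not in the original post: compareTo subtracts two longs and casts the difference to int, which can overflow and mis-order very large flow values. If you want to harden the class, a safer descending comparison is:

    @Override
    public int compareTo(FlowBean o) {
        // sort by total flow, descending, without risk of int overflow
        return Long.compare(o.getSumFlow(), this.sumFlow);
    }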

package cn.toto.bigdata.mr.wc.flowsum;

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FlowSum {

    // Custom objects can be passed as k/v between map and reduce, but they must implement
    // Hadoop's serialization mechanism (implements Writable)
    public static class FlowSumMapper extends Mapper<LongWritable, Text, Text, FlowBean> {

        Text k = new Text();
        FlowBean v = new FlowBean();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Split the line that was read into its fields
            String line = value.toString();
            String[] fields = StringUtils.split(line, "\t");
            // Extract the fields the business logic needs
            String phoneNbr = fields[1];
            long upFlow = Long.parseLong(fields[fields.length - 3]);
            long dFlow = Long.parseLong(fields[fields.length - 2]);
            k.set(phoneNbr);
            v.set(upFlow, dFlow);
            context.write(k, v);
        }
    }

    public static class FlowSumReducer extends Reducer<Text, FlowBean, Text, FlowBean> {

        FlowBean v = new FlowBean();

        /**
         * The key that reduce receives is the first phone number of a group such as
         * <phone a, bean><phone a, bean><phone a, bean>;
         * the values parameter is an iterator over all the beans of that group.
         */
        @Override
        protected void reduce(Text key, Iterable<FlowBean> values, Context context)
                throws IOException, InterruptedException {
            long upFlowCount = 0;
            long dFlowCount = 0;
            for (FlowBean bean : values) {
                upFlowCount += bean.getUpFlow();
                dFlowCount += bean.getdFlow();
            }
            v.set(upFlowCount, dFlowCount);
            context.write(key, v);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        /*
         * conf.set("mapreduce.framework.name", "yarn");
         * conf.set("yarn.resourcemanager.hostname", "mini1");
         */
        Job job = Job.getInstance(conf);

        // Tell the framework where the jar containing our program is
        // job.setJar("c:/wordcount.jar");
        job.setJarByClass(FlowSum.class);

        // Tell the framework which mapper and reducer classes our program uses
        job.setMapperClass(FlowSumMapper.class);
        job.setReducerClass(FlowSumReducer.class);

        // Tell the framework the output key/value types of our mapper and reducer
        /*
         * job.setMapOutputKeyClass(Text.class);
         * job.setMapOutputValueClass(FlowBean.class);
         */
        // If the map output types are the same as the final output types, the following two lines are enough
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // These two are the framework's default input/output components, so the lines can be omitted
        /*
         * job.setInputFormatClass(TextInputFormat.class);
         * job.setOutputFormatClass(TextOutputFormat.class);
         */

        // Tell the framework where the files to process are
        FileInputFormat.setInputPaths(job, new Path("E:/learnTempFolder/flow/input/"));

        // Tell the framework where to write the results
        FileOutputFormat.setOutputPath(job, new Path("E:/learnTempFolder/flow/output/"));

        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0 : 1);
    }
}

package cn.toto.bigdata.mr.wc.flowsum;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Aggregates the flow totals and sorts them by total flow in descending order.
 * Precondition: the data being processed is the already-summed result file (the output of FlowSum).
 */
public class FlowSumSort {

    public static class FlowSumSortMapper extends Mapper<LongWritable, Text, FlowBean, Text> {

        FlowBean k = new FlowBean();
        Text v = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Assumes each input line is: phone \t upFlow \t dFlow \t sumFlow (the layout of the FlowSum output shown below)
            String line = value.toString();
            String[] fields = line.split("\t");
            String phoneNbr = fields[0];
            long upFlowSum = Long.parseLong(fields[1]);
            long dFlowSum = Long.parseLong(fields[2]);
            k.set(upFlowSum, dFlowSum);
            v.set(phoneNbr);
            context.write(k, v);
        }
    }

    public static class FlowSumSortReducer extends Reducer<FlowBean, Text, Text, FlowBean> {

        @Override
        protected void reduce(FlowBean bean, Iterable<Text> phoneNbrs, Context context)
                throws IOException, InterruptedException {
            context.write(phoneNbrs.iterator().next(), bean);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(FlowSumSort.class);

        // Tell the framework which mapper and reducer classes our program uses
        job.setMapperClass(FlowSumSortMapper.class);
        job.setReducerClass(FlowSumSortReducer.class);

        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(Text.class);

        // Tell the framework the final output key/value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // Tell the framework where the files to process are (note: this job reads the output of FlowSum)
        FileInputFormat.setInputPaths(job, new Path("E:/learnTempFolder/flow/output/"));

        // Tell the framework where to write the results
        FileOutputFormat.setOutputPath(job, new Path("E:/learnTempFolder/flow/sortout/"));

        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0 : 1);
    }
}

Simulating the conditions for a local run:

1. Set the environment variable HADOOP_HOME=E:\learnTempFolder\hadoop-2.7.3

2. Download a winutils.exe and hadoop.dll built for Windows 10 (for example from a CSDN resource) and place them at E:\learnTempFolder\hadoop-2.7.3\bin\winutils.exe and E:\learnTempFolder\hadoop-2.7.3\bin\hadoop.dll. (A programmatic alternative is sketched below.)
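If you prefer not to set a system-wide environment variable, a common alternative (not shown in the original post) is to point the Hadoop client at the unpacked distribution at the top of main(), before any Hadoop classes are touched; hadoop.home.dir is the system property the Windows shim looks for:

    // Assumes E:/learnTempFolder/hadoop-2.7.3/bin contains winutils.exe and hadoop.dll
    System.setProperty("hadoop.home.dir", "E:/learnTempFolder/hadoop-2.7.3");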


3. Prepare the data to process:

The data file HTTP_0313143750.dat looks like this:

Each line of HTTP_0313143750.dat is one tab-separated access record; the fields are roughly: timestamp, phone number, MAC address, IP address, visited domain, upstream packets, downstream packets, upstream bytes, downstream bytes, HTTP status code. The first record, for example, is:

1363157985066    13726230503    00-FD-07-A4-72-B8:CMCC    120.196.100.82    i02.    24    27    2481    24681    200

and there are about twenty more records of the same shape for the other phone numbers that appear in the results below. The upstream byte count is always the third-to-last field and the downstream byte count the second-to-last, which is exactly what the mappers rely on.

4. Run FlowSum first (right-click and run it as a Java application).

The run produces E:\learnTempFolder\flow\output\part-r-00000, whose content is (phone number, upstream flow, downstream flow, total flow):

13480253104    180    180    360
13502468823    7335    110349    117684
13560436666    1116    954    2070
13560439658    2034    5892    7926
13602846565    1938    2910    4848
13660577991    6960    690    7650
13719199419    240    0    240
13726230503    2481    24681    27162
13726238888    2481    24681    27162
13760778710    …
13826544101    264    0    264
13922314466    3008    3720    6728
13925057413    11058    48243    59301
13926251106    240    0    240
13926435656    132    1512    1644
15013685858    3659    3538    7197
1593257    3156    2936    6092
15989002119    1938    180    2118
18211575961    1527    2106    3633
1833382    9531    2412    11943
84138413    4116    1432    5548

5. Run FlowSumSort (note: do not delete the part-r-00000 file above, it is this job's input).

The file produced by this run contains (sorted by total flow, descending):

13502468823    7335    110349    117684
13925057413    11058    48243    59301
13726230503    2481    24681    27162
1833382    9531    2412    11943
13560439658    2034    5892    7926
13660577991    6960    690    7650
15013685858    3659    3538    7197
13922314466    3008    3720    6728
1593257    3156    2936    6092
84138413    4116    1432    5548
13602846565    1938    2910    4848
18211575961    1527    2106    3633
15989002119    1938    180    2118
13560436666    1116    954    2070
13926435656    132    1512    1644
13480253104    180    180    360
13826544101    264    0    264
13719199419    240    0    240

Only 18 lines appear here: FlowBean is the map output key and its compareTo compares only sumFlow, so phone numbers with identical totals fall into the same reduce group and only one number per distinct total is written out (which is why 13726238888, 13760778710 and 13926251106 do not appear).

Of course, we can also do the summing and the sorting in a single job and write the result straight to a specified output directory. The code is as follows:

package cn.toto.bigdata.mr.wc.flowsum;

import java.io.IOException;
import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeMap;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class OneStepSumSort {

    public static class OneStepSumSortMapper extends Mapper<LongWritable, Text, Text, FlowBean> {

        Text k = new Text();
        FlowBean v = new FlowBean();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Split the line that was read into its fields
            String line = value.toString();
            String[] fields = StringUtils.split(line, "\t");
            // Extract the fields the business logic needs
            String phoneNbr = fields[1];
            long upFlow = Long.parseLong(fields[fields.length - 3]);
            long dFlow = Long.parseLong(fields[fields.length - 2]);
            k.set(phoneNbr);
            v.set(phoneNbr, upFlow, dFlow);
            context.write(k, v);
        }
    }

    public static class OneStepSumSortReducer extends Reducer<Text, FlowBean, Text, FlowBean> {

        // Collects the per-phone totals; the TreeMap keeps them ordered by FlowBean.compareTo (total flow, descending)
        TreeMap<FlowBean, Text> treeMap = new TreeMap<FlowBean, Text>();

        @Override
        protected void reduce(Text key, Iterable<FlowBean> values, Context context)
                throws IOException, InterruptedException {
            int upCount = 0;
            int dCount = 0;
            for (FlowBean bean : values) {
                upCount += bean.getUpFlow();
                dCount += bean.getdFlow();
            }
            FlowBean sumBean = new FlowBean();
            sumBean.set(key.toString(), upCount, dCount);
            Text text = new Text(key.toString());
            treeMap.put(sumBean, text);
        }

        @Override
        protected void cleanup(Reducer<Text, FlowBean, Text, FlowBean>.Context context)
                throws IOException, InterruptedException {
            // Emit the sorted totals once all groups have been processed
            Set<Entry<FlowBean, Text>> entrySet = treeMap.entrySet();
            for (Entry<FlowBean, Text> ent : entrySet) {
                context.write(ent.getValue(), ent.getKey());
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(OneStepSumSort.class);

        // Tell the framework which mapper and reducer classes our program uses
        job.setMapperClass(OneStepSumSortMapper.class);
        job.setReducerClass(OneStepSumSortReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);

        // Tell the framework the final output key/value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // Tell the framework where the files to process are
        FileInputFormat.setInputPaths(job, new Path("E:/flow/input/"));

        // Tell the framework where to write the results
        FileOutputFormat.setOutputPath(job, new Path("E:/flow/sortout/"));

        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0 : 1);
    }
}
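A note on this design (not spelled out in the original post): collecting all totals in a TreeMap inside the reducer and emitting them in cleanup() only yields a globally sorted file because the job runs with a single reduce task (the default); with several reduce tasks each reducer would sort only its own partition.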

到"E:/flow/sortout/"目录下,查看结果:

即:

13502468823    FlowBean [phoneNbr=13502468823, upFlow=7335, dFlow=110349, sumFlow=117684]
13925057413    FlowBean [phoneNbr=13925057413, upFlow=11058, dFlow=48243, sumFlow=59301]
13726238888    FlowBean [phoneNbr=13726230503, upFlow=2481, dFlow=24681, sumFlow=27162]
1833382    FlowBean [phoneNbr=1833382, upFlow=9531, dFlow=2412, sumFlow=11943]
13560439658    FlowBean [phoneNbr=13560439658, upFlow=2034, dFlow=5892, sumFlow=7926]
13660577991    FlowBean [phoneNbr=13660577991, upFlow=6960, dFlow=690, sumFlow=7650]
15013685858    FlowBean [phoneNbr=15013685858, upFlow=3659, dFlow=3538, sumFlow=7197]
13922314466    FlowBean [phoneNbr=13922314466, upFlow=3008, dFlow=3720, sumFlow=6728]
1593257    FlowBean [phoneNbr=1593257, upFlow=3156, dFlow=2936, sumFlow=6092]
84138413    FlowBean [phoneNbr=84138413, upFlow=4116, dFlow=1432, sumFlow=5548]
13602846565    FlowBean [phoneNbr=13602846565, upFlow=1938, dFlow=2910, sumFlow=4848]
18211575961    FlowBean [phoneNbr=18211575961, upFlow=1527, dFlow=2106, sumFlow=3633]
15989002119    FlowBean [phoneNbr=15989002119, upFlow=1938, dFlow=180, sumFlow=2118]
13560436666    FlowBean [phoneNbr=13560436666, upFlow=1116, dFlow=954, sumFlow=2070]
13926435656    FlowBean [phoneNbr=13926435656, upFlow=132, dFlow=1512, sumFlow=1644]
13480253104    FlowBean [phoneNbr=13480253104, upFlow=180, dFlow=180, sumFlow=360]
13826544101    FlowBean [phoneNbr=13826544101, upFlow=264, dFlow=0, sumFlow=264]
13926251106    FlowBean [phoneNbr=13719199419, upFlow=240, dFlow=0, sumFlow=240]

Notice that two lines pair one phone number with another number's bean (13726238888 with 13726230503, and 13926251106 with 13719199419). FlowBean.compareTo only looks at sumFlow, so phones with identical totals count as the same TreeMap key: a later put() replaces the Text value but keeps the earlier bean. For the same reason the 21 input numbers collapse into 18 output lines.

6. Partition by phone number, so that different phone-number prefixes end up in different output files. The approach is as follows:

A: Below is the custom partitioner; its code is:

package cn.toto.bigdata.mr.wc.flowsum;

import java.util.HashMap;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

/**
 * A custom partitioner must extend Partitioner.
 */
public class ProvincePartitioner extends Partitioner<Text, FlowBean> {

    private static HashMap<String, Integer> provinceMap = new HashMap<String, Integer>();

    static {
        provinceMap.put("138", 0);
        provinceMap.put("139", 1);
        provinceMap.put("136", 2);
        provinceMap.put("137", 3);
        provinceMap.put("135", 4);
    }

    @Override
    public int getPartition(Text key, FlowBean value, int numPartitions) {
        // Look up the partition by the first three digits of the phone number;
        // anything not in the map goes to partition 5
        Integer code = provinceMap.get(key.toString().substring(0, 3));
        if (code != null) {
            return code;
        }
        return 5;
    }
}

B: Test the custom partitioner:

package cn.toto.bigdata.mr.wc.flowsum;

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FlowSumProvince {

    public static class FlowSumProvinceMapper extends Mapper<LongWritable, Text, Text, FlowBean> {

        Text k = new Text();
        FlowBean v = new FlowBean();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Split the line that was read into its fields
            String line = value.toString();
            String[] fields = StringUtils.split(line, "\t");
            // Extract the fields the business logic needs
            String phoneNbr = fields[1];
            long upFlow = Long.parseLong(fields[fields.length - 3]);
            long dFlow = Long.parseLong(fields[fields.length - 2]);
            k.set(phoneNbr);
            v.set(phoneNbr, upFlow, dFlow);
            context.write(k, v);
        }
    }

    public static class FlowSumProvinceReducer extends Reducer<Text, FlowBean, Text, FlowBean> {

        @Override
        protected void reduce(Text key, Iterable<FlowBean> values, Context context)
                throws IOException, InterruptedException {
            int upCount = 0;
            int dCount = 0;
            for (FlowBean bean : values) {
                upCount += bean.getUpFlow();
                dCount += bean.getdFlow();
            }
            FlowBean sumBean = new FlowBean();
            sumBean.set(key.toString(), upCount, dCount);
            context.write(key, sumBean);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(FlowSumProvince.class);

        // Tell the framework which mapper and reducer classes our program uses
        job.setMapperClass(FlowSumProvinceMapper.class);
        job.setReducerClass(FlowSumProvinceReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);

        // Tell the framework the final output key/value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // Use our custom partitioner in the shuffle so the data is partitioned by phone-number prefix.
        // Note: the custom partitioner maps five prefixes plus a default, so we need at least 6 reduce tasks.
        job.setPartitionerClass(ProvincePartitioner.class);

        // Set the number of reduce tasks
        job.setNumReduceTasks(6);

        // Tell the framework where the files to process are
        FileInputFormat.setInputPaths(job, new Path("E:/flow/input/"));

        // Tell the framework where to write the results; delete the output directory first if it already exists
        Path out = new Path("E:/flow/provinceout/");
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(out)) {
            fs.delete(out, true);
        }
        FileOutputFormat.setOutputPath(job, out);

        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0 : 1);
    }
}
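A note on how the number of reduce tasks interacts with the partitioner (not in the original post): with job.setNumReduceTasks(1) the partitioner is effectively ignored and everything lands in one file; with a number between 2 and 5 the job fails at runtime with an "Illegal partition" error as soon as getPartition returns a partition that has no reduce task; with more than 6 the extra part-r-* files simply come out empty.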

C: Preparation needed for the run:

The data file is the same HTTP_0313143750.dat described in step 3 above; its content is as shown there.


After running, six output files part-r-00000 through part-r-00005 are produced, one per partition: part-r-00000 holds the numbers starting with 138, part-r-00001 those starting with 139, part-r-00002 those with 136, part-r-00003 those with 137, part-r-00004 those with 135, and part-r-00005 all remaining numbers.

For example, part-r-00001 contains the 139* phone numbers with their summed flows, and so on for the other partitions.

