
Total Sort

梁云亮 · Published: 2019-12-11 09:42:49

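Example: split the statistics by phone number. Records whose phone numbers begin with 136, 137, 138, and 139 each go into a separate file, all other prefixes go together into one more file, and the records inside each file are sorted by total flow (sorting within a partition).

Input data (columns: phone number, IP address, upstream flow, downstream flow, and a final field that the job does not use):
1863157985066   120.196.100.82    2481    24681    200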
1363157995033   120.197.40.4      264    0    200
1373157993055   120.196.100.99    132    1512    200
1393154400022   120.197.40.4      240    0    200
1363157993044   120.196.100.99    1527    2106    200
1397157993055   120.197.40.4      4116    1432    200
1463157993055   120.196.100.99    1116    954    200
1383157995033   120.197.40.4      3156    2936    200
1363157983019   120.196.100.82    240    0    200
1383154400022   120.197.40.4      6960    690    200
1363157973098   120.197.40.4      3659    3538    200
1373157993055   120.196.100.99    1938    180    200
1363154400022   120.196.100.99    918    4938    200
1393157993055   120.197.40.4      180    180    200
1363157984040   120.197.40.4      1938    2910    200
1383157995033   120.196.100.82    3008    3720    200
1363154400022   120.196.100.99    7335    110349    200
1373157993055   120.196.100.99    9531    2412    200
1363157990043   120.196.100.55    11058    48243    200
1383157993055   120.196.100.82    120    120    200
1323157985066   120.196.100.82    2481    24681    200
1393157993055   120.196.100.99    1116    954    200
Expected result

  • part-r-00000 [figure: contents of the output file, not reproduced]
Implementation

Step 1: Custom bean
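import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class FlowBean implements WritableComparable<FlowBean> {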
    private String phoneNumber; // phone number
    private long upFlow;        // upstream flow
    private long downFlow;      // downstream flow
    private long sumFlow;       // total flow

    public String getPhoneNumber() {
        return phoneNumber;
    }

    public void setPhoneNumber(String phoneNumber) {
        this.phoneNumber = phoneNumber;
    }

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    public long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }

    public FlowBean(String phoneNumber, long upFlow, long downFlow) {  // convenience constructor for initializing the fields
        this.phoneNumber = phoneNumber;
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow;
    }

    public FlowBean() {    // no-arg constructor: deserialization creates the object via reflection before calling readFields()
    }

    // Override toString(): TextOutputFormat writes the bean as upFlow, downFlow, sumFlow, tab-separated
    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + sumFlow;
    }

    @Override
    public void readFields(DataInput in) throws IOException {    // deserialize the fields, in the same order as write()
        phoneNumber = in.readUTF();
        upFlow = in.readLong();
        downFlow = in.readLong();
        sumFlow = in.readLong();
    }

    @Override
    public void write(DataOutput out) throws IOException {    // serialize the fields to the stream
        out.writeUTF(phoneNumber);
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

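    // Sort by total flow in descending order; Long.compare avoids the overflow of a cast long subtraction
    @Override
    public int compareTo(FlowBean o) {
        return Long.compare(o.getSumFlow(), this.getSumFlow());
    }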
}
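FlowBean's write() and readFields() must handle the fields in exactly the same order, and compareTo() decides the order in which the shuffle delivers keys. Casting the difference of two long values to int can flip the sign once the totals differ by more than Integer.MAX_VALUE, which Long.compare avoids. The sketch below checks both points without a Hadoop dependency; PlainFlowBean, roundTrip, and castCompare are hypothetical names, and the class only mimics FlowBean rather than implementing Hadoop's WritableComparable.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical Hadoop-free stand-in for FlowBean: same fields, same
// write/readFields order, and a descending compareTo based on Long.compare.
class PlainFlowBean implements Comparable<PlainFlowBean> {
    String phoneNumber;
    long upFlow;
    long downFlow;
    long sumFlow;

    PlainFlowBean() { }  // no-arg constructor, filled in by readFields()

    PlainFlowBean(String phoneNumber, long upFlow, long downFlow) {
        this.phoneNumber = phoneNumber;
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow;
    }

    // Serialize the fields in a fixed order.
    void write(DataOutput out) throws IOException {
        out.writeUTF(phoneNumber);
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

    // Deserialize in exactly the same order as write().
    void readFields(DataInput in) throws IOException {
        phoneNumber = in.readUTF();
        upFlow = in.readLong();
        downFlow = in.readLong();
        sumFlow = in.readLong();
    }

    // Larger total flow sorts first; Long.compare never overflows.
    @Override
    public int compareTo(PlainFlowBean o) {
        return Long.compare(o.sumFlow, this.sumFlow);
    }

    // Serialize to bytes and read them back into a fresh object.
    static PlainFlowBean roundTrip(PlainFlowBean bean) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            bean.write(out);
            out.flush();
            PlainFlowBean copy = new PlainFlowBean();
            copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // The int-cast subtraction style, kept only to show how it can overflow.
    static int castCompare(long thisSum, long otherSum) {
        return (int) (otherSum - thisSum);
    }
}
```

For example, castCompare(0L, 3_000_000_000L) returns a negative number even though the other bean has the larger total, while compareTo returns a positive value and therefore keeps the larger bean first.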
Step 2: Custom Mapper
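import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class FlowMapper extends Mapper<LongWritable, Text, FlowBean, Text> {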
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();                 // one input line
        String[] fields = line.split("\\s+");           // split on any run of spaces or tabs
        String phoneNumber = fields[0];                 // phone number
        long upFlow = Long.parseLong(fields[2]);        // upstream flow
        long downFlow = Long.parseLong(fields[3]);      // downstream flow
        // Emit the bean as the key so the shuffle sorts by total flow; the phone number is the value
        context.write(new FlowBean(phoneNumber, upFlow, downFlow), new Text(phoneNumber));
    }
}
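FlowMapper relies on split("\\s+") so that input lines mixing spaces and tabs still yield phone, IP, upstream, downstream, and the unused last field at indices 0 to 4. The hypothetical helper below reproduces that extraction in plain Java so it can be checked without a cluster. Its trim() call is an addition not present in the mapper: without it, a line with leading whitespace would produce an empty fields[0].

```java
// Hypothetical Hadoop-free helper mirroring FlowMapper's field extraction.
class FlowLineParser {
    // Split on any run of whitespace (spaces or tabs).
    // Layout: [0] phone, [1] IP, [2] upFlow, [3] downFlow, [4] unused field.
    // trim() is an extra guard against leading whitespace.
    static String[] fields(String line) {
        return line.trim().split("\\s+");
    }

    static String phone(String line) {
        return fields(line)[0];
    }

    // Returns {upFlow, downFlow, sumFlow} for one input line.
    static long[] flows(String line) {
        String[] f = fields(line);
        long up = Long.parseLong(f[2]);
        long down = Long.parseLong(f[3]);
        return new long[] {up, down, up + down};
    }
}
```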
Step 3: Custom Reducer
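import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class FlowReducer extends Reducer<FlowBean, Text, Text, FlowBean> {
    @Override
    protected void reduce(FlowBean key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        // Beans with equal total flow arrive as one group; Hadoop refreshes key for each value,
        // so every phone number is written with its own flows (key and value swapped back)
        for (Text value : values) {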
            context.write(value, key);
        }
    }
}
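With the default single reduce task, the three steps above produce one globally sorted file: the mapper emits (bean, phone), the shuffle orders beans by total flow from largest to smallest, and the reducer writes phone first and then the bean's toString(), with TextOutputFormat's default tab between key and value. The plain-Java simulation below is a sketch of that pipeline under those assumptions; TotalSortSimulation and Row are hypothetical names. List.sort is stable, whereas Hadoop does not promise any particular order among records with equal totals.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical Hadoop-free simulation of this job with one reduce task.
class TotalSortSimulation {
    // One parsed record: the map output key (flows) plus its value (phone).
    static final class Row {
        final String phone;
        final long up;
        final long down;
        final long sum;

        Row(String phone, long up, long down) {
            this.phone = phone;
            this.up = up;
            this.down = down;
            this.sum = up + down;
        }
    }

    static List<String> run(List<String> lines) {
        // Map phase: parse each line the way FlowMapper does.
        List<Row> rows = new ArrayList<>();
        for (String line : lines) {
            String[] f = line.trim().split("\\s+");
            rows.add(new Row(f[0], Long.parseLong(f[2]), Long.parseLong(f[3])));
        }
        // Shuffle/sort phase: largest total flow first, as FlowBean.compareTo orders keys.
        rows.sort(Comparator.comparingLong((Row r) -> r.sum).reversed());
        // Reduce phase: "phone<TAB>up<TAB>down<TAB>sum", one line per record.
        List<String> out = new ArrayList<>();
        for (Row r : rows) {
            out.add(r.phone + "\t" + r.up + "\t" + r.down + "\t" + r.sum);
        }
        return out;
    }
}
```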
Step 4: Custom Driver
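import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class FlowDriver {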
    public static void main(String[] args) throws Exception {
        // Input and output paths (hard-coded here, overriding any command-line arguments)
        args = new String[2];
        args[0] = "src/main/resources/sort/feni";
        args[1] = "src/main/resources/sort/feno";

        Configuration cfg = new Configuration(); // loads the configuration files
        // Run in local mode (local mode is used even if core-site.xml is on the project classpath)
        cfg.set("mapreduce.framework.name", "local");
        cfg.set("fs.defaultFS", "file:///");

        Job job = Job.getInstance(cfg); // create a new job

        job.setJarByClass(FlowDriver.class); // main class

        job.setInputFormatClass(TextInputFormat.class);   // input format
        job.setOutputFormatClass(TextOutputFormat.class); // output format

        // Mapper and Reducer used by this job
        job.setMapperClass(FlowMapper.class);
        job.setReducerClass(FlowReducer.class);

        // Key/value types of the mapper output
        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(Text.class);

        // Key/value types of the final output
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));   // input path
        FileOutputFormat.setOutputPath(job, new Path(args[1])); // output path (must not exist yet, or the job fails)

        // Submit the job and wait for it to finish
        int res = job.waitForCompletion(true) ? 0 : 1;
        System.exit(res);
    }
}
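As written, the driver sets no partitioner and keeps the default of one reduce task, so every record lands in a single globally sorted part-r-00000, which is what a total sort means. The per-prefix split described in the example would additionally need a custom partitioner and five reduce tasks. Below is a minimal sketch of only the selection logic, in plain Java; PhonePrefixPartition and partitionFor are hypothetical names. In Hadoop, this body would presumably sit in the getPartition(FlowBean key, Text value, int numPartitions) method of a subclass of org.apache.hadoop.mapreduce.Partitioner<FlowBean, Text>, called with key.getPhoneNumber(), and the driver would register it with job.setPartitionerClass(...) and call job.setNumReduceTasks(5). Inside each partition the shuffle still sorts keys with FlowBean.compareTo, which gives the per-file ordering by total flow.

```java
// Hypothetical helper: picks a reduce partition from the phone prefix,
// matching the 136/137/138/139/other split described in the example.
class PhonePrefixPartition {
    static int partitionFor(String phoneNumber) {
        // Guard against numbers shorter than three characters.
        String prefix = phoneNumber.length() >= 3 ? phoneNumber.substring(0, 3) : "";
        switch (prefix) {
            case "136": return 0;
            case "137": return 1;
            case "138": return 2;
            case "139": return 3;
            default:    return 4; // all other prefixes share the fifth file
        }
    }
}
```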