
Lesson 92 assignment: store and query the following data in Hive via a SerDe

段智华 · Published 2016-12-11 21:27:13

 

Lesson 92 assignment: use a SerDe to store and query the following data in Hive:

 

0^^Hadoop^^America^^5000|8000|12000|level8^^male
1^^Spark^^America^^8000|10000|15000|level9^^famale
2^^Flink^^America^^7000|8000|13000|level10^^male
3^^Hadoop^^America^^9000|11000|12000|level10^^famale
4^^Spark^^America^^10000|11000|12000|level12^^male
5^^Flink^^America^^11000|12000|18000|level18^^famale
6^^Hadoop^^America^^15000|16000|19000|level16^^male
7^^Spark^^America^^18000|19000|20000|level20^^male
8^^Flink^^America^^15000|16000|19000|level19^^male

 

Implementation: parse the record encoding with a custom InputFormat so the raw Hive source data can be cleaned flexibly:

1. Split each record on ^^

2. Also split the salary field on | (a short illustration follows this list)
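
For illustration only (this snippet is not part of the original post), here is what the two-stage split produces for the first sample record:

public class SplitDemo {
    public static void main(String[] args) {
        String line = "0^^Hadoop^^America^^5000|8000|12000|level8^^male";
        // First split on the literal "^^" (escaped, since ^ is a regex metacharacter)
        String[] fields = line.split("\\^\\^");    // 0, Hadoop, America, 5000|8000|12000|level8, male
        // Then split the salary field on the literal "|"
        String[] salarys = fields[3].split("\\|"); // 5000, 8000, 12000, level8
        System.out.println(String.join(", ", fields));
        System.out.println(String.join(", ", salarys));
    }
}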

 

Implementation steps:

1. Source data location:

root@master:/usr/local/IMF_testdata/hivestudy# ls

  employeesinputformat.txt  IMFInputFormat2.jar

 

2. View the file contents:

root@master:/usr/local/IMF_testdata/hivestudy# cat employeesinputformat.txt

0^^Hadoop^^America^^5000|8000|12000|level8^^male

1^^Spark^^America^^8000|10000|15000|level9^^famale

2^^Flink^^America^^7000|8000|13000|level10^^male

3^^Hadoop^^America^^9000|11000|12000|level10^^famale

4^^Spark^^America^^10000|11000|12000|level12^^male

5^^Flink^^America^^11000|12000|18000|level18^^famale

6^^Hadoop^^America^^15000|16000|19000|level16^^male

7^^Spark^^America^^18000|19000|20000|level20^^male

8^^Flink^^America^^15000|16000|19000|level19^^male

 

3. Develop the InputFormat code (full source attached at the end) and export it as the jar IMFInputFormat2.jar.

The code uses a regular expression to parse each line of text:

String patternhive = "^(.*)\\^\\^(.*)\\^\\^(.*)\\^\\^(.*)\\|(.*)\\|(.*)\\|(.*)\\^\\^(.*)";

The line is parsed on ^^ and |; the regex capture groups are then read in order and joined back into a single string with "\u0001".

Problem: when the fields were joined with "\t", the data imported into Hive as NULL.

Solution: join the groups with "\u0001" instead (the table is created without a ROW FORMAT clause, so LazySimpleSerDe expects its default field delimiter \001, i.e. Ctrl-A), and the data then loads into Hive correctly. A quick verification snippet follows.
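
As a quick standalone check (an illustrative snippet, not the author's code), the regex can be applied to one sample line and the eight capture groups joined with "\u0001"; the delimiter is printed as ^A so the result is visible:

import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class RegexJoinDemo {
    public static void main(String[] args) {
        String patternhive = "^(.*)\\^\\^(.*)\\^\\^(.*)\\^\\^(.*)\\|(.*)\\|(.*)\\|(.*)\\^\\^(.*)";
        String line = "1^^Spark^^America^^8000|10000|15000|level9^^famale";
        Matcher m = Pattern.compile(patternhive).matcher(line);
        if (m.find()) {
            List<String> groups = new ArrayList<String>();
            for (int i = 1; i <= m.groupCount(); i++) {
                groups.add(m.group(i));
            }
            // Join with \u0001 (Ctrl-A), the default field delimiter LazySimpleSerDe expects
            String joined = String.join("\u0001", groups);
            System.out.println(joined.replace("\u0001", "^A"));
            // prints: 1^ASpark^AAmerica^A8000^A10000^A15000^Alevel9^Afamale
        }
    }
}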

 

 

4. Operations in Hive:

Drop the table:

drop table employee_inputformat;

 

Add the jar:

add jar /usr/local/IMF_testdata/hivestudy/IMFInputFormat2.jar;

 

Create the table:

CREATE TABLE employee_InputFormat(userid INT, name String, address String, salarys1 int, salarys2 int, salarys3 int, salarys4 string, gendre string) stored as INPUTFORMAT 'com.dt.spark.hive.IMFInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat';

 

Load the data:

LOAD DATA LOCAL INPATH '/usr/local/IMF_testdata/hivestudy/employeesinputformat.txt' INTO TABLE employee_InputFormat;

 

Query the data:

select * from employee_InputFormat;

 

 

 

5. The results are as follows:

 

 

 

 

 

hive>    desc formatted employee_inputformat;

OK

# col_name              data_type               comment            

                

userid                  int                                         

name                    string                                     

address                 string                                     

salarys1                int                                        

salarys2                int                                        

salarys3                int                                        

salarys4                string                                     

gendre                  string                                      

                

# Detailed Table Information            

Database:               default                 

Owner:                  root                    

CreateTime:             Sun Dec 11 20:47:21 CST 2016    

LastAccessTime:         UNKNOWN                 

Protect Mode:           None                    

Retention:              0                       

Location:              hdfs://master:9000/user/hive/warehouse/employee_inputformat     

Table Type:             MANAGED_TABLE           

Table Parameters:               

       COLUMN_STATS_ACCURATE   true               

       numFiles                1                  

       totalSize               467                

       transient_lastDdlTime  1481460441          

                

# Storage Information           

SerDe Library:         org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe      

InputFormat:           com.dt.spark.hive.IMFInputFormat        

OutputFormat:           org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat      

Compressed:             No                      

Num Buckets:            -1                      

Bucket Columns:         []                      

Sort Columns:           []                      

Storage Desc Params:            

       serialization.format    1                  

Time taken: 0.111 seconds, Fetched: 36 row(s)

hive>

 

 

Attached source code (IMFInputFormat):

 

 

package com.dt.spark.hive;

 

 

import java.io.IOException;

 

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobConfigurable;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;

 

public class IMFInputFormat extends TextInputFormat implements JobConfigurable {

    // Hand Hive our custom record reader so that every raw line is re-parsed
    // (split on ^^ and |, re-joined with \u0001) before it reaches the SerDe.
    public RecordReader<LongWritable, Text> getRecordReader(
            InputSplit genericSplit, JobConf job, Reporter reporter) throws IOException {

        reporter.setStatus(genericSplit.toString());
        return new IMFRecordReader((FileSplit) genericSplit, job);
    }

}

 

Source code (IMFRecordReader):

package com.dt.spark.hive;

 

import java.io.IOException;
import java.io.InputStream;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.util.LineReader;

 

public class IMFRecordReader implements RecordReader<LongWritable, Text> {

 

    private CompressionCodecFactory compressionCodecs = null;
    private long start;
    private long pos;
    private long end;
    private LineReader lineReader;
    int maxLineLength;

 

    public IMFRecordReader(FileSplit inputSplit, Configuration job) throws IOException {
        maxLineLength = job.getInt("mapred.IMFRecordReader.maxlength", Integer.MAX_VALUE);
        start = inputSplit.getStart();
        end = start + inputSplit.getLength();
        final Path file = inputSplit.getPath();
        compressionCodecs = new CompressionCodecFactory(job);
        final CompressionCodec codec = compressionCodecs.getCodec(file);

        // Open the file and seek to the start of the split
        FileSystem fs = file.getFileSystem(job);
        FSDataInputStream fileIn = fs.open(file);
        boolean skipFirstLine = false;
        if (codec != null) {
            lineReader = new LineReader(codec.createInputStream(fileIn), job);
            end = Long.MAX_VALUE;
        } else {
            if (start != 0) {
                skipFirstLine = true;
                --start;
                fileIn.seek(start);
            }
            lineReader = new LineReader(fileIn, job);
        }
        if (skipFirstLine) {
            start += lineReader.readLine(new Text(), 0,
                    (int) Math.min((long) Integer.MAX_VALUE, end - start));
        }
        this.pos = start;
    }

 

    public IMFRecordReader(InputStream in, long offset, long endOffset, int maxLineLength) {
        this.maxLineLength = maxLineLength;
        this.lineReader = new LineReader(in);
        this.start = offset;
        this.pos = offset;
        this.end = endOffset;
    }

    public IMFRecordReader(InputStream in, long offset, long endOffset, Configuration job) throws IOException {
        this.maxLineLength = job.getInt("mapred.IMFRecordReader.maxlength", Integer.MAX_VALUE);
        this.lineReader = new LineReader(in, job);
        this.start = offset;
        this.pos = offset;
        this.end = endOffset;
    }

 

    public LongWritable createKey() {
        return new LongWritable();
    }

    public Text createValue() {
        return new Text();
    }

 

    /**
     * Reads the next record in the split and extracts the useful fields
     * from the raw line.
     *
     * @param key
     *            key of the record, which maps to the byte offset of the
     *            record's line
     * @param value
     *            the record in text format
     * @return true if a record existed, false otherwise
     * @throws IOException
     */

 

    public synchronized boolean next(LongWritable key, Text value) throws IOException {
        // Stay within the split
        while (pos
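
The original post is cut off at this point; the remainder of next(), and presumably the getPos(), getProgress() and close() methods, are missing. For reference, here is a minimal sketch of how next() could be completed, assuming the standard Hadoop LineRecordReader pattern combined with the regex re-join described in step 3; it is an illustration, not the author's exact code:

    public synchronized boolean next(LongWritable key, Text value) throws IOException {
        // Stay within the split
        while (pos < end) {
            key.set(pos);
            int newSize = lineReader.readLine(value, maxLineLength,
                    Math.max((int) Math.min(Integer.MAX_VALUE, end - pos), maxLineLength));
            if (newSize == 0) {
                return false;                          // nothing left to read
            }
            pos += newSize;

            // Re-parse the raw line: capture the eight fields separated by ^^ and |,
            // then re-join them with \u0001 so LazySimpleSerDe can split them again.
            String patternhive = "^(.*)\\^\\^(.*)\\^\\^(.*)\\^\\^(.*)\\|(.*)\\|(.*)\\|(.*)\\^\\^(.*)";
            Matcher m = Pattern.compile(patternhive).matcher(value.toString());
            if (m.find()) {
                StringBuilder sb = new StringBuilder();
                for (int i = 1; i <= m.groupCount(); i++) {
                    if (i > 1) {
                        sb.append("\u0001");
                    }
                    sb.append(m.group(i));
                }
                value.set(sb.toString());
            }

            if (newSize < maxLineLength) {
                return true;                           // a full record was read
            }
            // Line longer than maxLineLength: skip it and read the next one
        }
        return false;
    }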
