parquet列式文件实战

小说:大学生兼职管理群安全吗作者:密华更新时间:2019-03-23字数:11431

“给我睡过去。”将裂空座弄伤之后刘皓一拳砸在了裂空座的身上,震荡之力一波波的在裂空座体内震动起来,几个呼吸之间裂空座发出了一声悲鸣身体好像失去了力量使得,意识也昏迷了过去。

年底小生意

中岛鬼子一直收不到那个突袭镇江城南独立师指挥部的松本大佐的消息,在指挥部内背着双手来回踱步着,当他命令手下参谋向那个去接应的铃木鬼子呼叫的时候,才发觉那个铃木鬼子也联络不上了,中岛鬼子立即就警觉起来,马上命令那个距离镇江城最近的鬼子旅团,要他派出部队搜索!
林风够细心,知道管事的不会说实话,所以从他身边的那些人下手,第一个说是童山,还是不敢大意,接连确认了几次,最终确定,他就是童山,田城管事,晋王的人。

《酸甜》的女声部分,本来是SHE三人合唱,章璇的声音勉强凑合能达到三人的那种厚度,但楚娇的声音太细,声音又压得很低,她一句唱出,完全不合理想,二连忙喊了停。

前言

列式文件,顾名思义就是按列存储到文件,和行式存储文件对应。保证了一列在一个文件中是连续的。下面从parquet常见术语,核心schema和文件结构来深入理解。最后通过java api完成write和read。

 

术语

block

parquet层面和row group是一个意思

 

row group

逻辑概念,用于对row进行分区。由数据集中每个column的column chunk组成。是读写过程中的缓存单元,一般在hdfs上推荐一个block为1GB,一个HDFS文件1个bolock

 

column chunk

某个column的所有数据被称为column chunk,存在与row group,并保证在文件中是连续的

 

page

多个column chunk之间用page分开,也就是说一个page只会包含一个column的数据,一个page是一个独立的单元(可以被编码或者压缩)

 

dictionary page

每个page之前都可以选择是否需要dictionary page。dictionary page记录了该page所有不同的值。这可以增强处理速度提高压缩率。

 

总结

一个文件由多个row group组成,一个row group包括了多个column chunk,一个column chunck就是某个column的所有数据集, 被分割成多个page,一个page是最小的处理单元,可以被编码或者压缩。

 

schema

每种文件都有自己特有的规则,像csv文件,是用分隔符分隔开的一个个列。parquet文件也有自己独特的schema格式。

这是一个parquet文件的schema例子,对应的api是MessageType

message person{
  required binary name (UTF8);
  required int age;
  repeated group family{
    required binary father (UTF8);
    required binary mother (UTF8);
    optional binary sister (UTF8);
  }
}

 

message

固定声明,就像结构体中的struct一样。

 

person

message name,可以粗暴的理解为表名,因为里面都是field。

 

optional,required,repeated

这是三种field的关键字,分别表示可选,必选,可重复选

可选和必选类似数据库中的nullable,可重复选是为了支持复杂的嵌套结构。

 

field类型

目前parquet支持int32,int64,int96(有些系统会把时间戳存成int96如老版本hive),float,double,boolean,binary,fixed_len_byte_array。

参考类org.apache.parquet.schema. PrimitiveType.PrimitiveTypeName

 

UTF8

field的原始类型(Original Type),可以辅助field的type进行细粒度的类型判断。

参考类 org.apache.parquet.schema. OriginalType

 

group

嵌套结构声明,类似json对象

 

schema&数据

schema有了,那如何把schema和数据关联起来,也就是说可以通过schema构建或者解析出相应的数据。那就引出了嵌套关系,definition level和repetitional level。用于定位数据到底出现在嵌套中(如果有嵌套的话)的哪一层。值得注意的是,嵌套关系是针对列而言的,不同列有各自的嵌套关系。

 

definition level

optional字段定位,如果实际没有数据就为0,有数据就为1。涉及到嵌套optional,那么可以这么理解,如果从某一层开始没有该数据,那么该层之前肯定是有数据的,该层之后肯定没有数据。举个简单的例子

message ExampleDefinitionLevel {
  optional group a {
    optional group b {
      optional string c;
    }
  }
}

这个schema对应的definition level所有的可能性如表所示

 

 

repetition level

repeated字段定位,如果在嵌套中某一层出现了值,那么就记录该层。那一个例子来说:

message AddressBook {
  required string owner;
  repeated string ownerPhoneNumbers;
  repeated group contacts {
    required string name;
    optional string phoneNumber;
  }
}

针对不同的列,defnition level和repetition level的最大值如表

 

 

文件结构

结构图

 

 

详细

一个parquet文件由3部分组成,header,blocks,footer。类似一般文档中的页眉,正文,页脚。

 

header

只包含4个字节的魔数,PAR1

 

blocks

block定义参考“术语”

 

footer

记录了该parquet文件正文所有metadata,

 

文件物理格式

通过 cat -v 查看一个parquet,会看到很多的non-printable字符,比如:^U^@^U^P^U^P,^U^B^U^@^

这些字符其实是可以和ascii互相映射,比如^@就是ascii中的0,详细可以看这篇文档

https://docstore.mik.ua/orelly/unix/upt/ch25_07.htm

其实就是八进制的ascii,小于100的+100,大于100的减100。

 

所有的列,包括嵌套结构,例如test.c1和test.c2属于两个列,都是连续存储在parquet文件中。

 

参考资料

// twitter对parquet的概述

https://blog.twitter.com/engineering/en_us/a/2013/announcing-parquet-10-columnar-storage-for-hadoop.html

// parquet的github

https://github.com/apache/parquet-format

// 很详细的parquet文件解析

http://www.infoq.com/cn/articles/in-depth-analysis-of-parquet-column-storage-format

 

coding

 

public static MessageType getMessageTypeFromCode(){
    MessageType messageType =
            Types.buildMessage()
            .required(PrimitiveType.PrimitiveTypeName.BINARY).as(OriginalType.UTF8).named("id")
            .required(PrimitiveType.PrimitiveTypeName.BINARY).as(OriginalType.UTF8).named("name")
            .required(PrimitiveType.PrimitiveTypeName.INT32).named("age")
            .requiredGroup()
              .required(PrimitiveType.PrimitiveTypeName.BINARY).as(OriginalType.UTF8).named("test1")
              .required(PrimitiveType.PrimitiveTypeName.BINARY).as(OriginalType.UTF8).named("test2")
              .named("group1")
            .named("trigger");
    return messageType;
}

public static void writeParquet(String name){

    // 1. 声明parquet的messageType
    MessageType messageType = getMessageTypeFromCode();
    System.out.println(messageType.toString());

    // 2. 声明parquetWriter
    Path path = new Path("/tmp/etl/"+ name);
    Configuration configuration = new Configuration();
    GroupWriteSupport.setSchema(messageType, configuration);
    GroupWriteSupport writeSupport = new GroupWriteSupport();

    // 3. 写数据
    ParquetWriter<Group> writer = null;
    try {
        writer = new ParquetWriter<Group>(path,
                ParquetFileWriter.Mode.CREATE,
                writeSupport,
                CompressionCodecName.UNCOMPRESSED,
                128*1024*1024,
                5*1024*1024,
                5*1024*1024,
                ParquetWriter.DEFAULT_IS_DICTIONARY_ENABLED,
                ParquetWriter.DEFAULT_IS_VALIDATING_ENABLED,
                ParquetWriter.DEFAULT_WRITER_VERSION,
                configuration);
        Random random = new Random();

        for(int i=0; i<10; i++){
            // 4. 构建parquet数据,封装成group
            Group group = new SimpleGroupFactory(messageType).newGroup();
            group.append("name", i+"@qq.com")
                    .append("id",i+"@id")
                    .append("age",i)
            .addGroup("group1")
                    .append("test1", "test1"+i)
                    .append("test2","test2"+i);
            writer.write(group);
        }
    } catch (IOException e) {
        e.printStackTrace();
    } finally {
        if(writer != null){
            try {
                writer.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

}


public static void readParquet(String name){
    // 1. 声明readSupport
    GroupReadSupport groupReadSupport = new GroupReadSupport();
    Path path = new Path("/tmp/etl/"+name);

    // 2.通过parquetReader读文件
    ParquetReader<Group>reader = null;
    try {
        reader = ParquetReader.builder(groupReadSupport, path).build();
        Group group = null;
        while ((group = reader.read()) != null){
            System.out.println(group);
        }

    } catch (IOException e) {
        e.printStackTrace();
    } finally {
        if(reader != null){
            try {
                reader.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

 

编辑:安邓王辛

发布:2019-03-23 15:18:52

当前文章:http://leetaemin.cn/news/20190148504.html

零元购话术 徐州周一到周五兼职工作 静亮兼职靠什么挣钱 西安通常什么时候下班 赚客怎么注册 什么软件能赚微信零钱 秒赚赚钱神器app下载 手机挂机赚钱软件

39675 85462 75950 43058 45003 8247445813 62464 45303

我要说两句: (0人参与)

发布