首页 源码 正文

apache源码分析,nginxapache

2024-05-13 13:05:37 61 0
admin

我们在使用不同apache源码分析的引擎进行大数据计算时,需要将数据根据计算引擎进行适配。这是一个相当棘手apache源码分析的问题,为此出现了一种新的解决方案apache源码分析:介于上层计算引擎和底层存储格式之间的一个中间层。这个中间层不是数据存储的方式,只是定义了数据的元数据组织方式,并向计算引擎提供统一的类似传统数据库中"表"的语义。它的底层仍然是Parquet、ORC等存储格式。

基于此,Netflix开发了Iceberg,目前已经是Apache的顶级项目,https://iceberg.apache.org/

一 数据湖的解决方案-Iceberg1.1 Iceberg是什么

Apache Iceberg is an open table format for huge *** ytic datasets. Iceberg adds tables to compute engines including Flink, Trino, Spark and Hive using a high-performance table format that works just like a SQL table.

Iceberg是一种开放的数据湖表格式。可以简单理解为是基于计算层(Flink , Spark)和存储层(ORC,Parqurt,Avro)的一个中间层,用Flink或者Spark将数据写入Iceberg,然后再通过其他方式来读取这个表,比如Spark,Flink,Presto等。

apache源码分析

在文件Format(parquet/avro/orc等)之上实现Table语义apache源码分析

支持定义和变更Schema支持Hidden Partition和Partition变更ACID语义历史版本回溯借助partition和columns统计信息实现分区裁剪不绑定任何存储引擎,可拓展到HDFS/S3/OSS等容许多个writer并发写入,乐观锁机制解决冲突。1.2 Iceberg的Table Format介绍

Iceberg是为分析海量数据而设计的,被定义为Table Format,Table Format介于计算层和存储层之间。

Table Format向下管理在存储系统上的文件,向上为计算层提供丰富的接口。存储系统上的文件存储都会采用一定的组织形式,譬如读一张Hive表的时候,HDFS文件系统会带一些Partition,数据存储格式、数据压缩格式、数据存储HDFS目录的信息等,这些信息都存在Metastore上,Metastore就可以称之为一种文件组织格式。

一个优秀的文件组织格式,如Iceberg,可以更高效的支持上层的计算层访问磁盘上的文件,做一些list、rename或者查找等操作。

表和表格式是两个概念。表是一个具象的概念,应用层面的概念,我们天天说的表是简单的行和列的组合。而表格式是数据库系统实现层面一个抽象的概念,它定义了一个表的Scheme定义:包含哪些字段,表下面文件的组织形式(Partition方式)、元数据信息(表相关的统计信息,表索引信息以及表的读写API),如下图左侧所示:

apache源码分析

上图右侧是Iceberg在数据仓库生态中的位置,和它差不多相当的一个组件是Metastore。不过Metastore是一个服务,而Iceberg就是一系列jar包。对于Table Format,我认为主要包含4个层面的含义,分别是表schema定义(是否支持复杂数据类型),表中文件的组织形式,表相关统计信息、表索引信息以及表的读写API信息。

表schema定义了一个表支持字段类型,比如int、string、long以及复杂数据类型等。表中文件组织形式最典型的是Partition模式,是Range Partition还是Hash Partition。Metadata数据统计信息。表的读写API。上层引擎通过对应的API读取或者写入表中的数据。1.3 Iceberg的核心思想apache源码分析

Iceberg的核心思想,就是在时间轴上跟踪表的所有变化:

快照表示表数据文件的一个完整 *** 。每次更新操作会生成一个新的快照。1.4 Iceberg的元数据管理apache源码分析

从图中可以看到Iceberg将数据进行分层管理,主要分为元数据管理层和数据存储层。元数据管理层又可以细分为三层:

Metadata FileSnapshotManifest

Metadata File存储当前版本的元数据信息(所有snapshot信息)apache源码分析;Snapshot表示当前操作的一个快照,每次commit都会生成一个快照,一个快照中包含多个Manifest。每个Manifest中记录了当前操作生成数据所对应的文件地址,也就是data files的地址。基于snapshot的管理方式,Iceberg能够进行time travel(历史版本读取以及增量读取),并且提供了serializable isolation。 数据存储层支持不同的文件格式,目前支持Parquet、ORC、AVRO。

1.5 Iceberg的重要特性

Apache Iceberg设计初衷是为了解决Hive离线数仓计算慢的问题,经过多年迭代已经发展成为构建数据湖服务的表格式标准。关于Apache Iceberg的更多介绍,请参见Apache Iceberg官网。

目前Iceberg提供以下核心能力:

apache源码分析

1.5.1 丰富的计算引擎优秀的内核抽象使之不绑定特定引擎,目前在支持的有Spark、Flink、Presto、Hive;Iceberg提供了Java native API,不用特定引擎也可以访问Iceberg表。1.5.2 灵活的文件组织形式提供了基于流式的增量计算模型和基于批处理的全量表计算模型,批任务和流任务可以使用相同的存储模型(HDFS、OZONE),数据不再孤立,以构建低成本的轻量级数据湖存储服务;Iceberg支持隐藏分区(Hidden Partitioning)和分区布局变更(Partition Evolution),方便业务进行数据分区策略更新;支持Parquet、ORC、Avro等存储格式。1.5.3 优化数据入湖流程Iceberg提供ACID事务能力,上游数据写入即可见,不影响当前数据处理任务,这大大简化了ETL;Iceberg提供upsert/merge into行级别数据变更,可以极大地缩小数据入库延迟。1.5.4 增量读取处理能力Iceberg支持通过流式方式读取增量数据,实现主流开源计算引擎入湖和分析场景的完善对接;Spark struct streaming支持;Flink table source支持;支持历史版本回溯。1.6 数据文件结构

我们先了解一下Iceberg在文件系统中的布局,总体来讲Iceberg分为两部分数据,第一部分是数据文件,如下图中的 parquet 文件。第二部分是表元数据文件(Metadata 文件),包含 Snapshot 文件(snap-*.avro)、Manifest 文件(*.avro)、TableMetadata 文件(*.json)等。

apache源码分析

1.6.1 元数据文件

其中metadata目录存放元数据管理层的数据,表的元数据是不可修改的,并且始终向前迭代;当前的快照可以回退。

1.6.1.1 Table Metadata

version[number].metadata.json:存储每个版本的数据更改项。

1.6.1.2 快照(SnapShot)

snap-[snapshotID]-[attemptID]-[commitUUID].avro:存储快照snapshot文件;

快照代表一张Iceberg表在某一时刻的状态。也被称为清单列表(Manifest List),里面存储的是清单文件列表,每个清单文件占用一行数据。清单列表文件以snap开头,以avro后缀结尾,每次更新都产生一个清单列表文件。每行中存储了清单文件的路径。

清单文件里面存储数据文件的分区范围、增加了几个数据文件、删除了几个数据文件等信息。数据文件(Data Files)存储在不同的Manifest Files里面,Manifest Files存储在一个Manifest List文件里面,而一个Manifest List文件代表一个快照。

1.6.1.3 清单文件(Manifest File)

[commitUUID]-[attemptID]-[manifestCount].avro:manifest文件

清单文件是以avro格式进行存储的,以avro后缀结尾,每次更新操作都会产生多个清单文件。其里面列出了组成某个快照(snapshot)的数据文件列表。每行都是每个数据文件的详细描述,包括数据文件的状态、文件路径、分区信息、列级别的统计信息(比如每列的最大最小值、空值数等)、文件的大小以及文件里面数据的行数等信息。其中列级别的统计信息在 Scan 的时候可以为算子下推提供数据,以便可以过滤掉不必要的文件。

1.6.2 数据文件

data目录组织形式类似于hive,都是以分区进行目录组织(图中dt为分区列)

Iceberg的数据文件通常存放在data目录下。一共有三种存储格式(Avro、Orc和Parquet),主要是看您选择哪种存储格式,后缀分别对应avro、orc或者parquet。在一个目录,通常会产生多个数据文件。

二 Apache Iceberg的实现细节2.1 快照设计方式2.1.1 快照隔离读操作仅适用当前已生成快照;写操作会生成新的隔离快照,并在写完成后原子性提交。

如下图所示,虚线框(snapshot-1)表示正在进行写操作,但是还没有发生commit操作,这时候snapshot-1是不可读的,用户只能读取已经commit之后的snapshot。同理,snapshot-2,snapshot-3表示已经可读。

apache源码分析

可以支持并发读,例如可以同时读取S1、S2、S3的快照数据,同时,可以回溯到snapshot-2或者snapshot-3。在snapshot-4 commit完成之后,这时候snapshot-4已经变成实线,就可以读取数据了。

例如,现在current Snapshot的指针移到S3,用户对一张表的读操作,都是读current Snapshot指针所指向的Snapshot,但不会影响前面的Snapshot的读操作。

当一切准备完毕之后,会以原子操作的方式Commit这个Metadata文件,这样一次Iceberg的数据写入就完成了。随着每次的写入,Iceberg就生成了下图这样的一个文件组织模式。

2.1.2 增量读取数据

Iceberg的每个snapshot都包含前一个snapshot的所有数据,每次都相当于全量读取数据,对于整个链路来说,读取数据的代价是非常高的。

如果我们只想读取当前时刻的增量数据,就可以根据Iceberg中Snapshot的回溯机制来实现,仅读取Snapshot1到Snapshot2的增量数据,也就是下图中的紫色数据部分。

同理,S3也可以只读取红色部分的增量数据,也可以读取S1-S3的增量数据。

Iceberg支持读写分离,也就是说可以支持并发读和增量读。

apache源码分析

2.1.3 原子性操作

对于文件列表的所有修改都是原子操作。

在分区中追加数据;合并或是重写分区。apache源码分析

Iceberg是以文件为粒度提交事务的,所以就没有办法做到以秒为单位提交事务,否则会造成文件数据量膨胀。比如Flink是以CheckPoint为写入单位,物理数据在写入Iceberg之后并不能被直接查询,只有当触发了CheckPoint时才会写metadata,这时数据才会由不可见变成可见。而每次CheckPoint执行也需要一定的时间。2.2 事务性提交2.2.1 写操作要求记录当前元数据的版本--base version;创建新的元数据以及manifest文件;原子性的将base version 替换为新的版本。

原子性替换保证了线性的历史;

原子性替换需要依靠以下操作来保证。

2.2.2 冲突解决--乐观锁假定当前没有其他的写操作;遇到冲突则基于当前最新的元数据进行重试;元数据管理器所提供的能力;HDFS或是本地文件系统所提供的原子化的rename能力。三 Iceberg结合Flink场景分享3.1 构建近实时Data Pipeline

Iceberg可以做到分钟级别的准实时数据拉取。

apache源码分析

首先,Flink+Iceberg最经典的一个场景就是构建实时的Data Pipeline。业务端产生的大量日志数据,被导入到Kafka这样的消息队列。运用Flink流计算引擎执行ETL后,导入到Apache Iceberg原始表中。有一些业务场景需要直接跑分析作业来分析原始表的数据,而另外一些业务需要对数据做进一步的提纯。那么我们可以再新起一个Flink作业从Apache Iceberg表中消费增量数据,经过处理之后写入到提纯之后的Iceberg表中。此时,可能还有业务需要对数据做进一步的聚合,那么我们继续在Iceberg表上启动增量Flink作业,将聚合之后的数据结果写入到聚合表中。

有人会想,这个场景好像通过Flink+Hive也能实现。 Flink+Hive的确可以实现,但写入到Hive的数据更多地是为了实现数仓的数据分析,而不是为了做增量拉取。一般来说,Hive的增量写入以Partition为单位,时间是15min以上,Flink长期高频率地写入会造成Partition膨胀。而Iceberg容许实现1分钟甚至30秒的增量写入,这样就可以大大提高了端到端数据的实时性,上层的分析作业可以看到更新的数据,下游的增量作业可以读取到更新的数据。

3.2 CDC数据实时摄入摄出

FlinkCDC增量数据写入Iceberg。

支持准实时的数据入湖和数据分析。计算引擎原生支持CDC,无需添加额外的组件。采用统一的数据湖存储方案,并支持多种数据分析引擎。支持增量数据读取。apache源码分析

可以用Flink+Iceberg来分析来自MySQL等关系型数据库的binlog等。一方面,Apache Flink已经原生地支持CDC数据解析,一条binlog数据通过ververica flink-cdc-connector拉取之后,自动转换成Flink Runtime能识别的INSERT、DELETE、UPDATE_BEFORE、UPDATE_AFTER四种消息,供用户做进一步的实时计算。

此外,CDC数据成功入湖Iceberg之后,我们还会打通常见的计算引擎,例如Presto、Spark、Hive等,他们都可以实时地读取到Iceberg表中的最新数据。

3.3 从Iceberg历史数据启动Flink任务apache源码分析

上面的架构是采用Iceberg全量数据和Kafka的增量数据来驱动新的Flink作业。如果需要过去很长时间例如一年的数据,可以采用常见的Lambda架构,离线链路通过kafka->flink->iceberg同步写入到数据湖,由于Kafka成本较高,保留最近7天数据即可,Iceberg存储成本较低,可以存储全量的历史数据,启动新Flink作业的时候,只需要去拉Iceberg的数据,跑完之后平滑地对接到kafka数据即可。

3.4 通过Iceberg数据来修正实时聚合结果apache源码分析

同样是在Lambda架构下,实时链路由于事件丢失或者到达顺序的问题,可能导致流计算端结果不一定完全准确,这时候一般都需要全量的历史数据来订正实时计算的结果。而我们的Iceberg可以很好地充当这个角色,因为它可以高性价比地管理好历史数据。

四 Iceberg0.11.1源代码编译4.1 编译Iceberg

构建Iceberg需要Grade5.6和Java8的环境。

4.1.1 下载Iceberg0.11.1软件包

下载地址:

https://github.com/apache/iceberg/releases/tag/apache-iceberg-0.11.1https://www.apache.org/dyn/closer.cgi/iceberg/apache-iceberg-0.11.0/apache-iceberg-0.11.0.tar.gz4.1.2 解压Iceberg0.11.1软件包[bigdata@bigdata185 software]$ tar -zxvf iceberg-apache-iceberg-0.11.1.tar.gz -C /opt/module/[bigdata@bigdata185 software]$ cd /opt/module/iceberg-apache-iceberg-0.11.1/4.1.3 修改对应的版本

我们选择最稳定的版本进行编译,Hadoop2.7.7+Hive2.3.9+Flink1.11.6+Spark3.0.3

org.apache.flink:* = 1.11.6org.apache.hadoop:* = 2.7.7org.apache.hive:hive-metastore = 2.3.9org.apache.hive:hive-serde = 2.3.9org.apache.spark:spark-hive_2.12 = 3.0.34.1.4 编辑build.gradle文件,添加国内源

(1)在buildscript的repositories中添加

maven { url 'https://mirrors.huaweicloud.com/repository/maven/' }

添加后如下所示:

buildscript { repositories { jcenter() gradlePluginPortal() maven { url 'https://mirrors.huaweicloud.com/repository/maven/' } } dependencies { classpath 'com.github.jengelman.gradle.plugins:shadow:5.0.0' classpath 'com.palantir.baseline:gradle-baseline-java:3.36.2' classpath 'com.palantir.gradle.gitversion:gradle-git-version:0.12.3' classpath 'com.diffplug.spotless:spotless-plugin-gradle:3.14.0' classpath 'gradle.plugin.org.inferred:gradle-processors:2.1.0' classpath 'me.champeau.gradle:jmh-gradle-plugin:0.4.8' }}

(2)allprojects中添加

maven { url 'https://mirrors.huaweicloud.com/repository/maven/' }

添加后如下所示

allprojects { group = "org.apache.iceberg" version = getProjectVersion() repositories { maven { url 'https://mirrors.huaweicloud.com/repository/maven/' } mavenCentral() mavenLocal() }}4.1.5 下载依赖(可选)

进入项目根目录,执行脚本:

[bigdata@bigdata185 iceberg-apache-iceberg-0.11.1]$ ./gradlew dependenciesapache源码分析

4.1.6 正式编译

(1)进入项目根目录,执行:

[bigdata@bigdata185 iceberg-apache-iceberg-0.11.1]$ ./gradlew build

(2)上述命令会执行代码里的单元测试,如果不需要,则执行以下命令:

[bigdata@bigdata185 iceberg-apache-iceberg-0.11.1]$ ./gradlew build -x test -x scalaStyle apache源码分析

4.1.7 生成的目录apache源码分析

4.2 Iceberg环境部署

在后面的章节中,我们分别介绍如何Iceberg0.11.1和Flink1.11.6、Spark3.0.3和Hive2.3.9集成。

五 总结数据湖的解决方案-Iceberg介绍。Apache Iceberg的技术实现细节。Iceberg结合Flink场景分享。Iceberg0.11.1源码编译。
收藏
分享
海报
0 条评论
61

本站已关闭游客评论,请登录或者注册后再评论吧~