datawarehouse

数仓知识

1.什么是数据仓库

数据仓库是面向主题的(主要用于业务分析),集成的(将多源数据集成在一起),相对稳定的(数据进入到数据仓库中不易发生改变),随时间变化(随着时间发展,数据仓库中存留的数据会越来越多)的数据集合。

2.数仓建模有哪几种方式

ER建模(范式建模)、维度建模、Data Vault建模、Anchor建模,其中ER和维度是主流建模,ER建模常用于关系数据库,维度建模常用于数仓建模。

范式建模和维度建模的优缺点:

范式建模符合三范式、没有数据冗余,保证了数据的一致性,数据解耦,方便更新,同时也使得表的数量较多,设计起来较为复杂,关联查询较为复杂,速度较慢,维护成本较高,需要在加入表之后仍满足范式规则。

维度建模通过预计算和聚合实现了较快的查询,维护成本较低,易于扩展,设计复杂度相较于范式建模更低。

ER建模的三范式

第一范式:属性不可分割,一个格子里只有一个值

第二范式:不能存在部分函数依赖,一行内的所有属性必须完全依赖于主键。例如(姓名不完全依赖于学号课名)

第三范式:不能存在传递函数依赖,一行内的所有属性必须直接依赖于主键。例如(系主任名不直接依赖于学号)

3.数仓分层

ODS层

DWD层

DIM层

DWM层

DWS层

ADS层

4.数仓分层的作用:

1.有更清晰的数据结构

2.数据血缘追踪

3.通过中间层的构建减少重复计算

4.复杂问题简单化

5.数仓建模的意义

  • 性能:良好的模型能帮我们快速的查找数据,减少数据的I/O吞吐

  • 成本:减少数据的重复计算

  • 效率:改善用户使用数据的体验,提升使用数据的效率

  • 改善统计口径的不一致性,减少数据计算错误的可能性

6.如何评价一个数仓建设的好坏

  • 完善度:汇总数据能直接满足多少查询需求,即应用层访问汇总层数据的查询比例,可以快速地响应业务方的需求。
  • 复用度:模型被读取用于产生下游模型的平均数量。
  • 规范度:主题域、分层、命名规范
  • 稳定性:取数时是否有时效保障
  • 扩展性:新增加模型时是否会和老模型产生冲突
  • 准确性:输出的数据指标质量能够保证
  • 健壮性:业务快速更新不会影响底层模型
  • 成本低:存储成本、时间成本、资源成本

7.数仓建模要素概念

  • 度量是业务产生的一个数值
  • 事实是一条业务中度量的集合,事实是指在业务过程或分析领域中实际发生的、可度量的事件或情况
  • 粒度是度量的单位,是事实的细节程度
  • 维度是描述事实的角度

8.数据倾斜产生的原因

在并行处理的数据集中,某一部分的数据显著多于其他部分(Spark的partition,mr的partition),会造成不同分区的数量具有较大的差异,数量大的分区在后续的task执行变慢,影响了整个任务的结束。

9.如何排查数据倾斜

(1)Spark:

  • 查看SparkUI 发现job的执行时间很长,去查询stage中task的执行时间,若task运行时间差异较大,则发生了数据倾斜,否则是资源不够
  • 去DAG执行计划中寻找Scan查看表名,确认代码位置,查询各key的数量

(2)Hive :

  • 运行输出,某个task卡在99%,或查看8088,某个stage运行时间很长,其他的很短
  • 使用explain 查看sql语句的执行计划,通过8088看哪个stage时间最长,在执行计划中查找其stage中scan对应的表名,然后去sql语句中查找,再查询各key的数量

10.如何解决数据倾斜-参数

(1)Hive:

  • 防止小文件引起的数据倾斜,大量的小文件会启动大量的maptask 导致某个节点的maptask过多,节点间的数据量不均,降低运行效率。

  • set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat在map输入之前合并小文件

  • set hive.merge.mapfiles=true 在map端输出对小文件进行合并

  • set hive.map.aggr=true 在map端进行聚合,combiner,减小map端的压力

  • set hive.groupby.skewindata=true 开启负载均衡,开启之后将mr job拆分成两个mr job,第一个mr job 将map端数据随机分布到reduce中,在reduce上进行部分聚合,第二个mr job 根据第一个mr job结果进行group by,以实行负载均衡。

  • set hive.auto.convert.join=true;设置 MapJoin 优化自动开启

  • set hive.mapjoin.smalltable.filesize=25000000 设置小表不超过多大时开启 mapjoin 优化

  • 调整reduce任务数,以调整分区数量,增加并行度(前提是未手动设置分区数量)

  • 当map端计算量较大时,增加map任务数

(2)Spark:

  • spark.conf.set("spark.sql.shuffle.partitions", "200") 调整spark的shuffle时的分区数
  • spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "10485760") 调整spark广播连接时的最大阈值

11.如何解决数据倾斜-sql写法

  • 去除无效Key,若不需要保留key值,则对key进行过滤,或直接将left join改为inner join
  • 去除无效Key,若需要保留Key值,将key随机打散,这里选用非keyid的负值,避免和skuid重复,发生错误关联 left join on on if(t1.sku_id is null,order_id*-1,t1.sku_id) = t2.sku_id
  • 大表join小表,调整广播连接(Spark)/MapJoin(Hive)参数,使用hit语法 /*+ MAPJOIN(XX) */
  • 大表join大表,热点key,热门商家id,先取订单量大的商家id,然后将热门商家的订单表和维度表关联得到热门商家信息,接着将与热门商家关联不上的冷门商家与维度表关联得到冷门商家信息,再union all。(这样热key就可以走广播表join了)
  • 大表join大表,倾斜key较多,倍数扩容,利用orderid%n将订单表打散成n份,维度表复制n份,将订单表的每部分与维度表连接,再最后union all (原理是打散订单表,增加并行度)
  • count(distinct) 转换为group by,再先预聚合,再聚合 (group by sid,uid) group by sid
  • 窗口聚合函数同理,先粗partition,再细partition

11.小文件如何产生

  • 动态分区插入数据(Spark2 和MapReduce)产生大量小文件,导致map数量剧增
  • reduce数量多,输出文件多
  • 数据源本身包含很多小文件
  • 实时数据落Hive也会有小文件

12.如何解决小文件问题

预防小文件:

  • 减少reduce数量
  • Distribute by rand()控制每个分区内的数据量平均
  • 在同步任务后加个Spark3回刷任务
  • 设置参数
1
2
3
set hive.merge.mapfiles=true;    默认值ture,在Map-only的任务结束时合并小文件。
set hive.merge.mapredfiles=true; 默认值false,在Map-Reduce的任务结束时合并小文件。
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat; 执行MAP前进行小文件合并

已有小文件:

  • Spark3回刷分区数据

13.如何确定reduce任务和map任务的数量

map任务:

  • splitSize=Min(maxSize,Max(minSize,blockSize))
  • 按这个splitSize对文件进行分割,最后一个文件小于splitSize的1.1倍就不用分割。
  • 设置mapred.map.tasks 为一个较大的值(大于default_num=输入文件整体大小/splitSize)。

reduce任务:

  • min(hive.exec.reducers.max ,输入总数据量/hive.exec.reducers.bytes.per.reducer)
  • set mapreduce.job.reduces=1;

14.分桶表-分区表区别

  • 分桶表:clustered by sorted by 与mr过程中的分区类似,按哈希值均匀分配key值
  • 分区表:旨在更好的管理数据,按某个列值分配文件夹

15.spark shuffle的优化

  • 调整map缓冲区大小,避免过多的文件溢写:set(“spark.shuffle.file.buffer”, “64”)
  • 调整reduce端拉取数据缓冲区大小:set(“spark.reducer.maxSizeInFlight”, “96”)
  • 调节reduce端拉取数据重试次数:set(“spark.shuffle.io.maxRetries”, “6”)
  • 调节reduce端拉取数据等待间隔:set(“spark.shuffle.io.retryWait”, “60s”)
  • 调整SortShuffle排序操作阈值:set(“spark.shuffle.sort.bypassMergeThreshold”, “400”)

16.spark内存模型

堆内内存:

  • Execution 内存:主要用于存放 Shuffle、Join、Sort、Aggregation 等计算过程中的临时数据
  • Storage 内存:主要用于存储 spark 的 cache 数据,例如RDD的缓存、unroll数据;
  • 用户内存(User Memory):主要用于存储 RDD 转换操作所需要的数据,例如 RDD 依赖等信息。
  • 预留内存(Reserved Memory):系统预留内存,会用来存储Spark内部对象。

堆外内存:

  • execution 内存和storage内存

17.mapreduce优化

  • 输入端开启小文件输入合并 CombineHiveInputFormat

  • 减少溢写(Spill)次数:调整mapreduce.task.io.sort.mb和mapreduce.map.sort.spill.percent参数值,增大触发Spill的内存上限,减少Spill次数,从而减少磁盘IO。

  • 减少合并(Merge)次数:调整mapreduce.task.io.sort.factor参数,增大Merge的文件数目,减少Merge的次数,缩短MapReduce处理时间。

  • 先进行Combine处理:在不影响业务逻辑的前提下,先进行Combine处理,可以减少IO操作和提高处理速度。

  • reduce端

  • (1)合理设置Map和Reduce数:两个都不能设置太少,也不能设置太多。太少,会导致Task等待,延长处理时间;太多,会导致Map、Reduce任务间竞争资源,造成处理超时等错误。

    (2)设置Map、Reduce共存:调整slowstart.completedmaps参数,使Map运行到一定程度后,Reduce也开始运行,减少Reduce的等待时间。

    (3)规避使用Reduce:因为Reduce在用于连接数据集的时候将会产生大量的网络消耗。

    (4)合理设置Reduce端的Buffer:默认情况下,数据达到一个阈值的时候,Buffer中的数据就会写入磁盘,然后Reduce会从磁盘中获得所有的数据。也就是说,Buffer和Reduce是没有直接关联的,中间多次写磁盘->读磁盘的过程,既然有这个弊端,那么就可以通过参数来配置,使得Buffer中的一部分数据可以直接输送到Reduce,从而减少IO开销:mapred.job.reduce.input.buffer.percent,默认为0.0。当值大于0的时候,会保留指定比例的内存读Buffer中的数据直接拿给Reduce使用。这样一来,设置Buffer需要内存,读取数据需要内存,Reduce计算也要内存,所以要根据作业的运行情况进行调整。

18.hive/hdfs文件格式:

  • text file

  • sequence file

  • orc

  • parquet

19.orc与parquet优缺点

  • orc不支持嵌套,压缩率和读取速度比parquet高 适用于原生数据层
  • parquet支持嵌套 适用于高层

20.orc结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
------ Stripe 1 ------
[Index Data]Stripe 中的每一列会被划分成多个 Row Group,每一个 Row Group 默认是 10000 行。Row Group 是 ORC 的最小读取单元,每一个 Row Group 都有其对应的统计信息,存放在 Index Data 中。
[Row Data]
[Stripe Footer] Stripe Footer 里面存储着 Index Data 和 Row Data 中每一个 Stream 的 offset/length,不含列的统计信息。
------ Stripe 1 ------

------ Stripe N ------
[Index Data]
[Row Data]
[Stripe Footer]
------ Stripe N ------

------ Tail ------
[Metadata] 每个stripe每列的统计信息
[Footer] 每个stripe的元信息,行数,offset,length
[PostScript] footer和metadata的offset和length
[1 byte PostScript length]
------ Tail ------

21.parquet结构

1
2
3
4
5
6
7
8
9
10
11
12
------ Row Group 1 ------
[ColumnChunk] page1 page2 每个page有pageheader和实际数据
------ Row Group 1 ------

------ Row Group N ------
[ColumnChunk]
------ Row Group N ------

------ Tail ------
[FileMetadata] 存放了每个 Row Group 及其 ColumnChunk 的元信息和统计信息。这是因为 Row Group 是没有 footer 结构来承担该 Row Group 中的元信息 ,所以 FileMetadata 就承担了整个文件的所有元信息。
[4 byte FileMetadata length]
------ Tail ------
ORC Parquet 说明
Footer+Metadata FileMetadata 描述每一个 Stripe/RowGroup 的元信息和统计信息。
Stripe Row Group Stripe 有一个 StripeFooter,而 Row Group 没有。
Row Group Page 都是最小的读取单位。

datawarehouse
http://example.com/2024/11/14/数仓开发/数仓/数仓理论/
Author
lpy
Posted on
November 14, 2024
Licensed under