img

大数据就是将各种数据统一收集起来进行计算,发掘其中的价值。这些数据,既包括数据库的数据,也包括日志数据,还包括专门采集的用户行为数据;既包括企业内部自己产生的数据,也包括从第三方采购的数据,还包括使用网络爬虫获取的各种互联网公开数据 ...

面对如此庞大的数据,如何存储、如何有效利用大规模的服务器集群处理计算才是大数据技术的核心。

1. 大数据技术发展史

今天人们常说的大数据技术,其实起源于 Google 在 2004 年前后发表的三篇论文,也就是我们经常听到的“三驾马车”,分别是分布式文件系统 GFS、大数据分布式计算框架 MapReduce 和 NoSQL 数据库系统 BigTable。

搜索引擎主要就做两件事情,一个是网页抓取,一个是索引构建,而在这个过程中,有大量的数据需要存储和计算。这“三驾马车”其实就是用来解决这个问题的,从介绍中也能看出,一个文件系统、一个计算框架、一个数据库系统。

现在你听到分布式、大数据之类的词,肯定一点儿也不陌生。但你要知道,在 2004 年那会儿,整个互联网还处于懵懂时代,Google 发布的论文实在是让业界为之一振,大家恍然大悟,原来还可以这么玩。

因为那个时间段,大多数公司的关注点其实还是聚焦在单机上,在思考如何提升单机的性能,寻找更贵更好的服务器。而 Google 的思路是部署一个大规模的服务器集群,通过分布式的方式将海量数据存储在这个集群上,然后利用集群上的所有机器进行数据计算。 这样,Google 其实不需要买很多很贵的服务器,它只要把这些普通的机器组织到一起,就非常厉害了。

当时的天才程序员,也是 Lucene 开源项目的创始人 Doug Cutting 正在开发开源搜索引擎 Nutch,阅读了 Google 的论文后,他非常兴奋,紧接着就根据论文原理初步实现了类似 GFS 和 MapReduce 的功能。

两年后的 2006 年,Doug Cutting 将这些大数据相关的功能从 Nutch 中分离了出来,然后启动了一个独立的项目专门开发维护大数据技术,这就是后来赫赫有名的 Hadoop,主要包括 Hadoop 分布式文件系统 HDFS 和大数据计算引擎 MapReduce。

Hadoop 发布之后,Yahoo 很快就用了起来。大概又过了一年到了 2007 年,百度和阿里巴巴也开始使用 Hadoop 进行大数据存储与计算。

2008 年,Hadoop 正式成为 Apache 的顶级项目,后来 Doug Cutting 本人也成为了 Apache 基金会的主席。自此,Hadoop 作为软件开发领域的一颗明星冉冉升起。

同年,专门运营 Hadoop 的商业公司 Cloudera 成立,Hadoop 得到进一步的商业支持。

这个时候,Yahoo 的一些人觉得用 MapReduce 进行大数据编程太麻烦了,于是便开发了 Pig。Pig 是一种脚本语言,使用类 SQL 的语法,开发者可以用 Pig 脚本描述要对大数据集上进行的操作,Pig 经过编译后会生成 MapReduce 程序,然后在 Hadoop 上运行。

编写 Pig 脚本虽然比直接 MapReduce 编程容易,但是依然需要学习新的脚本语法。于是 Facebook 又发布了 Hive。Hive 支持使用 SQL 语法来进行大数据计算,比如说你可以写个 Select 语句进行数据查询,然后 Hive 会把 SQL 语句转化成 MapReduce 的计算程序。

这样,熟悉数据库的数据分析师和工程师便可以无门槛地使用大数据进行数据分析和处理了。Hive 出现后极大程度地降低了 Hadoop 的使用难度,迅速得到开发者和企业的追捧。据说,2011 年的时候,Facebook 大数据平台上运行的作业 90% 都来源于 Hive。

随后,众多 Hadoop 周边产品开始出现,大数据生态体系逐渐形成,其中包括:专门将关系数据库中的数据导入导出到 Hadoop 平台的 Sqoop;针对大规模日志进行分布式收集、聚合和传输的 Flume;MapReduce 工作流调度引擎 Oozie 等。

在 Hadoop 早期,MapReduce 既是一个执行引擎,又是一个资源调度框架,服务器集群的资源调度管理由 MapReduce 自己完成。但是这样不利于资源复用,也使得 MapReduce 非常臃肿。于是一个新项目启动了,将 MapReduce 执行引擎和资源调度分离开来,这就是 Yarn。2012 年,Yarn 成为一个独立的项目开始运营,随后被各类大数据产品支持,成为大数据平台上最主流的资源调度系统。

同样是在 2012 年,UC 伯克利 AMP 实验室(Algorithms、Machine 和 People 的缩写)开发的 Spark 开始崭露头角。当时 AMP 实验室的马铁博士发现使用 MapReduce 进行机器学习计算的时候性能非常差,因为机器学习算法通常需要进行很多次的迭代计算,而 MapReduce 每执行一次 Map 和 Reduce 计算都需要重新启动一次作业,带来大量的无谓消耗。还有一点就是 MapReduce 主要使用磁盘作为存储介质,而 2012 年的时候,内存已经突破容量和成本限制,成为数据运行过程中主要的存储介质。Spark 一经推出,立即受到业界的追捧,并逐步替代 MapReduce 在企业应用中的地位。

一般说来,像 MapReduce、Spark 这类计算框架处理的业务场景都被称作批处理计算,因为它们通常针对以“天”为单位产生的数据进行一次计算,然后得到需要的结果,这中间计算需要花费的时间大概是几十分钟甚至更长的时间。因为计算的数据是非在线得到的实时数据,而是历史数据,所以这类计算也被称为大数据离线计算。

而在大数据领域,还有另外一类应用场景,它们需要对实时产生的大量数据进行即时计算,比如对于遍布城市的监控摄像头进行人脸识别和嫌犯追踪。这类计算称为大数据流计算,相应地,有 Storm、Flink、Spark Streaming 等流计算框架来满足此类大数据应用的场景。 流式计算要处理的数据是实时在线产生的数据,所以这类计算也被称为大数据实时计算。在典型的大数据的业务场景下,数据业务最通用的做法是,采用批处理的技术处理历史全量数据,采用流式计算处理实时新增数据。而像 Flink 这样的计算引擎,可以同时支持流式计算和批处理计算。

除了大数据批处理和流处理,NoSQL 系统处理的主要也是大规模海量数据的存储与访问,所以也被归为大数据技术。 NoSQL 曾经在 2011 年左右非常火爆,涌现出 HBase、Cassandra 等许多优秀的产品,其中 HBase 是从 Hadoop 中分离出来的、基于 HDFS 的 NoSQL 系统。

此外,大数据要存入分布式文件系统(HDFS),要有序调度 MapReduce 和 Spark 作业执行,并能把执行结果写入到各个应用系统的数据库中,还需要有一个大数据平台整合所有这些大数据组件和企业应用系统。

88283b2f-24b7-4be0-8908-e28be1a01f1c

图中的所有这些框架、平台以及相关的算法共同构成了大数据的技术体系

2. HDFS

Google 大数据“三驾马车”的第一驾是 GFS(Google 文件系统),而 Hadoop 的第一个产品是 HDFS,可以说分布式文件存储是分布式计算的基础,也可见分布式文件存储的重要性。

在整个大数据体系里面,最宝贵、最难以代替的资产就是数据,大数据所有的一切都要围绕数据展开。HDFS 作为最早的大数据存储系统,存储着宝贵的数据资产,各种新的算法、框架要想得到人们的广泛使用,必须支持 HDFS 才能获取已经存储在里面的数据。所以大数据技术越发展,新技术越多,HDFS 得到的支持越多,我们越离不开 HDFS。HDFS 也许不是最好的大数据存储技术,但依然最重要的大数据存储技术。

Hadoop 分布式文件系统 HDFS 的设计目标是管理数以千计的服务器、数以万计的磁盘,将这么大规模的服务器计算资源当作一个单一的存储系统进行管理,对应用程序提供数以 PB 计的存储容量,让应用程序像使用普通文件系统一样存储大规模的文件数据。

那么,HDFS 是如何实现大数据高速、可靠的存储和访问的。

2.1 大容量存储和高速访问

HDFS 是在一个大规模分布式服务器集群上,对数据分片后进行并行读写及冗余存储。因为 HDFS 可以部署在一个比较大的服务器集群上,集群中所有服务器的磁盘都可供 HDFS 使用,所以整个 HDFS 的存储空间可以达到 PB 级容量。

1812b1ce-6261-46b7-ab97-a2fb05ce85ab

上图是 HDFS 的架构图,从图中你可以看到 HDFS 的关键组件有两个,一个是 DataNode,一个是 NameNode。

DataNode 负责文件数据的存储和读写操作,HDFS 将文件数据分割成若干数据块(Block),每个 DataNode 存储一部分数据块,这样文件就分布存储在整个 HDFS 服务器集群中。应用程序客户端(Client)可以并行对这些数据块进行访问,从而使得 HDFS 可以在服务器集群规模上实现数据并行访问,极大地提高了访问速度。

在实践中,HDFS 集群的 DataNode 服务器会有很多台,一般在几百台到几千台这样的规模,每台服务器配有数块磁盘,整个集群的存储容量大概在几 PB 到数百 PB。

NameNode 负责整个分布式文件系统的元数据(MetaData)管理,也就是文件路径名、数据块的 ID 以及存储位置等信息,相当于操作系统中文件分配表(FAT)的角色。HDFS 为了保证数据的高可用,会将一个数据块复制为多份(缺省情况为 3 份),并将多份相同的数据块存储在不同的服务器上,甚至不同的机架上。这样当有磁盘损坏,或者某个 DataNode 服务器宕机,甚至某个交换机宕机,导致其存储的数据块不能访问的时候,客户端会查找其备份的数据块进行访问。

下面这张图是数据块多份复制存储的示意,图中对于文件 /users/sameerp/data/part-0,其复制备份数设置为 2,存储的 BlockID 分别为 1、3。Block1 的两个备份存储在 DataNode0 和 DataNode2 两个服务器上,Block3 的两个备份存储 DataNode4 和 DataNode6 两个服务器上,上述任何一台服务器宕机后,每个数据块都至少还有一个备份存在,不会影响对文件 /users/sameerp/data/part-0 的访问。

7c07ad03-03a8-4638-94e1-6998bad00f1e

数据分成若干数据块后存储到不同服务器上,可以实现数据大容量存储,并且不同分片的数据可以并行进行读 / 写操作,进而实现数据的高速访问。

2.2 HA(High Availability)架构模型

  1. 数据存储故障容错
    磁盘介质在存储过程中受环境或者老化影响,其存储的数据可能会出现错乱。HDFS 的应对措施是,对于存储在 DataNode 上的数据块,计算并存储校验和(CheckSum)。在读取数据的时候,重新计算读取出来的数据的校验和,如果校验不正确就抛出异常,应用程序捕获异常后就到其他 DataNode 上读取备份数据。

  2. 磁盘故障容错
    如果 DataNode 监测到本机的某块磁盘损坏,就将该块磁盘上存储的所有 BlockID 报告给 NameNode,NameNode 检查这些数据块还在哪些 DataNode 上有备份,通知相应的 DataNode 服务器将对应的数据块复制到其他服务器上,以保证数据块的备份数满足要求。

  3. DataNode 故障容错
    DataNode 会通过心跳和 NameNode 保持通信,如果 DataNode 超时未发送心跳,NameNode 就会认为这个 DataNode 已经宕机失效,立即查找这个 DataNode 上存储的数据块有哪些,以及这些数据块还存储在哪些服务器上,随后通知这些服务器再复制一份数据块到其他服务器上,保证 HDFS 存储的数据块备份数符合用户设置的数目,即使再出现服务器宕机,也不会丢失数据。

  4. NameNode 故障容错
    NameNode 是整个 HDFS 的核心,记录着 HDFS 文件分配表信息,所有的文件路径和数据块存储信息都保存在 NameNode,如果 NameNode 故障,整个 HDFS 系统集群都无法使用;如果 NameNode 上记录的数据丢失,整个集群所有 DataNode 存储的数据也就没用了。
    所以,NameNode 高可用容错能力非常重要。NameNode 采用主从热备的方式提供高可用服务,请看下图。
    在一个 HA 集群中,会配置两个 NameNode ,一个是 Active NameNode(主),一个是 Standby NameNode(备)。主节点负责执行所有修改命名空间的操作,备节点则执行同步操作,以保证与主节点命名空间的一致性。HA 架构模型如下图所示:

    9b2bcf58-64a4-482d-a2a0-f428c13c2ca8

    HA 集群中所包含的进程的职责各不相同。为了使得主节点和备用节点的状态一致,采用了 Quorum Journal Manger (QJM)方案解决了主备节点共享存储问题,如图 JournalNode 进程,下面依次介绍各个进程在架构中所起的作用:

    • Active NameNode:它负责执行整个文件系统中命名空间的所有操作;维护着数据的元数据,包括文件名、副本数、文件的 BlockId 以及 Block 块所对应的节点信息;另外还接受 Client 端读写请求和 DataNode 汇报 Block 信息。
    • Standby NameNode:它是 Active NameNode 的备用节点,一旦主节点宕机,备用节点会切换成主节点对外提供服务。它主要是监听 JournalNode Cluster 上 editlog 变化,以保证当前命名空间尽可能的与主节点同步。任意时刻,HA 集群只有一台 Active NameNode,另一个节点为 Standby NameNode。
    • JournalNode Cluster: 用于主备节点间共享 editlog 日志文件的共享存储系统。负责存储 editlog 日志文件, 当 Active NameNode 执行了修改命名空间的操作时,它会定期将执行的操作记录在 editlog 中,并写入 JournalNode Cluster 中。Standby NameNode 会一直监听 JournalNode Cluster 上 editlog 的变化,如果发现 editlog 有改动,备用节点会读取 JournalNode 上的 editlog 并与自己当前的命名空间合并,从而实现了主备节点的数据一致性。

    注意:QJM 方案是基于 Paxos 算法实现的,集群由 2N + 1 JouranlNode 进程组成,最多可以容忍 N 台 JournalNode 宕机,宕机数大于 N 台,这个算法就失效了!

    • ZKFailoverController: ZKFC 以独立进程运行,每个 ZKFC 都监控自己负责的 NameNode,它可以实现 NameNode 自动故障切换:即当主节点异常,监控主节点的 ZKFC 则会断开与 ZooKeeper 的连接,释放分步式锁,监控备用节点的 ZKFC 进程会去获取锁,同时把备用 NameNode 切换成 主 NameNode。
    • ZooKeeper: 为 ZKFC 进程实现自动故障转移提供统一协调服务。通过 ZooKeeper 中 Watcher 监听机制,通知 ZKFC 异常NameNode 下线;保证同一时刻只有一个主节点。
    • DataNode: DataNode 是实际存储文件 Block 块的地方,一个 Block 块包含两个文件:一个是数据本身,一个是元数据(数据块长度、块数据的校验和、以及时间戳),DataNode 启动后会向 NameNode 注册,每 6 小时同时向主备两个 NameNode 上报所有的块信息,每 3 秒同时向主备两个 NameNode 发送一次心跳。

    DataNode 向 NameNode 汇报当前块信息的时间间隔,默认 6 小时,其配置参数名如下:

    <property>
     <name>dfs.blockreport.intervalMsec</name>
     <value>21600000</value>
     <description>Determines block reporting interval in milliseconds.</description>
    </property>
    

2.3 HA主备故障切换流程

HA 集群刚启动时,两个 NameNode 节点状态均为 Standby,之后两个 NameNode 节点启动 ZKFC 进程后会去 ZooKeeper 集群抢占分步式锁,成功获取分步式锁,ZooKeeper 会创建一个临时节点,成功抢占分步式锁的 NameNode 会成为 Active NameNode,ZKFC 便会实时监控自己的 NameNode。

HDFS 提供了两种 HA 状态切换方式:一种是管理员手动通过DFSHAAdmin -faieover执行状态切换;另一种则是自动切换。下面分别从两种情况分析故障的切换流程:

  1. 主 NameNdoe 宕机后,备用 NameNode 如何升级为主节点?

当主 NameNode 宕机后,对应的 ZKFC 进程检测到 NameNode 状态,便向 ZooKeeper 发生删除锁的命令,锁删除后,则触发一个事件回调备用 NameNode 上的 ZKFC

ZKFC 得到消息后先去 ZooKeeper 争夺创建锁,锁创建完成后会检测原先的主 NameNode 是否真的挂掉(有可能由于网络延迟,心跳延迟),挂掉则升级备用 NameNode 为主节点,没挂掉则将原先的主节点降级为备用节点,将自己对应的 NameNode 升级为主节点。

  1. 主 NameNode 上的 ZKFC 进程挂掉,主 NameNode 没挂,如何切换?

ZKFC 挂掉后,ZKFC 和 ZooKeeper 之间 TCP 链接会随之断开,session 也会随之消失,锁被删除,触发一个事件回调备用 NameNode ZKFC,ZKFC 得到消息后会先去 ZooKeeper 争夺创建锁,锁创建完成后也会检测原先的主 NameNode 是否真的挂掉,挂掉则升级 备用 NameNode 为主节点,没挂掉则将主节点降级为备用节点,将自己对应的 NameNode 升级为主节点。

2.4 Block、packet及chunk 概念

在 HDFS 中,文件存储是按照数据块(Block)为单位进行存储的,在读写数据时,DFSOutputStream使用 Packet 类来封装一个数据包。每个 Packet 包含了若干个 chunk 和对应的 checksum。

  • Block: HDFS 上的文件都是分块存储的,即把一个文件物理划分为一个 Block 块存储。Hadoop 2.X/3.X 默认块大小为 128 M,1.X 为 64M.
  • Packet: 是 Client 端向 DataNode 或 DataNode 的 Pipline 之间传输数据的基本单位,默认 64 KB
  • Chunk: Chunk 是最小的单位,它是 Client 向 DataNode 或 DataNode PipLine 之间进行数据校验的基本单位,默认 512 Byte ,因为用作校验,所以每个 Chunk 需要带有 4 Byte 的校验位,实际上每个 Chunk 写入 Packtet 的大小为 516 Byte。

2.5 HDFS 读流程

8e0f31ac-2092-4206-934e-30961fea533d

以从 HDFS 读取一个 information.txt 文件为例,其读取流程如上图所示,分为以下几个步骤:

  1. 打开 information.txt 文件:首先客户端调用 DistributedFileSystem.open() 方法打开文件,这个方法在底层会调用DFSclient.open() 方法,该方法会返回一个 HdfsDataInputStream 对象用于读取数据块。但实际上真正读取数据的是 DFSInputStream ,而 HdfsDataInputStream 是 DFSInputStream 的装饰类(new HdfsDataInputStream(DFSInputStream))。
  2. 从 NameNode 获取存储 information.txt 文件数据块的 DataNode 地址:即获取组成 information.txt block 块信息。在构造输出流 DFSInputStream 时,会通过调用 getBlockLocations() 方法向 NameNode 节点获取组成 information.txt 的 block 的位置信息,并且 block 的位置信息是按照与客户端的距离远近排好序。
  3. 连接 DataNode 读取数据块: 客户端通过调用 DFSInputStream.read() 方法,连接到离客户端最近的一个 DataNode 读取 Block 块,数据会以数据包(packet)为单位从 DataNode 通过流式接口传到客户端,直到一个数据块读取完成;DFSInputStream会再次调用 getBlockLocations() 方法,获取下一个最优节点上的数据块位置。
  4. 直到所有文件读取完成,调用 close() 方法,关闭输入流,释放资源。

从上述流程可知,整个过程最主要涉及到 open()read()两个方法(其它方法都是在这两个方法的调用链中调用。

2.6 HDFS 写流程

HDFS 写流程涉及的方法比较多,过程也比较复杂。

1192ba64-822c-4f1e-b749-332830d75472

  1. 在 namenode 创建文件: 当 client 写一个新文件时,首先会调用 DistributeedFileSytem.creat() 方法,DistributeFileSystem 是客户端创建的一个对象,在收到 creat 命令之后,DistributeFileSystem 通过 RPC 与 NameNode 通信,让它在文件系统的 namespace 创建一个独立的新文件;namenode 会先确认文件是否存在以及客户端是否有权限,确认成功后,会返回一个 HdfsDataOutputStream 对象,与读流程类似,这个对象底层包装了一个 DFSOutputStream 对象,它才是写数据的真正执行者。
  2. 建立数据流 pipeline 管道: 客户端得到一个输出流对象,还需要通过调用 ClientProtocol.addBlock()向 namenode 申请新的空数据块,addBlock( ) 会返回一个 LocateBlock 对象,该对象保存了可写入的 DataNode 的信息,并构成一个 pipeline,默认是有三个 DataNode 组成。
  3. 通过数据流管道写数据:DFSOutputStream调用 write()方法把数据写入时,数据会先被缓存在一个缓冲区中,写入的数据会被切分成多个数据包,每当达到一个数据包长度(默认65536字节)时,DFSOutputStream会构造一个 Packet 对象保存这个要发送的数据包;新构造的 Packet 对象会被放到 DFSOutputStream维护的 dataQueue 队列中,DataStreamer 线程会从 dataQueue 队列中取出 Packet 对象,通过底层 IO 流发送到 pipeline 中的第一个 DataNode,然后继续将所有的包转到第二个 DataNode 中,以此类推。发送完毕后,这个 Packet 会被移出 dataQueue,放入 DFSOutputStream 维护的确认队列 ackQueue 中,该队列等待下游 DataNode 的写入确认。当一个包已经被 pipeline 中所有的 DataNode 确认了写入磁盘成功,这个数据包才会从确认队列中移除。
  4. 关闭输入流并提交文件: 当客户端完成了整个文件中所有的数据块的写操作之后,会调用 close() 方法关闭输出流,客户端还会调用 ClientProtoclo.complete( ) 方法通知 NameNode 提交这个文件中的所有数据块,NameNode 还会确认该文件的备份数是否满足要求。对于 DataNode 而言,它会调用 blockReceivedAndDelete() 方法向 NameNode 汇报,NameNode 会更新内存中的数据块与数据节点的对应关系。

从上述流程来看,整个写流程主要涉及到了 creat()write()这些方法。

3. MapReduce

3.1 移动计算

传统的软件计算处理模型,都是“输入 -> 计算 -> 输出”模型。也就是说,一个程序给它传入一些数据也好,它自己从某个地方读取一些数据也好,总是先有一些输入数据,然后对这些数据进行计算处理,最后得到输出结果。

但是在互联网大数据时代,需要计算处理的数据量急速膨胀。一来是因为互联网用户数远远超过传统企业的用户,相应产生了更大量的数据;二来很多以往被忽视的数据重新被发掘利用,比如用户在一个页面的停留时长、鼠标在屏幕移动的轨迹都会被记录下来进行分析。在稍微大一点的互联网企业,需要计算处理的数据量常常以 PB 计(1015 Byte)。

正因为如此,传统的计算处理模型不能适用于大数据时代的计算要求。你能想象一个程序读取 PB 级的数据进行计算是怎样一个场景吗?一个程序所能调度的网络带宽(通常数百 MB)、内存容量(通常几十 GB )、磁盘大小(通常数 TB)、CPU 运算速度是不可能满足这种计算要求的。

3.2 PB 级数据计算

既然数据是庞大的,而程序要比数据小得多,将数据输入给程序是不划算的,那么就反其道而行之,将程序分发到数据所在的地方进行计算,也就是所谓的移动计算。

  1. 将待处理的大规模数据存储在服务器集群的所有服务器上,主要使用 HDFS 分布式文件存储系统,将文件分成很多块(Block),以块为单位存储在集群的服务器上。
  2. 大数据引擎根据集群里不同服务器的计算能力,在每台服务器上启动若干分布式任务执行进程,这些进程会等待给它们分配执行任务。
  3. 使用大数据计算框架支持的编程模型进行编程,比如 Hadoop 的 MapReduce 编程模型,或者 Spark 的 RDD 编程模型。应用程序编写好以后,将其打包,MapReduce 和 Spark 都是在 JVM 环境中运行,所以打包出来的是一个 Java 的 JAR 包。
  4. 用 Hadoop 或者 Spark 的启动命令执行这个应用程序的 JAR 包,首先执行引擎会解析程序要处理的数据输入路径,根据输入数据量的大小,将数据分成若干片(Split),每一个数据片都分配给一个任务执行进程去处理。
  5. 任务执行进程收到分配的任务后,检查自己是否有任务对应的程序包,如果没有就去下载程序包,下载以后通过反射的方式加载程序。走到这里,最重要的一步,也就是移动计算就完成了。
  6. 加载程序后,任务执行进程根据分配的数据片的文件地址和数据在文件内的偏移量读取数据,并把数据输入给应用程序相应的方法去执行,从而实现在分布式服务器集群中移动计算程序,对大规模数据进行并行处理的计算目标。

3.3 MapReduce编程模型

Hadoop 1 中 MapReduce 既是一个编程模型,又是一个计算框架。也就是说,开发人员必须基于 MapReduce 编程模型进行编程开发,然后将程序通过 MapReduce 计算框架分发到 Hadoop 集群中运行。

其编程模型只包含 Map 和 Reduce 两个过程,map 的主要输入是一对 <Key, Value> 值,经过 map 计算后输出一对 <Key, Value> 值;然后将相同 Key 合并,形成 <Key, Value 集合 >;再将这个 <Key, Value 集合 > 输入 reduce,经过计算输出零个或多个 <Key, Value> 对。

以 WordCount 程序为例,一起来看下 MapReduce 的计算过程。

WordCount 主要解决的是文本处理中词频统计的问题,就是统计文本中每一个单词出现的次数。如果只是统计一篇文章的词频,几十 KB 到几 MB 的数据,只需要写一个程序,将数据读入内存,建一个 Hash 表记录每个词出现的次数就可以了。这个统计过程你可以看下面这张图。

330f7781-5631-4628-bb22-68b2aa2ac7a0

如果用 Python 语言,单机处理 WordCount 的代码是这样的。

# 文本前期处理
strl_ist = str.replace('\n', '').lower().split(' ')
count_dict = {}
# 如果字典里有该单词则加1,否则添加入字典
for str in strl_ist:
if str in count_dict.keys():
    count_dict[str] = count_dict[str] + 1
    else:
        count_dict[str] = 1

简单说来,就是建一个 Hash 表,然后将字符串里的每个词放到这个 Hash 表里。如果这个词第一次放到 Hash 表,就新建一个 Key、Value 对,Key 是这个词,Value 是 1。如果 Hash 表里已经有这个词了,那么就给这个词的 Value + 1。

小数据量用单机统计词频很简单,但是如果想统计全世界互联网所有网页(数万亿计)的词频数(而这正是 Google 这样的搜索引擎的典型需求),不可能写一个程序把全世界的网页都读入内存,这时候就需要用 MapReduce 编程来解决。

WordCount 的 MapReduce 程序如下。

public class WordCount {

  public static class TokenizerMapper
       extends Mapper<Object, Text, Text, IntWritable>{

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
      }
    }
  }

  public static class IntSumReducer
       extends Reducer<Text,IntWritable,Text,IntWritable> {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values,
                       Context context
                       ) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }
}

可以从这段代码中看到,MapReduce 版本 WordCount 程序的核心是一个 map 函数和一个 reduce 函数。

map 函数的输入主要是一个 <Key, Value> 对,在这个例子里,Value 是要统计的所有文本中的一行数据,Key 在一般计算中都不会用到。

public void map(Object key, Text value, Context context)

map 函数的计算过程是,将这行文本中的单词提取出来,针对每个单词输出一个 <word, 1> 这样的 <Key, Value> 对。

MapReduce 计算框架会将这些 <word , 1> 收集起来,将相同的 word 放在一起,形成 <word , <1,1,1,1,1,1,1…>> 这样的 <Key, Value 集合 > 数据,然后将其输入给 reduce 函数。

public void reduce(Text key, Iterable<IntWritable> values, Context context)

这里 reduce 的输入参数 Values 就是由很多个 1 组成的集合,而 Key 就是具体的单词 word。

reduce 函数的计算过程是,将这个集合里的 1 求和,再将单词(word)和这个和(sum)组成一个 <Key, Value>,也就是 <word, sum> 输出。每一个输出就是一个单词和它的词频统计总和。

一个 map 函数可以针对一部分数据进行运算,这样就可以将一个大数据切分成很多块(这也正是 HDFS 所做的),MapReduce 计算框架为每个数据块分配一个 map 函数去计算,从而实现大数据的分布式计算。

假设有两个数据块的文本数据需要进行词频统计,MapReduce 计算过程如下图所示。

2043fa42-71e7-4586-9776-8e5b5d73a657

以上就是 MapReduce 编程模型的主要计算过程和原理,但是这样一个 MapReduce 程序要想在分布式环境中执行,并处理海量的大规模数据,还需要一个计算框架,能够调度执行这个 MapReduce 程序,使它在分布式的集群中并行运行,而这个计算框架也叫 MapReduce。

3.4 MapReduce计算框架

MapReduce 计算框架是如何运作,这个过程有两个关键问题需要处理。

  1. 如何为每个数据块分配一个 Map 计算任务,也就是代码是如何发送到数据块所在服务器的,发送后是如何启动的,启动以后如何知道自己需要计算的数据在文件什么位置(BlockID 是什么)。
  2. 处于不同服务器的 map 输出的 <Key, Value> ,如何把相同的 Key 聚合在一起发送给 Reduce 任务进行处理。

图中标红的两处,这两个关键问题对应的就是图中的两处“MapReduce 框架处理”,具体来说,它们分别是 MapReduce 作业启动和运行,以及 MapReduce 数据合并与连接。

dd0c4dd3-2e43-4b4d-8a1d-7fd1a271de77

以 Hadoop 1 为例,MapReduce 运行过程涉及三类关键进程。

  1. 大数据应用进程。这类进程是启动 MapReduce 程序的主入口,主要是指定 Map 和 Reduce 类、输入输出文件路径等,并提交作业给 Hadoop 集群,也就是下面提到的 JobTracker 进程。这是由用户启动的 MapReduce 程序进程,比如我们上期提到的 WordCount 程序。
  2. JobTracker 进程。这类进程根据要处理的输入数据量,命令下面提到的 TaskTracker 进程启动相应数量的 Map 和 Reduce 进程任务,并管理整个作业生命周期的任务调度和监控。这是 Hadoop 集群的常驻进程,需要注意的是,JobTracker 进程在整个 Hadoop 集群全局唯一。
  3. TaskTracker 进程。这个进程负责启动和管理 Map 进程以及 Reduce 进程。因为需要每个数据块都有对应的 map 函数,TaskTracker 进程通常和 HDFS 的 DataNode 进程启动在同一个服务器。也就是说,Hadoop 集群中绝大多数服务器同时运行 DataNode 进程和 TaskTracker 进程。

JobTracker 进程和 TaskTracker 进程是主从关系,主服务器通常只有一台(或者另有一台备机提供高可用服务,但运行时只有一台服务器对外提供服务,真正起作用的只有一台),从服务器可能有几百上千台,所有的从服务器听从主服务器的控制和调度安排。主服务器负责为应用程序分配服务器资源以及作业执行的调度,而具体的计算操作则在从服务器上完成。

具体来看,MapReduce 的主服务器就是 JobTracker,从服务器就是 TaskTracker。还记得我们讲 HDFS 也是主从架构吗,HDFS 的主服务器是 NameNode,从服务器是 DataNode。后面会讲到的 Yarn、Spark 等也都是这样的架构,这种一主多从的服务器架构也是绝大多数大数据系统的架构方案。

具体的作业启动和计算过程到底是怎样的呢?根据上面所讲的绘制成一张图,可以从图中一步一步来看,感受一下整个流程。

c77bc9f0-df58-4650-9bd7-1f6c54f8ccc4

  1. 应用进程 JobClient 将用户作业 JAR 包存储在 HDFS 中,将来这些 JAR 包会分发给 Hadoop 集群中的服务器执行 MapReduce 计算。
  2. 应用程序提交 job 作业给 JobTracker。
  3. JobTracker 根据作业调度策略创建 JobInProcess 树,每个作业都会有一个自己的 JobInProcess 树。
  4. JobInProcess 根据输入数据分片数目(通常情况就是数据块的数目)和设置的 Reduce 数目创建相应数量的 TaskInProcess。
  5. TaskTracker 进程和 JobTracker 进程进行定时通信。
  6. 如果 TaskTracker 有空闲的计算资源(有空闲 CPU 核心),JobTracker 就会给它分配任务。分配任务的时候会根据 TaskTracker 的服务器名字匹配在同一台机器上的数据块计算任务给它,使启动的计算任务正好处理本机上的数据,以实现我们一开始就提到的“移动计算比移动数据更划算”。
  7. TaskTracker 收到任务后根据任务类型(是 Map 还是 Reduce)和任务参数(作业 JAR 包路径、输入数据文件路径、要处理的数据在文件中的起始位置和偏移量、数据块多个备份的 DataNode 主机名等),启动相应的 Map 或者 Reduce 进程。
  8. Map 或者 Reduce 进程启动后,检查本地是否有要执行任务的 JAR 包文件,如果没有,就去 HDFS 上下载,然后加载 Map 或者 Reduce 代码开始执行。
  9. 如果是 Map 进程,从 HDFS 读取数据(通常要读取的数据块正好存储在本机);如果是 Reduce 进程,将结果数据写出到 HDFS。

通过这样一个计算旅程,MapReduce 可以将大数据作业计算任务分布在整个 Hadoop 集群中运行,每个 Map 计算任务要处理的数据通常都能从本地磁盘上读取到。

3.5 MapReduce 数据合并与连接机制

WordCount 例子中,我们想要统计相同单词在所有输入数据中出现的次数,而一个 Map 只能处理一部分数据,一个热门单词几乎会出现在所有的 Map 中,这意味着同一个单词必须要合并到一起进行统计才能得到正确的结果。

几乎所有的大数据计算场景都需要处理数据关联的问题,像 WordCount 这种比较简单的只要对 Key 进行合并就可以了,对于像数据库的 join 操作这种比较复杂的,需要对两种类型(或者更多类型)的数据根据 Key 进行连接。

在 map 输出与 reduce 输入之间,MapReduce 计算框架处理数据合并与连接操作,这个操作有个专门的词汇叫 shuffle。那到底什么是 shuffle?shuffle 的具体过程又是怎样的呢?请看下图。

37292a98-8891-4d43-8736-827f2551c9c5

每个 Map 任务的计算结果都会写入到本地文件系统,等 Map 任务快要计算完成的时候,MapReduce 计算框架会启动 shuffle 过程,在 Map 任务进程调用一个 Partitioner 接口,对 Map 产生的每个 <Key, Value> 进行 Reduce 分区选择,然后通过 HTTP 通信发送给对应的 Reduce 进程。这样不管 Map 位于哪个服务器节点,相同的 Key 一定会被发送给相同的 Reduce 进程。Reduce 任务进程对收到的 <Key, Value> 进行排序和合并,相同的 Key 放在一起,组成一个 <Key, Value 集合 > 传递给 Reduce 执行。

map 输出的 <Key, Value>shuffle 到哪个 Reduce 进程是这里的关键,它是由 Partitioner 来实现,MapReduce 框架默认的 Partitioner 用 Key 的哈希值对 Reduce 任务数量取模,相同的 Key 一定会落在相同的 Reduce 任务 ID 上。从实现上来看的话,这样的 Partitioner 代码只需要一行。

/** Use {@link Object#hashCode()} to partition. */ 
public int getPartition(K2 key, V2 value, int numReduceTasks) { 
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks; 
 }

分布式计算需要将不同服务器上的相关数据合并到一起进行下一步计算,这就是 shuffle。

4. YARN

4.1 YARN 基本架构

在 MapReduce 应用程序的启动过程中,最重要的就是要把 MapReduce 程序分发到大数据集群的服务器上,在 Hadoop 1 中,这个过程主要是通过 TaskTracker 和 JobTracker 通信来完成。

这种架构方案的主要缺点是,服务器集群资源调度管理和 MapReduce 执行过程耦合在一起,如果想在当前集群中运行其他计算任务,比如 Spark 或者 Storm,就无法统一使用集群中的资源了。

随着大数据技术的发展,各种新的计算框架不断出现,我们不可能为每一种计算框架部署一个服务器集群,而且就算能部署新集群,数据还是在原来集群的 HDFS 上。所以我们需要把 MapReduce 的资源管理和计算框架分开,这也是 Hadoop 2 最主要的变化,就是将 Yarn 从 MapReduce 中分离出来,成为一个独立的资源调度框架。

Yarn 是“Yet Another Resource Negotiator”的缩写,字面意思就是“另一种资源调度器”。事实上,在 Hadoop 社区决定将资源管理从 Hadoop 1 中分离出来,独立开发 Yarn 的时候,业界已经有一些大数据资源管理产品了,比如 Mesos 等。

下图是 Yarn 的架构。

6551cc53-78fa-42f1-984a-fa61fef8a9c7

从图上看,Yarn 包括两个部分:一个是资源管理器(Resource Manager),一个是节点管理器(Node Manager)。这也是 Yarn 的两种主要进程:ResourceManager 进程负责整个集群的资源调度管理,通常部署在独立的服务器上;NodeManager 进程负责具体服务器上的资源和任务管理,在集群的每一台计算服务器上都会启动,基本上跟 HDFS 的 DataNode 进程一起出现。

具体说来,资源管理器又包括两个主要组件:调度器和应用程序管理器。

调度器其实就是一个资源分配算法,根据应用程序(Client)提交的资源申请和当前服务器集群的资源状况进行资源分配。Yarn 内置了几种资源调度算法,包括 Fair Scheduler、Capacity Scheduler 等,你也可以开发自己的资源调度算法供 Yarn 调用。

Yarn 进行资源分配的单位是容器(Container),每个容器包含了一定量的内存、CPU 等计算资源,默认配置下,每个容器包含一个 CPU 核心。容器由 NodeManager 进程启动和管理,NodeManger 进程会监控本节点上容器的运行状况并向 ResourceManger 进程汇报。

应用程序管理器负责应用程序的提交、监控应用程序运行状态等。应用程序启动后需要在集群中运行一个 ApplicationMaster,ApplicationMaster 也需要运行在容器里面。每个应用程序启动后都会先启动自己的 ApplicationMaster,由 ApplicationMaster 根据应用程序的资源需求进一步向 ResourceManager 进程申请容器资源,得到容器以后就会分发自己的应用程序代码到容器上启动,进而开始分布式计算。

4.2 YARN 工作调度流程

当用户向 YARN 集群中提交一个 MR 程序后,可以分成 2 个大的阶段:

  • 首先,启动 ApplicationMaster
  • 其次,ApplicationMaster 创建应用程序,并为该应用程序申请资源,监控它的运行状态,直到运行完成。

为了更好的理解 YARN 的工作流程,下面以 MapReduce 计算框架为例,通过提交一个 WordCount 应用程序,来全面解析 YARN 的工作机制。其运行流程如图所示:

3ddba366-209e-4a1b-a225-3817ed3a6f07

  1. 我们向 Yarn 提交应用程序,包括 MapReduce ApplicationMaster、我们的 MapReduce 程序,以及 MapReduce Application 启动命令。
  2. ResourceManager 进程和 NodeManager 进程通信,根据集群资源,为用户程序分配第一个容器,并将 MapReduce ApplicationMaster 分发到这个容器上面,并在容器里面启动 MapReduce ApplicationMaster。
  3. MapReduce ApplicationMaster 启动后立即向 ResourceManager 进程注册,并为自己的应用程序申请容器资源。
  4. MapReduce ApplicationMaster 申请到需要的容器后,立即和相应的 NodeManager 进程通信,将用户 MapReduce 程序分发到 NodeManager 进程所在服务器,并在容器中运行,运行的就是 Map 或者 Reduce 任务。
  5. Map 或者 Reduce 任务在运行期和 MapReduce ApplicationMaster 通信,汇报自己的运行状态,如果运行结束,MapReduce ApplicationMaster 向 ResourceManager 进程注销并释放所有的容器资源。

MapReduce 如果想在 Yarn 上运行,就需要开发遵循 Yarn 规范的 MapReduce ApplicationMaster,相应地,其他大数据计算框架也可以开发遵循 Yarn 规范的 ApplicationMaster,这样在一个 Yarn 集群中就可以同时并发执行各种不同的大数据计算框架,实现资源的统一调度管理。

5. Spark

5.1 Spark编程模型

Spark 和 MapReduce 相比,有更快的执行速度。下图是 Spark 和 MapReduce 进行逻辑回归机器学习的性能比较,Spark 比 MapReduce 快 100 多倍。

eff81715-9f15-4da6-a77f-61436e7eb2e3

除了速度更快,Spark 和 MapReduce 相比,还有更简单易用的编程模型。使用 Scala 语言在 Spark 上编写 WordCount 程序,主要代码只需要三行。

val textFile = sc.textFile("hdfs://...")
val counts = textFile.flatMap(line => line.split(" "))
                 .map(word => (word, 1))
                 .reduceByKey(_ + _)
counts.saveAsTextFile("hdfs://...")

第 1 行代码:根据 HDFS 路径生成一个输入数据 RDD。

第 2 行代码:在输入数据 RDD 上执行 3 个操作,得到一个新的 RDD。

将输入数据的每一行文本用空格拆分成单词。

将每个单词进行转换,word => (word, 1),生成 <Key, Value> 的结构。

相同的 Key 进行统计,统计方式是对 Value 求和,(_ + _)。

第 3 行代码:将这个 RDD 保存到 HDFS。

RDD 是 Spark 的核心概念,是弹性数据集(Resilient Distributed Datasets)的缩写。RDD 既是 Spark 面向开发者的编程模型,又是 Spark 自身架构的核心元素。

大数据计算就是在大规模的数据集上进行一系列的数据计算处理。MapReduce 针对输入数据,将计算过程分为两个阶段,一个 Map 阶段,一个 Reduce 阶段,可以理解成是面向过程的大数据计算。我们在用 MapReduce 编程的时候,思考的是,如何将计算逻辑用 Map 和 Reduce 两个阶段实现,map 和 reduce 函数的输入和输出是什么,这也是我们在学习 MapReduce 编程的时候一再强调的。

而 Spark 则直接针对数据进行编程,将大规模数据集合抽象成一个 RDD 对象,然后在这个 RDD 上进行各种计算处理,得到一个新的 RDD,继续计算处理,直到得到最后的结果数据。所以 Spark 可以理解成是面向对象的大数据计算。我们在进行 Spark 编程的时候,思考的是一个 RDD 对象需要经过什么样的操作,转换成另一个 RDD 对象,思考的重心和落脚点都在 RDD 上。

所以在上面 WordCount 的代码示例里,第 2 行代码实际上进行了 3 次 RDD 转换,每次转换都得到一个新的 RDD,因为新的 RDD 可以继续调用 RDD 的转换函数,所以连续写成一行代码。事实上,可以分成 3 行。

val rdd1 = textFile.flatMap(line => line.split(" "))
val rdd2 = rdd1.map(word => (word, 1))
val rdd3 = rdd2.reduceByKey(_ + _)

RDD 上定义的函数分两种,一种是转换(transformation)函数,这种函数的返回值还是 RDD;另一种是执行(action)函数,这种函数不再返回 RDD。

RDD 定义了很多转换操作函数,比如有计算 map(func)、过滤 filter(func)、合并数据集 union(otherDataset)、根据 Key 聚合 reduceByKey(func, [numPartitions])、连接数据集 join(otherDataset, [numPartitions])、分组 groupByKey([numPartitions]) 等十几个函数。

作为 Spark 架构核心元素的 RDD。跟 MapReduce 一样,Spark 也是对大数据进行分片计算,Spark 分布式计算的数据分片、任务调度都是以 RDD 为单位展开的,每个 RDD 分片都会分配到一个执行进程去处理。

RDD 上的转换操作又分成两种,一种转换操作产生的 RDD 不会出现新的分片,比如 map、filter 等,也就是说一个 RDD 数据分片,经过 map 或者 filter 转换操作后,结果还在当前分片。就像你用 map 函数对每个数据加 1,得到的还是这样一组数据,只是值不同。实际上,Spark 并不是按照代码写的操作顺序去生成 RDD,比如rdd2 = rdd1.map(func)这样的代码并不会在物理上生成一个新的 RDD。物理上,Spark 只有在产生新的 RDD 分片时候,才会真的生成一个 RDD,Spark 的这种特性也被称作惰性计算。

另一种转换操作产生的 RDD 则会产生新的分片,比如reduceByKey,来自不同分片的相同 Key 必须聚合在一起进行操作,这样就会产生新的 RDD 分片。实际执行过程中,是否会产生新的 RDD 分片,并不是根据转换函数名就能判断出来的。

Spark 也有自己的生态体系,以 Spark 为基础,有支持 SQL 语句的 Spark SQL,有支持流计算的 Spark Streaming,有支持机器学习的 MLlib,还有支持图计算的 GraphX。利用这些产品,Spark 技术栈支撑起大数据分析、大数据机器学习等各种大数据应用场景。

9ba709fa-e2f9-49ed-9e28-baa6e050bc53

5.2 Spark 的计算阶段

和 MapReduce 一样,Spark 也遵循移动计算比移动数据更划算这一大数据计算基本原则。但是和 MapReduce 僵化的 Map 与 Reduce 分阶段计算相比,Spark 的计算框架更加富有弹性和灵活性,进而有更好的运行性能。

Spark 的计算阶段我们可以对比来看。首先和 MapReduce 一个应用一次只运行一个 map 和一个 reduce 不同,Spark 可以根据应用的复杂程度,分割成更多的计算阶段(stage),这些计算阶段组成一个有向无环图 DAG,Spark 任务调度器可以根据 DAG 的依赖关系执行计算阶段。

逻辑回归机器学习性能的例子中发现 Spark 比 MapReduce 快 100 多倍。因为某些机器学习算法可能需要进行大量的迭代计算,产生数万个计算阶段,这些计算阶段在一个应用中处理完成,而不是像 MapReduce 那样需要启动数万个应用,因此极大地提高了运行效率。

所谓 DAG 也就是有向无环图,就是说不同阶段的依赖关系是有向的,计算过程只能沿着依赖关系方向执行,被依赖的阶段执行完成之前,依赖的阶段不能开始执行,同时,这个依赖关系不能有环形依赖,否则就成为死循环了。下面这张图描述了一个典型的 Spark 运行 DAG 的不同阶段。

7221b565-009e-4359-a99f-94af2d2430bc

从图上看,整个应用被切分成 3 个阶段,阶段 3 需要依赖阶段 1 和阶段 2,阶段 1 和阶段 2 互不依赖。Spark 在执行调度的时候,先执行阶段 1 和阶段 2,完成以后,再执行阶段 3。如果有更多的阶段,Spark 的策略也是一样的。只要根据程序初始化好 DAG,就建立了依赖关系,然后根据依赖关系顺序执行各个计算阶段,Spark 大数据应用的计算就完成了。

上图这个 DAG 对应的 Spark 程序伪代码如下。

rddB = rddA.groupBy(key)
rddD = rddC.map(func)
rddF = rddD.union(rddE)
rddG = rddB.join(rddF)

所以,你可以看到 Spark 作业调度执行的核心是 DAG,有了 DAG,整个应用就被切分成哪些阶段,每个阶段的依赖关系也就清楚了。之后再根据每个阶段要处理的数据量生成相应的任务集合(TaskSet),每个任务都分配一个任务进程去处理,Spark 就实现了大数据的分布式计算。

具体来看的话,负责 Spark 应用 DAG 生成和管理的组件是 DAGScheduler,DAGScheduler 根据程序代码生成 DAG,然后将程序分发到分布式计算集群,按计算阶段的先后关系调度执行。

当 RDD 之间的转换连接线呈现多对多交叉连接的时候,就会产生新的阶段。一个 RDD 代表一个数据集,图中每个 RDD 里面都包含多个小块,每个小块代表 RDD 的一个分片。

一个数据集中的多个数据分片需要进行分区传输,写入到另一个数据集的不同分片中,这种数据分区交叉传输的操作,我们在 MapReduce 的运行过程中也看到过。

97a62484-f531-4027-8d5e-e328529f5a28

这就是 shuffle 过程,Spark 也需要通过 shuffle 将数据进行重新组合,相同 Key 的数据放在一起,进行聚合、关联等操作,因而每次 shuffle 都产生新的计算阶段。这也是为什么计算阶段会有依赖关系,它需要的数据来源于前面一个或多个计算阶段产生的数据,必须等待前面的阶段执行完毕才能进行 shuffle,并得到数据。

这里需要特别注意的是,计算阶段划分的依据是 shuffle,不是转换函数的类型,有的函数有时候有 shuffle,有时候没有。比如上图例子中 RDD B 和 RDD F 进行 join,得到 RDD G,这里的 RDD F 需要进行 shuffle,RDD B 就不需要。

f168b8f6-b404-4775-a757-e5a6127936b4

因为 RDD B 在前面一个阶段,阶段 1 的 shuffle 过程中,已经进行了数据分区。分区数目和分区 Key 不变,就不需要再进行 shuffle。

fe40d0db-cf97-4e74-ac29-c375b652da22

这种不需要进行 shuffle 的依赖,在 Spark 里被称作窄依赖;相反的,需要进行 shuffle 的依赖,被称作宽依赖。跟 MapReduce 一样,shuffle 也是 Spark 最重要的一个环节,只有通过 shuffle,相关数据才能互相计算,构建起复杂的应用逻辑。

从本质上看,Spark 可以算作是一种 MapReduce 计算模型的不同实现。Hadoop MapReduce 简单粗暴地根据 shuffle 将大数据计算分成 Map 和 Reduce 两个阶段,然后就算完事了。而 Spark 更细腻一点,将前一个的 Reduce 和后一个的 Map 连接起来,当作一个阶段持续计算,形成一个更加优雅、高效的计算模型,虽然其本质依然是 Map 和 Reduce。但是这种多个计算阶段依赖执行的方案可以有效减少对 HDFS 的访问,减少作业的调度执行次数,因此执行速度也更快。

并且和 Hadoop MapReduce 主要使用磁盘存储 shuffle 过程中的数据不同,Spark 优先使用内存进行数据存储,包括 RDD 数据。除非是内存不够用了,否则是尽可能使用内存, 这也是 Spark 性能比 Hadoop 高的另一个原因。

5.3 Spark 的作业管理

Spark 里面的 RDD 函数有两种,一种是转换函数,调用以后得到的还是一个 RDD,RDD 的计算逻辑主要通过转换函数完成。

另一种是 action 函数,调用以后不再返回 RDD。比如 count() 函数,返回 RDD 中数据的元素个数;saveAsTextFile(path),将 RDD 数据存储到 path 路径下。Spark 的 DAGScheduler 在遇到 shuffle 的时候,会生成一个计算阶段,在遇到 action 函数的时候,会生成一个作业(job)。

RDD 里面的每个数据分片,Spark 都会创建一个计算任务去处理,所以一个计算阶段会包含很多个计算任务(task)。

DAGScheduler 根据代码生成 DAG 图以后,Spark 的任务调度就以任务为单位进行分配,将任务分配到分布式集群的不同机器上执行。

5.4 Spark 的执行过程

Spark 支持 Standalone、Yarn、Mesos、Kubernetes 等多种部署方案,几种部署方案原理也都一样,只是不同组件角色命名不同,但是核心功能和运行流程都差不多。

e2d6d9f8-a348-4c18-b202-f68bc0e516a9

上面这张图是 Spark 的运行流程,我们一步一步来看。

首先,Spark 应用程序启动在自己的 JVM 进程里,即 Driver 进程,启动后调用 SparkContext 初始化执行配置和输入数据。SparkContext 启动 DAGScheduler 构造执行的 DAG 图,切分成最小的执行单位也就是计算任务。

然后 Driver 向 Cluster Manager 请求计算资源,用于 DAG 的分布式计算。Cluster Manager 收到请求以后,将 Driver 的主机地址等信息通知给集群的所有计算节点 Worker。

Worker 收到信息以后,根据 Driver 的主机地址,跟 Driver 通信并注册,然后根据自己的空闲资源向 Driver 通报自己可以领用的任务数。Driver 根据 DAG 图开始向注册的 Worker 分配任务。

Worker 收到任务后,启动 Executor 进程开始执行任务。Executor 先检查自己是否有 Driver 的执行代码,如果没有,从 Driver 下载执行代码,通过 Java 反射加载后开始执行。

总结来说,Spark 有三个主要特性:RDD 的编程模型更简单,DAG 切分的多阶段计算过程更快速,使用内存存储中间计算结果更高效。这三个特性使得 Spark 相对 Hadoop MapReduce 可以有更快的执行速度,以及更简单的编程实现。

6. 流式计算的代表:Storm、Flink、Spark Streaming

6.1 Storm

其实大数据实时处理的需求早已有之,最早的时候,我们用消息队列实现大数据实时处理,如果处理起来比较复杂,那么就需要很多个消息队列,将实现不同业务逻辑的生产者和消费者串起来。这个处理过程类似下面图里的样子。

946802e6-82da-42c5-a5c9-a7be0a20ad47

图中的消息队列负责完成数据的流转;处理逻辑既是消费者也是生产者,也就是既消费前面消息队列的数据,也为下个消息队列产生数据。这样的系统只能是根据不同需求开发出来,并且每次新的需求都需要重新开发类似的系统。因为不同应用的生产者、消费者的处理逻辑不同,所以处理流程也不同,因此这个系统也就无法复用。

之后我们很自然地就会想到,能不能开发一个流处理计算系统,我们只要定义好处理流程和每一个节点的处理逻辑,代码部署到流处理系统后,就能按照预定义的处理流程和处理逻辑执行呢?Storm 就是在这种背景下产生的,它也算是一个比较早期的大数据流计算框架。上面的例子如果用 Storm 来实现,过程就变得简单一些了。

fdbe7a60-85dc-439d-a3e7-3e55c1744798

有了 Storm 后,开发者无需再关注数据的流转、消息的处理和消费,只要编程开发好数据处理的逻辑 bolt 和数据源的逻辑 spout,以及它们之间的拓扑逻辑关系 toplogy,提交到 Storm 上运行就可以了。

在了解了 Storm 的运行机制后,我们来看一下它的架构。Storm 跟 Hadoop 一样,也是主从架构。

5b9af043-8aa2-455d-8c2f-4e378a36c439

nimbus 是集群的 Master,负责集群管理、任务分配等。supervisor 是 Slave,是真正完成计算的地方,每个 supervisor 启动多个 worker 进程,每个 worker 上运行多个 task,而 task 就是 spout 或者 bolt。supervisor 和 nimbus 通过 ZooKeeper 完成任务分配、心跳检测等操作。

Hadoop、Storm 的设计理念,其实是一样的,就是把和具体业务逻辑无关的东西抽离出来,形成一个框架,比如大数据的分片处理、数据的流转、任务的部署与执行等,开发者只需要按照框架的约束,开发业务逻辑代码,提交给框架执行就可以了。

6.2 Spark Streaming

Spark 是一个批处理大数据计算引擎,主要针对大批量历史数据进行计算。 Spark 架构原理时介绍过,Spark 是一个快速计算的大数据引擎,它将原始数据分片后装载到集群中计算,对于数据量不是很大、过程不是很复杂的计算,可以在秒级甚至毫秒级完成处理。

Spark Streaming 巧妙地利用了 Spark 的分片和快速计算的特性,将实时传输进来的数据按照时间进行分段,把一段时间传输进来的数据合并在一起,当作一批数据,再去交给 Spark 去处理。下图这张图描述了 Spark Streaming 将数据分段、分批的过程。

d1e19ec9-7da2-4df2-afff-7544df4b5a1e

如果时间段分得足够小,每一段的数据量就会比较小,再加上 Spark 引擎的处理速度又足够快,这样看起来好像数据是被实时处理的一样,这就是 Spark Streaming 实时流计算的奥妙。

这里要注意的是,在初始化 Spark Streaming 实例的时候,需要指定分段的时间间隔。下面代码示例中间隔是 1 秒。

val ssc = new StreamingContext(conf, Seconds(1))

可以指定更小的时间间隔,比如 500ms,这样处理的速度就会更快。时间间隔的设定通常要考虑业务场景,比如你希望统计每分钟高速公路的车流量,那么时间间隔可以设为 1 分钟。

Spark Streaming 主要负责将流数据转换成小的批数据,剩下的就可以交给 Spark 去做了。

Spark Streaming 是将实时数据流按时间分段后,当作小的批处理数据去计算。那么 Flink 则相反,一开始就是按照流处理计算去设计的。当把从文件系统(HDFS)中读入的数据也当做数据流看待,他就变成批处理系统了。

为什么 Flink 既可以流处理又可以批处理呢?如果要进行流计算,Flink 会初始化一个流执行环境 StreamExecutionEnvironment,然后利用这个执行环境构建数据流 DataStream。

StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<WikipediaEditEvent> edits = see.addSource(new WikipediaEditsSource());

如果要进行批处理计算,Flink 会初始化一个批处理执行环境 ExecutionEnvironment,然后利用这个环境构建数据集 DataSet。

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

DataSet<String> text = env.readTextFile("/path/to/file");

然后在 DataStream 或者 DataSet 上执行各种数据转换操作(transformation),这点很像 Spark。不管是流处理还是批处理,Flink 运行时的执行引擎是相同的,只是数据源不同而已。

Flink 处理实时数据流的方式跟 Spark Streaming 也很相似,也是将流数据分段后,一小批一小批地处理。流处理算是 Flink 里的“一等公民”,Flink 对流处理的支持也更加完善,它可以对数据流执行 window 操作,将数据流切分到一个一个的 window 里,进而进行计算。

在数据流上执行

.timeWindow(Time.seconds(10))

可以将数据切分到一个 10 秒的时间窗口,进一步对这个窗口里的一批数据进行统计汇总。

Flink 的架构和 Hadoop 1 或者 Yarn 看起来也很像,JobManager 是 Flink 集群的管理者,Flink 程序提交给 JobManager 后,JobManager 检查集群中所有 TaskManager 的资源利用状况,如果有空闲 TaskSlot(任务槽),就将计算任务分配给它执行。

5eb7af1c-46b5-468b-92e5-140cc1912eb9

7. 使用场景

313acb7a-5eac-41a3-9840-359c68592410

从上面这张图来看大数据技术的分类,我们可以分为存储、计算、资源管理三大类。

最基本的存储技术是 HDFS。比如在企业应用中,会把通过各种渠道得到的数据,比如关系数据库的数据、日志数据、应用程序埋点采集的数据、爬虫从外部获取的数据,统统存储到 HDFS 上,供后续的统一使用。

HBase 作为 NoSQL 类非关系数据库的代表性产品,从分类上可以划分到存储类别,它的底层存储也用到了 HDFS。HBase 的主要用途是在某些场景下,代替 MySQL 之类的关系数据库的数据存储访问,利用自己可伸缩的特性,存储比 MySQL 多得多的数据量。比如滴滴的司机每隔几秒就会将当前的 GPS 数据上传,而滴滴上的司机数量号称有上千万,每天会产生数百亿的 GPS 数据,滴滴选择将这样海量的数据存储在 HBase 中,当订单行程结束的时候,会从 HBase 读取订单行程期间的 GPS 轨迹数据,计算路程和车费。

大数据计算框架最早是 MapReduce,目前看来,用的最多的是 Spark。但从应用角度讲,我们直接编写 MapReduce 或者 Spark 程序的机会并不多,通常我们会用 Hive 或者 Spark SQL 这样的大数据仓库工具进行大数据分析和计算。

MapReduce、Spark、Hive、Spark SQL 这些技术主要用来解决离线大数据的计算,也就是针对历史数据进行计算分析,比如针对一天的历史数据计算,一天的数据是一批数据,所以也叫批处理计算。而 Storm、Spark Streaming、Flink 这类的大数据技术是针对实时的数据进行计算,比如摄像头实时采集的数据、实时的订单数据等,数据实时流动进来,所以也叫流处理大数据技术。

不管是批处理计算还是流处理计算,都需要庞大的计算资源,需要将计算任务分布到一个大规模的服务器集群上。那么如何管理这些服务器集群的计算资源,如何对一个计算请求进行资源分配,这就是大数据集群资源管理框架 Yarn 的主要作用。各种大数据计算引擎,不管是批处理还是流处理,都可以通过 Yarn 进行资源分配,运行在一个集群中。

所以上面所有这些技术在实际部署的时候,通常会部署在同一个集群中,也就是说,在由很多台服务器组成的服务器集群中,某台服务器可能运行着 HDFS 的 DataNode 进程,负责 HDFS 的数据存储;同时也运行着 Yarn 的 NodeManager,负责计算资源的调度管理;而 MapReduce、Spark、Storm、Flink 这些批处理或者流处理大数据计算引擎则通过 Yarn 的调度,运行在 NodeManager 的容器(container)里面。至于 Hive、Spark SQL 这些运行在 MapReduce 或者 Spark 基础上的大数据仓库引擎,在经过自身的执行引擎将 SQL 语句解析成 MapReduce 或者 Spark 的执行计划以后,一样提交给 Yarn 去调度执行。

这里相对比较特殊的是 HBase,作为一个 NoSQL 存储系统,HBase 的应用场景是满足在线业务数据存储访问需求,通常是 OLTP(在线事务处理)系统的一部分,为了保证在线业务的高可用性和资源独占性,一般是独立部署自己的集群,和前面的 Hadoop 大数据集群分离部署。