基于分布式流数据的在线汇聚与统计

2018-01-18 09:13潘兆平张建军魏志强
数字技术与应用 2018年9期
关键词:分布式

潘兆平 张建军 魏志强

摘要:本文介绍了分布式流数据的在线汇聚合与统计的方法,该方法采用在分布式随机采样算法的基础上增加了一个权重的概念,它可以从分布式流数据中进行随机采样。该方法把多个在线查询任务分解成一个多层次处理单元集合,每个处理单元负责一个时段的数据查询,这些处理单元能够并行处理,在并行处理过程中,流数据以事件方式封装打包,通过处理单元之间的相互配合完成整个查询任务。在多层次查询过程中,处理单元能将一些重复性的计算进行合并,这样就避免重复计算带来的消耗,提高查询语句的执行效率。

关键词:汇聚与统计;分布式;流数据;随机采样

中图分类号:TP311.13 文献标识码:A 文章编号:1007-9416(2018)09-0140-04

1 引言

过去,用户从一个大型数据库在线查询数据时,需等待系统遍历完所有数据后才能得到最终查询结果,处理过程中没有任何反馈信息。

在查询过程中,为了实时地获得数据查询的中间状态,可以采取降低数据的准确性的策略来实现查询信息的及时反馈。在线汇聚与统计方法就是连续为用户提供近似查询结果,同时输出相应的统计结果,而不是等待处理完所有数据后再给出最终结果;随着在线处理的数据越来越多,汇聚与统计结果持续更新,直到结果能够满足用户需求为止。

2 实现方法

在线汇聚与统计能连续地提供实时汇聚与统计分析结果,本文采用的流数据处理方法具有在线汇聚能力,当在线接收到一组数据时,系统能快速地处理这组数据,并将汇聚与统计结果以异步的方式发送给用户。文献[1]提出,流数据操作有两种:无状态操作和有状态操作。无状态操作(如映射、并集等)根据输入数据进行处理,而后把结果发送给下一个操作即可,是不需要保留中间计算结果。而有状态操作(如合并,笛卡尔乘积)一般采用滑动窗口(可以是固定时间窗口,也可以是固定个数窗口)方法进行处理,需要一串数据同时操作。

为了支持状态操作,本文采用滑动窗口方式进行在线汇聚,通过时间窗口与顺序窗口两个条件同时触发状态汇聚操作。当在线接收到流数据时,系统将连续处理,并输出中间结果。如图1,系统包含采样、映射、汇聚 和统计等处理过程。

2.1 分布式流数据的随机采样

文献[2]指出,蓄水池采样算法可以从N个流数据中随机采样,其中N是一个很大的数,这些数通常不保存在内存中。在初始状态下,该算法有s个采样数据,当第i个数据到达时(其中i>s),有s/i的概率选择这个新数据。具体做法是:从数据[1,i]中随机得到一个值j,如果j≤s,把第i个数据替换掉内存中的第j个数据,否则丢弃这个数(第i个数)。这个算法只需要O(s)个空间和O(1)的采样时间。

但是,蓄水池采样算法无法从分布式流数据中进行随机采样,原因有两个:第一个是不同流数据可能采用不同的数据分布(文献[3]),如一个采用伯努利分布,另一个采用吉布斯。第二个是来自不同流数据有不同的权重,如网络监控和网页服务等。

为了解决上述问题,分布式流数据采样算法引入了一个权重的概念,该算法不仅能从不同的流数据中进行随机采样,而且这些流数据还带有不同的权重。分布式流数据采样算法拓扑结构见图2,图中有m个数据流,有m个本地采集器,每个数据流含有n个数据。本地采集器Pi对流数据Si进行采样,并把采样结果发送给M(M可以是一个有状态操作,也可以是一个无状态操作),每个采样结果都是带有权重的。

图2中,一个流数据端Si将以Pi的概率选择接收到的数据e(该数权重为wi),并把数据e发送给M。M中维护了一个数据候选集V(汇集了多个流数据的采样结果)。对于任何一个流数据vi(vi∈V),分配一个键值ki(ki=ui1/wi,ui是一个从0到1范围内随机的一个值),M总是从V中选择s个数据(其键值ki最大)作为最终采样数据。由于ui是个随机数,因此,ki就是个随机值,在加上s个数据是随机选择的,因此这s个数据是来自Si的随机采样结果。

2.2 分类汇聚处理方法

在线汇聚系统框图中,“映射器”负责对数据进行分类,将数据分配到不同的汇聚器上。汇聚器对数据进行过滤与筛选,而后将某个特定时间段的数据进行“合并”操作,最后把结果数据发送给统计器作统计;通过统计计算得出近似查询结果。

2.2.1 数据映射

由于来自不同数据流的数据格式可能不一样,而且存在无效值,因此,在数据进映射前,需要对采样到的流数据进行清洗(即统一格式),将每个数据转化成的数据块,其中key是数据的键值,value是数据的属性值。数据经过清洗后发送给映射器,由于不同数据块含有不同的key,因此“映射器”通过key对数据块分类,并把分类结果发送给相应的汇聚处理单元进行处理,汇聚处理单元之间可并发执行,同时单个机器节点可以分配多个汇聚处理单元。

2.2.2 流数据汇聚

在汇聚器中有许多处理单元(简称:PE),这些PE间相互独立,可以并行处理数据;每个PE中的数据具有相同的“key”,PE对收到的数据进行过滤与筛选,其方法与数据库中的行过滤与列过滤相同,这样可以减少汇聚的计算成本与数据通信成本。SQL查询语句参见图3。

分布式流數据汇聚操作见图4,每个PE中有两个列表(流A和流B),它们之间通过key值进行汇聚操作得到相关结果。给定一个key,PE将从表流A和流B中检索相关数据,并对相关数据进行匹配与连接。流A和B的数据经过采样后存储在内存中,系统在一个时间窗口内自动执行合并任务,当一个时间窗口的合并操作结束后(当前窗口的数据从内存中释放掉,并等待下一个时间窗口的运行),将通知汇聚开始这个窗口的数据汇聚。

在实际中,流数据往往存在不同步和无序的特性,这样的数据流可能导致时间窗口不完整,因此,检测一个时间窗口是否结束就变得十分复杂。在这里将采用标记方式来指示一个时间窗口合并操作是否结束,当汇聚操作单元接收到所有标记通知时,才开始这个窗口的汇聚操作和更新状态。

由于内存有限,本方案只把部分PE常驻于内存中,如果某个PE已经完成任务,则移除对应的PE,释放出相应的内存,但是建立与移除PE时耗费了一定的时间和计算能力,因此,为了提高机器的计算能力、减少不必要的消耗,本文采用计算单元重用的方法:当内存不足而有其它PE空闲时,系统将自动使用这些空闲PE;当内存足够而所有的PE都在忙时,那么将为该任务重新创建一个PE。

3 统计分析

汇聚操作结束后,系统对汇聚结果进行统计评估,计算其置信区间与误差范围;根据单表汇聚操作与多表汇聚操作的不同,其计算置信区间与误差范围时的方法也有一些区别。

3.1 单表汇聚查询

单表汇聚查询形式可以简单表示如下:

SELECT opt(Exp(xi)) FROM A.a

流数据汇聚操作采用了一个基于时间窗口的策略:若在第i个时间窗口(τ=Ti-Ti-1)PE存有m个样本数据(x1,x2,...,xm),那么,汇聚时该PE需要计算四个值(Ni是第i个时间窗口的数据个数,是第i个时间窗口中所有数据的平均数,Sumi是第i个窗口所有数据的总和,σi2是第i个时间窗口中数据的方差值),求解公式如下:

根据公式3-4,如果给定一个置信度p,可以计算出∈的值,得到误差范围为[λ-∈,λ+∈];或者給定一个误差范围∈,也可以同样计算出这个范围内置信度的值p。

3.2 多表汇聚查询

多表汇聚查询形式可以表示如下:

SELECT opt(Exp(xi,xj))FROM A,B

WHERE A.a=B.b

要想计算多表汇聚查询的置信区间和误差范围,需要对每个表的方差值进行分别计算,根据文献[4]中的表述,多表汇聚查询结果的计算公式如下:

其中σA和σB是表A和B的近似方差值,因此,一个基于时间窗口的汇聚结果可以输出为:(Ti,Ti-1,Ni,,Sumi,σi2),夺标汇聚查询结果与单表汇聚查询结果相似。

4 实验结果评估

4.1 实验环境配置

分布式流数据的在线汇聚与统计算法部署在24台机器中,每台机器的硬件配置:CPU因特尔E2600,内存16GB,硬盘1TB;每台机器的软件配置:Apache S4 系统平台,编程语言是JAVA;机器之间采用万兆网络。在实验过程中,将1台机器配置管理节点,另外20台机器作为计算节点,剩余3台机器作为备用机器(故障检查及容错)。

实验数据有四张表,总数据量约10GB,其中Litem表含有1300万行记录,Orders表有350万行记录, Part表有80万行记录,表Psup有160万行记录。

本次实验采用以下三个SQL语句来评估分布式流数据在线汇聚与统计算法的实际性能,包含汇聚、统计和多语句同时查询。实验的结果都是经过多次运行后计算所得的平均值,整个SQL的运行完成时间是从提交SQL查询到执行完毕为止。

查询模版1(Q1):

SELECT Sum(Ext*Dis),Average(Ext*Dis)

FROM Litem

WHERE Dis<0.15 and Dis>=0.07

查询模版2(Q2):

SELECT A.ReturnFlag, A.LineStatus,B.OrderPriority,

Sum(A.Quantity),Count(A.*)

FROM Litem A, Orders B

WHERE A.OrderKey=B.OrderKey

and A.Dis<0.15 and A.Dis>=0.07

AND B.TotalPrice>=1000

AND B.TotalPrice<30000

GROUP BY A.RFlag,A.LStatus,B.OPriority

查询模版3(Q3):

(1)SELECT B.MFGR,B.BRAND,

Sum(A.Quantity),

Average(A.Quantity)

FROM Litem A,Part B

WHERE A.PartKey = B.PartKey

AND A.Dis<0.15 AND A.Dis>=0.07

GROUP BY B.MFGR,B.BRAND

(2)SELECT A.ReturnFlag, A.LStatus,Sum(B.SupplyCost),

Average(B.SupplyCost)

FROM Litem A,Psup B

WHERE A.PartKey = B.PartKey

AND A.Dis<0.15 AND A.Dis>=0.07

GROUP BY A.RFlag, A.LStatus

在对数据进行汇聚与统计计算时,时间窗口中缓存的数据量大小直接影响在线汇聚与统计算法的运行时间,因此首先评估了数据量大小与SQL的平均运行时间关系,然后评估了置信区间与误差区间之间的影响,分析数据量大小与执行时间的关系;通过调整集群中机器的个数(从2到22)进行比较分析,最后比较Spark与S4在处理流数据中性能。

4.2 多查询语句性能分析

通过实验分析多查询语句并行工作的性能,试验中实现的查询语句为两个Q3,这两个查询包含相同的滤波条件,滤波操作与结果可以合并成一个;但在汇聚操作中关联的表是不同的,因此,需要把滤波的数据结果分两块发送给各自的汇聚节点,通过相同操作合并机制,可以减少不同查询语句的重叠计算。

通过调整数据的大小(从1G到10G)进行测试,多查询语句Q3与子查询Q3-1和Q3-2分别运行在同一集群和同一数据集中。进行多次试验后平均得到结果,如图5显示。

当数据量增加时,所有查询的运行时间也随之增加,独立并行执行完查询Q3-1和Q3-2的时间比共享的运行时间多。我们计算了通过该拓扑结构带来的性能提高,假设Q3的运行时间是t3,Q3-1与Q3-2的运行时间分别是t31和t32,那么合并的查询Q3的提高为(t31+t32-t3)÷(t31+t32)。图6描述了平均提高效果为18%。

5 结语

在本文中,在分布式流數据系统S4中实现了一个在线汇聚的方法,采用“行为模式”简化复杂查询,支持流水线任务和并行处理;着重分析了MapReduce Online与本章方法的区别,并比较了两者的结果。由于MapReduce Online在处理流水线任务时,会把中间结果存储在硬盘中,而执行结果直接从内存中发送给下一个任务,实验结果显示该方法有效地支持实时数据汇聚;在处理多查询方面,通过一个拓扑结构指导查询的执行,该拓扑结构能够合并数据的重复操作,减少了系统计算工作量,实验结果显示拓扑结构能够很大程度上提高整体的查询效率;通过公开的基准数据TPC-H进行实验,结果显示该方法能够把较准确的结果快速反馈给用户,运行的速度与效果也都比MapReduceOnline好。

参考文献

[1]ABADI D, CARNEY D, C, ETINTEMEL U, et al. Aurora: a new model and architecture fordata stream management[J]. The international Journal on VLDB,2003,12(2):120-139.

[2]VITTER J S. Random sampling with a reservoir[J]. ACM Transactions on Mathematical Software,1985,11(1):37-57.

[3]CORMODE G, MUTHUKRISHNAN S, YI K, et al. Optimal sampling from distributed streams[C].Principles of Database Systems.[S.l.]:ACM,2010:77-86.

[4]HAAS P, HELLERSTEIN J. Ripple joins for online aggregation[C].ACM SIGMOD international conference on Management of data.[S.l.]:ACM,1999,28:287-298.

猜你喜欢
分布式
基于RTDS的分布式光伏并网建模研究
基于预处理MUSIC算法的分布式阵列DOA估计
基于点估计法的分布式电源的配置优化
一种用于微电网分布式发电的新型Buck-Boost逆变器
基于DDS的分布式三维协同仿真研究
西门子 分布式I/O Simatic ET 200AL
家庭分布式储能的发展前景
第26届IEEE并行及分布式处理国际会议