Implementation of the parallel GroupBy and Aggregation functions in a distributed database system
Abstract: With the growing demand for data statistics and analysis in new Internet applications, grouping and aggregation have become among the most frequent requests in data analysis applications. This paper analyzes the principles of the Aggregation and GroupBy operations that are common in OLAP-like (on-line analytical processing) applications. To address the drawbacks of the sort-based grouping used by typical transactional databases, two Hash-based grouping and aggregation implementations are proposed, together with a strategy that uses statistical information to decide dynamically both the number of Hash buckets and which Hash GroupBy scheme to apply. Exploiting the multi-replica nature of distributed databases, a node-level parallel scheme is also proposed in which the Hash GroupBy operator is pushed down to the nodes. The schemes are implemented in the open-source database OceanBase. Experiments show that choosing the Hash GroupBy scheme dynamically from statistical information is substantially more efficient than sort-based grouping.
Key words:
- OceanBase
- GroupBy
- Hash
- Data distribution
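Algorithm 1 Sorting and grouping within buckets
Input: statistical information and the initial data table
Output: the grouped data table
    bucket_num = statis_info;  // derive the number of Hash buckets from the statistics
    Hash_table.create(bucket_num);  // build the Hash buckets
    while rows remain in the input table
        fetch a row and compute Hash_key from its GroupBy columns;
        res = Hash_table.set(Hash_key, &row);
        if res is not 0 then  // Hash_key was not yet in the Hash table
            vec.push(Hash_key);  // record Hash_key in the vec array
        end if
    end while
    while vec is not empty
        take a Hash_key out of vec;
        clear all elements of the temporary array tmp_sort_array;
        while the Hash table still holds elements for Hash_key
            move the next element into tmp_sort_array;
        end while
        sort tmp_sort_array;  // once every element has been fetched
        append the elements of tmp_sort_array to the global sort_array;
    end while

Algorithm 2 Frequency-based grouping and aggregation
Input: statistical information and the initial data table
Output: the grouped and aggregated data table
    bucket_num = statis_info;  // derive the number of Hash buckets from the statistics
    Hash_table.create(bucket_num);  // build the Hash buckets
    while rows remain in the input table
        fetch a row and compute Hash_key from its GroupBy columns;
        pair = <&row, count>;  // combine <&row, int> into a pair
        count = 1;  // every row starts with a count of 1
        res = Hash_table.set(Hash_key, &pair);
        if res is not 0 then  // Hash_key was not yet in the Hash table
            vec.push(Hash_key);  // record Hash_key in the vec array
            call the aggregate function directly on this row;
        else  // Hash_key already exists in the Hash table
            increment the count of the pair stored under Hash_key;
        end if
    end while
    while vec is not empty
        take a Hash_key out of vec;
        fetch the pair stored under Hash_key from the Hash table;
        if pair.count == 1 then  // count is 1, so the aggregate result is the row value itself
            result_row = row;
        else  // count is not 1, so the aggregate result is row * count
            result_row = row * count;
        end if
    end while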
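The two algorithms above can be sketched in a few lines of Python. This is only an illustrative sketch, not the OceanBase implementation: the function names, the dictionary-based rows and the `key`/`value` parameters are assumptions made for the example, Python's built-in `hash` stands in for the system's Hash function, and the second function assumes that every row of a group carries the same value and that the aggregate is SUM, which is the case in which `row * count` gives the correct result.

```python
from collections import defaultdict


def bucket_sort_group(rows, key, bucket_num):
    """Algorithm 1 sketch: hash rows into buckets, then sort inside each bucket."""
    buckets = defaultdict(list)   # hash_key -> rows that landed in that bucket
    seen_keys = []                # hash keys in first-seen order (the 'vec' array)
    for row in rows:
        hash_key = hash(row[key]) % bucket_num
        if hash_key not in buckets:       # first row for this bucket
            seen_keys.append(hash_key)
        buckets[hash_key].append(row)

    sort_array = []               # the global result array
    for hash_key in seen_keys:
        # Different group values may collide in one bucket, so sort within it;
        # afterwards rows with equal group values are adjacent.
        tmp_sort_array = sorted(buckets[hash_key], key=lambda r: r[key])
        sort_array.extend(tmp_sort_array)
    return sort_array


def frequency_group_sum(rows, key, value):
    """Algorithm 2 sketch: keep one row and a count per group; SUM = value * count.

    Assumption (hypothetical): all rows of a group hold the same `value`.
    """
    table = {}                    # group value -> [first_row, count]
    for row in rows:
        entry = table.get(row[key])
        if entry is None:         # group not seen yet
            table[row[key]] = [row, 1]
        else:                     # group already present: only bump the count
            entry[1] += 1
    return {g: first[value] * count for g, (first, count) in table.items()}
```

Because a Python `dict` preserves insertion order, the second function does not need a separate `vec` array to remember which groups were seen. The frequency variant touches each row once and never sorts, which is why it is attractive when many rows per group are identical.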
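Tab. 1 The cluster server configuration

| Role | CPU | Memory/GB | Disk/TB | Network |
|---|---|---|---|---|
| CS/MS | 20 cores, 40 threads (Intel(R) Xeon(R) CPU E5-2620 V3 @ 2.30 GHz) × 4 | 500 | 1 | 10 Gigabit |
| RS/UPS | 20 cores, 40 threads (Intel(R) Xeon(R) CPU E5-2620 V3 @ 2.30 GHz) × 1 | 500 | 1.5 | 10 Gigabit |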
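Tab. 2 The schema of the test table

| Attribute | Primary key | Data type | Size/Byte |
|---|---|---|---|
| K | Yes | Int32 | 4 |
| C1 | No | Int32 | 8 |
| C2 | No | Int64 | 8 |
| C3 | No | Double | 8 |
| C4 | No | Float | 8 |
| C5 | No | Decimal(15, 5) | 8 |
| C6 | No | Varchar(50) | 50 |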
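Tab. 3 Size and distribution of the test tables

| Table | Data distribution (identical elements per bucket) | Rows (×10,000) |
|---|---|---|
| G1 | 1 | 20 |
| G2 | 3 | 60 |
| G3 | 5 | 100 |
| G4 | 10 | 100 |
| G5 | 30 | 100 |
| G6 | 50 | 100 |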
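The source does not say how the test tables were populated. As a rough illustration of what the distribution column means, the sketch below builds a GroupBy column in which each value repeats a fixed number of times. The function name `group_column` and the reading of "identical elements per bucket" as "rows per distinct group value" are assumptions made for this example.

```python
def group_column(total_rows, duplicates_per_group):
    """Return a GroupBy column where each value occurs `duplicates_per_group` times.

    Hypothetical generator: if total_rows is not a multiple of
    duplicates_per_group, the last group simply has fewer rows.
    """
    return [i // duplicates_per_group for i in range(total_rows)]


def distinct_groups(total_rows, duplicates_per_group):
    """Number of distinct group values produced by group_column (ceiling division)."""
    return -(-total_rows // duplicates_per_group)
```

Under this reading, a table shaped like G2 (600,000 rows, 3 identical elements per group) would contain `distinct_groups(600_000, 3)` distinct group values, that is 200,000.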