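Background

A query of the form count(*) ... group by ... order by count(*) desc limit x is the usual way to compute a top-N ranking.

Top-N is an important operational metric, for example the 10 most active users.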
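As a point of reference, the exact computation behind that query pattern can be sketched in a few lines of Python. The function name exact_topn and the sample events are illustrative assumptions, not part of the original post.

```python
from collections import Counter


def exact_topn(values, n):
    """Exact top-N: count every distinct value, then keep the n largest.

    Mirrors: SELECT v, count(*) FROM t GROUP BY v ORDER BY count(*) DESC LIMIT n.
    Memory grows with the number of distinct values, which is the cost
    that approximate summaries try to avoid.
    """
    counts = Counter(values)
    # Sort by count descending, then by value, so ties are deterministic.
    return sorted(counts.items(), key=lambda kv: (-kv[1], kv[0]))[:n]


# Hypothetical activity log: one entry per user action.
events = ["alice", "bob", "alice", "carol", "alice", "bob"]
print(exact_topn(events, 2))  # [('alice', 3), ('bob', 2)]
```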

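When the data volume is very large, this computation becomes slow. Citus provides a topn extension which, like HLL, stores the intermediate aggregation state in a small amount of space and returns a fixed-size JSONB (the size is set by the parameter topn.number_of_counters) that can be fed into a later aggregation. (Note: now that PostgreSQL 11 supports more powerful parallel hash aggregation, aggregating large data volumes is much less of a problem.)

The aggregation process of the topn extension is shown in the figure.

The results of topn can be aggregated again with topn_union_agg.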

postgres=# \df topn*
                                  List of functions
 Schema |       Name       | Result data type  | Argument data types |  Type
--------+------------------+-------------------+---------------------+--------
 public | topn             | SETOF topn_record | jsonb, integer      | normal
 public | topn_add         | jsonb             | jsonb, text         | normal
 public | topn_add_agg     | jsonb             | text                | agg
 public | topn_add_trans   | internal          | internal, text      | normal
 public | topn_pack        | jsonb             | internal            | normal
 public | topn_union       | jsonb             | jsonb, jsonb        | normal
 public | topn_union_agg   | jsonb             | jsonb               | agg
 public | topn_union_trans | internal          | internal, jsonb     | normal
(8 rows)

-- starting from nothing, record that we saw an "a"
select topn_add('{}', 'a');
-- => {"a": 1}

-- record the sighting of another "a"
select topn_add(topn_add('{}', 'a'), 'a');
-- => {"a": 2}

-- for normal_rand
create extension tablefunc;

-- count values from a normal distribution
SELECT topn_add_agg(floor(abs(i))::text)
  FROM normal_rand(1000, 5, 0.7) i;
-- => {"2": 1, "3": 74, "4": 420, "5": 425, "6": 77, "7": 3}
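The JSONB summary shown above is simply a map from item to counter, so reading a top-N out of it amounts to sorting by counter. A minimal Python sketch of that step follows; the helper name topn_from_summary is hypothetical, and how the real topn() function orders items with equal counts is not covered here.

```python
import json


def topn_from_summary(summary_json, n):
    """Return the n most frequent (item, frequency) pairs of a summary."""
    counters = json.loads(summary_json)
    ranked = sorted(counters.items(), key=lambda kv: kv[1], reverse=True)
    return ranked[:n]


# The summary printed by the normal_rand example above.
summary = '{"2": 1, "3": 74, "4": 420, "5": 425, "6": 77, "7": 3}'
print(topn_from_summary(summary, 3))  # [('5', 425), ('4', 420), ('6', 77)]
```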

Reading the top-N values directly from the topn JSONB

postgres=# select (topn(topn_union_agg(agg_prodid), 5)).* from reviews_by_prodid;
  item  | frequency
--------+-----------
 509594 |        66
 497599 |        59
 505217 |        58
 461257 |        58
 403111 |        57
(5 rows)

Using topn

1. Install the topn software on all nodes (coordinator and workers)

cd ~
. /var/lib/pgsql/.bash_profile
git clone https://github.com/citusdata/postgresql-topn
cd postgresql-topn
USE_PGXS=1 make
USE_PGXS=1 make install

2. Create the extension (coordinator)

postgres=# create extension topn;
CREATE EXTENSION

3. Create the extension on the workers: call run_command_on_workers on the coordinator to execute the statement on every worker.

postgres=# select run_command_on_workers('create extension topn;');
           run_command_on_workers
--------------------------------------------
 (xxx.xxx.xxx.224,1921,t,"CREATE EXTENSION")
 (xxx.xxx.xxx.225,1921,t,"CREATE EXTENSION")
 (xxx.xxx.xxx.226,1921,t,"CREATE EXTENSION")
 (xxx.xxx.xxx.227,1921,t,"CREATE EXTENSION")
 (xxx.xxx.xxx.229,1921,t,"CREATE EXTENSION")
 (xxx.xxx.xxx.230,1921,t,"CREATE EXTENSION")
 (xxx.xxx.xxx.231,1921,t,"CREATE EXTENSION")
 (xxx.xxx.xxx.232,1921,t,"CREATE EXTENSION")
(8 rows)

Testing

1. Test table

create table tbl (id serial8, gid int, prodid int, c1 int, c2 int);

postgres=# \d tbl
                            Table "public.tbl"
 Column |  Type   | Collation | Nullable |             Default
--------+---------+-----------+----------+---------------------------------
 id     | bigint  |           | not null | nextval('tbl_id_seq'::regclass)
 gid    | integer |           |          |
 prodid | integer |           |          |
 c1     | integer |           |          |
 c2     | integer |           |          |

postgres=# alter sequence tbl_id_seq cache 10000;
ALTER SEQUENCE

2. Load about 200 million rows of test data

vi test.sql

\set gid random_gaussian(1,1000,2.5)
\set prodid random_gaussian(1,1000000,2.5)
\set c1 random(1,3000)
\set c2 random(1,100000000)
insert into tbl (gid,prodid,c1,c2) values (:gid,:prodid,:c1,:c2);

pgbench -M prepared -n -r -P 1 -f ./test.sql -c 64 -j 64 -T 1200

postgres=# select count(*) from tbl;
   count
-----------
 216524755
(1 row)

Time: 421.860 ms

3. Exact top-N results for several columns, as a baseline

postgres=# select gid,count(*) from tbl group by gid order by count(*) desc limit 10;
 gid | count
-----+--------
 494 | 438102
 499 | 438017
 514 | 437929
 506 | 437852
 511 | 437546
 509 | 437469
 495 | 437458
 490 | 437320
 496 | 437257
 500 | 437239
(10 rows)

postgres=# select c1,count(*) from tbl group by c1 order by count(*) desc limit 10;
  c1  | count
------+-------
 1370 | 73175
  168 | 73121
 1016 | 73114
 1816 | 73045
 1463 | 73020
  585 | 72986
 1529 | 72974
 1857 | 72944
 2580 | 72930
  298 | 72917
(10 rows)

postgres=# select prodid,count(*) from tbl group by prodid order by count(*) desc limit 10;
 prodid | count
--------+-------
 516916 |   534
 481914 |   534
 520680 |   527
 530544 |   526
 449685 |   523
 493560 |   523
 520464 |   523
 502098 |   522
 495170 |   522
 501695 |   522
(10 rows)

4. Estimated top-N on the gid column (the number of distinct gid values is less than or equal to topn.number_of_counters)

The result is exact.

CREATE TABLE reviews_by_gid
(
  agg jsonb
);

SELECT create_reference_table('reviews_by_gid');

INSERT INTO reviews_by_gid
  SELECT topn_add_agg(gid::text)
  FROM tbl;

postgres=# select (topn(agg,5)).* from reviews_by_gid;
 item | frequency
------+-----------
 494  |    438102
 499  |    438017
 514  |    437929
 506  |    437852
 511  |    437546
(5 rows)

5. Estimated top-N on the prodid column (the number of distinct prodid values is far greater than topn.number_of_counters)

The result deviates very heavily from the exact one.

CREATE TABLE reviews_by_prodid
(
  agg_prodid jsonb
);

SELECT create_reference_table('reviews_by_prodid');

INSERT INTO reviews_by_prodid
  SELECT topn_add_agg(prodid::text)
  FROM tbl;

postgres=# select (topn(agg_prodid,5)).* from reviews_by_prodid;
  item  | frequency
--------+-----------
 470098 |        36
 531880 |        35
 451724 |        34
 420093 |        34
 522676 |        33
(5 rows)

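6. Estimated top-N on the c1 column (the number of distinct c1 values is somewhat greater than topn.number_of_counters: up to 3,000 values against 1,000 counters)

The result is not accurate.

CREATE TABLE reviews_by_c1
(
  aggc1 jsonb
);

SELECT create_reference_table('reviews_by_c1');

INSERT INTO reviews_by_c1
  SELECT topn_add_agg(c1::text)
  FROM tbl;

postgres=# select (topn(aggc1,5)).* from reviews_by_c1;
 item | frequency
------+-----------
 2580 |     37073
 1016 |     36162
 1983 |     35311
 1752 |     35285
 2354 |     34740
(5 rows)

Accuracy and truncation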

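The accuracy loss seen above has the following cause:

When the topn hash table is full and a new value arrives, half of the (item, count) pairs in the hash table are evicted, namely the half with the smaller counts after sorting by count.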

The TopN approximation algorithm keeps a predefined number of frequent items and counters. If a new item already exists among these frequent items, the algorithm increases the item's frequency counter. Else, the algorithm inserts the new item into the counter list when there is enough space. If there isn't enough space, the algorithm evicts the bottom half of all counters. Since we typically keep counters for many more items (e.g. 100*N) than we are actually interested in, the actual top N items are unlikely to get evicted and will typically have accurate counts.

You can increase the algorithm's accuracy by increasing the predefined number of frequent items/counters.
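The behaviour described above can be reproduced with a small Python simulation. This is only a sketch of the algorithm as the quoted text describes it, not the extension's C implementation, which differs in details; the name approx_counts, the capacity values, and the uniform synthetic data are assumptions made for illustration.

```python
import random
from collections import Counter


def approx_counts(values, capacity):
    """Approximate frequency counting with bottom-half eviction."""
    counters = {}
    for value in values:
        if value in counters:
            counters[value] += 1          # known item: bump its counter
        elif len(counters) < capacity:
            counters[value] = 1           # room left: start a new counter
        else:
            # No room: keep only the more frequent half, then insert.
            ranked = sorted(counters.items(), key=lambda kv: kv[1], reverse=True)
            counters = dict(ranked[: capacity // 2])
            counters[value] = 1
    return counters


random.seed(0)

# 50 distinct values fit into 100 counters, so nothing is ever evicted.
few = [random.randint(1, 50) for _ in range(20_000)]
print(approx_counts(few, 100) == dict(Counter(few)))  # True

# 3000 distinct values against 1000 counters: evictions discard counts,
# so the surviving estimates can only undercount.
many = [random.randint(1, 3000) for _ in range(200_000)]
approx = approx_counts(many, 1000)
exact = Counter(many)
item, estimate = max(approx.items(), key=lambda kv: kv[1])
print(item, estimate, exact[item])  # estimated vs. exact count of one item
```

With near-uniform data every item is equally likely to land in the evicted half, which matches the c1 and prodid tests above; heavily skewed data is the case where the real top items tend to survive.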

The corresponding code:

/*
 * PruneHashTable removes some items from the HashTable to decrease its size. It finds
 * minimum and maximum frequencies first and removes the items which have lower frequency
 * than the average of them.
 */
static void
PruneHashTable(HTAB *hashTable, int itemLimit, int numberOfRemainingElements)
{
    Size topnArraySize = 0;
    int topnIndex = 0;
    FrequentTopnItem *sortedTopnArray = NULL;
    bool itemAlreadyHashed = false;
    HASH_SEQ_STATUS status;
    FrequentTopnItem *currentTask = NULL;
    FrequentTopnItem *frequentTopnItem = NULL;
    int index = 0;
    int hashTableSize = hash_get_num_entries(hashTable);

    if (hashTableSize <= itemLimit)
    {
        return;
    }

    /* create an array to copy top-n items and sort them later */
    topnArraySize = sizeof(FrequentTopnItem) * hashTableSize;
    sortedTopnArray = (FrequentTopnItem *) palloc0(topnArraySize);

    hash_seq_init(&status, hashTable);

    while ((currentTask = (FrequentTopnItem *) hash_seq_search(&status)) != NULL)
    {
        frequentTopnItem = palloc0(sizeof(FrequentTopnItem));
        memcpy(frequentTopnItem->key, currentTask->key,
               sizeof(frequentTopnItem->key));
        frequentTopnItem->frequency = currentTask->frequency;
        sortedTopnArray[topnIndex] = *frequentTopnItem;

        topnIndex++;
    }

    qsort(sortedTopnArray, hashTableSize, sizeof(FrequentTopnItem),
          compareFrequentTopnItem);

    for (index = numberOfRemainingElements; index < hashTableSize; index++)
    {
        FrequentTopnItem *topnItem = &(sortedTopnArray[index]);

        hash_search(hashTable, (void *) topnItem->key, HASH_REMOVE,
                    &itemAlreadyHashed);
    }
}

How to change the hash table size

postgres=# load 'topn';
LOAD
postgres=# show topn.number_of_counters;
 topn.number_of_counters
-------------------------
 1000
(1 row)

set topn.number_of_counters = 20000;

The setting must be applied on all nodes (coordinator and workers), for example:

postgresql.conf

shared_preload_libraries = 'citus,topn,pg_stat_statements'
topn.number_of_counters = 10000

Summary

Best practices

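1. Aggregate in stages, and make sure that in every stage the number of distinct values of the aggregated column stays below topn.number_of_counters; otherwise the result is distorted.

For example, if there are 10,000 active users per hour, set topn.number_of_counters to 10,000 or more and aggregate by hour, storing one aggregated jsonb result per hour. When a daily result is needed, aggregate the jsonb values of the whole day again.

2. When the number of distinct elements exceeds topn.number_of_counters, the top-N result is distorted.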
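The staged approach in item 1 can be modelled in Python as follows. This is a simplified sketch under stated assumptions: summarize stands in for topn_add_agg and union for topn_union_agg, both are assumed to keep at most capacity counters and to sum counts per item, and the user ids and hour lists are made up. The extension's actual pruning may behave differently once the limit is hit.

```python
from collections import Counter


def summarize(values, capacity):
    """One stage's summary: item -> count, truncated to capacity entries."""
    return dict(Counter(values).most_common(capacity))


def union(left, right, capacity):
    """Merge two summaries by adding counts per item, then truncate."""
    merged = Counter(left)
    merged.update(right)  # Counter.update adds counts, it does not replace
    return dict(merged.most_common(capacity))


# Hypothetical per-hour activity; each hour has fewer distinct users
# than the capacity, so every hourly summary is exact.
hour1 = ["u1", "u2", "u1", "u3"]
hour2 = ["u2", "u2", "u4", "u1"]
capacity = 10

daily = union(summarize(hour1, capacity), summarize(hour2, capacity), capacity)
print(sorted(daily.items()))  # [('u1', 3), ('u2', 3), ('u3', 1), ('u4', 1)]
```

Because no stage overflows its counters, the merged daily summary equals an exact count over the whole day; with a capacity smaller than the number of distinct users per hour, counts would already be lost before the merge.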

References

https://github.com/citusdata/postgresql-topn

https://docs.citusdata.com/en/v7.5/develop/reference_sql.html

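"PostgreSQL count-min sketch top-n probabilistic computation extension cms_topn (year-over-year, period-over-period, and sliding-window analysis with window functions): one of the core features of stream computing"

Original article: https://github.com/digoal/blog/blob/master/201809/20180914_01.md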