Real-Time Updates in StarRocks
  常冰琳

Outline
  Real-time update use cases
  Common approaches
  Updates in StarRocks
  Ongoing & future works

01 Real-Time Update Requirements

Why?
  Traditional OLAP
    T+1 batch ETL, high latency
    Incremental append only, no update
    Append update & merge-on-read, poor query performance
  New requirements in real-time analytics
    Real-time data, hot data, volatile data
    TP-AP sync pipeline
    In-database ELT

Use Case: full row upsert/delete
  Full row upsert (or delete) is the most common form
  MySQL: insert into ... on duplicate key update
  StarRocks: unique key load (upsert), primary key load (upsert/delete)
  TP-AP CDC sync

Merge-on-Read
  Fast write, slow read (rows are merged at read time)
  Examples: various LSM trees, Hudi Merge-on-Read table, StarRocks unique key

Copy-on-Write
  Slow write, fast read
  Examples: Delta Lake, Hudi Copy-on-Write, Iceberg, Snowflake
  Write steps: check overlapping files, identify insert/update, rewrite overlapping files

Delta Store
  Slow (a bit) write, fast read
  Examples: Kudu, many TP/HTAP databases
  Components: primary index, delta store

Delete+Insert
  Slow (a bit) write, fast read
  Examples: SQL Server column store, Alibaba ADB, Hologres, StarRocks primary key table
  Components: delete bitmap, primary index

02 Real-Time Updates in StarRocks

System Overview
  One FE and four BEs; each tablet has replicas on three BEs
    BE: Tablet 1, Tablet 2, Tablet 3
    BE: Tablet 2, Tablet 3, Tablet 4
    BE: Tablet 1, Tablet 3, Tablet 4
    BE: Tablet 1, Tablet 2, Tablet 4
  Write TXN
    1. write
    2. commit with version

Inside a Tablet
  RocksDB: Meta (v1, v2, v9, v10)
  DelVector
  PrimaryIndex
  Rowset 0, Rowset 1, Rowset 2, Rowset 3, ..., Rowset N, ...

Metadata
  PB (protobuf) saved in RocksDB
  Cached in memory
  Meta
    versions: list
      EditVersion: major, minor
      Rowsets: list
      Delta: list
    rowset_id_next: uint32
  Sample contents
    versions:
      EditVersion: 4,0  Rowsets: 1,2,3      Delta: 3
      EditVersion: 5,0  Rowsets: 1,2,3,4    Delta: 4
      EditVersion: 6,0  Rowsets: 1,2,3,4,5  Delta: 5
    rowset_id_next: 6

Example
  Initial state
    Version (1,0): Rowsets: (none), Delta: (none)
    versions: rowset_id_next: 0
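
The metadata layout above might be modelled roughly as follows. This is a minimal Python sketch, not StarRocks code: the names `VersionEntry`, `TabletMeta`, `commit_rowset`, `commit_compaction`, and `gc` are hypothetical. It assumes, following the Example slides of this deck, that a load commit bumps the major version and records the new rowset in `delta`, that a compaction commit bumps only the minor version and leaves `delta` empty, and that rowset ids are handed out at commit time.

```python
from dataclasses import dataclass, field
from typing import List, Tuple

EditVersion = Tuple[int, int]  # (major, minor)


@dataclass
class VersionEntry:
    version: EditVersion
    rowsets: List[int]  # rowset ids visible at this version
    delta: List[int]    # rowset ids added by the load that created this version


@dataclass
class TabletMeta:
    # Assumption: a fresh tablet starts at version (1,0) with no rowsets.
    versions: List[VersionEntry] = field(
        default_factory=lambda: [VersionEntry((1, 0), [], [])])
    rowset_id_next: int = 0

    def _alloc_rowset_id(self) -> int:
        rid = self.rowset_id_next
        self.rowset_id_next += 1
        return rid

    def commit_rowset(self) -> int:
        """Load commit: major + 1, minor reset to 0, delta = [new rowset]."""
        rid = self._alloc_rowset_id()
        last = self.versions[-1]
        self.versions.append(
            VersionEntry((last.version[0] + 1, 0), last.rowsets + [rid], [rid]))
        return rid

    def commit_compaction(self, inputs: List[int]) -> int:
        """Compaction commit: minor + 1, inputs replaced by one output rowset."""
        rid = self._alloc_rowset_id()
        last = self.versions[-1]
        kept = [r for r in last.rowsets if r not in inputs]
        self.versions.append(
            VersionEntry((last.version[0], last.version[1] + 1), kept + [rid], []))
        return rid

    def gc(self, keep_from: EditVersion) -> None:
        """Version GC: drop every version older than keep_from."""
        self.versions = [v for v in self.versions if v.version >= keep_from]


def replay_example() -> TabletMeta:
    meta = TabletMeta()
    meta.commit_rowset()            # rowset 0 -> version (2,0)
    meta.commit_rowset()            # rowset 1 -> version (3,0)
    meta.commit_rowset()            # rowset 2 -> version (4,0)
    meta.commit_compaction([0, 1])  # output rowset 3 -> version (4,1)
    meta.commit_rowset()            # rowset 4 -> version (5,0)
    return meta
```

Calling `replay_example()` walks through the same commit and compaction sequence as the Example slides; `gc((4, 1))` then keeps only the two newest versions, as on the Version GC slide.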
Example (continued)
  Starting from Version (1,0) with no rowsets, each slide adds one step:
    Rowset 0 commit (version 2): Version (2,0), Rowsets: 0, Delta: 0; rowset_id_next: 1
    Rowset 1 commit (version 3): Version (3,0), Rowsets: 0,1, Delta: 1; rowset_id_next: 2
    Compaction start, input: 0,1
    Rowset 2 commit (version 4): Version (4,0), Rowsets: 0,1,2, Delta: 2; rowset_id_next: 3
    Compaction commit, 0,1 -> 3: Version (4,1), Rowsets: 2,3, Delta: (none); rowset_id_next: 4
    Rowset 4 commit (version 5): Version (5,0), Rowsets: 2,3,4, Delta: 4; rowset_id_next: 5

Example: Version GC
  Before GC: Versions (1,0), (2,0), (3,0), (4,0), (4,1), (5,0); Rowsets 0, 1, 2, 3, 4; rowset_id_next: 5
  After GC: Versions (4,1) and (5,0); Rowsets 2, 3, 4; rowset_id_next: 5

Write Pipeline
  Load: Operations -> MemTable (sort, merge, split) -> flush -> Rowset (Upserts, Deletes)
  Operations
    id  c0  c1  del
    1   a   2   0
    3   c   6   0
    2   b   4   0
    1   a   2   1
    2   b   5   0
  Resulting rowset
    upserts: (id 3, c0 c, c1 6), (id 2, c0 b, c1 5)
    deletes: id 1
  Commit
    update primary index: get old row pos, mark deleted, update new row pos
    gen delvec
    update meta (RocksDB)

Example
  Version 1
    Rowset 0: id = 1,2,3,4; c0 = a,b,c,d
    Primary Index (id -> rs, row): 1 -> (0,0), 2 -> (0,1), 3 -> (0,2), 4 -> (0,3)
  Version 2
    Rowset 1 written: id = 1,3,5; c0 = aa,cc,e
    Rowset 0 delvec: 1010 (the old rows for id 1 and id 3 are marked deleted)
    Primary Index: 1 -> (1,0), 2 -> (0,1), 3 -> (1,1), 4 -> (0,3), 5 -> (1,2)
  Version 3
    Rowset 2 written: upsert id = 6, c0 = f; delete id = 3
    Rowset 0 delvec: 1010; Rowset 1 delvec: 010
    Primary Index: 1 -> (1,0), 2 -> (0,1), 6 -> (2,0), 4 -> (0,3), 5 -> (1,2)
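
The commit steps named on the Write Pipeline slide (get old row position, mark deleted, update new row position) can be tried out on the rowsets from Versions 1 to 3. The following is a minimal Python sketch under simplifying assumptions: a plain `dict` stands in for the primary index, a list of booleans stands in for each delvec, and the names `Tablet`, `commit`, `delvec_bits`, and `scan` are hypothetical, not StarRocks APIs.

```python
from typing import Dict, List, Sequence, Tuple


class Tablet:
    """Toy delete+insert tablet: primary index maps key -> (rowset id, row)."""

    def __init__(self) -> None:
        self.index: Dict[int, Tuple[int, int]] = {}
        self.rowsets: List[List[Tuple[int, str]]] = []
        self.delvecs: List[List[bool]] = []

    def _mark_deleted(self, key: int) -> None:
        # Get the old row position and flag it in that rowset's delvec.
        old = self.index.get(key)
        if old is not None:
            old_rs, old_row = old
            self.delvecs[old_rs][old_row] = True

    def commit(self, upserts: Sequence[Tuple[int, str]],
               deletes: Sequence[int] = ()) -> int:
        rs_id = len(self.rowsets)
        self.rowsets.append(list(upserts))
        self.delvecs.append([False] * len(upserts))
        for row, (key, _value) in enumerate(upserts):
            self._mark_deleted(key)
            self.index[key] = (rs_id, row)  # update new row position
        for key in deletes:
            self._mark_deleted(key)
            self.index.pop(key, None)
        return rs_id

    def delvec_bits(self, rs_id: int) -> str:
        return "".join("1" if dead else "0" for dead in self.delvecs[rs_id])

    def scan(self) -> List[Tuple[int, str]]:
        # Readers skip rows flagged in the delvec; no merge is needed.
        return sorted(row
                      for rows, delvec in zip(self.rowsets, self.delvecs)
                      for row, dead in zip(rows, delvec) if not dead)


def replay_versions_1_to_3() -> Tablet:
    t = Tablet()
    t.commit([(1, "a"), (2, "b"), (3, "c"), (4, "d")])  # Version 1, Rowset 0
    t.commit([(1, "aa"), (3, "cc"), (5, "e")])          # Version 2, Rowset 1
    t.commit([(6, "f")], deletes=[3])                   # Version 3, Rowset 2
    return t
```

After `replay_versions_1_to_3()`, `delvec_bits(0)` and `delvec_bits(1)` give the bit strings shown for Version 3, and `index` holds the same key-to-position mapping as the Primary Index table on that slide.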
Example (continued)
  Version 4
    Rowset 3 written: id = 2,5; c0 = bb,ee
    Rowset 0 delvec: 1110; Rowset 1 delvec: 011
    Primary Index: 1 -> (1,0), 2 -> (3,0), 6 -> (2,0), 4 -> (0,3), 5 -> (3,1)

MVCC
  Each version is a set of rowsets plus their delvecs
    Version 1: Rowset 0
    Version 2: Rowset 0 (delvec 1010), Rowset 1
    Version 3: Rowset 0 (delvec 1010), Rowset 1 (delvec 010), Rowset 2
    Version 4: Rowset 0 (delvec 1110), Rowset 1 (delvec 011), Rowset 2, Rowset 3

Concurrency Control
  Each write-only txn has 2 phases
    write: runs concurrently
    commit: runs serially, should be very fast!
  FE commits with version; FE decides txn order
  Diagram: txn 1, txn 2, txn 3 producing v1, v2, v3
    write: receive data / sort, merge, flush memtable / create rowset
    commit: update primary index, write delvec & meta

Primary Index
  Primary index update takes 90% of the time in the commit phase
  Efficient in-memory hashmap
    Key: composite primary key encoded as a binary slice
    Value: uint64 row position
    phmap
    fast: 20~200 ns/op, or 5M~50M op/s per thread
  e.g. 10M rows written into 10 buckets in a single TXN
    each bucket performs 1M hashmap update ops
    commit duration 0.12s (assuming 10M op/s)

Primary Index Optimization
  Cache misses for large hashmaps
    batch update: prefetch
  Memory usage
    fixed-length key: use FixSlice as key (no need to store length)
    variable-length key: shard by length (1/2-1/3 memory)
    on-demand loading
    release if no more loads for 6 min
  Ongoing work
    Shard by constant / low-cardinality columns
    Use 128-bit hash as primary key (small probability of conflict)

Persistent Primary Index
  https:/
  disk hashmap
  L0 & L1 LSM-like structure

Use Case: Cold/Hot Data
  Data partitioned by date
  Records get cold gradually
  Only recent partitions have updates, so only a fraction of partitions load the index
  Examples: e-commerce orders, taxi/bike rides, client sessions

Use Case: Wide Table
  Large number of columns (100 columns)
  Primary key takes only a fraction of total storage space
  Example: user profile, user_id as primary key

Compaction
  Rowsets with increasing deletes
    need to read & skip deleted rows, which slows down scans
    delvector copy-on-write, meta overhead
  Small rowset files
  LSM compaction
    merge sst -> generate new sst
    atomically replace meta
  Different design
    No duplicate rows
    No (range) delete tombstone (vs RocksDB)
    Need to maintain primary index
  Purpose: merge small files, vacuum deletes

Compaction
  select rowsets -> merge -> commit
  Commit: update primary index, gen delvec, update meta (RocksDB)

Example
  No need to do merge
  No need to read order_id
  Pushdown state=2 can use index
  Scanner only returns the revenue column
  Scanner can be parallelized

Benchmark: Simple Query on Single Table
  [figure not recoverable from the extracted text]

Benchmark: TPC-H 1T
  [figure not recoverable from the extracted text]

03 Current and Future Work

Read-Write Updates: Partial Update
  Read then write
  Harder than upsert
  Last writer wins doesn't work
  Need to abort and retry, or resolve the conflict
  https:/

Read-Write Updates: Partial Update
  hot column update
    e.g. only update order status
  whole column batch update
    e.g. an ML job batch-updates a user tag column in the user_profile table
  merge into dest using src on dest.id=src.id
    when matched then update set dest.v=src.v;

Read-Write Updates: Partial Update
  Join on load
  Wide flat table: Module A, Module B
  Each module only knows a subset of columns
  Examples
    Ad: display, click
    Order: shop, payment, inventory, logistics, review
  Current solutions
    Join in a stream system (e.g. Flink), load to AP
    update in TP, then CDC to AP
    batch "merge into"
  merge into order using pay on order.id=pay.id
    when matched then update set
      order.pay_ts=pay.ts,
      order.pay_method=pay.method, ...;
  merge into order using ship on order.id=ship.id
    when matched then update set
      order.ship_start_ts=ship.start_ts,
      order.ship_end_ts=ship.end_ts, ...;

Read-Write Updates: Conditional Update
  out-of-order arrival
  update iff ts > old.ts
  merge into dest using src on dest.id=src.id
    when matched and src.ts > dest.ts then update set dest.c1=src.c1, ...
    when not matched then insert *;

Read-Write Updates: Merge Update
  array append
  map/set add
  merge into dest using src on dest.id=src.id
    when matched then update set items=array_append(items, src.item)

Read-Write Updates: General read-write transaction
  delete where col=value
  insert into select
  merge into
  multi-statement transaction
    e.g. data fix (batch delete + update)
  begin;
  delete from orders where userid=1001;
  insert into orders select * from user1001_fix;
  commit;

Transaction Difficulty Levels
  (Read-Set: R, Write-Set: W)
  Type: Append only
    Traits: R: ∅, W = append only
    Application: log analysis
  Type: Upsert/Delete
    Traits: R: ∅
    Application: TP CDC, data load with deduplication
  Type: Local Update (partial update, conditional update, merge update)
    Traits: R = W, each written row depends only on itself
    Application: many use cases, optimization opportunities
    Status: ongoing work
  Type: General read-write
    Traits: R != W, #R #W 0
    Application: batch DML, ELT
    Status: ongoing work
  Type: General read-write with rollback
    Traits: R != W, #R #W 0, non-deterministic
    Application: multi-statement batch DML, ELT

Materialized View for Primary Key Table
  Primary table: order (id, country, city, revenue)
    before: (10, cn, bj, 10), (12, cn, sh, 15)
    updates: (10, cn, bj, 11), (12, cn, sh, 12)
    after: (10, cn, bj, 11), (12, cn, sh, 12)
  Agg materialized view: revenue_by_city (country, city, revenue)
    current: (cn, bj, 100), (cn, sh, 120)
    delta: (cn, bj, +1), (cn, sh, -3)
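
The delta rows on the Materialized View for Primary Key Table slide follow from the old and new value of each updated row. The following is a minimal Python sketch of that arithmetic, assuming a sum aggregate grouped by (country, city); the function name `mv_delta` and the dict-based tables are hypothetical and say nothing about how StarRocks implements this. The input numbers are the ones on the slide.

```python
from collections import defaultdict
from typing import Dict, Tuple

Row = Tuple[str, str, int]   # (country, city, revenue)
Group = Tuple[str, str]      # (country, city)


def mv_delta(table: Dict[int, Row], updates: Dict[int, Row]) -> Dict[Group, int]:
    """Apply upserts to the primary key table and return per-group revenue deltas."""
    delta: Dict[Group, int] = defaultdict(int)
    for key, (country, city, revenue) in updates.items():
        old = table.get(key)
        if old is not None:
            # The replaced row no longer counts towards its old group.
            delta[(old[0], old[1])] -= old[2]
        delta[(country, city)] += revenue
        table[key] = (country, city, revenue)
    return dict(delta)


# Primary table "order" and the aggregate view "revenue_by_city" from the slide.
orders: Dict[int, Row] = {10: ("cn", "bj", 10), 12: ("cn", "sh", 15)}
view: Dict[Group, int] = {("cn", "bj"): 100, ("cn", "sh"): 120}

delta = mv_delta(orders, {10: ("cn", "bj", 11), 12: ("cn", "sh", 12)})

# Fold the delta into the view instead of recomputing the aggregate.
for group, change in delta.items():
    view[group] = view.get(group, 0) + change
```

Because each primary key maps to exactly one live row, the old value can be looked up and subtracted, which is what makes the delta computable without rescanning the table.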