Flink Batch SQL Improvements on Lakehouse
Liu Dalong (劉大龍), R&D Engineer, Alibaba Cloud
Streaming Lakehouse Meetup

Contents
01 Challenges for Flink Batch on Paimon
02 Core Optimizations in Flink Batch
03 Future Plans

01 Challenges for Flink Batch on Paimon

Streaming Warehouse: Flink + Paimon
[Figure: architecture diagram. Labels: Logs, RDBMS (binlog), Flink CDC, Flink SQL (Streaming & Batch), warehouse layers ODS / DWD / DWS / ADS stored in Paimon (also labeled Flink Table Store), Flink SQL Queries, Data Serving Systems.]
- Simple architecture
- Unified semantics
- Consistent data
- Low cost
- Transparent and open

Challenges for Flink Batch
- Schema changes
- Row-level updates and deletes
- Snapshot management
- Time-travel queries
- Efficient ETL and ad-hoc queries

02 Core Optimizations in Flink Batch

Year Recap of Apache Flink Batch
- Flink 1.16 (2022.10): SQL Gateway, Automatic Collection of Statistics, Dynamic Partition Pruning, Join Hint, Adaptive Hash Join, Speculative Execution
- Flink 1.17 (2023.03): Update & Delete, DPP Strategy Optimize, Bushy Join Reorder, Adaptive Local HashAgg, Adaptive Batch Scheduler
- Flink 1.18 (2023.09): Lakehouse APIs, Flink JDBC Driver, Runtime Filter, Operator Fusion Codegen

Part 1: Lakehouse API Enhancements (Data Management API)
- ALTER TABLE (FLINK-21634, FLINK-27237)
- CREATE/REPLACE TABLE AS SELECT (FLIP-218, FLIP-305, FLIP-303)
- CALL procedure (FLIP-311)
- Time travel (FLIP-308)
- UPDATE/DELETE (FLIP-282)
- TRUNCATE TABLE (FLIP-302)

Part 2: Join Optimizations

Statistics Enhancements
- ANALYZE TABLE (FLIP-240): triggered manually; the results are persisted to the Catalog. It collects rich statistics: rowCount, nullCount, ndv, min, max, avgLen, maxLen.
- SupportReportStatistics (FLIP-231; the interface is named SupportsStatisticReport in the Flink codebase): statistics are collected automatically, are not persisted, and are therefore more up to date. Supported by the Flink CSV, Parquet and ORC formats and by Paimon.
- The planner first looks for statistics in the Catalog; if none are found, it obtains them on the fly through SupportReportStatistics.

Join Hint
The four join strategies:
- Broadcast Hash Join: broadcasts the small table and builds a hash table from it. Supports equi-joins only.
- Shuffle Hash Join: shuffles both sides by join key and builds a hash table from the smaller side. Supports equi-joins only.
- Sort Merge Join: shuffles both sides by join key and sorts them. Supports equi-joins only.
- Nested Loop Join: broadcasts the small table and spills it to disk if it is too large. Supports both equi-joins and non-equi-joins.

Without statistics, the planner may pick an unsuitable join operator and the query becomes very slow. Join hints let the user override that choice. Flink provides the following join hints (FLIP-229):
- BROADCAST
- SHUFFLE_HASH
- SHUFFLE_MERGE
- NEST_LOOP

Join Reorder
What is join reorder? TPC-DS q18 is used as the example.

Join Reorder: Left-Deep Tree
[Figure: left-deep join tree for TPC-DS q18 over customer_demographics (twice), date_dim, catalog_sales, customer, customer_address and item.]
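1. Level 1 computes the rowCount of every table.
2. Level 2 picks the table with the smallest rowCount from level 1, joins it with each of the other N-1 tables, and computes the output rowCount of every two-table join.
3. Level K picks the join with the smallest rowCount from level K-1, joins it with each of the remaining N-K+1 tables, and computes the output rowCount of every K-table join.
4. The algorithm ends when K = N.

Problems:
- It is a greedy algorithm and easily gets stuck in a local optimum.
- The joins can only run one after another, not in parallel.
- It limits the use of DPP (dynamic partition pruning).

Join Reorder: Bushy Tree
[Figure: bushy join tree over the same TPC-DS q18 tables.]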
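To make the contrast between the two tree shapes concrete, here is a minimal sketch that runs the greedy level-by-level procedure from the numbered steps next to a dynamic-programming search that also admits bushy trees. The table names, row counts, selectivities and the cost model (sum of intermediate output row counts) are all illustrative assumptions, not Flink's planner code or its actual cost model.

```python
from itertools import combinations

# Hypothetical statistics: row counts per table, selectivity per joinable pair.
ROWS = {"a": 10, "b": 10_000, "c": 10_000, "d": 20}
SEL = {
    frozenset({"a", "b"}): 1e-3,
    frozenset({"b", "c"}): 1e-4,
    frozenset({"c", "d"}): 1e-3,
}

def card(tables):
    """Estimated output rowCount of joining `tables` (pairs without an entry are cross products)."""
    names = sorted(tables)
    rows = 1.0
    for t in names:
        rows *= ROWS[t]
    for x, y in combinations(names, 2):
        rows *= SEL.get(frozenset({x, y}), 1.0)
    return rows

def greedy_left_deep(tables):
    """Start from the smallest table, then always add the table giving the smallest output."""
    remaining = set(tables)
    first = min(sorted(remaining), key=lambda t: ROWS[t])
    remaining.remove(first)
    joined, plan, cost = {first}, first, 0.0
    while remaining:
        nxt = min(sorted(remaining), key=lambda t: card(joined | {t}))
        remaining.remove(nxt)
        joined.add(nxt)
        plan = (plan, nxt)
        cost += card(joined)  # cost model: sum of join output row counts
    return plan, cost

def dp_bushy(tables):
    """Best plan for every subset, trying every two-way split (left-deep and bushy)."""
    names = sorted(tables)
    best = {frozenset({t}): (t, 0.0) for t in names}
    for size in range(2, len(names) + 1):
        for subset in map(frozenset, combinations(names, size)):
            out_rows = card(subset)
            for k in range(1, size // 2 + 1):
                for left in map(frozenset, combinations(sorted(subset), k)):
                    right = subset - left
                    total = best[left][1] + best[right][1] + out_rows
                    if subset not in best or total < best[subset][1]:
                        best[subset] = ((best[left][0], best[right][0]), total)
    return best[frozenset(names)]

def leaves(plan):
    """Flatten a nested plan tuple into its table names."""
    return [plan] if isinstance(plan, str) else leaves(plan[0]) + leaves(plan[1])

greedy_plan, greedy_cost = greedy_left_deep(ROWS)
bushy_plan, bushy_cost = dp_bushy(ROWS)
```

Because every left-deep tree is also one of the splits the dynamic-programming search considers, its cost under this model can never exceed the greedy cost. The price is that it enumerates every subset of tables, which grows exponentially with the number of tables.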
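Advantages:
- Based on dynamic programming; the whole tree is more balanced and the search aims for the global optimum.
- More plan patterns can match DPP.
- The planner switches between the two algorithms adaptively, based on time and space complexity.
- q18 runs about twice as fast.
- FLINK-30376

Partition Table
[Figure: the sales table split into partitions sold_date=1, sold_date=2, ..., sold_date=2000.]

    CREATE TABLE sales (
      sold_date      INT,
      item_id        INT,
      sales_price    DOUBLE,
      list_price     DOUBLE,
      quantity       INT,
      discount_amt   DOUBLE,
      wholesale_cost DOUBLE,
      ext_tax        DOUBLE
    ) PARTITIONED BY (sold_date)
    WITH (CONNECTOR = ORC);  -- slide shorthand for the connector options

How can we avoid reading data from partitions that cannot contribute to the result?

Dynamic Partition Pruning

    -- there is a filter on the partition column
    SELECT * FROM sales WHERE sold_date = 2;

[Figure: without partition pruning the plan is Scan followed by Filter (sold_date = 2); with static partition pruning the Scan itself reads only sold_date = 2.]
With static partition pruning, the valid partitions are already known during the optimization phase.

Dynamic Partition Pruning

    -- there is no filter on the partition column
    SELECT * FROM sales JOIN date_dim
    WHERE sold_date = date_sk AND year = 2000;

    CREATE TABLE date_dim (
      date_id INT,
      date_sk INT,
      year    INT,
      month   INT,
      date    VARCHAR
    ) WITH (CONNECTOR = ORC);

[Figure: Join (sold_date = date_sk) over Scan sales and Filter (year = 2000) on Scan date_dim.]
Only the date_sk values with year = 2000 can produce join results, so the scan of sales skips the partitions that cannot match. (FLIP-248)

Runtime Filter

    SELECT * FROM sales JOIN date_dim WHERE item_id = date_sk;

[Figure: Join (item_id = date_sk) with an Exchange above Scan sales and an Exchange above Scan date_dim; date_dim is defined as above.]
The join key does not include the partition column. Can the amount of shuffled data still be reduced?

Runtime Filter
[Figure: the same plan with a RuntimeFilter node added below the Exchange on the sales side; label as printed on the slide: "date_sk in set(item_id)".]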
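A minimal sketch of the idea behind this plan: collect the join keys of the small input first, then drop rows of the large input that cannot find a partner before they are shuffled. The function names are hypothetical, and an exact Python set stands in for whatever compact structure the engine actually builds, so this is an illustration of the technique rather than Flink's implementation.

```python
def build_runtime_filter(build_rows, key):
    """Builder side: collect the distinct join keys of the small input."""
    return {row[key] for row in build_rows}

def apply_runtime_filter(probe_rows, key, key_set):
    """Filter side: keep only probe rows whose key can match the build side."""
    return [row for row in probe_rows if row[key] in key_set]

def hash_join(probe_rows, probe_key, build_rows, build_key):
    """Plain in-memory hash join, used here only to check the result is unchanged."""
    table = {}
    for row in build_rows:
        table.setdefault(row[build_key], []).append(row)
    return [{**p, **b} for p in probe_rows for b in table.get(p[probe_key], [])]

# Toy data following the slide's schema: a tiny date_dim and a much larger sales table.
date_dim = [{"date_sk": k, "year": 2000} for k in (1, 2, 3)]
sales = [{"item_id": i, "sales_price": float(i)} for i in range(1, 1001)]

keys = build_runtime_filter(date_dim, "date_sk")
filtered_sales = apply_runtime_filter(sales, "item_id", keys)

# Only the filtered rows would need to cross the Exchange (shuffle) before the join.
joined = hash_join(filtered_sales, "item_id", date_dim, "date_sk")
```

The filter never changes the join result. It only shrinks the probe side, so it pays off when the build side is small and selective.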
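A FilterBuilder collects the set of join-key values on the date_dim side. That set is used to filter out a large share of the sales rows early, which improves the performance of the join operator. (FLIP-324)

Part 3: Runtime Optimizations

Operator Fusion Codegen
Profiling TPC-DS Q99 (a multi-way join) shows these overheads:
- Virtual function calls
- Intermediate data materialized to memory
- Source iterator overhead
- Collector overhead
- Useless computation

Operator Fusion Codegen: what do we need to optimize?
- Avoid memory access as far as possible and keep data in registers.
- Eliminate virtual function calls.
- Compile the execution: generate the best code for each query.
- Process data in a for loop.
- Evaluate lazily.

Thomas Neumann's paper "Efficiently Compiling Efficient Query Plans for Modern Hardware":
- Collapse entire query into a single operator
- Generate one function for the entire query
Batch jobs support OFCG (FLIP-315).

Operator Fusion Codegen
[Figure: the operator chain Scan, Filter, Project, Aggregate fused into a single loop.]

    long count = 0;
    for (item in sales) {
        // the comparison operator between price and 100 is missing in the source
        if (price ? 100) count += 1;
    }

How should job parallelism be set?
- There is a massive number of batch jobs.
- The data volume of a job changes every day.
- The data volume at intermediate nodes is hard to estimate.
- A SQL job uses one global parallelism for all operators.
The result: tuning is time-consuming and difficult, resources are wasted, and extra overhead is incurred.

Adaptive Batch Scheduler
[Figure: Vertex A (tasks A1, A2) produces an intermediate result whose partitions hold 1x, 2x, 3x and 4x the data; Vertex B (tasks B1 to B4) consumes it according to the expected data per task.]

Improvements (FLIP-187):
- Easier to use: no need to tune parallelism for each job.
- Higher resource utilization.
- Automatic load balancing.
- It is the default scheduler for batch jobs (FLIP-283).
- Overall performance has caught up with mainstream batch engines.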
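The scheduling idea can be sketched as follows: once the upstream vertex has finished, the size of its intermediate result is known, so the downstream parallelism can be derived from data volume instead of being fixed in advance. The function names, the clamping bounds and the grouping strategy are assumptions made for illustration, not Flink's scheduler code.

```python
import math

def decide_parallelism(partition_sizes, data_per_task, min_p=1, max_p=128):
    """Derive downstream parallelism from the produced data volume."""
    total = sum(partition_sizes)
    return max(min_p, min(max_p, math.ceil(total / data_per_task)))

def assign_subpartitions(partition_sizes, data_per_task):
    """Pack consecutive subpartitions so each task reads roughly data_per_task."""
    groups, current, size = [], [], 0
    for index, part in enumerate(partition_sizes):
        if current and size + part > data_per_task:
            groups.append(current)
            current, size = [], 0
        current.append(index)
        size += part
    if current:
        groups.append(current)
    return groups

# Subpartitions holding 1x, 2x, 3x and 4x the data, here in units of 100 MB.
sizes_mb = [100, 200, 300, 400]
parallelism = decide_parallelism(sizes_mb, data_per_task=400)
assignment = assign_subpartitions(sizes_mb, data_per_task=400)
```

With these numbers the two small subpartitions share one task while the larger ones each get their own, which is the load-balancing effect listed under the improvements.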
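[Figure: TPC-DS Benchmark Evolution. Bar chart of total run time in seconds on TPC-DS 10T. Series labels: Flink-1.16 (partitioned tables), Flink-1.16, Flink-1.17, Flink-1.18 (TODO). Values shown: 8873, 6993, 5277, 4943.]

Part 4: Stability Optimizations

Adaptive Hash Join
Problem: with Hybrid Hash Join, when the data is severely skewed, the recursive re-partitioning can loop forever and the join never finishes.
[Figure: Hybrid HashJoin algorithm flow.]

Adaptive Hash Join
Skewed partitions are handled specially: when the recursion depth exceeds 3, the join switches to Sort Merge Join. This keeps performance while improving stability. (FLINK-26929)
[Figure: Adaptive HashJoin algorithm flow.]

Speculative Execution
- Hot machines (busy I/O, high CPU load) cannot be avoided, and tasks running on them may be abnormally slow.
- The completion time of batch jobs is therefore hard to guarantee.

How speculative execution works:
- For slow tasks on hot machines, new execution instances (attempts) are created and deployed on healthy nodes.
- The speculative attempts have the same input and output as the original attempt.
- The attempt that finishes first is accepted; the other attempts of that task are cancelled.
[Figure: Speculative Execution. Tasks A1, A2, A3, B1 and B2 placed on Node 1, Node 2 and Node 3, before and after speculation.]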
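The three rules above can be sketched as a small deterministic simulation. The slow-task threshold (1.5 times the median duration), the single detection time and all names are hypothetical choices for illustration. They are not Flink's actual detector settings or scheduler code.

```python
from statistics import median

def run_with_speculation(original, speculative, slow_factor=1.5):
    """
    original:    task -> duration of the original attempt.
    speculative: task -> duration of a fresh attempt on a healthy node.
    Returns task -> (winning attempt, finish time).
    """
    baseline = median(original.values())
    detect_at = slow_factor * baseline  # when a still-running task counts as slow
    result = {}
    for task, duration in original.items():
        if duration <= detect_at:
            result[task] = ("original", duration)  # finished before detection
            continue
        # Slow task: start a speculative attempt at detection time.
        spec_finish = detect_at + speculative.get(task, float("inf"))
        if spec_finish < duration:
            result[task] = ("speculative", spec_finish)  # original is cancelled
        else:
            result[task] = ("original", duration)  # speculative is cancelled
    return result

# A3 sits on a hot node and would take far longer than its siblings.
original = {"A1": 10, "A2": 12, "A3": 100}
speculative = {"A3": 11}
outcome = run_with_speculation(original, speculative)
stage_finish = max(finish for _, finish in outcome.values())
```

In this toy run the stage finishes when the speculative copy of A3 completes rather than waiting for the original attempt. This is only safe because both attempts read the same input and produce the same output.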
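[Figure: Speculative Scheduler workflow. The SlowTaskDetector watches running tasks and (1) reports slow tasks to the Speculative Scheduler; the BlocklistHandler (2) blocks slow nodes; the scheduler (3) allocates a slot and deploys a new attempt; when one attempt (4) finishes, the remaining attempts are (5) cancelled. The figure also shows a copy of A3 on Node 4.]

Source support for speculative execution (FLIP-245):
- Different execution attempts of the same source subtask are guaranteed to read the same data.
- SourceFunction, SourceFormat and FLIP-27 Source are all supported.

Sink support for speculative execution (FLIP-281):
- SinkFunction, SinkFormat and SinkV2 are all supported.

FLIP-168

Part 5: SQL as a Service
- SQL Gateway supports the HTTPS REST protocol (FLIP-91).
- SQL Gateway supports the HiveServer2 protocol (FLIP-223).
- The Flink SQL CLI can connect to the SQL Gateway (FLIP-275).
- A Flink JDBC Driver lets applications connect to the SQL Gateway (FLIP-293).
[Figure: SQL CLI connecting to the SQL Gateway.]

03 Future Plans

Continuous engine optimization
- Keep refining existing features such as Runtime Filter and OFCG.
- Optimize format read and write performance.
- Adaptive Query Execution.

Users and ecosystem
- Integrate with lake storage such as Paimon, Hudi and Iceberg, and invest in data-lake analytics scenarios.
- Focus more on solving user problems and polishing the engine (ByteDance, Kuaishou, Shopee, Xiaohongshu and others).

Thanks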