Fang Yong (方勇), Infrastructure Engineer, ByteDance

Improvements of Job Scheduler and Query Execution on Flink OLAP

Agenda
#1 Background
#2 Problems & Analysis
#3 Scheduler & Execution Optimization
#4 Future

#1 Background

HTAP Architecture in ByteDance
[Figure: components MySQL Proxy, AP Engine, Flink SQL Gateway, Flink Cluster, Catalog, Connector, MetaService, HTAP Store, and other HTAP components. Interactions shown: submit query / fetch result, submit job / fetch result, scan / fetch.]

Why Can Flink Serve OLAP
[Figure: Presto cluster vs. Flink OLAP cluster.]
- Presto cluster: a Coordinator (Parser, Optimizer, Scheduler, Policy, MetadataManager, ResourceManager) and Workers (Connector), plus deployment and failover.
- Flink OLAP cluster: several Flink SQL Gateways (Parser, Optimizer) in front of a Flink session cluster. The session cluster has a JobManager (ResourceManager, JobMaster, Scheduler, Policy), a Catalog, and TaskManagers (Connector), plus deployment and failover.
Both map onto the same MPP building blocks: job plan, runtime manager, execution manager, and failover manager.

Why Choose Flink in HTAP
- Unified engine: streaming, batch, and OLAP
- Ecosystem: cross-data-source queries
- Performance: advantage shown on the TPC-DS benchmark

#2 Problems & Analysis

Current state and problems
- Access: REST protocol, SQL queries
- Mode: session cluster on K8s, MPP
- Architecture: Batch + Eager Scheduler
- Characteristics: small jobs that run in seconds or milliseconds
- Problems: jobs start and stop frequently; resources become fragmented
- Requirements: latency and QPS
How?

Scheduling Benchmark
Test cases:
1. Three groups of jobs, designed around the complexity of production queries: Source-Sink, WordCount, and Join.
2. Compute nodes run with parallelism 128 on a very small amount of data.
[Figure: job topologies. Source-Sink: Source_Sink. WordCount: Source_Tokenizer_LocalAgg. Join: Source1, Source2, Source3, Join1, Join2, Sink1.]
Test environment: 5 physical machines running a Flink Standalone cluster with one session cluster and 12,500 slots in total. A benchmark client submits jobs from multiple threads. It records the number of jobs completed in 10 minutes and their average latency.

Query QPS by number of client threads:
Client threads | Source-Sink | WordCount | Join
1  | 7.81  | 1.38 | 0.44
4  | 16.88 | 4.26 | 1.11
16 | 17.32 | 5.86 | 1.94
32 | 17.19 | 7.53 | 2.17

Query latency (ms) by number of client threads:
Client threads | Source-Sink | WordCount | Join
1  | 127  | 722  | 2288
4  | 235  | 937  | 3625
16 | 922  | 2736 | 8459
32 | 1857 | 4266 | 15988

From 4 client threads on, Source-Sink QPS stays near 17 while latency keeps rising. Adding client load no longer adds throughput.

Optimize in stages: job management, resource requests, task execution.

#3 Scheduler & Execution Optimization
1. Job management optimization
2. Resource request optimization
3. Task execution optimization
4. Benchmark

1. Job management optimization

Dispatcher management optimization: execution thread pools
Current state and problems: each job goes through four steps: Initialize Job, Prepare Job, Start Job, and Finish Job. Three problems follow: the thread pool is too small, single-point processing is a bottleneck, and the pool is too busy.
Optimizations:
- Enlarge the pool.
- Split the single-point processing into separate steps.
- Add dedicated thread pools, so the job execution phases run on an IO thread pool, a store thread pool, and an execution thread pool.
[Figure: optimization ideas and implementation, showing the Netty/REST thread pool, the Dispatcher actor, and the Akka thread pool.]

JobManager memory optimization
Current state and problems: the task queue of the Akka thread pool holds scheduled tasks such as heartbeat checks and resource timeouts. Entries that belong to finished jobs remain in the queue. Too many objects accumulate and trigger Full GC.
Optimization and implementation: add local task management to each running job. Its heartbeat checks, resource timeouts, and other scheduled tasks are then cleaned up promptly when the job finishes. After this change, Full GC no longer occurs.
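The dispatcher change splits a job's lifecycle across dedicated IO, store, and execution pools. Below is a minimal Python sketch of that idea. Several details are assumptions for illustration, not Flink's or ByteDance's code: the class name `StagedDispatcher`, the pool sizes, and which step runs on which pool. Each step hands off to the next pool through a completion callback, so no thread blocks while a slow step on another pool runs.

```python
import threading
from concurrent.futures import Future, ThreadPoolExecutor


class StagedDispatcher:
    """Sketch: run each job lifecycle step on a dedicated thread pool."""

    # Hypothetical step-to-pool mapping; the talk names the pools, not the mapping.
    STEPS = [("initialize", "io"), ("prepare", "store"),
             ("start", "exec"), ("finish", "exec")]

    def __init__(self, workers_per_pool: int = 4) -> None:
        self.pools = {
            name: ThreadPoolExecutor(max_workers=workers_per_pool,
                                     thread_name_prefix=name)
            for name in ("io", "store", "exec")
        }

    def submit(self, job_id: str) -> Future:
        """Resolves to [(job_id, step, thread_name), ...] after all steps ran."""
        done: Future = Future()  # completed manually when the last step ends
        trace: list = []

        def run_step(index: int) -> None:
            if index == len(self.STEPS):
                done.set_result(trace)
                return
            step, pool = self.STEPS[index]

            def work() -> None:
                # A real step would load the job graph, persist it, deploy tasks...
                trace.append((job_id, step, threading.current_thread().name))

            def on_done(f: Future) -> None:
                if f.exception() is not None:
                    done.set_exception(f.exception())
                else:
                    run_step(index + 1)  # hand off to the next pool, no blocking wait

            self.pools[pool].submit(work).add_done_callback(on_done)

        run_step(0)
        return done

    def shutdown(self) -> None:
        for pool in self.pools.values():
            pool.shutdown(wait=True)
```

Call `shutdown()` only after every submitted job has finished. In this sketch, a step that completes after its next pool has shut down could not schedule its successor.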
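The JobManager memory fix ties scheduled tasks to the job that owns them. The sketch below models this in Python, using `threading.Timer` objects as stand-ins for Akka scheduled tasks. `LocalTaskManager` and its methods are hypothetical names. When the job finishes, `close()` cancels every timer the job still owns, so no shared queue keeps the finished job reachable.

```python
import threading
from typing import Callable


class LocalTaskManager:
    """Sketch: per-job owner of scheduled tasks (heartbeat checks, resource timeouts)."""

    def __init__(self, job_id: str) -> None:
        self.job_id = job_id
        self._lock = threading.Lock()
        self._timers: set = set()
        self._closed = False

    def schedule(self, delay_s: float, action: Callable[[], None]) -> bool:
        """Runs action after delay_s seconds; refused once the job has finished."""
        with self._lock:
            if self._closed:
                return False

            def fire() -> None:
                with self._lock:
                    self._timers.discard(timer)  # drop our reference before running
                action()

            timer = threading.Timer(delay_s, fire)
            timer.daemon = True
            self._timers.add(timer)
            timer.start()
            return True

    def pending(self) -> int:
        with self._lock:
            return len(self._timers)

    def close(self) -> None:
        """Called when the job finishes: cancel everything it still has queued."""
        with self._lock:
            self._closed = True
            timers, self._timers = list(self._timers), set()
        for timer in timers:
            timer.cancel()
```

Because each job owns its own timer set, cleanup costs are proportional to that job's tasks. Cleanup never scans a queue shared by every job on the JobManager.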
2. Resource request optimization

Batch resource requests
[Figure: slot-granularity vs. batch resource requests. Job1 and Job2 together request six slots (s1 to s6) from a ResourceManager that has four slots (slot1 to slot4). With slot-granularity requests, each job can end up holding some of its slots while waiting for the rest, and the two jobs deadlock. With batch requests, each job's slots are requested as a whole.]
Difficulties of batch requests:
- Staying compatible with slot-granularity requests
- "Transactional" semantics for a batch: request, cancel, and so on
[Figure: a batch of requests s1 to s6 waiting at the ResourceManager.]

3. Task execution optimization

Cross-job connection reuse
[Figure: without reuse, tasks of JobA (A.Task1.1, A.Task1.2, A.Task2.1, A.Task2.2) and JobB (B.Task1.1, B.Task1.2, B.Task2.1, B.Task2.2) on TaskManager1 and TaskManager2 each open their own channels.]
Current problems:
- Connections are created and closed frequently, which hurts query latency and QPS
- CPU usage rises and fluctuates
Difficulties of reuse:
- Stability risks (starvation, deadlock, and so on)
- Handling dirty data
- Connection growth and recycling
Design: a channel pool shared across jobs, with lazy creation, reuse, verification, status tracking (Idle, Busy, Invalid), and recycling. Tasks allocate a channel from the pool and release it back when done, so later tasks can reuse it.

PartitionRequest optimization: batching
[Figure: Task1.1 to Task1.4 and Task2.1 to Task2.4 spread across TaskManager1 to TaskManager4, before and after batching.]
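Current state and problems: on an all-to-all connection, every downstream task sends a PartitionRequest to every upstream task. The number of messages therefore grows with the square of the parallelism.
Optimization: batch the requests per TaskManager. This reduces the message count from the square of the parallelism to the square of the number of TaskManagers. With parallelism 100 on 2 TaskManagers, that is 10,000 messages before and 4 after.

PartitionRequest optimization: Notify
[Figure: Task2 on TaskManager1 requests a partition of Task1 on TaskManager2 through the Partition Register Manager and the Network Connection Manager. Before: NotFound + Retry. After: Notify.]
Current state and problems: requests are retried many times, and the message volume is large (on the order of the parallelism squared). Every retry also waits, which adds latency.
Optimization and implementation: no retry and no wait. Instead of answering NotFound, the producer side notifies the requesting task once the partition is registered.

Network memory pool optimization
[Figure: a TaskManager's NetworkBufferPool and its tasks, before (traversal) and after (no traversal).]
Current state and problems: the total number of traversals in the NetworkBufferPool is huge, O(number of slots × upstream parallelism). For the Join test job it reaches the tens of millions.
Optimization and implementation: local pools are created in and deleted from a Local Buffer Pool List. Initializing a local pool, allocating segments, and releasing a local pool no longer traverse all local pools.

Other optimizations: compute scheduling, execution plans, and task execution.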
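The Notify change replaces NotFound-plus-retry with a request that is parked and answered when the partition is registered. Below is a minimal Python model. `PartitionRegistry` is a hypothetical stand-in for the Partition Register Manager, and callbacks stand in for network messages. `messages` counts traffic in both directions, so an early request costs exactly two messages with no retry loop and no wait interval.

```python
from collections import defaultdict
from typing import Callable


class PartitionRegistry:
    """Sketch of notify-instead-of-retry on the producer TaskManager (hypothetical)."""

    def __init__(self) -> None:
        self.registered: set = set()
        self.waiting = defaultdict(list)  # partition id -> parked consumer callbacks
        self.messages = 0

    def request(self, partition_id: str, on_ready: Callable[[str], None]) -> None:
        self.messages += 1  # the PartitionRequest from the consumer
        if partition_id in self.registered:
            self.messages += 1  # immediate response
            on_ready(partition_id)
        else:
            # Old flow: reply NotFound; the consumer waits, then retries.
            # New flow: park the request and answer when the partition appears.
            self.waiting[partition_id].append(on_ready)

    def register(self, partition_id: str) -> None:
        """Called when the producer task sets up the partition."""
        self.registered.add(partition_id)
        for notify in self.waiting.pop(partition_id, []):
            self.messages += 1  # one Notify per parked request
            notify(partition_id)
```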
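To see why removing the traversal in the network memory pool matters, the toy model below makes an assumption: the old path re-visits every local buffer pool whenever a local pool is created or destroyed. This is similar in spirit to buffer redistribution in open-source Flink's NetworkBufferPool. `NetworkBufferPoolModel` and its methods are hypothetical. Under this assumption, creating and then destroying n local pools costs n² visits with traversal and none without, while segment accounting stays O(1) per operation either way.

```python
import itertools


class NetworkBufferPoolModel:
    """Toy TaskManager-wide segment pool with per-task local pools (hypothetical)."""

    def __init__(self, total_segments: int, traverse_on_change: bool) -> None:
        self.free = total_segments
        self.traverse_on_change = traverse_on_change
        self.local_pools: dict = {}  # local pool id -> segments held
        self.visits = 0              # local pools touched by traversals
        self._ids = itertools.count()

    def create_local_pool(self, segments: int) -> int:
        if segments > self.free:
            raise MemoryError("insufficient network memory")
        self.free -= segments        # O(1) bookkeeping, no scan needed
        pool_id = next(self._ids)
        self.local_pools[pool_id] = segments
        self._maybe_traverse()
        return pool_id

    def destroy_local_pool(self, pool_id: int) -> None:
        self.free += self.local_pools.pop(pool_id)
        self._maybe_traverse()

    def _maybe_traverse(self) -> None:
        if self.traverse_on_change:
            # Assumed old behaviour: revisit every local pool on each change.
            for _ in self.local_pools:
                self.visits += 1
```

With 1,000 short-lived local pools this is already a million visits. That matches the order of magnitude the talk reports for the Join test job, where slots and upstream parallelism multiply.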
4. Benchmark

Scheduling benchmark, query QPS before → after optimization, by number of client threads:
Client threads | Source-Sink | WordCount | Join
1  | 7.81 → 10.5   | 1.38 → 2.83  | 0.44 → 1.23
4  | 16.88 → 16.39 | 4.26 → 4.77  | 1.11 → 3.22
16 | 17.32 → 19.74 | 5.86 → 13.84 | 1.94 → 5.66
32 | 17.19 → 33.52 | 7.53 → 19.62 | 2.17 → 11.6

[Figure: query latency (ms) before and after optimization, by client thread count (1, 4, 16, 32).]

#4 Future

Flink for OLAP: work on stability, performance, and functionality, going from 0 to 1 and from 1 to 100.
- Single-node stability optimization
- Runtime resource usage optimization
- Compute result management optimization
- Job and compute thread management
- Fine-grained execution management
- Optimizer for OLAP
- Row vs. Column
- History Server improvements
- Productization
- Slow query analysis and diagnosis

THANKS