FFA2024分論壇-核心技術.pdf

編號:184572 PDF 246頁 35.07MB 下載積分:VIP專享
下載報告請您先登錄!

FFA2024分論壇-核心技術.pdf

1、拋棄并行度設置:Flink 智能擴展,資源消耗最小化范瑞Shopee Flink Runtime 負責人Apache Flink PMC Memeber為什么需要并行度全托管?01估算合適的并行度02高效擴縮容03生產實踐&深度優化04收益分析&未來規劃05為什么需要并行度全托管?手動設置并行度的痛點資源利用率易用性1 萬+作業按照流量和負載評估,人力成本極高并行度過低Lag業務邏輯變更重新評估固定的并行度流量波動(小時,天,月)新用戶不了解如何設置并行度并行度過高資源浪費!只 scale up 不 scale down!Lag 報警后 scale up僅針對流作業估算合適的并行度多少并行度足

2、夠?Source 處理一條數據需要 10 ms單線程的處理能力:每秒 100 records Kafka topic 輸入速率:每秒 700 records并行度 7 就足夠了=Source 輸入速率單線程的處理能力700100=7 合理嗎?多少并行度合適?!預期負載:70%實際負載=輸入速率/總處理能力70%=700/1000合適的并行度=期望的總處理能力單線程的處理能力1000100=輸入速率/預期負載單線程的處理能力700/0.7100=10期望的總處理能力?下游 Task 的輸入速率=上游 Task 的輸出速率期望的總處理能力重啟耗時輸入速率期望的總處理能力當前的Lag基礎處理能力=輸

3、入速率/預期負載追 Lag 能力=重啟后的 Lag預期的追 lag 耗時當前 Lag+重啟耗時*輸入速率預期的追 lag 耗時=單線程的處理能力*實際負載=老作業實際總處理速率Total Records 差值時間差(單線程的處理能力*并行度)Busy 率就是實際負載單線程的處理能力=Total Records 差值時間差*并行度*實際負載=估算合適的并行度預期的負載預期的負載范圍追趕 Lag重啟耗時FLIP-271:Autoscaling 1Autoscaler doc 2flink-kubernetes-operator repo 3Autoscaler Configuraions 4Par

4、allelism 的邏輯上限:任何 task 的并行度=maxParallelismKafka source 的并行度=topic 的分區數Metric 窗口大?。J分析最近 15 分鐘 Metric 來估算并行度)用戶可配置 Parallelism 的上限和下限并行度調整的倍數高效擴縮容啟動作業耗時啟動Flink Client啟動 JM生成 Graph申請所有TM啟動所有TM部署TaskYarn/K8s 啟動作業耗時:50s+12 s痛點:手動重啟作業,斷流時間 2 分鐘!申請資源 啟動進程原地擴縮容流程申請擴容的 TM啟動擴容的 TMCancelTask部署Task!斷流時間 2 秒原地

5、擴容流程:CancelTask部署Task異步釋放空閑的 TM!斷流時間 2 秒原地縮容流程:優化大狀態的恢復效率Flink 2.0 存算分離狀態存儲Adaptive SchedulerFLIP-138:Declarative Resource management 5FLIP-160:Adaptive Scheduler 6Adaptive Schedulerjobmanager.scheduler:adaptive設置并行度的區間:lowerBound,upperBound3,10Default Scheduler設置并行度為 5固定的,不支持修改資源=實際并行度,則取消 trigger(

6、避免無效擴縮容)優化天級縮容的頻率天級縮容的業務目標:按照每天高峰期申請資源擴容保持敏感(默認 15 分鐘)scale-down.interval=24 h24 h 內估算的所有并行度 80lowerBound=1upperBound=80(期望的并行度)實際:50-5151-6262-80FLIP-472:Aligning timeout logic in the AdaptiveScheduler WaitingForResources and Executing states 20(Flink 2.0)FLIP-322:Cooldown period for adaptive sched

7、uler 21(Flink 1.19)scaling-interval.min:30s(FLIP-322 在 1.19 引入 2.0 已廢棄)jobmanager.adaptive-scheduler.19 (前綴)executing.resource-stabilization-timeout:1min(FLIP-472 在 Flink 2.0 引入)收到擴容請求1 分鐘最多等 1 分鐘資源提前達到預期,直接擴容評估實際耗時Rescale 后 Lag 過大斷流時間很短,但 Rescale 后 Lag 很大Rescale 后 Lag 過大Rescale 后,從 Latest checkpoin

8、t 恢復Checkpoint 間隔:15 分鐘CheckpointCheckpointRescale14 分鐘 Lagjobmanager.adaptive-scheduler.20rescale-trigger.max-checkpoint-failures:2FLIP-461(Flink 2.0)22下次 Checkpoint 完成后再執行 Rescale!FLINK-36753 23資源 Ready 后主動觸發 Checkpoint !縮容后資源不釋放期望:20-104 個 TM每個 TM 5 個 slot實際 期望!FLINK-33977 24估算的并行度不符合預期Kafka topi

9、c 有 100 個 partition估算的并行度 30實際:10 個并行度:消費 4 個 partition20 個并行度:消費 3 個 partition單 Subtask 處理能力:只能 cover 3.3 個 partition為什么不是 25?向上調整并行度:盡可能均衡地消費 Partition調整結果:50 50*2向上調整并行度:打破瓶頸即可(FLINK-36527 25)每個 Subtask 消費的 partition 數量 VectorSchemaRoot-ArrowArray-RowVector和Paimon結合,支持Native Parquet/Orc Reader 支持

10、更多算子p 維表p 狀態算子支持更多SIMD,支持SQL全類型,對齊FLink所有內置函數THANK YOU謝 謝 觀 看Flink 2.0 存算分離狀態存儲 ForSt DBIntroducing ForSt DB,the disaggregated state store for Flink 2.0蘭兆千阿里云技術專家 Apache Flink Committer存算分離架構介紹 狀態存儲內核 ForSt工作進展&展望Flink 2.0 存算分離架構介紹檢查點開銷大網絡與CPU周期性洪峰輕量級檢查點無需大量數據上傳本地磁盤瓶頸包括空間不足和IO性能不足作業恢復慢作業改配置不靈活遠端作為主存

11、擁有近乎無限的空間,且可擴展啟動速度快狀態可以存儲于遠端,直接啟動Flink 2.0 存算分離提出背景遇到問題存算分離Flink 2.0 存算分離的幾大工作異步化性能優化訪問遠端會導致IO延遲加大支持狀態存儲于遠端最基本的需求快速的檢查點與恢復包括異步化引入的問題Flink 2.0 存算分離架構介紹TaskState backendBatchingInputsCallbacksDFSCP-1CP-2CP-3Async State API社區提出了一系列FLIP:FLIP-423 存算分離總體介紹FLIP-424 新引入的支持異步化的State APIFLIP-425 支持異步化的執行模型FLI

12、P-426 將State訪問攢批FLIP-427 新引入的支持遠端存儲的存儲內核 ForStFLIP-455 更快速的針對in-flight State異步請求的Checkpoint以及APIFLIP-473 基于異步API的SQL實現Flink 2.0 存算分離 全新State API同步API仍被支持(可混用)Flink 2.0 存算分離 異步執行模型RecordsCallbacksStateRequestState Executor(Async,In batch)現實情況較為復雜:Record 保序問題 保證同Key的數據按順序處理Watermark/Timer 的語義保持Checkpo

13、int 的處理Task Thread FLIP-425Flink 2.0 存算分離 異步保序問題Records321相同Key=K1BlockingActive123State Executor21Callbacks2121引用計數=0?Flink 2.0 存算分離 攢批執行BatchingWrite ThreadRead Thread 1Read Thread 2Read Thread 3Write BatchGet/MultiGetIterator load讀寫線程分離、自動分割Get/MultiGetFlink 2.0 存算分離 檢查點TaskInput1Input2StateBacke

14、ndBarrierOutputAligned CP:Operator state snapshot+Keyed State snapshotKeyed State snapshotOperator State snapshotIn-flight data snapshot?存算分離 Async 模型:需要等待所有 Record 處理結束FLIP-455:+當前尚未開始的 State RequestFLIP-455Unaligned CP:In-flight data Flink 2.0 存算分離 檢查點TaskStateBackendFLIP-455 一套聲明用戶邏輯的API&StateReq

15、uest持久化到CP的方式StateRequest易于序列化直接序列化會有諸多問題-只記錄名字(id)“聲明”Callback(作業實際運前)存算分離&嵌入式存儲:ForSt存算分離&嵌入式存儲:ForSt遠端讀寫批量、并發讀寫本地磁盤Cache嵌入式DB支持Flink特性,TTL、Snapshot 等https:/ StatebackendCache on local diskDFSUnified FileSystem LayerMemDISkFor Streaming DB=ForSt DBFRocksDB-ForSt 內核持續演進ForSt:遠端讀寫ForStFileSystemInte

16、rfaceStateBackend(Flink)File CacheFlink FileSystemsImplementation for ForStJNI如何實現遠端讀寫特性?利用Flink的實現,JNI反向調用,所有讀寫從Flink端實現Pros:支持所有文件系統方便檢查點文件與DB文件進行共享Cons:文件訪問性能受JNI影響ForSt:批量讀寫批量寫:使用 WriteBatch 接口批量讀:使用 MultiGet 接口減少JNI開銷;減少 Memtable 鎖沖突CPU Cacheline友好;減少 LRU 鎖沖突;IO 并行化ForSt:快速檢查點StateBackendKeyed

17、State snapshotCase1:CP&DB相同存儲+相同文件夾DFS文件移交:CP時文件被標記為CP文件,受JM管理,不再受DB管理文件被刪除=不會被DB需要Case2:CP&DB相同存儲+不同文件夾DFSCP時走 PathsCopying 接口快速拷貝Case3:CP&DB不同存儲=拷貝(默認情況)dir1dir2ForSt:快速恢復StateBackendKeyed State snapshotFailover:原地重啟僅討論 CP&DB 同一個存儲+同一個文件夾的情況手動Restore:根據Claim Mode不同Claim Mode:原地啟動No Claim Mode:復制后啟

18、動DFS原來CP文件夾現CP文件夾(因為原CP文件也歸當前job管)ForSt:快速恢復StateBackendKeyed State snapshotRescale:使用 ClipDB 與 IngestDB僅討論 CP&DB 同一個存儲+同一個文件夾的情況ClipDB:將 SST 文件按需進行裁剪和重新生成IngestDB:將多個DB合并為1個 最小化文件寫入操作05 611 12140459 1012 1314DB1Clip 1019101112141012 1314DB1Ingest14191419當前進展&未來展望存算分離:Preview版本與2.0進展已與10月中旬發布2.0-Pre

19、view版本新State API、狀態訪問的執行模型可用ForSt 內核已發布SQL Streaming Regular Join 可用純遠端訪問的性能符合預期2.0版本將具有以下功能快速檢查點/恢復功能支持本地磁盤Cache完成常見有狀態SQL Operator的改寫(Agg、Join、Rank等)性能保障:50%狀態在本地Cache時性能不弱于 FRocksDB 方案存算分離:性能測試WordCount 純遠端測試:Key隨機訪問,沖突較少的聚合19.22.011629.605101520253035FRocksDB LocalForSt Remote SyncForSt Remote A

20、syncForSt Remote Async(50%cache)Throughput(k/s)Word Number:300MWord Length:16 bytesMachine:Alibaba Cloud ECS ecs.g6a.16xlarge(64 vCPU 256 GB)x 1HDFS(for ForSt):On same LAN,built on ESSD 6800 IOPS x 4Local Disk:ESSD 6800 IOPSJob parallelism:1Managed memory per TM:512M存算分離:性能測試Nexmark Q20 測試:Key沖突略多的

21、Regular Join3192063250100200300400FRocksDB LocalForSt Remote AsyncForSt Remote Async(50%cache)Throughput(k/s)FRocksDB LocalForSt Remote AsyncForSt Remote Async(50%cache)Nexmark Event:200MMachine:Alibaba Cloud ECS ecs.g6a.16xlarge(64 vCPU 256 GB)x 4HDFS(for ForSt):On same LAN,built on ESSD 6800 IOPS

22、x 4Local Disk:ESSD 6800 IOPSJob parallelism:8Managed memory per TM:512M存算分離展望:2.0版本后順滑體驗FLIP-455 極速檢查點功能性完善SQL Operator 100%支持更多特性Remote Compaction等ForSt展望:更多的流特性支持原生TTL支持文件級別的直接裁剪更輕量快速檢查點原生Memtable檢查點時間分片高效管理大狀態狀態遷移Lazy 模式更優Compaction基于訪問頻率的調度、遠端流的訪問特點點查與范圍查詢的自適應優化THANK YOU謝 謝 觀 看打破Watermark壁壘:實現(跨

23、)Flink作業的進度感知與自動對齊Break the Limitation of Watermark:Implementing Progress Awareness and Automatic Alignment in(across)Flink Jobs陳宇小紅書數據引擎開發工程師背景與問題多流Flink作業進度不一致所帶來的問題實現(跨)作業進度對齊初嘗試&優雅解應用與收益以小紅書實時樣本打寬場景為例背景與問題Flink作業數據進度不一致所帶來的困擾雙流 JoinSource B12:10Source A15:00JoinLeft StateRight State12:10緩存大量Sour

24、ce A數據,Checkpoint成本上漲!Lookup JoinSource ALookupSinkPKSource B12:0012:10Join Miss!數據質量問題!數據正確性保證任意一條流延遲仍能得到正確結果存在數據正確性風險Flink無法感知維表是否延遲Join狀態成本高狀態中需要緩存兩條流的數據狀態不可共享每個Flink作業都需付出狀態成本無狀態Flink成本低依賴外部服務存儲右流可共享維表不同的Flink作業可共享維表數據雙流 Join vs.Lookup Join雙流JoinLookupJoin實現(跨)Flink作業進度對齊初嘗試&優雅解如何衡量一條流的進度?容忍亂序少量

25、亂序數據不會導致進度波動反應處理進度代表數據目前的處理水平調節能力能夠校正不同流之間的進度WATERMARKWatermark AlignmentSource ASource B10:3010:5010:1010:40SourceCoordinatorAlignment Event10:50FLIP-182&FLIP-217&FLIP-296Lookup Join 怎么辦?Watermark 僅在單個Flink作業內有效 如何實現容忍維表延遲?維表復用程度高,一旦維表故障大量下游受影響初嘗試:延遲消費下游可見性變差(固定延遲x分鐘)無法動態調整延遲時間,存在數據質量風險難以評估不同作業的延遲時

26、間Source ALookupSinkClient延遲x分鐘消費可見性延遲x分鐘Pub-Sub PatternPublisherSubscriberChannel進度元信息更新維表作業維表查詢作業維表存儲介質如何設計進度元數據?怎么用Watermark表示維表的更新進度?維表是如何更新的?寬維表的多種寫入場景 PK PK作業B作業A作業B作業A不同作業寫入不同字段不同作業寫入相同字段(主備/不同源Union)每個字段均可能存在一個或多個作業寫入字段粒度的進度元數據進度元數據Job A,10:08Col 1Col 2Col 3Col Job A,10:08Job B,10:09Job B,10:

27、09Job BJob A PK10:0810:09Col 1Col 2Col 3讓Watermark跨Flink作業流動!PublisherSubscriber通過元數據判定是否延遲10:08推送數據&元數據Dim SourceETLJobManagerOperatorCoordinatorSink字段粒度元數據Job A,10:08Col 1Col 2Col 3Col Job A,10:08Job B,10:09Job B,10:09Source AAsync LookupSink10:09Async Timer自動延展Source AAsync LookupSink持續阻塞可能觸發Asyn

28、c Lookup Timeout,造成Failover定時器觸發阻塞狀態延展定時器阻塞時主動延展Async Timer心跳機制維表更新頻率低,夜間流量跌0,Watermark不推進00:0006:0010record/stimeCHK能有效表示作業是故障/無流量Source A02:5000:40Sink00:40REBALANCE心跳機制維表更新頻率低,夜間流量跌0,Watermark不推進00:0006:0010record/stimeDim SourceETLSink字段粒度元數據Job A,10:08Col 1Col 2Col 3Col Job A,10:08Job B,10:09Jo

29、b B,10:09HeartbeatJob A,10:05Notify CHK Complete推送心跳CHK能有效表示作業是故障/無流量Trade-off:Max Allow Ahead機制數據質量與穩定性之間的Trade-off元數據推送天然存在延遲,左流進度與維表近乎一致延遲響應時間維表更新作業維表關聯作業元數據存儲介質10:08:2210:08:2510:08:21Blocked!每5s推送引入Max Allow Ahead參數允許左流超前x秒在小紅書的廣泛應用應用與收益全量跨云/地域同步的成本問題實時樣本流Region NRegion HRegion S整庫同步整庫同步樣本流A1樣本

30、流A2樣本流An跨云/地域同步Lookup Join對象存儲落庫實時消費Flink按需同步與細粒度管控Region NRegion HRegion S按需同步30%同步任務樣本流A1樣本流A2樣本流An10:48:52A110:47:21A210:46:37AnLookup JoinProgress Meta直播數倉中的廣泛應用訂單退款明細訂單流關聯直播間基礎信息維表直播間商品合約明細直播間商品數據流關聯商品基礎信息維表收益質量可靠顯著降低維表關聯場景下Miss率一呼百應維表運維/故障/延遲,關聯方自動阻塞成本收益帶寬成本降低70%+總結l Watermark Alignment 解決了作業內

31、的進度對齊問題l Lookup Alignment 解決了作業間的進度對齊問題n Source與Source間對齊n Source內Task間對齊n 提供了通用的Watermark跨作業傳遞方案n 填補了維表關聯場景中進度對齊能力的空缺n 主動阻塞能力提供了數據質量保障,降低運維壓力THANK YOU謝 謝 觀 看突破狀態恢復困境:進一步解決FlinkSQL算子內的狀態兼容性問題Breaking the State Recovery Dilemma:Further Addressing State Compatibility within the FlinkSQL Operator袁 奎小紅書

32、數據引擎開發工程師狀態恢復困境背景介紹、原因分析解決方案探索典型場景下的解決方案未來展望著眼未來的發力點FlinkSQL作業狀態恢復困境背景介紹、原因分析SQL作業難以從狀態恢復的現狀有狀態作業窗口聚合無限流聚合無限流去重async lookup.社區狀態兼容遷移方案作業正常運行時Flink持續讀寫本地State,在checkpoint/savepoint時將state上傳到遠端文件系統社區狀態兼容遷移方案作業從狀態恢復時使用上一次運行的序列化器反序列化之前的狀態,使用新的序列化器序列化新的狀態SQL作業難以從狀態恢復的原因分析SQL作業狀態數據結構SQL作業難以從狀態恢復的原因分析RowDa

33、taSerializerRowDataSerializerSQL作業從狀態恢復時重啟前后都時RowData數據結構存儲狀態,都使用RowDataSerializer序列化器逐項檢查重啟前后RowData各字段的類型是否相同造成的結果:用戶修改聚合項之后無法精確判斷能否恢復;如果沒有改變聚合項的返回類型,能夠恢復但可能是從錯誤的狀態恢復檢查重啟前后RowData中字段數量是否相同造成的結果:用戶無法新增和刪除聚合項SQL作業難以從狀態恢復的原因分析RowDataSerializerFlinkSQL作業狀態兼容方案典型場景下的狀態兼容方案介紹更精細化的檢查各聚合項類型比對各聚合項類型比對聚合項數量

34、狀態兼容檢查原方案參與聚合的字段列表各聚合項名稱Checkpoint存儲的元信息聚合項位置是否改變是否增加/刪除了聚合項狀態兼容檢查改進方案方案核心各聚合項類型聚合項本身是否改變一個聚合項包含如下信息:1.聚合名稱2.參與聚合的字段列表3.聚合項的類型聚合名稱、參與聚合的字段列表、聚合項的存儲類型可以唯一定義一個聚合單元。Checkpoint中存入聚合單元列表,比對重啟前后的兩個聚合單元列表即可得知是否增加或刪除了聚合單元,聚合單元位置是否發生了改變,參與聚合的字段是否發生了改變。Checkpoint存儲的元信息更精細化的檢查通過對聚合項的原子化定義,可以識別出用戶對聚合項的修改:max(co

35、l1)和avg(col3)更換了順序刪除了sum(col2)增加了min(col4)增加遷移接口方案核心增加狀態遷移接口,用來將舊的狀態數據轉換成適應新狀態的初始數據改進過后的狀態兼容遷移方案改進過后的狀態兼容遷移方案回撤場景下增加聚合項編號輸入(gender,name,age)輸出狀態1+IM,Tom,10+IM,1M:1修改SQL,增加聚合項:AVG(age),并從狀態恢復從狀態恢復后的狀態:M:1,null(0)2-UM,Tom,10-UM,1,nullM:0,null(-1)3+UM,Tom,12+UM,1,nullM:1,null(0)4+IM,Jesse,14-UM,1,null+

36、UM,2,14M:2,14(1)期望的計算結果:M,2,13 實際的計算結果:M,2,14新增的聚合項可能會因為回撤操作產生錯誤的結果有限影響場景下依然可以使用,比如時間字段參與聚合的場景回撤場景下增加聚合項有限的影響,比如天級/小時級聚合業務允許有限的誤差(允許當天/當前小時的數據出現誤差)使用條件去重場景下增加字段proc_time+descproc_time+asc4種去重類型是否是否產生回撤去重算子row_time+ascrow_time+asc是是去重類型以及回撤關系狀態中的值:A,B,C回撤消息值:A,B,C,?,?去重算子中增加字段,增加的字段在狀態中無值,無法產生準確的回撤消息

37、 業務可接受范圍內:回撤的時候新增字段對應位置下發null:A,B,C,null,nullDistinct關鍵字處理select version,max(height),count(distinct age),sum(distinct height)from MyTable2 group by versionCodegen生成的代碼中,Distinct狀態的key有一個自增后綴,自增名稱的生成會給啟動前后的狀態映射造成麻煩,這要求不能隨意增刪改Distinct聚合項,但可以在末尾增加Distinct聚合項Async Wait Operator狀態恢復Checkpoint時會將queue存入狀態

38、使用queue緩存當前未處理的消息引入狀態只為保證數據完整性,不依賴狀態計算ASYNC無狀態的Async Wait Operator方案:Checkpoint的時候不使用狀態而是確保Queue中的消息都處理完理論上Checkpoint的時間會變長不使用狀態即可任意修改SQL上線收益回溯數據,重新積累狀態回溯12小時左右數據回溯數據階段主備鏈路不能切換業務迭代時間久新建任務應對新的業務需求不改動原始任務新建任務應對新的業務需求需要額外的計算、存儲資源原任務直接從狀態恢復不需要再回溯數據業務迭代時間從小時級別降至分鐘級增強鏈路穩定性原任務直接從狀態恢復不需要再創建新的flink任務節省計算、存儲資

39、源減少鏈路維護成本WorkAround方案新方案未來展望著眼未來的發力點未來展望深入calcite,探究生成穩定流圖的方案按需支持Join等更多算子的狀態兼容遷移降低用戶使用復雜度,做到開箱即用THANK YOU謝 謝 觀 看極致性能優化:使用Apache Fury加速Flink數據傳輸和狀態讀寫性能楊朝坤 Apache Fury作者、螞蟻多模態計算引擎&視頻架構負責人Flink性能瓶頸分析Flink數據傳輸和狀態持久化瓶頸Fury序列化框架Apache Fury序列化框架簡介Fury加速Flink性能Fury替換Flink/Kryo序列化深入分析Fury序列化原理Flink性能瓶頸分析Fli

40、nk數據傳輸和狀態持久化瓶頸Flink任務的序列化開銷分析BinaryRow任意Java對象SourceShuffle算StateWindow、KeyBy、JoinStateShuffle算SinkState數據傳輸數據傳輸DataStream作業SQL作業狀態序列化自定義算子State任意Java 對象數據序列化狀態序列化部分序列化開銷BinaryRow/String/Array:二進制數據,無序列化開銷BinaryRow任意Java對象(String/POJO/Map/List/Set/Array):大量序列化開銷任意Java對象內置算子State:BinaryRow,BinaryStri

41、ngUDAF State:任意Java對象數據序列化大量序列化開銷UDAF SQL任務存在部分序列化瓶頸狀態序列化無序列化瓶頸大部分SQL任務部分序列化瓶頸UDAF SQL任務大量序列化瓶頸DataStream任務Flink DataStream數據傳輸瓶頸DataStream 算上下游間跨進程傳輸復雜Java對象時,需要將對象序列化為Binary,序列化耗時占可超過80%Flink DataStream狀態訪問瓶頸每條數據讀寫offheap狀態(RocksDB等)時,需要通過序列化將Java對象與State的Binary數據互轉,序列化存在性能瓶頸序列化瓶頸如何優化Flink StringS

42、erializerFlink PojoSerializerFlink KryoSerializerFury StringSerializerFury Generated ObjectSerializerFurySerializerFlink內置序列化器和Kryo序列化器替換為Apache Fury序列化器Apache Fury序列化框架介紹Fury簡介及使用示例Apache Fury簡介Apache Fury 是一個基于動態代碼生成和零拷貝技術的多語言序列化框架,實現了無需IDL編譯的原生多語言編程范式,顯著降低多語言系統的研發成本并提供最高170倍的性能。支持語言應用場景規模數據傳輸并發RP

43、CAI在線服務搜索推薦系統數據計算分析JavaScalaKotlinPythonGoJavaScriptRustC+云原中間件無需IDL的多語言編程模型IDL-free:動序列化對象,需IDL定義、編譯和數據對象轉換;編程語原類型系統:開箱即;可選的跨語共享和循環引:動序列化復雜對象圖;對象原多態:動序列化interface/trait等動態類型;對比動態序列化框架:最高快170倍JVM-Serializers benchmark最快的序列化框架.比JDK/Hessian/Kryo最高快20170倍;運行時動態生成序列化代碼:最小化內存訪問、方法調用開銷、減少條件分支和循環;高效的序列化協議:

44、Meta Packing共享、Buffer/Array ZeroCopy對比靜態序列化框架Protobuf序列化性能是Protobuf的6倍;反序列化性能是Protobuf的4倍;Fury的元數據Packing Schema Evolution模式避免了Protobuf的KV結構性能開銷;Fury支持Schema強一致模式,可完全避免Schema進化的開銷;Fury運行時動態生成更加高效的序列化代碼;緊湊的編碼協議:更小的序列化體積復雜對象,序列化體積只有Protobuf 的45%;Fury的元數據Packing Schema Evolution模式:相比Protobuf的KV結構,相同類型多

45、個對象只需寫一次Schema;對于各節點Schema一致場景(分布式計算系統),Fury提供Schema強一致模式,消除Schema Evolution開銷;Fury應用案例:節省資源,降低延遲Lakesoul 使用Fury優化 Flink CDC序列化的性能,相比Kryo和Flink原生序列化,端到端吞吐提升一倍搜索推薦系統需要序列化和傳輸大量高密度特征,用于特征構建、打分、召回、排序等。通過Fury,唯品會和美團搜推服務平均耗時降低 30ms+Fury Java序列化示例數據類型JDK序列化初始化FuryFury序列化Fury反序列化1.通過類型注冊白名單提高反序列化安全性2.性能大幅超越

46、JDK/Kryo/HessianFury Scala序列化示例數據類型JDK序列化初始化FuryFury序列化Fury反序列化1.持任意Scala對象序列化2.性能幅超越JDK/Kryo/ChillFury加速Flink性能Fury替換Flink/Kryo序列化深入分析Fury序列化原理03DataStream數據傳輸優化DataStream 算子上下游間跨進程傳輸復雜Java對象時,Fury將序列化CPU占比從從51%51%降低到降低到15%15%Flink Kryo序列化Fury序列化DataStream狀態訪問優化每條數據讀寫offheap狀態(RocksDB等)時,需要通過序列化將Ja

47、va對象與State的Binary數據互轉,Fury將String狀態序列化CPU占降低倍Kryo序列化Fury序列化Fury String序列化優化Fury字符串優化原理在各類字符串場景下,Fury字符串序列化性能是JDK/Kryo/Flink字符串序列化器的數倍1.同時支持Latin1/UTF8/UTF16三種編碼,JDK11+Latin1字符串直接零拷貝2.ASCII 檢測向量化,superword位掩碼一次檢查8字節3.UTF8編碼向量化,ASCII字符一次編碼多字節Fury Array序列化優化支持多層嵌套的Array/Buffer等對象的out-of-band零拷貝,避免復制到中間

48、buffer的開銷,可用于直寫共享內存和網絡零拷貝支持類型基本類型數組,ByteBuffer,ArrowRecordBatch,VectorSchemaRootarray,numpy,pyarrow.Table,pyarrow.RecordBatchbyte sliceJavaPythonGoFury Collection序列化優化大部分List/Set/Collection的元素相同類型,且都不為空;大量集合的size小于10;集合遍歷開銷??;Go,Rust,C+的集合元素數據,值類型居多,引用和多態較少;集合數據特性編碼策略預先遍歷集合,提前計算header;元素Header(4bits)

49、:解析引用、包含空值、值類型相同、值類型是聲明類型合并元素Header和集合長度合并編碼減少開銷;集合元素不再需要寫入是否為空標記和元素類型信息;Compaction集合協議:通過提前檢測重復元數據,最小化Java多態和引用帶來的空間和性能開銷Fury Map序列化優化Key的類型相同,Key一般不為null,且最多一個為null;Value的類型都相同,且都不為空;KV都是非空的基本類型和字符串;Go,Rust,C+的map KV數據,值類型居多,多態和引用較少;大部分Map數據特性編碼策略分Chunk編碼:每個chunk的Keys和Values元數據相同,不一致時開始新chunk;多級退化

50、編碼:先假設元數據最大相似性,不一致時減少下個chunk相似性,最后退化為通用編碼模式Chunk By Chunk Map協議:動態檢測重復元數據,最小化Java多態和引用帶來的空間和性能開銷Map是樹結構,迭代遍歷一次開銷較大,可達到序列化的1/2,不適合提前計算KV統計信息;Header預計算開銷大LayoutFury POJO序列化優化不寫入字段名稱、類型等meta信息,字段預排序后按順序依次寫入,避免了Protobuf和Json的KV結構開銷;基于當前類型的Schema反序列化數據;性能最高,空間開銷最少Schema 強一致協議字段元數據打包壓縮編碼,多個相同類型對象只寫一次元數據,避

51、免Protobuf和Json的KV結構開銷字段數據按照Schema強一致協議寫入;基于解析的Schema反序列化數據;Schema Evolution協議分布式計算引擎上下游數據傳輸:進程代碼一致,Schema一致費業務的基礎類型:Schema不太會發生變更應用場景在線服務:不同微服務獨立部署,獨立升級,調用方和被調用方代碼存在不一致,Class Schema存在差異Flink ValueState兼容性:POJO增刪字段時,則需要通過該協議從Checkpoint/SavePoint恢復數據應用場景通過代碼生成加速POJO序列化通過代碼生成減少內存訪問、類型分發和虛方法調用開銷;提前合并多次內

52、存地址檢查,最小化檢查開銷;動態編譯優化:大方法自動拆分成小方法,多層調用全部JIT內聯;數據結構類型Fury解釋執行模式Fury動態生成代碼模式通過代碼生成加速POJO集合字段序列化通過代碼生成減少內存訪問和虛方法調用開銷;支持多態類型優化,運行時生成多份代碼,動態分發執行;Chunk by chunk Map encoding:NodeJS Codegen完成,Java開發中;數據結構類型List 解釋執行模式List 動態生成代碼模式Map 解釋執行模式Map 動態生成代碼模式Fury序列化器動態編譯加速效果序列化對象JVM C2編譯日志Fury動態生成的序列化代碼序列化火焰圖THANK

53、 YOU謝 謝 觀 看楊朝坤|Github:chaokunyangFlink 批處理自適應執行計劃優化孫夏阿里集團 愛橙科技 高級開發工程師Apache Flink Contributor自適應邏輯執行計劃自適應 Join算子優化未來改進基于StreamGraph的作業提交動態邏輯執行計劃優化動態選擇 Join 方式自適應Join傾斜優化支持更多場景更智能的自適應優化自適應邏輯執行計劃作業的執行計劃SELECT*FROM T1,T2 WHERE a1=a2 and b2 FilterJoin-SinkSourceABCA1B1B2C1C2用戶SQL算子層面對外暴露的信息逐步減少C1C2A1B1

54、B2并行度數據傳輸方式(shuffle)數據劃分方式(partition)計算邏輯自動并發度推斷均衡數據分發特定算子邏輯優化數據傳輸方式優化自適應批處理調度器基于StreamGraph的作業提交用戶代碼CompilerOptimizerFlink ClientJob ManagerREST APIDispatcherJobMasterSchedulerJobGraphExecution GraphStream Graph用戶代碼CompilerOptimizerFlink ClientJob ManagerREST APIDispatcherJobMasterSchedulerStreamGr

55、aphExecution GraphStream GraphJob GraphAdaptiveGraphManagerFLIP-468ExecutionPlan:通用的執行計劃表達用戶代碼CompilerOptimizerFlink ClientJob ManagerREST APIDispatcherJobMasterSchedulerStreamGraphExecution GraphStream GraphJob Graphpublic interface ExecutionPlan implementAdaptiveGraphManagerCompiledPlan?動態邏輯執行計劃:漸

56、進式構建Stream GraphSourceFilterSortMergeJoinFrozen NodesSourceHashHashforwardJob GraphSource-FilterSourceIntermediateDatasetIntermediateDatasetJobVertex 的創建時機:1.Source節點將在作業調度前被創建,為作業的啟動提供起點。2.其余節點將在其上游所有節點完成后生成,此時可以獲取到準確的輸入數據的信息,來幫助做出更好的優化決策。FLIP-469動態執行圖優化Stream GraphSourceFilterBroadcastJoinFrozen N

57、odesSourceForwardRescaleforwardAdaptiveBatchSchedulerAdaptiveExecutionHandlerStreamGraphOptimizerAdaptiveBroadcastJoinStrategySkewedJoinOptimizationStrategyOtherJobEventAdaptiveGraphManagerTry updateJob GraphSource-FilterSourceIntermediateDatasetIntermediateDatasetBroadcastJoinNotify job graph updat

58、edNotify job vertex finishedNotify to update Execution GraphNew Web UIFlink Web UI僅會展示當前已經生成的Job VertexNew Web UI提供Show Pending Operators開關展示出Pending中的Stream Nodes詳細信息自適應 Join節點優化整體優化流程整體優化流程SQLLogical Relnode TreePhysical Relnode TreeExecNode DAGTransformation DAGExecNodeGraph ProcessorsMultipleInp

59、utNodeCreationForwardHashExchangeAdaptiveJoinInjectTable PlannerRuntimeStream GraphSourceAdaptiveJoinSourceOptimized Stream GraphSourceOptimizedJoinSourceUpdated JobGraphUpdated ExecutionGraph優化步驟:1.Table Planner階段識別符合優化條件的Join算子標記為AdaptiveJoin算子。2.Runtime階段在會根據StreamGraphOptimizationStrategy優化項對Str

60、eamGraph進行優化。3.AdaptiveBatchScheduler更新物理執行計劃。Adaptive Broadcast Join OptimizationAdaptive Broadcast Join Optimizationtable.optimizer.join.broadcast-threshold:10MBConfigScanScanTable1Table2FilterSortMergeJoinSize:15MBHashHash根據靜態統計信息,右表數據量15MB大于Broadcast join 10MB的閾值,因此SQL優化器選擇了 SortMergeJoin 作為實際Jo

61、in方式。相比于SortMergeJoin,BroadcastHashJoin具有更好的性能表現。ProducedBytes:5MBAdaptive Broadcast Join OptimizationAdaptive Broadcast Join Optimizationtable.optimizer.join.broadcast-threshold:10MBtable.optimizer.adaptive-broadcast-join.strategy:AUTOConfigScanScanTable1Table2FilterSortMergeJoin(AdaptiveJoin)Size:

62、15MBProducedBytes:5MBHashHashRunning StageScanScanTable1Table2FilterBroadcastHashJoinForwardBroadcastFinished StageOptimizeFinished StageFinished Stage優化步驟:1.當上游jobVertex執行結束,嘗試優化下游存在的AdaptiveJoin算子。2.通過(1)check產出數據大小是否小于broadcast閾值。(2)根據join類型和所在邊check是否支持作為build端。3.若滿足優化條件,則會對Join算子的input輸入順序、輸入邊的

63、數據分發方式進行重新調整,最后根據新的條件生成最終的OperatorFactory。Adaptive Broadcast Join OptimizationAdaptive Broadcast Join OptimizationScanScanTable1Table2FilterBroadcastHashJoinForwardBroadcastFinished StageFinished Stage仍存在的問題:1.大表側與Join節點還存在一次網絡傳輸。2.小表側產出的分區是HashPartitioner產出分區,存在較多的小文件。網絡分區傳輸優化Input locality優化Forwar

64、d邊優化為Rescale邊TaskManagerSlotSlotSlotScan TaskJoinTaskdeployScanBroadcastJoinForwardFinished StageScanRescaleFinished StageScan TaskScan TasksubpartitionsJoinTaskHashFullFilledBufferSchedulerPreferred LocationsIGRPinputChannelsAdaptive Broadcast Join OptimizationAdaptive Broadcast Join OptimizationSk

65、ewed Join OptimizationSkewed Join OptimizationVertex1Task1Task2Task3Vertex2Task1Task2Task3Vertex3(InnerJoin)Task1Task2Task3Task1消費數據量明顯大于其他節點,造成任務長尾。Skewed Join OptimizationSkewed Join Optimization優化步驟:1.當上游jobVertex執行結束,嘗試優化下游AdaptiveJoin算子。2.通過(1)check當前input side是否支持分區切片。(2)判斷分區數據是否傾斜來進行判斷。3.在Dec

66、ider階段會根據調整后的分區進行重新劃分。Vertex1Task1Task2Task3Vertex2Task1Task2Task3Vertex3(InnerJoin)Task1Task2Task3Vertex1Task1Task2Task3Vertex2Task1Task2Task3Vertex3(InnerJoin)Task1Task3Task4SplitDuplicateTask2Optimizetable.optimizer.skewed-join-optimization.strategy:AUTOtable.optimizer.skewed-join-optimization.sk

67、ewed-factor:4table.optimizer.skewed-join-optimization.skewed-threshold:256mConfig數據均衡分發數據均衡分發為了更好描述這種拆分與復制,我們將其抽象為兩種關聯關系:1.IntraInputCorrelation(內部關聯):同一個 key group 下的數據必須發送給相同的下游 join 并發實例。2.InterInputCorrelation(外部關聯):多個Input之間存在關聯關系,對其中一個Input的key group的數據做拆分時,其他Input的對應key group的數據必須進行復制。UpStrea

68、mVertexInnerJoinAllToAll兩端輸入都可拆分UpStreamVertexUpStreamVertexLeft/RightJoinAllToAll僅有一端可拆分UpStreamVertexUpStreamVertexDownstreamVertexPointWise可拆分partition1partition1partition3111222Downstream Task1Downstream Task2=V=V333partition1partition1partition3111222333111111222222Downstream Task3=V333333Inter

69、InputCorrelation:true11subpartition122subpartition233subpartition3V:the amount of data user expects each task to processALL_TO_ALLKeyGroup IntraInputCorrelation:trueIntraInputCorrelation:true數據均衡分發數據均衡分發InterInputCorrelation:truepartition1partition1partition3111222Downstream Task1Downstream Task2=V=

70、V333partition1partition1partition3111222333111111222222Downstream Task3=V333333111111222222333333Split111222222333333Split111111ALL_TO_ALLKeyGroupIntraInputCorrelation:falseIntraInputCorrelation:true數據均衡分發數據均衡分發partition1partition1partition3111222Downstream Task1Downstream Task2=V=V333partition1part

71、ition1partition3111222333111111222222Downstream Task3=V333333InterInputCorrelation:true111111222222333333SplitSplit1111222222333333Split11111111ALL_TO_ALLIntraInputCorrelation:falseIntraInputCorrelation:false數據均衡分發數據均衡分發KeyGrouppartition1partition2partition3111222121212Downstream Task1Downstream Tas

72、k2=V=VPOINT_WISE數據均衡分發數據均衡分發目前PointWise的劃分算法只能將數據分組劃分為一組連續的partitions或同一partition內的連續subpartitionspartition1partition2partition3111222121212Downstream Task1Downstream Task2=V=VPOINT_WISE數據均衡分發數據均衡分發支持了跨Partition與Subpartition Range劃分算法未來改進03未來改進與operatorfusion codegen結合支持更多場景MultipleInput更智能的自適應優化策略T

73、HANK YOU謝 謝 觀 看Flink基于Paimon的實時湖倉解決方案的演進蘇軒楠阿里云高級研發工程師,Apache Flink Committer背景介紹Paimon 與 Flink 結合的場景技術演進 Paimon 與 Flink 的技術迭代發展規劃探討未來的規劃與方向背景介紹Paimon 與 Flink 結合的場景流式湖倉架構Apache FlinkApache Paimon流批一體湖倉格式流批一體計算引擎 支持分鐘級別數據更新 流批一體的特性可以同時支持實時和離線的計算 Paimon的每一層數據都可以直接接受變更數據,無需覆寫分區 Paimon 為流計算而生,和 Flink 結合完

74、善2.01.0技術演進 Paimon 與 Flink 的技術迭代全增量一體入 Paimon 優化CDC Source全量階段增量階段binlogPaimon Sinkchangelogdata通過 Flink 流批融合,CDC source 可以通過事件通知 Paimon sink 當前的階段全量階段可以優化寫出 changelog 性能在全量階段下,Paimon sink 可以獲得 20%的性能提升Partial Update在實時湖倉打寬表中,通常會用到 Flink 的 Join 來實現Paimon 提供了 Partial Update 的合并機制,可以替代 Flink 的 join 來實

75、現打寬表的效果user_idab11null22nulluser_idab1null12null2user_idab111222問題:如果一個作業有多個 source 寫入同一張 Paimon 表時會有多個 sink 算子。多個 sink 同時做 compaction 會導致作業 FOSink 算子合并Partial Update 場景下,會有一個作業有多個 source 寫入同一張 Paimon 表Flink SQL planner 編譯作業時,如果有多個 sink 寫入同一張 Paimon 表時,會使用同一個 sink 算子無主鍵表引入 Range Partition and Sort問題

76、:對于無主鍵表,所有的數據都是亂序的,查詢的時候需要讀取大量的數據,導致查詢效率很低。同時亂序的數據,也會影響列存格式的壓縮率。無主鍵表引入 Range Partition and Sort在寫入無主鍵 Paimon 表的批作業中通過 range partition and sort 可以把數據按列進行排序再寫出文件,提升查詢效率。1.數據采樣:估算需要排序的列的范圍,并且 broadcast 發送到下游無主鍵表引入 Range Partition and Sort在寫入無主鍵 Paimon 表的批作業中通過 range partition and sort 可以把數據按列進行排序再寫出文件,

77、提升查詢效率。1.數據采樣:估算需要排序的列的范圍,并且 broadcast 發送到下游2.分配 cluster:按照估算的范圍來分配每個數據對應的 cluster index,并且按照 cluster index 來分發到下游算子無主鍵表引入 Range Partition and Sort在寫入無主鍵 Paimon 表的批作業中通過 range partition and sort 可以把數據按列進行排序再寫出文件,提升查詢效率。1.數據采樣:估算需要排序的列的范圍,并且 broadcast 發送到下游2.分配 cluster:按照估算的范圍來分配每個數據對應的 cluster index

78、,并且按照 cluster index 來分發到下游算子3.刪除 cluster index:在寫出數據之前,刪除 cluster index無主鍵表引入 Range Partition and Sort在寫入無主鍵 Paimon 表的批作業中通過 range partition and sort 可以把數據按列進行排序再寫出文件,提升查詢效率。1.數據采樣:估算需要排序的列的范圍,并且 broadcast 發送到下游2.分配 cluster:按照估算的范圍來分配每個數據對應的 cluster index,并且按照 cluster index 來分發到下游算子3.刪除 cluster inde

79、x:在寫出數據之前,刪除 cluster index4.Sort Partition:對每個 cluster 內的數據排序Lookup Join 優化Flink 不知道 Paimon 表的 bucket 分配方式,導致 shuffle 的時候,所有的 lookup 并發都要加載并保存全量數據內存占用高作業啟動 overhead 高Lookup join 性能差Bucket 1Key:1-2Bucket 2Key:3-4Paimon 表LookupLookup事實表k=1,k=3k=2,k=4cachek=1,k=2,k=3,k=4cachek=1,k=2,k=3,k=4Lookup Join

80、優化事實表Flink SQL planner 引入 SupportsLookupCustomShuffle 接口讓 Paimon source 支持配置 Lookup shuffle 的方式Bucket 1Key:1-2Bucket 2Key:3-4Paimon 表LookupLookupk=1,k=2k=3,k=4cachek=1,k=2,k=3,k=4cachek=1,k=2,k=3,k=4Fixed Bucket Table Lookup JoinFixed Bucket Table 表的桶數量是固定的對于一條數據,他所對應的 bucket 可以通過以下的方式計算出來key_hashco

81、de%numBucketsLookup 的 shuffle 方式和數據的分桶方式一致Lookup 算子只需要讀取并存儲所負責的 bucketBucket 1Key:1-2Bucket 2Key:3-4Paimon 表LookupLookup事實表k=1,k=2k=3,k=4cachek=1,k=2cachek=3,k=4Dynamic Bucket Table Lookup Join事實表Bucket 1Key:1-2Bucket 2Key:3-4Paimon 表LookupLookupk=1,k=3k=2,k=4cachek=1,k=3cachek=2,k=4Dynamic Bucket T

82、able 表的桶數量是動態的對于一條數據,他所對應的 bucket 需要通過讀取 paimon 的 index 文件來獲取Lookup 的 shuffle 方式根據 Lookup key hash 來計算Join_key_hashcode%numSubtaskLookup 算子雖然需要讀取全量數據,但是只需要保存負責的 join key 的數據Skew Join事實表Bucket 2Key:2Bucket 3Key:3Paimon 表LookupLookup事實表k:1,1,1,2,3k:1,1,1,1,3Bucket 1Key:1Lookupk:1,1,1,1,1,1,1,1k:2k:3,3

83、當 Lookup Join 的事實表存在 hot key 的時候,那負責該 hot key 的 Join 算子并發將有可能成為作業的性能瓶頸Skew Join當 Lookup Join 的事實表存在 hot key 的時候,那負責該 hot key 的 Join 算子并發將有可能成為作業的性能瓶頸在這種情況下,如果作業對于 per-key 沒有順序要求,可以使用 skew join 的優化Paimon Lookup 自定義的 shuffle 會將同一個 key 的數據隨機打散到指定的N個并發上,來解決事實表 hot key 導致的性能瓶頸事實表Bucket 2Key:2Bucket 3Key:

84、3Paimon 表LookupLookup事實表k:1,1,1,2,3k:1,1,1,1,3Bucket 1Key:1Lookupk:1,1,1,1,1k:1,1,1k:2,3,3對接 Materialized Table用戶只需要通過 Flink SQL 寫業務代碼,Flink 會自動根據對應的新鮮度需求來啟動流作業或者批作業,保證 Paimon 表的數據符合新鮮度的要求對接 Materialized Table用戶只需要通過 Flink SQL 寫業務代碼,Flink 會自動根據對應的新鮮度需求來啟動流作業或者批作業,保證 Paimon 表的數據符合新鮮度的要求優化 Procedure 易

85、用性procedure 支持 named argument,用戶不需要配置有默認值的參數優化 Procedure 易用性 Paimon 提供了很多 action 來管理 Paimon catalog,包括 Compaction,snapshot 管理等 一些 Action 沒有對應的 Flink procedure 實現,需要通過 action jar 來調用 Paimon 0.9 之后,所有 action 都實現了對應的 procedure??梢允褂?SQL Call procedure 的方式來執行所有 Paimon action。Clone TablesPaimon 0.9 以前1.下載

86、 Paimon action jar 包2.通過 flink run 命令啟動 clone tables 作業Clone TablesPaimon 0.9 之后直接 Flink SQL 調用 CALL發展規劃未來的規劃與方向 開源功能 無主鍵表引入 range partition and sort【Paimon 0.9】Procedure 易用性優化【Paimon 0.9】對接 Materialized Table【Paimon 1.0】商業化功能將在 Flink 2.0 之后逐步推進社區 全增量一體入 Paimon 優化 Partial Update 多 sink 合并 Lookup Join 優化 持續優化 Paimon 在實時湖倉場景下的讀寫性能 利用 Paimon merge engine 來優化 Flink aggregation 的性能 優化 Paimon 在 batch 作業的讀寫性能展望未來THANK YOU謝 謝 觀 看

友情提示

1、下載報告失敗解決辦法
2、PDF文件下載后,可能會被瀏覽器默認打開,此種情況可以點擊瀏覽器菜單,保存網頁到桌面,就可以正常下載了。
3、本站不支持迅雷下載,請使用電腦自帶的IE瀏覽器,或者360瀏覽器、谷歌瀏覽器下載即可。
4、本站報告下載后的文檔和圖紙-無水印,預覽文檔經過壓縮,下載后原文更清晰。

本文(FFA2024分論壇-核心技術.pdf)為本站 (可不可以) 主動上傳,三個皮匠報告文庫僅提供信息存儲空間,僅對用戶上傳內容的表現方式做保護處理,對上載內容本身不做任何修改或編輯。 若此文所含內容侵犯了您的版權或隱私,請立即通知三個皮匠報告文庫(點擊聯系客服),我們立即給予刪除!

溫馨提示:如果因為網速或其他原因下載失敗請重新下載,重復下載不扣分。
客服
商務合作
小程序
服務號
折疊
午夜网日韩中文字幕,日韩Av中文字幕久久,亚洲中文字幕在线一区二区,最新中文字幕在线视频网站