《4、周伊莎、馬越-Flink SQL上的狀態遷移和查詢 .pdf》由會員分享,可在線閱讀,更多相關《4、周伊莎、馬越-Flink SQL上的狀態遷移和查詢 .pdf(40頁珍藏版)》請在三個皮匠報告上搜索。
1、周伊莎/馬越 字節跳動基礎架構工程師State Migration and Query on State Migration and Query on FlinkFlink SQLSQL背景背景SQLSQL 作業的作業的狀態遷移狀態遷移StateState 的離線的離線查詢查詢未來展望未來展望#1#1#2#2#3#3#4#4#1#1背景背景狀態的作用狀態在流式作業的生命周期中扮演了十分重要的角色計算狀態在版本間遷移運行時容錯計算邏輯正確性校驗一些定義:一些定義:SnapshotSnapshot-a global,consistent image of the state of a Flink
2、jobCheckpointCheckpoint-a snapshot taken automatically by FlinkSavepointSavepointa snapshot triggered manually by a userSQL 與狀態DataStream 作業SQL 作業可以通過 API 完成對狀態的控制:1.注冊和使用狀態2.查詢和修改狀態3.設置 UID/UIDHash 以進行狀態遷移4.自定義 State Serializer5.RelNodeTransformationSQLJobGraphStreamGraphSQLTableDataStream APIRunti
3、meSQL作業的狀態對用戶來說是完全黑盒的?SQL 作業使用狀態的痛點l狀態查詢困難狀態查詢困難n 校驗邏輯正確性n 快速定位異常輸出來源l狀態狀態難以難以在迭代版本間遷移在迭代版本間遷移n 簡單的字段增減不影響狀態恢復n 簡單的邏輯變更不影響狀態恢復#2#2SQLSQL 作業的狀態遷移作業的狀態遷移現狀狀態無法遷移時:1.丟棄舊狀態2.回撥 Offset,重跑任務資源浪費產生 LAG長周期任務輸入缺失為什么需要遷移舊狀態?現狀狀態遷移的充分必要條件:1.Operator ID 保持不變2.State Serializer 前后兼容問題1:DAG 極易發生變更SourceCalcGroupAg
4、gSink1.隱式修改:修改計算邏輯、打開 minibatch 等2.顯式修改:增刪維表、source、sink 等為什么 SQL 作業狀態難以遷移?現狀狀態遷移的充分必要條件:1.Operator ID 保持不變2.State Serializer 前后兼容為什么為什么 SQLSQL 作業狀態難以遷移?作業狀態難以遷移?問題問題2 2:State Serializer 不可兼容1.增刪字段2.修改字段的類型AccumulatorAccumulators:ValueStateMAXMAXCOUNTCOUNTSUMSUMSUMSUMMAXMAXCOUNTCOUNTSUMSUMSUMSUMLAST
5、_VALUELAST_VALUEData Talks線上最近30日狀態恢復失敗 Case 分析由拓撲圖發生變更導致的 Case 占比75%由 State Serializer不可兼容導致的Case 占比25%解決思路RelNodeTransformationSQLJobGraphStreamGraphSQLTableDataStream APIRuntimeFlink 在 DataStream API 層為狀態遷移提供的能力:設置設置 UID/UID/UIDHashUIDHashSQL 作業如何使用這種能力?1.為 SQL 作業提供的 DAG 預覽2.允許用戶對 DAG 進行編輯3.將用戶編輯
6、的 UID/UIDHash 傳遞到運行時DAG 預覽#1 為什么不直接存儲StreamGraph?#2 如何與算子形成穩定映射?復用 StreamGraphHasherV2 來為每個節點生成確定性的 id保持隔離,不被 Flink 迭代影響PlanGraph 抽象FieldExplainid確定性idgeneratedOperatorID與 JobGraph 中算子的OperatorID 對應userProvidedHash用戶提供的 UIDHashhasState此算子是否使用狀態其他 StreamNode 屬性DAG 預覽初始可視化效果初始可視化效果DAG 預覽任務修改產生 Diff 后整
7、體使用流程SQLNormal Configs生成1 12 23 3PlanGraphSQLNormal Configs修改生成2 23 34 4PlanGraph1 1自動映射/手工修改2 2333 3PlanGraph1 12 23 34 4JobGraph1 1共同提交舊版本易用性問題對于較為復雜的 DAG 來說1.為所有節點手動配置成本較高2.難以快速地定位需要配置的節點易用性問題的解決方案對于較為復雜的 DAG 來說1.為所有節點手動配置成本較高2.難以快速地定位需要配置的節點提供 Best Effort 的自動映射高亮使用狀態的節點提供 JSON 代碼比較Best Effort 的自
8、動映射Note:算子的 Description 是描述算子強有力信息A1A2A1A2A3A1A2#1#1分別在舊圖和新圖里收集具有相同Description 的算子#2#2為每一對節點計算相似度,并放入最大堆A1A2Maximum Heap#3#3輪詢最大堆,直到新圖或舊圖中的節點都已完成匹配#4 4為匹配的節點,從舊節點中取出 GeneratedOperatorID 填入到新節點的 UserProvidedHash中A1A2相似度計算相似度計算 TipsTips:1.比較所有入節點的屬性2.比較所有出節點的屬性JSON 代碼比較JSON 中所有的節點已按拓撲排序的順序展示通過文本模式的對比,
9、使用戶能夠更快速地定位需要的節點其他優化有狀態節點使用特殊標識提示用戶重點關注支持按節點屬性進行搜索總結SQL 作業 DAG 極易發生變更狀態遷移困難SourceCalcGroupAggSink我們怎么解決的?我們遇到了什么問題?提供 DAG 預覽,允許用戶為算子設置UID/UIDHash 等提供 Best Effort 的自動映射功能高亮有狀態節點、提供 JSON 代碼對比等,提升手動編輯易用性#1#2#3#3 3StateState 的離線查詢的離線查詢State Processor API特性介紹Flink 1.9.0 推出的新功能使用 DataSet API用于 讀取/修改/生成 Sa
10、vepoint狀態讀取狀態讀取狀態修正狀態修正狀態初始化狀態初始化State Processor APIState Processor APISavepointSavepointState Processor API使用方式Step 1:創建 Existing SavepointExistingSavepoint savepoint=Savepoint.load(env,savepointPath,StateBackend);Step 2:定義 ReaderFunctionclass ReaderFunction extends KeyedStateReaderFunction/重新注冊 st
11、ate/遍歷所有 key,訪問 StateStep 3:讀取狀態savepoint.readKeyedState(my-uid,new ReaderFunction();State Processor API狀態查詢原理ClientKeyedStateInputFormatSavepoint_metadata_metadataopAopA-1 1-statestateopAopA-2 2-statestateopBopB-1 1-statestateopBopB-2 2-statestateOperatorIDSavepointPathStateBackendparse metadatacre
12、ate inputSplitrestore statereaderFunc.open()register stateKeys IteratorreaderFunc.readKey(key)traversekey開發成本開發成本需要開發 Java 代碼使用門檻使用門檻需要了解狀態定義細節查詢限制查詢限制無法查詢多個算子無法查詢狀態元信息內容匱乏內容匱乏主要問題成本降低成本降低功能增強功能增強使用 Flink Batch SQL 查詢狀態不需要知道任何信息支持查詢多個算子任務支持查詢狀態元信息RuntimeRuntimeStorageStoragephysicalphysicalsavepoint
13、savepoint-1 1savepointsavepoint-n nlogiclogicDB&TablesDB&TablesDB&TablesDB&TablesmappingmappingStateState Query on Query on FlinkFlink SQLSQL解決方案如何用 SQL 表達 State如何實現 0 門檻查詢如何表示單個 State如何表示一個算子/任務所有 State如何表示任務有哪些 State查詢狀態需要哪些信息?如何對用戶屏蔽 State 細節?實現難點查詢查詢 StateState 需要哪些信息?需要哪些信息?BackendTypeKeyedType
14、StateNameOperatorIDSavepointPathStateSerializerReaderFunctionExistingSavepointToo much to KnowToo much to KnowSavepointPathStateMetaInformationStateMetaInformationBackendTypeKeyedTypeStateNameOperatorIDStateSerializerStateMetaStateMeta SnapshotSnapshot將 StateMeta 信息添加到 Savepoint 中SavepointSavepointS
15、tateMeta SnapshotTMTM 上報上報 StateMetaStateMetaOPCheckpointCoordinatorsavepointstate1stateNStateMetaInfooperatorNameoperatorIDKeySerializerStateDescriptorsDistrisbuteTypesnamespaceSerializersTaskTask 制作制作 SavepointSavepoint 時,對時,對 StateMetaStateMeta 進行快照進行快照快照完成之后,將快照完成之后,將 StateMetaStateMeta 信息上報至信息上
16、報至 JMJMState State 注冊時,注冊時,Task Task 保存保存 StateMetaStateMetaJMJM 持久化持久化 StateMetaStateMetasnapshotStateHandlesStateMetaInformationSnapshot StateMeta_metadata_stateInfoPersist匯總寫入匯總寫入 SavepointSavepoint 目錄中目錄中對相同對相同 operator operator 的的 StateMetaStateMeta 進行合并進行合并StateMeta Snapshot如何用 SQL 表達 State?Ta
17、ble1Table2Table3TableNTable1Table2Table3TableNDatabaseTable1Table2Table3TableNTable1Table2Table3TableNTable1Table2Table3TableNDatabaseTable1Table2Table3TableNTable1Table2Table3TableNTable1Table2Table3TableNDatabaseTable1Table2Table3TableNCatalogTable1Table1DatabaseTable1DatabaseSP1Savepointstate_met
18、aop1_state1op1_all_stateall_stateCatalog.Database.TableSavepoint.SP_ID.StateTableState 元信息單個 State算子/任務所有 StateState As DatabasenamespaceAggregate(op2)Sink(op3)ValueState sumListState offsetkeyBykeyBy partition:1,offset:63 partition:2,offset:88 partition:3,offset:75 valuekeyKafkaSource(op1)valuevoid
19、10void8k1k2namespacevoid6k3如何表示單個 Stateop2_sumop2_sumop1_offsetop1_offsetTableName=OperatorID+StateNameOperatorStateTableOperatorStateTableKeyedStateTableKeyedStateTableState As Database如何表示一個算子所有的 state?namespacekeyvaluevoid1void2k1k2namespacekeyvaluevoid3void4k1k2value1086namespacekeyvaluevoid1voi
20、d2k1k2void3k1state_namekeyedState1keyedState1keyedState2void4null10k2nullnull8nullkeyedState2op_stateop_statenull6nullop_stateTableName=OperatorID+all_statesop1_all_statesop1_all_statesop1_keyedState2op1_keyedState2op1_keyedState1op1_keyedState1op1_op_statop1_op_state eAllStatesTableAllStatesTableSt
21、ate As Database如何表示任務有哪些 state?Aggregate(OP2)Sink(OP3)ValueState sumListState offsetkeyBykeyByKafkaSource(OP1)state_metastate_metaop_idop_namestate_nameOP1kafkaSourceoffsetAggregateOP2sumstate_typeListStateValueStateTableName=state_metaStateMetaTableStateMetaTableState As Database2、啟動作業3、制作 Savepoin
22、t,生成 SavepointID1、編輯 SQL 任務準備工作準備工作使用 Flink SQL 查詢狀態場景一:查詢任務有哪些狀態場景二:查詢 Source 算子的 Offset場景三:查詢聚合算子某個 Key 的中間結果使用介紹使用介紹使用 Flink SQL 查詢狀態#4 4未來展望未來展望未來展望豐富 State 功能加強 State 可用性使用 Flink SQL 實現 State 的修改和初始化優化 State Serializer 持續增強 State 的恢復能力提供完善的 State 不可恢復事前檢查能力支持 State 到異步數據源的導入和導出THANKSContact usContact us: